Compartilhar via


Carregar dados usando o Mosaic Streaming

Este artigo descreve como usar o Mosaico Streaming para converter dados do Apache Spark em um formato compatível com o PyTorch.

O Mosaic Streaming é uma biblioteca de carregamento de dados de código aberto. Ele permite o treinamento de nó único ou distribuído e a avaliação de modelos de aprendizado profundo de conjuntos de dados que já estão carregados como DataFrames do Apache Spark. O Mosaic Streaming dá suporte principalmente ao Mosaic Composer, mas também se integra ao PyTorch nativo, PyTorch Lightning e ao TorchDistributor. O Mosaic Streaming oferece uma série de benefícios em relação aos DataLoaders tradicionais do PyTorch, incluindo:

  • Compatibilidade com qualquer tipo de dados, incluindo imagens, texto, vídeo e dados multimodais.
  • Suporte para grandes provedores de armazenamento em nuvem (AWS, OCI, GCS, Azure, Databricks UC Volume e qualquer repositório de objetos compatível com S3, como Cloudflare R2, Coreweave, Backblaze b2, etc.)
  • Maximizando garantias de correção, desempenho, flexibilidade e facilidade de uso. Para obter mais informações, confira a página principais recursos.

Para obter informações gerais sobre o Mosaic Streaming, leia a Documentação da API de Streaming.

Observação

O Mosaic Streaming foi pré-instalado em todas as versões do Databricks Runtime 15.2 ML e superior.

Carregar dados de DataFrames do Spark usando o Mosaic Streaming

O Mosaic Streaming fornece um fluxo de trabalho simples para converter do Apache Spark no formato MDS (Mosaic Data Shard), que pode ser carregado para uso em um ambiente distribuído.

O fluxo de trabalho recomendado é:

  1. Usar o Apache Spark para carregar e, opcionalmente, processar previamente os dados.
  2. Use streaming.base.converters.dataframe_to_mds para salvar o dataframe em disco para armazenamento transitório e/ou em um volume do Catálogo do Unity para armazenamento persistente. Esses dados serão armazenados no formato MDS e poderão ser otimizados com suporte para compactação e hash. Casos de uso avançado também podem incluir o pré-processamento de dados usando UDFs. Veja o tutorial DataFrame do Spark para MDS para obter mais informações.
  3. Use streaming.StreamingDataset para carregar os dados necessários na memória. StreamingDataset é uma versão do IterableDataset do PyTorch que apresenta embaralhamento elasticamente determinístico, o que permite a rápida retomada no meio de uma época. Consulte a documentação do StreamingDataset para obter mais informações.
  4. Use streaming.StreamingDataLoader para carregar os dados necessários para treinamento, avaliação e teste. StreamingDataLoader é uma versão do DataLoader do PyTorch que fornece uma interface de ponto de verificação/retomada adicional, para a qual controla o número de amostras vistas pelo modelo nessa classificação.

Para ver exemplos de ponta a ponta, confira os seguintes notebooks:

Simplificar o carregamento de dados do Spark para o PyTorch usando o notebook do Mosaic Streaming

Obter notebook

Resolução de problemas

Erro de autenticação

Se você vir o seguinte erro ao carregar dados de um volume do Catálogo do Unity usando StreamingDataset, configure as variáveis de ambiente, conforme mostrado abaixo.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Observação

Se você vir esse erro ao executar o treinamento distribuído usando TorchDistributor, também deverá definir as variáveis de ambiente nos nós de trabalho.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)

Problemas de memória compartilhada do Python 3.11

Devido a problemas com a implementação de memória compartilhada do Python 3.11, StreamingDataset pode encontrar problemas transitórios no Databricks Runtime 15.4 LTS para Machine Learning. Você pode evitar esses problemas atualizando para o Databricks Runtime 16.4 LTS para Machine Learning, pois o Python 3.12 resolve esses problemas.