Compartilhar via


Executar inferência em lote usando um DataFrame do Spark

Importante

Os tópicos nesta página são relevantes apenas para cenários de inferência em lotes que não usam modelos de base hospedados pelo Databricks otimizados para cenários de inferência em lotes. Consulte Aplicar IA em dados usando o Azure Databricks AI Functions.

Esta página descreve como executar a inferência em lote em um DataFrame do Spark usando um modelo registrado no Databricks. O fluxo de trabalho se aplica a vários modelos de aprendizado de máquina e aprendizado profundo, incluindo TensorFlow, PyTorch e scikit-learn. Ele inclui práticas recomendadas para carregamento de dados, inferência de modelo e ajuste de desempenho.

Para a inferência de modelo para aplicativos de aprendizado profundo, o Azure Databricks recomenda o fluxo de trabalho a seguir. Por exemplo, notebooks que usam TensorFlow e PyTorch, veja exemplos de inferência em lote.

Fluxo de trabalho de inferência de modelo

O Databricks recomenda o fluxo de trabalho a seguir para executar a inferência em lote usando DataFrames do Spark.

Etapa 1: Configuração do ambiente

Verifique se o cluster executa uma versão compatível do Databricks ML Runtime para corresponder ao ambiente de treinamento. O modelo, registrado usando o MLflow, contém os requisitos que podem ser instalados para garantir que os ambientes de treinamento e inferência correspondam.

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
    dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

Etapa 2: Carregar dados em DataFrames do Spark

Dependendo do tipo de dados, use o método apropriado para carregar dados em um DataFrame do Spark:

Tipo de dados Método
Tabela do Unity Catalog (Recomendado) table = spark.table(input_table_name)
Arquivos de imagem (JPG, PNG) files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])
TFRecords df = spark.read.format("tfrecords").load(image_path)
Outros formatos (Parquet, CSV, JSON, JDBC) Carregue usando fontes de dados do Spark.

Etapa 3: Carregar modelo do registro de modelo

Este exemplo usa um modelo do Registro de Modelo do Databricks para inferência.

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

Etapa 4: Executar inferência de modelo usando UDFs do Pandas

As UDFs do Pandas aproveitam o Apache Arrow para transferência eficiente de dados e pandas para processamento. As etapas típicas para inferência com UDFs do pandas são:

  1. Carregue o modelo treinado: use o MLflow para criar um UDF do Spark para inferência.
  2. Pré-processar dados de entrada: verifique se o esquema de entrada corresponde aos requisitos do modelo.
  3. Executar previsão de modelo: use a função UDF do modelo no DataFrame.
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (Recomendado) Salve previsões no Catálogo do Unity.

O exemplo seguinte salva previsões no Unity Catalog.

df_result.write.mode("overwrite").saveAsTable(output_table)

Ajuste de desempenho para inferência de modelo

Esta seção fornece algumas dicas para depuração e ajuste de desempenho para inferência de modelo no Azure Databricks. Para obter uma visão geral, consulte Executar inferência em lote usando um DataFrame do Spark.

Normalmente, há duas partes principais na inferência do modelo: pipeline de entrada de dados e inferência de modelo. O pipeline de entrada de dados é intenso em termos de entrada/saída de dados, e a inferência do modelo é intensiva em computação. Determinar o gargalo do fluxo de trabalho é simples. Aqui estão algumas abordagens:

  • Reduza o modelo a um modelo trivial e meça os exemplos por segundo. Se a diferença no tempo de ponta a ponta entre o modelo completo e o modelo trivial for mínima, o pipeline de entrada de dados provavelmente será um gargalo; caso contrário, a inferência do modelo será o gargalo.
  • Se estiver executando a inferência do modelo com GPU, verifique as métricas de utilização da GPU. Se a utilização de GPU não for continuamente alta, o fluxo de entrada de dados poderá ser o gargalo.

Otimizar o pipeline de entrada de dados

O uso de GPUs pode otimizar com eficiência a velocidade de execução para inferência de modelo. À medida que GPUs e outros aceleradores se tornam mais rápidos, é importante que o pipeline de entrada de dados acompanhe a demanda. O pipeline de entrada de dados coleta os dados nos DataFrames do Spark, os transforma e os carrega como entrada para a inferência do modelo. Se a entrada de dados for o gargalo, aqui estão algumas dicas para aumentar a taxa de transferência de E/S:

  • Defina os registros máximos por lote. Um número maior de registros máximos pode reduzir a sobrecarga de E/S para chamar a função UDF, desde que os registros possam caber na memória. Para definir o tamanho do lote, defina a seguinte configuração:

    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    
  • Carregue os dados em lotes e pré-processe-os ao pré-processar os dados de entrada na UDF pandas.

    Para TensorFlow, o Azure Databricks recomenda usar a API tf.data. Você pode analisar o mapa em paralelo, definindo num_parallel_calls em uma função map e chamando prefetch e batch para pré-busca e envio em lote.

    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)
    

    Para PyTorch, o Azure Databricks recomenda o uso da classe DataLoader. Você pode definir batch_size para envio em lote e num_workers para carregamento de dados paralelos.

    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
    

Exemplos de inferência em lote

Os exemplos desta seção seguem o fluxo de trabalho recomendado de inferência de aprendizado profundo. Esses exemplos mostram como executar a inferência de modelo com um modelo pré-treinado de rede neural ResNets (redes residuais profundas).

Extração de dados estruturados e inferência em lote usando a UDF do Spark

O notebook de exemplo a seguir demonstra o desenvolvimento, o registro em log e a avaliação de um agente simples para extração de dados estruturados para transformar dados brutos e não estruturados em informações organizadas e utilizáveis por meio de técnicas de extração automatizadas. Essa abordagem demonstra como implementar agentes personalizados para inferência em lotes usando a classe do PythonModel MLflow e empregar o modelo de agente registrado como uma UDF (Função User-Defined Spark). Esse notebook também mostra como aproveitar a Avaliação do Mosaic AI Agent para avaliar a precisão usando dados reais.

Extração de dados estruturados e inferência em lote usando a UDF do Spark

Obter laptop