Partager via


Effectuer une inférence par lots à l’aide d’un DataFrame Spark

Important

Les rubriques de cette page sont uniquement pertinentes pour les scénarios d’inférence par lots qui n’utilisent pas les modèles de base hébergés par Databricks optimisés pour les scénarios d’inférence par lots. Consultez Appliquer l’IA aux données à l’aide d’Azure Databricks AI Functions.

Cette page explique comment effectuer une inférence par lots sur un DataFrame Spark à l’aide d’un modèle inscrit dans Databricks. Le flux de travail s’applique à différents modèles Machine Learning et Deep Learning, notamment TensorFlow, PyTorch et scikit-learn. Il inclut les meilleures pratiques pour le chargement des données, l’inférence de modèle et le réglage des performances.

Pour l’inférence de modèle pour les applications de Deep Learning, Azure Databricks recommande le workflow suivant. Pour obtenir des exemples de notebooks qui utilisent TensorFlow et PyTorch, consultez des exemples d’inférence Batch.

Workflow d’inférence de modèle

Databricks recommande le flux de travail suivant pour effectuer une inférence par lots à l’aide de DataFrames Spark.

Étape 1 : Configuration de l’environnement

Vérifiez que votre cluster exécute une version compatible de Databricks ML Runtime pour qu’il corresponde à l’environnement d’entraînement. Le modèle, journalisé à l’aide de MLflow, contient les exigences qui peuvent être installées pour garantir que les environnements d’apprentissage et d’inférence correspondent.

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
    dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

Étape 2 : Charger des données dans des DataFrames Spark

Selon le type de données, utilisez la méthode appropriée pour charger des données dans un DataFrame Spark :

Type de données Méthode
Table à partir du catalogue Unity (recommandé) table = spark.table(input_table_name)
Fichiers image (JPG, PNG) files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])
TFRecords df = spark.read.format("tfrecords").load(image_path)
Autres formats (Parquet, CSV, JSON, JDBC) Chargez à l’aide de sources de données Spark.

Étape 3 : Charger le modèle à partir du registre de modèles

Cet exemple utilise un modèle à partir du Registre de modèles Databricks pour l’inférence.

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

Étape 4 : Effectuer une inférence de modèle à l’aide des fonctions définies par l’utilisateur (UDF) de Pandas

Les UDFs (fonctions définies par l'utilisateur) de Pandas tirent parti d'Apache Arrow pour un transfert de données efficace et de pandas pour le traitement. Les étapes typiques de l'inférence avec les UDF pandas sont les suivantes :

  1. Chargez le modèle entraîné : utilisez MLflow pour créer une fonction UDF Spark pour l’inférence.
  2. Prétraitez les données d’entrée : vérifiez que le schéma d’entrée correspond aux exigences du modèle.
  3. Exécuter la prédiction de modèle : utilisez la fonction UDF du modèle sur le DataFrame.
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (Recommandé) Enregistrez les prédictions dans le catalogue Unity.

L’exemple suivant enregistre les prédictions dans le catalogue Unity.

df_result.write.mode("overwrite").saveAsTable(output_table)

Réglage des performances pour l’inférence de modèle

Cette section fournit des conseils pour le débogage et le réglage des performances pour l’inférence de modèle sur Azure Databricks. Pour obtenir une vue d’ensemble, consultez l’inférence par lots à l’aide d’un DataFrame Spark.

En règle générale, il existe deux parties principales dans l’inférence du modèle : le pipeline d’entrée de données et l’inférence de modèle. Le pipeline de traitement de données est consommateur en E/S de données et l’inférence de modèle requiert beaucoup de calculs. La détermination du goulot d’étranglement du flux de travail est simple. Voici quelques approches :

  • Réduisez le modèle à un modèle trivial et mesurez les exemples par seconde. Si la différence entre le temps de bout en bout entre le modèle complet et le modèle trivial est minimale, le pipeline d’entrée de données est probablement un goulot d’étranglement, sinon l’inférence du modèle est le goulot d’étranglement.
  • Si vous exécutez l’inférence de modèle avec GPU, vérifiez les métriques d’utilisation du GPU. Si l’utilisation du GPU n’est pas élevée en continu, le pipeline d’entrée de données peut être le goulot d’étranglement.

Optimiser le pipeline d’entrée de données

L’utilisation de GPU peut optimiser efficacement la vitesse d’exécution pour l’inférence du modèle. À mesure que les GPU et d’autres accélérateurs deviennent plus rapides, il est important que le pipeline d’entrée de données continue à respecter la demande. Le pipeline d’entrée de données lit les données dans des DataFrames Spark, les transforme et les charge en tant qu’entrée pour l’inférence du modèle. Si l’entrée de données est le goulot d’étranglement, voici quelques conseils pour augmenter le débit d’E/S :

  • Définissez les enregistrements max par lot. Un plus grand nombre d’enregistrements maximum peut réduire la surcharge d’E/S associée à l’appel de la fonction UDF, à condition que les enregistrements puissent tenir dans la mémoire. Pour définir la taille du lot, définissez la configuration suivante :

    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    
  • Chargez les données par lots et prérécupez-les lors du prétraitement des données d’entrée dans l’UDF pandas.

    Pour TensorFlow, Azure Databricks recommande d’utiliser l’API tf.data. Vous pouvez parcourir la carte en parallèle en définissant num_parallel_calls dans la fonction map et en appelant prefetch et batch pour la prérécupération et le traitement par lots.

    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)
    

    Pour PyTorch, Azure Databricks recommande d’utiliser la classe DataLoader. Vous pouvez définir batch_size pour le traitement par lots et num_workers pour le chargement de données parallèles.

    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
    

Exemples d’inférence batch

Les exemples dans cette section suivent le workflow d’inférence de Deep Learning recommandé. Ces exemples montrent comment effectuer l’inférence de modèle à l’aide d’un modèle de réseau neuronal de réseaux résiduels (ResNets) profond pré-formé.

Extraction de données structurées et inférence par lots à l’aide de la fonction UDF Spark

L’exemple de notebook suivant illustre le développement, la journalisation et l’évaluation d’un agent simple pour l’extraction de données structurées afin de transformer des données brutes et non structurées en informations organisées et utilisables par le biais de techniques d’extraction automatisée. Cette approche montre comment implémenter des agents personnalisés pour l’inférence par lots en utilisant la classe PythonModel de MLflow et en employant le modèle d’agent répertorié en tant que fonction Spark User-Defined (UDF). Ce notebook montre également comment tirer profit de l’évaluation de l’agent Mosaic AI pour évaluer l’exactitude en utilisant des données de vérité terrain.

Extraction de données structurées et inférence par lots à l’aide de la fonction UDF Spark

Obtenir un ordinateur portable