Freigeben über


Durchführen einer Batch-Ableitung mit einem Spark DataFrame

Von Bedeutung

Die Themen auf dieser Seite sind nur für Batchableitungsszenarien relevant, die keine für Batchableitungsszenarien optimierten Databricks-gehosteten Foundation-Modelle verwenden. Siehe Anwenden von KI auf Daten mithilfe von Azure Databricks AI Functions.

Diese Seite beschreibt, wie man Batch-Inferenz auf einem Spark DataFrame mithilfe eines registrierten Modells in Databricks ausführt. Der Workflow gilt für verschiedene Machine Learning- und Deep Learning-Modelle, einschließlich TensorFlow, PyTorch und Scikit-Learn. Es enthält bewährte Methoden zum Laden von Daten, modellieren und Leistungsoptimierungen.

Für den Modellrückschluss für Deep Learning-Anwendungen empfiehlt Azure Databricks den folgenden Workflow. Beispiele für Notizbücher, die TensorFlow und PyTorch verwenden, finden Sie unter Batch-Ableitungsbeispiele.

Modell-Inferenz-Workflow

Databricks empfiehlt den folgenden Workflow für die Durchführung von Batch-Inferenz mit Spark DataFrames.

Schritt 1: Umgebungseinrichtung

Stellen Sie sicher, dass Ihr Cluster eine kompatible Databricks ML-Runtime-Version ausführt, um der Schulungsumgebung zu entsprechen. Das Modell, das mit MLflow protokolliert wird, enthält die Anforderungen, die installiert werden können, um sicherzustellen, dass die Schulungs- und Ableitungsumgebungen übereinstimmen.

requirements_path = os.path.join(local_path, "requirements.txt")
if not os.path.exists(requirements_path):
    dbutils.fs.put("file:" + requirements_path, "", True)

%pip install -r $requirements_path
%restart_python

Schritt 2: Laden von Daten in Spark DataFrames

Verwenden Sie je nach Datentyp die entsprechende Methode, um Daten in einen Spark DataFrame zu laden:

Datentyp Methode
Tabelle aus Unity-Katalog (empfohlen) table = spark.table(input_table_name)
Bilddateien (JPG, PNG) files_df = spark.createDataFrame(map(lambda path: (path,), file_paths), ["path"])
TFRecords df = spark.read.format("tfrecords").load(image_path)
Andere Formate (Parquet, CSV, JSON, JDBC) Laden Sie mithilfe von Spark-Datenquellen.

Schritt 3: Laden des Modells aus der Modellregistrierung

In diesem Beispiel wird ein Modell aus der Databricks-Modellregistrierung für Rückschlüsse verwendet.

predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri)

Schritt 4: Modellinferenz mithilfe von Pandas UDFs durchführen

Pandas UDFs nutzen Apache Arrow für eine effiziente Datenübertragung und Pandas zur Verarbeitung. Die typischen Schritte für die Ableitung mit Pandas UDFs sind:

  1. Laden Sie das trainierte Modell: Verwenden Sie MLflow, um eine Spark UDF für die Ableitung zu erstellen.
  2. Vorverarbeitung von Eingabedaten: Stellen Sie sicher, dass das Eingabeschema den Modellanforderungen entspricht.
  3. Ausführen der Modellvorhersage: Verwenden Sie die UDF-Funktion des Modells im DataFrame.
df_result = df_spark.withColumn("prediction", predict_udf(*df_spark.columns))
  1. (Empfohlen) Speichern Sie Vorhersagen im Unity-Katalog.

Im folgenden Beispiel werden Vorhersagen im Unity-Katalog gespeichert.

df_result.write.mode("overwrite").saveAsTable(output_table)

Leistungsoptimierung für Modellinferenz

Dieser Abschnitt enthält einige Tipps zum Debuggen und Leistungsoptimierung für Modellableitungen für Azure Databricks. Eine Übersicht finden Sie unter " Durchführen einer Batchrückleitung mithilfe eines Spark DataFrame".

In der Regel gibt es zwei Hauptkomponenten in der Modellleitung: Dateneingabepipeline und Modellinferenz. Die Dateneingabepipeline ist stark durch Daten-E/A-Eingaben belastet, und die Modellinferenz ist stark bei der Berechnung belastet. Die Ermittlung des Engpasses des Workflows ist einfach. Im Folgenden werden einige Ansätze erläutert:

  • Reduzieren Sie das Modell auf ein triviales Modell, und messen Sie die Beispiele pro Sekunde. Wenn die Differenz der End-to-End-Zeit zwischen dem vollständigen Modell und dem trivialen Modell minimal ist, ist die Dateneingabepipeline wahrscheinlich ein Engpass; andernfalls ist die Modellinferenz der Engpass.
  • Überprüfen Sie bei Ausführung der Modellausleitung mit GPU die GPU-Auslastungsmetriken. Wenn die GPU-Auslastung nicht kontinuierlich hoch ist, kann die Dateneingabepipeline der Engpass sein.

Optimieren der Dateneingabepipeline

Die Verwendung von GPUs kann die Laufgeschwindigkeit für Modellinferenz effizient optimieren. Da GPUs und andere Beschleuniger schneller werden, ist es wichtig, dass die Dateneingabepipeline mit der Nachfrage Schritt hält. Die Dateneingabepipeline liest die Daten in Spark DataFrames, transformiert sie und lädt sie als Eingabe für modellinferenz. Wenn die Dateneingabe der Engpass ist, sind hier einige Tipps zum Erhöhen des E/A-Durchsatzes:

  • Legen Sie die maximalen Datensätze pro Batch fest. Eine größere Anzahl von maximalen Datensätzen kann den E/A-Aufwand verringern, um die UDF-Funktion aufzurufen, solange die Datensätze in den Arbeitsspeicher passen können. Um die Batchgröße festzulegen, legen Sie die folgende Konfiguration fest:

    spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "5000")
    
  • Laden Sie die Daten in Batches und stellen Sie sie bei der Vorverarbeitung der Eingabedaten im Pandas UDF vor.

    Für TensorFlow empfiehlt Azure Databricks die Verwendung der tf.data-API. Sie können die Karte parallel analysieren, indem Sie num_parallel_calls in einer map Funktion festlegen und prefetch und batch für das Vorabrufen und Batching aufrufen.

    dataset.map(parse_example, num_parallel_calls=num_process).prefetch(prefetch_size).batch(batch_size)
    

    Für PyTorch empfiehlt Azure Databricks die Verwendung der DataLoader-Klasse. Sie können batch_size für die Batchverarbeitung und num_workers für das parallele Laden von Daten festlegen.

    torch.utils.data.DataLoader(images, batch_size=batch_size, num_workers=num_process)
    

Beispiele für Batch-Ableitungen

Die Beispiele in diesem Abschnitt folgen dem empfohlenen Deep Learning-Rückschlussworkflow. Diese Beispiele veranschaulichen, wie Modellrückschlüsse mit einem vortrainierten ResNets-Modell (Deep Residual Networks) erstellt werden.

Strukturierte Datenextraktion und Batchableitung mithilfe von Spark UDF

Das folgende Beispielnotizbuch veranschaulicht die Entwicklung, Protokollierung und Auswertung eines einfachen Agents für die strukturierte Datenextraktion, um unstrukturierte Rohdaten in organisierte, nutzbare Informationen durch automatisierte Extraktionstechniken zu transformieren. Dieser Ansatz veranschaulicht, wie benutzerdefinierte Agents für batchinference mithilfe der PythonModel MLflow-Klasse implementiert und das protokollierte Agent-Modell als Spark User-Defined Function (UDF) verwendet wird. Dieses Notizbuch zeigt auch, wie Sie die Mosaic AI Agent Evaluation nutzen, um die Genauigkeit mithilfe von Ground Truth Daten zu bewerten.

Strukturierte Datenextraktion und Batchableitung mithilfe von Spark UDF

Notebook abrufen