Freigeben über


Observability in Azure Databricks für Aufträge, Lakeflow Spark Declarative Pipelines und Lakeflow Connect

Die Überwachung der Leistung, Kosten und Integrität Ihrer Streaminganwendungen ist für die Erstellung zuverlässiger, effizienter ETL-Pipelines unerlässlich. Azure Databricks bietet eine vielzahl von Observability-Features für Aufträge, Lakeflow Spark Declarative Pipelines und Lakeflow Connect, um Engpässe zu diagnostizieren, leistung zu optimieren und Ressourcennutzung und -kosten zu verwalten.

In diesem Artikel werden bewährte Methoden in den folgenden Bereichen beschrieben:

  • Wichtige Metriken zur Streamingleistung
  • Ereignisprotokollschemas und Beispielabfragen
  • Streamingabfrageüberwachung
  • Kostenbeobachtbarkeit mit Systemtabellen
  • Exportieren von Protokollen und Metriken in externe Tools

Wichtige Metriken für die Beobachtbarkeit von Streaming

Überwachen Sie beim Ausführen von Streamingpipelines die folgenden wichtigen Metriken:

Metric Purpose
Backpressure Überwacht die Anzahl der Dateien und Offsets (Größen). Hilft bei der Identifizierung von Engpässen und stellt sicher, dass das System eingehende Daten verarbeiten kann, ohne hinter sich zu fallen.
Throughput Erfasst die Anzahl der pro Mikrobatch verarbeiteten Nachrichten. Bewerten Sie die Pipelineeffizienz, und überprüfen Sie, ob sie mit der Datenaufnahme Schritt hält.
Duration Misst die durchschnittliche Dauer eines Micro-Batches. Gibt die Verarbeitungsgeschwindigkeit an und hilft beim Optimieren von Batchintervallen.
Latency Gibt an, wie viele Datensätze/Nachrichten im Laufe der Zeit verarbeitet werden. Hilft ihnen, End-to-End-Pipelineverzögerungen zu verstehen und für niedrigere Latenzen zu optimieren.
Clusterauslastung Spiegelt die CPU- und Speicherauslastung (%) wider. Stellt einen effizienten Ressourceneinsatz sicher und hilft Clustern zu skalieren, um die Verarbeitungsanforderungen zu erfüllen.
Network Misst übertragene und empfangene Daten. Nützlich zum Identifizieren von Netzwerkengpässen und zur Verbesserung der Leistung der Datenübertragung.
Checkpoint Identifiziert verarbeitete Daten und Offsets. Sorgt für Konsistenz und ermöglicht Fehlertoleranz bei Fehlern.
Cost Zeigt stündliche, tägliche und monatliche Kosten einer Streaminganwendung an. Unterstützung bei der Budgetierung und Ressourcenoptimierung.
Lineage Zeigt Datasets und Ebenen an, die in der Streaminganwendung erstellt wurden. Erleichtert die Datentransformation, Nachverfolgung, Qualitätssicherung und Debugging.

Clusterprotokolle und Metriken

Azure Databricks-Clusterprotokolle und -Metriken bieten detaillierte Einblicke in die Clusterleistung und -auslastung. Zu diesen Protokollen und Metriken gehören Informationen zu CPU, Arbeitsspeicher, Datenträger-E/A, Netzwerkdatenverkehr und anderen Systemmetriken. Die Überwachung dieser Metriken ist entscheidend, um die Clusterleistung zu optimieren, Ressourcen effizient zu verwalten und Probleme zu beheben.

Azure Databricks-Clusterprotokolle und Metriken bieten detaillierte Einblicke in die Clusterleistung und Ressourcennutzung. Dazu gehören CPU- und Arbeitsspeicherauslastung, Datenträger-E/A und Netzwerkdatenverkehr. Die Überwachung dieser Metriken ist für Folgendes wichtig:

  • Optimieren der Clusterleistung.
  • Effizientes Verwalten von Ressourcen.
  • Behandeln von Problemen im Betrieb.

Die Metriken können über die Databricks-Benutzeroberfläche genutzt oder in persönliche Überwachungstools exportiert werden. Siehe Notizbuchbeispiel: Datadog-Metriken.

Spark UI

Die Spark-Benutzeroberfläche zeigt detaillierte Informationen zum Fortschritt von Aufträgen und Phasen, einschließlich der Anzahl der abgeschlossenen, ausstehenden und fehlgeschlagenen Aufgaben. Auf diese Weise können Sie den Ausführungsfluss verstehen und Engpässe erkennen.

Für Streaminganwendungen zeigt die Registerkarte "Streaming" Metriken wie Eingaberate, Verarbeitungsrate und Batchdauer an. Es hilft Ihnen, die Leistung Ihrer Streamingaufträge zu überwachen und Datenaufnahme- oder Verarbeitungsprobleme zu identifizieren.

Weitere Informationen finden Sie unter Debuggen mit der Spark-Benutzeroberfläche .

Berechnen von Metriken

Die Berechnungsmetriken helfen Ihnen, die Clusterauslastung zu verstehen. Während Ihr Einzelvorgang ausgeführt wird, können Sie sehen, wie er gestaffelt wird und wie sich dies auf Ihre Ressourcen auswirkt. Sie können Speicherdruck, der zu OOM-Fehlern führen könnte, oder CPU-Druck, der lange Verzögerungen verursachen könnte, finden. Hier sind die spezifischen Metriken, die Sie sehen:

  • Serverlastverteilung: Die CPU-Auslastung jedes Knotens in der letzten Minute.
  • CPU-Auslastung: Der Prozentsatz der Zeit, die die CPU in verschiedenen Modi aufgewendet hat (z. B. Benutzer, System, Leerlauf und iowait).
  • Arbeitsspeicherauslastung: Gesamtspeicherauslastung nach jedem Modus (z. B. verwendet, frei, Puffer und zwischengespeichert).
  • Speichertauschauslastung: Gesamtauslastung des Speichertauschs.
  • Freier Dateisystemspeicher: Gesamtauslastung des Dateisystems durch jeden Bereitstellungspunkt.
  • Netzwerkdurchsatz: Die Anzahl der empfangenen und über das Netzwerk übertragenen Bytes pro Gerät.
  • Anzahl der aktiven Knoten: Die Anzahl der aktiven Knoten bei jedem Zeitstempel für die angegebene Berechnung.

Weitere Informationen finden Sie unter Leistungsüberwachung und Hardware-Metriken-Diagramme.

Systemtabellen

Kostenüberwachung

Azure Databricks-Systemtabellen bieten einen strukturierten Ansatz zur Überwachung der Auftragskosten und -leistung. Diese Tabellen enthalten:

  • Einzelheiten zum Einzelvorgang.
  • Ressourcenauslastung.
  • Verbundene Kosten.

Verwenden Sie diese Tabellen, um die betriebliche Gesundheit und finanzielle Auswirkungen zu verstehen.

Requirements

So verwenden Sie Systemtabellen für die Kostenüberwachung:

  • Ein Kontoadministrator muss die Option system.lakeflow schema.
  • Benutzer müssen eine der folgenden Aktionen ausführen:
    • Seien Sie sowohl ein Metastore-Administrator als auch ein Kontoadministrator, oder ...
    • Verfügen Sie über USE- und SELECT-Berechtigungen für die Systemschemata.

Beispielabfrage: Die teuersten Aufträge (letzte 30 Tage)

Diese Abfrage identifiziert die teuersten Aufträge in den letzten 30 Tagen und unterstützt die Kostenanalyse und Optimierung.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Lakeflow Spark Declarative Pipelines

Das Lakeflow Spark Declarative Pipelines-Ereignisprotokoll erfasst einen umfassenden Datensatz aller Pipelineereignisse, darunter:

  • Überwachungsprotokolle.
  • Datenqualitätsprüfungen.
  • Fortschritt der Pipeline.
  • Datenherkunft und -verlauf.

Das Ereignisprotokoll wird automatisch für alle Lakeflow Spark Declarative Pipelines aktiviert und kann über:

  • Pipeline-Benutzeroberfläche: Anzeigen von Protokollen direkt.
  • Pipelines-API: Programmgesteuerter Zugriff.
  • Direkte Abfrage: Abfrage der Ereignisprotokolltabelle.

Weitere Informationen finden Sie im Ereignisprotokollschema für Lakeflow Spark Declarative Pipelines.

Beispielabfragen

Diese Beispielabfragen helfen dabei, die Leistung und den Status von Pipelines zu überwachen, indem wichtige Metriken wie Batchdauer, Durchsatz, Rückdruck und Ressourcenauslastung bereitgestellt werden.

Durchschnittliche Batchdauer

Diese Abfrage berechnet die durchschnittliche Dauer der von der Pipeline verarbeiteten Batches.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Durchschnittlicher Durchsatz

Diese Abfrage berechnet den durchschnittlichen Durchsatz der Pipeline in Bezug auf verarbeitete Zeilen pro Sekunde.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Diese Abfrage misst den Rückdruck der Pipeline, indem der Datenrückstau überprüft wird.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Cluster- und Slotauslastung

Diese Abfrage enthält Einblicke in die Nutzung von Clustern oder Slots, die von der Pipeline verwendet werden.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

Sie können Streamingabfragen in Aufträgen über den Streamingabfragelistener überwachen.

Fügen Sie einen Listener an die Spark-Sitzung an, um den Streamingabfragelistener inAzure Databricks zu aktivieren. Dieser Listener überwacht den Fortschritt und die Metriken Ihrer Streamingabfragen. Es kann verwendet werden, um Metriken an externe Überwachungstools zu übertragen oder zur weiteren Analyse zu protokollieren.

Beispiel: Exportieren von Metriken in externe Überwachungstools

Note

Dies ist in Databricks Runtime 11.3 LTS und höher für Python und Scala verfügbar.

Sie können Streamingmetriken mithilfe der StreamingQueryListener Schnittstelle in externe Dienste exportieren, um Warnungen oder Dashboarding zu erhalten.

Hier ist ein einfaches Beispiel für die Implementierung eines Listeners:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Beispiel: Verwenden des Abfragelisteners in Azure Databricks

Nachfolgend sehen Sie ein Beispiel für ein StreamingQueryListener-Ereignisprotokoll für eine Kafka-zu-Delta Lake-Streamingabfrage:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Weitere Beispiele finden Sie unter: Beispiele.

Abfrage von Fortschrittsmetriken

Abfragefortschrittsmetriken sind für die Überwachung der Leistung und des Zustands Ihrer Streaming-Abfragen unerlässlich. Zu diesen Metriken gehören die Anzahl der Eingabezeilen, Verarbeitungsraten und verschiedene Dauern im Zusammenhang mit der Abfrageausführung. Sie können diese Metriken beobachten, indem Sie eine StreamingQueryListener an die Spark-Sitzung anfügen. Der Listener sendet am Ende jeder Streaming-Epoche Ereignisse aus, die diese Metriken enthalten.

Sie können zum Beispiel auf Metriken zugreifen, indem Sie die StreamingQueryProgress.observedMetrics Map in der Listener's onQueryProgress Methode. Auf diese Weise können Sie die Leistung Ihrer Streamingabfragen in Echtzeit nachverfolgen und analysieren.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)