Udostępnij przez


Możliwość obserwowania w usłudze Azure Databricks dla zadań, potoków deklaratywnych platformy Spark w usłudze Lakeflow i Lakeflow Connect

Monitorowanie wydajności, kosztów i kondycji aplikacji przesyłania strumieniowego jest niezbędne do tworzenia niezawodnych, wydajnych potoków ETL. Usługa Azure Databricks udostępnia bogaty zestaw funkcji obserwowalności w zadaniach, deklaratywnych potokach Lakeflow Spark i Lakeflow Connect, aby ułatwić diagnozowanie wąskich gardeł, optymalizowanie wydajności oraz zarządzanie użyciem zasobów i kosztami.

W tym artykule opisano najlepsze rozwiązania w następujących obszarach:

  • Kluczowe metryki wydajności przesyłania strumieniowego
  • Schematy dziennika zdarzeń i przykładowe zapytania
  • Monitorowanie zapytań przesyłanych strumieniowo
  • Możliwość obserwowania kosztów przy użyciu tabel systemowych
  • Eksportowanie dzienników i metryk do narzędzi zewnętrznych

Kluczowe metryki dotyczące obserwowalności przesyłania strumieniowego

Podczas obsługi potoków przesyłania strumieniowego monitoruj następujące kluczowe metryki:

Metric Purpose
Backpressure Monitoruje liczbę plików i przesunięć (rozmiarów). Pomaga identyfikować wąskie gardła i zapewnia, że system jest w stanie obsługiwać dane przychodzące bez opóźnień.
Throughput Śledzi liczbę komunikatów przetworzonych na mikropartię. Oceń wydajność potoku i sprawdź, czy nadąża za pozyskiwaniem danych.
Duration Mierzy średni czas trwania mikrosadowej partii. Wskazuje szybkość przetwarzania i pomaga dostroić interwały wsadowe.
Latency Wskazuje, ile rekordów/komunikatów jest przetwarzanych w czasie. Pomaga zrozumieć opóźnienia na poziomie całego procesu i zoptymalizować je pod kątem mniejszych opóźnień.
Wykorzystanie klastra Odzwierciedla użycie procesora i pamięci (%). Zapewnia efektywne użycie zasobów i pomaga skalować klastry w celu spełnienia wymagań dotyczących przetwarzania.
Network Mierzy dane przesyłane i odbierane. Przydatne do identyfikowania wąskich gardeł sieci i poprawy wydajności transferu danych.
Checkpoint Identyfikuje przetworzone dane i wartości przesunięcia. Zapewnia spójność i zapewnia odporność na uszkodzenia podczas awarii.
Cost Pokazuje godzinowe, dzienne i miesięczne koszty aplikacji przesyłania strumieniowego. Pomoc w budżetowaniu i optymalizacji zasobów.
Lineage Wyświetla zestawy danych i warstwy utworzone w aplikacji przesyłania strumieniowego. Ułatwia przekształcanie danych, śledzenie, zapewnianie jakości i debugowanie.

Dzienniki i metryki klastra

Dzienniki i metryki klastra usługi Azure Databricks zapewniają szczegółowy wgląd w wydajność i wykorzystanie klastra. Te dzienniki i metryki obejmują informacje na temat procesora CPU, pamięci, operacji we/wy dysku, ruchu sieciowego i innych metryk systemu. Monitorowanie tych metryk ma kluczowe znaczenie dla optymalizacji wydajności klastra, wydajnego zarządzania zasobami i rozwiązywania problemów.

Dzienniki i metryki klastra usługi Azure Databricks oferują szczegółowy wgląd w wydajność klastra i wykorzystanie zasobów. Obejmują one użycie procesora CPU i pamięci, we/wy dysku i ruch sieciowy. Monitorowanie tych metryk ma kluczowe znaczenie dla:

  • Optymalizacja wydajności klastra.
  • Efektywne zarządzanie zasobami.
  • Rozwiązywanie problemów operacyjnych.

Metryki można wykorzystać za pomocą interfejsu użytkownika usługi Databricks lub wyeksportować je do osobistych narzędzi do monitorowania. Zobacz Przykład notatnika: metryki Datadog.

Interfejs użytkownika platformy Spark

Interfejs użytkownika platformy Spark zawiera szczegółowe informacje o postępie zadań i etapach, w tym liczbę ukończonych zadań, oczekujących i zakończonych niepowodzeniem. Pomaga to zrozumieć przebieg wykonywania i zidentyfikować wąskie gardła.

W przypadku aplikacji przesyłania strumieniowego karta Przesyłanie strumieniowe zawiera metryki, takie jak szybkość danych wejściowych, szybkość przetwarzania i czas trwania partii. Pomaga monitorować wydajność zadań przesyłania strumieniowego i identyfikować wszelkie problemy z pozyskiwaniem lub przetwarzaniem danych.

Aby uzyskać więcej informacji, zobacz Debugowanie za pomocą interfejsu użytkownika platformy Spark .

Metryki obliczeniowe

Metryki obliczeniowe pomogą Ci zrozumieć wykorzystanie klastra. W miarę uruchamiania zadania możesz zobaczyć, jak działa i jak wpływa na zasoby. Będzie można znaleźć ciśnienie pamięci, które może prowadzić do awarii Out Of Memory (OOM) lub obciążenie CPU, które może powodować długie opóźnienia. Poniżej przedstawiono konkretne metryki, które zobaczysz:

  • Dystrybucja obciążenia serwera: wykorzystanie procesora CPU każdego węzła w ciągu ostatniej minuty.
  • Użycie CPU: procent czasu spędzonego przez CPU w różnych trybach (na przykład użytkownik, system, bezczynny i iowait).
  • Wykorzystanie pamięci: łączne użycie pamięci przez każdy tryb (na przykład używane, wolne, buforowe i buforowane).
  • Wykorzystanie pamięci wymiennej: łączne użycie pamięci wymiennej.
  • Wolne miejsce w systemie plików: łączne użycie systemu plików przez każdy punkt instalacji.
  • Przepływność sieci: liczba bajtów odebranych i przesłanych za pośrednictwem sieci przez każde urządzenie.
  • Liczba aktywnych węzłów: liczba aktywnych węzłów w każdym znaczniku czasu dla danego obliczenia.

Aby uzyskać więcej informacji, zobacz Monitoruj wydajność i Wykresy metryk sprzętu.

Tabele systemowe

Monitorowanie kosztów

Tabele systemowe usługi Azure Databricks zapewniają ustrukturyzowane podejście do monitorowania kosztów i wydajności zadań. Tabele te obejmują:

  • Szczegóły przebiegu zadania.
  • Wykorzystanie zasobów.
  • Powiązane koszty.

Te tabele umożliwiają zrozumienie kondycji operacyjnej i wpływu finansowego.

Requirements

Aby użyć tabel systemowych do monitorowania kosztów:

  • Administrator konta musi włączyć element system.lakeflow schema.
  • Użytkownicy muszą albo:
    • Bądź zarówno administratorem metadanych, jak i administratorem konta lub
    • Mieć uprawnienia USE i SELECT na schematach systemowych.

Przykładowe zapytanie: najdroższe zadania (ostatnie 30 dni)

To zapytanie identyfikuje najdroższe zadania w ciągu ostatnich 30 dni, co pomaga w analizie kosztów i optymalizacji.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Potoki deklaratywne platformy Spark w usłudze Lakeflow

Dziennik zdarzeń Lakeflow Spark Deklaratywnych Potoków przechwytuje kompleksowy rekord wszystkich zdarzeń potoku, w tym:

  • Dzienniki inspekcji.
  • Kontrole jakości danych.
  • Postęp rurociągu.
  • Pochodzenie danych.

Dziennik zdarzeń jest automatycznie włączony dla wszystkich potoków deklaratywnych Lakeflow Spark i można uzyskać do niego dostęp za pośrednictwem:

  • Interfejs użytkownika potoku: Przeglądaj dzienniki bezpośrednio.
  • API potoków: dostęp za pośrednictwem programu.
  • Zapytanie bezpośrednie: Kwerenda tabeli dziennika zdarzeń.

Aby uzyskać więcej informacji, zobacz Schemat dziennika zdarzeń dla Lakeflow Spark Deklaratywnych Potoków.

Przykładowe zapytania

Te przykładowe zapytania pomagają monitorować wydajność i kondycję potoków, udostępniając kluczowe metryki takie jak czas trwania partii, przepustowość, ciśnienie wsteczne oraz wykorzystanie zasobów.

Średni czas trwania serii

To zapytanie oblicza średni czas trwania partii przetwarzanych przez potok.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Średnia przepływność

To zapytanie oblicza średnią przepływność potoku pod względem przetworzonych wierszy na sekundę.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Niniejsze zapytanie mierzy przeciążenie potoku, sprawdzając zaległości danych.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Wykorzystanie klastrów i gniazd

To zapytanie zawiera informacje o wykorzystaniu klastrów lub gniazd używanych przez potok.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

Można monitorować zapytania przesyłane strumieniowo w zadaniach za pomocą nasłuchiwacza zapytań strumieniowych.

Dołącz odbiornik do sesji platformy Spark, aby włączyć odbiornik zapytań przesyłanych strumieniowo w usłudzeAzure Databricks. Ten odbiornik będzie monitorować postęp i metryki zapytań przesyłanych strumieniowo. Może służyć do wypychania metryk do zewnętrznych narzędzi do monitorowania lub rejestrowania ich w celu dalszej analizy.

Przykład: Eksportowanie metryk do zewnętrznych narzędzi do monitorowania

Note

Jest to dostępne w środowisku Databricks Runtime 11.3 LTS i nowszym dla języków Python i Scala.

Metryki przesyłania strumieniowego można eksportować do usług zewnętrznych na potrzeby alertów lub pulpitów nawigacyjnych przy użyciu interfejsu StreamingQueryListener .

Oto podstawowy przykład implementacji odbiornika:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Przykład: używanie odbiornika zapytań w usłudze Azure Databricks

Poniżej przedstawiono przykład dziennika zdarzeń StreamingQueryListener dla zapytania przesyłania strumieniowego platformy Kafka do usługi Delta Lake:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Aby uzyskać więcej przykładów, zobacz: Przykłady.

Metryki postępu zapytania

Metryki postępu zapytań są niezbędne do monitorowania wydajności i kondycji zapytań przesyłanych strumieniowo. Te metryki obejmują liczbę wierszy wejściowych, współczynniki przetwarzania i różne czasy trwania związane z wykonywaniem zapytania. Możesz obserwować te metryki, dołączając element StreamingQueryListener do sesji platformy Spark. Odbiornik będzie emitować zdarzenia zawierające te metryki na końcu każdej epoki przesyłania strumieniowego.

Na przykład możesz uzyskać dostęp do metryk używając StreamingQueryProgress.observedMetrics mapy w metodzie odbiornika onQueryProgress. Umożliwia to śledzenie i analizowanie wydajności zapytań przesyłanych strumieniowo w czasie rzeczywistym.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)