Compartir a través de


Observabilidad en Azure Databricks para tareas, canalizaciones declarativas de Lakeflow Spark y Lakeflow Connect

La supervisión del rendimiento, el costo y el estado de las aplicaciones de streaming es esencial para crear canalizaciones de ETL confiables y eficaces. Azure Databricks proporciona un amplio conjunto de capacidades de observabilidad para Jobs, canalizaciones declarativas de Spark de Lakeflow y Lakeflow Connect para ayudar a diagnosticar cuellos de botella, optimizar el rendimiento y gestionar tanto el uso como los costos de los recursos.

En este artículo se describen los procedimientos recomendados en las siguientes áreas:

  • Métricas clave de rendimiento de streaming
  • Esquemas de registro de eventos y consultas de ejemplo
  • Supervisión de consultas de streaming
  • Observabilidad de costos mediante tablas del sistema
  • Exportación de registros y métricas a herramientas externas

Métricas clave para la observabilidad de streaming

Al operar canalizaciones de streaming, supervise las siguientes métricas clave:

Metric Purpose
Backpressure Supervisa el número de archivos y desplazamientos (tamaños). Ayuda a identificar cuellos de botella y garantiza que el sistema pueda controlar los datos entrantes sin que se quede atrás.
Throughput Realiza un seguimiento del número de mensajes procesados por microproceso. Evalúe la eficacia de la canalización y compruebe que sigue el ritmo de la ingesta de datos.
Duration Mide la duración media de un microproceso. Indica la velocidad de procesamiento y ayuda a optimizar los intervalos por lotes.
Latency Indica cuántos registros o mensajes se procesan con el tiempo. Ayuda a comprender los retrasos de la canalización de un extremo a otro y a optimizar las latencias más bajas.
Uso del clúster Refleja el uso de cpu y memoria (%). Garantiza un uso eficaz de los recursos y ayuda a escalar clústeres para satisfacer las demandas de procesamiento.
Network Mide los datos transferidos y recibidos. Resulta útil para identificar cuellos de botella de red y mejorar el rendimiento de la transferencia de datos.
Checkpoint Identifica los datos procesados y los desplazamientos. Garantiza la coherencia y habilita la tolerancia a errores durante los errores.
Cost Muestra los costos por hora, diario y mensual de una aplicación de streaming. Ayuda en la optimización de presupuestos y recursos.
Lineage Muestra conjuntos de datos y capas creados en la aplicación de streaming. Facilita la transformación, el seguimiento, la garantía de calidad y la depuración de datos.

Registros y métricas del clúster

Los registros y métricas del clúster de Azure Databricks proporcionan información detallada sobre el rendimiento y el uso del clúster. Estos registros y métricas incluyen información sobre cpu, memoria, E/S de disco, tráfico de red y otras métricas del sistema. La supervisión de estas métricas es fundamental para optimizar el rendimiento del clúster, administrar los recursos de forma eficaz y solucionar problemas.

Los registros y las métricas del clúster de Azure Databricks ofrecen información detallada sobre el rendimiento del clúster y el uso de recursos. Entre ellas se incluyen el uso de CPU y memoria, la E/S de disco y el tráfico de red. La supervisión de estas métricas es fundamental para:

  • Optimización del rendimiento del clúster.
  • Administrar recursos de forma eficaz.
  • Solución de problemas operativos.

Las métricas se pueden aprovechar a través de la interfaz de usuario de Databricks o exportarlas a herramientas de supervisión personales. Vea Ejemplo de Notebook: Métricas de Datadog.

Interfaz de usuario de Spark

La interfaz de usuario de Spark muestra información detallada sobre el progreso de los trabajos y las fases, incluido el número de tareas completadas, pendientes y con errores. Esto te ayuda a entender el flujo de ejecución y a identificar cuellos de botella.

En el caso de las aplicaciones de streaming, la pestaña Streaming muestra métricas como la velocidad de entrada, la velocidad de procesamiento y la duración del lote. Ayuda a supervisar el rendimiento de los trabajos de streaming e identificar cualquier problema de ingesta o procesamiento de datos.

Consulte Depuración con la interfaz de usuario de Spark para obtener más información.

Métricas de cálculo

Las métricas de proceso le ayudarán a comprender el uso del clúster. A medida que se ejecuta el trabajo, puede ver cómo se escala y cómo se ven afectados los recursos. Podrá encontrar presión de memoria que podría provocar errores de OOM o presión de CPU que podría provocar retrasos prolongados. Estas son las métricas específicas que verá:

  • Distribución de carga del servidor: el uso de CPU de cada nodo durante el último minuto.
  • Uso de CPU: el porcentaje de tiempo que la CPU pasó en varios modos (por ejemplo, usuario, sistema, inactivo y iowait).
  • Uso de memoria: uso total de memoria por cada modo (por ejemplo, usado, libre, búfer y almacenado en caché).
  • Utilización del swap de memoria: uso total del swap de memoria.
  • Espacio libre del sistema de archivos: uso pleno del sistema de archivos por cada punto de montaje.
  • Rendimiento de red: el número de bytes recibidos y transmitidos a través de la red por cada dispositivo.
  • Número de nodos activos: número de nodos activos en cada marca de tiempo para el proceso especificado.

Consulte Supervisar el rendimiento y Gráficos de métricas de hardware para obtener más información.

Tablas del sistema

Supervisión de costos

Las tablas del sistema de Azure Databricks proporcionan un enfoque estructurado para supervisar el costo y el rendimiento del trabajo. Estas tablas incluyen lo siguiente:

  • Detalles de ejecución del trabajo.
  • Uso de recursos.
  • Costos asociados.

Use estas tablas para comprender el estado operativo y el impacto financiero.

Requirements

Para usar tablas del sistema para la supervisión de costos:

  • Un administrador de cuenta debe habilitar el system.lakeflow schema.
  • Los usuarios deben, o bien:
    • Ser un administrador de metastore y un administrador de cuenta, o
    • Tener permisos USE y SELECT en los esquemas del sistema.

Consulta de ejemplo: Trabajos más caros (últimos 30 días)

Esta consulta identifica los trabajos más caros en los últimos 30 días, lo que ayuda en el análisis y la optimización de costos.

WITH list_cost_per_job AS (
     SELECT
       t1.workspace_id,
       t1.usage_metadata.job_id,
       COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
       SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
       FIRST(identity_metadata.run_as, true) AS run_as,
       FIRST(t1.custom_tags, true) AS custom_tags,
       MAX(t1.usage_end_time) AS last_seen_date
     FROM system.billing.usage t1
     INNER JOIN system.billing.list_prices list_prices ON
       t1.cloud = list_prices.cloud AND
       t1.sku_name = list_prices.sku_name AND
       t1.usage_start_time >= list_prices.price_start_time AND
       (t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
     WHERE
       t1.billing_origin_product = "JOBS"
       AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
     GROUP BY ALL
   ),
   most_recent_jobs AS (
     SELECT
       *,
       ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
     FROM
       system.lakeflow.jobs QUALIFY rn=1
   )
   SELECT
     t2.name,
     t1.job_id,
     t1.workspace_id,
     t1.runs,
     t1.run_as,
     SUM(list_cost) AS list_cost,
     t1.last_seen_date
   FROM list_cost_per_job t1
   LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
   GROUP BY ALL
   ORDER BY list_cost DESC

Canalizaciones declarativas de Spark de Lakeflow

El registro de eventos de Lakeflow Spark Declarative Pipelines captura un registro detallado de todos los eventos de la canalización, incluidos:

  • Registros de auditoría.
  • Comprobaciones de calidad de datos.
  • Progreso de la canalización.
  • Linaje de datos.

El registro de eventos se habilita automáticamente para todas las canalizaciones declarativas de Spark de Lakeflow y se puede acceder a él a través de:

  • Interfaz de usuario de Pipeline: vea los registros directamente.
  • Pipelines API: acceso mediante programación.
  • Consulta directa: consulte la tabla de registro de eventos.

Para más información, consulte Esquema de registro de eventos para canalizaciones declarativas de Spark de Lakeflow.

Consultas de ejemplo

Estas consultas de ejemplo ayudan a supervisar el rendimiento y el estado de las canalizaciones proporcionando métricas clave, como la duración del lote, el rendimiento, la represión y el uso de recursos.

Duración media del lote

Esta consulta calcula la duración media de los lotes procesados por la canalización.

SELECT
  (max_t - min_t) / batch_count as avg_batch_duration_seconds,
  batch_count,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      count(*) as batch_count,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Rendimiento medio

Esta consulta calcula el rendimiento medio de la canalización en términos de filas procesadas por segundo.

SELECT
  (max_t - min_t) / total_rows as avg_throughput_rps,
  total_rows,
  min_t,
  max_t,
  date_hr,
  message
FROM
  -- /60 for minutes
  (
    SELECT
      sum(
        details:flow_progress:metrics:num_output_rows
      ) as total_rows,
      unix_timestamp(
        min(timestamp)
      ) as min_t,
      unix_timestamp(
        max(timestamp)
      ) as max_t,
      date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
      message
    FROM
      event_log
    WHERE
      event_type = 'flow_progress'
      AND level = 'METRICS'
    GROUP BY
      date_hr,
      message
  )
ORDER BY
  date_hr desc

Backpressure

Esta consulta mide la contrapresión de la canalización comprobando la acumulación de datos.

SELECT
  timestamp,
  DOUBLE(
    details:flow_progress:metrics:backlog_bytes
  ) AS backlog_bytes,
  DOUBLE(
    details:flow_progress:metrics:backlog_files
  ) AS backlog_files
FROM
  event_log
WHERE
  event_type = 'flow_progress'

Uso de clústeres y ranuras

Esta consulta tiene información sobre el uso de clústeres o ranuras que se utilizan en la canalización.

SELECT
  date_trunc("hour", timestamp) AS hour,
  AVG (
    DOUBLE (
      details:cluster_resources:num_task_slots
    )
  ) AS num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_task_slots
    )
  ) AS avg_num_task_slots,
  AVG (
    DOUBLE (
      details:cluster_resources:num_executors
    )
  ) AS num_executors,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_task_slot_utilization
    )
  ) AS avg_utilization,
  AVG (
    DOUBLE (
      details:cluster_resources:avg_num_queued_tasks
    )
  ) AS queue_size
FROM
  event_log
WHERE
  details : cluster_resources : avg_num_queued_tasks IS NOT NULL
  AND origin.update_id = '${latest_update_id}'
GROUP BY
  1;

Jobs

Puede supervisar las consultas de streaming en los trabajos a través del agente de escucha de consultas de streaming.

Adjunte un agente de escucha a la sesión de Spark para habilitar el agente de escucha de consultas de streaming enAzure Databricks. Este agente de escucha supervisará el progreso y las métricas de las consultas de streaming. Se puede usar para insertar métricas en herramientas de supervisión externas o registrarlas para su posterior análisis.

Ejemplo: Exportación de métricas a herramientas de supervisión externas

Note

Esto está disponible en Databricks Runtime 11.3 LTS y versiones posteriores para Python y Scala.

Puede exportar métricas de streaming a servicios externos para alertas o paneles mediante la StreamingQueryListener interfaz .

Este es un ejemplo básico de cómo implementar un agente de escucha:

from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
   def onQueryStarted(self, event):
       print("Query started: ", event.id)

   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress)

   def onQueryTerminated(self, event):
       print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Ejemplo: Uso del agente de escucha de consultas en Azure Databricks

A continuación se muestra un ejemplo de un registro de eventos de StreamingQueryListener para una consulta de streaming de Kafka a Delta Lake:

{
  "id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
  "runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
  "timestamp": "2024-05-15T21:57:50.782Z",
  "batchId": 0,
  "batchDuration": 3601,
  "numInputRows": 20,
  "inputRowsPerSecond": 0.0,
  "processedRowsPerSecond": 5.55401277422938,
  "durationMs": {
    "addBatch": 1544,
    "commitBatch": 686,
    "commitOffsets": 27,
    "getBatch": 12,
    "latestOffset": 577,
    "queryPlanning": 105,
    "triggerExecution": 3600,
    "walCommit": 34
  },
  "stateOperators": [
    {
      "operatorName": "symmetricHashJoin",
      "numRowsTotal": 20,
      "numRowsUpdated": 20,
      "allUpdatesTimeMs": 473,
      "numRowsRemoved": 0,
      "allRemovalsTimeMs": 0,
      "commitTimeMs": 277,
      "memoryUsedBytes": 13120,
      "numRowsDroppedByWatermark": 0,
      "numShufflePartitions": 5,
      "numStateStoreInstances": 20,
      "customMetrics": {
        "loadedMapCacheHitCount": 0,
        "loadedMapCacheMissCount": 0,
        "stateOnCurrentVersionSizeBytes": 5280
      }
    }
  ],
  "sources": [
    {
      "description": "KafkaV2[Subscribe[topic-1]]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "avgOffsetsBehindLatest": "0.0",
        "estimatedTotalBytesBehindLatest": "0.0",
        "maxOffsetsBehindLatest": "0",
        "minOffsetsBehindLatest": "0"
      }
    },
    {
      "description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
      "numInputRows": 10,
      "inputRowsPerSecond": 0.0,
      "processedRowsPerSecond": 2.77700638711469,
      "metrics": {
        "numBytesOutstanding": "0",
        "numFilesOutstanding": "0"
      }
    }
  ]
}

Para obtener más ejemplos, vea: Ejemplos.

Métricas de progreso de la consulta

Las métricas de progreso de las consultas son esenciales para supervisar el rendimiento y el estado de las consultas de streaming. Estas métricas incluyen el número de filas de entrada, velocidades de procesamiento y varias duraciones relacionadas con la ejecución de la consulta. Para observar estas métricas, adjunte un StreamingQueryListener elemento a la sesión de Spark. El oyente emitirá eventos que contengan estas métricas al final de cada ciclo de transmisión.

Por ejemplo, puede obtener acceso a las métricas mediante la asignación StreamingQueryProgress.observedMetrics del método onQueryProgress del oyente. Esto le permite realizar un seguimiento y analizar el rendimiento de las consultas de streaming en tiempo real.

class MyListener(StreamingQueryListener):
   def onQueryProgress(self, event):
       print("Query made progress: ", event.progress.observedMetrics)