Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
La supervisión del rendimiento, el costo y el estado de las aplicaciones de streaming es esencial para crear canalizaciones de ETL confiables y eficaces. Azure Databricks proporciona un amplio conjunto de capacidades de observabilidad para Jobs, canalizaciones declarativas de Spark de Lakeflow y Lakeflow Connect para ayudar a diagnosticar cuellos de botella, optimizar el rendimiento y gestionar tanto el uso como los costos de los recursos.
En este artículo se describen los procedimientos recomendados en las siguientes áreas:
- Métricas clave de rendimiento de streaming
- Esquemas de registro de eventos y consultas de ejemplo
- Supervisión de consultas de streaming
- Observabilidad de costos mediante tablas del sistema
- Exportación de registros y métricas a herramientas externas
Métricas clave para la observabilidad de streaming
Al operar canalizaciones de streaming, supervise las siguientes métricas clave:
| Metric | Purpose |
|---|---|
| Backpressure | Supervisa el número de archivos y desplazamientos (tamaños). Ayuda a identificar cuellos de botella y garantiza que el sistema pueda controlar los datos entrantes sin que se quede atrás. |
| Throughput | Realiza un seguimiento del número de mensajes procesados por microproceso. Evalúe la eficacia de la canalización y compruebe que sigue el ritmo de la ingesta de datos. |
| Duration | Mide la duración media de un microproceso. Indica la velocidad de procesamiento y ayuda a optimizar los intervalos por lotes. |
| Latency | Indica cuántos registros o mensajes se procesan con el tiempo. Ayuda a comprender los retrasos de la canalización de un extremo a otro y a optimizar las latencias más bajas. |
| Uso del clúster | Refleja el uso de cpu y memoria (%). Garantiza un uso eficaz de los recursos y ayuda a escalar clústeres para satisfacer las demandas de procesamiento. |
| Network | Mide los datos transferidos y recibidos. Resulta útil para identificar cuellos de botella de red y mejorar el rendimiento de la transferencia de datos. |
| Checkpoint | Identifica los datos procesados y los desplazamientos. Garantiza la coherencia y habilita la tolerancia a errores durante los errores. |
| Cost | Muestra los costos por hora, diario y mensual de una aplicación de streaming. Ayuda en la optimización de presupuestos y recursos. |
| Lineage | Muestra conjuntos de datos y capas creados en la aplicación de streaming. Facilita la transformación, el seguimiento, la garantía de calidad y la depuración de datos. |
Registros y métricas del clúster
Los registros y métricas del clúster de Azure Databricks proporcionan información detallada sobre el rendimiento y el uso del clúster. Estos registros y métricas incluyen información sobre cpu, memoria, E/S de disco, tráfico de red y otras métricas del sistema. La supervisión de estas métricas es fundamental para optimizar el rendimiento del clúster, administrar los recursos de forma eficaz y solucionar problemas.
Los registros y las métricas del clúster de Azure Databricks ofrecen información detallada sobre el rendimiento del clúster y el uso de recursos. Entre ellas se incluyen el uso de CPU y memoria, la E/S de disco y el tráfico de red. La supervisión de estas métricas es fundamental para:
- Optimización del rendimiento del clúster.
- Administrar recursos de forma eficaz.
- Solución de problemas operativos.
Las métricas se pueden aprovechar a través de la interfaz de usuario de Databricks o exportarlas a herramientas de supervisión personales. Vea Ejemplo de Notebook: Métricas de Datadog.
Interfaz de usuario de Spark
La interfaz de usuario de Spark muestra información detallada sobre el progreso de los trabajos y las fases, incluido el número de tareas completadas, pendientes y con errores. Esto te ayuda a entender el flujo de ejecución y a identificar cuellos de botella.
En el caso de las aplicaciones de streaming, la pestaña Streaming muestra métricas como la velocidad de entrada, la velocidad de procesamiento y la duración del lote. Ayuda a supervisar el rendimiento de los trabajos de streaming e identificar cualquier problema de ingesta o procesamiento de datos.
Consulte Depuración con la interfaz de usuario de Spark para obtener más información.
Métricas de cálculo
Las métricas de proceso le ayudarán a comprender el uso del clúster. A medida que se ejecuta el trabajo, puede ver cómo se escala y cómo se ven afectados los recursos. Podrá encontrar presión de memoria que podría provocar errores de OOM o presión de CPU que podría provocar retrasos prolongados. Estas son las métricas específicas que verá:
- Distribución de carga del servidor: el uso de CPU de cada nodo durante el último minuto.
- Uso de CPU: el porcentaje de tiempo que la CPU pasó en varios modos (por ejemplo, usuario, sistema, inactivo y iowait).
- Uso de memoria: uso total de memoria por cada modo (por ejemplo, usado, libre, búfer y almacenado en caché).
- Utilización del swap de memoria: uso total del swap de memoria.
- Espacio libre del sistema de archivos: uso pleno del sistema de archivos por cada punto de montaje.
- Rendimiento de red: el número de bytes recibidos y transmitidos a través de la red por cada dispositivo.
- Número de nodos activos: número de nodos activos en cada marca de tiempo para el proceso especificado.
Consulte Supervisar el rendimiento y Gráficos de métricas de hardware para obtener más información.
Tablas del sistema
Supervisión de costos
Las tablas del sistema de Azure Databricks proporcionan un enfoque estructurado para supervisar el costo y el rendimiento del trabajo. Estas tablas incluyen lo siguiente:
- Detalles de ejecución del trabajo.
- Uso de recursos.
- Costos asociados.
Use estas tablas para comprender el estado operativo y el impacto financiero.
Requirements
Para usar tablas del sistema para la supervisión de costos:
- Un administrador de cuenta debe habilitar el
system.lakeflow schema. - Los usuarios deben, o bien:
- Ser un administrador de metastore y un administrador de cuenta, o
- Tener permisos
USEySELECTen los esquemas del sistema.
Consulta de ejemplo: Trabajos más caros (últimos 30 días)
Esta consulta identifica los trabajos más caros en los últimos 30 días, lo que ayuda en el análisis y la optimización de costos.
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC
Canalizaciones declarativas de Spark de Lakeflow
El registro de eventos de Lakeflow Spark Declarative Pipelines captura un registro detallado de todos los eventos de la canalización, incluidos:
- Registros de auditoría.
- Comprobaciones de calidad de datos.
- Progreso de la canalización.
- Linaje de datos.
El registro de eventos se habilita automáticamente para todas las canalizaciones declarativas de Spark de Lakeflow y se puede acceder a él a través de:
- Interfaz de usuario de Pipeline: vea los registros directamente.
- Pipelines API: acceso mediante programación.
- Consulta directa: consulte la tabla de registro de eventos.
Para más información, consulte Esquema de registro de eventos para canalizaciones declarativas de Spark de Lakeflow.
Consultas de ejemplo
Estas consultas de ejemplo ayudan a supervisar el rendimiento y el estado de las canalizaciones proporcionando métricas clave, como la duración del lote, el rendimiento, la represión y el uso de recursos.
Duración media del lote
Esta consulta calcula la duración media de los lotes procesados por la canalización.
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Rendimiento medio
Esta consulta calcula el rendimiento medio de la canalización en términos de filas procesadas por segundo.
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc
Backpressure
Esta consulta mide la contrapresión de la canalización comprobando la acumulación de datos.
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'
Uso de clústeres y ranuras
Esta consulta tiene información sobre el uso de clústeres o ranuras que se utilizan en la canalización.
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;
Jobs
Puede supervisar las consultas de streaming en los trabajos a través del agente de escucha de consultas de streaming.
Adjunte un agente de escucha a la sesión de Spark para habilitar el agente de escucha de consultas de streaming enAzure Databricks. Este agente de escucha supervisará el progreso y las métricas de las consultas de streaming. Se puede usar para insertar métricas en herramientas de supervisión externas o registrarlas para su posterior análisis.
Ejemplo: Exportación de métricas a herramientas de supervisión externas
Note
Esto está disponible en Databricks Runtime 11.3 LTS y versiones posteriores para Python y Scala.
Puede exportar métricas de streaming a servicios externos para alertas o paneles mediante la StreamingQueryListener interfaz .
Este es un ejemplo básico de cómo implementar un agente de escucha:
from pyspark.sql.streaming import StreamingQueryListener
class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)
def onQueryProgress(self, event):
print("Query made progress: ", event.progress)
def onQueryTerminated(self, event):
print("Query terminated: ", event.id)
spark.streams.addListener(MyListener())
Ejemplo: Uso del agente de escucha de consultas en Azure Databricks
A continuación se muestra un ejemplo de un registro de eventos de StreamingQueryListener para una consulta de streaming de Kafka a Delta Lake:
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}
Para obtener más ejemplos, vea: Ejemplos.
Métricas de progreso de la consulta
Las métricas de progreso de las consultas son esenciales para supervisar el rendimiento y el estado de las consultas de streaming. Estas métricas incluyen el número de filas de entrada, velocidades de procesamiento y varias duraciones relacionadas con la ejecución de la consulta. Para observar estas métricas, adjunte un StreamingQueryListener elemento a la sesión de Spark. El oyente emitirá eventos que contengan estas métricas al final de cada ciclo de transmisión.
Por ejemplo, puede obtener acceso a las métricas mediante la asignación StreamingQueryProgress.observedMetrics del método onQueryProgress del oyente. Esto le permite realizar un seguimiento y analizar el rendimiento de las consultas de streaming en tiempo real.
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)