Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Le journal des événements de pipeline contient toutes les informations relatives à un pipeline, notamment les journaux d’audit, les vérifications de qualité des données, la progression du pipeline et la traçabilité des données. Vous pouvez utiliser le journal des événements pour suivre, comprendre et surveiller l’état de vos pipelines de données.
Vous pouvez afficher les entrées du journal des événements dans l’interface utilisateur de surveillance du pipeline, l’API REST Pipelines ou en interrogeant directement le journal des événements. Cette section se concentre sur la façon d’interroger directement le journal des événements.
Vous pouvez également définir des actions personnalisées à exécuter lorsque des événements sont enregistrés, par exemple en envoyant des alertes, avec des hooks d’événements.
Important
Ne supprimez pas le journal des événements ou le catalogue parent ou le schéma où le journal des événements est publié. La suppression du journal des événements peut entraîner l’échec de la mise à jour de votre pipeline pendant les prochaines exécutions.
Pour plus d’informations sur le schéma du journal des événements, consultez le schéma du journal des événements de pipeline.
Interroger le journal des événements
Note
Cette section décrit le comportement et la syntaxe par défaut de l’utilisation des journaux d’événements pour les pipelines configurés avec le catalogue Unity et le mode de publication par défaut.
- Pour connaître le comportement des pipelines de catalogue Unity qui utilisent le mode de publication hérité, consultez Utiliser le journal des événements pour les pipelines en mode de publication hérité de Catalogue Unity.
- Pour connaître le comportement et la syntaxe des pipelines de metastore Hive, consultez Utiliser le journal des événements pour les pipelines de metastore Hive.
Par défaut, un pipeline écrit le journal des événements dans une table Delta masquée dans le catalogue et le schéma par défaut configurés pour le pipeline. Bien que masquée, la table peut toujours être interrogée par tous les utilisateurs suffisamment privilégiés. Par défaut, seul le propriétaire du pipeline peut interroger la table du journal des événements.
Pour interroger le journal des événements en tant que propriétaire, utilisez l’ID de pipeline :
SELECT * FROM event_log(<pipelineId>);
Par défaut, le nom du journal des événements masqués est mis en forme event_log_{pipeline_id}, où l’ID de pipeline est l’UUID affecté par le système avec des tirets remplacés par des traits de soulignement.
Vous pouvez publier le journal des événements en modifiant les paramètres avancés de votre pipeline. Pour plus d’informations, consultez le paramètre pipeline pour le journal des événements. Lorsque vous publiez un journal des événements, spécifiez le nom du journal des événements et, éventuellement, spécifiez un catalogue et un schéma, comme dans l’exemple suivant :
{
"id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
"name": "billing_pipeline",
"event_log": {
"catalog": "catalog_name",
"schema": "schema_name",
"name": "event_log_table_name"
}
}
L’emplacement du journal des événements sert également d’emplacement de schéma pour toutes les requêtes du chargeur automatique dans le pipeline. Databricks recommande de créer une vue sur la table du journal des événements avant de modifier les privilèges, car certains paramètres de calcul peuvent permettre aux utilisateurs d’accéder aux métadonnées de schéma si la table du journal des événements est partagée directement. L’exemple de syntaxe suivant crée une vue sur une table du journal des événements et est utilisé dans l’exemple de requêtes de journal des événements incluses dans cet article. Remplacez <catalog_name>.<schema_name>.<event_log_table_name> par le nom complet de la table de votre journal des événements de pipeline. Si vous avez publié le journal des événements, utilisez le nom spécifié lors de la publication. Sinon, utilisez event_log(<pipelineId>) l’emplacement où pipelineId est l’ID du pipeline que vous souhaitez interroger.
CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;
Dans Unity Catalog, les vues prennent en charge les requêtes en streaming. L’exemple suivant utilise Structured Streaming pour interroger une vue définie en haut d’une table du journal des événements :
df = spark.readStream.table("event_log_raw")
Exemples de requêtes de base
Les exemples suivants montrent comment interroger le journal des événements pour obtenir des informations générales sur les pipelines et aider à déboguer des scénarios courants.
Surveiller les mises à jour du pipeline en interrogeant les mises à jour précédentes
L’exemple suivant interroge les mises à jour (ou exécutions) de votre pipeline, montrant l’ID de mise à jour, l’état, l’heure de début, l’heure d’achèvement et la durée. Cela vous donne une vue d’ensemble des opérations du pipeline.
Cela suppose que vous avez créé la vue event_log_raw pour le pipeline qui vous intéresse, comme décrit dans Consulter le journal des événements.
with last_status_per_update AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
timestamp,
ROW_NUMBER() OVER (
PARTITION BY origin.update_id
ORDER BY timestamp DESC
) AS rn
FROM event_log_raw
WHERE event_type = 'update_progress'
QUALIFY rn = 1
),
update_durations AS (
SELECT
origin.pipeline_id AS pipeline_id,
origin.pipeline_name AS pipeline_name,
origin.update_id AS pipeline_update_id,
-- Capture the start of the update
MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,
-- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
COALESCE(
MAX(CASE
WHEN event_type = 'update_progress'
AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
THEN timestamp
END),
current_timestamp()
) AS end_time
FROM event_log_raw
WHERE event_type IN ('create_update', 'update_progress')
AND origin.update_id IS NOT NULL
GROUP BY pipeline_id, pipeline_name, pipeline_update_id
HAVING start_time IS NOT NULL
)
SELECT
s.pipeline_id,
s.pipeline_name,
s.pipeline_update_id,
d.start_time,
d.end_time,
CASE
WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
ELSE NULL
END AS duration_seconds,
s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
ON s.pipeline_id = d.pipeline_id
AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;
Déboguer les problèmes d'actualisation incrémentielle de la vue matérialisée
Cet exemple interroge tous les flux à partir de la mise à jour la plus récente d’un pipeline. Il indique s’ils ont été mis à jour de manière incrémentielle ou non, ainsi que d’autres informations de planification pertinentes qui sont utiles pour déboguer pourquoi une actualisation incrémentielle ne se produit pas.
Cela suppose que vous avez créé la vue event_log_raw pour le pipeline qui vous intéresse, comme décrit dans Consulter le journal des événements.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
-- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
SELECT
origin.pipeline_name,
origin.pipeline_id,
origin.flow_name,
lu.latest_update_id,
from_json(
details:planning_information,
'struct<
technique_information: array<struct<
maintenance_type: string,
is_chosen: boolean,
is_applicable: boolean,
cost: double,
incrementalization_issues: array<struct<
issue_type: string,
prevent_incrementalization: boolean,
operator_name: string,
plan_not_incrementalizable_sub_type: string,
expression_name: string,
plan_not_deterministic_sub_type: string
>>
>>
>'
) AS parsed
FROM event_log_raw AS origin
JOIN latest_update lu
ON origin.update_id = lu.latest_update_id
WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
parsed.technique_information AS planning_information
FROM parsed_planning
)
SELECT
pipeline_name,
pipeline_id,
flow_name,
latest_update_id,
chosen_technique.maintenance_type,
chosen_technique,
planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;
Demander le coût d’une mise à jour de pipeline
Cet exemple montre comment interroger l’utilisation de DBU pour un pipeline, ainsi que l’utilisateur pour une exécution de pipeline donnée.
SELECT
sku_name,
billing_origin_product,
usage_date,
collect_set(identity_metadata.run_as) as users,
SUM(usage_quantity) AS `DBUs`
FROM
system.billing.usage
WHERE
usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
ALL;
Requêtes avancées
Les exemples suivants montrent comment interroger le journal des événements pour gérer des scénarios moins courants ou plus avancés.
Métriques de requête pour tous les flux dans un pipeline
Cet exemple montre comment interroger des informations détaillées sur chaque flux d’un pipeline. Il affiche le nom du flux, la durée de mise à jour, les métriques de qualité des données et les informations sur les lignes traitées (lignes de sortie, supprimées, mises à jour et enregistrements supprimés).
Cela suppose que vous avez créé la vue event_log_raw pour le pipeline qui vous intéresse, comme décrit dans Consulter le journal des événements.
WITH flow_progress_raw AS (
SELECT
origin.pipeline_name AS pipeline_name,
origin.pipeline_id AS pipeline_id,
origin.flow_name AS table_name,
origin.update_id AS update_id,
timestamp,
details:flow_progress.status AS status,
TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT) AS num_output_rows,
TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT) AS num_upserted_rows,
TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT) AS num_deleted_rows,
TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
FROM_JSON(
details:flow_progress.data_quality.expectations,
SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
) AS expectations_array
FROM event_log_raw
WHERE event_type = 'flow_progress'
AND origin.flow_name IS NOT NULL
AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),
aggregated_flows AS (
SELECT
pipeline_name,
pipeline_id,
update_id,
table_name,
MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
MAX_BY(status, timestamp) FILTER (
WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
) AS final_status,
SUM(COALESCE(num_output_rows, 0)) AS total_output_records,
SUM(COALESCE(num_upserted_rows, 0)) AS total_upserted_records,
SUM(COALESCE(num_deleted_rows, 0)) AS total_deleted_records,
MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
MAX(expectations_array) AS total_expectations
FROM flow_progress_raw
GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
af.pipeline_name,
af.pipeline_id,
af.update_id,
af.table_name,
af.start_timestamp,
af.end_timestamp,
af.final_status,
CASE
WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
ELSE NULL
END AS duration_seconds,
af.total_output_records,
af.total_upserted_records,
af.total_deleted_records,
af.total_expectation_dropped_records,
af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
SELECT update_id
FROM aggregated_flows
ORDER BY end_timestamp DESC
LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;
Interroger les métriques de qualité des données ou des exigences
Si vous définissez des attentes sur les jeux de données de votre pipeline, les métriques du nombre d’enregistrements passés et ayant échoué sont stockées dans l’objet details:flow_progress.data_quality.expectations . La métrique du nombre d’enregistrements supprimés est stockée dans l’objet details:flow_progress.data_quality . Les événements contenant des informations sur la qualité des données ont le type d’événement flow_progress.
Les métriques de qualité des données peuvent ne pas être disponibles pour certains jeux de données. Consultez les limitations attendues.
Les métriques de qualité des données suivantes sont disponibles :
| Unité de mesure | Descriptif |
|---|---|
dropped_records |
Nombre d’enregistrements qui ont été supprimés parce qu’ils ont échoué une ou plusieurs attentes. |
passed_records |
Nombre d’enregistrements qui ont passé les critères d’attente. |
failed_records |
Nombre d’enregistrements ayant échoué aux critères d'expectation. |
L’exemple suivant interroge les métriques de qualité des données pour la dernière mise à jour du pipeline. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
WITH latest_update AS (
SELECT
origin.pipeline_id,
origin.update_id AS latest_update_id
FROM event_log_raw AS origin
WHERE origin.event_type = 'create_update'
ORDER BY timestamp DESC
LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
row_expectations.dataset as dataset,
row_expectations.name as expectation,
SUM(row_expectations.passed_records) as passing_records,
SUM(row_expectations.failed_records) as failing_records
FROM
(
SELECT
explode(
from_json(
details:flow_progress:data_quality:expectations,
"array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
)
) row_expectations
FROM
event_log_raw,
latest_update
WHERE
event_type = 'flow_progress'
AND origin.update_id = latest_update.id
)
GROUP BY
row_expectations.dataset,
row_expectations.name;
Interroger les informations de traçabilité
Les événements contenant des informations de traçabilité ont le type d’événement flow_definition. L’objet details:flow_definition contient output_dataset et input_datasets qui définissent chaque relation dans le graphe.
Utilisez la requête suivante pour extraire les jeux de données d’entrée et de sortie pour afficher les informations de traçabilité. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
details:flow_definition.output_dataset as flow_name,
details:flow_definition.input_datasets as input_flow_names,
details:flow_definition.flow_type as flow_type,
details:flow_definition.schema, -- the schema of the flow
details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;
Surveiller l’ingestion de fichiers cloud avec le chargeur automatique
Les pipelines génèrent des événements lorsque le chargeur automatique traite les fichiers. Pour les événements du chargeur automatique, le event_type est operation_progress et le details:operation_progress:type est soit AUTO_LOADER_LISTING soit AUTO_LOADER_BACKFILL. L'objet details:operation_progress inclut également les champs status, duration_ms, auto_loader_details:source_path et auto_loader_details:num_files_listed.
L’exemple suivant interroge les événements du chargeur automatique pour la dernière mise à jour. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
details:operation_progress.status,
details:operation_progress.type,
details:operation_progress:auto_loader_details
FROM
event_log_raw,latest_update
WHERE
event_type like 'operation_progress'
AND
origin.update_id = latest_update.id
AND
details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');
Surveiller le backlog des données pour optimiser la durée de diffusion en continu
Chaque pipeline suit la quantité de données présentes dans le backlog de l’objet details:flow_progress.metrics.backlog_bytes . Les événements contenant les métriques de backlog ont le type d’événement flow_progress. L’exemple suivant interroge les métriques du backlog pour la dernière mise à jour du pipeline. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
event_log_raw,
latest_update
WHERE
event_type ='flow_progress'
AND
origin.update_id = latest_update.id;
Note
Les métriques du backlog peuvent ne pas être disponibles en fonction du type de source de données du pipeline et de la version de Databricks Runtime.
Surveiller les événements de mise à l’échelle automatique pour optimiser le calcul classique
Pour les pipelines qui utilisent le calcul classique (c’est-à-dire, qui n’utilisent pas de calcul serverless), le journal des événements enregistre les redimensionnements des clusters lorsque la mise à l’échelle automatique améliorée est activée dans vos pipelines. Les événements contenant des informations sur la mise à l’échelle automatique améliorée ont le type autoscaled’événement . Les informations de demande de redimensionnement de cluster sont stockées dans l’objet details:autoscale.
L’exemple suivant interroge les demandes de redimensionnement de cluster de mise à l’échelle automatique améliorée sur la dernière mise à jour du pipeline. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(
case
when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
else null
end
) as starting_num_executors,
Double(
case
when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as succeeded_num_executors,
Double(
case
when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
else null
end
) as partially_succeeded_num_executors,
Double(
case
when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
else null
end
) as failed_num_executors
FROM
event_log_raw,
latest_update
WHERE
event_type = 'autoscale'
AND
origin.update_id = latest_update.id
Surveiller l’utilisation des ressources de calcul pour le calcul classique
cluster_resources les événements fournissent des métriques sur le nombre d’emplacements de tâches dans le cluster, la quantité de ces emplacements de tâches utilisés et le nombre de tâches qui attendent d’être planifiées.
Lorsque la mise à l’échelle automatique améliorée est activée, cluster_resources les événements contiennent également des métriques pour l’algorithme de mise à l’échelle automatique, notamment latest_requested_num_executors, et optimal_num_executors. Les événements montrent également l’état de l’algorithme sous la forme d’états différents tels que CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORS et BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION.
Ces informations peuvent être consultées conjointement avec les événements de mise à l’échelle automatique pour fournir une image globale de la mise à l’échelle automatique améliorée.
L’exemple suivant interroge l’historique des tailles de file d’attente des tâches, l’historique d’utilisation, l’historique du nombre d’exécuteurs, ainsi que d’autres métriques et états pour la mise à l’échelle automatique lors de la dernière mise à jour du pipeline. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
with latest_update as (
SELECT origin.update_id as id
FROM event_log_raw
WHERE event_type = 'create_update'
ORDER BY timestamp DESC
limit 1 -- remove if you want all of the update_ids
)
SELECT
timestamp,
Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
Double(details:cluster_resources.num_executors) as current_executors,
Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
details :cluster_resources.state as autoscaling_state
FROM
event_log_raw,
latest_update
WHERE
event_type = 'cluster_resources'
AND
origin.update_id = latest_update.id;
Surveiller les métriques de streaming de pipeline
Vous pouvez afficher des métriques sur la progression du flux dans un pipeline. Recherchez des événements stream_progress pour obtenir des événements très similaires aux métriques StreamingQueryListener créées par Structured Streaming, avec les exceptions suivantes :
- Les métriques suivantes sont présentes dans
StreamingQueryListener, mais pas dansstream_progress:numInputRows,inputRowsPerSecondetprocessedRowsPerSecond. - Pour les flux Kafka et Kineses, les champs
startOffset,endOffsetetlatestOffsetpeuvent être trop volumineux et sont tronqués. Pour chacun de ces champs, un champ supplémentaire...Truncated,startOffsetTruncated,endOffsetTruncatedetlatestOffsetTruncatedest ajouté avec une valeur booléenne pour déterminer si les données sont tronquées.
Pour interroger stream_progress événements, vous pouvez utiliser une requête comme la suivante :
SELECT
parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';
Voici un exemple d’événement, au format JSON :
{
"id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
"sequence": {
"control_plane_seq_no": 1234567890123456
},
"origin": {
"cloud": "<cloud>",
"region": "<region>",
"org_id": 0123456789012345,
"pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"pipeline_type": "WORKSPACE",
"pipeline_name": "<pipeline name>",
"update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
},
"timestamp": "2025-06-17T03:18:14.018Z",
"message": "Completed a streaming update of 'flow_name'."
"level": "INFO",
"details": {
"stream_progress": {
"progress": {
"id": "abcdef12-abcd-3456-7890-abcd1234ef56",
"runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
"name": "silverTransformFromBronze",
"timestamp": "2022-11-01T18:21:29.500Z",
"batchId": 4,
"durationMs": {
"latestOffset": 62,
"triggerExecution": 62
},
"stateOperators": [],
"sources": [
{
"description": "DeltaSource[dbfs:/path/to/table]",
"startOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"endOffset": {
"sourceVersion": 1,
"reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
"reservoirVersion": 3216,
"index": 3214,
"isStartingVersion": true
},
"latestOffset": null,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
],
"sink": {
"description": "DeltaSink[dbfs:/path/to/sink]",
"numOutputRows": -1
}
}
}
},
"event_type": "stream_progress",
"maturity_level": "EVOLVING"
}
Cet exemple montre des enregistrements nontruncés dans une source Kafka, avec les ...Truncated champs définis sur false:
{
"description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
"startOffsetTruncated": false,
"startOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706380
}
},
"endOffsetTruncated": false,
"endOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"latestOffsetTruncated": false,
"latestOffset": {
"KAFKA_TOPIC_NAME_INPUT_A": {
"0": 349706672
}
},
"numInputRows": 292,
"inputRowsPerSecond": 13.65826278123392,
"processedRowsPerSecond": 14.479817514628582,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
}
Auditer les pipelines
Vous pouvez utiliser des enregistrements de journal des événements et d’autres journaux d’audit Azure Databricks pour obtenir une image complète de la façon dont les données sont mises à jour dans un pipeline.
Lakeflow Spark Declarative Pipelines utilise les crédences du propriétaire du pipeline pour exécuter les mises à jour. Vous pouvez modifier les informations d’identification utilisées en mettant à jour le propriétaire du pipeline. Le journal d’audit enregistre l’utilisateur pour les actions sur le pipeline, notamment la création du pipeline, les modifications apportées à la configuration et le déclenchement des mises à jour.
Consultez Événements Unity Catalog pour obtenir des informations de référence sur les événements d’audit Unity Catalog.
Interroger les actions utilisateur dans le journal des événements
Vous pouvez utiliser le journal des événements pour auditer des événements, comme les actions utilisateur. Les événements contenant des informations sur les actions utilisateur ont le type d’événement user_action.
Les informations relatives à chaque action sont stockées dans l’objet user_action dans le champ details. Utilisez la requête suivante pour créer un journal d’audit des événements utilisateur. Cela suppose que vous avez créé la event_log_raw vue du pipeline qui vous intéresse, comme décrit dans Requête du journal des événements.
SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp |
action |
user_name |
|---|---|---|
| 2021-05-20T19:36:03.517+0000 | START |
user@company.com |
| 2021-05-20T19:35:59.913+0000 | CREATE |
user@company.com |
| 2021-05-27T00:35:51.971+0000 | START |
user@company.com |
Informations sur le runtime
Vous pouvez afficher les informations d’exécution d’une mise à jour de pipeline, par exemple la version Databricks Runtime de la mise à jour. Cela suppose que vous avez créé la vue event_log_raw pour le pipeline qui vous intéresse, comme décrit dans Interroger le journal des événements.
SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version |
|---|
| 11.0 |