Delen via


Logboek pijplijngebeurtenissen

Het gebeurtenislogboek van de pijplijn bevat alle informatie met betrekking tot een pijplijn, waaronder auditlogboeken, controles van gegevenskwaliteit, voortgang van pijplijnen en gegevensherkomst. U kunt het gebeurtenislogboek gebruiken om de status van uw gegevenspijplijnen bij te houden, te begrijpen en te bewaken.

U kunt vermeldingen in gebeurtenislogboeken bekijken in de gebruikersinterface voor pijplijnbewaking, de REST API voor pijplijnen of door rechtstreeks een query uit te voeren op het gebeurtenislogboek. Deze sectie is gericht op het rechtstreeks opvragen van het gebeurtenislogboek.

U kunt ook aangepaste acties definiëren die moeten worden uitgevoerd wanneer gebeurtenissen worden geregistreerd, bijvoorbeeld het verzenden van waarschuwingen, met gebeurtenishook.

Belangrijk

Verwijder niet het gebeurtenislogboek, de bovenliggende catalogus of het schema waarin het gebeurtenislogboek wordt gepubliceerd. Als u het gebeurtenislogboek verwijdert, kan dit ertoe leiden dat uw pijplijn niet kan worden bijgewerkt tijdens toekomstige uitvoeringen.

Zie het pijplijn gebeurtenislogboek schema voor volledige details van het schema van het gebeurtenislogboek.

het gebeurtenislogboek opvragen

Opmerking

In deze sectie worden het standaardgedrag en de syntaxis beschreven voor het werken met gebeurtenislogboeken voor pijplijnen die zijn geconfigureerd met Unity Catalog en de standaardpublicatiemodus.

Standaard schrijft een pijplijn het gebeurtenislogboek naar een verborgen Delta-tabel in de standaardcatalogus en het schema dat is geconfigureerd voor de pijplijn. Hoewel de tabel is verborgen, kan de tabel nog steeds worden opgevraagd door alle gebruikers met voldoende bevoegdheden. Standaard kan alleen de eigenaar van de pijplijn een query uitvoeren op de tabel van het gebeurtenislogboek.

Als u een query wilt uitvoeren op het gebeurtenislogboek als eigenaar, gebruikt u de pijplijn-id:

SELECT * FROM event_log(<pipelineId>);

Standaard wordt de naam voor het verborgen gebeurtenislogboek opgemaakt als event_log_{pipeline_id}, waarbij de pijplijn-id de door het systeem toegewezen UUID is, waarbij de streepjes zijn vervangen door onderstrepingstekens.

U kunt het gebeurtenislogboek publiceren door de geavanceerde instellingen voor uw pijplijn te bewerken. Zie de pijplijninstelling voor gebeurtenislogboeken voor meer informatie. Wanneer u een gebeurtenislogboek publiceert, geeft u de naam op voor het gebeurtenislogboek en geeft u desgewenst een catalogus en schema op, zoals in het volgende voorbeeld:

{
  "id": "ec2a0ff4-d2a5-4c8c-bf1d-d9f12f10e749",
  "name": "billing_pipeline",
  "event_log": {
    "catalog": "catalog_name",
    "schema": "schema_name",
    "name": "event_log_table_name"
  }
}

De locatie van het gebeurtenislogboek fungeert ook als de schemalocatie voor alle AutoLoader-query's in de pijplijn. Databricks raadt aan om een weergave te maken van de gebeurtenislogboektabel voordat u de bevoegdheden wijzigt, omdat sommige rekeninstellingen gebruikers mogelijk toegang geven tot schemametagegevens als de gebeurtenislogboektabel rechtstreeks wordt gedeeld. De volgende voorbeeldsyntaxis maakt een weergave in een gebeurtenislogboektabel en wordt gebruikt in de voorbeeldquery's voor gebeurtenislogboeken die in dit artikel zijn opgenomen. Vervang door <catalog_name>.<schema_name>.<event_log_table_name> de volledig gekwalificeerde tabelnaam van het gebeurtenislogboek van de pijplijn. Als u het gebeurtenislogboek hebt gepubliceerd, gebruikt u de naam die is opgegeven bij het publiceren. Gebruik anders event_log(<pipelineId>) waar de pipelineId de id is van de pijplijn die u wilt opvragen.

CREATE VIEW event_log_raw
AS SELECT * FROM <catalog_name>.<schema_name>.<event_log_table_name>;

In Unity Catalog bieden weergaven ondersteuning voor streamingquery's. In het volgende voorbeeld wordt Structured Streaming gebruikt om een query uit te voeren op een weergave die is gedefinieerd boven op een gebeurtenislogboektabel:

df = spark.readStream.table("event_log_raw")

Voorbeelden van basisquery's

In de volgende voorbeelden ziet u hoe u query's kunt uitvoeren op het gebeurtenislogboek om algemene informatie over pijplijnen op te halen en veelvoorkomende scenario's te helpen opsporen.

Pijplijnupdates bewaken door een query uit te voeren op eerdere updates

In het volgende voorbeeld worden de updates (of uitvoeringen) van uw pijplijn opgevraagd, met de update-id, status, begintijd, voltooiingstijd en duur. Dit geeft u een overzicht van uitvoeringen van de pijplijn.

U wordt verondersteld de event_log_raw weergave te hebben gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in Het gebeurtenislogboek raadplegen.

with last_status_per_update AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state AS last_update_state,
        timestamp,
        ROW_NUMBER() OVER (
            PARTITION BY origin.update_id
            ORDER BY timestamp DESC
        ) AS rn
    FROM event_log_raw
    WHERE event_type = 'update_progress'
    QUALIFY rn = 1
),
update_durations AS (
    SELECT
        origin.pipeline_id AS pipeline_id,
        origin.pipeline_name AS pipeline_name,
        origin.update_id AS pipeline_update_id,
        -- Capture the start of the update
        MIN(CASE WHEN event_type = 'create_update' THEN timestamp END) AS start_time,

        -- Capture the end of the update based on terminal states or current timestamp (relevant for continuous mode pipelines)
        COALESCE(
            MAX(CASE
                WHEN event_type = 'update_progress'
                 AND FROM_JSON(details, 'struct<update_progress: struct<state: string>>').update_progress.state IN ('COMPLETED', 'FAILED', 'CANCELED')
                THEN timestamp
            END),
            current_timestamp()
        ) AS end_time
    FROM event_log_raw
    WHERE event_type IN ('create_update', 'update_progress')
      AND origin.update_id IS NOT NULL
    GROUP BY pipeline_id, pipeline_name, pipeline_update_id
    HAVING start_time IS NOT NULL
)
SELECT
    s.pipeline_id,
    s.pipeline_name,
    s.pipeline_update_id,
    d.start_time,
    d.end_time,
    CASE
        WHEN d.start_time IS NOT NULL AND d.end_time IS NOT NULL THEN
            ROUND(TIMESTAMPDIFF(MILLISECOND, d.start_time, d.end_time) / 1000)
        ELSE NULL
    END AS duration_seconds,
    s.last_update_state AS pipeline_update_status
FROM last_status_per_update s
JOIN update_durations d
  ON s.pipeline_id = d.pipeline_id
 AND s.pipeline_update_id = d.pipeline_update_id
ORDER BY d.start_time DESC;

Problemen met incrementeel vernieuwen van gerealiseerde weergaven opsporen

In dit voorbeeld worden alle stromen opgevraagd uit de meest recente update van een pijplijn. Er wordt aangegeven of ze incrementeel zijn bijgewerkt of niet, evenals andere relevante planningsgegevens die handig zijn voor foutopsporing waarom een incrementele vernieuwing niet plaatsvindt.

U wordt verondersteld de event_log_raw weergave te hebben gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in Het gebeurtenislogboek raadplegen.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  -- LIMIT 1 -- remove if you want to get all of the update_ids
),
parsed_planning AS (
  SELECT
    origin.pipeline_name,
    origin.pipeline_id,
    origin.flow_name,
    lu.latest_update_id,
    from_json(
      details:planning_information,
      'struct<
        technique_information: array<struct<
          maintenance_type: string,
          is_chosen: boolean,
          is_applicable: boolean,
          cost: double,
          incrementalization_issues: array<struct<
            issue_type: string,
            prevent_incrementalization: boolean,
            operator_name: string,
            plan_not_incrementalizable_sub_type: string,
            expression_name: string,
            plan_not_deterministic_sub_type: string
          >>
        >>
      >'
    ) AS parsed
  FROM event_log_raw AS origin
  JOIN latest_update lu
    ON origin.update_id = lu.latest_update_id
  WHERE details:planning_information IS NOT NULL
),
chosen_technique AS (
  SELECT
    pipeline_name,
    pipeline_id,
    flow_name,
    latest_update_id,
    FILTER(parsed.technique_information, t -> t.is_chosen = true)[0] AS chosen_technique,
    parsed.technique_information AS planning_information
  FROM parsed_planning
)
SELECT
  pipeline_name,
  pipeline_id,
  flow_name,
  latest_update_id,
  chosen_technique.maintenance_type,
  chosen_technique,
  planning_information
FROM chosen_technique
ORDER BY latest_update_id DESC;

De kosten van een pijplijnupdate opvragen

In dit voorbeeld ziet u hoe u een query uitvoert op het DBU-gebruik voor een pijplijn, evenals de gebruiker voor een bepaalde pijplijnuitvoering.

SELECT
  sku_name,
  billing_origin_product,
  usage_date,
  collect_set(identity_metadata.run_as) as users,
  SUM(usage_quantity) AS `DBUs`
FROM
  system.billing.usage
WHERE
  usage_metadata.dlt_pipeline_id = :pipeline_id
GROUP BY
  ALL;

Geavanceerde query's

In de volgende voorbeelden ziet u hoe u een query uitvoert op het gebeurtenislogboek om minder algemene of meer geavanceerde scenario's af te handelen.

Query's uitvoeren op metrische gegevens voor alle stromen in een pijplijn

In dit voorbeeld ziet u hoe u gedetailleerde informatie over elke stroom in een pijplijn opvraagt. Hier ziet u de naam van de flow, de duur van de update, metrische gegevens over de gegevenskwaliteit, en informatie over de verwerkte rijen (uitvoerrijen, verwijderde, bijgewerkte en genegeerde records).

U wordt verondersteld de event_log_raw weergave te hebben gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in Het gebeurtenislogboek raadplegen.

WITH flow_progress_raw AS (
  SELECT
    origin.pipeline_name         AS pipeline_name,
    origin.pipeline_id           AS pipeline_id,
    origin.flow_name             AS table_name,
    origin.update_id             AS update_id,
    timestamp,
    details:flow_progress.status AS status,
    TRY_CAST(details:flow_progress.metrics.num_output_rows AS BIGINT)      AS num_output_rows,
    TRY_CAST(details:flow_progress.metrics.num_upserted_rows AS BIGINT)    AS num_upserted_rows,
    TRY_CAST(details:flow_progress.metrics.num_deleted_rows AS BIGINT)     AS num_deleted_rows,
    TRY_CAST(details:flow_progress.data_quality.dropped_records AS BIGINT) AS num_expectation_dropped_rows,
    FROM_JSON(
      details:flow_progress.data_quality.expectations,
      SCHEMA_OF_JSON("[{'name':'str', 'dataset':'str', 'passed_records':42, 'failed_records':42}]")
    ) AS expectations_array

  FROM event_log_raw
  WHERE event_type = 'flow_progress'
    AND origin.flow_name IS NOT NULL
    AND origin.flow_name != 'pipelines.flowTimeMetrics.missingFlowName'
),

aggregated_flows AS (
  SELECT
    pipeline_name,
    pipeline_id,
    update_id,
    table_name,
    MIN(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS start_timestamp,
    MAX(CASE WHEN status IN ('STARTING', 'RUNNING', 'COMPLETED') THEN timestamp END) AS end_timestamp,
    MAX_BY(status, timestamp) FILTER (
      WHERE status IN ('COMPLETED', 'FAILED', 'CANCELLED', 'EXCLUDED', 'SKIPPED', 'STOPPED', 'IDLE')
    ) AS final_status,
    SUM(COALESCE(num_output_rows, 0))              AS total_output_records,
    SUM(COALESCE(num_upserted_rows, 0))            AS total_upserted_records,
    SUM(COALESCE(num_deleted_rows, 0))             AS total_deleted_records,
    MAX(COALESCE(num_expectation_dropped_rows, 0)) AS total_expectation_dropped_records,
    MAX(expectations_array)                        AS total_expectations

  FROM flow_progress_raw
  GROUP BY pipeline_name, pipeline_id, update_id, table_name
)
SELECT
  af.pipeline_name,
  af.pipeline_id,
  af.update_id,
  af.table_name,
  af.start_timestamp,
  af.end_timestamp,
  af.final_status,
  CASE
    WHEN af.start_timestamp IS NOT NULL AND af.end_timestamp IS NOT NULL THEN
      ROUND(TIMESTAMPDIFF(MILLISECOND, af.start_timestamp, af.end_timestamp) / 1000)
    ELSE NULL
  END AS duration_seconds,

  af.total_output_records,
  af.total_upserted_records,
  af.total_deleted_records,
  af.total_expectation_dropped_records,
  af.total_expectations
FROM aggregated_flows af
-- Optional: filter to latest update only
WHERE af.update_id = (
  SELECT update_id
  FROM aggregated_flows
  ORDER BY end_timestamp DESC
  LIMIT 1
)
ORDER BY af.end_timestamp DESC, af.pipeline_name, af.pipeline_id, af.update_id, af.table_name;

Metrische gegevens van gegevenskwaliteit of verwachtingen opvragen

Als u verwachtingen definieert voor gegevenssets in uw pijplijn, worden de metrische gegevens voor het aantal doorgegeven records en mislukte verwachtingen opgeslagen in het details:flow_progress.data_quality.expectations object. De metrische waarde voor het aantal verwijderde records wordt opgeslagen in het details:flow_progress.data_quality object. Gebeurtenissen met informatie over de gegevenskwaliteit hebben het gebeurtenistype flow_progress.

Metrische gegevens over gegevenskwaliteit zijn mogelijk niet beschikbaar voor sommige gegevenssets. Bekijk de verwachtingsbeperkingen.

De volgende metrische gegevens over gegevenskwaliteit zijn beschikbaar:

Metrische gegevens Description
dropped_records Het aantal records dat is verwijderd omdat er een of meer verwachtingen zijn mislukt.
passed_records Het aantal records dat voldoet aan de verwachtingencriteria.
failed_records Het aantal records dat niet aan de verwachtingscriteria voldeed.

In het volgende voorbeeld worden de metrische gegevens van de gegevenskwaliteit voor de laatste pijplijnupdate opgevraagd. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

WITH latest_update AS (
  SELECT
    origin.pipeline_id,
    origin.update_id AS latest_update_id
  FROM event_log_raw AS origin
  WHERE origin.event_type = 'create_update'
  ORDER BY timestamp DESC
  LIMIT 1 -- remove if you want to get all of the update_ids
),
SELECT
  row_expectations.dataset as dataset,
  row_expectations.name as expectation,
  SUM(row_expectations.passed_records) as passing_records,
  SUM(row_expectations.failed_records) as failing_records
FROM
  (
    SELECT
      explode(
        from_json(
          details:flow_progress:data_quality:expectations,
          "array<struct<name: string, dataset: string, passed_records: int, failed_records: int>>"
        )
      ) row_expectations
    FROM
      event_log_raw,
      latest_update
    WHERE
      event_type = 'flow_progress'
      AND origin.update_id = latest_update.id
  )
GROUP BY
  row_expectations.dataset,
  row_expectations.name;

Informatie over de afstamming van queries

Gebeurtenissen met informatie over herkomst hebben het gebeurtenistype flow_definition. Het details:flow_definition-object bevat de output_dataset en input_datasets die elke relatie in de grafiek definiëren.

Gebruik de volgende query om de invoer- en uitvoergegevenssets te extraheren en afstammingsinformatie weer te geven. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  details:flow_definition.output_dataset as flow_name,
  details:flow_definition.input_datasets as input_flow_names,
  details:flow_definition.flow_type as flow_type,
  details:flow_definition.schema, -- the schema of the flow
  details:flow_definition -- overall flow_definition object
FROM event_log_raw inner join latest_update on origin.update_id = latest_update.id
WHERE details:flow_definition IS NOT NULL
ORDER BY timestamp;

Cloudbestand-ingestie monitoren met Auto Loader

Pijplijnen genereren gebeurtenissen wanneer Auto Loader bestanden verwerkt. Voor Auto Loader-gebeurtenissen is de event_typeoperation_progress en de details:operation_progress:type is AUTO_LOADER_LISTING of AUTO_LOADER_BACKFILL. Het object details:operation_progress bevat ook status, duration_ms, auto_loader_details:source_pathen auto_loader_details:num_files_listed velden.

In het volgende voorbeeld worden automatisch laadprogramma-gebeurtenissen voor de meest recente update opgevraagd. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  details:operation_progress.status,
  details:operation_progress.type,
  details:operation_progress:auto_loader_details
FROM
  event_log_raw,latest_update
WHERE
  event_type like 'operation_progress'
  AND
  origin.update_id = latest_update.id
  AND
  details:operation_progress.type in ('AUTO_LOADER_LISTING', 'AUTO_LOADER_BACKFILL');

Gegevensachterstand bewaken voor het optimaliseren van de streamingduur

Elke pijplijn houdt bij hoeveel gegevens aanwezig zijn in de achterstand in het details:flow_progress.metrics.backlog_bytes object. Gebeurtenissen met metrische gegevens over achterstand hebben het gebeurtenistype flow_progress. In het volgende voorbeeld worden metrische gegevens over achterstand opgevraagd voor de laatste pijplijnupdate. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details :flow_progress.metrics.backlog_bytes) as backlog
FROM
  event_log_raw,
  latest_update
WHERE
  event_type ='flow_progress'
  AND
  origin.update_id = latest_update.id;

Opmerking

De metrische gegevens over achterstand zijn mogelijk niet beschikbaar, afhankelijk van het gegevensbrontype van de pijplijn en de Databricks Runtime-versie.

Autoscaling-gebeurtenissen monitoren voor het optimaliseren van klassieke computing

Voor pijplijnen die gebruikmaken van klassieke berekeningen (met andere woorden, geen serverloze berekening gebruiken), legt het gebeurtenislogboek de grootte van het cluster vast wanneer verbeterde automatische schaalaanpassing is ingeschakeld in uw pijplijnen. Gebeurtenissen met informatie over verbeterde automatische schaalaanpassing hebben het gebeurtenistype autoscale. De aanvraaggegevens voor het wijzigen van het formaat van het cluster worden opgeslagen in het details:autoscale-object.

In het volgende voorbeeld wordt een query uitgevoerd op de verbeterde aanvragen voor automatisch schalen van clusters voor de laatste pijplijnupdate. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(
    case
      when details :autoscale.status = 'RESIZING' then details :autoscale.requested_num_executors
      else null
    end
  ) as starting_num_executors,
  Double(
    case
      when details :autoscale.status = 'SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'PARTIALLY_SUCCEEDED' then details :autoscale.requested_num_executors
      else null
    end
  ) as partially_succeeded_num_executors,
  Double(
    case
      when details :autoscale.status = 'FAILED' then details :autoscale.requested_num_executors
      else null
    end
  ) as failed_num_executors
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'autoscale'
  AND
  origin.update_id = latest_update.id

Bewaak het gebruik van rekenresources voor klassieke berekeningen

cluster_resources gebeurtenissen bieden metrische gegevens over het aantal taaksites in het cluster, hoeveel deze taaksites worden gebruikt en hoeveel taken er moeten worden gepland.

Wanneer verbeterde automatische schaalaanpassing is ingeschakeld, bevatten cluster_resources gebeurtenissen ook metrische gegevens voor het algoritme voor automatisch schalen, waaronder latest_requested_num_executorsen optimal_num_executors. De gebeurtenissen geven ook de status van het algoritme weer, zoals CLUSTER_AT_DESIRED_SIZE, SCALE_UP_IN_PROGRESS_WAITING_FOR_EXECUTORSen BLOCKED_FROM_SCALING_DOWN_BY_CONFIGURATION. Deze informatie kan worden weergegeven in combinatie met de gebeurtenissen voor automatisch schalen om een algemeen beeld te geven van verbeterde automatische schaalaanpassing.

In het volgende voorbeeld wordt een query uitgevoerd op de geschiedenis van de taakwachtrijgrootte, de gebruiksgeschiedenis, de geschiedenis van het aantal uitvoerders en andere metrische gegevens en status voor automatisch schalen in de laatste pijplijnupdate. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

with latest_update as (
  SELECT origin.update_id as id
    FROM event_log_raw
    WHERE event_type = 'create_update'
    ORDER BY timestamp DESC
    limit 1 -- remove if you want all of the update_ids
)
SELECT
  timestamp,
  Double(details:cluster_resources.avg_num_queued_tasks) as queue_size,
  Double(details:cluster_resources.avg_task_slot_utilization) as utilization,
  Double(details:cluster_resources.num_executors) as current_executors,
  Double(details:cluster_resources.latest_requested_num_executors) as latest_requested_num_executors,
  Double(details:cluster_resources.optimal_num_executors) as optimal_num_executors,
  details :cluster_resources.state as autoscaling_state
FROM
  event_log_raw,
  latest_update
WHERE
  event_type = 'cluster_resources'
  AND
  origin.update_id = latest_update.id;

Pijplijnstreaminggegevens monitoren

U kunt metrische gegevens over de voortgang van de stream bekijken in een pijplijn. Voer een query uit voor stream_progress gebeurtenissen om gebeurtenissen te verkrijgen die vergelijkbaar zijn met de metrische gegevens van StreamingQueryListener die zijn gemaakt door Structured Streaming, met de volgende uitzonderingen:

  • De volgende metrische gegevens zijn aanwezig in StreamingQueryListener, maar niet in stream_progress: numInputRows, inputRowsPerSeconden processedRowsPerSecond.
  • Voor Kafka- en Kineses-stromen kunnen de startOffset, endOffseten latestOffset velden te groot zijn en afgekapt zijn. Voor elk van deze velden wordt een extra ...Truncated veld, startOffsetTruncated, endOffsetTruncateden latestOffsetTruncated, toegevoegd met een Booleaanse waarde om te bepalen of de gegevens worden afgekapt.

Als u een query wilt uitvoeren op stream_progress gebeurtenissen, kunt u een query zoals de volgende gebruiken:

SELECT
  parse_json(get_json_object(details, '$.stream_progress.progress_json')) AS stream_progress_json
FROM event_log_raw
WHERE event_type = 'stream_progress';

Hier volgt een voorbeeld van een gebeurtenis in JSON:

{
  "id": "abcd1234-ef56-7890-abcd-ef1234abcd56",
  "sequence": {
    "control_plane_seq_no": 1234567890123456
  },
  "origin": {
    "cloud": "<cloud>",
    "region": "<region>",
    "org_id": 0123456789012345,
    "pipeline_id": "abcdef12-abcd-3456-7890-abcd1234ef56",
    "pipeline_type": "WORKSPACE",
    "pipeline_name": "<pipeline name>",
    "update_id": "1234abcd-ef56-7890-abcd-ef1234abcd56",
    "request_id": "1234abcd-ef56-7890-abcd-ef1234abcd56"
  },
  "timestamp": "2025-06-17T03:18:14.018Z",
  "message": "Completed a streaming update of 'flow_name'."
  "level": "INFO",
  "details": {
    "stream_progress": {
      "progress": {
        "id": "abcdef12-abcd-3456-7890-abcd1234ef56",
        "runId": "1234abcd-ef56-7890-abcd-ef1234abcd56",
        "name": "silverTransformFromBronze",
        "timestamp": "2022-11-01T18:21:29.500Z",
        "batchId": 4,
        "durationMs": {
          "latestOffset": 62,
          "triggerExecution": 62
        },
        "stateOperators": [],
        "sources": [
          {
            "description": "DeltaSource[dbfs:/path/to/table]",
            "startOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "endOffset": {
              "sourceVersion": 1,
              "reservoirId": "abcdef12-abcd-3456-7890-abcd1234ef56",
              "reservoirVersion": 3216,
              "index": 3214,
              "isStartingVersion": true
            },
            "latestOffset": null,
            "metrics": {
              "numBytesOutstanding": "0",
              "numFilesOutstanding": "0"
            }
          }
        ],
        "sink": {
          "description": "DeltaSink[dbfs:/path/to/sink]",
          "numOutputRows": -1
        }
      }
    }
  },
  "event_type": "stream_progress",
  "maturity_level": "EVOLVING"
}

In dit voorbeeld worden niet-afgekorte records in een Kafka-bron weergegeven, waarbij de ...Truncated velden zijn ingesteld op false:

{
  "description": "KafkaV2[Subscribe[KAFKA_TOPIC_NAME_INPUT_A]]",
  "startOffsetTruncated": false,
  "startOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706380
    }
  },
  "endOffsetTruncated": false,
  "endOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "latestOffsetTruncated": false,
  "latestOffset": {
    "KAFKA_TOPIC_NAME_INPUT_A": {
      "0": 349706672
    }
  },
  "numInputRows": 292,
  "inputRowsPerSecond": 13.65826278123392,
  "processedRowsPerSecond": 14.479817514628582,
  "metrics": {
    "avgOffsetsBehindLatest": "0.0",
    "estimatedTotalBytesBehindLatest": "0.0",
    "maxOffsetsBehindLatest": "0",
    "minOffsetsBehindLatest": "0"
  }
}

Pijplijnen auditeren

U kunt gebeurtenislogboekrecords en andere Auditlogboeken van Azure Databricks gebruiken om een volledig beeld te krijgen van hoe gegevens in een pijplijn worden bijgewerkt.

Lakeflow Spark Declaratieve Pijplijnen gebruikt de inloggegevens van de eigenaar van de pijplijn om updates uit te voeren. U kunt de inloggegevens wijzigen door de eigenaar van de pijplijn bij te werken. Het auditlogboek registreert de gebruiker voor acties in de pijplijn, inclusief het maken van pijplijnen, wijzigingen in de configuratie en het activeren van updates.

Zie Unity Catalog-gebeurtenissen voor een verwijzing naar Unity Catalog-auditgebeurtenissen.

Query uitvoeren op gebruikersacties in het gebeurtenislogboek

U kunt het gebeurtenislogboek gebruiken om gebeurtenissen te controleren, bijvoorbeeld gebruikersacties. Gebeurtenissen met informatie over gebruikersacties hebben het gebeurtenistype user_action.

Informatie over de actie wordt opgeslagen in het user_action-object in het veld details. Gebruik de volgende query om een auditlogboek van gebruikersgebeurtenissen samen te stellen. Hierbij wordt ervan uitgegaan dat u de event_log_raw weergave hebt gemaakt voor de pijplijn waarin u geïnteresseerd bent, zoals beschreven in de query op het gebeurtenislogboek.

SELECT timestamp, details:user_action:action, details:user_action:user_name FROM event_log_raw WHERE event_type = 'user_action'
timestamp action user_name
2021-05-20T19:36:03.517+0000 START user@company.com
2021-05-20T19:35:59.913+0000 CREATE user@company.com
2021-05-27T00:35:51.971+0000 START user@company.com

Runtime-gegevens

U kunt de runtime-informatie voor een pijplijnupdate weergeven, zoals de Databricks Runtime-versie voor de update, op voorwaarde dat u de weergave voor de interessante pijplijn hebt gemaakt, zoals beschreven in 'Vraag het gebeurtenislogboek op' .

SELECT details:create_update:runtime_version:dbr_version FROM event_log_raw WHERE event_type = 'create_update'
dbr_version
11.0