Freigeben über


Vollständige Aktualisierung von Zieltabellen

Gilt für: ja SaaS-Connectors ja Datenbank-Connectors

Fully refreshing the ingestion pipeline clears the data und Zustand of the target tables, then reprocesses all records von der data source. Sie können alle Tabellen in der Pipeline vollständig aktualisieren oder Tabellen auswählen, die aktualisiert werden sollen.

Schnittstelle Anweisungen
Lakehouse-Benutzeroberfläche Manuelles Auslösen eines Pipelineupdates
Pipelines-API POST /api/2.0/pipelines/{pipeline_id}/updates
Databricks-Befehlszeilenschnittstelle databricks pipelines start-update beginnt ein Update der Databricks-Pipeline.

Von Bedeutung

Das Update der Aufnahmepipeline kann während der Initializing- oder Resetting tables-Phase fehlschlagen. Lakeflow Connect wiederholt die Pipeline automatisch mehrmals. Wenn Sie die automatischen Wiederholungen unterbrechen oder sie schließlich tödlich ausfallen, starten Sie ein neues Pipelineupdate manuell mit derselben Tabellenaktualisierungsauswahl wie zuvor. Andernfalls können die Zieltabellen in einem inkonsistenten Zustand mit Teildaten enden. Wenn manuelle Wiederholungen ebenfalls fehlschlagen, erstellen Sie ein Supportticket.

Vollständiges Aktualisierungsverhalten (CDC)

Gilt für: Mit Ja markierte Datenbankverbindungen

Wenn Sie eine vollständige Aktualisierung einer Tabelle auslösen, optimiert Databricks den Prozess, um Ausfallzeiten zu reduzieren und die Datenverfügbarkeit aufrechtzuerhalten:

  1. Momentaufnahmeanforderung: Wenn Sie eine vollständige Aktualisierung anfordern, beginnt das Ingestion-Gateway sofort mit dem Erstellen einer neuen Momentaufnahme der Quelltabelle. Die Zielstreamingtabelle wird aus der Auswahl der zu aktualisierenden Tabellen ausgeschlossen, bis die Momentaufnahme abgeschlossen ist.
  2. Fortgesetzte Verfügbarkeit: Während des Momentaufnahmevorgangs behält die Zielstreamingtabelle ihre vorhandenen Daten bei und bleibt für Abfragen verfügbar. Es werden keine Aktualisierungen, Einfügungen oder Löschungen auf die Tabelle angewendet, während der Momentaufnahme.
  3. Atomic refresh: Nachdem der Snapshot abgeschlossen ist, führt Databricks automatisch die vollständige Aktualisierung in einem einzigen Update durch. Dieses Update wendet alle Momentaufnahmendaten und alle CDC-Einträge an, die seit der Anforderung der Momentaufnahme gesammelt wurden.

Wenn Ihre Tabelle beispielsweise 50 Datensätze am Ende von Update 15 enthält und Sie eine vollständige Aktualisierung in Update 16 anfordern:

  1. Das Ingestion-Gateway beginnt mit der Erstellung einer Momentaufnahme während Update 16.
  2. Die Tabelle zeigt weiterhin die ursprünglichen 50 Datensätze an, bis die Momentaufnahme abgeschlossen ist.
  3. Wenn der Schnappschuss abgeschlossen ist (in Update 16 oder höher, abhängig von der Quelltabellengröße), wird der vollständige Refresh automatisch in einem atomaren Vorgang angewendet.

Dieser Ansatz reduziert die Ausfallzeiten während des vollständigen Aktualisierungsvorgangs erheblich und hilft dabei, PENDING_RESET und Timeout-Fehler zu vermeiden.

Konfigurieren Sie das Verhalten der vollständigen Aktualisierung für Datenbank-Connectoren

Gilt für:Ja-markierte Datenbank-ConnectorenJa-markierte API-basierte Pipeline-Erstellung

Erfahren Sie, wie Sie das vollständige Aktualisierungsverhalten für verwaltete Aufnahmepipelines mit Datenbankconnectors (z. B. SQL Server) in Lakeflow Connect konfigurieren. Sie können planen, wann vollständige Aktualisierungssnapshots durchgeführt werden, und die automatische vollständige Aktualisierung aktivieren, um sich von nicht unterstützten Schemaänderungen zu erholen.

Vollständiges Aktualisierungsfenster

Mit einem vollständigen Aktualisierungsfenster können Sie festlegen, wann Momentaufnahmeoperationen für die vollständige Aktualisierung vorgesehen sind. Wenn Sie eine vollständige Aktualisierung anfordern oder wenn das System automatisch eine vollständige Aktualisierung auslöst, wird die Momentaufnahme während der nächsten verfügbaren Zeit im konfigurierten Fenster gestartet. Die folgende Tabelle zeigt, wie die Planung funktioniert:

Anforderungszeit Fenster Momentaufnahme wird gestartet Hinweise
Montag 2025-10-20 10:00:00 UTC Startstunde: 20, Tage: Dienstag, Zeitzone: UTC Dienstag 2025-10-21 20:00:00 UTC Momentaufnahme wird auf den nächsten verfügbaren Fenstertag zurückgestellt
Montag 2025-10-20 09:30:00 UTC Startstunde: 9, Tage: Montag, Zeitzone: UTC Montag 2025-10-20 09:30:00 UTC Am selben Tag, Anforderungszeit innerhalb des Zeitfensters
Montag 2025-10-20 10:00:00 UTC Startstunde: 9, Tage: Montag, Zeitzone: UTC Montag 2025-10-27 09:00:00 UTC Zeit nach dem letzten Fenster anfordern, auf die nächste Woche zurückgestellt

Konfigurationsparameter

Konfigurieren Sie das vollständige Aktualisierungsfenster in der ingestion_definition der Pipelinespezifikation:

Parameter Typ Description Erforderlich
start_hour Integer Die Startstunde für das Fenster (0-23) im 24-Stunden-Tag. Yes
days_of_week Array Tage, an dem das Fenster aktiv ist. Gültige Werte: MONDAY, , TUESDAY, WEDNESDAYTHURSDAY, FRIDAY, , SATURDAY. SUNDAY Wenn nicht angegeben, werden alle Tage verwendet. Nein
time_zone_id String Zeitzonen-ID für das Fenster. Siehe Festlegen der Sitzungszeitzone für unterstützte Zeitzonen-IDs. Wenn nicht angegeben, wird standardmäßig UTC verwendet. Nein

Beispiel: Konfigurieren eines vollständigen Aktualisierungsfensters

Die folgenden Beispiele zeigen, wie Sie Ihrer Pipelinedefinition ein vollständiges Aktualisierungsfenster hinzufügen.

Databricks-Ressourcenpakete
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>

    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog>
      schema: <destination-schema>
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
        full_refresh_window:
          start_hour: 20
          days_of_week:
            - MONDAY
            - TUESDAY
          time_zone_id: 'America/Los_Angeles'
Databricks-Notizbuch
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>"
        }
      }
    ],
    "full_refresh_window": {
      "start_hour": 20,
      "days_of_week": ["MONDAY", "TUESDAY"],
      "time_zone_id": "America/Los_Angeles"
    }
  }
}

Richtlinie für die automatische vollständige Aktualisierung

Um die Datenkonsistenz ohne manuelle Eingriffe zu gewährleisten, können Sie mit einer Richtlinie für die automatische vollständige Aktualisierung automatisch eine vollständige Aktualisierung auslösen, wenn die Pipeline auf nicht unterstützte DDL-Vorgänge trifft:

  • Tabelle kürzen
  • Inkompatible Schemaänderungen (z. B. Datentypänderungen)
  • Spaltenumbenennungen
  • Spaltenzufügungen mit Standardwerten

Ohne die automatische vollständige Aktualisierung aktiviert, müssen Sie beim Auftreten dieser Vorgänge manuell eine vollständige Aktualisierung auslösen.

Konfigurationsparameter

Konfigurieren Sie die automatische vollständige Aktualisierung auf Pipelineebene oder Tabellenebene in Ihrer Pipelinespezifikation:

Parameter Typ Description Standard
enabled Boolean Gibt an, ob die automatische vollständige Aktualisierung aktiviert ist. false
min_interval_hours Integer Mindestwarteintervall in Stunden zwischen vollständigen Aktualisierungen. Das System wartet auf dieses Intervall seit der letzten Momentaufnahme, bevor eine neue automatische vollständige Aktualisierung initiiert wird. 24

Sie können die automatische vollständige Aktualisierung auf mehreren Ebenen konfigurieren:

  • Pipelineebene: In ingestion_definition.table_configuration.auto_full_refresh_policy
  • Tabellenebene: In ingestion_definition.objects[].table.table_configuration.auto_full_refresh_policy

Konfiguration auf Tabellenebene setzt die Konfiguration auf Pipelineebene außer Kraft.

Beispiel: Konfigurieren der automatischen vollständigen Aktualisierung auf Pipelineebene

Die folgenden Beispiele zeigen, wie Sie die automatische vollständige Aktualisierung für alle Tabellen in einer Pipeline aktivieren.

Databricks-Ressourcenpakete
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>

    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog>
      schema: <destination-schema>
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: <source-table>
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
        table_configuration:
          auto_full_refresh_policy:
            enabled: true
            min_interval_hours: 24
Databricks-Notizbuch
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "<source-table>",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>"
        }
      }
    ],
    "table_configuration": {
      "auto_full_refresh_policy": {
        "enabled": True,
        "min_interval_hours": 24
      }
    }
  }
}

Beispiel: Konfigurieren der automatischen vollständigen Aktualisierung pro Tabelle

Die folgenden Beispiele zeigen, wie Sie die automatische vollständige Aktualisierung auf Pipelineebene aktivieren, aber für bestimmte Tabellen deaktivieren.

Databricks-Ressourcenpakete
resources:
  pipelines:
    gateway:
      name: <gateway-name>
      gateway_definition:
        connection_id: <connection-id>
        gateway_storage_catalog: <destination-catalog>
        gateway_storage_schema: <destination-schema>
        gateway_storage_name: <destination-schema>
      target: <destination-schema>
      catalog: <destination-catalog>

    pipeline_sqlserver:
      name: <pipeline-name>
      catalog: <destination-catalog>
      schema: <destination-schema>
      ingestion_definition:
        ingestion_gateway_id: <gateway-id>
        objects:
          - table:
              source_schema: <source-schema>
              source_table: table_1
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
          - table:
              source_schema: <source-schema>
              source_table: table_2
              destination_catalog: <destination-catalog>
              destination_schema: <destination-schema>
              table_configuration:
                auto_full_refresh_policy:
                  enabled: false
                  min_interval_hours: 24
        table_configuration:
          auto_full_refresh_policy:
            enabled: true
            min_interval_hours: 24
Databricks-Notizbuch
gateway_pipeline_spec = {
  "pipeline_type": "INGESTION_GATEWAY",
  "name": "<gateway-name>",
  "catalog": "<destination-catalog>",
  "target": "<destination-schema>",
  "gateway_definition": {
    "connection_id": "<connection-id>",
    "gateway_storage_catalog": "<destination-catalog>",
    "gateway_storage_schema": "<destination-schema>",
    "gateway_storage_name": "<destination-schema>"
  }
}

ingestion_pipeline_spec = {
  "pipeline_type": "MANAGED_INGESTION",
  "name": "<pipeline-name>",
  "catalog": "<destination-catalog>",
  "schema": "<destination-schema>",
  "ingestion_definition": {
    "ingestion_gateway_id": "<gateway-pipeline-id>",
    "source_type": "SQLSERVER",
    "objects": [
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "table_1",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>"
        }
      },
      {
        "table": {
          "source_schema": "<source-schema>",
          "source_table": "table_2",
          "destination_catalog": "<destination-catalog>",
          "destination_schema": "<destination-schema>",
          "table_configuration": {
            "auto_full_refresh_policy": {
              "enabled": False,
              "min_interval_hours": 24
            }
          }
        }
      }
    ],
    "table_configuration": {
      "auto_full_refresh_policy": {
        "enabled": True,
        "min_interval_hours": 24
      }
    }
  }
}

In diesem Beispiel wird die Richtlinie auf Pipelineebene (aktiviert) verwendet, während sie bei table_1 aufgrund der Konfiguration auf Tabellenebene (deaktiviert) bei table_2 überschrieben wird.