Freigeben über


Erfassen von Daten aus SQL Server

Auf dieser Seite wird beschrieben, wie Sie Daten aus SQL Server aufnehmen und in Azure Databricks mithilfe von Lakeflow Connect laden. Der SQL Server-Connector unterstützt Azure SQL-Datenbank, azure SQL Managed Instance und Amazon RDS SQL-Datenbanken. Dazu gehört SQL Server, der auf virtuellen Azure-Computern (VMs) und Amazon EC2 ausgeführt wird. Der Connector unterstützt auch sql Server lokal mit Azure ExpressRoute und AWS Direct Connect-Netzwerk.

Bevor Sie beginnen

Um ein Aufnahmegateway und eine Aufnahmepipeline zu erstellen, müssen Sie die folgenden Anforderungen erfüllen:

  • Ihr Arbeitsbereich ist für Unity Catalog aktiviert.

  • Serverlose Berechnung ist für Ihren Arbeitsbereich aktiviert. Siehe Serverlose Computeanforderungen.

  • Wenn Sie eine Verbindung erstellen möchten: Sie verfügen über CREATE CONNECTION Berechtigungen für den Metastore.

    Wenn Ihr Connector die benutzeroberflächenbasierte Pipelineerstellung unterstützt, können Sie die Verbindung und die Pipeline gleichzeitig erstellen, indem Sie die Schritte auf dieser Seite ausführen. Wenn Sie jedoch apibasierte Pipelineerstellung verwenden, müssen Sie die Verbindung im Katalog-Explorer erstellen, bevor Sie die Schritte auf dieser Seite ausführen. Siehe Herstellen einer Verbindung mit verwalteten Aufnahmequellen.

  • Wenn Sie beabsichtigen, eine vorhandene Verbindung zu verwenden: Sie verfügen über USE CONNECTION Berechtigungen oder ALL PRIVILEGES für die Verbindung.

  • Sie verfügen über USE CATALOG-Berechtigungen für den Zielkatalog.

  • Sie verfügen über USE SCHEMA, CREATE TABLE und CREATE VOLUME Berechtigungen für ein vorhandenes Schema oder über CREATE SCHEMA Berechtigungen für den Zielkatalog.

  • Sie haben Zugriff auf eine primäre SQL Server-Instanz. Die Funktionen zur Änderungsverfolgung und zur Erfassung von Änderungsdaten werden auf Leserepliken oder sekundären Instanzen nicht unterstützt.

  • Uneingeschränkte Berechtigungen zum Erstellen von Clustern oder einer benutzerdefinierten Richtlinie (nur API). Eine benutzerdefinierte Richtlinie für das Gateway muss die folgenden Anforderungen erfüllen:

    • Familie: Job-Compute

    • Außerkraftsetzungen der Richtlinienfamilie:

      {
        "cluster_type": {
          "type": "fixed",
          "value": "dlt"
        },
        "num_workers": {
          "type": "unlimited",
          "defaultValue": 1,
          "isOptional": true
        },
        "runtime_engine": {
          "type": "fixed",
          "value": "STANDARD",
          "hidden": true
        }
      }
      
    • Databricks empfiehlt, die kleinstmöglichen Workerknoten für Aufnahmegateways anzugeben, da sie sich nicht auf die Leistung der Gateways auswirken. Mit der folgenden Berechnungsrichtlinie können Azure Databricks das Aufnahmegateway skalieren, um die Anforderungen Ihrer Workload zu erfüllen. Die Mindestanforderung ist 8 Kerne, um eine effiziente und leistungsfähige Datenextraktion aus Der Quelldatenbank zu ermöglichen.

      {
        "driver_node_type_id": {
          "type": "fixed",
          "value": "Standard_E64d_v4"
        },
        "node_type_id": {
          "type": "fixed",
          "value": "Standard_F4s"
        }
      }
      

    Weitere Informationen zu Clusterrichtlinien finden Sie unter Auswählen einer Richtlinie für Rechenressourcen.

Zum Abrufen von SQL Server müssen Sie auch das Quellsetup abschließen.

Option 1: Azure Databricks-Benutzeroberfläche

Administratoren können gleichzeitig in der Benutzeroberfläche eine Verbindung und eine Pipeline erstellen. Dies ist die einfachste Möglichkeit zum Erstellen von verwalteten Aufnahmepipelines.

  1. Klicken Sie in der Randleiste des Azure Databricks-Arbeitsbereichs auf "Datenaufnahme".

  2. Klicken Sie auf der Seite " Daten hinzufügen " unter "Databricks-Connectors" auf SQL Server.

    Der Erfassungs-Assistent wird geöffnet.

  3. Geben Sie auf der Seite Ingestion-Gateway des Assistenten einen eindeutigen Namen für das Gateway ein.

  4. Wählen Sie einen Katalog und ein Schema für die Staging-Aufnahmedaten aus, und klicken Sie dann auf "Weiter".

  5. Geben Sie auf der Seite "Ingestion-Pipeline " einen eindeutigen Namen für die Pipeline ein.

  6. Wählen Sie für den Zielkatalog einen Katalog aus, um die aufgenommenen Daten zu speichern.

  7. Wählen Sie die Unity-Katalogverbindung aus, die die für den Zugriff auf die Quelldaten erforderlichen Anmeldeinformationen speichert.

    Wenn keine Verbindungen mit der Quelle vorhanden sind, klicken Sie auf "Verbindung erstellen" , und geben Sie die Authentifizierungsdetails ein, die Sie aus der Quelleinrichtung erhalten haben. Sie müssen über CREATE CONNECTION-Berechtigungen für den Metaspeicher verfügen.

  8. Klicken Sie auf "Pipeline erstellen", und fahren Sie fort.

  9. Wählen Sie auf der Seite "Quelle" die aufzunehmenden Tabellen aus.

  10. Ändern Sie optional die Standardeinstellung für die Verlaufsverfolgung. Weitere Informationen finden Sie unter Aktivieren der Verlaufsverfolgung (SCD-Typ 2).

  11. Klicke auf Weiter.

  12. Wählen Sie auf der Seite "Ziel " den Unity-Katalog und das Schema aus, in das geschrieben werden soll.

    Wenn Sie kein vorhandenes Schema verwenden möchten, klicken Sie auf "Schema erstellen". Sie müssen über die Berechtigungen USE CATALOG und CREATE SCHEMA für den übergeordneten Katalog verfügen.

  13. Klicken Sie auf Speichern und fortfahren.

  14. (Optional) Klicken Sie auf der Seite "Einstellungen" auf " Zeitplan erstellen". Legen Sie die Häufigkeit fest, mit der die Zieltabellen aktualisiert werden.

  15. (Optional) Festlegen von E-Mail-Benachrichtigungen für Erfolg oder Fehler des Pipelinevorgangs.

  16. Klicken Sie auf "Speichern", und führen Sie die Pipelineaus.

Option 2: Andere Schnittstellen

Bevor Sie Databricks Asset Bundles, Databricks APIs, Databricks SDKs oder die Databricks CLI verwenden, müssen Sie Zugriff auf eine bestehende Unity-Katalog-Verbindung haben. Anweisungen finden Sie unter Herstellen einer Verbindung mit verwalteten Aufnahmequellen.

Erstellen des Staging-Katalogs und des Schemas

Der Stagingkatalog und das Stagingschema können mit dem Zielkatalog und -schema identisch sein. Bei dem Staging-Katalog kann es sich nicht um einen externen Katalog handeln.

Befehlszeilenschnittstelle (CLI)

export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."

output=$(databricks connections create --json '{
  "name": "'"$CONNECTION_NAME"'",
  "connection_type": "SQLSERVER",
  "options": {
    "host": "'"$DB_HOST"'",
    "port": "1433",
    "trustServerCertificate": "false",
    "user": "'"$DB_USER"'",
    "password": "'"$DB_PASSWORD"'"
  }
}')

export CONNECTION_ID=$(echo $output | jq -r '.connection_id')

Erstellen der Gateway- und Erfassungspipeline

Das Gateway zum Einbinden extrahiert Snapshot- und Änderungsdaten aus der Quelldatenbank und speichert sie im Unity Catalog Staging Volume. Sie müssen das Gateway als fortlaufende Pipeline ausführen. Dadurch können Sie alle Aufbewahrungsrichtlinien für Änderungsprotokolle berücksichtigen, die Sie in der Quelldatenbank haben.

Die Erfassungspipeline wendet die Momentaufnahme- und Änderungsdaten aus dem Stagingvolume in Zielstreamingtabellen an.

Databricks-Ressourcenpakete

Auf dieser Registerkarte wird beschrieben, wie Sie eine Datenerfassungspipeline mithilfe von Databricks Asset Bundles bereitstellen. Bundles können YAML-Definitionen von Aufträgen und Aufgaben enthalten, mithilfe der Databricks CLI verwaltet und in verschiedenen Zielarbeitsbereichen (z. B. Entwicklung, Staging und Produktion) freigegeben und ausgeführt werden. Weitere Informationen finden Sie unter Databricks Asset Bundles.

  1. Erstellen Eines neuen Bündels mithilfe der Databricks CLI:

    databricks bundle init
    
  2. Fügen Sie dem Bundle zwei neue Ressourcendateien hinzu:

    • Eine Pipelinedefinitionsdatei (resources/sqlserver_pipeline.yml).
    • Eine Workflowdatei, die die Häufigkeit der Datenaufnahme steuert (resources/sqlserver.yml).

    Im Folgenden finden Sie eine resources/sqlserver_pipeline.yml-Beispieldatei:

    variables:
      # Common variables used multiple places in the DAB definition.
      gateway_name:
        default: sqlserver-gateway
      dest_catalog:
        default: main
      dest_schema:
        default: ingest-destination-schema
    
    resources:
      pipelines:
        gateway:
          name: ${var.gateway_name}
          gateway_definition:
            connection_name: <sqlserver-connection>
            gateway_storage_catalog: main
            gateway_storage_schema: ${var.dest_schema}
            gateway_storage_name: ${var.gateway_name}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
    
        pipeline_sqlserver:
          name: sqlserver-ingestion-pipeline
          ingestion_definition:
            ingestion_gateway_id: ${resources.pipelines.gateway.id}
            objects:
              # Modify this with your tables!
              - table:
                  # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item.
                  source_catalog: test
                  source_schema: ingestion_demo
                  source_table: lineitem
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
              - schema:
                  # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination
                  # table name will be the same as it is on the source.
                  source_catalog: test
                  source_schema: ingestion_whole_schema
                  destination_catalog: ${var.dest_catalog}
                  destination_schema: ${var.dest_schema}
          target: ${var.dest_schema}
          catalog: ${var.dest_catalog}
    

    Im Folgenden finden Sie eine resources/sqlserver_job.yml-Beispieldatei:

    resources:
      jobs:
        sqlserver_dab_job:
          name: sqlserver_dab_job
    
          trigger:
            # Run this job every day, exactly one day from the last run
            # See https://docs.databricks.com/api/workspace/jobs/create#trigger
            periodic:
              interval: 1
              unit: DAYS
    
          email_notifications:
            on_failure:
              - <email-address>
    
          tasks:
            - task_key: refresh_pipeline
              pipeline_task:
                pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}
    
  3. Stellen Sie die Pipeline mithilfe der Databricks CLI bereit:

    databricks bundle deploy
    

Notebook

Aktualisieren Sie die Configuration-Zelle im folgenden Notebook mit der Quellverbindung, dem Zielkatalog, dem Zielschema und den Tabellen, um Daten aus der Quelle zu erfassen.

Erstellen einer Gateway- und Erfassungspipeline

Notebook abrufen

Befehlszeilenschnittstelle (CLI)

So erstellen Sie das Gateway:

output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
  "connection_id": "'"$CONNECTION_ID"'",
  "gateway_storage_catalog": "'"$STAGING_CATALOG"'",
  "gateway_storage_schema": "'"$STAGING_SCHEMA"'",
  "gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
  }
}')

export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')

So erstellen Sie die Erfassungspipeline:

databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
  "ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
  "objects": [
    {"table": {
        "source_catalog": "tpc",
        "source_schema": "tpch",
        "source_table": "lineitem",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'",
        "destination_table": "<YOUR_DATABRICKS_TABLE>",
        }},
     {"schema": {
        "source_catalog": "tpc",
        "source_schema": "tpcdi",
        "destination_catalog": "'"$TARGET_CATALOG"'",
        "destination_schema": "'"$TARGET_SCHEMA"'"
        }}
    ]
  }
}'

Starten, Planen und Einrichten von Benachrichtigungen für Ihre Pipeline

Sie können einen Zeitplan für die Pipeline auf der Pipelinedetailseite erstellen.

  1. Nachdem die Pipeline erstellt wurde, überprüfen Sie den Azure Databricks-Arbeitsbereich, und klicken Sie dann auf "Pipelines".

    Die neue Pipeline wird in der Pipelineliste angezeigt.

  2. Um die Pipelinedetails anzuzeigen, klicken Sie auf den Pipelinenamen.

  3. Auf der Seite mit den Pipelinedetails können Sie die Pipeline planen, indem Sie auf "Zeitplan" klicken.

  4. Um Benachrichtigungen für die Pipeline festzulegen, klicken Sie auf "Einstellungen", und fügen Sie dann eine Benachrichtigung hinzu.

Für jeden Zeitplan, den Sie einer Pipeline hinzufügen, erstellt Lakeflow Connect automatisch einen Auftrag dafür. Die Pipeline zum Einbinden von Daten ist eine Aufgabe innerhalb eines Jobs. Sie können dem Auftrag optional weitere Aufgaben hinzufügen.

Überprüfen der erfolgreichen Datenerfassung

Die Listenansicht auf der Detailseite der Pipeline zeigt die Anzahl der Datensätze an, die beim Einbinden der Daten verarbeitet werden. Diese Zahlen werden automatisch aktualisiert.

Überprüfen der Replikation

Die Spalten Upserted records und Deleted records werden standardmäßig nicht angezeigt. Sie können sie aktivieren, indem Sie auf die Schaltfläche für die Spaltenkonfiguration (Symbol für die Spaltenkonfiguration) klicken und die Spalten auswählen.

Weitere Ressourcen