Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
Auf dieser Seite wird beschrieben, wie Sie Daten aus SQL Server aufnehmen und in Azure Databricks mithilfe von Lakeflow Connect laden. Der SQL Server-Connector unterstützt Azure SQL-Datenbank, azure SQL Managed Instance und Amazon RDS SQL-Datenbanken. Dazu gehört SQL Server, der auf virtuellen Azure-Computern (VMs) und Amazon EC2 ausgeführt wird. Der Connector unterstützt auch sql Server lokal mit Azure ExpressRoute und AWS Direct Connect-Netzwerk.
Bevor Sie beginnen
Um ein Aufnahmegateway und eine Aufnahmepipeline zu erstellen, müssen Sie die folgenden Anforderungen erfüllen:
Ihr Arbeitsbereich ist für Unity Catalog aktiviert.
Serverlose Berechnung ist für Ihren Arbeitsbereich aktiviert. Siehe Serverlose Computeanforderungen.
Wenn Sie eine Verbindung erstellen möchten: Sie verfügen über
CREATE CONNECTIONBerechtigungen für den Metastore.Wenn Ihr Connector die benutzeroberflächenbasierte Pipelineerstellung unterstützt, können Sie die Verbindung und die Pipeline gleichzeitig erstellen, indem Sie die Schritte auf dieser Seite ausführen. Wenn Sie jedoch apibasierte Pipelineerstellung verwenden, müssen Sie die Verbindung im Katalog-Explorer erstellen, bevor Sie die Schritte auf dieser Seite ausführen. Siehe Herstellen einer Verbindung mit verwalteten Aufnahmequellen.
Wenn Sie beabsichtigen, eine vorhandene Verbindung zu verwenden: Sie verfügen über
USE CONNECTIONBerechtigungen oderALL PRIVILEGESfür die Verbindung.Sie verfügen über
USE CATALOG-Berechtigungen für den Zielkatalog.Sie verfügen über
USE SCHEMA,CREATE TABLEundCREATE VOLUMEBerechtigungen für ein vorhandenes Schema oder überCREATE SCHEMABerechtigungen für den Zielkatalog.
Sie haben Zugriff auf eine primäre SQL Server-Instanz. Die Funktionen zur Änderungsverfolgung und zur Erfassung von Änderungsdaten werden auf Leserepliken oder sekundären Instanzen nicht unterstützt.
Uneingeschränkte Berechtigungen zum Erstellen von Clustern oder einer benutzerdefinierten Richtlinie (nur API). Eine benutzerdefinierte Richtlinie für das Gateway muss die folgenden Anforderungen erfüllen:
Familie: Job-Compute
Außerkraftsetzungen der Richtlinienfamilie:
{ "cluster_type": { "type": "fixed", "value": "dlt" }, "num_workers": { "type": "unlimited", "defaultValue": 1, "isOptional": true }, "runtime_engine": { "type": "fixed", "value": "STANDARD", "hidden": true } }Databricks empfiehlt, die kleinstmöglichen Workerknoten für Aufnahmegateways anzugeben, da sie sich nicht auf die Leistung der Gateways auswirken. Mit der folgenden Berechnungsrichtlinie können Azure Databricks das Aufnahmegateway skalieren, um die Anforderungen Ihrer Workload zu erfüllen. Die Mindestanforderung ist 8 Kerne, um eine effiziente und leistungsfähige Datenextraktion aus Der Quelldatenbank zu ermöglichen.
{ "driver_node_type_id": { "type": "fixed", "value": "Standard_E64d_v4" }, "node_type_id": { "type": "fixed", "value": "Standard_F4s" } }
Weitere Informationen zu Clusterrichtlinien finden Sie unter Auswählen einer Richtlinie für Rechenressourcen.
Zum Abrufen von SQL Server müssen Sie auch das Quellsetup abschließen.
Option 1: Azure Databricks-Benutzeroberfläche
Administratoren können gleichzeitig in der Benutzeroberfläche eine Verbindung und eine Pipeline erstellen. Dies ist die einfachste Möglichkeit zum Erstellen von verwalteten Aufnahmepipelines.
Klicken Sie in der Randleiste des Azure Databricks-Arbeitsbereichs auf "Datenaufnahme".
Klicken Sie auf der Seite " Daten hinzufügen " unter "Databricks-Connectors" auf SQL Server.
Der Erfassungs-Assistent wird geöffnet.
Geben Sie auf der Seite Ingestion-Gateway des Assistenten einen eindeutigen Namen für das Gateway ein.
Wählen Sie einen Katalog und ein Schema für die Staging-Aufnahmedaten aus, und klicken Sie dann auf "Weiter".
Geben Sie auf der Seite "Ingestion-Pipeline " einen eindeutigen Namen für die Pipeline ein.
Wählen Sie für den Zielkatalog einen Katalog aus, um die aufgenommenen Daten zu speichern.
Wählen Sie die Unity-Katalogverbindung aus, die die für den Zugriff auf die Quelldaten erforderlichen Anmeldeinformationen speichert.
Wenn keine Verbindungen mit der Quelle vorhanden sind, klicken Sie auf "Verbindung erstellen" , und geben Sie die Authentifizierungsdetails ein, die Sie aus der Quelleinrichtung erhalten haben. Sie müssen über
CREATE CONNECTION-Berechtigungen für den Metaspeicher verfügen.Klicken Sie auf "Pipeline erstellen", und fahren Sie fort.
Wählen Sie auf der Seite "Quelle" die aufzunehmenden Tabellen aus.
Ändern Sie optional die Standardeinstellung für die Verlaufsverfolgung. Weitere Informationen finden Sie unter Aktivieren der Verlaufsverfolgung (SCD-Typ 2).
Klicke auf Weiter.
Wählen Sie auf der Seite "Ziel " den Unity-Katalog und das Schema aus, in das geschrieben werden soll.
Wenn Sie kein vorhandenes Schema verwenden möchten, klicken Sie auf "Schema erstellen". Sie müssen über die Berechtigungen
USE CATALOGundCREATE SCHEMAfür den übergeordneten Katalog verfügen.Klicken Sie auf Speichern und fortfahren.
(Optional) Klicken Sie auf der Seite "Einstellungen" auf " Zeitplan erstellen". Legen Sie die Häufigkeit fest, mit der die Zieltabellen aktualisiert werden.
(Optional) Festlegen von E-Mail-Benachrichtigungen für Erfolg oder Fehler des Pipelinevorgangs.
Klicken Sie auf "Speichern", und führen Sie die Pipelineaus.
Option 2: Andere Schnittstellen
Bevor Sie Databricks Asset Bundles, Databricks APIs, Databricks SDKs oder die Databricks CLI verwenden, müssen Sie Zugriff auf eine bestehende Unity-Katalog-Verbindung haben. Anweisungen finden Sie unter Herstellen einer Verbindung mit verwalteten Aufnahmequellen.
Erstellen des Staging-Katalogs und des Schemas
Der Stagingkatalog und das Stagingschema können mit dem Zielkatalog und -schema identisch sein. Bei dem Staging-Katalog kann es sich nicht um einen externen Katalog handeln.
Befehlszeilenschnittstelle (CLI)
export CONNECTION_NAME="my_connection"
export TARGET_CATALOG="main"
export TARGET_SCHEMA="lakeflow_sqlserver_connector_cdc"
export STAGING_CATALOG=$TARGET_CATALOG
export STAGING_SCHEMA=$TARGET_SCHEMA
export DB_HOST="cdc-connector.database.windows.net"
export DB_USER="..."
export DB_PASSWORD="..."
output=$(databricks connections create --json '{
"name": "'"$CONNECTION_NAME"'",
"connection_type": "SQLSERVER",
"options": {
"host": "'"$DB_HOST"'",
"port": "1433",
"trustServerCertificate": "false",
"user": "'"$DB_USER"'",
"password": "'"$DB_PASSWORD"'"
}
}')
export CONNECTION_ID=$(echo $output | jq -r '.connection_id')
Erstellen der Gateway- und Erfassungspipeline
Das Gateway zum Einbinden extrahiert Snapshot- und Änderungsdaten aus der Quelldatenbank und speichert sie im Unity Catalog Staging Volume. Sie müssen das Gateway als fortlaufende Pipeline ausführen. Dadurch können Sie alle Aufbewahrungsrichtlinien für Änderungsprotokolle berücksichtigen, die Sie in der Quelldatenbank haben.
Die Erfassungspipeline wendet die Momentaufnahme- und Änderungsdaten aus dem Stagingvolume in Zielstreamingtabellen an.
Databricks-Ressourcenpakete
Auf dieser Registerkarte wird beschrieben, wie Sie eine Datenerfassungspipeline mithilfe von Databricks Asset Bundles bereitstellen. Bundles können YAML-Definitionen von Aufträgen und Aufgaben enthalten, mithilfe der Databricks CLI verwaltet und in verschiedenen Zielarbeitsbereichen (z. B. Entwicklung, Staging und Produktion) freigegeben und ausgeführt werden. Weitere Informationen finden Sie unter Databricks Asset Bundles.
Erstellen Eines neuen Bündels mithilfe der Databricks CLI:
databricks bundle initFügen Sie dem Bundle zwei neue Ressourcendateien hinzu:
- Eine Pipelinedefinitionsdatei (
resources/sqlserver_pipeline.yml). - Eine Workflowdatei, die die Häufigkeit der Datenaufnahme steuert (
resources/sqlserver.yml).
Im Folgenden finden Sie eine
resources/sqlserver_pipeline.yml-Beispieldatei:variables: # Common variables used multiple places in the DAB definition. gateway_name: default: sqlserver-gateway dest_catalog: default: main dest_schema: default: ingest-destination-schema resources: pipelines: gateway: name: ${var.gateway_name} gateway_definition: connection_name: <sqlserver-connection> gateway_storage_catalog: main gateway_storage_schema: ${var.dest_schema} gateway_storage_name: ${var.gateway_name} target: ${var.dest_schema} catalog: ${var.dest_catalog} pipeline_sqlserver: name: sqlserver-ingestion-pipeline ingestion_definition: ingestion_gateway_id: ${resources.pipelines.gateway.id} objects: # Modify this with your tables! - table: # Ingest the table test.ingestion_demo_lineitem to dest_catalog.dest_schema.ingestion_demo_line_item. source_catalog: test source_schema: ingestion_demo source_table: lineitem destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} - schema: # Ingest all tables in the test.ingestion_whole_schema schema to dest_catalog.dest_schema. The destination # table name will be the same as it is on the source. source_catalog: test source_schema: ingestion_whole_schema destination_catalog: ${var.dest_catalog} destination_schema: ${var.dest_schema} target: ${var.dest_schema} catalog: ${var.dest_catalog}Im Folgenden finden Sie eine
resources/sqlserver_job.yml-Beispieldatei:resources: jobs: sqlserver_dab_job: name: sqlserver_dab_job trigger: # Run this job every day, exactly one day from the last run # See https://docs.databricks.com/api/workspace/jobs/create#trigger periodic: interval: 1 unit: DAYS email_notifications: on_failure: - <email-address> tasks: - task_key: refresh_pipeline pipeline_task: pipeline_id: ${resources.pipelines.pipeline_sqlserver.id}- Eine Pipelinedefinitionsdatei (
Stellen Sie die Pipeline mithilfe der Databricks CLI bereit:
databricks bundle deploy
Notebook
Aktualisieren Sie die Configuration-Zelle im folgenden Notebook mit der Quellverbindung, dem Zielkatalog, dem Zielschema und den Tabellen, um Daten aus der Quelle zu erfassen.
Erstellen einer Gateway- und Erfassungspipeline
Befehlszeilenschnittstelle (CLI)
So erstellen Sie das Gateway:
output=$(databricks pipelines create --json '{
"name": "'"$GATEWAY_PIPELINE_NAME"'",
"gateway_definition": {
"connection_id": "'"$CONNECTION_ID"'",
"gateway_storage_catalog": "'"$STAGING_CATALOG"'",
"gateway_storage_schema": "'"$STAGING_SCHEMA"'",
"gateway_storage_name": "'"$GATEWAY_PIPELINE_NAME"'"
}
}')
export GATEWAY_PIPELINE_ID=$(echo $output | jq -r '.pipeline_id')
So erstellen Sie die Erfassungspipeline:
databricks pipelines create --json '{
"name": "'"$INGESTION_PIPELINE_NAME"'",
"ingestion_definition": {
"ingestion_gateway_id": "'"$GATEWAY_PIPELINE_ID"'",
"objects": [
{"table": {
"source_catalog": "tpc",
"source_schema": "tpch",
"source_table": "lineitem",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'",
"destination_table": "<YOUR_DATABRICKS_TABLE>",
}},
{"schema": {
"source_catalog": "tpc",
"source_schema": "tpcdi",
"destination_catalog": "'"$TARGET_CATALOG"'",
"destination_schema": "'"$TARGET_SCHEMA"'"
}}
]
}
}'
Starten, Planen und Einrichten von Benachrichtigungen für Ihre Pipeline
Sie können einen Zeitplan für die Pipeline auf der Pipelinedetailseite erstellen.
Nachdem die Pipeline erstellt wurde, überprüfen Sie den Azure Databricks-Arbeitsbereich, und klicken Sie dann auf "Pipelines".
Die neue Pipeline wird in der Pipelineliste angezeigt.
Um die Pipelinedetails anzuzeigen, klicken Sie auf den Pipelinenamen.
Auf der Seite mit den Pipelinedetails können Sie die Pipeline planen, indem Sie auf "Zeitplan" klicken.
Um Benachrichtigungen für die Pipeline festzulegen, klicken Sie auf "Einstellungen", und fügen Sie dann eine Benachrichtigung hinzu.
Für jeden Zeitplan, den Sie einer Pipeline hinzufügen, erstellt Lakeflow Connect automatisch einen Auftrag dafür. Die Pipeline zum Einbinden von Daten ist eine Aufgabe innerhalb eines Jobs. Sie können dem Auftrag optional weitere Aufgaben hinzufügen.
Überprüfen der erfolgreichen Datenerfassung
Die Listenansicht auf der Detailseite der Pipeline zeigt die Anzahl der Datensätze an, die beim Einbinden der Daten verarbeitet werden. Diese Zahlen werden automatisch aktualisiert.
Die Spalten Upserted records und Deleted records werden standardmäßig nicht angezeigt. Sie können sie aktivieren, indem Sie auf die Schaltfläche für die Spaltenkonfiguration (
) klicken und die Spalten auswählen.