Freigeben über


Konfigurieren von Auto Loader Streams im Dateibenachrichtigungsmodus

Auf dieser Seite wird beschrieben, wie Sie Auto Loader-Streams so konfigurieren, dass der Dateibenachrichtigungsmodus verwendet wird, um Clouddaten schrittweise zu entdecken und zu erfassen.

Im Dateibenachrichtigungsmodus richtet Auto Loader automatisch einen Benachrichtigungsdienst und einen Warteschlangendienst ein, der Dateiereignisse aus dem Eingabeverzeichnis abonniert. Sie können Dateibenachrichtigungen verwenden, um den Autoloader so zu skalieren, dass pro Stunde Millionen von Dateien erfasst werden. Im Vergleich mit dem Verzeichnislistenmodus ist der Dateibenachrichtigungsmodus leistungsfähiger und skalierbarer.

Sie können jederzeit zwischen Dateibenachrichtigungen und Verzeichnisauflistung wechseln und trotzdem die Garantien für die einmalige Datenverarbeitung einhalten.

Hinweis

Der Dateibenachrichtigungsmodus wird für Azure Premium-Speicherkonten nicht unterstützt, da Premiumkonten den Warteschlangenspeicher nicht unterstützen.

Warnung

Das Ändern des Quellpfads für Autoloader wird für den Dateibenachrichtigungsmodus nicht unterstützt. Wenn der Dateibenachrichtigungsmodus verwendet und der Pfad geändert wird, können Sie möglicherweise keine Dateien erfassen, die zum Zeitpunkt der Verzeichnisaktualisierung bereits im neuen Verzeichnis vorhanden sind.

Dateibenachrichtigungsmodus mit und ohne Dateiereignisse, die an externen Speicherorten aktiviert sind

Es gibt zwei Möglichkeiten, den Auto Loader so zu konfigurieren, dass der Dateibenachrichtigungsmodus verwendet wird.

  • (Empfohlen) Dateiereignisse: Sie verwenden eine einzelne Dateibenachrichtigungswarteschlange für alle Datenströme, die Dateien von einem bestimmten externen Speicherort verarbeiten.

    Dieser Ansatz bietet gegenüber dem Legacy-Dateibenachrichtigungsmodus die folgenden Vorteile:

    • Azure Databricks kann Abonnements und Dateiereignisse in Ihrem Cloudspeicherkonto für Sie einrichten, ohne dass Sie zusätzliche Anmeldeinformationen für das automatische Laden mithilfe von Dienstanmeldeinformationen oder anderen cloudspezifischen Authentifizierungsoptionen angeben müssen. Siehe (Empfohlen) Aktivieren von Dateiereignissen für einen externen Speicherort.
    • Sie haben weniger von Azure verwaltete Identitätsrichtlinien, um sie in Ihrem Cloudspeicherkonto zu erstellen.
    • Da Sie keine Warteschlange mehr für jeden Auto Loader-Datenstrom erstellen müssen, ist es einfacher, die Benachrichtigungsgrenzen des Cloud-Anbieters zu vermeiden, die in den Cloud-Ressourcen im älteren Auto Loader-Benachrichtigungsmodus aufgeführt sind.
    • Azure Databricks verwaltet automatisch die Optimierung der Ressourcenanforderungen, sodass Sie parameter wie cloudFiles.fetchParallelismz. B. nicht optimieren müssen.
    • Die Bereinigungsfunktion bedeutet, dass Sie sich nicht so viel gedanken über den Lebenszyklus von Benachrichtigungen machen müssen, die in der Cloud erstellt werden, z. B. wenn ein Datenstrom gelöscht oder vollständig aktualisiert wird.

Wenn Sie Auto Loader im Verzeichnisauflistungsmodus verwenden, empfiehlt Databricks die Migration zum Dateibenachrichtigungsmodus mit Dateiereignissen. Das automatische Laden mit Dateiereignissen bietet erhebliche Leistungsverbesserungen. Aktivieren Sie zunächst Dateiereignisse für Ihren externen Speicherort, und legen Sie dann cloudFiles.useManagedFileEvents in der Streamkonfiguration des Auto Loaders fest.

  • Legacydateibenachrichtigungsmodus: Sie verwalten Dateibenachrichtigungswarteschlangen für jeden Auto Loader-Datenstrom separat. Auto Loader richtet automatisch einen Benachrichtigungsdienst und einen Warteschlangendienst ein, der Dateiereignisse aus dem Eingabeverzeichnis abonniert.

    Dies ist der herkömmliche Ansatz.

Verwenden Sie den Dateibenachrichtigungsmodus mit Dateiereignissen

In diesem Abschnitt wird beschrieben, wie Sie Auto Loader Streams erstellen und aktualisieren, um Dateiereignisse zu nutzen.

Bevor Sie anfangen

Einrichten von Dateiereignissen erfordert Folgendes:

  • Ein Azure Databricks-Arbeitsbereich, der für den Unity-Katalog aktiviert ist.
  • Berechtigung zum Erstellen von Speicheranmeldeinformationen und Objekten in externen Speicherorten in Unity Catalog.

Auto Loader-Datenströme mit Dateiereignissen erfordern:

  • Verwenden Sie Databricks Runtime 14.3 LTS oder höher.

Konfigurationsanweisungen

Die folgenden Anweisungen gelten unabhängig davon, ob Sie neue Datenströme für das automatische Laden erstellen oder vorhandene Datenströme migrieren, um den Aktualisierten Dateibenachrichtigungsmodus mit Dateiereignissen zu verwenden:

  1. Erstellen Sie eine Speicheranmeldeinformationen und einen externen Speicherort im Unity-Katalog, die Zugriff auf den Quellspeicherort im Cloudspeicher für Ihre Datenströme für das automatische Laden gewähren.

  2. Aktivieren Sie Dateiereignisse für den externen Speicherort. Siehe (Empfohlen) Aktivieren von Dateiereignissen für einen externen Speicherort.

  3. Wenn Sie einen neuen Stream für das automatische Laden erstellen oder einen vorhandenen datenstrom bearbeiten, um mit dem externen Speicherort zu arbeiten:

    • Wenn Sie über vorhandene benachrichtigungsbasierte Auto Loader-Streams verfügen, die Daten vom externen Speicherort nutzen, schalten Sie diese aus und löschen Sie die zugehörigen Benachrichtigungsressourcen.
    • Stellen Sie sicher, dass pathRewrites nicht gesetzt ist. Diese Option ist nicht häufig.
    • Überprüfen Sie die Liste der Einstellungen, die der Auto Loader ignoriert, wenn er Dateibenachrichtigungen mithilfe von Dateiereignissen verwaltet. Vermeiden Sie sie in neuen Datenströmen für das automatische Laden, und entfernen Sie sie aus vorhandenen Datenströmen, die Sie zu diesem Modus migrieren.
    • Stellen Sie die Option cloudFiles.useManagedFileEvents in Ihrem Auto-Loader-Code auf true ein.

Beispiel:

autoLoaderStream = (spark.readStream
  .format("cloudFiles")
  ...
  .options("cloudFiles.useManagedFileEvents", True)
  ...)

Wenn Sie Lakeflow Spark Declarative Pipelines verwenden und bereits über eine Pipeline mit einer Streamingtabelle verfügen, aktualisieren Sie sie so, dass sie die useManagedFileEvents Option enthält:

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );

Nicht unterstützte Einstellungen für das automatische Laden

Die folgenden Einstellungen für das automatische Laden werden nicht unterstützt, wenn Datenströme Dateiereignisse verwenden:

Konfiguration Veränderung
useIncremental Sie müssen nicht mehr zwischen der Effizienz von Dateibenachrichtigungen und der Einfachheit der Verzeichnisauflistung entscheiden. Auto Loader mit Dateiereignissen wird in einem Modus angezeigt.
useNotifications Pro externem Standort gibt es nur ein Abonnement für Warteschlange und Speicherereignis.
cloudFiles.fetchParallelism Auto Loader mit Dateiereignissen bietet keine Möglichkeit zur manuellen Optimierung der Parallelität.
cloudFiles.backfillInterval Azure Databricks behandelt das Backfill automatisch für externe Speicherorte, die für Dateiereignisse aktiviert sind.
cloudFiles.pathRewrites Diese Option gilt nur, wenn externe Datenspeicherorte in das DBFS eingehängt werden, das veraltet ist.
resourceTags Sie sollten Ressourcentags mithilfe der Cloudkonsole festlegen.

Bewährte Methoden für verwaltete Dateiereignisse finden Sie unter Best Practices für Auto Loader und Dateiereignisse.

Einschränkungen beim automatischen Laden mit Dateiereignissen

Der Dateiereignisdienst optimiert die Ermittlung von Dateien, indem er die zuletzt erstellten Dateien zwischenspeichert. Wenn das automatische Laden selten ausgeführt wird, kann dieser Cache ablaufen, und das automatische Laden greift auf die Verzeichnisauflistung zurück, um Dateien zu ermitteln und den Cache zu aktualisieren. Um dieses Szenario zu vermeiden, rufen Sie das automatische Laden mindestens einmal alle sieben Tage auf.

Eine allgemeine Liste der Einschränkungen für Dateiereignisse finden Sie unter Dateiereignisseinschränkungen.

Verwalten von Dateibenachrichtigungswarteschlangen für jeden Auto Loader-Stream separat (Legacy)

Wichtig

Sie benötigen erhöhte Berechtigungen zum automatischen Konfigurieren der Cloudinfrastruktur für den Dateibenachrichtigungsmodus. Wenden Sie sich an Ihren Cloudadministrator oder Arbeitsbereichsadministrator. Siehe:

Cloudressourcen, die im Autoloader-Dateibenachrichtigungsmodus der Legacyversion verwendet werden

Der Autoloader kann Dateibenachrichtigungen automatisch für Sie einrichten, wenn Sie die Option cloudFiles.useNotifications auf true festlegen und die erforderlichen Berechtigungen zum Erstellen von Cloudressourcen bereitstellen. Darüber hinaus müssen Sie möglicherweise zusätzlichen Optionen bereitstellen, um Autoloader die Berechtigung zum Erstellen dieser Ressourcen zu gewähren.

In der folgenden Tabelle sind die Ressourcen aufgeführt, die vom automatischen Laden für jeden Cloudanbieter erstellt werden.

Cloudspeicher Abonnementdienst Warteschlangendienst Präfix * Grenzwert**
Amazon S3 AWS SNS AWS SQS databricks-auto-ingest 100 pro S3-Bucket
ADLS Azure-Ereignisraster Azure Queue Storage (Warteschlangenspeicher) databricks 500 pro Speicherkonto
GCS Google Pub/Sub (Nachrichtendienst) Google Pub/Sub (Nachrichtendienst) databricks-auto-ingest 100 pro GCS-Bucket
Azure Blob Storage (Speicherdienst von Azure für unstrukturierte Daten) Azure-Ereignisraster Azure Queue Storage (Warteschlangenspeicher) databricks 500 pro Speicherkonto

* Auto Loader benennt die Ressourcen mit diesem Präfix.

** Wie viele gleichzeitige Dateibenachrichtigungspipelines können gestartet werden

Wenn Sie mehr dateibenachrichtigungsbasierte Auto-Loader-Streams ausführen müssen, als diese Grenzwerte zulässig sind, können Sie Dateiereignisse oder einen Dienst wie AWS Lambda, Azure Functions oder Google Cloud Functions verwenden, um Benachrichtigungen aus einer einzelnen Warteschlange weiterzuleiten, die einen Container oder Bucket in verzeichnisspezifische Warteschlangen überwacht.

Dateibenachrichtigungsereignisse der Legacyversion

Amazon S3 stellt ein ObjectCreated-Ereignis bereit, wenn eine Datei in einen S3-Bucket hochgeladen wird, unabhängig davon, ob sie von einem Put- oder mehrteiligen Upload hochgeladen wurde.

Azure Data Lake Storage bietet unterschiedliche Ereignisbenachrichtigungen für Dateien, die in Ihrem Speichercontainer angezeigt werden.

  • Der Autoloader lauscht auf das FlushWithClose-Ereignis für die Verarbeitung einer Datei.
  • Auto Loader Streams unterstützen die RenameFile Aktion zum Ermitteln von Dateien. RenameFile-Aktionen erfordern eine API-Anforderung an das Speichersystem, um die Größe der umbenannten Datei abzurufen.
  • Autoloader-Streams, die mit Databricks Runtime 9.0 und höher erstellt wurden, unterstützen die RenameDirectory-Aktion zum Ermitteln von Dateien. RenameDirectory-Aktionen erfordern API-Anforderungen an das Speichersystem, um den Inhalt des umbenannten Verzeichnisses aufzulisten.

Google Cloud Storage stellt ein OBJECT_FINALIZE-Ereignis bereit, wenn eine Datei hochgeladen wird, einschließlich Überschreibungen und Dateikopien. Bei fehlgeschlagenen Uploads wird dieses Ereignis nicht generiert.

Hinweis

Cloud-Anbieter garantieren unter sehr seltenen Bedingungen keine 100-prozentige Zustellung aller Dateiereignisse und stellen keine strengen SLAs für die Latenz der Dateiereignisse bereit. Databricks empfiehlt, regelmäßige Abgleiche mit dem Autoloader auszulösen, indem Sie die Option cloudFiles.backfillInterval verwenden, um sicherzustellen, dass alle Dateien innerhalb einer bestimmten SLA ermittelt werden, wenn die Vollständigkeit der Daten eine Anforderung ist. Das Auslösen regelmäßiger Backfills verursacht keine Duplikate.

Erforderliche Berechtigungen zum Konfigurieren der Dateibenachrichtigung für Azure Data Lake Storage und Azure Blob Storage

Sie müssen über Leseberechtigungen für das Eingabeverzeichnis verfügen. Weitere Informationen finden Sie unter Azure Blob Storage.

Zur Verwendung des Dateibenachrichtigungsmodus müssen Sie Anmeldeinformationen für die Authentifizierung angeben, um die Ereignisbenachrichtigungsdienste einrichten und darauf zugreifen zu können.

Sie können sich mit einer der folgenden Methoden authentifizieren:

Weisen Sie nach dem Abrufen von Authentifizierungsanmeldeinformationen entweder dem Databricks-Zugriffsconnector (für Dienstanmeldeinformationen) oder der Microsoft Entra ID-App (für einen Dienstprinzipal) die erforderlichen Berechtigungen zu.

  • Verwenden von integrierten Azure-Rollen

    Weisen Sie dem Zugriffsconnector die folgenden Rollen für das Speicherkonto zu, in dem sich der Eingabepfad befindet:

    • Mitwirkender: Diese Rolle dient zum Einrichten von Ressourcen in Ihrem Speicherkonto, z. B. Warteschlangen und Ereignisabonnements.
    • Mitwirkender an Storage-Warteschlangendaten: Diese Rolle dient zum Ausführen von Warteschlangenvorgänge, z. B. Abrufen und Löschen von Nachrichten aus den Warteschlangen. Diese Rolle ist nur erforderlich, wenn Sie einen Dienstprinzipal ohne Verbindungszeichenfolge bereitstellen.

    Weisen Sie diesem Zugriffsconnector die folgende Rolle für die zugehörige Ressourcengruppe zu:

    Weitere Informationen finden Sie unter Hinzufügen oder Entfernen von Azure-Rollenzuweisungen über das Azure-Portal.

  • Verwenden einer benutzerdefinierten Rolle

    Wenn Sie Bedenken wegen der übermäßigen Berechtigungen haben, die für die vorherigen Rollen erforderlich sind, können Sie eine benutzerdefinierte Rolle mit mindestens den folgenden Berechtigungen erstellen, die unten im JSON-Format der Azure-Rolle aufgeführt sind:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Anschließend können Sie diese benutzerdefinierte Rolle Ihrem Zugriffsconnector zuweisen.

    Weitere Informationen finden Sie unter Hinzufügen oder Entfernen von Azure-Rollenzuweisungen über das Azure-Portal.

Einstellungen für Berechtigungen für Auto Loader

Erforderliche Berechtigungen zum Konfigurieren der Dateibenachrichtigung für Amazon S3

Sie müssen über Leseberechtigungen für das Eingabeverzeichnis verfügen. Weitere Informationen finden Sie unter S3-Verbindungsdetails.

Um den Dateibenachrichtigungsmodus zu verwenden, fügen Sie das folgende JSON-Richtliniendokument an Ihren IAM-Benutzer oder Ihre Rolle an. Diese IAM-Rolle ist erforderlich, um eine Dienstanmeldeinformationen für Auto Loader für die Authentifizierung zu erstellen.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

Hierbei gilt:

  • <bucket-name>: Der Name des S3-Buckets, in dem der Stream Dateien liest, z. B. auto-logs. Sie können * als Platzhalter verwenden, z. B. databricks-*-logs. Um den zugrunde liegenden S3-Bucket für Ihren DBFS-Pfad zu ermitteln, können Sie durch Ausführung von %fs mounts alle DBFS-Bereitstellungspunkte in einem Notebook auflisten.
  • <region>: Die AWS-Region, in der sich der S3-Bucket befindet, z. B. us-west-2. Wenn Sie die Region nicht angeben möchten, verwenden Sie *.
  • <account-number>: Die Nummer des AWS-Kontos, das als Besitzer des S3-Buckets fungiert, z. B. 123456789012. Wenn Sie die Kontonummer nicht angeben möchten, verwenden Sie *.

Die Zeichenfolge databricks-auto-ingest-* in der SQS- und SNS-ARN-Spezifikation ist das Namenspräfix, das die cloudFiles-Quelle beim Erstellen von SQS- und SNS-Diensten verwendet. Da Azure Databricks die Benachrichtigungsdienste bei der ersten Ausführung des Streams einrichtet, können Sie nach der ersten Ausführung eine Richtlinie mit eingeschränkten Berechtigungen verwenden (halten Sie den Stream beispielsweise an, und starten Sie ihn dann neu).

Hinweis

Die vorstehende Richtlinie umfasst nur die Berechtigungen, die für die Einrichtung von Dateibenachrichtigungsdiensten erforderlich sind: S3-Bucket-Benachrichtigung, SNS- und SQS-Dienste. Es wird davon ausgegangen, dass Sie bereits über Lesezugriff für den S3-Bucket verfügen. Wenn Sie S3-Leseberechtigungen hinzufügen müssen, fügen Sie der Action-Liste in der DatabricksAutoLoaderSetup-Anweisung im JSON-Dokument Folgendes hinzu:

  • s3:ListBucket
  • s3:GetObject

Eingeschränkte Berechtigungen nach der Ersteinrichtung

Die oben beschriebenen Berechtigungen zum Einrichten von Ressourcen werden nur bei der ersten Ausführung des Streams benötigt. Nach der ersten Ausführung können Sie zur folgenden IAM-Richtlinie mit reduzierten Berechtigungen wechseln.

Wichtig

Mit den reduzierten Berechtigungen können Sie keine neuen Streamingabfragen starten oder Ressourcen bei Fehlern neu erstellen (z. B. wurde die SQS-Warteschlange versehentlich gelöscht); Sie können auch die Cloudressourcenverwaltungs-API nicht verwenden, um Ressourcen auflisten oder herunterzureißen.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Erforderliche Berechtigungen zum Konfigurieren der Dateibenachrichtigung für GCS

Sie müssen über list- und get-Berechtigungen für Ihren GCS-Bucket und für alle Objekte verfügen. Weitere Informationen finden Sie in der Google-Dokumentation zu IAM-Berechtigungen.

Um den Dateibenachrichtigungsmodus zu verwenden, müssen Sie für das GCS-Dienstkonto und für das Dienstkonto, das für den Zugriff auf die Google Cloud Pub/Sub-Ressourcen verwendet wird, Berechtigungen hinzufügen.

Fügen Sie die Pub/Sub Publisher-Rolle dem GCS-Dienstkonto hinzu. Dadurch kann das Konto Ereignisbenachrichtigungen aus Ihren GCS-Buckets in Google Cloud Pub/Sub veröffentlichen.

Für das Dienstkonto, das für die Google Cloud Pub/Sub-Ressourcen verwendet wird, müssen Sie die folgenden Berechtigungen hinzufügen. Dieses Dienstkonto wird automatisch erstellt, wenn Sie Databricks--Dienstanmeldedatenerstellen. Die Unterstützung für Dienstanmeldeinformationen ist in Databricks Runtime 16.1 und höher verfügbar.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.detachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Hierzu können Sie entweder eine benutzerdefinierte IAM-Rolle mit diesen Berechtigungen erstellen oder bereits vorhandene GCP-Rollen zuweisen, um diese Berechtigungen abzudecken.

Suchen des GCS-Dienstkontos

Navigieren Sie in der Google Cloud Console für das entsprechende Projekt zu Cloud Storage > Settings. Der Abschnitt „Cloud Storage Service Account“ enthält die E-Mail des GCS-Dienstkontos.

GCS-Dienstkonto

Erstellen einer benutzerdefinierten Google Cloud-IAM-Rolle für den Dateibenachrichtigungsmodus

Navigieren Sie in der Google Cloud Console für das entsprechende Projekt zu IAM & Admin > Roles. Erstellen Sie dann entweder oben eine Rolle, oder aktualisieren Sie eine vorhandene Rolle. Klicken Sie auf dem Bildschirm für die Rollenerstellung oder -bearbeitung auf Add Permissions. Es erscheint ein Menü, in dem Sie der Rolle die gewünschten Berechtigungen hinzufügen können.

Benutzerdefinierte Rollen für GCP IAM

Manuelles Konfigurieren oder Verwalten von Dateibenachrichtigungsressourcen.

Privilegierte Benutzer können Dateibenachrichtigungsressourcen manuell konfigurieren oder verwalten.

  • Richten Sie die Dateibenachrichtigungsdienste manuell über den Cloudanbieter ein, und geben Sie den Warteschlangenbezeichner manuell an. Weitere Informationen finden Sie unter Dateibenachrichtigungsoptionen.
  • Verwenden Sie Scala-APIs, um die Benachrichtigungs- und Warteschlangendienste zu erstellen oder zu verwalten, wie im folgenden Beispiel gezeigt:

Hinweis

Sie müssen über geeignete Berechtigungen zum Konfigurieren oder Ändern der Cloudinfrastruktur verfügen. Weitere Informationen finden Sie in der Dokumentation zu Berechtigungen für Azure, S3 oder GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Verwenden Sie setUpNotificationServices(<resource-suffix>), um eine Warteschlange und ein Abonnement mit dem Namen <prefix>-<resource-suffix> zu erstellen (das Präfix hängt vom Speichersystem ab, siehe Zusammenfassung unter Cloudressourcen, die im Autoloader-Dateibenachrichtigungsmodus der Legacyversion verwendet werden). Wenn eine vorhandene Ressource mit demselben Namen vorhanden ist, verwendet Azure Databricks die vorhandene Ressource erneut, anstatt eine neue zu erstellen. Diese Funktion gibt einen Warteschlangenbezeichner zurück, den Sie mithilfe des Bezeichners in cloudFiles an die -Quelle übergeben können. Dadurch hat der Benutzer der cloudFiles-Quelle weniger Berechtigungen als der Benutzer, der die Ressourcen erstellt.

Geben Sie die Option "path" für newManager nur an, wenn Sie setUpNotificationServices aufrufen. Sie wird für listNotificationServices oder tearDownNotificationServices nicht benötigt. Dies ist derselbe path, den Sie beim Ausführen einer Streamingabfrage verwenden.

Die folgende Matrix gibt an, welche API-Methoden in welcher Databricks Runtime für jeden Speichertyp unterstützt wird:

Cloudspeicher Setup-API Listen-API Löschungs-API
Amazon S3 Alle Versionen Alle Versionen Alle Versionen
ADLS Alle Versionen Alle Versionen Alle Versionen
GCS Databricks Runtime 9.1 und höher Databricks Runtime 9.1 und höher Databricks Runtime 9.1 und höher
Azure Blob Storage (Speicherdienst von Azure für unstrukturierte Daten) Alle Versionen Alle Versionen Alle Versionen

Bereinigen der von Autoloader erstellten Ereignisbenachrichtigungsressourcen

Auto Loader reißt Dateibenachrichtigungsressourcen nicht automatisch ab. Um Dateibenachrichtigungsressourcen abzubauen, müssen Sie den Cloud-Ressourcen-Manager verwenden, wie im vorherigen Abschnitt beschrieben. Sie können diese Ressourcen auch manuell mithilfe der Benutzeroberfläche oder APIs des Cloudanbieters löschen.

Problembehandlung für häufige Fehler

In diesem Abschnitt werden häufige Fehler bei der Verwendung von Auto Loader im Dateibenachrichtigungsmodus und wie man sie behebt, beschrieben.

Fehler beim Erstellen des Event Grid-Abonnements

Wenn beim ersten Ausführen des Auto Loaders die folgende Fehlermeldung angezeigt wird, wird Event Grid nicht als Ressourcenanbieter im Azure-Abonnement registriert.

java.lang.RuntimeException: Failed to create event grid subscription.

Gehen Sie wie folgt vor, um Event Grid als Ressourcenanbieter zu registrieren:

  1. Navigieren Sie im Azure-Portal zu Ihrem Abonnement.
  2. Klicken Sie im Abschnitt „Einstellungen“ auf Ressourcenanbieter.
  3. Registrieren Sie den Anbieter Microsoft.EventGrid.

Zum Ausführen von Event Grid-Abonnementvorgängen erforderliche Autorisierung

Wenn bei der ersten Ausführung von Auto Loader die folgende Fehlermeldung angezeigt wird, vergewissern Sie sich, dass dem Dienstprinzipal für Event Grid und das Speicherkonto die Rolle Mitwirkender zugewiesen wurde.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Event Grid-Client umgeht Proxy

In Databricks Runtime 15.2 und höher verwenden Event Grid-Verbindungen in Auto Loader standardmäßig die Proxyeinstellungen aus den Systemeigenschaften. In Databricks Runtime 13.3 LTS, 14.3 LTS und 15.0 bis 15.2 können Sie Event Grid-Verbindungen manuell so konfigurieren, dass ein Proxy verwendet wird, indem Sie die Spark Config-Eigenschaft spark.databricks.cloudFiles.eventGridClient.useSystemProperties truefestlegen. Siehe Festlegen von Spark-Konfigurationseigenschaften in Azure Databricks.