Udostępnij przez


Konfiguracja strumieni Auto Loader w trybie powiadamiania o plikach

Na tej stronie opisano sposób konfigurowania strumieni Auto Loader do korzystania z trybu powiadamiania plików, aby stopniowo odkrywać i pobierać dane z chmury.

W trybie powiadamiania plików automatyczne moduł ładujący automatycznie konfiguruje usługę powiadomień i usługę kolejki, która subskrybuje zdarzenia plików z katalogu wejściowego. Powiadomienia dotyczące plików umożliwiają skalowanie automatycznego modułu ładującego w celu pozyskiwania milionów plików na godzinę. W porównaniu z trybem wyświetlania listy katalogów tryb powiadomień plików jest bardziej wydajny i skalowalny.

Możesz przełączać się między powiadomieniami o plikach i listami katalogów w dowolnym momencie i nadal utrzymywać dokładnie jednokrotne gwarancje przetwarzania danych.

Uwaga

Tryb powiadamiania o plikach nie jest obsługiwany dla kont usługi Azure Premium Storage, ponieważ konta w warstwie Premium nie obsługują magazynu kolejek.

Ostrzeżenie

Zmiana ścieżki źródłowej dla automatycznego modułu ładującego nie jest obsługiwana w trybie powiadomień plików. Jeśli używany jest tryb powiadamiania o plikach i zmieniona zostanie ścieżka, może dojść do błędu w przypadku pobierania plików, które już znajdują się w nowym katalogu w chwili jego aktualizacji.

Tryb powiadomień plików ze zdarzeniami plików włączonymi i wyłączonymi w lokalizacjach zewnętrznych

Istnieją dwa sposoby konfigurowania automatycznego modułu ładującego do korzystania z trybu powiadomień plików:

  • (Zalecane) Zdarzenia plików: używasz pojedynczej kolejki powiadomień plików dla wszystkich strumieni, które przetwarzają pliki z danej lokalizacji zewnętrznej.

    Takie podejście ma następujące zalety w porównaniu ze starszym trybem powiadamiania o plikach:

    • Usługa Azure Databricks może skonfigurować subskrypcje i zdarzenia plików na koncie magazynu w chmurze bez konieczności podawania dodatkowych poświadczeń do Auto Loader przy użyciu poświadczeń usługi lub innych opcji uwierzytelniania specyficznych dla chmury. Zobacz (Zalecane) Włączanie zdarzeń plików dla lokalizacji zewnętrznej.
    • Masz mniej zasad zarządzania tożsamościami platformy Azure, które musisz utworzyć na koncie magazynu chmurowego.
    • Ponieważ nie musisz już tworzyć kolejki dla każdego strumienia Auto Loader, łatwiej uniknąć przekroczenia limitów powiadomień dostawcy usług w chmurze wymienionych w zasobach chmury używanych w starszym trybie powiadomień Auto Loader.
    • Usługa Azure Databricks automatycznie zarządza dostrajaniem wymagań dotyczących zasobów, więc nie trzeba dostrajać parametrów, takich jak cloudFiles.fetchParallelism.
    • Funkcja oczyszczania oznacza, że nie musisz się zbytnio martwić o cykl życia powiadomień tworzonych w chmurze, na przykład podczas usuwania strumienia lub jego pełnego odświeżenia.

Jeśli używasz Auto Loader w trybie wyświetlania katalogu, usługa Databricks zaleca migrację do trybu powiadamiania o plikach z użyciem zdarzeń plików. Automatyczne ładowanie (Auto Loader) ze zdarzeniami plików zapewnia znaczną poprawę wydajności. Zacznij od włączenia zdarzeń plików dla lokalizacji zewnętrznej, a następnie ustaw cloudFiles.useManagedFileEvents w konfiguracji strumienia Auto Loader.

  • Starszy tryb powiadamiania o plikach: zarządzasz kolejkami powiadomień plików dla każdego strumienia automatycznego modułu ładującego oddzielnie. Automatyczne ładowanie automatycznie konfiguruje usługę powiadomień i usługę kolejki, która subskrybuje zdarzenia plików z katalogu wejściowego.

    Jest to starsze podejście.

Używanie trybu powiadomień plików ze zdarzeniami plików

W tej sekcji opisano, jak tworzyć oraz aktualizować strumienie Auto Loader w celu używania zdarzeń plików.

Zanim rozpoczniesz

Konfigurowanie zdarzeń plików wymaga:

  • Obszar roboczy usługi Azure Databricks, który jest włączony dla Unity Catalog.
  • Uprawnienie do tworzenia poświadczeń magazynowych i obiektów lokalizacji zewnętrznej w Unity Catalog.

Strumienie automatycznego modułu ładującego ze zdarzeniami plików wymagają:

  • Wykonaj obliczenia w Databricks Runtime 14.3 LTS lub nowszym.

Instrukcje dotyczące konfiguracji

Poniższe instrukcje dotyczą tworzenia nowych strumieni Auto Loader lub migrowania istniejących strumieni w celu korzystania z uaktualnionego trybu powiadomień o zdarzeniach plików:

  1. Utwórz poświadczenia magazynu i lokalizację zewnętrzną w katalogu aparatu Unity, która udziela dostępu do lokalizacji źródłowej w magazynie w chmurze dla strumieni automatycznego modułu ładującego.

  2. Włącz zdarzenia plików dla lokalizacji zewnętrznej. Zobacz (Zalecane) Włączanie zdarzeń plików dla lokalizacji zewnętrznej.

  3. Podczas tworzenia nowego strumienia w narzędziu Auto Loader lub edytowania istniejącego, aby współpracować z lokalizacją zewnętrzną:

    • Jeśli masz istniejące strumienie Auto Loader oparte na powiadomieniach, które pobierają dane z zewnętrznej lokalizacji, wyłącz je i usuń powiązane zasoby powiadomień.
    • Upewnij się, że pathRewrites nie jest ustawiona (nie jest to typowa opcja).
    • Przejrzyj listę ustawień ignorowanych przez moduł automatycznego ładowania podczas zarządzania powiadomieniami o plikach przy użyciu zdarzeń plików. Unikaj ich w nowych strumieniach Auto Loader i usuń je z istniejących strumieni, które są migrowane do tego trybu.
    • Ustaw opcję cloudFiles.useManagedFileEvents na true w kodzie Auto Loader.

Przykład:

autoLoaderStream = (spark.readStream
  .format("cloudFiles")
  ...
  .options("cloudFiles.useManagedFileEvents", True)
  ...)

Jeśli korzystasz z potoków deklaratywnych Lakeflow Spark i masz już potok z tabelą przesyłania strumieniowego, zaktualizuj go, aby uwzględnić opcję useManagedFileEvents.

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );

Nieobsługiwane ustawienia autoloadera

Następujące ustawienia automatycznego modułu ładującego nie są obsługiwane, gdy strumienie używają zdarzeń plików:

Ustawienia Zmiana
useIncremental Nie musisz już decydować o wydajności powiadomień dotyczących plików i prostoty wyświetlania listy katalogów. Automatyczny ładowacz z obsługą zdarzeń plików jest dostępny w jednym trybie.
useNotifications Istnieje tylko jedna subskrypcja zdarzeń kolejki i magazynu na lokalizację zewnętrzną.
cloudFiles.fetchParallelism Auto Loader ze zdarzeniami plików nie oferuje ręcznej optymalizacji równoległego przetwarzania.
cloudFiles.backfillInterval Usługa Azure Databricks automatycznie obsługuje uzupełnianie zaległości dla zewnętrznych lokalizacji, które są włączone dla zdarzeń plikowych.
cloudFiles.pathRewrites Ta opcja ma zastosowanie tylko w przypadku instalowania lokalizacji danych zewnętrznych w systemie plików DBFS, który jest przestarzały.
resourceTags Należy ustawić tagi zasobów, używając konsoli chmury.

Aby zapoznać się z najlepszymi praktykami dotyczącymi zarządzanych zdarzeń plikowych, zobacz Najlepsze praktyki dla Auto Loader ze zdarzeniami plikowymi.

Ograniczenia dotyczące automatycznego modułu ładującego ze zdarzeniami plików

Usługa zdarzeń plików optymalizuje odnajdywanie plików przez buforowanie ostatnio utworzonych plików. Jeśli automatyczne ładowanie jest uruchamiane rzadko, ta pamięć podręczna może wygasnąć, a moduł automatycznego ładowania powróci do listy katalogów w celu odnalezienia plików i zaktualizowania pamięci podręcznej. Aby uniknąć tego scenariusza, należy wywołać moduł automatycznego ładowania co najmniej raz na siedem dni.

Aby uzyskać ogólną listę ograniczeń dotyczących zdarzeń plików, zobacz Ograniczenia zdarzeń plików.

Zarządzanie kolejkami powiadomień plików dla każdego strumienia automatycznego ładowania oddzielnie (starsza wersja)

Ważne

Aby automatycznie skonfigurować infrastrukturę chmury na potrzeby trybu powiadomień o plikach, potrzebne są podwyższone uprawnienia. Skontaktuj się z administratorem chmury lub administratorem obszaru roboczego. Widzieć:

Zasoby w chmurze używane w starszym trybie powiadamiania o plikach automatycznego ładowania

Automatyczne ładowanie może automatycznie konfigurować powiadomienia o plikach po ustawieniu opcji cloudFiles.useNotifications na true i podać niezbędne uprawnienia do tworzenia zasobów w chmurze. Ponadto może być konieczne podanie dodatkowych opcji, aby umożliwić Auto Loaderowi autoryzację do utworzenia tych zasobów.

W poniższej tabeli wymieniono zasoby utworzone przez moduł automatycznego ładowania dla każdego dostawcy usług w chmurze.

Magazyn w chmurze Usługa subskrypcji Usługa kolejki Przedrostek* Ograniczać**
Amazon S3 AWS SNS AWS SQS databricks-auto-pozyskiwanie 100 na zasobnik S3
ADLS Azure Event Grid Azure Queue Storage databricks 500 na konto magazynu
GCS Google Pub/Sub (usługa przesyłania wiadomości) Google Pub/Sub (usługa przesyłania wiadomości) databricks-auto-pozyskiwanie 100 na zasobnik GCS
Azure Blob Storage Azure Event Grid Azure Queue Storage databricks 500 na konto magazynu

* Auto Loader nazywa zasoby używając tego prefiksu.

** Ile współbieżnych potoków powiadomień o plikach można uruchomić

Jeśli musisz uruchomić więcej strumieni automatycznego ładowania opartego na powiadomieniach plików niż dozwolone przez te limity, możesz użyć zdarzeń plików lub usługi, takiej jak AWS Lambda, Azure Functions lub Google Cloud Functions, aby rozsyłać powiadomienia z pojedynczej kolejki, która nasłuchuje całego kontenera lub zasobnika do kolejek specyficznych dla katalogu.

Zdarzenia powiadomień o starszych plikach

Usługa Amazon S3 udostępnia zdarzenie ObjectCreated, gdy plik jest ładowany do zasobnika S3, niezależnie od tego, czy był przesłany za pomocą operacji 'put', czy za pomocą 'multi-part upload'.

Usługa Azure Data Lake Storage udostępnia różne powiadomienia o zdarzeniach dla plików pojawiających się w kontenerze magazynu.

  • Moduł automatycznego ładowania nasłuchuje zdarzenia FlushWithClose do przetwarzania pliku.
  • Strumienie automatycznego modułu ładującego obsługują RenameFile akcję odnajdywania plików. RenameFile akcje wymagają żądania interfejsu API do systemu pamięci masowej w celu uzyskania rozmiaru zmienionego pliku.
  • Strumienie automatycznego modułu ładującego utworzone za pomocą środowiska Databricks Runtime 9.0 i po obsłudze RenameDirectory akcji odnajdywania plików. RenameDirectory akcje wymagają, aby żądania interfejsu API do systemu przechowywania wymieniły zawartość zmienionego katalogu.

Usługa Google Cloud Storage udostępnia OBJECT_FINALIZE zdarzenie podczas przekazywania pliku, w tym zastępowania i kopiowania plików. Nieudane przesyłania nie generują tego zdarzenia.

Uwaga

Dostawcy usług w chmurze nie gwarantują 100% dostarczania wszystkich zdarzeń plików w bardzo rzadkich warunkach i nie zapewniają ścisłych umów SLA dotyczących opóźnienia zdarzeń plików. Usługa Databricks zaleca, aby wyzwalać regularne wypełnianie za pomocą modułu automatycznego ładującego przy użyciu cloudFiles.backfillInterval opcji zagwarantowania, że wszystkie pliki zostaną odnalezione w ramach danej umowy SLA, jeśli wymagana jest kompletność danych. Wyzwalanie regularnych wypełniania nie powoduje duplikatów.

Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usług Azure Data Lake Storage i Azure Blob Storage

Musisz mieć uprawnienia do odczytu dla katalogu wejściowego. Zobacz Azure Blob Storage.

Aby użyć trybu powiadomień plików, należy podać poświadczenia uwierzytelniania na potrzeby konfigurowania usług powiadomień zdarzeń i uzyskiwania do nich dostępu.

Możesz uwierzytelnić się przy użyciu jednej z następujących metod:

Po uzyskaniu poświadczeń uwierzytelniania przypisz niezbędne uprawnienia do łącznika dostępu usługi Databricks (dla poświadczeń usługi) lub do aplikacji Microsoft Entra ID (dla jednostki usługi).

  • Korzystanie z ról wbudowanych platformy Azure

    Przypisz następujące role łącznikowi dostępu do konta magazynu, w którym znajduje się ścieżka wejściowa:

    • Współautor: Ta rola służy do konfigurowania zasobów na koncie magazynu, takich jak kolejki i subskrypcje zdarzeń.
    • Współautor danych kolejki usługi Storage: ta rola służy do wykonywania operacji kolejki, takich jak pobieranie i usuwanie komunikatów z kolejek. Ta rola jest wymagana tylko w przypadku podania jednostki usługi bez parametry połączenia.

    Przypisz ten łącznik dostępu następującą rolę do powiązanej grupy zasobów:

    Aby uzyskać więcej informacji, zobacz przypisywanie ról Azure za pomocą portalu Azure.

  • Używanie roli niestandardowej

    Jeśli interesuje Cię nadmierne uprawnienia wymagane dla poprzednich ról, możesz utworzyć rolę niestandardową z co najmniej następującymi uprawnieniami wymienionymi poniżej w formacie JSON roli platformy Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Następnie możesz przypisać tę rolę niestandardową do łącznika dostępu.

    Aby uzyskać więcej informacji, zobacz przypisywanie ról Azure za pomocą portalu Azure.

ustawienia uprawnień Auto Loader

Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usługi Amazon S3

Musisz mieć uprawnienia do odczytu dla katalogu wejściowego. Aby uzyskać więcej informacji, zobacz szczegóły połączenia S3.

Aby użyć trybu powiadomień plików, dołącz następujący dokument zasad JSON do użytkownika lub roli IAM. Ta rola IAM jest wymagana do utworzenia poświadczeń usługi dla modułu Auto Loader do celów uwierzytelniania.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

gdzie:

  • <bucket-name>: nazwa zasobnika S3, z którego strumień będzie odczytywał pliki, na przykład auto-logs. Można użyć * jako symbolu wieloznacznych, na przykład databricks-*-logs. Aby dowiedzieć się podstawowego zasobnika S3 dla ścieżki systemu plików DBFS, możesz wyświetlić listę wszystkich punktów instalacji systemu plików DBFS w notesie, uruchamiając %fs mounts.
  • <region>: region platformy AWS, w którym znajduje się zasobnik S3, na przykład us-west-2. Jeśli nie chcesz określać regionu, użyj polecenia *.
  • <account-number>: numer konta platformy AWS, który jest właścicielem zasobnika S3, na przykład 123456789012. Jeśli nie chcesz określać numeru konta, użyj polecenia *.

Ciąg databricks-auto-ingest-* w specyfikacji SQS i SNS ARN jest prefiksem nazwy używanym cloudFiles przez źródło podczas tworzenia usług SQS i SNS. Ponieważ usługa Azure Databricks konfiguruje usługi powiadomień w początkowym uruchomieniu strumienia, możesz użyć zasad z ograniczonymi uprawnieniami po początkowym uruchomieniu (na przykład zatrzymać strumień, a następnie uruchomić go ponownie).

Uwaga

Powyższe zasady dotyczą tylko uprawnień wymaganych do konfigurowania usług powiadomień plików, a mianowicie powiadomień zasobników S3, SNS i SQS oraz zakłada, że masz już dostęp do odczytu do zasobnika S3. Jeśli musisz dodać uprawnienia tylko do odczytu S3, dodaj następujące elementy do listy Action w instrukcji DatabricksAutoLoaderSetup w dokumencie JSON:

  • s3:ListBucket
  • s3:GetObject

Ograniczone uprawnienia po początkowej konfiguracji

Uprawnienia konfiguracji zasobów opisane powyżej są wymagane tylko podczas początkowego uruchomienia strumienia. Po pierwszym uruchomieniu możesz przełączyć się na następujące zasady zarządzania dostępem i tożsamościami z ograniczonymi uprawnieniami.

Ważne

Przy ograniczonych uprawnieniach nie można uruchomić nowych zapytań przesyłania strumieniowego ani odtworzyć zasobów w przypadku awarii (na przykład kolejka SQS została przypadkowo usunięta); nie można również użyć interfejsu API zarządzania zasobami w chmurze do wyświetlania listy lub usuwania zasobów.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Wymagane uprawnienia do konfigurowania powiadomień o plikach dla usługi GCS

Musisz mieć list uprawnienia i get w zasobniku GCS oraz na wszystkich obiektach. Aby uzyskać szczegółowe informacje, zobacz dokumentację firmy Google dotyczącą uprawnień do zarządzania dostępem i tożsamościami.

Aby użyć trybu powiadomień plików, musisz dodać uprawnienia dla konta usługi GCS i konta usługi używanego do uzyskiwania dostępu do zasobów usługi Google Cloud Pub/Sub.

Pub/Sub Publisher Dodaj rolę do konta usługi GCS. Dzięki temu konto może publikować komunikaty powiadomień o zdarzeniach z zasobników GCS do usługi Google Cloud Pub/Sub.

Jeśli chodzi o konto usługi używane dla zasobów Google Cloud Pub/Sub, należy dodać następujące uprawnienia. To konto usługi jest tworzone automatycznie podczas tworzenia poświadczeń usługi Databricks . Obsługa poświadczeń usługi jest dostępna w środowisku Databricks Runtime 16.1 lub nowszym.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.detachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

W tym celu możesz utworzyć rolę niestandardową IAM z tymi uprawnieniami lub przypisać istniejące role GCP, aby uwzględnić te uprawnienia.

Znajdowanie konta usługi GCS

W konsoli Google Cloud Console dla odpowiedniego projektu przejdź do Cloud Storage > Settingsstrony . Sekcja "Konto usługi magazynu w chmurze" zawiera adres e-mail konta usługi GCS.

Konto usługi GCS

Tworzenie niestandardowej roli IAM w chmurze Google dla trybu powiadomień plików

W konsoli Google Cloud dla odpowiedniego projektu przejdź do IAM & Admin > Rolesstrony . Następnie utwórz rolę u góry lub zaktualizuj istniejącą rolę. Na ekranie tworzenia lub edytowania roli kliknij pozycję Add Permissions. Zostanie wyświetlone menu, w którym można dodać odpowiednie uprawnienia do roli.

Role niestandardowe GCP IAM

Ręczne konfigurowanie zasobów powiadomień o plikach lub zarządzanie nimi

Użytkownicy uprzywilejowani mogą ręcznie konfigurować zasoby powiadomień o plikach lub zarządzać nimi.

  • Ręcznie skonfiguruj usługi powiadomień o plikach za pośrednictwem dostawcy usług w chmurze i ręcznie określ identyfikator kolejki. Aby uzyskać więcej informacji, zobacz Opcje powiadomień o plikach.
  • Użyj interfejsów API języka Scala, aby utworzyć powiadomienia i usługi kolejkowania oraz zarządzać nimi, jak pokazano w poniższym przykładzie:

Uwaga

Musisz mieć odpowiednie uprawnienia do konfigurowania lub modyfikowania infrastruktury chmury. Zobacz dokumentację uprawnień dla platformy Azure, S3 lub GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Skala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Użyj setUpNotificationServices(<resource-suffix>) do utworzenia kolejki i subskrypcji o nazwie <prefix>-<resource-suffix> (prefiks zależy od systemu przechowywania podsumowanego w zasobach w chmurze wykorzystywanych w starszym trybie powiadomień plików Auto Loader. Jeśli istnieje zasób o tej samej nazwie, usługa Azure Databricks ponownie użyje istniejącego zasobu zamiast utworzyć nowy. Ta funkcja zwraca identyfikator kolejki, który można przekazać do źródła cloudFiles, korzystając z identyfikatora w opcjach powiadomień pliku . Dzięki cloudFiles temu użytkownik źródłowy może mieć mniej uprawnień niż użytkownik tworzący zasoby.

Podaj opcję "path" tylko w przypadku wywołania newManagermetody ; nie jest ona wymagana dla setUpNotificationServices elementu lub listNotificationServices.tearDownNotificationServices Jest to samo path , które jest używane podczas uruchamiania zapytania przesyłania strumieniowego.

Poniższa macierz wskazuje, które metody interfejsu API są obsługiwane w środowisku Databricks Runtime dla każdego typu magazynu:

Magazyn w chmurze Konfigurowanie interfejsu API API listy Zrywanie interfejsu API
Amazon S3 Wszystkie wersje Wszystkie wersje Wszystkie wersje
ADLS Wszystkie wersje Wszystkie wersje Wszystkie wersje
GCS Databricks Runtime 9.1 i nowsze Databricks Runtime 9.1 i nowsze Databricks Runtime 9.1 i nowsze
Azure Blob Storage Wszystkie wersje Wszystkie wersje Wszystkie wersje

Czyszczenie zasobów powiadomień o zdarzeniach utworzonych przez moduł automatycznego ładowania

Automatyczne ładowanie nie powoduje automatycznego usuwania zasobów powiadomień o plikach. Aby usunąć zasoby powiadomień o plikach, należy użyć menedżera zasobów w chmurze, jak pokazano w poprzedniej sekcji. Te zasoby można również usunąć ręcznie przy użyciu interfejsu użytkownika lub interfejsów API dostawcy usług w chmurze.

Rozwiązywanie typowych błędów

W tej sekcji opisano typowe błędy występujące podczas korzystania z automatycznego modułu ładującego z trybem powiadamiania plików i sposobu ich rozwiązywania.

Nie można utworzyć subskrypcji usługi Event Grid

Jeśli podczas pierwszego uruchamiania automatycznego modułu ładującego zostanie wyświetlony następujący komunikat o błędzie, usługa Event Grid nie jest zarejestrowana jako dostawca zasobów w subskrypcji platformy Azure.

java.lang.RuntimeException: Failed to create event grid subscription.

Aby zarejestrować usługę Event Grid jako dostawcę zasobów, wykonaj następujące czynności:

  1. W witrynie Azure Portal przejdź do subskrypcji.
  2. Kliknij pozycję Dostawcy zasobów w sekcji Ustawienia.
  3. Zarejestruj dostawcę Microsoft.EventGrid.

Autoryzacja wymagana do wykonywania operacji subskrypcji usługi Event Grid

Jeśli podczas pierwszego uruchamiania Auto Loader'a wyświetli się następujący komunikat o błędzie, upewnij się, że rola Współautora jest przypisana do głównej usługi dla Event Grid i konta magazynu.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Klient usługi Event Grid pomija serwer proxy

W środowisku Databricks Runtime 15.2 lub nowszym połączenia usługi Event Grid w usłudze Auto Loader domyślnie używają ustawień serwera proxy z właściwości systemowych. W Databricks Runtime 13.3 LTS, 14.3 LTS i od 15.0 do 15.2 można ręcznie skonfigurować połączenia Event Grid, aby korzystać z serwera proxy, ustawiając właściwość Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true. Zobacz Ustawianie właściwości konfiguracji platformy Spark w usłudze Azure Databricks.