Partager via


Configurer des flux de chargeur automatique en mode de notification de fichier

Cette page explique comment configurer des flux de chargeur automatique pour utiliser le mode de notification de fichier pour découvrir et ingérer de manière incrémentielle des données cloud.

Dans le mode Notification de fichiers, Auto Loader configure automatiquement un service de notification et un service de file d’attente qui s’abonnent aux événements de fichiers du répertoire d’entrée. Vous pouvez utiliser les notifications de fichiers pour adapter l'Auto Loader à l'ingestion de millions de fichiers par heure. Par rapport au mode de liste de répertoires, le mode de notification de fichier est plus performant et évolutif.

Vous pouvez basculer entre les notifications de fichiers et la liste de répertoires à tout moment tout en conservant la garantie de traitement des données « une seule fois ».

Remarque

Le mode de notification de fichier n’est pas pris en charge pour les comptes de stockage Azure Premium, car les comptes Premium ne prennent pas en charge le stockage de file d’attente.

Avertissement

La modification du chemin d’accès source pour Auto Loader n’est pas prise en charge pour le mode de notification de fichier. Si le mode de notification de fichier est utilisé et que le chemin d’accès a été modifié, vous risquez de ne pas ingérer les fichiers qui sont déjà présents dans le nouveau répertoire lors de sa mise à jour.

Mode de notification de fichier avec et sans événements de fichier activés sur des emplacements externes

Il existe deux façons de configurer le chargeur automatique pour utiliser le mode de notification de fichier :

  • (Recommandé) Événements de fichier : vous utilisez une file d’attente de notification de fichier unique pour tous les flux qui traitent les fichiers à partir d’un emplacement externe donné.

    Cette approche présente les avantages suivants par rapport au mode de notification de fichier hérité :

    • Azure Databricks peut configurer des abonnements et des événements de fichier dans votre compte de stockage cloud sans avoir à fournir d’informations d’identification supplémentaires au chargeur automatique à l’aide d’informations d’identification de service ou d’autres options d’authentification spécifiques au cloud. Voir (Recommandé) Activer les événements de fichier pour un emplacement externe.
    • Vous avez moins de stratégies d’identité managée Azure à créer dans votre compte de stockage cloud.
    • Étant donné que vous n’avez plus besoin de créer une file d’attente pour chaque flux de chargeur automatique, il est plus facile d’éviter d’atteindre les limites de notification du fournisseur de cloud répertoriées dans les ressources cloud utilisées en mode de notification de fichier de chargeur automatique hérité.
    • Azure Databricks gère automatiquement le réglage des besoins en ressources. Vous n’avez donc pas besoin de régler les paramètres tels que cloudFiles.fetchParallelism.
    • La fonctionnalité de nettoyage signifie que vous n’avez pas besoin de vous soucier autant du cycle de vie des notifications créées dans le cloud, par exemple lorsqu’un flux est supprimé ou entièrement actualisé.

Si vous utilisez le chargeur automatique en mode liste de répertoires, Databricks recommande de migrer vers le mode de notification de fichier avec des événements de fichier. Le chargeur automatique avec des événements de fichier offre des améliorations significatives des performances. Commencez par activer les événements de fichier pour votre emplacement externe, puis définissez cloudFiles.useManagedFileEvents dans votre configuration de flux de chargeur automatique.

  • Mode de notification de fichier hérité : vous gérez séparément les files d'attente de notification de fichier pour chaque flux de chargement automatique. Le chargeur automatique configure automatiquement un service de notification et un service de file d’attente qui s’abonne aux événements de fichier à partir du répertoire d’entrée.

    Il s’agit de l’approche héritée.

Utiliser le mode de notification de fichier avec les événements de fichier

Cette section explique comment créer et mettre à jour des flux de chargeur automatique pour utiliser des événements de fichier.

Avant de commencer

La configuration des événements de fichier nécessite :

  • Espace de travail Azure Databricks activé pour Unity Catalog.
  • Autorisation de créer des informations d’identification de stockage et des objets d’emplacement externe dans le catalogue Unity.

Les flux de chargeur automatique avec des événements de fichier nécessitent :

  • Effectuer des calculs sur Databricks Runtime 14.3 LTS ou version ultérieure.

Instructions de configuration

Les instructions suivantes s’appliquent si vous créez des flux de chargeur automatique ou que vous migrez des flux existants pour utiliser le mode de notification de fichier mis à niveau avec les événements de fichier :

  1. Créez des informations d’identification de stockage et un emplacement externe dans le catalogue Unity qui accordent l’accès à l’emplacement source dans le stockage cloud pour vos flux de chargeur automatique.

  2. Activez les événements de fichier pour l’emplacement externe. Voir (Recommandé) Activer les événements de fichier pour un emplacement externe.

  3. Lorsque vous créez un flux de chargeur automatique ou modifiez-en un existant pour travailler avec l’emplacement externe :

    • Si vous avez des flux de chargeur automatique basés sur des notifications existants qui consomment des données à partir de l’emplacement externe, désactivez-les et supprimez les ressources de notification associées.
    • Assurez-vous que pathRewrites ne soit pas défini (ce n'est pas une option courante).
    • Passez en revue la liste des paramètres ignorés par le chargeur automatique lorsqu’il gère les notifications de fichiers à l’aide d’événements de fichier. Évitez-les dans les nouveaux flux de chargeur automatique et supprimez-les des flux existants que vous migrez vers ce mode.
    • Définissez l'option cloudFiles.useManagedFileEvents à true dans votre code de chargeur automatique.

Par exemple:

autoLoaderStream = (spark.readStream
  .format("cloudFiles")
  ...
  .options("cloudFiles.useManagedFileEvents", True)
  ...)

Si vous utilisez des pipelines déclaratifs Spark Lakeflow et que vous disposez déjà d’un pipeline avec une table de diffusion en continu, mettez-le à jour pour inclure l’option useManagedFileEvents :

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );

Paramètres du chargeur automatique non pris en charge

Les paramètres de chargeur automatique suivants ne sont pas pris en charge lorsque les flux utilisent des événements de fichier :

Réglage Changez
useIncremental Vous n’avez plus besoin de décider entre l’efficacité des notifications de fichiers et la simplicité de la description des répertoires. Le chargeur automatique avec les événements de fichier est disponible en un seul mode.
useNotifications Il n’existe qu’un seul abonnement aux événements de file d’attente et de stockage par emplacement externe.
cloudFiles.fetchParallelism Le chargeur automatique avec les événements de fichier n’offre pas d’optimisation manuelle du parallélisme.
cloudFiles.backfillInterval Azure Databricks gère automatiquement le remplissage rétroactif pour les emplacements externes activés pour les événements de fichier.
cloudFiles.pathRewrites Cette option s’applique uniquement lorsque vous montez des emplacements de données externes dans le DBFS, qui est déconseillé.
resourceTags Vous devez définir des étiquettes de ressources à l’aide de la console cloud.

Pour connaître les meilleures pratiques relatives aux événements de fichiers managés, consultez les meilleures pratiques pour le chargeur automatique avec les événements de fichier.

Limitations de l'Auto Loader avec les événements de fichier

Le service d’événements de fichiers optimise la découverte de fichiers en mettant en cache les fichiers les plus récemment créés. Si le chargeur automatique s’exécute rarement, ce cache peut expirer et le chargeur automatique revient à la liste des répertoires pour découvrir les fichiers et mettre à jour le cache. Pour éviter ce scénario, appelez le chargeur automatique au moins une fois tous les sept jours.

Pour obtenir la liste générale des limitations relatives aux événements de fichier, consultez les limitations des événements de fichier.

Gérer les files d’attente de notification de fichier pour chaque flux de chargement automatique séparément (hérité)

Important

Vous avez besoin d’autorisations élevées pour configurer automatiquement l’infrastructure cloud pour le mode de notification de fichier. Contactez votre administrateur cloud ou administrateur d’espace de travail. Voir :

Ressources cloud utilisées en mode de notification de fichier pour les versions antérieures de Auto Loader

Auto Loader peut configurer automatiquement des notifications de fichiers pour vous lorsque vous définissez l’option cloudFiles.useNotifications sur true et que vous fournissez les autorisations nécessaires pour créer des ressources cloud. En outre, vous devrez peut-être fournir des options supplémentaires pour donner à Auto Loader l’autorisation de créer ces ressources.

Le tableau suivant répertorie les ressources créées par le chargeur automatique pour chaque fournisseur de cloud.

Stockage cloud Service d’abonnement Service File d’attente Préfixe * Limite **
Amazon S3 AWS SNS AWS SQS (Amazon Simple Queue Service) databricks-auto-ingestion 100 par compartiment S3
ADLS Azure Event Grid Stockage File d’attente Azure Databricks (plateforme de traitement de données) 500 par compte de stockage
GCS Google Pub/Sub Google Pub/Sub databricks-auto-ingestion 100 par compartiment GCS
Stockage Blob Azure Azure Event Grid Stockage File d’attente Azure Databricks (plateforme de traitement de données) 500 par compte de stockage

* Le chargeur automatique nomme les ressources avec ce préfixe.

** Nombre de pipelines de notification de fichiers pouvant être lancés simultanément

Si vous devez exécuter davantage de flux de chargement automatique basés sur des notifications de fichiers que autorisés par ces limites, vous pouvez utiliser des événements de fichier ou un service tel que AWS Lambda, Azure Functions ou Google Cloud Functions pour faner les notifications d’une seule file d’attente qui écoute un conteneur ou un compartiment entier dans des files d’attente spécifiques au répertoire.

Événements de notification de fichiers hérités

Amazon S3 fournit un événement de ObjectCreated lorsqu’un fichier est chargé dans un compartiment S3, qu’il ait été téléchargé par un chargement en put ou en plusieurs parties.

Azure Data Lake Storage fournit différentes notifications d’événements pour les fichiers qui apparaissent dans votre conteneur de stockage.

  • Auto Loader écoute l’événement FlushWithClose pour traiter un fichier.
  • Les flux Auto Loader prennent en charge l’action RenameFile de découverte de fichiers. Les actions RenameFile nécessitent l’envoi d’une demande d’API au système de stockage pour obtenir la taille du fichier renommé.
  • Les flux Auto Loader créés avec Databricks Runtime 9.0 et versions ultérieures prennent en charge l’action RenameDirectory pour découvrir les fichiers. Les actions RenameDirectory nécessitent l’envoi de demandes d’API au système de stockage pour lister le contenu du répertoire renommé.

Google Cloud Storage fournit un événement OBJECT_FINALIZE lorsqu’un fichier est chargé, ce qui inclut les remplacements et les copies de fichiers. Les échecs de chargement ne génèrent pas cet événement.

Remarque

Les fournisseurs de cloud ne garantissent pas une livraison à 100 % de tous les événements de fichiers dans des conditions très rares et ne fournissent pas de SLA stricts sur la latence des événements de fichiers. Databricks vous recommande de déclencher des renvois standard avec Auto Loader à l’aide de l’option cloudFiles.backfillInterval pour garantir que tous les fichiers sont découverts dans un SLA donné si l’exhaustivité des données est obligatoire. Le déclenchement de renvois normaux n’entraîne pas de doublons.

Autorisations requises pour la configuration de la notification de fichier pour Azure Data Lake Storage et Stockage Blob d'Azure

Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Consultez Stockage Blob Azure.

Pour utiliser le mode de notification de fichiers, vous devez fournir des informations d’authentification pour configurer les services de notification d’événements et y accéder.

Vous pouvez vous authentifier à l’aide de l’une des méthodes suivantes :

Après avoir obtenu les informations d’identification d’authentification, attribuez les autorisations nécessaires au connecteur d’accès Databricks (pour les informations d’identification du service) ou à l’application Microsoft Entra ID (pour un principal de service).

  • Utilisation de rôles intégrés Azure

    Attribuez au connecteur d’accès les rôles suivants au compte de stockage dans lequel réside le chemin d’accès d’entrée :

    • Contributor : Ce rôle sert à configurer les ressources de votre compte de stockage, telles que les files d’attente et les abonnements aux événements.
    • Storage Queue Data Contributor : Ce rôle sert à effectuer des opérations sur les files d’attente, telles que la récupération et la suppression de messages dans les files d’attente. Ce rôle est obligatoire uniquement lorsque vous fournissez un principal de service sans chaîne de connexion.

    Attribuez à ce connecteur d’accès le rôle suivant au groupe de ressources associé :

    • EventGrid EventSubscription Contributor : Ce rôle permet d’effectuer des opérations d’abonnement à Azure Event Grid (Event Grid), telles que la création ou l’énumération d’abonnements à des événements.

    Pour plus d’informations, consultez Attribuer des rôles Azure en utilisant le portail Azure.

  • Utilisation d’un rôle personnalisé

    Si vous êtes préoccupé par les autorisations excessives requises pour les rôles précédents, vous pouvez créer un Rôle personnalisé avec au moins les autorisations suivantes, listées ci-dessous au format JSON de rôle Azure :

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Ensuite, vous pouvez attribuer ce rôle personnalisé à votre connecteur d’accès.

    Pour plus d’informations, consultez Attribuer des rôles Azure en utilisant le portail Azure.

paramètres d’autorisations du chargeur automatique

Autorisations requises pour la configuration de la notification de fichier pour Amazon S3

Vous devez disposer d’autorisations de lecture pour le répertoire d’entrée. Pour plus d’informations, consultez les détails de la connexion S3.

Pour utiliser le mode Notification de fichiers, joignez le document de stratégie JSON suivant à votre utilisateur ou rôle IAM. Ce rôle IAM est nécessaire pour créer des informations d’identification de service avec lesquelles le chargeur automatique s’authentifie.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

où :

  • <bucket-name> : Nom du compartiment S3 dans lequel votre flux lira des fichiers, par exemple auto-logs. Vous pouvez utiliser * comme caractère générique, par exemple databricks-*-logs. Pour connaître le compartiment S3 sous-jacent de votre chemin d’accès DBFS, vous pouvez répertorier tous les points de montage DBFS d’un notebook en exécutant %fs mounts.
  • <region> : Région AWS dans laquelle réside le compartiment S3, par exemple us-west-2. Si vous ne souhaitez pas spécifier la région, utilisez *.
  • <account-number> : Numéro de compte AWS qui possède le compartiment S3, par exemple 123456789012. Si vous ne souhaitez pas spécifier le numéro de compte, utilisez *.

La chaîne databricks-auto-ingest-* dans la spécification ARN SQS et SNS est le préfixe de nom que la source cloudFiles utilise lors de la création des services SQS et SNS. Étant donné qu’Azure Databricks configure les services de notification lors de l’exécution initiale du flux, vous pouvez utiliser une stratégie avec des autorisations réduites après l’exécution initiale (par exemple, arrêter le flux, puis le redémarrer).

Remarque

La stratégie précédente concerne uniquement les autorisations nécessaires pour configurer les services de notification de fichiers, à savoir les services de notification de compartiment S3, SNS et SQS, et suppose que vous disposez déjà d’un accès en lecture au compartiment S3. Si vous devez ajouter des autorisations S3 en lecture seule, ajoutez ce qui suit à la liste Action dans l’instruction DatabricksAutoLoaderSetup dans le document JSON :

  • s3:ListBucket
  • s3:GetObject

Autorisations réduites après la configuration initiale

Les autorisations de configuration des ressources décrites ci-dessus sont requises uniquement lors de l’exécution initiale du flux. Après la première exécution, vous pouvez passer à la stratégie IAM suivante avec des autorisations réduites.

Important

Avec les autorisations réduites, vous ne pouvez pas démarrer de nouvelles requêtes de streaming ou recréer des ressources en cas d’échecs (par exemple, la file d’attente SQS a été supprimée accidentellement) ; vous ne pouvez pas également utiliser l’API de gestion des ressources cloud pour répertorier ou supprimer des ressources.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Autorisations requises pour configurer la notification de fichier pour GCS

Vous devez disposer des autorisations list et get sur votre compartiment GCS et sur tous les objets. Pour plus d’informations, consultez la documentation de Google sur les autorisations IAM.

Pour utiliser le mode de notification de fichier, vous devez ajouter des autorisations pour le compte de service GCS et le compte de service utilisé pour accéder aux ressources Google Cloud Pub/Sub.

Ajoutez le rôle Pub/Sub Publisher au compte de service GCS. Cela permet au compte de publier des messages de notification d’événements de vos compartiments GCS vers Google Cloud Pub/Sub.

Quant au compte de service utilisé pour les ressources Google Cloud Pub/Sub, vous devez ajouter les autorisations suivantes. Ce compte de service est créé automatiquement lors de la création d'un credential de service Databricks . Le support des informations d’identification du service est disponible à partir de Databricks Runtime 16.1.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.detachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Pour ce faire, vous pouvez soit créer un rôle personnalisé IAM avec ces autorisations, soit attribuer des rôles GCP préexistants pour couvrir ces autorisations.

Recherche du compte de service GCS

Dans la console Google Cloud pour le projet correspondant, accédez à Cloud Storage > Settings. La section « Compte de service de stockage cloud » contient l’e-mail du compte de service GCS.

Compte de service GCS

Création d’un rôle IAM Google Cloud personnalisé pour le mode Notification de fichiers

Dans la console Google Cloud pour le projet correspondant, accédez à IAM & Admin > Roles. Ensuite, créez un rôle en haut de l’écran ou mettez à jour un rôle existant. Dans l’écran de création ou de modification du rôle, cliquez sur Add Permissions. Dans le menu qui s’affiche, vous pouvez ajouter les autorisations souhaitées au rôle.

Rôles personnalisés GCP IAM

Configurer ou gérer manuellement des ressources de notification de fichier

Les utilisateurs privilégiés peuvent configurer ou gérer manuellement des ressources de notification de fichier.

  • Configurez manuellement les services de notification de fichiers via le fournisseur de cloud et spécifiez manuellement l’identificateur de file d’attente. Pour plus d’informations, consultez Options de notification de fichiers.
  • Utilisez les API Scala pour créer ou gérer les services de notifications et de mise en file d’attente, comme illustré dans l’exemple suivant :

Remarque

Vous devez disposer des autorisations appropriées pour configurer ou modifier l’infrastructure cloud. Consultez la documentation sur les autorisations pour Azure, S3 ou GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Langage de programmation Scala

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Utilisez setUpNotificationServices(<resource-suffix>) pour créer une file d’attente et un abonnement portant le nom <prefix>-<resource-suffix> (le préfixe dépend du système de stockage résumé dans les ressources cloud utilisées en mode de notification de fichier du chargeur automatique hérité). S’il existe une ressource portant le même nom, Azure Databricks réutilise la ressource qui existe déjà au lieu d’en créer une nouvelle. Cette fonction renvoie un identificateur de file d’attente que vous pouvez transmettre à la source cloudFiles à l’aide de l’identificateur figurant dans Options de notification de fichiers. Cela permet à l’utilisateur de la source cloudFiles d’avoir moins d’autorisations que l’utilisateur qui crée les ressources.

Fournissez l’option "path" à newManager uniquement si vous appelez setUpNotificationServices ; elle n’est pas nécessaire pour listNotificationServices ou tearDownNotificationServices. Il s’agit du même path que celui utilisé lors de l’exécution d’une requête de diffusion en continu.

La matrice suivante indique quelles méthodes d’API sont prises en charge dans Databricks Runtime pour chaque type de stockage :

Stockage cloud API Setup API List API Tear down
Amazon S3 Toutes les versions Toutes les versions Toutes les versions
ADLS Toutes les versions Toutes les versions Toutes les versions
GCS Databricks Runtime 9.1 et versions ultérieures Databricks Runtime 9.1 et versions ultérieures Databricks Runtime 9.1 et versions ultérieures
Stockage Blob Azure Toutes les versions Toutes les versions Toutes les versions

Nettoyer les ressources de notification d’événement créées par le chargeur automatique

Le chargeur automatique ne détruit pas automatiquement les ressources de notification de fichier. Pour supprimer les ressources de notification de fichier, vous devez utiliser le gestionnaire de ressources cloud comme indiqué dans la section précédente. Vous pouvez également supprimer ces ressources manuellement à l’aide de l’interface utilisateur ou des API du fournisseur cloud.

Résoudre les erreurs courantes

Cette section décrit les erreurs courantes lors de l’utilisation du chargeur automatique avec le mode de notification de fichier et comment les résoudre.

Échec de la création d’un abonnement Event Grid

Si le message d’erreur suivant s’affiche lorsque vous exécutez le chargeur automatique pour la première fois, Event Grid n’est pas inscrit en tant que fournisseur de ressources dans l’abonnement Azure.

java.lang.RuntimeException: Failed to create event grid subscription.

Pour inscrire Event Grid en tant que fournisseur de ressources, procédez comme suit :

  1. Dans le portail Azure, accédez à votre abonnement.
  2. Cliquez sur Fournisseurs de ressources sous la section Paramètres.
  3. Inscrivez le fournisseur Microsoft.EventGrid.

Autorisation requise pour effectuer des opérations d’abonnement Event Grid

Si le message d’erreur suivant s’affiche lorsque vous exécutez le chargeur automatique pour la première fois, vérifiez que le rôle Contributeur est affecté au principal de service pour Event Grid et le compte de stockage.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

Le client Event Grid contourne le proxy

Dans Databricks Runtime 15.2 et versions ultérieures, les connexions Event Grid dans le chargeur automatique utilisent les paramètres proxy des propriétés système par défaut. Dans Databricks Runtime 13.3 LTS, 14.3 LTS et 15.0 à 15.2, vous pouvez configurer manuellement les connexions Event Grid pour utiliser un proxy en définissant la propriété Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true. Consultez Configurer les propriétés Spark sur Azure Databricks.