Compartilhar via


Configurar fluxos do Carregador Automático no modo de notificação de arquivo

Esta página descreve como configurar fluxos do Carregador Automático para usar o modo de notificação de arquivo para descobrir e ingerir dados de nuvem de forma incremental.

No modo de notificação de arquivo, o Carregador Automático configura automaticamente um serviço de notificação e um serviço de fila que assina eventos de arquivo do diretório de entrada. Você pode usar as notificações de arquivo para escalar o Carregador Automático para ingerir milhões de arquivos por hora. Quando comparado ao modo de listagem de diretórios, o modo de notificação de arquivo é mais executável e escalonável.

Alterne entre as notificações de arquivo e a listagem de diretório a qualquer momento, mantendo as garantias de processamento de dados de apenas uma vez.

Observação

Não há suporte para o modo de notificação de arquivo para contas de armazenamento premium do Azure porque as contas premium não oferecem suporte ao armazenamento em fila.

Aviso

Não há suporte para a alteração do caminho de origem para o Carregador Automático no modo de notificação de arquivo. Se o modo de notificação de arquivo for usado e o caminho for alterado, você poderá falhar ao ingerir arquivos que já estão presentes no novo diretório no momento da atualização do diretório.

Modo de notificação de arquivo com e sem eventos de arquivo habilitados em locais externos

Há duas maneiras de configurar o Carregador Automático para usar o modo de notificação de arquivo:

  • (Recomendado) Eventos de arquivo: você usa uma única fila de notificação de arquivo para todos os fluxos que processam arquivos de um determinado local externo.

    Essa abordagem tem as seguintes vantagens em relação ao modo de notificação de arquivo herdado:

    • O Azure Databricks pode configurar assinaturas e eventos de arquivo em sua conta de armazenamento em nuvem para você sem exigir que você forneça credenciais adicionais para o Carregador Automático usando uma credencial de serviço ou outras opções de autenticação específicas da nuvem. Confira (Recomendado) Habilitar eventos de arquivo para um local externo.
    • Você tem menos políticas de identidade gerenciada do Azure para criar em sua conta de armazenamento em nuvem.
    • Como você não precisa mais criar uma fila para cada fluxo do Carregador Automático, é mais fácil evitar atingir os limites de notificação do provedor de nuvem listados em recursos de nuvem usados no modo de notificação de arquivo do Carregador Automático herdado.
    • O Azure Databricks gerencia automaticamente o ajuste dos requisitos de recursos, portanto, você não precisa ajustar parâmetros como cloudFiles.fetchParallelism.
    • A funcionalidade de limpeza significa que você não precisa se preocupar tanto com o ciclo de vida das notificações que são criadas na nuvem, como quando um fluxo é excluído ou totalmente atualizado.

Se você usar o Carregador Automático no modo de listagem de diretórios, o Databricks recomendará a migração para o modo de notificação de arquivo com eventos de arquivo. O Carregador Automático com eventos de arquivo oferece melhorias significativas de desempenho. Comece habilitando eventos de arquivo para sua localização externa e, em seguida, defina cloudFiles.useManagedFileEvents na configuração de fluxo do Carregador Automático.

  • Modo de notificação de arquivo herdado: você gerencia filas de notificação de arquivo para cada fluxo do Auto Loader separadamente. O Carregador Automático configura automaticamente um serviço de notificação e um serviço de fila que se inscreve em eventos de arquivo do diretório de entrada.

    Essa é a abordagem herdada.

Ativar o modo de notificação de arquivo com eventos de arquivo

Esta seção descreve como criar e atualizar fluxos do Carregador Automático para usar eventos de arquivo.

Antes de começar

A configuração de eventos de arquivo requer:

  • Um workspace do Azure Databricks habilitado para o Catálogo do Unity.
  • Permissão para criar credenciais de armazenamento e objetos de localização externos no Catálogo do Unity.

Os fluxos do Carregamento Automático com eventos de arquivo necessitam:

  • Computação no Databricks Runtime 14.3 LTS ou superior.

Instruções de configuração

As instruções a seguir se aplicam se você está criando novos fluxos do Carregador Automático ou migrando fluxos existentes para usar o modo de notificação de arquivo atualizado com eventos de arquivo:

  1. Crie uma credencial de armazenamento e um local externo no Catálogo do Unity que concedam acesso ao local de origem no armazenamento em nuvem para seus fluxos do Carregador Automático.

  2. Habilitar eventos de arquivo para a localização externa. Confira (Recomendado) Habilitar eventos de arquivo para um local externo.

  3. Quando você cria um novo fluxo do Carregador Automático ou edita um existente para trabalhar com o local externo:

    • Se você tiver fluxos de Carregador Automático baseados em notificações existentes que consomem dados do local externo, desative-os e exclua os recursos de notificação associados.
    • Certifique-se de que pathRewrites não esteja definido (esta não é uma opção comum).
    • Examine a lista de configurações que o Carregador Automático ignora ao gerenciar notificações de arquivo usando eventos de arquivo. Evite-os em novos fluxos do Carregador Automático e remova-os dos fluxos existentes que você está migrando para esse modo.
    • Defina a opção cloudFiles.useManagedFileEvents como true no seu código do Carregador Automático.

Por exemplo:

autoLoaderStream = (spark.readStream
  .format("cloudFiles")
  ...
  .options("cloudFiles.useManagedFileEvents", True)
  ...)

Se você estiver usando o Lakeflow Spark Declarative Pipelines e já tiver um pipeline com uma tabela de streaming, atualize-o para incluir a opção useManagedFileEvents :

CREATE OR REFRESH STREAMING LIVE TABLE <table-name>
AS SELECT <select clause expressions>
  FROM STREAM read_files('abfss://path/to/external/location/or/volume',
                   format => '<format>',
                   useManagedFileEvents => 'True'
                   ...
                   );

Configurações do Carregador Automático sem suporte

As seguintes configurações do Carregador Automático não têm suporte quando os fluxos usam eventos de arquivo:

Configurações Alteração
useIncremental Você não precisa mais decidir entre a eficiência das notificações de arquivo e a simplicidade da listagem de diretórios. O Carregador Automático com eventos de arquivo está disponível em um único modo.
useNotifications Há apenas uma fila e uma assinatura de evento de armazenamento por localização externa.
cloudFiles.fetchParallelism O Carregador Automático com eventos de arquivo não oferece uma otimização manual de paralelismo.
cloudFiles.backfillInterval O Azure Databricks gerencia automaticamente o reabastecimento de dados para locais externos habilitados para eventos de arquivo.
cloudFiles.pathRewrites Essa opção se aplica somente quando você monta locais de dados externos no DBFS, que é preterido.
resourceTags Você deve definir marcas de recurso usando o console de nuvem.

Para práticas recomendadas de eventos de arquivo gerenciado, consulte As práticas recomendadas para o Carregador Automático com eventos de arquivo.

Limitações do Carregador Automático com os eventos de arquivo

O serviço de eventos de arquivo otimiza a descoberta de arquivos armazenando em cache os arquivos criados mais recentemente. Se o Carregador Automático for executado com pouca frequência, esse cache poderá expirar e o Carregador Automático retornará à listagem de diretórios para descobrir arquivos e atualizar o cache. Para evitar esse cenário, invoque o Carregador Automático pelo menos uma vez a cada sete dias.

Para obter uma lista geral de limitações em eventos de arquivo, consulte Limitações de eventos de arquivo.

Gerencie separadamente as filas de notificação de arquivo para cada fluxo do Auto Loader (legado)

Importante

Você precisa de permissões elevadas para configurar automaticamente a infraestrutura de nuvem para o modo de notificação de arquivo. Entre em contato com o administrador de nuvem ou o administrador de workspace. Ver:

Recursos de nuvem usados no modo de notificação de arquivo herdado do Carregador Automático

O Carregador automático pode configurar notificações de arquivos automaticamente quando se define a opção cloudFiles.useNotifications como true e fornece as permissões para criar os recursos de nuvem. Além disso, talvez seja necessário fornecer as opções adicionais para conceder autorização ao Carregador Automático para criar esses recursos.

A tabela a seguir lista os recursos criados pelo Carregador Automático para cada provedor de nuvem.

Armazenamento em nuvem Serviço da assinatura Serviço Fila Prefixo * Limite **
Amazon S3 AWS SNS AWS SQS ingestão automática do Databricks 100 por bucket S3
ADLS Grade de Eventos do Azure Armazenamento de Filas do Azure databricks 500 por conta de armazenamento
GCS Google Pub/Sub Google Pub/Sub ingestão automática do Databricks 100 por bucket GCS
Armazenamento de Blobs do Azure Grade de Eventos do Azure Armazenamento de Filas do Azure databricks 500 por conta de armazenamento

* O Carregador Automático nomeia os recursos com esse prefixo.

** Quantos pipelines simultâneos de notificação de arquivo podem ser inicializados

Se você precisar executar mais fluxos do Carregador Automático baseado em notificação de arquivo do que o permitido por esses limites, poderá usar eventos de arquivo ou um serviço como AWS Lambda, Azure Functions ou Google Cloud Functions para distribuir notificações de uma única fila que escuta um contêiner inteiro ou bucket em filas específicas para diretórios.

Eventos de notificação de arquivos legados

A Amazon S3 fornece um evento ObjectCreated sempre que um arquivo é carregado em um bucket S3, independentemente de ter sido carregado por um upload de várias partes ou por put.

O Azure Data Lake Storage fornece notificações de evento diferentes para arquivos que aparecem em seu contêiner de armazenamento.

  • O Carregador automático escuta o evento FlushWithClose para processar um arquivo.
  • Os fluxos do Carregador Automático dão suporte à ação RenameFile para descobrir arquivos. As ações RenameFile exigem uma solicitação de API para o sistema de armazenamento a fim de obter o tamanho do arquivo renomeado.
  • Os fluxos do Carregador automático criados com Databricks Runtime 9.0 e versões posteriores suportam a ação RenameDirectory para descoberta de arquivos. As ações RenameDirectory exigem solicitações de API para o sistema de armazenamento a fim de listar o conteúdo do diretório renomeado.

O Google Cloud Armazenamento fornece um evento OBJECT_FINALIZE quando um arquivo é carregado, que inclui substituições e cópias de arquivo. Os uploads com falha não geram esse evento.

Observação

Os provedores de nuvem não garantem a entrega de 100% de todos os eventos de arquivo em condições muito raras e não fornecem SLAs rigorosos da latência dos eventos de arquivo. O Databricks recomenda disparar os provisionamentos regulares com o Carregador automático ao usar a opção cloudFiles.backfillInterval para garantir que todos os arquivos sejam descobertos em um determinado SLA, se a conclusão dos dados for um requisito. O disparo de provisionamentos regulares não causa duplicações.

Permissões necessárias para configurar a notificação de arquivo para o Azure Data Lake Storage e o Armazenamento de Blobs do Azure

Você deve ter permissões de leitura para o diretório de entrada. Confira Armazenamento de Blobs do Azure.

Para usar o modo de notificação de arquivo, você deve fornecer credenciais de autenticação para configurar e acessar os serviços de notificação de eventos.

Você pode autenticar usando um dos seguintes métodos:

Depois de obter as credenciais de autenticação, atribua as permissões necessárias ao conector de acesso do Databricks (para credenciais de serviço) ou ao aplicativo do Microsoft Entra ID (para um principal de serviço).

  • Usando funções internas do Azure

    Atribua ao conector de acesso as seguintes funções à conta de armazenamento na qual reside o caminho de entrada:

    • Colaborador: esta função é usada para configurar os recursos na conta de armazenamento, como filas e assinaturas de evento.
    • Colaborador de dados da fila de armazenamento: essa função é usada para executar operações de fila, como recuperar e excluir mensagens das filas. Esta função é necessária somente quando você fornece uma entidade de serviço sem uma cadeia de conexão.

    Atribua a esse conector de acesso a seguinte função ao grupo de recursos relacionado:

    Para obter mais informações, confira Atribuir funções do Azure usando o portal do Azure.

  • Usar uma função personalizada

    Se você estiver preocupado com as permissões excessivas necessárias para as funções anteriores, crie uma Função Personalizada com, pelo menos, as seguintes permissões listadas abaixo no formato JSON da função do Azure:

    "permissions": [
      {
        "actions": [
          "Microsoft.EventGrid/eventSubscriptions/write",
          "Microsoft.EventGrid/eventSubscriptions/read",
          "Microsoft.EventGrid/eventSubscriptions/delete",
          "Microsoft.EventGrid/locations/eventSubscriptions/read",
          "Microsoft.Storage/storageAccounts/read",
          "Microsoft.Storage/storageAccounts/write",
          "Microsoft.Storage/storageAccounts/queueServices/read",
          "Microsoft.Storage/storageAccounts/queueServices/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/delete"
      ],
        "notActions": [],
        "dataActions": [
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/delete",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/read",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/write",
          "Microsoft.Storage/storageAccounts/queueServices/queues/messages/process/action"
        ],
        "notDataActions": []
      }
    ]
    

    Em seguida, você pode atribuir essa função personalizada ao conector de acesso.

    Para obter mais informações, confira Atribuir funções do Azure usando o portal do Azure.

configurações de permissões do Carregador Automático

permissões necessárias para configurar a notificação de arquivo para o Amazon S3

Você deve ter permissões de leitura para o diretório de entrada. Consulte os detalhes sobre a conexão S3 para obter mais informações.

Para usar o modo de notificação de arquivo, anexe o seguinte documento de política JSON ao usuário ou função do IAM. Essa função IAM é necessária para criar uma credencial de serviço com a qual o Carregador Automático possa se autenticar.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderSetup",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "s3:PutBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:SetTopicAttributes",
        "sns:CreateTopic",
        "sns:TagResource",
        "sns:Publish",
        "sns:Subscribe",
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:s3:::<bucket-name>",
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    },
    {
      "Sid": "DatabricksAutoLoaderList",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "*"
    },
    {
      "Sid": "DatabricksAutoLoaderTeardown",
      "Effect": "Allow",
      "Action": ["sns:Unsubscribe", "sns:DeleteTopic", "sqs:DeleteQueue"],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:databricks-auto-ingest-*",
        "arn:aws:sns:<region>:<account-number>:databricks-auto-ingest-*"
      ]
    }
  ]
}

onde:

  • <bucket-name>: nome do bucket S3 em que o fluxo lerá arquivos, por exemplo, auto-logs. Você pode usar * como um curinga, por exemplo, databricks-*-logs. Para descobrir o bucket S3 subjacente para o caminho DBFS, liste todos os pontos de montagem do DBFS em um notebook executando %fs mounts.
  • <region>: região da AWS em que o bucket S3 reside, por exemplo, us-west-2. Se você não quiser especificar a região, use *.
  • <account-number>: número da conta da AWS que possui o bucket S3, por exemplo, 123456789012. Se não quiser especificar o número da conta, use *.

A cadeia de caracteres databricks-auto-ingest-* na especificação do ARN SQS e SNS é o prefixo do nome que a origem cloudFiles usa ao criar serviços do SQS e SNS. Como o Azure Databricks configura os serviços de notificação na execução inicial do fluxo, você pode usar uma política com permissões reduzidas após a execução inicial (por exemplo, parar o fluxo e reiniciá-lo).

Observação

A política anterior se preocupa apenas com as permissões necessárias para configurar os serviços de notificação de arquivo, ou seja, notificação de bucket S3, serviços do SNS e SQS, e presume-se que você já tenha acesso de leitura ao bucket S3. Se precisar adicionar permissões de somente leitura ao S3, adicione o seguinte à lista Action na instrução DatabricksAutoLoaderSetup no documento JSON:

  • s3:ListBucket
  • s3:GetObject

Permissões reduzidas após a configuração inicial

As permissões de configuração de recurso descritas acima são necessárias apenas durante a execução inicial do fluxo. Após a primeira execução, você pode alternar para a seguinte política de IAM com permissões reduzidas.

Importante

Com as permissões reduzidas, você não pode iniciar novas consultas de streaming ou recriar recursos em caso de falhas (por exemplo, a fila do SQS foi excluída acidentalmente); você também não pode usar a API de gerenciamento de recursos de nuvem para listar ou derrubar recursos.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DatabricksAutoLoaderUse",
      "Effect": "Allow",
      "Action": [
        "s3:GetBucketNotification",
        "sns:ListSubscriptionsByTopic",
        "sns:GetTopicAttributes",
        "sns:TagResource",
        "sns:Publish",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility",
        "sqs:PurgeQueue"
      ],
      "Resource": [
        "arn:aws:sqs:<region>:<account-number>:<queue-name>",
        "arn:aws:sns:<region>:<account-number>:<topic-name>",
        "arn:aws:s3:::<bucket-name>"
      ]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
      "Resource": ["arn:aws:s3:::<bucket-name>"]
    },
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:PutObjectAcl", "s3:GetObject", "s3:DeleteObject"],
      "Resource": ["arn:aws:s3:::<bucket-name>/*"]
    },
    {
      "Sid": "DatabricksAutoLoaderListTopics",
      "Effect": "Allow",
      "Action": ["sqs:ListQueues", "sqs:ListQueueTags", "sns:ListTopics"],
      "Resource": "arn:aws:sns:<region>:<account-number>:*"
    }
  ]
}

Permissões necessárias para configurar a notificação de arquivo para o GCS

Você precisa ter as permissões list e get no bucket do GCS e em todos os objetos. Para obter detalhes, confira a documentação do Google sobre permissões do IAM.

Para usar o modo de notificação de arquivo, você precisará adicionar permissões da conta de serviço do GCS e da conta de serviço usada para acessar os recursos do Google Cloud Pub/Sub.

Adicione a função Pub/Sub Publisher à conta de serviço do GCS. Isso permite que a conta publique mensagens de notificação de eventos dos buckets do GCS na Pub/Sub do Google Cloud.

Quanto à conta de serviço usada para os recursos do Google Cloud Pub/Sub, você precisa adicionar as permissões a seguir. Essa conta de serviço é criada automaticamente quando você cria uma credencial de serviço do Databricks . O suporte à credencial de serviço está disponível no Databricks Runtime 16.1 e superior.

pubsub.subscriptions.consume
pubsub.subscriptions.create
pubsub.subscriptions.delete
pubsub.subscriptions.get
pubsub.subscriptions.list
pubsub.subscriptions.update
pubsub.topics.attachSubscription
pubsub.topics.detachSubscription
pubsub.topics.create
pubsub.topics.delete
pubsub.topics.get
pubsub.topics.list
pubsub.topics.update

Para isso, você pode criar uma função personalizada do IAM com essas permissões ou atribuir funções do GCP pré-existentes para cobrir essas permissões.

Localizar a conta de serviço do GCS

No console do Google Cloud do projeto correspondente, navegue até Cloud Storage > Settings. A seção chamada “Conta do serviço de armazenamento em nuvem” contém o email da conta de serviço do GCS.

Conta de serviço do GCS

Criar uma função do IAM do Google Cloud personalizada para o modo de notificação de arquivo

No console do Google Cloud do projeto correspondente, navegue até IAM & Admin > Roles. Crie uma função na parte superior ou atualize uma função existente. Na tela para criação ou edição da função, clique em Add Permissions. Um menu será exibido, no qual você poderá adicionar as permissões à função.

Funções Personalizadas de IAM do GCP

Configurar ou gerenciar manualmente os recursos de notificação de arquivo

Usuários privilegiados podem configurar ou gerenciar manualmente os recursos de notificação de arquivo.

  • Configure os serviços de notificação de arquivo manualmente por meio do provedor de nuvem e especifique manualmente o identificador de fila. Confira Opções de notificação de arquivos para obter mais detalhes.
  • Use as APIs do Scala para criar ou gerenciar as notificações e serviços de enfileiramento, conforme mostrado no exemplo a seguir:

Observação

Você deve ter permissões apropriadas para configurar ou modificar a infraestrutura de nuvem. Consulte a documentação de permissões para o Azure, S3 ou GCS.

Python

# Databricks notebook source
# MAGIC %md ## Python bindings for CloudFiles Resource Managers for all 3 clouds

# COMMAND ----------

#####################################
## Creating a ResourceManager in AWS
#####################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .create()

# Using AWS access key and secret key
manager = spark._jvm.com.databricks.sql.CloudFilesAWSResourceManager \
  .newManager() \
  .option("cloudFiles.region", <region>) \
  .option("cloudFiles.awsAccessKey", <aws-access-key>) \
  .option("cloudFiles.awsSecretKey", <aws-secret-key>) \
  .option("cloudFiles.roleArn", <role-arn>) \
  .option("cloudFiles.roleExternalId", <role-external-id>) \
  .option("cloudFiles.roleSessionName", <role-session-name>) \
  .option("cloudFiles.stsEndpoint", <sts-endpoint>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in Azure
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

# Using an Azure service principal
manager = spark._jvm.com.databricks.sql.CloudFilesAzureResourceManager \
  .newManager() \
  .option("cloudFiles.connectionString", <connection-string>) \
  .option("cloudFiles.resourceGroup", <resource-group>) \
  .option("cloudFiles.subscriptionId", <subscription-id>) \
  .option("cloudFiles.tenantId", <tenant-id>) \
  .option("cloudFiles.clientId", <service-principal-client-id>) \
  .option("cloudFiles.clientSecret", <service-principal-client-secret>) \
  .option("path", <path-to-specific-container-and-folder>) \
  .create()

#######################################
## Creating a ResourceManager in GCP
#######################################

# Using a Databricks service credential
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("databricks.serviceCredential", <service-credential-name>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Using a Google service account
manager = spark._jvm.com.databricks.sql.CloudFilesGCPResourceManager \
  .newManager() \
  .option("cloudFiles.projectId", <project-id>) \
  .option("cloudFiles.client", <client-id>) \
  .option("cloudFiles.clientEmail", <client-email>) \
  .option("cloudFiles.privateKey", <private-key>) \
  .option("cloudFiles.privateKeyId", <private-key-id>) \
  .option("path", <path-to-specific-bucket-and-folder>) \
  .create()

# Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

# List notification services created by <AL>
from pyspark.sql import DataFrame
df = DataFrame(manager.listNotificationServices(), spark)

# Tear down the notification services created for a specific stream ID.
# Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Scala (linguagem de programação)

/////////////////////////////////////
// Creating a ResourceManager in AWS
/////////////////////////////////////

import com.databricks.sql.CloudFilesAWSResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>) // optional, will use the region of the EC2 instances by default
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

/**
 * Using AWS access key and secret key
 */
val manager = CloudFilesAWSResourceManager
    .newManager
    .option("cloudFiles.region", <region>)
    .option("cloudFiles.awsAccessKey", <aws-access-key>)
    .option("cloudFiles.awsSecretKey", <aws-secret-key>)
    .option("cloudFiles.roleArn", <role-arn>)
    .option("cloudFiles.roleExternalId", <role-external-id>)
    .option("cloudFiles.roleSessionName", <role-session-name>)
    .option("cloudFiles.stsEndpoint", <sts-endpoint>)
    .option("path", <path-to-specific-bucket-and-folder>) // required only for setUpNotificationServices
    .create()

///////////////////////////////////////
// Creating a ResourceManager in Azure
///////////////////////////////////////

import com.databricks.sql.CloudFilesAzureResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("databricks.serviceCredential", <service-credential-name>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

/**
 * Using an Azure service principal
 */
val manager = CloudFilesAzureResourceManager
  .newManager
  .option("cloudFiles.connectionString", <connection-string>)
  .option("cloudFiles.resourceGroup", <resource-group>)
  .option("cloudFiles.subscriptionId", <subscription-id>)
  .option("cloudFiles.tenantId", <tenant-id>)
  .option("cloudFiles.clientId", <service-principal-client-id>)
  .option("cloudFiles.clientSecret", <service-principal-client-secret>)
  .option("path", <path-to-specific-container-and-folder>) // required only for setUpNotificationServices
  .create()

///////////////////////////////////////
// Creating a ResourceManager in GCP
///////////////////////////////////////

import com.databricks.sql.CloudFilesGCPResourceManager

/**
 * Using a Databricks service credential
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("databricks.serviceCredential", <service-credential-name>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

/**
 * Using a Google service account
 */
val manager = CloudFilesGCPResourceManager
    .newManager
    .option("cloudFiles.projectId", <project-id>)
    .option("cloudFiles.client", <client-id>)
    .option("cloudFiles.clientEmail", <client-email>)
    .option("cloudFiles.privateKey", <private-key>)
    .option("cloudFiles.privateKeyId", <private-key-id>)
    .option("path", <path-to-specific-bucket-and-folder>) // Required only for setUpNotificationServices.
    .create()

// Set up a queue and a topic subscribed to the path provided in the manager.
manager.setUpNotificationServices(<resource-suffix>)

// List notification services created by <AL>
val df = manager.listNotificationServices()

// Tear down the notification services created for a specific stream ID.
// Stream ID is a GUID string that you can find in the list result above.
manager.tearDownNotificationServices(<stream-id>)

Use setUpNotificationServices(<resource-suffix>) para criar uma fila e uma assinatura com o nome <prefix>-<resource-suffix> (o prefixo depende do sistema de armazenamento resumido em Recursos de nuvem usados no modo de notificação de arquivo herdado do Carregador Automático. Se houver um recurso com o mesmo nome, o Azure Databricks reutilizará o recurso existente em vez de criar um. Essa função retorna um identificador de fila que pode ser passado para a origem cloudFiles usando o identificador em Opções de notificação de arquivos. Isso permite que o usuário da origem cloudFiles tenha menos permissões do que o usuário que cria os recursos.

Forneça a opção "path" para newManager somente se estiver chamando setUpNotificationServices; ela não é necessária para listNotificationServices ou tearDownNotificationServices. Isso é o mesmo path que você usa ao executar uma consulta de fluxo.

A matriz a seguir indica quais métodos de API têm suporte em qual Databricks Runtime para cada tipo de armazenamento:

Armazenamento em nuvem Instalação de API API de lista Desmontar a API
Amazon S3 Todas as versões Todas as versões Todas as versões
ADLS Todas as versões Todas as versões Todas as versões
GCS Databricks Runtime 9.1 e superiores Databricks Runtime 9.1 e superiores Databricks Runtime 9.1 e superiores
Armazenamento de Blobs do Azure Todas as versões Todas as versões Todas as versões

Limpar recursos de notificação de eventos criados pelo Auto Loader

O Carregador Automático não derruba automaticamente os recursos de notificação de arquivo. Para derrubar recursos de notificação de arquivo, você deve usar o gerenciador de recursos de nuvem, conforme mostrado na seção anterior. Você também pode excluir esses recursos manualmente usando a interface do usuário ou as APIs do provedor de nuvem.

Solução de problemas comuns

Esta seção descreve erros comuns ao usar o Carregador Automático com o modo de notificação de arquivo e como resolvê-los.

Falha ao criar uma assinatura da Grade de Eventos

Se você vir a seguinte mensagem de erro ao executar o Carregador Automático pela primeira vez, a Grade de Eventos não será registrada como um Provedor de Recursos na assinatura do Azure.

java.lang.RuntimeException: Failed to create event grid subscription.

Para registrar a Grade de Eventos como um provedor de recursos, faça o seguinte:

  1. No portal do Azure, acesse sua assinatura.
  2. Clique em Provedores de recursos na seção Configurações.
  3. Registre o provedor Microsoft.EventGrid.

Autorização necessária para executar operações de assinatura da Grade de Eventos

Se você vir a seguinte mensagem de erro ao executar o Auto Loader pela primeira vez, confirme se a função Colaborador está atribuída à entidade de serviço da Grade de Eventos e da conta de armazenamento.

403 Forbidden ... does not have authorization to perform action 'Microsoft.EventGrid/eventSubscriptions/[read|write]' over scope ...

O cliente da Grade de Eventos ignora o proxy

No Databricks Runtime 15.2 e versões posteriores, as conexões da Grade de Eventos no Auto Loader usam configurações de proxy a partir das propriedades do sistema por padrão. No Databricks Runtime 13.3 LTS, 14.3 LTS e 15.0 a 15.2, você pode configurar manualmente conexões da Grade de Eventos para usar um proxy definindo a propriedade Spark Configspark.databricks.cloudFiles.eventGridClient.useSystemProperties true. Confira o artigo Definir as propriedades de configuração do Spark no Azure Databricks.