Partilhar via


Guia de início rápido: enviar ou receber eventos de hubs de eventos usando Python

Neste Guia de início rápido, você aprenderá como enviar e receber eventos de um hub de eventos usando o pacote Python azure-eventhub .

Pré-requisitos

Se você é novo nos Hubs de Eventos do Azure, consulte Visão geral dos Hubs de Eventos antes de fazer este início rápido.

Para concluir este início rápido, certifique-se de ter os seguintes pré-requisitos:

  • Subscrição do Microsoft Azure: inscreva-se para uma avaliação gratuita se não tiver uma.
  • Python 3.8 ou posterior: Certifique-se de que o pip está instalado e atualizado.
  • Código do Visual Studio (recomendado): Ou use qualquer outro IDE de sua escolha.
  • Espaço de nomes de Hubs de Eventos e hub de eventos: siga este guia para criá-los no portal do Azure.

Instale os pacotes para enviar eventos

Para instalar os pacotes Python para Hubs de Eventos, abra um prompt de comando que tenha Python em seu caminho. Altere o diretório para a pasta onde você deseja manter suas amostras.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Autenticar o aplicativo no Azure

Este guia de início rápido mostra duas maneiras de se conectar aos Hubs de Eventos do Azure:

  • Autenticação sem senha. Use sua entidade de segurança no Microsoft Entra ID e no RBAC (controle de acesso baseado em função) para se conectar a um namespace de Hubs de Eventos. Você não precisa se preocupar em ter cadeias de conexão codificadas em seu código, em um arquivo de configuração ou em armazenamento seguro, como o Azure Key Vault.
  • Cadeia de conexão. Use uma cadeia de conexão para se conectar a um namespace de Hubs de Eventos. Se você for novo no Azure, talvez ache a opção de cadeia de conexão mais fácil de seguir.

Recomendamos o uso da opção sem senha em aplicativos e ambientes de produção do mundo real. Para obter mais informações, consulte Autenticação e autorização do Service Bus e Conexões sem senha para serviços do Azure.

Atribuir funções ao usuário do Microsoft Entra

Ao desenvolver localmente, verifique se a conta de usuário que se conecta aos Hubs de Eventos do Azure tem as permissões corretas. Você precisa da função Proprietário de Dados dos Hubs de Eventos do Azure para enviar e receber mensagens. Para atribuir essa função a si mesmo, você precisa da função de Administrador de Acesso de Usuário ou outra função que inclua a Microsoft.Authorization/roleAssignments/write ação. Você pode atribuir funções do RBAC do Azure a um usuário usando o portal do Azure, a CLI do Azure ou o Azure PowerShell. Para obter mais informações, consulte Entender o escopo da página RBAC do Azure .

O exemplo a seguir atribui a Azure Event Hubs Data Owner função à sua conta de usuário, que fornece acesso total aos recursos dos Hubs de Eventos do Azure. Em um cenário real, siga o Princípio do Menor Privilégio para dar aos usuários apenas as permissões mínimas necessárias para um ambiente de produção mais seguro.

Funções incorporadas do Azure para Hubs de Eventos do Azure

Para Hubs de Eventos do Azure, o gerenciamento de namespaces e todos os recursos relacionados por meio do portal do Azure e da API de gerenciamento de recursos do Azure já está protegido usando o modelo RBAC do Azure. O Azure fornece as seguintes funções internas para autorizar o acesso a um namespace de Hubs de Eventos:

Se quiser criar uma função personalizada, consulte Direitos necessários para operações de Hubs de Eventos.

Importante

Na maioria dos casos, leva um ou dois minutos para que a atribuição de função se propague no Azure. Em casos raros, pode demorar até oito minutos. Se você receber erros de autenticação quando executar o código pela primeira vez, aguarde alguns momentos e tente novamente.

  1. No portal do Azure, localize seu namespace de Hubs de Eventos usando a barra de pesquisa principal ou a navegação à esquerda.

  2. Na página de visão geral, selecione Controle de acesso (IAM) no menu à esquerda.

  3. Na página Controlo de acesso (IAM), selecione o separador Atribuição de funções.

  4. Selecione + Adicionar no menu superior. Em seguida, selecione Adicionar atribuição de função.

    Captura de ecrã a mostrar como atribuir uma função.

  5. Use a caixa de pesquisa para filtrar os resultados para a função desejada. Para este exemplo, procure Azure Event Hubs Data Owner e selecione o resultado correspondente. Em seguida, escolha Avançar.

  6. Em Atribuir acesso a, selecione Usuário, grupo ou entidade de serviço. Em seguida, escolha + Selecionar membros.

  7. Na caixa de diálogo, procure o seu nome de utilizador do Microsoft Entra (normalmente o seu endereço de e-mail user@domain ). Escolha Selecionar na parte inferior da caixa de diálogo.

  8. Selecione Rever + atribuir para ir para a página final. Selecione Rever + atribuir novamente para concluir o processo.

Enviar eventos

Nesta seção, crie um script Python para enviar eventos para o hub de eventos que você criou anteriormente.

  1. Abra seu editor Python favorito, como o Visual Studio Code.

  2. Crie um script chamado send.py. Esse script envia um lote de eventos para o hub de eventos que você criou anteriormente.

  3. Cole o seguinte código em send.py:

    No código, use valores reais para substituir os seguintes espaços reservados:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - Você vê o nome totalmente qualificado na página Visão geral do namespace. Deve estar no formato: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Nome do hub de eventos.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Observação

    Para obter exemplos de outras opções para enviar eventos para um hub de eventos de forma assíncrona usando uma cadeia de conexão, consulte a página send_async.py do GitHub. Os padrões mostrados também são aplicáveis ao envio de eventos sem senha.

Receber eventos

Este guia de início rápido usa o Armazenamento Blob do Azure como um armazenamento de ponto de verificação. O armazenamento de pontos de verificação é usado para persistir pontos de verificação (ou seja, as últimas posições lidas).

Siga estas recomendações ao usar o Armazenamento de Blobs do Azure como um armazenamento de ponto de verificação:

  • Use um contêiner separado para cada grupo de consumidores. Você pode usar a mesma conta de armazenamento, mas usar um contêiner por cada grupo.
  • Não use a conta de armazenamento para mais nada.
  • Não use o recipiente para mais nada.
  • Crie a conta de armazenamento na mesma região do aplicativo implantado. Se a aplicação for local, tente escolher a região mais próxima possível.

Na página Conta de armazenamento no portal do Azure, na seção Serviço de Blob, verifique se as configurações a seguir estão desabilitadas.

  • Espaço de nomes hierárquico
  • Eliminação de forma recuperável de blobs
  • Gestão de Versões

Criar uma conta de armazenamento do Azure e um contêiner de blob

Crie uma conta de armazenamento do Azure e um contêiner de blob nela executando as seguintes etapas:

  1. Criar uma conta de Armazenamento do Azure
  2. Crie um contêiner de blob.
  3. Autentique-se no contêiner de blob.

Certifique-se de registrar a cadeia de conexão e o nome do contêiner para uso posterior no código de recebimento.

Ao desenvolver localmente, certifique-se de que a conta de usuário que acessa os dados de blob tenha as permissões corretas. Você precisa do Storage Blob Data Contributor para ler e gravar dados de blob. Para atribuir essa função a si mesmo, você precisa receber a função de Administrador de Acesso de Usuário ou outra função que inclua a ação Microsoft.Authorization/roleAssignments/write . Você pode atribuir funções do RBAC do Azure a um usuário usando o portal do Azure, a CLI do Azure ou o Azure PowerShell. Para obter mais informações, consulte Entender o escopo do Azure RBAC.

Nesse cenário, você atribui permissões à sua conta de usuário, com escopo para a conta de armazenamento, para seguir o Princípio do Menor Privilégio. Essa prática oferece aos usuários apenas as permissões mínimas necessárias e cria ambientes de produção mais seguros.

O exemplo a seguir atribui a função de Colaborador de Dados de Blob de Armazenamento à sua conta de usuário, que fornece acesso de leitura e gravação aos dados de blob em sua conta de armazenamento.

Importante

Na maioria dos casos, leva um ou dois minutos para que a atribuição de função se propague no Azure. Em casos raros, pode demorar até oito minutos. Se você receber erros de autenticação quando executar o código pela primeira vez, aguarde alguns momentos e tente novamente.

  1. No portal do Azure, localize sua conta de armazenamento usando a barra de pesquisa principal ou a navegação à esquerda.

  2. Na página da conta de armazenamento, selecione Controle de acesso (IAM) no menu à esquerda.

  3. Na página Controlo de acesso (IAM), selecione o separador Atribuição de funções.

  4. Selecione + Adicionar no menu superior. Em seguida, selecione Adicionar atribuição de função.

    Captura de ecrã a mostrar como atribuir uma função de conta de armazenamento.

  5. Use a caixa de pesquisa para filtrar os resultados para a função desejada. Para este exemplo, procure por Storage Blob Data Contributor. Selecione o resultado correspondente e, em seguida, escolha Avançar.

  6. Em Atribuir acesso a, selecione Utilizador, grupo ou entidade de serviço e, em seguida, selecione + Selecionar membros.

  7. Na caixa de diálogo, procure seu nome de usuário do Microsoft Entra (geralmente seu endereço de e-mail user@domain ) e escolha Selecionar na parte inferior da caixa de diálogo.

  8. Selecione Rever + atribuir para ir para a página final. Selecione Rever + atribuir novamente para concluir o processo.

Instalar os pacotes para receber eventos

Para o lado recetor, você precisa instalar um ou mais pacotes. Neste início rápido, você usa o armazenamento de Blob do Azure para persistir pontos de verificação para que o programa não leia os eventos que já leu. Ele executa pontos de verificação de metadados em mensagens recebidas em intervalos regulares em um blob. Essa abordagem facilita continuar recebendo mensagens mais tarde de onde você parou.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Criar um script Python para receber eventos

Nesta seção, você cria um script Python para receber eventos do seu hub de eventos:

  1. Abra seu editor Python favorito, como o Visual Studio Code.

  2. Crie um script chamado recv.py.

  3. Cole o seguinte código no recv.py:

    No código, use valores reais para substituir os seguintes espaços reservados:

    • BLOB_STORAGE_ACCOUNT_URL - Este valor deve estar no formato: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME - Nome do contêiner de blob na conta de armazenamento do Azure.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE - Você vê o nome totalmente qualificado na página Visão geral do namespace. Deve estar no formato: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Nome do hub de eventos.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Observação

    Para obter exemplos de outras opções para receber eventos de um hub de eventos de forma assíncrona usando uma cadeia de conexão, consulte a página recv_with_checkpoint_store_async.py do GitHub. Os padrões mostrados também são aplicáveis ao recebimento de eventos sem senha.

Execute o aplicativo recetor

  1. Abra uma janela de linha de comandos.

  2. Execute o seguinte comando e inicie sessão usando a conta que foi adicionada à função Proprietário de Dados do Azure Event Hubs no namespace do Event Hubs e à função Colaborador de Dados de Blob de Armazenamento na conta de armazenamento do Azure.

    az login
    
  3. Alterne para a pasta que tem o arquivo receive.py e execute o seguinte comando:

    python recv.py
    

Executar o aplicativo de remetente

  1. Abra uma janela de linha de comandos.

  2. Execute o seguinte comando e inicie sessão usando a conta que foi adicionada à função Proprietário de Dados do Azure Event Hubs no namespace do Event Hubs e à função Colaborador de Dados de Blob de Armazenamento na conta de armazenamento do Azure.

    az login
    
  3. Alterne para a pasta que tem o send.py e, em seguida, execute este comando:

    python send.py
    

A janela do recetor deve exibir as mensagens que foram enviadas para o hub de eventos.

Solução de problemas

Se você não vir eventos na janela do recetor ou se o código relatar um erro, tente as seguintes dicas de solução de problemas:

  • Se não vir resultados de recy.py, execute send.py várias vezes.

  • Se vir erros sobre "corrotina" ao usar o código de login sem senha (usando credenciais), verifique se está a importar de azure.identity.aio.

  • Se vir "Sessão de cliente não fechada" com código sem palavra-passe (com credenciais), certifique-se de que fecha a credencial quando terminar. Para obter mais informações, consulte Credenciais assíncronas.

  • Se vires erros de autorização com recv.py ao aceder ao armazenamento, certifica-te de que seguiste os passos em Criar uma conta de armazenamento Azure e um contêiner de blob e atribuiste a função Armazenamento de Colaborador de Dados de Blob à entidade de serviço.

  • Se você receber eventos com IDs de partição diferentes, esse resultado é esperado. As partições são um mecanismo de organização de dados que se relaciona com o paralelismo a jusante necessário em aplicações que consomem dados. O número de partições num hub de eventos está diretamente relacionado com o número de leitores simultâneos que espera ter. Para obter mais informações, consulte Saiba mais sobre partições.

Próximos passos

Neste início rápido, você enviou e recebeu eventos de forma assíncrona. Para saber como enviar e receber eventos de forma síncrona, vá para a página sync_samples do GitHub.

Explore mais exemplos e cenários avançados na biblioteca de cliente dos Hubs de Eventos do Azure para exemplos de Python.