Observação
O acesso a essa página exige autorização. Você pode tentar entrar ou alterar diretórios.
O acesso a essa página exige autorização. Você pode tentar alterar os diretórios.
Você pode configurar um hub de eventos para que os dados enviados para um hub de eventos sejam capturados em uma conta de armazenamento do Azure ou no Azure Data Lake Storage Gen 1 ou Gen 2. Este artigo mostra como escrever código Python para enviar eventos para um hub de eventos e ler os dados capturados do Armazenamento de Blobs do Azure. Para obter mais informações sobre esse recurso, consulte a visão geral do recurso Captura de Hubs de Eventos.
Este início rápido usa o SDK Python do Azure para demonstrar o recurso de Captura. O aplicativo sender.py envia telemetria ambiental simulada para hubs de eventos no formato JSON. O hub de eventos é configurado para usar o recurso Capturar para registrar esses dados no Blob Storage em lotes. O aplicativo capturereader.py lê esses blobs e cria um arquivo de acréscimo para cada dispositivo. Em seguida, o aplicativo grava os dados em arquivos CSV.
Neste início rápido, você:
- Crie uma conta de Armazenamento de Blobs do Azure e um contêiner no portal do Azure.
- Crie um namespace dos Hubs de Eventos usando o portal do Azure.
- Crie um hub de eventos com o recurso Capturar habilitado e conecte-o à sua conta de armazenamento.
- Envie dados para o hub de eventos usando um script Python.
- Ler e processar arquivos do Event Hubs Capture usando um outro script Python.
Pré-requisitos
Python 3.8 ou posterior, com pip instalado e atualizado.
Uma assinatura do Azure. Se você não tiver uma conta gratuita, crie uma antes de começar.
Um namespace ativo do Event Hubs e um hub de eventos. Crie um namespace dos Hubs de Eventos e um hub de eventos no namespace. Registre o nome do namespace dos Hubs de Eventos, o nome do hub de eventos e a chave de acesso primária para o namespace. Para obter a chave de acesso, consulte Obter uma cadeia de conexão dos Hubs de Eventos. O nome da chave padrão é RootManageSharedAccessKey. Para este início rápido, você precisa apenas da chave primária. Você não precisa da string de conexão.
Uma conta de armazenamento do Azure, um contêiner de blobs na conta de armazenamento e uma cadeia de conexão para a conta de armazenamento. Se você não tiver esses itens, execute as seguintes etapas:
- Crie uma conta de armazenamento do Azure
- Criar um contêiner de blob na conta de armazenamento
- Obtenha a cadeia de conexão para a conta de armazenamento
Registre a cadeia de conexão e o nome do contêiner para uso posterior neste início rápido.
Habilitar o recurso De captura para o hub de eventos
Habilite o recurso Capturar para o hub de eventos. Para fazer isso, siga as instruções em Habilitar a Captura de Hubs de Eventos usando o portal do Azure. Selecione a conta de armazenamento e o contêiner de blob que você criou na etapa anterior. Selecione Avro para o formato de serialização de eventos de saída.
Criar um script python para enviar eventos para o hub de eventos
Nesta seção, você criará um script Python que envia 200 eventos (10 dispositivos * 20 eventos) para um hub de eventos. Esses eventos são uma leitura ambiental de exemplo enviada no formato JSON.
Abra seu editor favorito do Python, como o Visual Studio Code.
Crie um script chamado sender.py.
Cole o código a seguir em sender.py.
import time import os import uuid import datetime import random import json from azure.eventhub import EventHubProducerClient, EventData # This script simulates the production of events for 10 devices. devices = [] for x in range(0, 10): devices.append(str(uuid.uuid4())) # Create a producer client to produce and publish events to the event hub. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE CONNECTION STRING", eventhub_name="EVENT HUB NAME") for y in range(0,20): # For each device, produce 20 events. event_data_batch = producer.create_batch() # Create a batch. You will add events to the batch later. for dev in devices: # Create a dummy reading. reading = { 'id': dev, 'timestamp': str(datetime.datetime.utcnow()), 'uv': random.random(), 'temperature': random.randint(70, 100), 'humidity': random.randint(70, 100) } s = json.dumps(reading) # Convert the reading into a JSON string. event_data_batch.add(EventData(s)) # Add event data to the batch. producer.send_batch(event_data_batch) # Send the batch of events to the event hub. # Close the producer. producer.close()Substitua os seguintes valores nos scripts:
- Substitua
EVENT HUBS NAMESPACE CONNECTION STRINGpela cadeia de conexão do namespace dos Hubs de Eventos. - Substitua
EVENT HUB NAMEpelo nome do hub de eventos.
- Substitua
Execute o script para enviar eventos para o hub de eventos.
No portal do Azure, você pode verificar se o hub de eventos recebeu as mensagens. Alterne para o modo de exibição Mensagens na seção Métricas . Recarregue a página para atualizar o gráfico. Pode levar alguns segundos para a página exibir que as mensagens foram recebidas.
Criar um script python para ler seus arquivos de captura
Neste exemplo, os dados capturados são armazenados no Armazenamento de Blobs do Azure. O script nesta seção lê os arquivos de dados capturados de sua conta de armazenamento do Azure e gera arquivos CSV para que você abra e exiba facilmente. Você vê 10 arquivos no diretório de trabalho atual do aplicativo. Esses arquivos contêm as leituras ambientais para os 10 dispositivos.
No editor do Python, crie um script chamado capturereader.py. Esse script lê os arquivos capturados e cria um arquivo para cada dispositivo gravar os dados somente para esse dispositivo.
Cole o código a seguir em capturereader.py.
import os import string import json import uuid import avro.schema from azure.storage.blob import ContainerClient, BlobClient from avro.datafile import DataFileReader, DataFileWriter from avro.io import DatumReader, DatumWriter def processBlob2(filename): reader = DataFileReader(open(filename, 'rb'), DatumReader()) dict = {} for reading in reader: parsed_json = json.loads(reading["Body"]) if not 'id' in parsed_json: return if not parsed_json['id'] in dict: list = [] dict[parsed_json['id']] = list else: list = dict[parsed_json['id']] list.append(parsed_json) reader.close() for device in dict.keys(): filename = os.getcwd() + '\\' + str(device) + '.csv' deviceFile = open(filename, "a") for r in dict[device]: deviceFile.write(", ".join([str(r[x]) for x in r.keys()])+'\n') def startProcessing(): print('Processor started using path: ' + os.getcwd()) # Create a blob container client. container = ContainerClient.from_connection_string("AZURE STORAGE CONNECTION STRING", container_name="BLOB CONTAINER NAME") blob_list = container.list_blobs() # List all the blobs in the container. for blob in blob_list: # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files). if blob.size > 508: print('Downloaded a non empty blob: ' + blob.name) # Create a blob client for the blob. blob_client = ContainerClient.get_blob_client(container, blob=blob.name) # Construct a file name based on the blob name. cleanName = str.replace(blob.name, '/', '_') cleanName = os.getcwd() + '\\' + cleanName with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. my_file.write(blob_client.download_blob().readall()) # Write blob contents into the file. processBlob2(cleanName) # Convert the file into a CSV file. os.remove(cleanName) # Remove the original downloaded file. # Delete the blob from the container after it's read. container.delete_blob(blob.name) startProcessing()Substitua
AZURE STORAGE CONNECTION STRINGpela cadeia de conexão para sua conta de armazenamento do Azure. O nome do contêiner que você criou neste início rápido é captura. Se você usou um nome diferente para o contêiner, substitua a captura pelo nome do contêiner na conta de armazenamento.
Executar os scripts
Abra um prompt de comando que tenha o Python em seu caminho e execute estes comandos para instalar pacotes de pré-requisitos do Python:
pip install azure-storage-blob pip install azure-eventhub pip install avro-python3Altere o diretório para o diretório em que você salvou sender.py e capturereader.py e execute este comando:
python sender.pyEsse comando inicia um novo processo do Python para executar o remetente.
Aguarde alguns minutos para que a captura seja executada e, em seguida, insira o seguinte comando na janela de comando original:
python capturereader.pyEsse processador de captura usa o diretório local para baixar todos os blobs da conta de armazenamento e do contêiner. Ele processa arquivos que não estão vazios e grava os resultados como arquivos CSV no diretório local.
Próximas etapas
Confira exemplos do Python no GitHub.