Udostępnij przez


Szybki start: wysyłanie zdarzeń do lub odbieranie zdarzeń z centrów zdarzeń przy użyciu języka Python

Z tego przewodnika Szybki start dowiesz się, jak wysyłać zdarzenia do centrum zdarzeń i odbierać je z centrum zdarzeń przy użyciu pakietu języka Python azure-eventhub .

Wymagania wstępne

Jeśli dopiero zaczynasz korzystać z usługi Azure Event Hubs, zobacz Omówienie usługi Event Hubs przed wykonaniem tego przewodnika Szybki start.

Aby ukończyć ten przewodnik Szybki start, upewnij się, że masz następujące wymagania wstępne:

  • Subskrypcja platformy Microsoft Azure: utwórz konto bezpłatnej wersji próbnej , jeśli jej nie masz.
  • Środowisko Python w wersji 3.8 lub nowszej: upewnij się, że narzędzie jest zainstalowane i zaktualizowane.
  • Visual Studio Code (zalecane): lub użyj dowolnego innego wybranego środowiska IDE.
  • Przestrzeń nazw i centrum zdarzeń usługi Event Hubs: postępuj zgodnie z tym przewodnikiem , aby utworzyć je w witrynie Azure Portal.

Instalowanie pakietów w celu wysyłania zdarzeń

Aby zainstalować pakiety języka Python dla usługi Event Hubs, otwórz wiersz polecenia zawierający język Python w swojej ścieżce. Zmień katalog na folder, w którym chcesz zachować przykłady.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Uwierzytelnianie aplikacji na platformie Azure

Ten szybki start przedstawia dwa sposoby nawiązywania połączenia z Azure Event Hubs.

  • Bez hasła. Użyj swojego podmiotu zabezpieczeń w Microsoft Entra ID i kontroli dostępu opartej na rolach (RBAC), aby połączyć się z przestrzenią nazw Event Hubs. Nie musisz martwić się o zakodowane parametry połączenia w kodzie, w pliku konfiguracji lub w bezpiecznym magazynie, na przykład w usłudze Azure Key Vault.
  • Parametry połączenia. Użyj parametrów połączenia, aby nawiązać połączenie z przestrzenią nazw usługi Event Hubs. Jeśli dopiero zaczynasz korzystać z platformy Azure, możesz znaleźć opcję parametry połączenia łatwiejszą do naśladowania.

Zalecamy użycie opcji bez hasła w rzeczywistych aplikacjach i środowiskach produkcyjnych. Aby uzyskać więcej informacji, zobacz Uwierzytelnianie i autoryzacja usługi Service Bus oraz połączenia bez hasła dla usług platformy Azure.

Przypisywanie ról do użytkownika firmy Microsoft Entra

Podczas tworzenia aplikacji lokalnie upewnij się, że konto użytkownika, które nawiązuje połączenie z usługą Azure Event Hubs, ma odpowiednie uprawnienia. Do wysyłania i odbierania komunikatów potrzebna jest rola właściciela danych usługi Azure Event Hubs . Aby przypisać sobie tę rolę, musisz mieć rolę administratora dostępu użytkowników lub inną rolę obejmującą Microsoft.Authorization/roleAssignments/write akcję. Role RBAC platformy Azure można przypisać użytkownikowi przy użyciu witryny Azure Portal, interfejsu wiersza polecenia platformy Azure lub programu Azure PowerShell. Aby uzyskać więcej informacji, zobacz Zrozumienie zakresu kontroli dostępu opartej na rolach Azure.

Poniższy przykład przypisuje Azure Event Hubs Data Owner rolę do konta użytkownika, co zapewnia pełny dostęp do zasobów usługi Azure Event Hubs. W rzeczywistym scenariuszu postępuj zgodnie z zasadą najniższych uprawnień , aby dać użytkownikom tylko minimalne uprawnienia wymagane do bezpieczniejszego środowiska produkcyjnego.

Wbudowane role platformy Azure dla usługi Azure Event Hubs

W przypadku usługi Azure Event Hubs zarządzanie przestrzeniami nazw i wszystkimi powiązanymi zasobami za pośrednictwem witryny Azure Portal i interfejsu API zarządzania zasobami platformy Azure jest już chronione przy użyciu modelu RBAC platformy Azure. Platforma Azure udostępnia następujące wbudowane role umożliwiające autoryzowanie dostępu do przestrzeni nazw usługi Event Hubs:

  • Właściciel danych usługi Azure Event Hubs: umożliwia dostęp danych do przestrzeni nazw usługi Event Hubs i jej jednostek (kolejek, tematów, subskrypcji i filtrów).
  • Nadawca danych usługi Azure Event Hubs: ta rola umożliwia nadawcy dostęp do przestrzeni nazw usługi Event Hubs i jej jednostek.
  • Odbiornik danych usługi Azure Event Hubs: ta rola umożliwia odbiorcy dostęp do przestrzeni nazw usługi Event Hubs i jej jednostek.

Jeśli chcesz utworzyć rolę niestandardową, zobacz Prawa wymagane dla operacji usługi Event Hubs.

Ważne

W większości przypadków propagacja przypisania roli na platformie Azure zajmuje minutę lub dwie. W rzadkich przypadkach może upłynąć do ośmiu minut. Jeśli podczas pierwszego uruchomienia kodu wystąpią błędy uwierzytelniania, zaczekaj chwilę i spróbuj ponownie.

  1. W witrynie Azure Portal znajdź przestrzeń nazw usługi Event Hubs przy użyciu głównego paska wyszukiwania lub nawigacji po lewej stronie.

  2. Na stronie przeglądu wybierz pozycję Kontrola dostępu (Zarządzanie dostępem i tożsamościami) z menu po lewej stronie.

  3. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz kartę Przypisania ról.

  4. Kliknij + Dodaj w górnym menu. Następnie wybierz pozycję Dodaj przypisanie roli.

    Zrzut ekranu przedstawiający sposób przypisywania roli.

  5. Użyj pola wyszukiwania, aby filtrować wyniki do żądanej roli. W tym przykładzie wyszukaj Azure Event Hubs Data Owner i wybierz pasujący wynik. Następnie wybierz pozycję Dalej.

  6. W obszarze Przypisz dostęp do wybierz pozycję Użytkownik, grupa lub jednostka usługi. Następnie wybierz pozycję + Wybierz członków.

  7. W oknie dialogowym wyszukaj nazwę użytkownika microsoft Entra (zazwyczaj adres e-mail user@domain ). Wybierz pozycję Wybierz w dolnej części okna dialogowego.

  8. Wybierz pozycję Przejrzyj i przypisz , aby przejść do ostatniej strony. Wybierz ponownie pozycję Przejrzyj i przypisz , aby ukończyć proces.

Wysyłanie zdarzeń

W tej sekcji utwórz skrypt języka Python w celu wysyłania zdarzeń do utworzonego wcześniej centrum zdarzeń.

  1. Otwórz ulubiony edytor języka Python, taki jak Visual Studio Code.

  2. Utwórz skrypt o nazwie send.py. Ten skrypt wysyła partię zdarzeń do utworzonego wcześniej centrum zdarzeń.

  3. Wklej następujący kod do send.py:

    W kodzie użyj rzeczywistych wartości, aby zastąpić następujące symbole zastępcze:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE — Na stronie Przegląd przestrzeni nazw zostanie wyświetlona w pełni kwalifikowana nazwa. Powinien mieć format: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Nazwa centrum zdarzeń.
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        print("Producer client created successfully.") 
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Uwaga

    Przykłady innych opcji wysyłania zdarzeń do centrum zdarzeń asynchronicznie przy użyciu parametrów połączenia można znaleźć na stronie usługi GitHub send_async.py. Pokazane wzorce mają również zastosowanie do wysyłania zdarzeń bez hasła.

Odbieranie zdarzeń

Ten przewodnik Szybki start używa usługi Azure Blob Storage jako magazynu punktów kontrolnych. Magazyn punktów kontrolnych służy do utrwalania punktów kontrolnych (czyli ostatnich pozycji odczytu).

Postępuj zgodnie z tymi zaleceniami, gdy używasz usługi Azure Blob Storage jako magazynu punktów kontrolnych:

  • Użyj oddzielnego kontenera dla każdej grupy odbiorców. Możesz użyć tego samego konta magazynu, ale użyj jednego kontenera dla każdej grupy.
  • Nie używaj konta magazynowego do niczego innego.
  • Nie używaj kontenera do niczego innego.
  • Utwórz konto magazynu w tym samym regionie co wdrożona aplikacja. Jeśli aplikacja jest lokalna, spróbuj wybrać możliwy region najbliżej.

Na stronie Konto magazynu w witrynie Azure Portal w sekcji Blob Service upewnij się, że następujące ustawienia są wyłączone.

  • Hierarchiczna przestrzeń nazw
  • Usuwanie nietrwałe obiektów blob
  • Wersje

Tworzenie konta usługi Azure Storage i kontenera obiektów blob

Utwórz w nim konto usługi Azure Storage i kontener obiektów blob, wykonując następujące czynności:

  1. Tworzenie konta usługi Azure Storage
  2. Utwórz kontener obiektów blob.
  3. Uwierzytelnij się w kontenerze obiektów blob.

Pamiętaj, aby zarejestrować parametry połączenia i nazwę kontenera do późniejszego użycia w kodzie odbioru.

Podczas tworzenia aplikacji lokalnie upewnij się, że konto użytkownika, które uzyskuje dostęp do danych obiektów blob, ma odpowiednie uprawnienia. Potrzebujesz Współautora danych usługi Storage Blob, aby odczytywać i zapisywać dane obiektów blob. Aby przypisać sobie tę rolę, musisz mieć przypisaną rolę Administratora dostępu użytkowników lub inną rolę obejmującą akcję Microsoft.Authorization/roleAssignments/write . Role RBAC platformy Azure można przypisać użytkownikowi przy użyciu witryny Azure Portal, interfejsu wiersza polecenia platformy Azure lub programu Azure PowerShell. Aby uzyskać więcej informacji, zobacz Zrozumienie zakresu dla Azure RBAC.

W tym scenariuszu przypisujesz uprawnienia do konta użytkownika w zakresie konta magazynu, aby postępować zgodnie z zasadą najniższych uprawnień. Ta praktyka zapewnia użytkownikom tylko minimalne wymagane uprawnienia i tworzy bezpieczniejsze środowiska produkcyjne.

W poniższym przykładzie przypisano rolę Współautor danych obiektu blob usługi Storage do konta użytkownika, która zapewnia zarówno dostęp do odczytu, jak i zapisu do danych obiektów blob na koncie magazynu.

Ważne

W większości przypadków propagacja przypisania roli na platformie Azure zajmuje minutę lub dwie. W rzadkich przypadkach może upłynąć do ośmiu minut. Jeśli podczas pierwszego uruchomienia kodu wystąpią błędy uwierzytelniania, zaczekaj chwilę i spróbuj ponownie.

  1. W witrynie Azure Portal znajdź konto magazynu przy użyciu głównego paska wyszukiwania lub nawigacji po lewej stronie.

  2. Na stronie konta magazynu wybierz pozycję Kontrola dostępu (IAM) z menu po lewej stronie.

  3. Na stronie Kontrola dostępu (Zarządzanie dostępem i tożsamościami) wybierz kartę Przypisania ról.

  4. Kliknij + Dodaj w górnym menu. Następnie wybierz pozycję Dodaj przypisanie roli.

    Zrzut ekranu pokazujący, jak nadać rolę konta magazynowego.

  5. Użyj pola wyszukiwania, aby filtrować wyniki do żądanej roli. W tym przykładzie wyszukaj pozycję Współautor danych obiektu blob usługi Storage. Wybierz pasujący wynik, a następnie wybierz pozycję Dalej.

  6. W obszarze Przypisz dostęp do wybierz pozycję Użytkownik, grupa lub jednostka usługi, a następnie wybierz pozycję + Wybierz członków.

  7. W oknie dialogowym wyszukaj nazwę użytkownika firmy Microsoft Entra (zazwyczaj adres e-mail user@domain ), a następnie wybierz pozycję Wybierz w dolnej części okna dialogowego.

  8. Wybierz pozycję Przejrzyj i przypisz , aby przejść do ostatniej strony. Wybierz ponownie pozycję Przejrzyj i przypisz , aby ukończyć proces.

Instalowanie pakietów w celu odbierania zdarzeń

W przypadku strony odbieranej należy zainstalować co najmniej jeden pakiet. W tym przewodniku Szybki start użyjesz usługi Azure Blob Storage do utrwalania punktów kontrolnych, aby program nie odczytywał już odczytanych zdarzeń. Wykonuje ona punkty kontrolne metadanych na odebranych komunikatach w regularnych odstępach czasu w obiekcie blob. Takie podejście ułatwia kontynuowanie odbierania komunikatów później z miejsca, w którym zostało przerwane.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Tworzenie skryptu języka Python w celu odbierania zdarzeń

W tej sekcji utworzysz skrypt języka Python do odbierania zdarzeń z centrum zdarzeń:

  1. Otwórz ulubiony edytor języka Python, taki jak Visual Studio Code.

  2. Utwórz skrypt o nazwie recv.py.

  3. Wklej następujący kod do recv.py:

    W kodzie użyj rzeczywistych wartości, aby zastąpić następujące symbole zastępcze:

    • BLOB_STORAGE_ACCOUNT_URL - Ta wartość powinna być w formacie: https://<YOURSTORAGEACCOUNTNAME>.blob.core.windows.net/
    • BLOB_CONTAINER_NAME — Nazwa kontenera obiektów blob na koncie usługi Azure Storage.
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE — Na stronie Przegląd przestrzeni nazw zostanie wyświetlona w pełni kwalifikowana nazwa. Powinien mieć format: <NAMESPACENAME>>.servicebus.windows.net.
    • EVENT_HUB_NAME - Nazwa centrum zdarzeń.
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Uwaga

    Przykłady innych opcji odbierania zdarzeń z centrum zdarzeń asynchronicznie przy użyciu parametrów połączenia można znaleźć na stronie usługi GitHub recv_with_checkpoint_store_async.py. Pokazane wzorce mają również zastosowanie do odbierania zdarzeń bez hasła.

Uruchamianie aplikacji odbiorcy

  1. Uruchom wiersz polecenia.

  2. Uruchom następujące polecenie i zaloguj się przy użyciu konta, które zostało dodane do roli Właściciel danych usługi Azure Event Hubs w przestrzeni nazw Event Hubs oraz roli Współautor danych obiektu blob usługi Azure Storage na koncie usługi Azure Storage.

    az login
    
  3. Przejdź do folderu z plikiem receive.py i uruchom następujące polecenie:

    python recv.py
    

Uruchamianie aplikacji nadawcy

  1. Uruchom wiersz polecenia.

  2. Uruchom następujące polecenie i zaloguj się przy użyciu konta, które zostało dodane do roli Właściciel danych usługi Azure Event Hubs w przestrzeni nazw Event Hubs oraz roli Współautor danych obiektu blob usługi Azure Storage na koncie usługi Azure Storage.

    az login
    
  3. Przejdź do folderu z send.py, a następnie uruchom następujące polecenie:

    python send.py
    

W oknie odbiorcy powinny być wyświetlane komunikaty, które zostały wysłane do centrum zdarzeń.

Rozwiązywanie problemów

Jeśli nie widzisz zdarzeń w oknie odbiorcy lub kod zgłasza błąd, wypróbuj następujące porady dotyczące rozwiązywania problemów:

  • Jeśli nie widzisz wyników z recy.py, uruchom send.py kilka razy.

  • Jeśli podczas korzystania z kodu bez hasła (z poświadczeniami) zostaną wyświetlone błędy dotyczące "coroutine", upewnij się, że używasz importowania z azure.identity.aioprogramu .

  • Jeśli zostanie wyświetlona opcja "Nieujawiona sesja klienta" z kodem bez hasła (z poświadczeniami), upewnij się, że po zakończeniu zamknij poświadczenie. Aby uzyskać więcej informacji, zobacz Async credentials (Poświadczenia asynchroniczne).

  • Jeśli podczas uzyskiwania dostępu do magazynu występują błędy autoryzacji w przypadku recv.py , upewnij się, że wykonano kroki opisane w artykule Tworzenie konta usługi Azure Storage i kontenera obiektów blob oraz przypisano rolę Współautor danych obiektu blob usługi Storage do jednostki usługi.

  • Jeśli otrzymasz zdarzenia z różnymi identyfikatorami partycji, ten wynik będzie oczekiwany. Partycje stanowią mechanizm organizacji danych powiązany z równoległością podrzędną wymaganą w aplikacjach korzystających z tych danych. Liczba partycji w centrum zdarzeń jest bezpośrednio związana z oczekiwaną liczbą jednoczesnych czytników. Aby uzyskać więcej informacji, zobacz Dowiedz się więcej o partycjach.

Następne kroki

W tym przewodniku Szybki start wysyłano i odbierano zdarzenia asynchronicznie. Aby dowiedzieć się, jak synchronicznie wysyłać i odbierać zdarzenia, przejdź do strony usługi GitHub sync_samples.

Zapoznaj się z bardziej przykładami i zaawansowanymi scenariuszami w bibliotece klienta usługi Azure Event Hubs dla przykładów języka Python.