Freigeben über


Schnellstart: Überprüfen der Verwendung eines Avro-Schemas beim Streamen von Ereignissen mit Event Hubs .NET SDKs (AMQP)

In dieser Schnellstartanleitung erfahren Sie, wie Sie Ereignisse an einen Event Hub mit Schemaüberprüfung mithilfe der .NET-Bibliothek "Azure.Messaging.EventHubs " senden und Ereignisse von einem Event Hub empfangen.

Azure Schema Registry ist ein Feature von Event Hubs. Die Registrierung stellt ein zentrales Repository für Schemas für ereignisgesteuerte und messagingorientierte Anwendungen bereit. Sie bietet Ihren Producer- und Consumeranwendungen die Flexibilität, Daten auszutauschen, ohne das Schema verwalten und gemeinsam nutzen zu müssen. Sie stellt außerdem ein einfaches Governanceframework für wiederverwendbare Schemas bereit und definiert die Beziehung zwischen Schemas über ein Gruppierungskonstrukt (Schemagruppen). Weitere Informationen finden Sie unter Azure-Schema Registry in Azure Event Hubs.

Voraussetzungen

Wenn Sie mit Azure Event Hubs noch nicht vertraut sind, lesen Sie vor dem Durcharbeiten dieser Schnellstartanleitung die Informationen unter Übersicht über Event Hubs.

Zum Durchführen dieser Schnellstartanleitung benötigen Sie Folgendes:

  • Wenn Sie kein Azure-Abonnement besitzen, erstellen Sie ein kostenloses Konto, bevor Sie beginnen.

  • Microsoft Visual Studio 2022.

    Die Azure Event Hubs-Clientbibliothek verwendet Features, die in C# 8.0 eingeführt wurden. Sie können die Bibliothek weiterhin mit früheren C#-Sprachversionen verwenden, aber die neue Syntax ist nicht verfügbar. Um die vollständige Syntax zu verwenden, empfehlen wir, dass Sie mit dem .NET Core SDK 3.0 oder höher kompilieren und die Sprachversion auf latest.NET Core SDK 3.0 festgelegt ist.

    Wenn Sie Visual Studio verwenden, sind Versionen vor Visual Studio 2019 nicht mit den Tools kompatibel, die zum Erstellen von C#8.0-Projekten erforderlich sind. Informationen zum Herunterladen von Visual Studio 2019 oder Visual Studio 2022, einschließlich der kostenlosen Community-Edition, finden Sie unter Visual Studio.

Erstellen eines Ereignis-Hubs

Um einen Event Hubs-Namespace und einen Event Hub zu erstellen, folgen Sie den Anweisungen unter Erstellen eines Event Hubs-Namespaces und eines Event Hubs.

Wenn Sie eine Verbindungszeichenfolge mit Ihrem Event Hubs-Namespace abrufen möchten, folgen Sie den Anweisungen von "Abrufen der Verbindungszeichenfolge".

Beachten Sie die folgenden Einstellungen, die in der aktuellen Schnellstartanleitung verwendet werden sollen:

  • Verbindungszeichenfolge für den Event Hubs-Namespace
  • Name des Ereignishubs

Erstellen eines Schemas

Um eine Schemagruppe und ein Schema zu erstellen, befolgen Sie die Anweisungen unter Erstellen von Schemas mithilfe der Schemaregistrierung.

  1. Erstellen Sie eine Schemagruppe namens "contoso-sg " mithilfe des Schemaregistrierungsportals. Verwenden Sie Avro als Serialisierungstyp und "None" für den Kompatibilitätsmodus.

  2. Erstellen Sie in dieser Schemagruppe ein neues Avro-Schema mit Schemaname: Microsoft.Azure.Data.SchemaRegistry.example.Order. Verwenden Sie den folgenden Schemainhalt.

    {
      "namespace": "Microsoft.Azure.Data.SchemaRegistry.example",
      "type": "record",
      "name": "Order",
      "fields": [
        {
          "name": "id",
          "type": "string"
        },
        {
          "name": "amount",
          "type": "double"
        },
        {
          "name": "description",
          "type": "string"
        }
      ]
    } 
    

Hinzufügen eines Benutzers zur Rolle "Schemaregistrierungsleser"

Fügen Sie Ihr Benutzerkonto zur Rolle "Schemaregistrierungsleser " auf Namespaceebene hinzu. Sie können auch die Rolle " Schemaregistrierungsmitwirkender" verwenden, aber das ist für diese Schnellstartanleitung nicht erforderlich.

  1. Wählen Sie auf der Seite "Event Hubs Namespace " im linken Menü die Option Access Control (IAM) aus.
  2. Wählen Sie auf der Seite Access Control (IAM)die Option +Rollenzuweisung hinzufügen> aus.
  3. Wählen Sie auf der Seite "Rollen" den Schemaregistrierungsleser und dann "Weiter" aus.
  4. Verwenden Sie den Link "+Mitglieder auswählen ", um Ihr Benutzerkonto zur Rolle hinzuzufügen, und wählen Sie dann "Weiter" aus.
  5. Wählen Sie auf der Seite Überprüfen + zuweisen die Option Überprüfen + zuweisen aus.

Erstellen von Ereignissen für Event Hubs mit Schemaüberprüfung

Erstellen einer Konsolenanwendung für Ereignisproduzent

  1. Starten Sie Visual Studio.

  2. Wählen Sie Neues Projekt erstellen aus.

  3. Führen Sie im Dialogfeld "Neues Projekt erstellen " die folgenden Schritte aus. Wenn dieses Dialogfeld nicht angezeigt wird, wählen Sie im Menü "Datei " aus, wählen Sie "Neu" und dann "Projekt" aus.

    1. Wählen Sie die Programmiersprache C# aus.

    2. Wählen Sie den Anwendungstyp Konsole aus.

    3. Wählen Sie in der Ergebnisliste die Option Konsolenanwendung aus.

    4. Wählen Sie anschließend Weiter aus.

      Screenshot des Dialogfelds

  4. Geben Sie OrderProducer für den Projektnamen, SRQuickStart für den Lösungsnamen ein, und wählen Sie dann OK aus, um das Projekt zu erstellen.

Hinzufügen des Event Hubs-NuGet-Pakets

  1. Klicken Sie auf Extras>NuGet-Paket-Manager>Paket-Manager-Konsole.

  2. Führen Sie die folgenden Befehle aus, um Azure.Messaging.EventHubs und andere NuGet-Pakete zu installieren. Drücken Sie die EINGABETASTE, um den letzten Befehl auszuführen.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  3. Authentifizieren Sie Produzentenanwendungen, um mithilfe von Visual Studio eine Verbindung mit Azure herzustellen. Weitere Informationen finden Sie unter Azure Identity-Clientbibliothek für .NET-.

  4. Melden Sie sich mit dem Benutzerkonto bei Azure an, das mitglied der Schema Registry Reader Rolle auf Namespaceebene ist. Informationen zu Schemaregistrierungsrollen finden Sie unter Azure rollenbasierte Zugriffssteuerung.

Codegenerierung mit dem Avro-Schema

  1. Verwenden Sie denselben Inhalt, den Sie zum Erstellen des Schemas zum Erstellen einer Datei namens Order.avscverwendet haben. Speichern Sie die Datei im Projekt- oder Lösungsordner.
  2. Verwenden Sie diese Schemadatei, um Code für .NET zu generieren. Sie können jedes tool zur Generierung externer Codes verwenden, z. B. avrogen für die Codegenerierung. Führen Sie z. B. die Ausführung avrogen -s .\Order.avsc . aus, um Code zu generieren.
  3. Nachdem Sie Code generiert haben, sehen Sie die Datei Order.cs im Ordner \Microsoft\Azure\Data\SchemaRegistry\example. Für das Avro-Schema werden hier die C#-Typen im Namespace Microsoft.Azure.Data.SchemaRegistry.example generiert.
  4. Fügen Sie die Order.cs Datei dem OrderProducer Projekt hinzu.

Schreiben von Code zum Serialisieren und Senden von Ereignissen an den Event Hub

  1. Fügen Sie der Datei Program.cs den folgenden Code hinzu. Details finden Sie in den Codekommentaren. Übergeordnete Schritte im Code sind:

    1. Erstellen Sie einen Produzentenclient, den Sie zum Senden von Ereignissen an einen Event Hub verwenden können.
    2. Erstellen Sie einen Schemaregistrierungsclient, den Sie zum Serialisieren und Überprüfen von Daten in einem Order Objekt verwenden können.
    3. Erstellen Sie ein neues Order Objekt mithilfe des generierten Order Typs.
    4. Verwenden Sie den Schemaregistrierungsclient, um das Order-Objekt in EventData zu serialisieren.
    5. Erstellen Sie einen Batch von Ereignissen.
    6. Fügen Sie die Ereignisdaten dem Ereignisbatch hinzu.
    7. Verwenden Sie den Produzentenclient, um den Batch von Ereignissen an den Event Hub zu senden.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Producer;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // The Event Hubs client types are safe to cache and use as a singleton for the lifetime
    // of the application, which is best practice when events are being published or read regularly.
    EventHubProducerClient producerClient;
    
    // Create a producer client that you can use to send events to an event hub
    producerClient = new EventHubProducerClient(connectionString, eventHubName);
    
    // Create a schema registry client that you can use to serialize and validate data.  
    var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
    // Create an Avro object serializer using the Schema Registry client object. 
    var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
    // Create a new order object using the generated type/class 'Order'. 
    var sampleOrder = new Order { id = "1234", amount = 45.29, description = "First sample order." };
    EventData eventData = (EventData)await serializer.SerializeAsync(sampleOrder, messageType: typeof(EventData));
    
    // Create a batch of events 
    using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();
    
    // Add the event data to the event batch. 
    eventBatch.TryAdd(eventData);
    
    // Send the batch of events to the event hub. 
    await producerClient.SendAsync(eventBatch);
    Console.WriteLine("A batch of 1 order has been published.");        
    
  2. Ersetzen Sie die folgenden Platzhalterwerte durch die realen Werte.

    • EVENTHUBSNAMESPACECONNECTIONSTRING - Verbindungszeichenfolge für den Event Hubs-Namespace
    • EVENTHUBNAME - Name des Event Hubs
    • EVENTHUBSNAMESPACENAME - Name des Event Hubs-Namespaces
    • SCHEMAGROUPNAME - Name der Schemagruppe
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
  3. Erstellen Sie das Projekt, und vergewissern Sie sich, dass keine Fehler vorhanden sind.

  4. Führen Sie das Programm aus, und warten Sie auf die Bestätigungsmeldung.

    A batch of 1 order has been published.
    
  5. Im Azure-Portal können Sie überprüfen, ob der Event Hub die Ereignisse empfangen hat. Wechseln zur Ansicht "Nachrichten " im Abschnitt "Metriken ". Aktualisieren Sie die Seite, um das Diagramm zu aktualisieren. Es kann einige Sekunden dauern, bis die Nachricht empfangen wurde.

    Abbildung der Azure-Portalseite, um zu überprüfen, ob der Event Hub die Ereignisse empfangen hat.

Nutzen von Ereignissen aus Event Hubs mit Schemaüberprüfung

In diesem Abschnitt wird gezeigt, wie Sie eine .NET Core-Konsolenanwendung schreiben, die Ereignisse von einem Event Hub empfängt, und die Schemaregistrierung zum Deserialisieren von Ereignisdaten verwenden.

Zusätzliche Voraussetzungen

  • Erstellen Sie das Speicherkonto, das zum Einsatz des Ereignisprozessors verwendet werden soll.

Erstellen einer Consumeranwendung

  1. Klicken Sie im Projektmappen-Explorer-Fenster mit der rechten Maustaste auf die SRQuickStart-Projektmappe, wählen Sie Hinzufügen und dann Neues Projekt aus.
  2. Wählen Sie zuerst Konsolenanwendung, dann Weiter aus.
  3. Geben Sie " OrderConsumer " für den Projektnamen ein, und wählen Sie "Erstellen" aus.
  4. Klicken Sie im Projektmappen-Explorer-Fenster mit der rechten Maustaste auf OrderConsumer, und wählen Sie Als Startprojekt festlegen aus.

Hinzufügen des Event Hubs-NuGet-Pakets

  1. Klicken Sie auf Extras>NuGet-Paket-Manager>Paket-Manager-Konsole.

  2. Vergewissern Sie sich im Paket-Manager-Konsolenfenster , dass "OrderConsumer " für das Standardprojekt ausgewählt ist. Wenn nicht, verwenden Sie die Dropdownliste, um "OrderConsumer" auszuwählen.

  3. Führen Sie den folgenden Befehl aus, um die erforderlichen NuGet-Pakete zu installieren. Drücken Sie die EINGABETASTE, um den letzten Befehl auszuführen.

    Install-Package Azure.Messaging.EventHubs
    Install-Package Azure.Messaging.EventHubs.Processor
    Install-Package Azure.Identity
    Install-Package Microsoft.Azure.Data.SchemaRegistry.ApacheAvro
    Install-Package Azure.ResourceManager.Compute
    
  4. Authentifizieren Sie Produzentenanwendungen, um mithilfe von Visual Studio eine Verbindung mit Azure herzustellen, wie in der Azure Identity-Clientbibliothek für .NET gezeigt.

  5. Melden Sie sich bei Azure mit dem Benutzerkonto an, das mitglied der Schema Registry Reader Rolle auf Namespaceebene ist. Informationen zu Schemaregistrierungsrollen finden Sie unter Azure rollenbasierte Zugriffssteuerung.

  6. Fügen Sie die Order.cs Datei hinzu, die Sie als Teil der Erstellung der Produzenten-App zum OrderConsumer-Projekt generiert haben.

  7. Klicken Sie mit der rechten Maustaste auf das Projekt "OrderConsumer ", und wählen Sie "Als Startprojekt festlegen" aus.

Schreiben von Code zum Empfangen von Ereignissen und Deserialisieren von Ereignissen mithilfe der Schemaregistrierung

  1. Fügen Sie der Datei Program.cs den folgenden Code hinzu. Details finden Sie in den Codekommentaren. Übergeordnete Schritte im Code sind:

    1. Erstellen Sie einen Consumerclient, den Sie zum Senden von Ereignissen an einen Event Hub verwenden können.
    2. Erstellen Sie einen Blob-Container-Client für den Blob-Container im Azure Blob Storage.
    3. Erstellen Sie einen Ereignisprozessorclient, und registrieren Sie Ereignis- und Fehlerhandler.
    4. Erstellen Sie im Ereignishandler einen Schemaregistrierungsclient, mit dem Sie Ereignisdaten in ein Order Objekt deserialisieren können.
    5. Deserialisieren Sie die Ereignisdaten in einem Order Objekt mithilfe des Serialisierungsprogramms.
    6. Drucken Sie die Informationen über die empfangene Bestellung.
    using Azure.Data.SchemaRegistry;
    using Azure.Identity;
    using Microsoft.Azure.Data.SchemaRegistry.ApacheAvro;
    using Azure.Storage.Blobs;
    using Azure.Messaging.EventHubs;
    using Azure.Messaging.EventHubs.Consumer;
    using Azure.Messaging.EventHubs.Processor;
    
    using Microsoft.Azure.Data.SchemaRegistry.example;
    
    
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACECONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // connection string for the Azure Storage account
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // name of the blob container that will be used as a checkpoint store
    const string blobContainerName = "BLOBCONTAINERNAME";
    
    // Create a blob container client that the event processor will use 
    BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName);
    
    // Create an event processor client to process events in the event hub
    EventProcessorClient processor = new EventProcessorClient(storageClient, EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName);
    
    // Register handlers for processing events and handling errors
    processor.ProcessEventAsync += ProcessEventHandler;
    processor.ProcessErrorAsync += ProcessErrorHandler;
    
    // Start the processing
    await processor.StartProcessingAsync();
    
    // Wait for 30 seconds for the events to be processed
    await Task.Delay(TimeSpan.FromSeconds(30));
    
    // Stop the processing
    await processor.StopProcessingAsync();
    
    static async Task ProcessEventHandler(ProcessEventArgs eventArgs)
    {
        // Create a schema registry client that you can use to serialize and validate data.  
        var schemaRegistryClient = new SchemaRegistryClient(schemaRegistryEndpoint, new DefaultAzureCredential());
    
        // Create an Avro object serializer using the Schema Registry client object. 
        var serializer = new SchemaRegistryAvroSerializer(schemaRegistryClient, schemaGroup, new SchemaRegistryAvroSerializerOptions { AutoRegisterSchemas = true });
    
        // Deserialized data in the received event using the schema 
        Order sampleOrder = (Order)await serializer.DeserializeAsync(eventArgs.Data, typeof(Order));
    
        // Print the received event
        Console.WriteLine($"Received order with ID: {sampleOrder.id}, amount: {sampleOrder.amount}, description: {sampleOrder.description}");
    
           await eventArgs.UpdateCheckpointAsync(eventArgs.CancellationToken);
        }
    
        static Task ProcessErrorHandler(ProcessErrorEventArgs eventArgs)
    {
        // Write details about the error to the console window
        Console.WriteLine($"\tPartition '{eventArgs.PartitionId}': an unhandled exception was encountered. This was not expected to happen.");
        Console.WriteLine(eventArgs.Exception.Message);
        return Task.CompletedTask;
    }      
    
  2. Ersetzen Sie die folgenden Platzhalterwerte durch die realen Werte.

    • EVENTHUBSNAMESPACE-CONNECTIONSTRING - Verbindungszeichenfolge für den Event Hubs-Namespace
    • EVENTHUBNAME - Name des Event Hubs
    • EVENTHUBSNAMESPACENAME - Name des Event Hubs-Namespaces
    • SCHEMAGROUPNAME - Name der Schemagruppe
    • AZURESTORAGECONNECTIONSTRING – Verbindungszeichenfolge für das Azure-Speicherkonto
    • BLOBCONTAINERNAME: Name des Blobcontainers
    // connection string to the Event Hubs namespace
    const string connectionString = "EVENTHUBSNAMESPACE-CONNECTIONSTRING";
    
    // name of the event hub
    const string eventHubName = "EVENTHUBNAME";
    
    // Schema Registry endpoint 
    const string schemaRegistryEndpoint = "EVENTHUBSNAMESPACENAME.servicebus.windows.net";
    
    // name of the consumer group   
    const string schemaGroup = "SCHEMAGROUPNAME";
    
    // Azure storage connection string
    const string blobStorageConnectionString = "AZURESTORAGECONNECTIONSTRING";
    
    // Azure blob container name
    const string blobContainerName = "BLOBCONTAINERNAME";
    
  3. Erstellen Sie das Projekt, und vergewissern Sie sich, dass keine Fehler vorhanden sind.

  4. Führen Sie die Empfängeranwendung aus.

  5. Es sollte eine Meldung angezeigt werden, dass der Event Hub die Ereignisse empfangen hat.

    Received order with ID: 1234, amount: 45.29, description: First sample order.
    

    Bei diesen Ereignissen handelt es sich um die drei Ereignisse, die Sie zuvor durch Ausführen des Sendeprogramms an den Event Hub gesendet haben.

Beispiele

Siehe Azure Schema Registry Apache Avro-Clientbibliothek für .NET.

Bereinigen von Ressourcen

Löschen Sie den Event Hubs-Namespace, oder löschen Sie die Ressourcengruppe, die den Namespace enthält.

Nächster Schritt