Übung – Verarbeiten der Ereignisse und Speichern der Daten in Azure Cosmos DB

Abgeschlossen

Eine zweite Funktion kann Ereignisse des spezifischen Namespaces im Azure Event Hub überwachen und verarbeiten und in einer Datenbank speichern, die mit Azure Cosmos DB erstellt wurde.

Erstellen einer Datenbank mit Azure Cosmos DB

Verwenden Sie den az cosmosdb create Befehl, um die Datenbank zu erstellen. Der Befehl verwendet ein Azure Cosmos DB-Konto, eine Datenbank und einen SQL-Container.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

Für unser Szenario ist die Temperatur interessant. Daher definieren temperatureStatus wir den Partitionsschlüssel.

Erstellen, Konfigurieren und Bereitstellen einer anderen Azure-Funktion

Mit Event Hubs können Sie mit Datenströmen in Megabyte beginnen und auf Gigabyte oder Terabyte wachsen. Das Autoinflate-Feature ist eine der vielen Verfügbaren Optionen, um die Anzahl der Durchsatzeinheiten entsprechend Ihren Nutzungsanforderungen zu skalieren.

Die Anwendungen, die jede Funktion konsumieren, haben eine separate Ansicht des Ereignisstroms. Sie lesen den Stream unabhängig voneinander in ihrem eigenen Tempo und mit eigenen Offsets.

Für unser Szenario erstellen Sie als Beispiel eine Azure-Verbrauchsfunktion. Um die Funktion zu erstellen und bewährte Methoden zu beachten, sollte sie unabhängig gestaltet sein, mit einem eigenen Speicherkonto und "Bindings" für lose Kopplung und Skalierbarkeit.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Abrufen der Verbindungszeichenfolgen

Das Speicherkonto und der Event Hub müssen für die Consumerfunktion bekannt sein. Außerdem muss sie sich der Datenbank bewusst sein, in die die verarbeiteten Ereignisse geschrieben werden.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Mit dem Befehl echo $EVENT_HUB_CONNECTION_STRING können Sie überprüfen, ob die Variable weiterhin ordnungsgemäß festgelegt ist. Führen Sie andernfalls den folgenden Befehl erneut aus:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Diese Verbindungszeichenfolgen müssen in den Anwendungseinstellungen für Ihr Azure Functions-Konto gespeichert werden.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Hinweis

Für Produktionsumgebungen können Sie eine Instanz von Azure Key Vault verwenden, um die Verbindungszeichenfolgen zu speichern und zu verwalten.

Erstellen der Funktionsanwendung

Bevor Sie die nächste Funktion erstellen, stellen Sie sicher, dass Sie sich im richtigen Ordner befinden.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

Der Befehl erstellt eine Anwendung wie in der letzten Übung. Sie löschen die Testdateien, aktualisieren local.settings.file mit dem Befehl fetch-app-settings und ersetzen dann die vorhandene Function.java Datei.

cd telemetry-functions-consumer
rm -r src/test

Aktualisieren Sie die lokalen Einstellungen für die lokale Ausführung und das Debuggen.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Öffnen Sie als Nächstes die Function.java Datei, und ersetzen Sie den Inhalt durch den folgenden Code:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Erstellen Sie eine weitere neue Datei namens TelemetryItem.java an demselben Speicherort wie Function.java, und fügen Sie den folgenden Code hinzu:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Wenn der Event Hub die Nachricht empfängt, generiert er ein Ereignis. Die Funktion processSensorData wird ausgeführt, wenn sie das Ereignis empfängt. Anschließend verarbeitet sie die Ereignisdaten und verwendet eine Ausgabebindung von Azure Cosmos DB, um die Ergebnisse an die Datenbank zu senden. Wir verwenden die TelemetryItem.java Klasse erneut. Die TelemetryItem Objekte können als verbrauchergesteuerter Vertrag zwischen den Teilnehmern dieses ereignisgesteuerten Systems betrachtet werden.

Lokal ausführen

Mit Azure Functions können Sie Ereignisse aus aller Welt empfangen. Ja, Sie können ereignisse sogar lokal auf Ihrem Entwicklungscomputer empfangen!

mvn clean package
mvn azure-functions:run

Nach den Build- und Startmeldungen werden die eingehenden Ereignisse angezeigt, sobald die Funktion ausgeführt wird.

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

Wechseln Sie im Azure-Portal zu Ihrem Azure Cosmos DB-Konto. Wählen Sie "Daten-Explorer", dann "TelemetryInfo" und dann "Elemente " aus, um Ihre Daten anzuzeigen, sobald sie eintreffen.

Screenshot: TelemetryInfo im Daten-Explorer in Azure Cosmos DB

Bereitstellen in Azure

Jetzt verschieben wir die gesamte Workload in der Cloud. Um die Funktionen in Azure Functions bereitzustellen, verwenden Sie den Befehl mvn azure-functions:deployMaven. Stellen Sie sicher, dass Sie sich weiterhin im richtigen Repository telemetry-functions befinden.

mvn azure-functions:deploy

Wunderbar! Wir haben das gesamte Telemetrieszenario umgesetzt, indem wir die Daten zu einem Event-Hub geleitet und die Daten mit einer anderen unabhängigen Funktion verarbeitet haben. Die Funktion verarbeitet die Daten und speichert dann das Ergebnis in einer Datenbank, die mit Azure Cosmos DB erstellt wurde. Wie können wir sicherstellen, dass unsere Anwendung unsere vordefinierten Anforderungen erfüllt? Mithilfe der Überwachung.