Exercice : traiter les événements et stocker les données dans Azure Cosmos DB

Effectué

Une deuxième fonction peut écouter les événements de l’espace de noms spécifique dans le hub d’événements Azure et les stocker dans une base de données créée avec Azure Cosmos DB.

Créer une base de données avec Azure Cosmos DB

Pour créer la base de données, utilisez la az cosmosdb create commande. La commande utilise un compte Azure Cosmos DB, une base de données et un conteneur SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

Pour notre scénario, la température est intéressante. Nous définissons temperatureStatus donc comme clé de partition.

Générer, configurer et déployer une autre fonction Azure

Avec event Hubs, vous pouvez commencer par des flux de données en mégaoctets et atteindre les gigaoctets ou les téraoctets. La fonctionnalité d’autoinflate est l’une des nombreuses options disponibles pour ajuster le nombre d’unités de débit afin de s'adapter à vos besoins.

Les applications consommatrices pour chaque fonction ont une vue distincte du flux d’événements. Ils lisent le flux indépendamment à leur rythme et avec leurs propres décalages.

Pour notre scénario, vous créez une fonction Azure consommatrice comme exemple. Pour créer la fonction, et en respectant les meilleures pratiques, elle doit être indépendante, avec son propre compte de stockage et des liaisons autonomes pour garantir un couplage lâche et une bonne scalabilité.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Récupérer les chaînes de connexion

La fonction consommateur doit connaître son compte de stockage et le hub d’événements. Il doit également être conscient de la base de données dans laquelle il écrit les événements traités.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Vous pouvez utiliser la commande echo $EVENT_HUB_CONNECTION_STRING pour vérifier si la variable est toujours définie correctement. Sinon, réexécutez la commande suivante :

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Ces chaînes de connexion doivent être stockées dans les paramètres d’application de votre compte Azure Functions.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Remarque

Pour les environnements de production, vous pouvez utiliser une instance d’Azure Key Vault pour stocker et gérer les chaînes de connexion.

Créer l’application de fonctions

Avant de créer la fonction suivante, vérifiez que vous êtes dans le dossier approprié.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

La commande crée une application comme dans le dernier exercice. Vous supprimez les fichiers de test, mettez à jour le local.settings.file avec la commande fetch-app-settings, puis remplacez le fichier existant Function.java.

cd telemetry-functions-consumer
rm -r src/test

Mettez à jour les paramètres locaux pour l’exécution et le débogage locaux.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

Ensuite, ouvrez le Function.java fichier et remplacez le contenu par le code suivant :

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Créez un autre fichier appelé TelemetryItem.java dans le même emplacement que Function.java, puis ajoutez le code suivant :

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Lorsque le hub d'événements reçoit le message, il génère un événement. La fonction processSensorData s'exécute lorsqu'il reçoit l'événement. Ensuite, il traite les données d’événement et utilise une liaison de sortie d’Azure Cosmos DB pour envoyer les résultats à la base de données. Nous utilisons à nouveau la TelemetryItem.java classe. Les TelemetryItem objets peuvent être considérés comme le contrat piloté par le consommateur entre les participants de ce système piloté par les événements.

Exécution locale

Avec Azure Functions, vous pouvez recevoir des événements de partout dans le monde. Oui, vous pouvez même recevoir des événements localement sur votre ordinateur de développement !

mvn clean package
mvn azure-functions:run

Après les messages de génération et de démarrage, vous voyez les événements entrants lors de l’exécution de la fonction :

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

Dans le portail Azure, accédez à votre compte Azure Cosmos DB. Sélectionnez l’Explorateur de données, sélectionnez TelemetryInfo, puis sélectionnez Éléments pour afficher vos données lorsqu’elles arrivent.

Capture d’écran montrant TelemetryInfo dans Azure Cosmos DB Data Explorer.

Déployer sur Azure

À présent, nous allons déplacer toute la charge de travail dans le cloud. Pour déployer les fonctions sur Azure Functions, vous utilisez la commande mvn azure-functions:deployMaven . Vérifiez que vous êtes toujours dans le référentiel correct, telemetry-functions.

mvn azure-functions:deploy

Merveilleux! Nous avons déployé l’ensemble du scénario de télémétrie en envoyant les données vers un hub d’événements et en consommant les données avec une fonction indépendante différente. La fonction traite les données, puis stocke le résultat dans une base de données créée avec Azure Cosmos DB. Comment pouvons-nous nous assurer que notre application répond à nos exigences prédéfinies ? En utilisant la surveillance.