Ejercicio: Procesamiento de los eventos y almacenamiento de los datos en Azure Cosmos DB

Completado

Una segunda función puede escuchar eventos del espacio de nombres específico en el centro de eventos de Azure y procesarlos y almacenarlos en una base de datos creada con Azure Cosmos DB.

Creación de una base de datos con Azure Cosmos DB

Para crear la base de datos, use el az cosmosdb create comando . El comando usa una cuenta de Azure Cosmos DB, una base de datos y un contenedor de SQL.

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

Para nuestro escenario, la temperatura es interesante. Por lo tanto, definimos temperatureStatus como clave de partición.

Compilación, configuración e implementación de otra función de Azure

Con event Hubs, puede empezar con flujos de datos en megabytes y crecer a gigabytes o terabytes. La característica de autoinflado es una de las muchas opciones disponibles para escalar el número de unidades de rendimiento para satisfacer sus necesidades de uso.

Las aplicaciones que consumen para cada función tienen una vista independiente del flujo de eventos. Leen la secuencia de forma independiente, a su propio ritmo y con sus propios desplazamientos.

En nuestro escenario, creas una función de Azure consumidora como ejemplo. Para crear la función, siguiendo las mejores prácticas, debe ser independiente, debe contar con su propia cuenta de almacenamiento y vinculaciones para acoplamiento flexible y escalabilidad.

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

Recuperación de las cadenas de conexión

La función de consumidor necesita conocer su cuenta de almacenamiento y el centro de eventos, También debe tener en cuenta la base de datos en la que escribe los eventos procesados.

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

Puede usar el comando echo $EVENT_HUB_CONNECTION_STRING para comprobar si la variable todavía está establecida correctamente. De lo contrario, vuelva a ejecutar el siguiente comando:

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

Estas cadenas de conexión deben almacenarse en la configuración de la aplicación para la cuenta de Azure Functions.

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

Nota:

En entornos de producción, puede usar una instancia de Azure Key Vault para almacenar y administrar las cadenas de conexión.

Creación de la aplicación de funciones

Antes de crear la siguiente función, asegúrese de que está en la carpeta correcta.

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

El comando crea una aplicación como en el último ejercicio. Elimine los archivos de prueba, actualice el local.settings.file con el comando fetch-app-settings y luego reemplace el archivo existente Function.java.

cd telemetry-functions-consumer
rm -r src/test

Actualice la configuración local para la ejecución y depuración locales.

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

A continuación, abra el Function.java archivo y reemplace el contenido por el código siguiente:

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Cree otro archivo denominado TelemetryItem.java en la misma ubicación que Function.java y agregue el código siguiente:

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

Cuando el centro de eventos recibe el mensaje, genera un evento. La función processSensorData se ejecuta cuando se recibe el evento. A continuación, procesa los datos del evento y usa un enlace de salida de Azure Cosmos DB para enviar los resultados a la base de datos. Usamos la TelemetryItem.java clase de nuevo. Los objetos TelemetryItem pueden considerarse el contrato impulsado por el consumidor entre los participantes de este sistema controlado por eventos.

Ejecución en modo local

Con Azure Functions, puede recibir eventos de todo el mundo. Incluso puede recibir eventos localmente en la máquina de desarrollo.

mvn clean package
mvn azure-functions:run

Después de los mensajes de compilación e inicio, verá los eventos entrantes cuando se ejecuta la función:

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

En Azure Portal, vaya a la cuenta de Azure Cosmos DB. Seleccione Explorador de datos, TelemetryInfo y, a continuación, seleccione Elementos para ver los datos cuando llegue.

Captura de pantalla que muestra TelemetryInfo en el Explorador de datos de Azure Cosmos DB.

Implementación en Azure

Ahora, vamos a cambiar toda la carga de trabajo en la nube. Para implementar las funciones en Azure Functions, use el comando mvn azure-functions:deployMaven . Asegúrese de que todavía está en el repositorio correcto, telemetry-functions.

mvn azure-functions:deploy

¡Maravilloso! Hemos implementado todo el escenario de telemetría enviando los datos hacia un centro de eventos y consumiendo los datos con una función independiente diferente. La función procesa los datos y, a continuación, almacena el resultado en una base de datos creada con Azure Cosmos DB. ¿Cómo podemos asegurarnos de que nuestra aplicación cumple nuestros requisitos predefinidos? Mediante la supervisión.