演習 - イベントを処理して Azure Cosmos DB にデータを格納する

完了

2 つ目の関数は、Azure イベント ハブ内の特定の名前空間のイベントをリッスンし、Azure Cosmos DB で作成されたデータベースに処理して格納できます。

Azure Cosmos DB を使用してデータベースを作成する

データベースを作成するには、 az cosmosdb create コマンドを使用します。 このコマンドでは、Azure Cosmos DB アカウント、データベース、および SQL コンテナーが使用されます。

az cosmosdb create \
    --resource-group $RESOURCE_GROUP \
    --name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --name TelemetryDb
az cosmosdb sql container create \
    --resource-group $RESOURCE_GROUP \
    --account-name $COSMOS_DB_ACCOUNT \
    --database-name TelemetryDb \
    --name TelemetryInfo \
    --partition-key-path '/temperatureStatus'

このシナリオでは、温度が興味深いです。 そのため、パーティション キーとして temperatureStatus を定義します。

別の Azure 関数をビルド、構成、デプロイする

イベント ハブを使用すると、データ ストリームをメガバイト単位で開始し、ギガバイトまたはテラバイトに拡張できます。 自動拡張機能は、使用量のニーズに合わせてスループット ユニットの数をスケーリングするために使用できる多くのオプションの 1 つです。

各関数の使用アプリケーションには、イベントのストリームが個別に表示されます。 自分のペースで、独自のオフセットを使用して、ストリームを個別に読み取ります。

このシナリオでは、1 つのコンシュームする Azure 関数を例として作成します。 ベスト プラクティスに従って関数を作成するには、独自のストレージ アカウントとバインドを使用して独立し、疎結合とスケーラビリティを実現する必要があります。

az storage account create \
    --resource-group $RESOURCE_GROUP \
    --name $STORAGE_ACCOUNT"c" \
    --sku Standard_LRS
az functionapp create \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c"\
    --storage-account $STORAGE_ACCOUNT"c" \
    --consumption-plan-location $LOCATION \
    --runtime java \
    --functions-version 4

接続文字列を取得する

コンシューマー関数は、そのストレージ アカウントとイベント ハブを認識する必要があります。 また、処理されたイベントを書き込むデータベースも認識する必要があります。

AZURE_WEB_JOBS_STORAGE=$( \
    az storage account show-connection-string \
        --resource-group $RESOURCE_GROUP \
        --name $STORAGE_ACCOUNT"c" \
        --query connectionString \
        --output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
    az cosmosdb keys list \
        --resource-group $RESOURCE_GROUP \
        --name $COSMOS_DB_ACCOUNT \
        --type connection-strings \
        --query 'connectionStrings[0].connectionString' \
        --output tsv)
echo $COSMOS_DB_CONNECTION_STRING

コマンド echo $EVENT_HUB_CONNECTION_STRING を使用して、変数がまだ正しく設定されているかどうかを確認できます。 それ以外の場合は、次のコマンドを再実行します。

EVENT_HUB_CONNECTION_STRING=$( \
    az eventhubs eventhub authorization-rule keys list \
        --resource-group $RESOURCE_GROUP \
        --name $EVENT_HUB_AUTHORIZATION_RULE \
        --eventhub-name $EVENT_HUB_NAME \
        --namespace-name $EVENT_HUB_NAMESPACE \
        --query primaryConnectionString \
        --output tsv)
echo $EVENT_HUB_CONNECTION_STRING

これらの接続文字列は、Azure Functions アカウントのアプリケーション設定に格納する必要があります。

az functionapp config appsettings set \
    --resource-group $RESOURCE_GROUP \
    --name $FUNCTION_APP"-c" \
    --settings \
        AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
        EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
        CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING

運用環境では、Azure Key Vault のインスタンスを使用して接続文字列を格納および管理できます。

関数アプリケーションを作成する

次の関数を作成する前に、正しいフォルダーにいることを確認します。

cd ..
mvn archetype:generate --batch-mode \
    -DarchetypeGroupId=com.microsoft.azure \
    -DarchetypeArtifactId=azure-functions-archetype \
    -DappName=$FUNCTION_APP"-c" \
    -DresourceGroup=$RESOURCE_GROUP \
    -DappRegion=$LOCATION \
    -DappServicePlanName=$LOCATION"plan" \
    -DgroupId=com.learn \
    -DartifactId=telemetry-functions-consumer

このコマンドは、前回の演習と同様にアプリケーションを作成します。 テスト ファイルを削除し、local.settings.file コマンドでfetch-app-settingsを更新してから、既存のFunction.java ファイルを置き換えます。

cd telemetry-functions-consumer
rm -r src/test

ローカルの実行とデバッグのローカル設定を更新します。

func azure functionapp fetch-app-settings $FUNCTION_APP"-c"

次に、 Function.java ファイルを開き、内容を次のコードに置き換えます。

package com.learn;

import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;

public class Function {

    @FunctionName("processSensorData")
    public void processSensorData(
        @EventHubTrigger(
            name = "msg",
            eventHubName = "", // blank because the value is included in the connection string
            cardinality = Cardinality.ONE,
            connection = "EventHubConnectionString")
            TelemetryItem item,
        @CosmosDBOutput(
            name = "databaseOutput",
            databaseName = "TelemetryDb",
            collectionName = "TelemetryInfo",
            connectionStringSetting = "CosmosDBConnectionString")
            OutputBinding<TelemetryItem> document,
        final ExecutionContext context) {
    
        context.getLogger().info("Event hub message received: " + item.toString());
    
        if (item.getPressure() > 30) {
            item.setNormalPressure(false);
        } else {
            item.setNormalPressure(true);
        }
    
        if (item.getTemperature() < 40) {
            item.setTemperatureStatus(status.COOL);
        } else if (item.getTemperature() > 90) {
            item.setTemperatureStatus(status.HOT);
        } else {
            item.setTemperatureStatus(status.WARM);
        }
    
        document.setValue(item);
    }
}

Function.javaと同じ場所に TelemetryItem.java という名前の別の新しいファイルを作成し、次のコードを追加します。

package com.learn;

public class TelemetryItem {

    private String id;
    private double temperature;
    private double pressure;
    private boolean isNormalPressure;
    private status temperatureStatus;
    static enum status {
        COOL,
        WARM,
        HOT
    }

    public TelemetryItem(double temperature, double pressure) {
        this.temperature = temperature;
        this.pressure = pressure;
    }

    public String getId() {
        return id;
    }

    public double getTemperature() {
        return temperature;
    }

    public double getPressure() {
        return pressure;
    }

    @Override
    public String toString() {
        return "TelemetryItem={id=" + id + ",temperature="
            + temperature + ",pressure=" + pressure + "}";
    }

    public boolean isNormalPressure() {
        return isNormalPressure;
    }

    public void setNormalPressure(boolean isNormal) {
        this.isNormalPressure = isNormal;
    }

    public status getTemperatureStatus() {
        return temperatureStatus;
    }

    public void setTemperatureStatus(status temperatureStatus) {
        this.temperatureStatus = temperatureStatus;
    }
}

イベント ハブでは、メッセージを受け取ると、イベントが生成されます。 イベントを受け取ると、processSensorData 関数が実行されます。 次に、イベント データを処理し、Azure Cosmos DB の出力バインドを使用して結果をデータベースに送信します。 TelemetryItem.java クラスをもう一度使用します。 TelemetryItem オブジェクトは、このイベント ドリブン システムの参加者間のコンシューマー駆動型コントラクトと見なすことができます。

ローカルで実行する

Azure Functions を使用すると、世界中からイベントを受け取ることができます。 はい。開発マシンでローカルでイベントを受信することもできます。

mvn clean package
mvn azure-functions:run

ビルド メッセージとスタートアップ メッセージの後に、関数の実行時に受信イベントが表示されます。

[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker

Azure portal で、Azure Cosmos DB アカウントに移動します。 [データ エクスプローラー] を選択し、[TelemetryInfo] を選択し、[項目] を選択して、到着したときにデータを表示します。

Azure Cosmos DB データ エクスプローラーの TelemetryInfo を示すスクリーンショット。

Azure にデプロイする

次に、クラウド内のワークロード全体をシフトしてみましょう。 関数を Azure Functions にデプロイするには、Maven コマンド mvn azure-functions:deployを使用します。 正しいリポジトリ ( telemetry-functions) にまだいることを確認します。

mvn azure-functions:deploy

いいですね! イベント ハブにデータを送信し、別の独立した関数でデータを使用することで、テレメトリ シナリオ全体をデプロイしました。 この関数はデータを処理し、結果を Azure Cosmos DB で作成されたデータベースに格納します。 アプリケーションが定義済みの要件を満たしていることを確認するにはどうすればよいですか? 監視を使います。