次の方法で共有


Azure Functions の Apache Kafka トリガー

Azure Functions の Apache Kafka トリガーを使用して、Kafka トピックのメッセージに応答して関数コードを実行します。 Kafka 出力バインドを使用して、関数からトピックに書き込むこともできます。 セットアップと構成の詳細については、「Azure Functions における Apache Kafka バインドの概要」を参照してください。

重要

Kafka バインドは、 Flex 従量課金プランElastic Premium プラン、専用 (App Service) プランの Functions で使用できます。 これらは、Functions ランタイムのバージョン 4.x でのみサポートされています。

トリガーの使用方法は、拡張機能パッケージのバージョンと、関数アプリで使用される C# のモダリティによって異なり、次のモードのいずれかになります。

ランタイムとは別のプロセスで実行 される分離ワーカー プロセス クラス ライブラリ を使用するコンパイル済み C# 関数。

使用する属性は、個別のイベント プロバイダーによって異なります。

次の例は、Kafka メッセージを Kafka イベントとして読み取り、ログする C# 関数を示しています。

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

バッチでイベントを受信するには、次の例に示すように、文字列配列を入力として使用します。

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

次の関数により、Kafka イベントのメッセージとヘッダーがログされます。

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

動作する .NET の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

トリガーの使用方法は、Node.js プログラミング モデルのバージョンによって異なります。

Node.js v4 モデルでは、関数コードでトリガーを直接定義します。 詳細については、「Azure Functions Node.js 開発者ガイド」を参照してください。

これらの例では、イベント プロバイダーは Confluent または Azure Event Hubs です。 これらの例では、Kafka メッセージを読み取る関数の Kafka トリガーを定義する方法を示します。

const { app } = require("@azure/functions");

async function kafkaTrigger(event, context) {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Key: " + event.Key);
  context.log("Event Value (as string): " + event.Value);

  let event_obj = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

バッチでイベントを受信するには、次の例に示すように、 cardinality 値を many に設定します。

const { app } = require("@azure/functions");

async function kafkaTriggerMany(events, context) {
  for (const event of events) {
    context.log("Event Offset: " + event.Offset);
    context.log("Event Partition: " + event.Partition);
    context.log("Event Topic: " + event.Topic);
    context.log("Event Key: " + event.Key);
    context.log("Event Timestamp: " + event.Timestamp);
    context.log("Event Value (as string): " + event.Value);

    let event_obj = JSON.parse(event.Value);

    context.log("Event Value Object: ");
    context.log("   Value.registertime: ", event_obj.registertime.toString());
    context.log("   Value.userid: ", event_obj.userid);
    context.log("   Value.regionid: ", event_obj.regionid);
    context.log("   Value.gender: ", event_obj.gender);
  }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 この例では、汎用 Avro スキーマを使用して、特定のプロバイダーのトリガーを定義します。

const { app } = require("@azure/functions");

async function kafkaAvroGenericTrigger(event, context) {
  context.log("Processed kafka event: ", event);
  if (context.triggerMetadata?.key !== undefined) {
    context.log("message key: ", context.triggerMetadata?.key);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    password: "EventHubConnectionString",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    username: "$ConnectionString",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

動作する JavaScript の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
  registertime: number;
  userid: string;
  regionid: string;
  gender: string;
}

export async function kafkaTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Value (as string): " + event.Value);

  let event_obj: EventData = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

バッチでイベントを受信するには、次の例に示すように、 cardinality 値を many に設定します。

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
    registertime: number;
    userid: string;
    regionid: string;
    gender: string;
}

interface KafkaEvent {
    Offset: number;
    Partition: number;
    Topic: string;
    Timestamp: number;
    Value: string;
}

export async function kafkaTriggerMany(
    events: any,
    context: InvocationContext
): Promise<void> {
    for (const event of events) {
        context.log("Event Offset: " + event.Offset);
        context.log("Event Partition: " + event.Partition);
        context.log("Event Topic: " + event.Topic);
        context.log("Event Timestamp: " + event.Timestamp);
        context.log("Event Value (as string): " + event.Value);

        let event_obj: EventData = JSON.parse(event.Value);

        context.log("Event Value Object: ");
        context.log("   Value.registertime: ", event_obj.registertime.toString());
        context.log("   Value.userid: ", event_obj.userid);
        context.log("   Value.regionid: ", event_obj.regionid);
        context.log("   Value.gender: ", event_obj.gender);
    }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 この例では、汎用 Avro スキーマを使用して、特定のプロバイダーのトリガーを定義します。

import { app, InvocationContext } from "@azure/functions";

export async function kafkaAvroGenericTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Processed kafka event: ", event);
  context.log(
    `Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
  );
  if (context.triggerMetadata?.key !== undefined) {
    context.log(`Message Key : ${context.triggerMetadata?.key}`);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    username: "ConfluentCloudUsername",
    password: "ConfluentCloudPassword",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

動作する TypeScript の例の完全なセットについては、 Kafka 拡張機能リポジトリを参照してください。

function.json ファイルの特定のプロパティは、イベント プロバイダーによって異なります。 これらの例では、イベント プロバイダーは Confluent または Azure Event Hubs です。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。

次の function.json ファイルは、特定のプロバイダーのトリガーを定義します。

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

関数がトリガーされると、次のコードが実行されます。

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality の値を many に設定します。

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

次のコードは、イベントの配列を解析し、イベント データをログに記録します。

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

次のコードは、ヘッダー データをログに記録します。

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

関数がトリガーされると、次のコードが実行されます。

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

動作する PowerShell の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

トリガーの使用方法は、Python プログラミング モデルのバージョンによって異なります。

Python v2 モデルでは、デコレーターを使用して関数コードでトリガーを直接定義します。 詳細については、 Azure Functions Python 開発者ガイドを参照してください。

これらの例では、Kafka メッセージを読み取る関数の Kafka トリガーを定義する方法を示します。

@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
    arg_name="kevent",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

次の使用例は、 cardinality 値を many に設定して、バッチでイベントを受信します。

@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
    arg_name="kevents",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    cardinality="MANY",
    data_type="string",
    consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。

@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
    arg_name="kafkaTriggerAvroGeneric",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    consumer_group="$Default",
    avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

動作する Python の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

トリガーを構成するために使用する注釈は、個別のイベント プロバイダーによって異なります。

次の例は、Kafka イベントの内容を読み取り、ログする Java 関数を示しています。

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

バッチでイベントを受信するには、次の例に示すように、入力文字列を配列として使用します。

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

次の関数により、Kafka イベントのメッセージとヘッダーがログされます。

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の関数では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Confluent で動作する Java の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

属性

インプロセス分離ワーカー プロセスの C# ライブラリはどちらも、KafkaTriggerAttribute を使用して関数トリガーを定義します。

次の表では、このトリガー属性を使用して設定できるプロパティについて説明します。

パラメーター 説明
BrokerList (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
トピック (必須) トリガーによって監視されるトピック。
ConsumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
AvroSchema (省略可能)Avro プロトコルを使用する場合のメッセージ値の汎用レコードのスキーマ。
KeyAvroSchema (省略可能)Avro プロトコルを使用する場合のメッセージ キーの汎用レコードのスキーマ。
KeyDataType (省略可能)Kafka トピックからメッセージ キーを受け取るデータ型。 KeyAvroSchemaが設定されている場合、この値は汎用レコードです。 使用できる値は、 IntLongString、および Binaryです。
AuthenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 GssapiPlainScramSha256ScramSha512、および OAuthBearerです。
ユーザー名 (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
パスワード (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
プロトコル (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintextsslsasl_plaintextsasl_sslです。
SslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
SslCertificateLocation (省略可能) クライアントの証明書へのパス。
SslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
SslKeyPassword (省略可能) クライアントの証明書のパスワード。
SslCertificatePEM (省略可能)文字列としての PEM 形式のクライアント証明書。 詳細については、「接続」を参照してください。
SslKeyPEM (省略可能)文字列としての PEM 形式のクライアント秘密キー。 詳細については、「接続」を参照してください。
SslCaPEM (省略可能)文字列としての PEM 形式の CA 証明書。 詳細については、「接続」を参照してください。
SslCertificateandKeyPEM (省略可能)PEM 形式のクライアント証明書とキーを文字列として使用します。 詳細については、「接続」を参照してください。
SchemaRegistryUrl (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
SchemaRegistryUsername (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
SchemaRegistryPassword (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。
OAuthBearerMethod (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidcdefaultです。
OAuthBearerClientId (省略可能) OAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。
OAuthBearerClientSecret (省略可能) OAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。
OAuthBearerScope (省略可能)ブローカーへのアクセス要求のスコープを指定します。
OAuthBearerTokenEndpointUrl (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。
OAuthBearerExtensions (省略可能)メソッドの使用時にブローカーに追加情報として提供されるキーと値のペアのコンマ区切りのリスト oidc 。 たとえば、 supportFeatureX=true,organizationId=sales-emeaと指定します。

注釈

KafkaTrigger注釈を使用すると、トピックを受け取ったときに実行される関数を作成できます。 サポートされるオプションには、次の要素が含まれます。

要素 説明
name (必須) 関数コード内のキューまたはトピック メッセージを表す変数の名前。
brokerList (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
topic (必須) トリガーによって監視されるトピック。
cardinality (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、dataType も設定する必要があります。
dataType Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。
consumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
authenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 GssapiPlainScramSha256ScramSha512です。
username (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintextsslsasl_plaintextsasl_sslです。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。
lagThreshold (省略可能)トリガーのラグしきい値。
schemaRegistryUrl (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
schemaRegistryUsername (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
schemaRegistryPassword (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明
type (必須) kafkaTriggerに設定します。
direction (必須) inに設定します。
name (必須) 関数コード内のブローカー データを表す変数の名前。
brokerList (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
topic (必須) トリガーによって監視されるトピック。
cardinality (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、dataType も設定する必要があります。
dataType Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binaryすると、メッセージはバイナリ データとして受信され、Functions は実際のバイト配列パラメーター型に逆シリアル化しようとします。
consumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
keyAvroSchema (省略可能)Avro プロトコルを使用する場合のメッセージ キーの汎用レコードのスキーマ。
keyDataType (省略可能)Kafka トピックからメッセージ キーを受け取るデータ型。 keyAvroSchemaが設定されている場合、この値は汎用レコードです。 使用できる値は、 IntLongString、および Binaryです。
authenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 GssapiPlainScramSha256ScramSha512です。
username (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintextsslsasl_plaintextsasl_sslです。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。
sslCertificatePEM (省略可能)文字列としての PEM 形式のクライアント証明書。 詳細については、「接続」を参照してください。
sslKeyPEM (省略可能)文字列としての PEM 形式のクライアント秘密キー。 詳細については、「接続」を参照してください。
sslCaPEM (省略可能)文字列としての PEM 形式の CA 証明書。 詳細については、「接続」を参照してください。
sslCertificateandKeyPEM (省略可能)PEM 形式のクライアント証明書とキーを文字列として使用します。 詳細については、「接続」を参照してください。
lagThreshold (省略可能)トリガーのラグしきい値。
schemaRegistryUrl (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
schemaRegistryUsername (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
schemaRegistryPassword (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。
oAuthBearerMethod (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidcdefaultです。
oAuthBearerClientId (省略可能) oAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。
oAuthBearerClientSecret (省略可能) oAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。
oAuthBearerScope (省略可能)ブローカーへのアクセス要求のスコープを指定します。
oAuthBearerTokenEndpointUrl (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。 Python では、構成プロパティsnake_case名前付け規則が使用されます。

function.json のプロパティ 説明
type (必須) kafkaTriggerに設定します。
direction (必須) inに設定します。
name (必須) 関数コード内のブローカー データを表す変数の名前。
broker_list (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。
topic (必須) トリガーによって監視されるトピック。
cardinality (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。 MANY を使用する場合は、data_type も設定する必要があります。
data_type Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。
consumerGroup (省略可能) トリガーで使用される Kafka コンシューマー グループ。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
authentication_mode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NOTSET (既定)、 GssapiPlainScramSha256ScramSha512です。
username (省略可能) SASL 認証のユーザー名。 authentication_modeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 authentication_modeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NOTSET (既定)、 plaintextsslsasl_plaintextsasl_sslです。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。
lag_threshold (省略可能)トリガーのラグしきい値。
schema_registry_url (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
schema_registry_username (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
schema_registry_password (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。
o_auth_bearer_method (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidcdefaultです。
o_auth_bearer_client_id (省略可能) o_auth_bearer_methodoidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。
o_auth_bearer_client_secret (省略可能) o_auth_bearer_methodoidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。
o_auth_bearer_scope (省略可能)ブローカーへのアクセス要求のスコープを指定します。
o_auth_bearer_token_endpoint_url (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。

注意

証明書 PEM 関連のプロパティと Avro キー関連のプロパティは、Python ライブラリではまだ使用できません。

使用方法

Kafka トリガーは現在、JSON ペイロードである文字列および文字列配列として Kafka イベントをサポートしています。

Kafka トリガーは、Kafka メッセージを文字列として関数に渡します。 トリガーでは、JSON ペイロードである文字列配列もサポートされます。

Premium プランでは、Kafka 出力のランタイム スケール監視を有効にして、複数のインスタンスにスケールアウトする必要があります。 詳細については、「ランタイム スケールを有効にする」を参照してください。

Azure portal の [コード + テスト] ページのテスト/実行機能を使用して Kafka トリガーを操作することはできません。 代わりに、トリガーによって監視されているトピックにテスト イベントを直接送信する必要があります。

Kafka トリガーでサポートされている host.json 設定の完全なセットについては、「host.json 設定」を参照してください。

接続

トリガーとバインドに必要なすべての接続情報を、コード内のバインド定義ではなく、アプリケーション設定に格納します。 このガイダンスは資格情報に適用され、コードに保存しないでください。

重要

資格情報の設定はアプリケーション設定を参照する必要があります。 コードや構成のファイル内に資格情報をハードコーディングしないでください。 ローカルで実行する場合は、資格情報に local.settings.json ファイルを使用します。local.settings.json ファイルは公開しないでください。

Azure の Confluent によって提供されるマネージド Kafka クラスターに接続する場合は、次のいずれかの認証方法を使用できます。

注意

Flex 従量課金プランを使用する場合、ファイルの場所ベースの証明書認証プロパティ (SslCaLocationSslCertificateLocationSslKeyLocation) はサポートされていません。 代わりに、PEM ベースの証明書プロパティ (SslCaPEMSslCertificatePEMSslKeyPEMSslCertificateandKeyPEM) を使用するか、Azure Key Vault に証明書を格納します。

スキーマ レジストリ

Kafka 拡張機能で Confluent によって提供されるスキーマ レジストリを使用するには、次の資格情報を設定します。

設定 推奨値 説明
SchemaRegistryUrl SchemaRegistryUrl スキーマ管理に使用されるスキーマ レジストリ サービスの URL。 通常、形式 https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY スキーマ レジストリの基本認証のユーザー名 (必要な場合)。
SchemaRegistryPassword CONFLUENT_API_SECRET スキーマ レジストリでの基本認証のパスワード (必要な場合)。

ユーザー名/パスワード認証

この形式の認証を使用する場合は、 ProtocolSaslPlaintext または SaslSsl のいずれかに設定されていることを確認します。 AuthenticationModePlainScramSha256 、または ScramSha512 に設定され、使用されている CA 証明書が既定の ISRG ルート X1 証明書と異なる場合は、必ず SslCaLocation または SslCaPEMを更新してください。

設定 推奨値 説明
BrokerList BootstrapServer BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。
ユーザー名 ConfluentCloudUsername ConfluentCloudUsername という名前のアプリ設定には、Confluent Cloud Web サイトからの API アクセス キーが含まれています。
パスワード ConfluentCloudPassword ConfluentCloudPassword という名前のアプリ設定には、Confluent Cloud Web サイトから取得した API シークレットが含まれています。
SslCaPEM SSLCaPemCertificate CA 証明書を PEM 形式の文字列として含む SSLCaPemCertificate という名前のアプリ設定。 値は標準の形式 (例: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----) に従う必要があります。

SSL 認証

ProtocolSSL に設定されていることを確認します。

設定 推奨値 説明
BrokerList BootstrapServer BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。
SslCaPEM SslCaCertificatePem CA 証明書の PEM 値を文字列として含む SslCaCertificatePem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem クライアント証明書の PEM 値を文字列として含む SslClientCertificatePem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem クライアント秘密キーの PEM 値を文字列として含む SslClientKeyPem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem クライアント証明書の PEM 値と、文字列として連結されたクライアント秘密キーを含む、 SslClientCertificateAndKeyPem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword 秘密キーのパスワードを含む SslClientKeyPassword という名前のアプリ設定 (存在する場合)。

OAuth 認証

OAuth 認証を使用する場合は、バインド定義で OAuth 関連のプロパティを構成します。

これらの設定に使用する文字列値は、Azure のアプリケーション設定として、またはローカル開発中に Values内の コレクションに存在する必要があります。

また、バインド定義で ProtocolAuthenticationMode も設定する必要があります。

次のステップ