Azure Functions の Apache Kafka トリガーを使用して、Kafka トピックのメッセージに応答して関数コードを実行します。 Kafka 出力バインドを使用して、関数からトピックに書き込むこともできます。 セットアップと構成の詳細については、「Azure Functions における Apache Kafka バインドの概要」を参照してください。
重要
Kafka バインドは、 Flex 従量課金プラン、 Elastic Premium プラン、専用 (App Service) プランの Functions で使用できます。 これらは、Functions ランタイムのバージョン 4.x でのみサポートされています。
例
トリガーの使用方法は、拡張機能パッケージのバージョンと、関数アプリで使用される C# のモダリティによって異なり、次のモードのいずれかになります。
ランタイムとは別のプロセスで実行 される分離ワーカー プロセス クラス ライブラリ を使用するコンパイル済み C# 関数。
使用する属性は、個別のイベント プロバイダーによって異なります。
次の例は、Kafka メッセージを Kafka イベントとして読み取り、ログする C# 関数を示しています。
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
バッチでイベントを受信するには、次の例に示すように、文字列配列を入力として使用します。
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
次の関数により、Kafka イベントのメッセージとヘッダーがログされます。
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
動作する .NET の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
トリガーの使用方法は、Node.js プログラミング モデルのバージョンによって異なります。
Node.js v4 モデルでは、関数コードでトリガーを直接定義します。 詳細については、「Azure Functions Node.js 開発者ガイド」を参照してください。
これらの例では、イベント プロバイダーは Confluent または Azure Event Hubs です。 これらの例では、Kafka メッセージを読み取る関数の Kafka トリガーを定義する方法を示します。
const { app } = require("@azure/functions");
async function kafkaTrigger(event, context) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Key: " + event.Key);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
バッチでイベントを受信するには、次の例に示すように、 cardinality 値を many に設定します。
const { app } = require("@azure/functions");
async function kafkaTriggerMany(events, context) {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Key: " + event.Key);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 この例では、汎用 Avro スキーマを使用して、特定のプロバイダーのトリガーを定義します。
const { app } = require("@azure/functions");
async function kafkaAvroGenericTrigger(event, context) {
context.log("Processed kafka event: ", event);
if (context.triggerMetadata?.key !== undefined) {
context.log("message key: ", context.triggerMetadata?.key);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
password: "EventHubConnectionString",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
username: "$ConnectionString",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
動作する JavaScript の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
export async function kafkaTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
バッチでイベントを受信するには、次の例に示すように、 cardinality 値を many に設定します。
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
interface KafkaEvent {
Offset: number;
Partition: number;
Topic: string;
Timestamp: number;
Value: string;
}
export async function kafkaTriggerMany(
events: any,
context: InvocationContext
): Promise<void> {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 この例では、汎用 Avro スキーマを使用して、特定のプロバイダーのトリガーを定義します。
import { app, InvocationContext } from "@azure/functions";
export async function kafkaAvroGenericTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Processed kafka event: ", event);
context.log(
`Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
);
if (context.triggerMetadata?.key !== undefined) {
context.log(`Message Key : ${context.triggerMetadata?.key}`);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
動作する TypeScript の例の完全なセットについては、 Kafka 拡張機能リポジトリを参照してください。
function.json ファイルの特定のプロパティは、イベント プロバイダーによって異なります。 これらの例では、イベント プロバイダーは Confluent または Azure Event Hubs です。 次の例は、Kafka メッセージを読み取ってログする関数の Kafka トリガーを示しています。
次の function.json ファイルは、特定のプロバイダーのトリガーを定義します。
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
関数がトリガーされると、次のコードが実行されます。
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
バッチでイベントを受信するには、次の例に示すように、function.json ファイルで cardinality の値を many に設定します。
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
次のコードは、イベントの配列を解析し、イベント データをログに記録します。
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
次のコードは、ヘッダー データをログに記録します。
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の function.json では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
関数がトリガーされると、次のコードが実行されます。
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
動作する PowerShell の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
トリガーの使用方法は、Python プログラミング モデルのバージョンによって異なります。
Python v2 モデルでは、デコレーターを使用して関数コードでトリガーを直接定義します。 詳細については、 Azure Functions Python 開発者ガイドを参照してください。
これらの例では、Kafka メッセージを読み取る関数の Kafka トリガーを定義する方法を示します。
@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
次の使用例は、 cardinality 値を many に設定して、バッチでイベントを受信します。
@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。
@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaTriggerAvroGeneric",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
動作する Python の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
トリガーを構成するために使用する注釈は、個別のイベント プロバイダーによって異なります。
次の例は、Kafka イベントの内容を読み取り、ログする Java 関数を示しています。
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
バッチでイベントを受信するには、次の例に示すように、入力文字列を配列として使用します。
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
次の関数により、Kafka イベントのメッセージとヘッダーがログされます。
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
トリガーに渡されるイベントに対して汎用の Avro スキーマ を定義できます。 次の関数では、汎用の Avro スキーマを使用して、特定のプロバイダーのトリガーを定義しています。
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Confluent で動作する Java の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。
属性
インプロセスと分離ワーカー プロセスの C# ライブラリはどちらも、KafkaTriggerAttribute を使用して関数トリガーを定義します。
次の表では、このトリガー属性を使用して設定できるプロパティについて説明します。
| パラメーター | 説明 |
|---|---|
| BrokerList | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
| トピック | (必須) トリガーによって監視されるトピック。 |
| ConsumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
| AvroSchema | (省略可能)Avro プロトコルを使用する場合のメッセージ値の汎用レコードのスキーマ。 |
| KeyAvroSchema | (省略可能)Avro プロトコルを使用する場合のメッセージ キーの汎用レコードのスキーマ。 |
| KeyDataType | (省略可能)Kafka トピックからメッセージ キーを受け取るデータ型。
KeyAvroSchemaが設定されている場合、この値は汎用レコードです。 使用できる値は、 Int、 Long、 String、および Binaryです。 |
| AuthenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 Gssapi、 Plain、 ScramSha256、 ScramSha512、および OAuthBearerです。 |
| ユーザー名 | (省略可能) SASL 認証のユーザー名。
AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| パスワード | (省略可能) SASL 認証のパスワード。
AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| プロトコル | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintext、 ssl、 sasl_plaintext、 sasl_sslです。 |
| SslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
| SslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
| SslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
| SslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
| SslCertificatePEM | (省略可能)文字列としての PEM 形式のクライアント証明書。 詳細については、「接続」を参照してください。 |
| SslKeyPEM | (省略可能)文字列としての PEM 形式のクライアント秘密キー。 詳細については、「接続」を参照してください。 |
| SslCaPEM | (省略可能)文字列としての PEM 形式の CA 証明書。 詳細については、「接続」を参照してください。 |
| SslCertificateandKeyPEM | (省略可能)PEM 形式のクライアント証明書とキーを文字列として使用します。 詳細については、「接続」を参照してください。 |
| SchemaRegistryUrl | (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。 |
| SchemaRegistryUsername | (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。 |
| SchemaRegistryPassword | (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。 |
| OAuthBearerMethod | (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidc と defaultです。 |
| OAuthBearerClientId | (省略可能) OAuthBearerMethod を oidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。 |
| OAuthBearerClientSecret | (省略可能) OAuthBearerMethod を oidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。 |
| OAuthBearerScope | (省略可能)ブローカーへのアクセス要求のスコープを指定します。 |
| OAuthBearerTokenEndpointUrl | (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。 |
| OAuthBearerExtensions | (省略可能)メソッドの使用時にブローカーに追加情報として提供されるキーと値のペアのコンマ区切りのリスト oidc 。 たとえば、 supportFeatureX=true,organizationId=sales-emeaと指定します。 |
注釈
KafkaTrigger注釈を使用すると、トピックを受け取ったときに実行される関数を作成できます。 サポートされるオプションには、次の要素が含まれます。
| 要素 | 説明 |
|---|---|
| name | (必須) 関数コード内のキューまたはトピック メッセージを表す変数の名前。 |
| brokerList | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
| topic | (必須) トリガーによって監視されるトピック。 |
| cardinality | (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。
MANY を使用する場合は、dataType も設定する必要があります。 |
| dataType | Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。
string の場合、入力は単なる文字列として扱われます。
binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。 |
| consumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
| avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
| authenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 Gssapi、 Plain、 ScramSha256、 ScramSha512です。 |
| username | (省略可能) SASL 認証のユーザー名。
AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| password | (省略可能) SASL 認証のパスワード。
AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintext、 ssl、 sasl_plaintext、 sasl_sslです。 |
| sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
| sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
| sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
| sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
| lagThreshold | (省略可能)トリガーのラグしきい値。 |
| schemaRegistryUrl | (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。 |
| schemaRegistryUsername | (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。 |
| schemaRegistryPassword | (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。 |
構成
次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。
| function.json のプロパティ | 説明 |
|---|---|
| type | (必須) kafkaTriggerに設定します。 |
| direction | (必須) inに設定します。 |
| name | (必須) 関数コード内のブローカー データを表す変数の名前。 |
| brokerList | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
| topic | (必須) トリガーによって監視されるトピック。 |
| cardinality | (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。
MANY を使用する場合は、dataType も設定する必要があります。 |
| dataType | Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。
string の場合、入力は単なる文字列として扱われます。
binaryすると、メッセージはバイナリ データとして受信され、Functions は実際のバイト配列パラメーター型に逆シリアル化しようとします。 |
| consumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
| avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
| keyAvroSchema | (省略可能)Avro プロトコルを使用する場合のメッセージ キーの汎用レコードのスキーマ。 |
| keyDataType | (省略可能)Kafka トピックからメッセージ キーを受け取るデータ型。
keyAvroSchemaが設定されている場合、この値は汎用レコードです。 使用できる値は、 Int、 Long、 String、および Binaryです。 |
| authenticationMode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 Gssapi、 Plain、 ScramSha256、 ScramSha512です。 |
| username | (省略可能) SASL 認証のユーザー名。
AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| password | (省略可能) SASL 認証のパスワード。
AuthenticationMode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintext、 ssl、 sasl_plaintext、 sasl_sslです。 |
| sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
| sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
| sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
| sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
| sslCertificatePEM | (省略可能)文字列としての PEM 形式のクライアント証明書。 詳細については、「接続」を参照してください。 |
| sslKeyPEM | (省略可能)文字列としての PEM 形式のクライアント秘密キー。 詳細については、「接続」を参照してください。 |
| sslCaPEM | (省略可能)文字列としての PEM 形式の CA 証明書。 詳細については、「接続」を参照してください。 |
| sslCertificateandKeyPEM | (省略可能)PEM 形式のクライアント証明書とキーを文字列として使用します。 詳細については、「接続」を参照してください。 |
| lagThreshold | (省略可能)トリガーのラグしきい値。 |
| schemaRegistryUrl | (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。 |
| schemaRegistryUsername | (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。 |
| schemaRegistryPassword | (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。 |
| oAuthBearerMethod | (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidc と defaultです。 |
| oAuthBearerClientId | (省略可能) oAuthBearerMethod を oidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。 |
| oAuthBearerClientSecret | (省略可能) oAuthBearerMethod を oidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。 |
| oAuthBearerScope | (省略可能)ブローカーへのアクセス要求のスコープを指定します。 |
| oAuthBearerTokenEndpointUrl | (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。 |
構成
次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。 Python では、構成プロパティsnake_case名前付け規則が使用されます。
| function.json のプロパティ | 説明 |
|---|---|
| type | (必須) kafkaTriggerに設定します。 |
| direction | (必須) inに設定します。 |
| name | (必須) 関数コード内のブローカー データを表す変数の名前。 |
| broker_list | (必須) トリガーによって監視される Kafka ブローカーの一覧。 詳細については、「接続」を参照してください。 |
| topic | (必須) トリガーによって監視されるトピック。 |
| cardinality | (省略可能) トリガー入力のカーディナリティを示します。 サポートされる値は、ONE (既定値) とMANY です。 入力が 1 つのメッセージである場合は ONE、入力がメッセージの配列である場合は MANY を使用します。
MANY を使用する場合は、data_type も設定する必要があります。 |
| data_type | Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。
string の場合、入力は単なる文字列として扱われます。
binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。 |
| consumerGroup | (省略可能) トリガーで使用される Kafka コンシューマー グループ。 |
| avroSchema | (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 |
| authentication_mode | (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NOTSET (既定)、 Gssapi、 Plain、 ScramSha256、 ScramSha512です。 |
| username | (省略可能) SASL 認証のユーザー名。
authentication_mode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| password | (省略可能) SASL 認証のパスワード。
authentication_mode が Gssapi の場合はサポートされません。 詳細については、「接続」を参照してください。 |
| protocol | (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NOTSET (既定)、 plaintext、 ssl、 sasl_plaintext、 sasl_sslです。 |
| sslCaLocation | (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。 |
| sslCertificateLocation | (省略可能) クライアントの証明書へのパス。 |
| sslKeyLocation | (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。 |
| sslKeyPassword | (省略可能) クライアントの証明書のパスワード。 |
| lag_threshold | (省略可能)トリガーのラグしきい値。 |
| schema_registry_url | (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。 |
| schema_registry_username | (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。 |
| schema_registry_password | (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。 |
| o_auth_bearer_method | (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidc と defaultです。 |
| o_auth_bearer_client_id | (省略可能) o_auth_bearer_method を oidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。 |
| o_auth_bearer_client_secret | (省略可能) o_auth_bearer_method を oidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。 |
| o_auth_bearer_scope | (省略可能)ブローカーへのアクセス要求のスコープを指定します。 |
| o_auth_bearer_token_endpoint_url | (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。 |
注意
証明書 PEM 関連のプロパティと Avro キー関連のプロパティは、Python ライブラリではまだ使用できません。
使用方法
Kafka トリガーは現在、JSON ペイロードである文字列および文字列配列として Kafka イベントをサポートしています。
Kafka トリガーは、Kafka メッセージを文字列として関数に渡します。 トリガーでは、JSON ペイロードである文字列配列もサポートされます。
Premium プランでは、Kafka 出力のランタイム スケール監視を有効にして、複数のインスタンスにスケールアウトする必要があります。 詳細については、「ランタイム スケールを有効にする」を参照してください。
Azure portal の [コード + テスト] ページのテスト/実行機能を使用して Kafka トリガーを操作することはできません。 代わりに、トリガーによって監視されているトピックにテスト イベントを直接送信する必要があります。
Kafka トリガーでサポートされている host.json 設定の完全なセットについては、「host.json 設定」を参照してください。
接続
トリガーとバインドに必要なすべての接続情報を、コード内のバインド定義ではなく、アプリケーション設定に格納します。 このガイダンスは資格情報に適用され、コードに保存しないでください。
重要
資格情報の設定はアプリケーション設定を参照する必要があります。 コードや構成のファイル内に資格情報をハードコーディングしないでください。 ローカルで実行する場合は、資格情報に local.settings.json ファイルを使用します。local.settings.json ファイルは公開しないでください。
Azure の Confluent によって提供されるマネージド Kafka クラスターに接続する場合は、次のいずれかの認証方法を使用できます。
注意
Flex 従量課金プランを使用する場合、ファイルの場所ベースの証明書認証プロパティ (SslCaLocation、 SslCertificateLocation、 SslKeyLocation) はサポートされていません。 代わりに、PEM ベースの証明書プロパティ (SslCaPEM、 SslCertificatePEM、 SslKeyPEM、 SslCertificateandKeyPEM) を使用するか、Azure Key Vault に証明書を格納します。
スキーマ レジストリ
Kafka 拡張機能で Confluent によって提供されるスキーマ レジストリを使用するには、次の資格情報を設定します。
| 設定 | 推奨値 | 説明 |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
スキーマ管理に使用されるスキーマ レジストリ サービスの URL。 通常、形式 https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
スキーマ レジストリの基本認証のユーザー名 (必要な場合)。 |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
スキーマ レジストリでの基本認証のパスワード (必要な場合)。 |
ユーザー名/パスワード認証
この形式の認証を使用する場合は、 Protocol が SaslPlaintext または SaslSsl のいずれかに設定されていることを確認します。 AuthenticationMode は Plain、 ScramSha256 、または ScramSha512 に設定され、使用されている CA 証明書が既定の ISRG ルート X1 証明書と異なる場合は、必ず SslCaLocation または SslCaPEMを更新してください。
| 設定 | 推奨値 | 説明 |
|---|---|---|
| BrokerList | BootstrapServer |
BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。 |
| ユーザー名 | ConfluentCloudUsername |
ConfluentCloudUsername という名前のアプリ設定には、Confluent Cloud Web サイトからの API アクセス キーが含まれています。 |
| パスワード | ConfluentCloudPassword |
ConfluentCloudPassword という名前のアプリ設定には、Confluent Cloud Web サイトから取得した API シークレットが含まれています。 |
| SslCaPEM | SSLCaPemCertificate |
CA 証明書を PEM 形式の文字列として含む SSLCaPemCertificate という名前のアプリ設定。 値は標準の形式 (例: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----) に従う必要があります。 |
SSL 認証
Protocolが SSL に設定されていることを確認します。
| 設定 | 推奨値 | 説明 |
|---|---|---|
| BrokerList | BootstrapServer |
BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。 |
| SslCaPEM | SslCaCertificatePem |
CA 証明書の PEM 値を文字列として含む SslCaCertificatePem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
クライアント証明書の PEM 値を文字列として含む SslClientCertificatePem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
クライアント秘密キーの PEM 値を文字列として含む SslClientKeyPem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
クライアント証明書の PEM 値と、文字列として連結されたクライアント秘密キーを含む、 SslClientCertificateAndKeyPem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
秘密キーのパスワードを含む SslClientKeyPassword という名前のアプリ設定 (存在する場合)。 |
OAuth 認証
OAuth 認証を使用する場合は、バインド定義で OAuth 関連のプロパティを構成します。
これらの設定に使用する文字列値は、Azure のアプリケーション設定として、またはローカル開発中に Values内の コレクションに存在する必要があります。
また、バインド定義で Protocol と AuthenticationMode も設定する必要があります。