次の方法で共有


Azure Functions の Apache Kafka 出力バインド

出力バインドを使用すると、Azure Functions アプリは Kafka トピックにメッセージを送信できます。

重要

Kafka バインドは、 Flex 従量課金プランElastic Premium プラン、専用 (App Service) プランの Functions で使用できます。 これらは、Functions ランタイムのバージョン 4.x でのみサポートされています。

バインドの使用方法は、関数アプリの C# モダリティによって異なります。 次のいずれかのモダリティを使用できます。

ランタイムとは別のプロセスで実行 される分離ワーカー プロセス クラス ライブラリ を使用するコンパイル済み C# 関数。

使用する属性は、個別のイベント プロバイダーによって異なります。

次の例では、HTTP 応答と Kafka 出力で構成される MultipleOutputType という名前のカスタム戻り値の型を使用します。

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

MultipleOutputType クラスでは、Keventは Kafka バインドの出力バインド変数です。

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

イベントのバッチを送信するには、次の例に示すように、出力の種類に文字列配列を渡します。

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

文字列配列はクラスの Kevents プロパティとして定義され、出力バインドはこのプロパティで定義されます。

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

次の関数は、Kafka 出力データにヘッダーを追加します。

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

動作する .NET の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

出力バインドの使用方法は、Node.js プログラミング モデルのバージョンによって異なります。

Node.js v4 モデルでは、関数コードで出力バインドを直接定義します。 詳細については、「Azure Functions Node.js 開発者ガイド」を参照してください。

これらの例では、イベント プロバイダーは Confluent または Azure Event Hubs です。 これらの例は、HTTP 要求がトリガーして要求から Kafka トピックにデータを送信する関数の Kafka 出力バインドを示しています。

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

バッチでイベントを送信するには、次の例に示すように、メッセージの配列を送信します。

const { app, output } = require("@azure/functions");

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

async function kafkaOutputManyWithHttp(request, context) {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

次の例は、ヘッダーを含むイベント メッセージを Kafka トピックに送信する方法を示しています。

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

動作する JavaScript の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

バッチでイベントを送信するには、次の例に示すように、メッセージの配列を送信します。

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputManyWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

次の例は、ヘッダーを含むイベント メッセージを Kafka トピックに送信する方法を示しています。

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

動作する TypeScript の例の完全なセットについては、 Kafka 拡張機能リポジトリを参照してください。

function.json ファイルの個別のプロパティは、イベント プロバイダーによって異なります。ここにあげた例では Confluent または Azure Event Hubs のいずれかです。 次の例は、HTTP 要求がトリガーして要求から Kafka トピックにデータを送信する関数の Kafka 出力バインドを示しています。

この function.json では、次の例の特定のプロバイダーのトリガーを定義します。

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

次のコードは、トピックにメッセージを送信します。

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

次のコードは、複数のメッセージを配列として同じトピックに送信します。

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

次の例に、ヘッダーを含むイベント メッセージを同じ Kafka トピックに送信する方法を示します。

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

動作する PowerShell の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。


出力バインドの使用方法は、Python プログラミング モデルのバージョンによって異なります。

Python v2 モデルでは、デコレーターを使用して関数コードで出力バインドを直接定義します。 詳細については、 Azure Functions Python 開発者ガイドを参照してください。

これらの例は、HTTP 要求がトリガーして要求から Kafka トピックにデータを送信する関数の Kafka 出力バインドを示しています。

    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'


@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
    outputMessage.set(json.dumps(['one', 'two']))
    return 'OK'

バッチでイベントを送信するには、次の例に示すように、メッセージの配列を送信します。

@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }]  },

次の例は、ヘッダーを含むイベント メッセージを Kafka トピックに送信する方法を示しています。

out.set(json.dumps(kevent))
return 'OK'

動作する Python の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

出力バインドの構成に使用する注釈は、個別のイベント プロバイダーによって異なります。

次の関数は、Kafka トピックにメッセージを送信します。

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

次の例は、Kafka トピックに複数のメッセージを送信する方法を示しています。

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

この例では、出力バインド パラメーターが文字列配列に変更されています。

最後の例では、次の KafkaEntity クラスと KafkaHeader クラスを使用します。

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

次の関数の例では、ヘッダーを含むメッセージを Kafka トピックに送信します。

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Confluent で動作する Java の例の完全なセットについては、Kafka 拡張機能リポジトリを参照してください。

属性

インプロセス分離ワーカー プロセスの C# ライブラリは、どちらも Kafka 属性を使用して関数トリガーを定義します。

次の表では、この属性を使用して設定できるプロパティについて説明します。

パラメーター 説明
BrokerList (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。
トピック (必須) 出力の送信先となるトピック。
AvroSchema (省略可能)Avro プロトコルを使用する場合のメッセージ値の汎用レコードのスキーマ。
KeyAvroSchema (省略可能)Avro プロトコルを使用する場合のメッセージ キーの汎用レコードのスキーマ。
KeyDataType (省略可能)Kafka トピックにメッセージ キーを送信するデータ型。 KeyAvroSchemaが設定されている場合、この値は汎用レコードです。 使用できる値は、 IntLongString、および Binaryです。
MaxMessageBytes (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1
BatchSize (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000
EnableIdempotence (省略可能) true に設定すると、メッセージが 1 回だけ元の生成順序で正常に生成される。既定値は false
MessageTimeoutMs (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 この値は、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。
RequestTimeoutMs (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000
MaxRetries (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2EnableIdempotencetrue に設定されている場合を除き、再試行すると、並べ替えが発生する場合があります。
AuthenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 GssapiPlainScramSha256ScramSha512、および OAuthBearerです。
ユーザー名 (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
パスワード (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
プロトコル (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintextsslsasl_plaintextsasl_sslです。
SslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
SslCertificateLocation (省略可能) クライアントの証明書へのパス。
SslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
SslKeyPassword (省略可能) クライアントの証明書のパスワード。
SslCertificatePEM (省略可能)文字列としての PEM 形式のクライアント証明書。 詳細については、「接続」を参照してください。
SslKeyPEM (省略可能)文字列としての PEM 形式のクライアント秘密キー。 詳細については、「接続」を参照してください。
SslCaPEM (省略可能)文字列としての PEM 形式の CA 証明書。 詳細については、「接続」を参照してください。
SslCertificateandKeyPEM (省略可能)PEM 形式のクライアント証明書とキーを文字列として使用します。 詳細については、「接続」を参照してください。
SchemaRegistryUrl (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
SchemaRegistryUsername (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
SchemaRegistryPassword (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。
OAuthBearerMethod (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidcdefaultです。
OAuthBearerClientId (省略可能) OAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。
OAuthBearerClientSecret (省略可能) OAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。
OAuthBearerScope (省略可能)ブローカーへのアクセス要求のスコープを指定します。
OAuthBearerTokenEndpointUrl (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。
OAuthBearerExtensions (省略可能)メソッドの使用時にブローカーに追加情報として提供されるキーと値のペアのコンマ区切りのリスト oidc 。 たとえば、 supportFeatureX=true,organizationId=sales-emeaと指定します。

注釈

KafkaOutput注釈を使用すると、特定のトピックに書き込む関数を作成できます。 サポートされるオプションには、次の要素が含まれます。

要素 説明
name 関数コード内のブローカー データを表す変数の名前。
brokerList (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。
topic (必須) 出力の送信先となるトピック。
dataType Functions によりパラメーター値が処理される方法を定義します。 既定では、値は文字列として取得され、Functions により文字列を実際の単純な従来の Java オブジェクト (POJO) への逆シリアル化が試みられます。 string の場合、入力は単なる文字列として扱われます。 binary の場合、メッセージはバイナリ データとして受信され、Functions により実際のパラメーター型 byte[] への逆シリアル化が試みられます。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。 (現在、Java ではサポートされていません)。)
maxMessageBytes (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1
batchSize (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000
enableIdempotence (省略可能) trueに設定すると、メッセージが 1 回だけ正常に生成され、元の生成順序で正常に生成され、既定値は false になります。
messageTimeoutMs (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 この値は、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。
requestTimeoutMs (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000
maxRetries (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2EnableIdempotencetrue に設定されていない限り、再試行によって並べ替えが発生する可能性があります。
authenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 GssapiPlainScramSha256ScramSha512です。
username (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintextsslsasl_plaintextsasl_sslです。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。
schemaRegistryUrl (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
schemaRegistryUsername (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
schemaRegistryPassword (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明
type kafka に設定します。
direction out に設定します。
name 関数コード内のブローカー データを表す変数の名前。
brokerList (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。
topic (必須) 出力の送信先となるトピック。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
keyAvroSchema (省略可能)Avro プロトコルを使用する場合のメッセージ キーの汎用レコードのスキーマ。
keyDataType (省略可能)Kafka トピックにメッセージ キーを送信するデータ型。 keyAvroSchemaが設定されている場合、この値は汎用レコードです。 使用できる値は、 IntLongString、および Binaryです。
maxMessageBytes (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1
batchSize (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000
enableIdempotence (省略可能) trueに設定すると、メッセージが 1 回だけ正常に生成され、元の生成順序で正常に生成され、既定値は false になります。
messageTimeoutMs (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 この値は、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。
requestTimeoutMs (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000
maxRetries (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2EnableIdempotencetrue に設定されていない限り、再試行によって並べ替えが発生する可能性があります。
authenticationMode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NotSet (既定)、 GssapiPlainScramSha256ScramSha512です。
username (省略可能) SASL 認証のユーザー名。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 AuthenticationModeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NotSet (既定)、 plaintextsslsasl_plaintextsasl_sslです。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。
sslCertificatePEM (省略可能)文字列としての PEM 形式のクライアント証明書。 詳細については、「接続」を参照してください。
sslKeyPEM (省略可能)文字列としての PEM 形式のクライアント秘密キー。 詳細については、「接続」を参照してください。
sslCaPEM (省略可能)文字列としての PEM 形式の CA 証明書。 詳細については、「接続」を参照してください。
sslCertificateandKeyPEM (省略可能)PEM 形式のクライアント証明書とキーを文字列として使用します。 詳細については、「接続」を参照してください。
schemaRegistryUrl (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
schemaRegistryUsername (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
schemaRegistryPassword (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。
oAuthBearerMethod (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidcdefaultです。
oAuthBearerClientId (省略可能) oAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。
oAuthBearerClientSecret (省略可能) oAuthBearerMethodoidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。
oAuthBearerScope (省略可能)ブローカーへのアクセス要求のスコープを指定します。
oAuthBearerTokenEndpointUrl (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。 Python では、構成プロパティsnake_case名前付け規則が使用されます。

function.json のプロパティ 説明
type kafka に設定します。
direction out に設定します。
name 関数コード内のブローカー データを表す変数の名前。
broker_list (必須) 出力の送信先となる Kafka ブローカーのリスト。 詳細については、「接続」を参照してください。
topic (必須) 出力の送信先となるトピック。
avroSchema (省略可能) Avro プロトコルを使用する場合の汎用レコードのスキーマ。
maxMessageBytes (省略可能) 送信される出力メッセージの最大サイズ (MB 単位)、既定値は 1
batchSize (省略可能) 1 つのメッセージ セットでバッチ処理されるメッセージの最大数。既定値は 10000
enableIdempotence (省略可能) trueに設定すると、メッセージが 1 回だけ正常に生成され、元の生成順序で正常に生成され、既定値は false になります。
messageTimeoutMs (省略可能) ローカル メッセージのタイムアウト (ミリ秒単位)。 この値はローカルでのみ適用され、生成されたメッセージが正常に配信されるまで待機する時間を制限します。既定値は 300000 です。 時間 0 は無限です。 この値は、メッセージの配信に使用される最大時間 (再試行を含む) です。 配信エラーは、再試行回数またはメッセージのタイムアウトを超えた場合に発生します。
requestTimeoutMs (省略可能) 出力要求の受信確認タイムアウト (ミリ秒単位)。既定値は 5000
maxRetries (省略可能) 失敗したメッセージの送信を再試行する回数。既定値は 2EnableIdempotencetrue に設定されていない限り、再試行によって並べ替えが発生する可能性があります。
authentication_mode (省略可能) 簡易認証およびセキュリティ層 (SASL) 認証を使用する場合の認証モード。 サポートされている値は、 NOTSET (既定)、 GssapiPlainScramSha256ScramSha512です。
username (省略可能) SASL 認証のユーザー名。 authentication_modeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
password (省略可能) SASL 認証のパスワード。 authentication_modeGssapi の場合はサポートされません。 詳細については、「接続」を参照してください。
protocol (省略可能) ブローカーと通信するときに使用されるセキュリティ プロトコル。 サポートされている値は、 NOTSET (既定)、 plaintextsslsasl_plaintextsasl_sslです。
sslCaLocation (省略可能) ブローカーの証明書を検証するための CA 証明書ファイルへのパス。
sslCertificateLocation (省略可能) クライアントの証明書へのパス。
sslKeyLocation (省略可能) 認証に使用されるクライアントの秘密キー (PEM) へのパス。
sslKeyPassword (省略可能) クライアントの証明書のパスワード。
schema_registry_url (省略可能)Avro スキーマ レジストリの URL。 詳細については、「接続」を参照してください。
schema_registry_username (省略可能)Avro スキーマ レジストリのユーザー名。 詳細については、「接続」を参照してください。
schema_registry_password (省略可能)Avro スキーマ レジストリのパスワード。 詳細については、「接続」を参照してください。
o_auth_bearer_method (省略可能)OAuth Bearer メソッド。 使用できる値は、 oidcdefaultです。
o_auth_bearer_client_id (省略可能) o_auth_bearer_methodoidc に設定すると、OAuth ベアラー クライアント ID が指定されます。 詳細については、「接続」を参照してください。
o_auth_bearer_client_secret (省略可能) o_auth_bearer_methodoidc に設定すると、OAuth ベアラー クライアント シークレットが指定されます。 詳細については、「接続」を参照してください。
o_auth_bearer_scope (省略可能)ブローカーへのアクセス要求のスコープを指定します。
o_auth_bearer_token_endpoint_url (省略可能)メソッドを使用するときにトークンを取得するために使用される OAuth/OIDC 発行者トークン エンドポイント HTTP(S) URI oidc 使用されます。 詳細については、「接続」を参照してください。

注意

証明書 PEM 関連のプロパティと Avro キー関連のプロパティは、Python ライブラリではまだ使用できません。

使用法

キー型と値型はどちらも、組み込みの Avro シリアル化と Protobuf シリアル化で機能します。

イベントのオフセット、パーティション、タイムスタンプは実行時に生成されます。 関数内の値とヘッダーのみを設定できます。 function.json ファイルでトピックを設定します。

記述する Kafka トピックにアクセスできることを確認します。 Kafka トピックへのアクセスおよび接続資格情報を使用してバインドを構成します。

Premium プランでは、Kafka 出力のランタイム スケール監視を有効にして、複数のインスタンスにスケールアウトする必要があります。 詳細については、「ランタイム スケールを有効にする」を参照してください。

Kafka トリガーでサポートされている host.json 設定の完全なセットについては、「host.json 設定」を参照してください。

接続

トリガーとバインドに必要なすべての接続情報を、コード内のバインド定義ではなく、アプリケーション設定に格納します。 このガイダンスは資格情報に適用され、コードに保存しないでください。

重要

資格情報の設定はアプリケーション設定を参照する必要があります。 コードや構成のファイル内に資格情報をハードコーディングしないでください。 ローカルで実行する場合は、資格情報に local.settings.json ファイルを使用します。local.settings.json ファイルは公開しないでください。

Azure の Confluent によって提供されるマネージド Kafka クラスターに接続する場合は、次のいずれかの認証方法を使用できます。

注意

Flex 従量課金プランを使用する場合、ファイルの場所ベースの証明書認証プロパティ (SslCaLocationSslCertificateLocationSslKeyLocation) はサポートされていません。 代わりに、PEM ベースの証明書プロパティ (SslCaPEMSslCertificatePEMSslKeyPEMSslCertificateandKeyPEM) を使用するか、Azure Key Vault に証明書を格納します。

スキーマ レジストリ

Kafka 拡張機能で Confluent によって提供されるスキーマ レジストリを使用するには、次の資格情報を設定します。

設定 推奨値 説明
SchemaRegistryUrl SchemaRegistryUrl スキーマ管理に使用されるスキーマ レジストリ サービスの URL。 通常、形式 https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY スキーマ レジストリの基本認証のユーザー名 (必要な場合)。
SchemaRegistryPassword CONFLUENT_API_SECRET スキーマ レジストリでの基本認証のパスワード (必要な場合)。

ユーザー名/パスワード認証

この形式の認証を使用する場合は、 ProtocolSaslPlaintext または SaslSsl のいずれかに設定されていることを確認します。 AuthenticationModePlainScramSha256 、または ScramSha512 に設定され、使用されている CA 証明書が既定の ISRG ルート X1 証明書と異なる場合は、必ず SslCaLocation または SslCaPEMを更新してください。

設定 推奨値 説明
BrokerList BootstrapServer BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。
ユーザー名 ConfluentCloudUsername ConfluentCloudUsername という名前のアプリ設定には、Confluent Cloud Web サイトからの API アクセス キーが含まれています。
パスワード ConfluentCloudPassword ConfluentCloudPassword という名前のアプリ設定には、Confluent Cloud Web サイトから取得した API シークレットが含まれています。
SslCaPEM SSLCaPemCertificate CA 証明書を PEM 形式の文字列として含む SSLCaPemCertificate という名前のアプリ設定。 値は標準の形式 (例: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----) に従う必要があります。

SSL 認証

ProtocolSSL に設定されていることを確認します。

設定 推奨値 説明
BrokerList BootstrapServer BootstrapServer という名前のアプリ設定には、Confluent Cloud の設定ページで検出されたブートストラップ サーバーの値が含まれています。 値は xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 のようになります。
SslCaPEM SslCaCertificatePem CA 証明書の PEM 値を文字列として含む SslCaCertificatePem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem クライアント証明書の PEM 値を文字列として含む SslClientCertificatePem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem クライアント秘密キーの PEM 値を文字列として含む SslClientKeyPem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem クライアント証明書の PEM 値と、文字列として連結されたクライアント秘密キーを含む、 SslClientCertificateAndKeyPem という名前のアプリ設定。 値は標準の形式に従う必要があります。 -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword 秘密キーのパスワードを含む SslClientKeyPassword という名前のアプリ設定 (存在する場合)。

OAuth 認証

OAuth 認証を使用する場合は、バインド定義で OAuth 関連のプロパティを構成します。

これらの設定に使用する文字列値は、Azure のアプリケーション設定として、またはローカル開発中に Values内の コレクションに存在する必要があります。

また、バインド定義で ProtocolAuthenticationMode も設定する必要があります。

次のステップ