The output binding enables an Azure Functions app to send messages to a Kafka topic.
Important
Kafka bindings are available for Functions on the Flex Consumption plan, Elastic Premium Plan, and Dedicated (App Service) plan. They are only supported on version 4.x of the Functions runtime.
Example
How you use the binding depends on the C# modality of your function app. The examples in this article use the isolated worker model: a compiled C# class library function that runs in a worker process separate from the Functions runtime.
The attributes you use depend on the specific event provider.
The following example uses a custom return type named MultipleOutputType, which consists of an HTTP response and a Kafka output.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
In the MultipleOutputType class, Kevent is the output binding variable for the Kafka binding.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
To send a batch of events, pass a string array to the output type, as shown in the following example:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
The string array is defined as the Kevents property on the class, and the output binding is defined on this property:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
The following function adds headers to the Kafka output data:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
For a complete set of working .NET examples, see the Kafka extension repository.
The usage of the output binding depends on your version of the Node.js programming model.
In the Node.js v4 model, you define your output binding directly in your function code. For more information, see the Azure Functions Node.js developer guide.
In these examples, the event provider is either Confluent or Azure Event Hubs. The examples show a Kafka output binding for a function that is triggered by an HTTP request and sends data from the request to the Kafka topic.
const { app, output } = require("@azure/functions");

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

async function kafkaOutputWithHttp(request, context) {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = body ? JSON.parse(body) : {};
  const name = queryName || parsedbody.name || "world";

  context.extraOutputs.set(kafkaOutput, `Hello, ${name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );

  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
To send events in a batch, send an array of messages, as shown in these examples:
const { app, output } = require("@azure/functions");
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
async function kafkaOutputManyWithHttp(request, context) {
context.log(`Http function processed request for url "${request.url}"`);
const queryName = request.query.get("name");
const body = await request.text();
const parsedbody = body ? JSON.parse(body) : {};
parsedbody.name = parsedbody.name || "world";
const name = queryName || parsedbody.name;
  const messages = [
    `Message one. Hello, ${name}!`,
    `Message two. Hello, ${name}!`,
  ];
  context.extraOutputs.set(kafkaOutput, messages);
return {
body: `Messages sent to kafka.`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputManyWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputManyWithHttp,
});
These examples show how to send an event message with headers to a Kafka topic:
const { app, output } = require("@azure/functions");

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

async function kafkaOutputWithHttp(request, context) {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );

  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
For a complete set of working JavaScript examples, see the Kafka extension repository.
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = body ? JSON.parse(body) : {};
  const name = queryName || parsedbody.name || "world";

  context.extraOutputs.set(kafkaOutput, `Hello, ${name}!`);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
To send events in a batch, send an array of messages, as shown in these examples:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputManyWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const queryName = request.query.get("name");
const body = await request.text();
const parsedbody = body ? JSON.parse(body) : {};
parsedbody.name = parsedbody.name || "world";
const name = queryName || parsedbody.name;
  const messages = [
    `Message one. Hello, ${name}!`,
    `Message two. Hello, ${name}!`,
  ];
  context.extraOutputs.set(kafkaOutput, messages);
return {
body: `Messages sent to kafka.`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputManyWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputManyWithHttp,
});
These examples show how to send an event message with headers to a Kafka topic:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const parsedbody = JSON.parse(body);
// assuming body is of the format { "key": "key", "value": {JSON object} }
context.extraOutputs.set(
kafkaOutput,
`{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
parsedbody.value
).replace(/"/g, '\\"')}", "Key":"${
parsedbody.key
}", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
For a complete set of working TypeScript examples, see the Kafka extension repository.
The specific properties of the function.json file depend on your event provider, which in these examples is either Confluent or Azure Event Hubs. The following examples show a Kafka output binding for a function that is triggered by an HTTP request and sends data from the request to the Kafka topic.
The following function.json file defines the HTTP trigger, the Kafka output binding, and the HTTP response binding for the provider used in these examples:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
The following code sends a message to the topic:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
The following code sends multiple messages as an array to the same topic:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
The following example shows how to send an event message with headers to the same Kafka topic:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name outputMessage -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
For a complete set of working PowerShell examples, see the Kafka extension repository.
The usage of the output binding depends on your version of the Python programming model.
In the Python v2 model, you define your output binding directly in your function code using decorators. For more information, see the Azure Functions Python developer guide.
These examples show a Kafka output binding for a function that is triggered by an HTTP request and sends data from the request to the Kafka topic.

# Decorator settings mirror the KafkaOutputMany example below.
@KafkaOutput.function_name(name="KafkaOutput")
@KafkaOutput.route(route="kafka_output")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

To send events in a batch, send an array of messages, as shown in the following example:

@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    outputMessage.set(json.dumps(['one', 'two']))
    return 'OK'

The following examples show how to send event messages with headers to a Kafka topic:

@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent = { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    # A second event completes the batch; its fields mirror the first entry.
    kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }] },
              { "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "two", "Headers": [{ "Key": "test", "Value": "python" }] }]
    out.set(json.dumps(kevent))
    return 'OK'
For a complete set of working Python examples, see the Kafka extension repository.
The annotations you use to configure the output binding depend on the specific event provider.
The following function sends a message to the Kafka topic.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
The following example shows how to send multiple messages to a Kafka topic.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
In this example, the output binding parameter is changed to a string array.
The KafkaOutputWithHeaders example that follows uses these KafkaEntity and KafkaHeaders classes:
public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders[] Headers;

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value, KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
}

public class KafkaHeaders {
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }
}
The following example function sends a message with headers to a Kafka topic.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
For a complete set of working Java examples for Confluent, see the Kafka extension repository.
Attributes
Both in-process and isolated worker process C# libraries use the Kafka attribute to define the output binding.
The following table explains the properties you can set by using this attribute:
| Parameter | Description |
|---|---|
| BrokerList | (Required) The list of Kafka brokers to which the output is sent. See Connections for more information. |
| Topic | (Required) The topic to which the output is sent. |
| AvroSchema | (Optional) Schema of a generic record of message value when using the Avro protocol. |
| KeyAvroSchema | (Optional) Schema of a generic record of message key when using the Avro protocol. |
| KeyDataType | (Optional) The data type to use when sending the message key to the Kafka topic. If KeyAvroSchema is set, the key is sent as a generic record. Accepted values are Int, Long, String, and Binary. |
| MaxMessageBytes | (Optional) The maximum size of the output message being sent (in MB), with a default value of 1. |
| BatchSize | (Optional) Maximum number of messages batched in a single message set, with a default value of 10000. |
| EnableIdempotence | (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false. |
| MessageTimeoutMs | (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This value is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded. |
| RequestTimeoutMs | (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000. |
| MaxRetries | (Optional) The number of times to retry sending a failing Message, with a default of 2. Retrying may cause reordering, unless EnableIdempotence is set to true. |
| AuthenticationMode | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512, and OAuthBearer. |
| Username | (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information. |
| Password | (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information. |
| Protocol | (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| SslCaLocation | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| SslCertificateLocation | (Optional) Path to the client's certificate. |
| SslKeyLocation | (Optional) Path to client's private key (PEM) used for authentication. |
| SslKeyPassword | (Optional) Password for client's certificate. |
| SslCertificatePEM | (Optional) Client certificate in PEM format as a string. See Connections for more information. |
| SslKeyPEM | (Optional) Client private key in PEM format as a string. See Connections for more information. |
| SslCaPEM | (Optional) CA certificate in PEM format as a string. See Connections for more information. |
| SslCertificateandKeyPEM | (Optional) Client certificate and key in PEM format as a string. See Connections for more information. |
| SchemaRegistryUrl | (Optional) URL for the Avro Schema Registry. See Connections for more information. |
| SchemaRegistryUsername | (Optional) Username for the Avro Schema Registry. See Connections for more information. |
| SchemaRegistryPassword | (Optional) Password for the Avro Schema Registry. See Connections for more information. |
| OAuthBearerMethod | (Optional) OAuth Bearer method. Accepted values are oidc and default. |
| OAuthBearerClientId | (Optional) When OAuthBearerMethod is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information. |
| OAuthBearerClientSecret | (Optional) When OAuthBearerMethod is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information. |
| OAuthBearerScope | (Optional) Specifies the scope of the access request to the broker. |
| OAuthBearerTokenEndpointUrl | (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information. |
| OAuthBearerExtensions | (Optional) Comma-separated list of key=value pairs to be provided as additional information to broker when oidc method is used. For example: supportFeatureX=true,organizationId=sales-emea. |
Annotations
The KafkaOutput annotation enables you to create a function that writes to a specific topic. Supported options include the following elements:
| Element | Description |
|---|---|
| name | The name of the variable that represents the brokered data in function code. |
| brokerList | (Required) The list of Kafka brokers to which the output is sent. See Connections for more information. |
| topic | (Required) The topic to which the output is sent. |
| dataType | Defines how Functions handles the parameter value. By default, the value is obtained as a string, and Functions tries to deserialize the string to an actual plain-old Java object (POJO). When set to string, the value is treated as just a string. When set to binary, the message is received as binary data, and Functions tries to deserialize it to the parameter type byte[]. |
| avroSchema | (Optional) Schema of a generic record when using the Avro protocol. (Currently not supported for Java.) |
| maxMessageBytes | (Optional) The maximum size of the output message being sent (in MB), with a default value of 1. |
| batchSize | (Optional) Maximum number of messages batched in a single message set, with a default value of 10000. |
| enableIdempotence | (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false. |
| messageTimeoutMs | (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This value is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded. |
| requestTimeoutMs | (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000. |
| maxRetries | (Optional) The number of times to retry sending a failing Message, with a default of 2. Retrying might cause reordering, unless EnableIdempotence is set to true. |
| authenticationMode | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information. |
| password | (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information. |
| protocol | (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| sslCertificateLocation | (Optional) Path to the client's certificate. |
| sslKeyLocation | (Optional) Path to client's private key (PEM) used for authentication. |
| sslKeyPassword | (Optional) Password for client's certificate. |
| schemaRegistryUrl | (Optional) URL for the Avro Schema Registry. See Connections for more information. |
| schemaRegistryUsername | (Optional) Username for the Avro Schema Registry. See Connections for more information. |
| schemaRegistryPassword | (Optional) Password for the Avro Schema Registry. See Connections for more information. |
Configuration
The following table explains the binding configuration properties that you set in the function.json file.
| function.json property | Description |
|---|---|
| type | Set to kafka. |
| direction | Set to out. |
| name | The name of the variable that represents the brokered data in function code. |
| brokerList | (Required) The list of Kafka brokers to which the output is sent. See Connections for more information. |
| topic | (Required) The topic to which the output is sent. |
| avroSchema | (Optional) Schema of a generic record when using the Avro protocol. |
| keyAvroSchema | (Optional) Schema of a generic record of message key when using the Avro protocol. |
| keyDataType | (Optional) The data type to use when sending the message key to the Kafka topic. If keyAvroSchema is set, the key is sent as a generic record. Accepted values are Int, Long, String, and Binary. |
| maxMessageBytes | (Optional) The maximum size of the output message being sent (in MB), with a default value of 1. |
| batchSize | (Optional) Maximum number of messages batched in a single message set, with a default value of 10000. |
| enableIdempotence | (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false. |
| messageTimeoutMs | (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This value is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded. |
| requestTimeoutMs | (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000. |
| maxRetries | (Optional) The number of times to retry sending a failing Message, with a default of 2. Retrying might cause reordering, unless EnableIdempotence is set to true. |
| authenticationMode | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information. |
| password | (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information. |
| protocol | (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| sslCertificateLocation | (Optional) Path to the client's certificate. |
| sslKeyLocation | (Optional) Path to client's private key (PEM) used for authentication. |
| sslKeyPassword | (Optional) Password for client's certificate. |
| sslCertificatePEM | (Optional) Client certificate in PEM format as a string. See Connections for more information. |
| sslKeyPEM | (Optional) Client private key in PEM format as a string. See Connections for more information. |
| sslCaPEM | (Optional) CA certificate in PEM format as a string. See Connections for more information. |
| sslCertificateandKeyPEM | (Optional) Client certificate and key in PEM format as a string. See Connections for more information. |
| schemaRegistryUrl | (Optional) URL for the Avro Schema Registry. See Connections for more information. |
| schemaRegistryUsername | (Optional) Username for the Avro Schema Registry. See Connections for more information. |
| schemaRegistryPassword | (Optional) Password for the Avro Schema Registry. See Connections for more information. |
| oAuthBearerMethod | (Optional) OAuth Bearer method. Accepted values are oidc and default. |
| oAuthBearerClientId | (Optional) When oAuthBearerMethod is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information. |
| oAuthBearerClientSecret | (Optional) When oAuthBearerMethod is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information. |
| oAuthBearerScope | (Optional) Specifies the scope of the access request to the broker. |
| oAuthBearerTokenEndpointUrl | (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information. |
Configuration
The following table explains the binding configuration properties that you set in the function.json file. Python uses snake_case naming conventions for configuration properties.
| function.json property | Description |
|---|---|
| type | Set to kafka. |
| direction | Set to out. |
| name | The name of the variable that represents the brokered data in function code. |
| broker_list | (Required) The list of Kafka brokers to which the output is sent. See Connections for more information. |
| topic | (Required) The topic to which the output is sent. |
| avroSchema | (Optional) Schema of a generic record when using the Avro protocol. |
| maxMessageBytes | (Optional) The maximum size of the output message being sent (in MB), with a default value of 1. |
| batchSize | (Optional) Maximum number of messages batched in a single message set, with a default value of 10000. |
| enableIdempotence | (Optional) When set to true, guarantees that messages are successfully produced exactly once and in the original produce order, with a default value of false. |
| messageTimeoutMs | (Optional) The local message timeout, in milliseconds. This value is only enforced locally and limits the time a produced message waits for successful delivery, with a default of 300000. A time of 0 is infinite. This value is the maximum time used to deliver a message (including retries). A delivery error occurs when either the retry count or the message timeout is exceeded. |
| requestTimeoutMs | (Optional) The acknowledgment timeout of the output request, in milliseconds, with a default of 5000. |
| maxRetries | (Optional) The number of times to retry sending a failing Message, with a default of 2. Retrying might cause reordering, unless EnableIdempotence is set to true. |
| authentication_mode | (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NOTSET (default), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Optional) The username for SASL authentication. Not supported when authentication_mode is Gssapi. See Connections for more information. |
| password | (Optional) The password for SASL authentication. Not supported when authentication_mode is Gssapi. See Connections for more information. |
| protocol | (Optional) The security protocol used when communicating with brokers. The supported values are NOTSET (default), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Optional) Path to CA certificate file for verifying the broker's certificate. |
| sslCertificateLocation | (Optional) Path to the client's certificate. |
| sslKeyLocation | (Optional) Path to client's private key (PEM) used for authentication. |
| sslKeyPassword | (Optional) Password for client's certificate. |
| schema_registry_url | (Optional) URL for the Avro Schema Registry. See Connections for more information. |
| schema_registry_username | (Optional) Username for the Avro Schema Registry. See Connections for more information. |
| schema_registry_password | (Optional) Password for the Avro Schema Registry. See Connections for more information. |
| o_auth_bearer_method | (Optional) OAuth Bearer method. Accepted values are oidc and default. |
| o_auth_bearer_client_id | (Optional) When o_auth_bearer_method is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information. |
| o_auth_bearer_client_secret | (Optional) When o_auth_bearer_method is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information. |
| o_auth_bearer_scope | (Optional) Specifies the scope of the access request to the broker. |
| o_auth_bearer_token_endpoint_url | (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information. |
Note
Certificate PEM-related properties and Avro key-related properties aren't yet available in the Python library.
Usage
The offset, partition, and timestamp for the event are generated at runtime. You can set only the value and headers inside the function. The topic is set in the binding definition, such as in the function.json file or in the attribute, annotation, or decorator.
Make sure that you have write access to the Kafka topic. You configure the binding with connection and access credentials for the Kafka topic.
In a Premium plan, you must enable runtime scale monitoring for the Kafka output to scale out to multiple instances. To learn more, see Enable runtime scaling.
For a complete set of supported host.json settings for the Kafka trigger, see host.json settings.
Connections
Store all connection information required by your triggers and bindings in application settings, not in the binding definitions in your code. This guidance applies to credentials, which you should never store in your code.
Important
Credential settings must reference an application setting. Don't hard-code credentials in your code or configuration files. When running locally, use the local.settings.json file for your credentials, and don't publish the local.settings.json file.
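For example, during local development, the app settings that the binding examples in this article reference can be defined in the Values collection of local.settings.json. The following is a minimal sketch with placeholder values; the FUNCTIONS_WORKER_RUNTIME value shown assumes the .NET isolated worker examples, so adjust it for your language.
{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "BrokerList": "xyz-xyzxzy.westeurope.azure.confluent.cloud:9092",
    "ConfluentCloudUserName": "<CONFLUENT_API_KEY>",
    "ConfluentCloudPassword": "<CONFLUENT_API_SECRET>"
  }
}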
When connecting to a managed Kafka cluster provided by Confluent in Azure, you can use one of the following authentication methods.
Note
When using the Flex Consumption plan, file location-based certificate authentication properties (SslCaLocation, SslCertificateLocation, SslKeyLocation) aren't supported. Instead, use the PEM-based certificate properties (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) or store certificates in Azure Key Vault.
Schema Registry
To use the schema registry provided by Confluent with the Kafka extension, set the following values:
| Setting | Recommended value | Description |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl | URL of the schema registry service used for schema management. Usually of the format https://psrc-xyz.us-east-2.aws.confluent.cloud. |
| SchemaRegistryUsername | CONFLUENT_API_KEY | Username for basic auth on the schema registry (if required). |
| SchemaRegistryPassword | CONFLUENT_API_SECRET | Password for basic auth on the schema registry (if required). |
Username/Password authentication
When you use this form of authentication, make sure that Protocol is set to either SaslPlaintext or SaslSsl and that AuthenticationMode is set to Plain, ScramSha256, or ScramSha512. If the CA certificate in use differs from the default ISRG Root X1 certificate, also set SslCaLocation or SslCaPEM accordingly.
| Setting | Recommended value | Description |
|---|---|---|
| BrokerList | BootstrapServer | App setting named BootstrapServer that contains the bootstrap server value found on the Confluent Cloud settings page. The value resembles xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| Username | ConfluentCloudUsername | App setting named ConfluentCloudUsername that contains the API access key from the Confluent Cloud web site. |
| Password | ConfluentCloudPassword | App setting named ConfluentCloudPassword that contains the API secret obtained from the Confluent Cloud web site. |
| SslCaPEM | SSLCaPemCertificate | App setting named SSLCaPemCertificate that contains the CA certificate as a string in PEM format. The value should follow the standard format, for example: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----. |
SSL authentication
Ensure that Protocol is set to SSL.
| Setting | Recommended value | Description |
|---|---|---|
| BrokerList | BootstrapServer | App setting named BootstrapServer that contains the bootstrap server value found on the Confluent Cloud settings page. The value resembles xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| SslCaPEM | SslCaCertificatePem | App setting named SslCaCertificatePem that contains the PEM value of the CA certificate as a string. The value should follow the standard format: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem | App setting named SslClientCertificatePem that contains the PEM value of the client certificate as a string. The value should follow the standard format: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem | App setting named SslClientKeyPem that contains the PEM value of the client private key as a string. The value should follow the standard format: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem | App setting named SslClientCertificateAndKeyPem that contains the PEM values of the client certificate and client private key concatenated as a string. The value should follow the standard format: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword | App setting named SslClientKeyPassword that contains the password for the private key (if any). |
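As a sketch of how these settings come together, the following function.json output binding uses SSL authentication. The app setting names match the recommended values in the preceding table; the exact casing of the protocol value and whether the PEM settings use the %...% app-setting syntax can vary by language worker, so treat this as illustrative rather than definitive.
{
  "type": "kafka",
  "direction": "out",
  "name": "outputMessage",
  "brokerList": "BootstrapServer",
  "topic": "topic",
  "protocol": "SSL",
  "sslCaPEM": "%SslCaCertificatePem%",
  "sslCertificatePEM": "%SslClientCertificatePem%",
  "sslKeyPEM": "%SslClientKeyPem%",
  "sslKeyPassword": "%SslClientKeyPassword%"
}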
OAuth authentication
When you use OAuth authentication, configure the OAuth-related properties, along with Protocol and AuthenticationMode, in your binding definitions. The string values you use for these settings must exist as application settings in Azure, or in the Values collection of the local.settings.json file during local development.
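As an illustrative sketch (not a definitive configuration), a function.json Kafka output binding that uses OAuth Bearer authentication with the oidc method might reference app settings like the following. The property names come from the Configuration table earlier in this article; the OAuthBearer* app setting names are placeholders that you define yourself, and the exact casing of the protocol and authentication mode values can vary by language worker.
{
  "type": "kafka",
  "direction": "out",
  "name": "outputMessage",
  "brokerList": "BrokerList",
  "topic": "topic",
  "protocol": "SASLSSL",
  "authenticationMode": "OAuthBearer",
  "oAuthBearerMethod": "oidc",
  "oAuthBearerClientId": "%OAuthBearerClientId%",
  "oAuthBearerClientSecret": "%OAuthBearerClientSecret%",
  "oAuthBearerScope": "%OAuthBearerScope%",
  "oAuthBearerTokenEndpointUrl": "%OAuthBearerTokenEndpointUrl%"
}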