Apache Kafka trigger for Azure Functions

Use the Apache Kafka trigger in Azure Functions to run your function code in response to messages in Kafka topics. You can also use a Kafka output binding to write from your function to a topic. For information on setup and configuration details, see Apache Kafka bindings for Azure Functions overview.

Important

Kafka bindings are available for Functions on the Flex Consumption plan, Elastic Premium plan, and Dedicated (App Service) plan. They're supported only on version 4.x of the Functions runtime.

Example

The usage of the trigger depends on the C# modality used in your function app. The following examples use the isolated worker model: a compiled C# class library function that runs in a worker process separate from the runtime.

The attributes you use depend on the specific event provider.

The following example shows a C# function that reads and logs the Kafka message as a Kafka event:

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

To receive events in a batch, use a string array as input, as shown in the following example:

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }
}

The following function logs the message and headers for the Kafka event:

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}
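
Instead of parsing the event JSON with JObject, you can deserialize it into your own types. The following sketch is illustrative rather than part of the extension's API surface: it assumes the Newtonsoft.Json package and the event fields shown in the preceding examples, and the KafkaEntity and KafkaHeader class names are hypothetical.

using Newtonsoft.Json;

// Hypothetical types that mirror the event JSON shown in the examples above.
public class KafkaHeader
{
    public string Key { get; set; }
    public byte[] Value { get; set; } // Newtonsoft decodes the base64-encoded header value to bytes.
}

public class KafkaEntity
{
    public long Offset { get; set; }
    public int Partition { get; set; }
    public string Topic { get; set; }
    public string Value { get; set; }
    public KafkaHeader[] Headers { get; set; }
}

[Function("KafkaTriggerTyped")]
public static void RunTyped(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaTriggerTyped");
    var entity = JsonConvert.DeserializeObject<KafkaEntity>(eventData);
    logger.LogInformation($"C# Kafka trigger function processed a message: {entity.Value}");
}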

For a complete set of working .NET examples, see the Kafka extension repository.

The usage of the trigger depends on your version of the Node.js programming model.

In the Node.js v4 model, you define your trigger directly in your function code. For more information, see the Azure Functions Node.js developer guide.

In these examples, the event providers are either Confluent or Azure Event Hubs. These examples show how to define a Kafka trigger for a function that reads a Kafka message.

const { app } = require("@azure/functions");

async function kafkaTrigger(event, context) {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Key: " + event.Key);
  context.log("Event Value (as string): " + event.Value);

  let event_obj = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

To receive events in a batch, set the cardinality value to many, as shown in these examples:

const { app } = require("@azure/functions");

async function kafkaTriggerMany(events, context) {
  for (const event of events) {
    context.log("Event Offset: " + event.Offset);
    context.log("Event Partition: " + event.Partition);
    context.log("Event Topic: " + event.Topic);
    context.log("Event Key: " + event.Key);
    context.log("Event Timestamp: " + event.Timestamp);
    context.log("Event Value (as string): " + event.Value);

    let event_obj = JSON.parse(event.Value);

    context.log("Event Value Object: ");
    context.log("   Value.registertime: ", event_obj.registertime.toString());
    context.log("   Value.userid: ", event_obj.userid);
    context.log("   Value.regionid: ", event_obj.regionid);
    context.log("   Value.gender: ", event_obj.gender);
  }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

You can define a generic Avro schema for the event passed to the trigger. This example defines the trigger for the specific provider with a generic Avro schema:

const { app } = require("@azure/functions");

async function kafkaAvroGenericTrigger(event, context) {
  context.log("Processed kafka event: ", event);
  if (context.triggerMetadata?.key !== undefined) {
    context.log("message key: ", context.triggerMetadata?.key);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    password: "EventHubConnectionString",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    username: "$ConnectionString",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

For a complete set of working JavaScript examples, see the Kafka extension repository.

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
  registertime: number;
  userid: string;
  regionid: string;
  gender: string;
}

export async function kafkaTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Value (as string): " + event.Value);

  let event_obj: EventData = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

To receive events in a batch, set the cardinality value to many, as shown in these examples:

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
    registertime: number;
    userid: string;
    regionid: string;
    gender: string;
}

interface KafkaEvent {
    Offset: number;
    Partition: number;
    Topic: string;
    Timestamp: number;
    Value: string;
}

export async function kafkaTriggerMany(
    events: any,
    context: InvocationContext
): Promise<void> {
    for (const event of events) {
        context.log("Event Offset: " + event.Offset);
        context.log("Event Partition: " + event.Partition);
        context.log("Event Topic: " + event.Topic);
        context.log("Event Timestamp: " + event.Timestamp);
        context.log("Event Value (as string): " + event.Value);

        let event_obj: EventData = JSON.parse(event.Value);

        context.log("Event Value Object: ");
        context.log("   Value.registertime: ", event_obj.registertime.toString());
        context.log("   Value.userid: ", event_obj.userid);
        context.log("   Value.regionid: ", event_obj.regionid);
        context.log("   Value.gender: ", event_obj.gender);
    }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

You can define a generic Avro schema for the event passed to the trigger. This example defines the trigger for the specific provider with a generic Avro schema:

import { app, InvocationContext } from "@azure/functions";

export async function kafkaAvroGenericTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Processed kafka event: ", event);
  context.log(
    `Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
  );
  if (context.triggerMetadata?.key !== undefined) {
    context.log(`Message Key : ${context.triggerMetadata?.key}`);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    username: "ConfluentCloudUsername",
    password: "ConfluentCloudPassword",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

For a complete set of working TypeScript examples, see the Kafka extension repository.

The specific properties of the function.json file depend on your event provider. In these examples, the event providers are either Confluent or Azure Event Hubs. The following examples show a Kafka trigger for a function that reads and logs a Kafka message.

The following function.json file defines the trigger for the specific provider:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

The following code runs when the function is triggered:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

To receive events in a batch, set the cardinality value to many in the function.json file, as shown in the following examples:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

The following code parses the array of events and logs the event data:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

The following code logs the header data:

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

You can define a generic Avro schema for the event passed to the trigger. The following function.json defines the trigger for the specific provider with a generic Avro schema:

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

The following code runs when the function is triggered:

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

For a complete set of working PowerShell examples, see the Kafka extension repository.

The usage of the trigger depends on your version of the Python programming model.

In the Python v2 model, you define your trigger directly in your function code using decorators. For more information, see the Azure Functions Python developer guide.

These examples show how to define a Kafka trigger for a function that reads a Kafka message.

@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
    arg_name="kevent",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)
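
The body returned by get_body() is the raw message payload. If your messages are JSON, you can parse them inside the handler. The following is a minimal sketch that assumes a UTF-8 encoded JSON payload; the userid field is illustrative only.

import json
import logging

import azure.functions as func

# Minimal sketch: parse the JSON body of a single Kafka event.
# Assumes the message value is a UTF-8 encoded JSON document; "userid" is an illustrative field name.
def parse_kafka_event(kevent: func.KafkaEvent) -> dict:
    body = json.loads(kevent.get_body().decode('utf-8'))
    logging.info("userid: %s", body.get("userid"))
    return body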

This example receives events in a batch by setting the cardinality value to many.

@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
    arg_name="kevents",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    cardinality="MANY",
    data_type="string",
    consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

You can define a generic Avro schema for the event passed to the trigger.

@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
    arg_name="kafkaTriggerAvroGeneric",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    consumer_group="$Default",
    avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

For a complete set of working Python examples, see the Kafka extension repository.

The annotations you use to configure your trigger depend on the specific event provider.

The following example shows a Java function that reads and logs the content of the Kafka event:

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

To receive events in a batch, use a string array as input, as shown in the following example:

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

The following function logs the message and headers for the Kafka event:

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

You can define a generic Avro schema for the event passed to the trigger. The following function defines a trigger for the specific provider with a generic Avro schema:

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

For a complete set of working Java examples for Confluent, see the Kafka extension repository.

Attributes

Both in-process and isolated worker process C# libraries use the KafkaTriggerAttribute to define the function trigger.

The following table explains the properties you can set by using this trigger attribute:

Parameter Description
BrokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
Topic (Required) The topic monitored by the trigger.
ConsumerGroup (Optional) Kafka consumer group used by the trigger.
AvroSchema (Optional) Schema of a generic record of message value when using the Avro protocol.
KeyAvroSchema (Optional) Schema of a generic record of message key when using the Avro protocol.
KeyDataType (Optional) The data type used to receive the message key from the Kafka topic. If KeyAvroSchema is set, this value is a generic record. Accepted values are Int, Long, String, and Binary.
AuthenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512, and OAuthBearer.
Username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
Protocol (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
SslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
SslCertificateLocation (Optional) Path to the client's certificate.
SslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
SslKeyPassword (Optional) Password for client's certificate.
SslCertificatePEM (Optional) Client certificate in PEM format as a string. See Connections for more information.
SslKeyPEM (Optional) Client private key in PEM format as a string. See Connections for more information.
SslCaPEM (Optional) CA certificate in PEM format as a string. See Connections for more information.
SslCertificateandKeyPEM (Optional) Client certificate and key in PEM format as a string. See Connections for more information.
SchemaRegistryUrl (Optional) URL for the Avro Schema Registry. See Connections for more information.
SchemaRegistryUsername (Optional) Username for the Avro Schema Registry. See Connections for more information.
SchemaRegistryPassword (Optional) Password for the Avro Schema Registry. See Connections for more information.
OAuthBearerMethod (Optional) OAuth Bearer method. Accepted values are oidc and default.
OAuthBearerClientId (Optional) When OAuthBearerMethod is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information.
OAuthBearerClientSecret (Optional) When OAuthBearerMethod is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information.
OAuthBearerScope (Optional) Specifies the scope of the access request to the broker.
OAuthBearerTokenEndpointUrl (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information.
OAuthBearerExtensions (Optional) Comma-separated list of key=value pairs to be provided as additional information to broker when oidc method is used. For example: supportFeatureX=true,organizationId=sales-emea.

Annotations

The KafkaTrigger annotation enables you to create a function that runs when it receives messages from a Kafka topic. Supported options include the following elements:

Element Description
name (Required) The name of the variable that represents the queue or topic message in function code.
brokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
dataType Defines how Functions handles the parameter value. By default, the value is obtained as a string, and Functions tries to deserialize the string to an actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual byte[] parameter type.
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
authenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
lagThreshold (Optional) Lag threshold for the trigger.
schemaRegistryUrl (Optional) URL for the Avro Schema Registry. See Connections for more information.
schemaRegistryUsername (Optional) Username for the Avro Schema Registry. See Connections for more information.
schemaRegistryPassword (Optional) Password for the Avro Schema Registry. See Connections for more information.

Configuration

The following table explains the binding configuration properties that you set in the function.json file.

function.json property Description
type (Required) Set to kafkaTrigger.
direction (Required) Set to in.
name (Required) The name of the variable that represents the brokered data in function code.
brokerList (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a dataType.
dataType Defines how Functions handles the parameter value. By default, the value is obtained as a string, and Functions tries to deserialize the string to an actual plain-old Java object (POJO). When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to an actual byte array parameter type.
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
keyAvroSchema (Optional) Schema of a generic record of message key when using the Avro protocol.
keyDataType (Optional) The data type used to receive the message key from the Kafka topic. If keyAvroSchema is set, this value is a generic record. Accepted values are Int, Long, String, and Binary.
authenticationMode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NotSet (default), Gssapi, Plain, ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when AuthenticationMode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are NotSet (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
sslCertificatePEM (Optional) Client certificate in PEM format as a string. See Connections for more information.
sslKeyPEM (Optional) Client private key in PEM format as a string. See Connections for more information.
sslCaPEM (Optional) CA certificate in PEM format as a string. See Connections for more information.
sslCertificateandKeyPEM (Optional) Client certificate and key in PEM format as a string. See Connections for more information.
lagThreshold (Optional) Lag threshold for the trigger.
schemaRegistryUrl (Optional) URL for the Avro Schema Registry. See Connections for more information.
schemaRegistryUsername (Optional) Username for the Avro Schema Registry. See Connections for more information.
schemaRegistryPassword (Optional) Password for the Avro Schema Registry. See Connections for more information.
oAuthBearerMethod (Optional) OAuth Bearer method. Accepted values are oidc and default.
oAuthBearerClientId (Optional) When oAuthBearerMethod is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information.
oAuthBearerClientSecret (Optional) When oAuthBearerMethod is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information.
oAuthBearerScope (Optional) Specifies the scope of the access request to the broker.
oAuthBearerTokenEndpointUrl (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information.

Configuration

The following table explains the binding configuration properties that you set in the function.json file. Python uses snake_case naming conventions for configuration properties.

function.json property Description
type (Required) Set to kafkaTrigger.
direction (Required) Set to in.
name (Required) The name of the variable that represents the brokered data in function code.
broker_list (Required) The list of Kafka brokers monitored by the trigger. See Connections for more information.
topic (Required) The topic monitored by the trigger.
cardinality (Optional) Indicates the cardinality of the trigger input. The supported values are ONE (default) and MANY. Use ONE when the input is a single message and MANY when the input is an array of messages. When you use MANY, you must also set a data_type.
data_type Defines how Functions handles the parameter value. By default, the value is obtained as a string, and Functions tries to deserialize the string to the actual parameter type. When string, the input is treated as just a string. When binary, the message is received as binary data, and Functions tries to deserialize it to the actual bytes parameter type.
consumerGroup (Optional) Kafka consumer group used by the trigger.
avroSchema (Optional) Schema of a generic record when using the Avro protocol.
authentication_mode (Optional) The authentication mode when using Simple Authentication and Security Layer (SASL) authentication. The supported values are NOTSET (default), Gssapi, Plain, ScramSha256, ScramSha512.
username (Optional) The username for SASL authentication. Not supported when authentication_mode is Gssapi. See Connections for more information.
password (Optional) The password for SASL authentication. Not supported when authentication_mode is Gssapi. See Connections for more information.
protocol (Optional) The security protocol used when communicating with brokers. The supported values are NOTSET (default), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocation (Optional) Path to CA certificate file for verifying the broker's certificate.
sslCertificateLocation (Optional) Path to the client's certificate.
sslKeyLocation (Optional) Path to client's private key (PEM) used for authentication.
sslKeyPassword (Optional) Password for client's certificate.
lag_threshold (Optional) Lag threshold for the trigger.
schema_registry_url (Optional) URL for the Avro Schema Registry. See Connections for more information.
schema_registry_username (Optional) Username for the Avro Schema Registry. See Connections for more information.
schema_registry_password (Optional) Password for the Avro Schema Registry. See Connections for more information.
o_auth_bearer_method (Optional) OAuth Bearer method. Accepted values are oidc and default.
o_auth_bearer_client_id (Optional) When o_auth_bearer_method is set to oidc, this specifies the OAuth bearer client ID. See Connections for more information.
o_auth_bearer_client_secret (Optional) When o_auth_bearer_method is set to oidc, this specifies the OAuth bearer client secret. See Connections for more information.
o_auth_bearer_scope (Optional) Specifies the scope of the access request to the broker.
o_auth_bearer_token_endpoint_url (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. See Connections for more information.

Note

Certificate PEM-related properties and Avro key-related properties aren't yet available in the Python library.

Usage

The Kafka trigger passes Kafka messages to the function as strings. The trigger also supports string arrays that are JSON payloads.

In a Premium plan, you must enable runtime scale monitoring for the Kafka trigger to scale out to multiple instances. To learn more, see Enable runtime scaling.
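
For example, runtime scale monitoring can be turned on with the Azure CLI. This is a sketch that assumes a function app named <APP_NAME> in resource group <RESOURCE_GROUP>:

az resource update --resource-group <RESOURCE_GROUP> --name <APP_NAME>/config/web --set properties.functionsRuntimeScaleMonitoringEnabled=1 --resource-type Microsoft.Web/sites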

You can't use the Test/Run feature of the Code + Test page in the Azure portal to work with Kafka triggers. You must instead send test events directly to the topic being monitored by the trigger.
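
For example, you can publish a test message with a Kafka producer tool such as the Apache Kafka console producer. This sketch assumes a broker and topic that match your trigger configuration; for Confluent Cloud, you also need to pass SASL credentials in a producer configuration file (client.properties here is a placeholder):

kafka-console-producer.sh --bootstrap-server <broker>:9092 --topic <topic> --producer.config client.properties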

For a complete set of supported host.json settings for the Kafka trigger, see host.json settings.

Connections

Store all connection information required by your triggers and bindings in application settings, not in the binding definitions in your code. This guidance applies to credentials, which you should never store in your code.

Important

Credential settings must reference an application setting. Don't hard-code credentials in your code or configuration files. When running locally, use the local.settings.json file for your credentials, and don't publish the local.settings.json file.
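
For example, during local development the Confluent settings referenced by the earlier examples could be defined in local.settings.json as follows. The values shown are placeholders, and the worker runtime value depends on your language stack:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet-isolated",
    "BrokerList": "xyz-xyzxzy.westeurope.azure.confluent.cloud:9092",
    "ConfluentCloudUserName": "<CONFLUENT_API_KEY>",
    "ConfluentCloudPassword": "<CONFLUENT_API_SECRET>"
  }
}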

When connecting to a managed Kafka cluster provided by Confluent in Azure, you can use one of the following authentication methods.

Note

When using the Flex Consumption plan, file location-based certificate authentication properties (SslCaLocation, SslCertificateLocation, SslKeyLocation) aren't supported. Instead, use the PEM-based certificate properties (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) or store certificates in Azure Key Vault.

Schema Registry

To use the schema registry provided by Confluent with the Kafka extension, set the following credentials:

Setting Recommended Value Description
SchemaRegistryUrl SchemaRegistryUrl URL of the schema registry service used for schema management. Usually of the format https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY Username for basic auth on schema registry (if required).
SchemaRegistryPassword CONFLUENT_API_SECRET Password for basic auth on schema registry (if required).
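
For example, in a C# isolated worker function these values could be referenced from the trigger attribute. This is a sketch that assumes app settings named SchemaRegistryUrl, SchemaRegistryUsername, and SchemaRegistryPassword:

[Function("KafkaTriggerAvroWithRegistry")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  SchemaRegistryUrl = "SchemaRegistryUrl",
                  SchemaRegistryUsername = "SchemaRegistryUsername",
                  SchemaRegistryPassword = "SchemaRegistryPassword",
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaTriggerAvroWithRegistry");
    logger.LogInformation($"Message: {eventData}");
}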

Username/Password authentication

When you use this form of authentication, make sure that Protocol is set to either SaslPlaintext or SaslSsl and that AuthenticationMode is set to Plain, ScramSha256, or ScramSha512. If the CA certificate in use differs from the default ISRG Root X1 certificate, also update SslCaLocation or SslCaPEM.

Setting Recommended value Description
BrokerList BootstrapServer App setting named BootstrapServer contains the value of bootstrap server found in Confluent Cloud settings page. The value resembles xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Username ConfluentCloudUsername App setting named ConfluentCloudUsername contains the API access key from the Confluent Cloud web site.
Password ConfluentCloudPassword App setting named ConfluentCloudPassword contains the API secret obtained from the Confluent Cloud web site.
SslCaPEM SSLCaPemCertificate App setting named SSLCaPemCertificate that contains the CA certificate as a string in PEM format. The value should follow the standard format, for example: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----.

SSL authentication

Ensure that Protocol is set to SSL.

Setting Recommended Value Description
BrokerList BootstrapServer App setting named BootstrapServer contains the value of bootstrap server found in Confluent Cloud settings page. The value resembles xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
SslCaPEM SslCaCertificatePem App setting named SslCaCertificatePem that contains PEM value of the CA certificate as a string. The value should follow the standard format: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem App setting named SslClientCertificatePem that contains PEM value of the client certificate as a string. The value should follow the standard format: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem App setting named SslClientKeyPem that contains PEM value of the client private key as a string. The value should follow the standard format: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem App setting named SslClientCertificateAndKeyPem that contains PEM value of the client certificate and client private key concatenated as a string. The value should follow the standard format: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword App setting named SslClientKeyPassword that contains the password for the private key (if any).

OAuth authentication

When using OAuth authentication, configure the OAuth-related properties in your binding definitions.

The string values you use for these settings must be present as application settings in Azure or in the Values collection in the local.settings.json file during local development.

You should also set the Protocol and AuthenticationMode in your binding definitions.
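
As a sketch, a function.json trigger binding that uses OAuth bearer (OIDC) authentication might look like the following; the app setting names are placeholders, and the exact authenticationMode value should match the values your extension version accepts:

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "topic": "topic",
            "brokerList": "%BrokerList%",
            "protocol": "SASLSSL",
            "authenticationMode": "OAUTHBEARER",
            "oAuthBearerMethod": "oidc",
            "oAuthBearerClientId": "%OAuthClientId%",
            "oAuthBearerClientSecret": "%OAuthClientSecret%",
            "oAuthBearerScope": "%OAuthScope%",
            "oAuthBearerTokenEndpointUrl": "%OAuthTokenEndpointUrl%",
            "consumerGroup": "$Default"
        }
    ]
}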

Next steps