Partager via


Déclencheur Apache Kafka pour Azure Functions

Utilisez le déclencheur Apache Kafka dans Azure Functions pour exécuter votre code de fonction en réponse aux messages dans les rubriques Kafka. Vous pouvez également utiliser une liaison de sortie Kafka pour écrire de votre fonction vers une rubrique. Pour plus d’informations sur l’installation et la configuration, consultez Vue d’ensemble des liaisons Apache Kafka pour Azure Functions.

Important

Les liaisons Kafka sont disponibles pour Functions sur le plan Flex Consumption, Elastic Premium Plan et Le plan Dédié (App Service). Elles ne sont prises en charge que sur la version 4.x du runtime Functions.

Exemple

L’utilisation du déclencheur dépend de la modalité C# utilisée dans votre application de fonction, qui peut être l’un des modes suivants :

Fonction C# compilée qui utilise une bibliothèque de classes de processus worker isolée qui s’exécute dans un processus distinct du runtime.

Les attributs que vous utilisez dépendent du fournisseur d’événements.

L’exemple suivant montre une fonction C# qui lit et journalise le message Kafka en tant qu’événement Kafka :

[Function("KafkaTrigger")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}

Pour recevoir des événements dans un lot, utilisez un tableau de chaînes comme entrée, tel qu’indiqué dans l’exemple suivant :

[Function("KafkaTriggerMany")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default",
                  IsBatched = true)] string[] events, FunctionContext context)
{
    foreach (var kevent in events)
    {
        var logger = context.GetLogger("KafkaFunction");
        logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
    }

La fonction suivante enregistre le message et les en-têtes de l’événement Kafka :

[Function("KafkaTriggerWithHeaders")]
public static void Run(
    [KafkaTrigger("BrokerList",
                  "topic",
                  Username = "ConfluentCloudUserName",
                  Password = "ConfluentCloudPassword",
                  Protocol = BrokerProtocol.SaslSsl,
                  AuthenticationMode = BrokerAuthenticationMode.Plain,
                  ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
    var eventJsonObject = JObject.Parse(eventData);
    var logger = context.GetLogger("KafkaFunction");
    logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
    var headersJArr = eventJsonObject["Headers"] as JArray;
    logger.LogInformation("Headers for this event: ");
    foreach (JObject header in headersJArr)
    {
        logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");

    }
}

Pour obtenir un ensemble complet d’exemples .NET opérationnels, consultez le dépôt d’extensions Kafka.

L’utilisation du déclencheur dépend de votre version du modèle de programmation Node.js.

Dans le modèle Node.js v4, vous définissez votre déclencheur directement dans votre code de fonction. Pour plus d’informations, consultez le Guide des développeurs Node.js sur Azure Functions.

Dans ces exemples, les fournisseurs d’événements sont Confluent ou Azure Event Hubs. Ces exemples montrent comment définir un déclencheur Kafka pour une fonction qui lit un message Kafka.

const { app } = require("@azure/functions");

async function kafkaTrigger(event, context) {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Key: " + event.Key);
  context.log("Event Value (as string): " + event.Value);

  let event_obj = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

Pour recevoir des événements dans un lot, définissez la cardinality valeur manysur , comme indiqué dans ces exemples :

const { app } = require("@azure/functions");

async function kafkaTriggerMany(events, context) {
  for (const event of events) {
    context.log("Event Offset: " + event.Offset);
    context.log("Event Partition: " + event.Partition);
    context.log("Event Topic: " + event.Topic);
    context.log("Event Key: " + event.Key);
    context.log("Event Timestamp: " + event.Timestamp);
    context.log("Event Value (as string): " + event.Value);

    let event_obj = JSON.parse(event.Value);

    context.log("Event Value Object: ");
    context.log("   Value.registertime: ", event_obj.registertime.toString());
    context.log("   Value.userid: ", event_obj.userid);
    context.log("   Value.regionid: ", event_obj.regionid);
    context.log("   Value.gender: ", event_obj.gender);
  }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. Cet exemple définit le déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

const { app } = require("@azure/functions");

async function kafkaAvroGenericTrigger(event, context) {
  context.log("Processed kafka event: ", event);
  if (context.triggerMetadata?.key !== undefined) {
    context.log("message key: ", context.triggerMetadata?.key);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    password: "EventHubConnectionString",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    username: "$ConnectionString",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

Pour obtenir un ensemble complet d’exemples JavaScript opérationnels, consultez le dépôt d’extensions Kafka.

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
  registertime: number;
  userid: string;
  regionid: string;
  gender: string;
}

export async function kafkaTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Event Offset: " + event.Offset);
  context.log("Event Partition: " + event.Partition);
  context.log("Event Topic: " + event.Topic);
  context.log("Event Timestamp: " + event.Timestamp);
  context.log("Event Value (as string): " + event.Value);

  let event_obj: EventData = JSON.parse(event.Value);

  context.log("Event Value Object: ");
  context.log("   Value.registertime: ", event_obj.registertime.toString());
  context.log("   Value.userid: ", event_obj.userid);
  context.log("   Value.regionid: ", event_obj.regionid);
  context.log("   Value.gender: ", event_obj.gender);
}

app.generic("Kafkatrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string"
  },
  handler: kafkaTrigger,
});

Pour recevoir des événements dans un lot, définissez la cardinality valeur manysur , comme indiqué dans ces exemples :

import { app, InvocationContext } from "@azure/functions";

// This is a sample interface that describes the actual data in your event.
interface EventData {
    registertime: number;
    userid: string;
    regionid: string;
    gender: string;
}

interface KafkaEvent {
    Offset: number;
    Partition: number;
    Topic: string;
    Timestamp: number;
    Value: string;
}

export async function kafkaTriggerMany(
    events: any,
    context: InvocationContext
): Promise<void> {
    for (const event of events) {
        context.log("Event Offset: " + event.Offset);
        context.log("Event Partition: " + event.Partition);
        context.log("Event Topic: " + event.Topic);
        context.log("Event Timestamp: " + event.Timestamp);
        context.log("Event Value (as string): " + event.Value);

        let event_obj: EventData = JSON.parse(event.Value);

        context.log("Event Value Object: ");
        context.log("   Value.registertime: ", event_obj.registertime.toString());
        context.log("   Value.userid: ", event_obj.userid);
        context.log("   Value.regionid: ", event_obj.regionid);
        context.log("   Value.gender: ", event_obj.gender);
    }
}

app.generic("kafkaTriggerMany", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    topic: "topic",
    brokerList: "%BrokerList%",
    username: "%ConfluentCloudUserName%",
    password: "%ConfluentCloudPassword%",
    consumerGroup: "$Default",
    protocol: "saslSsl",
    authenticationMode: "plain",
    dataType: "string",
    cardinality: "MANY"
  },
  handler: kafkaTriggerMany,
});

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. Cet exemple définit le déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

import { app, InvocationContext } from "@azure/functions";

export async function kafkaAvroGenericTrigger(
  event: any,
  context: InvocationContext
): Promise<void> {
  context.log("Processed kafka event: ", event);
  context.log(
    `Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
  );
  if (context.triggerMetadata?.key !== undefined) {
    context.log(`Message Key : ${context.triggerMetadata?.key}`);
  }
}

app.generic("kafkaAvroGenericTrigger", {
  trigger: {
    type: "kafkaTrigger",
    direction: "in",
    name: "event",
    protocol: "SASLSSL",
    username: "ConfluentCloudUsername",
    password: "ConfluentCloudPassword",
    dataType: "string",
    topic: "topic",
    authenticationMode: "PLAIN",
    avroSchema:
      '{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
    consumerGroup: "$Default",
    brokerList: "%BrokerList%",
  },
  handler: kafkaAvroGenericTrigger,
});

Pour obtenir un ensemble complet d’exemples TypeScript de travail, consultez le référentiel d’extensions Kafka.

Les propriétés spécifiques du function.json fichier dépendent de votre fournisseur d’événements. Dans ces exemples, les fournisseurs d’événements sont Confluent ou Azure Event Hubs. Les exemples suivants illustrent un déclencheur Kafka pour une fonction qui lit et enregistre un message Kafka.

Le fichier suivant function.json définit le déclencheur pour le fournisseur spécifique :

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Le code suivant s’exécute lorsque la fonction est déclenchée :

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Pour recevoir des événements dans un lot, affectez la valeur cardinality à many dans le fichier function.json, comme indiqué dans les exemples suivants :

{
    "bindings": [
      {
            "type": "kafkaTrigger",
            "name": "kafkaEvent",
            "direction": "in",
            "protocol" : "SASLSSL",
            "password" : "%ConfluentCloudPassword%",
            "dataType" : "string",
            "topic" : "topic",
            "authenticationMode" : "PLAIN",
            "cardinality" : "MANY",
            "consumerGroup" : "$Default",
            "username" : "%ConfluentCloudUserName%",
            "brokerList" : "%BrokerList%",
            "sslCaLocation": "confluent_cloud_cacert.pem"
        }
    ]
}

Le code suivant analyse le tableau d’événements et enregistre les données d’événement :

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
    $event = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $event.Value"
}

Le code suivant enregistre les données d’en-tête :

using namespace System.Net

param($kafkaEvents, $TriggerMetadata)

foreach ($kafkaEvent in $kafkaEvents) {
    $kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
    Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
    Write-Output "Headers for this message:"
    foreach ($header in $kevent.Headers) {
        $DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
        $Key = $header.Key
        Write-Output "Key: $Key Value: $DecodedValue"
    }
}

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. Le fichier function.json suivant définit le déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

{
  "bindings" : [ {
    "type" : "kafkaTrigger",
    "direction" : "in",
    "name" : "kafkaEvent",
    "protocol" : "SASLSSL",
    "password" : "ConfluentCloudPassword",
    "topic" : "topic",
    "authenticationMode" : "PLAIN",
    "avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
    "consumerGroup" : "$Default",
    "username" : "ConfluentCloudUsername",
    "brokerList" : "%BrokerList%"
  } ]
}

Le code suivant s’exécute lorsque la fonction est déclenchée :

using namespace System.Net

param($kafkaEvent, $TriggerMetadata)

Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"

Pour obtenir un ensemble complet d’exemples PowerShell opérationnels, consultez le dépôt d’extensions Kafka.

L’utilisation du déclencheur dépend de votre version du modèle de programmation Python.

Dans le modèle Python v2, vous définissez votre déclencheur directement dans votre code de fonction à l’aide de décorateurs. Pour plus d’informations, consultez le guide du développeur Python d’Azure Functions.

Ces exemples montrent comment définir un déclencheur Kafka pour une fonction qui lit un message Kafka.

@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
    arg_name="kevent",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
    logging.info(kevent.get_body().decode('utf-8'))
    logging.info(kevent.metadata)

Cet exemple reçoit des événements dans un lot en définissant la cardinality valeur sur many.

@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
    arg_name="kevents",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    cardinality="MANY",
    data_type="string",
    consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
    for event in kevents:
        logging.info(event.get_body())

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur.

@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
    arg_name="kafkaTriggerAvroGeneric",
    topic="KafkaTopic",
    broker_list="KafkaBrokerList",
    username="KafkaUsername",
    password="KafkaPassword",
    protocol="SaslSsl",
    authentication_mode="Plain",
    consumer_group="$Default",
    avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
    logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
    logging.info(kafkaTriggerAvroGeneric.metadata)

Pour obtenir un ensemble complet d’exemples Python opérationnels, consultez le dépôt d’extensions Kafka.

Les annotations que vous utilisez pour configurer votre déclencheur dépendent du fournisseur d’événements spécifique.

L’exemple suivant montre une fonction Java qui lit et enregistre le contenu de l’événement Kafka :

@FunctionName("KafkaTrigger")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string"
         ) String kafkaEventData,
        final ExecutionContext context) {
        context.getLogger().info(kafkaEventData);
}

Pour recevoir des événements dans un lot, utilisez une chaîne d’entrée en tant que tableau, comme indiqué dans l’exemple suivant :

@FunctionName("KafkaTriggerMany")
public void runMany(
        @KafkaTrigger(
            name = "kafkaTriggerMany",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            cardinality = Cardinality.MANY,
            dataType = "string"
         ) String[] kafkaEvents,
        final ExecutionContext context) {
        for (String kevent: kafkaEvents) {
            context.getLogger().info(kevent);
        }    
}

La fonction suivante enregistre le message et les en-têtes de l’événement Kafka :

@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
        @KafkaTrigger(
            name = "KafkaTrigger",
            topic = "topic",  
            brokerList="%BrokerList%",
            consumerGroup="$Default", 
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            protocol = BrokerProtocol.SASLSSL,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
            dataType = "string",
            cardinality = Cardinality.MANY
         ) List<String> kafkaEvents,
        final ExecutionContext context) {
            Gson gson = new Gson(); 
            for (String keventstr: kafkaEvents) {
                KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
                context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
                context.getLogger().info("Headers for the message:");
                for (KafkaHeaders header : kevent.Headers) {
                    String decodedValue = new String(Base64.getDecoder().decode(header.Value));
                    context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);                    
                }                
            }
        }

Vous pouvez définir un schéma Avro générique pour l’événement passé au déclencheur. La fonction suivante définit un déclencheur pour le fournisseur spécifique avec un schéma Avro générique :

private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";

@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
        @KafkaTrigger(
                name = "kafkaAvroGenericSingle",
                topic = "topic",
                brokerList="%BrokerList%",
                consumerGroup="$Default",
                username = "ConfluentCloudUsername",
                password = "ConfluentCloudPassword",
                avroSchema = schema,
                authenticationMode = BrokerAuthenticationMode.PLAIN,
                protocol = BrokerProtocol.SASLSSL) Payment payment,
        final ExecutionContext context) {
    context.getLogger().info(payment.toString());
}

Pour obtenir un ensemble complet d’exemples Java opérationnels pour Confluent, consultez le dépôt d’extensions Kafka.

Attributs

Les bibliothèques C# in-process et de processus Worker isolé utilisent l’attribut KafkaTriggerAttribute pour définir le déclencheur de fonction.

Le tableau suivant explique les propriétés que vous pouvez définir à l’aide de cet attribut de déclencheur :

Paramètre Description
BrokerList (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
Rubrique (Obligatoire) Rubrique supervisée par le déclencheur.
ConsumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
AvroSchema (Facultatif) Schéma d’un enregistrement générique de valeur de message lors de l’utilisation du protocole Avro.
KeyAvroSchema (Facultatif) Schéma d’un enregistrement générique de clé de message lors de l’utilisation du protocole Avro.
KeyDataType (Facultatif) Type de données pour recevoir la clé de message à partir de la rubrique Kafka. Si KeyAvroSchema elle est définie, cette valeur est un enregistrement générique. Les valeurs acceptées sont Int, , LongStringet Binary.
AuthenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont NotSet (par défaut), Gssapi, , PlainScramSha256, ScramSha512et OAuthBearer.
Nom d’utilisateur (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
Mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
Protocole (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont NotSet (par défaut), plaintext, ssl, sasl_plaintextsasl_ssl.
SslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
SslCertificateLocation (Facultatif) Chemin du certificat du client.
SslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
SslKeyPassword (Facultatif) Mot de passe pour le certificat du client.
SslCertificatePEM (Facultatif) Certificat client au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
SslKeyPEM (Facultatif) Clé privée du client au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
SslCaPEM (Facultatif) Certificat d’autorité de certification au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
SslCertificateandKeyPEM (Facultatif) Certificat client et clé au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
SchemaRegistryUrl (Facultatif) URL du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
SchemaRegistryUsername (Facultatif) Nom d’utilisateur du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
SchemaRegistryPassword (Facultatif) Mot de passe pour le Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
OAuthBearerMethod (Facultatif) Méthode du porteur OAuth. Les valeurs acceptées sont oidc et default.
OAuthBearerClientId (Facultatif) Quand OAuthBearerMethod la valeur est définie oidc, cela spécifie l’ID client du porteur OAuth. Pour plus d’informations, consultez Connexions .
OAuthBearerClientSecret (Facultatif) Quand OAuthBearerMethod la valeur est définie oidc, cela spécifie la clé secrète client du porteur OAuth. Pour plus d’informations, consultez Connexions .
OAuthBearerScope (Facultatif) Spécifie l’étendue de la demande d’accès au répartiteur.
OAuthBearerTokenEndpointUrl (Facultatif) URI HTTP(S) du point de terminaison de jeton d’émetteur OAuth/OIDC utilisé pour récupérer le jeton lorsque oidc la méthode est utilisée. Pour plus d’informations, consultez Connexions .
OAuthBearerExtensions (Facultatif) Liste séparée par des virgules des paires clé=valeur à fournir en tant qu’informations supplémentaires pour le répartiteur lorsque oidc la méthode est utilisée. Par exemple : supportFeatureX=true,organizationId=sales-emea.

Annotations

L’annotation KafkaTrigger vous permet de créer une fonction qui s’exécute lorsqu’elle reçoit une rubrique. Les options prises en charge incluent les éléments suivants :

Élément Description
name (Obligatoire) Nom de la variable qui représente le message de la file d’attente ou de la rubrique dans le code de la fonction.
brokerList (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
topic (Obligatoire) Rubrique supervisée par le déclencheur.
cardinalité (Facultatif) Indique la cardinalité de l’entrée du déclencheur. Les valeurs prises en charge sont ONE (valeur par défaut) et MANY. Utilisez ONE quand l’entrée est un message unique et MANY lorsque l’entrée est un tableau de messages. Lorsque vous utilisez MANY, vous devez également définir un dataType.
dataType Définit comment Functions gère la valeur du paramètre. Par défaut, la valeur est obtenue sous la forme d’une chaîne, et Functions tente de désérialiser la chaîne en objet POJO (Plain-Old Java Object) réel. Quand string, l’entrée est traitée comme une simple chaîne. Quand binary, le message est reçu sous forme de données binaires, et Functions tente de le désérialiser en type de paramètre réel byte[].
consumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
authenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont NotSet (par défaut), Gssapi, Plain, ScramSha256ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont NotSet (par défaut), plaintext, ssl, sasl_plaintextsasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.
lagThreshold (Facultatif) Seuil de décalage pour le déclencheur.
schemaRegistryUrl (Facultatif) URL du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
schemaRegistryUsername (Facultatif) Nom d’utilisateur du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
schemaRegistryPassword (Facultatif) Mot de passe pour le Registre de schémas Avro. Pour plus d’informations, consultez Connexions .

Configuration

Le tableau suivant décrit les propriétés de configuration de liaison que vous définissez dans le fichier function.json.

Propriété function.json Description
type (Obligatoire) Défini sur kafkaTrigger.
direction (Obligatoire) Défini sur in.
name (Obligatoire) Nom de la variable qui représente les données réparties dans le code de la fonction.
brokerList (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
topic (Obligatoire) Rubrique supervisée par le déclencheur.
cardinalité (Facultatif) Indique la cardinalité de l’entrée du déclencheur. Les valeurs prises en charge sont ONE (valeur par défaut) et MANY. Utilisez ONE quand l’entrée est un message unique et MANY lorsque l’entrée est un tableau de messages. Lorsque vous utilisez MANY, vous devez également définir un dataType.
dataType Définit comment Functions gère la valeur du paramètre. Par défaut, la valeur est obtenue sous la forme d’une chaîne, et Functions tente de désérialiser la chaîne en objet POJO (Plain-Old Java Object) réel. Quand string, l’entrée est traitée comme une simple chaîne. Quand binary, le message est reçu en tant que données binaires et Functions tente de le désérialiser en un type de paramètre de tableau d’octets réel.
consumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
keyAvroSchema (Facultatif) Schéma d’un enregistrement générique de clé de message lors de l’utilisation du protocole Avro.
keyDataType (Facultatif) Type de données pour recevoir la clé de message à partir de la rubrique Kafka. Si keyAvroSchema elle est définie, cette valeur est un enregistrement générique. Les valeurs acceptées sont Int, , LongStringet Binary.
authenticationMode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont NotSet (par défaut), Gssapi, Plain, ScramSha256ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand AuthenticationMode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont NotSet (par défaut), plaintext, ssl, sasl_plaintextsasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.
sslCertificatePEM (Facultatif) Certificat client au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
sslKeyPEM (Facultatif) Clé privée du client au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
sslCaPEM (Facultatif) Certificat d’autorité de certification au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
sslCertificateandKeyPEM (Facultatif) Certificat client et clé au format PEM sous forme de chaîne. Pour plus d’informations, consultez Connexions .
lagThreshold (Facultatif) Seuil de décalage pour le déclencheur.
schemaRegistryUrl (Facultatif) URL du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
schemaRegistryUsername (Facultatif) Nom d’utilisateur du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
schemaRegistryPassword (Facultatif) Mot de passe pour le Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
oAuthBearerMethod (Facultatif) Méthode du porteur OAuth. Les valeurs acceptées sont oidc et default.
oAuthBearerClientId (Facultatif) Quand oAuthBearerMethod la valeur est définie oidc, cela spécifie l’ID client du porteur OAuth. Pour plus d’informations, consultez Connexions .
oAuthBearerClientSecret (Facultatif) Quand oAuthBearerMethod la valeur est définie oidc, cela spécifie la clé secrète client du porteur OAuth. Pour plus d’informations, consultez Connexions .
oAuthBearerScope (Facultatif) Spécifie l’étendue de la demande d’accès au répartiteur.
oAuthBearerTokenEndpointUrl (Facultatif) URI HTTP(S) du point de terminaison de jeton d’émetteur OAuth/OIDC utilisé pour récupérer le jeton lorsque oidc la méthode est utilisée. Pour plus d’informations, consultez Connexions .

Configuration

Le tableau suivant décrit les propriétés de configuration de liaison que vous définissez dans le fichier function.json. Python utilise snake_case conventions d’affectation de noms pour les propriétés de configuration.

Propriété function.json Description
type (Obligatoire) Défini sur kafkaTrigger.
direction (Obligatoire) Défini sur in.
name (Obligatoire) Nom de la variable qui représente les données réparties dans le code de la fonction.
broker_list (Obligatoire) Liste des répartiteurs Kafka supervisés par le déclencheur. Pour plus d’informations, consultez Connexions .
topic (Obligatoire) Rubrique supervisée par le déclencheur.
cardinalité (Facultatif) Indique la cardinalité de l’entrée du déclencheur. Les valeurs prises en charge sont ONE (valeur par défaut) et MANY. Utilisez ONE quand l’entrée est un message unique et MANY lorsque l’entrée est un tableau de messages. Lorsque vous utilisez MANY, vous devez également définir un data_type.
data_type Définit comment Functions gère la valeur du paramètre. Par défaut, la valeur est obtenue sous la forme d’une chaîne, et Functions tente de désérialiser la chaîne en objet POJO (Plain-Old Java Object) réel. Quand string, l’entrée est traitée comme une simple chaîne. Quand binary, le message est reçu sous forme de données binaires, et Functions tente de le désérialiser en type de paramètre réel byte[].
consumerGroup (Facultatif) Groupe de consommateurs Kafka utilisé par le déclencheur.
avroSchema (Facultatif) Schéma d’un enregistrement générique lors de l’utilisation du protocole Avro.
authentication_mode (Facultatif) Mode d’authentification lors de l’utilisation de l’authentification SASL (Simple Authentication and Security Layer). Les valeurs prises en charge sont NOTSET (par défaut), Gssapi, Plain, ScramSha256ScramSha512.
username (Facultatif) Nom d’utilisateur pour l’authentification SASL. Non pris en charge quand authentication_mode est Gssapi. Pour plus d’informations, consultez Connexions .
mot de passe (Facultatif) Mot de passe pour l’authentification SASL. Non pris en charge quand authentication_mode est Gssapi. Pour plus d’informations, consultez Connexions .
protocol (Facultatif) Protocole de sécurité utilisé lors de la communication avec les répartiteurs. Les valeurs prises en charge sont NOTSET (par défaut), plaintext, ssl, sasl_plaintextsasl_ssl.
sslCaLocation (Facultatif) Chemin du fichier de certificat d’autorité de certification pour vérifier le certificat du répartiteur.
sslCertificateLocation (Facultatif) Chemin du certificat du client.
sslKeyLocation (Facultatif) Chemin de la clé privée du client (PEM) utilisée pour l’authentification.
sslKeyPassword (Facultatif) Mot de passe pour le certificat du client.
lag_threshold (Facultatif) Seuil de décalage pour le déclencheur.
schema_registry_url (Facultatif) URL du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
schema_registry_username (Facultatif) Nom d’utilisateur du Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
schema_registry_password (Facultatif) Mot de passe pour le Registre de schémas Avro. Pour plus d’informations, consultez Connexions .
o_auth_bearer_method (Facultatif) Méthode du porteur OAuth. Les valeurs acceptées sont oidc et default.
o_auth_bearer_client_id (Facultatif) Quand o_auth_bearer_method la valeur est définie oidc, cela spécifie l’ID client du porteur OAuth. Pour plus d’informations, consultez Connexions .
o_auth_bearer_client_secret (Facultatif) Quand o_auth_bearer_method la valeur est définie oidc, cela spécifie la clé secrète client du porteur OAuth. Pour plus d’informations, consultez Connexions .
o_auth_bearer_scope (Facultatif) Spécifie l’étendue de la demande d’accès au répartiteur.
o_auth_bearer_token_endpoint_url (Facultatif) URI HTTP(S) du point de terminaison de jeton d’émetteur OAuth/OIDC utilisé pour récupérer le jeton lorsque oidc la méthode est utilisée. Pour plus d’informations, consultez Connexions .

Notes

Les propriétés associées au certificat PEM et les propriétés liées à la clé Avro ne sont pas encore disponibles dans la bibliothèque Python.

Utilisation

Le déclencheur Kafka prend actuellement en charge les événements Kafka en tant que chaînes et tableaux de chaînes qui sont des charges utiles JSON.

Le déclencheur Kafka transmet les messages Kafka à la fonction sous forme de chaînes. Le déclencheur prend également en charge les tableaux de chaînes qui sont des charges utiles JSON.

Dans un plan Premium, vous devez activer la surveillance de la mise à l’échelle du runtime pour la sortie Kafka pour effectuer un scale-out vers plusieurs instances. Pour plus d’informations, consultez Activer la mise à l’échelle du runtime.

Vous ne pouvez pas utiliser la fonctionnalité Test/Exécution de la page Code + Test dans le portail Azure pour utiliser des déclencheurs Kafka. À la place, vous devez envoyer des événements de test directement à la rubrique monitorée par le déclencheur.

Pour obtenir un ensemble complet de paramètres host.json pris en charge pour le déclencheur Kafka, consultez Paramètres host.json.

Connexions

Stockez toutes les informations de connexion requises par vos déclencheurs et liaisons dans les paramètres de l’application, et non dans les définitions de liaison dans votre code. Cette aide s’applique aux informations d’identification, que vous ne devez jamais stocker dans votre code.

Important

Les paramètres d’informations d’identification doivent référencer un paramètre d’application. Ne codez pas d’informations d’identification en dur dans vos fichiers de code ou de configuration. Lors de l’exécution locale, utilisez le fichier local.settings.json pour vos informations d’identification, et ne publiez pas le fichier local.settings.json.

Lors de la connexion à un cluster Kafka managé fourni par Confluent dans Azure, vous pouvez utiliser l’une des méthodes d’authentification suivantes.

Notes

Lorsque vous utilisez le plan Flex Consumption, les propriétés d’authentification de certificat basées sur l’emplacement des fichiers (SslCaLocation, SslCertificateLocation, SslKeyLocation) ne sont pas prises en charge. Utilisez plutôt les propriétés de certificat pem (SslCaPEM, SslCertificatePEM, , SslKeyPEMSslCertificateandKeyPEM) ou stockez des certificats dans Azure Key Vault.

Registre de schémas

Pour utiliser le registre de schémas fourni par Confluent dans l’extension Kafka, définissez les informations d’identification suivantes :

Paramètre Valeur recommandée Description
SchemaRegistryUrl SchemaRegistryUrl URL du service de Registre de schémas utilisé pour la gestion des schémas. Généralement du format https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY Nom d’utilisateur pour l’authentification de base sur le registre de schémas (si nécessaire).
SchemaRegistryPassword CONFLUENT_API_SECRET Mot de passe pour l’authentification de base sur le registre de schémas (si nécessaire).

Authentification par nom d’utilisateur/mot de passe

Lors de l’utilisation de cette forme d’authentification, assurez-vous qu’elle Protocol est définie SaslPlaintextPlainsur ou SaslSslsur , ScramSha256AuthenticationMode ou ScramSha512 si le certificat d’autorité de certification utilisé est différent du certificat ISRG Root X1 par défaut, veillez à mettre à jour SslCaLocation ou SslCaPEM.

Paramètre Valeur recommandée Description
BrokerList BootstrapServer Le paramètre d’application nommé BootstrapServer contient la valeur du serveur de démarrage trouvé sur la page de paramètres Confluent Cloud. La valeur est semblable à xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nom d’utilisateur ConfluentCloudUsername Le paramètre d’application nommé ConfluentCloudUsername contient la clé d’accès à l’API du site web de Confluent Cloud.
Mot de passe ConfluentCloudPassword Le paramètre d’application nommé ConfluentCloudPassword contient le secret de l’API obtenu sur le site web de Confluent Cloud.
SslCaPEM SSLCaPemCertificate Paramètre d’application nommé SSLCaPemCertificate qui contient le certificat d’autorité de certification sous forme de chaîne au format PEM. La valeur doit respecter le format standard, par exemple : -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----.

Authentification SSL

Assurez-vous que Protocol est réglé sur SSL.

Paramètre Valeur recommandée Description
BrokerList BootstrapServer Le paramètre d’application nommé BootstrapServer contient la valeur du serveur de démarrage trouvé sur la page de paramètres Confluent Cloud. La valeur est semblable à xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
SslCaPEM SslCaCertificatePem Paramètre d’application nommé SslCaCertificatePem qui contient la valeur PEM du certificat d’autorité de certification sous forme de chaîne. La valeur doit respecter le format standard : -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem Paramètre d’application nommé SslClientCertificatePem qui contient la valeur PEM du certificat client sous forme de chaîne. La valeur doit respecter le format standard : -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem Paramètre d’application nommé SslClientKeyPem qui contient la valeur PEM de la clé privée cliente sous forme de chaîne. La valeur doit respecter le format standard : -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem Paramètre d’application nommé SslClientCertificateAndKeyPem qui contient la valeur PEM du certificat client et de la clé privée cliente concaténées sous forme de chaîne. La valeur doit respecter le format standard : -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword Paramètre d’application nommé SslClientKeyPassword qui contient le mot de passe de la clé privée (le cas échéant).

Authentification OAuth

Lorsque vous utilisez l’authentification OAuth, configurez les propriétés associées à OAuth dans vos définitions de liaison.

Les valeurs de chaîne que vous utilisez pour ces paramètres doivent être présentes en tant que paramètres d’application dans Azure ou dans la collection Values dans le fichier local.settings.json file pendant le développement local.

Vous devez également définir les Protocol définitions de liaison et AuthenticationMode les définir.

Étapes suivantes