Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
Use el desencadenador de Apache Kafka en Azure Functions para ejecutar el código de función en respuesta a los mensajes de los temas de Kafka. También puede usar un enlace de salida de Kafka para escribir desde la función en un tema. Para obtener información sobre los detalles de instalación y configuración, consulte Información general de los enlaces de Apache Kafka para Azure Functions.
Importante
Los enlaces de Kafka están disponibles para Functions en el plan de consumo flexible, el plan Elastic Premium y el plan dedicado (App Service). Solo se admiten en la versión 4.x del entorno de ejecución de Functions.
Ejemplo
El uso del desencadenador depende de la modalidad de C# que se usa en la aplicación de funciones, que puede ser uno de los modos siguientes:
Función de C# compilada que usa una biblioteca de clases de proceso de trabajo aislada que se ejecuta en un proceso independiente del tiempo de ejecución.
Los atributos que use dependen del proveedor de eventos específico.
En el ejemplo siguiente se muestra una función de C# que lee y registra el mensaje de Kafka como un evento de Kafka:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Para recibir eventos en un lote, use una matriz de cadenas como entrada, como se muestra en el ejemplo siguiente:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
La función siguiente registra el mensaje y los encabezados del evento de Kafka:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Para obtener un conjunto completo de ejemplos de .NET en funcionamiento, consulte el repositorio de extensiones de Kafka.
El uso del desencadenador depende de la versión del modelo de programación de Node.js.
En el modelo de Node.js v4, defina el desencadenador directamente en el código de función. Para más información, vea la Guía de Azure Functions para desarrolladores de Node.js.
En estos ejemplos, los proveedores de eventos son Confluent o Azure Event Hubs. Estos ejemplos muestran cómo definir un desencadenador de Kafka para una función que lee un mensaje de Kafka.
const { app } = require("@azure/functions");
async function kafkaTrigger(event, context) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Key: " + event.Key);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
Para recibir eventos en un lote, establezca el cardinality valor manyen , como se muestra en estos ejemplos:
const { app } = require("@azure/functions");
async function kafkaTriggerMany(events, context) {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Key: " + event.Key);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. En este ejemplo se define el desencadenador para el proveedor específico con un esquema avro genérico:
const { app } = require("@azure/functions");
async function kafkaAvroGenericTrigger(event, context) {
context.log("Processed kafka event: ", event);
if (context.triggerMetadata?.key !== undefined) {
context.log("message key: ", context.triggerMetadata?.key);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
password: "EventHubConnectionString",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
username: "$ConnectionString",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
Para obtener un conjunto completo de ejemplos de JavaScript en funcionamiento, consulte el repositorio de extensiones de Kafka.
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
export async function kafkaTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
Para recibir eventos en un lote, establezca el cardinality valor manyen , como se muestra en estos ejemplos:
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
interface KafkaEvent {
Offset: number;
Partition: number;
Topic: string;
Timestamp: number;
Value: string;
}
export async function kafkaTriggerMany(
events: any,
context: InvocationContext
): Promise<void> {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. En este ejemplo se define el desencadenador para el proveedor específico con un esquema avro genérico:
import { app, InvocationContext } from "@azure/functions";
export async function kafkaAvroGenericTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Processed kafka event: ", event);
context.log(
`Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
);
if (context.triggerMetadata?.key !== undefined) {
context.log(`Message Key : ${context.triggerMetadata?.key}`);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
Para obtener un conjunto completo de ejemplos de TypeScript en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del function.json archivo dependen del proveedor de eventos. En estos ejemplos, los proveedores de eventos son Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un desencadenador de Kafka para una función que lee y registra un mensaje de Kafka.
El siguiente function.json archivo define el desencadenador para el proveedor específico:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
El código siguiente se ejecuta cuando se desencadena la función:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Para recibir eventos en un lote, establezca el valor de cardinality en many en el archivo function.json, como se muestra en los ejemplos siguientes:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
El código siguiente analiza la matriz de eventos y registra los datos del evento:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
El código siguiente registra los datos de encabezado:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. El siguiente archivo function.json define el desencadenador para el proveedor específico con un esquema de Avro genérico:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
El código siguiente se ejecuta cuando se desencadena la función:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Para obtener un conjunto completo de ejemplos de PowerShell en funcionamiento, consulte el repositorio de extensiones de Kafka.
El uso del desencadenador depende de la versión del modelo de programación de Python.
En el modelo de Python v2, el desencadenador se define directamente en el código de función mediante decoradores. Para más información, consulte la guía para desarrolladores de Python de Azure Functions.
Estos ejemplos muestran cómo definir un desencadenador de Kafka para una función que lee un mensaje de Kafka.
@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
En este ejemplo se reciben eventos de un lote estableciendo el cardinality valor manyen .
@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador.
@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaTriggerAvroGeneric",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
Para obtener un conjunto completo de ejemplos de Python en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las anotaciones que se usan para configurar el desencadenador dependen del proveedor de eventos específico.
En el ejemplo siguiente se muestra una función de Java que lee y registra el contenido del evento de Kafka:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Para recibir eventos en un lote, use una cadena de entrada como una matriz, como se muestra en el ejemplo siguiente:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
La función siguiente registra el mensaje y los encabezados del evento de Kafka:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
Puede definir un esquema de Avro genérico para el evento que se pasa al desencadenador. La función siguiente define un desencadenador para el proveedor específico con un esquema de Avro genérico:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Para obtener un conjunto completo de ejemplos de Java en funcionamiento para Confluent, consulte el repositorio de extensiones de Kafka.
Atributos
Las bibliotecas de C# de procesos de trabajo aislados y en proceso usan KafkaTriggerAttribute para definir el desencadenador de la función.
En la tabla siguiente se explican las propiedades que puede establecer mediante este atributo de desencadenador:
| Parámetro | Descripción |
|---|---|
| BrokerList | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
| Tema. | (Obligatorio) Tema supervisado por el desencadenador. |
| ConsumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
| AvroSchema | (Opcional) Esquema de un registro genérico de valor de mensaje al usar el protocolo Avro. |
| KeyAvroSchema | (Opcional) Esquema de un registro genérico de la clave de mensaje al usar el protocolo Avro. |
| KeyDataType | (Opcional) Tipo de datos para recibir la clave de mensaje a partir del tema de Kafka. Si KeyAvroSchema se establece, este valor es un registro genérico. Los valores aceptados son Int, Long, Stringy Binary. |
| AuthenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NotSet (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512y OAuthBearer. |
| Nombre de usuario | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información. |
| Contraseña | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información. |
| Protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NotSet (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| SslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| SslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| SslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| SslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| SslCertificatePEM | (Opcional) Certificado de cliente en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| SslKeyPEM | (Opcional) Clave privada de cliente en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| SslCaPEM | (Opcional) Certificado de ENTIDAD de certificación en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| SslCertificateandKeyPEM | (Opcional) Certificado de cliente y clave en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| SchemaRegistryUrl | (Opcional) Dirección URL del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| SchemaRegistryUsername | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| SchemaRegistryPassword | (Opcional) Contraseña del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| OAuthBearerMethod | (Opcional) Método Bearer de OAuth. Los valores aceptados son oidc y default. |
| OAuthBearerClientId | (Opcional) Cuando OAuthBearerMethod se establece oidcen , especifica el identificador de cliente de portador de OAuth. Consulte Conexiones para obtener más información. |
| OAuthBearerClientSecret | (Opcional) Cuando OAuthBearerMethod se establece oidcen , especifica el secreto de cliente de portador de OAuth. Consulte Conexiones para obtener más información. |
| OAuthBearerScope | (Opcional) Especifica el ámbito de la solicitud de acceso al agente. |
| OAuthBearerTokenEndpointUrl | (Opcional) URI http(S) del punto de conexión del token de emisor de OAuth/OIDC que se usa para recuperar el token cuando oidc se usa el método. Consulte Conexiones para obtener más información. |
| OAuthBearerExtensions | (Opcional) Lista separada por comas de pares clave=valor que se proporcionarán como información adicional para el agente cuando oidc se usa el método. Por ejemplo: supportFeatureX=true,organizationId=sales-emea. |
anotaciones
La KafkaTrigger anotación permite crear una función que se ejecuta cuando recibe un tema. Entre las opciones admitidas se incluyen los elementos siguientes:
| Elemento | Descripción |
|---|---|
| name | (Obligatorio) Nombre de la variable que representa el mensaje de cola o tema en el código de la función. |
| brokerList | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
| topic | (Obligatorio) Tema supervisado por el desencadenador. |
| cardinalidad | (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY. Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY, también se debe establecer un valor dataType. |
| dataType | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando es binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[]. |
| consumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
| avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. |
| authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NotSet (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información. |
| password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información. |
| protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NotSet (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| lagThreshold | (Opcional) Umbral de retardo para el desencadenador. |
| schemaRegistryUrl | (Opcional) Dirección URL del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| schemaRegistryUsername | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| schemaRegistryPassword | (Opcional) Contraseña del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
Configuración
En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json.
| Propiedad de function.json | Descripción |
|---|---|
| type | (Obligatorio) Establezca en kafkaTrigger. |
| direction | (Obligatorio) Establezca en in. |
| name | (Obligatorio) Nombre de la variable que representa los datos del agente en el código de la función. |
| brokerList | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
| topic | (Obligatorio) Tema supervisado por el desencadenador. |
| cardinalidad | (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY. Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY, también se debe establecer un valor dataType. |
| dataType | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en un tipo de parámetro de matriz de bytes real. |
| consumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
| avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. |
| keyAvroSchema | (Opcional) Esquema de un registro genérico de la clave de mensaje al usar el protocolo Avro. |
| keyDataType | (Opcional) Tipo de datos para recibir la clave de mensaje a partir del tema de Kafka. Si keyAvroSchema se establece, este valor es un registro genérico. Los valores aceptados son Int, Long, Stringy Binary. |
| authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NotSet (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información. |
| password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Consulte Conexiones para obtener más información. |
| protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NotSet (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| sslCertificatePEM | (Opcional) Certificado de cliente en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| sslKeyPEM | (Opcional) Clave privada de cliente en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| sslCaPEM | (Opcional) Certificado de ENTIDAD de certificación en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| sslCertificateandKeyPEM | (Opcional) Certificado de cliente y clave en formato PEM como una cadena. Consulte Conexiones para obtener más información. |
| lagThreshold | (Opcional) Umbral de retardo para el desencadenador. |
| schemaRegistryUrl | (Opcional) Dirección URL del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| schemaRegistryUsername | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| schemaRegistryPassword | (Opcional) Contraseña del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| oAuthBearerMethod | (Opcional) Método Bearer de OAuth. Los valores aceptados son oidc y default. |
| oAuthBearerClientId | (Opcional) Cuando oAuthBearerMethod se establece oidcen , especifica el identificador de cliente de portador de OAuth. Consulte Conexiones para obtener más información. |
| oAuthBearerClientSecret | (Opcional) Cuando oAuthBearerMethod se establece oidcen , especifica el secreto de cliente de portador de OAuth. Consulte Conexiones para obtener más información. |
| oAuthBearerScope | (Opcional) Especifica el ámbito de la solicitud de acceso al agente. |
| oAuthBearerTokenEndpointUrl | (Opcional) URI http(S) del punto de conexión del token de emisor de OAuth/OIDC que se usa para recuperar el token cuando oidc se usa el método. Consulte Conexiones para obtener más información. |
Configuración
En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json. Python usa snake_case convenciones de nomenclatura para las propiedades de configuración.
| Propiedad de function.json | Descripción |
|---|---|
| type | (Obligatorio) Establezca en kafkaTrigger. |
| direction | (Obligatorio) Establezca en in. |
| name | (Obligatorio) Nombre de la variable que representa los datos del agente en el código de la función. |
| broker_list | (Obligatorio) Lista de agentes de Kafka que supervisa el desencadenador. Consulte Conexiones para obtener más información. |
| topic | (Obligatorio) Tema supervisado por el desencadenador. |
| cardinalidad | (Opcional) Indica la cardinalidad de la entrada del desencadenador. Los valores admitidos son ONE (predeterminado) y MANY. Se usa ONE cuando la entrada es un único mensaje y MANY cuando la entrada es una matriz de mensajes. Cuando se usa MANY, también se debe establecer un valor data_type. |
| data_type | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando es binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[]. |
| consumerGroup | (Opcional) Grupo de consumidores de Kafka que usa el desencadenador. |
| avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo de Avro. |
| authentication_mode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NOTSET (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando authentication_mode es Gssapi. Consulte Conexiones para obtener más información. |
| password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando authentication_mode es Gssapi. Consulte Conexiones para obtener más información. |
| protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NOTSET (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| lag_threshold | (Opcional) Umbral de retardo para el desencadenador. |
| schema_registry_url | (Opcional) Dirección URL del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| schema_registry_username | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| schema_registry_password | (Opcional) Contraseña del Registro de esquemas de Avro. Consulte Conexiones para obtener más información. |
| o_auth_bearer_method | (Opcional) Método Bearer de OAuth. Los valores aceptados son oidc y default. |
| o_auth_bearer_client_id | (Opcional) Cuando o_auth_bearer_method se establece oidcen , especifica el identificador de cliente de portador de OAuth. Consulte Conexiones para obtener más información. |
| o_auth_bearer_client_secret | (Opcional) Cuando o_auth_bearer_method se establece oidcen , especifica el secreto de cliente de portador de OAuth. Consulte Conexiones para obtener más información. |
| o_auth_bearer_scope | (Opcional) Especifica el ámbito de la solicitud de acceso al agente. |
| o_auth_bearer_token_endpoint_url | (Opcional) URI http(S) del punto de conexión del token de emisor de OAuth/OIDC que se usa para recuperar el token cuando oidc se usa el método. Consulte Conexiones para obtener más información. |
Nota:
Las propiedades relacionadas con PEM de certificado y las propiedades relacionadas con claves de Avro aún no están disponibles en la biblioteca de Python.
Uso
El desencadenador de Kafka admite actualmente eventos de Kafka como cadenas y matrices de cadenas que son cargas JSON.
El desencadenador de Kafka pasa mensajes de Kafka a la función como cadenas. El desencadenador también admite matrices de cadenas que son cargas JSON.
En un plan Premium, debe habilitar la supervisión de escalado en tiempo de ejecución para la salida de Kafka para escalar horizontalmente a varias instancias. Para obtener más información, consulte Habilitación del escalado en tiempo de ejecución.
No puede usar la característica Prueba y ejecución de la página Código y prueba en Azure Portal para trabajar con desencadenadores de Kafka. Debe enviar eventos de prueba directamente al tema supervisado por el desencadenador.
Para obtener un conjunto completo de configuraciones de host.json admitidas en el desencadenador de Kafka, consulte la configuración de host.json.
Conexiones
Almacene toda la información de conexión requerida por los desencadenadores y enlaces en la configuración de la aplicación, no en las definiciones de enlace del código. Esta guía se aplica a las credenciales, que nunca debe almacenar en el código.
Importante
La configuración de credenciales debe hacer referencia a una configuración de aplicación. No codifique las credenciales de forma rígida en el código ni en los archivos de configuración. Cuando realice la ejecución de forma local, use el archivo local.settings.json para las credenciales y no publique el archivo local.settings.json.
Al conectarse a un clúster de Kafka administrado proporcionado por Confluent en Azure, puede usar uno de los métodos de autenticación siguientes.
Nota:
Al usar el plan de consumo flexible, no se admiten las propiedades de autenticación de certificados basadas en ubicación de archivos (SslCaLocation, SslCertificateLocation, SslKeyLocation). En su lugar, use las propiedades de certificado basadas en PEM (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) o almacene certificados en Azure Key Vault.
Registro de esquemas
Para usar el registro de esquema proporcionado por Confluent en la extensión de Kafka, establezca las siguientes credenciales:
| Setting | Valor recomendado | Descripción |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
Dirección URL del servicio del Registro de esquemas usado para la administración de esquemas. Normalmente, del formato https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
Nombre de usuario para la autenticación básica en el registro de esquema (si es necesario). |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
Contraseña para la autenticación básica en el registro de esquema (si es necesario). |
Autenticación de nombre de usuario y contraseña
Al usar esta forma de autenticación, asegúrese de que Protocol está establecido SaslPlaintext en o SaslSsl, AuthenticationMode está establecido PlainScramSha256 en o ScramSha512 y, si el certificado de ENTIDAD de certificación que se usa es diferente del certificado X1 raíz isRG predeterminado, asegúrese de actualizar SslCaLocation o SslCaPEM.
| Setting | Valor recomendado | Descripción |
|---|---|---|
| BrokerList | BootstrapServer |
La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| Nombre de usuario | ConfluentCloudUsername |
La configuración de la aplicación denominada ConfluentCloudUsername contiene la clave de acceso de API del sitio web de Confluent Cloud. |
| Contraseña | ConfluentCloudPassword |
La configuración de la aplicación denominada ConfluentCloudPassword contiene el secreto de API obtenido en el sitio web de Confluent Cloud. |
| SslCaPEM | SSLCaPemCertificate |
Configuración de la aplicación denominada SSLCaPemCertificate que contiene el certificado de ENTIDAD de certificación como una cadena en formato PEM. El valor debe seguir el formato estándar, por ejemplo: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----. |
Autenticación SSL
Asegúrese de que Protocol está establecido en SSL.
| Setting | Valor recomendado | Descripción |
|---|---|---|
| BrokerList | BootstrapServer |
La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| SslCaPEM | SslCaCertificatePem |
Configuración de la aplicación denominada SslCaCertificatePem que contiene el valor PEM del certificado de entidad de certificación como una cadena. El valor debe seguir el formato estándar: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
Configuración de la aplicación denominada SslClientCertificatePem que contiene el valor PEM del certificado de cliente como una cadena. El valor debe seguir el formato estándar: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
Configuración de la aplicación denominada SslClientKeyPem que contiene el valor PEM de la clave privada del cliente como una cadena. El valor debe seguir el formato estándar: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
Configuración de la aplicación denominada SslClientCertificateAndKeyPem que contiene el valor PEM del certificado de cliente y la clave privada de cliente concatenada como una cadena. El valor debe seguir el formato estándar: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
Configuración de la aplicación denominada SslClientKeyPassword que contiene la contraseña de la clave privada (si existe). |
Autenticación de OAuth
Al usar la autenticación de OAuth, configure las propiedades relacionadas con OAuth en las definiciones de enlace.
Los valores de la cadena que use en esta configuración deben estar presentes como la configuración de la aplicación en Azure o en la colección Values en el archivo local.settings.json durante el desarrollo local.
También debe establecer y en las ProtocolAuthenticationMode definiciones de enlace.