Nota:
El acceso a esta página requiere autorización. Puede intentar iniciar sesión o cambiar directorios.
El acceso a esta página requiere autorización. Puede intentar cambiar los directorios.
El enlace de salida permite que una aplicación de Azure Functions envíe mensajes a un tema de Kafka.
Importante
Los enlaces de Kafka están disponibles para Functions en el plan de consumo flexible, el plan Elastic Premium y el plan dedicado (App Service). Solo se admiten en la versión 4.x del entorno de ejecución de Functions.
Ejemplo
La forma de usar el enlace depende de la modalidad de C# en la aplicación de funciones. Puede usar una de las siguientes modalidades:
Función de C# compilada que usa una biblioteca de clases de proceso de trabajo aislada que se ejecuta en un proceso independiente del tiempo de ejecución.
Los atributos que use dependen del proveedor de eventos específico.
En el ejemplo siguiente se usa un tipo de valor devuelto personalizado denominado MultipleOutputType, que consta de una respuesta HTTP y una salida de Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
En la MultipleOutputType clase , Kevent es la variable de enlace de salida para el enlace de Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Para enviar un lote de eventos, pase una matriz de cadenas al tipo de salida, tal como se muestra en el ejemplo siguiente:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
La matriz de cadenas se define como la Kevents propiedad de la clase y el enlace de salida se define en esta propiedad:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
La siguiente función agrega encabezados a los datos de salida de Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Para obtener un conjunto completo de ejemplos de .NET en funcionamiento, consulte el repositorio de extensiones de Kafka.
El uso del enlace de salida depende de la versión del modelo de programación de Node.js.
En el modelo Node.js v4, defina el enlace de salida directamente en el código de función. Para más información, vea la Guía de Azure Functions para desarrolladores de Node.js.
En estos ejemplos, los proveedores de eventos son Confluent o Azure Event Hubs. Estos ejemplos muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const queryName = request.query.get("name");
const parsedbody = JSON.parse(body);
const name = queryName || parsedbody.name || "world";
context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Para enviar eventos en un lote, envíe una matriz de mensajes, como se muestra en estos ejemplos:
const { app, output } = require("@azure/functions");
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
async function kafkaOutputManyWithHttp(request, context) {
context.log(`Http function processed request for url "${request.url}"`);
const queryName = request.query.get("name");
const body = await request.text();
const parsedbody = body ? JSON.parse(body) : {};
parsedbody.name = parsedbody.name || "world";
const name = queryName || parsedbody.name;
context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
return {
body: `Messages sent to kafka.`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputManyWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputManyWithHttp,
});
Estos ejemplos muestran cómo enviar un mensaje de evento con encabezados a un tema de Kafka:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const parsedbody = JSON.parse(body);
// assuming body is of the format { "key": "key", "value": {JSON object} }
context.extraOutputs.set(
kafkaOutput,
`{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
parsedbody.value
).replace(/"/g, '\\"')}", "Key":"${
parsedbody.key
}", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Para obtener un conjunto completo de ejemplos de JavaScript en funcionamiento, consulte el repositorio de extensiones de Kafka.
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const queryName = request.query.get("name");
const parsedbody = JSON.parse(body);
const name = queryName || parsedbody.name || "world";
context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Para enviar eventos en un lote, envíe una matriz de mensajes, como se muestra en estos ejemplos:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputManyWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const queryName = request.query.get("name");
const body = await request.text();
const parsedbody = body ? JSON.parse(body) : {};
parsedbody.name = parsedbody.name || "world";
const name = queryName || parsedbody.name;
context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
return {
body: `Messages sent to kafka.`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputManyWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputManyWithHttp,
});
Estos ejemplos muestran cómo enviar un mensaje de evento con encabezados a un tema de Kafka:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const parsedbody = JSON.parse(body);
// assuming body is of the format { "key": "key", "value": {JSON object} }
context.extraOutputs.set(
kafkaOutput,
`{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
parsedbody.value
).replace(/"/g, '\\"')}", "Key":"${
parsedbody.key
}", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Para obtener un conjunto completo de ejemplos de TypeScript en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las propiedades específicas del archivo function.json dependen del proveedor de eventos, que en estos ejemplos son Confluent o Azure Event Hubs. En los ejemplos siguientes se muestra un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.
El siguiente archivo function.json define el desencadenador para el proveedor específico en estos ejemplos:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
El código siguiente envía un mensaje al tema:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
El código siguiente envía varios mensajes como una matriz al mismo tema:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
En el ejemplo siguiente se muestra cómo enviar un mensaje de evento con encabezados al mismo tema de Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Para obtener un conjunto completo de ejemplos de PowerShell en funcionamiento, consulte el repositorio de extensiones de Kafka.
El uso del enlace de salida depende de la versión del modelo de programación de Python.
En el modelo de Python v2, debe definir el enlace de salida directamente en el código de función mediante decoradores. Para más información, consulte la guía para desarrolladores de Python de Azure Functions.
Estos ejemplos muestran un enlace de salida de Kafka para una función que desencadena una solicitud HTTP y envía datos de la solicitud al tema de Kafka.
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
outputMessage.set(json.dumps(['one', 'two']))
return 'OK'
Para enviar eventos en un lote, envíe una matriz de mensajes, como se muestra en estos ejemplos:
@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }] },
Estos ejemplos muestran cómo enviar un mensaje de evento con encabezados a un tema de Kafka:
Para obtener un conjunto completo de ejemplos de Python en funcionamiento, consulte el repositorio de extensiones de Kafka.
Las anotaciones que se usan para configurar el enlace de salida dependen del proveedor de eventos específico.
La siguiente función envía un mensaje al tema de Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
En el ejemplo siguiente se muestra cómo enviar varios mensajes a un tema de Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
En este ejemplo, el parámetro de enlace de salida se cambia a la matriz de cadenas.
En el último ejemplo se usan estas KafkaEntity clases y KafkaHeader :
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
La siguiente función de ejemplo envía un mensaje con encabezados a un tema de Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Para obtener un conjunto completo de ejemplos de Java en funcionamiento para Confluent, consulte el repositorio de extensiones de Kafka.
Atributos
Las bibliotecas de C# en proceso y de proceso de trabajo aislado usan el atributo Kafka para definir el desencadenador de la función.
En la tabla siguiente se explican las propiedades que puede establecer mediante este atributo:
| Parámetro | Descripción |
|---|---|
| BrokerList | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
| Tema. | (Obligatorio) El tema al que se envía la salida. |
| AvroSchema | (Opcional) Esquema de un registro genérico de valor de mensaje al usar el protocolo Avro. |
| KeyAvroSchema | (Opcional) Esquema de un registro genérico de la clave de mensaje al usar el protocolo Avro. |
| KeyDataType | (Opcional) Tipo de datos para enviar la clave de mensaje como al tema de Kafka. Si KeyAvroSchema se establece, este valor es un registro genérico. Los valores aceptados son Int, Long, Stringy Binary. |
| MaxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1. |
| BatchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000. |
| EnableIdempotence | (Opcional) Cuando se establece en true, garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false. |
| MessageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
| RequestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000. |
| MaxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true. |
| AuthenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NotSet (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512y OAuthBearer. |
| Nombre de usuario | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Para obtener más información, consulte Conexiones. |
| Contraseña | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Para obtener más información, consulte Conexiones. |
| Protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NotSet (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| SslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| SslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| SslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| SslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| SslCertificatePEM | (Opcional) Certificado de cliente en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| SslKeyPEM | (Opcional) Clave privada de cliente en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| SslCaPEM | (Opcional) Certificado de ENTIDAD de certificación en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| SslCertificateandKeyPEM | (Opcional) Certificado de cliente y clave en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| SchemaRegistryUrl | (Opcional) Dirección URL del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| SchemaRegistryUsername | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| SchemaRegistryPassword | (Opcional) Contraseña del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| OAuthBearerMethod | (Opcional) Método Bearer de OAuth. Los valores aceptados son oidc y default. |
| OAuthBearerClientId | (Opcional) Cuando OAuthBearerMethod se establece oidcen , especifica el identificador de cliente de portador de OAuth. Para obtener más información, consulte Conexiones. |
| OAuthBearerClientSecret | (Opcional) Cuando OAuthBearerMethod se establece oidcen , especifica el secreto de cliente de portador de OAuth. Para obtener más información, consulte Conexiones. |
| OAuthBearerScope | (Opcional) Especifica el ámbito de la solicitud de acceso al agente. |
| OAuthBearerTokenEndpointUrl | (Opcional) URI http(S) del punto de conexión del token de emisor de OAuth/OIDC que se usa para recuperar el token cuando oidc se usa el método. Para obtener más información, consulte Conexiones. |
| OAuthBearerExtensions | (Opcional) Lista separada por comas de pares clave=valor que se proporcionarán como información adicional para el agente cuando oidc se usa el método. Por ejemplo: supportFeatureX=true,organizationId=sales-emea. |
anotaciones
La KafkaOutput anotación permite crear una función que escribe en un tema específico. Entre las opciones admitidas se incluyen los siguientes elementos:
| Elemento | Descripción |
|---|---|
| name | Nombre de la variable que representa los datos del agente en el código de la función. |
| brokerList | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
| topic | (Obligatorio) El tema al que se envía la salida. |
| dataType | Define la manera en que Functions controla el valor del parámetro. De forma predeterminada, el valor se obtiene como una cadena y Functions intenta deserializarla en el objeto de Java antiguo sin formato (POJO) real. Cuando es string, la entrada se trata como una cadena. Cuando es binary, el mensaje se recibe como datos binarios y Functions intenta deserializarlo en bytes de tipo de parámetro real[]. |
| avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo Avro. (Actualmente no se admite para Java). |
| maxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1. |
| batchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000. |
| enableIdempotence | (Opcional) Cuando se establece trueen , garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false. |
| messageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
| requestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000. |
| maxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true. |
| authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NotSet (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Para obtener más información, consulte Conexiones. |
| password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Para obtener más información, consulte Conexiones. |
| protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NotSet (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| schemaRegistryUrl | (Opcional) Dirección URL del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| schemaRegistryUsername | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| schemaRegistryPassword | (Opcional) Contraseña del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
Configuración
En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json.
| Propiedad de function.json | Descripción |
|---|---|
| type | Establécelo en kafka. |
| direction | Establécelo en out. |
| name | Nombre de la variable que representa los datos del agente en el código de la función. |
| brokerList | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
| topic | (Obligatorio) El tema al que se envía la salida. |
| avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo Avro. |
| keyAvroSchema | (Opcional) Esquema de un registro genérico de la clave de mensaje al usar el protocolo Avro. |
| keyDataType | (Opcional) Tipo de datos para enviar la clave de mensaje como al tema de Kafka. Si keyAvroSchema se establece, este valor es un registro genérico. Los valores aceptados son Int, Long, Stringy Binary. |
| maxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1. |
| batchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000. |
| enableIdempotence | (Opcional) Cuando se establece trueen , garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false. |
| messageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
| requestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000. |
| maxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true. |
| authenticationMode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NotSet (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Para obtener más información, consulte Conexiones. |
| password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando AuthenticationMode es Gssapi. Para obtener más información, consulte Conexiones. |
| protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NotSet (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| sslCertificatePEM | (Opcional) Certificado de cliente en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| sslKeyPEM | (Opcional) Clave privada de cliente en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| sslCaPEM | (Opcional) Certificado de ENTIDAD de certificación en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| sslCertificateandKeyPEM | (Opcional) Certificado de cliente y clave en formato PEM como una cadena. Para obtener más información, consulte Conexiones. |
| schemaRegistryUrl | (Opcional) Dirección URL del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| schemaRegistryUsername | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| schemaRegistryPassword | (Opcional) Contraseña del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| oAuthBearerMethod | (Opcional) Método Bearer de OAuth. Los valores aceptados son oidc y default. |
| oAuthBearerClientId | (Opcional) Cuando oAuthBearerMethod se establece oidcen , especifica el identificador de cliente de portador de OAuth. Para obtener más información, consulte Conexiones. |
| oAuthBearerClientSecret | (Opcional) Cuando oAuthBearerMethod se establece oidcen , especifica el secreto de cliente de portador de OAuth. Para obtener más información, consulte Conexiones. |
| oAuthBearerScope | (Opcional) Especifica el ámbito de la solicitud de acceso al agente. |
| oAuthBearerTokenEndpointUrl | (Opcional) URI http(S) del punto de conexión del token de emisor de OAuth/OIDC que se usa para recuperar el token cuando oidc se usa el método. Para obtener más información, consulte Conexiones. |
Configuración
En la siguiente tabla se explican las propiedades de configuración de enlace que se establecen en el archivo function.json. Python usa snake_case convenciones de nomenclatura para las propiedades de configuración.
| Propiedad de function.json | Descripción |
|---|---|
| type | Establécelo en kafka. |
| direction | Establécelo en out. |
| name | Nombre de la variable que representa los datos del agente en el código de la función. |
| broker_list | (Obligatorio) Lista de agentes de Kafka a los que se envía la salida. Para obtener más información, consulte Conexiones. |
| topic | (Obligatorio) El tema al que se envía la salida. |
| avroSchema | (Opcional) Esquema de un registro genérico al usar el protocolo Avro. |
| maxMessageBytes | (Opcional) Tamaño máximo del mensaje de salida que se envía (en MB), con un valor predeterminado de 1. |
| batchSize | (Opcional) Número máximo de mensajes por lotes en un único conjunto de mensajes, con un valor predeterminado de 10000. |
| enableIdempotence | (Opcional) Cuando se establece trueen , garantiza que los mensajes se generan correctamente una vez y en el orden de producción original, con un valor predeterminado de false. |
| messageTimeoutMs | (Opcional) Tiempo de espera del mensaje local, en milisegundos. Este valor solo se aplica localmente y limita el tiempo en que un mensaje generado espera una entrega correcta, con un valor predeterminado 300000. Un tiempo de 0 es infinito. Este valor es el tiempo máximo que se usa para entregar un mensaje (incluidos los reintentos). El error de entrega se produce cuando se supera el número de reintentos o el tiempo de espera del mensaje. |
| requestTimeoutMs | (Opcional) Tiempo de espera de confirmación de la solicitud de salida, en milisegundos, con un valor predeterminado de 5000. |
| maxRetries | (Opcional) Número de veces que se va a reintentar el envío de un mensaje con error, con un valor predeterminado de 2. El reintento puede provocar el reordenamiento, a menos que EnableIdempotence esté establecido en true. |
| authentication_mode | (Opcional) Modo de autenticación cuando se usa la autenticación SASL (Nivel de seguridad y autenticación simples). Los valores admitidos son NOTSET (valor predeterminado), Gssapi, Plain, ScramSha256, ScramSha512. |
| username | (Opcional) Nombre de usuario para la autenticación SASL. No se admite cuando authentication_mode es Gssapi. Para obtener más información, consulte Conexiones. |
| password | (Opcional) Contraseña para la autenticación SASL. No se admite cuando authentication_mode es Gssapi. Para obtener más información, consulte Conexiones. |
| protocolo | (Opcional) Protocolo de seguridad que se usa al comunicarse con agentes. Los valores admitidos son NOTSET (valor predeterminado), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcional) Ruta de acceso al archivo de certificado de CA para comprobar el certificado del agente. |
| sslCertificateLocation | (Opcional) Ruta de acceso al certificado del cliente. |
| sslKeyLocation | (Opcional) Ruta de acceso a la clave privada (PEM) del cliente que se usa para la autenticación. |
| sslKeyPassword | (Opcional) Contraseña del certificado del cliente. |
| schema_registry_url | (Opcional) Dirección URL del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| schema_registry_username | (Opcional) Nombre de usuario del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| schema_registry_password | (Opcional) Contraseña del Registro de esquemas de Avro. Para obtener más información, consulte Conexiones. |
| o_auth_bearer_method | (Opcional) Método Bearer de OAuth. Los valores aceptados son oidc y default. |
| o_auth_bearer_client_id | (Opcional) Cuando o_auth_bearer_method se establece oidcen , especifica el identificador de cliente de portador de OAuth. Para obtener más información, consulte Conexiones. |
| o_auth_bearer_client_secret | (Opcional) Cuando o_auth_bearer_method se establece oidcen , especifica el secreto de cliente de portador de OAuth. Para obtener más información, consulte Conexiones. |
| o_auth_bearer_scope | (Opcional) Especifica el ámbito de la solicitud de acceso al agente. |
| o_auth_bearer_token_endpoint_url | (Opcional) URI http(S) del punto de conexión del token de emisor de OAuth/OIDC que se usa para recuperar el token cuando oidc se usa el método. Para obtener más información, consulte Conexiones. |
Nota:
Las propiedades relacionadas con PEM de certificado y las propiedades relacionadas con claves de Avro aún no están disponibles en la biblioteca de Python.
Uso
El desplazamiento, la partición y la marca de tiempo del evento se generan en tiempo de ejecución. Solo puede establecer el valor y los encabezados dentro de la función. Establezca el tema en el archivo function.json.
Asegúrese de que tiene acceso al tema de Kafka donde desea escribir. Tiene que configurar el enlace con credenciales de acceso y conexión al tema de Kafka.
En un plan Premium, debe habilitar la supervisión de escalado en tiempo de ejecución para la salida de Kafka para escalar horizontalmente a varias instancias. Para obtener más información, consulte Habilitación del escalado en tiempo de ejecución.
Para obtener un conjunto completo de configuraciones de host.json admitidas en el desencadenador de Kafka, consulte la configuración de host.json.
Conexiones
Almacene toda la información de conexión requerida por los desencadenadores y enlaces en la configuración de la aplicación, no en las definiciones de enlace del código. Esta guía se aplica a las credenciales, que nunca debe almacenar en el código.
Importante
La configuración de credenciales debe hacer referencia a una configuración de aplicación. No codifique las credenciales de forma rígida en el código ni en los archivos de configuración. Cuando realice la ejecución de forma local, use el archivo local.settings.json para las credenciales y no publique el archivo local.settings.json.
Al conectarse a un clúster de Kafka administrado proporcionado por Confluent en Azure, puede usar uno de los métodos de autenticación siguientes.
Nota:
Al usar el plan de consumo flexible, no se admiten las propiedades de autenticación de certificados basadas en ubicación de archivos (SslCaLocation, SslCertificateLocation, SslKeyLocation). En su lugar, use las propiedades de certificado basadas en PEM (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) o almacene certificados en Azure Key Vault.
Registro de esquemas
Para usar el registro de esquema proporcionado por Confluent en la extensión de Kafka, establezca las siguientes credenciales:
| Setting | Valor recomendado | Descripción |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
Dirección URL del servicio del Registro de esquemas usado para la administración de esquemas. Normalmente, del formato https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
Nombre de usuario para la autenticación básica en el registro de esquema (si es necesario). |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
Contraseña para la autenticación básica en el registro de esquema (si es necesario). |
Autenticación de nombre de usuario y contraseña
Al usar esta forma de autenticación, asegúrese de que Protocol está establecido SaslPlaintext en o SaslSsl, AuthenticationMode está establecido PlainScramSha256 en o ScramSha512 y, si el certificado de ENTIDAD de certificación que se usa es diferente del certificado X1 raíz isRG predeterminado, asegúrese de actualizar SslCaLocation o SslCaPEM.
| Setting | Valor recomendado | Descripción |
|---|---|---|
| BrokerList | BootstrapServer |
La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| Nombre de usuario | ConfluentCloudUsername |
La configuración de la aplicación denominada ConfluentCloudUsername contiene la clave de acceso de API del sitio web de Confluent Cloud. |
| Contraseña | ConfluentCloudPassword |
La configuración de la aplicación denominada ConfluentCloudPassword contiene el secreto de API obtenido en el sitio web de Confluent Cloud. |
| SslCaPEM | SSLCaPemCertificate |
Configuración de la aplicación denominada SSLCaPemCertificate que contiene el certificado de ENTIDAD de certificación como una cadena en formato PEM. El valor debe seguir el formato estándar, por ejemplo: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----. |
Autenticación SSL
Asegúrese de que Protocol está establecido en SSL.
| Setting | Valor recomendado | Descripción |
|---|---|---|
| BrokerList | BootstrapServer |
La configuración de la aplicación denominada BootstrapServer contiene el valor del servidor de arranque que se encuentra en la página de configuración de Confluent Cloud. El valor es similar a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| SslCaPEM | SslCaCertificatePem |
Configuración de la aplicación denominada SslCaCertificatePem que contiene el valor PEM del certificado de entidad de certificación como una cadena. El valor debe seguir el formato estándar: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
Configuración de la aplicación denominada SslClientCertificatePem que contiene el valor PEM del certificado de cliente como una cadena. El valor debe seguir el formato estándar: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
Configuración de la aplicación denominada SslClientKeyPem que contiene el valor PEM de la clave privada del cliente como una cadena. El valor debe seguir el formato estándar: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
Configuración de la aplicación denominada SslClientCertificateAndKeyPem que contiene el valor PEM del certificado de cliente y la clave privada de cliente concatenada como una cadena. El valor debe seguir el formato estándar: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
Configuración de la aplicación denominada SslClientKeyPassword que contiene la contraseña de la clave privada (si existe). |
Autenticación de OAuth
Al usar la autenticación de OAuth, configure las propiedades relacionadas con OAuth en las definiciones de enlace.
Los valores de la cadena que use en esta configuración deben estar presentes como la configuración de la aplicación en Azure o en la colección Values en el archivo local.settings.json durante el desarrollo local.
También debe establecer y en las ProtocolAuthenticationMode definiciones de enlace.