Nota
O acesso a esta página requer autorização. Podes tentar iniciar sessão ou mudar de diretório.
O acesso a esta página requer autorização. Podes tentar mudar de diretório.
Use o gatilho Apache Kafka no Azure Functions para executar o seu código de função em resposta a mensagens em tópicos Kafka. Você também pode usar uma ligação de saída Kafka para escrever de sua função para um tópico. Para obter informações sobre detalhes de instalação e configuração, consulte Visão geral das ligações do Apache Kafka para o Azure Functions.
Importante
Estão disponíveis fixações Kafka para Funções no plano Flex Consumption, Elastic Premium Plan e Dedicated (App Service). Só são suportados na versão 4.x do runtime Functions.
Exemplo
O uso do gatilho depende da modalidade C# usada em seu aplicativo de função, que pode ser um dos seguintes modos:
Uma função C# compilada que utiliza uma biblioteca de classes de processo worker isolada que corre num processo separado do tempo de execução.
Os atributos que você usa dependem do provedor de eventos específico.
O exemplo a seguir mostra uma função C# que lê e registra a mensagem Kafka como um evento Kafka:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Para receber eventos em um lote, use uma matriz de cadeia de caracteres como entrada, conforme mostrado no exemplo a seguir:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
A função a seguir registra a mensagem e os cabeçalhos para o evento Kafka:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Para obter um conjunto completo de exemplos .NET de trabalho, consulte o repositório de extensão Kafka.
A utilização do gatilho depende da tua versão do modelo de programação Node.js.
No modelo Node.js v4, defines o teu trigger diretamente no código da função. Para obter mais informações, consulte o Guia do desenvolvedor do Azure Functions Node.js.
Nestes exemplos, os fornecedores de eventos são ou Confluent ou Azure Event Hubs. Estes exemplos mostram como definir um gatilho de Kafka para uma função que lê uma mensagem de Kafka.
const { app } = require("@azure/functions");
async function kafkaTrigger(event, context) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Key: " + event.Key);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
Para receber eventos num lote, defina o cardinality valor para many, como mostrado nestes exemplos:
const { app } = require("@azure/functions");
async function kafkaTriggerMany(events, context) {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Key: " + event.Key);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
Você pode definir um esquema Avro genérico para o evento passado para o gatilho. Este exemplo define o gatilho para o fornecedor específico com um esquema Avro genérico:
const { app } = require("@azure/functions");
async function kafkaAvroGenericTrigger(event, context) {
context.log("Processed kafka event: ", event);
if (context.triggerMetadata?.key !== undefined) {
context.log("message key: ", context.triggerMetadata?.key);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
password: "EventHubConnectionString",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
username: "$ConnectionString",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
Para obter um conjunto completo de exemplos JavaScript de trabalho, consulte o repositório de extensões Kafka.
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
export async function kafkaTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
Para receber eventos num lote, defina o cardinality valor para many, como mostrado nestes exemplos:
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
interface KafkaEvent {
Offset: number;
Partition: number;
Topic: string;
Timestamp: number;
Value: string;
}
export async function kafkaTriggerMany(
events: any,
context: InvocationContext
): Promise<void> {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
Você pode definir um esquema Avro genérico para o evento passado para o gatilho. Este exemplo define o gatilho para o fornecedor específico com um esquema Avro genérico:
import { app, InvocationContext } from "@azure/functions";
export async function kafkaAvroGenericTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Processed kafka event: ", event);
context.log(
`Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
);
if (context.triggerMetadata?.key !== undefined) {
context.log(`Message Key : ${context.triggerMetadata?.key}`);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
Para um conjunto completo de exemplos funcionais de TypeScript, consulte o repositório de extensões Kafka.
As propriedades específicas do function.json ficheiro dependem do seu fornecedor de eventos. Nestes exemplos, os fornecedores de eventos são ou Confluent ou Azure Event Hubs. Os exemplos a seguir mostram um gatilho Kafka para uma função que lê e registra uma mensagem Kafka.
O ficheiro seguinte function.json define o gatilho para o fornecedor específico:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
O seguinte código é executado quando a função é ativada:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Para receber eventos em um lote, defina o cardinality valor como many no arquivo function.json, conforme mostrado nos exemplos a seguir:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
O código seguinte analisa o array de eventos e regista os dados do evento:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
O código seguinte regista os dados do cabeçalho:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
Você pode definir um esquema Avro genérico para o evento passado para o gatilho. A function.json a seguir define o gatilho para o provedor específico com um esquema Avro genérico:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
O seguinte código é executado quando a função é ativada:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Para obter um conjunto completo de exemplos de PowerShell em funcionamento, consulte o repositório de extensão Kafka.
A utilização do trigger depende da tua versão do modelo de programação em Python.
No modelo Python v2, defines o teu trigger diretamente no código da função usando decoradores. Para mais informações, consulte o guia para programadores Azure Functions em Python.
Estes exemplos mostram como definir um gatilho de Kafka para uma função que lê uma mensagem de Kafka.
@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
Este exemplo recebe eventos em lote ao definir o cardinality valor para many.
@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
Você pode definir um esquema Avro genérico para o evento passado para o gatilho.
@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaTriggerAvroGeneric",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
As anotações usadas para configurar o gatilho dependem do provedor de eventos específico.
O exemplo a seguir mostra uma função Java que lê e registra o conteúdo do evento Kafka:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Para receber eventos em um lote, use uma cadeia de caracteres de entrada como uma matriz, conforme mostrado no exemplo a seguir:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
A função a seguir registra a mensagem e os cabeçalhos para o evento Kafka:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
Você pode definir um esquema Avro genérico para o evento passado para o gatilho. A função a seguir define um gatilho para o provedor específico com um esquema Avro genérico:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Para obter um conjunto completo de exemplos Java de trabalho para Confluent, consulte o repositório de extensão Kafka.
Atributos
As bibliotecas C# do processo de trabalho em processo e isoladas usam o KafkaTriggerAttribute para definir o gatilho de função.
A tabela seguinte explica as propriedades que pode definir usando este atributo trigger:
| Parâmetro | Description |
|---|---|
| Lista de Corretores | (Obrigatório) A lista de corretores Kafka monitorados pelo gatilho. Consulte Conexões para obter mais informações. |
| Tópico | (Obrigatório) O tópico monitorado pelo gatilho. |
| Grupo de Consumidores | (Opcional) Grupo de consumidores Kafka usado pelo gatilho. |
| AvroSchema | (Opcional) Esquema de um registo genérico do valor da mensagem ao utilizar o protocolo Avro. |
| KeyAvroSchema | (Opcional) Esquema de um registo genérico da chave de mensagem ao utilizar o protocolo Avro. |
| KeyDataType | (Opcional) Tipo de dados para receber a chave de mensagem a partir do Kafka Topic. Se KeyAvroSchema for definido, este valor é registo genérico. Os valores aceites são Int, Long, String, e Binary. |
| Modo de autenticação | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NotSet (por defeito), Gssapi, Plain, ScramSha256, ScramSha512, e OAuthBearer. |
| Nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações. |
| Palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações. |
| Protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NotSet (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| Localização de SslCa. | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
| SslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
| SslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
| SslKeyPassword | (Opcional) Senha para o certificado do cliente. |
| SslCertificatePEM | (Opcional) Certificado de cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações. |
| SslKeyPEM | (Opcional) Chave privada do cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações. |
| SslCaPEM | (Opcional) Certificado CA em formato PEM como string. Consulte Conexões para obter mais informações. |
| SslCertificateandKeyPEM | (Opcional) Certificado do cliente e chave em formato PEM como uma string. Consulte Conexões para obter mais informações. |
| SchemaRegistryUrl | (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| SchemaRegistryUsername | (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| SchemaRegistryPassword | (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações. |
| OAuthBearerMethod | (Opcional) Método OAuth Bearer. Os valores aceites são oidc e default. |
| OAuthBearerClientId | (Opcional) Quando OAuthBearerMethod está definido para oidc, isto especifica o ID do cliente portador OAuth. Consulte Conexões para obter mais informações. |
| OAuthBearerClientSecret | (Opcional) Quando OAuthBearerMethod está definido como oidc, isto especifica o secreto do cliente portador OAuth. Consulte Conexões para obter mais informações. |
| OAuthBearerScope | (Opcional) Especifica o âmbito do pedido de acesso ao corretor. |
| OAuthBearerTokenEndpointUrl | (Opcional) OAuth/OIDC endpoint do token emissor HTTP(S) URI usado para recuperar o token quando oidc o método é utilizado. Consulte Conexões para obter mais informações. |
| OAuthBearerExtensions | (Opcional) Lista separada por vírgulas de pares chave=valor a ser fornecida como informação adicional ao corretor quando oidc o método for utilizado. Por exemplo: supportFeatureX=true,organizationId=sales-emea. |
Anotações
A KafkaTrigger anotação permite-lhe criar uma função que é executada quando recebe um tópico. As opções suportadas incluem os seguintes elementos:
| Elemento | Description |
|---|---|
| Designação | (Obrigatório) O nome da variável que representa a fila ou a mensagem de tópico no código da função. |
| Lista de corretores | (Obrigatório) A lista de corretores Kafka monitorados pelo gatilho. Consulte Conexões para obter mais informações. |
| topic | (Obrigatório) O tópico monitorado pelo gatilho. |
| cardinalidade | (Opcional) Indica a cardinalidade da entrada do gatilho. Os valores suportados são ONE (padrão) e MANY. Use ONE quando a entrada é uma única mensagem e MANY quando a entrada é uma matriz de mensagens. Quando você usa MANYo , você também deve definir um dataTypearquivo . |
| Tipo de dados | Define como Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma string e Functions tenta desserializar a string para o objeto Java plain-old real (POJO). Quando string, a entrada é tratada como apenas uma cadeia de caracteres. Quando binary, a mensagem é recebida como dados binários e o Functions tenta desserializá-la para um byte do tipo parâmetro real[]. |
| Grupo de consumidores | (Opcional) Grupo de consumidores Kafka usado pelo gatilho. |
| avroSchema | (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. |
| authenticationMode | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NotSet (por defeito), Gssapi, Plain, ScramSha256, ScramSha512. |
| nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações. |
| palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações. |
| protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NotSet (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocalização | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
| sslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
| sslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
| sslKeyPassword | (Opcional) Senha para o certificado do cliente. |
| lagThreshold | (Opcional) Limiar de atraso para o gatilho. |
| schemaRegistryUrl | (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| schemaRegistryUsername | (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| schemaRegistryPassword | (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações. |
Configuração
A tabela a seguir explica as propriedades de configuração de associação definidas no arquivo function.json .
| function.json propriedade | Description |
|---|---|
| type | (Obrigatório) Definir para kafkaTrigger. |
| direção | (Obrigatório) Definir para in. |
| Designação | (Obrigatório) O nome da variável que representa os dados intermediados no código da função. |
| Lista de corretores | (Obrigatório) A lista de corretores Kafka monitorados pelo gatilho. Consulte Conexões para obter mais informações. |
| topic | (Obrigatório) O tópico monitorado pelo gatilho. |
| cardinalidade | (Opcional) Indica a cardinalidade da entrada do gatilho. Os valores suportados são ONE (padrão) e MANY. Use ONE quando a entrada é uma única mensagem e MANY quando a entrada é uma matriz de mensagens. Quando você usa MANYo , você também deve definir um dataTypearquivo . |
| Tipo de dados | Define como Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma string e Functions tenta desserializar a string para o objeto Java plain-old real (POJO). Quando string, a entrada é tratada como apenas uma cadeia de caracteres. Quando binary, a mensagem é recebida como dados binários, e Funções tenta desserializá-la para um tipo real de parâmetro de array de bytes. |
| Grupo de consumidores | (Opcional) Grupo de consumidores Kafka usado pelo gatilho. |
| avroSchema | (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. |
| keyAvroSchema | (Opcional) Esquema de um registo genérico da chave de mensagem ao utilizar o protocolo Avro. |
| keyDataType | (Opcional) Tipo de dados para receber a chave de mensagem a partir do Kafka Topic. Se keyAvroSchema for definido, este valor é registo genérico. Os valores aceites são Int, Long, String, e Binary. |
| authenticationMode | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NotSet (por defeito), Gssapi, Plain, ScramSha256, ScramSha512. |
| nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações. |
| palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações. |
| protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NotSet (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocalização | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
| sslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
| sslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
| sslKeyPassword | (Opcional) Senha para o certificado do cliente. |
| sslCertificatePEM | (Opcional) Certificado de cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações. |
| sslKeyPEM | (Opcional) Chave privada do cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações. |
| sslCaPEM | (Opcional) Certificado CA em formato PEM como string. Consulte Conexões para obter mais informações. |
| sslCertificateandKeyPEM | (Opcional) Certificado do cliente e chave em formato PEM como uma string. Consulte Conexões para obter mais informações. |
| lagThreshold | (Opcional) Limiar de atraso para o gatilho. |
| schemaRegistryUrl | (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| schemaRegistryUsername | (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| schemaRegistryPassword | (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações. |
| oAuthBearerMethod | (Opcional) Método OAuth Bearer. Os valores aceites são oidc e default. |
| oAuthBearerClientId | (Opcional) Quando oAuthBearerMethod está definido para oidc, isto especifica o ID do cliente portador OAuth. Consulte Conexões para obter mais informações. |
| oAuthBearerClientSecret | (Opcional) Quando oAuthBearerMethod está definido como oidc, isto especifica o secreto do cliente portador OAuth. Consulte Conexões para obter mais informações. |
| oAuthBearerScope | (Opcional) Especifica o âmbito do pedido de acesso ao corretor. |
| oAuthBearerTokenEndpointUrl | (Opcional) OAuth/OIDC endpoint do token emissor HTTP(S) URI usado para recuperar o token quando oidc o método é utilizado. Consulte Conexões para obter mais informações. |
Configuração
A tabela a seguir explica as propriedades de configuração de associação definidas no arquivo function.json . O Python utiliza snake_case convenções de nomenclatura para propriedades de configuração.
| function.json propriedade | Description |
|---|---|
| type | (Obrigatório) Definir para kafkaTrigger. |
| direção | (Obrigatório) Definir para in. |
| Designação | (Obrigatório) O nome da variável que representa os dados intermediados no código da função. |
| broker_list | (Obrigatório) A lista de corretores Kafka monitorados pelo gatilho. Consulte Conexões para obter mais informações. |
| topic | (Obrigatório) O tópico monitorado pelo gatilho. |
| cardinalidade | (Opcional) Indica a cardinalidade da entrada do gatilho. Os valores suportados são ONE (padrão) e MANY. Use ONE quando a entrada é uma única mensagem e MANY quando a entrada é uma matriz de mensagens. Quando você usa MANYo , você também deve definir um data_typearquivo . |
| data_type | Define como Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma string e Functions tenta desserializar a string para o objeto Java plain-old real (POJO). Quando string, a entrada é tratada como apenas uma cadeia de caracteres. Quando binary, a mensagem é recebida como dados binários e o Functions tenta desserializá-la para um byte do tipo parâmetro real[]. |
| Grupo de consumidores | (Opcional) Grupo de consumidores Kafka usado pelo gatilho. |
| avroSchema | (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. |
| authentication_mode | (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NOTSET (por defeito), Gssapi, Plain, ScramSha256, ScramSha512. |
| nome de utilizador | (Opcional) O nome de usuário para autenticação SASL. Não suportado quando authentication_mode é Gssapi. Consulte Conexões para obter mais informações. |
| palavra-passe | (Opcional) A senha para autenticação SASL. Não suportado quando authentication_mode é Gssapi. Consulte Conexões para obter mais informações. |
| protocolo | (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NOTSET (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocalização | (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor. |
| sslCertificateLocation | (Opcional) Caminho para o certificado do cliente. |
| sslKeyLocalização | (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação. |
| sslKeyPassword | (Opcional) Senha para o certificado do cliente. |
| lag_threshold | (Opcional) Limiar de atraso para o gatilho. |
| schema_registry_url | (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| schema_registry_username | (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações. |
| schema_registry_password | (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações. |
| o_auth_bearer_method | (Opcional) Método OAuth Bearer. Os valores aceites são oidc e default. |
| o_auth_bearer_client_id | (Opcional) Quando o_auth_bearer_method está definido para oidc, isto especifica o ID do cliente portador OAuth. Consulte Conexões para obter mais informações. |
| o_auth_bearer_client_secret | (Opcional) Quando o_auth_bearer_method está definido como oidc, isto especifica o secreto do cliente portador OAuth. Consulte Conexões para obter mais informações. |
| o_auth_bearer_scope | (Opcional) Especifica o âmbito do pedido de acesso ao corretor. |
| o_auth_bearer_token_endpoint_url | (Opcional) OAuth/OIDC endpoint do token emissor HTTP(S) URI usado para recuperar o token quando oidc o método é utilizado. Consulte Conexões para obter mais informações. |
Nota
As propriedades relacionadas com PEM de certificados e as propriedades relacionadas com chaves Avro ainda não estão disponíveis na biblioteca Python.
Utilização
O gatilho Kafka atualmente suporta eventos Kafka como strings e arrays de strings que são cargas úteis JSON.
O gatilho Kafka passa mensagens Kafka para a função sob a forma de cadeias. O trigger também suporta arrays de string que são cargas úteis JSON.
Num plano Premium, deve ativar a monitorização de escala em tempo de execução para que a saída Kafka escale para múltiplas instâncias. Para saber mais, consulte Habilitar o dimensionamento em tempo de execução.
Não podes usar a funcionalidade Testar/Executar da página Código + Teste no portal Azure para trabalhar com gatilhos Kafka. Em vez disso, você deve enviar eventos de teste diretamente para o tópico que está sendo monitorado pelo gatilho.
Para obter um conjunto completo de configurações de host.json suportadas para o gatilho Kafka, consulte host.json configurações.
Ligações
Armazena toda a informação de ligação exigida pelos teus triggers e bindings nas definições da aplicação, não nas definições de binding do teu código. Esta orientação aplica-se a credenciais, que nunca deve armazenar no seu código.
Importante
As configurações de credenciais devem fazer referência a uma configuração de aplicativo. Não codifice credenciais em seu código ou arquivos de configuração. Ao executar localmente, use o arquivo local.settings.json para suas credenciais e não publique o arquivo local.settings.json.
Ao ligar-se a um cluster Kafka gerido fornecido pelo Confluent no Azure, pode usar um dos seguintes métodos de autenticação.
Nota
Ao utilizar o plano Flex Consumption, as propriedades de autenticação de certificados baseadas na localização de ficheiros (SslCaLocation, SslCertificateLocation, SslKeyLocation) não são suportadas. Em vez disso, utilize as propriedades de certificados baseadas em PEM (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) ou armazene certificados no Azure Key Vault.
Registo de Esquemas
Para utilizar o registo de esquema fornecido pelo Confluent na Extensão Kafka, defina as seguintes credenciais:
| Definição | Valor Recomendado | Description |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
URL do serviço de registo de esquemas usado para gestão de esquemas. Normalmente do formato https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
Nome de utilizador para autenticação básica no registo de esquemas (se necessário). |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
Palavra-passe para autenticação básica no registo de esquemas (se necessário). |
Autenticação por nome de utilizador/palavra-passe
Ao utilizar esta forma de autenticação, certifique-se de que Protocol está definido como SaslPlaintext ou SaslSsl, AuthenticationMode está definido como Plain, ScramSha256 ou ScramSha512 e, se o certificado CA utilizado for diferente do certificado padrão ISRG Root X1, certifique-se de atualizar SslCaLocation ou SslCaPEM.
| Definição | Valor recomendado | Description |
|---|---|---|
| Lista de Corretores | BootstrapServer |
A configuração do aplicativo nomeada BootstrapServer contém o valor do servidor de inicialização encontrado na página Configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| Nome de utilizador | ConfluentCloudUsername |
A configuração do aplicativo nomeada ConfluentCloudUsername contém a chave de acesso da API do site Confluent Cloud. |
| Palavra-passe | ConfluentCloudPassword |
A configuração do aplicativo nomeada ConfluentCloudPassword contém o segredo da API obtido do site Confluent Cloud. |
| SslCaPEM | SSLCaPemCertificate |
Definição de aplicação chamada SSLCaPemCertificate que contém o certificado CA como uma string em formato PEM. O valor deve seguir o formato padrão, por exemplo: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----. |
Autenticação SSL
Certifique-se de que Protocol está definido como SSL.
| Definição | Valor Recomendado | Description |
|---|---|---|
| Lista de Corretores | BootstrapServer |
A configuração do aplicativo nomeada BootstrapServer contém o valor do servidor de inicialização encontrado na página Configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| SslCaPEM | SslCaCertificatePem |
Definição de aplicação nomeada SslCaCertificatePem que contém o valor PEM do certificado da CA como uma string. O valor deve seguir o formato padrão: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
Definição de aplicação nomeada SslClientCertificatePem que contém o valor PEM do certificado do cliente como uma string. O valor deve seguir o formato padrão: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
Definição de aplicação nomeada SslClientKeyPem que contém o valor PEM da chave privada do cliente como uma string. O valor deve seguir o formato padrão: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
Definição de aplicação nomeada SslClientCertificateAndKeyPem que contém o valor PEM do certificado do cliente e da chave privada do cliente concatenadas como uma string. O valor deve seguir o formato padrão: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
Uma definição de aplicação chamada SslClientKeyPassword que contém a palavra-passe da chave privada (se existir). |
Autenticação OAuth
Ao usar autenticação OAuth, configure as propriedades relacionadas com OAuth nas suas definições de ligação.
Os valores de cadeia de caracteres que você usa para essas configurações devem estar presentes como Values ou na coleção no arquivo local.settings.json durante o desenvolvimento local.
Deves também definir o Protocol e AuthenticationMode nas definições vinculativas.