Partilhar via


Vinculação de saída do Apache Kafka para o Azure Functions

A ligação de saída permite que uma aplicação Azure Functions envie mensagens para um tópico Kafka.

Importante

Estão disponíveis fixações Kafka para Funções no plano Flex Consumption, Elastic Premium Plan e Dedicated (App Service). Só são suportados na versão 4.x do runtime Functions.

Exemplo

A forma como usas a ligação depende da modalidade C# na tua aplicação de funções. Pode usar uma das seguintes modalidades:

Uma função C# compilada que utiliza uma biblioteca de classes de processo worker isolada que corre num processo separado do tempo de execução.

Os atributos que você usa dependem do provedor de eventos específico.

O exemplo seguinte utiliza um tipo de retorno personalizado chamado MultipleOutputType, que consiste numa resposta HTTP e numa saída Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Na MultipleOutputType classe, Kevent é a variável de ligação de saída para a ligação de Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Para enviar um lote de eventos, passe uma matriz de cadeia de caracteres para o tipo de saída, conforme mostrado no exemplo a seguir:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

O array de strings é definido como a Kevents propriedade sobre a classe, e a ligação de saída é definida nesta propriedade:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

A função a seguir adiciona cabeçalhos aos dados de saída de Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Para obter um conjunto completo de exemplos .NET de trabalho, consulte o repositório de extensão Kafka.

A utilização do binding de saída depende da tua versão do modelo de programação Node.js.

No modelo Node.js v4, defines a ligação de saída diretamente no código da função. Para obter mais informações, consulte o Guia do desenvolvedor do Azure Functions Node.js.

Nestes exemplos, os fornecedores de eventos são ou Confluent ou Azure Event Hubs. Estes exemplos mostram uma ligação de saída Kafka para uma função que um pedido HTTP aciona e envia dados do pedido para o tópico Kafka.

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Para enviar eventos em lote, envie um array de mensagens, como mostrado nestes exemplos:

const { app, output } = require("@azure/functions");

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

async function kafkaOutputManyWithHttp(request, context) {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

Estes exemplos mostram como enviar uma mensagem de evento com cabeçalhos para um tópico Kafka:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Para obter um conjunto completo de exemplos JavaScript de trabalho, consulte o repositório de extensões Kafka.

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Para enviar eventos em lote, envie um array de mensagens, como mostrado nestes exemplos:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputManyWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

Estes exemplos mostram como enviar uma mensagem de evento com cabeçalhos para um tópico Kafka:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Para um conjunto completo de exemplos funcionais de TypeScript, consulte o repositório de extensões Kafka.

As propriedades específicas do arquivo function.json dependem do seu provedor de eventos, que nesses exemplos são Confluent ou Hubs de Eventos do Azure. Os exemplos seguintes mostram uma ligação de saída Kafka para uma função que um pedido HTTP desencadeia e envia dados do pedido para o tópico Kafka.

A function.json a seguir define o gatilho para o provedor específico nesses exemplos:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

O seguinte código envia uma mensagem ao tópico:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

O código a seguir envia várias mensagens como uma matriz para o mesmo tópico:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

O exemplo a seguir mostra como enviar uma mensagem de evento com cabeçalhos para o mesmo tópico Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Para obter um conjunto completo de exemplos de PowerShell em funcionamento, consulte o repositório de extensão Kafka.


A utilização do output binding depende da tua versão do modelo de programação em Python.

No modelo Python v2, defines a tua ligação de saída diretamente no código da função usando decoradores. Para mais informações, consulte o guia para programadores Azure Functions em Python.

Estes exemplos mostram uma ligação de saída Kafka para uma função que um pedido HTTP aciona e envia dados do pedido para o tópico Kafka.

    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'


@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
    outputMessage.set(json.dumps(['one', 'two']))
    return 'OK'

Para enviar eventos em lote, envie um array de mensagens, como mostrado nestes exemplos:

@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }]  },

Estes exemplos mostram como enviar uma mensagem de evento com cabeçalhos para um tópico Kafka:

out.set(json.dumps(kevent))
return 'OK'

Para obter um conjunto completo de exemplos Python de trabalho, consulte o repositório de extensão Kafka.

As anotações que você usa para configurar a associação de saída dependem do provedor de eventos específico.

A função a seguir envia uma mensagem para o tópico Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

O exemplo a seguir mostra como enviar várias mensagens para um tópico Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

Neste exemplo, o parâmetro de vinculação de saída é alterado para matriz de cadeia de caracteres.

O último exemplo usa estas KafkaEntity e KafkaHeader classes:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

A função de exemplo a seguir envia uma mensagem com cabeçalhos para um tópico Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Para obter um conjunto completo de exemplos Java de trabalho para Confluent, consulte o repositório de extensão Kafka.

Atributos

As bibliotecas C# do processo de trabalho em processo e isoladas usam o Kafka atributo para definir o gatilho de função.

A tabela seguinte explica as propriedades que pode definir usando este atributo:

Parâmetro Description
Lista de Corretores (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
Tópico (Obrigatório) O tópico para o qual a saída é enviada.
AvroSchema (Opcional) Esquema de um registo genérico do valor da mensagem ao utilizar o protocolo Avro.
KeyAvroSchema (Opcional) Esquema de um registo genérico da chave de mensagem ao utilizar o protocolo Avro.
KeyDataType (Opcional) Tipo de dados para enviar a chave de mensagem para o Tópico Kafka. Se KeyAvroSchema for definido, este valor é registo genérico. Os valores aceites são Int, Long, String, e Binary.
MaxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
Tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
CapacitaçãoIdempotência (Opcional) Quando definido como true, garante que as mensagens sejam produzidas com êxito exatamente uma vez e na ordem de produção original, com um valor padrão de false
MessageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
RequestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
MaxRetries (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido como true.
Modo de autenticação (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NotSet (por defeito), Gssapi, Plain, ScramSha256, ScramSha512, e OAuthBearer.
Nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
Palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
Protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NotSet (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl.
Localização de SslCa. (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
SslCertificateLocation (Opcional) Caminho para o certificado do cliente.
SslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
SslKeyPassword (Opcional) Senha para o certificado do cliente.
SslCertificatePEM (Opcional) Certificado de cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações.
SslKeyPEM (Opcional) Chave privada do cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações.
SslCaPEM (Opcional) Certificado CA em formato PEM como string. Consulte Conexões para obter mais informações.
SslCertificateandKeyPEM (Opcional) Certificado do cliente e chave em formato PEM como uma string. Consulte Conexões para obter mais informações.
SchemaRegistryUrl (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações.
SchemaRegistryUsername (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações.
SchemaRegistryPassword (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações.
OAuthBearerMethod (Opcional) Método OAuth Bearer. Os valores aceites são oidc e default.
OAuthBearerClientId (Opcional) Quando OAuthBearerMethod está definido para oidc, isto especifica o ID do cliente portador OAuth. Consulte Conexões para obter mais informações.
OAuthBearerClientSecret (Opcional) Quando OAuthBearerMethod está definido como oidc, isto especifica o secreto do cliente portador OAuth. Consulte Conexões para obter mais informações.
OAuthBearerScope (Opcional) Especifica o âmbito do pedido de acesso ao corretor.
OAuthBearerTokenEndpointUrl (Opcional) OAuth/OIDC endpoint do token emissor HTTP(S) URI usado para recuperar o token quando oidc o método é utilizado. Consulte Conexões para obter mais informações.
OAuthBearerExtensions (Opcional) Lista separada por vírgulas de pares chave=valor a ser fornecida como informação adicional ao corretor quando oidc o método for utilizado. Por exemplo: supportFeatureX=true,organizationId=sales-emea.

Anotações

A KafkaOutput anotação permite-lhe criar uma função que escreve sobre um tema específico. As opções suportadas incluem os seguintes elementos:

Elemento Description
Designação O nome da variável que representa os dados intermediados no código da função.
Lista de corretores (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
Tipo de dados Define como Functions lida com o valor do parâmetro. Por padrão, o valor é obtido como uma string e Functions tenta desserializar a string para o objeto Java plain-old real (POJO). Quando string, a entrada é tratada como apenas uma cadeia de caracteres. Quando binary, a mensagem é recebida como dados binários e o Functions tenta desserializá-la para um byte do tipo parâmetro real[].
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro. (Atualmente não suportado para Java.)
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido para true, garante que as mensagens são produzidas com sucesso exatamente uma vez e na ordem original de produção, com um valor padrão de false.
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxTentativas (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido para true.
authenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NotSet (por defeito), Gssapi, Plain, ScramSha256, ScramSha512.
nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NotSet (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocalização (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha para o certificado do cliente.
schemaRegistryUrl (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações.
schemaRegistryUsername (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações.
schemaRegistryPassword (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações.

Configuração

A tabela a seguir explica as propriedades de configuração de associação definidas no arquivo function.json .

function.json propriedade Description
type Definido como kafka.
direção Definido como out.
Designação O nome da variável que representa os dados intermediados no código da função.
Lista de corretores (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro.
keyAvroSchema (Opcional) Esquema de um registo genérico da chave de mensagem ao utilizar o protocolo Avro.
keyDataType (Opcional) Tipo de dados para enviar a chave de mensagem para o Tópico Kafka. Se keyAvroSchema for definido, este valor é registo genérico. Os valores aceites são Int, Long, String, e Binary.
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido para true, garante que as mensagens são produzidas com sucesso exatamente uma vez e na ordem original de produção, com um valor padrão de false.
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxTentativas (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido para true.
authenticationMode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NotSet (por defeito), Gssapi, Plain, ScramSha256, ScramSha512.
nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando AuthenticationMode é Gssapi. Consulte Conexões para obter mais informações.
protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NotSet (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocalização (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha para o certificado do cliente.
sslCertificatePEM (Opcional) Certificado de cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações.
sslKeyPEM (Opcional) Chave privada do cliente em formato PEM como uma cadeia. Consulte Conexões para obter mais informações.
sslCaPEM (Opcional) Certificado CA em formato PEM como string. Consulte Conexões para obter mais informações.
sslCertificateandKeyPEM (Opcional) Certificado do cliente e chave em formato PEM como uma string. Consulte Conexões para obter mais informações.
schemaRegistryUrl (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações.
schemaRegistryUsername (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações.
schemaRegistryPassword (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações.
oAuthBearerMethod (Opcional) Método OAuth Bearer. Os valores aceites são oidc e default.
oAuthBearerClientId (Opcional) Quando oAuthBearerMethod está definido para oidc, isto especifica o ID do cliente portador OAuth. Consulte Conexões para obter mais informações.
oAuthBearerClientSecret (Opcional) Quando oAuthBearerMethod está definido como oidc, isto especifica o secreto do cliente portador OAuth. Consulte Conexões para obter mais informações.
oAuthBearerScope (Opcional) Especifica o âmbito do pedido de acesso ao corretor.
oAuthBearerTokenEndpointUrl (Opcional) OAuth/OIDC endpoint do token emissor HTTP(S) URI usado para recuperar o token quando oidc o método é utilizado. Consulte Conexões para obter mais informações.

Configuração

A tabela a seguir explica as propriedades de configuração de associação definidas no arquivo function.json . O Python utiliza snake_case convenções de nomenclatura para propriedades de configuração.

function.json propriedade Description
type Definido como kafka.
direção Definido como out.
Designação O nome da variável que representa os dados intermediados no código da função.
broker_list (Obrigatório) A lista de corretores Kafka para os quais a saída é enviada. Consulte Conexões para obter mais informações.
topic (Obrigatório) O tópico para o qual a saída é enviada.
avroSchema (Opcional) Esquema de um registro genérico ao usar o protocolo Avro.
maxMessageBytes (Opcional) O tamanho máximo da mensagem de saída que está sendo enviada (em MB), com um valor padrão de 1.
tamanho do lote (Opcional) Número máximo de mensagens em lote em um único conjunto de mensagens, com um valor padrão de 10000.
enableIdempotence (Opcional) Quando definido para true, garante que as mensagens são produzidas com sucesso exatamente uma vez e na ordem original de produção, com um valor padrão de false.
messageTimeoutMs (Opcional) O tempo limite da mensagem local, em milissegundos. Esse valor só é imposto localmente e limita o tempo que uma mensagem produzida aguarda a entrega bem-sucedida, com um padrão 300000. Um tempo de 0 é infinito. Esse valor é o tempo máximo usado para entregar uma mensagem (incluindo tentativas). O erro de entrega ocorre quando a contagem de novas tentativas ou o tempo limite da mensagem são excedidos.
requestTimeoutMs (Opcional) O tempo limite de confirmação da solicitação de saída, em milissegundos, com um padrão de 5000.
maxTentativas (Opcional) O número de vezes que tentar enviar novamente uma mensagem com falha, com um padrão de 2. Tentar novamente pode causar reordenação, a menos que EnableIdempotence esteja definido para true.
authentication_mode (Opcional) O modo de autenticação ao usar a autenticação SASL (Simple Authentication and Security Layer). Os valores suportados são NOTSET (por defeito), Gssapi, Plain, ScramSha256, ScramSha512.
nome de utilizador (Opcional) O nome de usuário para autenticação SASL. Não suportado quando authentication_mode é Gssapi. Consulte Conexões para obter mais informações.
palavra-passe (Opcional) A senha para autenticação SASL. Não suportado quando authentication_mode é Gssapi. Consulte Conexões para obter mais informações.
protocolo (Opcional) O protocolo de segurança usado na comunicação com corretores. Os valores suportados são NOTSET (por defeito), plaintext, ssl, sasl_plaintext, sasl_ssl.
sslCaLocalização (Opcional) Caminho para o arquivo de certificado da autoridade de certificação para verificar o certificado do corretor.
sslCertificateLocation (Opcional) Caminho para o certificado do cliente.
sslKeyLocalização (Opcional) Caminho para a chave privada do cliente (PEM) usada para autenticação.
sslKeyPassword (Opcional) Senha para o certificado do cliente.
schema_registry_url (Opcional) URL para o Avro Schema Registry. Consulte Conexões para obter mais informações.
schema_registry_username (Opcional) Nome de utilizador para o Avro Schema Registry. Consulte Conexões para obter mais informações.
schema_registry_password (Opcional) Palavra-passe para o Registo de Esquemas Avro. Consulte Conexões para obter mais informações.
o_auth_bearer_method (Opcional) Método OAuth Bearer. Os valores aceites são oidc e default.
o_auth_bearer_client_id (Opcional) Quando o_auth_bearer_method está definido para oidc, isto especifica o ID do cliente portador OAuth. Consulte Conexões para obter mais informações.
o_auth_bearer_client_secret (Opcional) Quando o_auth_bearer_method está definido como oidc, isto especifica o secreto do cliente portador OAuth. Consulte Conexões para obter mais informações.
o_auth_bearer_scope (Opcional) Especifica o âmbito do pedido de acesso ao corretor.
o_auth_bearer_token_endpoint_url (Opcional) OAuth/OIDC endpoint do token emissor HTTP(S) URI usado para recuperar o token quando oidc o método é utilizado. Consulte Conexões para obter mais informações.

Nota

As propriedades relacionadas com PEM de certificados e as propriedades relacionadas com chaves Avro ainda não estão disponíveis na biblioteca Python.

Utilização

Tanto os tipos de chave como os de valor funcionam com a serialização Avro e Protobuf incorporada.

O deslocamento, a partição e o carimbo de data/hora do evento são gerados em tempo de execução. Podes definir apenas o valor e os cabeçalhos dentro da função. Define o tema no ficheiro function.json.

Certifica-te de que tens acesso ao tema Kafka onde queres escrever. Você configura a associação com credenciais de acesso e conexão para o tópico Kafka.

Num plano Premium, deve ativar a monitorização de escala em tempo de execução para que a saída Kafka escale para múltiplas instâncias. Para saber mais, consulte Habilitar o dimensionamento em tempo de execução.

Para obter um conjunto completo de configurações de host.json suportadas para o gatilho Kafka, consulte host.json configurações.

Ligações

Armazena toda a informação de ligação exigida pelos teus triggers e bindings nas definições da aplicação, não nas definições de binding do teu código. Esta orientação aplica-se a credenciais, que nunca deve armazenar no seu código.

Importante

As configurações de credenciais devem fazer referência a uma configuração de aplicativo. Não codifice credenciais em seu código ou arquivos de configuração. Ao executar localmente, use o arquivo local.settings.json para suas credenciais e não publique o arquivo local.settings.json.

Ao ligar-se a um cluster Kafka gerido fornecido pelo Confluent no Azure, pode usar um dos seguintes métodos de autenticação.

Nota

Ao utilizar o plano Flex Consumption, as propriedades de autenticação de certificados baseadas na localização de ficheiros (SslCaLocation, SslCertificateLocation, SslKeyLocation) não são suportadas. Em vez disso, utilize as propriedades de certificados baseadas em PEM (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) ou armazene certificados no Azure Key Vault.

Registo de Esquemas

Para utilizar o registo de esquema fornecido pelo Confluent na Extensão Kafka, defina as seguintes credenciais:

Definição Valor Recomendado Description
SchemaRegistryUrl SchemaRegistryUrl URL do serviço de registo de esquemas usado para gestão de esquemas. Normalmente do formato https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY Nome de utilizador para autenticação básica no registo de esquemas (se necessário).
SchemaRegistryPassword CONFLUENT_API_SECRET Palavra-passe para autenticação básica no registo de esquemas (se necessário).

Autenticação por nome de utilizador/palavra-passe

Ao utilizar esta forma de autenticação, certifique-se de que Protocol está definido como SaslPlaintext ou SaslSsl, AuthenticationMode está definido como Plain, ScramSha256 ou ScramSha512 e, se o certificado CA utilizado for diferente do certificado padrão ISRG Root X1, certifique-se de atualizar SslCaLocation ou SslCaPEM.

Definição Valor recomendado Description
Lista de Corretores BootstrapServer A configuração do aplicativo nomeada BootstrapServer contém o valor do servidor de inicialização encontrado na página Configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nome de utilizador ConfluentCloudUsername A configuração do aplicativo nomeada ConfluentCloudUsername contém a chave de acesso da API do site Confluent Cloud.
Palavra-passe ConfluentCloudPassword A configuração do aplicativo nomeada ConfluentCloudPassword contém o segredo da API obtido do site Confluent Cloud.
SslCaPEM SSLCaPemCertificate Definição de aplicação chamada SSLCaPemCertificate que contém o certificado CA como uma string em formato PEM. O valor deve seguir o formato padrão, por exemplo: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----.

Autenticação SSL

Certifique-se de que Protocol está definido como SSL.

Definição Valor Recomendado Description
Lista de Corretores BootstrapServer A configuração do aplicativo nomeada BootstrapServer contém o valor do servidor de inicialização encontrado na página Configurações do Confluent Cloud. O valor é semelhante a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
SslCaPEM SslCaCertificatePem Definição de aplicação nomeada SslCaCertificatePem que contém o valor PEM do certificado da CA como uma string. O valor deve seguir o formato padrão: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem Definição de aplicação nomeada SslClientCertificatePem que contém o valor PEM do certificado do cliente como uma string. O valor deve seguir o formato padrão: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem Definição de aplicação nomeada SslClientKeyPem que contém o valor PEM da chave privada do cliente como uma string. O valor deve seguir o formato padrão: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem Definição de aplicação nomeada SslClientCertificateAndKeyPem que contém o valor PEM do certificado do cliente e da chave privada do cliente concatenadas como uma string. O valor deve seguir o formato padrão: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword Uma definição de aplicação chamada SslClientKeyPassword que contém a palavra-passe da chave privada (se existir).

Autenticação OAuth

Ao usar autenticação OAuth, configure as propriedades relacionadas com OAuth nas suas definições de ligação.

Os valores de cadeia de caracteres que você usa para essas configurações devem estar presentes como Values ou na coleção no arquivo local.settings.json durante o desenvolvimento local.

Deves também definir o Protocol e AuthenticationMode nas definições vinculativas.

Próximos passos