Freigeben über


Apache Kafka-Ausgabebindung für Azure Functions

Die Ausgabebindung ermöglicht es einer Azure Functions-App, Nachrichten an ein Kafka-Thema zu senden.

Wichtig

Kafka-Bindungen sind für Funktionen im Flex Consumption-Plan, Elastic Premium Plan und Dedicated (App Service)-Plan verfügbar. Sie werden nur in Version 4.x der Funktionslaufzeit unterstützt.

Beispiel

Die Verwendung der Bindung hängt von der C#-Modalität in Ihrer Funktions-App ab. Sie können eine der folgenden Modalitäten verwenden:

Eine kompilierte C#-Funktion, die eine klassenbibliothek für isolierten Arbeitsprozess verwendet, die in einem Prozess ausgeführt wird, der von der Laufzeit getrennt ist.

Die von Ihnen verwendeten Attribute hängen vom jeweiligen Ereignisanbieter ab.

Im folgenden Beispiel wird ein benutzerdefinierter Rückgabetyp namens verwendet MultipleOutputType, der aus einer HTTP-Antwort und einer Kafka-Ausgabe besteht.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

In der MultipleOutputType Klasse Kevent ist die Ausgabebindungsvariable für die Kafka-Bindung.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Um einen Batch von Ereignissen zu senden, übergeben Sie ein Zeichenfolgenarray an den Ausgabetyp, wie im folgenden Beispiel gezeigt:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

Das Zeichenfolgenarray wird als Kevents Eigenschaft für die Klasse definiert, und die Ausgabebindung wird für diese Eigenschaft definiert:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Die folgende Funktion fügt den Kafka-Ausgabedaten Kopfzeilen hinzu:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Eine vollständige Sammlung funktionierender .NET-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die Verwendung der Ausgabebindung hängt von Ihrer Version des Node.js-Programmiermodells ab.

Im Node.js v4-Modell definieren Sie die Ausgabebindung direkt in Ihrem Funktionscode. Weitere Informationen finden Sie im Node.js-Entwicklerhandbuch für Azure Functions.

In diesen Beispielen sind die Ereignisanbieter entweder Confluent oder Azure Event Hubs. Diese Beispiele zeigen eine Kafka-Ausgabebindung für eine Funktion, die eine HTTP-Anforderung auslöst und Daten aus der Anforderung an das Kafka-Thema sendet.

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Um Ereignisse in einem Batch zu senden, senden Sie ein Array von Nachrichten, wie in den folgenden Beispielen gezeigt:

const { app, output } = require("@azure/functions");

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

async function kafkaOutputManyWithHttp(request, context) {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

In diesen Beispielen wird gezeigt, wie Sie eine Ereignisnachricht mit Kopfzeilen an ein Kafka-Thema senden:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Eine vollständige Sammlung funktionierender JavaScript-Beispiele finden Sie im Kafka-Erweiterungsrepository.

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const queryName = request.query.get("name");
  const parsedbody = JSON.parse(body);
  const name = queryName || parsedbody.name || "world";
  context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Um Ereignisse in einem Batch zu senden, senden Sie ein Array von Nachrichten, wie in den folgenden Beispielen gezeigt:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputManyWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const queryName = request.query.get("name");
  const body = await request.text();
  const parsedbody = body ? JSON.parse(body) : {};
  parsedbody.name = parsedbody.name || "world";
  const name = queryName || parsedbody.name;
  context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
  context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
  return {
    body: `Messages sent to kafka.`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputManyWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputManyWithHttp,
});

In diesen Beispielen wird gezeigt, wie Sie eine Ereignisnachricht mit Kopfzeilen an ein Kafka-Thema senden:

import {
  app,
  HttpRequest,
  HttpResponseInit,
  InvocationContext,
  output,
} from "@azure/functions";

const kafkaOutput = output.generic({
  type: "kafka",
  direction: "out",
  topic: "topic",
  brokerList: "%BrokerList%",
  username: "ConfluentCloudUsername",
  password: "ConfluentCloudPassword",
  protocol: "saslSsl",
  authenticationMode: "plain",
});

export async function kafkaOutputWithHttp(
  request: HttpRequest,
  context: InvocationContext
): Promise<HttpResponseInit> {
  context.log(`Http function processed request for url "${request.url}"`);

  const body = await request.text();
  const parsedbody = JSON.parse(body);
  // assuming body is of the format { "key": "key", "value": {JSON object} }
  context.extraOutputs.set(
    kafkaOutput,
    `{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
      parsedbody.value
    ).replace(/"/g, '\\"')}", "Key":"${
      parsedbody.key
    }", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
  );
  context.log(
    `Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
  );
  return {
    body: `Message sent to kafka with value: ${context.extraOutputs.get(
      kafkaOutput
    )}`,
    status: 200,
  };
}

const extraOutputs = [];
extraOutputs.push(kafkaOutput);

app.http("kafkaOutputWithHttp", {
  methods: ["GET", "POST"],
  authLevel: "anonymous",
  extraOutputs,
  handler: kafkaOutputWithHttp,
});

Einen vollständigen Satz funktionierender TypeScript-Beispiele finden Sie im Kafka-Erweiterungs-Repository.

Die spezifischen Eigenschaften der Datei „function.json“ hängen von Ihrem Ereignisanbieter ab, der in diesen Beispielen entweder Confluent oder Azure Event Hubs ist. Die folgenden Beispiele zeigen eine Kafka-Ausgabebindung für eine Funktion, die eine HTTP-Anforderung auslöst und Daten aus der Anforderung an das Kafka-Thema sendet.

Die folgende Datei „function.json“ definiert den Trigger für den spezifischen Anbieter in diesen Beispielen:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Der folgende Code sendet eine Nachricht an das Thema:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Der folgende Code sendet mehrere Nachrichten als Array an dasselbe Thema:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Das folgende Beispiel zeigt, wie eine Ereignisnachricht mit Headern an dasselbe Kafka-Thema gesendet wird:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Eine vollständige Sammlung funktionierender PowerShell-Beispiele finden Sie im Kafka-Erweiterungsrepository.


Die Verwendung der Ausgabebindung hängt von Ihrer Version des Python-Programmiermodells ab.

Im Python v2-Modell definieren Sie Ihre Ausgabebindung direkt in Ihrem Funktionscode mithilfe von Dekoratoren. Weitere Informationen finden Sie im Entwicklerhandbuch für Azure Functions Python.

Diese Beispiele zeigen eine Kafka-Ausgabebindung für eine Funktion, die eine HTTP-Anforderung auslöst und Daten aus der Anforderung an das Kafka-Thema sendet.

    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'


@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
    outputMessage.set(json.dumps(['one', 'two']))
    return 'OK'

Um Ereignisse in einem Batch zu senden, senden Sie ein Array von Nachrichten, wie in den folgenden Beispielen gezeigt:

@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }]  },

In diesen Beispielen wird gezeigt, wie Sie eine Ereignisnachricht mit Kopfzeilen an ein Kafka-Thema senden:

out.set(json.dumps(kevent))
return 'OK'

Eine vollständige Sammlung funktionierender Python-Beispiele finden Sie im Kafka-Erweiterungsrepository.

Die Anmerkungen, die Sie zum Konfigurieren der Ausgabebindung verwenden, hängen vom jeweiligen Ereignisanbieter ab.

Die folgende Funktion sendet eine Nachricht an das Kafka-Thema.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Das folgende Beispiel zeigt, wie mehrere Nachrichten an ein Kafka-Thema gesendet werden.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

In diesem Beispiel wird der Ausgabebindungsparameter in ein Zeichenfolgenarray geändert.

Im letzten Beispiel werden diese KafkaEntity und KafkaHeader Klassen verwendet:

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

Die folgende Beispielfunktion sendet eine Nachricht mit Headern an ein Kafka-Thema.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Eine vollständige Sammlung funktionierender Java-Beispiele für Confluent finden Sie im Kafka-Erweiterungsrepository.

Attribute

Von C#-Bibliotheken des Typs In-Process und isolierter Workerprozess wird das Attribut Kafka verwendet, um die Funktion zu definieren.

In der folgenden Tabelle werden die Eigenschaften erläutert, die Sie mithilfe dieses Attributs festlegen können:

Parameter BESCHREIBUNG
BrokerList (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
Thema (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
AvroSchema (Optional) Schema eines generischen Datensatzes von Nachrichtenwert bei Verwendung des Avro-Protokolls.
KeyAvroSchema (Optional) Schema eines generischen Eintrags von Nachrichtenschlüsseln bei Verwendung des Avro-Protokolls.
KeyDataType (Optional) Datentyp zum Senden des Nachrichtenschlüssels an kafka Topic. Wenn KeyAvroSchema dieser Wert festgelegt ist, ist dieser Wert generischer Datensatz. Akzeptierte Werte sind Int, Long, String, und Binary.
MaxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
BatchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
EnableIdempotence (Optional) Wenn diese Einstellung auf true festgelegt ist, wird garantiert, dass Nachrichten genau ein Mal und in der ursprünglichen Generierungsreihenfolge mit einem Standardwert von false erfolgreich generiert werden.
MessageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dieser Wert ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
RequestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
MaxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Wiederholungsversuche können dazu führen, dass sich die Reihenfolge ändert, es sei denn, EnableIdempotence ist auf true festgelegt.
AuthenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Die unterstützten Werte sind NotSet (Standard), Gssapi, , Plain, ScramSha256, und OAuthBearerScramSha512.
Benutzername (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
Kennwort (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
Protokoll (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Die unterstützten Werte sind NotSet (Standard), plaintext, ssl, , sasl_plaintext. sasl_ssl
SslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
SslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
SslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
SslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.
SslCertificatePEM (Optional) Clientzertifikat im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
SslKeyPEM (Optional) Privater Clientschlüssel im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
SslCaPEM (Optional) Zertifizierungsstellenzertifikat im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
SslCertificateandKeyPEM (Optional) Clientzertifikat und Schlüssel im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
SchemaRegistryUrl (Optional) URL für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
SchemaRegistryUsername (Optional) Benutzername für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
SchemaRegistryPassword (Optional) Kennwort für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
OAuthBearerMethod (Optional) OAuth Bearer-Methode. Akzeptierte Werte sind oidc und default.
OAuthBearerClientId (Optional) Wenn OAuthBearerMethod dieser Wert auf oidc festgelegt ist, gibt dies die OAuth-Bearer-Client-ID an. Weitere Informationen finden Sie unter Verbindungen.
OAuthBearerClientSecret (Optional) Wenn OAuthBearerMethod dieser Wert auf oidc festgelegt ist, gibt dies den geheimen OAuth-Bearer-Clientschlüssel an. Weitere Informationen finden Sie unter Verbindungen.
OAuthBearerScope (Optional) Gibt den Umfang der Zugriffsanforderung an den Broker an.
OAuthBearerTokenEndpointUrl (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. Weitere Informationen finden Sie unter Verbindungen.
OAuthBearerExtensions (Optional) Kommagetrennte Liste der Schlüssel=Wertpaare, die als zusätzliche Informationen für den Broker bereitgestellt werden sollen, wenn oidc die Methode verwendet wird. Beispiel: supportFeatureX=true,organizationId=sales-emea.

Anmerkungen

Mit der KafkaOutput Anmerkung können Sie eine Funktion erstellen, die in ein bestimmtes Thema schreibt. Unterstützte Optionen umfassen die folgenden Elemente:

Element BESCHREIBUNG
name Der Name der Variablen, die die Brokerdaten im Funktionscode darstellt.
brokerList (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
dataType Definiert, wie Functions den Parameterwert verarbeitet. Standardmäßig wird der Wert als Zeichenfolge abgerufen, und Functions versucht, die Zeichenfolge in das tatsächliche POJO (Plain-Old Java Object) zu deserialisieren. Bei string wird die Eingabe nur als Zeichenfolge behandelt. Wenn binary, wird die Nachricht als Binärdaten empfangen, und Functions versucht, sie in einen tatsächlichen Parametertyp byte[] zu deserialisieren.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird. (Derzeit nicht für Java unterstützt.)
maxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
batchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
enableIdempotence (Optional) Bei Festlegung auf true, garantiert, dass Nachrichten erfolgreich genau einmal und in der ursprünglichen Serienreihenfolge erstellt werden, mit einem Standardwert von false.
messageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dieser Wert ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
requestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
maxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Beim Wiederholen kann die Neuanordnung verursacht werden, es sei denn EnableIdempotence , sie ist auf true.
authenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Die unterstützten Werte sind NotSet (Standard), Gssapi, Plain, , ScramSha256. ScramSha512
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Die unterstützten Werte sind NotSet (Standard), plaintext, ssl, , sasl_plaintext. sasl_ssl
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.
schemaRegistryUrl (Optional) URL für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
schemaRegistryUsername (Optional) Benutzername für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
schemaRegistryPassword (Optional) Kennwort für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.

Konfiguration

Die folgende Tabelle gibt Aufschluss über die Bindungskonfigurationseigenschaften, die Sie in der Datei function.json festlegen.

function.json-Eigenschaft BESCHREIBUNG
type Auf kafka festlegen.
direction Auf out festlegen.
name Der Name der Variablen, die die Brokerdaten im Funktionscode darstellt.
brokerList (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
keyAvroSchema (Optional) Schema eines generischen Eintrags von Nachrichtenschlüsseln bei Verwendung des Avro-Protokolls.
keyDataType (Optional) Datentyp zum Senden des Nachrichtenschlüssels an kafka Topic. Wenn keyAvroSchema dieser Wert festgelegt ist, ist dieser Wert generischer Datensatz. Akzeptierte Werte sind Int, Long, String, und Binary.
maxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
batchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
enableIdempotence (Optional) Bei Festlegung auf true, garantiert, dass Nachrichten erfolgreich genau einmal und in der ursprünglichen Serienreihenfolge erstellt werden, mit einem Standardwert von false.
messageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dieser Wert ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
requestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
maxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Beim Wiederholen kann die Neuanordnung verursacht werden, es sei denn EnableIdempotence , sie ist auf true.
authenticationMode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Die unterstützten Werte sind NotSet (Standard), Gssapi, Plain, , ScramSha256. ScramSha512
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn AuthenticationMode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Die unterstützten Werte sind NotSet (Standard), plaintext, ssl, , sasl_plaintext. sasl_ssl
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.
sslCertificatePEM (Optional) Clientzertifikat im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
sslKeyPEM (Optional) Privater Clientschlüssel im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
sslCaPEM (Optional) Zertifizierungsstellenzertifikat im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
sslCertificateandKeyPEM (Optional) Clientzertifikat und Schlüssel im PEM-Format als Zeichenfolge. Weitere Informationen finden Sie unter Verbindungen.
schemaRegistryUrl (Optional) URL für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
schemaRegistryUsername (Optional) Benutzername für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
schemaRegistryPassword (Optional) Kennwort für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
oAuthBearerMethod (Optional) OAuth Bearer-Methode. Akzeptierte Werte sind oidc und default.
oAuthBearerClientId (Optional) Wenn oAuthBearerMethod dieser Wert auf oidc festgelegt ist, gibt dies die OAuth-Bearer-Client-ID an. Weitere Informationen finden Sie unter Verbindungen.
oAuthBearerClientSecret (Optional) Wenn oAuthBearerMethod dieser Wert auf oidc festgelegt ist, gibt dies den geheimen OAuth-Bearer-Clientschlüssel an. Weitere Informationen finden Sie unter Verbindungen.
oAuthBearerScope (Optional) Gibt den Umfang der Zugriffsanforderung an den Broker an.
oAuthBearerTokenEndpointUrl (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. Weitere Informationen finden Sie unter Verbindungen.

Konfiguration

Die folgende Tabelle gibt Aufschluss über die Bindungskonfigurationseigenschaften, die Sie in der Datei function.json festlegen. Python verwendet snake_case Benennungskonventionen für Konfigurationseigenschaften.

function.json-Eigenschaft BESCHREIBUNG
type Auf kafka festlegen.
direction Auf out festlegen.
name Der Name der Variablen, die die Brokerdaten im Funktionscode darstellt.
broker_list (Erforderlich) Die Liste der Kafka-Broker, an die die Ausgabe gesendet wird. Weitere Informationen finden Sie unter Verbindungen.
topic (Erforderlich) Das Thema, an das die Ausgabe gesendet wird.
avroSchema (Optional) Das Schema eines generischen Datensatzes, wenn das Avro-Protokoll verwendet wird.
maxMessageBytes (Optional) Die maximale Größe der gesendeten Ausgabenachricht (in MB) mit einem Standardwert von 1.
batchSize (Optional) Die maximale Anzahl von Nachrichten, die in einem einzelnen Nachrichtensatz mit einem Standardwert von 10000 als Batch zusammengefasst werden.
enableIdempotence (Optional) Bei Festlegung auf true, garantiert, dass Nachrichten erfolgreich genau einmal und in der ursprünglichen Serienreihenfolge erstellt werden, mit einem Standardwert von false.
messageTimeoutMs (Optional) Das lokale Nachrichtentimeout in Millisekunden. Dieser Wert wird nur lokal erzwungen und begrenzt die Zeit, die eine generierte Nachricht mit einem Standardwert von 300000 auf ihre erfolgreiche Übermittlung wartet. Eine Zeitangabe von 0 ist unendlich. Dieser Wert ist die maximale Zeit für die Übermittlung einer Nachricht (einschließlich Wiederholungsversuche). Ein Übermittlungsfehler tritt auf, wenn entweder die Anzahl der Wiederholungsversuche oder das Nachrichtentimeout überschritten wird.
requestTimeoutMs (Optional) Das Bestätigungstimeout der Ausgabeanforderung in Millisekunden mit einem Standardwert von 5000.
maxRetries (Optional) Die Anzahl der Wiederholungsversuche zum Senden einer fehlgeschlagenen Nachricht mit einem Standardwert von 2. Beim Wiederholen kann die Neuanordnung verursacht werden, es sei denn EnableIdempotence , sie ist auf true.
authentication_mode (Optional) Der Authentifizierungsmodus bei Verwendung der SASL-Authentifizierung (Simple Authentication and Security Layer). Die unterstützten Werte sind NOTSET (Standard), Gssapi, Plain, , ScramSha256. ScramSha512
username (Optional) Der Benutzername für die SASL-Authentifizierung. Wird nicht unterstützt, wenn authentication_mode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
password (Optional) Das Kennwort für die SASL-Authentifizierung. Wird nicht unterstützt, wenn authentication_mode den Wert Gssapi aufweist. Weitere Informationen finden Sie unter Verbindungen.
protocol (Optional) Das Sicherheitsprotokoll, das beim Kommunizieren mit Brokern verwendet wird. Die unterstützten Werte sind NOTSET (Standard), plaintext, ssl, , sasl_plaintext. sasl_ssl
sslCaLocation (Optional) Der Pfad zur Zertifikatdatei der Zertifizierungsstelle zum Überprüfen des Zertifikats des Brokers.
sslCertificateLocation (Optional) Der Pfad zum Zertifikat des Clients.
sslKeyLocation (Optional) Der Pfad zum privaten Schlüssel (PEM) des Clients, der für die Authentifizierung verwendet wird.
sslKeyPassword (Optional) Das Kennwort für das Zertifikat des Clients.
schema_registry_url (Optional) URL für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
schema_registry_username (Optional) Benutzername für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
schema_registry_password (Optional) Kennwort für die Avro-Schemaregistrierung. Weitere Informationen finden Sie unter Verbindungen.
o_auth_bearer_method (Optional) OAuth Bearer-Methode. Akzeptierte Werte sind oidc und default.
o_auth_bearer_client_id (Optional) Wenn o_auth_bearer_method dieser Wert auf oidc festgelegt ist, gibt dies die OAuth-Bearer-Client-ID an. Weitere Informationen finden Sie unter Verbindungen.
o_auth_bearer_client_secret (Optional) Wenn o_auth_bearer_method dieser Wert auf oidc festgelegt ist, gibt dies den geheimen OAuth-Bearer-Clientschlüssel an. Weitere Informationen finden Sie unter Verbindungen.
o_auth_bearer_scope (Optional) Gibt den Umfang der Zugriffsanforderung an den Broker an.
o_auth_bearer_token_endpoint_url (Optional) OAuth/OIDC issuer token endpoint HTTP(S) URI used to retrieve token when oidc method is used. Weitere Informationen finden Sie unter Verbindungen.

Hinweis

Zertifikat-PEM-bezogene Eigenschaften und Avro-Schlüsseleigenschaften sind in der Python-Bibliothek noch nicht verfügbar.

Verwendung

Schlüssel- und Werttypen funktionieren mit integrierter Avro - und Protobuf-Serialisierung .

Der Offset, die Partition und der Zeitstempel für das Ereignis werden zur Laufzeit generiert. Sie können nur den Wert und die Kopfzeilen innerhalb der Funktion festlegen. Sie legen das Thema in der function.json Datei fest.

Stellen Sie sicher, dass Sie Zugriff auf das Kafka-Thema haben, in dem Sie schreiben möchten. Sie konfigurieren die Bindung mit Zugriffs- und Verbindungsanmeldeinformationen für das Kafka-Thema.

In einem Premium-Plan müssen Sie die Überwachung der Laufzeitskala für die Kafka-Ausgabe aktivieren, um auf mehrere Instanzen zu skalieren. Weitere Informationen finden Sie unter Aktivieren der Laufzeitskalierung.

Eine vollständige Liste der unterstützten host.json-Einstellungen für den Kafka-Trigger finden Sie unter host.json-Einstellungen.

Verbindungen

Speichern Sie alle Verbindungsinformationen, die von Ihren Triggern und Bindungen in Den Anwendungseinstellungen erforderlich sind, nicht in den Bindungsdefinitionen in Ihrem Code. Dieser Leitfaden gilt für Anmeldeinformationen, die Sie niemals in Ihrem Code speichern sollten.

Wichtig

Einstellungen für Anmeldeinformationen müssen auf eine Anwendungseinstellung verweisen. Stellen Sie keine hartcodierten Anmeldeinformationen in Ihrem Code oder Ihren Konfigurationsdateien bereit. Wenn die Ausführung lokal erfolgt, verwenden Sie die Datei local.settings.json für Ihre Anmeldeinformationen, und veröffentlichen Sie die Datei „local.settings.json“ nicht.

Wenn Sie eine Verbindung mit einem verwalteten Kafka-Cluster herstellen, der von Confluent in Azure bereitgestellt wird, können Sie eine der folgenden Authentifizierungsmethoden verwenden.

Hinweis

Bei Verwendung des Flex-Verbrauchsplans werden dateispeicherortbasierte Zertifikatauthentifizierungseigenschaften (SslCaLocation, SslCertificateLocation, SslKeyLocation) nicht unterstützt. Verwenden Sie stattdessen die PEM-basierten Zertifikateigenschaften (SslCaPEM, SslCertificatePEM, , SslKeyPEMSslCertificateandKeyPEM, ) oder speichern Sie Zertifikate in Azure Key Vault.

Schemaspeicher

Um die Schemaregistrierung zu verwenden, die von Confluent in kafka Extension bereitgestellt wird, legen Sie die folgenden Anmeldeinformationen fest:

Einstellung Empfohlener Wert BESCHREIBUNG
SchemaRegistryUrl SchemaRegistryUrl DIE URL des Schemaregistrierungsdiensts, der für die Schemaverwaltung verwendet wird. In der Regel des Formats https://psrc-xyz.us-east-2.aws.confluent.cloud
SchemaRegistryUsername CONFLUENT_API_KEY Benutzername für die grundlegende Authentifizierung in der Schemaregistrierung (falls erforderlich).
SchemaRegistryPassword CONFLUENT_API_SECRET Kennwort für die grundlegende Authentifizierung in der Schemaregistrierung (falls erforderlich).

Benutzername/Kennwortauthentifizierung

Stellen Sie bei der Verwendung dieser Authentifizierungsform sicher, dass Protocol sie entweder SaslPlaintext oder SaslSsl, AuthenticationMode oder auf oder ScramSha256 festgelegt Plainist, oderScramSha512, und wenn das verwendete Zertifizierungsstellenzertifikat vom standardmäßigen ISRG Root X1-Zertifikat abweicht, stellen Sie sicher, dass Sie aktualisieren SslCaLocation oder SslCaPEM.

Einstellung Empfohlener Wert BESCHREIBUNG
BrokerList BootstrapServer Die App-Einstellung namens BootstrapServer enthält den Wert des Bootstrapservers, der auf der Seite mit den Confluent Cloud-Einstellungen gefunden wurde. Der Wert ähnelt xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Benutzername ConfluentCloudUsername Die App-Einstellung namens ConfluentCloudUsername enthält den API-Zugriffsschlüssel von der Confluent Cloud-Website.
Kennwort ConfluentCloudPassword Die App-Einstellung namens ConfluentCloudPassword enthält das API-Geheimnis, das von der Confluent Cloud-Website abgerufen wurde.
SslCaPEM SSLCaPemCertificate App-Einstellung, SSLCaPemCertificate die das Zertifizierungsstellenzertifikat als Zeichenfolge im PEM-Format enthält. Der Wert sollte dem Standardformat entsprechen, z. B.: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----.

SSL-Authentifizierung

Stellen Sie sicher, dass Protocol auf SSL eingestellt ist.

Einstellung Empfohlener Wert BESCHREIBUNG
BrokerList BootstrapServer Die App-Einstellung namens BootstrapServer enthält den Wert des Bootstrapservers, der auf der Seite mit den Confluent Cloud-Einstellungen gefunden wurde. Der Wert ähnelt xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
SslCaPEM SslCaCertificatePem App-Einstellung mit SslCaCertificatePem dem Namen PEM-Wert des Zertifizierungsstellenzertifikats als Zeichenfolge. Der Wert sollte dem Standardformat entsprechen: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslCertificatePEM SslClientCertificatePem App-Einstellung mit SslClientCertificatePem dem Namen PEM-Wert des Clientzertifikats als Zeichenfolge. Der Wert sollte dem Standardformat entsprechen: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE-----
SslKeyPEM SslClientKeyPem App-Einstellung mit SslClientKeyPem dem Namen PEM-Wert des privaten Clientschlüssels als Zeichenfolge. Der Wert sollte dem Standardformat entsprechen: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY-----
SslCertificateandKeyPEM SslClientCertificateAndKeyPem App-Einstellung mit SslClientCertificateAndKeyPem dem Namen PEM-Wert des Clientzertifikats und des privaten Clientschlüssels, die als Zeichenfolge verkettet sind. Der Wert sollte dem Standardformat entsprechen: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY-----
SslKeyPassword SslClientKeyPassword App-Einstellung mit dem Namen SslClientKeyPassword , die das Kennwort für den privaten Schlüssel enthält (falls vorhanden).

OAuth-Authentifizierung

Konfigurieren Sie bei verwendung der OAuth-Authentifizierung die OAuth-bezogenen Eigenschaften in Ihren Bindungsdefinitionen.

Die für diese Einstellungen verwendeten Zeichenfolgenwerte müssen während der lokalen Entwicklung als Anwendungseinstellungen in Azure oder in der Values-Sammlung in der Datei local.settings.json vorhanden sein.

Außerdem sollten Sie die Protocol Bindungsdefinitionen und AuthenticationMode die Bindungsdefinitionen festlegen.

Nächste Schritte