Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Gebruik de Apache Kafka-trigger in Azure Functions om uw functiecode uit te voeren als reactie op berichten in Kafka-onderwerpen. U kunt ook een Kafka-uitvoerbinding gebruiken om vanuit uw functie naar een onderwerp te schrijven. Zie het overzicht van Apache Kafka-bindingen voor Azure Functions voor meer informatie over de installatie- en configuratiedetails.
Belangrijk
Kafka-bindingen zijn beschikbaar voor Functions in het Flex Consumption-abonnement, elastic Premium-abonnement en toegewezen (App Service)-abonnement. Ze worden alleen ondersteund op versie 4.x van de Functions-runtime.
Opmerking
Het gebruik van de trigger is afhankelijk van de C#-modaliteit die wordt gebruikt in uw functie-app. Dit kan een van de volgende modi zijn:
Een gecompileerde C#-functie die gebruikmaakt van een geïsoleerde werkprocesklassebibliotheek die wordt uitgevoerd in een proces dat gescheiden is van de runtime.
De kenmerken die u gebruikt, zijn afhankelijk van de specifieke gebeurtenisprovider.
In het volgende voorbeeld ziet u een C#-functie die het Kafka-bericht leest en registreert als een Kafka-gebeurtenis:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
Als u gebeurtenissen in een batch wilt ontvangen, gebruikt u een tekenreeksmatrix als invoer, zoals wordt weergegeven in het volgende voorbeeld:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
De volgende functie registreert het bericht en de headers voor de Kafka-gebeurtenis:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende .NET-voorbeelden.
Het gebruik van de trigger is afhankelijk van uw versie van het Node.js programmeermodel.
In het Node.js v4-model definieert u de trigger rechtstreeks in uw functiecode. Zie de ontwikkelaarshandleiding voor Azure Functions Node.js voor meer informatie.
In deze voorbeelden zijn de gebeurtenisproviders Confluent of Azure Event Hubs. Deze voorbeelden laten zien hoe u een Kafka-trigger definieert voor een functie die een Kafka-bericht leest.
const { app } = require("@azure/functions");
async function kafkaTrigger(event, context) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Key: " + event.Key);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality waarde manyin op , zoals wordt weergegeven in deze voorbeelden:
const { app } = require("@azure/functions");
async function kafkaTriggerMany(events, context) {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Key: " + event.Key);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. In dit voorbeeld wordt de trigger voor de specifieke provider gedefinieerd met een algemeen Avro-schema:
const { app } = require("@azure/functions");
async function kafkaAvroGenericTrigger(event, context) {
context.log("Processed kafka event: ", event);
if (context.triggerMetadata?.key !== undefined) {
context.log("message key: ", context.triggerMetadata?.key);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
password: "EventHubConnectionString",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
username: "$ConnectionString",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
Zie de Kafka-extensieopslagplaats voor een volledige set werkende JavaScript-voorbeelden.
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
export async function kafkaTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality waarde manyin op , zoals wordt weergegeven in deze voorbeelden:
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
interface KafkaEvent {
Offset: number;
Partition: number;
Topic: string;
Timestamp: number;
Value: string;
}
export async function kafkaTriggerMany(
events: any,
context: InvocationContext
): Promise<void> {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. In dit voorbeeld wordt de trigger voor de specifieke provider gedefinieerd met een algemeen Avro-schema:
import { app, InvocationContext } from "@azure/functions";
export async function kafkaAvroGenericTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Processed kafka event: ", event);
context.log(
`Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
);
if (context.triggerMetadata?.key !== undefined) {
context.log(`Message Key : ${context.triggerMetadata?.key}`);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
Zie de Kafka-extensieopslagplaats voor een volledige set werkende TypeScript-voorbeelden.
De specifieke eigenschappen van het function.json bestand zijn afhankelijk van uw gebeurtenisprovider. In deze voorbeelden zijn de gebeurtenisproviders Confluent of Azure Event Hubs. In de volgende voorbeelden ziet u een Kafka-trigger voor een functie die een Kafka-bericht leest en registreert.
Het volgende function.json bestand definieert de trigger voor de specifieke provider:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
De volgende code wordt uitgevoerd wanneer de functie wordt geactiveerd:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Als u gebeurtenissen in een batch wilt ontvangen, stelt u de cardinality waarde many in op het function.json-bestand, zoals wordt weergegeven in de volgende voorbeelden:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
Met de volgende code wordt de matrix van gebeurtenissen geparseerd en worden de gebeurtenisgegevens vastgelegd:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
Met de volgende code worden de headergegevens vastgelegd:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende function.json definieert de trigger voor de specifieke provider met een algemeen Avro-schema:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
De volgende code wordt uitgevoerd wanneer de functie wordt geactiveerd:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
Zie de Kafka-extensieopslagplaats voor een volledige set werkende PowerShell-voorbeelden.
Het gebruik van de trigger is afhankelijk van uw versie van het Python-programmeermodel.
In het Python v2-model definieert u de trigger rechtstreeks in uw functiecode met behulp van decorators. Zie de ontwikkelaarshandleiding voor Azure Functions Python voor meer informatie.
Deze voorbeelden laten zien hoe u een Kafka-trigger definieert voor een functie die een Kafka-bericht leest.
@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
In dit voorbeeld worden gebeurtenissen in een batch ontvangen door de cardinality waarde in te stellen op many.
@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven.
@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaTriggerAvroGeneric",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
Zie de Kafka-extensieopslagplaats voor een volledige set werkende Python-voorbeelden.
De aantekeningen die u gebruikt om uw trigger te configureren, zijn afhankelijk van de specifieke gebeurtenisprovider.
In het volgende voorbeeld ziet u een Java-functie die de inhoud van de Kafka-gebeurtenis leest en registreert:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
Als u gebeurtenissen in een batch wilt ontvangen, gebruikt u een invoertekenreeks als een matrix, zoals wordt weergegeven in het volgende voorbeeld:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
De volgende functie registreert het bericht en de headers voor de Kafka-gebeurtenis:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
U kunt een algemeen Avro-schema definiëren voor de gebeurtenis die aan de trigger is doorgegeven. De volgende functie definieert een trigger voor de specifieke provider met een algemeen Avro-schema:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
Zie de Kafka-extensieopslagplaats voor een volledige set werkende Java-voorbeelden voor Confluent.
Kenmerken
C #-bibliotheken voor zowel in-process - als geïsoleerde werkprocessen maken gebruik van de KafkaTriggerAttribute functietrigger om de functietrigger te definiëren.
In de volgende tabel worden de eigenschappen uitgelegd die u kunt instellen met behulp van dit triggerkenmerk:
| Parameter | Description |
|---|---|
| BrokerList | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
| Onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
| ConsumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
| AvroSchema | (Optioneel) Schema van een algemene record van berichtwaarde bij gebruik van het Avro-protocol. |
| KeyAvroSchema | (Optioneel) Schema van een algemene record van berichtsleutel bij gebruik van het Avro-protocol. |
| KeyDataType | (Optioneel) Gegevenstype voor het ontvangen van de berichtsleutel vanaf Kafka-onderwerp. Als KeyAvroSchema deze waarde is ingesteld, is deze waarde een algemene record. Geaccepteerde waarden zijnInt, Long, en BinaryString. |
| AuthenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn NotSet (standaard), Gssapi, Plain, ScramSha256, en ScramSha512OAuthBearer. |
| Gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
| Wachtwoord | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
| Protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn NotSet (standaard), plaintext, ssl, sasl_plaintext, . sasl_ssl |
| SslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
| SslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
| SslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
| SslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
| SslCertificatePEM | (Optioneel) Clientcertificaat in PEM-indeling als een tekenreeks. Zie Verbindingen voor meer informatie. |
| SslKeyPEM | (Optioneel) Persoonlijke clientsleutel in PEM-indeling als een tekenreeks. Zie Verbindingen voor meer informatie. |
| SslCaPEM | (Optioneel) CA-certificaat in PEM-indeling als een tekenreeks. Zie Verbindingen voor meer informatie. |
| SslCertificateandKeyPEM | (Optioneel) Clientcertificaat en -sleutel in PEM-indeling als tekenreeks. Zie Verbindingen voor meer informatie. |
| SchemaRegistryUrl | (Optioneel) URL voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| SchemaRegistryUsername | (Optioneel) Gebruikersnaam voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| SchemaRegistryPassword | (Optioneel) Wachtwoord voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| OAuthBearerMethod | (Optioneel) OAuth Bearer-methode. Geaccepteerde waarden zijn oidc en default. |
| OAuthBearerClientId | (Optioneel) Wanneer OAuthBearerMethod dit is ingesteld oidcop, geeft dit de OAuth bearer-client-id op. Zie Verbindingen voor meer informatie. |
| OAuthBearerClientSecret | (Optioneel) Als OAuthBearerMethod dit is ingesteld oidcop, geeft dit het OAuth bearer-clientgeheim aan. Zie Verbindingen voor meer informatie. |
| OAuthBearerScope | (Optioneel) Hiermee geeft u het bereik van de toegangsaanvraag voor de broker. |
| OAuthBearerTokenEndpointUrl | (Optioneel) OAuth/OIDC issuer token endpoint HTTP(S) URI gebruikt om token op te halen wanneer oidc de methode wordt gebruikt. Zie Verbindingen voor meer informatie. |
| OAuthBearerExtensions | (Optioneel) Door komma's gescheiden lijst met sleutel-waardeparen die moeten worden verstrekt als aanvullende informatie voor broker wanneer oidc de methode wordt gebruikt. Voorbeeld: supportFeatureX=true,organizationId=sales-emea. |
Aantekeningen
Met de KafkaTrigger aantekening kunt u een functie maken die wordt uitgevoerd wanneer een onderwerp wordt ontvangen. Ondersteunde opties omvatten de volgende elementen:
| Element | Description |
|---|---|
| name | (Vereist) De naam van de variabele die de wachtrij of het onderwerpbericht in functiecode vertegenwoordigt. |
| brokerList | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
| onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
| cardinality | (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY. Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY, moet u ook een dataType. |
| Datatype | Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string, wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary, het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[]. |
| consumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
| avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
| authenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn NotSet (standaard), Gssapi, Plain, ScramSha256, . ScramSha512 |
| gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
| password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
| protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn NotSet (standaard), plaintext, ssl, sasl_plaintext, . sasl_ssl |
| sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
| sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
| sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
| sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
| lagThreshold | (Optioneel) Vertragingsdrempel voor de trigger. |
| schemaRegistryUrl | (Optioneel) URL voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| schemaRegistryUsername | (Optioneel) Gebruikersnaam voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| schemaRegistryPassword | (Optioneel) Wachtwoord voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
Configuratie
In de volgende tabel worden de bindingsconfiguratie-eigenschappen uitgelegd die u in het function.json-bestand hebt ingesteld.
| eigenschap function.json | Description |
|---|---|
| type | (Vereist) Ingesteld op kafkaTrigger. |
| direction | (Vereist) Ingesteld op in. |
| name | (Vereist) De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt. |
| brokerList | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
| onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
| cardinality | (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY. Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY, moet u ook een dataType. |
| Datatype | Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string, wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary, het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijk bytematrixparametertype. |
| consumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
| avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
| keyAvroSchema | (Optioneel) Schema van een algemene record van berichtsleutel bij gebruik van het Avro-protocol. |
| keyDataType | (Optioneel) Gegevenstype voor het ontvangen van de berichtsleutel vanaf Kafka-onderwerp. Als keyAvroSchema deze waarde is ingesteld, is deze waarde een algemene record. Geaccepteerde waarden zijnInt, Long, en BinaryString. |
| authenticationMode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn NotSet (standaard), Gssapi, Plain, ScramSha256, . ScramSha512 |
| gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
| password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer AuthenticationMode .Gssapi Zie Verbindingen voor meer informatie. |
| protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn NotSet (standaard), plaintext, ssl, sasl_plaintext, . sasl_ssl |
| sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
| sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
| sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
| sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
| sslCertificatePEM | (Optioneel) Clientcertificaat in PEM-indeling als een tekenreeks. Zie Verbindingen voor meer informatie. |
| sslKeyPEM | (Optioneel) Persoonlijke clientsleutel in PEM-indeling als een tekenreeks. Zie Verbindingen voor meer informatie. |
| sslCaPEM | (Optioneel) CA-certificaat in PEM-indeling als een tekenreeks. Zie Verbindingen voor meer informatie. |
| sslCertificateandKeyPEM | (Optioneel) Clientcertificaat en -sleutel in PEM-indeling als tekenreeks. Zie Verbindingen voor meer informatie. |
| lagThreshold | (Optioneel) Vertragingsdrempel voor de trigger. |
| schemaRegistryUrl | (Optioneel) URL voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| schemaRegistryUsername | (Optioneel) Gebruikersnaam voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| schemaRegistryPassword | (Optioneel) Wachtwoord voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| oAuthBearerMethod | (Optioneel) OAuth Bearer-methode. Geaccepteerde waarden zijn oidc en default. |
| oAuthBearerClientId | (Optioneel) Wanneer oAuthBearerMethod dit is ingesteld oidcop, geeft dit de OAuth bearer-client-id op. Zie Verbindingen voor meer informatie. |
| oAuthBearerClientSecret | (Optioneel) Als oAuthBearerMethod dit is ingesteld oidcop, geeft dit het OAuth bearer-clientgeheim aan. Zie Verbindingen voor meer informatie. |
| oAuthBearerScope | (Optioneel) Hiermee geeft u het bereik van de toegangsaanvraag voor de broker. |
| oAuthBearerTokenEndpointUrl | (Optioneel) OAuth/OIDC issuer token endpoint HTTP(S) URI gebruikt om token op te halen wanneer oidc de methode wordt gebruikt. Zie Verbindingen voor meer informatie. |
Configuratie
In de volgende tabel worden de bindingsconfiguratie-eigenschappen uitgelegd die u in het function.json-bestand hebt ingesteld. Python gebruikt snake_case naamconventies voor configuratie-eigenschappen.
| eigenschap function.json | Description |
|---|---|
| type | (Vereist) Ingesteld op kafkaTrigger. |
| direction | (Vereist) Ingesteld op in. |
| name | (Vereist) De naam van de variabele die de brokergegevens in functiecode vertegenwoordigt. |
| broker_list | (Vereist) De lijst met Kafka-brokers die worden bewaakt door de trigger. Zie Verbindingen voor meer informatie. |
| onderwerp | (Vereist) Het onderwerp dat wordt bewaakt door de trigger. |
| cardinality | (Optioneel) Geeft de kardinaliteit van de triggerinvoer aan. De ondersteunde waarden zijn ONE (standaard) en MANY. Gebruik ONE wanneer de invoer één bericht is en MANY wanneer de invoer een matrix van berichten is. Wanneer u gebruikt MANY, moet u ook een data_type. |
| data_type | Definieert hoe Functions de parameterwaarde verwerkt. De waarde wordt standaard verkregen als een tekenreeks en Functions probeert de tekenreeks te deserialiseren naar een echt normaal oud Java-object (POJO). Wanneer string, wordt de invoer behandeld als alleen een tekenreeks. Wanneer binary, het bericht wordt ontvangen als binaire gegevens en Functions probeert het te deserialiseren naar een werkelijke parametertype byte[]. |
| consumerGroup | (Optioneel) Kafka-consumentengroep die door de trigger wordt gebruikt. |
| avroSchema | (Optioneel) Schema van een algemene record bij gebruik van het Avro-protocol. |
| authentication_mode | (Optioneel) De verificatiemodus bij het gebruik van SASL-verificatie (Simple Authentication and Security Layer). De ondersteunde waarden zijn NOTSET (standaard), Gssapi, Plain, ScramSha256, . ScramSha512 |
| gebruikersnaam | (Optioneel) De gebruikersnaam voor SASL-verificatie. Niet ondersteund wanneer authentication_mode .Gssapi Zie Verbindingen voor meer informatie. |
| password | (Optioneel) Het wachtwoord voor SASL-verificatie. Niet ondersteund wanneer authentication_mode .Gssapi Zie Verbindingen voor meer informatie. |
| protocol | (Optioneel) Het beveiligingsprotocol dat wordt gebruikt bij het communiceren met brokers. De ondersteunde waarden zijn NOTSET (standaard), plaintext, ssl, sasl_plaintext, . sasl_ssl |
| sslCaLocation | (Optioneel) Pad naar ca-certificaatbestand voor het verifiëren van het certificaat van de broker. |
| sslCertificateLocation | (Optioneel) Pad naar het certificaat van de client. |
| sslKeyLocation | (Optioneel) Pad naar de persoonlijke sleutel van de client (PEM) die wordt gebruikt voor verificatie. |
| sslKeyPassword | (Optioneel) Wachtwoord voor het certificaat van de client. |
| lag_threshold | (Optioneel) Vertragingsdrempel voor de trigger. |
| schema_registry_url | (Optioneel) URL voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| schema_registry_username | (Optioneel) Gebruikersnaam voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| schema_registry_password | (Optioneel) Wachtwoord voor het Avro-schemaregister. Zie Verbindingen voor meer informatie. |
| o_auth_bearer_method | (Optioneel) OAuth Bearer-methode. Geaccepteerde waarden zijn oidc en default. |
| o_auth_bearer_client_id | (Optioneel) Wanneer o_auth_bearer_method dit is ingesteld oidcop, geeft dit de OAuth bearer-client-id op. Zie Verbindingen voor meer informatie. |
| o_auth_bearer_client_secret | (Optioneel) Als o_auth_bearer_method dit is ingesteld oidcop, geeft dit het OAuth bearer-clientgeheim aan. Zie Verbindingen voor meer informatie. |
| o_auth_bearer_scope | (Optioneel) Hiermee geeft u het bereik van de toegangsaanvraag voor de broker. |
| o_auth_bearer_token_endpoint_url | (Optioneel) OAuth/OIDC issuer token endpoint HTTP(S) URI gebruikt om token op te halen wanneer oidc de methode wordt gebruikt. Zie Verbindingen voor meer informatie. |
Notitie
Certificaat-PEM-gerelateerde eigenschappen en Avro-sleuteleigenschappen zijn nog niet beschikbaar in de Python-bibliotheek.
Gebruik
De Kafka-trigger ondersteunt momenteel Kafka-gebeurtenissen als tekenreeksen en tekenreeksmatrices die JSON-nettoladingen zijn.
De Kafka-trigger geeft Kafka-berichten door aan de functie als tekenreeksen. De trigger ondersteunt ook tekenreeksmatrices die JSON-nettoladingen zijn.
In een Premium-abonnement moet u runtimeschaalbewaking inschakelen voor de Kafka-uitvoer om uit te schalen naar meerdere exemplaren. Zie Schalen van runtime inschakelen voor meer informatie.
U kunt de functie Testen/uitvoeren van de pagina Code en test in Azure Portal niet gebruiken om te werken met Kafka-triggers. U moet in plaats daarvan testgebeurtenissen rechtstreeks verzenden naar het onderwerp dat wordt bewaakt door de trigger.
Zie host.json instellingen voor een volledige set ondersteunde host.json instellingen voor de Kafka-trigger.
Connecties
Sla alle verbindingsgegevens op die vereist zijn voor uw triggers en bindingen in toepassingsinstellingen, niet in de bindingsdefinities in uw code. Deze richtlijnen zijn van toepassing op referenties, die u nooit in uw code moet opslaan.
Belangrijk
Referentie-instellingen moeten verwijzen naar een toepassingsinstelling. Gebruik geen codereferenties in uw code- of configuratiebestanden. Wanneer u lokaal werkt, gebruikt u het local.settings.json-bestand voor uw referenties en publiceert u het local.settings.json bestand niet.
Wanneer u verbinding maakt met een beheerd Kafka-cluster dat wordt geleverd door Confluent in Azure, kunt u een van de volgende verificatiemethoden gebruiken.
Notitie
Wanneer u het Flex Consumption-abonnement gebruikt, worden certificaatverificatie-eigenschappen op basis van bestandslocatie (SslCaLocation, SslCertificateLocation, SslKeyLocation) niet ondersteund. Gebruik in plaats daarvan de op PEM gebaseerde certificaateigenschappen (SslCaPEM, SslCertificatePEM, , SslKeyPEM) SslCertificateandKeyPEMof sla certificaten op in Azure Key Vault.
Schemaregister
Als u gebruik wilt maken van het schemaregister dat wordt geleverd door Confluent in de Kafka-extensie, stelt u de volgende referenties in:
| Instelling | Aanbevolen waarde | Description |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
URL van de schemaregisterservice die wordt gebruikt voor schemabeheer. Meestal van de indeling https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
Gebruikersnaam voor basisverificatie in het schemaregister (indien nodig). |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
Wachtwoord voor basisverificatie in schemaregister (indien nodig). |
Gebruikersnaam/wachtwoordverificatie
Zorg er tijdens het gebruik van deze vorm van verificatie voor dat deze is ingesteld op of SaslPlaintext , is ingesteld op Plain, ScramSha256 of ScramSha512 als het CA-certificaat dat Protocol wordt gebruikt verschilt van het standaard ISRG Root X1-certificaat, moet u ervoor zorgen dat u dit SslCaLocation bijwerkt of SslCaPEM. AuthenticationModeSaslSsl
| Instelling | Aanbevolen waarde | Description |
|---|---|---|
| BrokerList | BootstrapServer |
De app-instelling met de naam BootstrapServer bevat de waarde van de bootstrap-server die is gevonden op de pagina met Confluent Cloud-instellingen. De waarde lijkt op xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| Gebruikersnaam | ConfluentCloudUsername |
App-instelling met de naam ConfluentCloudUsername bevat de API-toegangssleutel van de Confluent Cloud-website. |
| Wachtwoord | ConfluentCloudPassword |
App-instelling met de naam ConfluentCloudPassword bevat het API-geheim dat is verkregen van de Confluent Cloud-website. |
| SslCaPEM | SSLCaPemCertificate |
App-instelling met de naam SSLCaPemCertificate die het CA-certificaat bevat als een tekenreeks in PEM-indeling. De waarde moet de standaardindeling volgen, bijvoorbeeld: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----. |
SSL-verificatie
Zorg ervoor dat deze Protocol is ingesteld op SSL.
| Instelling | Aanbevolen waarde | Description |
|---|---|---|
| BrokerList | BootstrapServer |
De app-instelling met de naam BootstrapServer bevat de waarde van de bootstrap-server die is gevonden op de pagina met Confluent Cloud-instellingen. De waarde lijkt op xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| SslCaPEM | SslCaCertificatePem |
App-instelling met de naam SslCaCertificatePem PEM-waarde van het CA-certificaat als tekenreeks. De waarde moet de standaardindeling volgen: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
App-instelling met de naam SslClientCertificatePem PEM-waarde van het clientcertificaat als tekenreeks. De waarde moet de standaardindeling volgen: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
App-instelling met de naam SslClientKeyPem PEM-waarde van de persoonlijke sleutel van de client als een tekenreeks. De waarde moet de standaardindeling volgen: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
App-instelling met de naam SslClientCertificateAndKeyPem PEM-waarde van het clientcertificaat en de persoonlijke sleutel van de client die is samengevoegd als een tekenreeks. De waarde moet de standaardindeling volgen: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
App-instelling met de naam SslClientKeyPassword die het wachtwoord voor de persoonlijke sleutel bevat (indien van toepassing). |
OAuth-verificatie
Wanneer u OAuth-verificatie gebruikt, configureert u de OAuth-gerelateerde eigenschappen in uw bindingsdefinities.
De tekenreekswaarden die u voor deze instellingen gebruikt, moeten aanwezig zijn als toepassingsinstellingen in Azure of in de Values verzameling in het local.settings.json-bestand tijdens lokale ontwikkeling.
U moet ook de Protocol bindingsdefinities AuthenticationMode instellen.