你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn。
使用 Azure Functions 中的 Apache Kafka 触发器运行函数代码,以响应 Kafka 主题中的消息。 还可以使用 Kafka 输出绑定从函数写入主题。 若要了解设置和配置详细信息,请参阅适用于 Azure Functions 的 Apache Kafka 绑定概述。
重要
Kafka 绑定适用于弹性消耗计划、弹性高级计划和专用(应用服务)计划的 Functions。 它们仅在 Functions 运行时版本 4.x 上受支持。
示例
触发器的用法取决于函数应用中使用的 C# 模态,后者可以是以下模式之一:
所使用的特性取决于特定的事件提供程序。
以下示例显示了一个 C# 函数,该函数将 Kafka 消息作为 Kafka 事件进行读取和记录:
[Function("KafkaTrigger")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(eventData)["Value"]}");
}
若要批量接收事件,请使用字符串数组作为输入,如以下示例所示:
[Function("KafkaTriggerMany")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default",
IsBatched = true)] string[] events, FunctionContext context)
{
foreach (var kevent in events)
{
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {JObject.Parse(kevent)["Value"]}");
}
以下函数记录 Kafka 事件的消息和标头:
[Function("KafkaTriggerWithHeaders")]
public static void Run(
[KafkaTrigger("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain,
ConsumerGroup = "$Default")] string eventData, FunctionContext context)
{
var eventJsonObject = JObject.Parse(eventData);
var logger = context.GetLogger("KafkaFunction");
logger.LogInformation($"C# Kafka trigger function processed a message: {eventJsonObject["Value"]}");
var headersJArr = eventJsonObject["Headers"] as JArray;
logger.LogInformation("Headers for this event: ");
foreach (JObject header in headersJArr)
{
logger.LogInformation($"{header["Key"]} {System.Text.Encoding.UTF8.GetString((byte[])header["Value"])}");
}
}
有关完整的一组可用 .NET 示例,请参阅 Kafka 扩展存储库。
触发器的使用取决于 Node.js 编程模型的版本。
在 Node.js v4 模型中,直接在函数代码中定义触发器。 有关详细信息,请参阅 Azure Functions Node.js 开发人员指南。
在这些示例中,事件提供程序为 Confluent 或 Azure 事件中心。 这些示例演示如何为读取 Kafka 消息的函数定义 Kafka 触发器。
const { app } = require("@azure/functions");
async function kafkaTrigger(event, context) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Key: " + event.Key);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
若要在批处理中接收事件,请将 cardinality 值设置为 many,如以下示例所示:
const { app } = require("@azure/functions");
async function kafkaTriggerMany(events, context) {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Key: " + event.Key);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
可以为传递给触发器的事件定义一个通用的 Avro 架构。 此示例使用通用 Avro 架构定义特定提供程序的触发器:
const { app } = require("@azure/functions");
async function kafkaAvroGenericTrigger(event, context) {
context.log("Processed kafka event: ", event);
if (context.triggerMetadata?.key !== undefined) {
context.log("message key: ", context.triggerMetadata?.key);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
password: "EventHubConnectionString",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
username: "$ConnectionString",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
有关完整的一组可用 JavaScript 示例,请参阅 Kafka 扩展存储库。
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
export async function kafkaTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
app.generic("Kafkatrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string"
},
handler: kafkaTrigger,
});
若要在批处理中接收事件,请将 cardinality 值设置为 many,如以下示例所示:
import { app, InvocationContext } from "@azure/functions";
// This is a sample interface that describes the actual data in your event.
interface EventData {
registertime: number;
userid: string;
regionid: string;
gender: string;
}
interface KafkaEvent {
Offset: number;
Partition: number;
Topic: string;
Timestamp: number;
Value: string;
}
export async function kafkaTriggerMany(
events: any,
context: InvocationContext
): Promise<void> {
for (const event of events) {
context.log("Event Offset: " + event.Offset);
context.log("Event Partition: " + event.Partition);
context.log("Event Topic: " + event.Topic);
context.log("Event Timestamp: " + event.Timestamp);
context.log("Event Value (as string): " + event.Value);
let event_obj: EventData = JSON.parse(event.Value);
context.log("Event Value Object: ");
context.log(" Value.registertime: ", event_obj.registertime.toString());
context.log(" Value.userid: ", event_obj.userid);
context.log(" Value.regionid: ", event_obj.regionid);
context.log(" Value.gender: ", event_obj.gender);
}
}
app.generic("kafkaTriggerMany", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
topic: "topic",
brokerList: "%BrokerList%",
username: "%ConfluentCloudUserName%",
password: "%ConfluentCloudPassword%",
consumerGroup: "$Default",
protocol: "saslSsl",
authenticationMode: "plain",
dataType: "string",
cardinality: "MANY"
},
handler: kafkaTriggerMany,
});
可以为传递给触发器的事件定义一个通用的 Avro 架构。 此示例使用通用 Avro 架构定义特定提供程序的触发器:
import { app, InvocationContext } from "@azure/functions";
export async function kafkaAvroGenericTrigger(
event: any,
context: InvocationContext
): Promise<void> {
context.log("Processed kafka event: ", event);
context.log(
`Message ID: ${event.id}, amount: ${event.amount}, type: ${event.type}`
);
if (context.triggerMetadata?.key !== undefined) {
context.log(`Message Key : ${context.triggerMetadata?.key}`);
}
}
app.generic("kafkaAvroGenericTrigger", {
trigger: {
type: "kafkaTrigger",
direction: "in",
name: "event",
protocol: "SASLSSL",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
dataType: "string",
topic: "topic",
authenticationMode: "PLAIN",
avroSchema:
'{"type":"record","name":"Payment","namespace":"io.confluent.examples.clients.basicavro","fields":[{"name":"id","type":"string"},{"name":"amount","type":"double"},{"name":"type","type":"string"}]}',
consumerGroup: "$Default",
brokerList: "%BrokerList%",
},
handler: kafkaAvroGenericTrigger,
});
有关一组完整的工作 TypeScript 示例,请参阅 Kafka 扩展存储库。
文件的特定属性 function.json 取决于事件提供程序。 在这些示例中,事件提供程序为 Confluent 或 Azure 事件中心。 以下示例演示用于读取和记录 Kafka 消息的函数的 Kafka 触发器。
以下 function.json 文件定义特定提供程序的触发器:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
触发函数时,将运行以下代码:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
若要批量接收事件,请将 function.json 文件中的 cardinality 值设为 many,如以下示例所示:
{
"bindings": [
{
"type": "kafkaTrigger",
"name": "kafkaEvent",
"direction": "in",
"protocol" : "SASLSSL",
"password" : "%ConfluentCloudPassword%",
"dataType" : "string",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"cardinality" : "MANY",
"consumerGroup" : "$Default",
"username" : "%ConfluentCloudUserName%",
"brokerList" : "%BrokerList%",
"sslCaLocation": "confluent_cloud_cacert.pem"
}
]
}
以下代码分析事件数组并记录事件数据:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
$kafkaEvents
foreach ($kafkaEvent in $kafkaEvents) {
$event = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $event.Value"
}
以下代码记录标头数据:
using namespace System.Net
param($kafkaEvents, $TriggerMetadata)
foreach ($kafkaEvent in $kafkaEvents) {
$kevent = $kafkaEvent | ConvertFrom-Json -AsHashtable
Write-Output "Powershell Kafka trigger function called for message $kevent.Value"
Write-Output "Headers for this message:"
foreach ($header in $kevent.Headers) {
$DecodedValue = [System.Text.Encoding]::Unicode.GetString([System.Convert]::FromBase64String($header.Value))
$Key = $header.Key
Write-Output "Key: $Key Value: $DecodedValue"
}
}
可以为传递给触发器的事件定义一个通用的 Avro 架构。 以下 function.json 使用通用 Avro 架构定义了特定提供程序的触发器:
{
"bindings" : [ {
"type" : "kafkaTrigger",
"direction" : "in",
"name" : "kafkaEvent",
"protocol" : "SASLSSL",
"password" : "ConfluentCloudPassword",
"topic" : "topic",
"authenticationMode" : "PLAIN",
"avroSchema" : "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}",
"consumerGroup" : "$Default",
"username" : "ConfluentCloudUsername",
"brokerList" : "%BrokerList%"
} ]
}
触发函数时,将运行以下代码:
using namespace System.Net
param($kafkaEvent, $TriggerMetadata)
Write-Output "Powershell Kafka trigger function called for message $kafkaEvent.Value"
有关完整的一组可用 PowerShell 示例,请参阅 Kafka 扩展存储库。
触发器的用法取决于 Python 编程模型的版本。
在 Python v2 模型中,使用修饰器直接在函数代码中定义触发器。 有关详细信息,请参阅 Azure Functions Python 开发人员指南。
这些示例演示如何为读取 Kafka 消息的函数定义 Kafka 触发器。
@KafkaTrigger.function_name(name="KafkaTrigger")
@KafkaTrigger.kafka_trigger(
arg_name="kevent",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default1")
def kafka_trigger(kevent : func.KafkaEvent):
logging.info(kevent.get_body().decode('utf-8'))
logging.info(kevent.metadata)
此示例通过将值
@KafkaTrigger.function_name(name="KafkaTriggerMany")
@KafkaTrigger.kafka_trigger(
arg_name="kevents",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
cardinality="MANY",
data_type="string",
consumer_group="$Default2")
def kafka_trigger_many(kevents : typing.List[func.KafkaEvent]):
for event in kevents:
logging.info(event.get_body())
可以为传递给触发器的事件定义一个通用的 Avro 架构。
@KafkaTriggerAvro.function_name(name="KafkaTriggerAvroOne")
@KafkaTriggerAvro.kafka_trigger(
arg_name="kafkaTriggerAvroGeneric",
topic="KafkaTopic",
broker_list="KafkaBrokerList",
username="KafkaUsername",
password="KafkaPassword",
protocol="SaslSsl",
authentication_mode="Plain",
consumer_group="$Default",
avro_schema= "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}")
def kafka_trigger_avro_one(kafkaTriggerAvroGeneric : func.KafkaEvent):
logging.info(kafkaTriggerAvroGeneric.get_body().decode('utf-8'))
logging.info(kafkaTriggerAvroGeneric.metadata)
有关完整的一组可用 Python 示例,请参阅 Kafka 扩展存储库。
用于配置触发器的注释取决于特定的事件提供程序。
以下示例显示了一个 Java 函数,该函数读取和记录 Kafka 事件的内容:
@FunctionName("KafkaTrigger")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string"
) String kafkaEventData,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
}
若要批量接收事件,请使用输入字符串作为数组,如以下示例所示:
@FunctionName("KafkaTriggerMany")
public void runMany(
@KafkaTrigger(
name = "kafkaTriggerMany",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
cardinality = Cardinality.MANY,
dataType = "string"
) String[] kafkaEvents,
final ExecutionContext context) {
for (String kevent: kafkaEvents) {
context.getLogger().info(kevent);
}
}
以下函数记录 Kafka 事件的消息和标头:
@FunctionName("KafkaTriggerManyWithHeaders")
public void runSingle(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string",
cardinality = Cardinality.MANY
) List<String> kafkaEvents,
final ExecutionContext context) {
Gson gson = new Gson();
for (String keventstr: kafkaEvents) {
KafkaEntity kevent = gson.fromJson(keventstr, KafkaEntity.class);
context.getLogger().info("Java Kafka trigger function called for message: " + kevent.Value);
context.getLogger().info("Headers for the message:");
for (KafkaHeaders header : kevent.Headers) {
String decodedValue = new String(Base64.getDecoder().decode(header.Value));
context.getLogger().info("Key:" + header.Key + " Value:" + decodedValue);
}
}
}
可以为传递给触发器的事件定义一个通用的 Avro 架构。 以下函数使用通用 Avro 架构定义了特定提供程序的触发器:
private static final String schema = "{\"type\":\"record\",\"name\":\"Payment\",\"namespace\":\"io.confluent.examples.clients.basicavro\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"amount\",\"type\":\"double\"},{\"name\":\"type\",\"type\":\"string\"}]}";
@FunctionName("KafkaAvroGenericTrigger")
public void runOne(
@KafkaTrigger(
name = "kafkaAvroGenericSingle",
topic = "topic",
brokerList="%BrokerList%",
consumerGroup="$Default",
username = "ConfluentCloudUsername",
password = "ConfluentCloudPassword",
avroSchema = schema,
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.SASLSSL) Payment payment,
final ExecutionContext context) {
context.getLogger().info(payment.toString());
}
有关完整的一组适用于 Confluent 的可用 Java 示例,请参阅 Kafka 扩展存储库。
属性
进程内和独立工作进程 C# 库都使用 KafkaTriggerAttribute 来定义函数触发器。
下表说明了可以使用此触发器属性设置的属性:
| 参数 | 说明 |
|---|---|
| BrokerList | (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接。 |
| 主题 | (必需)触发器监视的主题。 |
| ConsumerGroup | (可选)触发器使用的 Kafka 使用者组。 |
| AvroSchema | (可选)使用 Avro 协议时消息值的通用记录的架构。 |
| KeyAvroSchema | (可选)使用 Avro 协议时消息密钥的通用记录的架构。 |
| KeyDataType | (可选)从 Kafka 主题接收消息密钥的数据类型。 如果 KeyAvroSchema 已设置,则此值为泛型记录。 接受的值是Int、Long和StringBinary。 |
| AuthenticationMode | (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NotSet(默认值)、Gssapi、Plain、ScramSha256和ScramSha512OAuthBearer。 |
| 用户名 | (可选)SASL 身份验证的用户名。 当 AuthenticationMode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| 密码 | (可选)SASL 身份验证的密码。 当 AuthenticationMode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| 协议 | (可选)与代理通信时使用的安全协议。 支持的值是NotSet(默认值)、plaintext、ssl、sasl_plaintextsasl_ssl。 |
| SslCaLocation | (可选)用于验证代理证书的 CA 证书文件的路径。 |
| SslCertificateLocation | (可选)客户端证书的路径。 |
| SslKeyLocation | (可选)用于身份验证的客户端私钥 (PEM) 的路径。 |
| SslKeyPassword | (可选)客户端证书的密码。 |
| SslCertificatePEM | (可选)以 PEM 格式作为字符串的客户端证书。 有关详细信息,请参阅连接。 |
| SslKeyPEM | (可选)以 PEM 格式作为字符串的客户端私钥。 有关详细信息,请参阅连接。 |
| SslCaPEM | (可选)PEM 格式为字符串的 CA 证书。 有关详细信息,请参阅连接。 |
| SslCertificateandKeyPEM | (可选)以 PEM 格式作为字符串的客户端证书和密钥。 有关详细信息,请参阅连接。 |
| SchemaRegistryUrl | (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接。 |
| SchemaRegistryUsername | (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接。 |
| SchemaRegistryPassword | (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接。 |
| OAuthBearerMethod | (可选)OAuth Bearer 方法。 接受的值是 oidc 和 default。 |
| OAuthBearerClientId | (可选)如果 OAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端 ID。 有关详细信息,请参阅连接。 |
| OAuthBearerClientSecret | (可选)如果 OAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端密码。 有关详细信息,请参阅连接。 |
| OAuthBearerScope | (可选)指定对代理的访问请求的范围。 |
| OAuthBearerTokenEndpointUrl | (可选)使用方法时用于检索令牌的 oidc OAuth/OIDC 颁发者令牌终结点 HTTP(S) URI。 有关详细信息,请参阅连接。 |
| OAuthBearerExtensions | (可选)使用方法时 oidc 要作为中转站的其他信息提供的 key=value 对的逗号分隔列表。 例如: supportFeatureX=true,organizationId=sales-emea。 |
批注
通过 KafkaTrigger 批注,可以创建在收到主题时运行的函数。 支持的选项包括以下元素:
| 元素 | 说明 |
|---|---|
| name | (必需)变量的名称,表示函数代码中的队列或主题消息。 |
| brokerList | (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接。 |
| topic | (必需)触发器监视的主题。 |
| 基数 | (可选)指示触发器输入的基数。 支持的值为 ONE(默认值)和 MANY。 当输入是单个消息时使用 ONE,当输入是消息数组时使用 MANY。 使用 MANY 时,还必须设置一个 dataType。 |
| dataType | 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当此元素为 binary 时,消息是作为二进制数据接收的,Functions 会尝试将其反序列化为实际参数类型 byte[]。 |
| consumerGroup | (可选)触发器使用的 Kafka 使用者组。 |
| avroSchema | (可选)使用 Avro 协议时的通用记录的架构。 |
| authenticationMode | (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NotSet(默认值)、Gssapi、Plain、ScramSha256ScramSha512。 |
| username | (可选)SASL 身份验证的用户名。 当 AuthenticationMode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| password | (可选)SASL 身份验证的密码。 当 AuthenticationMode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| protocol | (可选)与代理通信时使用的安全协议。 支持的值是NotSet(默认值)、plaintext、ssl、sasl_plaintextsasl_ssl。 |
| sslCaLocation | (可选)用于验证代理证书的 CA 证书文件的路径。 |
| sslCertificateLocation | (可选)客户端证书的路径。 |
| sslKeyLocation | (可选)用于身份验证的客户端私钥 (PEM) 的路径。 |
| sslKeyPassword | (可选)客户端证书的密码。 |
| lagThreshold | (可选)触发器的滞后阈值。 |
| schemaRegistryUrl | (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接。 |
| schemaRegistryUsername | (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接。 |
| schemaRegistryPassword | (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接。 |
配置
下表解释了在 function.json 文件中设置的绑定配置属性。
| “function.json”属性 | 说明 |
|---|---|
| type | (必需)设置为 kafkaTrigger. |
| direction | (必需)设置为 in. |
| name | (必需)表示函数代码中代理数据的变量的名称。 |
| brokerList | (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接。 |
| topic | (必需)触发器监视的主题。 |
| 基数 | (可选)指示触发器输入的基数。 支持的值为 ONE(默认值)和 MANY。 当输入是单个消息时使用 ONE,当输入是消息数组时使用 MANY。 使用 MANY 时,还必须设置一个 dataType。 |
| dataType | 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当消息作为二进制数据接收时 binary,Functions 会尝试将其反序列化为实际的字节数组参数类型。 |
| consumerGroup | (可选)触发器使用的 Kafka 使用者组。 |
| avroSchema | (可选)使用 Avro 协议时的通用记录的架构。 |
| keyAvroSchema | (可选)使用 Avro 协议时消息密钥的通用记录的架构。 |
| keyDataType | (可选)从 Kafka 主题接收消息密钥的数据类型。 如果 keyAvroSchema 已设置,则此值为泛型记录。 接受的值是Int、Long和StringBinary。 |
| authenticationMode | (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NotSet(默认值)、Gssapi、Plain、ScramSha256ScramSha512。 |
| username | (可选)SASL 身份验证的用户名。 当 AuthenticationMode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| password | (可选)SASL 身份验证的密码。 当 AuthenticationMode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| protocol | (可选)与代理通信时使用的安全协议。 支持的值是NotSet(默认值)、plaintext、ssl、sasl_plaintextsasl_ssl。 |
| sslCaLocation | (可选)用于验证代理证书的 CA 证书文件的路径。 |
| sslCertificateLocation | (可选)客户端证书的路径。 |
| sslKeyLocation | (可选)用于身份验证的客户端私钥 (PEM) 的路径。 |
| sslKeyPassword | (可选)客户端证书的密码。 |
| sslCertificatePEM | (可选)以 PEM 格式作为字符串的客户端证书。 有关详细信息,请参阅连接。 |
| sslKeyPEM | (可选)以 PEM 格式作为字符串的客户端私钥。 有关详细信息,请参阅连接。 |
| sslCaPEM | (可选)PEM 格式为字符串的 CA 证书。 有关详细信息,请参阅连接。 |
| sslCertificateandKeyPEM | (可选)以 PEM 格式作为字符串的客户端证书和密钥。 有关详细信息,请参阅连接。 |
| lagThreshold | (可选)触发器的滞后阈值。 |
| schemaRegistryUrl | (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接。 |
| schemaRegistryUsername | (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接。 |
| schemaRegistryPassword | (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接。 |
| oAuthBearerMethod | (可选)OAuth Bearer 方法。 接受的值是 oidc 和 default。 |
| oAuthBearerClientId | (可选)如果 oAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端 ID。 有关详细信息,请参阅连接。 |
| oAuthBearerClientSecret | (可选)如果 oAuthBearerMethod 设置为 oidc,则指定 OAuth 持有者客户端密码。 有关详细信息,请参阅连接。 |
| oAuthBearerScope | (可选)指定对代理的访问请求的范围。 |
| oAuthBearerTokenEndpointUrl | (可选)使用方法时用于检索令牌的 oidc OAuth/OIDC 颁发者令牌终结点 HTTP(S) URI。 有关详细信息,请参阅连接。 |
配置
下表解释了在 function.json 文件中设置的绑定配置属性。 Python 对配置属性使用snake_case命名约定。
| “function.json”属性 | 说明 |
|---|---|
| type | (必需)设置为 kafkaTrigger. |
| direction | (必需)设置为 in. |
| name | (必需)表示函数代码中代理数据的变量的名称。 |
| broker_list | (必需)触发器监视的 Kafka 代理的列表。 有关详细信息,请参阅连接。 |
| topic | (必需)触发器监视的主题。 |
| 基数 | (可选)指示触发器输入的基数。 支持的值为 ONE(默认值)和 MANY。 当输入是单个消息时使用 ONE,当输入是消息数组时使用 MANY。 使用 MANY 时,还必须设置一个 data_type。 |
| data_type | 定义 Functions 如何处理参数值。 默认情况下,值是作为一个字符串获取的,并且 Functions 会尝试将此字符串反序列化为实际的普通旧 Java 对象 (POJO)。 当此元素为 string 时,输入仅仅被视为一个字符串。 当此元素为 binary 时,消息是作为二进制数据接收的,Functions 会尝试将其反序列化为实际参数类型 byte[]。 |
| consumerGroup | (可选)触发器使用的 Kafka 使用者组。 |
| avroSchema | (可选)使用 Avro 协议时的通用记录的架构。 |
| authentication_mode | (可选)使用简单身份验证和安全层 (SASL) 身份验证时的身份验证方式。 支持的值是NOTSET(默认值)、Gssapi、Plain、ScramSha256ScramSha512。 |
| username | (可选)SASL 身份验证的用户名。 当 authentication_mode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| password | (可选)SASL 身份验证的密码。 当 authentication_mode 为 Gssapi 时不受支持。 有关详细信息,请参阅连接。 |
| protocol | (可选)与代理通信时使用的安全协议。 支持的值是NOTSET(默认值)、plaintext、ssl、sasl_plaintextsasl_ssl。 |
| sslCaLocation | (可选)用于验证代理证书的 CA 证书文件的路径。 |
| sslCertificateLocation | (可选)客户端证书的路径。 |
| sslKeyLocation | (可选)用于身份验证的客户端私钥 (PEM) 的路径。 |
| sslKeyPassword | (可选)客户端证书的密码。 |
| lag_threshold | (可选)触发器的滞后阈值。 |
| schema_registry_url | (可选)Avro 架构注册表的 URL。 有关详细信息,请参阅连接。 |
| schema_registry_username | (可选)Avro 架构注册表的用户名。 有关详细信息,请参阅连接。 |
| schema_registry_password | (可选)Avro 架构注册表的密码。 有关详细信息,请参阅连接。 |
| o_auth_bearer_method | (可选)OAuth Bearer 方法。 接受的值是 oidc 和 default。 |
| o_auth_bearer_client_id | (可选)如果 o_auth_bearer_method 设置为 oidc,则指定 OAuth 持有者客户端 ID。 有关详细信息,请参阅连接。 |
| o_auth_bearer_client_secret | (可选)如果 o_auth_bearer_method 设置为 oidc,则指定 OAuth 持有者客户端密码。 有关详细信息,请参阅连接。 |
| o_auth_bearer_scope | (可选)指定对代理的访问请求的范围。 |
| o_auth_bearer_token_endpoint_url | (可选)使用方法时用于检索令牌的 oidc OAuth/OIDC 颁发者令牌终结点 HTTP(S) URI。 有关详细信息,请参阅连接。 |
注意
证书 PEM 相关属性和 Avro 密钥相关属性在 Python 库中尚不可用。
使用情况
Kafka 触发器将 Kafka 消息作为字符串传递到函数。 触发器还支持 JSON 有效负载的字符串数组。
在高级计划中,必须为 Kafka 输出启用运行时缩放监视,以便横向扩展到多个实例。 若要了解详细信息,请参阅启用运行时缩放。
无法在 Azure 门户中使用 Code + 测试页的“测试/运行”功能来处理 Kafka 触发器。 必须改为将测试事件直接发送到由触发器监视的主题。
有关 Kafka 触发器支持的完整一组 host.json 设置,请参阅 host.json 设置。
连接
将触发器和绑定所需的所有连接信息存储在应用程序设置中,而不是存储在代码中的绑定定义中。 本指南适用于不应存储在代码中的凭据。
重要
凭据设置必须引用应用程序设置。 不要在代码或配置文件中对凭据进行硬编码。 在本地运行时,请对凭据使用 local.settings.json 文件,并且不要发布 local.settings.json 文件。
连接到 Azure 中 Confluent 提供的托管 Kafka 群集时,可以使用以下身份验证方法之一。
注意
使用 Flex Consumption 计划时,不支持基于文件位置的证书身份验证属性(SslCaLocation、 SslCertificateLocation、 SslKeyLocation)。 请改用基于 PEM 的证书属性(SslCaPEM、、SslCertificatePEMSslKeyPEM、SslCertificateandKeyPEM)或将证书存储在 Azure Key Vault 中。
架构注册表
若要在 Kafka 扩展中使用 Confluent 提供的架构注册表,请设置以下凭据:
| 设置 | 建议的值 | 说明 |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
用于架构管理的架构注册表服务的 URL。 通常为格式 https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
架构注册表上基本身份验证的用户名(如果需要)。 |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
架构注册表上基本身份验证的密码(如果需要)。 |
用户名/密码身份验证
使用此形式的身份验证时,请确保设置为Protocol或SaslPlaintextSaslSsl设置为Plain, ScramSha256AuthenticationMode或者ScramSha512,如果使用的 CA 证书不同于默认 ISRG 根 X1 证书,请确保更新或SslCaPEM。SslCaLocation
| 设置 | 建议的值 | 说明 |
|---|---|---|
| BrokerList | BootstrapServer |
名为 BootstrapServer 的应用设置包含在 Confluent Cloud 设置页面中找到的引导服务器的值。 该值类似于 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092。 |
| 用户名 | ConfluentCloudUsername |
名为 ConfluentCloudUsername 的应用设置包含来自 Confluent Cloud 网站的 API 访问密钥。 |
| 密码 | ConfluentCloudPassword |
名为 ConfluentCloudPassword 的应用设置包含从 Confluent Cloud 网站获取的 API 机密。 |
| SslCaPEM | SSLCaPemCertificate |
以 PEM 格式将 CA 证书作为字符串命名 SSLCaPemCertificate 的应用设置。 该值应遵循标准格式,例如: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE----- |
SSL 身份验证
确保 Protocol 已设置为 SSL.
| 设置 | 建议的值 | 说明 |
|---|---|---|
| BrokerList | BootstrapServer |
名为 BootstrapServer 的应用设置包含在 Confluent Cloud 设置页面中找到的引导服务器的值。 该值类似于 xyz-xyzxzy.westeurope.azure.confluent.cloud:9092。 |
| SslCaPEM | SslCaCertificatePem |
名为 SslCaCertificatePem 包含 CA 证书 PEM 值的应用设置作为字符串。 该值应遵循标准格式: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
名为 SslClientCertificatePem 包含客户端证书 PEM 值的应用设置作为字符串。 该值应遵循标准格式: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
名为 SslClientKeyPem 包含客户端私钥的 PEM 值作为字符串的应用设置。 该值应遵循标准格式: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
名为 SslClientCertificateAndKeyPem 包含客户端证书的 PEM 值和以字符串形式串联的客户端私钥的应用设置。 该值应遵循标准格式: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
名为 SslClientKeyPassword 包含私钥密码的应用设置(如果有)。 |
OAuth 身份验证
使用 OAuth 身份验证时,请在绑定定义中配置与 OAuth 相关的属性。
在本地开发期间,用于这些设置的字符串值必须作为 Azure 中的应用程序设置存在,或存在于 Values的 集合中。
还应设置 Protocol 绑定定义和 AuthenticationMode 绑定定义。