Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Powiązanie wyjściowe umożliwia aplikacji usługi Azure Functions wysyłanie komunikatów do tematu platformy Kafka.
Ważne
Powiązania platformy Kafka są dostępne dla funkcji w ramach planu Flex Consumption, planu Elastic Premium i planu dedykowanego (App Service). Są one obsługiwane tylko w wersji 4.x środowiska uruchomieniowego usługi Functions.
Przykład
Sposób użycia powiązania zależy od modalności języka C# w aplikacji funkcji. Można użyć jednej z następujących metod:
Skompilowana funkcja języka C#, która używa izolowanej biblioteki klas procesów roboczych , która jest uruchamiana w procesie oddzielonym od środowiska uruchomieniowego.
Używane atrybuty zależą od określonego dostawcy zdarzeń.
W poniższym przykładzie użyto niestandardowego typu zwracanego o nazwie MultipleOutputType, który składa się z odpowiedzi HTTP i danych wyjściowych platformy Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
MultipleOutputType W klasie Kevent jest zmienną powiązania wyjściowego dla powiązania platformy Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Aby wysłać partię zdarzeń, przekaż tablicę ciągów do typu danych wyjściowych, jak pokazano w poniższym przykładzie:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
Tablica ciągów jest zdefiniowana jako Kevents właściwość klasy, a powiązanie wyjściowe jest definiowane na tej właściwości:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Następująca funkcja dodaje nagłówki do danych wyjściowych platformy Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Pełny zestaw działających przykładów platformy .NET można znaleźć w repozytorium rozszerzeń platformy Kafka.
Użycie powiązania wyjściowego zależy od wersji modelu programowania Node.js.
W modelu Node.js w wersji 4 zdefiniujesz powiązanie wyjściowe bezpośrednio w kodzie funkcji. Aby uzyskać więcej informacji, zobacz Przewodnik dewelopera usługi Azure Functions Node.js.
W tych przykładach dostawcy zdarzeń to Confluent lub Azure Event Hubs. W tych przykładach przedstawiono powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłania danych z żądania do tematu platformy Kafka.
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const queryName = request.query.get("name");
const parsedbody = JSON.parse(body);
const name = queryName || parsedbody.name || "world";
context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Aby wysyłać zdarzenia w partii, wyślij tablicę komunikatów, jak pokazano w poniższych przykładach:
const { app, output } = require("@azure/functions");
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
async function kafkaOutputManyWithHttp(request, context) {
context.log(`Http function processed request for url "${request.url}"`);
const queryName = request.query.get("name");
const body = await request.text();
const parsedbody = body ? JSON.parse(body) : {};
parsedbody.name = parsedbody.name || "world";
const name = queryName || parsedbody.name;
context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
return {
body: `Messages sent to kafka.`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputManyWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputManyWithHttp,
});
W poniższych przykładach pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tematu platformy Kafka:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const parsedbody = JSON.parse(body);
// assuming body is of the format { "key": "key", "value": {JSON object} }
context.extraOutputs.set(
kafkaOutput,
`{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
parsedbody.value
).replace(/"/g, '\\"')}", "Key":"${
parsedbody.key
}", "Headers": [{ "Key": "language", "Value": "javascript" }] }`
);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Pełny zestaw działających przykładów języka JavaScript można znaleźć w repozytorium rozszerzeń platformy Kafka.
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const queryName = request.query.get("name");
const parsedbody = JSON.parse(body);
const name = queryName || parsedbody.name || "world";
context.extraOutputs.set(kafkaOutput, `Hello, ${parsedbody.name}!`);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Aby wysyłać zdarzenia w partii, wyślij tablicę komunikatów, jak pokazano w poniższych przykładach:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputManyWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const queryName = request.query.get("name");
const body = await request.text();
const parsedbody = body ? JSON.parse(body) : {};
parsedbody.name = parsedbody.name || "world";
const name = queryName || parsedbody.name;
context.extraOutputs.set(kafkaOutput, `Message one. Hello, ${name}!`);
context.extraOutputs.set(kafkaOutput, `Message two. Hello, ${name}!`);
return {
body: `Messages sent to kafka.`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputManyWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputManyWithHttp,
});
W poniższych przykładach pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tematu platformy Kafka:
import {
app,
HttpRequest,
HttpResponseInit,
InvocationContext,
output,
} from "@azure/functions";
const kafkaOutput = output.generic({
type: "kafka",
direction: "out",
topic: "topic",
brokerList: "%BrokerList%",
username: "ConfluentCloudUsername",
password: "ConfluentCloudPassword",
protocol: "saslSsl",
authenticationMode: "plain",
});
export async function kafkaOutputWithHttp(
request: HttpRequest,
context: InvocationContext
): Promise<HttpResponseInit> {
context.log(`Http function processed request for url "${request.url}"`);
const body = await request.text();
const parsedbody = JSON.parse(body);
// assuming body is of the format { "key": "key", "value": {JSON object} }
context.extraOutputs.set(
kafkaOutput,
`{ "Offset":364,"Partition":0,"Topic":"test-topic","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "${JSON.stringify(
parsedbody.value
).replace(/"/g, '\\"')}", "Key":"${
parsedbody.key
}", "Headers": [{ "Key": "language", "Value": "typescript" }] }`
);
context.log(
`Sending message to kafka: ${context.extraOutputs.get(kafkaOutput)}`
);
return {
body: `Message sent to kafka with value: ${context.extraOutputs.get(
kafkaOutput
)}`,
status: 200,
};
}
const extraOutputs = [];
extraOutputs.push(kafkaOutput);
app.http("kafkaOutputWithHttp", {
methods: ["GET", "POST"],
authLevel: "anonymous",
extraOutputs,
handler: kafkaOutputWithHttp,
});
Pełny zestaw działających przykładów języka TypeScript można znaleźć w repozytorium rozszerzeń platformy Kafka.
Określone właściwości pliku function.json zależą od dostawcy zdarzeń, który w tych przykładach to Confluent lub Azure Event Hubs. W poniższych przykładach pokazano powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłania danych z żądania do tematu platformy Kafka.
W poniższych function.json zdefiniowano wyzwalacz dla określonego dostawcy w następujących przykładach:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Poniższy kod wysyła komunikat do tematu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Poniższy kod wysyła wiele komunikatów jako tablicę do tego samego tematu:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
W poniższym przykładzie pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tego samego tematu platformy Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Pełny zestaw działających przykładów programu PowerShell można znaleźć w repozytorium rozszerzeń platformy Kafka.
Użycie powiązania wyjściowego zależy od używanej wersji modelu programowania w języku Python.
W modelu języka Python w wersji 2 zdefiniujesz powiązanie wyjściowe bezpośrednio w kodzie funkcji przy użyciu dekoratorów. Aby uzyskać więcej informacji, zobacz Przewodnik dla deweloperów języka Python usługi Azure Functions.
W tych przykładach przedstawiono powiązanie danych wyjściowych platformy Kafka dla funkcji wyzwalanej przez żądanie HTTP i wysyłania danych z żądania do tematu platformy Kafka.
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
@KafkaOutput.function_name(name="KafkaOutputMany")
@KafkaOutput.route(route="kafka_output_many")
@KafkaOutput.kafka_output(arg_name="outputMessage", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain", data_type="string")
def kafka_output_many(req: func.HttpRequest, outputMessage: func.Out[str] ) -> func.HttpResponse:
outputMessage.set(json.dumps(['one', 'two']))
return 'OK'
Aby wysyłać zdarzenia w partii, wyślij tablicę komunikatów, jak pokazano w poniższych przykładach:
@KafkaOutput.route(route="kafka_output_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":0,"Partition":0,"Topic":"dummy","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
@KafkaOutput.function_name(name="KafkaOutputManyWithHeaders")
@KafkaOutput.route(route="kafka_output_many_with_headers")
@KafkaOutput.kafka_output(arg_name="out", topic="KafkaTopic", broker_list="KafkaBrokerList", username="KafkaUsername", password="KafkaPassword", protocol="SaslSsl", authentication_mode="Plain")
def kafka_output_many_with_headers(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
kevent = [{ "Offset": 364, "Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": "one", "Headers": [{ "Key": "test", "Value": "python" }] },
W poniższych przykładach pokazano, jak wysłać komunikat zdarzenia z nagłówkami do tematu platformy Kafka:
Pełny zestaw działających przykładów języka Python można znaleźć w repozytorium rozszerzeń platformy Kafka.
Adnotacje używane do konfigurowania powiązania wyjściowego zależą od określonego dostawcy zdarzeń.
Poniższa funkcja wysyła komunikat do tematu platformy Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
W poniższym przykładzie pokazano, jak wysyłać wiele komunikatów do tematu platformy Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
W tym przykładzie parametr powiązania wyjściowego został zmieniony na tablicę ciągów.
W ostatnim przykładzie użyto następujących KafkaEntity klas i KafkaHeader :
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
Poniższa przykładowa funkcja wysyła komunikat z nagłówkami do tematu platformy Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Pełny zestaw działających przykładów języka Java dla platformy Confluent można znaleźć w repozytorium rozszerzeń platformy Kafka.
Atrybuty
Biblioteki języka C# procesu roboczego w procesie przetwarzania procesów przetwarzania procesów procesów przetwarzania w procesie przetwarzania izolowanegoużywają atrybutu Kafka do zdefiniowania wyzwalacza funkcji.
W poniższej tabeli opisano właściwości, które można ustawić przy użyciu tego atrybutu:
| Parametr | Opis |
|---|---|
| Lista brokerów | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
| Temat | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
| AvroSchema | (Opcjonalnie) Schemat ogólnego rekordu wartości komunikatu podczas korzystania z protokołu Avro. |
| KeyAvroSchema | (Opcjonalnie) Schemat ogólnego rekordu klucza komunikatu podczas korzystania z protokołu Avro. |
| KeyDataType | (Opcjonalnie) Typ danych, aby wysłać klucz komunikatu do tematu platformy Kafka. Jeśli KeyAvroSchema jest ustawiona, ta wartość jest rekordem ogólnym. Akceptowane wartości to Int, , StringLongi Binary. |
| MaxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną . |
| BatchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną . |
| EnableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością domyślną false |
| MessageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
| RequestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000. |
| Maksymalna liczba ponownych prób | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true. |
| AuthenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to NotSet (wartość domyślna), Gssapi, , PlainScramSha256, ScramSha512i OAuthBearer. |
| Nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| Hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| Protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to NotSet (wartość domyślna), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| SslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
| SslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
| SslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
| SslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
| SslCertificatePEM | (Opcjonalnie) Certyfikat klienta w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| SslKeyPEM | (Opcjonalnie) Klucz prywatny klienta w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| SslCaPEM | (Opcjonalnie) Certyfikat urzędu certyfikacji w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| SslCertificateandKeyPEM | (Opcjonalnie) Certyfikat klienta i klucz w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| SchemaRegistryUrl | (Opcjonalnie) Adres URL rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| SchemaRegistryUsername | (Opcjonalnie) Nazwa użytkownika rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| SchemaRegistryPassword | (Opcjonalnie) Hasło rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| OAuthBearerMethod | (Opcjonalnie) OAuth Bearer, metoda. Akceptowane wartości to oidc i default. |
| OAuthBearerClientId | (Opcjonalnie) Gdy OAuthBearerMethod jest ustawiona wartość oidc, określa identyfikator klienta elementu nośnego OAuth. Aby uzyskać więcej informacji, zobacz Połączenia . |
| OAuthBearerClientSecret | (Opcjonalnie) Gdy OAuthBearerMethod jest ustawiona wartość oidc, określa klucz tajny klienta elementu nośnego OAuth. Aby uzyskać więcej informacji, zobacz Połączenia . |
| OAuthBearerScope | (Opcjonalnie) Określa zakres żądania dostępu do brokera. |
| OAuthBearerTokenEndpointUrl | (Opcjonalnie) Identyfikator URI tokenu wystawcy OAuth/OIDC używany do pobierania tokenu podczas oidc użycia metody. Aby uzyskać więcej informacji, zobacz Połączenia . |
| OAuthBearerExtensions | (Opcjonalnie) Rozdzielona przecinkami lista par key=value, które mają być udostępniane jako dodatkowe informacje do brokera w przypadku oidc użycia metody. Na przykład: supportFeatureX=true,organizationId=sales-emea. |
Adnotacje
Adnotacja KafkaOutput umożliwia utworzenie funkcji, która zapisuje w określonym temacie. Obsługiwane opcje obejmują następujące elementy:
| Element | Opis |
|---|---|
| name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
| brokerList | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
| topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
| Datatype | Definiuje sposób obsługi wartości parametru przez funkcję Functions. Domyślnie wartość jest uzyskiwana jako ciąg, a usługa Functions próbuje wykonać deserializacji ciągu do rzeczywistego zwykłego obiektu Java (POJO). Gdy stringelement wejściowy jest traktowany jako tylko ciąg. Gdy binarykomunikat zostanie odebrany jako dane binarne, a usługa Functions próbuje wykonać deserializacji go do rzeczywistego bajtu typu parametru[]. |
| avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. (Obecnie nieobsługiwane dla języka Java). |
| maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną . |
| batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną . |
| enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością falsedomyślną . |
| messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
| requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000. |
| maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true. |
| authenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to NotSet (wartość domyślna), Gssapi, Plain, ScramSha256, ScramSha512. |
| nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to NotSet (wartość domyślna), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
| sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
| sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
| sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
| schemaRegistryUrl | (Opcjonalnie) Adres URL rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schemaRegistryUsername | (Opcjonalnie) Nazwa użytkownika rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schemaRegistryPassword | (Opcjonalnie) Hasło rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
Konfigurowanie
W poniższej tabeli opisano właściwości konfiguracji powiązania ustawione w pliku function.json .
| właściwość function.json | Opis |
|---|---|
| type | Ustaw wartość kafka. |
| direction | Ustaw wartość out. |
| name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
| brokerList | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
| topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
| avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. |
| keyAvroSchema | (Opcjonalnie) Schemat ogólnego rekordu klucza komunikatu podczas korzystania z protokołu Avro. |
| keyDataType | (Opcjonalnie) Typ danych, aby wysłać klucz komunikatu do tematu platformy Kafka. Jeśli keyAvroSchema jest ustawiona, ta wartość jest rekordem ogólnym. Akceptowane wartości to Int, , StringLongi Binary. |
| maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną . |
| batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną . |
| enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością falsedomyślną . |
| messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
| requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000. |
| maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true. |
| authenticationMode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to NotSet (wartość domyślna), Gssapi, Plain, ScramSha256, ScramSha512. |
| nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy AuthenticationMode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to NotSet (wartość domyślna), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
| sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
| sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
| sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
| sslCertificatePEM | (Opcjonalnie) Certyfikat klienta w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| sslKeyPEM | (Opcjonalnie) Klucz prywatny klienta w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| sslCaPEM | (Opcjonalnie) Certyfikat urzędu certyfikacji w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| sslCertificateandKeyPEM | (Opcjonalnie) Certyfikat klienta i klucz w formacie PEM jako ciąg. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schemaRegistryUrl | (Opcjonalnie) Adres URL rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schemaRegistryUsername | (Opcjonalnie) Nazwa użytkownika rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schemaRegistryPassword | (Opcjonalnie) Hasło rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| oAuthBearerMethod | (Opcjonalnie) OAuth Bearer, metoda. Akceptowane wartości to oidc i default. |
| oAuthBearerClientId | (Opcjonalnie) Gdy oAuthBearerMethod jest ustawiona wartość oidc, określa identyfikator klienta elementu nośnego OAuth. Aby uzyskać więcej informacji, zobacz Połączenia . |
| oAuthBearerClientSecret | (Opcjonalnie) Gdy oAuthBearerMethod jest ustawiona wartość oidc, określa klucz tajny klienta elementu nośnego OAuth. Aby uzyskać więcej informacji, zobacz Połączenia . |
| oAuthBearerScope | (Opcjonalnie) Określa zakres żądania dostępu do brokera. |
| oAuthBearerTokenEndpointUrl | (Opcjonalnie) Identyfikator URI tokenu wystawcy OAuth/OIDC używany do pobierania tokenu podczas oidc użycia metody. Aby uzyskać więcej informacji, zobacz Połączenia . |
Konfigurowanie
W poniższej tabeli opisano właściwości konfiguracji powiązania ustawione w pliku function.json . Język Python używa snake_case konwencji nazewnictwa dla właściwości konfiguracji.
| właściwość function.json | Opis |
|---|---|
| type | Ustaw wartość kafka. |
| direction | Ustaw wartość out. |
| name | Nazwa zmiennej reprezentującej dane obsługiwane przez brokera w kodzie funkcji. |
| broker_list | (Wymagane) Lista brokerów platformy Kafka, do których są wysyłane dane wyjściowe. Aby uzyskać więcej informacji, zobacz Połączenia . |
| topic | (Wymagane) Temat, do którego są wysyłane dane wyjściowe. |
| avroSchema | (Opcjonalnie) Schemat rekordu ogólnego w przypadku korzystania z protokołu Avro. |
| maxMessageBytes | (Opcjonalnie) Maksymalny rozmiar wysyłanego komunikatu wyjściowego (w MB) z wartością 1domyślną . |
| batchSize | (Opcjonalnie) Maksymalna liczba komunikatów wsadowych w jednym zestawie komunikatów z wartością 10000domyślną . |
| enableIdempotence | (Opcjonalnie) W przypadku ustawienia opcji truegwarantuje, że komunikaty są generowane dokładnie raz i w oryginalnej kolejności produkcji z wartością falsedomyślną . |
| messageTimeoutMs | (Opcjonalnie) Limit czasu komunikatu lokalnego( w milisekundach). Ta wartość jest wymuszana tylko lokalnie i ogranicza czas oczekiwania wygenerowanego komunikatu na pomyślne dostarczenie z domyślną wartością 300000. Czas 0 jest nieskończony. Ta wartość to maksymalny czas używany do dostarczenia komunikatu (w tym ponownych prób). Błąd dostarczania występuje, gdy liczba ponownych prób lub przekroczenie limitu czasu komunikatu. |
| requestTimeoutMs | (Opcjonalnie) Limit czasu potwierdzenia żądania wyjściowego (w milisekundach) z wartością domyślną 5000. |
| maxRetries | (Opcjonalnie) Liczba ponownych prób wysłania komunikatu zakończonego niepowodzeniem z wartością domyślną 2. Ponawianie próby może spowodować zmiana kolejności, chyba że EnableIdempotence ustawiono wartość true. |
| authentication_mode | (Opcjonalnie) Tryb uwierzytelniania podczas korzystania z uwierzytelniania prostego i warstwy zabezpieczeń (SASL). Obsługiwane wartości to NOTSET (wartość domyślna), Gssapi, Plain, ScramSha256, ScramSha512. |
| nazwa użytkownika | (Opcjonalnie) Nazwa użytkownika uwierzytelniania SASL. Nieobsługiwane, gdy authentication_mode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| hasło | (Opcjonalnie) Hasło do uwierzytelniania SASL. Nieobsługiwane, gdy authentication_mode ma wartość Gssapi. Aby uzyskać więcej informacji, zobacz Połączenia . |
| protokół | (Opcjonalnie) Protokół zabezpieczeń używany podczas komunikacji z brokerami. Obsługiwane wartości to NOTSET (wartość domyślna), plaintext, ssl, sasl_plaintext, sasl_ssl. |
| sslCaLocation | (Opcjonalnie) Ścieżka do pliku certyfikatu urzędu certyfikacji na potrzeby weryfikowania certyfikatu brokera. |
| sslCertificateLocation | (Opcjonalnie) Ścieżka do certyfikatu klienta. |
| sslKeyLocation | (Opcjonalnie) Ścieżka do klucza prywatnego klienta (PEM) używanego do uwierzytelniania. |
| sslKeyPassword | (Opcjonalnie) Hasło do certyfikatu klienta. |
| schema_registry_url | (Opcjonalnie) Adres URL rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schema_registry_username | (Opcjonalnie) Nazwa użytkownika rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| schema_registry_password | (Opcjonalnie) Hasło rejestru schematów Avro. Aby uzyskać więcej informacji, zobacz Połączenia . |
| o_auth_bearer_method | (Opcjonalnie) OAuth Bearer, metoda. Akceptowane wartości to oidc i default. |
| o_auth_bearer_client_id | (Opcjonalnie) Gdy o_auth_bearer_method jest ustawiona wartość oidc, określa identyfikator klienta elementu nośnego OAuth. Aby uzyskać więcej informacji, zobacz Połączenia . |
| o_auth_bearer_client_secret | (Opcjonalnie) Gdy o_auth_bearer_method jest ustawiona wartość oidc, określa klucz tajny klienta elementu nośnego OAuth. Aby uzyskać więcej informacji, zobacz Połączenia . |
| o_auth_bearer_scope | (Opcjonalnie) Określa zakres żądania dostępu do brokera. |
| o_auth_bearer_token_endpoint_url | (Opcjonalnie) Identyfikator URI tokenu wystawcy OAuth/OIDC używany do pobierania tokenu podczas oidc użycia metody. Aby uzyskać więcej informacji, zobacz Połączenia . |
Uwaga
Właściwości związane z certyfikatem PEM i właściwości związane z kluczami Avro nie są jeszcze dostępne w bibliotece języka Python.
Użycie
Przesunięcie, partycja i sygnatura czasowa zdarzenia są generowane w czasie wykonywania. Można ustawić tylko wartość i nagłówki wewnątrz funkcji. Temat należy ustawić w pliku function.json.
Upewnij się, że masz dostęp do tematu platformy Kafka, w którym chcesz napisać. Powiązanie należy skonfigurować przy użyciu poświadczeń dostępu i połączenia do tematu platformy Kafka.
W planie Premium należy włączyć monitorowanie skalowania w czasie wykonywania dla danych wyjściowych platformy Kafka w celu skalowania w poziomie do wielu wystąpień. Aby dowiedzieć się więcej, zobacz Włączanie skalowania środowiska uruchomieniowego.
Aby uzyskać pełny zestaw obsługiwanych ustawień host.json wyzwalacza platformy Kafka, zobacz host.json ustawienia.
Połączenia
Przechowuj wszystkie informacje o połączeniu wymagane przez wyzwalacze i powiązania w ustawieniach aplikacji, a nie w definicjach powiązań w kodzie. Te wskazówki dotyczą poświadczeń, których nigdy nie należy przechowywać w kodzie.
Ważne
Ustawienia poświadczeń muszą odwoływać się do ustawienia aplikacji. Nie należy zapisywać poświadczeń w kodzie ani plikach konfiguracji. W przypadku uruchamiania lokalnego użyj pliku local.settings.json dla poświadczeń i nie publikuj pliku local.settings.json.
Podczas nawiązywania połączenia z zarządzanym klastrem Kafka udostępnianym przez platformę Confluent na platformie Azure można użyć jednej z następujących metod uwierzytelniania.
Uwaga
W przypadku korzystania z planu Flex Consumption właściwości uwierzytelniania certyfikatu opartego na lokalizacji (SslCaLocation, SslCertificateLocation, SslKeyLocation) nie są obsługiwane. Zamiast tego użyj właściwości certyfikatu opartego na PEM (SslCaPEM, SslCertificatePEM, SslKeyPEM, SslCertificateandKeyPEM) lub przechowuj certyfikaty w usłudze Azure Key Vault.
Rejestr schematów
Aby korzystać z rejestru schematów udostępnianego przez platformę Confluent w rozszerzeniu platformy Kafka, ustaw następujące poświadczenia:
| Ustawienie | Zalecana wartość | Opis |
|---|---|---|
| SchemaRegistryUrl | SchemaRegistryUrl |
Adres URL usługi rejestru schematów używanej do zarządzania schematami. Zazwyczaj format https://psrc-xyz.us-east-2.aws.confluent.cloud |
| SchemaRegistryUsername | CONFLUENT_API_KEY |
Nazwa użytkownika podstawowego uwierzytelniania w rejestrze schematów (jeśli jest to wymagane). |
| SchemaRegistryPassword | CONFLUENT_API_SECRET |
Hasło do podstawowego uwierzytelniania w rejestrze schematów (jeśli jest to wymagane). |
Uwierzytelnianie nazwy użytkownika/hasła
Korzystając z tej formy uwierzytelniania, upewnij się, że Protocol ustawiono ScramSha256ScramSha512PlainSaslPlaintext wartość lub AuthenticationModeSaslSsl, a jeśli używany certyfikat urzędu certyfikacji różni się od domyślnego certyfikatu ISRG Root X1, pamiętaj, aby zaktualizować SslCaLocation lub .SslCaPEM
| Ustawienie | Zalecana wartość | Opis |
|---|---|---|
| Lista brokerów | BootstrapServer |
Ustawienie aplikacji o nazwie BootstrapServer zawiera wartość serwera bootstrap znalezionego na stronie ustawień chmury Confluent. Wartość przypomina xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| Nazwa użytkownika | ConfluentCloudUsername |
Ustawienie aplikacji o nazwie ConfluentCloudUsername zawiera klucz dostępu interfejsu API z witryny internetowej Confluent Cloud. |
| Hasło | ConfluentCloudPassword |
Ustawienie aplikacji o nazwie ConfluentCloudPassword zawiera wpis tajny interfejsu API uzyskany z witryny internetowej platformy Confluent Cloud. |
| SslCaPEM | SSLCaPemCertificate |
Ustawienie aplikacji o nazwie SSLCaPemCertificate zawierającej certyfikat urzędu certyfikacji jako ciąg w formacie PEM. Wartość powinna być zgodna ze standardowym formatem, na przykład: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----. |
Uwierzytelnianie SSL
Upewnij się, że Protocol jest ustawione na SSL.
| Ustawienie | Zalecana wartość | Opis |
|---|---|---|
| Lista brokerów | BootstrapServer |
Ustawienie aplikacji o nazwie BootstrapServer zawiera wartość serwera bootstrap znalezionego na stronie ustawień chmury Confluent. Wartość przypomina xyz-xyzxzy.westeurope.azure.confluent.cloud:9092. |
| SslCaPEM | SslCaCertificatePem |
Ustawienie aplikacji o nazwie SslCaCertificatePem zawierające wartość PEM certyfikatu urzędu certyfikacji jako ciąg. Wartość powinna być zgodna ze standardowym formatem: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslCertificatePEM | SslClientCertificatePem |
Ustawienie aplikacji o nazwie SslClientCertificatePem zawierające wartość PEM certyfikatu klienta jako ciąg. Wartość powinna być zgodna ze standardowym formatem: -----BEGIN CERTIFICATE-----\nMII...JQ==\n-----END CERTIFICATE----- |
| SslKeyPEM | SslClientKeyPem |
Ustawienie aplikacji o nazwie SslClientKeyPem zawierające wartość PEM klucza prywatnego klienta jako ciąg. Wartość powinna być zgodna ze standardowym formatem: -----BEGIN PRIVATE KEY-----\nMII...JQ==\n-----END PRIVATE KEY----- |
| SslCertificateandKeyPEM | SslClientCertificateAndKeyPem |
Ustawienie aplikacji o nazwie SslClientCertificateAndKeyPem zawierające wartość PEM certyfikatu klienta i klucza prywatnego klienta łączone jako ciąg. Wartość powinna być zgodna ze standardowym formatem: -----BEGIN CERTIFICATE-----\nMII....JQ==\n-----END CERTIFICATE-----\n-----BEGIN PRIVATE KEY-----\nMIIE....BM=\n-----END PRIVATE KEY----- |
| SslKeyPassword | SslClientKeyPassword |
Ustawienie aplikacji o nazwie SslClientKeyPassword zawierające hasło klucza prywatnego (jeśli istnieje). |
Uwierzytelnianie OAuth
W przypadku korzystania z uwierzytelniania OAuth skonfiguruj właściwości związane z protokołem OAuth w definicjach powiązań.
Wartości ciągu używane dla tych ustawień muszą być obecne jako ustawienia aplikacji na platformie Azure lub w Values kolekcji w pliku local.settings.json podczas programowania lokalnego.
Należy również ustawić Protocol wartości i AuthenticationMode w definicjach powiązań.