Delen via


Azure Functions integreren met Azure Data Explorer met behulp van invoer- en uitvoerbindingen (preview)

Belangrijk

Deze connector kan worden gebruikt in Real-Time Intelligence in Microsoft Fabric. Gebruik de instructies in dit artikel met de volgende uitzonderingen:

Met Azure Functions kunt u serverloze code uitvoeren in de cloud volgens een planning of als reactie op een gebeurtenis. Met Azure Data Explorer-invoer- en uitvoerbindingen voor Azure Functions kunt u Azure Data Explorer integreren in uw werkstromen om gegevens op te nemen en query's uit te voeren op uw cluster.

Vereiste voorwaarden

De integratie uitproberen met ons voorbeeldproject

Azure Data Explorer-bindingen gebruiken voor Azure Functions

Zie de volgende onderwerpen voor informatie over het gebruik van Azure Data Explorer-bindingen voor Azure Functions:

Scenario's voor het gebruik van Azure Data Explorer-bindingen voor Azure Functions

In de volgende secties worden enkele veelvoorkomende scenario's beschreven voor het gebruik van Azure Data Explorer-bindingen voor Azure Functions.

Invoerbindingen

Invoerbindingen voeren een KQL-query (Kusto Query Language) of KQL-functie uit, eventueel met parameters en retourneert de uitvoer naar de functie.

In de volgende secties wordt beschreven hoe u invoerbindingen gebruikt in enkele veelvoorkomende scenario's.

Scenario 1: Een HTTP-eindpunt voor het opvragen van gegevens uit een cluster

Het gebruik van invoerbindingen is van toepassing in situaties waarin u Azure Data Explorer-gegevens beschikbaar moet maken via een REST API. In dit scenario gebruikt u een HTTP-trigger van Azure Functions om query's uit te voeren op gegevens in uw cluster. Het scenario is met name handig in situaties waarin u programmatische toegang moet bieden tot Azure Data Explorer-gegevens voor externe toepassingen of services. Door uw gegevens beschikbaar te maken via een REST API, kunnen toepassingen de gegevens gemakkelijk gebruiken zonder dat ze rechtstreeks verbinding hoeven te maken met uw cluster.

De code definieert een functie met een HTTP-trigger en een Azure Data Explorer-invoerbinding. De invoerbinding geeft de query op die moet worden uitgevoerd op de tabel Producten in de productsdb-database . De functie gebruikt de kolom product-id als het predicaat dat als parameter wordt doorgegeven.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

De functie kan vervolgens als volgt worden aangeroepen:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Scenario 2: Een geplande trigger voor het exporteren van gegevens uit een cluster

Het volgende scenario is van toepassing in situaties waarin gegevens moeten worden geëxporteerd volgens een tijdschema.

De code definieert een functie met een timertrigger waarmee een aggregatie van verkoopgegevens uit de productsdb-database wordt geëxporteerd naar een CSV-bestand in Azure Blob Storage.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Uitvoerbindingen

Uitvoerbindingen gebruiken een of meer rijen en voegen deze in een Azure Data Explorer-tabel in.

In de volgende secties wordt beschreven hoe u uitvoerbindingen gebruikt in enkele veelvoorkomende scenario's.

Scenario 1: HTTP-eindpunt voor het opnemen van gegevens in een cluster

Het volgende scenario is van toepassing in situaties waarin binnenkomende HTTP-aanvragen moeten worden verwerkt en opgenomen in uw cluster. Met behulp van een uitvoerbinding kunnen binnenkomende gegevens uit de aanvraag worden weggeschreven naar Azure Data Explorer-tabellen.

De code definieert een functie met een HTTP-trigger en een Azure Data Explorer-uitvoerbinding. Deze functie neemt een JSON-payload in de inhoud van de HTTP-aanvraag en schrijft deze naar de productentabel in de database productsdb.

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

De functie kan vervolgens als volgt worden aangeroepen:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Scenario 2: Gegevens opnemen van RabbitMQ of andere berichtensystemen die worden ondersteund in Azure

Het volgende scenario is van toepassing in situaties waarin gegevens uit een berichtensysteem moeten worden opgenomen in uw cluster. Met behulp van een uitvoerbinding kunnen binnenkomende gegevens van het berichtensysteem worden opgenomen in Azure Data Explorer-tabellen.

De code definieert een functie die berichten en gegevens in JSON-indeling ontvangt, die binnenkomen via een RabbitMQ-trigger en worden opgenomen in de productentabel van de productsdb-database.

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Zie de documentatie van Azure Functions voor meer informatie over functies. De Azure Data Explorer-extensie is beschikbaar op: