你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

如何将基于队列的 Azure Functions 与 Microsoft Foundry 代理配合使用

注释

本文档引用 Microsoft Foundry (经典) 门户。

🔍 查看 Microsoft Foundry (new) 文档 ,了解新门户。

本文介绍如何使用基于队列的集成工具方法来使 Microsoft Foundry 代理能够访问部署到 Azure Functions 的代码。 在此方法中,代理通过 Azure 队列存储中的单独输入和输出消息队列在 Azure Functions 中异步访问工具代码。

通过 AzureFunctionsTool 提供的工具定义,Foundry 代理直接连接到由 Azure Functions 监视的输入队列。 当代理需要使用此 Azure Functions 托管工具时,它使用工具定义将消息放置在由 Azure Functions 中的函数应用监视的输入队列中。 Azure 存储队列触发器调用函数代码来处理消息,并通过输出队列绑定返回结果。 代理从输出队列中读取消息以继续聊天。

先决条件

  • 准备好的环境。 有关详细信息,请参阅 概述 文章。

注释

必须具有 具有标准设置的已部署代理。 不支持基本代理设置。

小窍门

可以在 GitHub 上找到完整的工作示例

定义代理要调用的函数

首先定义一个 Azure 队列触发器函数,该函数处理来自队列的函数调用。 例如:

app = func.FunctionApp()

@app.queue_trigger(arg_name="msg", queue_name="azure-function-foo-input", connection="STORAGE_CONNECTION")
@app.queue_output(arg_name="outputQueue", queue_name="azure-function-foo-output", connection="STORAGE_CONNECTION")  

def queue_trigger(inputQueue: func.QueueMessage, outputQueue: func.Out[str]):
    try:
        messagepayload = json.loads(inputQueue.get_body().decode("utf-8"))
        logging.info(f'The function receives the following message: {json.dumps(messagepayload)}')
        location = messagepayload["location"]
        weather_result = f"Weather is {len(location)} degrees and sunny in {location}"
        response_message = {
            "Value": weather_result,
            "CorrelationId": messagepayload["CorrelationId"]
        }
        logging.info(f'The function returns the following message through the {outputQueue} queue: {json.dumps(response_message)}')

        outputQueue.set(json.dumps(response_message))

    except Exception as e:
        logging.error(f"Error processing message: {e}")

配置 Azure 函数工具

首先,定义 Azure 函数工具,并指定其名称、说明、参数和存储队列配置。

import os
from azure.ai.projects import AIProjectClient
from azure.identity import DefaultAzureCredential
from azure.ai.agents.models import AzureFunctionStorageQueue, AzureFunctionTool

# Retrieve the storage service endpoint from environment variables
storage_service_endpoint = os.environ["STORAGE_SERVICE_ENDPONT"]

# Define the Azure Function tool
azure_function_tool = AzureFunctionTool(
    name="foo",  # Name of the tool
    description="Get answers from the foo bot.",  # Description of the tool's purpose
    parameters={  # Define the parameters required by the tool
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "The question to ask."},
            "outputqueueuri": {"type": "string", "description": "The full output queue URI."},
        },
    },
    input_queue=AzureFunctionStorageQueue(  # Input queue configuration
        queue_name="azure-function-foo-input",
        storage_service_endpoint=storage_service_endpoint,
    ),
    output_queue=AzureFunctionStorageQueue(  # Output queue configuration
        queue_name="azure-function-foo-output",
        storage_service_endpoint=storage_service_endpoint,
    ),
)

创建 AI 项目客户端和代理

接下来,创建 AI 项目客户端,然后创建代理,附加前面定义的 Azure Function 工具。

# Initialize the AIProjectClient
project_client = AIProjectClient(
    endpoint=os.environ["PROJECT_ENDPOINT"],
    credential=DefaultAzureCredential()
)
# Create an agent with the Azure Function tool
agent = project_client.agents.create_agent(
    model=os.environ["MODEL_DEPLOYMENT_NAME"],  # Model deployment name
    name="azure-function-agent-foo",  # Name of the agent
    instructions=(
        "You are a helpful support agent. Use the provided function any time the prompt contains the string "
        "'What would foo say?'. When you invoke the function, ALWAYS specify the output queue URI parameter as "
        f"'{storage_service_endpoint}/azure-function-tool-output'. Always respond with \"Foo says\" and then the response from the tool."
    ),
    tools=azure_function_tool.definitions,  # Attach the tool definitions to the agent
)
print(f"Created agent, agent ID: {agent.id}")

为代理创建一个线程

# Create a thread for communication
thread = project_client.agents.threads.create()
print(f"Created thread, thread ID: {thread.id}")

创建运行并检查输出

# Create a message in the thread
message = project_client.agents.messages.create(
    thread_id=thread.id,
    role="user",
    content="What is the most prevalent element in the universe? What would foo say?",  
)
print(f"Created message, message ID: {message['id']}")

# Create and process a run for the agent to handle the message
run = project_client.agents.runs.create_and_process(thread_id=thread.id, agent_id=agent.id)
print(f"Run finished with status: {run.status}")

# Check if the run failed
if run.status == "failed":
    print(f"Run failed: {run.last_error}")

获取运行的结果

# Retrieve and print all messages from the thread
messages = project_client.agents.messages.list(thread_id=thread.id)
for msg in messages:
    print(f"Role: {msg['role']}, Content: {msg['content']}")# Get messages from the assistant thread

# Get the last message from the assistant
last_msg = messages.get_last_text_message_by_sender("assistant")
if last_msg:
    print(f"Last Message: {last_msg.text.value}")

# Delete the agent once done
project_client.agents.delete_agent(agent.id)
print(f"Deleted agent")

对于 Python 代码的任何问题,请在示例代码存储库中创建问题

创建代理

在下面的示例中,我们将创建一个客户端和一个代理,该代理具有 Azure 函数的工具定义

请按照 REST API 快速入门 为环境变量 AGENT_TOKENAZURE_AI_FOUNDRY_PROJECT_ENDPOINTAPI_VERSION 设置正确的值。

curl --request POST \
  --url $AZURE_AI_FOUNDRY_PROJECT_ENDPOINT/assistants?api-version=$API_VERSION \
  -H "Authorization: Bearer $AGENT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "instructions": "You are a helpful support agent. Answer the user's questions to the best of your ability.",
    "name": "azure-function-agent-get-weather",
    "model": "gpt-4o-mini",
    "tools": [
      { 
        "type": "azure_function",
        "azure_function": {
            "function": {
                "name": "GetWeather",
                "description": "Get the weather in a location.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {"type": "string", "description": "The location to look up."}
                    },
                    "required": ["location"]
                }
            },
            "input_binding": {
                "type": "storage_queue",
                "storage_queue": {
                    "queue_service_endpoint": "https://storageaccount.queue.core.windows.net",
                    "queue_name": "input"
                }
            },
            "output_binding": {
                "type": "storage_queue",
                "storage_queue": {
                    "queue_service_endpoint": "https://storageaccount.queue.core.windows.net",
                    "queue_name": "output"
                }
            }
        }
      }
    ]
  }'

为代理创建一个线程

curl --request POST \
  --url $AZURE_AI_FOUNDRY_PROJECT_ENDPOINT/threads?api-version=$API_VERSION \
  -H "Authorization: Bearer $AGENT_TOKEN" \
  -H "Content-Type: application/json" \
  -d ''

创建运行并检查输出

curl --request POST \
  --url $AZURE_AI_FOUNDRY_PROJECT_ENDPOINT/threads/thread_abc123/messages?api-version=$API_VERSION \
  -H "Authorization: Bearer $AGENT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
      "role": "user",
      "content": "What is the weather in Seattle, WA?"
    }'
curl --request POST \
  --url $AZURE_AI_FOUNDRY_PROJECT_ENDPOINT/threads/thread_abc123/runs?api-version=$API_VERSION \
  -H "Authorization: Bearer $AGENT_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "assistant_id": "asst_abc123",
  }'
curl --request GET \
  --url $AZURE_AI_FOUNDRY_PROJECT_ENDPOINT/threads/thread_abc123/runs/run_abc123?api-version=$API_VERSION \
  -H "Authorization: Bearer $AGENT_TOKEN"

获取运行的结果

curl --request GET \
  --url $AZURE_AI_FOUNDRY_PROJECT_ENDPOINT/threads/thread_abc123/messages?api-version=$API_VERSION \
  -H "Authorization: Bearer $AGENT_TOKEN"

小窍门

可以在 GitHub 上找到完整的工作示例

.NET Azure 函数示例的先决条件

若要进行函数调用,需要创建和部署 Azure 函数。 在代码片段中,我们有一个 C# 上的函数示例,该函数可由前面的代码使用。

namespace FunctionProj
{
    public class Response
    {
        public required string Value { get; set; }
        public required string CorrelationId { get; set; }
    }

    public class Arguments
    {
        public required string OutputQueueUri { get; set; }
        public required string CorrelationId { get; set; }
    }

    public class Foo
    {
        private readonly ILogger<Foo> _logger;

        public Foo(ILogger<Foo> logger)
        {
            _logger = logger;
        }

        [Function("Foo")]
        public void Run([QueueTrigger("azure-function-foo-input")] Arguments input, FunctionContext executionContext)
        {
            var logger = executionContext.GetLogger("Foo");
            logger.LogInformation("C# Queue function processed a request.");

            // We have to provide the Managed identity for function resource
            // and allow this identity a Queue Data Contributor role on the storage account.
            var cred = new DefaultAzureCredential();
            var queueClient = new QueueClient(new Uri(input.OutputQueueUri), cred,
                    new QueueClientOptions { MessageEncoding = QueueMessageEncoding.Base64 });

            var response = new Response
            {
                Value = "Bar",
                // Important! Correlation ID must match the input correlation ID.
                CorrelationId = input.CorrelationId
            };

            var jsonResponse = JsonSerializer.Serialize(response);
            queueClient.SendMessage(jsonResponse);
        }
    }
}

在此代码中,我们分别定义函数输入和输出类: ArgumentsResponse 这两个数据类在 JSON 中序列化。 重要的是,它们都包含 CorrelationId,后者在输入和输出之间是相同的。

在我们的示例中,该函数存储在使用 AI 中心创建的存储帐户中。 为此,我们需要允许密钥访问该存储。 在 Azure 门户中,转到“存储帐户 > 设置 > 配置”,并将“允许存储帐户密钥访问”设置为“已启用”。 如果未完成,则显示的错误为“远程服务器返回错误:(403) 禁止”。若要创建将托管函数的函数资源,请安装 azure-cli python 包并运行下一个命令:

pip install -U azure-cli
az login
az functionapp create --resource-group your-resource-group --consumption-plan-location region --runtime dotnet-isolated --functions-version 4 --name function_name --storage-account storage_account_already_present_in_resource_group --app-insights existing_or_new_application_insights_name

此函数将数据写入输出队列,因此需要向 Azure 进行身份验证,因此我们需要分配函数系统标识并提供它 Storage Queue Data Contributor。 若要在 Azure 门户中执行此作,请选择位于资源组和设置your-resource-group标识中的>函数,将其打开并选择“保存”。 然后,为分配的系统托管标识分配我们的函数(上面脚本中的 Storage Queue Data Contributor)所使用的存储帐户的 storage_account_already_present_in_resource_group 权限。

现在,我们将创建函数本身。 安装 .NETCore Tools ,并使用下一个命令创建函数项目。

func init FunctionProj --worker-runtime dotnet-isolated --target-framework net8.0
cd FunctionProj
func new --name foo --template "HTTP trigger" --authlevel "anonymous"
dotnet add package Azure.Identity
dotnet add package Microsoft.Azure.Functions.Worker.Extensions.Storage.Queues --prerelease

注释

有一个“Azure 队列存储触发器”,然而目前尝试使用它会导致出错。 我们创建了一个项目,其中包含一个 HTTP 触发的 Azure 函数,其逻辑位于 Foo.cs 文件中。 由于我们需要通过队列中的新消息触发 Azure 函数,我们将上述 C# 示例代码替换为 Foo.cs 的内容。 若要部署函数,请从 dotnet 项目文件夹运行命令:

func azure functionapp publish function_name

storage_account_already_present_in_resource_group 中选择 Queue service,并创建两个队列:azure-function-foo-inputazure-function-tool-output。 示例中使用相同的队列。 若要检查函数是否正常工作,请将下一条消息放入 azure-function-foo-input 中,并用实际资源组名称替换 storage_account_already_present_in_resource_group,或者仅复制输出队列地址。

{
  "OutputQueueUri": "https://storage_account_already_present_in_resource_group.queue.core.windows.net/azure-function-tool-output",
  "CorrelationId": "42"
}

然后,我们监控输出队列或消息。 你应当收到下一条消息。

{
  "Value": "Bar",
  "CorrelationId": "42"
}

输入 CorrelationId 与输出相同。

小窍门

将多个消息放入输入队列,并在输出队列打开的情况下保留第二个 Internet 浏览器窗口,并在门户用户界面上命中刷新按钮,这样就不会错过该消息。 如果改将消息发送到 azure-function-foo-input-poison 队列,则该函数在完成时会出现错误,这种情况下请检查你的设置。 测试函数并确保其正常工作后,请确保 Azure AI 项目对存储帐户具有以下角色: Storage Account ContributorStorage Blob Data ContributorStorage File Data Privileged ContributorStorage Queue Data ContributorStorage Table Data Contributor。 现在,该函数可供代理使用。

在下面的示例中,我们调用函数“foo”,该函数响应“Bar”。

创建客户端、工具定义和代理

获取必要的配置、初始化 PersistentAgentsClient、定义 AzureFunctionToolDefinition Azure 函数,然后创建代理。

using Azure;
using Azure.AI.Agents.Persistent;
using Azure.Identity;
using Microsoft.Extensions.Configuration;
using System.Text.Json;

//Get configuration from appsettings.json.
IConfigurationRoot configuration = new ConfigurationBuilder()
    .SetBasePath(AppContext.BaseDirectory)
    .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
    .Build();

var projectEndpoint = configuration["ProjectEndpoint"];
var modelDeploymentName = configuration["ModelDeploymentName"];
var storageQueueUri = configuration["StorageQueueURI"];
//Initialize PersistentAgentsClient.
PersistentAgentsClient client = new(projectEndpoint, new DefaultAzureCredential());

//Define Azure Function tool definition.
AzureFunctionToolDefinition azureFnTool = new(
    name: "foo",
    description: "Get answers from the foo bot.",
    inputBinding: new AzureFunctionBinding(
        new AzureFunctionStorageQueue(
            queueName: "azure-function-foo-input",
            storageServiceEndpoint: storageQueueUri
        )
    ),
    outputBinding: new AzureFunctionBinding(
        new AzureFunctionStorageQueue(
            queueName: "azure-function-tool-output",
            storageServiceEndpoint: storageQueueUri
        )
    ),
    parameters: BinaryData.FromObjectAsJson(
            new
            {
                Type = "object",
                Properties = new
                {
                    query = new
                    {
                        Type = "string",
                        Description = "The question to ask.",
                    },
                    outputqueueuri = new
                    {
                        Type = "string",
                        Description = "The full output queue uri."
                    }
                },
            },
        new JsonSerializerOptions() { PropertyNamingPolicy = JsonNamingPolicy.CamelCase }
    )
);

//Create agent and give it the Azure Function tool.
PersistentAgent agent = client.Administration.CreateAgent(
    model: modelDeploymentName,
    name: "azure-function-agent-foo",
    instructions: "You are a helpful support agent. Use the provided function any "
    + "time the prompt contains the string 'What would foo say?'. When you invoke "
    + "the function, ALWAYS specify the output queue uri parameter as "
    + $"'{storageQueueUri}/azure-function-tool-output'. Always responds with "
    + "\"Foo says\" and then the response from the tool.",
    tools: [azureFnTool]
);

创建线程并添加消息

接下来,创建新的永久性代理线程,并向其添加初始用户消息。

PersistentAgentThread thread = client.Threads.CreateThread();

client.Messages.CreateMessage(
    thread.Id,
    MessageRole.User,
    "What is the most prevalent element in the universe? What would foo say?");

创建和监视运行

然后,在线程上为代理创建运行并轮询其状态,直到它完成或者它需要你采取措施。

ThreadRun run = client.Runs.CreateRun(thread.Id, agent.Id);

do
{
    Thread.Sleep(TimeSpan.FromMilliseconds(500));
    run = client.Runs.GetRun(thread.Id, run.Id);
}
while (run.Status == RunStatus.Queued
    || run.Status == RunStatus.InProgress
    || run.Status == RunStatus.RequiresAction);

处理结果

运行完成后,我们将从线程中检索和处理消息。

Pageable<PersistentThreadMessage> messages = client.Messages.GetMessages(
    threadId: thread.Id,
    order: ListSortOrder.Ascending
);

foreach (PersistentThreadMessage threadMessage in messages)
{
    foreach (MessageContent content in threadMessage.ContentItems)
    {
        switch (content)
        {
            case MessageTextContent textItem:
                Console.WriteLine($"[{threadMessage.Role}]: {textItem.Text}");
                break;
        }
    }
}

清理资源

最后,通过删除线程和代理来清理已创建的资源。

client.Threads.DeleteThread(thread.Id);
client.Administration.DeleteAgent(agent.Id);

代码示例

package com.example.agents;

import com.azure.ai.agents.persistent.MessagesClient;
import com.azure.ai.agents.persistent.PersistentAgentsAdministrationClient;
import com.azure.ai.agents.persistent.PersistentAgentsClient;
import com.azure.ai.agents.persistent.PersistentAgentsClientBuilder;
import com.azure.ai.agents.persistent.RunsClient;
import com.azure.ai.agents.persistent.ThreadsClient;
import com.azure.ai.agents.persistent.implementation.models.CreateAgentRequest;
import com.azure.ai.agents.persistent.models.AzureFunctionBinding;
import com.azure.ai.agents.persistent.models.AzureFunctionDefinition;
import com.azure.ai.agents.persistent.models.AzureFunctionStorageQueue;
import com.azure.ai.agents.persistent.models.AzureFunctionToolDefinition;
import com.azure.ai.agents.persistent.models.CreateRunOptions;
import com.azure.ai.agents.persistent.models.FunctionDefinition;
import com.azure.ai.agents.persistent.models.MessageImageFileContent;
import com.azure.ai.agents.persistent.models.MessageRole;
import com.azure.ai.agents.persistent.models.MessageTextContent;
import com.azure.ai.agents.persistent.models.PersistentAgent;
import com.azure.ai.agents.persistent.models.PersistentAgentThread;
import com.azure.ai.agents.persistent.models.RunStatus;
import com.azure.ai.agents.persistent.models.ThreadMessage;
import com.azure.ai.agents.persistent.models.ThreadRun;
import com.azure.ai.agents.persistent.models.MessageContent;
import com.azure.core.http.HttpHeaderName;
import com.azure.core.http.rest.PagedIterable;
import com.azure.core.http.rest.RequestOptions;
import com.azure.core.util.BinaryData;
import com.azure.identity.DefaultAzureCredentialBuilder;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class AgentExample {

    public static void main(String[] args) {

        // variables for authenticating requests to the agent service 
        String projectEndpoint = System.getenv("PROJECT_ENDPOINT");
        String modelName = System.getenv("MODEL_DEPLOYMENT_NAME");
        String storageQueueUri = System.getenv("STORAGE_QUEUE_URI");
        String azureFunctionName = System.getenv("AZURE_FUNCTION_NAME");

        PersistentAgentsClientBuilder clientBuilder = new PersistentAgentsClientBuilder().endpoint(projectEndpoint)
            .credential(new DefaultAzureCredentialBuilder().build());
        PersistentAgentsClient agentsClient = clientBuilder.buildClient();
        PersistentAgentsAdministrationClient administrationClient = agentsClient.getPersistentAgentsAdministrationClient();
        ThreadsClient threadsClient = agentsClient.getThreadsClient();
        MessagesClient messagesClient = agentsClient.getMessagesClient();
        RunsClient runsClient = agentsClient.getRunsClient();

        FunctionDefinition fnDef = new FunctionDefinition(
            azureFunctionName,
            BinaryData.fromObject(
                mapOf(
                    "type", "object",
                    "properties", mapOf(
                        "location",
                        mapOf("type", "string", "description", "The location to look up")
                    ),
                    "required", new String[]{"location"}
                )
            )
        );
        AzureFunctionDefinition azureFnDef = new AzureFunctionDefinition(
            fnDef,
            new AzureFunctionBinding(new AzureFunctionStorageQueue(storageQueueUri, "agent-input")),
            new AzureFunctionBinding(new AzureFunctionStorageQueue(storageQueueUri, "agent-output"))
        );
        AzureFunctionToolDefinition azureFnTool = new AzureFunctionToolDefinition(azureFnDef);

        String agentName = "azure_function_example";
        RequestOptions requestOptions = new RequestOptions()
            .setHeader(HttpHeaderName.fromString("x-ms-enable-preview"), "true");
        CreateAgentRequest createAgentRequestObj = new CreateAgentRequest(modelName)
            .setName(agentName)
            .setInstructions("You are a helpful agent. Use the provided function any time "
                + "you are asked with the weather of any location")
            .setTools(Arrays.asList(azureFnTool));
        BinaryData createAgentRequest = BinaryData.fromObject(createAgentRequestObj);
        PersistentAgent agent = administrationClient.createAgentWithResponse(createAgentRequest, requestOptions)
            .getValue().toObject(PersistentAgent.class);

        PersistentAgentThread thread = threadsClient.createThread();
        ThreadMessage createdMessage = messagesClient.createMessage(
            thread.getId(),
            MessageRole.USER,
            "What is the weather in Seattle, WA?");

        try {
            //run agent
            CreateRunOptions createRunOptions = new CreateRunOptions(thread.getId(), agent.getId())
                .setAdditionalInstructions("");
            ThreadRun threadRun = runsClient.createRun(createRunOptions);

            waitForRunCompletion(thread.getId(), threadRun, runsClient);
            printRunMessages(messagesClient, thread.getId());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } finally {
            //cleanup
            threadsClient.deleteThread(thread.getId());
            administrationClient.deleteAgent(agent.getId());
        }
    }

    // Use "Map.of" if available
    @SuppressWarnings("unchecked")
    private static <T> Map<String, T> mapOf(Object... inputs) {
        Map<String, T> map = new HashMap<>();
        for (int i = 0; i < inputs.length; i += 2) {
            String key = (String) inputs[i];
            T value = (T) inputs[i + 1];
            map.put(key, value);
        }
        return map;
    }

    // A helper function to print messages from the agent
    public static void printRunMessages(MessagesClient messagesClient, String threadId) {

        PagedIterable<ThreadMessage> runMessages = messagesClient.listMessages(threadId);
        for (ThreadMessage message : runMessages) {
            System.out.print(String.format("%1$s - %2$s : ", message.getCreatedAt(), message.getRole()));
            for (MessageContent contentItem : message.getContent()) {
                if (contentItem instanceof MessageTextContent) {
                    System.out.print((((MessageTextContent) contentItem).getText().getValue()));
                } else if (contentItem instanceof MessageImageFileContent) {
                    String imageFileId = (((MessageImageFileContent) contentItem).getImageFile().getFileId());
                    System.out.print("Image from ID: " + imageFileId);
                }
                System.out.println();
            }
        }
    }

    // a helper function to wait until a run has completed running
    public static void waitForRunCompletion(String threadId, ThreadRun threadRun, RunsClient runsClient)
        throws InterruptedException {

        do {
            Thread.sleep(500);
            threadRun = runsClient.getRun(threadId, threadRun.getId());
        }
        while (
            threadRun.getStatus() == RunStatus.QUEUED
                || threadRun.getStatus() == RunStatus.IN_PROGRESS
                || threadRun.getStatus() == RunStatus.REQUIRES_ACTION);

        if (threadRun.getStatus() == RunStatus.FAILED) {
            System.out.println(threadRun.getLastError().getMessage());
        }
    }
}