将工厂注册到工作流生成器

至此,我们一直在创建执行程序实例并将其直接传递给该 WorkflowBuilder实例。 此方法适用于只需单个工作流实例的简单方案。 但是,在更复杂的情况下,可能需要创建同一工作流的多个独立实例。 若要支持此功能,每个工作流实例必须接收其自己的执行程序实例集。 重用相同的执行程序会导致其内部状态在工作流之间共享,从而导致意外的副作用。 为了避免这种情况,你可以将执行器工厂注册到 WorkflowBuilder,确保为每个工作流实例创建新的执行器实例。

将工厂注册到工作流生成器

即将推出……

若要将执行程序工厂注册到 WorkflowBuilder,可以使用 register_executor 方法。 此方法采用两个参数:用于创建执行程序实例(类型 Executor 或派生) Executor的工厂函数以及要用于工作流配置的工厂的名称。

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node."""
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

class Accumulate(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)
        # Executor internal state that should not be shared among different workflow instances.
        self._text_length = 0

    @handler
    async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
        """Accumulate the length of the input text and log it."""
        self._text_length += len(text)
        print(f"Accumulated text length: {self._text_length}")

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
    """Reverse the input string and send it downstream."""
    result = text[::-1]

    # Send the result to the next executor in the workflow.
    await ctx.yield_output(result)

workflow_builder = (
    WorkflowBuilder()
    .register_executor(
        factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
        name="UpperCase",
    )
    .register_executor(
        factory_func=lambda: Accumulate(id="AccumulateExecutor"),
        name="Accumulate",
    )
    .register_executor(
        factory_func=lambda: reverse_text,
        name="ReverseText",
    )
    # Use the factory name to configure the workflow
    .add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
    .set_start_executor("UpperCase")
)

使用生成器生成工作流

# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")

预期输出:

Accumulated text length: 22

现在,让我们创建另一个工作流实例并运行它。 Accumulate执行程序应有自己的内部状态,而不是与第一个工作流实例共享状态。

# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")

预期输出:

Accumulated text length: 11

若要将代理工厂注册到该 WorkflowBuilder工厂,可以使用该方法 register_agent 。 此方法采用两个参数:用于创建代理实例的工厂函数(实现 AgentProtocol的类型)和要用于工作流配置的工厂的名称。

def create_agent() -> ChatAgent:
    """Factory function to create a Writer agent."""
    return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
        instructions=("You are a helpful assistant.",),
        name="assistant",
    )

workflow_builder = (
    WorkflowBuilder()
    .register_agent(
        factory_func=create_agent,
        name="Assistant",
    )
    # Register other executors or agents as needed and configure the workflow
    ...
)

# Build the workflow using the builder
workflow = workflow_builder.build()

每次创建新的工作流实例时,工作流中的代理都将是工厂函数创建的新实例,并获取新的线程实例。

工作流状态隔离

若要了解有关工作流状态隔离的详细信息,请参阅 工作流状态隔离 文档。