Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Do tego momentu tworzyliśmy instancje egzekutora i przekazywaliśmy je bezpośrednio do obiektu WorkflowBuilder. Takie podejście działa dobrze w przypadku prostych scenariuszy, w których potrzebujesz tylko jednego wystąpienia przepływu pracy. Jednak w bardziej złożonych przypadkach możesz utworzyć wiele izolowanych wystąpień tego samego przepływu pracy. Aby to umożliwić, każde wystąpienie przepływu pracy musi odbierać własny zestaw wystąpień funkcji wykonawczej. Ponowne stosowanie tych samych funkcji wykonawczych spowoduje, że ich stan wewnętrzny będzie współużytkowany w przepływach pracy, co powoduje niezamierzone skutki uboczne. Aby tego uniknąć, można zarejestrować fabryki funkcji wykonawczej w programie WorkflowBuilder, zapewniając, że dla każdego wystąpienia przepływu pracy są tworzone nowe wystąpienia funkcji wykonawczej.
Rejestrowanie fabryk w programie Workflow Builder
Wkrótce...
Aby zarejestrować fabrykę wykonawczą w WorkflowBuilder, możesz użyć metody register_executor. Ta metoda przyjmuje dwa parametry: funkcję fabryki, która tworzy instancje wykonawcy (typu Executor lub jego pochodna Executor) oraz nazwę fabryki, która ma być używana w konfiguracji przepływu pracy.
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node."""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
class Accumulate(Executor):
def __init__(self, id: str):
super().__init__(id=id)
# Executor internal state that should not be shared among different workflow instances.
self._text_length = 0
@handler
async def accumulate(self, text: str, ctx: WorkflowContext) -> None:
"""Accumulate the length of the input text and log it."""
self._text_length += len(text)
print(f"Accumulated text length: {self._text_length}")
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[str]) -> None:
"""Reverse the input string and send it downstream."""
result = text[::-1]
# Send the result to the next executor in the workflow.
await ctx.yield_output(result)
workflow_builder = (
WorkflowBuilder()
.register_executor(
factory_func=lambda: UpperCase(id="UpperCaseExecutor"),
name="UpperCase",
)
.register_executor(
factory_func=lambda: Accumulate(id="AccumulateExecutor"),
name="Accumulate",
)
.register_executor(
factory_func=lambda: reverse_text,
name="ReverseText",
)
# Use the factory name to configure the workflow
.add_fan_out_edges("UpperCase", ["Accumulate", "ReverseText"])
.set_start_executor("UpperCase")
)
Tworzenie przepływu pracy przy użyciu konstruktora
# Build the workflow using the builder
workflow_a = workflow_builder.build()
await workflow_a.run("hello world")
await workflow_a.run("hello world")
Oczekiwane dane wyjściowe:
Accumulated text length: 22
Teraz utwórzmy kolejne wystąpienie przepływu pracy i uruchomimy je. Funkcja Accumulate wykonawcza powinna mieć własny stan wewnętrzny, a nie udostępniać stanu pierwszemu wystąpieniu przepływu pracy.
# Build another workflow using the builder
# This workflow will have its own set of executors, including a new instance of the Accumulate executor.
workflow_b = workflow_builder.build()
await workflow_b.run("hello world")
Oczekiwane dane wyjściowe:
Accumulated text length: 11
Aby zarejestrować fabrykę agentów do WorkflowBuilder, możesz użyć metody register_agent. Ta metoda przyjmuje dwa parametry: funkcję fabryki, która tworzy wystąpienia agenta (typów implementujących AgentProtocol) i nazwę fabryki, która ma być używana w konfiguracji przepływu pracy.
def create_agent() -> ChatAgent:
"""Factory function to create a Writer agent."""
return AzureOpenAIChatClient(credential=AzureCliCredential()).create_agent(
instructions=("You are a helpful assistant.",),
name="assistant",
)
workflow_builder = (
WorkflowBuilder()
.register_agent(
factory_func=create_agent,
name="Assistant",
)
# Register other executors or agents as needed and configure the workflow
...
)
# Build the workflow using the builder
workflow = workflow_builder.build()
Za każdym razem, gdy tworzone jest nowe wystąpienie przepływu pracy, agent w tym przepływie będzie nowym wystąpieniem utworzonym przez funkcję fabryczną i otrzyma nowe wystąpienie wątku.
Izolacja stanu przepływu pracy
Aby dowiedzieć się więcej na temat izolacji stanu przepływu pracy, zapoznaj się z dokumentacją izolacji stanu przepływu pracy .