Udostępnij przez


Przepływy pracy programu Microsoft Agent Framework — punkty kontrolne

Ta strona zawiera omówienie punktów kontrolnych w systemie przepływu pracy programu Microsoft Agent Framework.

Przegląd

Punkty kontrolne umożliwiają zapisywanie stanu procesu w określonych punktach podczas jego wykonywania i kontynuowanie od tych punktów później. Ta funkcja jest szczególnie przydatna w następujących scenariuszach:

  • Długotrwałe przepływy pracy, w których chcesz uniknąć utraty postępu w przypadku awarii.
  • Długotrwałe przepływy pracy, w których chcesz wstrzymać i wznowić wykonywanie w późniejszym czasie.
  • Przepływy pracy, które wymagają okresowego zapisywania stanu na potrzeby inspekcji lub zgodności.
  • Przepływy pracy, które należy migrować w różnych środowiskach lub instancjach.

Kiedy są tworzone punkty kontrolne?

Pamiętaj, że przepływy pracy są wykonywane w superkrokach, jak opisano w podstawowych pojęciach. Punkty kontrolne są tworzone na końcu każdego superkroku, po zakończeniu wykonywania wszystkich funkcji wykonawczych w tym superkroku. Punkt kontrolny przechwytuje cały stan przepływu pracy, w tym:

  • Bieżący stan wszystkich funkcji wykonawczych
  • Wszystkie oczekujące komunikaty w przepływie pracy dla następnego superkroku
  • Oczekujące żądania i odpowiedzi
  • Stany udostępnione

Przechwytywanie punktów kontrolnych

Aby włączyć tworzenie punktów kontrolnych, należy podać element CheckpointManager przy tworzeniu przebiegu pracy. Dostęp do punktu kontrolnego można uzyskać za pośrednictwem SuperStepCompletedEvent.

using Microsoft.Agents.AI.Workflows;

// Create a checkpoint manager to manage checkpoints
var checkpointManager = new CheckpointManager();
// List to store checkpoint info for later use
var checkpoints = new List<CheckpointInfo>();

// Run the workflow with checkpointing enabled
Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, input, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is SuperStepCompletedEvent superStepCompletedEvt)
    {
        // Access the checkpoint and store it
        CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
        if (checkpoint != null)
        {
            checkpoints.Add(checkpoint);
        }
    }
}

Aby włączyć punkty kontrolne, trzeba podać element CheckpointStorage podczas tworzenia przepływu pracy. Dostęp do punktu kontrolnego można uzyskać za pośrednictwem magazynu.

from agent_framework import (
    InMemoryCheckpointStorage,
    WorkflowBuilder,
)

# Create a checkpoint storage to manage checkpoints
# There are different implementations of CheckpointStorage, such as InMemoryCheckpointStorage and FileCheckpointStorage.
checkpoint_storage = InMemoryCheckpointStorage()

# Build a workflow with checkpointing enabled
builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
workflow = builder.with_checkpointing(checkpoint_storage).build()

# Run the workflow
async for event in workflow.run_streaming(input):
    ...

# Access checkpoints from the storage
checkpoints = await checkpoint_storage.list_checkpoints()

Wznawianie z punktów kontrolnych

Przepływ pracy można wznowić bezpośrednio z konkretnego punktu kontrolnego w tym samym przebiegu.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
// Note that we are restoring the state directly to the same run instance.
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint, CancellationToken.None).ConfigureAwait(false);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Możesz wznowić przepływ pracy z określonego punktu kontrolnego bezpośrednio w tym samym wystąpieniu.

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream(checkpoint_id=saved_checkpoint.checkpoint_id):
    ...

Przywracanie z punktów kontrolnych

Możesz też przywrócić przepływ pracy z punktu kontrolnego do nowego wystąpienia uruchomienia.

// Assume we want to resume from the 6th checkpoint
CheckpointInfo savedCheckpoint = checkpoints[5];
Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager)
    .ConfigureAwait(false);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync().ConfigureAwait(false))
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
    }
}

Możesz też przywrócić nowe wystąpienie przepływu pracy z punktu kontrolnego.

from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(start_executor)
builder.add_edge(start_executor, executor_b)
builder.add_edge(executor_b, executor_c)
builder.add_edge(executor_b, end_executor)
# This workflow instance doesn't require checkpointing enabled.
workflow = builder.build()

# Assume we want to resume from the 6th checkpoint
saved_checkpoint = checkpoints[5]
async for event in workflow.run_stream
    checkpoint_id=saved_checkpoint.checkpoint_id,
    checkpoint_storage=checkpoint_storage,
):
    ...

Zapisz stany wykonawcze

Aby upewnić się, że stan funkcji wykonawczej jest przechwytywany w punkcie OnCheckpointingAsync kontrolnym, funkcja wykonawcza musi zastąpić metodę i zapisać jej stan w kontekście przepływu pracy.

using Microsoft.Agents.AI.Workflows;
using Microsoft.Agents.AI.Workflows.Reflection;

internal sealed class CustomExecutor() : Executor<string>("CustomExecutor")
{
    private const string StateKey = "CustomExecutorState";

    private List<string> messages = new();

    public async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this.messages.Add(message);
        // Executor logic...
    }

    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellation = default)
    {
        return context.QueueStateUpdateAsync(StateKey, this.messages);
    }
}

Ponadto, aby zapewnić prawidłowe przywrócenie stanu podczas wznawiania z punktu kontrolnego, egzekutor musi zastąpić OnCheckpointRestoredAsync metodę i załadować jego stan z kontekstu przepływu pracy.

protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellation = default)
{
    this.messages = await context.ReadStateAsync<List<string>>(StateKey).ConfigureAwait(false);
}

Aby upewnić się, że stan funkcji wykonawczej jest przechwytywany w punkcie on_checkpoint_save kontrolnym, funkcja wykonawcza musi zastąpić metodę i zapisać jej stan w kontekście przepływu pracy.

class CustomExecutor(Executor):
    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._messages: list[str] = []

    @handler
    async def handle(self, message: str, ctx: WorkflowContext):
        self._messages.append(message)
        # Executor logic...

    async def on_checkpoint_save(self) -> dict[str, Any]:
        return {"messages": self._messages}

Ponadto, aby zapewnić prawidłowe przywrócenie stanu podczas wznawiania z punktu kontrolnego, egzekutor musi zastąpić on_checkpoint_restore metodę i załadować jego stan z kontekstu przepływu pracy.

async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
    self._messages = state.get("messages", [])

Dalsze kroki