检查点和恢复工作流

检查点允许工作流在特定时间点保存状态,并在以后恢复执行,即使在进程重启之后也是如此。 这对长时间运行的工作流、错误恢复和人机循环方案至关重要。

涵盖的概念

先决条件

关键组件

安装 NuGet 包

首先,安装 .NET 项目所需的包:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

CheckpointManager

CheckpointManager 的提供检查点存储和检索功能:

using Microsoft.Agents.AI.Workflows;

// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;

// Or create a custom checkpoint manager with JSON serialization
var checkpointManager = CheckpointManager.CreateJson(store, customOptions);

启用检查点

使用InProcessExecution执行工作流时启用检查点:

using Microsoft.Agents.AI.Workflows;

// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;

// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
    .StreamAsync(workflow, NumberSignal.Init, checkpointManager);

状态持久性

执行器状态

执行程序可以使用Executor<T>基类来在检查点中保留能够生存的本地状态。

internal sealed class GuessNumberExecutor : Executor<NumberSignal>("Guess")
{
    private const string StateKey = "GuessNumberExecutor.State";

    public int LowerBound { get; private set; }
    public int UpperBound { get; private set; }

    public GuessNumberExecutor() : this()
    {
    }

    public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        int guess = (LowerBound + UpperBound) / 2;
        await context.SendMessageAsync(guess, cancellationToken);
    }

    /// <summary>
    /// Checkpoint the current state of the executor.
    /// This must be overridden to save any state that is needed to resume the executor.
    /// </summary>
    protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
        context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);

    /// <summary>
    /// Restore the state of the executor from a checkpoint.
    /// This must be overridden to restore any state that was saved during checkpointing.
    /// </summary>
    protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
        (LowerBound, UpperBound) = state;
    }
}

自动创建检查点

提供检查点管理器时,会在每个超级步骤结束时自动创建检查点:

var checkpoints = new List<CheckpointInfo>();

await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    switch (evt)
    {
        case SuperStepCompletedEvent superStepCompletedEvt:
            // Checkpoints are automatically created at super step boundaries
            CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
            if (checkpoint is not null)
            {
                checkpoints.Add(checkpoint);
                Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
            }
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            break;
    }
}

如何使用检查点

访问检查点信息

从已完成的运行中访问检查点元数据:

// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;

// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;

// Access checkpoint details
foreach (var checkpoint in checkpoints)
{
    Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
    Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
    Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}

检查点存储

检查点通过 CheckpointManager 接口进行管理:

// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);

// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);

从检查点恢复

流媒体继续

从检查点继续执行,并实时流式处理事件:

// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];

await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
    .ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);

await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorCompletedEvt:
            Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            return;
    }
}

非流式处理恢复

继续并等待完成:

// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
    .ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);

// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();

In-Place 还原

将检查点直接还原到现有运行实例:

// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);

// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    // Handle events as normal
    if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
        break;
    }
}

新工作流实例 (解除冻结)

从检查点创建新的工作流实例:

// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();

// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
    .ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);

await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
    if (evt is WorkflowOutputEvent workflowOutputEvt)
    {
        Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
        break;
    }
}

使用检查点的以人为中心的循环

将检查点与人机交互工作流结合使用:

var checkpoints = new List<CheckpointInfo>();

await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle external requests
            ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
            await checkpointedRun.Run.SendResponseAsync(response);
            break;

        case SuperStepCompletedEvent superStepCompletedEvt:
            // Save checkpoint after each interaction
            CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
            if (checkpoint is not null)
            {
                checkpoints.Add(checkpoint);
                Console.WriteLine($"Checkpoint created after human interaction.");
            }
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
            return;
    }
}

// Later, resume from any checkpoint
if (checkpoints.Count > 0)
{
    var selectedCheckpoint = checkpoints[1]; // Select specific checkpoint
    await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);

    // Continue from that point
    await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
    {
        // Handle remaining workflow execution
    }
}

完整示例模式

下面是一个全面的检查点工作流模式:

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

public static class CheckpointingExample
{
    public static async Task RunAsync()
    {
        // Create workflow and checkpoint manager
        var workflow = await WorkflowHelper.GetWorkflowAsync();
        var checkpointManager = CheckpointManager.Default;
        var checkpoints = new List<CheckpointInfo>();

        Console.WriteLine("Starting workflow with checkpointing...");

        // Execute workflow with checkpointing
        await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
            .StreamAsync(workflow, NumberSignal.Init, checkpointManager);

        // Monitor execution and collect checkpoints
        await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
        {
            switch (evt)
            {
                case ExecutorCompletedEvent executorEvt:
                    Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
                    break;

                case SuperStepCompletedEvent superStepEvt:
                    var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
                    if (checkpoint is not null)
                    {
                        checkpoints.Add(checkpoint);
                        Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
                    }
                    break;

                case WorkflowOutputEvent outputEvt:
                    Console.WriteLine($"Workflow completed: {outputEvt.Data}");
                    goto FinishExecution;
            }
        }

        FinishExecution:
        Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");

        // Demonstrate resuming from a checkpoint
        if (checkpoints.Count > 5)
        {
            var selectedCheckpoint = checkpoints[5];
            Console.WriteLine($"Resuming from checkpoint 6...");

            // Restore to same instance
            await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);

            await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
            {
                if (evt is WorkflowOutputEvent resumedOutputEvt)
                {
                    Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
                    break;
                }
            }
        }

        // Demonstrate rehydration with new workflow instance
        if (checkpoints.Count > 3)
        {
            var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
            var rehydrationCheckpoint = checkpoints[3];

            Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");

            await using Checkpointed<StreamingRun> newRun = await InProcessExecution
                .ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);

            await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
            {
                if (evt is WorkflowOutputEvent rehydratedOutputEvt)
                {
                    Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
                    break;
                }
            }
        }
    }
}

主要优势

  • 容错:工作流可以通过从最后一个检查点恢复来从故障中恢复
  • 长时间运行的进程:使用自动检查点边界将长时间的工作流程分解为可管理的步骤
  • 人工介入流程:暂停外部输入,稍后从保存状态继续
  • 调试:检查特定点的工作流状态并恢复执行以进行测试
  • 可移植性:检查点可以还原到新的工作流实例(解除冻结)
  • 自动管理:在超级步骤边界自动创建检查点

运行示例

有关完整的工作实现,请参阅 CheckpointAndResume 示例

关键组件

FileCheckpointStorage

FileCheckpointStorage 类使用 JSON 文件提供持久性检查点存储:

from agent_framework import FileCheckpointStorage
from pathlib import Path

# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

启用检查点

在构建工作流时启用检查点

from agent_framework import WorkflowBuilder

workflow = (
    WorkflowBuilder(max_iterations=5)
    .add_edge(executor1, executor2)
    .set_start_executor(executor1)
    .with_checkpointing(checkpoint_storage=checkpoint_storage)  # Enable checkpointing
    .build()
)

状态持久性

执行器状态

执行器可以保留跨检查点存活的本地状态:

from agent_framework import Executor, WorkflowContext, handler

class WorkerExecutor(Executor):
    """Processes numbers to compute their factor pairs and manages executor state for checkpointing."""

    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}

    @handler
    async def compute(
        self,
        task: ComputeTask,
        ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
    ) -> None:
        """Process the next number in the task, computing its factor pairs."""
        next_number = task.remaining_numbers.pop(0)

        print(f"WorkerExecutor: Computing factor pairs for {next_number}")
        pairs: list[tuple[int, int]] = []
        for i in range(1, next_number):
            if next_number % i == 0:
                pairs.append((i, next_number // i))
        self._composite_number_pairs[next_number] = pairs

        if not task.remaining_numbers:
            # All numbers processed - output the results
            await ctx.yield_output(self._composite_number_pairs)
        else:
            # More numbers to process - continue with remaining task
            await ctx.send_message(task)

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        """Save the executor's internal state for checkpointing."""
        return {"composite_number_pairs": self._composite_number_pairs}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        """Restore the executor's internal state from a checkpoint."""
        self._composite_number_pairs = state.get("composite_number_pairs", {})

如何使用检查点

列出检查点

检索和检查可用的检查点:

# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()

# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")

# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)

从检查点恢复

流媒体继续

实时继续执行和流式传输事件。

# Resume from a specific checkpoint
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    print(f"Resumed Event: {event}")

    if isinstance(event, WorkflowOutputEvent):
        print(f"Final Result: {event.data}")
        break

非流式处理恢复

继续并立即获取所有结果:

# Resume and wait for completion
result = await workflow.run(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
)

# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")

继续处理挂起的请求

从包含挂起请求的检查点恢复时,工作流将重新发出这些请求事件,使您可以捕获和响应这些请求。

request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    if isinstance(event, RequestInfoEvent):
        # Capture re-emitted pending requests
        print(f"Pending request re-emitted: {event.request_id}")
        request_info_events.append(event)

# Handle the request and provide response
# If responses are already provided, no need to handle them again
responses = {}
for event in request_info_events:
    response = handle_request(event.data)
    responses[event.request_id] = response

# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
    if isinstance(event, WorkflowOutputEvent):
        print(f"Workflow completed: {event.data}")

如果从检查点恢复时有未完成的请求已经被响应,您仍然需要调用 run_stream() 以继续流程,然后使用预先提供的响应调用 send_responses_streaming()

交互式检查点选择

生成用户友好的检查点选择:

async def select_and_resume_checkpoint(workflow, storage):
    # Get available checkpoints
    checkpoints = await storage.list_checkpoints()
    if not checkpoints:
        print("No checkpoints available")
        return

    # Sort and display options
    sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
    print("Available checkpoints:")
    for i, cp in enumerate(sorted_cps):
        summary = get_checkpoint_summary(cp)
        print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")

    # Get user selection
    try:
        idx = int(input("Enter checkpoint index: "))
        selected = sorted_cps[idx]

        # Resume from selected checkpoint
        print(f"Resuming from checkpoint: {selected.checkpoint_id}")
        async for event in workflow.run_stream(
            selected.checkpoint_id,
            checkpoint_storage=storage
        ):
            print(f"Event: {event}")

    except (ValueError, IndexError):
        print("Invalid selection")

完整示例模式

下面是典型的检查点工作流模式:

import asyncio
from pathlib import Path

from agent_framework import (
    FileCheckpointStorage,
    WorkflowBuilder,
    WorkflowOutputEvent,
    get_checkpoint_summary
)

async def main():
    # Setup checkpoint storage
    checkpoint_dir = Path("./checkpoints")
    checkpoint_dir.mkdir(exist_ok=True)
    storage = FileCheckpointStorage(checkpoint_dir)

    # Build workflow with checkpointing
    workflow = (
        WorkflowBuilder()
        .add_edge(executor1, executor2)
        .set_start_executor(executor1)
        .with_checkpointing(storage)
        .build()
    )

    # Initial run
    print("Running workflow...")
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")

    # List and inspect checkpoints
    checkpoints = await storage.list_checkpoints()
    for cp in sorted(checkpoints, key=lambda c: c.timestamp):
        summary = get_checkpoint_summary(cp)
        print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")

    # Resume from a checkpoint
    if checkpoints:
        latest = max(checkpoints, key=lambda cp: cp.timestamp)
        print(f"Resuming from: {latest.checkpoint_id}")

        async for event in workflow.run_stream(latest.checkpoint_id):
            print(f"Resumed: {event}")

if __name__ == "__main__":
    asyncio.run(main())

主要优势

  • 容错:工作流可以通过从最后一个检查点恢复来从故障中恢复
  • 长时间运行的进程:将长时间的工作流分解为具有检查点边界的可管理段
  • 人机交互:暂停以便人工输入,然后稍后恢复 - 恢复后重新发送挂起的请求
  • 调试:检查特定点的工作流状态并恢复执行以进行测试
  • 资源管理:根据资源可用性停止和重启工作流

运行示例

完整的实际实施方案请参考 “检查点与恢复示例”

后续步骤