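Checkpointing lets a workflow save its state at specific points and resume execution later, even after the process has restarted. This is essential for long-running workflows, error recovery, and human-in-the-loop scenarios.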
Prerequisites
- .NET 8.0 SDK or later
- A new console application
Key components
Install the NuGet package
First, install the required package in your .NET project:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
CheckpointManager
The CheckpointManager provides checkpoint storage and retrieval.
using Microsoft.Agents.AI.Workflows;
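// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;
// Or create a custom checkpoint manager that serializes checkpoints as JSON.
// `store` (your checkpoint store) and `customOptions` (your JSON serializer options) are placeholders.
var jsonCheckpointManager = CheckpointManager.CreateJson(store, customOptions);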
Enable checkpointing
Enable checkpointing by passing a checkpoint manager when you run the workflow with InProcessExecution:
using Microsoft.Agents.AI.Workflows;
// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
State persistence
Executor state
Executors derived from the Executor<T> base class can maintain local state that persists across checkpoints:
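internal sealed class GuessNumberExecutor : Executor<NumberSignal>
{
private const string StateKey = "GuessNumberExecutor.State";
public int LowerBound { get; private set; }
public int UpperBound { get; private set; }
// Pass the executor ID ("Guess") to the base class and set the initial search range.
public GuessNumberExecutor(int lowerBound, int upperBound) : base("Guess")
{
LowerBound = lowerBound;
UpperBound = upperBound;
}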
public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
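// Guess the midpoint of the current range (simplified: narrowing the bounds from feedback is omitted here).
int guess = (LowerBound + UpperBound) / 2;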
await context.SendMessageAsync(guess, cancellationToken);
}
/// <summary>
/// Checkpoint the current state of the executor.
/// This must be overridden to save any state that is needed to resume the executor.
/// </summary>
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);
/// <summary>
/// Restore the state of the executor from a checkpoint.
/// This must be overridden to restore any state that was saved during checkpointing.
/// </summary>
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
(LowerBound, UpperBound) = state;
}
}
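The save/restore contract above can be illustrated without the framework. The following Python sketch is hypothetical: `GuessExecutor`, `on_checkpointing`, and `on_checkpoint_restored` are illustrative names, not the Agent Framework API. It shows why the bounds must be written to the checkpoint: a freshly constructed executor only continues the search correctly after the saved `(lower, upper)` pair is read back.

```python
from dataclasses import dataclass


@dataclass
class GuessExecutor:
    """Hypothetical binary-search guesser mirroring GuessNumberExecutor (not the framework API)."""
    lower: int
    upper: int

    def next_guess(self) -> int:
        # Midpoint of the current search range.
        return (self.lower + self.upper) // 2

    def on_feedback(self, too_high: bool) -> None:
        # Narrow the range using feedback about the current guess.
        guess = self.next_guess()
        if too_high:
            self.upper = guess - 1
        else:
            self.lower = guess + 1

    def on_checkpointing(self, state: dict) -> None:
        # Save everything needed to resume (the role QueueStateUpdateAsync plays above).
        state["GuessNumberExecutor.State"] = (self.lower, self.upper)

    def on_checkpoint_restored(self, state: dict) -> None:
        # Read the saved bounds back (the role ReadStateAsync plays above).
        self.lower, self.upper = state["GuessNumberExecutor.State"]


original = GuessExecutor(lower=1, upper=100)
original.on_feedback(too_high=False)  # guess 50 was too low, so the range becomes 51..100
checkpoint: dict = {}
original.on_checkpointing(checkpoint)

# A fresh instance starts from its constructor values until the state is restored.
restored = GuessExecutor(lower=1, upper=100)
restored.on_checkpoint_restored(checkpoint)
print(restored.lower, restored.upper, restored.next_guess())  # 51 100 75
```

Without the restore call, the fresh instance would guess 50 again and repeat work already done before the checkpoint.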
Automatic checkpoint creation
When a checkpoint manager is provided, a checkpoint is created automatically at the end of each superstep:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case SuperStepCompletedEvent superStepCompletedEvt:
// Checkpoints are automatically created at super step boundaries
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
break;
}
}
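To see why superstep boundaries are a natural place to checkpoint, here is a hypothetical Python model of a bulk-synchronous loop (the `CountdownWorkflow` and `Checkpoint` names are illustrative, not framework types). Each superstep delivers all pending messages, collects the outgoing ones, and only then takes a snapshot, so a checkpoint never captures a half-delivered message.

```python
from dataclasses import dataclass, field


@dataclass
class Checkpoint:
    step: int
    pending: list[int]  # messages queued for the next superstep
    state: dict         # executor state at the end of the step


@dataclass
class CountdownWorkflow:
    """Hypothetical single-executor workflow: adds each message to a running total
    and forwards message - 1 until it reaches 1."""
    total: int = 0
    checkpoints: list[Checkpoint] = field(default_factory=list)

    def superstep(self, pending: list[int]) -> list[int]:
        # Deliver every pending message; collect messages for the next superstep.
        outgoing = []
        for n in pending:
            self.total += n
            if n > 1:
                outgoing.append(n - 1)
        return outgoing

    def run(self, pending: list[int]) -> int:
        step = 0
        while pending:
            pending = self.superstep(pending)
            step += 1
            # Snapshot at the superstep boundary, where no message is mid-delivery.
            self.checkpoints.append(Checkpoint(step, list(pending), {"total": self.total}))
        return self.total


wf = CountdownWorkflow()
result = wf.run([4])
print(result, len(wf.checkpoints))  # 10 4
print(wf.checkpoints[1])            # Checkpoint(step=2, pending=[2], state={'total': 7})
```

A checkpoint therefore needs both the executor state and the queued messages; either one alone is not enough to continue.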
Working with checkpoints
Accessing checkpoint information
Access checkpoint metadata from a completed run:
// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;
// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;
// Access checkpoint details
foreach (var checkpoint in allCheckpoints)
{
Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}
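The loop above prints each checkpoint's parent ID. Assuming every checkpoint records the checkpoint it continued from, the history forms a tree, and walking parent links recovers the path to any checkpoint. This Python sketch uses a hypothetical `CheckpointInfo` record, not the .NET type, and the two-branch example is illustrative only.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CheckpointInfo:
    """Hypothetical record: each checkpoint optionally points at its parent."""
    checkpoint_id: str
    parent: Optional["CheckpointInfo"] = None


def lineage(cp: CheckpointInfo) -> list[str]:
    """Return checkpoint IDs from the root down to cp by following parent links."""
    chain = []
    node: Optional[CheckpointInfo] = cp
    while node is not None:
        chain.append(node.checkpoint_id)
        node = node.parent
    return list(reversed(chain))


root = CheckpointInfo("cp-1")
second = CheckpointInfo("cp-2", parent=root)
# Hypothetically, resuming from cp-2 twice would produce two children of cp-2.
branch_a = CheckpointInfo("cp-3a", parent=second)
branch_b = CheckpointInfo("cp-3b", parent=second)
print(lineage(branch_a))  # ['cp-1', 'cp-2', 'cp-3a']
print(lineage(branch_b))  # ['cp-1', 'cp-2', 'cp-3b']
```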
Checkpoint storage
Checkpoints are stored and retrieved through the CheckpointManager:
// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);
// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);
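The commit/lookup pair can be modeled as a store keyed by run ID and checkpoint ID. The Python class below is a hypothetical in-memory sketch, not the library's CheckpointManager; it deep-copies on commit so that a workflow which keeps running after the checkpoint cannot silently alter the saved snapshot.

```python
import copy
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class CheckpointInfo:
    run_id: str
    checkpoint_id: str


class InMemoryCheckpointStore:
    """Hypothetical store illustrating commit/lookup; not the library's CheckpointManager."""

    def __init__(self) -> None:
        self._data: dict[tuple[str, str], dict] = {}

    def commit(self, run_id: str, checkpoint: dict) -> CheckpointInfo:
        info = CheckpointInfo(run_id, uuid.uuid4().hex)
        # Deep-copy so later mutations of live state cannot alter the snapshot.
        self._data[(info.run_id, info.checkpoint_id)] = copy.deepcopy(checkpoint)
        return info

    def lookup(self, run_id: str, info: CheckpointInfo) -> dict:
        # Checkpoints are scoped to a run; a mismatched run ID raises KeyError.
        return copy.deepcopy(self._data[(run_id, info.checkpoint_id)])


store = InMemoryCheckpointStore()
live_state = {"bounds": [1, 100]}
info = store.commit("run-1", live_state)
live_state["bounds"][0] = 51          # the run keeps going after the checkpoint
print(store.lookup("run-1", info))    # {'bounds': [1, 100]}
```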
Resuming from checkpoints
Streaming resume
Resume execution from a checkpoint and stream events in real time:
// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];
await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
.ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);
await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
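Conceptually, a streaming resume re-enters the same execution loop with the saved message queue, state, and step count, and keeps yielding events as supersteps finish. The Python generator below is a hypothetical sketch of that idea (`stream_run` and `resume_stream` are illustrative names); it checks that resuming from a mid-run checkpoint produces the same final output as the uninterrupted run.

```python
from dataclasses import dataclass
from typing import Iterator, Tuple


@dataclass(frozen=True)
class Checkpoint:
    step: int
    pending: Tuple[int, ...]  # messages queued for the next superstep
    total: int                # executor state


def stream_run(pending: Tuple[int, ...], total: int = 0, step: int = 0) -> Iterator[tuple]:
    """Hypothetical streaming runner: yields ("checkpoint", Checkpoint) after each
    superstep and finally ("output", total)."""
    while pending:
        total += sum(pending)
        pending = tuple(n - 1 for n in pending if n > 1)
        step += 1
        yield "checkpoint", Checkpoint(step, pending, total)
    yield "output", total


def resume_stream(cp: Checkpoint) -> Iterator[tuple]:
    # Resuming re-enters the same loop with the saved queue, state, and step count.
    return stream_run(cp.pending, cp.total, cp.step)


checkpoints = []
first_result = None
for kind, payload in stream_run((4,)):
    if kind == "checkpoint":
        checkpoints.append(payload)
    else:
        first_result = payload

resumed_events = list(resume_stream(checkpoints[1]))
print(first_result, resumed_events[-1])  # 10 ('output', 10)
print(len(resumed_events))               # 3
```

Resuming from the second checkpoint replays only the two remaining supersteps before the output event.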
Non-streaming resume
Resume and wait for completion:
// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
.ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);
// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();
In-place restore
Restore a checkpoint directly into the existing run instance:
// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);
// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle events as normal
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
break;
}
}
New workflow instance (rehydration)
Create a new workflow instance and resume it from a saved checkpoint:
// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
break;
}
}
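Rehydration works because a checkpoint is plain data, independent of the object that produced it. The Python sketch below is hypothetical: `Guesser` and `build_workflow` stand in for the executor and `WorkflowHelper.GetWorkflowAsync()`. It serializes state to JSON, discards the original instance, and restores the data into a freshly built one.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Guesser:
    """Hypothetical executor whose whole state is two bounds."""
    lower: int = 1
    upper: int = 100


def build_workflow() -> Guesser:
    # Stand-in for WorkflowHelper.GetWorkflowAsync(): always builds a fresh instance.
    return Guesser()


def save(executor: Guesser) -> str:
    # A checkpoint is serialized data, independent of the object that produced it.
    return json.dumps(asdict(executor))


def rehydrate(payload: str) -> Guesser:
    # Build a new instance, then overwrite its defaults with the saved values.
    fresh = build_workflow()
    for key, value in json.loads(payload).items():
        setattr(fresh, key, value)
    return fresh


original = build_workflow()
original.lower = 51
payload = save(original)
del original                      # simulate the original instance (or process) going away
restored = rehydrate(payload)
print(restored)                   # Guesser(lower=51, upper=100)
```

The only requirement is that the new instance is built with the same structure (same executors and IDs) as the one that wrote the checkpoint.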
Human-in-the-loop with checkpoints
Combine checkpoints with human-in-the-loop workflows:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle external requests
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
break;
case SuperStepCompletedEvent superStepCompletedEvt:
// Save checkpoint after each interaction
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created after human interaction.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
return;
}
}
// Later, resume from a specific checkpoint (here the second one, if it exists)
if (checkpoints.Count > 1)
{
var selectedCheckpoint = checkpoints[1]; // Select a specific checkpoint
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
// Continue from that point
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle remaining workflow execution
}
}
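Checkpointing after every human response makes it possible to rewind to an earlier answer and continue differently. The Python class below is a hypothetical model (not the framework API): the pending request is simply the next unanswered question, each response ends a step and appends a checkpoint, and `restore` rewinds to one of them.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Interview:
    """Hypothetical HITL workflow: asks questions one at a time and records answers."""
    questions: tuple[str, ...]
    answers: list[str] = field(default_factory=list)
    checkpoints: list[list[str]] = field(default_factory=list)

    def next_request(self) -> Optional[str]:
        # The pending request is the next unanswered question, if any.
        if len(self.answers) < len(self.questions):
            return self.questions[len(self.answers)]
        return None

    def send_response(self, answer: str) -> None:
        self.answers.append(answer)
        # Checkpoint after each human interaction (end of the step).
        self.checkpoints.append(list(self.answers))

    def restore(self, index: int) -> None:
        # Rewind to the state saved at checkpoints[index].
        self.answers = list(self.checkpoints[index])


run = Interview(("name?", "role?", "team?"))
for reply in ("Ada", "engineer", "core"):
    run.send_response(reply)
run.restore(1)                 # keep the first two answers, drop "core"
print(run.next_request())      # team?
run.send_response("platform")
print(run.answers)             # ['Ada', 'engineer', 'platform']
```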
Complete example pattern
Here is a comprehensive checkpointing workflow pattern:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
public static class CheckpointingExample
{
public static async Task RunAsync()
{
// Create workflow and checkpoint manager
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();
Console.WriteLine("Starting workflow with checkpointing...");
// Execute workflow with checkpointing
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
// Monitor execution and collect checkpoints
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorEvt:
Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
break;
case SuperStepCompletedEvent superStepEvt:
var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
}
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
goto FinishExecution;
}
}
FinishExecution:
Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");
// Demonstrate resuming from a checkpoint
if (checkpoints.Count > 5)
{
var selectedCheckpoint = checkpoints[5];
Console.WriteLine($"Resuming from checkpoint 6...");
// Restore to same instance
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent resumedOutputEvt)
{
Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
break;
}
}
}
// Demonstrate rehydration with new workflow instance
if (checkpoints.Count > 3)
{
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
var rehydrationCheckpoint = checkpoints[3];
Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");
await using Checkpointed<StreamingRun> newRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent rehydratedOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
break;
}
}
}
}
}
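Key benefits
- Fault tolerance: workflows can recover from failures by resuming from the last checkpoint.
- Long-running processes: split long workflows into manageable segments at the automatic checkpoint boundaries.
- Human-in-the-loop: pause for external input and resume later from the saved state.
- Debugging: inspect workflow state at specific points and resume execution for testing.
- Portability: checkpoints can be restored into a new workflow instance (rehydration).
- Automatic management: checkpoints are created automatically at superstep boundaries.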
Running the example
For a complete working implementation, see the CheckpointAndResume sample.
Key components
File checkpoint storage
The FileCheckpointStorage class provides persistent checkpoint storage using JSON files:
from agent_framework import FileCheckpointStorage
from pathlib import Path
# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
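What "persistent" buys you is that a new storage object pointed at the same directory, for example after a process restart, still finds the checkpoints. The Python class below is a hypothetical one-file-per-checkpoint store built only on the standard library; it is not agent_framework's FileCheckpointStorage, and its file layout and record fields are assumptions for illustration.

```python
import json
import tempfile
import time
import uuid
from pathlib import Path


class JsonFileCheckpointStore:
    """Hypothetical one-JSON-file-per-checkpoint store, not agent_framework's FileCheckpointStorage."""

    def __init__(self, storage_path: str) -> None:
        self.root = Path(storage_path)
        self.root.mkdir(parents=True, exist_ok=True)

    def save(self, workflow_id: str, state: dict) -> str:
        checkpoint_id = uuid.uuid4().hex
        record = {
            "checkpoint_id": checkpoint_id,
            "workflow_id": workflow_id,
            "timestamp": time.time(),
            "state": state,
        }
        # One file per checkpoint, named after its ID.
        (self.root / f"{checkpoint_id}.json").write_text(json.dumps(record))
        return checkpoint_id

    def load(self, checkpoint_id: str) -> dict:
        return json.loads((self.root / f"{checkpoint_id}.json").read_text())

    def list_ids(self) -> list:
        return sorted(p.stem for p in self.root.glob("*.json"))


with tempfile.TemporaryDirectory() as tmp:
    cp_id = JsonFileCheckpointStore(tmp).save("my-workflow", {"iteration": 3})
    # A new store object on the same directory (for example, after a restart) sees the file.
    reopened = JsonFileCheckpointStore(tmp)
    loaded = reopened.load(cp_id)
    ids = reopened.list_ids()

print(loaded["state"], ids == [cp_id])  # {'iteration': 3} True
```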
Enable checkpointing
Enable checkpointing when you build the workflow:
from agent_framework import WorkflowBuilder
workflow = (
WorkflowBuilder(max_iterations=5)
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build()
)
State persistence
Executor state
Executors can preserve local state across checkpoints:
from dataclasses import dataclass
from typing import Any

from typing_extensions import override  # Python 3.12+ can use `from typing import override`

from agent_framework import Executor, WorkflowContext, handler


@dataclass
class ComputeTask:
    """Minimal stand-in for the sample's task message: the numbers still to process."""
    remaining_numbers: list[int]


class WorkerExecutor(Executor):
    """Processes numbers to compute their factor pairs and manages executor state for checkpointing."""

    def __init__(self, id: str) -> None:
        super().__init__(id=id)
        self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}

    @handler
    async def compute(
        self,
        task: ComputeTask,
        ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
    ) -> None:
        """Process the next number in the task, computing its factor pairs."""
        next_number = task.remaining_numbers.pop(0)
        print(f"WorkerExecutor: Computing factor pairs for {next_number}")
        pairs: list[tuple[int, int]] = []
        for i in range(1, next_number):
            if next_number % i == 0:
                pairs.append((i, next_number // i))
        self._composite_number_pairs[next_number] = pairs
        if not task.remaining_numbers:
            # All numbers processed - output the results
            await ctx.yield_output(self._composite_number_pairs)
        else:
            # More numbers to process - continue with the remaining task
            await ctx.send_message(task)

    @override
    async def on_checkpoint_save(self) -> dict[str, Any]:
        """Save the executor's internal state for checkpointing."""
        return {"composite_number_pairs": self._composite_number_pairs}

    @override
    async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
        """Restore the executor's internal state from a checkpoint."""
        self._composite_number_pairs = state.get("composite_number_pairs", {})
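If state like `composite_number_pairs` passes through a plain JSON round trip, as a JSON-file store suggests, integer dictionary keys come back as strings and tuples come back as lists. Whether the framework's own encoder preserves these types is not stated here; the sketch below uses only the standard `json` module, and `save_state`/`restore_state` are hypothetical helpers showing a defensive restore that converts the types back.

```python
import json
from typing import Any


def save_state(pairs: dict[int, list[tuple[int, int]]]) -> dict[str, Any]:
    # Same shape as on_checkpoint_save above.
    return {"composite_number_pairs": pairs}


def restore_state(state: dict[str, Any]) -> dict[int, list[tuple[int, int]]]:
    # Convert keys back to int and pairs back to tuples in case the state
    # went through a plain JSON round trip.
    raw = state.get("composite_number_pairs", {})
    return {int(k): [tuple(p) for p in v] for k, v in raw.items()}


original = {6: [(1, 6), (2, 3), (3, 2)]}  # what WorkerExecutor computes for 6
round_tripped = json.loads(json.dumps(save_state(original)))
print(round_tripped)                            # {'composite_number_pairs': {'6': [[1, 6], [2, 3], [3, 2]]}}
print(restore_state(round_tripped) == original)  # True
```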
Working with checkpoints
Listing checkpoints
Retrieve and inspect the available checkpoints:
# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()
# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")
# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)
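Filtering by workflow and ordering by creation time is ordinary list processing once you have the checkpoint objects. The Python sketch below uses a hypothetical `CheckpointRecord` stand-in for whatever `list_checkpoints()` returns, with `timestamp` modeled as a float (an assumption), and picks both the chronological order and the most recent checkpoint.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class CheckpointRecord:
    """Hypothetical stand-in for the objects list_checkpoints() returns."""
    checkpoint_id: str
    workflow_id: str
    timestamp: float


def checkpoints_for(records, workflow_id):
    # Filter to one workflow, oldest first (same ordering as the sorted() call above).
    return sorted((r for r in records if r.workflow_id == workflow_id),
                  key=lambda r: r.timestamp)


records = [
    CheckpointRecord("c", "my-workflow", 30.0),
    CheckpointRecord("a", "my-workflow", 10.0),
    CheckpointRecord("x", "other", 20.0),
]
ordered = checkpoints_for(records, "my-workflow")
latest = max(ordered, key=lambda r: r.timestamp)
print([r.checkpoint_id for r in ordered], latest.checkpoint_id)  # ['a', 'c'] c
```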
Resuming from checkpoints
Streaming resume
Resume execution and stream events in real time:
# Resume from a specific checkpoint
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage,
):
    print(f"Resumed Event: {event}")
    if isinstance(event, WorkflowOutputEvent):
        print(f"Final Result: {event.data}")
        break
Non-streaming resume
Resume and get all results at once:
# Resume and wait for completion
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")
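Resuming with pending requests
When you resume from a checkpoint that contains pending requests, the workflow re-emits the corresponding request events so you can capture and respond to them: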
request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage,
):
    if isinstance(event, RequestInfoEvent):
        # Capture re-emitted pending requests
        print(f"Pending request re-emitted: {event.request_id}")
        request_info_events.append(event)
# Handle the requests and prepare responses
# (requests whose responses were already provided do not need to be handled again)
responses = {}
for event in request_info_events:
    response = handle_request(event.data)  # handle_request is your own application logic
    responses[event.request_id] = response
# Send the responses back to the workflow
async for event in workflow.send_responses_streaming(responses):
    if isinstance(event, WorkflowOutputEvent):
        print(f"Workflow completed: {event.data}")
If you resume from a checkpoint whose pending requests have already been answered, you must first call run_stream() to continue the workflow and then call send_responses_streaming() with the pre-supplied responses.
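The bookkeeping behind that rule can be kept separate from the framework calls: build one `request_id -> response` map, reusing responses prepared before the restart and invoking your handler only for the rest. The Python sketch below is hypothetical (`PendingRequest`, `collect_responses`, and `ask_human` are illustrative names); the resulting map is the kind of dictionary the example above passes to `send_responses_streaming()`.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class PendingRequest:
    """Hypothetical shape of a re-emitted request event."""
    request_id: str
    data: Any


def collect_responses(
    reemitted: list[PendingRequest],
    prepared: dict[str, Any],
    handle_request: Callable[[Any], Any],
) -> dict[str, Any]:
    """Build the request_id -> response map for the resumed workflow.

    Responses prepared before the restart are reused; only requests
    without one are handled again.
    """
    responses: dict[str, Any] = {}
    for req in reemitted:
        if req.request_id in prepared:
            responses[req.request_id] = prepared[req.request_id]
        else:
            responses[req.request_id] = handle_request(req.data)
    return responses


calls = []


def ask_human(data):
    # Record which requests actually reached the handler.
    calls.append(data)
    return f"answer to {data}"


reemitted = [PendingRequest("r1", "q1"), PendingRequest("r2", "q2")]
responses = collect_responses(reemitted, {"r1": "saved answer"}, ask_human)
print(responses)  # {'r1': 'saved answer', 'r2': 'answer to q2'}
print(calls)      # ['q2']
```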
Interactive checkpoint selection
Build a user-friendly checkpoint picker:
from agent_framework import get_checkpoint_summary


async def select_and_resume_checkpoint(workflow, storage):
    # Get available checkpoints
    checkpoints = await storage.list_checkpoints()
    if not checkpoints:
        print("No checkpoints available")
        return
    # Sort and display options
    sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
    print("Available checkpoints:")
    for i, cp in enumerate(sorted_cps):
        summary = get_checkpoint_summary(cp)
        print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
    # Get user selection (only the parsing and lookup are guarded, not the run itself)
    try:
        idx = int(input("Enter checkpoint index: "))
        selected = sorted_cps[idx]
    except (ValueError, IndexError):
        print("Invalid selection")
        return
    # Resume from the selected checkpoint; pass the ID by keyword, not as the input message
    print(f"Resuming from checkpoint: {selected.checkpoint_id}")
    async for event in workflow.run_stream(
        checkpoint_id=selected.checkpoint_id,
        checkpoint_storage=storage,
    ):
        print(f"Event: {event}")
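Keeping the parsing logic out of the `input()` call makes the picker testable. Note that plain list indexing accepts negative numbers, so typing `-1` in the picker above silently selects the newest checkpoint. The hypothetical `parse_selection` helper below rejects that case and returns `None` for any invalid entry; whether negative indexes should be allowed is a design choice for your application.

```python
from typing import Optional, Sequence, TypeVar

T = TypeVar("T")


def parse_selection(raw: str, options: Sequence[T]) -> Optional[T]:
    """Return the option at the typed index, or None for invalid input.

    Unlike plain list indexing, negative numbers are rejected rather than
    silently counting from the end.
    """
    try:
        idx = int(raw.strip())
    except ValueError:
        return None
    if 0 <= idx < len(options):
        return options[idx]
    return None


options = ["cp-old", "cp-mid", "cp-new"]
print(parse_selection("2", options))    # cp-new
print(parse_selection("-1", options))   # None
print(parse_selection("abc", options))  # None
```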
Complete example pattern
Here is a typical checkpointing workflow pattern:
import asyncio
from pathlib import Path
from agent_framework import (
    FileCheckpointStorage,
    WorkflowBuilder,
    WorkflowOutputEvent,
    get_checkpoint_summary,
)


async def main():
    # Set up checkpoint storage
    checkpoint_dir = Path("./checkpoints")
    checkpoint_dir.mkdir(exist_ok=True)
    storage = FileCheckpointStorage(checkpoint_dir)
    # Build workflow with checkpointing (executor1 and executor2 are your own executors)
    workflow = (
        WorkflowBuilder()
        .add_edge(executor1, executor2)
        .set_start_executor(executor1)
        .with_checkpointing(storage)
        .build()
    )
    # Initial run
    print("Running workflow...")
    async for event in workflow.run_stream("input data"):
        print(f"Event: {event}")
    # List and inspect checkpoints
    checkpoints = await storage.list_checkpoints()
    for cp in sorted(checkpoints, key=lambda c: c.timestamp):
        summary = get_checkpoint_summary(cp)
        print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
    # Resume from the most recent checkpoint; pass the ID by keyword, not as the input message
    if checkpoints:
        latest = max(checkpoints, key=lambda cp: cp.timestamp)
        print(f"Resuming from: {latest.checkpoint_id}")
        async for event in workflow.run_stream(checkpoint_id=latest.checkpoint_id):
            print(f"Resumed: {event}")
            if isinstance(event, WorkflowOutputEvent):
                print(f"Final result: {event.data}")


if __name__ == "__main__":
    asyncio.run(main())
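Key benefits
- Fault tolerance: workflows can recover from failures by resuming from the last checkpoint.
- Long-running processes: split long workflows into manageable segments at checkpoint boundaries.
- Human-in-the-loop: pause for user input and resume later; pending requests are re-emitted on resume.
- Debugging: inspect workflow state at specific points and resume execution for testing.
- Resource management: stop and restart workflows based on resource availability.
Running the example
For a complete working implementation, see the checkpoint-and-resume sample.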