Nuta
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować się zalogować lub zmienić katalog.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Tworzenie punktów kontrolnych umożliwia przepływom pracy zapisywanie ich stanu w określonych punktach i wznawianie wykonywania później, nawet po ponownym uruchomieniu procesu. Ma to kluczowe znaczenie dla długotrwałych przepływów pracy, odzyskiwania błędów oraz scenariuszy z udziałem człowieka.
Omówione pojęcia
Wymagania wstępne
- Pakiet .NET 8.0 SDK lub nowszy
- Nowa aplikacja konsolowa
Kluczowe składniki
Instalowanie pakietów NuGet
Najpierw zainstaluj wymagane pakiety dla projektu .NET:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
CheckpointManager
CheckpointManager zapewnia funkcjonalność przechowywania i pobierania punktów kontrolnych.
using Microsoft.Agents.AI.Workflows;
// Use the default in-memory checkpoint manager
var checkpointManager = CheckpointManager.Default;
// Or create a custom checkpoint manager with JSON serialization
var checkpointManager = CheckpointManager.CreateJson(store, customOptions);
Włączanie tworzenia punktów kontrolnych
Włącz punktowanie kontrolne w ramach realizacji przepływów pracy za pomocą InProcessExecution:
using Microsoft.Agents.AI.Workflows;
// Create workflow with checkpointing support
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
// Execute with checkpointing enabled
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
Trwałość stanu
Stan egzekutora
Egzekutorzy mogą utrwalać lokalny stan, który przetrwa punkty kontrolne, korzystając z klasy bazowej Executor<T>.
internal sealed class GuessNumberExecutor : Executor<NumberSignal>("Guess")
{
private const string StateKey = "GuessNumberExecutor.State";
public int LowerBound { get; private set; }
public int UpperBound { get; private set; }
public GuessNumberExecutor() : this()
{
}
public override async ValueTask HandleAsync(NumberSignal message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
int guess = (LowerBound + UpperBound) / 2;
await context.SendMessageAsync(guess, cancellationToken);
}
/// <summary>
/// Checkpoint the current state of the executor.
/// This must be overridden to save any state that is needed to resume the executor.
/// </summary>
protected override ValueTask OnCheckpointingAsync(IWorkflowContext context, CancellationToken cancellationToken = default) =>
context.QueueStateUpdateAsync(StateKey, (LowerBound, UpperBound), cancellationToken);
/// <summary>
/// Restore the state of the executor from a checkpoint.
/// This must be overridden to restore any state that was saved during checkpointing.
/// </summary>
protected override async ValueTask OnCheckpointRestoredAsync(IWorkflowContext context, CancellationToken cancellationToken = default)
{
var state = await context.ReadStateAsync<(int, int)>(StateKey, cancellationToken);
(LowerBound, UpperBound) = state;
}
}
Automatyczne tworzenie punktu kontrolnego
Punkty kontrolne są tworzone automatycznie na końcu każdego superkroku, gdy menedżer punktów kontrolnych jest dostępny.
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case SuperStepCompletedEvent superStepCompletedEvt:
// Checkpoints are automatically created at super step boundaries
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created at step {checkpoints.Count}.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
break;
}
}
Praca z punktami kontrolnymi
Uzyskiwanie dostępu do informacji o punkcie kontrolnym
Uzyskiwanie dostępu do metadanych punktu kontrolnego z ukończonych przebiegów:
// Get all checkpoints from a checkpointed run
var allCheckpoints = checkpointedRun.Checkpoints;
// Get the latest checkpoint
var latestCheckpoint = checkpointedRun.LatestCheckpoint;
// Access checkpoint details
foreach (var checkpoint in checkpoints)
{
Console.WriteLine($"Checkpoint ID: {checkpoint.CheckpointId}");
Console.WriteLine($"Step Number: {checkpoint.StepNumber}");
Console.WriteLine($"Parent ID: {checkpoint.Parent?.CheckpointId ?? "None"}");
}
Przechowywanie punktów kontrolnych
Punkty kontrolne są zarządzane za pośrednictwem interfejsu CheckpointManager :
// Commit a checkpoint (usually done automatically)
CheckpointInfo checkpointInfo = await checkpointManager.CommitCheckpointAsync(runId, checkpoint);
// Retrieve a checkpoint
Checkpoint restoredCheckpoint = await checkpointManager.LookupCheckpointAsync(runId, checkpointInfo);
Wznawianie z punktów kontrolnych
Wznawianie przesyłania strumieniowego
Wznowienie wykonywania z punktu kontrolnego i strumieniowanie zdarzeń w czasie rzeczywistym.
// Resume from a specific checkpoint with streaming
CheckpointInfo savedCheckpoint = checkpoints[checkpointIndex];
await using Checkpointed<StreamingRun> resumedRun = await InProcessExecution
.ResumeStreamAsync(workflow, savedCheckpoint, checkpointManager, runId);
await foreach (WorkflowEvent evt in resumedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorCompletedEvt:
Console.WriteLine($"Executor {executorCompletedEvt.ExecutorId} completed.");
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
return;
}
}
Nieprzesyłające CV
Wznów i poczekaj na zakończenie:
// Resume from checkpoint without streaming
Checkpointed<Run> resumedRun = await InProcessExecution
.ResumeAsync(workflow, savedCheckpoint, checkpointManager, runId);
// Wait for completion and get final result
var result = await resumedRun.Run.WaitForCompletionAsync();
przywracanie In-Place
Przywracanie punktu kontrolnego bezpośrednio do istniejącego wystąpienia przebiegu:
// Restore checkpoint to the same run instance
await checkpointedRun.RestoreCheckpointAsync(savedCheckpoint);
// Continue execution from the restored state
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle events as normal
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Resumed workflow result: {outputEvt.Data}");
break;
}
}
Nowe wystąpienie przepływu pracy (reaktywacja)
Utwórz nowe wystąpienie przepływu pracy z punktu kontrolnego:
// Create a completely new workflow instance
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
// Resume with the new instance from a saved checkpoint
await using Checkpointed<StreamingRun> newCheckpointedRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, savedCheckpoint, checkpointManager, originalRunId);
await foreach (WorkflowEvent evt in newCheckpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent workflowOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {workflowOutputEvt.Data}");
break;
}
}
Człowiek w pętli z punktami kontrolnymi
Połącz tworzenie punktów kontrolnych z przepływami pracy typu human-in-the-loop:
var checkpoints = new List<CheckpointInfo>();
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle external requests
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await checkpointedRun.Run.SendResponseAsync(response);
break;
case SuperStepCompletedEvent superStepCompletedEvt:
// Save checkpoint after each interaction
CheckpointInfo? checkpoint = superStepCompletedEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint created after human interaction.");
}
break;
case WorkflowOutputEvent workflowOutputEvt:
Console.WriteLine($"Workflow completed: {workflowOutputEvt.Data}");
return;
}
}
// Later, resume from any checkpoint
if (checkpoints.Count > 0)
{
var selectedCheckpoint = checkpoints[1]; // Select specific checkpoint
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
// Continue from that point
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
// Handle remaining workflow execution
}
}
Kompletny przykładowy wzorzec
Oto kompleksowy wzorzec przepływu pracy tworzenia punktów kontrolnych:
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
public static class CheckpointingExample
{
public static async Task RunAsync()
{
// Create workflow and checkpoint manager
var workflow = await WorkflowHelper.GetWorkflowAsync();
var checkpointManager = CheckpointManager.Default;
var checkpoints = new List<CheckpointInfo>();
Console.WriteLine("Starting workflow with checkpointing...");
// Execute workflow with checkpointing
await using Checkpointed<StreamingRun> checkpointedRun = await InProcessExecution
.StreamAsync(workflow, NumberSignal.Init, checkpointManager);
// Monitor execution and collect checkpoints
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
switch (evt)
{
case ExecutorCompletedEvent executorEvt:
Console.WriteLine($"Executor {executorEvt.ExecutorId} completed.");
break;
case SuperStepCompletedEvent superStepEvt:
var checkpoint = superStepEvt.CompletionInfo!.Checkpoint;
if (checkpoint is not null)
{
checkpoints.Add(checkpoint);
Console.WriteLine($"Checkpoint {checkpoints.Count} created.");
}
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
goto FinishExecution;
}
}
FinishExecution:
Console.WriteLine($"Total checkpoints created: {checkpoints.Count}");
// Demonstrate resuming from a checkpoint
if (checkpoints.Count > 5)
{
var selectedCheckpoint = checkpoints[5];
Console.WriteLine($"Resuming from checkpoint 6...");
// Restore to same instance
await checkpointedRun.RestoreCheckpointAsync(selectedCheckpoint);
await foreach (WorkflowEvent evt in checkpointedRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent resumedOutputEvt)
{
Console.WriteLine($"Resumed workflow result: {resumedOutputEvt.Data}");
break;
}
}
}
// Demonstrate rehydration with new workflow instance
if (checkpoints.Count > 3)
{
var newWorkflow = await WorkflowHelper.GetWorkflowAsync();
var rehydrationCheckpoint = checkpoints[3];
Console.WriteLine("Rehydrating from checkpoint 4 with new workflow instance...");
await using Checkpointed<StreamingRun> newRun = await InProcessExecution
.ResumeStreamAsync(newWorkflow, rehydrationCheckpoint, checkpointManager, checkpointedRun.Run.RunId);
await foreach (WorkflowEvent evt in newRun.Run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent rehydratedOutputEvt)
{
Console.WriteLine($"Rehydrated workflow result: {rehydratedOutputEvt.Data}");
break;
}
}
}
}
}
Najważniejsze korzyści
- Odporność na uszkodzenia: przepływy pracy mogą odzyskiwać sprawność po awariach, wracając z ostatniego punktu kontrolnego
- Długotrwałe procesy: podziel długie przepływy pracy na segmenty zarządzane przy użyciu automatycznych granic punktów kontrolnych
- Human-in-the-Loop: Wstrzymaj w celu uzyskania zewnętrznych danych wejściowych, następnie wznów z zapisanego stanu
- Debugowanie: sprawdzanie stanu przepływu pracy w określonych punktach i wznawianie wykonywania na potrzeby testowania
- Przenośność: punkty kontrolne można przywrócić do nowych wystąpień przepływu pracy (ponowne wypełnianie)
- Automatyczne zarządzanie: punkty kontrolne są tworzone automatycznie w granicach superetapowych
Uruchamianie przykładu
Aby uzyskać pełną implementację roboczą, zobacz przykład CheckpointAndResume.
Kluczowe składniki
FileCheckpointStorage
Klasa FileCheckpointStorage udostępnia trwały magazyn punktów kontrolnych przy użyciu plików JSON:
from agent_framework import FileCheckpointStorage
from pathlib import Path
# Initialize checkpoint storage
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
Włączanie tworzenia punktów kontrolnych
Włącz punktowanie kontrolne podczas projektowania workflow.
from agent_framework import WorkflowBuilder
workflow = (
WorkflowBuilder(max_iterations=5)
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(checkpoint_storage=checkpoint_storage) # Enable checkpointing
.build()
)
Trwałość stanu
Stan egzekutora
Wykonawcy mogą zachowywać stan lokalny, który przetrwa punkty kontrolne.
from agent_framework import Executor, WorkflowContext, handler
class WorkerExecutor(Executor):
"""Processes numbers to compute their factor pairs and manages executor state for checkpointing."""
def __init__(self, id: str) -> None:
super().__init__(id=id)
self._composite_number_pairs: dict[int, list[tuple[int, int]]] = {}
@handler
async def compute(
self,
task: ComputeTask,
ctx: WorkflowContext[ComputeTask, dict[int, list[tuple[int, int]]]],
) -> None:
"""Process the next number in the task, computing its factor pairs."""
next_number = task.remaining_numbers.pop(0)
print(f"WorkerExecutor: Computing factor pairs for {next_number}")
pairs: list[tuple[int, int]] = []
for i in range(1, next_number):
if next_number % i == 0:
pairs.append((i, next_number // i))
self._composite_number_pairs[next_number] = pairs
if not task.remaining_numbers:
# All numbers processed - output the results
await ctx.yield_output(self._composite_number_pairs)
else:
# More numbers to process - continue with remaining task
await ctx.send_message(task)
@override
async def on_checkpoint_save(self) -> dict[str, Any]:
"""Save the executor's internal state for checkpointing."""
return {"composite_number_pairs": self._composite_number_pairs}
@override
async def on_checkpoint_restore(self, state: dict[str, Any]) -> None:
"""Restore the executor's internal state from a checkpoint."""
self._composite_number_pairs = state.get("composite_number_pairs", {})
Praca z punktami kontrolnymi
Wyświetlanie listy punktów kontrolnych
Pobierz i sprawdź dostępne punkty kontrolne:
# List all checkpoints
all_checkpoints = await checkpoint_storage.list_checkpoints()
# List checkpoints for a specific workflow
workflow_checkpoints = await checkpoint_storage.list_checkpoints(workflow_id="my-workflow")
# Sort by creation time
sorted_checkpoints = sorted(all_checkpoints, key=lambda cp: cp.timestamp)
Wznawianie z punktów kontrolnych
Wznawianie przesyłania strumieniowego
Wznowienie działania i transmisja zdarzeń w czasie rzeczywistym:
# Resume from a specific checkpoint
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Resumed Event: {event}")
if isinstance(event, WorkflowOutputEvent):
print(f"Final Result: {event.data}")
break
Nieprzesyłające CV
Wznów i pobierz wszystkie wyniki jednocześnie:
# Resume and wait for completion
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
# Access final outputs
outputs = result.get_outputs()
print(f"Final outputs: {outputs}")
Wznawianie oczekujących żądań
Podczas wznawiania pracy z punktu kontrolnego, zawierającego oczekujące żądania, przepływ pracy ponownie wysyła te zdarzenia żądań, co umożliwia ich przechwycenie i odpowiedzenie na nie.
request_info_events = []
# Resume from checkpoint - pending requests will be re-emitted
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
# Capture re-emitted pending requests
print(f"Pending request re-emitted: {event.request_id}")
request_info_events.append(event)
# Handle the request and provide response
# If responses are already provided, no need to handle them again
responses = {}
for event in request_info_events:
response = handle_request(event.data)
responses[event.request_id] = response
# Send response back to workflow
async for event in workflow.send_responses_streaming(responses):
if isinstance(event, WorkflowOutputEvent):
print(f"Workflow completed: {event.data}")
Jeśli wznawiasz z punktu kontrolnego z oczekującymi żądaniami, na które już udzielono odpowiedzi, należy nadal wywołać run_stream(), aby kontynuować przepływ pracy, a następnie send_responses_streaming() z użyciem wcześniej dostarczonych odpowiedzi.
Wybór interakcyjnego punktu kontrolnego
Utwórz przyjazny dla użytkownika wybór punktu kontrolnego:
async def select_and_resume_checkpoint(workflow, storage):
# Get available checkpoints
checkpoints = await storage.list_checkpoints()
if not checkpoints:
print("No checkpoints available")
return
# Sort and display options
sorted_cps = sorted(checkpoints, key=lambda cp: cp.timestamp)
print("Available checkpoints:")
for i, cp in enumerate(sorted_cps):
summary = get_checkpoint_summary(cp)
print(f"[{i}] {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Get user selection
try:
idx = int(input("Enter checkpoint index: "))
selected = sorted_cps[idx]
# Resume from selected checkpoint
print(f"Resuming from checkpoint: {selected.checkpoint_id}")
async for event in workflow.run_stream(
selected.checkpoint_id,
checkpoint_storage=storage
):
print(f"Event: {event}")
except (ValueError, IndexError):
print("Invalid selection")
Kompletny przykładowy wzorzec
Oto typowy wzorzec przepływu pracy tworzenia punktów kontrolnych:
import asyncio
from pathlib import Path
from agent_framework import (
FileCheckpointStorage,
WorkflowBuilder,
WorkflowOutputEvent,
get_checkpoint_summary
)
async def main():
# Setup checkpoint storage
checkpoint_dir = Path("./checkpoints")
checkpoint_dir.mkdir(exist_ok=True)
storage = FileCheckpointStorage(checkpoint_dir)
# Build workflow with checkpointing
workflow = (
WorkflowBuilder()
.add_edge(executor1, executor2)
.set_start_executor(executor1)
.with_checkpointing(storage)
.build()
)
# Initial run
print("Running workflow...")
async for event in workflow.run_stream("input data"):
print(f"Event: {event}")
# List and inspect checkpoints
checkpoints = await storage.list_checkpoints()
for cp in sorted(checkpoints, key=lambda c: c.timestamp):
summary = get_checkpoint_summary(cp)
print(f"Checkpoint: {summary.checkpoint_id[:8]}... iter={summary.iteration_count}")
# Resume from a checkpoint
if checkpoints:
latest = max(checkpoints, key=lambda cp: cp.timestamp)
print(f"Resuming from: {latest.checkpoint_id}")
async for event in workflow.run_stream(latest.checkpoint_id):
print(f"Resumed: {event}")
if __name__ == "__main__":
asyncio.run(main())
Najważniejsze korzyści
- Odporność na uszkodzenia: przepływy pracy mogą odzyskiwać sprawność po awariach, wracając z ostatniego punktu kontrolnego
- Procesy długotrwałe: Podziel długie przepływy pracy na segmenty, którymi można zarządzać przy użyciu granic punktów kontrolnych
- Human-in-the-Loop: Zatrzymaj proces dla wkładu ludzkiego i wznów później — oczekujące żądania są ponownie wysyłane po wznowieniu
- Debugowanie: sprawdzanie stanu przepływu pracy w określonych punktach i wznawianie wykonywania na potrzeby testowania
- Zarządzanie zasobami: zatrzymywanie i ponowne uruchamianie przepływów pracy na podstawie dostępności zasobów
Uruchamianie przykładu
Aby uzyskać pełną implementację roboczą, zapoznaj się z przykładem Checkpoint with Resume.