Checkpointing and migration
A lunar-mission planning pipeline that writes a SQLite checkpoint after every step, survives a simulated mid-pipeline crash, and then resumes the saved invocation under an upgraded state schema with a v1->v2 migration backfilling new fields.
Overview
A planning pipeline drafts a lunar mission plan in three steps:
- define_objective - state the primary objective in one sentence.
- size_crew - pick a crew size between 2 and 8.
- draft_timeline - draft a one-sentence timeline.
The graph is wired with a SQLiteCheckpointer in JSON mode, so the
engine writes a record after every completed event. The demo
stages two reliability stories on top of that wiring:
Phase 1 - crash and resume. The first attempt at the v1 graph
runs define_objective to completion (its checkpoint saves), then
size_crew raises a simulated transient failure - the kind of
mid-LLM-call infrastructure blip you can't engineer away in
production (OOM kill, pod preemption, network drop). The resulting
NodeException propagates to the invoke() boundary. A second invoke()
call passes resume_invocation=<id>, the id of that saved record. The engine reads
the record, skips define_objective (its position is already in
completed_positions), retries size_crew, runs draft_timeline,
and the pipeline finishes.
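The crash-and-resume flow above can be sketched with nothing but the standard library. This is a minimal illustration of the idea (commit after every step, skip completed steps on resume), not the library's SQLiteCheckpointer or engine. The helpers open_store, save, load, and run, and the table layout, are hypothetical. For brevity the sketch also keeps one run_id across both attempts, whereas the real engine mints a fresh invocation_id per invoke().

```python
import json
import sqlite3

# Hypothetical stand-in for a checkpoint store (not the library's API).
def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints "
        "(run_id TEXT PRIMARY KEY, state TEXT, completed TEXT)"
    )
    return conn

def save(conn, run_id, state, completed):
    # Commit before returning, so the record is durable before the next node starts.
    conn.execute(
        "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?)",
        (run_id, json.dumps(state), json.dumps(completed)),
    )
    conn.commit()

def load(conn, run_id):
    row = conn.execute(
        "SELECT state, completed FROM checkpoints WHERE run_id = ?", (run_id,)
    ).fetchone()
    return (json.loads(row[0]), json.loads(row[1])) if row else ({}, [])

def run(conn, run_id, nodes):
    state, completed = load(conn, run_id)
    for name, fn in nodes:
        if name in completed:
            continue  # already checkpointed on an earlier attempt
        state = {**state, **fn(state)}  # if fn raises, nothing is saved for this node
        completed = [*completed, name]
        save(conn, run_id, state, completed)
    return state

calls = {"size_crew": 0}

def define_objective(state):
    return {"objective": "<objective sentence>"}

def size_crew(state):
    calls["size_crew"] += 1
    if calls["size_crew"] == 1:
        raise RuntimeError("simulated transient crash")
    return {"crew_size": 4}

def draft_timeline(state):
    return {"timeline": "<timeline sentence>"}

NODES = [
    ("define_objective", define_objective),
    ("size_crew", size_crew),
    ("draft_timeline", draft_timeline),
]

conn = open_store()
try:
    run(conn, "demo-run", NODES)  # first attempt crashes inside size_crew
except RuntimeError:
    pass
_, completed_after_crash = load(conn, "demo-run")
final_state = run(conn, "demo-run", NODES)  # resume: skips define_objective
```

After the first attempt only define_objective is recorded as completed. The second call retries size_crew against the loaded state and then finishes the pipeline.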
Phase 2 - migration on resume. Then "some time later" - in the
same script for demo purposes - a v2 schema lands. It adds a
risk_assessment field and a new assess_risks node at the end. A
migration function backfills risk_assessment="" for v1 records.
The v2 graph resumes the (now-completed) v1 invocation, the
migration runs once on load, and execution continues at
assess_risks. The original three v1 nodes do not re-execute.
What it teaches
- SQLiteCheckpointer(path, serialization="json") writes records to a SQLite file synchronously after every node completes. The save returns before the next node starts, so a crash partway through the next node can't lose the previous node's record. JSON is the migration-eligible serialization; pickle mode is faster but can't bridge schemas.
- with_checkpointer wires the checkpointer into a GraphBuilder. The engine fires a save at every completed event for outermost and subgraph-internal nodes.
- NodeException at the invoke() boundary. When a node raises, the engine wraps the cause in NodeException and re-raises it from invoke(). The caller catches it; the checkpointer's record of the previously completed nodes is already durable on disk. exc.__cause__ carries the original exception, exc.node_name identifies the failing node, and exc.recoverable_state carries the state as it was just before the failing node ran.
- invoke(state, resume_invocation=<id>) resumes from a saved record. The engine reads the record, skips nodes whose completed events are already in completed_positions, and continues at the first uncompleted node. The retried node runs from a clean slate against the loaded state, so whatever transient condition caused the previous failure has a chance to have cleared.
- Each invoke() mints its own invocation_id, even on resume. The pre-crash record stays under the original id; the resumed attempt's new checkpoints save under a fresh id. Phase 2's resume target is the resumed attempt's id (the post-resume completed record), not the original id (the pre-crash partial record). The example re-queries CheckpointFilter(correlation_id=run_id) after the phase 1 resume completes to capture the new id; the shared correlation_id is the cross-attempt join key.
- State.schema_version as a ClassVar[str] declaration. Empty string opts the class out of migration support; any non-empty value opts it in.
- with_state_migration(from_version, to_version, migrate) registers one edge of the migration chain. The migrate callable is pure (dict in, dict out, no I/O). The engine applies it on load when the saved record's schema_version doesn't match the current state class's.
- The migration registry's BFS resolution. With a v3 schema and two migration edges (v1->v2, v2->v3), the registry walks the shortest chain automatically. A v1 record loaded under a v3 graph runs v1->v2 then v2->v3 without caller-side composition.
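The shortest-chain lookup described in the last bullet can be illustrated with a plain breadth-first search over version edges. This is a sketch of the general technique, not the registry's actual code. The names resolve_chain, apply_chain, and EDGES, and the v3 field budget_usd, are hypothetical.

```python
from collections import deque

def resolve_chain(edges, start, goal):
    """Return the shortest list of (from, to) edges leading from start to goal."""
    if start == goal:
        return []
    graph = {}
    for src, dst in edges:
        graph.setdefault(src, []).append(dst)
    parent = {start: None}
    queue = deque([start])
    while queue:
        version = queue.popleft()
        for nxt in graph.get(version, []):
            if nxt in parent:
                continue  # already reached by a path that is at least as short
            parent[nxt] = version
            if nxt == goal:
                # Walk the parent links back to start, then reverse.
                path = []
                while parent[nxt] is not None:
                    path.append((parent[nxt], nxt))
                    nxt = parent[nxt]
                return path[::-1]
            queue.append(nxt)
    raise LookupError(f"no migration path from {start!r} to {goal!r}")

def apply_chain(edges, state, start, goal):
    # Run each pure dict-in, dict-out migration in chain order.
    for step in resolve_chain(edges, start, goal):
        state = edges[step](state)
    return state

EDGES = {
    ("v1", "v2"): lambda s: {**s, "risk_assessment": ""},
    ("v2", "v3"): lambda s: {**s, "budget_usd": None},  # hypothetical v3 field
}

chain = resolve_chain(EDGES, "v1", "v3")
migrated = apply_chain(EDGES, {"crew_size": 4}, "v1", "v3")
```

Because each edge is a pure function, composing them in chain order is all the caller-side work that the registry saves you.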
How to run
uv sync --group examples
LLM_API_KEY=sk-... uv run python examples/checkpointing-and-migration/main.py
The SQLite database is created in a TemporaryDirectory that's
cleaned up automatically. The demo runs both phases in a single
script run, so you can see the crash, the resume, and the migration
end-to-end without manual orchestration.
The graph
V1 graph:
flowchart LR
start([start])
define_objective[define_objective]
size_crew[size_crew]
draft_timeline[draft_timeline]
stop([end])
start --> define_objective --> size_crew --> draft_timeline --> stop
V2 graph (adds assess_risks at the end):
flowchart LR
start([start])
define_objective[define_objective]
size_crew[size_crew]
draft_timeline[draft_timeline]
assess_risks[assess_risks]
stop([end])
start --> define_objective --> size_crew --> draft_timeline --> assess_risks --> stop
The v2 graph also registers with_state_migration("v1", "v2",
migrate_v1_to_v2). The migration function takes the saved state
as a plain dict and returns a dict at the new schema (here, just
{**state_dict, "risk_assessment": ""}).
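Written out, the migration described above is a one-line pure function. The sketch below follows the dict expression given in the text; the sample v1 record is illustrative placeholder data, not output from the demo.

```python
def migrate_v1_to_v2(state_dict: dict) -> dict:
    # Pure: builds a new dict, leaves the input untouched, performs no I/O.
    return {**state_dict, "risk_assessment": ""}

# Placeholder v1 record (illustrative values only).
v1_record = {
    "objective": "<objective sentence>",
    "crew_size": 4,
    "timeline": "<timeline sentence>",
    "trace": ["define_objective", "size_crew", "draft_timeline"],
}

v2_record = migrate_v1_to_v2(v1_record)
```

Every v1 field carries over unchanged, and the only difference is the backfilled risk_assessment key. The original record is not mutated, so running the migration again from the same input gives the same result.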
Reading the output
========================================================================
Phase 1 - invoke v1 graph; size_crew crashes; resume picks up
========================================================================
destination: Lunar South Pole
checkpoint db: /tmp/oa-checkpoint-demo-.../checkpoints.sqlite
first attempt:
NodeException at node 'size_crew': simulated transient mid-pipeline crash before size_crew completed
saved invocation_id: <uuid>
completed nodes: ['define_objective']
second attempt (resume from saved invocation):
objective: <objective sentence>
crew_size: 4
timeline: <timeline sentence>
trace: ['define_objective', 'size_crew', 'draft_timeline']
resumed invocation_id: <uuid-B, different from the pre-crash uuid above>
Each node name appears exactly once across two invoke() calls.
define_objective is in trace from the first attempt (its append
survived the crash via the synchronous checkpoint); size_crew +
draft_timeline are from the resumed attempt. size_crew has no
duplicate entry because its first call raised before returning
a state update.
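The reason the crashed call leaves no trace entry can be seen in a small sketch of an append-style reducer. It assumes each node returns its own name as a one-element trace update, which is inferred from the output rather than confirmed against the example's source. The helpers append, merge, and REDUCERS are hypothetical stand-ins, not the engine's API.

```python
def append(current, update):
    # Append-style reducer: extend the existing list with the node's update.
    return [*current, *update]

REDUCERS = {"trace": append}

def merge(state, update):
    # Fields with a reducer are combined; all other fields are overwritten.
    merged = dict(state)
    for key, value in update.items():
        if key in REDUCERS:
            merged[key] = REDUCERS[key](state.get(key, []), value)
        else:
            merged[key] = value
    return merged

def crashing_size_crew(state):
    raise RuntimeError("simulated transient crash")

# First attempt: define_objective returns an update, then size_crew raises.
state = merge(
    {"trace": []},
    {"objective": "<objective sentence>", "trace": ["define_objective"]},
)
try:
    state = merge(state, crashing_size_crew(state))
except RuntimeError:
    pass  # the node raised before returning, so merge never ran
trace_after_crash = list(state["trace"])

# Resumed attempt: size_crew and draft_timeline each return an update.
state = merge(state, {"crew_size": 4, "trace": ["size_crew"]})
state = merge(state, {"timeline": "<timeline sentence>", "trace": ["draft_timeline"]})
```

A node that raises never produces an update, so there is nothing for the reducer to append. The retry contributes the only size_crew entry.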
========================================================================
Phase 2 - invoke v2 graph with resume; v1->v2 migration runs
========================================================================
v2 adds: risk_assessment field + assess_risks node
migration: backfills risk_assessment='' for v1 records
v2 result after resume:
objective: <same objective sentence>
crew_size: 4
timeline: <same timeline sentence>
risk_assessment: <new sentence from assess_risks>
trace: ['define_objective', 'size_crew', 'draft_timeline', 'assess_risks']
v2's trace appends 'assess_risks' to the v1 entries the migration
preserved. Each v1 node appears exactly once (no duplicates from
the v2 graph re-running them) because completed_positions skipped
them. Only assess_risks was new work in phase 2.
- NodeException at node 'size_crew' is the signal that the engine caught the simulated crash and surfaced it at the invoke() boundary. The caller's try / except NodeException is the canonical error boundary for nodes; exc.__cause__ carries the original RuntimeError.
- completed nodes: ['define_objective'] on the loaded record proves the durability claim. The define_objective checkpoint was on disk before size_crew even started; the crash can't take that record with it. The example projects record.completed_positions (a tuple of NodePosition entries carrying namespace, node_name, step, attempt_index) down to just the node names for display.
- saved invocation_id and resumed invocation_id are different. Each invoke() mints a fresh invocation_id, including the resumed call. The pre-crash record (one completed node) stays under the original id; the resumed attempt's checkpoints (size_crew + draft_timeline completions) save under the new id. The cross-attempt join key that ties them together is correlation_id (here: demo-mission-plan-1), supplied by the caller on the first invoke and preserved across the resume. Phase 2's resume_invocation= target is the resumed attempt's id, NOT the original; resuming from the original would reload the pre-crash partial record and re-run size_crew + draft_timeline, defeating the "completed v1 then migrate" story.
- trace: ['define_objective', 'size_crew', 'draft_timeline'] after the resume is the cross-attempt continuity proof. The resumed invoke starts from the saved state (so trace already carries the first attempt's define_objective entry), and the append reducer accumulates entries from the post-crash nodes on top. Each node name appears exactly once: define_objective ran once on the first attempt; size_crew ran twice but only the second call returned a state update (the first call raised before its return); draft_timeline ran once on the resume. The absence of duplicates is the engine-side skip-set's signature.
- trace: [..., 'assess_risks'] on the v2 result extends the v1 entries with one new entry. The v1 nodes did not re-execute on the v2 resume; their completed_positions entries told the engine they were already done. The migration preserved their trace entries (via {**state_dict, ...}), and the v2 pipeline began at the first uncompleted position (assess_risks).
- crew_size: 4 and the other v1 fields are present on the v2 result because the migration preserved them via {**state_dict, ...}. A migration that changed an existing field (e.g., splitting name into first_name + last_name) would transform the dict more thoroughly.
- JSON serialization is what made this possible. With serialization="pickle", the saved record would be a pickled v1 instance that couldn't be deserialized against MissionPlanStateV2; JSON makes the saved state a plain dict that the migration function can rewrite freely.
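The last point, that JSON leaves the saved state as a plain dict a migration can rewrite, can be shown with ordinary dataclasses. PlanV1 and PlanV2 below are hypothetical stand-ins for the example's state classes, with fields assumed from the printed output. The real classes are library state types and may behave differently.

```python
import json
from dataclasses import asdict, dataclass

@dataclass
class PlanV1:  # hypothetical stand-in for the v1 state class
    objective: str
    crew_size: int
    timeline: str
    trace: list

@dataclass
class PlanV2:  # hypothetical stand-in for the v2 state class
    objective: str
    crew_size: int
    timeline: str
    trace: list
    risk_assessment: str

# Save under v1: the JSON payload carries field names and values, not a class identity.
saved = json.dumps(asdict(PlanV1("<objective sentence>", 4, "<timeline sentence>", [])))

# Load under v2: the payload comes back as a plain dict.
loaded = json.loads(saved)

try:
    PlanV2(**loaded)  # the v1 dict lacks the new required field
    loads_without_migration = True
except TypeError:
    loads_without_migration = False

# A dict-to-dict migration bridges the gap before the v2 class is built.
migrated = {**loaded, "risk_assessment": ""}
plan_v2 = PlanV2(**migrated)
```

A pickled payload would instead reconstruct an instance of whatever class was pickled, leaving no intermediate dict for a migration function to rewrite.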