openarmature.checkpoint¶
openarmature.checkpoint: checkpointing capability.
Public surface: the typed :class:Checkpointer Protocol,
:class:CheckpointRecord / :class:NodePosition /
:class:CheckpointSummary shapes, the checkpoint error categories,
and two reference backends (in-memory and SQLite).
Users register a backend at graph build time via
GraphBuilder.with_checkpointer(...); the engine then fires saves
at every completed event for outermost-graph nodes and
subgraph-internal nodes, and invoke(resume_invocation=X) loads +
restores from a prior record.
FanOutInternalSaveBatching
dataclass
¶
Per-Checkpointer-instance configuration for fan-out internal save batching.
Applies ONLY to fan-out instance internal saves. Outermost-graph, subgraph-internal, and fan-out node completion saves remain synchronous.
flush_every: flush the buffer every N buffered saves. A value of 0 or a negative value means batching is disabled (every save flushes immediately). The buffered save count resets at each flush.
Buffered-but-unflushed saves are LOST on crash; on resume,
instances whose completed state was buffered-only revert to
in_flight / not_started and re-run. Reducer correctness
holds because their contributions hadn't durably committed.
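The count-triggered flush described above can be pictured with a small stand-in. This is a minimal sketch, not the library's implementation: ToyBatchingBuffer, its durable and pending attributes, and the string records are all hypothetical names chosen for illustration; only the flush_every semantics come from the text above.

```python
from dataclasses import dataclass, field


@dataclass
class ToyBatchingBuffer:
    """Hypothetical stand-in for a backend's fan-out internal save buffer."""

    flush_every: int
    durable: dict = field(default_factory=dict)  # what a later load would see
    pending: list = field(default_factory=list)  # lost if the process dies

    def save_fan_out_internal(self, invocation_id: str, record: object) -> None:
        if self.flush_every <= 0:
            # Batching disabled: every save is committed immediately.
            self.durable[invocation_id] = record
            return
        self.pending.append((invocation_id, record))
        if len(self.pending) >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        # Commit in arrival order; the latest record per id wins.
        for invocation_id, record in self.pending:
            self.durable[invocation_id] = record
        self.pending.clear()  # the buffered count resets at each flush


buf = ToyBatchingBuffer(flush_every=3)
buf.save_fan_out_internal("inv-1", "after instance 0")
buf.save_fan_out_internal("inv-1", "after instance 1")
visible_before_flush = buf.durable.get("inv-1")  # still buffered, nothing durable
buf.save_fan_out_internal("inv-1", "after instance 2")
visible_after_flush = buf.durable.get("inv-1")  # third save triggered the flush
```

If the process died after the second save, both buffered records would be gone, which is the cost the configuration trades for fewer writes.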
InMemoryCheckpointer ¶
InMemoryCheckpointer(
*,
fan_out_internal_save_batching: (
FanOutInternalSaveBatching | None
) = None
)
Dict-backed Checkpointer.
Durability: none. Records live for the lifetime of this instance only; restarting the process loses everything. Appropriate for unit tests, the dev loop, and short-lived in-process pipelines that don't need crash recovery.
State shape: any. The record is held by reference, so the
Pydantic state instance the engine produces is what comes back
from :meth:load; no serialization round-trip. (This is the
feature: tests can assert on the saved state's identity.)
State-migration eligibility: none. A backend supports
migration only when it can expose a structural intermediate form
of the loaded state independent of the current
state class. This backend holds live typed instances by
reference, so a version mismatch on resume raises
CheckpointRecordInvalid rather than consulting the
migration registry.
Fan-out internal save batching: optional via the
fan_out_internal_save_batching constructor parameter.
Default is no batching (every save flushes immediately). When
enabled, fan-out instance internal saves buffer in memory and
flush every flush_every saves. Outermost-graph,
subgraph-internal, and fan-out node completion saves bypass the
buffer entirely (they remain synchronous). On crash, buffered
saves are lost — by design, a documented cost trade-off.
save
async
¶
save(invocation_id: str, record: CheckpointRecord) -> None
Store record under invocation_id, replacing any
previous record for the same id. Not durable across process
restarts.
Outermost-graph, subgraph-internal, and fan-out node
completion saves are synchronous regardless of
the batching configuration. The engine routes fan-out
instance internal saves through :meth:save_fan_out_internal
instead; this method bypasses the buffer.
save_fan_out_internal
async
¶
save_fan_out_internal(
invocation_id: str, record: CheckpointRecord
) -> None
Buffer a fan-out instance internal save under the batching
policy. When batching is disabled (default), behaves
identically to :meth:save — every save is synchronously
durable. When flush_every is positive, the save is
buffered; the buffer flushes when the count reaches the
configured threshold.
save_fan_out_in_flight_failure
async
¶
save_fan_out_in_flight_failure(
invocation_id: str, record: CheckpointRecord
) -> None
Buffer an "instance failed mid-execution" save under the batching policy. The failure save records the in_flight state of an instance whose terminal inner node raised; this save closes the in_flight observability gap for instances whose subgraphs have no sibling-completed save to piggyback on.
Under batching, this save buffers BUT does NOT count toward the flush threshold. The rationale: this save logically represents "the moment of crash" — a real crash wouldn't complete an extra save first; the buffered records (and this one) would simply be lost. The batching count-trigger mechanism is meant for steady-state save flow, not the abort path.
Backends without batching route this to a synchronous
:meth:save — the failure save is durable in the non-batching
case (fixture 048's in_flight observability requirement).
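The "buffers but does not count" rule can be sketched as follows. ToyFailureAwareBuffer and its attributes are hypothetical, and the choice to commit a buffered failure record together with the next regular flush is an assumption of this sketch; the text above only states that the failure save does not advance the threshold.

```python
class ToyFailureAwareBuffer:
    """Hypothetical buffer: failure saves are held but never counted."""

    def __init__(self, flush_every: int) -> None:
        self.flush_every = flush_every
        self.durable: list = []   # records a later load could observe
        self.pending: list = []   # buffered records, lost on crash
        self.counted = 0          # saves that count toward the threshold

    def save_fan_out_internal(self, record: str) -> None:
        self.pending.append(record)
        self.counted += 1
        if self.counted >= self.flush_every:
            self._flush()

    def save_fan_out_in_flight_failure(self, record: str) -> None:
        # Buffered alongside the others, but the counter is left untouched.
        self.pending.append(record)

    def _flush(self) -> None:
        self.durable.extend(self.pending)
        self.pending.clear()
        self.counted = 0


buf = ToyFailureAwareBuffer(flush_every=2)
buf.save_fan_out_internal("instance 0 done")
buf.save_fan_out_in_flight_failure("instance 1 failed in flight")
durable_after_failure = list(buf.durable)  # two records buffered, none flushed
buf.save_fan_out_internal("instance 2 done")
durable_after_second = list(buf.durable)   # second counted save hit the threshold
```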
load
async
¶
load(invocation_id: str) -> CheckpointRecord | None
Return the saved record for invocation_id or None
if nothing has been saved under that id. Buffered-but-unflushed
fan-out internal saves are NOT visible
to load — that's the crash-loses-buffered contract. To
simulate a crash before the buffer flushes, drop the
Checkpointer reference; the buffer is in-memory only.
list
async
¶
list(
filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]
Enumerate stored invocations as :class:CheckpointSummary
rows. With filter.correlation_id set, restricts the
results to invocations carrying that correlation id;
otherwise returns all rows.
delete
async
¶
Remove the record for invocation_id. No-op when nothing
is saved under that id (no error).
SQLiteCheckpointer ¶
SQLite Checkpointer with WAL-mode durability.
Retention: upsert; one row per invocation_id, overwritten
on every save. Saved records are NOT historical: only the most
recent save for any given invocation_id is retained.
Cross-language portability: depends on the serialization
constructor argument. "pickle" is Python-only; "json"
works across languages but is restricted to JSON-native state
shapes (the engine's Pydantic state must successfully
model_dump(mode="json")).
save
async
¶
save(invocation_id: str, record: CheckpointRecord) -> None
Upsert record under invocation_id. The state,
completed positions, parent-state stack, and per-fan-out-node
progress are serialized via the configured
:class:SerializationMode and written in a single statement.
Writes are durable on return (WAL mode, per-write fsync at the
SQLite layer).
load
async
¶
load(invocation_id: str) -> CheckpointRecord | None
Return the saved record for invocation_id or None
when no row exists. The serialization mode stored with the
row is used to decode the blobs back, so a database written
with one mode can still be loaded after the backend has been
reconfigured.
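The upsert-per-invocation retention and the mode-aware decode on load can be illustrated with the standard library alone. This is a sketch under stated assumptions: the table name, column names, and the helper functions below are hypothetical and are not the backend's actual schema; PRAGMA synchronous=FULL is one plausible way to get a sync per committed write in WAL mode, not a confirmed detail of this backend.

```python
import json
import os
import pickle
import sqlite3
import tempfile

# Hypothetical schema: one row per invocation_id, with the mode stored per row.
db_path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
conn = sqlite3.connect(db_path)
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA synchronous=FULL")  # assumption: sync on every commit
conn.execute(
    "CREATE TABLE checkpoints ("
    " invocation_id TEXT PRIMARY KEY,"
    " serialization TEXT NOT NULL,"
    " payload BLOB NOT NULL,"
    " last_saved_at REAL NOT NULL)"
)


def encode(state, mode):
    return json.dumps(state).encode("utf-8") if mode == "json" else pickle.dumps(state)


def decode(payload, mode):
    return json.loads(payload) if mode == "json" else pickle.loads(payload)


def save(invocation_id, state, mode, saved_at):
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints"
            " (invocation_id, serialization, payload, last_saved_at)"
            " VALUES (?, ?, ?, ?)",
            (invocation_id, mode, encode(state, mode), saved_at),
        )


def load(invocation_id):
    row = conn.execute(
        "SELECT serialization, payload FROM checkpoints WHERE invocation_id = ?",
        (invocation_id,),
    ).fetchone()
    if row is None:
        return None
    mode, payload = row
    return decode(payload, mode)  # decode with the mode stored on the row


save("inv-1", {"step": 1}, "pickle", 1.0)
save("inv-1", {"step": 2}, "json", 2.0)  # overwrites the earlier row
row_count = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
latest = load("inv-1")
```

Because the mode travels with the row, the second save can switch from pickle to JSON without leaving an undecodable record behind.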
list
async
¶
list(
filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]
Enumerate saved invocations as :class:CheckpointSummary
rows, ordered by last_saved_at ascending. With
filter.correlation_id set the SQL query is constrained at
the database (indexed lookup); without a filter the full
table is returned.
delete
async
¶
Remove the row for invocation_id. No-op when no row
exists (no error). The delete is durable on return.
CheckpointError ¶
Bases: Exception
Base for all checkpoint errors. Each subclass carries a
category class attribute matching its canonical category
string.
CheckpointNotFound ¶
Bases: CheckpointError
Raised when invoke(resume_invocation=X) is called and
Checkpointer.load(X) returns None. Non-transient: the
record genuinely does not exist, and retrying without changing
the invocation_id will never succeed.
CheckpointRecordInvalid ¶
Bases: CheckpointError
Raised when Checkpointer.load(X) returns a record whose
schema is incompatible with the current graph: state shape
mismatch, missing required fields, OR a post-migration state
that fails to deserialize against the current state class.
Non-transient.
Note: raw schema_version mismatches no longer route here.
They now flow through CheckpointStateMigrationMissing (no
chain registered) or CheckpointStateMigrationFailed (chain
application raised) — a three-way category distinction.
CheckpointSaveFailed ¶
Bases: CheckpointError
Raised when Checkpointer.save itself raises during a
completed event handler. Engine behavior on save failure is
implementation-defined; this implementation raises to the caller
of invoke() immediately and does NOT retry the save itself
(documented on :meth:CompiledGraph.invoke).
CheckpointStateMigrationChainAmbiguous ¶
CheckpointStateMigrationChainAmbiguous(
*args: Any,
from_version: str | None = None,
to_version: str | None = None
)
Bases: CheckpointError
Raised when the registered migration graph is ambiguous:
- Duplicate-pair case: two migrations register with the same
(from_version, to_version) pair. Raised at registration time so the user sees the ambiguity before any resume attempt.
- Multi-shortest-path case: the registered migration graph has
multiple distinct shortest paths between the saved and current
versions (e.g., a diamond v1→v2→v4 + v1→v3→v4). Either compile-time detection (recommended) or load-time detection is acceptable (this implementation runs the check inside BFS at resume time).
Non-transient: retrying without changing the migration graph
will not succeed. Carries from_version / to_version when
known (always set for the duplicate-pair case, set on the resume
side too for multi-shortest-path detection).
CheckpointStateMigrationFailed ¶
Bases: CheckpointError
Raised on resume when a registered migration function raises
during chain application. The migration's
exception is preserved as __cause__. Non-transient by
default: a buggy migration is deterministic, so retrying
without changing the migration code will not succeed.
CheckpointStateMigrationMissing ¶
CheckpointStateMigrationMissing(
*args: Any,
from_version: str,
to_version: str,
registered_migrations_count: int,
registry_description: str
)
Bases: CheckpointError
Raised on resume when the saved record's schema_version
does not match the current state class's schema_version AND
no chain of registered migrations bridges the two. Non-transient;
the user MUST register a migration (or pin
their state to the saved version) for the resume to succeed.
Carries the saved-from / current-to versions and a description of the registered migration set so the user can see what migrations are available.
MigrationRegistry ¶
Ordered set of registered migrations + BFS chain resolution.
Registration-time invariants:
- Two migrations with the same from_version AND to_version raise CheckpointStateMigrationChainAmbiguous directly, so the canonical category surfaces at the registration boundary without any wrapping by the builder.
- Two migrations with the same from_version and different to_version are permitted (branched migration graph; chain resolution picks a path or raises ambiguity if multiple shortest paths exist).
Resolution-time semantics:
- BFS from record.schema_version to current.schema_version. BFS naturally finds the shortest path.
- Empty registry on mismatch → no path → caller raises CheckpointStateMigrationMissing.
- Non-empty registry with no connecting path → same.
- Found a unique shortest path → return ordered list.
- Found multiple distinct shortest paths (same edge count, different edge sequences) → raise ValueError internally; CompiledGraph._migrate_record wraps the ValueError as CheckpointStateMigrationChainAmbiguous at the resume boundary. The internal ValueError keeps the registry module dependency-light (no canonical-error import cycle).
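The resolution rules above amount to a breadth-first search that also counts shortest paths. The following is a minimal sketch of that technique, not the registry's code: shortest_unique_chain is a hypothetical helper that works on bare (from_version, to_version) pairs and assumes duplicate pairs were already rejected at registration time.

```python
from collections import defaultdict, deque


def shortest_unique_chain(edges, start, goal):
    """Return the unique shortest list of (from, to) edges, or None if no path.

    Raises ValueError when more than one shortest path exists.
    Assumes `edges` contains no duplicate pairs.
    """
    if start == goal:
        return []
    graph = defaultdict(list)
    for src, dst in edges:
        graph[src].append(dst)

    dist = {start: 0}
    path_count = {start: 1}  # number of distinct shortest paths to each node
    parent = {}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in graph[node]:
            if nxt not in dist:
                # First visit: BFS guarantees this is a shortest distance.
                dist[nxt] = dist[node] + 1
                path_count[nxt] = path_count[node]
                parent[nxt] = node
                queue.append(nxt)
            elif dist[nxt] == dist[node] + 1:
                # Another route of the same length reaches nxt.
                path_count[nxt] += path_count[node]

    if goal not in dist:
        return None
    if path_count[goal] > 1:
        raise ValueError(f"multiple shortest paths from {start!r} to {goal!r}")

    chain = []
    node = goal
    while node != start:
        chain.append((parent[node], node))
        node = parent[node]
    chain.reverse()
    return chain


linear = shortest_unique_chain([("v1", "v2"), ("v2", "v3")], "v1", "v3")
missing = shortest_unique_chain([("v1", "v2")], "v1", "v9")
diamond = [("v1", "v2"), ("v2", "v4"), ("v1", "v3"), ("v3", "v4")]
try:
    shortest_unique_chain(diamond, "v1", "v4")
    diamond_is_ambiguous = False
except ValueError:
    diamond_is_ambiguous = True
```

A longer alternative route does not trigger the error; only routes that tie for the minimum edge count do.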
register ¶
register(migration: StateMigration) -> None
Add a migration to the registry.
Raises ValueError if migration.to_version is empty
(the empty string marks "no schema_version declared" and is
not a valid chain target). Raises
:class:CheckpointStateMigrationChainAmbiguous if a migration
with the same (from_version, to_version) pair is already
registered. On success the migration becomes available to
:meth:resolve_chain for any path that crosses it.
resolve_chain ¶
resolve_chain(
from_version: str, to_version: str
) -> list[StateMigration] | None
Return an ordered chain of migrations bridging the two
versions, or None if no chain exists.
Raises CheckpointStateMigrationChainAmbiguous if
multiple distinct shortest paths exist between
from_version and to_version (an ambiguous chain).
Same canonical category as the duplicate-pair detection
in register; one type for chain ambiguity regardless
of when it surfaces.
describe ¶
Human-readable description of the registered set, used
in the CheckpointStateMigrationMissing error payload.
Empty registry returns "<no migrations registered>".
Output is registration-order (Python's dict preserves
insertion order). Diff-friendly test assertions should
not depend on the order across distinct registration
sequences; if cross-language conformance ever needs a
canonical order, a future change can sort by
(from_version, to_version).
StateMigration
dataclass
¶
One edge in the migration graph.
migrate receives the most-deserialized form the backend can
expose that is still independent of the current state class
(a plain dict for JSON-backed backends). It MUST return a
value of the same kind, suitable for the next migration in the
chain (or for final deserialization into the current state class).
Migrations MUST be pure: deterministic, no I/O, no implicit state. The framework does not enforce purity; the contract is documented only, and violating it risks non-deterministic resume.
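To make the dict-in, dict-out contract concrete, here is a minimal sketch of two pure migrations applied in chain order. ToyMigration, apply_chain, and the state keys are hypothetical stand-ins for illustration; the field names from_version, to_version, and migrate follow the names used in this section, but the real dataclass may differ.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass(frozen=True)
class ToyMigration:
    """Hypothetical stand-in for one edge in the migration graph."""

    from_version: str
    to_version: str
    migrate: Callable[[Dict[str, Any]], Dict[str, Any]]


def rename_user_to_owner(state: Dict[str, Any]) -> Dict[str, Any]:
    # Pure: builds a new dict from its input and touches nothing else.
    new_state = {k: v for k, v in state.items() if k != "user"}
    new_state["owner"] = state["user"]
    return new_state


def add_retry_budget(state: Dict[str, Any]) -> Dict[str, Any]:
    # Pure: the default is a constant, not read from the environment.
    return {**state, "retry_budget": 3}


def apply_chain(state: Dict[str, Any], chain: List[ToyMigration]) -> Dict[str, Any]:
    # Each migration's output is the next migration's input.
    for migration in chain:
        state = migration.migrate(state)
    return state


chain = [
    ToyMigration("1", "2", rename_user_to_owner),
    ToyMigration("2", "3", add_retry_budget),
]
saved = {"user": "ada", "count": 2}
migrated = apply_chain(saved, chain)
```

Because neither function mutates its argument, the saved dict is left intact and re-running the chain yields the same result.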
Checkpointer ¶
Bases: Protocol
Persistence seam for graph invocations.
Implementations MUST be safe to share across concurrent
invocations of the same graph (the engine does not serialize
access). Each operation MUST be thread-safe (Python) /
task-coroutine-safe (asyncio); backends with synchronous I/O
typically wrap their work in asyncio.to_thread or equivalent.
supports_state_migration marks whether the backend can
expose a structural intermediate form of the loaded state (a
plain dict, JSON tree, or similar) that is independent of the
current state class. JSON-encoded backends naturally satisfy
this; backends that store live typed state instances or use
class-bound serialization (pickle) cannot. Backends that cannot
expose the intermediate MUST raise
CheckpointRecordInvalid on version mismatch even when
migrations are registered; the registry has no chance to bridge.
Attribute-presence contract. The class-body supports_state_migration = False
default is a typing-level signal, not a runtime guarantee:
typing.Protocol does not create an instance attribute on
a conforming class that doesn't declare it itself. Concrete
backends SHOULD declare supports_state_migration (either
at the class level like InMemoryCheckpointer does, or as
an __init__-set instance attribute like
SQLiteCheckpointer does for the mode-dependent case) so
Pyright accepts the structural conformance and getattr
sees the value. The engine's resume path reads the attribute
via getattr(checkpointer, "supports_state_migration",
False), so a third-party backend that omits the attribute
entirely is treated as non-migration-eligible without
raising; that's the runtime default the engine guarantees.
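The attribute-presence behaviour can be checked with plain typing.Protocol. In this sketch CheckpointerLike, ThirdPartyBackend, DeclaringBackend, and migration_eligible are hypothetical names; only the getattr-with-default read mirrors what the paragraph above describes.

```python
from typing import Optional, Protocol


class CheckpointerLike(Protocol):
    """Hypothetical, trimmed-down protocol with a class-body default."""

    supports_state_migration: bool = False

    async def load(self, invocation_id: str) -> Optional[object]: ...


class ThirdPartyBackend:
    """Conforms structurally but never declares the attribute."""

    async def load(self, invocation_id: str) -> Optional[object]:
        return None


class DeclaringBackend:
    """Declares the attribute at class level."""

    supports_state_migration = True

    async def load(self, invocation_id: str) -> Optional[object]:
        return None


def migration_eligible(checkpointer: object) -> bool:
    # A missing attribute falls back to False instead of raising.
    return getattr(checkpointer, "supports_state_migration", False)


# The protocol's default does not leak onto a class that merely conforms.
third_party_has_attr = hasattr(ThirdPartyBackend(), "supports_state_migration")
third_party_eligible = migration_eligible(ThirdPartyBackend())
declaring_eligible = migration_eligible(DeclaringBackend())
```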
save
async
¶
save(invocation_id: str, record: CheckpointRecord) -> None
Persist record for invocation_id. After return the
record MUST be durable across process crashes for backends
that document durability (in-memory backends are not durable
and MUST document this). Synchronous-by-contract: the engine
awaits this call before continuing to the next node so a
crash immediately after a completed event cannot have
lost the corresponding save.
load
async
¶
load(invocation_id: str) -> CheckpointRecord | None
Return the most recent record for invocation_id or
None if no record exists. The returned record MUST be
structurally identical to what save last wrote for this
invocation_id (round-trip integrity).
list
async
¶
list(
filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]
Enumerate saved invocations. The filter shape is
backend-defined; this implementation ships list_all and
list_by_correlation_id predicates.
delete
async
¶
Remove all records for invocation_id. MUST be a no-op
when the invocation_id has no record (no error).
CheckpointFilter
dataclass
¶
Predicate for :meth:Checkpointer.list. v1 ships two narrow
fields; richer query DSLs are deferred to follow-on work.
correlation_id: match records whose correlation_id equals the supplied value. None matches every record (the "list all" case).
CheckpointRecord
dataclass
¶
CheckpointRecord(
invocation_id: str,
correlation_id: str,
state: Any,
completed_positions: tuple[NodePosition, ...],
parent_states: tuple[Any, ...],
last_saved_at: float,
schema_version: str = "",
fan_out_progress: tuple[FanOutProgress, ...] = (),
)
One invocation's progress at one save point.
Frozen: backends MUST treat the record as immutable. The engine
builds a fresh record per completed event rather than mutating
a shared one. The fan_out_progress field carries
per-fan-out-node entries when one or more fan-outs are in flight
at save time; an empty tuple means no fan-out progress to record.
CheckpointSummary
dataclass
¶
CheckpointSummary(
invocation_id: str,
correlation_id: str,
last_saved_at: float,
completed_node_count: int,
)
Lightweight record-level metadata returned by
:meth:Checkpointer.list.
Implementations MAY add backend-specific fields; the four declared here are the cross-backend portable subset callers can rely on.
FanOutInstanceProgress
dataclass
¶
FanOutInstanceProgress(
state: Literal["completed", "in_flight", "not_started"],
result: Any = None,
result_is_error: bool = False,
completed_inner_positions: tuple[
NodePosition, ...
] = (),
)
Per-instance progress entry inside a fan-out's
:attr:FanOutProgress.instances sequence.
Fields:
- state: one of "completed", "in_flight", "not_started". The completed state is the load-bearing correctness contract: an instance marked completed MUST have its contribution recorded into the accumulator AND that contribution MUST be reflected in result. Reducer composition rules depend on this exactly-once guarantee.
- result: for completed instances, the durable contribution to the fan-out accumulator (a success value for the target_field bucket or, under the collect error policy, an error entry for the errors_field bucket). Typed per the parent state schema's target_field / errors_field (representation is implementation-defined; Python stores it as Any since dynamic typing absorbs the variance). Unused for in_flight and not_started.
- result_is_error: boolean discriminator for completed entries. True when the contribution is a collect-mode error entry that rolls forward into errors_field, False when the contribution is a success value that rolls forward into target_field. MUST be False for in_flight and not_started (the value of result is ignored for those). Implementations MUST consult this field on resume rather than inferring routing from result's shape; heuristic inspection would misclassify user state values that happen to match the engine's error-record shape.
- completed_inner_positions: for in_flight instances, a tuple of NodePosition entries captured at save time. Same shape as :attr:CheckpointRecord.completed_positions but scoped to this instance's inner subgraph rather than the outer graph. Empty when the instance fired its first started event but no inner completed event yet. Observational only: in_flight instances re-enter at the subgraph entry node on resume, not at any of these positions. Unused for completed and not_started.
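The routing rule for result_is_error can be sketched as a small resume planner. ToyInstanceProgress and plan_resume are hypothetical and heavily simplified; the example deliberately stores a success value that looks like an error record to show why the flag, not the value's shape, decides the bucket.

```python
from dataclasses import dataclass
from typing import Any, List, Literal, Sequence, Tuple


@dataclass(frozen=True)
class ToyInstanceProgress:
    """Hypothetical, trimmed-down per-instance progress entry."""

    state: Literal["completed", "in_flight", "not_started"]
    result: Any = None
    result_is_error: bool = False


def plan_resume(
    instances: Sequence[ToyInstanceProgress],
) -> Tuple[List[Any], List[Any], List[int]]:
    """Split entries into target values, error entries, and indexes to re-run."""
    target: List[Any] = []
    errors: List[Any] = []
    rerun: List[int] = []
    for index, inst in enumerate(instances):
        if inst.state != "completed":
            rerun.append(index)          # in_flight and not_started run again
        elif inst.result_is_error:
            errors.append(inst.result)   # rolls forward into errors_field
        else:
            target.append(inst.result)   # rolls forward into target_field
    return target, errors, rerun


instances = (
    ToyInstanceProgress("completed", {"error": "user data that looks like an error"}),
    ToyInstanceProgress("completed", {"error": "timeout"}, result_is_error=True),
    ToyInstanceProgress("in_flight"),
    ToyInstanceProgress("not_started"),
)
target, errors, rerun = plan_resume(instances)
```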
FanOutProgress
dataclass
¶
FanOutProgress(
fan_out_node_name: str,
namespace: tuple[str, ...],
instance_count: int,
instances: tuple[FanOutInstanceProgress, ...],
)
Per-fan-out-node progress entry inside a
:attr:CheckpointRecord.fan_out_progress sequence.
Fields:
- fan_out_node_name: the fan-out node's name in its containing graph.
- namespace: the chain of outer subgraph-node names enclosing the fan-out (empty for outermost-graph fan-outs). Disambiguates fan-outs of the same name in different nested-subgraph contexts.
- instance_count: the resolved instance count for this fan-out (count or items_field mode).
- instances: a tuple of per-instance entries indexed by fan_out_index (instances[i] is the entry for fan_out_index=i). Length equals instance_count.
NodePosition
dataclass
¶
NodePosition(
namespace: tuple[str, ...],
node_name: str,
step: int,
attempt_index: int = 0,
fan_out_index: int | None = None,
)
A single completed-node coordinate in the resume map.
Frozen + automatically hashable (no mutable fields), so positions
can live in sets and dict keys; the engine's resume-entry
derivation relies on set membership to skip nodes that have
already completed.
Fields:
- namespace: chain of containing-graph node names from outermost down to (but not including) this node. Empty for outermost-graph nodes; one entry for subgraph-internal nodes; two entries when nested two deep, and so on. Distinct from NodeEvent.namespace, which includes the node's own name; NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,).
- node_name: the node's local name in its containing graph.
- step: the monotonic step counter at the time the node completed (shared with NodeEvent.step).
- attempt_index: 0-based retry attempt index. The final successful attempt's index is what gets recorded.
- fan_out_index: populated only for events from inside a fan-out instance. Those events do NOT produce records in the shipping version; the field is part of the position shape so a future per-instance fan-out resume can populate it without a record-shape migration.
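Hashability and the namespace relation can be seen with a frozen dataclass that mirrors the signature shown above. ToyNodePosition is a hypothetical stand-in, and the node names are made up for the example.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class ToyNodePosition:
    """Hypothetical mirror of the position shape; frozen, so hashable."""

    namespace: Tuple[str, ...]
    node_name: str
    step: int
    attempt_index: int = 0
    fan_out_index: Optional[int] = None


# Positions recorded by earlier saves, held in a set.
completed = {
    ToyNodePosition((), "fetch", 1),
    ToyNodePosition(("enrich",), "parse", 2),
}

# An equal position built later is found by set membership.
candidate = ToyNodePosition(("enrich",), "parse", 2)
already_done = candidate in completed

# A different attempt index is a different coordinate.
retried = ToyNodePosition(("enrich",), "parse", 2, attempt_index=1)
retried_done = retried in completed

# The event-style namespace appends the node's own name.
event_namespace = candidate.namespace + (candidate.node_name,)
```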