openarmature.checkpoint

openarmature.checkpoint: checkpointing capability.

Public surface: the typed :class:Checkpointer Protocol, :class:CheckpointRecord / :class:NodePosition / :class:CheckpointSummary shapes, the checkpoint error categories, and two reference backends (in-memory and SQLite).

Users register a backend at graph build time via GraphBuilder.with_checkpointer(...). The engine then saves at every completed event for outermost-graph nodes and subgraph-internal nodes, and invoke(resume_invocation=X) loads the prior record and restores from it.

FanOutInternalSaveBatching dataclass

FanOutInternalSaveBatching(flush_every: int = 0)

Per-Checkpointer-instance configuration for fan-out internal save batching.

Applies ONLY to fan-out instance internal saves. Outermost-graph, subgraph-internal, and fan-out node completion saves remain synchronous.

  • flush_every: flush the buffer every N buffered saves. A value of 0 or any negative value disables batching (every save flushes immediately). The buffered save count resets at each flush.

Buffered-but-unflushed saves are LOST on crash; on resume, instances whose completed state was buffered-only revert to in_flight / not_started and re-run. Reducer correctness holds because the contributions of those instances were never durably committed, so re-running them cannot apply a contribution twice.
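The flush rule and the crash behavior described above can be modeled in a few lines. This is a minimal sketch, not the library's implementation: BatchingBuffer, durable, pending, and crash are hypothetical names, and plain strings stand in for checkpoint records.

```python
from dataclasses import dataclass, field


@dataclass
class BatchingBuffer:
    """Hypothetical stand-in that models only the flush_every rule."""

    flush_every: int = 0
    durable: list = field(default_factory=list)  # survives a simulated crash
    pending: list = field(default_factory=list)  # lost on a simulated crash

    def save_internal(self, record: str) -> None:
        if self.flush_every <= 0:
            # Batching disabled: every save flushes immediately.
            self.durable.append(record)
            return
        self.pending.append(record)
        if len(self.pending) >= self.flush_every:
            # Threshold reached: flush everything; the count resets to zero.
            self.durable.extend(self.pending)
            self.pending.clear()

    def crash(self) -> list:
        """Drop the unflushed saves and return what was lost."""
        lost, self.pending = self.pending, []
        return lost


buf = BatchingBuffer(flush_every=3)
for i in range(5):
    buf.save_internal(f"instance-{i}")

lost = buf.crash()
print(buf.durable)  # the first three saves were flushed together
print(lost)         # the last two were buffered only and would re-run
```

In this model the two lost saves correspond to instances that would revert to in_flight / not_started on resume.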

InMemoryCheckpointer

InMemoryCheckpointer(
    *,
    fan_out_internal_save_batching: (
        FanOutInternalSaveBatching | None
    ) = None
)

Dict-backed Checkpointer.

Durability: none. Records live for the lifetime of this instance only; restarting the process loses everything. Appropriate for unit tests, the dev loop, and short-lived in-process pipelines that don't need crash recovery.

State shape: any. The record is held by reference, so the Pydantic state instance the engine produces is what comes back from :meth:load; no serialization round-trip. (This is the feature: tests can assert on the saved state's identity.)

State-migration eligibility: none. A backend supports migration only when it can expose a structural intermediate form of the loaded state independent of the current state class. This backend holds live typed instances by reference, so a version mismatch on resume raises CheckpointRecordInvalid rather than consulting the migration registry.

Fan-out internal save batching: optional via the fan_out_internal_save_batching constructor parameter. Default is no batching (every save flushes immediately). When enabled, fan-out instance internal saves buffer in memory and flush every flush_every saves. Outermost-graph, subgraph-internal, and fan-out node completion saves bypass the buffer entirely (they remain synchronous). On crash, buffered saves are lost — by design, a documented cost trade-off.

save async

save(invocation_id: str, record: CheckpointRecord) -> None

Store record under invocation_id, replacing any previous record for the same id. Not durable across process restarts.

Outermost-graph, subgraph-internal, and fan-out node completion saves are synchronous regardless of the batching configuration. The engine routes fan-out instance internal saves through :meth:save_fan_out_internal instead; this method bypasses the buffer.

save_fan_out_internal async

save_fan_out_internal(
    invocation_id: str, record: CheckpointRecord
) -> None

Buffer a fan-out instance internal save under the batching policy. When batching is disabled (the default), this behaves identically to :meth:save: every save is applied synchronously and is visible to :meth:load on return. When flush_every is positive, the save is buffered; the buffer flushes when the count reaches the configured threshold.

save_fan_out_in_flight_failure async

save_fan_out_in_flight_failure(
    invocation_id: str, record: CheckpointRecord
) -> None

Buffer an "instance failed mid-execution" save under the batching policy. The failure save records the in_flight state of an instance whose terminal inner node raised; this save closes the in_flight observability gap for instances whose subgraphs have no sibling-completed save to piggyback on.

Under batching, this save buffers BUT does NOT count toward the flush threshold. The rationale: this save logically represents "the moment of crash" — a real crash wouldn't complete an extra save first; the buffered records (and this one) would simply be lost. The batching count-trigger mechanism is meant for steady-state save flow, not the abort path.

Backends without batching route this to a synchronous :meth:save — the failure save is durable in the non-batching case (fixture 048's in_flight observability requirement).
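The "buffers but does not count" rule can be sketched as follows. This is an illustrative model only: FailureAwareBuffer and its attributes are hypothetical, and the sketch assumes that a later threshold-triggered flush writes everything pending, failure saves included, which the text above does not state explicitly.

```python
class FailureAwareBuffer:
    """Hypothetical model: failure saves buffer but do not advance the count."""

    def __init__(self, flush_every: int) -> None:
        self.flush_every = flush_every  # assumed positive in this sketch
        self.pending = []
        self.durable = []
        self.counted = 0  # only steady-state saves advance this counter

    def save_internal(self, record: str) -> None:
        self.pending.append(record)
        self.counted += 1
        if self.counted >= self.flush_every:
            self._flush()

    def save_in_flight_failure(self, record: str) -> None:
        # Buffered, but deliberately NOT counted toward the flush threshold.
        self.pending.append(record)

    def _flush(self) -> None:
        # Assumption: a flush writes all pending saves, failure saves included.
        self.durable.extend(self.pending)
        self.pending.clear()
        self.counted = 0


buf = FailureAwareBuffer(flush_every=2)
buf.save_internal("instance-0 completed")
buf.save_in_flight_failure("instance-1 failed in flight")

# Two records are pending, yet nothing has flushed: the failure save
# did not move the counter from 1 to 2.
pending_after_failure = list(buf.pending)
durable_after_failure = list(buf.durable)

buf.save_internal("instance-2 completed")  # second counted save triggers the flush
```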

load async

load(invocation_id: str) -> CheckpointRecord | None

Return the saved record for invocation_id or None if nothing has been saved under that id. Buffered-but-unflushed fan-out internal saves are NOT visible to load — that's the crash-loses-buffered contract. To simulate a crash before the buffer flushes, drop the Checkpointer reference; the buffer is in-memory only.

list async

list(
    filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]

Enumerate stored invocations as :class:CheckpointSummary rows. With filter.correlation_id set, restricts the results to invocations carrying that correlation id; otherwise returns all rows.

delete async

delete(invocation_id: str) -> None

Remove the record for invocation_id. No-op when nothing is saved under that id (no error).
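The save / load / delete contracts above (replace on save, hold by reference, no-op delete) can be illustrated with a small dict-backed stand-in. DictCheckpointer is a hypothetical class written for this example, not the library's InMemoryCheckpointer, and plain dicts stand in for CheckpointRecord.

```python
import asyncio
from typing import Any, Dict, Optional


class DictCheckpointer:
    """Hypothetical dict-backed stand-in; records are held by reference."""

    def __init__(self) -> None:
        self._records: Dict[str, Any] = {}

    async def save(self, invocation_id: str, record: Any) -> None:
        self._records[invocation_id] = record  # replaces any earlier record

    async def load(self, invocation_id: str) -> Optional[Any]:
        return self._records.get(invocation_id)  # None when nothing was saved

    async def delete(self, invocation_id: str) -> None:
        self._records.pop(invocation_id, None)  # no error when absent


async def demo():
    cp = DictCheckpointer()
    first, second = {"step": 1}, {"step": 2}
    await cp.save("inv-1", first)
    await cp.save("inv-1", second)
    # No serialization round-trip: the very same object comes back.
    same_object = (await cp.load("inv-1")) is second
    await cp.delete("inv-1")
    await cp.delete("inv-1")  # second delete is a no-op
    return same_object, await cp.load("inv-1"), await cp.load("never-saved")


result = asyncio.run(demo())
print(result)  # (True, None, None)
```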

SQLiteCheckpointer

SQLiteCheckpointer(
    path: str | Path,
    *,
    serialization: SerializationMode = "pickle"
)

SQLite Checkpointer with WAL-mode durability.

Retention: upsert; one row per invocation_id, overwritten on every save. Saved records are NOT historical: only the most recent save for any given invocation_id is retained.

Cross-language portability: depends on the serialization constructor argument. "pickle" is Python-only; "json" works across languages but is restricted to JSON-native state shapes (the engine's Pydantic state must successfully model_dump(mode="json")).

save async

save(invocation_id: str, record: CheckpointRecord) -> None

Upsert record under invocation_id. The state, completed positions, parent-state stack, and per-fan-out-node progress are serialized via the configured :class:SerializationMode and written in a single statement. Writes are durable on return (WAL mode, per-write fsync at the SQLite layer).

load async

load(invocation_id: str) -> CheckpointRecord | None

Return the saved record for invocation_id or None when no row exists. The serialization mode stored with the row is used to decode the blobs back, so a database written with one mode can still be loaded after the backend has been reconfigured.
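The upsert retention and the "mode stored with the row" behavior described above can be sketched with the standard-library sqlite3 module. The table name, column names, and helper functions here are assumptions made for illustration; the actual schema used by SQLiteCheckpointer is not documented in this section. The sketch only handles the "json" mode.

```python
import json
import sqlite3
import tempfile
import time
from pathlib import Path


def open_db(path: Path) -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.execute("PRAGMA journal_mode=WAL")  # write-ahead logging
    conn.execute(
        "CREATE TABLE IF NOT EXISTS checkpoints ("  # hypothetical schema
        " invocation_id TEXT PRIMARY KEY,"
        " mode TEXT NOT NULL,"
        " payload BLOB NOT NULL,"
        " last_saved_at REAL NOT NULL)"
    )
    return conn


def upsert(conn: sqlite3.Connection, invocation_id: str, state: dict) -> None:
    payload = json.dumps(state).encode("utf-8")
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT OR REPLACE INTO checkpoints VALUES (?, ?, ?, ?)",
            (invocation_id, "json", payload, time.time()),
        )


def load(conn: sqlite3.Connection, invocation_id: str):
    row = conn.execute(
        "SELECT mode, payload FROM checkpoints WHERE invocation_id = ?",
        (invocation_id,),
    ).fetchone()
    if row is None:
        return None
    mode, payload = row
    if mode != "json":  # decode with the mode stored in the row
        raise ValueError(f"unsupported mode in this sketch: {mode}")
    return json.loads(payload)


with tempfile.TemporaryDirectory() as tmp:
    conn = open_db(Path(tmp) / "checkpoints.db")
    upsert(conn, "inv-1", {"step": 1})
    upsert(conn, "inv-1", {"step": 2})  # overwrites; no history is kept
    row_count = conn.execute("SELECT COUNT(*) FROM checkpoints").fetchone()[0]
    latest = load(conn, "inv-1")
    missing = load(conn, "inv-2")
    conn.close()
```

After two saves under the same invocation_id the table still holds a single row, which is the "NOT historical" retention described above.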

list async

list(
    filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]

Enumerate saved invocations as :class:CheckpointSummary rows, ordered by last_saved_at ascending. With filter.correlation_id set the SQL query is constrained at the database (indexed lookup); without a filter the full table is returned.

delete async

delete(invocation_id: str) -> None

Remove the row for invocation_id. No-op when no row exists (no error). The delete is durable on return.

CheckpointError

Bases: Exception

Base for all checkpoint errors. Each subclass carries a category class attribute matching its canonical category string.

CheckpointNotFound

CheckpointNotFound(invocation_id: str)

Bases: CheckpointError

Raised when invoke(resume_invocation=X) is called and Checkpointer.load(X) returns None. Non-transient: the record genuinely does not exist, and retrying without changing the invocation_id will never succeed.

CheckpointRecordInvalid

CheckpointRecordInvalid(invocation_id: str, message: str)

Bases: CheckpointError

Raised when Checkpointer.load(X) returns a record whose schema is incompatible with the current graph: state shape mismatch, missing required fields, OR a post-migration state that fails to deserialize against the current state class. Non-transient.

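Note: on migration-eligible backends, a raw schema_version mismatch does not route here. It surfaces as CheckpointStateMigrationMissing (no chain registered) or CheckpointStateMigrationFailed (chain application raised), which gives a three-way category distinction. Backends that cannot expose a structural intermediate form of the state (see supports_state_migration on Checkpointer) still raise CheckpointRecordInvalid on a version mismatch.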

CheckpointSaveFailed

CheckpointSaveFailed(
    invocation_id: str, cause: BaseException
)

Bases: CheckpointError

Raised when Checkpointer.save itself raises during a completed event handler. Engine behavior on save failure is implementation-defined; this implementation raises to the caller of invoke() immediately and does NOT retry the save itself (documented on :meth:CompiledGraph.invoke).

CheckpointStateMigrationChainAmbiguous

CheckpointStateMigrationChainAmbiguous(
    *args: Any,
    from_version: str | None = None,
    to_version: str | None = None
)

Bases: CheckpointError

Raised when the registered migration graph is ambiguous:

  • Duplicate-pair case: two migrations register with the same (from_version, to_version) pair. Raised at registration time so the user sees the ambiguity before any resume attempt.
  • Multi-shortest-path case: the registered migration graph has multiple distinct shortest paths between the saved and current versions (e.g., a diamond v1→v2→v4 + v1→v3→v4). Detection may happen at compile time (recommended) or at load time; this implementation runs the check inside BFS at resume time.

Non-transient: retrying without changing the migration graph will not succeed. Carries from_version / to_version when known (always set for the duplicate-pair case, set on the resume side too for multi-shortest-path detection).

CheckpointStateMigrationFailed

CheckpointStateMigrationFailed(
    *args: Any, from_version: str, to_version: str
)

Bases: CheckpointError

Raised on resume when a registered migration function raises during chain application. The migration's exception is preserved as __cause__. Non-transient by default: a buggy migration is deterministic, so retrying without changing the migration code will not succeed.

CheckpointStateMigrationMissing

CheckpointStateMigrationMissing(
    *args: Any,
    from_version: str,
    to_version: str,
    registered_migrations_count: int,
    registry_description: str
)

Bases: CheckpointError

Raised on resume when the saved record's schema_version does not match the current state class's schema_version AND no chain of registered migrations bridges the two. Non-transient; the user MUST register a migration (or pin their state to the saved version) for the resume to succeed.

Carries the saved-from / current-to versions and a description of the registered migration set so the user can see what migrations are available.

MigrationRegistry

MigrationRegistry()

Ordered set of registered migrations, plus breadth-first search (BFS) chain resolution.

Registration-time invariants:

  • Two migrations with the same from_version AND to_version raise CheckpointStateMigrationChainAmbiguous directly so the canonical category surfaces at the registration boundary without any wrapping by the builder.
  • Two migrations with the same from_version and different to_version are permitted (branched migration graph; chain resolution picks a path or raises ambiguity if multiple shortest paths exist).

Resolution-time semantics:

  • BFS from record.schema_version to current.schema_version. BFS naturally finds the shortest path.
  • Empty registry on mismatch → no path → caller raises CheckpointStateMigrationMissing.
  • Non-empty registry with no connecting path → same.
  • Found a unique shortest path → return ordered list.
  • Found multiple distinct shortest paths (same edge count, different edge sequences) → raise ValueError internally; CompiledGraph._migrate_record wraps the ValueError as CheckpointStateMigrationChainAmbiguous at the resume boundary. The internal ValueError keeps the registry module dependency-light (no canonical-error import cycle).
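The resolution-time semantics above amount to a BFS that also counts how many shortest paths reach each version. The following is a minimal sketch under that reading, not the registry's actual code: resolve_shortest_chain is a hypothetical function, migrations are reduced to (from_version, to_version) pairs, and ambiguity is signalled with a plain ValueError as in the last bullet.

```python
from collections import deque


def resolve_shortest_chain(edges, start, goal):
    """Return the unique shortest list of (from, to) edges, None when no
    path exists, or raise ValueError when several shortest paths tie."""
    if start == goal:
        return []
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    dist = {start: 0}   # BFS depth of each reached version
    ways = {start: 1}   # number of distinct shortest paths to each version
    parent = {}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        for nxt in adjacency.get(node, []):
            if nxt not in dist:
                dist[nxt] = dist[node] + 1
                ways[nxt] = ways[node]
                parent[nxt] = node
                queue.append(nxt)
            elif dist[nxt] == dist[node] + 1:
                # Another path of the same length reaches nxt.
                ways[nxt] += ways[node]

    if goal not in dist:
        return None
    if ways[goal] > 1:
        raise ValueError(f"ambiguous migration chain {start} -> {goal}")

    chain, node = [], goal
    while node != start:
        chain.append((parent[node], node))
        node = parent[node]
    chain.reverse()
    return chain


linear = resolve_shortest_chain([("v1", "v2"), ("v2", "v3")], "v1", "v3")
shortcut = resolve_shortest_chain(
    [("v1", "v2"), ("v2", "v3"), ("v1", "v3")], "v1", "v3"
)
no_path = resolve_shortest_chain([("v1", "v2")], "v1", "v9")

diamond = [("v1", "v2"), ("v2", "v4"), ("v1", "v3"), ("v3", "v4")]
try:
    resolve_shortest_chain(diamond, "v1", "v4")
    diamond_is_ambiguous = False
except ValueError:
    diamond_is_ambiguous = True
```

Counting paths per depth works because BFS finishes every node at depth d before visiting any node at depth d + 1, so a node's count is final by the time it is dequeued.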

register

register(migration: StateMigration) -> None

Add a migration to the registry.

Raises ValueError if migration.to_version is empty (the empty string marks "no schema_version declared" and is not a valid chain target). Raises :class:CheckpointStateMigrationChainAmbiguous if a migration with the same (from_version, to_version) pair is already registered. On success the migration becomes available to :meth:resolve_chain for any path that crosses it.

resolve_chain

resolve_chain(
    from_version: str, to_version: str
) -> list[StateMigration] | None

Return an ordered chain of migrations bridging the two versions, or None if no chain exists.

Raises CheckpointStateMigrationChainAmbiguous if multiple distinct shortest paths exist between from_version and to_version (an ambiguous chain). Same canonical category as the duplicate-pair detection in register; one type for chain ambiguity regardless of when it surfaces.

describe

describe() -> str

Human-readable description of the registered set, used in the CheckpointStateMigrationMissing error payload. Empty registry returns "<no migrations registered>".

Output is registration-order (Python's dict preserves insertion order). Diff-friendly test assertions should not depend on the order across distinct registration sequences; if cross-language conformance ever needs a canonical order, a future change can sort by (from_version, to_version).

StateMigration dataclass

StateMigration(
    from_version: str,
    to_version: str,
    migrate: Callable[[Any], Any],
)

One edge in the migration graph.

migrate receives the most-deserialized form the backend can expose that is still independent of the current state class (a plain dict for JSON-backed backends). It MUST return a value of the same kind, suitable for the next migration in the chain (or for final deserialization into the current state class).

Migrations MUST be pure: deterministic, no I/O, no implicit state. The framework does not police purity (the contract is documented, not policed); violating it risks non-deterministic resume.
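A pure migration over a plain dict, and the application of a two-step chain, might look like the sketch below. Migration is a hypothetical mirror of the StateMigration signature shown above, apply_chain is an illustrative helper rather than an engine function, and the field names are invented for the example.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(frozen=True)
class Migration:
    """Hypothetical mirror of the StateMigration signature."""

    from_version: str
    to_version: str
    migrate: Callable[[Any], Any]


def v1_to_v2(state: dict) -> dict:
    # Pure: copies the input instead of mutating it, and does no I/O.
    new_state = dict(state)
    new_state["full_name"] = new_state.pop("name")
    return new_state


def v2_to_v3(state: dict) -> dict:
    # Adds a field with a deterministic default.
    return {**state, "tags": []}


def apply_chain(chain, state):
    """Feed each migration's output into the next one, in order."""
    for migration in chain:
        state = migration.migrate(state)
    return state


chain = [Migration("1", "2", v1_to_v2), Migration("2", "3", v2_to_v3)]
saved = {"name": "Ada"}
migrated = apply_chain(chain, saved)
```

Each step returns a dict, the same kind of value it received, so the result is suitable for the next migration or for final deserialization into the current state class.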

Checkpointer

Bases: Protocol

Persistence seam for graph invocations.

Implementations MUST be safe to share across concurrent invocations of the same graph (the engine does not serialize access). Each operation MUST be thread-safe (Python) / task-coroutine-safe (asyncio); backends with synchronous I/O typically wrap their work in asyncio.to_thread or equivalent.
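One way to satisfy this for a backend with blocking I/O is to offload the blocking call with asyncio.to_thread and guard shared state with a lock. The sketch below is illustrative only: ThreadOffloadStore is a hypothetical class, time.sleep stands in for real I/O, and integers stand in for records.

```python
import asyncio
import threading
import time
from typing import Dict


class ThreadOffloadStore:
    """Hypothetical backend whose blocking write runs in a worker thread."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._rows: Dict[str, int] = {}

    def _blocking_save(self, invocation_id: str, value: int) -> None:
        time.sleep(0.01)  # stands in for disk or network I/O
        with self._lock:  # worker threads may overlap, so guard shared state
            self._rows[invocation_id] = value

    async def save(self, invocation_id: str, value: int) -> None:
        # The event loop stays free while the blocking call runs elsewhere.
        await asyncio.to_thread(self._blocking_save, invocation_id, value)

    def snapshot(self) -> Dict[str, int]:
        with self._lock:
            return dict(self._rows)


async def main() -> Dict[str, int]:
    store = ThreadOffloadStore()
    # Five concurrent invocations share one backend instance.
    await asyncio.gather(*(store.save(f"inv-{i}", i) for i in range(5)))
    return store.snapshot()


rows = asyncio.run(main())
```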

supports_state_migration marks whether the backend can expose a structural intermediate form of the loaded state (a plain dict, JSON tree, or similar) that is independent of the current state class. JSON-encoded backends naturally satisfy this; backends that store live typed state instances or use class-bound serialization (pickle) cannot. Backends that cannot expose the intermediate MUST raise CheckpointRecordInvalid on version mismatch even when migrations are registered; the registry has no chance to bridge.

Attribute-presence contract. The Protocol's class-body default (supports_state_migration = False) is a typing-level signal, not a runtime guarantee: typing.Protocol does not create the attribute on a structurally conforming class that doesn't declare it itself. Concrete backends SHOULD declare supports_state_migration (either at the class level, as InMemoryCheckpointer does, or as an instance attribute set in __init__, as SQLiteCheckpointer does for the mode-dependent case) so Pyright accepts the structural conformance and getattr sees the value. The engine's resume path reads the attribute via getattr(checkpointer, "supports_state_migration", False), so a third-party backend that omits the attribute entirely is treated as non-migration-eligible without raising; that's the runtime default the engine guarantees.
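The difference between the typing-level default and the runtime default can be seen with a stripped-down Protocol. CheckpointerLike, DeclaresIt, OmitsIt, and is_migration_eligible are hypothetical names for this sketch; only the getattr call mirrors the expression quoted above.

```python
from typing import Protocol


class CheckpointerLike(Protocol):
    # Typing-level default only; structural conformers do not inherit it.
    supports_state_migration: bool = False


class DeclaresIt:
    """Backend that declares the attribute at class level."""

    supports_state_migration = True


class OmitsIt:
    """Third-party backend that never mentions the attribute."""


def is_migration_eligible(checkpointer: object) -> bool:
    # Same getattr-with-default pattern the resume path is described as using.
    return getattr(checkpointer, "supports_state_migration", False)


declares = is_migration_eligible(DeclaresIt())
omits = is_migration_eligible(OmitsIt())
attribute_present = hasattr(OmitsIt(), "supports_state_migration")
```

Because OmitsIt does not subclass the Protocol, the class-body default never reaches it; only the getattr fallback keeps the lookup from raising AttributeError.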

save async

save(invocation_id: str, record: CheckpointRecord) -> None

Persist record for invocation_id. After return the record MUST be durable across process crashes for backends that document durability (in-memory backends are not durable and MUST document this). Synchronous-by-contract: the engine awaits this call before continuing to the next node so a crash immediately after a completed event cannot have lost the corresponding save.

load async

load(invocation_id: str) -> CheckpointRecord | None

Return the most recent record for invocation_id or None if no record exists. The returned record MUST be structurally identical to what save last wrote for this invocation_id (round-trip integrity).

list async

list(
    filter: CheckpointFilter | None = None,
) -> Iterable[CheckpointSummary]

Enumerate saved invocations. The filter shape is backend-defined; this implementation ships list_all and list_by_correlation_id predicates.

delete async

delete(invocation_id: str) -> None

Remove all records for invocation_id. MUST be a no-op when the invocation_id has no record (no error).

CheckpointFilter dataclass

CheckpointFilter(correlation_id: str | None = None)

Predicate for :meth:Checkpointer.list. v1 ships a single narrow field, which covers the two supported predicates (list all, and list by correlation id); richer query DSLs are deferred to follow-on work.

  • correlation_id: match records whose correlation_id equals the supplied value. None matches every record (the "list all" case).
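The matching rule above can be sketched as a small selection function. Filter and Summary are trimmed-down hypothetical stand-ins for CheckpointFilter and CheckpointSummary, and select is an illustrative helper, not a library function.

```python
from dataclasses import dataclass
from typing import Iterable, List, Optional


@dataclass(frozen=True)
class Filter:
    correlation_id: Optional[str] = None


@dataclass(frozen=True)
class Summary:
    invocation_id: str
    correlation_id: str


def select(rows: Iterable[Summary], flt: Optional[Filter] = None) -> List[Summary]:
    # No filter, or a filter with correlation_id=None, is the "list all" case.
    if flt is None or flt.correlation_id is None:
        return list(rows)
    return [row for row in rows if row.correlation_id == flt.correlation_id]


rows = [
    Summary("inv-1", "order-42"),
    Summary("inv-2", "order-43"),
    Summary("inv-3", "order-42"),
]
everything = select(rows)
also_everything = select(rows, Filter())
order_42 = select(rows, Filter(correlation_id="order-42"))
```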

CheckpointRecord dataclass

CheckpointRecord(
    invocation_id: str,
    correlation_id: str,
    state: Any,
    completed_positions: tuple[NodePosition, ...],
    parent_states: tuple[Any, ...],
    last_saved_at: float,
    schema_version: str = "",
    fan_out_progress: tuple[FanOutProgress, ...] = (),
)

One invocation's progress at one save point.

Frozen: backends MUST treat the record as immutable. The engine builds a fresh record per completed event rather than mutating a shared one. The fan_out_progress field carries per-fan-out-node entries when one or more fan-outs are in flight at save time; an empty tuple means no fan-out progress to record.

CheckpointSummary dataclass

CheckpointSummary(
    invocation_id: str,
    correlation_id: str,
    last_saved_at: float,
    completed_node_count: int,
)

Lightweight record-level metadata returned by :meth:Checkpointer.list.

Implementations MAY add backend-specific fields; the four declared here are the cross-backend portable subset callers can rely on.

FanOutInstanceProgress dataclass

FanOutInstanceProgress(
    state: Literal["completed", "in_flight", "not_started"],
    result: Any = None,
    result_is_error: bool = False,
    completed_inner_positions: tuple[
        NodePosition, ...
    ] = (),
)

Per-instance progress entry inside a fan-out's :attr:FanOutProgress.instances sequence.

Fields:

  • state: one of "completed", "in_flight", "not_started". The completed state is the load-bearing correctness contract: an instance marked completed MUST have its contribution recorded into the accumulator AND that contribution MUST be reflected in result. Reducer composition rules depend on this exactly-once guarantee.
  • result: for completed instances, the durable contribution to the fan-out accumulator (a success value for the target_field bucket, or under collect error policy an error entry for the errors_field bucket). Typed per the parent state schema's target_field / errors_field (representation is implementation-defined; Python stores as Any since dynamic typing absorbs the variance). Unused for in_flight and not_started.
  • result_is_error: boolean discriminator for completed entries — True when the contribution is a collect-mode error entry that rolls forward into errors_field, False when the contribution is a success value that rolls forward into target_field. MUST be False for in_flight and not_started (the value of result is ignored for those). Implementations MUST consult this field on resume rather than inferring routing from result's shape — heuristic inspection would misclassify user state values that happen to match the engine's error-record shape.
  • completed_inner_positions: for in_flight instances, a tuple of NodePosition entries captured at save time. Same shape as :attr:CheckpointRecord.completed_positions but scoped to this instance's inner subgraph rather than the outer graph. Empty when the instance has fired its first started event but no inner completed event has fired yet. Observational only: in_flight instances re-enter at the subgraph entry node on resume, not at any of these positions. Unused for completed and not_started.
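The routing rule for result_is_error can be illustrated with a small roll-forward function. This is a sketch of the contract only: InstanceProgress is a reduced hypothetical stand-in for FanOutInstanceProgress, and roll_forward is not an engine function.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class InstanceProgress:
    state: str  # "completed", "in_flight", or "not_started"
    result: Any = None
    result_is_error: bool = False


def roll_forward(instances):
    """Split saved entries into success values, error entries, and the
    indices of instances that still have to run on resume."""
    successes, errors, rerun = [], [], []
    for index, inst in enumerate(instances):
        if inst.state != "completed":
            rerun.append(index)  # in_flight and not_started both re-run
        elif inst.result_is_error:
            errors.append(inst.result)  # would roll forward into errors_field
        else:
            successes.append(inst.result)  # would roll forward into target_field
    return successes, errors, rerun


instances = (
    # A user value that merely looks like an error record.
    InstanceProgress("completed", result={"error": "a user value, not a failure"}),
    InstanceProgress("completed", result="timeout in instance 1", result_is_error=True),
    InstanceProgress("in_flight"),
    InstanceProgress("not_started"),
)
successes, errors, rerun = roll_forward(instances)
```

The first entry lands in the success bucket even though its value has an "error" key, because routing consults the flag rather than the shape of result.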

FanOutProgress dataclass

FanOutProgress(
    fan_out_node_name: str,
    namespace: tuple[str, ...],
    instance_count: int,
    instances: tuple[FanOutInstanceProgress, ...],
)

Per-fan-out-node progress entry inside a :attr:CheckpointRecord.fan_out_progress sequence.

Fields:

  • fan_out_node_name: the fan-out node's name in its containing graph.
  • namespace: the chain of outer subgraph-node names enclosing the fan-out (empty for outermost-graph fan-outs). Disambiguates fan-outs of the same name in different nested-subgraph contexts.
  • instance_count: the resolved instance count for this fan-out (count or items_field mode).
  • instances: a tuple of per-instance entries indexed by fan_out_index (instances[i] is the entry for fan_out_index=i). Length equals instance_count.

NodePosition dataclass

NodePosition(
    namespace: tuple[str, ...],
    node_name: str,
    step: int,
    attempt_index: int = 0,
    fan_out_index: int | None = None,
)

A single completed-node coordinate in the resume map.

Frozen + automatically hashable (no mutable fields), so positions can live in sets and dict keys; the engine's resume-entry derivation relies on set membership to skip nodes that have already completed.

Fields:

  • namespace: chain of containing-graph node names from outermost down to (but not including) this node. Empty for outermost-graph nodes; one entry for subgraph-internal nodes; two entries when nested two deep, and so on. Distinct from NodeEvent.namespace which includes the node's own name; NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,).
  • node_name: the node's local name in its containing graph.
  • step: the monotonic step counter at the time the node completed (shared with NodeEvent.step).
  • attempt_index: 0-based retry attempt index. The final successful attempt's index is what gets recorded.
  • fan_out_index: populated only for events from inside a fan-out instance. Those events do NOT produce records in the shipping version; the field is part of the position shape so a future per-instance fan-out resume can populate it without a record-shape migration.
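The hashability and namespace relationships described above can be checked with a frozen dataclass of the same shape. Position is a hypothetical mirror of NodePosition written for this sketch; how the engine actually derives the resume entry point is not shown here, and the example assumes full-equality set membership.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Position:
    """Hypothetical mirror of the NodePosition fields."""

    namespace: Tuple[str, ...]
    node_name: str
    step: int
    attempt_index: int = 0
    fan_out_index: Optional[int] = None


# Frozen dataclasses with only hashable fields can live in a set.
completed = {
    Position((), "fetch", 1),
    Position(("enrich",), "lookup", 2),
}

candidate = Position(("enrich",), "lookup", 2)
already_done = candidate in completed  # equal fields, so equal hash
not_done = Position(("enrich",), "score", 3) in completed

# NodeEvent.namespace == NodePosition.namespace + (NodePosition.node_name,)
event_namespace = candidate.namespace + (candidate.node_name,)
```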