openarmature.graph

Public API for the OpenArmature graph engine.

Re-exports the surface a user touches when building and running a graph: the state schema base, reducers, the builder/compiled pair, edge primitives and the END sentinel, the node/subgraph/projection seams, and the canonical compile-time and runtime error categories.

GraphBuilder

GraphBuilder(state_cls: type[StateT])

Mutable builder for a graph; call compile() to produce a CompiledGraph.

add_node

add_node(
    name: str,
    fn: Callable[[StateT], Awaitable[Mapping[str, Any]]],
    *,
    middleware: Iterable[Middleware] | None = None
) -> Self

Register a function node.

fn is an async callable taking the current state and returning a partial update (a mapping from declared state-field names to new values). The merge into parent state happens through the field reducers; fn itself does not see or touch the merge.

middleware is an optional iterable of per-node middleware, ordered outer-to-inner. Per-graph middleware (see :meth:add_middleware) wraps these.

Raises ValueError if name is already declared on this builder.
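The division of labour between a node function and the field reducers can be pictured with a small stand-alone model. This is a sketch, not the engine's code: merge_update, last_write_wins, and append are hypothetical names, and falling back to last-write-wins for fields without a reducer is an assumption.

```python
from typing import Any, Callable, Mapping

Reducer = Callable[[Any, Any], Any]


def last_write_wins(current: Any, update: Any) -> Any:
    """Replace the current value with the update."""
    return update


def append(current: list, update: list) -> list:
    """Concatenate the update onto the current list."""
    return [*current, *update]


def merge_update(
    state: Mapping[str, Any],
    update: Mapping[str, Any],
    reducers: Mapping[str, Reducer],
) -> dict:
    """Fold a node's partial update into state, one field at a time."""
    merged = dict(state)
    for field, value in update.items():
        if field not in state:
            raise KeyError(f"undeclared state field: {field}")
        # Assumption: fields without a reducer are simply overwritten.
        reducer = reducers.get(field, last_write_wins)
        merged[field] = reducer(state[field], value)
    return merged


state = {"messages": ["hi"], "step": 0}
update = {"messages": ["hello"], "step": 1}  # what a node fn would return
merged = merge_update(state, update, {"messages": append})
print(merged)  # {'messages': ['hi', 'hello'], 'step': 1}
```

The node only names the fields it wants to change; how each value combines with the existing one is decided entirely by the reducer attached to that field.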

add_subgraph_node

add_subgraph_node(
    name: str,
    compiled: CompiledGraph[ChildT],
    projection: (
        ProjectionStrategy[StateT, ChildT] | None
    ) = None,
    *,
    middleware: Iterable[Middleware] | None = None
) -> Self

Register a subgraph as a node.

compiled is a child :class:CompiledGraph (parameterized on its own state type). On entry the parent's state is translated to the child's via projection.project_in; on exit the child's final state is folded back via projection.project_out. The default projection is :class:FieldNameMatching; pass an :class:ExplicitMapping for declarative inputs/outputs instead.

middleware wraps the whole subgraph dispatch as one atomic call from the parent's perspective. Per-graph middleware on the parent does NOT propagate into the child; child middleware is configured on the child's own builder.

Raises ValueError if name is already declared.
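The project_in / project_out round trip can be illustrated with a stand-alone model. NameMatchingProjection below is a hypothetical stand-in that copies same-named fields in both directions; the real ProjectionStrategy method signatures and the exact rules FieldNameMatching applies are not shown on this page and may differ.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class ParentState:
    query: str
    answer: str = ""
    trace_id: str = ""


@dataclass(frozen=True)
class ChildState:
    query: str
    answer: str = ""
    scratch: str = ""


class NameMatchingProjection:
    """Hypothetical projection: copy fields whose names exist on both sides."""

    def project_in(self, parent: ParentState) -> ChildState:
        child_names = {f.name for f in fields(ChildState)}
        shared = {
            f.name: getattr(parent, f.name)
            for f in fields(ParentState)
            if f.name in child_names
        }
        return ChildState(**shared)

    def project_out(self, parent: ParentState, child: ChildState) -> ParentState:
        parent_names = {f.name for f in fields(ParentState)}
        shared = {
            f.name: getattr(child, f.name)
            for f in fields(ChildState)
            if f.name in parent_names
        }
        return replace(parent, **shared)


projection = NameMatchingProjection()
parent = ParentState(query="capital of France?", trace_id="t-1")
child = projection.project_in(parent)            # parent -> child on entry
child_done = replace(child, answer="Paris", scratch="tmp notes")
parent_after = projection.project_out(parent, child_done)  # child -> parent on exit
```

In this model the child-only field (scratch) never reaches the parent, and the parent-only field (trace_id) survives the round trip untouched.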

add_fan_out_node

add_fan_out_node(
    name: str,
    *,
    subgraph: CompiledGraph[ChildT],
    collect_field: str,
    target_field: str,
    items_field: str | None = None,
    item_field: str | None = None,
    count: int | CountResolver | None = None,
    concurrency: int | ConcurrencyResolver | None = 10,
    error_policy: str = "fail_fast",
    on_empty: str = "raise",
    count_field: str | None = None,
    inputs: Mapping[str, str] | None = None,
    extra_outputs: Mapping[str, str] | None = None,
    instance_middleware: Iterable[Middleware] | None = None,
    errors_field: str | None = None,
    middleware: Iterable[Middleware] | None = None,
    subgraph_identity: str | None = None
) -> Self

Register a fan-out node.

Validates configuration at registration time:

  • Exactly one of items_field or count MUST be specified (fan_out_count_mode_ambiguous otherwise).
  • items_field MUST refer to a list-typed field on the parent state schema (fan_out_field_not_list otherwise).
  • items_field mode requires item_field; count mode forbids item_field.
  • on_empty and error_policy MUST be one of the permitted string literals ("raise" / "noop" and "fail_fast" / "collect" respectively).
  • inputs / extra_outputs / count_field field references go through the existing mapping_references_undeclared_field rule.
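The mode and literal rules above can be restated as a stand-alone checker. This is a sketch only: check_fan_out_config is a hypothetical helper that returns a list of violations, whereas the builder raises; the list-type and field-reference checks are left out because they need the state schema.

```python
from typing import List, Optional

ON_EMPTY_VALUES = {"raise", "noop"}
ERROR_POLICY_VALUES = {"fail_fast", "collect"}


def check_fan_out_config(
    *,
    items_field: Optional[str] = None,
    item_field: Optional[str] = None,
    count: Optional[int] = None,
    on_empty: str = "raise",
    error_policy: str = "fail_fast",
) -> List[str]:
    """Return the violated rules; an empty list means the config is valid."""
    problems = []
    # Exactly one of items_field / count: both set or both missing is ambiguous.
    if (items_field is None) == (count is None):
        problems.append("fan_out_count_mode_ambiguous")
    if items_field is not None and item_field is None:
        problems.append("items_field mode requires item_field")
    if count is not None and item_field is not None:
        problems.append("count mode forbids item_field")
    if on_empty not in ON_EMPTY_VALUES:
        problems.append("on_empty must be 'raise' or 'noop'")
    if error_policy not in ERROR_POLICY_VALUES:
        problems.append("error_policy must be 'fail_fast' or 'collect'")
    return problems


print(check_fan_out_config(items_field="docs", item_field="doc"))  # []
print(check_fan_out_config(count=3))                               # []
print(check_fan_out_config())  # ['fan_out_count_mode_ambiguous']
```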

add_parallel_branches_node

add_parallel_branches_node(
    name: str,
    *,
    branches: Mapping[str, BranchSpec[Any]],
    error_policy: Literal[
        "fail_fast", "collect"
    ] = "fail_fast",
    errors_field: str | None = None,
    middleware: Iterable[Middleware] | None = None
) -> Self

Register a parallel-branches node.

branches is a mapping from non-empty branch name to a :class:BranchSpec. Insertion order is preserved and is the dispatch + merge order.

Validates at registration:

  • branches non-empty (raises ParallelBranchesNoBranches).
  • Each branch name is a non-empty string (raises ValueError).
  • Each branch gives its work as exactly one of subgraph / call, and a callable branch declares no inputs / outputs (raises ParallelBranchesInvalidBranchSpec).
  • Each subgraph branch's inputs / outputs refer only to declared fields on the (parent, branch-subgraph) state schemas (raises MappingReferencesUndeclaredField).
  • errors_field (when set) is a declared parent-state field.

with_checkpointer

with_checkpointer(checkpointer: Checkpointer) -> Self

Register a Checkpointer for the compiled graph.

At most one Checkpointer per graph; calling with_checkpointer again replaces the previously stored one. Call :meth:compile and then :meth:CompiledGraph.invoke as usual; the engine fires a save at every completed event, for both outermost-graph and subgraph-internal nodes.

with_state_migration

with_state_migration(
    from_version: str,
    to_version: str,
    migrate: Callable[[Any], Any],
) -> Self

Register one state migration.

On resume, when the saved record's schema_version does not match the current state class's schema_version, the engine consults the registry for a chain that bridges the two and applies it to the record's state (and to each entry in parent_states) before deserialization.

Migrations MUST be pure: deterministic, no I/O, no implicit state. The framework does not police purity, but violating it risks non-deterministic resume.

Raises CheckpointStateMigrationChainAmbiguous at registration if the (from_version, to_version) pair is already registered. Also raises ValueError if to_version is the empty-string sentinel (the un-declared marker is not a valid chain target).
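One way to picture the chain lookup is a toy registry keyed by (from_version, to_version). The sketch below is not the engine's MigrationRegistry: ToyMigrationRegistry is hypothetical, ValueError stands in for CheckpointStateMigrationChainAmbiguous, and using breadth-first search to pick a chain is an assumption about how a bridge might be found.

```python
from collections import deque
from typing import Any, Callable, Dict, List, Tuple

Migration = Callable[[Dict[str, Any]], Dict[str, Any]]


class ToyMigrationRegistry:
    def __init__(self) -> None:
        self._edges: Dict[Tuple[str, str], Migration] = {}

    def register(self, from_version: str, to_version: str, migrate: Migration) -> None:
        if to_version == "":
            raise ValueError("the empty-string version is not a valid chain target")
        if (from_version, to_version) in self._edges:
            # Stand-in for CheckpointStateMigrationChainAmbiguous.
            raise ValueError(f"duplicate migration {from_version!r} -> {to_version!r}")
        self._edges[(from_version, to_version)] = migrate

    def chain(self, saved: str, current: str) -> List[Migration]:
        """Breadth-first search for a chain bridging saved -> current."""
        queue = deque([(saved, [])])
        seen = {saved}
        while queue:
            version, path = queue.popleft()
            if version == current:
                return path
            for (src, dst), fn in self._edges.items():
                if src == version and dst not in seen:
                    seen.add(dst)
                    queue.append((dst, [*path, fn]))
        raise LookupError(f"no migration chain from {saved!r} to {current!r}")

    def apply(self, state: Dict[str, Any], saved: str, current: str) -> Dict[str, Any]:
        for fn in self.chain(saved, current):
            state = fn(state)
        return state


registry = ToyMigrationRegistry()
# Pure migrations: each returns a new dict and performs no I/O.
registry.register("1", "2", lambda s: {("title" if k == "name" else k): v for k, v in s.items()})
registry.register("2", "3", lambda s: {**s, "tags": []})

saved_state = {"name": "draft"}
migrated = registry.apply(saved_state, "1", "3")
print(migrated)  # {'title': 'draft', 'tags': []}
```

Because both migrations build new dicts instead of mutating their input, replaying the chain on the same saved record always yields the same result, which is the property the purity requirement protects.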

with_state_migrations

with_state_migrations(*migrations: StateMigration) -> Self

Register multiple migrations in one call. Convenience over with_state_migration; each entry is registered through the same path and obeys the same ambiguity rule.

Pre-validates the full input list against the existing registry + against earlier entries in the same call before mutating, so a duplicate in the third entry cannot leave the first two half-registered. If any duplicate (from, to) pair is detected the call raises CheckpointStateMigrationChainAmbiguous without mutating the registry; otherwise all entries register atomically.
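The validate-then-mutate pattern described above can be sketched in a few lines. register_all and the plain dict registry are hypothetical stand-ins, and ValueError again substitutes for CheckpointStateMigrationChainAmbiguous.

```python
from typing import Any, Callable, Dict, List, Tuple

Migration = Callable[[Any], Any]
Entry = Tuple[str, str, Migration]


def register_all(registry: Dict[Tuple[str, str], Migration], entries: List[Entry]) -> None:
    """Register every entry, or none of them."""
    # Pass 1: check against the existing registry and earlier entries in this call.
    seen = set(registry)
    for from_version, to_version, _ in entries:
        if (from_version, to_version) in seen:
            raise ValueError(f"duplicate migration {from_version!r} -> {to_version!r}")
        seen.add((from_version, to_version))
    # Pass 2: validation passed, so mutation cannot fail halfway.
    for from_version, to_version, migrate in entries:
        registry[(from_version, to_version)] = migrate


def identity(state: Any) -> Any:
    return state


registry: Dict[Tuple[str, str], Migration] = {}
try:
    # The third entry repeats the first pair, so nothing should be registered.
    register_all(registry, [("1", "2", identity), ("2", "3", identity), ("1", "2", identity)])
except ValueError as exc:
    print(exc)
print(len(registry))  # 0
```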

add_middleware

add_middleware(middleware: Middleware) -> Self

Register a per-graph middleware applied to every node in this graph.

Per-graph middleware composes OUTSIDE per-node middleware. Calling order is preserved (outer-to-inner); earlier add_middleware calls produce outer layers in the runtime chain.
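The layering order is easier to see in a toy composition. The middleware shape used here, an async callable taking the state and the next handler, is an assumption for illustration; the actual Middleware protocol is not shown on this page, and tagging and compose are hypothetical helpers.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Mapping

State = Mapping[str, Any]
Handler = Callable[[State], Awaitable[Mapping[str, Any]]]
# Assumed shape: a middleware receives the state and the next handler in the chain.
Middleware = Callable[[State, Handler], Awaitable[Mapping[str, Any]]]


def tagging(label: str, log: List[str]) -> Middleware:
    """Build a middleware that records when it is entered and exited."""

    async def middleware(state: State, call_next: Handler) -> Mapping[str, Any]:
        log.append(f"enter {label}")
        try:
            return await call_next(state)
        finally:
            log.append(f"exit {label}")

    return middleware


def compose(middlewares: List[Middleware], node: Handler) -> Handler:
    """Wrap node so that the first middleware in the list is the outermost layer."""
    handler = node
    for mw in reversed(middlewares):
        def wrapped(state: State, mw: Middleware = mw, inner: Handler = handler):
            return mw(state, inner)
        handler = wrapped
    return handler


log: List[str] = []


async def node(state: State) -> Mapping[str, Any]:
    log.append("node")
    return {"done": True}


per_graph = [tagging("graph-1", log), tagging("graph-2", log)]  # add_middleware order
per_node = [tagging("node-1", log)]                             # add_node(middleware=...)
result = asyncio.run(compose(per_graph + per_node, node)({}))
print(log)
```

Under this model the first add_middleware call ends up as the outermost layer, per-node middleware sits inside all per-graph layers, and the layers unwind in reverse order on the way out.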

add_edge

add_edge(source: str, target: str | EndSentinel) -> Self

Register a static edge from source to target.

target is either the name of another declared node or the :data:END sentinel. A node may have at most one outgoing static edge; declaring two raises :class:MultipleOutgoingEdges at compile(). For state-dependent branching use :meth:add_conditional_edge.

add_conditional_edge

add_conditional_edge(
    source: str, fn: Callable[[StateT], str | EndSentinel]
) -> Self

Register a conditional edge from source.

fn is a synchronous callable that receives the merged post-node state and returns either the name of the next node or the :data:END sentinel. Returning any other value raises :class:RoutingError at runtime; an exception inside fn becomes an :class:EdgeException.

Like static edges, a node has at most one outgoing edge; declaring both a static and a conditional edge from the same source raises :class:MultipleOutgoingEdges at compile().
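A routing function of the kind described above might look like the following sketch. EndSentinel and END here are local stand-ins for the engine's own objects, resolve_next is a hypothetical helper modelling the runtime check, and LookupError substitutes for RoutingError.

```python
from typing import Any, Callable, Mapping, Set, Union


class EndSentinel:
    """Local stand-in for the engine's sentinel type."""


END = EndSentinel()  # local stand-in for the module-level END

Router = Callable[[Mapping[str, Any]], Union[str, EndSentinel]]


def route(state: Mapping[str, Any]) -> Union[str, EndSentinel]:
    """Synchronous router: inspects the merged state and names the next node."""
    if state["approved"]:
        return END
    if state["attempts"] >= 3:
        return "escalate"
    return "revise"


def resolve_next(fn: Router, state: Mapping[str, Any], declared: Set[str]):
    """Model of the runtime check: accept only a declared node name or END."""
    target = fn(state)
    if target is END or (isinstance(target, str) and target in declared):
        return target
    raise LookupError(f"router returned {target!r}")  # stand-in for RoutingError


declared_nodes = {"draft", "revise", "escalate"}
print(resolve_next(route, {"approved": False, "attempts": 1}, declared_nodes))  # revise
print(resolve_next(route, {"approved": True, "attempts": 1}, declared_nodes) is END)  # True
```

Keeping the router a pure function of state makes it easy to unit-test without running the graph.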

set_entry

set_entry(name: str) -> Self

Declare which registered node the graph starts at.

Calling set_entry again replaces the previously set entry. compile() raises :class:NoDeclaredEntry if no entry was set, and :class:DanglingEdge if the entry name was never declared with add_*_node.

compile

compile() -> CompiledGraph[StateT]

Validate the builder and return an immutable :class:CompiledGraph.

Runs structural checks in order:

  • conflicting_reducers (state-field reducer conflicts)
  • declarative projection validate hooks (e.g. :class:ExplicitMapping)
  • no_declared_entry / dangling_edge (entry pointer)
  • dangling_edge (edge endpoints reference declared nodes)
  • multiple_outgoing_edges (one edge per source)
  • unreachable_node (every node reachable from entry)

The first failing check raises its specific :class:CompileError subclass; passing means the returned graph is ready for :meth:CompiledGraph.invoke. A previously attached :class:Checkpointer is forwarded onto the compiled graph.
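The "first failing check wins" ordering can be modelled on a graph with static edges only. This is a simplified sketch: first_structural_problem is hypothetical, it returns a category string where compile() raises, it skips the reducer and projection checks, and it ignores conditional edges because their targets are not known without the engine's own rules.

```python
from typing import List, Optional, Set, Tuple

END = "__end__"  # local stand-in for the END sentinel


def first_structural_problem(
    entry: Optional[str], nodes: Set[str], edges: List[Tuple[str, str]]
) -> Optional[str]:
    """Return the first failing check's category, or None when all checks pass."""
    if entry is None:
        return "no_declared_entry"
    if entry not in nodes:
        return "dangling_edge"
    for source, target in edges:
        if source not in nodes or (target != END and target not in nodes):
            return "dangling_edge"
    sources = [source for source, _ in edges]
    if len(sources) != len(set(sources)):
        return "multiple_outgoing_edges"
    # With one outgoing edge per node, reachability is a walk along a single path.
    successors = dict(edges)
    reachable: Set[str] = set()
    cursor: Optional[str] = entry
    while cursor is not None and cursor != END and cursor not in reachable:
        reachable.add(cursor)
        cursor = successors.get(cursor)
    if reachable != nodes:
        return "unreachable_node"
    return None


print(first_structural_problem("a", {"a", "b"}, [("a", "b"), ("b", END)]))       # None
print(first_structural_problem("a", {"a", "b", "c"}, [("a", "b"), ("b", END)]))  # unreachable_node
```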

CaughtException dataclass

CaughtException(
    category: str | None,
    message: str,
    chain: tuple[CauseLink, ...],
)

A classified exception cause chain.

The result of :func:classify_cause_chain, and the record that FailureIsolatedEvent.caught_exception carries.

  • category: the single derived failure category, taken from the outermost non-carrier link whose category is a non-empty string; None when no non-carrier link carries one.
  • message: the message of the link that category was derived from, or (when no link carries a category) the outermost non-carrier link's message.
  • chain: the ordered cause chain, outermost (the classified exception, index 0) to innermost (the originating raise), one :class:CauseLink per exception.

CauseLink dataclass

CauseLink(
    category: str | None, message: str, carrier: bool
)

One link in a caught exception's resolved cause chain.

  • category: the link's failure category when it carries one (a string), else None.
  • message: the link's own message (the str of the exception).
  • carrier: True when the link is an engine-applied node_exception carrier wrapper, False for an ordinary (non-carrier) exception.
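The derivation rules for category and message can be replayed on a toy chain. The sketch below is not classify_cause_chain itself: Link, Carrier, links_from, and derive are hypothetical, and reading the category from a category attribute on the exception is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class Link:
    category: Optional[str]
    message: str
    carrier: bool


class Carrier(Exception):
    """Stand-in for an engine-applied node_exception wrapper."""


class RateLimited(Exception):
    category = "rate_limit"  # assumed way for an exception to carry a category


def links_from(exc: Optional[BaseException]) -> Tuple[Link, ...]:
    """Walk __cause__ from the outermost exception to the originating raise."""
    links = []
    while exc is not None:
        links.append(Link(getattr(exc, "category", None), str(exc), isinstance(exc, Carrier)))
        exc = exc.__cause__
    return tuple(links)


def derive(chain: Tuple[Link, ...]) -> Tuple[Optional[str], str]:
    """Apply the category / message rules to an outermost-first chain."""
    non_carriers = [link for link in chain if not link.carrier]
    for link in non_carriers:
        if link.category:  # the outermost non-carrier link with a non-empty category wins
            return link.category, link.message
    # No category anywhere: fall back to the outermost non-carrier message.
    return None, non_carriers[0].message if non_carriers else ""


try:
    try:
        try:
            raise RateLimited("429 from provider")
        except RateLimited as inner:
            raise RuntimeError("llm call failed") from inner
    except RuntimeError as middle:
        raise Carrier("node_exception in 'draft'") from middle
except Carrier as outer:
    chain = links_from(outer)

print(derive(chain))  # ('rate_limit', '429 from provider')
```

The carrier wrapper at index 0 is skipped, the uncategorised RuntimeError is passed over, and the category and message both come from the RateLimited link.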

CompiledGraph dataclass

CompiledGraph(
    state_cls: type[StateT],
    entry: str,
    nodes: Mapping[str, Node[StateT]],
    edges: Mapping[
        str, StaticEdge | ConditionalEdge[StateT]
    ],
    reducers: Mapping[str, Reducer],
    middleware: tuple[Middleware, ...] = (),
    _attached_observers: list[SubscribedObserver] = list[
        SubscribedObserver
    ](),
    _active_workers: dict[
        Task[None], _InvocationContext
    ] = dict[Task[None], _InvocationContext](),
    _checkpointer_slot: list[Checkpointer | None] = (
        lambda: [None]
    )(),
    migration_registry: MigrationRegistry = MigrationRegistry(),
)

An immutable, executable graph produced by GraphBuilder.compile().

The compile-time topology (state class, entry, nodes, edges, reducers) is immutable. Two mutable containers ride alongside for observer plumbing (the _attached_observers list and the _active_workers dict); neither affects the compiled topology, and both are scoped to the same instance.

checkpointer property

checkpointer: Checkpointer | None

Currently-registered Checkpointer, or None.

attach_observer

attach_observer(
    observer: Observer,
    *,
    phases: Iterable[str] | None = None
) -> RemoveHandle

Register a graph-attached observer.

Graph-attached observers fire on every invocation of this graph until removed, including when this graph runs as a subgraph inside a parent. Returns a RemoveHandle whose .remove() method detaches the observer; calling .remove() is idempotent.

phases selects the phase strings ("started", "completed") the observer subscribes to; default is both. An empty phases set raises ValueError at registration time.

Changes to the registered set during a graph run do NOT take effect until the next invocation. The set of observers delivering events for an in-flight invocation is fixed at the point the invocation begins.

attach_checkpointer

attach_checkpointer(
    checkpointer: Checkpointer | None,
) -> None

Register a Checkpointer for this graph.

Pass None to clear a previously-registered backend. Without a registered Checkpointer the engine never calls save() and invoke(resume_invocation=...) raises checkpoint_not_found.

At most one Checkpointer per graph. Calling attach_checkpointer again replaces the previously registered one; multi-backend fan-out is the user's responsibility (wrap two underlying Checkpointers behind a custom protocol-conforming implementation if needed).

drain async

drain(timeout: float | None = None) -> DrainSummary

Await delivery of every observer event produced by prior invocations of this graph, optionally bounded by timeout.

Callers running in short-lived processes (scripts, serverless functions, CLIs) MUST use drain to avoid losing observer events that were dispatched but not yet delivered.

Only events dispatched before this call are awaited; events from invocations started concurrently with drain may or may not be included. Subgraph events from active invocations are part of the parent invocation's worker and are covered automatically.

timeout is a non-negative duration in seconds. If omitted or None, drain waits indefinitely — a slow, hung, or misbehaving observer can therefore hold drain (and the calling process) indefinitely. If supplied, drain returns no later than timeout seconds after the call begins; any observer events still queued or in-flight at that point are considered undelivered. Workers are cancelled via Task.cancel() so the compiled graph remains usable for subsequent invocations — partial delivery state from one drain does NOT leak into the next invocation.

Returns a :class:DrainSummary with undelivered_count and timeout_reached fields. The shape is the same whether or not a timeout was supplied; on the no-timeout / timeout-not-fired path both fields are zero / false.

Observers SHOULD be written to be cancellation-safe (idempotent writes, try/finally cleanup) so that interruption by drain timeout does not leave partial side effects in an inconsistent state.

Raises ValueError if timeout is negative or NaN. Non-numeric input raises TypeError from the comparison.
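The cancellation-safety advice can be illustrated with plain asyncio. This sketch does not use the engine: observer and deliver_with_timeout are hypothetical, and the only behaviour taken from the text above is that a timed-out delivery is interrupted via Task.cancel().

```python
import asyncio
from typing import List

delivered: List[str] = []
released: List[str] = []


async def observer(event: str) -> None:
    """A slow observer that still releases its resource when cancelled."""
    handle = f"conn-for-{event}"
    try:
        await asyncio.sleep(10)  # simulate a slow or hung backend
        delivered.append(event)
    finally:
        released.append(handle)  # runs on success and on cancellation


async def deliver_with_timeout(event: str, timeout: float) -> int:
    """Deliver one event; cancel the observer if it overruns. Returns the undelivered count."""
    task = asyncio.ensure_future(observer(event))
    _, pending = await asyncio.wait({task}, timeout=timeout)
    for unfinished in pending:
        unfinished.cancel()
    # Wait for the cancelled task to unwind so its finally block has run.
    await asyncio.gather(*pending, return_exceptions=True)
    return len(pending)


undelivered = asyncio.run(deliver_with_timeout("e1", 0.01))
print(undelivered, delivered, released)  # 1 [] ['conn-for-e1']
```

The event is counted as undelivered, yet the observer's finally block still ran, so nothing is left half-open for the next invocation.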

drain_events_for async

drain_events_for(
    invocation_id: str, *, timeout: float | None = 5.0
) -> DrainSummary

Await delivery of every observer event tagged with invocation_id that was dispatched as of this call's entry, optionally bounded by timeout.

Use this from a terminal node body to synchronize on the observer event stream before reading derived observer state (a queryable accumulator's per-invocation bucket, a latency rollup, a token-usage record). The drain blocks until every event dispatched up to the moment of the call has reached every attached observer, then returns.

Snapshot semantic: the drain awaits the events dispatched as of call time. Events emitted after the call begins (notably the calling node's own completed event, which fires only after the node body returns) are out of scope. This is what allows an in-node call to avoid deadlocking on its own completed event. The calling node's started event, by contrast, fires immediately BEFORE the body runs and IS in the snapshot — the drain awaits its delivery normally.

timeout is a non-negative duration in seconds (default 5.0). None waits indefinitely. timeout=0.0 is a non-blocking check: returns immediately whether the snapshot target was met. Raises :class:ValueError on negative or NaN input.

On timeout the deliver worker is left running. The compiled graph stays available to serve other invocations after a per-invocation drain times out; the deliver loop continues processing the queue, including the events the timed-out caller failed to await. This is the load-bearing difference from :meth:drain, which cancels its workers.

Returns a :class:DrainSummary with undelivered_count and timeout_reached. On the clean path both are zero / false; on timeout undelivered_count is the snapshot target minus the deliver loop's current delivered count for this invocation. Unknown invocation_id (no active worker, or the invocation has already drained and the worker has exited) returns an empty summary — not an error.

Interaction with :meth:drain: if process-wide drain is called while a per-invocation drain is pending, drain's shutdown semantics take precedence. The deliver worker is cancelled, its remaining events are not delivered, and the per-invocation waker's target may never be reached. The per-invocation call then blocks until its own timeout fires and returns timeout_reached=True. Mixing the two primitives in the same shutdown path is unusual; use drain for lifespan / shutdown coordination and drain_events_for for in-flight synchronization.
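The snapshot semantic can be modelled with two counters and a condition variable. ToyDeliveryQueue is a hypothetical stand-in, not the engine's delivery worker; it only reproduces the rule that the target is fixed at call entry and that a timeout reports target minus delivered.

```python
import asyncio
from typing import Optional, Tuple


class ToyDeliveryQueue:
    """Counts dispatched and delivered events for one invocation."""

    def __init__(self) -> None:
        self.dispatched = 0
        self.delivered = 0
        self._changed = asyncio.Condition()

    def dispatch(self) -> None:
        self.dispatched += 1

    async def deliver_one(self) -> None:
        async with self._changed:
            self.delivered += 1
            self._changed.notify_all()

    async def drain_snapshot(self, timeout: Optional[float]) -> Tuple[int, bool]:
        """Return (undelivered_count, timeout_reached) for events dispatched so far."""
        target = self.dispatched  # snapshot taken at call entry

        async def reached() -> None:
            async with self._changed:
                await self._changed.wait_for(lambda: self.delivered >= target)

        try:
            await asyncio.wait_for(reached(), timeout)
        except asyncio.TimeoutError:
            return max(target - self.delivered, 0), True
        return 0, False


async def demo():
    queue = ToyDeliveryQueue()
    queue.dispatch()
    queue.dispatch()  # two events are in the snapshot

    async def worker() -> None:
        for _ in range(3):
            await asyncio.sleep(0.01)
            await queue.deliver_one()

    worker_task = asyncio.ensure_future(worker())
    drain_task = asyncio.ensure_future(queue.drain_snapshot(timeout=1.0))
    await asyncio.sleep(0)  # let the drain record its snapshot
    queue.dispatch()        # dispatched after the snapshot: out of scope
    clean = await drain_task
    await worker_task

    stalled = ToyDeliveryQueue()  # nothing ever delivers from this one
    stalled.dispatch()
    timed_out = await stalled.drain_snapshot(timeout=0.01)
    return clean, timed_out


clean, timed_out = asyncio.run(demo())
print(clean, timed_out)
```

The first drain returns once the two snapshotted events are delivered, without waiting for the third; the second drain gives up after its timeout and reports one undelivered event.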

invoke async

invoke(
    initial_state: StateT,
    observers: (
        Iterable[Observer | SubscribedObserver] | None
    ) = None,
    *,
    correlation_id: str | None = None,
    invocation_id: str | None = None,
    resume_invocation: str | None = None,
    metadata: Mapping[str, Any] | None = None
) -> StateT

Run the graph from initial_state to END and return the final state.

Optional observers are invocation-scoped; they fire only for this run, after all graph-attached observers (including subgraph-attached ones for events originating in subgraphs).

Each entry in observers may be either a bare Observer callable (subscribes to both phases) or a SubscribedObserver wrapping an observer with an explicit phases set.

This method returns as soon as the graph execution loop completes, regardless of whether the observer delivery queue has finished processing every dispatched event. Use await compiled.drain() if you need delivery-completion guarantees.

Checkpointing.

  • correlation_id is the per-invocation cross-backend join key. Caller-supplied or auto-generated UUIDv4 when absent. Preserved unchanged across resume_invocation.
  • invocation_id is the per-attempt id. Caller-supplied or auto-generated UUIDv4 when absent; a caller value MAY be any non-empty URL-safe string. Applies to the fresh-invocation path only — a resume_invocation mints a fresh id regardless (each attempt is its own invocation).
  • resume_invocation names a prior invocation_id to resume from. Requires a registered Checkpointer; raises CheckpointNotFound when the backend has no record for the supplied id, CheckpointRecordInvalid when the loaded record's schema is incompatible. Resume mints a NEW invocation_id; each attempt is its own invocation in the observability sense; the correlation_id is the cross-attempt join key.
  • Save-failure policy. This implementation raises CheckpointSaveFailed to the caller of invoke() immediately when Checkpointer.save raises; saves are NOT retried by the engine. Wrap the Checkpointer in your own retry logic if transient backend failures should be reattempted.

Caller-supplied invocation metadata.

  • metadata is an optional mapping of arbitrary key → value entries the framework propagates to every observability backend. Values MUST be OTel-attribute-compatible scalars (str / int / float / bool) or homogeneous arrays of those types. Keys MUST NOT use the openarmature.* or gen_ai.* reserved namespaces. Validation runs synchronously at the API boundary; rule violations raise ValueError BEFORE any work begins.
  • The OTel observer emits each entry as an openarmature.user.<key> cross-cutting span attribute on every span and OTel log record. The Langfuse observer merges each entry into trace.metadata AND every observation.metadata (top level, sibling to correlation_id).
  • Mid-invocation augmentation via :func:openarmature.observability.set_invocation_metadata merges into the same ContextVar with the same validation rules; affects spans emitted AFTER the call returns.
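The metadata rules can be restated as a stand-alone validator. validate_metadata is a hypothetical helper, not the framework's implementation; accepting empty arrays and treating bool and int as different element types are assumptions.

```python
from typing import Any, Mapping

SCALARS = (str, int, float, bool)
RESERVED_PREFIXES = ("openarmature.", "gen_ai.")


def validate_metadata(metadata: Mapping[str, Any]) -> None:
    """Raise ValueError on the first entry that breaks the metadata rules."""
    for key, value in metadata.items():
        if not isinstance(key, str) or key.startswith(RESERVED_PREFIXES):
            raise ValueError(f"invalid or reserved metadata key: {key!r}")
        if isinstance(value, SCALARS):
            continue
        if isinstance(value, (list, tuple)):
            kinds = {type(item) for item in value}
            # Homogeneous: every element is a scalar of the same type.
            if len(kinds) <= 1 and all(isinstance(item, SCALARS) for item in value):
                continue
        raise ValueError(f"metadata value for {key!r} is not an OTel-compatible attribute")


validate_metadata({"tenant": "acme", "attempt": 2, "tags": ["a", "b"]})  # passes silently

for bad in ({"gen_ai.model": "x"}, {"mixed": [1, "two"]}, {"nested": {"k": "v"}}):
    try:
        validate_metadata(bad)
    except ValueError as exc:
        print(exc)
```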

Raises one of the runtime error categories on failure.

ConditionalEdge dataclass

ConditionalEdge(
    source: str, fn: Callable[[StateT], str | EndSentinel]
)

Routes from source to whichever node fn(state) returns. The function MUST return either a declared node name or END; any other value raises RoutingError at runtime.

EndSentinel

Engine-provided sentinel routing target. Use the module-level END.

StaticEdge dataclass

StaticEdge(source: str, target: str | EndSentinel)

Always routes from source to target.

CompileError

Bases: GraphError

Base for compile-time errors.

ConflictingReducers

ConflictingReducers(field_name: str)

Bases: CompileError

Raised at compile() when a state field has more than one distinct reducer attached across its declarations. Each field accepts at most one reducer.

DanglingEdge

DanglingEdge(source: str, target: str)

Bases: CompileError

Raised at compile() when an edge (static or the entry pointer) names a node that was never declared on the builder.

EdgeException

EdgeException(
    source_node: str,
    cause: BaseException,
    recoverable_state: Any,
)

Bases: RuntimeGraphError

Raised when a conditional-edge callable raises an uncaught exception during invoke(). The original exception is attached as __cause__; recoverable_state carries the merged state that the callable was evaluating.

FanOutCountModeAmbiguous

FanOutCountModeAmbiguous(node_name: str, message: str)

Bases: CompileError

Raised when a fan-out node specifies both items_field and count, or neither. Exactly one is required.

FanOutDegradedUpdateMissingCollectField

FanOutDegradedUpdateMissingCollectField(
    node_name: str, collect_field: str
)

Bases: CompileError

Raised when a fan-out instance FailureIsolationMiddleware has a static (mapping) degraded_update that omits the node's collect_field. A degraded instance contributes its degraded_update as the instance result, so the collected field has to be present. A callable degraded_update is exempt: its output is not known at construction time, and an omitted collect_field yields a null slot at runtime instead of a failure.

FanOutEmpty

FanOutEmpty(node_name: str, recoverable_state: Any)

Bases: NodeException

Raised when a fan-out node resolves to zero instances while its on_empty config is "raise" (the default).

Surfaces as a regular node_exception (so it integrates with the existing error propagation and recoverable-state machinery) but exposes an additional fan_out_category attribute so callers can distinguish empty-fan-out from generic node failures.

FanOutFieldNotList

FanOutFieldNotList(node_name: str, field_name: str)

Bases: CompileError

Raised when a fan-out node's items_field does not refer to a declared list-typed field on the parent state schema.

FanOutInvalidConcurrency

FanOutInvalidConcurrency(
    node_name: str,
    returned: int | None,
    recoverable_state: Any,
)

Bases: NodeException

Raised when a fan-out node's concurrency callable returns zero or a negative integer at runtime. Same node-exception shape as :class:FanOutEmpty.

FanOutInvalidCount

FanOutInvalidCount(
    node_name: str, returned: int, recoverable_state: Any
)

Bases: NodeException

Raised when a fan-out node's count callable returns a negative integer at runtime. Same node-exception shape as :class:FanOutEmpty, with fan_out_category = "fan_out_invalid_count".

GraphError

Bases: Exception

Base for all graph-engine errors.

MappingReferencesUndeclaredField

MappingReferencesUndeclaredField(
    *, direction: str, side: str, field_name: str
)

Bases: CompileError

Raised when a subgraph-as-node inputs or outputs mapping names a field that is not declared in the relevant state schema.

MultipleOutgoingEdges

MultipleOutgoingEdges(source: str)

Bases: CompileError

Raised at compile() when a single source node has more than one outgoing edge, whether two static edges or a static edge alongside a conditional one. To branch from one node, use a single :meth:GraphBuilder.add_conditional_edge instead of multiple add_edge calls.

NoDeclaredEntry

NoDeclaredEntry()

Bases: CompileError

Raised at compile() when the graph has no entry node set. Call :meth:GraphBuilder.set_entry before compiling.

NodeException

NodeException(
    node_name: str,
    cause: BaseException,
    recoverable_state: Any,
)

Bases: RuntimeGraphError

Raised when a node's function (or innermost middleware) raises an uncaught exception during invoke(). The original exception is attached as __cause__; recoverable_state carries the state as it was just before the failing node ran.

ParallelBranchesBranchFailed

ParallelBranchesBranchFailed(
    node_name: str,
    cause: BaseException,
    recoverable_state: Any,
    *,
    branch_name: str
)

Bases: NodeException

Raised when a branch's subgraph raises under error_policy: 'fail_fast'.

Subtype of :class:NodeException (a node_exception subtype attached at the parallel-branches node's level). The existing NodeException-classifier path handles transient classification from __cause__: non-transient by default, inheriting transient classification from the wrapped exception.

Carries branch_name as a structured field; the inner exception rides __cause__.

ParallelBranchesInvalidBranchSpec

ParallelBranchesInvalidBranchSpec(
    node_name: str, branch_name: str, reason: str
)

Bases: CompileError

Raised at registration when a branch spec's work is not given by exactly one of subgraph / call.

A branch is either a compiled subgraph (with optional inputs / outputs projection) or an inline call (an async function over the parent state). Declaring both, neither, or inputs / outputs on a callable branch (which reads parent state and returns parent-shaped fields directly, so projection is meaningless) is invalid. Non-transient.

ParallelBranchesNoBranches

ParallelBranchesNoBranches(node_name: str)

Bases: CompileError

Raised at registration when a parallel-branches node's branches mapping is empty. Non-transient.

ReducerError

ReducerError(
    field_name: str,
    reducer_name: str,
    producing_node: str,
    cause: BaseException,
    recoverable_state: Any,
)

Bases: RuntimeGraphError

Raised when a field reducer raises while merging a node's partial update. __cause__ is the original exception; recoverable_state is the pre-merge state.

RoutingError

RoutingError(
    source_node: str,
    returned: object,
    recoverable_state: Any,
)

Bases: RuntimeGraphError

Raised when a conditional-edge callable returns a value that is neither the name of a declared node nor the :data:END sentinel.

RuntimeGraphError

Bases: GraphError

Base for runtime errors. The four non-validation categories carry a recoverable_state attribute.

StateValidationError

StateValidationError(
    message: str,
    fields: list[str],
    cause: BaseException | None = None,
)

Bases: RuntimeGraphError

State failed schema validation at a graph boundary.

Unlike the other runtime errors, this category does NOT carry recoverable_state: at entry there is no prior state to recover; at exit the failing state IS the final state.

UnreachableNode

UnreachableNode(node_name: str)

Bases: CompileError

Raised at compile() when a declared node cannot be reached from the entry by following static or conditional edges.

FailureIsolatedEvent dataclass

FailureIsolatedEvent(
    event_name: str,
    namespace: tuple[str, ...],
    attempt_index: int,
    fan_out_index: int | None,
    branch_name: str | None,
    pre_state: Any,
    post_state: Mapping[str, Any],
    caught_exception: CaughtException,
)

A failure-isolation event delivered to observers.

Reports that FailureIsolationMiddleware caught an exception at a node and substituted a degraded partial update for the node's output. Observer code filters by type discrimination (isinstance(event, FailureIsolatedEvent)).

Field set:

  • event_name: the caller-supplied identifier for this catch site, from the middleware's configuration.
  • namespace / attempt_index / fan_out_index / branch_name: the wrapped node's lineage identity, surfaced for correlation with the node's other events.
  • pre_state: the state the wrapped node received.
  • post_state: the degraded partial update the middleware returned in place of the node's output.
  • caught_exception: a :class:CaughtException record of the caught exception (its derived category / message and the full cause chain).

InvocationCompletedEvent dataclass

InvocationCompletedEvent(
    final_state: Any,
    status: Literal["completed", "failed"],
    final_node: str,
    invocation_id: str,
    correlation_id: str | None,
)

An invocation-exit event delivered to observers.

Emitted once per invocation, after the last node has fired (and after a failure boundary on the failure path). Observers that populate Trace-level output fields (the Langfuse observer, today) consume it to resolve trace.output per the three-lever decision tree. Observers without a Trace-level output concept (the OTel observer) treat it as a no-op.

Carries:

  • final_state: the state at invocation exit (the engine's returned state on the success path; the state at point-of-failure on the failure path).
  • status: closed enum "completed" (END reached) or "failed" (any node, edge, reducer, or boundary validator raised before END).
  • final_node: the name of the node whose execution preceded the END-reached transition on the success path, or the node that raised on the failure path.
  • invocation_id / correlation_id: the run + correlation ids.

The event is NOT subject to the observer phases filter; the delivery worker forwards it to every subscribed observer.

InvocationStartedEvent dataclass

InvocationStartedEvent(
    initial_state: Any,
    invocation_id: str,
    correlation_id: str | None,
    entry_node: str,
)

An invocation-entry event delivered to observers.

Emitted once per invocation, before any node fires. Observers that populate Trace-level input fields (the Langfuse observer, today) consume it to resolve trace.input per the three-lever decision tree. Observers without a Trace-level input concept (the OTel observer) treat it as a no-op.

Carries:

  • initial_state: the raw state object the engine constructed from invoke()'s arguments (the typed-state instance).
  • invocation_id: the invocation id (caller-supplied or framework-generated).
  • correlation_id: the correlation id when present.
  • entry_node: the outermost-graph entry node name.

The event is NOT subject to the observer phases filter (which only governs NodeEvent phases); the delivery worker forwards it to every subscribed observer.

LlmCompletionEvent dataclass

LlmCompletionEvent(
    invocation_id: str,
    correlation_id: str | None,
    node_name: str,
    namespace: tuple[str, ...],
    attempt_index: int,
    fan_out_index: int | None,
    branch_name: str | None,
    provider: str,
    model: str,
    response_id: str | None,
    response_model: str | None,
    usage: Usage | None,
    latency_ms: float | None,
    finish_reason: str | None,
    input_messages: list[dict[str, Any]],
    output_content: str | None,
    request_params: Mapping[str, Any],
    request_extras: Mapping[str, Any],
    active_prompt: Any,
    active_prompt_group: Any,
    call_id: str,
    caller_invocation_metadata: (
        Mapping[str, AttributeValue] | None
    ) = None,
    output_tool_calls: list[ToolCall] = list["ToolCall"](),
)

A typed LLM provider call event delivered to observers.

Carries identity, scoping, and outcome data for an LLM call as structured fields. Observer code filters by type discrimination (isinstance(event, LlmCompletionEvent)) rather than by the sentinel-namespace string match that the legacy NodeEvent pattern currently uses.

Field set:

  • invocation_id: the outer invocation's identifier.
  • correlation_id: cross-backend correlation id when present.
  • node_name: the user-defined node that issued the call.
  • namespace: the calling node's namespace tuple (NOT the legacy sentinel namespace).
  • attempt_index: retry-attempt index (0 on first attempt).
  • fan_out_index: fan-out instance index when the calling node ran inside a fan-out instance; None otherwise.
  • branch_name: parallel-branches branch name when the calling node ran inside a branch; None otherwise.
  • provider: provider identifier; matches gen_ai.system.
  • model: the model identifier the call targeted (the request-side bound model; distinct from response_model).
  • response_id: provider-returned response id; None when the provider didn't return one.
  • response_model: provider-returned model identifier; distinct from model (the provider may return a more specific identifier than the one requested). None when the provider didn't return one.
  • usage: token-accounting record reusing the existing openarmature.llm.response.Usage class. None when the call returned no usage at all.
  • latency_ms: wall-clock latency measured at the adapter boundary, in milliseconds. None when latency was not measured.
  • finish_reason: the call's finish reason; None when the call did not complete normally.
  • input_messages: the message list the call was made with, serialized to the plain-dict shape. Non-nullable; empty list when the call had no history. Inline image bytes are redacted before population (see the comment block above for the redaction contract).
  • output_content: the assistant message's content string from the response. None on tool-call-only responses (the structured-response and tool-call paths are mutually exclusive at the response level).
  • output_tool_calls: the assistant message's tool calls, as ToolCall records. Populated unconditionally; empty list when the response carried no tool calls. Tool calls live here rather than in output_content, which holds the response text and is None on a tool-call-only response.
  • request_params: the GenAI request-parameter set the caller supplied. Absence-is-meaningful: only caller-supplied keys appear; empty mapping when none supplied. Keys are the cross-vendor parameter names without the gen_ai.request. prefix (e.g. temperature, max_tokens).
  • request_extras: the RuntimeConfig extras pass-through bag in native mapping form (not JSON-encoded). Empty mapping when no extras supplied.
  • active_prompt: 5-field identity snapshot of the active PromptResult at LLM-call time (name / version / label / template_hash / rendered_hash). None when the call ran outside any prompt-context binding. Typed as Any because the prompts package imports State indirectly; observer-side narrowing reads the attribute names directly.
  • active_prompt_group: {group_name} snapshot when the call ran inside a PromptGroup context; None otherwise. Same Any typing rationale as active_prompt.
  • call_id: per-call disambiguator minted by the implementation. Always present, freshly minted per provider.complete() call, stable for the call's lifetime, unique within the run. Distinct from response_id.
  • caller_invocation_metadata: optional snapshot of caller-supplied invocation metadata at LLM-call time. The Python OpenAIProvider populates it by default so the bundled OTel/Langfuse observers can emit the openarmature.user.<key> span-attribute family without an extra opt-in. Pass populate_caller_metadata=False to suppress the snapshot. Future non-OpenAI providers MAY default to None.
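As a rough illustration of the type-discrimination pattern described above, the sketch below tallies tokens per model from completion events. It does not import openarmature: LlmCompletionEvent and Usage here are simplified stand-ins carrying only a few of the documented fields, the Usage field names (prompt_tokens, completion_tokens) are assumptions, and OtherEvent and the model strings are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(frozen=True)
class Usage:  # stand-in; the field names are assumptions
    prompt_tokens: int
    completion_tokens: int


@dataclass(frozen=True)
class LlmCompletionEvent:  # stand-in with a subset of the documented fields
    node_name: str
    model: str
    response_model: Optional[str]
    usage: Optional[Usage]
    latency_ms: Optional[float]


@dataclass(frozen=True)
class OtherEvent:  # hypothetical placeholder for any non-LLM event variant
    node_name: str


totals: Dict[str, int] = {}


async def token_observer(event: object, /) -> None:
    # Narrow by type; every other variant is ignored.
    if not isinstance(event, LlmCompletionEvent):
        return
    # usage is None when the call returned no usage at all.
    if event.usage is None:
        return
    # Prefer the provider-returned identifier, fall back to the requested one.
    key = event.response_model or event.model
    spent = event.usage.prompt_tokens + event.usage.completion_tokens
    totals[key] = totals.get(key, 0) + spent


async def _demo() -> None:
    await token_observer(OtherEvent("plan"))
    await token_observer(
        LlmCompletionEvent("plan", "model-x", "model-x-0101", Usage(10, 5), 120.0)
    )
    await token_observer(LlmCompletionEvent("plan", "model-x", None, Usage(3, 2), None))
    await token_observer(LlmCompletionEvent("plan", "model-x", None, None, 80.0))


asyncio.run(_demo())
```

Keying on response_model when present reflects the note that the provider may return a more specific identifier than the one requested.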

LlmFailedEvent dataclass

LlmFailedEvent(
    invocation_id: str,
    correlation_id: str | None,
    node_name: str,
    namespace: tuple[str, ...],
    attempt_index: int,
    fan_out_index: int | None,
    branch_name: str | None,
    provider: str,
    model: str,
    latency_ms: float | None,
    input_messages: list[dict[str, Any]],
    request_params: Mapping[str, Any],
    request_extras: Mapping[str, Any],
    active_prompt: Any,
    active_prompt_group: Any,
    call_id: str,
    error_category: str,
    error_message: str,
    error_type: str | None = None,
    caller_invocation_metadata: (
        Mapping[str, AttributeValue] | None
    ) = None,
)

A typed LLM provider call failure event delivered to observers.

Carries identity, scoping, and failure-context data for an LLM call that raised an llm-provider category exception. Observer code filters by type discrimination (isinstance(event, LlmFailedEvent)) rather than by the impl-current sentinel-namespace string match.

Identity / scoping / request-side field set mirrors LlmCompletionEvent 1:1 — same field semantics, same nullability rules. Response-side fields (response_id, response_model, usage, output_content, finish_reason) are ABSENT from this variant — no response was received.

Failure-specific fields:

  • error_category: the llm-provider normative error category the call raised. One of the 9 canonical strings (provider_authentication, provider_unavailable, provider_invalid_model, provider_model_not_loaded, provider_rate_limit, provider_invalid_response, provider_invalid_request, provider_unsupported_content_block, structured_output_invalid). Always present.
  • error_type: OPTIONAL impl-level / vendor-specific error type or code. Two acceptable styles: vendor error code (e.g. "rate_limit_exceeded") OR upstream exception class name (e.g. "RateLimitError"). None when no impl-side type is available.
  • error_message: human-readable message from the raised exception. Always present (empty string when the exception carried no message).
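One way an observer might consume the failure-specific fields is sketched below. LlmFailedEvent is a local stand-in with a subset of the documented fields (nothing is imported from openarmature), the nine category strings are copied from the list above, and record_failure plus the sample provider and node names are hypothetical.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Optional

# The nine canonical category strings listed in the field description.
CANONICAL_CATEGORIES = frozenset({
    "provider_authentication",
    "provider_unavailable",
    "provider_invalid_model",
    "provider_model_not_loaded",
    "provider_rate_limit",
    "provider_invalid_response",
    "provider_invalid_request",
    "provider_unsupported_content_block",
    "structured_output_invalid",
})


@dataclass(frozen=True)
class LlmFailedEvent:  # stand-in with a subset of the documented fields
    node_name: str
    provider: str
    error_category: str
    error_message: str
    error_type: Optional[str] = None


failures: Counter = Counter()


def record_failure(event: LlmFailedEvent) -> str:
    """Tally the failure by category and return a one-line description."""
    if event.error_category not in CANONICAL_CATEGORIES:
        raise ValueError(f"unexpected category: {event.error_category!r}")
    failures[event.error_category] += 1
    # error_type is optional; error_message may be the empty string.
    detail = event.error_type or "no impl-level type"
    message = event.error_message or "<no message>"
    return f"{event.provider}/{event.node_name}: {event.error_category} [{detail}] {message}"


line_a = record_failure(
    LlmFailedEvent("plan", "openai", "provider_rate_limit", "slow down", "RateLimitError")
)
line_b = record_failure(LlmFailedEvent("plan", "openai", "provider_unavailable", ""))
```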

LlmRetryAttemptEvent dataclass

LlmRetryAttemptEvent(
    invocation_id: str,
    correlation_id: str | None,
    node_name: str,
    namespace: tuple[str, ...],
    attempt_index: int,
    fan_out_index: int | None,
    branch_name: str | None,
    provider: str,
    model: str,
    call_id: str,
    llm_attempt_index: int,
    latency_ms: float | None,
    input_messages: list[dict[str, Any]],
    request_params: Mapping[str, Any],
    request_extras: Mapping[str, Any],
    active_prompt: Any,
    active_prompt_group: Any,
    response_id: str | None = None,
    response_model: str | None = None,
    usage: Usage | None = None,
    finish_reason: str | None = None,
    output_content: str | None = None,
    error_category: str | None = None,
    error_message: str | None = None,
    error_type: str | None = None,
    caller_invocation_metadata: (
        Mapping[str, AttributeValue] | None
    ) = None,
    output_tool_calls: list[ToolCall] = list["ToolCall"](),
)

One LLM-call attempt delivered to observers for per-attempt span rendering.

Carries the full request-side surface plus that attempt's outcome. error_category discriminates the outcome: None for a successful attempt (the response-side fields are populated), a category string for a failed attempt (the response-side fields are None — no response was received).

Field set:

  • llm_attempt_index: the call-level retry-attempt index, 0 for the first attempt and 0..N-1 across the N attempts of a call-level retry. Distinct from attempt_index (the node-level retry index used for calling-span resolution); the two are independent.
  • identity / scoping (invocation_id ... call_id) and the request side (input_messages / request_params / request_extras / active_prompt / active_prompt_group) mirror :class:LlmCompletionEvent, carried on every attempt.
  • response side (response_id / response_model / usage / finish_reason / output_content / output_tool_calls): populated on a successful attempt; None / empty list on a failed attempt. output_tool_calls is the source the OTel observer renders the §5.5.1 / §5.5.10 output tool-call attributes from (this is the per-attempt event that drives the LLM span).
  • failure side (error_category / error_message / error_type): populated on a failed attempt; None on a successful one.
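The error_category discriminator can be read as follows. This is a minimal sketch using a stand-in LlmRetryAttemptEvent with only four of the documented fields; outcomes_by_call and the sample call ids are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List, Optional


@dataclass(frozen=True)
class LlmRetryAttemptEvent:  # stand-in with a subset of the documented fields
    call_id: str
    llm_attempt_index: int
    output_content: Optional[str] = None
    error_category: Optional[str] = None


def outcomes_by_call(events: Iterable[LlmRetryAttemptEvent]) -> Dict[str, List[str]]:
    """Group attempts per call_id, ordered by llm_attempt_index."""
    grouped: Dict[str, List[str]] = {}
    for event in sorted(events, key=lambda e: (e.call_id, e.llm_attempt_index)):
        # None means the attempt succeeded; a string names the failure category.
        label = "ok" if event.error_category is None else event.error_category
        grouped.setdefault(event.call_id, []).append(label)
    return grouped


attempts = [
    LlmRetryAttemptEvent("call-1", 1, output_content="hello"),
    LlmRetryAttemptEvent("call-1", 0, error_category="provider_rate_limit"),
    LlmRetryAttemptEvent("call-2", 0, output_content="hi"),
]
timeline = outcomes_by_call(attempts)
```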

MetadataAugmentationEvent dataclass

MetadataAugmentationEvent(
    entries: Mapping[str, AttributeValue],
    namespace: tuple[str, ...],
    attempt_index: int = 0,
    fan_out_index: int | None = None,
    branch_name: str | None = None,
    fan_out_index_chain: tuple[int | None, ...] = (),
    branch_name_chain: tuple[str | None, ...] = (),
)

A metadata-augmentation event delivered to observers.

Emitted by :func:openarmature.observability.metadata.set_invocation_metadata when called mid-invocation. Carries:

  • entries: the delta merged into the per-async-context invocation metadata mapping by the call. Read-only view.
  • namespace / attempt_index / fan_out_index / branch_name: the four lineage fields that jointly identify the augmenting execution context (the calling node's identity tuple). When set_invocation_metadata is called from outside a node body, namespace is the empty tuple, attempt_index is 0, and both fan_out_index and branch_name are None — the invocation-level identity.

Distinct from :class:NodeEvent because there is no node phase, no pre/post state, and no error: this event reports a side-channel augmentation, not a node-attempt boundary. The event is NOT subject to the observer phases filter (which only governs NodeEvent phases); the delivery worker forwards it to every subscribed observer. Observers that handle it iterate their open observations whose lineage is an ancestor of (or equal to) the augmenting context's lineage and apply the entries as openarmature.user.<key> (OTel) / metadata.<key> (Langfuse).
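The ancestor-or-equal matching can be pictured with namespaces alone. The sketch below assumes that "ancestor" means "namespace prefix" and ignores attempt_index, fan_out_index, and branch_name for brevity; the real observers may compare the full lineage. The helper names and the dict-of-dicts span store are hypothetical.

```python
from typing import Dict, List, Mapping, Tuple

Namespace = Tuple[str, ...]


def is_ancestor_or_equal(observation_ns: Namespace, context_ns: Namespace) -> bool:
    # Assumption: an observation is an ancestor when its namespace is a prefix
    # of the augmenting context's namespace. The empty tuple matches everything.
    return context_ns[: len(observation_ns)] == observation_ns


def apply_entries(
    open_spans: Dict[Namespace, Dict[str, object]],
    context_ns: Namespace,
    entries: Mapping[str, object],
) -> List[Namespace]:
    """Write entries onto every matching open span; return the namespaces touched."""
    touched: List[Namespace] = []
    for ns, attributes in open_spans.items():
        if is_ancestor_or_equal(ns, context_ns):
            for key, value in entries.items():
                attributes[f"openarmature.user.{key}"] = value  # OTel-style key
            touched.append(ns)
    return touched


open_spans: Dict[Namespace, Dict[str, object]] = {
    (): {},
    ("outer",): {},
    ("outer", "inner"): {},
    ("other",): {},
}
touched = apply_entries(open_spans, ("outer", "inner"), {"tenant": "acme"})
```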

NodeEvent dataclass

NodeEvent(
    node_name: str,
    namespace: tuple[str, ...],
    step: int,
    phase: Literal[
        "started",
        "completed",
        "checkpoint_saved",
        "checkpoint_migrated",
    ],
    pre_state: Any,
    post_state: State | None,
    error: RuntimeGraphError | None,
    parent_states: tuple[State, ...],
    attempt_index: int = 0,
    fan_out_index: int | None = None,
    fan_out_config: FanOutEventConfig | None = None,
    parallel_branches_config: (
        ParallelBranchesEventConfig | None
    ) = None,
    branch_name: str | None = None,
    fan_out_index_chain: tuple[int | None, ...] = (),
    branch_name_chain: tuple[str | None, ...] = (),
    subgraph_identities: tuple[str | None, ...] = (),
    caller_invocation_metadata: Mapping[
        str, AttributeValue
    ] = (lambda: _EMPTY_METADATA)(),
)

A single node-boundary event delivered to observers.

  • phase is "started" (dispatched before the node runs) or "completed" (dispatched after the node returns or raises and the merge runs/fails). Each node attempt produces exactly one of each in that order. The engine ALSO dispatches a "checkpoint_saved" event on the same shape after a successful Checkpointer.save call; observers MUST opt in explicitly via phases={"checkpoint_saved"} to receive these (default subscription is {"started", "completed"} only, so legacy observers don't see them).
  • node_name is the name under which this node was registered in its immediate containing graph.
  • namespace is an ordered sequence of node names from the outermost graph down to this node. For a node in the outermost graph, namespace is (node_name,). For nested subgraphs, the chain extends.
  • step is a monotonically-increasing counter starting at 0, scoped to a single outermost invocation. Subgraph-internal nodes increment the same counter. The started/completed pair for one attempt share the same step.
  • pre_state is the state the node received, before reducer merge. Populated on both phases (identical across the pair).
  • post_state is the state after the node's partial update merged successfully. Populated only on completed events that succeeded.
  • error is the wrapped runtime error (NodeException, ReducerError, or StateValidationError) when the node failed. Populated only on completed events that failed.
  • parent_states carries one state snapshot per containing graph, outermost first; for a node in the outermost graph it's an empty tuple. Invariant: len(parent_states) == len(namespace) - 1.
  • attempt_index is the 0-based index of this attempt among any retries. 0 for nodes not wrapped by retry middleware.
  • fan_out_index is the 0-based index of this fan-out instance among its siblings. None for nodes not inside a fan-out.
  • fan_out_config carries resolved fan-out configuration on events from a fan-out NODE itself. See :class:FanOutEventConfig. None on every other event.
  • branch_name is the non-empty string name of the parallel-branches branch this event came from. None for nodes outside any branch. The combination of namespace, branch_name, fan_out_index, attempt_index, and phase jointly uniquely identifies an event source. branch_name and fan_out_index are independent; both MAY be present when a branch's subgraph contains a fan-out (or a fan-out instance contains a parallel-branches node).

Invariants:

  • On started events, post_state and error MUST both be None.
  • On completed events, exactly one of post_state and error is populated.

Synthetic phases. "checkpoint_saved" and "checkpoint_migrated" repurpose this dataclass for non-node events. Both are opt-in via phases={...} on observer registration; default subscriptions are {"started", "completed"} only, so legacy observers never see them. Conventions on synthetic events:

  • checkpoint_saved: pre_state carries the saved post-merge state (still a real State instance for this phase), post_state is None. step matches the saving node's step.
  • checkpoint_migrated: step=-1 (no graph-step sequencing; migrations run before any node fires). node_name="openarmature.checkpoint.migrate" and namespace=("openarmature.checkpoint.migrate",) are dotted-pseudo identifiers, not real node names. pre_state carries a private _MigrationSummary dataclass with from_version / to_version / chain_length, NOT a State instance. parent_states is the empty tuple.

Because pre_state is no longer guaranteed to be a State on the synthetic phases, its type is declared as Any and observer authors who subscribe to those phases MUST narrow per-phase before reading pre_state.
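The invariants above lend themselves to a small checker, for example in observer tests. The sketch below uses a stand-in NodeEvent with a subset of the documented fields; invariant_violations is a hypothetical helper, not part of the library.

```python
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple


@dataclass(frozen=True)
class NodeEvent:  # stand-in with a subset of the documented fields
    node_name: str
    namespace: Tuple[str, ...]
    phase: str
    pre_state: Any
    post_state: Optional[Any]
    error: Optional[Exception]
    parent_states: Tuple[Any, ...] = ()


def invariant_violations(event: NodeEvent) -> List[str]:
    """Return a description of each documented invariant the event breaks."""
    problems: List[str] = []
    # One parent snapshot per containing graph, outermost first.
    if len(event.parent_states) != len(event.namespace) - 1:
        problems.append("len(parent_states) != len(namespace) - 1")
    if event.phase == "started":
        if event.post_state is not None or event.error is not None:
            problems.append("started event carries an outcome")
    elif event.phase == "completed":
        # Exactly one of post_state / error must be populated.
        if (event.post_state is None) == (event.error is None):
            problems.append("completed event needs exactly one of post_state/error")
    # Synthetic phases follow their own conventions and are not checked here.
    return problems


good_started = NodeEvent("a", ("a",), "started", {"x": 1}, None, None)
bad_completed = NodeEvent("a", ("a",), "completed", {"x": 1}, None, None)
nested_ok = NodeEvent("b", ("outer", "b"), "completed", {}, {"y": 2}, None, ({"x": 1},))
```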

FanOutConfig dataclass

FanOutConfig(
    subgraph: CompiledGraph[Any],
    collect_field: str,
    target_field: str,
    items_field: str | None = None,
    item_field: str | None = None,
    count: int | CountResolver | None = None,
    concurrency: int | ConcurrencyResolver | None = 10,
    error_policy: Literal[
        "fail_fast", "collect"
    ] = "fail_fast",
    on_empty: Literal["raise", "noop"] = "raise",
    count_field: str | None = None,
    inputs: Mapping[str, str] = dict[str, str](),
    extra_outputs: Mapping[str, str] = dict[str, str](),
    instance_middleware: tuple[Middleware, ...] = (),
    errors_field: str | None = None,
    subgraph_identity: str | None = None,
)

Frozen configuration for a :class:FanOutNode.

Validation happens at builder compile time (see GraphBuilder.add_fan_out_node); construction here is unchecked beyond the obvious type-level constraints.

FanOutNode dataclass

FanOutNode(
    name: str,
    config: FanOutConfig,
    middleware: tuple[Middleware, ...] = (),
)

A node that fans out into N concurrent subgraph instances.

The Node Protocol contract requires name, middleware, and run; run here is unusual because it needs the engine invocation context to descend properly. The engine recognizes this type in _invoke and calls run_with_context (see compiled.CompiledGraph._step_fan_out_node) rather than the plain run(state) shape. run exists for Protocol conformance only and raises if anyone calls it without context.

run async

run(state: ParentT) -> Mapping[str, Any]

Not implemented at this level. The fan-out node requires the engine's invocation context to resolve count and concurrency and dispatch instances; the engine calls :meth:run_with_context instead. This method exists only to satisfy the :class:Node Protocol and always raises :class:NotImplementedError.

run_with_context async

run_with_context(
    state: ParentT,
    context: _InvocationContext,
    *,
    pre_resolved_count: int | None = None,
    pre_resolved_concurrency: (
        tuple[int | None] | None
    ) = None
) -> Mapping[str, Any]

Execute the fan-out and return the merged partial update.

Snapshot, resolve count + concurrency, build per-instance states, run concurrently with the configured error policy, fan-in collected/extra fields, write count_field and errors_field if configured.

Per the per-instance resume contract: this method registers a per-fan-out tracking entry on the shared context.fan_out_progress_state dict before dispatching, flips each instance's state through not_started -> in_flight -> completed as the instance progresses, and fires an explicit "instance completed" save after the per-instance contribution has been recorded into the accumulator. The atomicity contract is observed: the per-instance state mutation precedes the save, so a crash after mutation but before save leaves the saved record showing in_flight (resume re-runs the instance).

pre_resolved_count / pre_resolved_concurrency are hooks: when the engine has already resolved the config eagerly to populate NodeEvent.fan_out_config for the fan-out node's events, it passes the resolved values in so callable resolvers aren't invoked twice. pre_resolved_concurrency is wrapped in a 1-tuple to disambiguate "caller passed None (unbounded)" from "caller didn't pass anything."
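The 1-tuple wrapping is a common way to keep None available as a real value. A minimal sketch of the idea, with a hypothetical effective_concurrency helper and resolver (the engine's actual resolution code is not shown in this reference):

```python
from typing import Callable, List, Optional, Tuple


def effective_concurrency(
    resolve: Callable[[], Optional[int]],
    pre_resolved: Optional[Tuple[Optional[int]]] = None,
) -> Optional[int]:
    """Return the concurrency limit, calling the resolver only when needed."""
    if pre_resolved is None:
        # Nothing was passed in, so resolve now.
        return resolve()
    # A 1-tuple was passed: unwrap it. The inner value may itself be None,
    # which means "unbounded" rather than "not supplied".
    (value,) = pre_resolved
    return value


calls: List[str] = []


def resolver() -> Optional[int]:
    calls.append("resolved")
    return 4


lazy = effective_concurrency(resolver)
unbounded = effective_concurrency(resolver, pre_resolved=(None,))
bounded = effective_concurrency(resolver, pre_resolved=(2,))
```

The resolver runs once, for the call that passed nothing; the two pre-resolved calls never invoke it.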

FailureIsolationMiddleware

FailureIsolationMiddleware(
    *,
    degraded_update: DegradedUpdate,
    event_name: str,
    catch: Collection[str] | None = None,
    predicate: Callable[[Exception], bool] | None = None,
    on_caught: (
        Callable[[Exception], Awaitable[None]] | None
    ) = None
)

Catch exceptions escaping the inner chain; return a degraded partial update.

Configuration:

  • degraded_update (required): the partial update returned on a caught exception, OR a callable state -> partial_update for input-state-dependent degraded shapes.
  • event_name (required): a stable identifier for this catch site; surfaces on the FailureIsolatedEvent. No default — useful values are node-specific, and a generic default would make downstream telemetry strictly worse.
  • catch (optional): a set of error categories. When supplied, an exception is caught only if the derived category of its cause chain is in the set. The derived category is the category of the outermost non-carrier link that carries one, resolving through node_exception carriers; it is the same value reported as caught_exception.category. Composes with predicate as a conjunction; both default to permissive, so leaving both unset catches every Exception. This is the recommended gate for category-scoped degradation.
  • predicate (optional): Exception -> bool over the SURFACE (caught) exception. When supplied, only exceptions where predicate(exc) is true are caught; others propagate. Defaults to always-true. A predicate inspecting the exception directly sees the node_exception carrier at a wrapping placement, not the originating failure; use catch for category gating, or classify the chain via classify_cause_chain.
  • on_caught (optional): an async Exception -> Awaitable[None] hook fired on a caught exception, for caller-specific telemetry beyond the framework event. It runs inline before the degraded update is returned, so a slow hook delays the node's return; an exception raised by the hook is isolated (logged via warnings.warn, not propagated) so a telemetry bug cannot turn a recovered node back into a failure.
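The way catch and predicate compose can be sketched as a pure function. should_catch is a hypothetical helper; it takes the derived category as an argument instead of walking the cause chain, which the real middleware does internally.

```python
from typing import Callable, Collection, Optional


def should_catch(
    exc: Exception,
    derived_category: Optional[str],
    catch: Optional[Collection[str]] = None,
    predicate: Optional[Callable[[Exception], bool]] = None,
) -> bool:
    """Conjunction of the two gates; an unset gate is permissive."""
    category_ok = catch is None or derived_category in catch
    predicate_ok = predicate is None or predicate(exc)
    return category_ok and predicate_ok


err = RuntimeError("upstream timed out")

both_unset = should_catch(err, "provider_unavailable")
category_hit = should_catch(err, "provider_unavailable", catch={"provider_unavailable"})
category_miss = should_catch(err, "provider_rate_limit", catch={"provider_unavailable"})
predicate_veto = should_catch(
    err,
    "provider_unavailable",
    catch={"provider_unavailable"},
    predicate=lambda e: "timed out" not in str(e),
)
```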

Middleware

Bases: Protocol

An async callable that wraps the dispatch of a single node.

The shape is (state, next) -> partial_update. The middleware MUST return a mapping of field names to values; same shape a node returns. It may:

  • Inspect or transform state before calling next(state).
  • Inspect or transform the partial update returned from next.
  • Short-circuit by NOT calling next and returning its own partial (the rest of the chain, meaning any subsequent middleware and the wrapped node, does not execute).
  • Catch exceptions raised by next(state) and either re-raise, transform, or recover (returning a partial update instead of raising).
  • Call next more than once (e.g., retry middleware).

A middleware MUST NOT mutate the input state object; pass a new state to next if a transformation is needed.
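A minimal sketch of the (state, next) shape follows, using plain dicts in place of the engine's typed state. The compose helper, the sample node, and the trimmed field are hypothetical; trimmed is assumed to be a declared state field so that it is a valid partial-update key.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping, Sequence

State = Mapping[str, Any]  # stand-in for the engine's typed state
Partial = Mapping[str, Any]
NextCall = Callable[[State], Awaitable[Partial]]
Middleware = Callable[[State, NextCall], Awaitable[Partial]]


async def shout(state: State) -> Partial:
    # The wrapped node: returns a partial update.
    return {"answer": state["question"].upper()}


async def strip_question(state: State, next: NextCall) -> Partial:
    # Build a NEW state instead of mutating the input.
    cleaned = {**state, "question": state["question"].strip()}
    partial = await next(cleaned)
    # Transform the partial update on the way back out.
    return {**partial, "trimmed": cleaned["question"] != state["question"]}


def _wrap(mw: Middleware, inner: NextCall) -> NextCall:
    async def layer(state: State) -> Partial:
        return await mw(state, inner)
    return layer


def compose(middlewares: Sequence[Middleware], node: NextCall) -> NextCall:
    """Wrap node so the first middleware in the sequence is outermost."""
    call = node
    for mw in reversed(list(middlewares)):
        call = _wrap(mw, call)
    return call


original = {"question": "  why?  "}
result = asyncio.run(compose([strip_question], shout)(original))
```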

NextCall

Bases: Protocol

The next callable a middleware receives.

Calling it with a state invokes the next layer of the chain (or the wrapped node, at the inner end) and returns the partial update from that layer. Middleware MAY transform the state passed to next; the transformed state flows down the chain but does NOT replace the engine's pre-merge state at the outermost level (the transformed state is passed to next, NOT to the engine's merge step).

RetryConfig dataclass

RetryConfig(
    max_attempts: int = 3,
    classifier: Classifier | None = None,
    backoff: BackoffStrategy | None = None,
    on_retry: OnRetryCallback | None = None,
)

Canonical retry configuration record consumed by :class:RetryMiddleware.

  • max_attempts: total attempts including the first call. 1 disables retry. Default 3.
  • classifier: predicate (exception, state) -> bool deciding whether a failure is retry-eligible. None (the default) selects :func:default_classifier (matches category against TRANSIENT_CATEGORIES).
  • backoff: callable (attempt_index) -> seconds. None (the default) selects :func:exponential_jitter_backoff (base 1s, cap 30s, full jitter).
  • on_retry: optional async callback (exception, attempt_index) -> None fired before each backoff sleep.
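For intuition about the default backoff, the sketch below implements the widely used "full jitter" formula: a uniform draw between 0 and min(cap, base * 2**attempt_index). Whether exponential_jitter_backoff uses exactly this formula is an assumption; only the base, cap, and "full jitter" label come from the description above. The rng parameter is a hypothetical hook added here to make the demo deterministic.

```python
import random
from typing import Callable


def full_jitter_backoff(
    attempt_index: int,
    *,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Callable[[], float] = random.random,
) -> float:
    """Seconds to sleep before the next attempt (assumed full-jitter formula)."""
    ceiling = min(cap, base * (2 ** attempt_index))
    return rng() * ceiling


def max_sleeps(max_attempts: int) -> int:
    # max_attempts counts the first call, so at most max_attempts - 1 sleeps occur.
    return max(max_attempts - 1, 0)


# With rng pinned to 1.0 the function returns the ceiling for each attempt.
ceilings = [full_jitter_backoff(i, rng=lambda: 1.0) for i in range(7)]
sample = full_jitter_backoff(3)  # random draw, bounded by the attempt-3 ceiling
```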

RetryMiddleware

RetryMiddleware(config: RetryConfig | None = None)

Canonical retry middleware.

Configured with a :class:RetryConfig (or the default RetryConfig() when omitted). Construct as RetryMiddleware(RetryConfig(max_attempts=...)).

TimingMiddleware

TimingMiddleware(
    *,
    node_name: str,
    on_complete: OnCompleteCallback,
    clock: Callable[[], float] | None = None
)

Canonical timing middleware.

Records wall-clock duration of the wrapped chain via the host language's monotonic clock (Python's time.monotonic). The callback fires inline before the chain's result returns to the caller; slow callbacks add to the apparent node duration, so users SHOULD keep them fast (queue work, defer I/O).

Errors raised by on_complete propagate to the engine as a node_exception.

TimingRecord dataclass

TimingRecord(
    node_name: str,
    duration_ms: float,
    outcome: str,
    exception_category: str | None,
)

A single timing measurement produced by TimingMiddleware.

  • node_name: the node this middleware was attached to (captured at registration; users supply it explicitly for per-node use).
  • duration_ms: milliseconds from middleware entry to chain return-or-raise, measured with a monotonic clock.
  • outcome: one of "success" or "exception".
  • exception_category: when outcome == "exception" and the exception carries a category attribute, that string; otherwise None.
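The record's fields can be produced by a middleware along these lines. This is a sketch under assumptions, not the library's implementation: TimingRecord is redeclared locally, on_complete is assumed to be a synchronous callable, and timing_middleware, fake_clock, and CategorizedError are hypothetical names.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Any, Awaitable, Callable, Iterable, List, Mapping, Optional


@dataclass(frozen=True)
class TimingRecord:  # local stand-in mirroring the documented fields
    node_name: str
    duration_ms: float
    outcome: str
    exception_category: Optional[str]


def timing_middleware(
    node_name: str,
    on_complete: Callable[[TimingRecord], None],  # assumed synchronous
    clock: Callable[[], float] = time.monotonic,
):
    async def middleware(state: Mapping[str, Any], call_next) -> Mapping[str, Any]:
        start = clock()
        try:
            partial = await call_next(state)
        except Exception as exc:
            elapsed = (clock() - start) * 1000.0
            category = getattr(exc, "category", None)
            on_complete(TimingRecord(node_name, elapsed, "exception", category))
            raise  # the failure still propagates
        elapsed = (clock() - start) * 1000.0
        on_complete(TimingRecord(node_name, elapsed, "success", None))
        return partial
    return middleware


def fake_clock(values: Iterable[float]) -> Callable[[], float]:
    it = iter(values)
    return lambda: next(it)


class CategorizedError(Exception):
    category = "provider_rate_limit"


async def ok_node(state):
    return {"done": True}


async def failing_node(state):
    raise CategorizedError("limited")


records: List[TimingRecord] = []


async def _demo() -> None:
    timed_ok = timing_middleware("ok", records.append, clock=fake_clock([10.0, 10.25]))
    await timed_ok({}, ok_node)
    timed_bad = timing_middleware("bad", records.append, clock=fake_clock([1.0, 1.5]))
    try:
        await timed_bad({}, failing_node)
    except CategorizedError:
        pass


asyncio.run(_demo())
```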

FunctionNode dataclass

FunctionNode(
    name: str,
    fn: Callable[[StateT], Awaitable[Mapping[str, Any]]],
    middleware: tuple[Middleware, ...] = tuple[
        Middleware, ...
    ](),
)

A node backed by an async callable.

run async

run(state: StateT) -> Mapping[str, Any]

Invoke the wrapped async callable and return its partial update. Called by the engine inside any per-node and per-graph middleware chain.

Node

Bases: Protocol

A unit of work in a compiled graph.

name property

name: str

The name this node was registered under in its containing graph.

middleware property

middleware: tuple[Middleware, ...]

Per-node middleware applied at this node's registration site, outer-to-inner. Composed inside any per-graph middleware.

run async

run(state: StateT) -> Mapping[str, Any]

Execute against state and return a partial update to be merged via reducers.

DrainSummary dataclass

DrainSummary(undelivered_count: int, timeout_reached: bool)

Outcome of a CompiledGraph.drain() call.

Returned from drain() regardless of whether a timeout was supplied. When no timeout was supplied, or the timeout did not fire, undelivered_count == 0 and timeout_reached is False. When the timeout fired, undelivered_count reports the number of events that were dispatched to the delivery worker but not fully delivered to every subscribed observer before cancellation, and timeout_reached is True.

These two fields are the required minimum. Implementations MAY extend the shape with diagnostic detail (per-observer counts, sampled event metadata) in subsequent versions; this version ships the minimum.

Observer

Bases: Protocol

The shape of a callable that receives observer events.

Observer is a structural Protocol; any async callable matching the signature qualifies, no subclass required. Plain functions, bound methods, and class instances with __call__ all work::

async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
    if isinstance(event, NodeEvent):
        print(event.node_name, event.phase)

compiled.attach_observer(log_observer)

Contract:

  • Observers MUST be async so the delivery queue can await each one and coordinate ordering. The graph itself never awaits observers.
  • Observers MUST NOT alter state, routing, or any other aspect of the graph run. Read-only side effects (logging, metrics, span emission) only.

The event parameter is positional-only (event, /) so structural conformance doesn't pin you to that name; any of event, _event, e, etc. matches.

Seven event variants reach observers. The signature is the union; observers isinstance-narrow on the first line and choose which variants they handle.

  • :class:NodeEvent — the started/completed/checkpoint phase events. Subject to the phases filter on :class:SubscribedObserver; observers whose phase set excludes event.phase do NOT receive it.
  • :class:MetadataAugmentationEvent — emitted by :func:openarmature.observability.metadata.set_invocation_metadata when called mid-invocation. Carries the augmenting context's lineage tuple (namespace, attempt_index, fan_out_index, branch_name) so rich backends can update their open observations in place (span.set_attribute(openarmature.user.<key>, v) for OTel, observation.update(metadata=...) for Langfuse). This variant is NOT subject to the phases filter — every subscribed observer sees it and isinstance-narrows to decide whether to act. Simple user observers typically early-return after isinstance(event, NodeEvent) checks.
  • :class:InvocationStartedEvent — emitted once per invocation before any node fires. Carries the engine-constructed initial_state so Trace-level backends (Langfuse) can populate trace.input via the three-lever decision tree. NOT subject to the phases filter; OTel-only observers ignore it via the isinstance gate.
  • :class:InvocationCompletedEvent — emitted once per invocation after the last node fires (on both the success path and the failure path). Carries final_state + a closed status: {"completed", "failed"} enum so Trace-level backends can populate trace.output. NOT subject to the phases filter; OTel-only observers ignore it via the isinstance gate.
  • :class:LlmCompletionEvent — dispatched on every successful LLM provider call. Carries the typed identity / request / response field set for LLM-aware backends. NOT subject to the phases filter; non-LLM observers ignore it via the isinstance gate.
  • :class:LlmFailedEvent — the failure-side counterpart, dispatched alongside the provider exception when an LLM call raises. NOT subject to the phases filter.
  • :class:FailureIsolatedEvent — dispatched by FailureIsolationMiddleware when it catches an exception and substitutes a degraded partial update. NOT subject to the phases filter.

Optional prepare_sync extension

An observer MAY additionally define a synchronous method::

def prepare_sync(self, event: NodeEvent, /) -> None: ...

that the engine calls IN THE ENGINE TASK, BEFORE queueing the event for the async __call__. This exists for observers that need to set up state (e.g., open a span and stash a handle in a ContextVar) that the engine itself must read synchronously before running the node body (otherwise logs emitted on the first line of the body wouldn't see the right span).

prepare_sync is opt-in via hasattr; no subclass or Protocol method required. Observers that don't define it skip the synchronous prep entirely; observers that do define it run only for "started"-phase events, with errors warned-not-propagated (same isolation contract as the async path). prepare_sync is never invoked for :class:MetadataAugmentationEvent (the synchronous-prep contract is anchored on the started phase, which only NodeEvent carries).

RemoveHandle dataclass

RemoveHandle(
    _observers: list[SubscribedObserver],
    _observer: SubscribedObserver,
)

Returned by CompiledGraph.attach_observer. Call .remove() to detach the observer. Idempotent: calling .remove() after the observer is already detached is a no-op.

Changes to the registered observer set during a graph run do NOT take effect until the next invocation.

remove

remove() -> None

Detach the observer from its compiled graph. Idempotent: a second call is a no-op rather than an error. The change takes effect on the next invoke(); in-flight invocations keep the observer set they started with.

SubscribedObserver dataclass

SubscribedObserver(
    observer: Observer, phases: frozenset[str] = ALL_PHASES
)

An observer paired with its phase subscription set.

Observers register with an optional phases parameter naming the phase strings they want to receive. The default is ALL_PHASES. The name dates from when there were only two phases; it now means "the default subscription" ({"started", "completed"}), not every phase the engine can produce. The "checkpoint_saved" phase is opt-in: subscribe to it explicitly via phases={"checkpoint_saved"} (or include it in a custom set). KNOWN_PHASES is the full set of phases the engine can produce and is used by the registration-time validator.

Empty phase sets are forbidden; passing one raises ValueError at registration time so misconfiguration surfaces immediately.

Construct one of these directly when handing phase-filtered observers to CompiledGraph.invoke(observers=...). For the single-observer attach_observer path, pass phases= as a keyword argument and the engine wraps it for you.
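The phase-subscription rules can be modelled with plain frozensets. In this sketch ALL_PHASES and KNOWN_PHASES are local stand-ins whose contents are inferred from the text (the default pair, plus the two checkpoint phases from the NodeEvent signature); rejecting unknown phase strings is an assumption about what the registration-time validator does, and validate_phases and delivers are hypothetical helpers.

```python
from typing import FrozenSet, Iterable

ALL_PHASES: FrozenSet[str] = frozenset({"started", "completed"})  # the default subscription
KNOWN_PHASES: FrozenSet[str] = ALL_PHASES | {"checkpoint_saved", "checkpoint_migrated"}


def validate_phases(phases: Iterable[str]) -> FrozenSet[str]:
    """Normalize a phase set, rejecting empty or (assumed) unknown entries."""
    normalized = frozenset(phases)
    if not normalized:
        raise ValueError("phases must not be empty")
    unknown = normalized - KNOWN_PHASES
    if unknown:
        raise ValueError(f"unknown phases: {sorted(unknown)}")
    return normalized


def delivers(phases: FrozenSet[str], event_phase: str) -> bool:
    # A NodeEvent reaches the observer only if its phase is subscribed.
    return event_phase in phases


default_sees_checkpoint = delivers(ALL_PHASES, "checkpoint_saved")
opted_in = validate_phases({"started", "completed", "checkpoint_saved"})
opted_in_sees_checkpoint = delivers(opted_in, "checkpoint_saved")

try:
    validate_phases(set())
    empty_rejected = False
except ValueError:
    empty_rejected = True
```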

BranchSpec dataclass

BranchSpec(
    subgraph: CompiledGraph[ChildT] | None = None,
    call: BranchCallable | None = None,
    when: Callable[[Any], bool] | None = None,
    inputs: Mapping[str, str] = dict[str, str](),
    outputs: Mapping[str, str] = dict[str, str](),
    middleware: tuple[Middleware, ...] = (),
)

One entry in a :class:ParallelBranchesNode's branch mapping.

A branch's work is given by exactly one of:

  • subgraph — a compiled subgraph (the heterogeneous-subgraph form): each branch may reference a different compiled subgraph with a different state schema, with inputs / outputs following the same shape as subgraph projection mappings.
  • call — an inline async function over the parent state returning a parent-shaped partial update (the lightweight form): no subgraph, no state schema, no inputs / outputs. The function reads the parent state directly and returns parent fields.

An optional when predicate over the parent state, evaluated once at dispatch, skips the branch entirely when it returns false.

Validation lives on the builder side (GraphBuilder.add_parallel_branches_node): exactly one of subgraph / call and no inputs / outputs on a callable branch (parallel_branches_invalid_branch_spec); mapping_references_undeclared_field for subgraph-branch inputs/outputs referencing undeclared fields; parallel_branches_no_branches for empty branches maps; ValueError for empty-string branch names.

ParallelBranchesNode dataclass

ParallelBranchesNode(
    name: str,
    branches: dict[str, BranchSpec[Any]],
    error_policy: Literal[
        "fail_fast", "collect"
    ] = "fail_fast",
    errors_field: str | None = None,
    middleware: tuple[Middleware, ...] = (),
)

A node that dispatches M heterogeneous compiled subgraphs concurrently.

The Node Protocol contract requires name, middleware, and run. Like :class:FanOutNode, the engine recognizes this type in _invoke and calls run_with_context so the dispatcher has access to the invocation context for observer-attribution + namespace descent. run exists for Protocol conformance only and raises if anyone calls it directly.

run async

run(state: ParentT) -> Mapping[str, Any]

Not implemented at this level. Dispatching parallel branches requires the engine's invocation context so each branch gets observer attribution and a namespaced descent; the engine calls :meth:run_with_context instead. This method exists only to satisfy the :class:Node Protocol and always raises :class:NotImplementedError.

dispatched_branches

dispatched_branches(
    state: Any,
) -> list[tuple[str, BranchSpec[Any]]]

Return the branches that dispatch for state, in insertion order: every declared branch whose when predicate is absent or returns true (§11.10).

when MUST be a pure function of the dispatch-time parent state. That makes this method deterministic, so it is safe to evaluate both here (to build the dispatch set) and when computing the NODE event's branch_count (the number of branches that dispatch, which excludes when-skipped branches).
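
The filtering rule above can be sketched without the library. This is a minimal illustration, not the engine's implementation: SketchBranch and sketch_dispatched_branches are hypothetical stand-ins that model only the when predicate.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class SketchBranch:
    """Hypothetical stand-in for BranchSpec; only `when` is modelled."""
    when: Optional[Callable[[Any], bool]] = None


def sketch_dispatched_branches(
    branches: dict[str, SketchBranch], state: Any
) -> list[tuple[str, SketchBranch]]:
    # dicts preserve insertion order, so the result follows declaration order.
    return [
        (name, spec)
        for name, spec in branches.items()
        if spec.when is None or spec.when(state)
    ]


branches = {
    "summarize": SketchBranch(),  # no predicate: always dispatches
    "translate": SketchBranch(when=lambda s: s["lang"] != "en"),
    "audit": SketchBranch(when=lambda s: s["audit"]),
}
state = {"lang": "en", "audit": True}

dispatched = sketch_dispatched_branches(branches, state)
names = [name for name, _ in dispatched]
branch_count = len(dispatched)  # when-skipped branches are not counted
```

Because the predicates are pure functions of state, calling the helper twice with the same state yields the same list, which is what lets the dispatch set and branch_count agree.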

run_with_context async

run_with_context(
    state: ParentT, context: _InvocationContext
) -> Mapping[str, Any]

Execute the parallel-branches dispatch and return the merged partial update.

Snapshot parent state, project per-branch initial states, dispatch M branches concurrently in insertion order, then either fail-fast on first error (cancelling the rest) or run to completion and merge per the configured error policy.
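
The two error policies can be illustrated with plain asyncio. This is a sketch under stated assumptions, not the engine's dispatcher: sketch_dispatch and BranchFn are hypothetical, branches are modelled as zero-argument coroutine functions, and the per-branch updates are returned unmerged rather than folded through reducers.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical branch shape: a zero-argument coroutine function
# returning a partial update.
BranchFn = Callable[[], Awaitable[dict[str, Any]]]


async def sketch_dispatch(branches: dict[str, BranchFn], error_policy: str = "fail_fast"):
    """Return (updates_by_branch, errors_by_branch); merging is omitted."""
    # Tasks are created in insertion order and then run concurrently.
    tasks = {name: asyncio.create_task(fn()) for name, fn in branches.items()}

    if error_policy == "fail_fast":
        done, pending = await asyncio.wait(
            list(tasks.values()), return_when=asyncio.FIRST_EXCEPTION
        )
        failed = [t for t in done if t.exception() is not None]
        if failed:
            for task in pending:
                task.cancel()  # cancel the branches still in flight
            await asyncio.gather(*pending, return_exceptions=True)
            raise failed[0].exception()
        return {name: task.result() for name, task in tasks.items()}, {}

    # "collect": let every branch finish, then separate results from errors.
    outcomes = await asyncio.gather(*tasks.values(), return_exceptions=True)
    updates: dict[str, Any] = {}
    errors: dict[str, Exception] = {}
    for name, outcome in zip(tasks, outcomes):
        if isinstance(outcome, Exception):
            errors[name] = outcome
        else:
            updates[name] = outcome
    return updates, errors


async def ok() -> dict[str, Any]:
    await asyncio.sleep(0)
    return {"a": 1}


async def boom() -> dict[str, Any]:
    raise RuntimeError("branch failed")


async def slow() -> dict[str, Any]:
    await asyncio.sleep(10)  # cancelled under fail_fast before it finishes
    return {"c": 3}
```

Under "collect", asyncio.run(sketch_dispatch({"ok": ok, "boom": boom}, "collect")) returns the successful update alongside the captured error; under "fail_fast", the first failure is re-raised and the slow branch is cancelled.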

ExplicitMapping

ExplicitMapping(
    *,
    inputs: Mapping[str, str] | None = None,
    outputs: Mapping[str, str] | None = None
)

Explicit input/output mapping between parent and subgraph state.

inputs maps subgraph_field → parent_field. At entry, the named parent field's current value is copied into the named subgraph field. Subgraph fields not listed receive their schema-declared defaults; there is NO field-name fallback (inputs is additive over the default, which projects nothing in).

outputs maps parent_field → subgraph_field. At exit, the named subgraph field's value is merged into the named parent field via the parent's reducer. Subgraph fields not listed are discarded; outputs REPLACES field-name matching for projection-out.

The two directions are independent: pass either, both, or neither. The outputs field distinguishes "absent" (default applies) from "present but empty": outputs=None means absent (fall back to field-name matching), while outputs={} means present and empty (project nothing). For inputs the two defaults coincide (no-projection-in either way), so the distinction is only meaningful for outputs.
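
The mapping directions and the None versus {} distinction can be sketched with plain dictionaries standing in for state objects. The helper names below are hypothetical and do not use the library; the partial update returned by the out-projection would still be merged by the parent's reducers afterwards.

```python
from typing import Any, Mapping, Optional


def sketch_project_in(
    parent: Mapping[str, Any],
    child_defaults: Mapping[str, Any],
    inputs: Optional[Mapping[str, str]],
) -> dict[str, Any]:
    child = dict(child_defaults)  # unlisted subgraph fields keep their defaults
    for subgraph_field, parent_field in (inputs or {}).items():
        child[subgraph_field] = parent[parent_field]
    return child


def sketch_project_out(
    child_final: Mapping[str, Any],
    parent_fields: set[str],
    outputs: Optional[Mapping[str, str]],
) -> dict[str, Any]:
    if outputs is None:
        # Absent: fall back to field-name matching.
        return {k: v for k, v in child_final.items() if k in parent_fields}
    # Present (possibly empty): only the declared pairs are projected.
    return {
        parent_field: child_final[subgraph_field]
        for parent_field, subgraph_field in outputs.items()
    }


parent = {"question": "q?", "answer": ""}
child_defaults = {"prompt": "", "answer": "", "scratch": []}

child_start = sketch_project_in(parent, child_defaults, {"prompt": "question"})
child_final = {"prompt": "q?", "answer": "42", "scratch": ["x"]}

by_name = sketch_project_out(child_final, set(parent), None)
nothing = sketch_project_out(child_final, set(parent), {})
explicit = sketch_project_out(child_final, set(parent), {"question": "prompt"})
```

Here by_name carries only the shared answer field, nothing is an empty update, and explicit writes the subgraph's prompt into the parent's question field.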

project_in

project_in(
    parent_state: ParentT, subgraph_state_cls: type[ChildT]
) -> ChildT

Construct the subgraph's initial state from inputs. Each declared subgraph_field → parent_field pair copies the parent's current value into the subgraph kwarg; subgraph fields not listed get their schema defaults.

project_out

project_out(
    subgraph_final_state: ChildT,
    parent_state: ParentT,
    subgraph_state_cls: type[ChildT],
) -> Mapping[str, Any]

Project back per outputs: each declared parent_field → subgraph_field pair folds the subgraph value into the parent's reducer. When outputs was not provided, falls back to field-name matching.

validate

validate(
    parent_cls: type[ParentT],
    subgraph_state_cls: type[ChildT],
) -> None

Compile-time check that every field name in inputs and outputs exists on the relevant state schema. Called once per subgraph node from the parent's compile(). Raises :class:MappingReferencesUndeclaredField on the first field name that is not declared on its schema (typically a typo).

FieldNameMatching

Default subgraph projection strategy.

Parameterized for protocol conformance under generics. ParentT is not consumed (the default projection ignores parent state on the way in), but carrying the type variable keeps the default assignable to ProjectionStrategy[ParentT, ChildT] without type gymnastics at the SubgraphNode default-factory site.

project_in

project_in(
    parent_state: ParentT, subgraph_state_cls: type[ChildT]
) -> ChildT

No-projection-in: the subgraph starts from its schema's field defaults, ignoring the parent state entirely.

project_out

project_out(
    subgraph_final_state: ChildT,
    parent_state: ParentT,
    subgraph_state_cls: type[ChildT],
) -> Mapping[str, Any]

Project shared field names back to the parent. Subgraph fields whose names match a parent field are folded through the parent's reducer; non-matching subgraph fields are discarded.
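
A minimal sketch of field-name matching followed by a reducer merge, using dictionaries in place of state objects. The reducer functions (last_write_wins, append) and helper names are hypothetical illustrations and are not taken from the library.

```python
from typing import Any, Callable, Mapping


def last_write_wins(prior: Any, update: Any) -> Any:
    return update


def append(prior: list, update: list) -> list:
    return [*prior, *update]


def sketch_field_name_project_out(
    child_final: Mapping[str, Any], parent_state: Mapping[str, Any]
) -> dict[str, Any]:
    # Keep only subgraph fields whose names also exist on the parent.
    return {k: v for k, v in child_final.items() if k in parent_state}


def sketch_merge(
    parent_state: Mapping[str, Any],
    partial: Mapping[str, Any],
    reducers: Mapping[str, Callable[[Any, Any], Any]],
) -> dict[str, Any]:
    merged = dict(parent_state)
    for field, value in partial.items():
        reducer = reducers.get(field, last_write_wins)
        merged[field] = reducer(parent_state[field], value)
    return merged


parent_state = {"log": ["start"], "answer": None}
child_final = {"log": ["child ran"], "answer": "42", "scratch": "tmp"}

partial = sketch_field_name_project_out(child_final, parent_state)
merged = sketch_merge(parent_state, partial, {"log": append})
```

The scratch field has no parent counterpart and is dropped; log is appended to rather than overwritten because the parent's reducer decides how the projected value is folded in.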

ProjectionStrategy

Bases: Protocol

Strategy for moving state across the parent ↔ subgraph boundary.

Two required methods plus one optional hook:

  • project_in and project_out are required: the engine calls them on every subgraph step.
  • validate(parent_cls, subgraph_state_cls) -> None is an optional compile-time validation hook. If a strategy defines it, the parent graph's compile() calls it once per SubgraphNode; the strategy may raise a CompileError subclass when its declarations don't match the supplied schemas. Declarative strategies like ExplicitMapping use this to catch field-name typos before any node runs. Imperative custom projections typically have nothing declarative to check and can simply omit the method; the engine uses duck typing (getattr) to find it.
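
The optional-hook lookup described above can be sketched as follows. All names here (SketchCompileError, DeclarativeSketch, ImperativeSketch, sketch_compile_check) are hypothetical, and dataclasses stand in for the real state schemas.

```python
from dataclasses import dataclass, fields


class SketchCompileError(Exception):
    """Hypothetical stand-in for a CompileError subclass."""


@dataclass
class Parent:
    question: str = ""


@dataclass
class Child:
    prompt: str = ""


def field_names(cls) -> set[str]:
    return {f.name for f in fields(cls)}


class DeclarativeSketch:
    """Declares inputs up front, so it has something to validate."""

    def __init__(self, inputs: dict[str, str]):
        self.inputs = inputs  # subgraph_field -> parent_field

    def validate(self, parent_cls, subgraph_state_cls) -> None:
        for subgraph_field, parent_field in self.inputs.items():
            if subgraph_field not in field_names(subgraph_state_cls):
                raise SketchCompileError(f"undeclared subgraph field: {subgraph_field}")
            if parent_field not in field_names(parent_cls):
                raise SketchCompileError(f"undeclared parent field: {parent_field}")


class ImperativeSketch:
    """No declarations to check, so the hook is simply omitted."""


def sketch_compile_check(strategy, parent_cls, subgraph_state_cls) -> None:
    # Duck typing: call validate only if the strategy defines it.
    validate = getattr(strategy, "validate", None)
    if validate is not None:
        validate(parent_cls, subgraph_state_cls)


sketch_compile_check(ImperativeSketch(), Parent, Child)  # nothing to check
sketch_compile_check(DeclarativeSketch({"prompt": "question"}), Parent, Child)  # passes
```

A strategy built with a misspelled name, such as DeclarativeSketch({"prompt": "questoin"}), raises SketchCompileError from the check before any node would run.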

project_in

project_in(
    parent_state: ParentT, subgraph_state_cls: type[ChildT]
) -> ChildT

Build the subgraph's initial state at the moment it begins.

project_out

project_out(
    subgraph_final_state: ChildT,
    parent_state: ParentT,
    subgraph_state_cls: type[ChildT],
) -> Mapping[str, Any]

Project the subgraph's final state back to the parent as a partial update.

Reducer

Base class for state-field reducers.

Each reducer carries a canonical name used in error messages and introspection. Subclasses override __call__ to merge a node's partial update for a single field into the prior value.

State

Bases: BaseModel

Base for graph state schemas. Immutable; reducers attach via Annotated.

SubgraphNode dataclass

SubgraphNode(
    name: str,
    compiled: CompiledGraph[ChildT],
    projection: ProjectionStrategy[
        ParentT, ChildT
    ] = FieldNameMatching[ParentT, ChildT](),
    middleware: tuple[Middleware, ...] = tuple[
        Middleware, ...
    ](),
    subgraph_identity: str | None = None,
)

A node backed by a compiled subgraph.

The parent's per-node middleware on a SubgraphNode wraps the subgraph dispatch as a single atomic call; parent middleware does NOT cross into the subgraph's internal nodes (those are wrapped by the subgraph's own middleware independently).

run async

run(
    state: ParentT,
    context: _InvocationContext | None = None,
) -> Mapping[str, Any]

Execute the subgraph and project its result back into the parent.

When context is None (e.g., direct invocation in tests, or a parent call that doesn't thread a context), the subgraph runs via its own public invoke(), that is, as a fresh root invocation with no parent observer chain.

When context is provided (the engine's normal path during a parent run), the subgraph descends into a child context that shares the parent's queue + step counter and extends the namespace and parent-state stack. Observer events from inner nodes bubble up to outer observers.

classify_cause_chain

classify_cause_chain(exc: Exception) -> CaughtException

Classify exc by walking its __cause__ chain.

Records one CauseLink per exception from exc (outermost) to the originating raise (innermost), flagging node_exception carriers, and derives the single category / message the chain represents (the outermost non-carrier categorized link).
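
The chain walk can be sketched in a few lines. This is an assumption-laden illustration rather than the library's code: SketchLink, its fields, the exception classes, and the "provider_rate_limit" category string are all hypothetical; only the node_exception carrier category and the outermost-non-carrier rule come from the description above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SketchLink:
    """Hypothetical stand-in for CauseLink."""
    type_name: str
    category: Optional[str]
    is_carrier: bool


def sketch_classify(exc: BaseException):
    links: list[SketchLink] = []
    seen: set[int] = set()
    current: Optional[BaseException] = exc
    # Walk from the outermost exception down to the originating raise.
    while current is not None and id(current) not in seen:
        seen.add(id(current))  # guard against a cyclic __cause__ chain
        category = getattr(current, "category", None)
        links.append(
            SketchLink(type(current).__name__, category, category == "node_exception")
        )
        current = current.__cause__
    # The chain's category is the outermost categorized link that is not a carrier.
    chosen = next(
        (link for link in links if link.category is not None and not link.is_carrier),
        None,
    )
    return links, (chosen.category if chosen else None)


class NodeFailure(Exception):
    category = "node_exception"  # carrier: wraps whatever the node raised


class RateLimited(Exception):
    category = "provider_rate_limit"  # hypothetical category name


try:
    try:
        raise RateLimited("429")
    except RateLimited as inner:
        raise NodeFailure("node failed") from inner
except NodeFailure as outer:
    links, category = sketch_classify(outer)
```

In this example links holds two entries (the carrier first, then the originating error) and category is taken from the inner RateLimited link, because the outer NodeFailure is flagged as a carrier.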

default_classifier

default_classifier(exc: Exception, _state: Any) -> bool

Default classifier: purely category-based; ignores state.

Returns True if either the exception itself or its __cause__ carries a category attribute matching TRANSIENT_CATEGORIES. The cause-walking covers the common case of a graph-engine NodeException wrapping an llm-provider transient: a node_exception whose __cause__ is a transient category classifies as transient.

The _state parameter is ignored by the default; the leading underscore is the canonical Python convention for "intentionally unused" while keeping the signature stable for user-supplied state-aware classifiers.
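
A sketch of the category check, assuming a hypothetical TRANSIENT_SKETCH set in place of the library's TRANSIENT_CATEGORIES (whose actual members are not listed here) and hypothetical exception classes.

```python
from typing import Any

# Hypothetical category names; the real TRANSIENT_CATEGORIES may differ.
TRANSIENT_SKETCH = frozenset({"provider_rate_limit", "provider_unavailable"})


def sketch_default_classifier(exc: Exception, _state: Any) -> bool:
    # Inspect the exception itself, then its direct __cause__ only.
    for candidate in (exc, exc.__cause__):
        if getattr(candidate, "category", None) in TRANSIENT_SKETCH:
            return True
    return False


class NodeFailure(Exception):
    category = "node_exception"


class RateLimited(Exception):
    category = "provider_rate_limit"


wrapped = NodeFailure("node failed")
wrapped.__cause__ = RateLimited("429")  # same effect as `raise ... from ...`

direct_hit = sketch_default_classifier(RateLimited("429"), None)
wrapped_hit = sketch_default_classifier(wrapped, None)
bare_miss = sketch_default_classifier(NodeFailure("node failed"), None)
```

A state-aware classifier would keep the same two-argument signature and simply consult the second parameter instead of ignoring it.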

deterministic_backoff

deterministic_backoff(
    seconds: float,
) -> Callable[[int], float]

Backoff factory that returns a constant delay of seconds on every attempt, for deterministic testing.

The conformance fixtures use this form via backoff: {type: deterministic, seconds: N} so retry timing is reproducible across runs.
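
The factory shape implied by the signature can be sketched as a closure. The sketch_ name is hypothetical and this is not the library's source.

```python
from typing import Callable


def sketch_deterministic_backoff(seconds: float) -> Callable[[int], float]:
    def backoff(attempt: int) -> float:
        # The attempt number is accepted but ignored: every retry waits the same time.
        return seconds

    return backoff


backoff = sketch_deterministic_backoff(0.5)
delays = [backoff(attempt) for attempt in range(3)]
```

Because the returned callable ignores attempt, a test can assert exact sleep durations without seeding or mocking a random source.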

exponential_jitter_backoff

exponential_jitter_backoff(
    attempt: int, *, base: float = 1.0, cap: float = 30.0
) -> float

Default backoff: random.uniform(0, min(cap, base * 2**attempt)).

Jitter is mandatory; fixed exponential backoff causes synchronized retries from many concurrent callers, amplifying rate-limit storms. base and cap are configurable; the defaults are 1.0 and 30.0 seconds.
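
The documented formula can be written out directly. The function below is a re-statement of that formula under a hypothetical sketch_ name, with the per-attempt ceilings computed alongside to show where the cap takes over.

```python
import random


def sketch_exponential_jitter_backoff(
    attempt: int, *, base: float = 1.0, cap: float = 30.0
) -> float:
    # Full jitter: pick uniformly between 0 and the capped exponential ceiling.
    return random.uniform(0, min(cap, base * 2**attempt))


# Upper bounds for attempts 0..6 with the default base and cap.
ceilings = [min(30.0, 1.0 * 2**attempt) for attempt in range(7)]
samples = [sketch_exponential_jitter_backoff(attempt) for attempt in range(7)]
```

With the defaults, the ceiling doubles from 1 second up to 16 seconds and is then held at the 30-second cap from attempt 5 onward; each sampled delay falls somewhere between 0 and its ceiling.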