openarmature.graph¶
Public API for the OpenArmature graph engine.
Re-exports the surface a user touches when building and running a graph: the state schema base, reducers, the builder/compiled pair, edge primitives and the END sentinel, the node/subgraph/projection seams, and the canonical compile-time and runtime error categories.
GraphBuilder ¶
Mutable builder for a graph; call compile() to produce a CompiledGraph.
add_node ¶
add_node(
name: str,
fn: Callable[[StateT], Awaitable[Mapping[str, Any]]],
*,
middleware: Iterable[Middleware] | None = None
) -> Self
Register a function node.
fn is an async callable taking the current state and
returning a partial update (a mapping from declared state-field
names to new values). The merge into parent state happens
through the field reducers; fn itself does not see or touch
the merge.
middleware is an optional iterable of per-node middleware,
ordered outer-to-inner. Per-graph middleware (see
:meth:add_middleware) wraps these.
Raises ValueError if name is already declared on this
builder.
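The split described above (the node returns a partial update, the field reducers perform the merge) can be modelled without the library. The sketch below is a standalone illustration only: merge_update, last_write_wins, and append are hypothetical names rather than openarmature APIs, the "last write wins" default is an assumption, and plain dicts stand in for the typed state class.

```python
import asyncio
from typing import Any, Callable, Dict, Mapping

# (current value, incoming value) -> merged value
Reducer = Callable[[Any, Any], Any]


def last_write_wins(current: Any, incoming: Any) -> Any:
    """Assumed default: the incoming value replaces the current one."""
    return incoming


def append(current: list, incoming: list) -> list:
    """Hypothetical list reducer: concatenate instead of replacing."""
    return [*current, *incoming]


def merge_update(
    state: Mapping[str, Any],
    update: Mapping[str, Any],
    reducers: Mapping[str, Reducer],
) -> Dict[str, Any]:
    """Fold a node's partial update into a copy of the state, field by field."""
    merged = dict(state)
    for field, incoming in update.items():
        if field not in state:
            raise KeyError(f"undeclared state field: {field!r}")
        reducer = reducers.get(field, last_write_wins)
        merged[field] = reducer(state[field], incoming)
    return merged


async def summarize(state: Mapping[str, Any]) -> Mapping[str, Any]:
    # A node body: read the state, return only the fields that change.
    return {"notes": [f"saw {len(state['notes'])} earlier note(s)"], "done": True}


state = {"notes": ["start"], "done": False}
update = asyncio.run(summarize(state))
new_state = merge_update(state, update, {"notes": append})
print(new_state)
```

The node never mutates state and never calls a reducer itself; it only reports what changed, which is the property the add_node contract relies on.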
add_subgraph_node ¶
add_subgraph_node(
name: str,
compiled: CompiledGraph[ChildT],
projection: (
ProjectionStrategy[StateT, ChildT] | None
) = None,
*,
middleware: Iterable[Middleware] | None = None
) -> Self
Register a subgraph as a node.
compiled is a child :class:CompiledGraph (parameterized
on its own state type). On entry the parent's state is
translated to the child's via projection.project_in; on
exit the child's final state is folded back via
projection.project_out. The default projection is
:class:FieldNameMatching; pass an :class:ExplicitMapping
for declarative inputs/outputs instead.
middleware wraps the whole subgraph dispatch as one atomic
call from the parent's perspective. Per-graph middleware on
the parent does NOT propagate into the child; child middleware
is configured on the child's own builder.
Raises ValueError if name is already declared.
add_fan_out_node ¶
add_fan_out_node(
name: str,
*,
subgraph: CompiledGraph[ChildT],
collect_field: str,
target_field: str,
items_field: str | None = None,
item_field: str | None = None,
count: int | CountResolver | None = None,
concurrency: int | ConcurrencyResolver | None = 10,
error_policy: str = "fail_fast",
on_empty: str = "raise",
count_field: str | None = None,
inputs: Mapping[str, str] | None = None,
extra_outputs: Mapping[str, str] | None = None,
instance_middleware: Iterable[Middleware] | None = None,
errors_field: str | None = None,
middleware: Iterable[Middleware] | None = None,
subgraph_identity: str | None = None
) -> Self
Register a fan-out node.
Validates configuration at registration time:
- Exactly one of items_field or count MUST be specified (fan_out_count_mode_ambiguous otherwise).
- items_field MUST refer to a list-typed field on the parent state schema (fan_out_field_not_list otherwise).
- items_field mode requires item_field; count mode forbids item_field.
- on_empty and error_policy MUST be one of the permitted string literals ("raise" / "noop" and "fail_fast" / "collect" respectively).
- inputs / extra_outputs / count_field field references go through the existing mapping_references_undeclared_field rule.
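The first three rules in the list above amount to a small decision table. The following standalone sketch shows one way to express it; fan_out_config_error is a hypothetical helper (not part of openarmature), the order of the checks is an assumption, and it returns a category or message string instead of raising the library's error classes.

```python
from typing import Any, Optional, Set


def fan_out_config_error(
    items_field: Optional[str],
    item_field: Optional[str],
    count: Any,
    list_fields: Set[str],
) -> Optional[str]:
    """Return a description of the first violated rule, or None when valid.

    list_fields is the set of parent-state fields declared as lists.
    """
    # Exactly one of items_field / count: both set or both unset is ambiguous.
    if (items_field is None) == (count is None):
        return "fan_out_count_mode_ambiguous"
    if items_field is not None:
        if items_field not in list_fields:
            return "fan_out_field_not_list"
        if item_field is None:
            return "items_field mode requires item_field"
    elif item_field is not None:
        return "count mode forbids item_field"
    return None


print(fan_out_config_error("docs", "doc", None, {"docs"}))
print(fan_out_config_error("docs", "doc", 3, {"docs"}))
```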
add_parallel_branches_node ¶
add_parallel_branches_node(
name: str,
*,
branches: Mapping[str, BranchSpec[Any]],
error_policy: Literal[
"fail_fast", "collect"
] = "fail_fast",
errors_field: str | None = None,
middleware: Iterable[Middleware] | None = None
) -> Self
Register a parallel-branches node.
branches is a mapping from non-empty branch name to a
:class:BranchSpec. Insertion order is preserved and is
the dispatch + merge order.
Validates at registration:
- branches non-empty (raises ParallelBranchesNoBranches).
- Each branch name is a non-empty string (raises ValueError).
- Each branch gives its work as exactly one of subgraph / call, and a callable branch declares no inputs / outputs (raises ParallelBranchesInvalidBranchSpec).
- Each subgraph branch's inputs / outputs refer only to declared fields on the (parent, branch-subgraph) state schemas (raises MappingReferencesUndeclaredField).
- errors_field (when set) is a declared parent-state field.
with_checkpointer ¶
with_checkpointer(checkpointer: Checkpointer) -> Self
Register a Checkpointer for the compiled graph.
At most one Checkpointer per graph; calling
with_checkpointer again replaces the previously-stored
one. Call :meth:CompiledGraph.invoke on the result of
:meth:compile as usual; the engine fires a save at every
completed event, for outermost-graph and subgraph-internal
nodes alike.
with_state_migration ¶
Register one state migration.
On resume, when the saved record's schema_version does not
match the current state class's schema_version, the engine
consults the registry for a chain that bridges the two and
applies it to the record's state (and to each entry in
parent_states) before deserialization.
Migrations MUST be pure: deterministic, no I/O, no implicit state. The framework does not police purity, but violating it risks non-deterministic resume.
Raises CheckpointStateMigrationChainAmbiguous at
registration if the (from_version, to_version) pair is
already registered. Also raises ValueError if
to_version is the empty-string sentinel (the un-declared
marker is not a valid chain target).
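To make the "chain that bridges the two" idea concrete, here is a standalone sketch of resolving and applying a migration chain. It is an illustration under stated assumptions, not the engine's algorithm: the registry is modelled as a dict keyed by (from_version, to_version), find_chain and migrate are hypothetical helpers, and a breadth-first search (shortest chain wins) is assumed because the text above does not say how competing chains are chosen.

```python
from collections import deque
from typing import Callable, Dict, List, Optional, Tuple

State = dict
MigrationFn = Callable[[State], State]
Registry = Dict[Tuple[str, str], MigrationFn]


def find_chain(registry: Registry, saved: str, current: str) -> Optional[List[MigrationFn]]:
    """Breadth-first search for migrations leading from saved to current."""
    queue = deque([(saved, [])])
    visited = {saved}
    while queue:
        version, path = queue.popleft()
        if version == current:
            return path
        for (src, dst), fn in registry.items():
            if src == version and dst not in visited:
                visited.add(dst)
                queue.append((dst, path + [fn]))
    return None  # no chain bridges the two versions


def migrate(record_state: State, chain: List[MigrationFn]) -> State:
    """Apply each pure migration in order to the saved record's state."""
    for step in chain:
        record_state = step(record_state)
    return record_state


# Pure migrations: each returns a new dict and performs no I/O.
registry: Registry = {
    ("1", "2"): lambda s: {**s, "retries": 0},
    ("2", "3"): lambda s: {("message" if k == "msg" else k): v for k, v in s.items()},
}

chain = find_chain(registry, saved="1", current="3")
migrated = migrate({"msg": "hi"}, chain)
print(migrated)
```

Because each step is deterministic and side-effect free, replaying the same saved record always produces the same migrated state, which is the property the purity requirement protects.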
with_state_migrations ¶
with_state_migrations(*migrations: StateMigration) -> Self
Register multiple migrations in one call. Convenience over
with_state_migration; each entry is registered through the
same path and obeys the same ambiguity rule.
Pre-validates the full input list against the existing
registry + against earlier entries in the same call before
mutating, so a duplicate in the third entry cannot leave
the first two half-registered. If any duplicate (from,
to) pair is detected the call raises
CheckpointStateMigrationChainAmbiguous without mutating
the registry; otherwise all entries register atomically.
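The validate-then-mutate behaviour described above can be sketched as two passes over the input. This is a standalone model, not the library's code: register_all is a hypothetical function, the registry is a plain dict keyed by (from_version, to_version), and ChainAmbiguous is a local stand-in for CheckpointStateMigrationChainAmbiguous.

```python
from typing import Callable, Dict, Iterable, Tuple

State = dict
MigrationFn = Callable[[State], State]
Migration = Tuple[str, str, MigrationFn]  # (from_version, to_version, fn)
Registry = Dict[Tuple[str, str], MigrationFn]


class ChainAmbiguous(Exception):
    """Local stand-in for CheckpointStateMigrationChainAmbiguous."""


def register_all(registry: Registry, migrations: Iterable[Migration]) -> None:
    pending = list(migrations)

    # Pass 1: validate against the existing registry and against earlier
    # entries in this same call. Nothing is written yet.
    seen = set(registry)
    for from_version, to_version, _ in pending:
        pair = (from_version, to_version)
        if pair in seen:
            raise ChainAmbiguous(f"duplicate migration {pair}")
        seen.add(pair)

    # Pass 2: every entry is known to be valid, so mutate in one go.
    for from_version, to_version, fn in pending:
        registry[(from_version, to_version)] = fn


def identity(state: State) -> State:
    return state


registry: Registry = {("1", "2"): identity}
try:
    register_all(registry, [("2", "3", identity), ("3", "4", identity), ("2", "3", identity)])
except ChainAmbiguous:
    pass
print(sorted(registry))  # the two valid entries were not half-registered
```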
add_middleware ¶
add_middleware(middleware: Middleware) -> Self
Register a per-graph middleware applied to every node in this graph.
Per-graph middleware composes OUTSIDE per-node middleware.
Calling order is preserved (outer-to-inner); earlier
add_middleware calls produce outer layers in the runtime
chain.
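The layering rule (per-graph middleware outside per-node middleware, earlier registrations outermost) is easiest to see by tracing call order. The sketch below is standalone and assumes a hypothetical middleware shape of `async (state, next_handler)`; the page does not show the Middleware protocol, so treat tracer, wrap, and compose as illustrative names only.

```python
import asyncio
from typing import Any, Awaitable, Callable, Iterable, List, Mapping

State = Mapping[str, Any]
Handler = Callable[[State], Awaitable[State]]
# Hypothetical middleware shape: receives the state and the next handler.
Middleware = Callable[[State, Handler], Awaitable[State]]


def tracer(label: str, log: List[str]) -> Middleware:
    """Build a middleware that records when it is entered and exited."""
    async def middleware(state: State, next_handler: Handler) -> State:
        log.append(f"enter {label}")
        result = await next_handler(state)
        log.append(f"exit {label}")
        return result
    return middleware


def wrap(middleware: Middleware, inner: Handler) -> Handler:
    async def handler(state: State) -> State:
        return await middleware(state, inner)
    return handler


def compose(middlewares: Iterable[Middleware], node: Handler) -> Handler:
    """Wrap node so that the first middleware in the list is the outermost."""
    handler = node
    for middleware in reversed(list(middlewares)):
        handler = wrap(middleware, handler)
    return handler


log: List[str] = []


async def node(state: State) -> State:
    log.append("node body")
    return {"x": 1}


per_graph = [tracer("graph A", log), tracer("graph B", log)]  # registration order
per_node = [tracer("node C", log)]
chain = compose([*per_graph, *per_node], node)
result = asyncio.run(chain({}))
print(log)
```

Placing the per-graph list before the per-node list in the composed sequence is what makes graph-level layers the outer ones.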
add_edge ¶
add_edge(source: str, target: str | EndSentinel) -> Self
Register a static edge from source to target.
target is either the name of another declared node or the
:data:END sentinel. A node may have at most one outgoing
static edge; declaring two raises
:class:MultipleOutgoingEdges at compile(). For
state-dependent branching use :meth:add_conditional_edge.
add_conditional_edge ¶
add_conditional_edge(
source: str, fn: Callable[[StateT], str | EndSentinel]
) -> Self
Register a conditional edge from source.
fn is a synchronous callable that receives the merged
post-node state and returns either the name of the next node
or the :data:END sentinel. Returning any other value raises
:class:RoutingError at runtime; an exception inside fn
becomes an :class:EdgeException.
Like static edges, a node has at most one outgoing edge;
declaring both a static and a conditional edge from the same
source raises :class:MultipleOutgoingEdges at compile().
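A conditional-edge function is an ordinary synchronous function over the merged state. The standalone sketch below pairs one with a hypothetical resolve_next helper that mirrors the two failure modes described above; END, RoutingError, and EdgeException are local stand-ins defined in the snippet, not imports from openarmature, and the real engine's internals may differ.

```python
from typing import Any, Callable, Mapping, Set, Union


class EndSentinel:
    """Local stand-in for the engine's END sentinel type."""


END = EndSentinel()


class RoutingError(Exception):
    """Stand-in: the edge function returned an unknown target."""


class EdgeException(Exception):
    """Stand-in: the edge function itself raised."""


Target = Union[str, EndSentinel]


def route(state: Mapping[str, Any]) -> Target:
    # Synchronous and side-effect free: inspect state, name the next node.
    if state["retries"] >= 3:
        return END
    return "retry" if state["failed"] else "publish"


def resolve_next(
    fn: Callable[[Mapping[str, Any]], Any],
    state: Mapping[str, Any],
    declared: Set[str],
) -> Target:
    try:
        target = fn(state)
    except Exception as exc:
        raise EdgeException("conditional edge raised") from exc
    if target is END or (isinstance(target, str) and target in declared):
        return target
    raise RoutingError(f"not a declared node or END: {target!r}")


nodes = {"retry", "publish"}
print(resolve_next(route, {"failed": True, "retries": 0}, nodes))
```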
set_entry ¶
Declare which registered node the graph starts at.
Calling set_entry again replaces the previously set entry.
compile() raises :class:NoDeclaredEntry if no entry was
set, and :class:DanglingEdge if the entry name was never
declared with add_*_node.
compile ¶
compile() -> CompiledGraph[StateT]
Validate the builder and return an immutable
:class:CompiledGraph.
Runs structural checks in order:
- conflicting_reducers (state-field reducer conflicts)
- declarative projection validate hooks (e.g. :class:ExplicitMapping)
- no_declared_entry / dangling_edge (entry pointer)
- dangling_edge (edge endpoints reference declared nodes)
- multiple_outgoing_edges (one edge per source)
- unreachable_node (every node reachable from entry)
The first failing check raises its specific
:class:CompileError subclass; passing means the returned
graph is ready for :meth:CompiledGraph.invoke. A previously
attached :class:Checkpointer is forwarded onto the compiled
graph.
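The "first failing check wins" ordering can be sketched for the entry and edge checks alone. The snippet below is a standalone simplification: first_failure is a hypothetical function that returns a category string instead of raising a CompileError subclass, edges are modelled as (source, target) pairs, and the reducer, projection, and reachability checks are deliberately left out.

```python
from collections import Counter
from typing import List, Optional, Set, Tuple

END = object()  # local stand-in for the END sentinel

Edge = Tuple[str, object]  # (source, target); target is a node name or END


def first_failure(entry: Optional[str], nodes: Set[str], edges: List[Edge]) -> Optional[str]:
    """Run a subset of the structural checks in order; report the first failure."""
    # Entry pointer: must be set, and must name a declared node.
    if entry is None:
        return "no_declared_entry"
    if entry not in nodes:
        return "dangling_edge"
    # Edge endpoints: both ends must be declared (END is always valid as a target).
    for source, target in edges:
        if source not in nodes or (target is not END and target not in nodes):
            return "dangling_edge"
    # At most one outgoing edge per source.
    outgoing = Counter(source for source, _ in edges)
    if any(count > 1 for count in outgoing.values()):
        return "multiple_outgoing_edges"
    return None


print(first_failure("a", {"a", "b"}, [("a", "b"), ("b", END)]))
print(first_failure("a", {"a", "b"}, [("a", "b"), ("a", END)]))
```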
CaughtException
dataclass
¶
CaughtException(
category: str | None,
message: str,
chain: tuple[CauseLink, ...],
)
A classified exception cause chain.
The result of :func:classify_cause_chain, and the record
FailureIsolatedEvent.caught_exception carries.
- category: the derived single failure category, the outermost non-carrier link whose category is a non-empty string, or None when no non-carrier link carries one.
- message: the message of the link category is derived from, or (when no link carries a category) the outermost non-carrier link's message.
- chain: the ordered cause chain, outermost (the classified exception, index 0) to innermost (the originating raise), one :class:CauseLink per exception.
CauseLink
dataclass
¶
One link in a caught exception's resolved cause chain.
- category: the link's failure category when it carries one (a string), else None.
- message: the link's own message (the str of the exception).
- carrier: True when the link is an engine-applied node_exception carrier wrapper, False for an ordinary (non-carrier) exception.
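The derivation rule for category and message can be written as a short function over the chain. This is a standalone sketch, not :func:classify_cause_chain itself: Link is a local stand-in for CauseLink, classify is a hypothetical name, and returning an empty message when the chain contains only carrier links is an assumption the text above does not cover.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple


@dataclass(frozen=True)
class Link:
    """Local stand-in for CauseLink."""
    category: Optional[str]
    message: str
    carrier: bool


def classify(chain: Sequence[Link]) -> Tuple[Optional[str], str]:
    """Return (category, message) for a chain ordered outermost-first."""
    non_carriers = [link for link in chain if not link.carrier]
    # The outermost non-carrier link with a non-empty category wins.
    for link in non_carriers:
        if link.category:
            return link.category, link.message
    # No category anywhere: fall back to the outermost non-carrier message.
    message = non_carriers[0].message if non_carriers else ""  # assumption
    return None, message


chain = (
    Link("node_exception", "node 'fetch' failed", carrier=True),
    Link("provider_rate_limit", "429 Too Many Requests", carrier=False),
    Link(None, "connection reset", carrier=False),
)
print(classify(chain))
```

The carrier wrapper at index 0 is skipped even though it has a category, so the classification reflects the failure the node actually hit.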
CompiledGraph
dataclass
¶
CompiledGraph(
state_cls: type[StateT],
entry: str,
nodes: Mapping[str, Node[StateT]],
edges: Mapping[
str, StaticEdge | ConditionalEdge[StateT]
],
reducers: Mapping[str, Reducer],
middleware: tuple[Middleware, ...] = (),
_attached_observers: list[SubscribedObserver] = list[
SubscribedObserver
](),
_active_workers: dict[
Task[None], _InvocationContext
] = dict[Task[None], _InvocationContext](),
_checkpointer_slot: list[Checkpointer | None] = (
lambda: [None]
)(),
migration_registry: MigrationRegistry = MigrationRegistry(),
)
An immutable, executable graph produced by GraphBuilder.compile().
The compile-time topology (state class, entry, nodes, edges, reducers) is
immutable. Two mutable containers ride alongside for observer plumbing
(the _attached_observers list and the _active_workers dict); neither affects
the compiled topology, and both are scoped to the same instance.
checkpointer
property
¶
checkpointer: Checkpointer | None
Currently-registered Checkpointer, or None.
attach_observer ¶
attach_observer(
observer: Observer,
*,
phases: Iterable[str] | None = None
) -> RemoveHandle
Register a graph-attached observer.
Graph-attached observers fire on every invocation of this
graph until removed, including when this graph runs as a
subgraph inside a parent. Returns a RemoveHandle whose
.remove() method detaches the observer; the call is idempotent.
phases selects the phase strings ("started",
"completed") the observer subscribes to; default is both.
An empty phases set raises ValueError at registration
time.
Changes to the registered set during a graph run do NOT take effect until the next invocation. The set of observers delivering events for an in-flight invocation is fixed at the point the invocation begins.
attach_checkpointer ¶
attach_checkpointer(
checkpointer: Checkpointer | None,
) -> None
Register a Checkpointer for this graph.
Pass None to clear a previously-registered backend.
Without a registered Checkpointer the engine never calls
save() and invoke(resume_invocation=...) raises
checkpoint_not_found.
At most one Checkpointer per graph. Calling
attach_checkpointer again replaces the previously
registered one; multi-backend fan-out is the user's
responsibility (wrap two underlying Checkpointers behind a
custom protocol-conforming implementation if needed).
drain
async
¶
drain(timeout: float | None = None) -> DrainSummary
Await delivery of every observer event produced by prior
invocations of this graph, optionally bounded by timeout.
Callers running in short-lived processes (scripts, serverless functions, CLIs) MUST use drain to avoid losing observer events that were dispatched but not yet delivered.
Only events dispatched before this call are awaited; events from invocations started concurrently with drain may or may not be included. Subgraph events from active invocations are part of the parent invocation's worker and are covered automatically.
timeout is a non-negative duration in seconds. If omitted
or None, drain waits indefinitely — a slow, hung, or
misbehaving observer can therefore hold drain (and the calling
process) indefinitely. If supplied, drain returns no later
than timeout seconds after the call begins; any observer
events still queued or in-flight at that point are considered
undelivered. Workers are cancelled via Task.cancel() so
the compiled graph remains usable for subsequent invocations
— partial delivery state from one drain does NOT leak into
the next invocation.
Returns a :class:DrainSummary with undelivered_count and
timeout_reached fields. The shape is the same whether or
not a timeout was supplied; when no timeout was supplied, or
the timeout did not fire, undelivered_count is 0 and
timeout_reached is False.
Observers SHOULD be written to be cancellation-safe (idempotent writes, try/finally cleanup) so that interruption by drain timeout does not leave partial side effects in an inconsistent state.
Raises ValueError if timeout is negative or NaN.
Non-numeric input raises TypeError from the comparison.
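The cancellation-safety advice above can be illustrated with plain asyncio. In this standalone sketch the observer is assumed to be an async callable (the Observer protocol is not shown on this page), and drain_with_timeout is a hypothetical model of the timeout path that cancels still-pending work via Task.cancel(); it is not the library's drain.

```python
import asyncio
from typing import Dict, List

written: List[str] = []
cleaned_up: List[str] = []


async def observer(event: str) -> None:
    """A slow observer that still releases its resource when cancelled."""
    handle = f"buffer-for-{event}"  # pretend resource acquired per event
    try:
        await asyncio.sleep(10)     # stands in for slow I/O
        written.append(event)       # only reached if not interrupted
    finally:
        cleaned_up.append(handle)   # runs on success and on cancellation


async def drain_with_timeout(timeout: float) -> Dict[str, object]:
    task = asyncio.create_task(observer("evt-1"))
    _, pending = await asyncio.wait({task}, timeout=timeout)
    for unfinished in pending:
        unfinished.cancel()
    # Let cancelled tasks unwind so their finally blocks run.
    await asyncio.gather(*pending, return_exceptions=True)
    return {"undelivered_count": len(pending), "timeout_reached": bool(pending)}


summary = asyncio.run(drain_with_timeout(0.01))
print(summary, written, cleaned_up)
```

The event is reported as undelivered, nothing was half-written, and the cleanup still ran, which is the outcome the SHOULD above is asking observers to guarantee.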
drain_events_for
async
¶
drain_events_for(
invocation_id: str, *, timeout: float | None = 5.0
) -> DrainSummary
Await delivery of every observer event tagged with
invocation_id that was dispatched as of this call's entry,
optionally bounded by timeout.
Use this from a terminal node body to synchronize on the observer event stream before reading derived observer state (a queryable accumulator's per-invocation bucket, a latency rollup, a token-usage record). The drain blocks until every event dispatched up to the moment of the call has reached every attached observer, then returns.
Snapshot semantic: the drain awaits the events dispatched as
of call time. Events emitted after the call begins (notably
the calling node's own completed event, which fires only
after the node body returns) are out of scope. This is what
allows an in-node call to avoid deadlocking on its own
completed event. The calling node's started event, by
contrast, fires immediately BEFORE the body runs and IS in
the snapshot — the drain awaits its delivery normally.
timeout is a non-negative duration in seconds (default
5.0). None waits indefinitely. timeout=0.0 is a
non-blocking check: returns immediately whether the snapshot
target was met. Raises :class:ValueError on negative or
NaN input.
On timeout the deliver worker is left running. The compiled
graph stays available to serve other invocations after a
per-invocation drain times out; the deliver loop continues
processing the queue, including the events the timed-out
caller failed to await. This is the load-bearing difference
from :meth:drain, which cancels its workers.
Returns a :class:DrainSummary with undelivered_count and
timeout_reached. On the clean path both are zero / false;
on timeout undelivered_count is the snapshot target minus
the deliver loop's current delivered count for this
invocation. Unknown invocation_id (no active worker, or
the invocation has already drained and the worker has exited)
returns an empty summary — not an error.
Interaction with :meth:drain: if process-wide drain is
called while a per-invocation drain is pending, drain's
shutdown semantics take precedence. The deliver worker is
cancelled, its remaining events are not delivered, and the
per-invocation waker's target may never be reached. The
per-invocation call then blocks until its own timeout
fires and returns timeout_reached=True. Mixing the two
primitives in the same shutdown path is unusual; use
drain for lifespan / shutdown coordination and
drain_events_for for in-flight synchronization.
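The snapshot semantic is easier to follow with explicit counters. The model below is a standalone sketch, not the engine's implementation: DeliveryModel, deliver_worker, and this drain_events_for are hypothetical, a summary is a plain dict, and the "late event" stands in for something like the calling node's own completed event.

```python
import asyncio
from typing import Dict, Tuple


class DeliveryModel:
    """Per-invocation counters; create it inside a running event loop."""

    def __init__(self) -> None:
        self.dispatched = 0
        self.delivered = 0
        self.changed = asyncio.Condition()


async def drain_events_for(model: DeliveryModel, timeout: float) -> Dict[str, object]:
    target = model.dispatched  # snapshot taken at call entry

    async def reached() -> None:
        async with model.changed:
            await model.changed.wait_for(lambda: model.delivered >= target)

    if model.delivered < target:
        try:
            await asyncio.wait_for(reached(), timeout)
        except asyncio.TimeoutError:
            # The worker is NOT cancelled here; it keeps delivering.
            return {"undelivered_count": target - model.delivered, "timeout_reached": True}
    return {"undelivered_count": 0, "timeout_reached": False}


async def deliver_worker(model: DeliveryModel) -> None:
    while model.delivered < model.dispatched:
        await asyncio.sleep(0.01)  # simulated delivery latency
        async with model.changed:
            model.delivered += 1
            model.changed.notify_all()


async def clean_path() -> Tuple[Dict[str, object], int]:
    model = DeliveryModel()
    model.dispatched = 3
    worker = asyncio.create_task(deliver_worker(model))
    drain = asyncio.create_task(drain_events_for(model, timeout=1.0))
    await asyncio.sleep(0)   # let the drain take its snapshot (target = 3)
    model.dispatched += 1    # late event: outside the snapshot
    summary = await drain
    await worker             # the worker still delivers the late event
    return summary, model.delivered


async def timeout_path() -> Dict[str, object]:
    model = DeliveryModel()
    model.dispatched = 2     # no worker is running, so nothing gets delivered
    return await drain_events_for(model, timeout=0.01)


clean_summary, total_delivered = asyncio.run(clean_path())
timed_out_summary = asyncio.run(timeout_path())
print(clean_summary, total_delivered, timed_out_summary)
```

The drain returns once the three snapshotted events are delivered even though a fourth was dispatched meanwhile, which is why an in-node call does not wait on events emitted after it began.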
invoke
async
¶
invoke(
initial_state: StateT,
observers: (
Iterable[Observer | SubscribedObserver] | None
) = None,
*,
correlation_id: str | None = None,
invocation_id: str | None = None,
resume_invocation: str | None = None,
metadata: Mapping[str, Any] | None = None
) -> StateT
Run the graph from initial_state to END and return the
final state.
Optional observers are invocation-scoped; they fire only
for this run, after all graph-attached observers (including
subgraph-attached ones for events originating in subgraphs).
Each entry in observers may be either a bare Observer
callable (subscribes to both phases) or a SubscribedObserver
wrapping an observer with an explicit phases set.
This method returns as soon as the graph execution loop
completes, regardless of whether the observer delivery queue
has finished processing every dispatched event. Use
await compiled.drain() if you need delivery-completion
guarantees.
Checkpointing.
- correlation_id is the per-invocation cross-backend join key. Caller-supplied or auto-generated UUIDv4 when absent. Preserved unchanged across resume_invocation.
- invocation_id is the per-attempt id. Caller-supplied or auto-generated UUIDv4 when absent; a caller value MAY be any non-empty URL-safe string. Applies to the fresh-invocation path only: a resume_invocation mints a fresh id regardless (each attempt is its own invocation).
- resume_invocation names a prior invocation_id to resume from. Requires a registered Checkpointer; raises CheckpointNotFound when the backend has no record for the supplied id, CheckpointRecordInvalid when the loaded record's schema is incompatible. Resume mints a NEW invocation_id; each attempt is its own invocation in the observability sense; the correlation_id is the cross-attempt join key.
- Save-failure policy. This implementation raises CheckpointSaveFailed to the caller of invoke() immediately when Checkpointer.save raises; saves are NOT retried by the engine. Wrap the Checkpointer in your own retry logic if transient backend failures should be reattempted.
Caller-supplied invocation metadata.
- metadata is an optional mapping of arbitrary key → value entries the framework propagates to every observability backend. Values MUST be OTel-attribute-compatible scalars (str / int / float / bool) or homogeneous arrays of those types. Keys MUST NOT use the openarmature.* or gen_ai.* reserved namespaces. Validation runs synchronously at the API boundary; rule violations raise ValueError BEFORE any work begins.
- The OTel observer emits each entry as an openarmature.user.<key> cross-cutting span attribute on every span and OTel log record. The Langfuse observer merges each entry into trace.metadata AND every observation.metadata (top level, sibling to correlation_id).
- Mid-invocation augmentation via :func:openarmature.observability.set_invocation_metadata merges into the same ContextVar with the same validation rules; affects spans emitted AFTER the call returns.
Raises one of the runtime error categories on failure.
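The caller-supplied metadata rules listed above are simple enough to pre-check on the caller's side. The function below is a standalone sketch of such a check; validate_metadata is a hypothetical helper rather than the library's validator, and treating an empty array as valid and a mixed bool/int array as non-homogeneous are assumptions.

```python
from typing import Any, Mapping

RESERVED_PREFIXES = ("openarmature.", "gen_ai.")
SCALARS = (str, int, float, bool)


def validate_metadata(metadata: Mapping[str, Any]) -> None:
    """Raise ValueError on the first entry that breaks the documented rules."""
    for key, value in metadata.items():
        if key.startswith(RESERVED_PREFIXES):
            raise ValueError(f"reserved namespace in key: {key!r}")
        if isinstance(value, SCALARS):
            continue
        if isinstance(value, (list, tuple)):
            kinds = {type(item) for item in value}
            homogeneous = len(kinds) <= 1  # assumption: empty arrays pass
            if homogeneous and all(isinstance(item, SCALARS) for item in value):
                continue
        raise ValueError(f"unsupported value for {key!r}: {value!r}")


validate_metadata({"tenant": "acme", "attempt": 2, "tags": ["beta", "eu"]})
print("metadata accepted")
```

Running a check like this before calling invoke() surfaces the same class of error earlier in caller code, though the authoritative validation remains the one performed at the API boundary.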
ConditionalEdge
dataclass
¶
ConditionalEdge(
source: str, fn: Callable[[StateT], str | EndSentinel]
)
Routes from source to whichever node fn(state) returns. The function
MUST return either a declared node name or END; any other value raises
RoutingError at runtime.
EndSentinel ¶
Engine-provided sentinel routing target. Use the module-level END.
StaticEdge
dataclass
¶
StaticEdge(source: str, target: str | EndSentinel)
Always routes from source to target.
CompileError ¶
ConflictingReducers ¶
Bases: CompileError
Raised at compile() when a state field has more than one
distinct reducer attached across its declarations. Each field
accepts at most one reducer.
DanglingEdge ¶
Bases: CompileError
Raised at compile() when an edge (static or the entry pointer)
names a node that was never declared on the builder.
EdgeException ¶
Bases: RuntimeGraphError
Raised when a conditional-edge callable raises an uncaught
exception during invoke(). The original exception is attached
as __cause__; recoverable_state carries the merged state
that the callable was evaluating.
FanOutCountModeAmbiguous ¶
Bases: CompileError
Raised when a fan-out node specifies both items_field and
count, or neither. Exactly one is required.
FanOutDegradedUpdateMissingCollectField ¶
Bases: CompileError
Raised when a fan-out instance FailureIsolationMiddleware has a
static (mapping) degraded_update that omits the node's
collect_field. A degraded instance contributes its degraded_update
as the instance result, so the collected field has to be present. A
callable degraded_update is exempt: its output is not known at
construction time, and an omitted collect_field yields a null slot at
runtime instead of a failure.
FanOutEmpty ¶
Bases: NodeException
Raised when a fan-out node resolves to zero instances while
its on_empty config is "raise" (the default).
Surfaces as a regular node_exception (so it integrates with
the existing error propagation and recoverable-state machinery)
but exposes an additional fan_out_category attribute so
callers can distinguish empty-fan-out from generic node failures.
FanOutFieldNotList ¶
Bases: CompileError
Raised when a fan-out node's items_field does not refer to
a declared list-typed field on the parent state schema.
FanOutInvalidConcurrency ¶
Bases: NodeException
Raised when a fan-out node's concurrency callable returns
zero or a negative integer at runtime. Same node-exception shape
as :class:FanOutEmpty.
FanOutInvalidCount ¶
Bases: NodeException
Raised when a fan-out node's count callable returns a
negative integer at runtime. Same node-exception shape as
:class:FanOutEmpty, with
fan_out_category = "fan_out_invalid_count".
GraphError ¶
Bases: Exception
Base for all graph-engine errors.
MappingReferencesUndeclaredField ¶
Bases: CompileError
Raised when a subgraph-as-node inputs or outputs
mapping names a field that is not declared in the relevant state
schema.
MultipleOutgoingEdges ¶
Bases: CompileError
Raised at compile() when a single source node has more than
one outgoing static edge. To branch from one node, use
:meth:GraphBuilder.add_conditional_edge instead of multiple
add_edge calls.
NoDeclaredEntry ¶
Bases: CompileError
Raised at compile() when the graph has no entry node set.
Call :meth:GraphBuilder.set_entry before compiling.
NodeException ¶
Bases: RuntimeGraphError
Raised when a node's function (or innermost middleware) raises
an uncaught exception during invoke(). The original exception
is attached as __cause__; recoverable_state carries the
state as it was just before the failing node ran.
ParallelBranchesBranchFailed ¶
ParallelBranchesBranchFailed(
node_name: str,
cause: BaseException,
recoverable_state: Any,
*,
branch_name: str
)
Bases: NodeException
Raised when a branch's subgraph raises under
error_policy: 'fail_fast'.
Subtype of :class:NodeException (a node_exception subtype
attached at the parallel-branches node's level). The existing
NodeException-classifier path handles transient classification
from __cause__: non-transient by default, inheriting transient
classification from the wrapped exception.
Carries branch_name as a structured field; the inner exception
rides __cause__.
ParallelBranchesInvalidBranchSpec ¶
Bases: CompileError
Raised at registration when a branch spec's work is not given by
exactly one of subgraph / call.
A branch is either a compiled subgraph (with optional
inputs / outputs projection) or an inline call (an async
function over the parent state). Declaring both, neither, or
inputs / outputs on a callable branch (which reads parent
state and returns parent-shaped fields directly, so projection is
meaningless) is invalid. Non-transient.
ParallelBranchesNoBranches ¶
Bases: CompileError
Raised at registration when a parallel-branches node's
branches mapping is empty. Non-transient.
ReducerError ¶
ReducerError(
field_name: str,
reducer_name: str,
producing_node: str,
cause: BaseException,
recoverable_state: Any,
)
Bases: RuntimeGraphError
Raised when a field reducer raises while merging a node's
partial update. __cause__ is the original exception;
recoverable_state is the pre-merge state.
RoutingError ¶
Bases: RuntimeGraphError
Raised when a conditional-edge callable returns a value that is
neither the name of a declared node nor the :data:END sentinel.
RuntimeGraphError ¶
Bases: GraphError
Base for runtime errors. The four non-validation categories carry a
recoverable_state attribute.
StateValidationError ¶
Bases: RuntimeGraphError
State failed schema validation at a graph boundary.
Unlike the other runtime errors, this category does NOT carry
recoverable_state: at entry there is no prior state to
recover; at exit the failing state IS the final state.
UnreachableNode ¶
Bases: CompileError
Raised at compile() when a declared node cannot be reached
from the entry by following static or conditional edges.
FailureIsolatedEvent
dataclass
¶
FailureIsolatedEvent(
event_name: str,
namespace: tuple[str, ...],
attempt_index: int,
fan_out_index: int | None,
branch_name: str | None,
pre_state: Any,
post_state: Mapping[str, Any],
caught_exception: CaughtException,
)
A failure-isolation event delivered to observers.
Reports that FailureIsolationMiddleware caught an exception at
a node and substituted a degraded partial update for the node's
output. Observer code filters by type discrimination
(isinstance(event, FailureIsolatedEvent)).
Field set:
- event_name: the caller-supplied identifier for this catch site, from the middleware's configuration.
- namespace / attempt_index / fan_out_index / branch_name: the wrapped node's lineage identity, surfaced for correlation with the node's other events.
- pre_state: the state the wrapped node received.
- post_state: the degraded partial update the middleware returned in place of the node's output.
- caught_exception: a :class:CaughtException record of the caught exception (its derived category / message and the full cause chain).
InvocationCompletedEvent
dataclass
¶
InvocationCompletedEvent(
final_state: Any,
status: Literal["completed", "failed"],
final_node: str,
invocation_id: str,
correlation_id: str | None,
)
An invocation-exit event delivered to observers.
Emitted once per invocation, after the last node has fired (and
after a failure boundary on the failure path). Observers that
populate Trace-level output fields (the Langfuse observer, today)
consume it to resolve trace.output per the three-lever
decision tree. Observers without a Trace-level output concept (the
OTel observer) treat it as a no-op.
Carries:
- final_state: the state at invocation exit (the engine's returned state on the success path; the state at point-of-failure on the failure path).
- status: closed enum "completed" (END reached) or "failed" (any node, edge, reducer, or boundary validator raised before END).
- final_node: the name of the node whose execution preceded the END-reached transition on the success path, or the node that raised on the failure path.
- invocation_id / correlation_id: the run + correlation ids.
The event is NOT subject to the observer phases filter; the
delivery worker forwards it to every subscribed observer.
InvocationStartedEvent
dataclass
¶
InvocationStartedEvent(
initial_state: Any,
invocation_id: str,
correlation_id: str | None,
entry_node: str,
)
An invocation-entry event delivered to observers.
Emitted once per invocation, before any node fires. Observers that
populate Trace-level input fields (the Langfuse observer, today)
consume it to resolve trace.input per the three-lever decision
tree. Observers without a Trace-level input concept (the OTel
observer) treat it as a no-op.
Carries:
- initial_state: the raw state object the engine constructed from invoke()'s arguments (the typed-state instance).
- invocation_id: the invocation id (caller-supplied or framework-generated).
- correlation_id: the correlation id when present.
- entry_node: the outermost-graph entry node name.
The event is NOT subject to the observer phases filter (which
only governs NodeEvent phases); the delivery worker forwards it
to every subscribed observer.
LlmCompletionEvent
dataclass
¶
LlmCompletionEvent(
invocation_id: str,
correlation_id: str | None,
node_name: str,
namespace: tuple[str, ...],
attempt_index: int,
fan_out_index: int | None,
branch_name: str | None,
provider: str,
model: str,
response_id: str | None,
response_model: str | None,
usage: Usage | None,
latency_ms: float | None,
finish_reason: str | None,
input_messages: list[dict[str, Any]],
output_content: str | None,
request_params: Mapping[str, Any],
request_extras: Mapping[str, Any],
active_prompt: Any,
active_prompt_group: Any,
call_id: str,
caller_invocation_metadata: (
Mapping[str, AttributeValue] | None
) = None,
output_tool_calls: list[ToolCall] = list["ToolCall"](),
)
A typed LLM provider call event delivered to observers.
Carries identity, scoping, and outcome data for an LLM call as
structured fields. Observer code filters by type discrimination
(isinstance(event, LlmCompletionEvent)) rather than by the
impl-current sentinel-namespace string match the legacy
NodeEvent pattern uses.
Field set:
- invocation_id: the outer invocation's identifier.
- correlation_id: cross-backend correlation id when present.
- node_name: the user-defined node that issued the call.
- namespace: the calling node's namespace tuple (NOT the legacy sentinel namespace).
- attempt_index: retry-attempt index (0 on first attempt).
- fan_out_index: fan-out instance index when the calling node ran inside a fan-out instance; None otherwise.
- branch_name: parallel-branches branch name when the calling node ran inside a branch; None otherwise.
- provider: provider identifier; matches gen_ai.system.
- model: the model identifier the call targeted (the request-side bound model; distinct from response_model).
- response_id: provider-returned response id; None when the provider didn't return one.
- response_model: provider-returned model identifier; distinct from model (the provider may return a more specific identifier than the one requested). None when the provider didn't return one.
- usage: token-accounting record reusing the existing openarmature.llm.response.Usage class. None when the call returned no usage at all.
- latency_ms: wall-clock latency measured at the adapter boundary, in milliseconds. None when latency was not measured.
- finish_reason: the call's finish reason; None when the call did not complete normally.
- input_messages: the message list the call was made with, serialized to the plain-dict shape. Non-nullable; empty list when the call had no history. Inline image bytes are redacted before population (see the comment block above for the redaction contract).
- output_content: the assistant message's content string from the response. None on tool-call-only responses (the structured-response and tool-call paths are mutually exclusive at the response level).
- output_tool_calls: the assistant message's output tool calls (the ToolCall records). Populated unconditionally; empty list when the response carried no tool calls. The output tool calls live here rather than in output_content (which is the response text and is empty on a tool-call-only response).
- request_params: the GenAI request-parameter set the caller supplied. Absence-is-meaningful: only caller-supplied keys appear; empty mapping when none supplied. Keys are the cross-vendor parameter names without the gen_ai.request. prefix (e.g. temperature, max_tokens).
- request_extras: the RuntimeConfig extras pass-through bag in native mapping form (not JSON-encoded). Empty mapping when no extras supplied.
- active_prompt: 5-field identity snapshot of the active PromptResult at LLM-call time (name / version / label / template_hash / rendered_hash). None when the call ran outside any prompt-context binding. Typed as Any because the prompts package imports State indirectly; observer-side narrowing reads the attribute names directly.
- active_prompt_group: {group_name} snapshot when the call ran inside a PromptGroup context; None otherwise. Same Any typing rationale as active_prompt.
- call_id: per-call disambiguator minted by the implementation. Always present, freshly minted per provider.complete() call, stable for the call's lifetime, unique within the run. Distinct from response_id.
- caller_invocation_metadata: optional snapshot of caller-supplied invocation metadata at LLM-call time. OPTIONAL; the python OpenAIProvider populates it by default so the bundled OTel/Langfuse observers can emit the openarmature.user.<key> span-attribute family without an extra opt-in. Pass populate_caller_metadata=False to suppress the snapshot. Future non-OpenAI providers MAY default to None.
LlmFailedEvent
dataclass
¶
LlmFailedEvent(
invocation_id: str,
correlation_id: str | None,
node_name: str,
namespace: tuple[str, ...],
attempt_index: int,
fan_out_index: int | None,
branch_name: str | None,
provider: str,
model: str,
latency_ms: float | None,
input_messages: list[dict[str, Any]],
request_params: Mapping[str, Any],
request_extras: Mapping[str, Any],
active_prompt: Any,
active_prompt_group: Any,
call_id: str,
error_category: str,
error_message: str,
error_type: str | None = None,
caller_invocation_metadata: (
Mapping[str, AttributeValue] | None
) = None,
)
A typed LLM provider call failure event delivered to observers.
Carries identity, scoping, and failure-context data for an LLM
call that raised a llm-provider category exception. Observer
code filters by type discrimination (isinstance(event,
LlmFailedEvent)) rather than by the impl-current sentinel-
namespace string match.
Identity / scoping / request-side field set mirrors
LlmCompletionEvent 1:1 — same field semantics, same nullability
rules. Response-side fields (response_id, response_model,
usage, output_content, finish_reason) are ABSENT from
this variant — no response was received.
Failure-specific fields:
- error_category: the llm-provider normative error category the call raised. One of the 9 canonical strings (provider_authentication, provider_unavailable, provider_invalid_model, provider_model_not_loaded, provider_rate_limit, provider_invalid_response, provider_invalid_request, provider_unsupported_content_block, structured_output_invalid). Always present.
- error_type: OPTIONAL impl-level / vendor-specific error type or code. Two acceptable styles: vendor error code (e.g. "rate_limit_exceeded") OR upstream exception class name (e.g. "RateLimitError"). None when no impl-side type is available.
- error_message: human-readable message from the raised exception. Always present (empty string when the exception carried no message).
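The type-discrimination filter described above might look like the following sketch. The dataclass is a trimmed, hypothetical stand-in carrying only a few of the documented fields so the snippet runs on its own; a real observer would use the library's own LlmFailedEvent. The WORTH_ALERTING grouping is an illustrative choice, not a library constant.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class LlmFailedEvent:
    """Hypothetical stand-in with only a few of the documented fields."""

    node_name: str
    call_id: str
    error_category: str
    error_message: str
    error_type: Optional[str] = None


# Illustrative grouping chosen for this sketch, not a library constant.
WORTH_ALERTING = {"provider_authentication", "provider_invalid_model"}

seen: list = []


async def failure_observer(event: object, /) -> None:
    # Type discrimination: every other event variant is ignored.
    if not isinstance(event, LlmFailedEvent):
        return
    alert = event.error_category in WORTH_ALERTING
    seen.append((event.node_name, event.error_category, alert))
```

Because the observer narrows with isinstance on its first line, it can be attached alongside observers that handle other variants without any namespace string matching.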
LlmRetryAttemptEvent
dataclass
¶
LlmRetryAttemptEvent(
invocation_id: str,
correlation_id: str | None,
node_name: str,
namespace: tuple[str, ...],
attempt_index: int,
fan_out_index: int | None,
branch_name: str | None,
provider: str,
model: str,
call_id: str,
llm_attempt_index: int,
latency_ms: float | None,
input_messages: list[dict[str, Any]],
request_params: Mapping[str, Any],
request_extras: Mapping[str, Any],
active_prompt: Any,
active_prompt_group: Any,
response_id: str | None = None,
response_model: str | None = None,
usage: Usage | None = None,
finish_reason: str | None = None,
output_content: str | None = None,
error_category: str | None = None,
error_message: str | None = None,
error_type: str | None = None,
caller_invocation_metadata: (
Mapping[str, AttributeValue] | None
) = None,
output_tool_calls: list[ToolCall] = list["ToolCall"](),
)
One LLM-call attempt delivered to observers for per-attempt span rendering.
Carries the full request-side surface plus that attempt's outcome.
error_category discriminates the outcome: None for a
successful attempt (the response-side fields are populated), a
category string for a failed attempt (the response-side fields are
None — no response was received).
Field set:
- llm_attempt_index: the call-level retry-attempt index, 0 for the first attempt and 0..N-1 across the N attempts of a call-level retry. Distinct from attempt_index (the node-level retry index used for calling-span resolution); the two are independent.
- identity / scoping (invocation_id ... call_id) and the request side (input_messages / request_params / request_extras / active_prompt / active_prompt_group) mirror :class:LlmCompletionEvent, carried on every attempt.
- response side (response_id / response_model / usage / finish_reason / output_content / output_tool_calls): populated on a successful attempt; None / empty list on a failed attempt. output_tool_calls is the source the OTel observer renders the §5.5.1 / §5.5.10 output tool-call attributes from (this is the per-attempt event that drives the LLM span).
- failure side (error_category / error_message / error_type): populated on a failed attempt; None on a successful one.
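As a rough illustration of the discriminator rule above, an observer could branch on error_category alone. AttemptStub is a hypothetical stand-in for a handful of the LlmRetryAttemptEvent fields, and describe_attempt is a name invented for this sketch.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class AttemptStub:
    """Hypothetical stand-in for a few LlmRetryAttemptEvent fields."""

    call_id: str
    llm_attempt_index: int
    error_category: Optional[str] = None
    output_content: Optional[str] = None


def describe_attempt(event: AttemptStub) -> str:
    label = f"{event.call_id}#{event.llm_attempt_index}"
    # error_category is the discriminator: None means the attempt succeeded.
    if event.error_category is None:
        return f"{label}: ok"
    return f"{label}: failed ({event.error_category})"
```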
MetadataAugmentationEvent
dataclass
¶
MetadataAugmentationEvent(
entries: Mapping[str, AttributeValue],
namespace: tuple[str, ...],
attempt_index: int = 0,
fan_out_index: int | None = None,
branch_name: str | None = None,
fan_out_index_chain: tuple[int | None, ...] = (),
branch_name_chain: tuple[str | None, ...] = (),
)
A metadata-augmentation event delivered to observers.
Emitted by :func:openarmature.observability.metadata.set_invocation_metadata
when called mid-invocation. Carries:
- entries: the delta merged into the per-async-context invocation metadata mapping by the call. Read-only view.
- namespace / attempt_index / fan_out_index / branch_name: the four lineage fields that jointly identify the augmenting execution context (the calling node's identity tuple). When set_invocation_metadata is called from outside a node body, namespace is the empty tuple, attempt_index is 0, and both fan_out_index and branch_name are None (the invocation-level identity).
Distinct from :class:NodeEvent because there is no node phase,
no pre/post state, and no error: this event reports a side-channel
augmentation, not a node-attempt boundary. The event is NOT
subject to the observer phases filter (which only governs
NodeEvent phases); the delivery worker forwards it to every
subscribed observer. Observers that handle it iterate their open
observations whose lineage is an ancestor of (or equal to) the
augmenting context's lineage and apply the entries as
openarmature.user.<key> (OTel) / metadata.<key>
(Langfuse).
NodeEvent
dataclass
¶
NodeEvent(
node_name: str,
namespace: tuple[str, ...],
step: int,
phase: Literal[
"started",
"completed",
"checkpoint_saved",
"checkpoint_migrated",
],
pre_state: Any,
post_state: State | None,
error: RuntimeGraphError | None,
parent_states: tuple[State, ...],
attempt_index: int = 0,
fan_out_index: int | None = None,
fan_out_config: FanOutEventConfig | None = None,
parallel_branches_config: (
ParallelBranchesEventConfig | None
) = None,
branch_name: str | None = None,
fan_out_index_chain: tuple[int | None, ...] = (),
branch_name_chain: tuple[str | None, ...] = (),
subgraph_identities: tuple[str | None, ...] = (),
caller_invocation_metadata: Mapping[
str, AttributeValue
] = (lambda: _EMPTY_METADATA)(),
)
A single node-boundary event delivered to observers.
- phase is "started" (dispatched before the node runs) or "completed" (dispatched after the node returns or raises and the merge runs/fails). Each node attempt produces exactly one of each in that order. The engine ALSO dispatches a "checkpoint_saved" event on the same shape after a successful Checkpointer.save call; observers MUST opt in explicitly via phases={"checkpoint_saved"} to receive these (default subscription is {"started", "completed"} only, so legacy observers don't see them).
- node_name is the name under which this node was registered in its immediate containing graph.
- namespace is an ordered sequence of node names from the outermost graph down to this node. For a node in the outermost graph, namespace is (node_name,). For nested subgraphs, the chain extends.
- step is a monotonically-increasing counter starting at 0, scoped to a single outermost invocation. Subgraph-internal nodes increment the same counter. The started/completed pair for one attempt share the same step.
- pre_state is the state the node received, before reducer merge. Populated on both phases (identical across the pair).
- post_state is the state after the node's partial update merged successfully. Populated only on completed events that succeeded.
- error is the wrapped runtime error (NodeException, ReducerError, or StateValidationError) when the node failed. Populated only on completed events that failed.
- parent_states carries one state snapshot per containing graph, outermost first; for a node in the outermost graph it's an empty tuple. Invariant: len(parent_states) == len(namespace) - 1.
- attempt_index is the 0-based index of this attempt among any retries. 0 for nodes not wrapped by retry middleware.
- fan_out_index is the 0-based index of this fan-out instance among its siblings. None for nodes not inside a fan-out.
- fan_out_config carries resolved fan-out configuration on events from a fan-out NODE itself. See :class:FanOutEventConfig. None on every other event.
- branch_name is the non-empty string name of the parallel-branches branch this event came from. None for nodes outside any branch. The combination of namespace, branch_name, fan_out_index, attempt_index, and phase jointly uniquely identifies an event source. branch_name and fan_out_index are independent; both MAY be present when a branch's subgraph contains a fan-out (or a fan-out instance contains a parallel-branches node).
Invariants:
- On started events, post_state and error MUST both be None.
- On completed events, exactly one of post_state and error is populated.
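The two invariants can be written as a small predicate, which may be handy in observer tests. This is a sketch over plain values rather than a real NodeEvent; the function name is invented here.

```python
from typing import Any, Optional


def node_event_invariants_hold(
    phase: str, post_state: Optional[Any], error: Optional[BaseException]
) -> bool:
    if phase == "started":
        # Nothing has run yet, so neither outcome field may be set.
        return post_state is None and error is None
    if phase == "completed":
        # Exactly one of the two is populated (logical XOR).
        return (post_state is None) != (error is None)
    # Other phases are not covered by these two invariants.
    return True
```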
Synthetic phases. "checkpoint_saved" and
"checkpoint_migrated" repurpose this dataclass for non-node
events. Both
are opt-in via phases={...} on observer registration;
default subscriptions are {"started", "completed"} only, so
legacy observers never see them. Conventions on synthetic
events:
- checkpoint_saved: pre_state carries the saved post-merge state (still a real State instance for this phase), post_state is None. step matches the saving node's step.
- checkpoint_migrated: step=-1 (no graph-step sequencing; migrations run before any node fires). node_name="openarmature.checkpoint.migrate" and namespace=("openarmature.checkpoint.migrate",) are dotted-pseudo identifiers, not real node names. pre_state carries a private _MigrationSummary dataclass with from_version / to_version / chain_length, NOT a State instance. parent_states is the empty tuple.
Because pre_state is no longer guaranteed to be a State
on the synthetic phases, its type is declared as Any and
observer authors who subscribe to those phases MUST narrow
per-phase before reading pre_state.
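Per-phase narrowing could be sketched as below. The function takes the phase and pre_state as plain arguments, and the usage note substitutes a SimpleNamespace for the private migration summary; only the three attribute names come from the text above, everything else is illustrative.

```python
from typing import Any


def describe_pre_state(phase: str, pre_state: Any) -> str:
    if phase == "checkpoint_migrated":
        # Not a State here: read the documented summary attributes instead.
        return (
            f"migrated {pre_state.from_version} -> {pre_state.to_version} "
            f"in {pre_state.chain_length} step(s)"
        )
    # For the remaining phases pre_state is a real state object.
    return f"state of type {type(pre_state).__name__}"
```

For example, passing types.SimpleNamespace(from_version=1, to_version=3, chain_length=2) with phase "checkpoint_migrated" exercises the first branch.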
FanOutConfig
dataclass
¶
FanOutConfig(
subgraph: CompiledGraph[Any],
collect_field: str,
target_field: str,
items_field: str | None = None,
item_field: str | None = None,
count: int | CountResolver | None = None,
concurrency: int | ConcurrencyResolver | None = 10,
error_policy: Literal[
"fail_fast", "collect"
] = "fail_fast",
on_empty: Literal["raise", "noop"] = "raise",
count_field: str | None = None,
inputs: Mapping[str, str] = dict[str, str](),
extra_outputs: Mapping[str, str] = dict[str, str](),
instance_middleware: tuple[Middleware, ...] = (),
errors_field: str | None = None,
subgraph_identity: str | None = None,
)
Frozen configuration for a :class:FanOutNode.
Validation happens at builder compile time (see
GraphBuilder.add_fan_out_node); construction here is
unchecked beyond the obvious type-level constraints.
FanOutNode
dataclass
¶
FanOutNode(
name: str,
config: FanOutConfig,
middleware: tuple[Middleware, ...] = (),
)
A node that fans out into N concurrent subgraph instances.
The Node Protocol contract requires name, middleware, and
run; run here is unusual because it needs the engine
invocation context to descend properly. The engine recognizes this
type in _invoke and calls run_with_context (see
compiled.CompiledGraph._step_fan_out_node) rather than the
plain run(state) shape. run exists for Protocol conformance
only and raises if anyone calls it without context.
run
async
¶
Not implemented at this level. The fan-out node requires the
engine's invocation context to resolve count and concurrency
and dispatch instances; the engine calls
:meth:run_with_context instead. This method exists only to
satisfy the :class:Node Protocol and always raises
:class:NotImplementedError.
run_with_context
async
¶
run_with_context(
state: ParentT,
context: _InvocationContext,
*,
pre_resolved_count: int | None = None,
pre_resolved_concurrency: (
tuple[int | None] | None
) = None
) -> Mapping[str, Any]
Execute the fan-out and return the merged partial update.
Snapshot, resolve count + concurrency, build per-instance states, run concurrently with the configured error policy, fan-in collected/extra fields, write count_field and errors_field if configured.
Per the per-instance resume contract: this method registers a
per-fan-out tracking entry on the shared
context.fan_out_progress_state dict before dispatching,
flips each instance's state through
not_started -> in_flight -> completed as the instance
progresses, and fires an explicit "instance completed" save
after the per-instance contribution has been recorded into
the accumulator. The atomicity contract is
observed: the per-instance state mutation precedes the save,
so a crash after mutation but before save leaves the saved
record showing in_flight (resume re-runs the instance).
pre_resolved_count / pre_resolved_concurrency are
hooks: when the engine has already
resolved the config eagerly to populate
NodeEvent.fan_out_config for the fan-out node's events,
it passes the resolved values in so callable resolvers
aren't invoked twice. pre_resolved_concurrency is wrapped
in a 1-tuple to disambiguate "caller passed None
(unbounded)" from "caller didn't pass anything."
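The 1-tuple trick can be illustrated in isolation. In this sketch resolve_concurrency and its resolver argument are hypothetical; only the wrapping convention is taken from the description above.

```python
from typing import Callable, Optional, Tuple


def resolve_concurrency(
    resolver: Callable[[], Optional[int]],
    pre_resolved: Optional[Tuple[Optional[int]]] = None,
) -> Optional[int]:
    if pre_resolved is not None:
        # A value was passed in, possibly None (meaning unbounded).
        (value,) = pre_resolved
        return value
    # Nothing was passed in: invoke the resolver exactly once.
    return resolver()
```

Without the wrapper, a bare None could not distinguish "unbounded" from "not supplied", and the resolver would have to run a second time to find out.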
FailureIsolationMiddleware ¶
FailureIsolationMiddleware(
*,
degraded_update: DegradedUpdate,
event_name: str,
catch: Collection[str] | None = None,
predicate: Callable[[Exception], bool] | None = None,
on_caught: (
Callable[[Exception], Awaitable[None]] | None
) = None
)
Catch exceptions escaping the inner chain; return a degraded partial update.
Configuration:
- degraded_update (required): the partial update returned on a caught exception, OR a callable state -> partial_update for input-state-dependent degraded shapes.
- event_name (required): a stable identifier for this catch site; surfaces on the FailureIsolatedEvent. No default: useful values are node-specific, and a generic default would make downstream telemetry strictly worse.
- catch (optional): a set of error categories. When supplied, an exception is caught only if the derived category of its cause chain (the category of the outermost non-carrier link that carries one, resolving through node_exception carriers, the value reported as caught_exception.category) is in the set. Composes with predicate as a conjunction; both default permissive (both unset catches every Exception). The recommended gate for category-scoped degradation.
- predicate (optional): Exception -> bool over the SURFACE (caught) exception. When supplied, only exceptions where predicate(exc) is true are caught; others propagate. Defaults to always-true. A predicate inspecting the exception directly sees the node_exception carrier at a wrapping placement, not the originating failure; use catch for category gating, or classify the chain via classify_cause_chain.
- on_caught (optional): an async Exception -> Awaitable[None] hook fired on a caught exception, for caller-specific telemetry beyond the framework event. It runs inline before the degraded update is returned, so a slow hook delays the node's return; an exception raised by the hook is isolated (logged via warnings.warn, not propagated) so a telemetry bug cannot turn a recovered node back into a failure.
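A simplified sketch of how these options compose is shown below. It is not the library's implementation: it reads category straight off the caught exception instead of walking the cause chain, emits no FailureIsolatedEvent, and uses plain dicts as state. The names make_isolating_middleware, CategorizedError, and failing_node are invented for the example.

```python
import asyncio
import warnings


class CategorizedError(Exception):
    """Hypothetical exception carrying a category attribute."""

    def __init__(self, message: str, category: str) -> None:
        super().__init__(message)
        self.category = category


def make_isolating_middleware(degraded_update, catch=None, predicate=None, on_caught=None):
    async def middleware(state, next_call):
        try:
            return await next_call(state)
        except Exception as exc:
            # Simplification: no cause-chain walk, just the surface category.
            category = getattr(exc, "category", None)
            # catch and predicate compose as a conjunction.
            if catch is not None and category not in catch:
                raise
            if predicate is not None and not predicate(exc):
                raise
            if on_caught is not None:
                try:
                    await on_caught(exc)
                except Exception as hook_exc:
                    # A failing hook is reported, never propagated.
                    warnings.warn(f"on_caught hook failed: {hook_exc!r}")
            if callable(degraded_update):
                return degraded_update(state)
            return degraded_update

    return middleware


async def failing_node(state):
    raise CategorizedError("too many requests", "provider_rate_limit")
```

With catch={"provider_rate_limit"} the failure above is replaced by the degraded update; with any other catch set it propagates unchanged.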
Middleware ¶
Bases: Protocol
An async callable that wraps the dispatch of a single node.
The shape is (state, next) -> partial_update. The middleware
MUST return a mapping of field names to values; same shape a
node returns. It may:
- Inspect or transform state before calling next(state).
- Inspect or transform the partial update returned from next.
- Short-circuit by NOT calling next and returning its own partial (the rest of the chain, meaning any subsequent middleware and the wrapped node, does not execute).
- Catch exceptions raised by next(state) and either re-raise, transform, or recover (returning a partial update instead of raising).
- Call next more than once (e.g., retry middleware).
A middleware MUST NOT mutate the input state object; pass a new
state to next if a transformation is needed.
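The (state, next) -> partial_update shape can be sketched with two toy middlewares. Plain dicts stand in for state objects here, the parameter is named next_call to avoid shadowing the built-in next, and run_chain wires the chain by hand rather than through the engine; all names are illustrative.

```python
import asyncio
from typing import Any, Awaitable, Callable, Mapping

StateLike = Mapping[str, Any]
NextCall = Callable[[StateLike], Awaitable[Mapping[str, Any]]]


async def trim_question(state: StateLike, next_call: NextCall) -> Mapping[str, Any]:
    # Transform by building a NEW mapping; the input state is never mutated.
    return await next_call({**state, "question": state["question"].strip()})


async def reuse_answer(state: StateLike, next_call: NextCall) -> Mapping[str, Any]:
    # Short-circuit: skip the rest of the chain when an answer already exists.
    if state.get("answer"):
        return {"answer": state["answer"]}
    return await next_call(state)


async def answer_node(state: StateLike) -> Mapping[str, Any]:
    return {"answer": state["question"].upper()}


async def run_chain(state: StateLike) -> Mapping[str, Any]:
    # Outer-to-inner: reuse_answer wraps trim_question, which wraps the node.
    async def inner(s: StateLike) -> Mapping[str, Any]:
        return await trim_question(s, answer_node)

    return await reuse_answer(state, inner)
```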
NextCall ¶
Bases: Protocol
The next callable a middleware receives.
Calling it with a state invokes the next layer of the chain (or
the wrapped node, at the inner end) and returns the partial
update from that layer. Middleware MAY transform the state passed
to next; the transformed state flows down the chain but does
NOT replace the engine's pre-merge state at the outermost level
(the transformed state is passed to next, NOT to the engine's
merge step).
RetryConfig
dataclass
¶
RetryConfig(
max_attempts: int = 3,
classifier: Classifier | None = None,
backoff: BackoffStrategy | None = None,
on_retry: OnRetryCallback | None = None,
)
Canonical retry configuration record consumed by
:class:RetryMiddleware.
- max_attempts: total attempts including the first call. 1 disables retry. Default 3.
- classifier: predicate (exception, state) -> bool deciding whether a failure is retry-eligible. None (the default) selects :func:default_classifier (matches category against TRANSIENT_CATEGORIES).
- backoff: callable (attempt_index) -> seconds. None (the default) selects :func:exponential_jitter_backoff (base 1s, cap 30s, full jitter).
- on_retry: optional async callback (exception, attempt_index) -> None fired before each backoff sleep.
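A custom backoff callable only needs the (attempt_index) -> seconds shape. The sketch below uses the commonly cited "full jitter" form (a uniform draw between zero and a capped exponential ceiling); whether the library's default uses exactly this formula is an assumption, and the rng parameter is added here purely to make the function testable.

```python
import random
from typing import Optional


def full_jitter_backoff(
    attempt_index: int,
    *,
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> float:
    # The ceiling doubles per attempt until it reaches the cap.
    ceiling = min(cap, base * (2 ** attempt_index))
    source = rng if rng is not None else random
    # Full jitter: anywhere between 0 and the ceiling.
    return source.uniform(0.0, ceiling)
```

With the defaults, attempt 0 sleeps at most 1 second and attempts 5 and later sleep at most 30 seconds.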
RetryMiddleware ¶
RetryMiddleware(config: RetryConfig | None = None)
Canonical retry middleware.
Configured with a :class:RetryConfig (or the default
RetryConfig() when omitted). Construct as
RetryMiddleware(RetryConfig(max_attempts=...)).
TimingMiddleware ¶
TimingMiddleware(
*,
node_name: str,
on_complete: OnCompleteCallback,
clock: Callable[[], float] | None = None
)
Canonical timing middleware.
Records wall-clock duration of the wrapped chain via the host
language's monotonic clock (Python's time.monotonic). The
callback fires inline before the chain's result returns to the
caller; slow callbacks add to the apparent node duration, so
users SHOULD keep them fast (queue work, defer I/O).
Errors raised by on_complete propagate to the engine as a
node_exception.
TimingRecord
dataclass
¶
A single timing measurement produced by TimingMiddleware.
- node_name: the node this middleware was attached to (captured at registration; users supply it explicitly for per-node use).
- duration_ms: milliseconds from middleware entry to chain return-or-raise, measured with a monotonic clock.
- outcome: one of "success" or "exception".
- exception_category: when outcome == "exception" and the exception carries a category attribute, that string; otherwise None.
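How such a record might be produced is sketched below. TimingStub mirrors the four documented fields, but the middleware itself is a simplified stand-in: it assumes a synchronous on_complete callback (the real OnCompleteCallback shape is not shown in this reference) and accepts an injectable clock, like the documented clock parameter.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TimingStub:
    """Stand-in mirroring the four documented TimingRecord fields."""

    node_name: str
    duration_ms: float
    outcome: str
    exception_category: Optional[str]


def make_timing_middleware(node_name, on_complete, clock=time.monotonic):
    async def middleware(state, next_call):
        start = clock()
        try:
            result = await next_call(state)
        except Exception as exc:
            elapsed = (clock() - start) * 1000.0
            category = getattr(exc, "category", None)
            on_complete(TimingStub(node_name, elapsed, "exception", category))
            raise
        elapsed = (clock() - start) * 1000.0
        # Assumption: the callback is synchronous and runs inline.
        on_complete(TimingStub(node_name, elapsed, "success", None))
        return result

    return middleware


async def echo_node(state):
    return {"seen": True}
```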
FunctionNode
dataclass
¶
FunctionNode(
name: str,
fn: Callable[[StateT], Awaitable[Mapping[str, Any]]],
middleware: tuple[Middleware, ...] = tuple[
Middleware, ...
](),
)
A node backed by an async callable.
run
async
¶
Invoke the wrapped async callable and return its partial update. Called by the engine inside any per-node and per-graph middleware chain.
Node ¶
Bases: Protocol
A unit of work in a compiled graph.
middleware
property
¶
middleware: tuple[Middleware, ...]
Per-node middleware applied at this node's registration site, outer-to-inner. Composed inside any per-graph middleware.
run
async
¶
Execute against state and return a partial update to be merged via reducers.
DrainSummary
dataclass
¶
Outcome of a CompiledGraph.drain() call.
Returned from drain() regardless of whether a timeout was
supplied. When no timeout was supplied, or the timeout did not
fire, undelivered_count == 0 and timeout_reached is False.
When the timeout fired, undelivered_count reports the number of
events that were dispatched to the delivery worker but not fully
delivered to every subscribed observer before cancellation, and
timeout_reached is True.
These two fields are the required minimum. Implementations MAY extend the shape with diagnostic detail (per-observer counts, sampled event metadata) in subsequent versions; this version ships the minimum.
Observer ¶
Bases: Protocol
The shape of a callable that receives observer events.
Observer is a structural Protocol; any async callable matching the
signature qualifies, no subclass required. Plain functions, bound
methods, and class instances with __call__ all work::
    async def log_observer(event: NodeEvent | MetadataAugmentationEvent) -> None:
        if isinstance(event, NodeEvent):
            print(event.node_name, event.phase)

    compiled.attach_observer(log_observer)
Contract:
- Observers MUST be async so the delivery queue can await each one and coordinate ordering. The graph itself never awaits observers.
- Observers MUST NOT alter state, routing, or any other aspect of the graph run. Read-only side effects (logging, metrics, span emission) only.
The event parameter is positional-only (event, /) so structural
conformance doesn't pin you to that name; any of event, _event,
e, etc. matches.
Seven event variants reach observers. The signature is the union;
observers isinstance-narrow on the first line and choose which
variants they handle.
- :class:NodeEvent: the started/completed/checkpoint phase events. Subject to the phases filter on :class:SubscribedObserver; observers whose phase set excludes event.phase do NOT receive it.
- :class:MetadataAugmentationEvent: emitted by :func:openarmature.observability.metadata.set_invocation_metadata when called mid-invocation. Carries the augmenting context's lineage tuple (namespace, attempt_index, fan_out_index, branch_name) so rich backends can update their open observations in place (span.set_attribute(openarmature.user.<key>, v) for OTel, observation.update(metadata=...) for Langfuse). This variant is NOT subject to the phases filter: every subscribed observer sees it and isinstance-narrows to decide whether to act. Simple user observers typically early-return after isinstance(event, NodeEvent) checks.
- :class:InvocationStartedEvent: emitted once per invocation before any node fires. Carries the engine-constructed initial_state so Trace-level backends (Langfuse) can populate trace.input via the three-lever decision tree. NOT subject to the phases filter; OTel-only observers ignore it via the isinstance gate.
- :class:InvocationCompletedEvent: emitted once per invocation after the last node fires (on both the success path and the failure path). Carries final_state + a closed status: {"completed", "failed"} enum so Trace-level backends can populate trace.output. NOT subject to the phases filter; OTel-only observers ignore it via the isinstance gate.
- :class:LlmCompletionEvent: dispatched on every successful LLM provider call. Carries the typed identity / request / response field set for LLM-aware backends. NOT subject to the phases filter; non-LLM observers ignore it via the isinstance gate.
- :class:LlmFailedEvent: the failure-side counterpart, dispatched alongside the provider exception when an LLM call raises. NOT subject to the phases filter.
- :class:FailureIsolatedEvent: dispatched by FailureIsolationMiddleware when it catches an exception and substitutes a degraded partial update. NOT subject to the phases filter.
Optional prepare_sync extension¶
An observer MAY additionally define a synchronous method::
    def prepare_sync(self, event: NodeEvent, /) -> None: ...
that the engine calls IN THE ENGINE TASK, BEFORE queueing the
event for the async __call__. This exists for observers that
need to set up state (e.g., open a span and stash a handle in a
ContextVar) that the engine itself must read synchronously
before running the node body (otherwise logs emitted on the
first line of the body wouldn't see the right span).
prepare_sync is opt-in via hasattr; no subclass or
Protocol method required. Observers that don't define it skip
the synchronous prep entirely; observers that do define it run
only for "started"-phase events, with errors warned-not-
propagated (same isolation contract as the async path).
prepare_sync is never invoked for
:class:MetadataAugmentationEvent (the synchronous-prep contract
is anchored on the started phase, which only NodeEvent
carries).
RemoveHandle
dataclass
¶
RemoveHandle(
_observers: list[SubscribedObserver],
_observer: SubscribedObserver,
)
Returned by CompiledGraph.attach_observer. Call
.remove() to detach the observer. Idempotent: calling
.remove() after the observer is already detached is a no-op.
Changes to the registered observer set during a graph run do NOT take effect until the next invocation.
remove ¶
Detach the observer from its compiled graph. Idempotent: a
second call is a no-op rather than an error. The change takes
effect on the next invoke(); in-flight invocations keep
the observer set they started with.
SubscribedObserver
dataclass
¶
SubscribedObserver(
observer: Observer, phases: frozenset[str] = ALL_PHASES
)
An observer paired with its phase subscription set.
Observers register with an optional phases parameter naming
the phase strings they want to receive. The default is
ALL_PHASES, historically named when there were only two
phases; it now means "the default subscription"
({"started", "completed"}). The "checkpoint_saved" phase
is opt-in: subscribe to it explicitly via
phases={"checkpoint_saved"} (or include it in a custom set).
KNOWN_PHASES is the full "every phase the engine can produce"
set used by the registration-time validator.
Empty phase sets are forbidden; passing one raises
ValueError at registration time so misconfiguration surfaces
immediately.
Construct one of these directly when handing phase-filtered
observers to CompiledGraph.invoke(observers=...). For the
single-observer attach_observer path, pass phases= as a
keyword argument and the engine wraps it for you.
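The registration-time rules above can be approximated with a small helper. The constant values are taken from the phase names listed in this reference; the helper name, and the choice to raise ValueError for unknown phase strings as well as empty sets, are assumptions of this sketch.

```python
from typing import Iterable, Optional

# Phase names as listed in this reference.
KNOWN_PHASES = frozenset(
    {"started", "completed", "checkpoint_saved", "checkpoint_migrated"}
)
DEFAULT_PHASES = frozenset({"started", "completed"})


def normalize_phases(phases: Optional[Iterable[str]] = None) -> frozenset:
    if phases is None:
        # No explicit set: fall back to the default subscription.
        return DEFAULT_PHASES
    selected = frozenset(phases)
    if not selected:
        raise ValueError("phases must not be empty")
    unknown = selected - KNOWN_PHASES
    if unknown:
        # Assumption: unknown names are rejected the same way.
        raise ValueError(f"unknown phases: {sorted(unknown)}")
    return selected
```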
BranchSpec
dataclass
¶
BranchSpec(
subgraph: CompiledGraph[ChildT] | None = None,
call: BranchCallable | None = None,
when: Callable[[Any], bool] | None = None,
inputs: Mapping[str, str] = dict[str, str](),
outputs: Mapping[str, str] = dict[str, str](),
middleware: tuple[Middleware, ...] = (),
)
One entry in a :class:ParallelBranchesNode's branch mapping.
A branch's work is given by exactly one of:
- subgraph: a compiled subgraph (the heterogeneous-subgraph form). Each branch may reference a different compiled subgraph with a different state schema, with inputs / outputs following the same shape as subgraph projection mappings.
- call: an inline async function over the parent state returning a parent-shaped partial update (the lightweight form). No subgraph, no state schema, no inputs / outputs. The function reads the parent state directly and returns parent fields.
An optional when predicate over the parent state, evaluated once
at dispatch, skips the branch entirely when it returns false.
Validation lives on the builder side
(GraphBuilder.add_parallel_branches_node): exactly one of
subgraph / call and no inputs / outputs on a
callable branch (parallel_branches_invalid_branch_spec);
mapping_references_undeclared_field for subgraph-branch
inputs/outputs referencing undeclared fields;
parallel_branches_no_branches for empty branches maps;
ValueError for empty-string branch names.
ParallelBranchesNode
dataclass
¶
ParallelBranchesNode(
name: str,
branches: dict[str, BranchSpec[Any]],
error_policy: Literal[
"fail_fast", "collect"
] = "fail_fast",
errors_field: str | None = None,
middleware: tuple[Middleware, ...] = (),
)
A node that dispatches M heterogeneous compiled subgraphs concurrently.
The Node Protocol contract requires name, middleware,
and run. Like :class:FanOutNode, the engine recognizes
this type in _invoke and calls run_with_context so the
dispatcher has access to the invocation context for
observer-attribution + namespace descent. run exists for
Protocol conformance only and raises if anyone calls it
directly.
run
async
¶
Not implemented at this level. Dispatching parallel branches
requires the engine's invocation context so each branch gets
observer attribution and a namespaced descent; the engine
calls :meth:run_with_context instead. This method exists
only to satisfy the :class:Node Protocol and always raises
:class:NotImplementedError.
dispatched_branches ¶
dispatched_branches(
state: Any,
) -> list[tuple[str, BranchSpec[Any]]]
Return the branches that dispatch for state, in insertion
order: every declared branch whose when predicate is absent or
returns true (§11.10).
when MUST be a pure function of the dispatch-time parent state,
so this is deterministic and safe to evaluate both here (the
dispatch set) and at the NODE event's branch_count (the count of
branches that dispatch, which excludes when-skipped branches).
run_with_context
async
¶
Execute the parallel-branches dispatch and return the merged partial update.
Snapshot parent state, project per-branch initial states, dispatch M branches concurrently in insertion order, then either fail-fast on first error (cancelling the rest) or run to completion and merge per the configured error policy.
ExplicitMapping ¶
ExplicitMapping(
*,
inputs: Mapping[str, str] | None = None,
outputs: Mapping[str, str] | None = None
)
Explicit input/output mapping between parent and subgraph state.
inputs: subgraph_field to parent_field. At entry, the named
parent field's current value is copied into the named subgraph
field. Subgraph fields not listed receive their schema-declared
defaults; there is NO field-name fallback (additive over the
default no-projection-in).
outputs: parent_field to subgraph_field. At exit, the named
subgraph field's value is merged into the named parent field via
the parent's reducer. Subgraph fields not listed are discarded;
outputs REPLACES field-name matching for projection-out.
The two directions are independent: pass either, both, or
neither. The outputs field distinguishes "absent" (default
applies) from "present but empty"; outputs=None means absent
(fall back to field-name matching), outputs={} means present
and empty (project nothing). For inputs the two defaults
coincide (no-projection-in either way), so the distinction is
only meaningful for outputs.
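The absent-versus-empty distinction for outputs can be demonstrated with plain dictionaries. This sketch ignores reducers and state classes entirely and only computes which parent fields would receive a value; project_out_sketch and its parameters are illustrative names.

```python
from typing import Any, Mapping, Optional, Set


def project_out_sketch(
    child_state: Mapping[str, Any],
    parent_fields: Set[str],
    outputs: Optional[Mapping[str, str]] = None,
) -> dict:
    if outputs is None:
        # Absent: fall back to field-name matching.
        return {k: v for k, v in child_state.items() if k in parent_fields}
    # Present (possibly empty): only the declared pairs are projected.
    # Direction is parent_field -> subgraph_field.
    return {parent: child_state[sub] for parent, sub in outputs.items()}
```

An empty mapping therefore projects nothing, while None keeps every subgraph field whose name also exists on the parent.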
project_in ¶
Construct the subgraph's initial state from inputs.
Each declared subgraph_field → parent_field pair copies
the parent's current value into the subgraph kwarg; subgraph
fields not listed get their schema defaults.
project_out ¶
project_out(
subgraph_final_state: ChildT,
parent_state: ParentT,
subgraph_state_cls: type[ChildT],
) -> Mapping[str, Any]
Project back per outputs: each declared
parent_field → subgraph_field pair folds the subgraph
value into the parent's reducer. When outputs was not
provided, falls back to field-name matching.
validate ¶
Compile-time check that every field name in inputs and
outputs exists on the relevant state schema. Called once
per subgraph node from the parent's compile(). Raises
:class:MappingReferencesUndeclaredField on the first typo.
FieldNameMatching ¶
Default subgraph projection strategy.
Parameterized for protocol conformance under generics. ParentT
is not consumed (the default projection ignores parent state on
the way in), but carrying the type variable keeps the default
assignable to ProjectionStrategy[ParentT, ChildT] without type
gymnastics at the SubgraphNode default-factory site.
project_in ¶
No-projection-in: the subgraph starts from its schema's field defaults, ignoring the parent state entirely.
project_out ¶
project_out(
subgraph_final_state: ChildT,
parent_state: ParentT,
subgraph_state_cls: type[ChildT],
) -> Mapping[str, Any]
Project shared field names back to the parent. Subgraph fields whose names match a parent field are folded through the parent's reducer; non-matching subgraph fields are discarded.
ProjectionStrategy ¶
Bases: Protocol
Strategy for moving state across the parent ↔ subgraph boundary.
Two required methods plus one optional hook:
- project_in and project_out are required: the engine calls them on every subgraph step.
- validate(parent_cls, subgraph_state_cls) -> None is an optional compile-time validation hook. If a strategy defines it, the parent graph's compile() calls it once per SubgraphNode; the strategy may raise a CompileError subclass when its declarations don't match the supplied schemas. Declarative strategies like ExplicitMapping use this to catch field-name typos before any node runs. Imperative custom projections typically have nothing declarative to check and can simply omit the method; the engine uses duck typing (getattr) to find it.
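The duck-typed lookup of the optional hook might be sketched like this. Both strategy classes and the state classes are hypothetical, and ValueError stands in for a CompileError subclass so the snippet has no library dependency.

```python
class ParentState:
    topic: str


class ChildState:
    draft: str


class ImperativeStrategy:
    """Defines no validate hook, so compile-time validation is skipped."""


class DeclarativeStrategy:
    def __init__(self, required_field: str) -> None:
        self.required_field = required_field

    def validate(self, parent_cls, subgraph_state_cls) -> None:
        declared = getattr(subgraph_state_cls, "__annotations__", {})
        if self.required_field not in declared:
            # Stand-in for a CompileError subclass.
            raise ValueError(f"undeclared field: {self.required_field}")


def run_optional_validate(strategy, parent_cls, subgraph_state_cls) -> bool:
    # Duck typing: look the hook up by name instead of requiring a base class.
    hook = getattr(strategy, "validate", None)
    if hook is None:
        return False
    hook(parent_cls, subgraph_state_cls)
    return True
```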
Reducer ¶
Base class for state-field reducers.
Each reducer carries a canonical name used in error messages and
introspection. Subclasses override __call__ to merge a node's partial
update for a single field into the prior value.
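As a rough illustration of that contract, the sketch below defines a stand-in base class and two subclasses. ReducerSketch, LastWriteWinsSketch, and AppendSketch are hypothetical names, and the (prior, update) argument order is an assumption; consult the library's own reducers for the real signatures.

```python
from typing import Any


class ReducerSketch:
    """Stand-in base: a canonical name plus a merge function."""

    name = "reducer"

    def __call__(self, prior: Any, update: Any) -> Any:
        raise NotImplementedError


class LastWriteWinsSketch(ReducerSketch):
    name = "last_write_wins"

    def __call__(self, prior: Any, update: Any) -> Any:
        # The node's value simply replaces the prior value.
        return update


class AppendSketch(ReducerSketch):
    name = "append"

    def __call__(self, prior: list, update: list) -> list:
        # Build a new list instead of mutating the prior value.
        return [*prior, *update]


history = ["a"]
merged = AppendSketch()(history, ["b", "c"])
replaced = LastWriteWinsSketch()("old", "new")
```

Returning a new list rather than extending the prior one keeps the previous state value untouched, which fits an immutable state schema.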
State ¶
Bases: BaseModel
Base for graph state schemas. Immutable; reducers attach via Annotated.
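To show what "reducers attach via Annotated" can look like mechanically, here is a minimal sketch that uses a frozen dataclass in place of the pydantic-based State. DemoState, concat, and merge_update are hypothetical names, and the lookup logic is an assumption about the general technique, not the engine's code.

```python
from dataclasses import dataclass, replace
from typing import Annotated, Any, Mapping, get_type_hints


def concat(prior: tuple, update: tuple) -> tuple:
    # Hypothetical reducer: append the update to the prior tuple.
    return prior + update


@dataclass(frozen=True)
class DemoState:  # stand-in for a State subclass
    log: Annotated[tuple, concat] = ()
    answer: str = ""


def merge_update(state: DemoState, update: Mapping[str, Any]) -> DemoState:
    hints = get_type_hints(type(state), include_extras=True)
    merged = {}
    for field_name, value in update.items():
        # Annotated[...] exposes its extras as __metadata__; plain types have none.
        metadata = getattr(hints[field_name], "__metadata__", ())
        reducer = next((m for m in metadata if callable(m)), None)
        prior = getattr(state, field_name)
        merged[field_name] = reducer(prior, value) if reducer else value
    # The state is immutable, so merging yields a new instance.
    return replace(state, **merged)


s0 = DemoState()
s1 = merge_update(s0, {"log": ("started",), "answer": "draft"})
s2 = merge_update(s1, {"log": ("finished",)})
```

Fields without an attached reducer fall back to plain replacement in this sketch; s0 is never modified.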
SubgraphNode dataclass ¶
SubgraphNode(
name: str,
compiled: CompiledGraph[ChildT],
projection: ProjectionStrategy[
ParentT, ChildT
] = FieldNameMatching[ParentT, ChildT](),
middleware: tuple[Middleware, ...] = tuple[
Middleware, ...
](),
subgraph_identity: str | None = None,
)
A node backed by a compiled subgraph.
The parent's per-node middleware on a SubgraphNode wraps the subgraph dispatch as a single atomic call; parent middleware does NOT cross into the subgraph's internal nodes (those are wrapped by the subgraph's own middleware independently).
run async ¶
Execute the subgraph and project its result back into the parent.
When context is None (e.g., direct invocation in tests, or a parent
call that doesn't thread a context), the subgraph runs via its own
public invoke(), as a fresh root invocation with no parent observer
chain.
When context is provided (the engine's normal path during
a parent run), the subgraph descends into a child context
that shares the parent's queue + step counter and extends the
namespace and parent-state stack. Observer events from inner
nodes bubble up to outer observers.
classify_cause_chain ¶
classify_cause_chain(exc: Exception) -> CaughtException
Classify exc by walking its __cause__ chain.
Records one CauseLink per exception from exc (outermost) to the
originating raise (innermost), flagging node_exception carriers, and
derives the single category / message the chain represents (the outermost
non-carrier categorized link).
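The chain walk can be sketched with plain exceptions. LinkSketch, NodeFailure, RateLimited, and walk_cause_chain are hypothetical names, and the "provider_rate_limit" category string is invented for illustration; the real CauseLink and CaughtException shapes are not shown in this section, so treat the fields below as assumptions.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class LinkSketch:  # hypothetical stand-in for CauseLink
    type_name: str
    category: Optional[str]
    is_carrier: bool


class NodeFailure(Exception):
    category = "node_exception"  # carrier: wraps the real failure


class RateLimited(Exception):
    category = "provider_rate_limit"  # hypothetical category name


def walk_cause_chain(exc: BaseException) -> Tuple[List[LinkSketch], Optional[str]]:
    links: List[LinkSketch] = []
    seen = set()
    current: Optional[BaseException] = exc
    # Outermost first, following __cause__ down to the originating raise.
    while current is not None and id(current) not in seen:
        seen.add(id(current))
        category = getattr(current, "category", None)
        links.append(
            LinkSketch(type(current).__name__, category, category == "node_exception")
        )
        current = current.__cause__
    # Derived category: the outermost link that is categorized and not a carrier.
    derived = next((l.category for l in links if l.category and not l.is_carrier), None)
    return links, derived


try:
    try:
        raise RateLimited("429")
    except RateLimited as inner:
        raise NodeFailure("node failed") from inner
except NodeFailure as outer:
    links, category = walk_cause_chain(outer)
```

The seen set guards against a cyclic __cause__ chain, which is unusual but possible when exceptions are re-linked by hand.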
default_classifier ¶
Default classifier; purely category-based, ignores state.
Returns True if either the exception itself or its __cause__
carries a category attribute matching TRANSIENT_CATEGORIES.
The cause-walking covers the common case of a graph-engine
NodeException wrapping an llm-provider transient: a
node_exception whose __cause__ is a transient category
classifies as transient.
The _state parameter is ignored by the default; the leading
underscore is the canonical Python convention for "intentionally
unused" while keeping the signature stable for user-supplied
state-aware classifiers.
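A category-based classifier along these lines might look like the sketch below. The contents of TRANSIENT_CATEGORIES_SKETCH, the exception classes, and the (exc, _state) parameter order are assumptions made for illustration; only the "check the exception and its __cause__" rule is taken from the description above.

```python
from typing import Any

# Hypothetical category names; the library's TRANSIENT_CATEGORIES may differ.
TRANSIENT_CATEGORIES_SKETCH = frozenset({"provider_rate_limit", "provider_unavailable"})


class NodeFailure(Exception):
    category = "node_exception"


class RateLimited(Exception):
    category = "provider_rate_limit"


def classifier_sketch(exc: BaseException, _state: Any = None) -> bool:
    # Look at the exception itself and one level of __cause__; state is unused.
    for candidate in (exc, exc.__cause__):
        if getattr(candidate, "category", None) in TRANSIENT_CATEGORIES_SKETCH:
            return True
    return False


wrapped = NodeFailure("node failed")
wrapped.__cause__ = RateLimited("429")  # same link that `raise ... from` sets

direct_transient = classifier_sketch(RateLimited("429"))
wrapped_transient = classifier_sketch(wrapped)
plain_failure = classifier_sketch(NodeFailure("bug in node"))
```

A state-aware classifier could keep the same signature and simply use the second argument, for example to stop retrying once a budget recorded in state is exhausted.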
deterministic_backoff ¶
Factory for a constant backoff of N seconds, for deterministic testing.
The conformance fixtures use this form via
backoff: {type: deterministic, seconds: N}, so retry timing is
reproducible across runs.
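A constant-delay factory is small enough to sketch in full. The name deterministic_backoff_sketch and the assumption that the returned callable takes an attempt number and returns seconds are illustrative; the real factory's signature is not shown in this section.

```python
from typing import Callable


def deterministic_backoff_sketch(seconds: float) -> Callable[[int], float]:
    def backoff(attempt: int) -> float:
        # The attempt number is accepted but ignored: every retry waits the same time.
        return seconds

    return backoff


wait = deterministic_backoff_sketch(0.5)
delays = [wait(attempt) for attempt in range(4)]
```

Because the delay never varies, a test can assert on exact retry timing without patching the random module.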
exponential_jitter_backoff ¶
Default backoff: random.uniform(0, min(cap, base * 2**attempt)).
Jitter is mandatory; fixed exponential backoff causes
synchronized retries from many concurrent callers, amplifying
rate-limit storms. base and cap are configurable; the
defaults are 1.0 and 30.0 seconds.
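The formula above can be sketched as a factory. The name exponential_jitter_sketch, the optional rng parameter, and the zero-based attempt numbering are assumptions for illustration; the formula and the 1.0 / 30.0 defaults come from the description.

```python
import random
from typing import Callable, Optional


def exponential_jitter_sketch(
    base: float = 1.0,
    cap: float = 30.0,
    rng: Optional[random.Random] = None,
) -> Callable[[int], float]:
    source = rng or random.Random()

    def backoff(attempt: int) -> float:
        # Upper bound doubles per attempt until it reaches the cap.
        upper = min(cap, base * 2**attempt)
        # Full jitter: pick uniformly between 0 and the upper bound.
        return source.uniform(0, upper)

    return backoff


backoff = exponential_jitter_sketch(rng=random.Random(0))
samples = [(attempt, backoff(attempt)) for attempt in range(8)]
```

With the defaults, the upper bound is 1 second at attempt 0, 8 seconds at attempt 3, and is capped at 30 seconds from attempt 5 onward (since 2**5 = 32 exceeds the cap). Passing a seeded random.Random makes the jitter repeatable in tests.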