Fan-out¶
Run the same subgraph many times in parallel, with each instance receiving a different input and the results merged back deterministically.
The "same subgraph at two-or-three call sites" pattern from
ExplicitMapping
handles cases where you know the parent fields up front. Fan-out
handles N call sites where N is determined at runtime: "for each
item in state.urls, run the scraping subgraph; collect the
results."
Two modes: per-item or per-count¶
A fan-out can dispatch instances driven by a list in state
(items_field mode) or by a count resolved from state (count mode).
items_field mode: one instance per item in a parent list field:
builder.add_fan_out_node(
    "scrape_all",
    subgraph=scrape_subgraph,   # CompiledGraph[ScrapeState]
    items_field="urls",         # parent list field, one instance per item
    item_field="url",           # subgraph field that receives each item
    collect_field="content",    # subgraph field whose value is collected
    target_field="contents",    # parent list field that receives the collection
    concurrency=4,
    error_policy="fail_fast",   # or "collect"
    on_empty="raise",           # or "noop"
)
count mode: fixed-or-dynamic instance count, no list field:
builder.add_fan_out_node(
    "sample",
    subgraph=sample_subgraph,
    count=8,                    # int or callable: state -> int
    collect_field="reading",
    target_field="readings",
    concurrency=4,
)
Both count and concurrency accept a callable that takes the
pre-fan-out parent state and returns an int (None for concurrency
means unbounded). That lets you size the dispatch from state at run
time.
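To make the int-or-callable contract concrete, here is a minimal sketch of how such values could be resolved against the parent state. The helpers `resolve_count` and `resolve_concurrency` are hypothetical and written only for illustration; they are not part of the library's API, and the validation rules are assumptions.

```python
from typing import Any, Callable, Mapping, Optional, Union

State = Mapping[str, Any]


def resolve_count(count: Union[int, Callable[[State], int]], state: State) -> int:
    """Return the instance count, calling `count` with the parent state if it is callable."""
    value = count(state) if callable(count) else count
    if value < 0:
        raise ValueError("count must be non-negative")  # assumed validation rule
    return value


def resolve_concurrency(
    concurrency: Union[None, int, Callable[[State], Optional[int]]],
    state: State,
) -> Optional[int]:
    """Return the concurrency cap; None is treated as unbounded."""
    if concurrency is None:
        return None
    return concurrency(state) if callable(concurrency) else concurrency


# Size the dispatch from the pre-fan-out parent state (hypothetical fields).
parent_state = {"budget": 12, "workers": 3}
count = resolve_count(lambda s: s["budget"] // 2, parent_state)
cap = resolve_concurrency(lambda s: s["workers"], parent_state)
```

In a real graph you would pass the lambdas directly as `count=` and `concurrency=`; the engine performs the resolution.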
Per-instance state, inputs and outputs¶
Each instance gets its own subgraph state, distinct from siblings, distinct from the parent. By default the instance receives only:
- the dispatched item in the field named by item_field (in items_field mode); and
- the parent-field-name-mapped values declared in inputs.
inputs is a Mapping[subgraph_field, parent_field]. Subgraph
fields not named in inputs (and not item_field) take their
schema defaults. This is the same closed-by-default posture on the
way in as the explicit-projection story for ordinary subgraphs.
On exit, each instance's collect_field value becomes one element
of the parent's target_field list, in instance-index order. To
collect additional per-instance fields, declare
extra_outputs: Mapping[parent_field, subgraph_field]; each becomes
its own parent list of the same length, instance-index-aligned.
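The inbound projection can be pictured with plain dictionaries. The sketch below is an illustration under assumptions, not the engine's implementation: `build_instance_state` is a hypothetical helper, the state is modelled as a dict rather than a schema class, and the field names (`timeout_s`, `api_key`) are invented for the example.

```python
from typing import Any, Mapping


def build_instance_state(
    parent: Mapping[str, Any],
    item_field: str,
    item: Any,
    inputs: Mapping[str, str],      # subgraph_field -> parent_field
    defaults: Mapping[str, Any],    # stand-in for the subgraph schema defaults
) -> dict[str, Any]:
    """Build one instance's starting state: defaults, then mapped inputs, then the item."""
    state = dict(defaults)
    for subgraph_field, parent_field in inputs.items():
        state[subgraph_field] = parent[parent_field]
    state[item_field] = item
    return state


parent = {"urls": ["a", "b"], "timeout_s": 5, "api_key": "secret"}
defaults = {"url": "", "timeout": 30, "content": None}

instances = [
    build_instance_state(parent, "url", url, {"timeout": "timeout_s"}, defaults)
    for url in parent["urls"]
]
```

Note that `api_key` never reaches the instances because it is not named in `inputs`; that is the closed-by-default behavior described above.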
Error policy¶
Two values:
- "fail_fast" (default): the first instance failure cancels the in-flight siblings (asyncio.gather semantics) and propagates as a NodeException wrapping the failing instance's cause, with recoverable_state set to the parent's pre-fan-out snapshot. Use this when one bad result invalidates the rest.
- "collect": instance failures are captured; the fan-out runs to completion. Failed instances contribute nothing to target_field. If you declare errors_field on the config, each failed instance produces a record ({"fan_out_index": str(idx), "category": str}) appended to that parent list field.
Choose by whether partial results are useful.
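The difference between the two policies can be sketched with plain asyncio. This is an illustrative model, not the engine's code: `run_fan_out` and `flaky` are hypothetical names, the exception class name stands in for the error category, and sibling cancellation is done explicitly because `asyncio.gather` on its own does not cancel the remaining awaitables when one fails.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def run_fan_out(
    items: list[Any],
    worker: Callable[[Any], Awaitable[Any]],
    concurrency: Optional[int],
    error_policy: str = "fail_fast",
) -> tuple[list[Any], list[dict[str, str]]]:
    # None means unbounded: no semaphore at all.
    sem = asyncio.Semaphore(concurrency) if concurrency else None

    async def run_one(item: Any) -> Any:
        if sem is None:
            return await worker(item)
        async with sem:
            return await worker(item)

    tasks = [asyncio.create_task(run_one(item)) for item in items]

    if error_policy == "fail_fast":
        try:
            results = await asyncio.gather(*tasks)
        except Exception:
            # Cancel whatever is still running, wait for it to settle, then propagate.
            for task in tasks:
                task.cancel()
            await asyncio.gather(*tasks, return_exceptions=True)
            raise
        return list(results), []

    # "collect": run everything to completion and keep failures as records.
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)
    values: list[Any] = []
    errors: list[dict[str, str]] = []
    for idx, outcome in enumerate(outcomes):
        if isinstance(outcome, BaseException):
            # Assumption: the category is modelled as the exception class name.
            errors.append({"fan_out_index": str(idx), "category": type(outcome).__name__})
        else:
            values.append(outcome)
    return values, errors


async def flaky(n: int) -> int:
    await asyncio.sleep(0.01 * n)
    if n == 2:
        raise ValueError("bad item")
    return n * 10


values, errors = asyncio.run(
    run_fan_out([1, 2, 3], flaky, concurrency=2, error_policy="collect")
)
```

With `error_policy="fail_fast"` the same call raises the `ValueError` instead of returning partial results.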
What ends up in the parent¶
After the fan-out completes, the parent receives a partial update containing:
- target_field: list of collect_field values, instance-index order.
- Each parent name in extra_outputs: list of values from the named subgraph field, instance-index order.
- count_field (if configured): the instance count.
- errors_field (if configured, "collect" policy only): per-instance error records.
- on_empty="noop" for an empty items_field → all the above with empty lists; count_field set to 0.
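As a rough picture of that partial update, the sketch below assembles it from finished instance states. `build_partial_update` is a hypothetical helper, states are modelled as dicts, the field names are invented, and the sketch only covers the case where every instance succeeds.

```python
from typing import Any, Mapping, Optional, Sequence


def build_partial_update(
    instance_states: Sequence[Mapping[str, Any]],   # final states, in instance-index order
    collect_field: str,
    target_field: str,
    extra_outputs: Optional[Mapping[str, str]] = None,   # parent_field -> subgraph_field
    count_field: Optional[str] = None,
) -> dict[str, Any]:
    """Assemble the parent's partial update; every list is instance-index-aligned."""
    update: dict[str, Any] = {
        target_field: [state[collect_field] for state in instance_states]
    }
    for parent_field, subgraph_field in (extra_outputs or {}).items():
        update[parent_field] = [state[subgraph_field] for state in instance_states]
    if count_field is not None:
        update[count_field] = len(instance_states)
    return update


finished = [
    {"url": "a", "content": "A", "status": 200},
    {"url": "b", "content": "B", "status": 404},
]
update = build_partial_update(
    finished, "content", "contents", {"statuses": "status"}, "n_scraped"
)
empty_update = build_partial_update(
    [], "content", "contents", {"statuses": "status"}, "n_scraped"
)
```

The empty case mirrors the on_empty="noop" shape: the same keys, empty lists, and a count of 0.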
Choosing the target_field reducer¶
The engine writes target_field as a list with one entry per
successful instance: [instance_0_value, instance_1_value, …]. The
reducer you declare on the parent field decides how that list folds
into prior state:
- Each instance emits a single value (collect_field: X) → declare append on Annotated[list[X], append]. Each instance's value is already an X; append concatenates cleanly.
- Each instance emits a list[X] (0..N records per instance) → the engine lands list[list[X]]. Declare concat_flatten instead; it flattens one level so the parent field stays list[X]. Plain append would leave the nesting and fail Pydantic validation.
- Each instance emits a dict[str, X] → the engine lands list[dict]. Declare merge_all, which folds the mappings into the parent dict with last-write-wins per key. Plain merge can't consume a list[dict].
concat_flatten and merge_all are strict: they raise
ReducerError if an update element isn't the expected list/mapping
shape. See state and reducers.
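The three folding behaviors can be illustrated with small local stand-ins. The functions below reuse the reducer names from the text for readability, but they are simplified re-implementations written for this example, not the library's own reducers, and the local `ReducerError` class is likewise a placeholder.

```python
from collections.abc import Mapping
from typing import Any


class ReducerError(Exception):
    """Local placeholder for the library's error type."""


def append(current: list[Any], update: list[Any]) -> list[Any]:
    # Concatenate the update list onto prior state; nested lists stay nested.
    return [*current, *update]


def concat_flatten(current: list[Any], update: list[Any]) -> list[Any]:
    # Flatten exactly one level; reject elements that are not lists.
    out = list(current)
    for element in update:
        if not isinstance(element, list):
            raise ReducerError(f"expected list, got {type(element).__name__}")
        out.extend(element)
    return out


def merge_all(current: dict[str, Any], update: list[Any]) -> dict[str, Any]:
    # Fold each mapping into the dict in order, so later writes win per key.
    out = dict(current)
    for element in update:
        if not isinstance(element, Mapping):
            raise ReducerError(f"expected mapping, got {type(element).__name__}")
        out.update(element)
    return out


single_values = append(["x"], ["y", "z"])
still_nested = append([], [[1], [2, 3]])
flattened = concat_flatten([], [[1], [], [2, 3]])
merged = merge_all({"a": 1}, [{"a": 2, "b": 3}, {"b": 4}])
```

The `still_nested` result shows why plain `append` is the wrong choice when each instance emits a list: the parent field would end up as a list of lists.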
Empty fan-outs¶
If items_field is set and the parent list is empty (or count
resolves to 0):
- on_empty="raise" (default): raises FanOutEmpty (a runtime error category).
- on_empty="noop": emits an empty partial (no instances dispatched, no errors).
Observability per instance¶
The fan-out node's own started / completed events carry a
fan_out_config payload populated from the resolved
item_count / concurrency / error_policy / parent_node_name.
Per-instance events have fan_out_index = N (0-based) and a
namespace whose final element is the fan-out node's name; instances
do NOT contribute a separate synthetic namespace element. Backends
disambiguate per-instance spans using fan_out_index alongside the
namespace.
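A backend-side grouping might look like the sketch below. The event shape is an assumption made for illustration: events are modelled as dicts with hypothetical `namespace`, `fan_out_index`, `node`, and `phase` keys, which is not necessarily how the library represents them.

```python
from collections import defaultdict
from typing import Any

# Hypothetical per-instance events: every instance shares the same namespace,
# so fan_out_index is what tells the instances apart.
events: list[dict[str, Any]] = [
    {"namespace": ("scrape_all",), "fan_out_index": 0, "node": "fetch", "phase": "started"},
    {"namespace": ("scrape_all",), "fan_out_index": 1, "node": "fetch", "phase": "started"},
    {"namespace": ("scrape_all",), "fan_out_index": 0, "node": "fetch", "phase": "completed"},
    {"namespace": ("scrape_all",), "fan_out_index": 1, "node": "fetch", "phase": "completed"},
]


def span_key(event: dict[str, Any]) -> tuple[tuple[str, ...], int]:
    """Key a span by namespace plus fan_out_index, since the namespace alone is ambiguous."""
    return (event["namespace"], event["fan_out_index"])


spans: dict[tuple[tuple[str, ...], int], list[str]] = defaultdict(list)
for event in events:
    spans[span_key(event)].append(f'{event["node"]}:{event["phase"]}')
```

Keying on the namespace alone would merge both instances into one span.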
Resume semantics¶
A fan-out node's completed event triggers a save like any other
outermost-graph or subgraph-internal node. Per-instance internal
events also save, and the resume contract is per-instance: the
engine consults the saved record's fan_out_progress entry for
this fan-out and treats each instance as one of three states:
- completed: the instance ran to completion in the prior run and recorded its contribution into the accumulator. The engine skips re-execution on resume; the contribution rolls forward to the fan-in step.
- in_flight: the instance began execution but its terminal inner node had not yet fired completed at save time, so no contribution was recorded. On resume the engine re-runs the instance from the subgraph's declared entry node. completed_inner_positions on the saved record are observational only; they do NOT serve as a per-inner-node resume point.
- not_started: the instance was not dispatched at save time. On resume the engine dispatches it normally.
The append reducer's no-double-merge guarantee holds because
completed is a one-shot accumulator state: every completed
instance's contribution rolls forward exactly once at fan-in.
Under error_policy: collect, a failed instance's error record IS
a completed contribution (the error rolls forward through the
errors_field bucket rather than target_field). Under
error_policy: fail_fast, a failed instance leaves the saved
record with that instance in in_flight state; cancelled siblings
are in_flight or not_started. None are completed, so resume
re-runs them all.
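The per-instance resume decision can be summarized as a small planning function. This is a sketch under assumptions: `plan_resume` is a hypothetical helper, and the saved fan_out_progress entry is modelled as a dict from instance index to state name, which may not match the real record layout.

```python
def plan_resume(
    progress: dict[int, str],
    instance_count: int,
) -> tuple[list[int], list[int], list[int]]:
    """Split instance indexes into (skip, restart, dispatch) for a resumed run."""
    skip: list[int] = []       # completed: contribution already recorded, roll it forward
    restart: list[int] = []    # in_flight: re-run from the subgraph's entry node
    dispatch: list[int] = []   # not_started: dispatch normally
    for idx in range(instance_count):
        # Assumption: an index missing from the record was never dispatched.
        state = progress.get(idx, "not_started")
        if state == "completed":
            skip.append(idx)
        elif state == "in_flight":
            restart.append(idx)
        else:
            dispatch.append(idx)
    return skip, restart, dispatch


# A crash mid-fan-out: two instances finished, one was running, two never started.
skip, restart, dispatch = plan_resume(
    {0: "completed", 1: "in_flight", 2: "completed"}, instance_count=5
)
```

Because only `skip` carries recorded contributions, each contribution reaches fan-in exactly once, which is the no-double-merge property described above.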
Per-instance saves can be high-volume in fan-outs with many
instances or many inner nodes per instance. Checkpointer backends
MAY opt into configurable batching scoped to fan-out instance
internal saves; outermost-graph, subgraph-internal, and the fan-out
node's own completion save remain synchronous. The in-memory
backend exposes the knob via:
from openarmature.checkpoint import (
    InMemoryCheckpointer,
    FanOutInternalSaveBatching,
)

cp = InMemoryCheckpointer(
    fan_out_internal_save_batching=FanOutInternalSaveBatching(flush_every=10),
)
Buffered-but-unflushed saves are lost on crash by design:
instances whose completed state was only buffered revert to
in_flight / not_started on resume and re-run. The trade-off is
explicit (fewer writes per fan-out instance vs some redundant
re-execution on crash recovery); default is no batching.
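The trade-off can be seen in a toy model of a batching writer. `BatchingSaver` below is a hypothetical class written for this illustration; it does not reflect how the checkpointer backends are implemented, and only the `flush_every` idea is borrowed from the configuration above.

```python
from typing import Any


class BatchingSaver:
    """Toy model: buffer saves and write them durably every `flush_every` records."""

    def __init__(self, flush_every: int) -> None:
        self.flush_every = flush_every
        self.buffer: list[Any] = []
        self.durable: list[Any] = []
        self.writes = 0

    def save(self, record: Any) -> None:
        self.buffer.append(record)
        if len(self.buffer) >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.durable.extend(self.buffer)
            self.buffer.clear()
            self.writes += 1

    def crash(self) -> int:
        """Simulate a crash: buffered records are lost. Returns how many were dropped."""
        lost = len(self.buffer)
        self.buffer.clear()
        return lost


saver = BatchingSaver(flush_every=10)
for i in range(25):
    saver.save({"fan_out_index": i, "state": "completed"})

writes_before_crash = saver.writes
lost = saver.crash()
```

Here 25 saves cost two durable writes instead of 25, and the five buffered records lost in the crash correspond to instances that would re-run on resume.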
When to reach for fan-out¶
The signal: N similar pieces of work, N depends on state at runtime
(not at build time), the work is independent enough to run
concurrently. If N is known at build time and small (≤3),
ExplicitMapping at multiple subgraph sites is simpler. If the
work isn't independent (instance 2 needs instance 1's output),
that's a linear pipeline, not fan-out.
What fan-out is NOT¶
- Not a map-reduce. No reduce phase beyond the parent's reducers. If you need a real reduce, do it in a node after the fan-out.
- Not a queue. All instances dispatch within a single invocation; the engine doesn't persist them.
- Not retry. If an instance fails and you want a retry, wrap the subgraph (or individual nodes inside it) with retry middleware. The fan-out's error_policy is a fan-in-collection decision, not a recovery one.