
Fan-out

Run the same subgraph many times in parallel, with each instance receiving a different input and the results merged back deterministically.

The "same subgraph at two-or-three call sites" pattern from ExplicitMapping handles cases where you know the parent fields up front. Fan-out handles N call sites where N is determined at runtime: "for each item in state.urls, run the scraping subgraph; collect the results."

Two modes: per-item or per-count

A fan-out can dispatch instances driven by a list in state (items_field mode) or by a count resolved from state (count mode).

items_field mode: one instance per item in a parent list field:

builder.add_fan_out_node(
    "scrape_all",
    subgraph=scrape_subgraph,        # CompiledGraph[ScrapeState]
    items_field="urls",              # parent list field, one instance per item
    item_field="url",                # subgraph field that receives each item
    collect_field="content",         # subgraph field whose value is collected
    target_field="contents",         # parent list field that receives the collection
    concurrency=4,
    error_policy="fail_fast",        # or "collect"
    on_empty="raise",                # or "noop"
)

count mode: fixed-or-dynamic instance count, no list field:

builder.add_fan_out_node(
    "sample",
    subgraph=sample_subgraph,
    count=8,                          # int or callable: state -> int
    collect_field="reading",
    target_field="readings",
    concurrency=4,
)

Both count and concurrency accept either an int or a callable that takes the pre-fan-out parent state and returns an int; for concurrency, None means unbounded. A callable lets you size the dispatch from state at run time.
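A minimal sketch of how an int-or-callable setting resolves against the pre-fan-out parent state. `resolve_setting` and the plain-dict state are hypothetical stand-ins for illustration, not openarmature's API.

```python
from typing import Any, Callable, Mapping, Optional, Union

State = Mapping[str, Any]
# Hypothetical: a setting is either a fixed value or a function of the parent state.
Setting = Union[Optional[int], Callable[[State], Optional[int]]]


def resolve_setting(setting: Setting, state: State) -> Optional[int]:
    """Return the int (or None) that a count/concurrency setting resolves to."""
    # In this sketch a callable is evaluated against the pre-fan-out parent state.
    return setting(state) if callable(setting) else setting


parent_state = {"urls": ["a", "b", "c"], "max_parallel": 2}

count = resolve_setting(lambda s: len(s["urls"]), parent_state)
concurrency = resolve_setting(lambda s: s["max_parallel"], parent_state)
unbounded = resolve_setting(None, parent_state)  # None for concurrency: no limit
fixed = resolve_setting(8, parent_state)         # plain ints pass through unchanged
```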

Per-instance state, inputs and outputs

Each instance gets its own subgraph state, separate from its siblings' and from the parent's. By default the instance receives only:

  • the dispatched item in the field named by item_field (in items_field mode); and
  • the parent-field-name-mapped values declared in inputs.

inputs is a Mapping[subgraph_field, parent_field]. Subgraph fields not named in inputs (other than item_field) take their schema defaults. This is the same closed-by-default posture on the way in as explicit projection for ordinary subgraphs.

On exit, each instance's collect_field value becomes one element of the parent's target_field list, in instance-index order. To collect additional per-instance fields, declare extra_outputs: Mapping[parent_field, subgraph_field]; each becomes its own parent list of the same length, instance-index-aligned.
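The projection rules above can be sketched with plain dicts: build each instance's state from schema defaults plus item_field and inputs, then gather collect_field and extra_outputs into index-aligned parent lists. The defaults dict, field names, and helper functions are hypothetical illustrations; the real engine works with schema models, not dicts.

```python
from typing import Any, Dict, List, Mapping, Optional

# Hypothetical subgraph schema defaults (stand-in for a ScrapeState model).
SUBGRAPH_DEFAULTS: Dict[str, Any] = {"url": "", "user_agent": "bot", "content": "", "status": 0}


def build_instance_state(
    parent: Mapping[str, Any],
    item: Any,
    item_field: Optional[str],
    inputs: Mapping[str, str],  # subgraph_field -> parent_field
) -> Dict[str, Any]:
    # Closed by default: start from a fresh copy of the defaults, never the parent state.
    state = dict(SUBGRAPH_DEFAULTS)
    for sub_field, parent_field in inputs.items():
        state[sub_field] = parent[parent_field]
    if item_field is not None:
        state[item_field] = item  # the dispatched item for this instance
    return state


def collect_outputs(
    finals: List[Mapping[str, Any]],  # final instance states, in instance-index order
    collect_field: str,
    target_field: str,
    extra_outputs: Mapping[str, str],  # parent_field -> subgraph_field
) -> Dict[str, List[Any]]:
    update = {target_field: [s[collect_field] for s in finals]}
    for parent_field, sub_field in extra_outputs.items():
        # Each extra output becomes its own list, aligned with target_field by index.
        update[parent_field] = [s[sub_field] for s in finals]
    return update


parent = {"urls": ["u0", "u1"], "agent": "armature/1.0"}
states = [build_instance_state(parent, url, "url", {"user_agent": "agent"}) for url in parent["urls"]]
# Pretend each instance ran and filled in its output fields.
finals = [{**s, "content": s["url"].upper(), "status": 200} for s in states]
update = collect_outputs(finals, "content", "contents", {"statuses": "status"})
```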

Error policy

Two values:

  • "fail_fast" (default): the first instance failure cancels the in-flight siblings (cancel-on-first-error, as with asyncio.TaskGroup; a bare asyncio.gather propagates the first error but does not cancel the others) and propagates as a NodeException wrapping the failing instance's cause, with recoverable_state set to the parent's pre-fan-out snapshot. Use this when one bad result invalidates the rest.
  • "collect": instance failures are captured; the fan-out runs to completion. Failed instances contribute nothing to target_field. If you declare errors_field on the config, each failed instance produces a record ({"fan_out_index": str(idx), "category": str}) appended to that parent list field.

Pick the policy based on whether partial results are still useful.

What ends up in the parent

After the fan-out completes, the parent receives a partial update containing:

  • target_field: list of collect_field values, instance-index order.
  • Each parent name in extra_outputs: list of values from the named subgraph field, instance-index order.
  • count_field (if configured): the instance count.
  • errors_field (if configured, "collect" policy only): per-instance error records.
  • on_empty="noop" for an empty items_field → all the above with empty lists; count_field set to 0.
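As a sketch of the shape listed above, the partial update can be modeled as a plain dict. `assemble_partial` and the field names ("contents", "statuses", "n_pages", "errors") are hypothetical; the sketch assumes count_field holds the number of instances dispatched, including failed ones.

```python
from typing import Any, Dict, List, Mapping, Optional


def assemble_partial(
    collected: List[Any],                  # collect_field values of successful instances
    extras: Mapping[str, List[Any]],       # parent_field -> per-instance values
    instance_count: int,
    target_field: str,
    count_field: Optional[str] = None,
    errors_field: Optional[str] = None,
    errors: Optional[List[Dict[str, str]]] = None,
) -> Dict[str, Any]:
    update: Dict[str, Any] = {target_field: list(collected)}
    update.update({name: list(values) for name, values in extras.items()})
    if count_field is not None:
        update[count_field] = instance_count
    if errors_field is not None:  # only meaningful under the "collect" policy
        update[errors_field] = list(errors or [])
    return update


# Three instances, one failed under "collect".
partial = assemble_partial(
    ["U0", "U1"], {"statuses": [200, 200]}, 3, "contents",
    count_field="n_pages", errors_field="errors",
    errors=[{"fan_out_index": "2", "category": "ValueError"}],
)
# on_empty="noop" with nothing to dispatch: empty lists, count 0.
empty = assemble_partial([], {"statuses": []}, 0, "contents", count_field="n_pages")
```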

Choosing the target_field reducer

The engine writes target_field as a list with one entry per successful instance: [instance_0_value, instance_1_value, …]. The reducer you declare on the parent field decides how that list folds into prior state:

  • Each instance emits a single value (collect_field: X) → declare the parent field as Annotated[list[X], append]. Each instance's value is already an X, so append concatenates cleanly.
  • Each instance emits a list[X] (0..N records per instance) → the engine writes list[list[X]]. Declare concat_flatten instead; it flattens one level so the parent field stays list[X]. Plain append would keep the nesting and fail Pydantic validation.
  • Each instance emits a dict[str, X] → the engine writes list[dict[str, X]]. Declare merge_all, which folds the mappings into the parent dict with last-write-wins per key. Plain merge can't consume a list[dict].

concat_flatten and merge_all are strict: they raise ReducerError if an update element isn't the expected list/mapping shape. See state and reducers.
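To make the three folding behaviors concrete, here is a sketch of reducers with the semantics described above. These are hypothetical re-implementations for illustration, not openarmature's reducer code, and `ReducerError` here is a local stand-in class.

```python
from collections import abc
from itertools import chain
from typing import Any, Dict, List, Mapping


class ReducerError(Exception):
    """Local stand-in for the library's ReducerError."""


def append(current: List[Any], update: List[Any]) -> List[Any]:
    # Concatenate; nested lists stay nested.
    return current + update


def concat_flatten(current: List[Any], update: List[Any]) -> List[Any]:
    for element in update:
        if not isinstance(element, list):  # strict: every element must be a list
            raise ReducerError(f"expected list element, got {type(element).__name__}")
    return current + list(chain.from_iterable(update))  # flatten exactly one level


def merge_all(current: Mapping[str, Any], update: List[Any]) -> Dict[str, Any]:
    merged = dict(current)
    for element in update:
        if not isinstance(element, abc.Mapping):  # strict: every element must be a mapping
            raise ReducerError(f"expected mapping element, got {type(element).__name__}")
        merged.update(element)  # later elements win on key collisions
    return merged
```

For example, append([], [[1, 2], [3]]) keeps the nesting, which is exactly the list[list[X]] problem that concat_flatten avoids.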

Empty fan-outs

If items_field is set and the parent list is empty (or count resolves to 0):

  • on_empty="raise" (default): raises FanOutEmpty (a runtime error category).
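  • on_empty="noop": emits an empty partial: no instances are dispatched, no errors are recorded, target_field and any extra_outputs lists are empty, and count_field (if configured) is 0.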
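The empty-dispatch decision can be sketched as a guard that runs before any instance starts. `handle_empty` is a hypothetical helper and `FanOutEmpty` a local stand-in class; the instance count is whatever len(items) or the resolved count came to.

```python
from typing import Any, Dict, Optional, Sequence


class FanOutEmpty(Exception):
    """Local stand-in for the library's FanOutEmpty error category."""


def handle_empty(
    instance_count: int,
    on_empty: str,
    target_field: str,
    extra_fields: Sequence[str] = (),
    count_field: Optional[str] = None,
) -> Optional[Dict[str, Any]]:
    """Return the partial to emit when there is nothing to dispatch, else None."""
    if instance_count > 0:
        return None  # non-empty: dispatch normally
    if on_empty == "raise":
        raise FanOutEmpty("fan-out resolved to zero instances")
    if on_empty != "noop":
        raise ValueError(f"unknown on_empty value: {on_empty!r}")
    partial: Dict[str, Any] = {target_field: []}
    partial.update({name: [] for name in extra_fields})
    if count_field is not None:
        partial[count_field] = 0
    return partial
```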

Observability per instance

The fan-out node's own started / completed events carry a fan_out_config payload with the resolved item_count, concurrency, error_policy, and parent_node_name.

Per-instance events have fan_out_index = N (0-based) and a namespace whose final element is the fan-out node's name; instances do NOT contribute a separate synthetic namespace element. Backends disambiguate per-instance spans using fan_out_index alongside the namespace.
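A backend-side sketch of that disambiguation: key spans on (namespace, fan_out_index), because every instance shares the same namespace. The `Event` dataclass, its fields, and the namespace values are hypothetical; check the observability reference for the real event shape.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict, List, Optional, Tuple


@dataclass(frozen=True)
class Event:
    namespace: Tuple[str, ...]  # for instance events, ends with the fan-out node's name
    node: str
    kind: str                   # "started" or "completed"
    fan_out_index: Optional[int] = None


SpanKey = Tuple[Tuple[str, ...], Optional[int]]


def span_key(event: Event) -> SpanKey:
    # The namespace alone is shared by all instances; fan_out_index tells them apart.
    return (event.namespace, event.fan_out_index)


events = [
    Event(("main", "scrape_all"), "fetch", "started", fan_out_index=0),
    Event(("main", "scrape_all"), "fetch", "started", fan_out_index=1),
    Event(("main", "scrape_all"), "fetch", "completed", fan_out_index=1),
    Event(("main", "scrape_all"), "fetch", "completed", fan_out_index=0),
]

spans: DefaultDict[SpanKey, List[str]] = defaultdict(list)
for e in events:
    spans[span_key(e)].append(f"{e.node}:{e.kind}")
```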

Resume semantics

A fan-out node's completed event triggers a save like any other outermost-graph or subgraph-internal node. Per-instance internal events also save, and the resume contract is per-instance: the engine consults the saved record's fan_out_progress entry for this fan-out and treats each instance as one of three states:

  • completed: the instance ran to completion in the prior run and recorded its contribution into the accumulator. The engine skips re-execution on resume; the contribution rolls forward to the fan-in step.
  • in_flight: the instance began execution but its terminal inner node had not yet fired completed at save time, so no contribution was recorded. On resume the engine re-runs the instance from the subgraph's declared entry node. completed_inner_positions on the saved record are observational only; they do NOT serve as a per-inner-node resume point.
  • not_started: the instance was not dispatched at save time. On resume the engine dispatches it normally.

The append reducer's no-double-merge guarantee holds because completed is a one-shot accumulator state: every completed instance's contribution rolls forward exactly once at fan-in.

Under error_policy="collect", a failed instance's error record IS a completed contribution (the error rolls forward through the errors_field bucket rather than target_field). Under error_policy="fail_fast", the failed instance is left in in_flight state in the saved record, and the cancelled siblings are in_flight or not_started. None of them are completed, so resume re-runs them all.
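The three-state resume contract can be sketched as a planning step: completed instances contribute their saved values exactly once, and everything else is (re)dispatched. `plan_resume`, `fan_in`, and the dict-shaped progress record are hypothetical; treating an index missing from the progress record as not_started is an assumption of this sketch.

```python
from enum import Enum
from typing import Any, Dict, List, Mapping, Tuple


class InstanceState(str, Enum):
    COMPLETED = "completed"
    IN_FLIGHT = "in_flight"
    NOT_STARTED = "not_started"


def plan_resume(
    progress: Mapping[int, InstanceState],
    contributions: Mapping[int, Any],
    instance_count: int,
) -> Tuple[List[int], Dict[int, Any]]:
    """Split instances into (indices to run, saved contributions to roll forward)."""
    to_run: List[int] = []
    carried: Dict[int, Any] = {}
    for idx in range(instance_count):
        # Assumption: an index absent from the record was never dispatched.
        state = progress.get(idx, InstanceState.NOT_STARTED)
        if state is InstanceState.COMPLETED:
            carried[idx] = contributions[idx]  # skipped on resume; rolls forward once
        else:
            to_run.append(idx)  # in_flight restarts at the entry node; not_started runs fresh
    return to_run, carried


def fan_in(carried: Mapping[int, Any], fresh: Mapping[int, Any], instance_count: int) -> List[Any]:
    merged = {**carried, **fresh}
    return [merged[i] for i in range(instance_count) if i in merged]  # instance-index order


progress = {0: InstanceState.COMPLETED, 1: InstanceState.IN_FLIGHT, 3: InstanceState.COMPLETED}
to_run, carried = plan_resume(progress, {0: "A", 3: "D"}, instance_count=4)
final = fan_in(carried, {1: "B", 2: "C"}, instance_count=4)
```

After a fail_fast failure nothing is completed, so plan_resume returns every index in to_run and an empty carried dict.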

Per-instance saves can be high-volume in fan-outs with many instances or many inner nodes per instance. Checkpointer backends MAY opt into configurable batching scoped to fan-out instance internal saves; outermost-graph, subgraph-internal, and the fan-out node's own completion save remain synchronous. The in-memory backend exposes the knob via:

from openarmature.checkpoint import (
    InMemoryCheckpointer,
    FanOutInternalSaveBatching,
)

cp = InMemoryCheckpointer(
    fan_out_internal_save_batching=FanOutInternalSaveBatching(flush_every=10),
)

Buffered-but-unflushed saves are lost on crash by design: instances whose completed state was only buffered revert to in_flight / not_started on resume and re-run. The trade-off is explicit (fewer writes per fan-out instance vs some redundant re-execution on crash recovery); default is no batching.
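A toy model of the flush_every trade-off, written as a standalone class rather than InMemoryCheckpointer internals (which this page does not describe). `BatchingSaveBuffer` is hypothetical: saves accumulate until flush_every records are buffered, and anything still buffered at a crash never reaches durable storage.

```python
from typing import Any, Callable, List


class BatchingSaveBuffer:
    """Hypothetical toy: batches fan-out instance-internal saves, flushing every N."""

    def __init__(self, flush_every: int, write: Callable[[List[Any]], None]) -> None:
        if flush_every < 1:
            raise ValueError("flush_every must be >= 1")
        self.flush_every = flush_every
        self._write = write
        self._pending: List[Any] = []

    @property
    def pending_count(self) -> int:
        return len(self._pending)

    def save(self, record: Any) -> None:
        self._pending.append(record)
        if len(self._pending) >= self.flush_every:
            self.flush()

    def flush(self) -> None:
        if self._pending:
            self._write(list(self._pending))
            self._pending.clear()


durable: List[Any] = []
buf = BatchingSaveBuffer(flush_every=3, write=durable.extend)
for i in range(5):
    buf.save(f"instance-{i}-completed")
# Simulated crash here: the last two saves were only buffered, never written.
```

On resume in this model, instances 3 and 4 have no durable completed record, so they would be treated as in_flight or not_started and re-run.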

When to reach for fan-out

The signal: you have N similar pieces of work, N depends on state at run time (not at build time), and the pieces are independent enough to run concurrently. If N is known at build time and small (≤3), ExplicitMapping at multiple subgraph sites is simpler. If the work isn't independent (instance 2 needs instance 1's output), that's a linear pipeline, not fan-out.

What fan-out is NOT

  • Not a map-reduce. No reduce phase beyond the parent's reducers. If you need a real reduce, do it in a node after the fan-out.
  • Not a queue. All instances dispatch within a single invocation; the engine doesn't persist them.
  • Not retry. If an instance fails and you want a retry, wrap the subgraph (or individual nodes inside it) with retry middleware. The fan-out's error_policy is a fan-in-collection decision, not a recovery one.
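For the map-reduce point above, a real reduce can be an ordinary node placed after the fan-out that reads the collected list. This sketch assumes a node is a function from the parent state (a mapping here) to a partial-update dict; that signature and the "total_chars" / "longest" fields are hypothetical, and "contents" is the target_field from the items_field example.

```python
from typing import Any, Dict, Mapping


def summarize_contents(state: Mapping[str, Any]) -> Dict[str, Any]:
    """Hypothetical post-fan-out node: reduce the collected list to summary fields."""
    contents = state["contents"]  # target_field written by the fan-out
    return {
        "total_chars": sum(len(text) for text in contents),
        "longest": max(contents, key=len, default=""),
    }


update = summarize_contents({"contents": ["alpha", "be", "gamma!"]})
```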