Observability¶
Two complementary patterns:
- The trace field pattern: a typed list inside state that nodes append to. State-shaped history, accessible from inside the graph, visible in the final state. Falls out of existing primitives. Covered in State and reducers.
- Observer hooks: out-of-band events delivered to external code, with full pre/post state snapshots, error context, and visibility across subgraph boundaries. The control-side equivalent of the data-side trace field. This page.
The two are complementary, not redundant. trace is what state itself
remembers. Observers are what external code sees as state changes.
An observer is an async callable¶
from openarmature.graph import NodeEvent

async def my_observer(event: NodeEvent) -> None:
    print(event.phase, event.step, event.namespace, event.node_name)
The matching Protocol is Observer:
from openarmature.graph import NodeEvent, Observer

class StructuredLogger:
    async def __call__(self, event: NodeEvent) -> None: ...

_: Observer = StructuredLogger()  # structural conformance check
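The structural check can be tried without the framework installed. The sketch below is illustrative only: `StubEvent` and the local `Observer` Protocol are hypothetical stand-ins for the framework's NodeEvent and Observer, reduced to the fields printed in the first example.

```python
import asyncio
from dataclasses import dataclass
from typing import Protocol, runtime_checkable


@dataclass(frozen=True)
class StubEvent:
    """Hypothetical stand-in for NodeEvent (only four of its fields)."""
    node_name: str
    namespace: tuple[str, ...]
    step: int
    phase: str


@runtime_checkable
class Observer(Protocol):
    """Local stand-in for the framework's Observer protocol."""
    async def __call__(self, event: StubEvent) -> None: ...


class CollectingObserver:
    """No base class needed: a matching async __call__ is enough."""

    def __init__(self) -> None:
        self.seen: list[tuple[str, int, str]] = []

    async def __call__(self, event: StubEvent) -> None:
        self.seen.append((event.phase, event.step, event.node_name))


async def demo() -> list[tuple[str, int, str]]:
    collector = CollectingObserver()
    checked: Observer = collector  # a static type checker verifies this line
    for phase in ("started", "completed"):
        await checked(StubEvent("fetch", ("fetch",), 0, phase))
    return collector.seen


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The annotated assignment is what a type checker inspects; at runtime it is an ordinary assignment, so a class that fails to conform is caught by the checker rather than by an exception.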
Two registration modes¶
Graph-attached: fires on every invocation until removed:
compiled = builder.compile()
handle = compiled.attach_observer(my_observer)
# ...later
handle.remove() # idempotent
Changes to the registered set during a graph run don't take effect
until the next invocation. The in-flight observer set is fixed at
invoke() time.
Invocation-scoped: fires only for one specific run:
Common pattern: graph-attached for global concerns (Sentry, metrics, structured tracing); invocation-scoped for per-request concerns (a request-ID closure, a per-call snapshot ring).
The NodeEvent shape¶
@dataclass(frozen=True)
class NodeEvent:
    node_name: str
    namespace: tuple[str, ...]
    step: int
    phase: Literal["started", "completed", "checkpoint_saved"]
    pre_state: State
    post_state: State | None
    error: RuntimeGraphError | None
    parent_states: tuple[State, ...]
    attempt_index: int = 0
    fan_out_index: int | None = None
    fan_out_config: FanOutEventConfig | None = None
    branch_name: str | None = None
A walk-through:
- phase: every node attempt produces a started/completed pair. The pair shares step and pre_state. started fires before the node body runs; completed fires after the reducer merge succeeds and the outgoing edge has been evaluated. A successful pair populates post_state on completed; a failed pair populates error on completed. started events have neither post_state nor error populated.

  checkpoint_saved is an additional optional phase: when a Checkpointer is attached, the engine emits one per successful save (post-completed, immediately after the save resolves). Default observer subscriptions don't include checkpoint_saved; opt in via phases={"checkpoint_saved"} when registering (or phases=KNOWN_PHASES, exported from openarmature.graph, to subscribe to every phase including checkpoint_saved).
- node_name: the node's local name in its immediate containing graph. For nested subgraphs, the inner name, NOT a qualified path.
- namespace: the qualified path of containing-graph node names plus the current node's name, outermost-first. For a top-level node: (node_name,). For a subgraph-internal node: (outer_subgraph_node_name, inner_name). A tuple of strings; the framework keeps it as a tuple at the API boundary rather than joining with a delimiter, so node names can contain any characters without parsing ambiguity.
- step: monotonic counter starting at 0, scoped to one outermost invocation. Subgraph-internal nodes increment the same counter; subgraph events interleave with outer events. The started/completed pair for one attempt shares the same step.
- pre_state / post_state: state the node received vs. state after the reducer merge. Shape varies with namespace: for a subgraph-internal node, both are subgraph-state instances, not the outer state.
- error: the wrapped runtime error on completed events that failed. event.error.category gives the canonical error category; event.error.__cause__ gives the original exception. Edge / routing errors land here too; see Routing errors and the completed event below.
- parent_states: one snapshot per containing graph, outermost first. Empty tuple for outermost-graph events. Invariant: len(parent_states) == len(namespace) - 1.
- attempt_index: 0-based retry attempt counter. 0 for nodes not wrapped by retry middleware; 1+ for retries. Retry middleware may wrap transitively. A retry on a parallel-branches branch or fan-out instance_middleware re-runs the whole subgraph; events from inner nodes carry the wrapping retry's attempt counter.
- fan_out_index: 0-based per-instance index for events inside a fan-out instance; None outside.
- fan_out_config: populated on started/completed events for the fan-out node itself, carrying the resolved item_count / concurrency / error_policy / parent_node_name. None on every other event.
- branch_name: populated on events from nodes inside a parallel-branches branch, carrying the branch's name as declared on the dispatcher. None outside. Independent of fan_out_index; both may be present simultaneously when a parallel-branches branch contains a fan-out (or a fan-out instance contains a parallel-branches node). The combination (namespace, branch_name, fan_out_index, attempt_index, phase) uniquely identifies each event source. On the OTel mapping side, an openarmature.node.branch_name span attribute is added in parallel to the existing openarmature.node.fan_out_index.
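Two of the rules in the walk-through are easy to check mechanically: the parent_states length invariant and the five-part source key. The sketch below uses `StubEvent`, a hypothetical stand-in carrying only the fields involved; `check_invariants` and `source_key` are illustrative helper names, not framework API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class StubEvent:
    """Hypothetical stand-in for NodeEvent, reduced to the fields used here."""
    node_name: str
    namespace: tuple[str, ...]
    phase: str
    parent_states: tuple[object, ...] = ()
    attempt_index: int = 0
    fan_out_index: Optional[int] = None
    branch_name: Optional[str] = None


def check_invariants(event: StubEvent) -> None:
    """Raise ValueError if the event breaks the documented shape rules."""
    # The namespace ends with the node's own local name.
    if not event.namespace or event.namespace[-1] != event.node_name:
        raise ValueError("namespace must end with node_name")
    # One parent snapshot per containing graph.
    if len(event.parent_states) != len(event.namespace) - 1:
        raise ValueError("len(parent_states) must equal len(namespace) - 1")


def source_key(event: StubEvent) -> tuple:
    """The combination described above as identifying each event source."""
    return (
        event.namespace,
        event.branch_name,
        event.fan_out_index,
        event.attempt_index,
        event.phase,
    )
```

Because namespace stays a tuple, the key is hashable as-is and can serve directly as a dictionary key when pairing started and completed events.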
Routing errors and the completed event¶
When a conditional edge raises or returns an invalid target:
- The preceding node runs and its body returns successfully.
- The reducer merge succeeds.
- The engine evaluates the outgoing edge.
- The edge fn raises (EdgeException) OR returns something that isn't a declared node name or END (RoutingError).
- The engine populates that error into the preceding node's completed event and dispatches it, sharing the started/completed pair rather than synthesising a new event.
So edge / routing errors do land on a NodeEvent, on the
preceding node's completed event, with error populated and
post_state left None. Observers see the failure attributed to the
right node without a synthetic event.
Subgraph events bubble up¶
A subgraph-attached observer sees its own internal node events whenever the subgraph runs, directly OR as a subgraph inside a parent. The parent's observers ALSO see those internal events.
Delivery order for an event from a subgraph-internal node:
Within each level, registration order. The subgraph-as-node wrapper itself does not generate its own event; it's transparent to observers.
Serial delivery¶
Observers receive events serially within a single outermost invocation:
- No two observers receive the same event concurrently.
- No observer sees event N+1 until every observer has finished N.
Why not parallel? Two reasons. Parallel observers' output
interleaves nondeterministically (log readers can't reconstruct
ordering), and multi-observer error semantics get fiddly
(first-error-wins? collected exceptions?). Serial keeps per-run
output deterministic and error handling trivial. If a single observer
needs internal parallelism it can asyncio.gather itself.
A slow observer holds back delivery of subsequent events to siblings. Two responses: keep the slow exporter as one observer (it serializes naturally), or push events to an internal queue and return fast.
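The second response (queue internally, return fast) might look like the sketch below. It is a minimal illustration built only on asyncio: `QueueingObserver`, `flush`, and `close` are hypothetical names, and `export` stands for whatever slow async exporter the observer wraps.

```python
import asyncio
import contextlib
from typing import Awaitable, Callable, Optional


class QueueingObserver:
    """Accepts events immediately and exports them from a background task."""

    def __init__(self, export: Callable[[object], Awaitable[None]]) -> None:
        self._export = export  # hypothetical slow exporter
        self._queue: asyncio.Queue = asyncio.Queue()
        self._worker: Optional[asyncio.Task] = None

    async def __call__(self, event: object) -> None:
        # Start the worker lazily so construction needs no running loop.
        if self._worker is None:
            self._worker = asyncio.create_task(self._run())
        self._queue.put_nowait(event)  # returns without awaiting the export

    async def _run(self) -> None:
        while True:
            event = await self._queue.get()
            try:
                await self._export(event)
            finally:
                self._queue.task_done()

    async def flush(self) -> None:
        """Wait until every queued event has been exported."""
        await self._queue.join()

    async def close(self) -> None:
        await self.flush()
        if self._worker is not None:
            self._worker.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._worker


async def demo() -> tuple[int, list[int]]:
    exported: list[int] = []

    async def slow_export(event: int) -> None:
        await asyncio.sleep(0.01)
        exported.append(event)

    observer = QueueingObserver(slow_export)
    for i in range(3):
        await observer(i)
    exported_before_flush = len(exported)
    await observer.close()
    return exported_before_flush, exported
```

The trade-off is that delivery to this observer no longer implies the export finished, so a short-lived process has to call something like `close()` itself before exiting.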
Async-from-graph delivery + drain()¶
The graph's execution loop dispatches events onto a per-invocation queue and does not await observer processing. Event dispatch is constant-time from the graph's perspective; observers can't slow node execution down.
This means await compiled.invoke(...) returns when the graph
reaches END (or raises), regardless of whether the observer queue has
finished. For long-running services that's fine. For short-lived
processes (scripts, serverless, CLIs), events dispatched late in the
run may not be delivered before the process exits.
drain() waits until every dispatched event has been delivered and
returns a DrainSummary reporting the outcome:
final = await compiled.invoke(initial)
summary = await compiled.drain()
# DrainSummary(undelivered_count=0, timeout_reached=False)
- Per-graph, not per-invoke. Drain awaits all prior invocations' queues.
- Snapshot at call time. Events from invocations started concurrently with drain() may or may not be included.
- Subgraph events are part of the parent. A parent drain covers every subgraph event from any of its invocations; no need to drain each subgraph separately.
If you forget drain() in a CLI, the symptom is an empty trace file
or missing log entries.
Bounded drain (optional timeout)¶
drain() accepts an optional timeout parameter (non-negative
seconds): await compiled.drain(timeout=5.0) bounds the wait at five
seconds. When the deadline fires, in-flight workers are cancelled
cleanly so the compiled graph stays usable for subsequent invocations;
partial delivery state from one drain does NOT leak into the next.
The returned DrainSummary carries:
- timeout_reached: bool: True only when the timeout actually fired. A drain that finishes before the deadline reports False.
- undelivered_count: int: events dispatched but not fully delivered to every subscribed observer before the deadline. Always 0 when timeout_reached is False.
Observers should be cancellation-safe (idempotent writes,
try/finally cleanup) so that interruption by drain timeout does not
leave partial side effects in an inconsistent state.
When to set a timeout: short-lived processes (CLIs, scripts, serverless functions) where a misbehaving observer holding drain indefinitely would stall process exit. Long-running services that control their own lifecycle can leave the timeout off and let drain wait for natural completion.
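To make the timeout semantics concrete, here is a toy model of a delivery queue with a bounded drain. It is a sketch under stated assumptions, not the engine's implementation: `DeliveryQueue` and its methods are hypothetical, it handles a single observer, and the local `DrainSummary` only mirrors the two fields described above.

```python
import asyncio
import contextlib
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass(frozen=True)
class DrainSummary:
    """Stand-in with the two documented fields."""
    undelivered_count: int
    timeout_reached: bool


class DeliveryQueue:
    """Toy model: one observer, one background delivery worker."""

    def __init__(self, observer: Callable[[object], Awaitable[None]]) -> None:
        self._observer = observer
        self._queue: asyncio.Queue = asyncio.Queue()
        self._pending = 0  # dispatched but not yet fully delivered
        self._worker: Optional[asyncio.Task] = None

    def dispatch(self, event: object) -> None:
        # Enqueue without awaiting the observer, so the caller never blocks.
        if self._worker is None or self._worker.done():
            self._worker = asyncio.create_task(self._run())
        self._pending += 1
        self._queue.put_nowait(event)

    async def _run(self) -> None:
        while True:
            event = await self._queue.get()
            try:
                await self._observer(event)
            except Exception:
                pass  # toy model: a failing observer still counts as delivered
            finally:
                self._queue.task_done()
            self._pending -= 1  # skipped when the worker is cancelled mid-event

    async def drain(self, timeout: Optional[float] = None) -> DrainSummary:
        try:
            await asyncio.wait_for(self._queue.join(), timeout)
        except asyncio.TimeoutError:
            undelivered = self._pending
            if self._worker is not None:
                self._worker.cancel()
                with contextlib.suppress(asyncio.CancelledError):
                    await self._worker
            # Discard leftovers so the next drain starts from a clean slate.
            while not self._queue.empty():
                self._queue.get_nowait()
                self._queue.task_done()
            self._pending = 0
            return DrainSummary(undelivered, True)
        return DrainSummary(0, False)
```

In this model an event interrupted mid-delivery counts as undelivered, and a later `dispatch` starts a fresh worker, which is one way to keep the queue usable after a timed-out drain.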
Error isolation¶
An observer that raises:
- Does NOT propagate its exception to invoke()'s caller.
- Does NOT prevent other observers from receiving the same event.
- Does NOT prevent any observer from receiving subsequent events.
Failures are reported via warnings.warn (Python's channel for
non-fatal anomalies). A bad observer can't take down the system that's
calling it. The graph run is the source of truth; observability is a
side concern.
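Serial delivery and error isolation combine into a small loop. The function below is a simplified model of that behaviour, not the engine's code; `deliver` is a hypothetical name, and the warning category is an assumption.

```python
import asyncio
import warnings
from typing import Awaitable, Callable, Iterable, Sequence

ObserverFn = Callable[[object], Awaitable[None]]


async def deliver(events: Iterable[object], observers: Sequence[ObserverFn]) -> None:
    """Deliver each event to every observer, one observer at a time."""
    for event in events:
        # Registration order; no observer sees the next event early.
        for observer in observers:
            try:
                await observer(event)
            except Exception as exc:
                # Report and continue: later observers and later events
                # are unaffected by this failure.
                warnings.warn(
                    f"observer {observer!r} raised {exc!r}",
                    RuntimeWarning,
                    stacklevel=2,
                )
```

Since failures surface through the warnings module, a test suite can promote them to errors with `warnings.simplefilter("error")` to catch a broken observer early.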
correlation_id is a separate join key¶
Two identifiers travel with every invocation:
- invocation_id: unique per invoke() call. Identifies this run. Surfaced on CheckpointRecord.invocation_id, observer span attributes, log records.
- correlation_id: a cross-system identifier propagated via ContextVar. Multiple invocations related by a higher-level request (e.g., a parent run that spawns a subgraph via direct await sub.invoke(...), or a user-request that drives several related graph runs) can share one correlation_id while each having its own invocation_id.
correlation_id is the load-bearing join key in the multi-backend
scenario: a Langfuse trace, an OTel trace, and a structured log all
end up with the same correlation_id even
though their invocation_ids differ. It's exported from the
openarmature.observability package as current_correlation_id /
current_invocation_id (and friends) for code that needs to thread
the IDs explicitly.
Caller-supplied invocation metadata¶
correlation_id is one string; if you also need to attach
business-domain identifiers (tenant IDs, request IDs, feature
flags, A/B cohort labels), pass them as a structured mapping at
invoke() time:
await compiled.invoke(
    initial_state,
    metadata={
        "tenantId": "acme-corp",
        "requestId": "req-12345",
        "featureFlag": "v2-canary",
        "seatCount": 42,
    },
)
Every observability backend picks the entries up:
- OTel emits each entry as an openarmature.user.<key> cross-cutting span attribute on every span: invocation, node, subgraph wrapper, fan-out instance, LLM provider, retry attempt. Backends that consume OTel attributes (Phoenix / Arize, Honeycomb, Datadog APM, HyperDX, Grafana Tempo, custom collectors) see them uniformly without per-backend wiring.
- Langfuse merges each entry as a top-level key into trace.metadata AND into every observation.metadata. The Langfuse UI filters on metadata.<key> directly, so dashboard queries like "show me all traces for tenantId == acme-corp" work without any custom dashboard config.
Validation runs at the invoke() boundary before any work begins.
Two rules:
- Keys MUST NOT start with openarmature. or gen_ai. (reserved for spec-normative attribute namespaces; collisions would silently overwrite OA-emitted state).
- Values MUST be OTel-attribute-compatible scalars (str, int, float, bool) or homogeneous arrays of those types. None, nested objects, and mixed-type arrays are rejected.
Violations raise ValueError synchronously: no spans emitted, no
work runs.
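The two rules can be expressed as a small pure function. This is a sketch of the rules as stated, not the framework's validator: `validate_metadata` is a hypothetical name, and treating bool and int as distinct array element types is an assumption (Python's bool subclasses int, so the distinction has to be made explicitly).

```python
from typing import Any, Mapping

RESERVED_PREFIXES = ("openarmature.", "gen_ai.")


def _scalar_kind(value: Any):
    """Return the scalar type of value, or None if it is not an allowed scalar."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return bool
    for kind in (str, int, float):
        if isinstance(value, kind):
            return kind
    return None


def validate_metadata(metadata: Mapping[str, Any]) -> None:
    """Raise ValueError on the first entry that breaks either rule."""
    for key, value in metadata.items():
        if key.startswith(RESERVED_PREFIXES):
            raise ValueError(f"reserved key prefix: {key!r}")
        if isinstance(value, (list, tuple)):
            kinds = {_scalar_kind(item) for item in value}
            # None in kinds: a non-scalar element; more than one kind: mixed types.
            if None in kinds or len(kinds) > 1:
                raise ValueError(f"array for {key!r} must hold scalars of one type")
        elif _scalar_kind(value) is None:
            raise ValueError(
                f"unsupported value type for {key!r}: {type(value).__name__}"
            )
```

Running a check like this before any span is opened is what makes the failure synchronous: the caller gets the ValueError before the invocation has produced any side effects.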
Adding entries mid-invocation¶
From inside a node body, middleware, or observer, augment the in-scope metadata via the public helper:
from typing import Any

from openarmature.observability import set_invocation_metadata

async def evaluate_product(state: PipelineState) -> dict[str, Any]:
    set_invocation_metadata(productId=state.product_id, productCategory=state.category)
    # Spans emitted AFTER this call carry productId + productCategory
    # in addition to whatever the original invoke() metadata supplied.
    response = await provider.complete(messages)
    return {"score": parse_score(response.message.content)}
Spans already closed are NOT retroactively updated. Spans emitted
after the call (the current node's completed event, the next
node's started, any LLM call inside) pick up the new entries.
Per-async-context scoping. The metadata mapping lives in a
ContextVar, which Python copies on async-task creation. Fan-out
instances and parallel-branches each receive their own copy at
dispatch time; an instance that calls set_invocation_metadata
does NOT leak its augmentation to sibling instances. This is the
canonical pattern for per-instance identifiers:
# Each fan-out instance adds its own productId; siblings stay clean
async def evaluate_product(state: ProductState) -> dict[str, Any]:
    set_invocation_metadata(productId=state.product_id)
    return await score_product(state)
Augmentation within the parent context (before fan-out dispatch, or
in code that runs serially) flows forward to subsequent spans in
that context, per normal ContextVar semantics.
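The scoping behaviour follows from standard ContextVar semantics and can be reproduced with the standard library alone. In the sketch below, `set_meta` and `get_meta` are hypothetical stand-ins for the framework helpers, and `asyncio.gather` plays the role of fan-out dispatch.

```python
import asyncio
from contextvars import ContextVar
from types import MappingProxyType

_metadata: ContextVar[MappingProxyType] = ContextVar(
    "metadata", default=MappingProxyType({})
)


def set_meta(**entries: object) -> None:
    # Rebind to a new mapping rather than mutating in place: a copied
    # context shares object references, so in-place mutation would leak.
    _metadata.set(MappingProxyType({**_metadata.get(), **entries}))


def get_meta() -> MappingProxyType:
    return _metadata.get()


async def instance(product_id: str) -> dict:
    set_meta(productId=product_id)  # visible only in this task's context
    await asyncio.sleep(0)          # let sibling instances interleave
    return dict(get_meta())


async def demo() -> tuple[list[dict], dict]:
    set_meta(tenantId="acme-corp")  # parent entry, inherited by each copy
    # gather wraps each coroutine in a task; each task copies the context.
    results = await asyncio.gather(instance("p1"), instance("p2"))
    return list(results), dict(get_meta())


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Each instance sees the parent's tenantId plus its own productId, while the parent's view after the join still holds only tenantId.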
Reading the in-scope metadata¶
openarmature.observability.get_invocation_metadata() returns an
immutable MappingProxyType snapshot of the entries visible in the
current async context's view, or an empty mapping outside any active
invocation. The read is per-attempt scoped under retry middleware:
values written in a prior failed attempt are not visible. Reads do
NOT emit a metadata-augmentation event; the augmentation event
signals mutations to backends, not consumer reads.
The existing current_invocation_metadata() is a stable alias
pointing at the same function; both names live in __all__. Pick
whichever reads naturally at the call site — get_/set_ for
symmetry, current_ for "the current value of the contextvar".
Three call-site categories:
- Observers and capability code (LLM provider span hook, Langfuse observer, OTel observer) read this to surface the entries on backend-specific records.
- Downstream pipeline nodes read the entries an earlier node wrote. A common shape: an upstream classify node calls set_invocation_metadata(audit_kind="fraud"); a terminal persist node calls get_invocation_metadata() to read the audit kind without round-tripping the field through State.
- Outside an invocation the read returns the empty mapping silently. Library functions that may be called both inside and outside an invocation can branch on bool(get_invocation_metadata()) without special-casing.
The read inherits the per-async-context scoping from set exactly:
fan-out instance writes are isolated to the instance's copy and are
NOT visible after the join. Implementations MUST NOT layer a
separate global aggregator structure to make sibling-instance writes
visible across the join — the read surface mirrors the write
surface's scoping.
Queryable observer pattern¶
The Observer protocol is intentionally minimal: a single async
callable receiving the event union. Concrete observer types MAY
expose additional read methods on the instance — pipeline nodes
hold a reference to the observer they attached and consume those
methods at runtime.
This is the queryable observer pattern: a convention for letting an observer carry derived state across the event stream (per-node token rollups, per-node latency summaries, per-node error counts) that downstream pipeline nodes consume at the end of an invocation or at specific summary points.
The pattern is convention, not protocol. The Observer surface's
single async-callable shape is unchanged; the read methods live on
the concrete observer type, not on the abstract protocol.
Read-method contract¶
Read methods on a queryable observer MUST be:
- Query-only. No graph state mutation; observers MUST NOT modify pipeline state (the graph engine owns it exclusively).
- No routing side effects. The read MUST NOT influence edge resolution, conditional branching, or node dispatch.
- No observer-side emission. Read methods MUST NOT emit events to other observers, directly or indirectly. The observer's role in the event stream is event consumption (via the Observer.__call__ surface); cross-observer notification would create ordering dependencies the spec does not establish.
- Non-blocking from the event-loop perspective. Read methods SHOULD be local-state accesses (synchronous reads against in-memory data the observer accumulated). I/O-backed reads are not forbidden, but the concrete observer accepts responsibility for the latency envelope and SHOULD document expectations.
Queryable observers are a read-augmenting convenience for patterns where pipeline computation depends on cross-cutting data derived from event emissions. They are NOT a replacement for State — see Three-channel data-access guidance below.
Async-safety¶
Read methods MAY race with concurrent event emission to the same observer. Concrete implementations MUST ensure their internal state is read-consistent: a read MUST NOT return a torn or partially-mutated view (no half-updated dictionaries, no inconsistent counter pairs). They are NOT REQUIRED to guarantee that a read sees all events emitted up to a particular point in wall-clock time.
A consumer that needs post-completion stability (e.g., a final-summary node that wants to read after every event for the invocation has been delivered) MUST gate the read on observing the invocation's completion signal. The strictly-serial observer delivery queue guarantees prior events are delivered before the invocation's terminal event reaches the observer — gating on the completion signal is the spec-mandated synchronization point.
Concrete observers MAY offer stricter guarantees (e.g., a
get_stable_total() accessor that blocks until completion); the
floor is read-consistency.
Three-channel data-access guidance¶
Pipelines have three distinct read surfaces for data accumulated across an invocation. Use the right one for the use case:
| Channel | Shape | Use when |
|---|---|---|
| State | Typed schema with declared reducers; participates in graph routing; survives checkpoint / resume; canonical mutable data plane | Pipeline computation data; data the next node's behavior depends on; data that needs to round-trip through reducers; data that needs to survive a crash |
| Invocation metadata | Untyped per-invocation key/value channel; cross-cutting attribution; per-async-context scoped | Span / trace attributes; user / request IDs; audit context; values that don't belong in the typed schema; cross-cutting attribution consumed by one end-of-invocation node |
| Queryable observer accumulator | Derived summary state on a concrete observer instance; queried via read methods at runtime | Per-node summaries derived from event emissions (usage tokens per node, latency per node, retry count per node); when adding the summary as a State field would force reducer-shape pollution |
Default: prefer State. State is the canonical mutable data channel for pipeline computation. Invocation metadata and queryable observer accumulators are narrow carve-outs.
Invocation metadata is the right answer when the data is cross-cutting attribution (user, request, audit context), adding it as a State field would be schema pollution, the data doesn't need reducer semantics, and the data doesn't survive across invocations.
Queryable observer accumulator is the right answer when the data is a derived summary (counts, sums, ratios) over event emissions (not raw input), adding the summary as a State field would force schema pollution (incompatible reducer shapes, fan-out vs non-fan-out asymmetry), AND the consuming node is downstream of the event emissions it needs to read.
The three channels are independent — a real pipeline may use all
three. A persist node at the end of an invocation might read its
canonical computation results from State, its user attribution from
invocation metadata, and its per-LLM-call token rollup from a
queryable accumulator.
Lifecycle¶
The lifecycle rules below apply only to queryable observers that accumulate per-invocation state (e.g., per-node-summary accumulators). Observers that expose query methods over non-accumulated data (e.g., a pass-through inspector that returns the latest event seen) are not subject to these rules.
Accumulating queryable observers MUST NOT auto-drop accumulated
state on the invocation's completion signal — an end-of-invocation
reader (typically a persist or summary node running as the final
invocation step) legitimately needs to read the bucket BEFORE the
invocation completes; auto-drop on the completion signal would race
against the read.
Concrete accumulating observers MUST provide an explicit drop /
cleanup mechanism — typically drop(invocation_id) — that
releases the accumulated state for a given invocation. The consuming
node calls drop after reading.
Long-lived accumulators (an observer that survives across many
invocations) accumulate buckets per invocation_id until explicitly
dropped — a feature for session-scoped accumulators surviving across
resumes; a cost in memory pressure if drops are missed. The spec does
NOT mandate a maximum retention policy; concrete observers MAY offer
LRU eviction or TTL-based cleanup on top.
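A minimal accumulating observer that follows these rules could look like the sketch below. It is illustrative only: `UsageEvent` is a hypothetical event type that carries its own invocation_id (the real event shape may differ), and `TokenAccumulator` is not a framework class.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class UsageEvent:
    """Hypothetical event shape, assumed to carry the invocation id."""
    invocation_id: str
    node_name: str
    tokens: int


class TokenAccumulator:
    """Per-invocation token rollup with explicit, caller-driven cleanup."""

    def __init__(self) -> None:
        self._buckets: dict[str, dict[str, int]] = {}

    async def __call__(self, event: UsageEvent) -> None:
        # No await between read and write, so a reader on the same event
        # loop never observes a half-applied update.
        bucket = self._buckets.setdefault(event.invocation_id, {})
        bucket[event.node_name] = bucket.get(event.node_name, 0) + event.tokens

    def get_bucket(self, invocation_id: str) -> dict[str, int]:
        # Query-only: return a copy so callers cannot mutate internal state.
        return dict(self._buckets.get(invocation_id, {}))

    def drop(self, invocation_id: str) -> None:
        # Idempotent release; nothing is dropped automatically on completion.
        self._buckets.pop(invocation_id, None)


async def demo() -> tuple[dict, dict, dict]:
    acc = TokenAccumulator()
    await acc(UsageEvent("inv-1", "classify", 10))
    await acc(UsageEvent("inv-1", "classify", 5))
    await acc(UsageEvent("inv-2", "persist", 7))
    before = acc.get_bucket("inv-1")
    acc.drop("inv-1")
    return before, acc.get_bucket("inv-1"), acc.get_bucket("inv-2")
```

Buckets for other invocations are untouched by a drop, which is what lets one long-lived accumulator serve many concurrent runs.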
Synchronization with the deliver loop¶
A subtle race: the strictly-serial observer-delivery queue may still hold not-yet-dispatched events for the in-flight invocation at the moment a terminal node reads the accumulator. The accumulator's view in that moment can be one event behind reality, and a downstream read can miss the most-recent contribution.
For accumulators where this matters (token rollups consumed by a
persist node, latency summaries written to a canonical JSON
artifact), gate the read on the per-invocation drain primitive
CompiledGraph.drain_events_for(invocation_id, *, timeout). The
canonical three-step shape is drain, then read, then drop: read via
the accumulator's documented query method (e.g. get_bucket), then
release the bucket via the §9.4 drop discipline:
async def persist(state: PipelineState) -> Mapping[str, Any]:
    # current_invocation_id() returns the engine-minted (or
    # caller-supplied) id of the active invocation; never None
    # inside a node body.
    invocation_id = current_invocation_id()
    # 1. Wait for every event under this invocation_id to dispatch
    #    to every attached observer; bounded by the timeout.
    await graph.drain_events_for(invocation_id, timeout=2.0)
    # 2. Read the bucket: the accumulator's view now reflects the
    #    full event stream for this invocation.
    usage_records = accumulator.get_bucket(invocation_id)
    # 3. Release the bucket per §9.4. Skip this step only if the
    #    accumulator is intentionally session-scoped across resumes.
    accumulator.drop(invocation_id)
    # ...
drain_events_for is symmetric with the existing graph-wide
graph.drain() but scoped to one invocation. Returns the same
DrainSummary shape with the same timeout discipline, but with one
load-bearing divergence: a per-invocation drain timeout MUST NOT
cancel the delivery worker. graph.drain() cancels because it is a
shutdown primitive; per-invocation drain is an in-flight
synchronization primitive, so the graph stays available to serve
other invocations after the timeout fires, and the deliver loop
keeps processing the queue. The default timeout is 5.0 seconds;
pass None to wait indefinitely, or 0.0 for a non-blocking check.
OpenTelemetry mapping (opt-in)¶
Install with the [otel] extra:
OTelObserver maps node events to OTel spans + structured log
correlation:
- Each node started/completed pair becomes one span.
- Subgraph hierarchy is reflected in span parent-child structure.
- Spec error categories map to OTel Status.ERROR with semantic attributes.
- Log records emitted during node execution carry the active span's trace_id / span_id plus an openarmature.correlation_id attribute, so the join key survives the OTel boundary.
TracerProvider isolation¶
OTelObserver constructs a private TracerProvider from the
processor you supply. It never registers globally and never reads
get_tracer_provider(). This isolation is intentional.
The motivation is concrete: many production stacks already register a
global TracerProvider (Langfuse v3's OpenInference integration is
the recurring example) for their own instrumentation. If openarmature
piggybacked on the global provider, every span the engine emits would
also flow to those other backends, doubling exports, corrupting
hierarchies, and tying openarmature's lifecycle to whichever
unrelated library happened to register first. Isolation prevents
that; the observer's spans only flow through the processor you handed
it.
Detached trace mode¶
Some subgraphs or fan-outs are better as their own root trace than as descendants of the parent's span tree: long-running asynchronous work, retries that would balloon a parent span, or work that gets reported to a different backend.
Configure detachment on the observer:
obs = OTelObserver(
    processor=processor,
    detached_subgraphs=frozenset({"long_async_step"}),
    detached_fan_outs=frozenset({"daily_batch"}),
)
A detached subgraph or fan-out renders into a fresh trace, rooted in
its own openarmature.invocation span that carries the same
invocation_id as the parent (detached mode is an observer-side
rendering choice, not a separate run). The new trace has a fresh
trace_id, and the correlation_id still propagates through, so
join semantics survive even when trace boundaries don't.
The non-detached default is what you want most of the time: one trace per outermost invocation, with subgraphs and fan-out instances as nested spans.
LLM provider spans¶
When an OpenAIProvider (or any custom Provider
that wires the dispatch hook) is used inside a graph with OTelObserver
attached, each provider.complete() attempt emits a dedicated span
named openarmature.llm.complete, parented under the calling node's
span. A call without retry emits one span; a call-level retry= that
retries emits one span per attempt.
Each span carries two attribute families.
openarmature.llm.* (always on). The framework's canonical
namespace: model identifier, finish reason, token counts, prompt
identity from with_active_prompt(...), error category on failure, and
openarmature.llm.attempt_index (the 0-based call-level attempt
counter). Set unconditionally whenever the LLM span itself emits.
gen_ai.* (OpenTelemetry GenAI semantic conventions, default on).
Cross-vendor attribute names every LLM-aware backend reads
(Langfuse, Phoenix, Honeycomb's LLM lens, OpenInference-aware
tools). Emitted alongside the OA namespace:
- gen_ai.system: "openai" by default; override per provider instance to "vllm" / "lm_studio" / "llama_cpp" / etc. when the OpenAI Chat Completions wire format is hitting a non-OpenAI endpoint:

  provider = OpenAIProvider(
      base_url="http://vllm.internal:8000",
      model="meta-llama/Llama-3-8B-Instruct",
      genai_system="vllm",
  )
- gen_ai.request.model / gen_ai.response.model: the bound model and (when the provider returns one) the more-specific identifier in the response body.
- gen_ai.request.temperature / max_tokens / top_p / seed / frequency_penalty / presence_penalty / stop_sequences: only emitted for fields the caller actually set; absence on the span means "not supplied," distinct from a zero value.
- gen_ai.usage.input_tokens / output_tokens: token counts.
- gen_ai.response.finish_reasons: single-element string array.
- gen_ai.response.id: when the provider returns one.
Disable the GenAI semconv set with OTelObserver(disable_genai_semconv=True)
when an external auto-instrumentation library (OpenInference,
opentelemetry-instrumentation-openai) is already the canonical
source on your stack.
Per-attempt spans under call-level retry¶
Call-level retry
(provider.complete(retry=...)) retries transient provider errors
inside a single call. Each attempt emits its own
openarmature.llm.complete span tagged with
openarmature.llm.attempt_index (0-based). A call that succeeds on the
first try emits one span at attempt_index 0; a call that fails twice
transiently before succeeding emits three spans (indices 0, 1, 2). Each
failed attempt's span carries ERROR status plus
openarmature.error.category; the final attempt's span carries the
terminal outcome (OK on success, ERROR on an exhausted or
non-transient failure).
openarmature.llm.attempt_index is the call-level attempt counter,
independent of the node-level attempt_index:
the former counts attempts inside one complete() call, the latter
counts node re-executions driven by retry middleware. A node retried
once by middleware, each execution calling a provider that itself
retries once, produces node attempt_index 0/1 and, within each,
call-level attempt_index 0/1.
LLM payload attributes¶
By default, LLM spans do not carry the messages sent or the
response content. Opt in with disable_provider_payload=False:
observer = OTelObserver(
    span_processor=SimpleSpanProcessor(exporter),
    disable_provider_payload=False,
)
This surfaces four attributes:
- openarmature.llm.input.messages: JSON-encoded message array (the spec §3 message shape: {role, content, tool_calls?, …}).
- openarmature.llm.output.content: the assistant's response content string verbatim. Omitted for tool-call-only responses with empty content.
- openarmature.llm.output.tool_calls: JSON-encoded [{id, name, arguments}] array of the tool calls the model requested (the same encoding tool_calls uses inside input.messages). This is the output-side home for the request, including the call arguments, so it is payload-gated. Emitted only when the response requests tool calls.
- openarmature.llm.request.extras: JSON-encoded RuntimeConfig extras bag (provider-specific pass-through fields like repetition_penalty for vLLM, or top_k for HuggingFace endpoints). Omitted when empty.
Default-off is deliberate. The payload may contain PII the user
hasn't audited; opting in is a separate decision from opting into
observability. The flag name keeps symmetry with disable_llm_spans:
the default value (True) reads as "the observer disables payload
emission by default."
Output tool-call identity (ungated)¶
The full openarmature.llm.output.tool_calls carries the arguments, so
it is payload-gated. But which tools the model asked for (their
names and ids) is identity, not payload, the same class as
openarmature.llm.model. So three identity projections render
regardless of disable_provider_payload, surfacing the request
under the default payload-off posture and queryable without parsing
JSON:
- openarmature.llm.output.tool_calls.count: the number of tool calls requested (an int, equal to the length of .names).
- openarmature.llm.output.tool_calls.names: the requested tool names, in request order.
- openarmature.llm.output.tool_calls.ids: the requested ToolCall ids, index-aligned with .names (names[i] / ids[i] describe the same call), the linkage to a downstream tool execution.
The whole family (these three plus the gated full serialization) is
emitted only on a tool-calling completion. A completion that
requests no tools emits none of them; absence means "no tools
requested", distinct from count = 0.
Truncation¶
Each payload attribute is capped at payload_max_bytes UTF-8 bytes
(default 64 KiB, minimum 256). When the serialized value exceeds the
cap, the observer emits the largest UTF-8-code-point-aligned prefix
that fits within cap - len(marker) bytes followed by the marker:
…[truncated, M bytes total]
where M is the pre-truncation byte length. The marker is appended outside any JSON encoding, so a truncated attribute is not parseable JSON, which is the clean signal backend code can use to detect truncation without a separate flag.
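A minimal sketch of the truncation rule, assuming the marker has the form …[truncated, M bytes total] (the form quoted in the Langfuse section of this page). The function name is hypothetical and the observer's real implementation may differ.

```python
import json


def truncate_payload(serialized: str, cap: int) -> str:
    """Sketch of the cap rule; assumes cap is large enough to hold the marker."""
    raw = serialized.encode("utf-8")
    if len(raw) <= cap:
        return serialized
    marker = f"…[truncated, {len(raw)} bytes total]"
    budget = cap - len(marker.encode("utf-8"))
    # Cutting at a byte offset may split a multi-byte code point;
    # errors="ignore" drops only that incomplete trailing sequence.
    prefix = raw[:budget].decode("utf-8", errors="ignore")
    return prefix + marker


payload = json.dumps({"content": "é" * 1000}, ensure_ascii=False)
capped = truncate_payload(payload, cap=256)
```

Because the marker sits outside the JSON string, json.loads(capped) raises, which is the detection signal described above.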
Inline image redaction (always on)¶
Image content blocks with ImageSourceInline are redacted at the
provider, before the payload reaches the observer:
{
  "type": "image",
  "source": {"type": "inline_redacted", "byte_count": 4096},
  "media_type": "image/png",
  "detail": "auto"
}
The media_type and detail fields are preserved at the image-block
level (per llm-provider §3.1.2); only source is replaced. URL-form
images pass through unchanged: the URL is a short string and is
informative for trace readers.
Redaction is not gated by disable_provider_payload and is not
configurable. Inline image bytes never leave the provider in event
form, so custom observers consuming
LlmCompletionEvent / LlmFailedEvent
cannot accidentally leak raw bytes regardless of how they're
written.
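The redaction step can be sketched roughly as follows. The inline source shape ({"type": "inline", "data": <base64>}) and the reading of byte_count as the decoded byte length are assumptions for illustration; only the redacted output shape comes from the example above.

```python
import base64
from typing import Any


def redact_inline_image(block: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical redaction pass over one content block."""
    source = block.get("source", {})
    if block.get("type") != "image" or source.get("type") != "inline":
        # URL-form images and non-image blocks pass through unchanged.
        return block
    byte_count = len(base64.b64decode(source["data"]))
    # Replace only `source`; media_type and detail stay on the block.
    return {**block, "source": {"type": "inline_redacted", "byte_count": byte_count}}


inline_block = {
    "type": "image",
    "source": {"type": "inline", "data": base64.b64encode(b"\x00" * 4096).decode()},
    "media_type": "image/png",
    "detail": "auto",
}
redacted = redact_inline_image(inline_block)
```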
GenAI metrics (enable_metrics)¶
Spans answer "what happened on this one call"; metrics answer "what is
the token throughput and latency across all calls". The OTel observer
can emit two histogram instruments over provider calls. Opt in with
enable_metrics=True (default off):
When enabled, the observer obtains a Meter from the configured
MeterProvider. Pass meter_provider=... to use a private one;
otherwise it falls back to the OTel global, and recording is a silent
no-op when no provider is configured. The two instruments:
- openarmature.gen_ai.client.token.usage (unit {token}). Per LLM completion it records two observations: the input-token count, tagged openarmature.gen_ai.token.type="input", and the output-token count, tagged "output", sourced from the response usage record. Recorded only when the call returned usage.
- openarmature.gen_ai.client.operation.duration (unit s). The provider-call wall-clock duration, one observation per attempt. A failed attempt records too, carrying error.type.
Both carry openarmature.gen_ai.operation ("chat"),
gen_ai.request.model, and gen_ai.system. Under call-level retry the
duration instrument records once per attempt; the token instrument
records only for attempts that returned usage.
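To make the per-attempt recording rule concrete, here is a stdlib-only model of which observations a retried call would produce. The Attempt class, the observations function, and the example error type are hypothetical stand-ins; the real observer records into OTel histogram instruments instead of returning a list.

```python
from dataclasses import dataclass
from typing import Optional

DURATION = "openarmature.gen_ai.client.operation.duration"
TOKENS = "openarmature.gen_ai.client.token.usage"


@dataclass
class Attempt:  # hypothetical stand-in for one provider-call attempt
    duration_s: float
    usage: Optional[tuple[int, int]] = None  # (input_tokens, output_tokens)
    error_type: Optional[str] = None


def observations(attempts: list[Attempt]) -> list[tuple[str, float, dict[str, str]]]:
    out: list[tuple[str, float, dict[str, str]]] = []
    for attempt in attempts:
        # Duration: one observation per attempt, failed attempts included.
        attrs = {"error.type": attempt.error_type} if attempt.error_type else {}
        out.append((DURATION, attempt.duration_s, attrs))
        # Tokens: only when the attempt returned a usage record.
        if attempt.usage is not None:
            for kind, count in zip(("input", "output"), attempt.usage):
                out.append((TOKENS, count, {"openarmature.gen_ai.token.type": kind}))
    return out


# One failed attempt followed by a successful retry.
obs = observations([
    Attempt(0.5, error_type="ExampleTimeout"),
    Attempt(1.2, usage=(100, 20)),
])
```

Here the duration instrument would see two observations and the token instrument two (input and output), all from the second attempt.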
Metrics are independent of spans. enable_metrics is orthogonal to
the disable_llm_spans / disable_provider_payload flags: you can
record metrics with spans off, or emit spans with metrics off. Both draw
from the same event stream.
The instrument names are OA-namespaced, mirroring the upstream
gen_ai.client.* instruments (still at Development status), so a future
cutover is a mechanical prefix-strip. Metrics target OTel only; there is
no Langfuse mapping.
Tool-execution observability (with_tool_call)¶
A model requests tools in its completion (the output_tool_calls above);
the caller executes them in node-body code. OpenArmature does not run,
choose, loop, or feed back tools (that orchestration stays in your graph),
but it can observe a tool execution you wrap in the with_tool_call
instrumentation scope:
from openarmature.observability import with_tool_call

async def run_tools(state: AgentState) -> dict:
    with with_tool_call("get_weather", {"city": "Paris"}, tool_call_id="call_abc") as scope:
        result = await get_weather(city="Paris")
        scope.set_result(result)
    return {"weather": result}
with_tool_call is a context manager (like with_active_prompt): you run
the tool inside it and report the outcome with scope.set_result(...). On a
clean exit it dispatches a ToolCallEvent; if the tool raises, it dispatches
a ToolCallFailedEvent and re-raises (it observes, it does not swallow, so
your node body still sees the exception). tool_call_id links the execution
back to the output_tool_calls entry that requested it, or is omitted for a
standalone instrumented function.
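The observe-and-re-raise behaviour described above can be modelled with a plain context manager. This is a stand-in written for illustration (tool_call_scope, _Scope, and the events list are hypothetical), not the library's implementation.

```python
from contextlib import contextmanager
from typing import Any, Optional

events: list[tuple[str, dict[str, Any]]] = []  # stand-in for observer dispatch


class _Scope:
    result: Any = None

    def set_result(self, result: Any) -> None:
        self.result = result


@contextmanager
def tool_call_scope(name: str, arguments: dict, tool_call_id: Optional[str] = None):
    """Hypothetical model of with_tool_call: control flow only."""
    scope = _Scope()
    base = {"name": name, "arguments": arguments, "tool_call_id": tool_call_id}
    try:
        yield scope
    except Exception as exc:
        events.append(("ToolCallFailedEvent", {**base, "error_type": type(exc).__name__}))
        raise  # observe, do not swallow
    else:
        events.append(("ToolCallEvent", {**base, "result": scope.result}))


with tool_call_scope("get_weather", {"city": "Paris"}, tool_call_id="call_abc") as scope:
    scope.set_result("sunny")
```

If the body raises, the failure event is recorded first and the exception then propagates to the caller unchanged.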
The events render on both backends:
- OTel: an openarmature.tool.call span parented under the calling node, carrying openarmature.tool.name, openarmature.tool.call.id, and (when payload is on) openarmature.tool.call.arguments / .result. A failure sets ERROR status with the standard error.type attribute.
- Langfuse: a dedicated Tool observation (not a Generation) under the node's Span observation, with the arguments / result as input / output and the tool name and tool_call_id in metadata; a failure renders at ERROR level.
The arguments and result are payload, gated by disable_provider_payload
exactly like the LLM payload attributes (default off keeps tool inputs and
outputs out of traces). disable_llm_spans does not affect tool spans. The
openarmature.tool.* attribute names mirror the upstream Development
gen_ai.tool.* surface, which OpenArmature does not emit in v1, so a future
cutover is a prefix swap.
Identifying the service: Resource¶
Pass an opentelemetry.sdk.resources.Resource to set
service.name / service.version / etc. without relying on the
OTEL_SERVICE_NAME / OTEL_RESOURCE_ATTRIBUTES environment
variables (which must be set before OTelObserver()
construction to take effect):
from opentelemetry.sdk.resources import Resource

observer = OTelObserver(
    span_processor=SimpleSpanProcessor(exporter),
    resource=Resource.create({"service.name": "claims-pipeline"}),
)
Fanning out to multiple backends¶
The span_processor argument accepts either a single processor or
a sequence. Multi-destination export (HyperDX + Langfuse from one
observer) is a one-line construct:
observer = OTelObserver(
    span_processor=[
        BatchSpanProcessor(OTLPSpanExporter(endpoint=HYPERDX_URL)),
        BatchSpanProcessor(OTLPSpanExporter(endpoint=LANGFUSE_URL)),
    ],
)
Every registered processor receives every span.
Adding backend-specific attributes: attribute_enrichers¶
When a backend needs attributes the framework doesn't emit
(custom langfuse.observation.* keys, Honeycomb derived fields,
etc.), the attribute_enrichers hook fires just before every
span.end() call:
def langfuse_observation_kind(span, event):
    if span.name == "openarmature.llm.complete":
        span.set_attribute("langfuse.observation.type", "generation")

observer = OTelObserver(
    span_processor=processor,
    attribute_enrichers=[langfuse_observation_kind],
)
Each enricher receives the live Span plus the NodeEvent that
triggered the close (or None on synthetic close sites: subgraph
dispatch, detached root, fan-out instance, invocation span,
shutdown drain). Setting attributes inside this hook works
correctly; doing it from a SpanProcessor.on_end callback does
not, because the framework has already called span.end() and the
OTel SDK silently drops set_attribute on ended spans.
For the openarmature.llm.complete span the close event is an
LlmRetryAttemptEvent (one per attempt) rather than a NodeEvent;
that is the per-attempt event the observer renders the LLM span from.
An enricher scoped to that span (span.name ==
"openarmature.llm.complete") can read the attempt's outcome straight
off it: event.llm_attempt_index, event.error_category,
event.usage, event.finish_reason, and so on.
Exceptions raised by an enricher are caught and surfaced as warnings, never propagated.
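The ordering guarantee (enrichers run before span.end(), and a failing enricher does not break the close) can be sketched as below. FakeSpan and close_span are hypothetical stand-ins, and the use of warnings.warn is an assumption about how the failure is reported.

```python
import warnings


class FakeSpan:  # hypothetical stand-in for an OTel Span
    def __init__(self, name: str) -> None:
        self.name = name
        self.attributes: dict = {}
        self.ended = False

    def set_attribute(self, key: str, value) -> None:
        if not self.ended:  # mirrors writes being dropped on ended spans
            self.attributes[key] = value

    def end(self) -> None:
        self.ended = True


def close_span(span, event, enrichers) -> None:
    """Run every enricher while the span is still live, then end it."""
    for enrich in enrichers:
        try:
            enrich(span, event)
        except Exception as exc:
            warnings.warn(f"attribute enricher failed: {exc!r}")
    span.end()


def tag_generation(span, event):
    if span.name == "openarmature.llm.complete":
        span.set_attribute("langfuse.observation.type", "generation")


def broken(span, event):
    raise ValueError("bad enricher")


span = FakeSpan("openarmature.llm.complete")
with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    close_span(span, None, [broken, tag_generation])
```

The broken enricher produces one warning, the second enricher still runs, and any set_attribute after close_span returns has no effect.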
Consuming LLM events in custom observers¶
openarmature.graph.events.LlmCompletionEvent and
openarmature.graph.events.LlmFailedEvent are the two typed event
variants any Provider implementation emits around a complete()
call. Custom observers consume them via type discrimination:
from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent

async def my_llm_observer(event):
    if isinstance(event, LlmCompletionEvent):
        # Successful call. Read identity / scoping / outcome
        # directly off the typed fields:
        # event.model, event.input_messages (already image-redacted),
        # event.output_content, event.request_params, event.response_id,
        # event.active_prompt, event.usage, event.latency_ms, …
        return
    if isinstance(event, LlmFailedEvent):
        # §7 category exception was raised. Same identity / scoping
        # surface as the completion variant, plus three failure-
        # specific fields:
        # event.error_category: one of the 9 normative §7 categories
        # event.error_type: vendor code or upstream class name
        # event.error_message: human-readable, may be empty
        return
The two variants are mutually exclusive on a single complete()
call: implementations MUST NOT emit both for the same call.
Conformance fixture 072 locks this down. The failure variant carries
the same identity + request-side fields as the completion variant,
minus the response-side fields (response_id, response_model,
usage, output_content, finish_reason), because there was no response
to record.
A custom Provider that wants observers to see the same events
dispatches LlmCompletionEvent(...) on success and
LlmFailedEvent(...) alongside the §7 category exception on failure
via current_dispatch(). See
Authoring providers for the full
pattern.
Under call-level retry the
bundled OpenAIProvider additionally dispatches a Python-internal
LlmRetryAttemptEvent once per attempt; that is the event the OTel
observer renders each per-attempt span from (including the lone attempt
of a no-retry call, at index 0). The terminal LlmCompletionEvent /
LlmFailedEvent above are unchanged: still one per call, still the
stable surface for per-call consumption (token accounting, failure
tracking). An observer that only cares about per-call outcomes can
ignore LlmRetryAttemptEvent.
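As a sketch of per-call consumption, the observer below tallies terminal events and deliberately ignores per-attempt ones. The three event classes are simplified stand-ins defined locally so the example runs on its own (the real ones live in openarmature.graph.events and carry more fields); total_tokens is a hypothetical flattening of event.usage, and "example_category" is a placeholder, not one of the normative categories.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class LlmCompletionEvent:  # stand-in for the real typed event
    model: str
    total_tokens: int  # hypothetical; the real event exposes event.usage


@dataclass
class LlmFailedEvent:  # stand-in
    model: str
    error_category: str


@dataclass
class LlmRetryAttemptEvent:  # stand-in
    llm_attempt_index: int


@dataclass
class CallStats:
    completed: int = 0
    failed: int = 0
    tokens: int = 0

    async def __call__(self, event) -> None:
        if isinstance(event, LlmCompletionEvent):
            self.completed += 1
            self.tokens += event.total_tokens
        elif isinstance(event, LlmFailedEvent):
            self.failed += 1
        # LlmRetryAttemptEvent (and anything else) is ignored on purpose.


async def replay(stats: CallStats, stream: list) -> None:
    for event in stream:
        await stats(event)


stats = CallStats()
asyncio.run(replay(stats, [
    LlmRetryAttemptEvent(0),
    LlmRetryAttemptEvent(1),
    LlmCompletionEvent("model-a", 120),  # one call: two attempts, one terminal event
    LlmRetryAttemptEvent(0),
    LlmFailedEvent("model-a", "example_category"),
]))
```

Counting only the terminal events keeps the totals per call, regardless of how many attempts each call needed.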
Legacy sentinel-namespace pattern (compatibility surface)¶
openarmature.observability.LLM_NAMESPACE and
openarmature.observability.LlmEventPayload remain in the public
API as a documented compatibility surface for custom providers and
observers that haven't migrated to typed events. The bundled
OpenAIProvider no longer emits the sentinel NodeEvent pair; the
bundled OTel and Langfuse observers no longer recognize it. If
you're writing a downstream observer that needs to interoperate
with custom providers still using the sentinel pattern, the legacy
shape is:
from openarmature.graph.events import NodeEvent
from openarmature.observability import LLM_NAMESPACE, LlmEventPayload

async def legacy_llm_observer(event):
    if not isinstance(event, NodeEvent):
        return
    if event.namespace != LLM_NAMESPACE:
        return
    payload = event.pre_state
    if not isinstance(payload, LlmEventPayload):
        return
    # payload.model, payload.input_messages, …
New code should prefer the typed-event path above.
Flushing under fast teardown¶
OTelObserver.shutdown() calls provider.shutdown() on the private
TracerProvider, which per OTel SDK contract flushes every
registered span processor. Under unusual teardown orderings (for
example, FastAPI's TestClient teardown that closes the event loop
before a BatchSpanProcessor's export thread finishes), spans can
appear dropped. Two workarounds:
- Call observer._provider.force_flush(timeout_millis=...) explicitly before shutdown().
- Use SimpleSpanProcessor instead of BatchSpanProcessor in tests; it exports synchronously and is unaffected by teardown timing.
Langfuse mapping (opt-in)¶
A second sibling observer maps the same NodeEvent stream onto
Langfuse's native Trace + Observation data model: Traces at the
top, Span observations for graph nodes, Generation observations for
LLM calls, and Tool observations for instrumented tool executions.
Use it instead of (or alongside) the OTel observer when
your trace UI is Langfuse and you want first-class Generation
rendering without going through Langfuse's OTLP ingest.
from openarmature.observability.langfuse import (
    InMemoryLangfuseClient,
    LangfuseObserver,
)

client = InMemoryLangfuseClient()  # or langfuse.Langfuse(...) in prod
observer = LangfuseObserver(client=client)
graph.attach_observer(observer)
The client is anything matching the LangfuseClient Protocol:
the bundled InMemoryLangfuseClient (used by the conformance
harness, useful for unit tests), or a real langfuse.Langfuse()
instance wrapped in LangfuseSDKAdapter for production. Install
the optional extras to bring in the Langfuse SDK:
Production wire-up:
from langfuse import Langfuse
from openarmature.observability.langfuse import (
    LangfuseObserver,
    LangfuseSDKAdapter,
)

langfuse_client = Langfuse(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com",
)
observer = LangfuseObserver(
    client=LangfuseSDKAdapter(langfuse_client),
    disable_provider_payload=False,
)
The adapter bridges langfuse>=4.6's unified start_observation
API onto our LangfuseClient Protocol; the observer code is the
same in tests and production. See
examples/langfuse-observability
for a runnable demo.
Langfuse SDK version compatibility
Validated against langfuse>=4.6,<5. The v4 SDK introduced an
OTel-based architecture with start_observation /
propagate_attributes replacing the v2/v3 trace / span /
generation low-level API; the bundled LangfuseSDKAdapter
handles the bridge so the observer surface is stable across
future v4 patches.
Earlier SDK versions (v2.x, v3.x) are NOT supported. Projects on
those versions either upgrade to v4 or supply their own adapter
matching the LangfuseClient Protocol.
A runtime isinstance(adapter, LangfuseClient) check ships in
the unit suite, so if a future v4 patch breaks the Protocol's
surface, the test fails loudly.
What Langfuse sees¶
- Trace ID = invocation ID. The Trace's id is the OA invocation_id verbatim, so cross-system lookup by invocation_id finds the Langfuse Trace directly (spec §8.4.1).
- Trace name. Defaults to the entry-node name (spec §8.6 fallback). Caller-supplied invocation labels land in PR 4 (proposal 0034).
- Session / user grouping (trace.sessionId / trace.userId). The observer populates the two cross-trace grouping fields behind Langfuse's Sessions and Users dashboards (spec §8.4.1, proposal 0064). trace.userId is promoted from a recognized userId key in the caller-supplied invocation metadata, automatically and additively (the key also stays at trace.metadata.userId); an absent key leaves it unset. trace.sessionId is sourced from openarmature.session_id (the sessions capability), which is not yet implemented, so it is unset for now. There is no OTel equivalent (an OTel trace has no trace-level session / user field); the same identity already rides as openarmature.session_id and the openarmature.user.* family on the OTel span side.
- Per-observation metadata. Each Span / Generation carries namespace, step, attempt_index, optional fan_out_index / branch_name, and the correlation_id cross-cutting join key (spec §8.5).
- Generation fields. LLM calls become Generation observations with model, model_parameters (the gen_ai.request.* request parameters lifted by inclusion per §8.4.3), usage (input / output / total tokens), and metadata.finish_reason / system / response_model / response_id.
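The additive userId promotion in the list above might look roughly like this. The function and the dict-shaped trace are hypothetical, used only to show that the key is copied up while remaining in metadata.

```python
def build_trace_fields(invocation_id: str, metadata: dict) -> dict:
    """Hypothetical sketch of the trace-level fields the observer sets."""
    trace = {"id": invocation_id, "metadata": dict(metadata)}
    if "userId" in metadata:
        # Promoted to the trace level; the key also stays in metadata.
        trace["userId"] = metadata["userId"]
    # sessionId is left unset: its source capability is not implemented yet.
    return trace


with_user = build_trace_fields("inv-123", {"userId": "u-42", "tenant": "acme"})
without_user = build_trace_fields("inv-124", {"tenant": "acme"})
```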
Payload + truncation¶
disable_provider_payload mirrors the OTel observer's flag and defaults
to True for the same privacy reason. Flip to False to populate
generation.input / output / metadata.request_extras from the
LLM event payload.
observer = LangfuseObserver(
    client=client,
    disable_provider_payload=False,
    payload_byte_cap=65536,
)
When a payload exceeds payload_byte_cap, the observer emits the
serialized form with the §5.5.5 truncation marker
(…[truncated, M bytes total]) verbatim as a raw string instead of
parsing back to native shape. The unparseable JSON IS the
truncation signal in the Langfuse UI.
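A consumer reading these values back could apply the same rule: parse when the string is valid JSON, otherwise keep it raw and treat it as truncated. Both helpers are hypothetical, and matching on the marker suffix assumes the marker format quoted above.

```python
import json
from typing import Any


def to_native_or_raw(serialized: str) -> Any:
    """Parse back to native shape when possible, else keep the raw string."""
    try:
        return json.loads(serialized)
    except ValueError:
        return serialized  # unparseable: most likely a truncated payload


def looks_truncated(value: Any) -> bool:
    # Assumes the marker ends with "... bytes total]" as quoted above.
    return isinstance(value, str) and value.endswith(" bytes total]")


intact = to_native_or_raw('[{"role": "user", "content": "hi"}]')
cut = to_native_or_raw('[{"role": "user", "content": "hi…[truncated, 70000 bytes total]')
```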
Prompt linkage¶
When a Prompt's source backend exposes a Langfuse Prompt entity
reference under Prompt.observability_entities['langfuse_prompt'],
the Generation observation links to that entity natively (spec
§8.4.4 case 1). Backends that don't surface a Langfuse reference
(filesystem, in-memory, etc.) leave the Generation with
metadata.prompt populated but no entity link (case 2).
Composition with OTel¶
The two observers are independent §6 event consumers and can be
attached together. They share the correlation_id as the
cross-backend join key: find a slow Generation in Langfuse, search
for its correlation_id in OTel logs, see the surrounding
infrastructure activity.
otel_observer = OTelObserver(span_processor=...)
langfuse_observer = LangfuseObserver(client=langfuse_client)
graph.attach_observer(otel_observer)
graph.attach_observer(langfuse_observer)
Each observer's disable_llm_spans / disable_provider_payload flag is
independent; one MAY emit while the other suppresses.