Production observability with dual observers and timing middleware
A single-turn lunar-mission Q&A endpoint instrumented the way you'd
ship it: BOTH OTel and Langfuse observers attached to the same
graph, caller hooks deriving domain-shaped trace.input /
trace.output from State, the built-in TimingMiddleware
recording per-node duration, and multi-tenant caller-supplied
metadata propagating to both observers in one invoke() call.
Overview
One node, one LLM call, two production-grade observability backends. The pipeline takes a question, calls the LLM, returns the answer. The interesting part is the observability wiring:
- OTelObserver attached with an InMemorySpanExporter (production swaps this for BatchSpanProcessor + OTLPSpanExporter pointed at HyperDX / Honeycomb / Tempo / any OTLP backend).
- LangfuseObserver attached with an InMemoryLangfuseClient (production swaps for LangfuseSDKAdapter(Langfuse(...))).
- Both observers consume the same NodeEvent stream independently; node code never knows there are two backends.
- LangfuseObserver carries trace_input_from_state and trace_output_from_state caller hooks that derive domain dicts like {"question": ...} / {"answer": ..., "model": ...} from State, instead of letting the observer dump the raw State object.
- TimingMiddleware (canonical, from openarmature.graph.middleware) wraps the respond node. An on_complete async callback receives a TimingRecord and prints a one-line timing summary; production would queue to a metrics backend (StatsD, Prometheus pushgateway, OTLP metrics).
- invoke(metadata={...}) carries tenantId, requestId, and featureFlag from the call site. Both observers pick them up: OTel attaches them as openarmature.user.* span attributes, Langfuse merges them as top-level trace.metadata keys.
At the end the demo prints what each backend captured so a reader sees the same logical events represented two ways.
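The fan-out described above can be pictured with a small stand-in. This is only a sketch: the NodeEvent and RecordingObserver classes and the fan_out function below are hypothetical, written for illustration, and are not openarmature's own types or dispatch code.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class NodeEvent:
    """Stand-in event; the real NodeEvent carries more than this."""
    node_name: str
    phase: str  # "started" or "completed"


@dataclass
class RecordingObserver:
    """Hypothetical observer that only remembers what it was sent."""
    label: str
    seen: list = field(default_factory=list)

    def on_event(self, event: NodeEvent) -> None:
        self.seen.append(event)


def fan_out(event: NodeEvent, observers: list) -> None:
    # Every observer receives the same event; none of them sees the others.
    for observer in observers:
        observer.on_event(event)


otel_like = RecordingObserver("otel")
langfuse_like = RecordingObserver("langfuse")
for phase in ("started", "completed"):
    fan_out(NodeEvent("respond", phase), [otel_like, langfuse_like])
```

The node that produced the events never references either observer, which is the property the overview relies on.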
What it teaches
- Two observers on one graph (no double-export between them). Each consumes the NodeEvent stream independently; the engine fans events out to all attached observers. Production deployments often run both: OTel for infrastructure-side correlation (logs, distributed tracing across services), Langfuse for LLM-aware generation rendering.
- Caller hooks for trace.input / trace.output (deriving domain dicts from State). Without the hooks the Langfuse observer either omits the field (disable_state_payload=True, the default) or dumps the raw State (when disable_state_payload=False). The hooks let you return a domain dict shaped for the Langfuse UI viewer while keeping PII the operator hasn't audited out of trace payloads.
- TimingMiddleware (reference). Wraps a node's execution and dispatches a TimingRecord(node_name, duration_ms, outcome, exception_category) to an async callback when the chain returns or raises. The callback fires inline before the chain's result reaches the engine; keep it fast (queue work, defer I/O).
- invoke(metadata={...}) propagation across observers (caller metadata and reserved keys). One call site, both backends pick it up: OTel attaches each entry as an openarmature.user.<key> cross-cutting span attribute, Langfuse merges them as top-level trace.metadata keys plus per-observation metadata.
- In-memory captures for both backends (reference). InMemoryLangfuseClient records every Trace / Observation; InMemorySpanExporter records every Span. Production deployments swap each for a real exporter / SDK adapter; the observer call surface doesn't change.
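To make the timing bullet concrete, here is a minimal sketch of a wrapper that times an async node and hands a record to an async callback. It is not the library's TimingMiddleware: the with_timing function is hypothetical, the TimingRecord stand-in only copies the four field names from the description above, and deriving exception_category from the exception's class name is an assumption made for the example.

```python
import asyncio
import time
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TimingRecord:
    # Field names follow the description above; the real class may differ.
    node_name: str
    duration_ms: float
    outcome: str
    exception_category: Optional[str] = None


def with_timing(node_name, node, on_complete):
    """Hypothetical wrapper: time `node`, then report a TimingRecord."""
    async def wrapped(state):
        start = time.perf_counter()
        try:
            result = await node(state)
        except Exception as exc:
            elapsed = (time.perf_counter() - start) * 1000.0
            # Assumption: use the exception class name as the category.
            await on_complete(
                TimingRecord(node_name, elapsed, "exception", type(exc).__name__)
            )
            raise
        elapsed = (time.perf_counter() - start) * 1000.0
        await on_complete(TimingRecord(node_name, elapsed, "success"))
        return result
    return wrapped


async def demo():
    queue = asyncio.Queue()

    async def on_complete(record):
        # Keep the callback cheap: enqueue now, export somewhere else later.
        queue.put_nowait(record)

    async def respond(state):
        await asyncio.sleep(0.01)
        return {**state, "answer": "stub"}

    async def failing(state):
        raise RuntimeError("boom")

    ok = await with_timing("respond", respond, on_complete)({"question": "?"})
    try:
        await with_timing("respond", failing, on_complete)({})
    except RuntimeError:
        pass  # the wrapper re-raises after reporting
    return ok, [queue.get_nowait(), queue.get_nowait()]


result, records = asyncio.run(demo())
```

The callback only enqueues, so a slow metrics backend never delays the node's result; a separate consumer would drain the queue.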
How to run
uv sync --group examples --all-extras
LLM_API_KEY=sk-... uv run python examples/production-observability/main.py
LLM_MODEL defaults to gpt-4o-mini. The pipeline is single-turn
and doesn't need vision capability.
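A sketch of how the two environment variables could be read, assuming the example resolves them with plain os.environ lookups. The load_llm_settings helper is hypothetical and is not taken from main.py; only the variable names and the gpt-4o-mini default come from the text above.

```python
import os


def load_llm_settings(env=None):
    """Hypothetical helper: resolve the API key and model name."""
    env = os.environ if env is None else env
    api_key = env.get("LLM_API_KEY")
    if not api_key:
        raise RuntimeError("LLM_API_KEY is not set")
    # LLM_MODEL is optional; fall back to the documented default.
    return {"api_key": api_key, "model": env.get("LLM_MODEL", "gpt-4o-mini")}


# Passing a dict instead of os.environ keeps the example deterministic.
settings = load_llm_settings({"LLM_API_KEY": "sk-example"})
```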
The demo prints in three blocks: a header (the question and the caller-supplied tenant/request/feature-flag), the LLM answer, then two captured-trace summaries (OTel spans + Langfuse Trace tree).
Reading the output
Numbers shown below (durations, token counts, UUIDs) are illustrative and vary per run; the shape is what matters.
=== openarmature production-observability demo ===
question: What was the primary objective of Apollo 11?
tenant id: demo-acme
request id: <uuid>
feature flag: v2-canary
[timing] respond: 1234.5ms (success)
answer: The primary objective of Apollo 11 was ...
model: gpt-4o-mini-2024-07-18
--- captured OTel spans ---
[openarmature.invocation] 1240.0ms openarmature.user.tenantId='demo-acme', ...
[respond] 1235.0ms openarmature.node.name='respond', openarmature.user.tenantId='demo-acme', ...
[openarmature.llm.complete] 1200.0ms gen_ai.system='openai', gen_ai.usage.input_tokens=42, ...
--- captured Langfuse trace ---
Trace id=<uuid>
  name='respond'
  input={'question': 'What was the primary objective of Apollo 11?'}
  output={'answer': '...', 'model': 'gpt-4o-mini-2024-07-18'}
  metadata={'tenantId': 'demo-acme', 'requestId': '<uuid>', 'featureFlag': 'v2-canary', ...}
  [span] 'respond'
    input={'question': '...'}
    output={'answer': '...', 'model': '...'}
    [generation] 'openarmature.llm.complete'
      input=[{'role': 'system', ...}, {'role': 'user', ...}]
      output='The primary objective of Apollo 11 ...'
      model='gpt-4o-mini-2024-07-18'
      usage={'input_tokens': 42, 'output_tokens': 38}
- [timing] respond: 1234.5ms (success): emitted by the TimingMiddleware callback as soon as the respond chain returns. outcome is "success" here; a ProviderRateLimit would surface as outcome="exception" with exception_category="provider_rate_limit".
- OTel spans block: one line per captured span, sorted by start time. The relevant attributes shown are a curated subset for readability; the full attribute set is on each Span object for any reader inspecting them programmatically. Note the openarmature.user.* attributes appearing on every span (the cross-cutting attribute propagation from invoke(metadata=...)).
- Langfuse trace block: the same invocation as seen by the Langfuse data model. trace.input / trace.output come from the caller hooks ({"question": ...} / {"answer": ..., "model": ...}) rather than the raw State. The Observation tree shows [span] for the node and [generation] for the LLM call; production Langfuse renders these as nested cards in the UI.
- Identical correlation_id (not shown by the formatter but present in both captures' metadata): the cross-system join key. Find a slow Generation in Langfuse, grep for the correlation_id in OTel logs, see the surrounding infrastructure activity.
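The attribute naming and the correlation_id join can be sketched with plain dictionaries. This is an illustration under stated assumptions: both helper functions are hypothetical, and the dict records stand in for the captured Span and Trace objects, whose real shape (including where correlation_id is stored) is not shown in this section.

```python
def to_otel_attributes(metadata):
    """Prefix each caller key the way the span attributes above are named."""
    return {f"openarmature.user.{key}": value for key, value in metadata.items()}


def join_on_correlation_id(traces, spans):
    """Group span names under the trace that shares their correlation_id."""
    spans_by_id = {}
    for span in spans:
        spans_by_id.setdefault(span["correlation_id"], []).append(span["name"])
    return {
        trace["id"]: spans_by_id.get(trace["correlation_id"], [])
        for trace in traces
    }


caller_metadata = {"tenantId": "demo-acme", "featureFlag": "v2-canary"}
span_attributes = to_otel_attributes(caller_metadata)

# Stand-in records; the real captures are Span and Trace objects, not dicts.
spans = [
    {"name": "openarmature.invocation", "correlation_id": "corr-1"},
    {"name": "respond", "correlation_id": "corr-1"},
    {"name": "respond", "correlation_id": "corr-2"},
]
traces = [{"id": "trace-a", "correlation_id": "corr-1"}]
joined = join_on_correlation_id(traces, spans)
```

Here trace-a picks up only the two spans that share corr-1, which is the lookup the last bullet describes doing by hand with grep.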
Swapping to production backends
import os

# OTelObserver, LangfuseObserver, _trace_input, and _trace_output are the
# same objects main.py already uses; only the exporter and client change.

# OTel: real OTLP exporter
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace.export import BatchSpanProcessor

otel_observer = OTelObserver(
    # BatchSpanProcessor exports spans in the background in batches.
    span_processor=BatchSpanProcessor(
        OTLPSpanExporter(
            endpoint="https://your-collector/v1/traces",
            headers={"authorization": os.environ["OTLP_AUTH"]},
        )
    ),
    resource=Resource.create({"service.name": "lunar-briefing"}),
)

# Langfuse: real SDK adapter
from langfuse import Langfuse
from openarmature.observability.langfuse import LangfuseSDKAdapter

langfuse_observer = LangfuseObserver(
    client=LangfuseSDKAdapter(
        Langfuse(
            public_key="pk-lf-...",
            secret_key="sk-lf-...",
            host="https://cloud.langfuse.com",
        )
    ),
    # Caller hooks that shape trace.input / trace.output from State.
    trace_input_from_state=_trace_input,
    trace_output_from_state=_trace_output,
    disable_llm_payload=False,
)
Same observer call surface, real exporters underneath. Node and
graph code don't change. The observer-hooks example shows the
OTel-only side at finer granularity (force_flush, log bridging,
error handling); the langfuse-observability example shows the
Langfuse + LangfusePromptBackend prompt-linkage side.
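The _trace_input and _trace_output hooks passed to LangfuseObserver in the snippet above are not defined in this section. A minimal sketch of what they might look like follows, assuming each hook receives the State and returns a plain dict. The State dataclass here, including its user_email field, is hypothetical and exists only to show a field being left out of the trace.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class State:
    # Hypothetical State for the single-turn Q&A pipeline.
    question: str
    answer: Optional[str] = None
    model: Optional[str] = None
    user_email: Optional[str] = None  # never copied into trace payloads


def _trace_input(state: State) -> dict[str, Any]:
    # Only the question is exposed as trace.input.
    return {"question": state.question}


def _trace_output(state: State) -> dict[str, Any]:
    # Only the answer and model name are exposed as trace.output.
    return {"answer": state.answer, "model": state.model}


state = State(
    question="What was the primary objective of Apollo 11?",
    answer="...",
    model="gpt-4o-mini-2024-07-18",
    user_email="someone@example.com",
)
trace_input = _trace_input(state)
trace_output = _trace_output(state)
```

Because each hook builds its dict from named fields, a field added to State later stays out of the trace until someone deliberately adds it to a hook.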