LLMs¶
The graph engine has no concept of LLMs or tools. A node is just an async function that reads typed state and returns a partial update. Calling an LLM is one of the things a node can do during that call, the same way it might read a file, hit a database, or invoke an internal service. This page covers the patterns that emerge once you start mixing LLM calls into graph nodes.
LLM calls are async IO inside a node¶
Construct one Provider at a point where your application
owns its lifecycle (an entry-point coroutine, a FastAPI startup event,
lazy on-first-use) and share it across nodes. Each complete() call
carries the full message list and returns a
Response; the provider is stateless and
reentrant, so multiple nodes (or fan-out instances) can call into it
concurrently without coordination.
OpenAIProvider eagerly opens an httpx.AsyncClient in its
constructor; that client must be closed with await provider.aclose()
to release the connection pool. Constructing the provider as a
module-level side effect (provider = OpenAIProvider(...) at the top
of the file) leaks the client in tooling, tests, and docs-build
processes that import the module without running your shutdown path.
Prefer lazy construction or an explicit lifecycle hook.
import os
from openarmature.llm import OpenAIProvider, UserMessage

_provider_instance: OpenAIProvider | None = None

def _get_provider() -> OpenAIProvider:
    global _provider_instance
    if _provider_instance is None:
        _provider_instance = OpenAIProvider(
            base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"),
            model="gpt-4o-mini",
            api_key=os.environ["LLM_API_KEY"],
        )
    return _provider_instance

async def analyze(state: AnalysisState) -> dict:
    # AnalysisState is the graph's own state model, defined elsewhere.
    response = await _get_provider().complete(
        [UserMessage(content=state.text)],
    )
    return {"raw": response.message.content}

async def main() -> None:
    try:
        ...  # build graph, invoke
    finally:
        if _provider_instance is not None:
            await _provider_instance.aclose()
The provider goes wherever your application's other long-lived
dependencies go (dependency-injection container, factory, lazy
module-level cache), and you close it on the same lifecycle hook you
use for those. A FastAPI app uses app.on_event("shutdown"); a
script uses a try/finally around the entry-point coroutine.
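As an illustration of the FastAPI variant, reusing the lazily constructed _provider_instance from the snippet above (on_event is the older hook; a lifespan context manager works just as well):

from fastapi import FastAPI

app = FastAPI()

@app.on_event("shutdown")
async def _close_provider() -> None:
    # Close the lazily constructed provider, if any request ever built one.
    if _provider_instance is not None:
        await _provider_instance.aclose()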
A real graph hits LLMs from multiple nodes. The conventional shape:
async def classify(state):    # one provider call
    response = await provider.complete(...)
    return {...}

async def extract(state):     # another provider call
    response = await provider.complete(...)
    return {...}

async def synthesize(state):  # a third
    response = await provider.complete(...)
    return {...}
The graph composes the order; the provider sees three independent stateless calls. Conversational memory (if you want it) is the caller's responsibility: thread it through state and pass the accumulated message list into each call.
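A sketch of that threading, assuming the package also exports an AssistantMessage type (only UserMessage appears in the snippets above) and that the state model carries a history field holding the accumulated messages:

async def follow_up(state):
    # state.history is the message list the caller threads through state.
    messages = [*state.history, UserMessage(content=state.query)]
    response = await provider.complete(messages)
    return {
        # Persist the turn so later nodes (or the next invoke) see it.
        "history": [*messages, AssistantMessage(content=response.message.content)],
        "answer": response.message.content,
    }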
Structured output¶
Every LLM-using node that produces typed data ends up with the same shape: render a prompt, call the model, parse the response as JSON, validate it against the expected schema, retry on parse or validation failure. Five steps of boilerplate that differ only in the schema and the prompt.
Structured output collapses that into one parameter. Pass a
response_schema to complete() and the provider:
- Tells the model on the wire to produce schema-conforming output.
- Parses and validates the response against the schema.
- Surfaces the validated value on Response.parsed.
- Raises StructuredOutputInvalid on parse or validation failure.
Two forms are accepted: a Pydantic class (typed-instance return) and a JSON Schema dict (raw-dict return). Same wire shape underneath.
Pydantic class form¶
from typing import Literal
from pydantic import BaseModel

class Classification(BaseModel):
    intent: Literal["research", "summarize"]
    rationale: str

async def classify(state):
    response = await provider.complete(
        [UserMessage(content=f"Route this query: {state.query!r}")],
        response_schema=Classification,
    )
    return {"classification": response.parsed}
Response.parsed is a validated Classification instance at
runtime; the framework calls .model_json_schema() under the hood
to derive the wire body and .model_validate() to deserialize the
response.
Static typing is shallower. Response.parsed is annotated as
dict[str, Any] | BaseModel | None, so a type checker won't narrow
to Classification from the response_schema=Classification
argument alone. Callers that want static field access either
cast(Classification, response.parsed), narrow with isinstance,
or assign the value into a typed local. Generic Response[T] is on
the table as a follow-up.
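For example, the classify node again with the isinstance route; the cast route is the commented alternative:

async def classify(state):
    response = await provider.complete(
        [UserMessage(content=f"Route this query: {state.query!r}")],
        response_schema=Classification,
    )
    parsed = response.parsed
    if not isinstance(parsed, Classification):
        raise TypeError("expected a validated Classification instance")
    # parsed is now statically narrowed to Classification.
    # Alternatively: parsed = cast(Classification, response.parsed)
    return {"classification": parsed}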
JSON Schema dict form¶
async def research(state):
    response = await provider.complete(
        [UserMessage(content=f"Plan research: {state.query!r}")],
        response_schema={
            "type": "object",
            "properties": {
                "topics": {"type": "array", "items": {"type": "string"}},
                "follow_up_questions": {"type": "array", "items": {"type": "string"}},
            },
            "required": ["topics", "follow_up_questions"],
            "additionalProperties": False,
        },
    )
    return {"research_plan": response.parsed}
Response.parsed is a dict[str, Any] populated per the schema. Use
this when the shape is dynamic, generated, or borrowed from another
system that already speaks JSON Schema.
Wire paths: native and fallback¶
Real OpenAIProvider traffic uses OpenAI's native response_format
field on the request body, so the model produces schema-conforming
output in one trip. Some OpenAI-compatible servers (older vLLM, some
LM Studio releases, llama.cpp variants) either reject response_format
with a 400 or silently ignore it. For those, construct the provider
with force_prompt_augmentation_fallback=True:
provider = OpenAIProvider(
    base_url="http://localhost:8000",
    model="some-local-model",
    force_prompt_augmentation_fallback=True,  # opt into the fallback
)
In fallback mode the provider prepends a system directive containing
the serialized schema, omits response_format from the wire, and
parses-and-validates the response post-receive. The behavioral contract
is identical: Response.parsed populates the same way; failures raise
StructuredOutputInvalid the same way. The
uses_prompt_augmentation_fallback read-only property lets callers
inspect which path is active.
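For example, to log which path is active after construction (assuming the property is a plain bool and log is a standard logger):

if provider.uses_prompt_augmentation_fallback:
    log.info("structured output will use prompt augmentation, not response_format")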
Strict mode¶
OpenAI's native path supports a strict: true flag that engages the
model's schema-constrained decoding: the model literally cannot emit
non-conforming tokens. The framework decides strict: true vs
strict: false automatically based on whether your schema satisfies
strict-mode constraints. Either way, the framework validates the
response post-receive against the supplied schema; strict is a
wire-level optimization, not a correctness requirement.
strict_mode_supported(schema) (exported from openarmature.llm)
performs the deep recursive check. The heuristic is conservative —
anything not on the list below trips to strict: false:
- Top-level schema is type: "object".
- For every nested object: additionalProperties is explicitly false, and every key in properties is listed in required.
- For every nested array: items is present and points to a verifiable schema (dict, or tuple-form list of dicts).
- Every branch of anyOf/oneOf/allOf independently satisfies the above.
- Internal $ref targets (#/... or bare #) resolve and their resolved schema passes. External refs (any other URI) and $ref cycles are handled conservatively.
- Primitive types (string, integer, number, boolean, null) are accepted as terminal: no nested structure to verify.
- Empty {} schemas and unrecognized-keyword schemas (const-only, enum-only, etc.) trip to non-strict; the walker can't statically verify them.
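For instance, checking two schemas against that checklist (assuming strict_mode_supported returns a bool):

from openarmature.llm import strict_mode_supported

strict_mode_supported({
    "type": "object",
    "properties": {"topics": {"type": "array", "items": {"type": "string"}}},
    "required": ["topics"],
    "additionalProperties": False,
})  # True per the checklist: object, extras forbidden, every property required

strict_mode_supported({
    "type": "object",
    "properties": {"topics": {"type": "array", "items": {"type": "string"}}},
})  # False: additionalProperties not explicitly false, "topics" not in required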
If you control the schema and want strict mode, the easiest path is to
set additionalProperties: false and put every property in required
on every object. Pydantic-derived schemas may need model_config =
ConfigDict(extra="forbid") on the class to get the
additionalProperties: false in the generated JSON Schema.
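For example, a strict-eligible variant of the Classification model from earlier:

from typing import Literal
from pydantic import BaseModel, ConfigDict

class Classification(BaseModel):
    # extra="forbid" puts additionalProperties: false into model_json_schema();
    # fields without defaults all land in required.
    model_config = ConfigDict(extra="forbid")

    intent: Literal["research", "summarize"]
    rationale: str

strict_mode_supported(Classification.model_json_schema())  # expected: True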
Routing on parsed fields¶
A conditional edge is a function state -> str that names the next
node. The string can come from anywhere: a hard-coded rule, a lookup
table, the parsed output of an LLM call. The graph engine doesn't
distinguish.
This means LLM-driven routing and deterministic routing have the same
shape. A classifier node writes its parsed Classification to state;
the conditional edge reads state.classification.intent and returns
that string. The branches don't know whether the LLM or a regex
produced the discriminator.
async def classify(state):
    response = await provider.complete(
        [UserMessage(content=f"Route: {state.query!r}")],
        response_schema=Classification,
    )
    return {"classification": response.parsed}

def route(state) -> str:
    return state.classification.intent

builder.add_conditional_edge("classify", route)
The same route function could read a feature flag, a config lookup,
or "research" if "?" in state.query else "summarize". The branch
nodes don't change. Swapping a rule-based router for an LLM-based one
is a one-node change.
Errors at the LLM boundary¶
Every provider call can fail. The
openarmature.llm reference lists the canonical
error categories; this section covers how they compose with the rest
of the graph.
Transient categories (retry MAY succeed):
ProviderRateLimit, ProviderUnavailable, ProviderModelNotLoaded.
These are the canonical "wrap a node in RetryMiddleware" set; the
default classifier picks them up automatically via
TRANSIENT_CATEGORIES.
Non-transient categories (retry without changing the request will
not succeed): ProviderAuthentication, ProviderInvalidModel,
ProviderInvalidRequest, ProviderInvalidResponse,
StructuredOutputInvalid. These propagate up as NodeException so
the graph's error-recovery middleware (or the caller of invoke())
can handle them.
StructuredOutputInvalid is the new one and worth a note. It fires
when a model returns content that fails to parse as JSON, or parses
but fails to validate against the supplied schema. The exception
carries the requested response_schema, the raw_content the model
produced, and a failure_description. It is non-transient by default
because a model that emits non-conforming output on a given prompt
usually emits the same non-conforming output on retry. Useful retry
strategies for this case involve changing the prompt or doubling
max_tokens rather than re-issuing the same call; that's a
middleware concern, not the provider's default.
import logging

from openarmature.llm import StructuredOutputInvalid

log = logging.getLogger(__name__)

async def classify_with_diagnostics(state):
    try:
        response = await provider.complete(
            [UserMessage(content=...)],
            response_schema=Classification,
        )
    except StructuredOutputInvalid as exc:
        log.warning(
            "schema-validation failure on classify",
            extra={
                "raw_content": exc.raw_content,
                "failure": exc.failure_description,
            },
        )
        raise
    return {"classification": response.parsed}
Callers wanting to retry validation failures specifically can
construct a RetryMiddleware with a custom classifier that adds
structured_output_invalid to the transient set. The default
classifier won't do this for them.
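A sketch of that wiring. The import path, the classifier keyword, and the category-string form are assumptions for illustration, not the documented RetryMiddleware surface; check the middleware reference for the real signature:

from openarmature.middleware import RetryMiddleware, TRANSIENT_CATEGORIES  # assumed import path

def retry_including_schema_failures(category: str) -> bool:
    # Assumption: the classifier maps an error category name to "retryable?".
    return category in TRANSIENT_CATEGORIES or category == "structured_output_invalid"

retry = RetryMiddleware(classifier=retry_including_schema_failures)  # assumed keyword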
Where to next¶
- Model Providers for the provider contract, the shipped OpenAIProvider, and the canonical error categories.
- Authoring a Provider for writing a provider against a non-OpenAI wire format (Anthropic Messages, Bedrock, internal gateway).
- API reference: openarmature.llm for the full surface: message types, Response, RuntimeConfig, every error class, validation helpers.
- Examples: 00-hello-world for a runnable graph exercising both response_schema forms in one pipeline.