Authoring a Provider
When you target a wire format that isn't OpenAI Chat Completions
(Anthropic Messages, Bedrock, an internal gateway, a hand-rolled
inference service), implement the Provider Protocol yourself. The
shipped OpenAIProvider is ~465 lines because it handles every
edge case; a minimum-viable Provider is closer to 60 lines.
If you're new to Providers, read Model Providers for the overview and contract guarantees before continuing.
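For orientation, the two methods the skeleton below implements can be written down as a structural typing.Protocol. This is a sketch inferred from the skeleton's method signatures, not openarmature's own Protocol definition. ProviderShape and EchoProvider are hypothetical names, and the message, config, and response types are loosened to Any so the snippet runs without the library installed.

```python
from __future__ import annotations

import asyncio
from collections.abc import Sequence
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class ProviderShape(Protocol):
    """Hypothetical structural sketch of a Provider, not openarmature's Protocol."""

    async def ready(self) -> None:
        """Probe the backend; raise a provider error if it can't serve requests."""
        ...

    async def complete(
        self,
        messages: Sequence[Any],
        tools: Sequence[Any] | None = None,
        config: Any | None = None,
        response_schema: Any | None = None,
    ) -> Any:
        """Run one stateless completion over the full message list."""
        ...


class EchoProvider:
    """Toy implementation that fits the shape by structure alone (no base class)."""

    async def ready(self) -> None:
        return None

    async def complete(self, messages, tools=None, config=None, response_schema=None):
        return messages[-1]


# runtime_checkable only checks that the methods exist; it does not compare signatures.
print(isinstance(EchoProvider(), ProviderShape))  # True
print(asyncio.run(EchoProvider().complete(["hello", "world"])))  # world
```

The skeleton below likewise subclasses nothing. With a Protocol, what counts is having matching method names and signatures, not inheritance.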
Skeleton
A minimal OpenAI-compatible Provider targeting any
/v1/chat/completions endpoint. Compare with
openarmature.llm.OpenAIProvider to see what a full implementation
adds (tool-call wire mapping, observability spans, the /v1/models
catalog probe, retry-after parsing, lenient argument parsing under
finish_reason="error", etc.).
```python
from collections.abc import Sequence
from typing import Any

import httpx
from pydantic import BaseModel

from openarmature.llm import (
    AssistantMessage,
    Message,
    ProviderInvalidRequest,
    ProviderInvalidResponse,
    ProviderUnavailable,
    Response,
    RuntimeConfig,
    SystemMessage,
    Tool,
    ToolMessage,
    Usage,
    UserMessage,
    classify_http_error,
    validate_message_list,
    validate_tools,
)


class MyProvider:
    def __init__(self, *, base_url: str, model: str, api_key: str | None = None) -> None:
        headers = {"Content-Type": "application/json"}
        if api_key:
            headers["Authorization"] = f"Bearer {api_key}"
        self._client = httpx.AsyncClient(
            base_url=base_url, headers=headers, timeout=60.0
        )
        self.model = model

    async def ready(self) -> None:
        try:
            resp = await self._client.get("/v1/models")
        except httpx.HTTPError as exc:
            raise ProviderUnavailable(str(exc)) from exc
        if resp.status_code != 200:
            raise classify_http_error(resp)

    async def complete(
        self,
        messages: Sequence[Message],
        tools: Sequence[Tool] | None = None,
        config: RuntimeConfig | None = None,
        response_schema: dict[str, Any] | type[BaseModel] | None = None,
    ) -> Response:
        # response_schema is part of the Protocol; a skeleton provider
        # MUST NOT silently ignore it: callers expect either
        # Response.parsed populated or a StructuredOutputInvalid raise.
        # Until the wire path is implemented, raise
        # ProviderInvalidRequest when response_schema is set. A
        # production provider wires it through to native response_format
        # support or the prompt-augmentation fallback; see
        # ``openarmature.llm.OpenAIProvider``.
        if response_schema is not None:
            raise ProviderInvalidRequest(
                "response_schema is not supported by this provider"
            )
        validate_message_list(messages)
        validate_tools(tools)
        body: dict[str, Any] = {
            "model": self.model,
            "messages": [_msg_to_wire(m) for m in messages],
        }
        if config and config.temperature is not None:
            body["temperature"] = config.temperature
        try:
            resp = await self._client.post("/v1/chat/completions", json=body)
        except httpx.HTTPError as exc:
            raise ProviderUnavailable(str(exc)) from exc
        if resp.status_code != 200:
            raise classify_http_error(resp)
        # A 200 OK whose body is not JSON, or is JSON of the wrong shape,
        # is a ProviderInvalidResponse; don't let KeyError/IndexError escape.
        try:
            payload = resp.json()
            choice = payload["choices"][0]
            wire_msg = choice["message"]
            finish_reason = choice["finish_reason"]
        except (ValueError, KeyError, IndexError, TypeError) as exc:
            raise ProviderInvalidResponse(
                "response body is not a chat-completions payload"
            ) from exc
        usage = payload.get("usage") or {}  # absent or null -> {}
        return Response(
            message=AssistantMessage(content=wire_msg.get("content") or ""),
            finish_reason=finish_reason,
            usage=Usage(
                # All three fields are required; pass ``None`` when
                # the provider doesn't report usage.
                prompt_tokens=usage.get("prompt_tokens"),
                completion_tokens=usage.get("completion_tokens"),
                total_tokens=usage.get("total_tokens"),
            ),
            raw=payload,
        )


def _msg_to_wire(msg: Message) -> dict[str, Any]:
    if isinstance(msg, SystemMessage):
        return {"role": "system", "content": msg.content}
    if isinstance(msg, UserMessage):
        return {"role": "user", "content": msg.content}
    if isinstance(msg, AssistantMessage):
        return {"role": "assistant", "content": msg.content or ""}
    if isinstance(msg, ToolMessage):
        return {
            "role": "tool",
            "content": msg.content,
            "tool_call_id": msg.tool_call_id,
        }
    raise ValueError(f"unhandled message type: {type(msg).__name__}")
```
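Response decoding is the step where a 200 OK can still break the contract: any body that doesn't match the expected shape should surface as ProviderInvalidResponse, never as a bare KeyError or IndexError. Pulling that step into a pure function makes the shape rules unit-testable without an HTTP client. This is a sketch under stated assumptions: InvalidResponse is a local stand-in for ProviderInvalidResponse, and decode_chat_completion is a hypothetical helper that returns a plain (content, finish_reason, usage) tuple rather than openarmature's Response.

```python
from __future__ import annotations

from typing import Any


class InvalidResponse(Exception):
    """Local stand-in for openarmature's ProviderInvalidResponse."""


def decode_chat_completion(payload: Any) -> tuple[str, str, dict[str, int | None]]:
    """Extract (content, finish_reason, usage) from a chat-completions body.

    Every shape problem is converted into InvalidResponse, so no bare
    KeyError, IndexError, or TypeError escapes the provider.
    """
    try:
        choice = payload["choices"][0]
        message = choice["message"]
        finish_reason = choice["finish_reason"]
    except (KeyError, IndexError, TypeError) as exc:
        raise InvalidResponse(f"unexpected response shape: {exc!r}") from exc
    if not isinstance(message, dict):
        raise InvalidResponse("choices[0].message is not an object")
    if not isinstance(finish_reason, str):
        raise InvalidResponse(f"finish_reason is not a string: {finish_reason!r}")
    # content may be null (e.g. tool-call-only turns); normalize it to "".
    content = message.get("content") or ""
    # usage may be absent or null; report None for any field the server omits.
    usage_raw = payload.get("usage")
    if not isinstance(usage_raw, dict):
        usage_raw = {}
    usage = {
        key: usage_raw.get(key)
        for key in ("prompt_tokens", "completion_tokens", "total_tokens")
    }
    return content, finish_reason, usage
```

Because the function is pure, each malformed-body case (missing choices, empty choices, non-object body) can be pinned down with a one-line test.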
Contract checklist
When you ship a Provider, the following must hold:
Statelessness + reentrancy.
- complete() MUST NOT carry state across calls. Each call sees the full message list; there is no implicit conversation state.
- Multiple complete() calls MAY run concurrently on the same Provider instance. The HTTP client should be safe for concurrent use (httpx.AsyncClient is).
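A minimal illustration of why statelessness makes reentrancy safe: when complete() derives everything from its arguments, overlapping calls on one instance cannot observe each other. EchoingProvider is a hypothetical toy whose only instance attribute is read-only configuration; asyncio.gather runs three calls on the same instance concurrently, and each result depends only on its own message list.

```python
import asyncio
from collections.abc import Sequence


class EchoingProvider:
    """Hypothetical provider: configuration only, no per-call state on self."""

    def __init__(self, model: str) -> None:
        self.model = model  # read-only after construction

    async def complete(self, messages: Sequence[str]) -> str:
        # Everything this call needs comes from its arguments. Storing, say,
        # the last prompt on self would let concurrent calls clobber it.
        await asyncio.sleep(0.01 * len(messages))  # simulate network latency
        return f"{self.model}: last={messages[-1]!r}, n={len(messages)}"


async def main() -> list[str]:
    provider = EchoingProvider("demo-model")
    conversations = [["a"], ["b", "c"], ["d", "e", "f"]]
    # Three overlapping calls on the same instance; gather preserves order.
    return await asyncio.gather(*(provider.complete(c) for c in conversations))


results = asyncio.run(main())
print(results)
```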
Non-mutation.
- The messages passed to complete() MUST NOT be mutated. Build wire bodies from copies or projections; never modify the input.
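For example, a wire format that wants system text in a separate field (Anthropic Messages takes it as a top-level system parameter) tempts you to pop the system message off the caller's list. A projection avoids that. This sketch uses a hypothetical frozen Msg record in place of openarmature's message classes and builds only an Anthropic-style body: tool messages and other required request fields such as max_tokens are deliberately left out.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Msg:
    """Hypothetical stand-in for openarmature's message classes."""

    role: str
    content: str


def to_anthropic_style_body(messages: list[Msg], model: str) -> dict[str, Any]:
    """Project messages into a body with the system text split out.

    Builds new dicts and a new list; the caller's list is never touched
    (no .pop(0), no .insert(), no in-place edits of its items).
    """
    system_parts = [m.content for m in messages if m.role == "system"]
    turns = [{"role": m.role, "content": m.content} for m in messages if m.role != "system"]
    body: dict[str, Any] = {"model": model, "messages": turns}
    if system_parts:
        body["system"] = "\n\n".join(system_parts)
    return body


history = [Msg("system", "Be terse."), Msg("user", "Hi")]
snapshot = list(history)
body = to_anthropic_style_body(history, "demo-model")
assert history == snapshot  # the input list is unchanged
print(body)
```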
Boundary validation.
- Call validate_message_list(messages) to enforce the list-level invariants: the list is non-empty; system is optional but, when present, must be the first message; the last message must be user or tool; every tool_call_id matches an earlier assistant ToolCall.id.
- Call validate_tools(tools) if tools are accepted (duplicate-name check).
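To make the invariants concrete, here is an illustrative re-implementation of the same rules over hypothetical dict-shaped messages. It is not openarmature's validate_message_list or validate_tools (a real provider should call those); check_message_list, check_tools, and the InvalidRequest stand-in exist only to show what each rule rejects.

```python
from typing import Any


class InvalidRequest(ValueError):
    """Local stand-in for openarmature's ProviderInvalidRequest."""


def check_message_list(messages: list[dict[str, Any]]) -> None:
    if not messages:
        raise InvalidRequest("message list is empty")
    # system is optional, but only allowed in position 0.
    for i, msg in enumerate(messages):
        if msg["role"] == "system" and i != 0:
            raise InvalidRequest(f"system message at index {i}; must be first")
    if messages[-1]["role"] not in ("user", "tool"):
        raise InvalidRequest("last message must be 'user' or 'tool'")
    # Every tool result must answer a ToolCall id the assistant issued earlier.
    issued: set[str] = set()
    for msg in messages:
        if msg["role"] == "assistant":
            issued.update(call["id"] for call in msg.get("tool_calls", []))
        elif msg["role"] == "tool" and msg["tool_call_id"] not in issued:
            raise InvalidRequest(f"tool_call_id {msg['tool_call_id']!r} matches no earlier ToolCall")


def check_tools(tools: list[dict[str, Any]]) -> None:
    names = [tool["name"] for tool in tools]
    dupes = sorted({n for n in names if names.count(n) > 1})
    if dupes:
        raise InvalidRequest(f"duplicate tool names: {dupes}")


try:
    check_message_list([{"role": "user", "content": "hi"}, {"role": "assistant", "content": "hello"}])
except InvalidRequest as exc:
    print(exc)  # last message must be 'user' or 'tool'
```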
Error mapping.
- Network failures (connection errors, timeouts) → ProviderUnavailable.
- HTTP 401/403 → ProviderAuthentication.
- HTTP 400 → ProviderInvalidRequest.
- HTTP 404 with model-not-found → ProviderInvalidModel; otherwise → ProviderUnavailable.
- HTTP 429 → ProviderRateLimit, with retry_after taken from the Retry-After header.
- HTTP 503 with model-loading → ProviderModelNotLoaded; otherwise → ProviderUnavailable.
- Other HTTP 5xx → ProviderUnavailable.
- A 200 OK that fails to parse into the expected response shape → ProviderInvalidResponse.
For OpenAI-compatible endpoints, classify_http_error does the
whole non-200 mapping table for you; the skeleton above just
delegates.
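For a wire format that isn't OpenAI-compatible, you will probably write the non-200 table yourself. The sketch below shows one shape for it, under several assumptions. The exception classes are local stand-ins that reuse the documented names so the snippet runs without openarmature. The body-text checks used to tell a model-not-found 404 from other 404s, and a model-loading 503 from other 503s, are guesses; substitute whatever signal your backend actually sends. Only the delta-seconds form of Retry-After is handled (openarmature's parse_retry_after is the tool to use in a real provider), and mapping unlisted 4xx statuses to ProviderUnavailable is this sketch's own choice.

```python
from __future__ import annotations

from collections.abc import Mapping


class ProviderError(Exception):
    """Local stand-in base class; real providers raise openarmature.llm errors."""


class ProviderUnavailable(ProviderError): ...
class ProviderAuthentication(ProviderError): ...
class ProviderInvalidRequest(ProviderError): ...
class ProviderInvalidModel(ProviderError): ...
class ProviderModelNotLoaded(ProviderError): ...


class ProviderRateLimit(ProviderError):
    def __init__(self, message: str, retry_after: float | None) -> None:
        super().__init__(message)
        self.retry_after = retry_after


def classify(status: int, body: str, headers: Mapping[str, str]) -> ProviderError:
    """Map a non-200 response to an exception instance; the caller raises it.

    headers is looked up with a lower-case key; httpx.Headers is
    case-insensitive, so resp.headers can be passed as-is.
    """
    text = body.lower()
    if status in (401, 403):
        return ProviderAuthentication(body)
    if status == 400:
        return ProviderInvalidRequest(body)
    if status == 404:
        # Assumed signal: the backend mentions the model in its 404 body.
        if "model" in text and "not found" in text:
            return ProviderInvalidModel(body)
        return ProviderUnavailable(body)
    if status == 429:
        raw = headers.get("retry-after", "")
        retry_after = float(raw) if raw.strip().isdigit() else None
        return ProviderRateLimit(body, retry_after)
    if status == 503 and "loading" in text:
        # Assumed signal for a model that is still being loaded.
        return ProviderModelNotLoaded(body)
    # Other 5xx per the table; other 4xx by this sketch's choice.
    return ProviderUnavailable(body)
```

Returning the exception instead of raising it mirrors how the skeleton uses classify_http_error (raise classify_http_error(resp)), which keeps the raise visible at the call site.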
Finish reasons.
- Return one of "stop", "length", "tool_calls", "content_filter", or "error". Map the wire format's finish-reason vocabulary onto these five.
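As an example of that mapping, here is a sketch for the Anthropic Messages API, whose responses carry a stop_reason field instead. It covers the four stop reasons ordinary turns produce (end_turn, stop_sequence, max_tokens, tool_use); nothing in this subset maps to "content_filter". Reporting an unrecognized or missing value as "error" is this sketch's own assumption, and you may prefer to raise ProviderInvalidResponse instead. map_anthropic_stop_reason is a hypothetical helper name.

```python
from __future__ import annotations

from typing import Literal

FinishReason = Literal["stop", "length", "tool_calls", "content_filter", "error"]

# Anthropic Messages stop_reason -> the five normalized finish reasons.
_ANTHROPIC_STOP_REASONS: dict[str, FinishReason] = {
    "end_turn": "stop",        # model finished naturally
    "stop_sequence": "stop",   # hit a caller-supplied stop sequence
    "max_tokens": "length",    # ran into the max_tokens budget
    "tool_use": "tool_calls",  # model is requesting tool calls
}


def map_anthropic_stop_reason(stop_reason: str | None) -> FinishReason:
    # Assumption: anything unrecognized (or missing) is reported as "error".
    return _ANTHROPIC_STOP_REASONS.get(stop_reason or "", "error")
```

A table plus a default keeps the mapping total: every wire value lands on one of the five, so the Response never sees an out-of-vocabulary finish reason.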
Beyond the skeleton
The skeleton omits things real Providers usually need. Reach for
openarmature.llm.OpenAIProvider as a reference when you need any
of:
- Tool calls. Wire-mapping the tool_calls array on AssistantMessage and the tool results carried by ToolMessages to the Provider's expected shape, and parsing tool calls in the response back into the returned AssistantMessage.
- Content blocks (multimodal user input). Wire-mapping the list[ContentBlock] form of UserMessage.content to the provider's multimodal shape (OpenAI's image_url content-array entries, Anthropic's image blocks, Google's inlineData parts, etc.). The spec types (TextBlock, ImageBlock, ImageSourceURL, ImageSourceInline) are stable across providers; only the wire shape differs. Provider authors targeting non-multimodal models MUST surface ProviderUnsupportedContentBlock when the request carries blocks the bound model can't serve (pre-send or post-receive per §7).
- Structured output. Threading response_schema through the request body (native response_format if the underlying wire supports it; prompt-augmentation fallback otherwise) and validating the response against the schema before returning. Populate Response.parsed with the validated value; raise StructuredOutputInvalid on parse or validation failure. Use validate_response_schema and strict_mode_supported from openarmature.llm to share the provider-agnostic boundary checks.
- Observability events. Opt-in typed-event dispatch around the wire call so the bundled OTel and Langfuse observers (plus any custom observer using type discrimination) can build LLM spans and Generation observations. Dispatch openarmature.graph.events.LlmCompletionEvent on successful calls and LlmFailedEvent alongside any LlmProviderError your adapter raises. The sketch below shows the success and failure emission shape inside complete(); the bundled OpenAIProvider._build_llm_completion_event and _build_llm_failed_event are the reference implementations.
```python
import time
import uuid

from openarmature.graph.events import LlmCompletionEvent, LlmFailedEvent
from openarmature.llm.errors import LlmProviderError
from openarmature.observability.correlation import (
    current_attempt_index,
    current_branch_name,
    current_correlation_id,
    current_dispatch,
    current_fan_out_index,
    current_invocation_id,
    current_namespace_prefix,
)


async def complete(self, messages, /, *, tools=None, config=None):
    dispatch = current_dispatch()
    call_id = str(uuid.uuid4())  # fresh per call (per-attempt under retry)
    adapter_start = time.perf_counter()

    # Capture request-side data ONCE so both success and failure
    # paths populate the typed event from the same projection.
    serialized_messages: list[dict] = []  # image-redacted; see below
    request_params = self._project_request_params(config)
    request_extras = self._project_request_extras(config)

    try:
        serialized_messages = self._serialize_messages(messages)
        response = await self._wire_call(messages, tools, config)
    except LlmProviderError as exc:
        # Dispatched alongside the exception, so caller-side flow is
        # unchanged (the exception still raises out of complete()).
        if dispatch is not None:
            latency_ms = (time.perf_counter() - adapter_start) * 1000.0
            dispatch(LlmFailedEvent(
                invocation_id=current_invocation_id() or "",
                correlation_id=current_correlation_id(),
                node_name=(current_namespace_prefix() or ("",))[-1],
                namespace=current_namespace_prefix(),
                attempt_index=current_attempt_index(),
                fan_out_index=current_fan_out_index(),
                branch_name=current_branch_name(),
                provider="my-provider",
                model=self.model,
                latency_ms=latency_ms,
                input_messages=serialized_messages,
                request_params=request_params,
                request_extras=request_extras,
                active_prompt=None,
                active_prompt_group=None,
                call_id=call_id,
                error_category=exc.category,
                error_type=type(exc).__name__,
                error_message=str(exc),
            ))
        raise

    if dispatch is not None:
        latency_ms = (time.perf_counter() - adapter_start) * 1000.0
        dispatch(LlmCompletionEvent(
            invocation_id=current_invocation_id() or "",
            correlation_id=current_correlation_id(),
            node_name=(current_namespace_prefix() or ("",))[-1],
            namespace=current_namespace_prefix(),
            attempt_index=current_attempt_index(),
            fan_out_index=current_fan_out_index(),
            branch_name=current_branch_name(),
            provider="my-provider",
            model=self.model,
            response_id=response.response_id,
            response_model=response.response_model,
            usage=response.usage,
            latency_ms=latency_ms,
            finish_reason=response.finish_reason,
            input_messages=serialized_messages,
            output_content=response.message.content or None,
            output_tool_calls=list(response.message.tool_calls or []),
            request_params=request_params,
            request_extras=request_extras,
            active_prompt=None,
            active_prompt_group=None,
            call_id=call_id,
        ))
    return response
```
Two contracts the bundled provider follows that custom providers SHOULD match:
- Mutual exclusion. A single complete() call emits exactly one LlmCompletionEvent (success) or exactly one LlmFailedEvent (failure), never both. Conformance fixture 072 locks this down.
- Exception-flow preservation. LlmFailedEvent is dispatched alongside the §7 exception, not in place of it. The exception still raises out of complete(), so caller-side error handling is unchanged; the typed event reaches observers separately, through the observer queue.
Inline image bytes MUST be redacted in the provider's
serialization step before reaching the typed event's
input_messages field (see
Observability: Inline image
redaction)
so observers consuming the typed events cannot leak raw bytes.
The legacy sentinel-namespace pattern (LLM_NAMESPACE +
LlmEventPayload dispatched as a NodeEvent pair) remains in
the public API as a documented compatibility surface. The bundled
OpenAIProvider retired it in v0.13.0, and the bundled OTel and
Langfuse observers no longer recognize it. New providers should
emit typed events directly; the sentinel pattern is preserved for
providers shipped before the v0.13.0 migration that haven't yet
switched.
- Lenient response parsing under finish_reason="error".
Degraded responses surface what they can; tool-call arguments that
fail to parse populate arguments=None instead of raising.
- Catalog-aware ready(). GET /v1/models plus checking
whether the bound model is in the returned catalog (and, for local
servers like LM Studio, whether it's actually loaded).
- Retry-After parsing. Use parse_retry_after (re-exported
from openarmature.llm) to populate the retry_after field of
ProviderRateLimit from the response header.
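parse_retry_after already covers the Retry-After item. The other two items are small enough to sketch directly. Both functions below are hypothetical helpers operating on plain values, not part of openarmature. parse_tool_arguments is meant for the degraded finish_reason="error" path: it returns None instead of raising when a tool call's JSON arguments string is malformed (treating valid JSON that isn't an object as None is an extra assumption). model_in_catalog checks an OpenAI-style GET /v1/models body, {"data": [{"id": ...}, ...]}, for the bound model. The LM Studio "is it actually loaded" check relies on server-specific endpoints and is not shown.

```python
from __future__ import annotations

import json
from typing import Any


def parse_tool_arguments(raw: str | None) -> dict[str, Any] | None:
    """Decode a tool call's JSON arguments string leniently.

    Malformed or truncated JSON yields None instead of an exception, so a
    degraded response can still surface the rest of what it contains.
    """
    if not raw:
        return None
    try:
        value = json.loads(raw)
    except json.JSONDecodeError:
        return None
    # Assumption: arguments must be a JSON object; anything else becomes None.
    return value if isinstance(value, dict) else None


def model_in_catalog(models_payload: dict[str, Any], model: str) -> bool:
    """True if an OpenAI-style /v1/models body lists `model` by id."""
    entries = models_payload.get("data")
    if not isinstance(entries, list):
        return False
    return any(isinstance(e, dict) and e.get("id") == model for e in entries)


print(parse_tool_arguments('{"city": "Paris"}'))  # {'city': 'Paris'}
print(parse_tool_arguments('{"city": "Par'))  # None
```

In a catalog-aware ready(), a False from model_in_catalog would be reported through the provider's own error types (for example ProviderInvalidModel) rather than returned to the caller.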
The conformance fixtures under
tests/conformance/test_llm_provider.py exercise the wire mapping
end-to-end; a custom Provider that passes those fixtures matches
the contract.
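Conformance fixture 072 is the authority on the event contract, but while iterating it can help to check mutual exclusion and exception-flow preservation locally: record every dispatched event and count events per call_id. The sketch below is self-contained. CompletionEvent, FailedEvent, and WireError are hypothetical stand-ins for LlmCompletionEvent, LlmFailedEvent, and LlmProviderError, and ToyProvider.complete() follows the same try/except/raise shape as the observability sketch above.

```python
from __future__ import annotations

import asyncio
import uuid
from collections import Counter
from collections.abc import Callable
from dataclasses import dataclass


@dataclass
class CompletionEvent:  # stand-in for LlmCompletionEvent
    call_id: str


@dataclass
class FailedEvent:  # stand-in for LlmFailedEvent
    call_id: str
    error_type: str


class WireError(Exception):  # stand-in for LlmProviderError
    pass


class ToyProvider:
    def __init__(self, dispatch: Callable[[object], None], fail: bool) -> None:
        self._dispatch = dispatch
        self._fail = fail

    async def complete(self, prompt: str) -> str:
        call_id = str(uuid.uuid4())
        try:
            if self._fail:
                raise WireError("backend down")
            result = prompt.upper()
        except WireError as exc:
            # Failure path: one FailedEvent, then the exception keeps propagating.
            self._dispatch(FailedEvent(call_id, type(exc).__name__))
            raise
        # Success path: reached only when no exception was raised.
        self._dispatch(CompletionEvent(call_id))
        return result


async def run_checks() -> tuple[list[object], int]:
    events: list[object] = []
    raised = 0
    for fail in (False, True):
        provider = ToyProvider(events.append, fail)
        try:
            await provider.complete("hi")
        except WireError:
            raised += 1  # the caller still sees the exception
    return events, raised


events, raised = asyncio.run(run_checks())
per_call = Counter(e.call_id for e in events)
assert all(n == 1 for n in per_call.values())  # exactly one event per call
print([type(e).__name__ for e in events])  # ['CompletionEvent', 'FailedEvent']
```

Because the success-path dispatch sits after the try/except and the except branch always re-raises, the two dispatches cannot both run for one call.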