In Part 1 I built a LangGraph ReAct agent behind an OpenAI-compatible API and waved at one line:
```python
return StreamingResponse(
    graph_to_openai_sse(graph, inputs, model_name, config=config),
    media_type="text/event-stream",
)
```
That graph_to_openai_sse is where the real work hides. An OpenAI client like Open WebUI doesn't want "a LangGraph run". It wants a very specific stream of chat.completion.chunk JSON objects over Server-Sent Events, terminated by a [DONE] sentinel. LangGraph, meanwhile, emits its own rich event stream. This post builds the adapter between the two: about 90 lines that also give you a free "thinking" panel showing the agent's tool calls as they happen.
The two formats
What the client expects: each token arrives as an SSE line of the form `data: {json}\n\n`, where the JSON is an OpenAI chunk:
```python
# app/api/openai_compat.py
import time


def make_chunk(delta, model_name, completion_id, finish_reason=None):
    """Build one OpenAI `chat.completion.chunk` object."""
    return {
        "id": completion_id,  # "chatcmpl-...", identical for every chunk of one reply
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }
```
The stream has a strict shape:

- a first chunk with `delta = {"role": "assistant"}`,
- many chunks with `delta = {"content": "..."}`, one per token,
- a final chunk with an empty delta and `finish_reason = "stop"`,
- the literal line `data: [DONE]\n\n`.
Miss the [DONE] and the client spins forever. Skip the role chunk and some clients drop the first token. The contract is small but unforgiving.
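Because the contract is this small, it is cheap to enforce in tests. The following is a minimal sketch of a validator, assuming you collect the generator's output into a single `bytes` buffer first. `sse_payloads` and `check_openai_stream` are hypothetical helpers and are not part of any library. The validator checks the four rules listed above in order.

```python
import json


def sse_payloads(raw: bytes) -> list[str]:
    """Return the payload of every `data:` frame in a raw SSE body."""
    frames = raw.decode("utf-8").split("\n\n")
    return [f[len("data: "):] for f in frames if f.startswith("data: ")]


def _require(ok: bool, message: str) -> None:
    if not ok:
        raise ValueError(message)


def check_openai_stream(raw: bytes) -> None:
    """Raise ValueError if `raw` breaks the role / content / stop / [DONE] shape."""
    payloads = sse_payloads(raw)
    _require(bool(payloads) and payloads[-1] == "[DONE]", "stream must end with data: [DONE]")
    chunks = [json.loads(p) for p in payloads[:-1]]
    _require(len(chunks) >= 2, "need at least a role chunk and a stop chunk")
    first, last = chunks[0]["choices"][0], chunks[-1]["choices"][0]
    _require(first["delta"].get("role") == "assistant", "first chunk must carry the role")
    _require(last["delta"] == {} and last["finish_reason"] == "stop",
             "last chunk must be an empty stop chunk")
    for chunk in chunks[1:-1]:
        choice = chunk["choices"][0]
        _require(set(choice["delta"]) == {"content"} and choice["finish_reason"] is None,
                 "middle chunks must carry content only")
```

Raising `ValueError` instead of using bare `assert` keeps the checks active even when Python runs with `-O`.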
What LangGraph emits: astream_events is a single async stream of typed events covering everything that happens inside the graph, including model tokens, tool calls, and node transitions. We subscribe once and translate each event we care about into chunks.
The core loop
```python
# app/api/streaming.py
from langchain_core.messages import AIMessageChunk

from app.api.openai_compat import make_chunk


async def graph_to_openai_sse(graph, inputs, model_name, config=None):
    completion_id = new_completion_id()  # returns "chatcmpl-..."; helper not shown here
    yield _sse(make_chunk({"role": "assistant"}, model_name, completion_id))  # (1) role

    def emit(text):
        return _sse(make_chunk({"content": text}, model_name, completion_id))

    async for event in graph.astream_events(inputs, config=config, version="v2"):
        kind = event.get("event")
        if kind == "on_chat_model_stream":
            chunk = event["data"]["chunk"]
            if isinstance(chunk, AIMessageChunk) and isinstance(chunk.content, str):
                yield emit(chunk.content)  # (2) tokens

    yield _sse(make_chunk({}, model_name, completion_id, finish_reason="stop"))  # (3) stop
    yield b"data: [DONE]\n\n"  # (4) done
```
Three things to notice:

- `version="v2"` pins the event schema. The event stream format has changed across LangChain releases, and pinning it means your `metadata.langgraph_node` and `data.chunk` keys don't silently move under you.
- `on_chat_model_stream` is the token event. Its `data.chunk` is an `AIMessageChunk`. The `isinstance(...)` guards forward only chunks whose content is a plain string. Some providers stream content as a list of blocks, and the guard skips those instead of sending a list where the client expects a string.
- One `completion_id` for the whole response. Every chunk in a single completion shares it, which is how the client stitches tokens into one message.
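To see why the shared id matters, consider a simplified model of the client side. This is an illustrative assumption, not Open WebUI's actual code. The `stitch` helper appends each delta's content to a message keyed by the chunk's `id` and stops reading at `[DONE]`. Under that model, one stray id is enough to split a single reply into two messages.

```python
import json


def stitch(raw: bytes) -> dict[str, str]:
    """Hypothetical client reader: concatenate delta.content per chunk id,
    ignoring everything after the [DONE] sentinel."""
    messages: dict[str, str] = {}
    for frame in raw.decode("utf-8").split("\n\n"):
        if not frame.startswith("data: "):
            continue  # blank trailing frame or non-data SSE field
        payload = frame[len("data: "):]
        if payload == "[DONE]":
            break
        chunk = json.loads(payload)
        delta = chunk["choices"][0]["delta"]
        # Role-only and stop chunks have no "content" and add nothing.
        messages[chunk["id"]] = messages.get(chunk["id"], "") + delta.get("content", "")
    return messages
```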
_sse is just the wire framing. Note ensure_ascii=False: it sends Korean, Japanese, or emoji tokens as raw UTF-8 instead of \uXXXX escapes, which keeps frames smaller and the raw stream readable when you debug it:
```python
import json


def _sse(payload):
    """Frame one JSON payload as a Server-Sent Events `data:` line."""
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode("utf-8")
```
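Here is a quick standard-library check of what the flag changes. Both forms are valid JSON and decode to the same string. The difference lies in the bytes on the wire and in how legible a captured stream is.

```python
import json

payload = {"content": "안녕"}  # a Korean token ("hello")

escaped = json.dumps(payload)                  # default ensure_ascii=True
raw = json.dumps(payload, ensure_ascii=False)  # what _sse sends

print(escaped)  # {"content": "\uc548\ub155"}
print(raw)      # {"content": "안녕"}

# Same value after parsing; the escaped frame is just larger once UTF-8 encoded.
print(json.loads(escaped) == json.loads(raw))                      # True
print(len(escaped.encode("utf-8")), len(raw.encode("utf-8")))     # 27 21
```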
Surfacing the agent's thinking
Streaming the final answer is table stakes. The interesting part of a ReAct agent is what it did before answering: which document it searched and what came back. Open WebUI renders any text wrapped in <think>...</think> as a collapsible reasoning panel, so we narrate tool activity into that panel.
First, label the nodes worth announcing:
```python
NODE_LABELS = {
    "tools": "🔍 Searching the docs…",
}
```
Then open a <think> block, and on the relevant events, emit human-readable progress instead of raw tokens:
```python
show_thinking = bool(NODE_LABELS)
think_open = False
prev_node = None

if show_thinking:
    yield emit("<think>\n")
    think_open = True

async for event in graph.astream_events(inputs, config=config, version="v2"):
    kind = event.get("event")
    node = (event.get("metadata") or {}).get("langgraph_node", "")

    # node entry → status line. Track every transition (not only labelled
    # nodes) so a second visit to "tools" gets its own status line.
    if node and node != prev_node:
        if node in NODE_LABELS:
            yield emit(f"\n{NODE_LABELS[node]}\n")
        prev_node = node

    if kind == "on_tool_start":
        yield emit(f" • `{event.get('name', 'tool')}` running…")
        continue

    if kind == "on_tool_end":
        output = event.get("data", {}).get("output")
        text = output.content if hasattr(output, "content") else str(output)
        snippet = " ".join(str(text).split())[:90]  # collapse whitespace, clip
        yield emit(f" ✓ `{snippet}…`\n" if snippet else " ✓\n")
        continue

    # ... on_chat_model_stream handled as before; the first answer token
    # closes the <think> block (setting think_open = False) so the answer
    # renders below the panel instead of inside it
```
The on_tool_end output is a ToolMessage, so its text lives on .content. That is why the code checks hasattr(output, "content") before falling back to str(). Collapsing whitespace and clipping to about 90 characters keeps the panel readable instead of dumping a wall of retrieved text.
Closing the panel has to happen no matter how the stream ends (success, exception, or early return), so it goes in a finally:
```python
finally:
    if think_open:
        yield _sse(make_chunk({"content": "\n</think>\n"}, model_name, completion_id))
```
The result in the UI: a collapsible "🔍 Searching the docs… ✓" panel, then the streamed answer below it. The user sees the agent reach for RAG in real time.
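Putting the pieces together, here is a self-contained sketch of the whole generator that runs without LangGraph or LangChain installed, under several assumptions:

- The completion id uses a hypothetical `uuid` scheme, because `new_completion_id()` isn't shown.
- The `isinstance(chunk, AIMessageChunk)` check is replaced by duck typing on `.content`.
- The panel closes on the first non-empty answer token, which assumes tool-calling turns stream no text.
- The non-streaming fallback is omitted.
- The `except` clause from the next section is folded in. The panel is closed there before the error is reported, so the message isn't hidden inside a collapsed panel.

One deliberate deviation from the `finally` above: the panel is closed after `try`/`except` instead. Python raises `RuntimeError` if an async generator yields while it is being closed (for example via `aclose()`), and the server may close the generator when a client disconnects.

```python
import json
import logging
import time
import uuid

log = logging.getLogger(__name__)
NODE_LABELS = {"tools": "🔍 Searching the docs…"}


def make_chunk(delta, model_name, completion_id, finish_reason=None):
    return {
        "id": completion_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "delta": delta, "finish_reason": finish_reason}],
    }


def _sse(payload):
    return f"data: {json.dumps(payload, ensure_ascii=False)}\n\n".encode("utf-8")


async def graph_to_openai_sse(graph, inputs, model_name, config=None):
    completion_id = f"chatcmpl-{uuid.uuid4().hex}"  # hypothetical id scheme

    def emit(text):
        return _sse(make_chunk({"content": text}, model_name, completion_id))

    yield _sse(make_chunk({"role": "assistant"}, model_name, completion_id))
    think_open = bool(NODE_LABELS)
    if think_open:
        yield emit("<think>\n")
    prev_node = None

    try:
        async for event in graph.astream_events(inputs, config=config, version="v2"):
            kind = event.get("event")
            node = (event.get("metadata") or {}).get("langgraph_node", "")
            if node and node != prev_node:
                if node in NODE_LABELS:
                    yield emit(f"\n{NODE_LABELS[node]}\n")
                prev_node = node

            if kind == "on_tool_start":
                yield emit(f" • `{event.get('name', 'tool')}` running…")
            elif kind == "on_tool_end":
                output = event.get("data", {}).get("output")
                text = output.content if hasattr(output, "content") else str(output)
                snippet = " ".join(str(text).split())[:90]
                yield emit(f" ✓ `{snippet}…`\n" if snippet else " ✓\n")
            elif kind == "on_chat_model_stream":
                # Duck-typed stand-in for isinstance(chunk, AIMessageChunk).
                content = getattr(event["data"]["chunk"], "content", None)
                if isinstance(content, str) and content:
                    if think_open:  # first answer token: close the panel first
                        yield emit("\n</think>\n")
                        think_open = False
                    yield emit(content)
    except Exception as exc:  # headers are already sent, so report in-band
        log.exception("stream failed")
        if think_open:  # close the panel so the error is visible
            yield emit("\n</think>\n")
            think_open = False
        yield emit(f"\n[error] {exc}")

    # Reached after success or a caught error; deliberately not a `finally`.
    if think_open:
        yield emit("\n</think>\n")
    yield _sse(make_chunk({}, model_name, completion_id, finish_reason="stop"))
    yield b"data: [DONE]\n\n"
```

The trade-off of moving the close out of `finally`: an early `return` inside the loop would now skip the closing tag, so exit the loop with `break` instead.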
Two production details that bite
1. Errors belong in the stream, not in a 500. Once you've started streaming, the HTTP status is already 200 and the headers are flushed, so you can't switch to an error response. Catch inside the generator and emit the error as content instead:
```python
except Exception as exc:
    log.exception("stream failed")
    yield _sse(make_chunk({"content": f"\n[error] {exc}"}, model_name, completion_id))
```
The user sees [error] ... in the chat instead of a frozen, half-rendered message.
2. Not every model streams. Some gateways/models return a single batched response with no on_chat_model_stream events at all. If you only ever forwarded tokens, those models would yield an empty answer. Track whether any token was seen, and if not, fall back to a plain ainvoke:
```python
if not saw_token:  # saw_token is set whenever a content token is emitted
    result = await graph.ainvoke(inputs, config=config)
    final = extract_final_text(result.get("messages", []))
    yield emit(final)
```
extract_final_text walks the message log backwards for the last non-empty AIMessage. It handles both plain-string content and the list-of-blocks shape some providers return. This one guard is the difference between "streaming works on my dev model" and "works on every model behind the gateway." Keep one cost in mind: ainvoke runs the graph a second time, so any tools execute again.
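The article doesn't show extract_final_text, so here is one hypothetical way to write it. The sketch relies on LangChain messages exposing `.type == "ai"` for an `AIMessage`. It assumes that content is either a string or a list whose blocks are bare strings or dicts shaped like `{"type": "text", "text": ...}`. Check that block shape against what your providers actually return.

```python
from typing import Any, Sequence


def extract_final_text(messages: Sequence[Any]) -> str:
    """Hypothetical sketch: text of the last AI message that has any."""
    for msg in reversed(messages):
        if getattr(msg, "type", None) != "ai":  # skip human/tool/system messages
            continue
        content = getattr(msg, "content", "")
        if isinstance(content, str):
            text = content
        else:
            # List-of-blocks content: keep text blocks, drop tool-use blocks etc.
            parts = []
            for block in content or []:
                if isinstance(block, str):
                    parts.append(block)
                elif isinstance(block, dict) and block.get("type") == "text":
                    parts.append(block.get("text", ""))
            text = "".join(parts)
        if text.strip():
            return text
    return ""  # no AI message carried any text
```

Skipping empty AI messages matters because a tool-calling turn usually has empty content. Returning the first AI message seen from the end would otherwise yield nothing.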
The shape of the whole thing
```
graph.astream_events(version="v2")
   │
   ├─ on_chat_model_stream → emit({"content": token})
   ├─ node entry           → emit("🔍 status line")   ┐
   ├─ on_tool_start        → emit("• tool running…")  ├─ inside <think>…</think>
   ├─ on_tool_end          → emit("✓ snippet…")       ┘
   └─ (exception)          → emit("[error] …")
   ▼
first chunk {role} → …content chunks… → {finish_reason: stop} → data: [DONE]
```
The payoff from Part 1 compounds here. Because the boundary is just OpenAI SSE, this thinking-panel UX shows up with zero client code in any OpenAI-compatible client that renders <think> blocks. You wrote one translator, and every frontend in that ecosystem speaks it for free.
Next up: persisting conversation threads with a checkpointer so the agent remembers across requests, and what that does to the streaming loop.
Built with LangGraph, LangChain, and FastAPI. Part 2 of a series on running LangGraph in production; Part 1 here.