Streaming Tool Calls and Partial Outputs in LangGraph
Token streaming is the easy part. Streaming what a tool is doing, mid-call, is what makes an agent feel alive.
Streaming the final answer token by token is the easy part, and it is also the least interesting. The thing that makes an agent feel responsive is showing what it is doing while a tool runs: "searching 3 of 5 sources", "reading the database", "found 120 rows". That progress lives inside a tool call, not in the model's output, and getting it out to the user trips up a lot of people. This post is about how LangGraph streams all of it, and the one pattern (the custom writer) that you need for tool progress.
I will use the v2 stream API throughout, which is the current recommendation in LangGraph and gives every chunk the same shape. If you are still on v1, the migration is small and I will point out the differences.
The mental model: pick what you want out of the graph
A LangGraph run produces a lot of events. Streaming is just choosing which of those events you want pushed to you as they happen. You choose with stream_mode. The modes you will actually use:
- messages: LLM tokens, token by token, as 2-tuples of (message_chunk, metadata).
- updates: the state changes returned by each node, after it runs.
- custom: arbitrary data you emit yourself from inside a node or tool. This is the one for tool progress.
- values: the full state after each step (heavier; use it when the client wants the whole snapshot).
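You can pass several at once. With version="v2", every chunk is a StreamPart dict with type, ns, and data, so you switch on part["type"] instead of unpacking tuples differently per mode. On v1, the shape depends on the arguments: a single mode string yields bare chunks, a list of modes yields (mode, chunk) tuples, and subgraphs=True prepends the namespace to either form.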
```python
# graph is any compiled LangGraph graph (the result of builder.compile())
for part in graph.stream(
    {"messages": [{"role": "user", "content": "what's the weather in Lagos?"}]},
    stream_mode=["messages", "updates", "custom"],
    version="v2",
):
    if part["type"] == "messages":
        # an LLM token: data is (message_chunk, metadata)
        msg, meta = part["data"]
        if msg.content:
            print(msg.content, end="", flush=True)
    elif part["type"] == "updates":
        # a node finished: data maps node name -> the update it returned
        for node in part["data"]:
            print(f"\n[node {node} done]")
    elif part["type"] == "custom":
        # whatever a node or tool passed to its stream writer
        print(f"\n[progress] {part['data']}")
```

That single loop is the whole API surface for most apps. The rest is knowing which mode answers which question.
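If you are migrating from v1, a small adapter can make old-style items look like v2 parts so a loop like the one above keeps working unchanged. The sketch below is hypothetical (to_stream_part is not a LangGraph function) and assumes these v1 shapes: a bare chunk for a single mode string, (mode, chunk) for a list of modes, and the namespace prepended when subgraphs=True. It is plain Python with no LangGraph import, so it runs on hand-built tuples.

```python
from typing import Any, Union


def to_stream_part(item: Any, stream_mode: Union[str, list], subgraphs: bool = False) -> dict:
    """Wrap one v1-style stream item in a v2-style {"type", "ns", "data"} dict.

    Hypothetical adapter, assuming these v1 shapes:
      stream_mode="updates"                  -> chunk
      stream_mode=["updates", ...]           -> (mode, chunk)
      stream_mode="updates", subgraphs=True  -> (ns, chunk)
      stream_mode=[...], subgraphs=True      -> (ns, mode, chunk)
    """
    is_list = isinstance(stream_mode, list)
    ns: tuple = ()
    if subgraphs:
        # Peel the namespace off the front; what remains is the non-subgraph shape.
        ns, *rest = item
        item = tuple(rest) if is_list else rest[0]
    if is_list:
        mode, data = item
    else:
        mode, data = stream_mode, item
    return {"type": mode, "ns": tuple(ns), "data": data}


# Hand-built v1 items, one per shape:
print(to_stream_part({"agent": {}}, "updates"))
print(to_stream_part(("custom", {"msg": "hi"}), ["messages", "custom"]))
print(to_stream_part((("research_agent:1",), "updates", {"agent": {}}), ["updates"], subgraphs=True))
```

In the loop you would wrap each item as part = to_stream_part(item, modes, subgraphs=...) and keep the if/elif on part["type"] as it is.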
Streaming LLM tokens
Use messages mode to stream tokens from any model call in the graph, including ones inside tools or subgraphs. Each chunk's data is a (message_chunk, metadata) tuple.
```python
for part in graph.stream(inputs, stream_mode="messages", version="v2"):
    if part["type"] == "messages":
        msg, meta = part["data"]
        if msg.content:
            print(msg.content, end="", flush=True)
```

The metadata is where the useful filtering lives. If your graph has more than one model call, you do not want them interleaved in the UI. Filter by node:
```python
# inside a generator that consumes the stream (hence the yield)
if msg.content and meta["langgraph_node"] == "final_answer":
    yield msg.content
```

Or tag the model and filter by tag, which is cleaner when the same node runs different models:
```python
from langchain.chat_models import init_chat_model

answer_model = init_chat_model("gpt-4.1-mini", tags=["answer"])

# ... later, in the stream loop:
# membership test, since a chunk's tags can also include ones inherited from the run config
if "answer" in meta.get("tags", []):
    ...
```

If you run a model purely for internal work (structured output, a routing decision) and do not want its tokens in the user stream, tag it with nostream. It still runs and returns its output; the tokens just never appear in messages mode. This saves you from leaking a router's reasoning into the chat window.
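Both filters fit in one small generator. This is a sketch assuming v2 messages parts shaped as described above; answer_tokens is a hypothetical helper, and SimpleNamespace objects stand in for real message chunks so it runs without a model. It checks tag membership rather than equality, because a chunk can carry more tags than the one you set.

```python
from types import SimpleNamespace
from typing import Any, Iterable, Iterator, Optional


def answer_tokens(
    parts: Iterable[dict],
    node: Optional[str] = None,
    tag: Optional[str] = None,
) -> Iterator[Any]:
    """Yield token content from v2 "messages" parts, optionally limited to one node and/or tag."""
    for part in parts:
        if part["type"] != "messages":
            continue
        msg, meta = part["data"]
        if not msg.content:
            continue  # skip empty chunks, e.g. ones that only carry tool-call fragments
        if node is not None and meta.get("langgraph_node") != node:
            continue
        if tag is not None and tag not in meta.get("tags", []):
            continue
        yield msg.content


def _fake_part(text: str, node: str, tags: list) -> dict:
    """Hand-built stand-in for one v2 messages part (real chunks are message objects)."""
    return {
        "type": "messages",
        "ns": (),
        "data": (SimpleNamespace(content=text), {"langgraph_node": node, "tags": tags}),
    }


demo = [
    _fake_part("Checking sources...", "planner", []),
    _fake_part("It is ", "final_answer", ["answer"]),
    _fake_part("", "final_answer", ["answer"]),
    _fake_part("sunny.", "final_answer", ["answer", "extra-tag"]),  # an equality check would drop this
]
print("".join(answer_tokens(demo, node="final_answer")))  # It is sunny.
print("".join(answer_tokens(demo, tag="answer")))  # It is sunny.
```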
Streaming tool progress: the custom writer
Here is the part people miss. A tool call is opaque by default. The model decides to call search_sources, the tool runs for four seconds, and the user stares at nothing. messages mode will not help, because the tool is not emitting tokens. You need to emit progress yourself, and that is what custom mode plus the stream writer are for.
Inside a tool, grab the writer and call it with whatever you want to surface:
```python
from langchain.tools import tool
from langgraph.config import get_stream_writer


@tool
def search_sources(query: str) -> str:
    """Search across the user's connected sources."""
    writer = get_stream_writer()
    sources = ["docs", "slack", "email", "tickets", "wiki"]
    hits = []
    for i, src in enumerate(sources, start=1):
        # report which source we are on before the (slow) search starts
        writer({"type": "progress", "msg": f"searching {src} ({i}/{len(sources)})"})
        hits.extend(search_one(src, query))  # search_one / format_hits: your own helpers
    writer({"type": "progress", "msg": f"found {len(hits)} results"})
    return format_hits(hits)
```

Then read it on the other side:
```python
for part in graph.stream(inputs, stream_mode="custom", version="v2"):
    if part["type"] == "custom":
        print(part["data"]["msg"])
```

```text
searching docs (1/5)
searching slack (2/5)
searching email (3/5)
searching tickets (4/5)
searching wiki (5/5)
found 120 results
```

That is the difference between a spinner and a status line. The writer takes any JSON-serializable payload, so you can send structured events (a progress percentage, a partial result, a row count) and let your frontend decide how to render them.
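One way to keep those payloads consistent is a tiny schema helper on the tool side and a renderer on the client side. Everything here is an assumption: progress_event, render_status, and the field names (step, total, pct, msg) are hypothetical, not a LangGraph format. The json.dumps call is only there to fail early on a payload that is not JSON-serializable.

```python
import json
from typing import Any


def progress_event(step: int, total: int, msg: str, **extra: Any) -> dict:
    """Build one structured progress payload to hand to writer(...) (hypothetical schema)."""
    event = {
        "type": "progress",
        "step": step,
        "total": total,
        "pct": round(100 * step / total) if total else None,
        "msg": msg,
        **extra,  # e.g. rows=120, or a small partial result
    }
    json.dumps(event)  # raises TypeError here if something isn't JSON-serializable
    return event


def render_status(event: dict) -> str:
    """Client-side view: turn one custom event into a status line."""
    if event.get("type") == "progress" and event.get("pct") is not None:
        return f"[{event['pct']:3d}%] {event['msg']}"
    return str(event.get("msg", event))


# Inside the tool you would call: writer(progress_event(i, len(sources), f"searching {src}"))
print(render_status(progress_event(3, 5, "searching email")))  # [ 60%] searching email
```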
On Python older than 3.11, get_stream_writer() does not work in async code, because asyncio before 3.11 cannot pass the run's context variables into the tasks LangGraph creates. Add a writer: StreamWriter argument to your node or tool instead, and LangGraph injects it. On 3.11+ the get_stream_writer() call is fine.
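A minimal sketch of that injected-writer style for an async node. load_rows and its batches state key are hypothetical, and the try/except only exists so the example runs without LangGraph installed; in a real graph you import StreamWriter from langgraph.types and register the node with add_node as usual. Because the writer is an ordinary argument, you can also call the node directly with any callable, which makes it easy to unit test.

```python
import asyncio
from typing import Any, Callable

try:
    from langgraph.types import StreamWriter  # type of the writer LangGraph injects
except ImportError:  # fallback only so this sketch runs without LangGraph installed
    StreamWriter = Callable[[Any], None]


async def load_rows(state: dict, writer: StreamWriter) -> dict:
    """Async node that reports progress through an injected writer instead of get_stream_writer()."""
    batches = state["batches"]  # hypothetical state key
    rows = []
    for i, batch in enumerate(batches, start=1):
        await asyncio.sleep(0)  # stand-in for real async I/O, e.g. a database query
        rows.extend(batch)
        writer({"type": "progress", "msg": f"loaded batch {i}/{len(batches)}"})
    return {"row_count": len(rows)}


# In a graph: builder.add_node("load_rows", load_rows), and LangGraph supplies writer at run time.
# Outside a graph, any callable works:
events = []
print(asyncio.run(load_rows({"batches": [[1, 2], [3]]}, events.append)))  # {'row_count': 3}
```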
Streaming partial state with updates
updates mode gives you what each node changed when it finished. For an agent loop, that is a clean way to show structure: the model decided to call a tool, the tool returned, the model is composing the answer. Each node appears as a key in part["data"].
```python
for part in graph.stream(inputs, stream_mode="updates", version="v2"):
    if part["type"] == "updates":
        for node, state in part["data"].items():
            if node == "tools":
                print("ran a tool, got results")
            elif node == "agent":
                print("model produced a step")
```

Use updates for milestones and custom for fine-grained progress within a step. They complement each other: updates says "the tool node finished", custom says "we are on source 3 of 5" while it is still running.
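To see the two side by side, you can fold both modes into a single status feed. This is a sketch assuming v2 parts and the agent/tools node names from the example above; status_lines and LABELS are hypothetical, and the demo parts are hand-built rather than captured from a real run.

```python
from typing import Iterable, Iterator

# Hypothetical display labels for the node names used in the example above.
LABELS = {"agent": "model produced a step", "tools": "ran a tool, got results"}


def status_lines(parts: Iterable[dict]) -> Iterator[str]:
    """Merge v2 "updates" (milestones) and "custom" (in-step progress) into one feed."""
    for part in parts:
        if part["type"] == "custom":
            data = part["data"]
            detail = data.get("msg", data) if isinstance(data, dict) else data
            yield f"    {detail}"  # indented: still inside the current step
        elif part["type"] == "updates":
            for node in part["data"]:
                yield LABELS.get(node, f"{node} finished")  # a node returned


demo = [
    {"type": "updates", "ns": (), "data": {"agent": {"messages": []}}},
    {"type": "custom", "ns": (), "data": {"type": "progress", "msg": "searching docs (1/5)"}},
    {"type": "custom", "ns": (), "data": {"type": "progress", "msg": "searching slack (2/5)"}},
    {"type": "updates", "ns": (), "data": {"tools": {"messages": []}}},
]
for line in status_lines(demo):
    print(line)
```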
Subgraphs and multi-agent setups
If your agent calls subgraphs (a common multi-agent shape), set subgraphs=True so their events come through too. The ns field on each chunk tells you where it came from: an empty tuple for the root graph, and a path like ("research_agent:<id>",) for a subgraph.
```python
for part in graph.stream(
    inputs, stream_mode="updates", subgraphs=True, version="v2"
):
    if part["type"] == "updates":
        where = part["ns"][0] if part["ns"] else "root"
        print(f"[{where}] {list(part['data'].keys())}")
```

This is how you keep a supervisor's progress and a worker's progress visually separate, which matters once you have more than one agent in play. If you are building that kind of system, it pairs with the routing choices in Multi-agent handoffs vs the supervisor pattern.
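The raw namespace entry includes a task id, which is noise in a UI. A sketch that strips it, assuming each entry has the name:id form shown above and that nested subgraphs add one entry per level; agent_label and group_by_agent are hypothetical helpers, and the ids in the demo are made up.

```python
from collections import defaultdict
from typing import Iterable


def agent_label(ns: tuple) -> str:
    """("research_agent:<id>",) -> "research_agent"; () -> "root"; nested levels joined with " > "."""
    if not ns:
        return "root"
    return " > ".join(entry.partition(":")[0] for entry in ns)


def group_by_agent(parts: Iterable[dict]) -> dict:
    """Collect the node names each agent reported in v2 "updates" parts, keyed by agent label."""
    by_agent = defaultdict(list)
    for part in parts:
        if part["type"] == "updates":
            by_agent[agent_label(part["ns"])].extend(part["data"].keys())
    return dict(by_agent)


demo = [
    {"type": "updates", "ns": (), "data": {"supervisor": {}}},
    {"type": "updates", "ns": ("research_agent:abc123",), "data": {"agent": {}}},
    {"type": "updates", "ns": ("research_agent:abc123",), "data": {"tools": {}}},
]
print(group_by_agent(demo))  # {'root': ['supervisor'], 'research_agent': ['agent', 'tools']}
```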
Getting it to the browser
Most of the time you are not printing to a terminal; you are pushing to a web client. Server-Sent Events (SSE) is the simplest transport, and an async generator maps onto it cleanly. Here is the FastAPI shape:
```python
import json

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
# graph: your compiled LangGraph graph, created once at startup


@app.post("/chat")
async def chat(body: dict):
    async def event_stream():
        async for part in graph.astream(
            {"messages": [{"role": "user", "content": body["message"]}]},
            stream_mode=["messages", "custom"],
            version="v2",
        ):
            if part["type"] == "messages":
                msg, _ = part["data"]
                if msg.content:
                    # a token to append to the message bubble
                    yield f"data: {json.dumps({'token': msg.content})}\n\n"
            elif part["type"] == "custom":
                # tool progress for the status line
                yield f"data: {json.dumps({'progress': part['data']})}\n\n"
        yield "data: [DONE]\n\n"  # tell the client the stream is finished

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```

On the client, payloads with a token key are appended to the message, and payloads with a progress key are shown in a status line. I went deeper on the full backend-to-UI path, including reconnection and backpressure, in Streaming LLM responses end to end. This is the LangGraph-specific half of that story.
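The server above tells tokens from progress by the JSON key inside each data: line. SSE also has a named-event field (event: ...), which lets the client dispatch on the event name instead of inspecting the payload. Below is a minimal stdlib sketch of both ends, assuming you control the framing: sse frames one event, and parse_sse is a deliberately small, hypothetical parser that only handles what sse emits (a real client should follow the full SSE parsing rules or use a library).

```python
import json
from typing import Any, Iterator, Optional, Tuple


def sse(data: Any, event: Optional[str] = None) -> str:
    """Frame one Server-Sent Event; the trailing blank line ends the event."""
    payload = data if isinstance(data, str) else json.dumps(data)
    lines = [f"event: {event}"] if event else []
    # A payload with newlines must be split across several data: lines.
    lines += [f"data: {line}" for line in payload.split("\n")]
    return "\n".join(lines) + "\n\n"


def parse_sse(stream: str) -> Iterator[Tuple[str, str]]:
    """Tiny parser for what sse() emits: yields (event_name, data) pairs."""
    for block in stream.split("\n\n"):
        if not block.strip():
            continue
        name, data = "message", []  # "message" is the SSE default event type
        for line in block.split("\n"):
            field, _, value = line.partition(": ")
            if field == "event":
                name = value
            elif field == "data":
                data.append(value)
        yield name, "\n".join(data)


wire = (
    sse({"token": "Hel"}, event="token")
    + sse({"msg": "searching docs (1/5)"}, event="progress")
    + sse("[DONE]")
)
for name, data in parse_sse(wire):
    print(name, data)
```

In event_stream you would then write yield sse({"token": msg.content}, event="token") and yield sse(part["data"], event="progress").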
What I reach for, and when
| You want to show | Mode | Notes |
|---|---|---|
| The answer typing out | messages | Filter by node or tag so only the final answer streams |
| "Searching source 3 of 5" | custom | Emit with get_stream_writer() inside the tool |
| "Tool finished, composing answer" | updates | Per-node milestones |
| Which agent is talking | any + subgraphs=True | Read the ns field |
| Internal model output kept private | tag it nostream | Runs, but tokens are not streamed |
The trap is thinking streaming is only about tokens. The agents that feel good are the ones that tell you what they are doing while they do it, and in LangGraph that means reaching for the custom writer, not just messages mode. Wire up ["messages", "custom"] from the start, emit a progress line from every tool that takes more than a second, and the difference in how the agent feels is immediate.
If you have not set up persistence yet, streaming pairs naturally with it: a checkpointed graph can stream, pause for human input, and resume the stream later. That side is covered in LangGraph state, checkpointing, and persistence.

Folarin Akinloye is an AI Engineer based in London, UK. He builds production-ready agentic AI systems, multi-agent architectures, and sophisticated RAG implementations, and writes about the engineering decisions behind them.