Streaming
Abstract
Streaming 用来从 Agent 运行过程中实时输出更新。对 LLM 应用来说,流式输出很重要:完整响应生成前,用户就能看到中间进度、token、工具调用或自定义状态,交互体验会明显更好。
LangChain 的 streaming 系统可以把 Agent run 中的实时反馈传给应用层。常见用途包括:
- Stream agent progress:每个 Agent step 后输出状态更新。
- Stream LLM tokens:模型生成 token 时逐步输出。
- Stream thinking / reasoning tokens:输出模型 reasoning 内容。
- Stream custom updates:从工具或 graph node 内部发出自定义事件,例如
"Fetched 10/100 records"。 - Stream multiple modes:同时使用
updates、messages、custom等模式。
Stream Modes
stream / astream 支持传入一个或多个 stream mode:
| Mode | Description |
|---|---|
updates |
每个 Agent step 后输出 state updates。如果一个 step 中多个 node 产生更新,会分别输出。 |
messages |
从调用 LLM 的 graph nodes 中输出 (token, metadata)。适合展示 token、tool call chunk、reasoning block。 |
custom |
输出 graph nodes 或 tools 中通过 stream writer 写出的自定义数据。 |
Agent 进度
使用 stream_mode="updates" 可以观察 Agent 每一步的进展。一个调用工具的 Agent 通常会经历:
- model node:模型生成
AIMessage,其中包含 tool call request。 - tools node:工具执行并返回
ToolMessage。 - model node:模型基于工具结果输出最终回答。
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="updates",
version="v2",
):
if chunk["type"] == "updates":
for step, data in chunk["data"].items():
print(f"step: {step}")
print(f"content: {data['messages'][-1].content_blocks}")
updates 更适合显示“Agent 现在走到哪一步了”,例如模型准备调用工具、工具返回结果、最终答案生成完成。
LLM Tokens
使用 stream_mode="messages" 可以流式获取模型 token,以及 token 对应的 metadata。它也会输出 tool call chunk,所以可以看到工具调用参数如何逐步生成。
from langchain.agents import create_agent
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="messages",
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
print(f"node: {metadata['langgraph_node']}")
print(f"content: {token.content_blocks}")
messages 适合前端打字机效果、展示 reasoning、观察 tool call chunk,或区分 token 来自哪个 node。
Custom Updates
如果想从 tool 执行过程中输出自定义进度,可以使用 get_stream_writer。
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
writer = get_stream_writer()
writer(f"Looking up data for city: {city}")
writer(f"Acquired data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_agent(
model="claude-sonnet-4-6",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="custom",
version="v2",
):
if chunk["type"] == "custom":
print(chunk["data"])
如果在 tool 中使用 get_stream_writer,这个 tool 需要在 LangGraph execution context 中运行,不能像普通函数一样脱离 graph 单独调用。
多模式 Streaming
可以同时传入多个 stream mode,例如 stream_mode=["updates", "custom"]。在 v2 格式下,每个 chunk 都是一个 StreamPart dict,包含 type、ns、data。
from langchain.agents import create_agent
from langgraph.config import get_stream_writer
def get_weather(city: str) -> str:
"""Get weather for a given city."""
writer = get_stream_writer()
writer(f"Looking up data for city: {city}")
writer(f"Acquired data for city: {city}")
return f"It's always sunny in {city}!"
agent = create_agent(
model="gpt-5-nano",
tools=[get_weather],
)
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode=["updates", "custom"],
version="v2",
):
print(f"stream_mode: {chunk['type']}")
print(f"content: {chunk['data']}")
多模式适合同时展示 Agent step 和工具进度,例如 UI 中一边显示工具正在执行,一边保留最终状态更新。
常见模式
Streaming Reasoning Tokens
一些模型可以输出 reasoning 内容。使用 stream_mode="messages",再从 content_blocks 中过滤 type == "reasoning" 的 block。
from langchain.agents import create_agent
from langchain.messages import AIMessageChunk
from langchain_anthropic import ChatAnthropic
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
model = ChatAnthropic(
model_name="claude-sonnet-4-6",
timeout=None,
stop=None,
thinking={"type": "enabled", "budget_tokens": 5000},
)
agent = create_agent(
model=model,
tools=[get_weather],
)
for token, metadata in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode="messages",
):
if not isinstance(token, AIMessageChunk):
continue
reasoning = [b for b in token.content_blocks if b["type"] == "reasoning"]
text = [b for b in token.content_blocks if b["type"] == "text"]
if reasoning:
print(f"[thinking] {reasoning[0]['reasoning']}", end="")
if text:
print(text[0]["text"], end="")
LangChain 会把不同 provider 的 reasoning / thinking 格式规范化成标准 reasoning content block。
Streaming Tool Calls
Tool call 在 messages 模式下会以 chunk 形式逐步输出,例如部分 JSON 参数。完成后的 tool call 和 tool response 可以通过 updates 模式读取。
from langchain.agents import create_agent
from langchain.messages import AIMessage, AIMessageChunk, AnyMessage, ToolMessage
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
agent = create_agent("openai:gpt-5.4", tools=[get_weather])
def render_message_chunk(token: AIMessageChunk) -> None:
if token.text:
print(token.text, end="|")
if token.tool_call_chunks:
print(token.tool_call_chunks)
def render_completed_message(message: AnyMessage) -> None:
if isinstance(message, AIMessage) and message.tool_calls:
print(f"Tool calls: {message.tool_calls}")
if isinstance(message, ToolMessage):
print(f"Tool response: {message.content_blocks}")
input_message = {"role": "user", "content": "What is the weather in Boston?"}
for chunk in agent.stream(
{"messages": [input_message]},
stream_mode=["messages", "updates"],
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
if isinstance(token, AIMessageChunk):
render_message_chunk(token)
elif chunk["type"] == "updates":
for source, update in chunk["data"].items():
if source in ("model", "tools"):
render_completed_message(update["messages"][-1])
如果 completed messages 没有进入 state updates,可以用 custom updates 发出完整消息,或者在 streaming loop 中把 chunks 聚合成完整 AIMessageChunk。
Human-in-the-loop Streaming
结合 Human-in-the-loop middleware 时,可以在 updates 中收集 __interrupt__,再通过 Command(resume=decisions) 恢复执行。
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
agent = create_agent(
"openai:gpt-5.4",
tools=[get_weather],
middleware=[
HumanInTheLoopMiddleware(interrupt_on={"get_weather": True}),
],
checkpointer=InMemorySaver(),
)
config = {"configurable": {"thread_id": "some_id"}}
interrupts = []
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "Can you look up the weather in Boston?"}]},
config=config,
stream_mode=["messages", "updates"],
version="v2",
):
if chunk["type"] == "updates":
for source, update in chunk["data"].items():
if source == "__interrupt__":
interrupts.extend(update)
# 根据 interrupts 构造 approve / edit / reject decisions 后恢复
agent.stream(
Command(resume=decisions),
config=config,
stream_mode=["messages", "updates"],
version="v2",
)
这里的关键是:恢复时必须使用同一个 thread_id,并且 decisions 的顺序要和 interrupt 中 action requests 的顺序一致。
Sub-agents Streaming
当 Agent 内部还有子 Agent 或多个 LLM 时,需要区分 token 来源。可以在 create_agent 时设置 name,并在 messages metadata 中读取 lc_agent_name。
from langchain.agents import create_agent
from langchain.chat_models import init_chat_model
def get_weather(city: str) -> str:
"""Get weather for a given city."""
return f"It's always sunny in {city}!"
weather_agent = create_agent(
model=init_chat_model("openai:gpt-5.4"),
tools=[get_weather],
name="weather_agent",
)
def call_weather_agent(query: str) -> str:
"""Query the weather agent."""
result = weather_agent.invoke({
"messages": [{"role": "user", "content": query}]
})
return result["messages"][-1].text
agent = create_agent(
model=init_chat_model("openai:gpt-5.4"),
tools=[call_weather_agent],
name="supervisor",
)
current_agent = None
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in Boston?"}]},
stream_mode=["messages", "updates"],
subgraphs=True,
version="v2",
):
if chunk["type"] == "messages":
token, metadata = chunk["data"]
if agent_name := metadata.get("lc_agent_name"):
if agent_name != current_agent:
print(f"{agent_name}:")
current_agent = agent_name
subgraphs=True 会让子图输出也进入 stream,适合 supervisor / worker 或 multi-agent 系统。
禁用 Streaming
有些场景需要禁用某个模型的 token streaming,例如多 Agent 系统中只希望特定 Agent 输出,或者混用支持 streaming 和不支持 streaming 的模型。
from langchain_openai import ChatOpenAI
model = ChatOpenAI(
model="gpt-5.4",
streaming=False,
)
不是所有 chat model integration 都支持 streaming 参数。如果不支持,可以使用基础类提供的 disable_streaming=True。
v2 Streaming Format
LangGraph 1.1+ 支持 version="v2"。v2 下所有 stream chunk 都是统一格式:
type:stream mode,例如updates、messages、custom。ns:namespace,用于区分子图等来源。data:实际 payload。
for chunk in agent.stream(
{"messages": [{"role": "user", "content": "What is the weather in SF?"}]},
stream_mode=["updates", "custom"],
version="v2",
):
print(chunk["type"])
print(chunk["data"])
旧格式需要解包 (mode, data) tuple;v2 让单模式和多模式的处理方式保持一致,更适合统一封装前端事件。
v2 也改进了 invoke() 返回值,可以把 state 和 interrupts 分开:
result = agent.invoke(
{"messages": [{"role": "user", "content": "Hello"}]},
version="v2",
)
print(result.value)
print(result.interrupts)
小结
Streaming 是提升 Agent 应用体验和可观测性的关键能力。updates 适合展示 Agent 进度,messages 适合展示 token、reasoning 和 tool call chunks,custom 适合输出业务进度。复杂应用通常会组合多个 stream mode,并使用 v2 格式统一处理事件。