你写了一个看起来完全正确的 Agent。Prompt 调了三版,工具注册没问题,RAG 链路也通了,测试输入怼进去,模型开始吐字——然后用户在等。等了多久?
不知道。只知道第一个 token 出来之前,整个系统像是卡死了。
这不是你的 Agent 逻辑有问题。这是 LLM 本身的问题:TTFT(Time To First Token,首 token 延迟)在大模型推理里是个躲不过的坎。
7B 模型还好,一旦切到 70B,TTFT 动辄两三秒。用户看到白屏,不知道你在干什么,直接关掉。
Streaming 就是来解决这个问题的。它不是让模型跑更快,而是让用户在等待的时候能看到东西——哪怕只是一堆跳动的光标,哪怕是一行行正在生成的文字。
感知延迟降低了,实际等待时间没变,但体验天差地别。

LLM 的锅,凭什么要我前端工程师来擦
在字节、腾讯、阿里今年的 Agent 开发岗位面试里,「如何实时输出思考过程」已经是高频追问。
它不是考你知不知道 Streaming 这个词,而是考你能不能说清楚 astream 和 astream_events 的本质区别,能不能在真实项目里做出对的选型。
这篇把这两个 API 掰开了讲,连带生产落地的坑一起填上。
一、Streaming 解决的是什么:先说清楚工程问题
LLM 生成内容是个逐 token 吐出的过程。在模型跑完整个推理之前,第一个 token 根本出不来。这个间隔,TTFT,在大模型场景下是几秒级别的。
对用户来说,这几秒像几年。对产品来说,这几秒足够让人关掉页面。
Streaming 的核心思路不是让 LLM 跑更快,而是把 token 生成的过程「管道化」——模型每生成一个 token,就往前端发一个。
用户看到的是一行行跳出来的文字,而不是一个光标转圈。实际等待时间没变,但感知等待时间大幅压缩。
LangGraph 官方把这一点说得很直接:「Bridge user expectations and agent capabilities」1。Streaming 就是这座桥。
你可能见过一些粗糙的实现:模型生成完毕后一次性返回完整文本,前端再慢慢打字出来。
这种「伪流式」在短回答场景下还能凑合,但在长回答、复杂 Agent 场景下,TTFT 问题根本没有解决,只是把等待时间平移到了渲染阶段。
真正的 Streaming 必须从模型推理层就开始。你看到的 token,是模型真正生成的那个 token,不是一个假的打字动画。
二、astream:面向用户的 token 级流式 API
2.1 API 签名与返回值结构
astream 是 LangGraph 面向用户的流式输出入口。它的签名大概长这样:
async def astream(
input: dict,
config: RunnableConfig,
*,
stream_mode: StreamMode = "values"
) -> AsyncIterator[StreamEvent]
核心参数是两个:input 是输入状态,config 包含 run_id、tags 这类运行时配置。stream_mode 控制输出格式,这是面试里最容易挖坑的地方。
2.2 stream_mode='values' vs stream_mode='updates' 的核心差异
这是 astream 最常被追问的细节。两种模式输出语义完全不同:
stream_mode='values':每次迭代输出完整的当前状态快照。你拿到的每个事件都包含完整的 state dict,可以看到整个图在当前时刻的所有变量。
这意味着你可以用最新状态渲染 UI,但每次事件的数据量较大。
stream_mode='updates':每次迭代只输出自上次以来的变化部分。拿到的每个事件只包含被修改的节点及其新值,不包含完整状态。
这意味着数据量小,适合做增量渲染,但需要你在前端自己维护状态。
在实际项目里,如果你的 UI 需要展示完整的对话上下文,用 values;如果你只需要渲染最新输出,用 updates 可以省不少带宽。
2.3 基础代码示例
一个典型的 astream 调用大概长这样:
async for event in agent.astream(
{"messages": [HumanMessage(content=user_input)]},
config={"recursion_limit": 50},
stream_mode="values"
):
# event 是当前完整状态快照
last_message = event["messages"][-1]
print(last_message.content, end="", flush=True)
注意 flush=True——这是让 Python 实时打印的关键。没有 flush=True,你还是会看到批量输出而不是逐 token 效果。

flush=True 这个参数卡了我三小时
2.4 面试追问:astream vs LCEL stream
这里有个容易踩的坑:LangChain 的 LCEL(LangChain Expression Language)也有自己的 stream 方法,但它是同步的,用的是 stream() 而不是 astream()。
LCEL 的 stream 是给简单链式调用的,线性流程用这个没问题。
但 LangGraph 的 astream 是异步的,专门为有状态循环图设计——它需要处理多轮推理、工具调用、中断恢复这些复杂场景。
面试时如果被问到「你用过哪些流式方案」,正确答案是:「LCEL 的 stream 用于简单链式场景,LangGraph 的 astream 用于复杂 Agent 编排场景。
」这个区分本身就是工程判断力的体现。
三、astream_events:面向开发者的全链路事件流
3.1 事件类型全景图
如果说 astream 是给用户看的「产品界面」,astream_events 就是给工程师看的「调试面板」。它输出的不是 token,而是整个 Agent 执行链路上的结构化事件。
LangGraph 的 astream_events 会触发以下几类核心事件:
| 事件类型 | 触发时机 | 主要字段 |
|---|---|---|
| on_chain_start | 图节点开始执行 | name, run_id, input |
| on_chain_stream | 节点运行时持续输出 | run_id, data |
| on_chain_end | 节点执行完成 | run_id, output, metadata |
| on_tool_start | 工具开始调用 | name, tool_input |
| on_tool_end | 工具调用完成 | tool_output, error |
| on_text_delta | token 生成 | text_chunk |
| on_chat_model_stream | 模型 token 输出 | chunk, meta |
这个事件体系覆盖了 LLM 调用、工具执行、输出解析全链路。你拿到的是一个完整的执行轨迹,每个环节的时间戳、输入输出都能看到。
3.2 run_id 与 parent_run_id 的链路追踪
多节点 Agent 里,一个 run 往往会触发子 run。比如主图调用了一个子 Agent,子 Agent 又调了一个工具。
这时候父 run 和子 run 通过 parent_run_id 串联起来。
astream_events 输出的每个事件都带着 run_id 字段。
你可以顺着 parent_run_id 把完整的调用链还原出来——哪个节点先跑、哪个工具后调、哪里耗时最长、哪里出错了,一目了然。
3.3 与 LangSmith 的联动
LangGraph 官方文档明确把 astream_events 和 LangSmith 绑定在一起:「LangSmith, our agent engineering platform, helps developers debug every agent decision, eval changes, and deploy in one click」2。
LangSmith 是 Anthropic/LangChain 官方出的可观测性平台。
你可以把自己写的 event handler 把 astream_events 的事件转发到 LangSmith,在平台上看到完整的调用链路火焰图。
面试时如果提到这个,面试官会知道你不仅会用 API,还理解背后的工程生态。
3.4 生产调试:用 astream_events 定位工具调用错误
真实的研报生成 Agent 场景里,工具调用失败是家常便饭。
比如你定义了一个「搜索数据库」的工具,参数是 SQL 查询字符串。如果用户输入包含特殊字符,SQL 解析可能报错。
如果你在 astream_events 里监听 on_tool_end 事件,你可以在工具返回错误时立即捕获:
async for event in agent.astream_events(
input_state,
config={"tags": ["production-agent"]},
version="v1"
):
if event["event"] == "on_tool_end":
if event.get("error"):
# 立即触发告警或降级逻辑
await notify_slack(f"Tool {event['name']} failed: {event['error']}")
# 决定是重试还是跳过
await execute_fallback()
你不需要等到整个 Agent 执行完毕才知道哪里出了问题。实时事件流让你可以在错误发生的第一时间做出响应。

线上报错了,AST 都跑完了才告诉我有个 tool 调用失败
3.5 面试追问:怎么把原始事件组装成可读状态
这是面试官会追问的进阶问题:astream_events 输出的是扁平事件流,但你最终要给用户展示的是「当前 Agent 思考到哪里了」。这两个东西之间的翻译怎么做?
标准答案是:维护一个状态机。你在 event handler 里累积事件,根据事件类型更新内部状态。
比如 on_chain_start 加一个节点,on_chain_end 标记完成,on_text_delta 追加到当前节点的输出内容里。
前端拿到的是你维护的状态机,而不是原始事件列表。
四、Streaming 模式下的状态管理与 checkpoint
4.1 Checkpoint 与 stream_mode 的共存关系
LangGraph 的 checkpoint 机制是它的核心优势之一:可以把图状态持久化到 SQLite、PostgreSQL 或 Redis,Agent 可以在任意节点中断并恢复。
一个常见的误解是:streaming 和 checkpoint 是互斥的——流式输出的时候状态在不断变化,怎么 checkpoint?
实际上,checkpoint 和 stream_mode 完全兼容。checkpoint 记录的是图的完整状态快照,包括当前节点的输入输出、中间变量、执行位置。
streaming 只是在这个快照的基础上,把变化实时推送给你。
两者协同工作的典型场景是:Agent 在执行一个长任务(比如生成一篇万字研报),用户突然点击「暂停」检查中间结果。
这时候 checkpoint 保存当前进度,streaming 停止推送,用户可以审查、修改、甚至重新注入上下文,然后 Agent 继续执行。
4.2 Human-in-the-loop:流式过程中的人工审查与中断
Human-in-the-loop(人机协同)是 LangGraph 的核心设计目标之一。
官方文档说:「Prevent agents from veering off course with easy-to-add moderation and quality controls. Add human-in-the-loop checks to steer and approve agent actions」3。
在 streaming 场景下,这意味着用户可以在 Agent 执行过程中介入。比如:
# 定义一个需要人工确认的节点
def human_review_node(state):
return state
# 在图中插入条件分支
graph.add_node("human_review", human_review_node)
graph.add_conditional_edges(
"llm_node",
lambda state: "human_review" if state.get("needs_review") else "output",
{"human_review": "human_review", "output": "END"}
)
当 Agent 执行到这个节点时,streaming 会暂停,等待人类输入。用户确认后,Agent 继续执行。
4.3 中断恢复后 streaming 是否重放已生成内容
这是面试里问得不多,但实际开发时很容易踩坑的问题:Agent 中断后恢复,streaming 会从头重放所有已生成的内容吗?
答案是:不会。
LangGraph 的 checkpoint 包含完整的执行状态,包括「执行到第几步」。恢复时,Agent 直接从断点继续,不会在流式输出里重复已生成的内容。
用户看到的是从断点开始的新的 token 流,而不是一遍又一遍的重放。
五、partial message 与 UI 渲染:生产落地的最后一公里
5.1 partial message 概念与增量渲染
流式输出的前端渲染不是简单的「收到什么就显示什么」。
LLM 生成的内容是一个 partial message——它还在进行中,内容可能变化。如果前端直接显示每个 chunk,会出现文字抖动、内容回退等问题。
正确的做法是维护一个前端状态机:
收到 on_text_delta,把 chunk 追加到当前内容
显示时用稳定的 ID 标记每个 token,允许后验修改
收到 on_chain_end,标记当前消息为「已完成」,不再接受后验修改
5.2 SSE / WebSocket 对接 LangGraph streaming
LangGraph 的 astream 输出的是异步迭代器,你需要把它桥接到前端能消费的协议。主流方案有两个:
SSE(Server-Sent Events):轻量、单向,前端只接收不发送。适合 Chat 场景,你的用户发送消息后等着收流式响应。实现简单,但不支持双向通信。
WebSocket:双向通信,可以支持更复杂的交互(比如用户在 Agent 执行过程中实时注入指令)。实现复杂度高,但能力更强。
对于大部分 Chat Agent 场景,SSE 足够。如果你做的是类似 Copilot 的实时协作场景,需要用户在 Agent 执行过程中实时反馈,WebSocket 更合适。
5.3 错误处理:tool 超时后已流出部分如何处理
这是一个真实的生产问题:tool 调用超时,但前面已经有部分输出流到用户了,这时候怎么处理?
标准方案是三层兜底:
前端截断:检测到 tool_error 事件后,停止追加新内容,显示「生成中断」提示
状态回滚:通过 checkpoint 恢复到一个一致的 state(不包含错误 tool 的输出)
重试或降级:根据错误类型决定是重试 tool 还是走降级路径(跳过这个 tool,用已有 context 继续生成)

tool 超时了,但用户已经看到一半了,这时候该怎么说
5.4 背压问题:on_text_delta 事件过密的批量打包方案
在高吞吐场景下,LLM 每秒可能吐出几十个 token,每个 token 都会触发一个 on_text_delta 事件。
如果前端处理不过来,会出现背压——事件在 channel 里堆积,甚至丢消息。
解决方案是批量打包:在 event handler 里收集一定数量的 delta(比如 10 个),或者等待一定时间窗口(比如 50ms),然后一次性推送给前端。
class BatchHandler:
def __init__(self, batch_size=10, batch_timeout=0.05):
self.buffer = []
self.batch_size = batch_size
self.batch_timeout = batch_timeout
async def handle(self, event):
if event["event"] == "on_text_delta":
self.buffer.append(event["data"])
if len(self.buffer) >= self.batch_size:
await self.flush()
async def flush(self):
# 一次性发送,减少网络往返
await ws.send(json.dumps({"chunks": self.buffer}))
self.buffer = []
50ms 的批量窗口意味着你的最大延迟是 50ms,但吞吐量可以提升数倍。这是一个典型的延迟换吞吐的工程取舍。
六、面试高频追问:astream vs astream_events 选型决策
6.1 核心区分:面向用户内容流 vs 面向开发者事件流
面试时被问「astream 和 astream_events 有什么区别」,最简洁的回答是:
astream:给用户看的。输出 token 级内容流,用来渲染 Chat 界面。
astream_events:给工程师看的。输出全链路结构化事件,用来做调试、可观测性、链路追踪。
这个区分本身就是一个设计决策:谁需要这个数据?数据用来干什么?
6.2 选型原则
不是什么场景都要用 astream_events。它的开销比 astream 大——事件类型多、字段多、需要额外的事件处理逻辑。
在低并发场景下可能不明显,但在高并发场景下,每个 run 都要生成完整事件流,开销是实打实的。
选 astream 的场景:
End-user Chat 界面
简单客服 Agent
对延迟敏感的消费级应用
选 astream_events 的场景:
生产调试
可观测性集成(LangSmith)
链路追踪与性能分析
需要对工具调用做实时监控的场景

正文图解 1
6.3 生产共存:两者同时开启的架构设计
在真实生产环境里,你可能需要同时开启两个 API。End-user 需要 astream 渲染界面,运维团队需要 astream_events 做监控告警。
但同时开启要注意事件处理的异步性——不要让 astream_events 的处理影响 astream 的输出延迟。建议把事件处理拆成两个独立通道:
async def main():
# 主通道:给用户看的流
user_stream = agent.astream(user_input)
# 旁路:给监控用的事件流
event_stream = agent.astream_events(user_input, version="v1")
# 两条通道独立跑,不互相阻塞
await asyncio.gather(
stream_to_sse(user_stream),
stream_to_langsmith(event_stream)
)
七、易错点与高风险误答清单
误答 1:astream_events 就是日志库
这是最常见的误解。astream_events 不是日志库,不是你拿来存日志的,它是结构化事件流。
它的每个事件都有明确语义(on_chain_start、on_tool_end),而不是「ERROR: something failed」这种字符串。
你可以用它做日志,但它的价值远不止日志——你可以在事件流里实时响应、动态修改执行路径、甚至注入人工决策。
如果面试官追问「你们怎么用 astream_events 做可观测性」,正确答案是:event handler 解析事件类型,把关键事件(tool_error、slow_node)转发到 LangSmith 或自己的 APM 平台,同时在内存里维护执行状态机用于前端渲染。
误答 2:streaming 和 checkpoint 互斥
前面已经说过了,这是个误解。两者完全兼容,checkpoint 记录状态快照,streaming 实时推送变化。
如果面试官追问「你们怎么做断点续传」,正确答案是:checkpoint 保存完整状态包括执行位置,恢复时从断点继续,不会重放已流出的内容。
误答 3:stream_mode 混用导致 UI 抖动
如果你在前端用 values 模式渲染完整状态,但更新频率又很高(比如每秒更新 20 次),前端会出现明显的抖动——每次状态更新都重新渲染整个界面,而不是增量更新。
解决方案:用 updates 模式做增量渲染,或者在 values 模式下做 diff,只更新变化的 DOM 节点。
面试官继续追问的口子
「如果用户在 Agent 执行过程中动态修改 system prompt,怎么处理?」
这是一个边缘场景,但不是没人问。动态修改 system prompt 意味着当前 run 的上下文变了,但已经流出的内容不能回退。正确答案是:
先让当前 run 继续执行完(避免中断导致状态不一致)
新 run 使用新的 system prompt
如果必须中断,使用 checkpoint 回滚到一致状态,然后重新注入新 prompt
八、项目落地:我在真实 Agent 项目里是怎么用 Streaming 的
8.1 案例:研报生成 Agent(LangGraph 多节点图)
研报生成是 LangGraph 的典型场景。一个完整的研报需要多个步骤:信息采集 → 数据分析 → 观点生成 → 格式排版 → 审核发布。每个步骤都可能调用工具、出错、需要人工确认。
我们用 astream_events 监听整个链路,把每个节点的状态实时推到前端:
class ReportAgentMonitor:
def __init__(self):
self.status = "idle"
self.current_node = None
self.progress = 0.0
async def handle(self, event):
if event["event"] == "on_chain_start":
self.current_node = event["name"]
self.status = "running"
await self.notify_ui()
elif event["event"] == "on_chain_end":
self.progress += 0.25 # 假设有4个主要节点
self.status = "complete" if self.progress >= 1.0 else "running"
await self.notify_ui()
elif event["event"] == "on_tool_error":
self.status = "error"
await self.notify_slack(f"Tool {self.current_node} failed")
await self.execute_fallback()
# SSE 桥接
async def stream_to_sse(monitor, user_ws):
async for event in agent.astream_events(initial_input, version="v1"):
await monitor.handle(event)
# 把状态机变更推给前端
await user_ws.send(json.dumps(monitor.get_state()))
前端拿到的是我们维护的状态机(当前节点、进度、错误状态),而不是原始事件流。
用户看到的是「正在分析数据... 75%」这样的进度条,而不是一堆 on_chain_start、on_chain_end 事件。

第一次用 astream_events 时,我直接把原始事件推给前端,前端同事差点打我
8.2 踩坑 1:tool_call JSON blob 太大导致卡顿
研报 Agent 里的一个工具是「查询数据库」——它要返回一个很大的 JSON,里面可能包含几千行数据。
一开始我们直接用 astream_events 的 on_tool_end 事件把整个 JSON 推出去。结果:JSON 太大,序列化耗时,stream 出现明显卡顿。
用户看到的是「查询完成」后光标停了好几秒才开始下一段输出。
解决方案:把 tool 返回分成两个通道。一个轻量通道只返回「执行成功」+「结果 ID」,前端显示查询完成;另一个重通道通过单独的 API 获取结果。
# 轻量通道:快速响应
async def lightweight_tool_response(tool_name, result_id):
return {
"tool": tool_name,
"status": "success",
"result_id": result_id, # 前端用这个 ID 单独拉结果
"preview": "查询完成,共 3247 条记录"
}
# 重通道:完整结果(不通过 streaming)
async def get_full_result(result_id):
# 通过普通 HTTP API 获取,不走事件流
return await db.fetch(result_id)
8.3 踩坑 2:on_text_delta 过密导致背压
研报生成时,LLM 每秒可能吐出 20-30 个 token,每个 token 都触发一个 on_text_delta 事件。
前端 websocket 处理不过来,出现背压——事件在 channel 里堆积,用户看到的是输出越来越慢。
解决方案是批量打包,50ms 的窗口合并 10 个 delta 一起发。
效果:实际延迟增加 50ms,但吞吐量从每秒 20 条消息降到每秒 2 条消息(20 条 delta 合并成 1 条),前端压力大幅降低。用户体验几乎不受影响,但系统稳定性好了很多。

50ms 批量窗口这个参数调了我一下午
九、参考文献与来源
4: LangGraph 官方文档 - Streaming 模式:https://www.langchain.com/langgraph
5: LangChain + LangGraph Agent 完全指南:流式传输、结构化输出、提示词缓存:https://juejin.cn/post/7625210636653969418
6: Agent 构建必选框架——LangGraph工程落地手册:https://cloud.tencent.com/developer/article/2650620
7: OpenAI Agents SDK vs LangGraph vs CrewAI: 2026 Matrix:https://www.digitalapplied.com/blog/openai-agents-sdk-vs-langgraph-vs-crewai-matrix-2026
1: Anthropic ManagedAgents 深度研究:解耦大脑与双手的架构哲学:https://magicliang.github.io/2026/04/14/Anthropic-Managed-Agents深度研究/
8: GitHub - didilili/ai-agents-from-zero: 2026 最系统的 AI Agent 速成指南:https://github.com/didilili/ai-agents-from-zero