端到端协同
一次 sendMessage 从 click 到 DOM 更新的完整旅程 / UIMessageChunk 25 种类型的全量参考 / abort 双向传播的 4 环 / resume 的端到端实现模板 / 4 类 error 路径分类 / SSE 环境层陷阱
为什么需要这一页
前五页按侧切:发送端四页、接收端一页。但实战里的麻烦往往是纵向贯穿的——“一次点击发送到 DOM 更新中间究竟发生了什么”、“为什么 client 停了但 server 还在烧 token”、“resume 到底怎么落地”——这些问题哪一页都答不完整。
本页把前五页串成一个端到端视图:
- 一次
sendMessage的完整调用链(客户端 + 服务端 + 传输层) - 所有
UIMessageChunk类型的含义和触发时机(wire 协议的唯一入口) - 三个真正双端的协议——abort / resume / error——只看一侧是 debug 不出来的
不重复前五页的概念,只讲”合在一起怎么跑”。
一次 sendMessage 的完整旅程
以最典型的场景开始: 用户在 textarea 输入一句话,点提交。下面是点击到 DOM 更新的完整调用链。
每一环在源码中的位置:
| 环节 | 位置 | 关键调用 |
|---|---|---|
| UI → hook | 你的组件 | sendMessage({ text }) |
| hook → Chat | @ai-sdk/react dist/index.js | AbstractChat.sendMessage |
| Chat → transport | [email protected] AbstractChat.makeRequest | transport.sendMessages(opts) |
| transport → wire | DefaultChatTransport.sendMessages | fetch(api, { method: 'POST', body, signal }) |
| wire → server | 你的路由 handler | req.json() → 拿到 messages: UIMessage[] |
| 消息桥接 | [email protected] convertToModelMessages | UIMessage[] → ModelMessage[] |
| 模型调用 | [email protected] streamText (dist:6441) | 返回 StreamTextResult |
| 转 UI 流 | StreamTextResult.toUIMessageStream (dist:7839) | fullStream.pipeThrough(TransformStream) |
| SSE 封装 | toUIMessageStreamResponse / createUIMessageStreamResponse | new Response(stream, { headers: sseHeaders }) |
| 服务端 → wire | Node / Bun / Edge runtime | HTTP 流写 |
| wire → client | HttpChatTransport.processResponseStream | 解析 SSE data: ...\n\n 帧 → UIMessageChunk |
| chunk → state | AbstractChat 内部 chunk 分派 | pushMessage / replaceMessage |
| state → DOM | React 订阅 + throttle | useSyncExternalStore + re-render |
UIMessageChunk 全类型参考
这是 client ↔ server 之间唯一的 wire 协议——每个 SSE 帧解析出来就是其中一个 type。完整 union 定义在
[email protected]/dist/index.d.ts:2158-2273。
按作用分 7 组:
1. 流生命周期控制
| type | 何时发 | 客户端处理 |
|---|---|---|
start | 流的第一个 chunk, messageId 和 message-level metadata 注入 | 创建新 assistant message 对象 |
start-step | agent 每步开始 | 在当前 message 上开一个新 step |
finish-step | agent 每步结束 | 触发 L3 onStepFinish; step 边界标记 |
finish | 整流结束, 含 finishReason | 触发 L3 onFinish; status → ready |
abort | 服务端收到 abort 信号后发出 | onFinish({ isAbort: true, ... }) |
error | 服务端捕获异常, errorText 由 onError 返回值决定 | onError(new Error(errorText)) + isError: true |
2. 文本内容
| type | 含义 | 客户端处理 |
|---|---|---|
text-start | 一段文本 part 开始, id 标识这段 part | 在 message.parts push 一个 TextUIPart, text="" |
text-delta | 增量 | 追加到对应 id 的 TextUIPart.text |
text-end | 本段 part 结束 | 标记 complete (可选触发 tree-shake) |
id
相同的 start/delta/end 组成一段文本。不同 id 组成不同段(例如模型先输出一段 → 调用工具 → 再输出一段,这就是两个不同 id 的 text
part)。
3. 推理 / Thinking
| type | 含义 | 客户端处理 |
|---|---|---|
reasoning-start / reasoning-delta / reasoning-end | Anthropic thinking 块 / OpenAI reasoning trace | 结构和 text 一样, 但在 UI 上可折叠显示 |
4. 工具调用
| type | 含义 | 客户端处理 (ToolUIPart.state) |
|---|---|---|
tool-input-start | 工具调用开始, 参数还在流式生成 | state = 'input-streaming' |
tool-input-delta | 参数文本增量 (JSON 串的一段) | 累加到 input 字段的”正在生成”形态 |
tool-input-available | 参数完整 | state = 'input-streaming' → 'input-available', input 字段解析完成 |
tool-input-error | 参数解析失败 / schema 校验失败 | state = 'output-error', errorText 注入 |
tool-approval-request | 工具需要人工审批(罕见) | state = 'approval-requested' |
tool-output-available | 工具执行完成 | state = 'output-available', output 字段注入 |
tool-output-error | 工具执行失败 | state = 'output-error' |
tool-output-denied | 审批被拒 | state = 'approval-responded' (denied) |
dynamic-tool 变体: 如果工具是运行时发现的 (MCP / 动态注册), chunk 里会带 dynamic: true,客户端应该把它渲染为
DynamicToolUIPart (type: 'dynamic-tool') 而不是 tool-${name} 的静态形态。
5. 源引用 / 文件
| type | 含义 |
|---|---|
source-url | 引用了一个 URL (RAG / 搜索) |
source-document | 引用了一个文档 (PDF / markdown) |
file | 本次回答附带一个文件 (图像 / 音频 / …) |
6. 自定义业务事件
{
type: `data-${string}`; // 例如 data-progress, data-todos-update, data-run-init
data: unknown; // 业务方自定义 shape
id?: string;
transient?: boolean; // true = 客户端只在 onData 里看到, 不写入 message.parts
}
这套协议的详解见 UI 流编排 → 自定义 data-* 事件协议 和 useChat → onData。
7. 消息级元数据
| type | 含义 |
|---|---|
message-metadata | 给当前 message 打元数据 (延迟到 message 级别写入, 不是 part 级别) |
总表速查
共 25 个 type (把 data-${string} 算一个):
| 组 | 数量 | types |
|---|---|---|
| 生命周期控制 | 6 | start / start-step / finish-step / finish / abort / error |
| 文本 | 3 | text-start / text-delta / text-end |
| 推理 | 3 | reasoning-start / reasoning-delta / reasoning-end |
| 工具 | 8 | tool-input-start / tool-input-delta / tool-input-available / tool-input-error / tool-approval-request / tool-output-available / tool-output-error / tool-output-denied |
| 源 / 文件 | 3 | source-url / source-document / file |
| 自定义 | 1 | data-${string} |
| 消息元数据 | 1 | message-metadata |
Abort 双向传播 —— 最容易漏的那一环
用户点 stop 按钮,或者关 tab,或者网络断。这个 signal 必须完整地从 client 传到 server 再传到 LLM provider,中间任何一环漏了都会导致:
- 客户端显示”已停止”,但服务端在继续烧 token
- LLM 调用完成后写进 DB, 但用户已经走了
- 长 tool 执行(shell / 网络请求) 无法中止
完整的四环链路:
四环漏哪一环什么症状
| 漏的环 | 症状 |
|---|---|
Client 的 chat.stop() 没调 | 用户按了 stop 但流继续 —— 检查 stop 按钮 handler |
| fetch 的 signal 没传 | client 中止但 HTTP 连接还活着 —— DefaultChatTransport 已正确处理, 自定义 transport 容易漏 |
Server 没读 request.signal | server 继续执行 —— 最常见的漏环, 检查路由 handler |
streamText 没接 abortSignal | LLM 调用继续 —— 账单继续跑 |
tool.execute 忽略自己的 abortSignal | 长工具继续跑 (shell 命令、长 fetch) |
正确模板 (Hono)
import { convertToModelMessages, streamText } from "ai";
import { Hono } from "hono";
app.post("/api/chat", async (c) => {
const { messages } = await c.req.json();
const result = streamText({
model,
messages: await convertToModelMessages(messages),
tools,
abortSignal: c.req.raw.signal, // ← 关键: Hono 的 raw request 里有 signal
// 工具 execute 签名里一定接收 { abortSignal } 并传给下游 fetch / 子进程
});
return result.toUIMessageStreamResponse();
});
正确模板 (Next.js Route Handler)
export async function POST(req: Request) {
const { messages } = await req.json();
const result = streamText({
model,
messages: await convertToModelMessages(messages),
tools,
abortSignal: req.signal, // ← 关键
});
return result.toUIMessageStreamResponse();
}
Tool 内部的 abort 规范
const searchTool = tool({
description: "...",
inputSchema: z.object({ query: z.string() }),
execute: async ({ query }, { abortSignal }) => {
const res = await fetch(`https://api.example.com/search?q=${query}`, {
signal: abortSignal, // ← 这一行, 长 fetch 才会被中止
});
return res.json();
},
});
Resume 端到端实现
前面 useChat 页只讲了 client 侧一行 resume: true,server 侧才是大头。这一节给出完整的双端实现模式。
核心约束
- chunk 必须可 replay: 重连时要把已产生的所有 chunk 重发一遍, 然后接着推新 chunk。这要求 server 边生成边缓冲
- chat id 必须稳定: 客户端 mount 时
useChat({ id })必须用同一个 id, 不然服务端找不到缓冲 - 存储必须跨进程可见: 单进程内存 buffer 只能解决单 worker 重连, 多 worker 部署或 pod 重启都会丢 —— 生产用 Redis stream / Pub-Sub
客户端行为
useChat({
id: chatId, // 稳定, 来自 URL / props
resume: true, // mount 时自动调 transport.reconnectToStream({ chatId })
});
DefaultChatTransport.reconnectToStream 会向 ${api}?chatId={id} 发 POST。如果返回 null 或空流, client 进 ready
状态, 不做 replay; 如果返回有内容的流, client 按正常 stream 消费。
服务端实现 (Redis Stream 方案)
伪代码,核心思路:
// POST /api/chat — 新对话
app.post("/api/chat", async (c) => {
const body = await c.req.json();
const chatId = body.id;
const result = streamText({...});
const uiStream = result.toUIMessageStream();
// 关键: tee 一份写进 Redis, 另一份返回给客户端
const [forClient, forBuffer] = uiStream.tee();
// 异步把 forBuffer 的每个 chunk 写进 Redis stream (keyed by chatId)
ctx.waitUntil(writeStreamToRedis(chatId, forBuffer));
return new Response(forClient, { headers: sseHeaders });
});
// POST /api/chat?chatId=xxx — 重连
app.post("/api/chat", async (c) => {
const reconnectId = c.req.query("chatId");
if (reconnectId) {
// 查 Redis, 如果 stream 存在就返回 (replay 已有 + 订阅新)
const bufferedStream = await readStreamFromRedis(reconnectId);
if (bufferedStream) {
return new Response(bufferedStream, { headers: sseHeaders });
}
// stream 已完成或不存在 → 返回空, 客户端回到 ready
return new Response(null, { status: 204 });
}
// ... 正常新对话路径
});
幂等性保证
- Server 端每个 chunk 可带一个自增 seq number (用 Redis XADD 原生的
id字段即可) - Client 端在 reconnect 时可以传上次收到的最后一个 seq (
?chatId=xxx&lastSeq=42) - Server 只 replay
seq > lastSeq的 - 这样避免重连时对 text-delta 重复应用 (text-delta 是增量, 重放两次就是 doubled 内容)
DefaultChatTransport 的默认实现没有自动 seq 去重——要做精确去重需要自定义 prepareReconnectToStreamRequest
和服务端路由配合。对多数场景: agent 步循环里一步一个完整 text-start → text-delta → text-end 块, text-end 后就持久化,
client 端重建 message.parts 时天然能 dedupe by part id。
常见 pitfall
| 错误 | 真相 |
|---|---|
| 用内存 Map 存 buffer | 单 pod 重启就全丢 |
| 不设 TTL | Redis 里越堆越多 (建议 24-48h TTL) |
| chatId 随机生成 | 客户端刷页就对不上, resume 永远失败 |
| 重连时直接调 streamText | 绕过了 buffer, LLM 再跑一次 —— 账单翻倍 |
忘了 tee() | 写 buffer 就不能发客户端, 反之亦然 |
Error 四种路径
服务端错误到客户端 onError 的路径不是一条——有四种完全不同的传递方式,对应完全不同的网络层面行为:
| 路径 | 何时发生 | HTTP 层面 | Stream 层面 | Client 如何收到 |
|---|---|---|---|---|
| A. 请求前置错 (4xx) | 鉴权失败 / 参数校验失败 / rate limit | 返回 4xx, 有 body | 流没开始 | fetch 返回 non-ok response → transport throws → onError(new Error("HTTP 401 ...")) |
| B. 请求前置错 (5xx) | server 启动时异常 / 依赖初始化失败 | 返回 5xx, 有 body | 流没开始 | 同上, 但 message 是 5xx 内容 |
| C. 流中错 (agent 内部) | streamText 异常 / 工具异常 | HTTP 200, stream 已开始 | 发出 error chunk | chunk 被 parse → onError(new Error(errorText)) + onFinish({ isError: true }) |
| D. 连接中断 | TCP 断 / 代理超时 / client 网络坏 | HTTP 200, stream 开始后中断 | 无 error / finish chunk | fetch reject → onFinish({ isDisconnect: true }) |
每条路径的 client 侧信号
useChat({
onError: (error) => {
// 只触发路径 A / B / C
// 路径 D 不走这里, 走 onFinish 的 isDisconnect
},
onFinish: ({ isAbort, isDisconnect, isError, finishReason }) => {
// 四种状态互斥:
// - 正常完成: 三个 flag 都是 false, finishReason = 'stop' / 'length' / 'tool-calls' / ...
// - 用户 stop: isAbort = true
// - 网络断: isDisconnect = true (路径 D)
// - 流中错: isError = true (路径 C)
// - 路径 A / B: onFinish 不触发 (流都没开始)
},
});
服务端正确分层
路径 A / B (前置错): 正常 throw, 返回 HTTP 错误 response:
app.post("/api/chat", async (c) => {
if (!c.req.header("Authorization")) {
return c.json({ error: "Unauthorized" }, 401); // ← 路径 A
}
// ...
});
路径 C (流中错): 走 onError 序列化器:
const result = streamText({...});
return result.toUIMessageStreamResponse({
onError: (error) => {
// 关键: 这个返回值会被写进 error chunk 的 errorText
// 不要直接 return error.message — 可能暴露内部栈信息
log.error("stream.failed", error);
return "Internal error, please retry.";
},
});
如果 onError 返回 string, 流中断前会发一个 { type: "error", errorText: "..." } chunk, client 的 onError 和
onFinish({ isError: true }) 都会触发。
路径 D (连接断): 服务端通常无法察觉(或只能通过 request.signal 间接察觉)。client 靠 fetch reject 判定。
客户端错误恢复的四种策略
| 情形 | 策略 | 实现 |
|---|---|---|
isAbort | 不做事,用户主动的 | onFinish 里什么都不干 |
isDisconnect | 暴露重连 | 显示”重连?”按钮, 调 resumeStream() |
isError | 显示错误, 可选回滚 | onError + setMessages 去掉最后一条 user msg |
| 路径 A / B | 表单错误提示 | onError 里判断 error.message 来区分 (或服务端返回结构化 JSON) |
SSE 环境层陷阱
流式 SSE 对网络路径上的每一层都有要求。生产部署时这些层容易出问题:
反向代理缓冲
nginx 默认会缓冲 response,直到收完一定字节才发。SSE 流被缓冲 = client 看到的是”一次性一大堆”而不是”陆续小块”。
location /api/chat {
proxy_pass http://app;
proxy_buffering off; # ← 关键
proxy_cache off;
proxy_set_header Connection "";
proxy_http_version 1.1;
chunked_transfer_encoding on;
}
Cloudflare
默认启用 Auto Minify 和某些 compression 可能打断 SSE。在 Cloudflare dashboard 关掉路径 /api/chat 的 Auto Minify /
Rocket Loader / Speed features。或者给 response 加 header:
return result.toUIMessageStreamResponse({
headers: {
"X-Accel-Buffering": "no", // 通用绕过 (nginx / some CDN 认)
"Cache-Control": "no-cache",
},
});
Service Worker
如果你的 app 注册了 PWA service worker, 它可能把 POST 请求缓存或重放。显式排除 /api/chat 路径:
self.addEventListener("fetch", (event) => {
if (event.request.url.includes("/api/chat")) {
return; // 让 browser 原生处理, 不走 SW
}
// ...
});
浏览器连接数
HTTP/1.1 每个 origin 最多 6 个并发连接。SSE 流本身占一个, 页面里再用 XHR 拉数据容易撞上限。部署用 HTTP/2 或 HTTP/3 (Vercel / Cloudflare 默认都是) 避免。
负载均衡器超时
AWS ALB / nginx / Cloudflare 的默认 idle timeout 一般 60s - 5min。一个 agent 跑长任务(10+ 分钟) 可能超时断连。要么:
- 调大 LB idle timeout
- 或让 server 每隔 15-30s 主动 flush 一个 no-op 事件(例如 transient
data-heartbeat)保持连接活跃
延伸阅读
本章其他页
- 运行生命周期 — 12 个回调在时间轴上的位置
- UI 流编排 —
createUIMessageStream/writer/ data-* 事件协议 - 消息引用模型 — UIMessage / ModelMessage 的内部模型
- prepareStep 语义 — 最深的步钩子
- 客户端消费 (useChat) — 接收端 API 全解
SDK 源码锚点
[email protected]——dist/index.d.ts:2158-2273(UIMessageChunk完整 union 定义)[email protected]——dist/index.d.ts:2150-2156(DataUIMessageChunk+ transient)[email protected]——dist/index.js:5101-5198(createUIMessageStreamResponseSSE 封装)@ai-sdk/[email protected]——dist/index.js(AbstractChatstream 消费逻辑)
外部参考
- MDN — Server-Sent Events (SSE 协议规范)
- Vercel AI SDK Resumable Streams — 官方 resume 实现指南
- nginx SSE config — 反向代理缓冲设置