端到端协同

一次 sendMessage 从 click 到 DOM 更新的完整旅程 / UIMessageChunk 25 种类型的全量参考 / abort 双向传播的 4 环 / resume 的端到端实现模板 / 4 类 error 路径分类 / SSE 环境层陷阱

为什么需要这一页

前五页按切:发送端四页、接收端一页。但实战里的麻烦往往是纵向贯穿的——“一次点击发送到 DOM 更新中间究竟发生了什么”、“为什么 client 停了但 server 还在烧 token”、“resume 到底怎么落地”——这些问题哪一页都答不完整。

本页把前五页串成一个端到端视图:

  • 一次 sendMessage 的完整调用链(客户端 + 服务端 + 传输层)
  • 所有 UIMessageChunk 类型的含义和触发时机(wire 协议的唯一入口)
  • 三个真正双端的协议——abort / resume / error——只看一侧是 debug 不出来的

不重复前五页的概念,只讲”合在一起怎么跑”。

一次 sendMessage 的完整旅程

以最典型的场景开始: 用户在 textarea 输入一句话,点提交。下面是点击到 DOM 更新的完整调用链。

sequenceDiagram actor U as User participant UI as React Component participant Hook as useChat participant Chat as AbstractChat participant T as DefaultChatTransport participant F as fetch / Browser participant R as Server Route participant ST as streamText / Agent participant M as LLM Provider rect rgba(187, 247, 208, 0.18) Note over U,Chat: ① 点击 → 客户端状态更新 U->>UI: click submit UI->>Hook: sendMessage({ text }) Hook->>Chat: sendMessage(message, opts) Chat->>Chat: pushMessage(userMsg)<br/>status = submitted end rect rgba(254, 243, 199, 0.18) Note over Chat,R: ② 网络交接 Chat->>T: sendMessages({ trigger, chatId,<br/>messages, abortSignal }) T->>F: fetch POST /api/chat<br/>body: { messages, ... }<br/>signal: abortSignal F->>R: HTTP POST (SSE) R->>R: await convertToModelMessages(uiMessages) R->>ST: streamText({ model, messages,<br/>abortSignal: request.signal }) ST->>M: provider call (streaming) Note over R,F: HTTP 连接已握手 — 还没 flush 第一个 byte Note over Chat: status 仍是 submitted end rect rgba(199, 210, 254, 0.18) Note over Chat,M: ③ 流式回传 M-->>ST: first chunk ST-->>R: text-start / text-delta ... R-->>F: SSE frame: data: {...} F-->>T: ReadableStream(Uint8Array) T->>T: processResponseStream()<br/>→ ReadableStream(UIMessageChunk) T-->>Chat: stream of UIMessageChunk Chat->>Chat: 首个 chunk 到达<br/>status = streaming end loop 每个 chunk Chat->>Chat: 按 type 分派<br/>(parts 增量更新) Chat-->>Hook: 通知订阅者 Hook-->>UI: re-render (throttled) UI-->>U: DOM 更新 end rect rgba(52, 211, 153, 0.18) Note over U,M: ④ 收尾 ST-->>R: finish chunk R-->>T: SSE 流结束 T-->>Chat: finish Chat->>Chat: status = ready<br/>onFinish({ message, ... }) Hook-->>UI: 最终 re-render end

每一环在源码中的位置:

环节位置关键调用
UI → hook你的组件sendMessage({ text })
hook → Chat@ai-sdk/react dist/index.jsAbstractChat.sendMessage
Chat → transport[email protected] AbstractChat.makeRequesttransport.sendMessages(opts)
transport → wireDefaultChatTransport.sendMessagesfetch(api, { method: 'POST', body, signal })
wire → server你的路由 handlerreq.json() → 拿到 messages: UIMessage[]
消息桥接[email protected] convertToModelMessagesUIMessage[]ModelMessage[]
模型调用[email protected] streamText (dist:6441)返回 StreamTextResult
转 UI 流StreamTextResult.toUIMessageStream (dist:7839)fullStream.pipeThrough(TransformStream)
SSE 封装toUIMessageStreamResponse / createUIMessageStreamResponsenew Response(stream, { headers: sseHeaders })
服务端 → wireNode / Bun / Edge runtimeHTTP 流写
wire → clientHttpChatTransport.processResponseStream解析 SSE data: ...\n\n 帧 → UIMessageChunk
chunk → stateAbstractChat 内部 chunk 分派pushMessage / replaceMessage
state → DOMReact 订阅 + throttleuseSyncExternalStore + re-render

UIMessageChunk 全类型参考

这是 client ↔ server 之间唯一的 wire 协议——每个 SSE 帧解析出来就是其中一个 type。完整 union 定义在 [email protected]/dist/index.d.ts:2158-2273

按作用分 7 组:

1. 流生命周期控制

type何时发客户端处理
start流的第一个 chunk, messageId 和 message-level metadata 注入创建新 assistant message 对象
start-stepagent 每步开始在当前 message 上开一个新 step
finish-stepagent 每步结束触发 L3 onStepFinish; step 边界标记
finish整流结束, 含 finishReason触发 L3 onFinish; status → ready
abort服务端收到 abort 信号后发出onFinish({ isAbort: true, ... })
error服务端捕获异常, errorTextonError 返回值决定onError(new Error(errorText)) + isError: true

2. 文本内容

type含义客户端处理
text-start一段文本 part 开始, id 标识这段 partmessage.parts push 一个 TextUIPart, text=""
text-delta增量追加到对应 id 的 TextUIPart.text
text-end本段 part 结束标记 complete (可选触发 tree-shake)

id 相同的 start/delta/end 组成一段文本。不同 id 组成不同段(例如模型先输出一段 → 调用工具 → 再输出一段,这就是两个不同 id 的 text part)。

3. 推理 / Thinking

type含义客户端处理
reasoning-start / reasoning-delta / reasoning-endAnthropic thinking 块 / OpenAI reasoning trace结构和 text 一样, 但在 UI 上可折叠显示

4. 工具调用

type含义客户端处理 (ToolUIPart.state)
tool-input-start工具调用开始, 参数还在流式生成state = 'input-streaming'
tool-input-delta参数文本增量 (JSON 串的一段)累加到 input 字段的”正在生成”形态
tool-input-available参数完整state = 'input-streaming' → 'input-available', input 字段解析完成
tool-input-error参数解析失败 / schema 校验失败state = 'output-error', errorText 注入
tool-approval-request工具需要人工审批(罕见)state = 'approval-requested'
tool-output-available工具执行完成state = 'output-available', output 字段注入
tool-output-error工具执行失败state = 'output-error'
tool-output-denied审批被拒state = 'approval-responded' (denied)

dynamic-tool 变体: 如果工具是运行时发现的 (MCP / 动态注册), chunk 里会带 dynamic: true,客户端应该把它渲染为 DynamicToolUIPart (type: 'dynamic-tool') 而不是 tool-${name} 的静态形态。

5. 源引用 / 文件

type含义
source-url引用了一个 URL (RAG / 搜索)
source-document引用了一个文档 (PDF / markdown)
file本次回答附带一个文件 (图像 / 音频 / …)

6. 自定义业务事件

{
  type: `data-${string}`;   // 例如 data-progress, data-todos-update, data-run-init
  data: unknown;            // 业务方自定义 shape
  id?: string;
  transient?: boolean;      // true = 客户端只在 onData 里看到, 不写入 message.parts
}

这套协议的详解见 UI 流编排 → 自定义 data-* 事件协议useChat → onData

7. 消息级元数据

type含义
message-metadata给当前 message 打元数据 (延迟到 message 级别写入, 不是 part 级别)

总表速查

25 个 type (把 data-${string} 算一个):

数量types
生命周期控制6start / start-step / finish-step / finish / abort / error
文本3text-start / text-delta / text-end
推理3reasoning-start / reasoning-delta / reasoning-end
工具8tool-input-start / tool-input-delta / tool-input-available / tool-input-error / tool-approval-request / tool-output-available / tool-output-error / tool-output-denied
源 / 文件3source-url / source-document / file
自定义1data-${string}
消息元数据1message-metadata

Abort 双向传播 —— 最容易漏的那一环

用户点 stop 按钮,或者关 tab,或者网络断。这个 signal 必须完整地从 client 传到 server 再传到 LLM provider,中间任何一环漏了都会导致:

  • 客户端显示”已停止”,但服务端在继续烧 token
  • LLM 调用完成后写进 DB, 但用户已经走了
  • 长 tool 执行(shell / 网络请求) 无法中止

完整的四环链路:

Abort 信号双向传播 (Bidirectional Propagation) — 4 环级联 漏任一环 → client 显示"已停止",server 还在烧 token 1. 客户端 (Client) — chat.stop() 用户按 stop 按钮 / 关 tab 2. AbstractChat — AbortController.abort() 内部 controller 的 signal 传入 transport.sendMessages() 3. DefaultChatTransport — fetch(api, { signal }) 浏览器 abort HTTP 请求;网络连接切断 wire: 网络层断连 4. 服务端路由 — request.signal 进 aborted 状态 Hono: c.req.raw.signal — Next.js Route Handler: req.signal 5. streamText({ abortSignal: request.signal }) LLM provider 调用被中止;账单停止 6. tool.execute(input, { abortSignal }) 转发到下游 fetch / 子进程——长执行工具主动 cleanup 99% 的 abort 漏环发生在 4-6 环。最常见:忘了给 streamText / tool.execute 传 abortSignal。

四环漏哪一环什么症状

漏的环症状
Client 的 chat.stop() 没调用户按了 stop 但流继续 —— 检查 stop 按钮 handler
fetch 的 signal 没传client 中止但 HTTP 连接还活着 —— DefaultChatTransport 已正确处理, 自定义 transport 容易漏
Server 没读 request.signalserver 继续执行 —— 最常见的漏环, 检查路由 handler
streamText 没接 abortSignalLLM 调用继续 —— 账单继续跑
tool.execute 忽略自己的 abortSignal长工具继续跑 (shell 命令、长 fetch)

正确模板 (Hono)

import { convertToModelMessages, streamText } from "ai";
import { Hono } from "hono";

app.post("/api/chat", async (c) => {
  const { messages } = await c.req.json();

  const result = streamText({
    model,
    messages: await convertToModelMessages(messages),
    tools,
    abortSignal: c.req.raw.signal, // ← 关键: Hono 的 raw request 里有 signal
    // 工具 execute 签名里一定接收 { abortSignal } 并传给下游 fetch / 子进程
  });

  return result.toUIMessageStreamResponse();
});

正确模板 (Next.js Route Handler)

export async function POST(req: Request) {
  const { messages } = await req.json();

  const result = streamText({
    model,
    messages: await convertToModelMessages(messages),
    tools,
    abortSignal: req.signal, // ← 关键
  });

  return result.toUIMessageStreamResponse();
}

Tool 内部的 abort 规范

const searchTool = tool({
  description: "...",
  inputSchema: z.object({ query: z.string() }),
  execute: async ({ query }, { abortSignal }) => {
    const res = await fetch(`https://api.example.com/search?q=${query}`, {
      signal: abortSignal, // ← 这一行, 长 fetch 才会被中止
    });
    return res.json();
  },
});

Resume 端到端实现

前面 useChat 页只讲了 client 侧一行 resume: true,server 侧才是大头。这一节给出完整的双端实现模式

核心约束

  1. chunk 必须可 replay: 重连时要把已产生的所有 chunk 重发一遍, 然后接着推新 chunk。这要求 server 边生成边缓冲
  2. chat id 必须稳定: 客户端 mount 时 useChat({ id }) 必须用同一个 id, 不然服务端找不到缓冲
  3. 存储必须跨进程可见: 单进程内存 buffer 只能解决单 worker 重连, 多 worker 部署或 pod 重启都会丢 —— 生产用 Redis stream / Pub-Sub

客户端行为

useChat({
  id: chatId, // 稳定, 来自 URL / props
  resume: true, // mount 时自动调 transport.reconnectToStream({ chatId })
});

DefaultChatTransport.reconnectToStream 会向 ${api}?chatId={id} 发 POST。如果返回 null 或空流, client 进 ready 状态, 不做 replay; 如果返回有内容的流, client 按正常 stream 消费。

服务端实现 (Redis Stream 方案)

伪代码,核心思路:

// POST /api/chat — 新对话
app.post("/api/chat", async (c) => {
  const body = await c.req.json();
  const chatId = body.id;

  const result = streamText({...});
  const uiStream = result.toUIMessageStream();

  // 关键: tee 一份写进 Redis, 另一份返回给客户端
  const [forClient, forBuffer] = uiStream.tee();

  // 异步把 forBuffer 的每个 chunk 写进 Redis stream (keyed by chatId)
  ctx.waitUntil(writeStreamToRedis(chatId, forBuffer));

  return new Response(forClient, { headers: sseHeaders });
});

// POST /api/chat?chatId=xxx — 重连
app.post("/api/chat", async (c) => {
  const reconnectId = c.req.query("chatId");
  if (reconnectId) {
    // 查 Redis, 如果 stream 存在就返回 (replay 已有 + 订阅新)
    const bufferedStream = await readStreamFromRedis(reconnectId);
    if (bufferedStream) {
      return new Response(bufferedStream, { headers: sseHeaders });
    }
    // stream 已完成或不存在 → 返回空, 客户端回到 ready
    return new Response(null, { status: 204 });
  }
  // ... 正常新对话路径
});

幂等性保证

  • Server 端每个 chunk 可带一个自增 seq number (用 Redis XADD 原生的 id 字段即可)
  • Client 端在 reconnect 时可以传上次收到的最后一个 seq (?chatId=xxx&lastSeq=42)
  • Server 只 replay seq > lastSeq
  • 这样避免重连时对 text-delta 重复应用 (text-delta 是增量, 重放两次就是 doubled 内容)

DefaultChatTransport 的默认实现没有自动 seq 去重——要做精确去重需要自定义 prepareReconnectToStreamRequest 和服务端路由配合。对多数场景: agent 步循环里一步一个完整 text-start → text-delta → text-end 块, text-end 后就持久化, client 端重建 message.parts 时天然能 dedupe by part id

常见 pitfall

错误真相
用内存 Map 存 buffer单 pod 重启就全丢
不设 TTLRedis 里越堆越多 (建议 24-48h TTL)
chatId 随机生成客户端刷页就对不上, resume 永远失败
重连时直接调 streamText绕过了 buffer, LLM 再跑一次 —— 账单翻倍
忘了 tee()写 buffer 就不能发客户端, 反之亦然

Error 四种路径

服务端错误到客户端 onError 的路径不是一条——有四种完全不同的传递方式,对应完全不同的网络层面行为:

路径何时发生HTTP 层面Stream 层面Client 如何收到
A. 请求前置错 (4xx)鉴权失败 / 参数校验失败 / rate limit返回 4xx, 有 body流没开始fetch 返回 non-ok response → transport throws → onError(new Error("HTTP 401 ..."))
B. 请求前置错 (5xx)server 启动时异常 / 依赖初始化失败返回 5xx, 有 body流没开始同上, 但 message 是 5xx 内容
C. 流中错 (agent 内部)streamText 异常 / 工具异常HTTP 200, stream 已开始发出 error chunkchunk 被 parse → onError(new Error(errorText)) + onFinish({ isError: true })
D. 连接中断TCP 断 / 代理超时 / client 网络坏HTTP 200, stream 开始后中断error / finish chunkfetch reject → onFinish({ isDisconnect: true })

每条路径的 client 侧信号

useChat({
  onError: (error) => {
    // 只触发路径 A / B / C
    // 路径 D 不走这里, 走 onFinish 的 isDisconnect
  },
  onFinish: ({ isAbort, isDisconnect, isError, finishReason }) => {
    // 四种状态互斥:
    // - 正常完成: 三个 flag 都是 false, finishReason = 'stop' / 'length' / 'tool-calls' / ...
    // - 用户 stop: isAbort = true
    // - 网络断: isDisconnect = true (路径 D)
    // - 流中错: isError = true (路径 C)
    // - 路径 A / B: onFinish 不触发 (流都没开始)
  },
});

服务端正确分层

路径 A / B (前置错): 正常 throw, 返回 HTTP 错误 response:

app.post("/api/chat", async (c) => {
  if (!c.req.header("Authorization")) {
    return c.json({ error: "Unauthorized" }, 401); // ← 路径 A
  }
  // ...
});

路径 C (流中错): 走 onError 序列化器:

const result = streamText({...});

return result.toUIMessageStreamResponse({
  onError: (error) => {
    // 关键: 这个返回值会被写进 error chunk 的 errorText
    // 不要直接 return error.message — 可能暴露内部栈信息
    log.error("stream.failed", error);
    return "Internal error, please retry.";
  },
});

如果 onError 返回 string, 流中断前会发一个 { type: "error", errorText: "..." } chunk, client 的 onErroronFinish({ isError: true }) 都会触发。

路径 D (连接断): 服务端通常无法察觉(或只能通过 request.signal 间接察觉)。client 靠 fetch reject 判定。

客户端错误恢复的四种策略

情形策略实现
isAbort不做事,用户主动的onFinish 里什么都不干
isDisconnect暴露重连显示”重连?”按钮, 调 resumeStream()
isError显示错误, 可选回滚onError + setMessages 去掉最后一条 user msg
路径 A / B表单错误提示onError 里判断 error.message 来区分 (或服务端返回结构化 JSON)

SSE 环境层陷阱

流式 SSE 对网络路径上的每一层都有要求。生产部署时这些层容易出问题:

反向代理缓冲

nginx 默认会缓冲 response,直到收完一定字节才发。SSE 流被缓冲 = client 看到的是”一次性一大堆”而不是”陆续小块”。

location /api/chat {
  proxy_pass http://app;
  proxy_buffering off;         # ← 关键
  proxy_cache off;
  proxy_set_header Connection "";
  proxy_http_version 1.1;
  chunked_transfer_encoding on;
}

Cloudflare

默认启用 Auto Minify 和某些 compression 可能打断 SSE。在 Cloudflare dashboard 关掉路径 /api/chat 的 Auto Minify / Rocket Loader / Speed features。或者给 response 加 header:

return result.toUIMessageStreamResponse({
  headers: {
    "X-Accel-Buffering": "no", // 通用绕过 (nginx / some CDN 认)
    "Cache-Control": "no-cache",
  },
});

Service Worker

如果你的 app 注册了 PWA service worker, 它可能把 POST 请求缓存或重放。显式排除 /api/chat 路径:

self.addEventListener("fetch", (event) => {
  if (event.request.url.includes("/api/chat")) {
    return; // 让 browser 原生处理, 不走 SW
  }
  // ...
});

浏览器连接数

HTTP/1.1 每个 origin 最多 6 个并发连接。SSE 流本身占一个, 页面里再用 XHR 拉数据容易撞上限。部署用 HTTP/2 或 HTTP/3 (Vercel / Cloudflare 默认都是) 避免。

负载均衡器超时

AWS ALB / nginx / Cloudflare 的默认 idle timeout 一般 60s - 5min。一个 agent 跑长任务(10+ 分钟) 可能超时断连。要么:

  • 调大 LB idle timeout
  • 或让 server 每隔 15-30s 主动 flush 一个 no-op 事件(例如 transient data-heartbeat)保持连接活跃

延伸阅读

本章其他页

SDK 源码锚点

外部参考

这页有帮助吗?