Agent 运行循环
Agent 系统作者:一次 Claude Code 对话到底发生了什么——query async generator、每轮 14 步流水线、StreamingToolExecutor、重试 / 恢复 / 熔断路径,全部源码级还原。
这一章填的空白
前面讲了静态组成:prompt 怎么拼 / 记忆怎么存 / 压缩怎么做 / 权限怎么判。但有一个核心问题一直没答:一次对话到底是怎么运行起来的?
- 用户按 Enter 之后,Claude Code 在内部做了什么?
- ReAct 循环是怎么实现的?
- 多轮工具调用怎么调度的?
- 一轮出错后怎么恢复?
claude --resume从上次退出处恢复的机制是什么?
这一章从源码级还原 agent 的运行生命周期。主要参考 query.ts(1729 行)+ QueryEngine.ts(1295 行)+ Task.ts(125 行)+ query/ 目录。
顶层结构:query() 是一个 async generator
Claude Code 的主循环定位在一个函数里:
// query.ts 第 219 行
export async function* query(
params: QueryParams,
): AsyncGenerator<
| StreamEvent
| RequestStartEvent
| Message
| TombstoneMessage
| ToolUseSummaryMessage,
Terminal
> {
const consumedCommandUuids: string[] = []
const terminal = yield* queryLoop(params, consumedCommandUuids)
for (const uuid of consumedCommandUuids) {
notifyCommandLifecycle(uuid, 'completed')
}
return terminal
}
两个关键设计决策:
- Async generator 而不是 Promise
——UI 层实时消费 yield 出来的 StreamEvent,用户看到的”字流式”输出就是这里来的 - Terminal 返回类型——generator 有明确的 exit reason(
Terminal),不是黑盒结束。调用者能区分 “正常完成 / 被 abort / 撞到 blocking limit / maxTurns 耗尽” 等多种终止原因
queryLoop 是内部实现,query 只是包了一个 command lifecycle notify 的薄壳——如果 loop throw 或被 .return() 调用,command 不会被 completed 通知(只有正常返回才通知)。这是生命周期对称性:started 不保证 completed。
QueryParams:一次调用的完整输入
export type QueryParams = {
messages: Message[] // 到目前为止的对话
systemPrompt: SystemPrompt // 预先装配好的系统提示词
userContext: { [k: string]: string } // CLAUDE.md / currentDate
systemContext: { [k: string]: string } // gitStatus / cacheBreaker
canUseTool: CanUseToolFn // 权限检查回调
toolUseContext: ToolUseContext // 工具执行上下文(含 mode / 允许的 tool)
fallbackModel?: string // 主 model 失败时的 fallback
querySource: QuerySource // 调用来源标识(repl_main_thread / compact / ...)
maxOutputTokensOverride?: number // 单轮输出上限覆盖
maxTurns?: number // 循环上限
skipCacheWrite?: boolean // 跳过 cache write
taskBudget?: { total: number } // API 侧的 task budget (beta)
deps?: QueryDeps // 可注入的依赖(测试用)
}
可注入的 deps 是一个精妙的设计(query/deps.ts):
export type QueryDeps = {
callModel: typeof queryModelWithStreaming // LLM 调用
microcompact: typeof microcompactMessages // 工具结果压缩
autocompact: typeof autoCompactIfNeeded // 自动压缩
uuid: () => string // ID 生成
}
源码注释解释为什么:“tests can inject fakes directly instead of spyOn-per-module — the most common mocks (callModel, autocompact) are each spied in 6-8 test files today with module-import-and-spy boilerplate”。
给自研 agent 的启示:顶层循环函数的依赖要可注入——测试环境直接注 fake,不用 spy 6-8 个模块。这是可测试性的基础。
循环状态:14 字段状态机
queryLoop 是一个无限 while 循环,用 State 对象携带跨迭代状态:
type State = {
messages: Message[] // 对话历史
toolUseContext: ToolUseContext // 工具上下文
autoCompactTracking: AutoCompactTrackingState | undefined // 压缩熔断器
maxOutputTokensRecoveryCount: number // max-tokens 恢复次数
hasAttemptedReactiveCompact: boolean // reactive 压缩已尝试?
maxOutputTokensOverride: number | undefined // 输出 token 覆盖
pendingToolUseSummary: Promise<ToolUseSummaryMessage | null> // 异步的工具调用摘要
stopHookActive: boolean | undefined // stop hook 正在跑?
turnCount: number // 当前轮次
transition: Continue | undefined // 上一轮为什么 continue
}
每个字段回答一个具体问题,没有冗余:
transition:上一轮为什么没结束,直接驱动下一轮的处理(注释:“Lets tests assert recovery paths fired without inspecting message contents”——让测试可以断言”恢复路径跑了”,而不用翻消息内容)maxOutputTokensRecoveryCount:独立的子循环计数器——max-output-tokens 错误后多次加量重试,不是全局 turnCounthasAttemptedReactiveCompact:每轮只能尝试一次 reactive compact,避免死循环pendingToolUseSummary:跟工具调用摘要走异步——主循环不等它,后台生成
给自研 agent 的启示:状态字段多并不坏,关键是每个字段有明确的语义责任。14 字段 × 清晰语义 远胜 4 字段 × { [key: string]: any }。
每轮 14 步:处理流水线
每次 while 循环的一轮最多跑这 14 步(很多步会因条件不触发而跳过):
| # | 步骤 | 源码位置 | 作用 |
|---|---|---|---|
| 1 | State 解构 | query.ts 第 311-321 行 | 从 State 拿本轮需要的字段 |
| 2 | Skill 预取 | 第 331 行 startSkillDiscoveryPrefetch | 并发预取相关 skill,在 LLM 流式期间跑 |
| 3 | Yield stream_request_start | 第 337 行 | 给 UI 的”开始了”信号 |
| 4 | getMessagesAfterCompactBoundary | 第 365 行 | 只看 compact boundary 之后的消息,已压缩的跳过 |
| 5 | applyToolResultBudget | 第 379 行 | 执行 per-message 的 tool result 预算 |
| 6 | HISTORY_SNIP(feature-flagged) | 第 401 行 | 删减策略式的历史清理 |
| 7 | Microcompact | 第 414 行 | 工具结果清理(清老 Read/Bash 结果) |
| 8 | Context Collapse(feature-flagged) | 第 441 行 | 另一套上下文管理系统 |
| 9 | Auto-compact | 第 454 行 | LLM 续写摘要(见 压缩) |
| 10 | Blocking limit check | 第 641 行 | 如果撞到硬顶上限,yield error 并 return { reason: 'blocking_limit' } |
| 11 | callModel 流式调用 | 第 659 行 deps.callModel | 调 LLM,流式接收 message / event |
| 12 | Yield messages | 第 708 行之后 | 逐条 yield 给 UI(含 tombstone 回滚机制) |
| 13 | 工具执行 | StreamingToolExecutor 或 runTools | 并行 / 串行跑 tool |
| 14 | 收集 toolResults,决定是否 continue | 循环末尾 | needsFollowUp = toolUseBlocks.length > 0 |
关键细节:
2. Skill 预取和 LLM 流式并行
const pendingSkillPrefetch = skillPrefetch?.startSkillDiscoveryPrefetch(...)
// ... 继续处理
// ... 调用 LLM,流式接收
// LLM 响应期间 skill 预取在后台跑
注释说:“Replaces the blocking assistant_turn path that ran inside getAttachmentMessages (97% of those calls found nothing in prod).”——原来 skill 发现是阻塞的,生产中 97% 找不到任何 skill 却阻塞了整轮。现在并发跑,几乎零成本。
4. getMessagesAfterCompactBoundary —— 压缩边界保护
压缩之后旧消息被替换成 summary。这里只取最近 boundary 之后的消息。注释:“REPL keeps snipped messages for UI scrollback — project so the compact model doesn’t summarize content that was intentionally removed”——UI 的滚动条保留着被”snip”掉的消息供展示,但模型那边不能看见(否则又被重新压缩一次)。
5. applyToolResultBudget —— 每条消息的 tool result 预算
Enforce per-message budget on aggregate tool result size. Runs BEFORE microcompact — cached MC operates purely by tool_use_id (never inspects content), so content replacement is invisible to it and the two compose cleanly.
意思:有一个 per-message 的工具结果总量预算(不同工具可以有不同上限),超预算就把内容替换为占位。顺序很关键——必须在 microcompact 之前,因为 cached microcompact 只看 tool_use_id 不看内容,两者能无缝组合。
10. Blocking limit —— 硬顶之前的主动阻塞
const { isAtBlockingLimit } = calculateTokenWarningState(
tokenCountWithEstimation(messagesForQuery) - snipTokensFreed,
model,
)
if (isAtBlockingLimit) {
yield createAssistantAPIErrorMessage({ content: PROMPT_TOO_LONG_ERROR_MESSAGE, ... })
return { reason: 'blocking_limit' }
}
当自动压缩被用户关掉时,这个检查预先阻止超限——为手动 /compact 留出 MANUAL_COMPACT_BUFFER_TOKENS = 3000 的空间。注释详述这个门槛要跳过的四种情况:
- 刚刚压缩过(
compactionResult)——usage 是 stale 的 querySource === 'compact' / 'session_memory'——forked agent 会死锁- Reactive compact enabled——让真 413 触发 reactive
- Context collapse enabled——collapse 自己管
给自研 agent 的启示:硬顶拦截不该是一个全局开关,要能精确豁免特殊调用路径——否则特殊路径会在 blocking limit 上死锁。
13. Streaming Tool Executor —— 边流边执行
const useStreamingToolExecution = config.gates.streamingToolExecution
let streamingToolExecutor = useStreamingToolExecution
? new StreamingToolExecutor(...)
: null
两条路径:
- 传统:LLM 流完 → 解析 tool_use → 按顺序 / 并发执行工具 → 拿结果
- Streaming(
StreamingToolExecutor):LLM 流到 tool_use block 一出现就启动对应工具,不等整个 assistant message 完成
延迟显著降低——如果是多个独立工具调用,能接近并发执行的 wall-clock 时间。
模型 fallback + streaming fallback
第 654 行的 while (attemptWithFallback) 是双层 fallback 逻辑:
- Model fallback:主 model 失败(API error / throttle) → 换
fallbackModel重试 - Streaming fallback:streaming 模式下出错(如 thinking block 异常),切到非 streaming 重试
源码里有一段特别难读但很重要的处理:streaming fallback 触发时,之前流出去的半截 assistant message 要被 tombstone——因为它可能带着无效的 thinking block signature,重新提交会被 API 拒绝。
if (streamingFallbackOccured) {
for (const msg of assistantMessages) {
yield { type: 'tombstone' as const, message: msg } // UI 和 transcript 删掉这条
}
assistantMessages.length = 0
toolResults.length = 0
// ... 丢弃 pending tool results,重开 executor
}
Tombstone 消息是 UI 和 transcript 的”删除标记”——流出去的消息不能从客户端撤回,但 tombstone 告诉下游”这条作废”。
给自研 agent 的启示:streaming 输出的撤回机制必须存在。LLM 流到一半发现有问题,不可能从客户端”收回”已经流出的字符——需要一个显式作废标记。
终止条件:Terminal 的 6 种 reason
根据 query.ts 里 return { reason: ... } 的所有分支,循环可能以这些原因终止:
| Reason | 条件 | 含义 |
|---|---|---|
blocking_limit | 撞到硬顶 | 手动压缩都救不了 |
max_turns | turnCount > maxTurns | 用户设置的 turn 上限 |
done | 本轮没有 tool_use | 模型认为任务完成 |
aborted | abortController.signal.aborted | 用户中断 |
stop_hook_blocked | Stop hook 返回 block | 用户 hook 阻止继续 |
error | 其他异常 | 不可恢复错误 |
不同 reason 对应不同的后续处理——“done” 后 UI 显示完成、“aborted” 显示”已取消”、“blocking_limit” 引导用户手动压缩、“max_turns” 建议调 maxTurns。
Task Layer:7 种 Task 类型
Agent 调用的顶层封装在 Task.ts。共有 7 种 TaskType:
export type TaskType =
| 'local_bash' // 本地 shell 任务
| 'local_agent' // 本地运行的子 agent
| 'remote_agent' // CCR 云端 agent
| 'in_process_teammate' // 同进程内的 teammate
| 'local_workflow' // 本地 workflow
| 'monitor_mcp' // MCP server 监控
| 'dream' // 夜间整理记忆的 dream job
5 种状态:
export type TaskStatus = 'pending' | 'running' | 'completed' | 'failed' | 'killed'
Task ID 有前缀(TASK_ID_PREFIXES):
{
local_bash: 'b', // 保持 'b' 兼容旧版本
local_agent: 'a',
remote_agent: 'r',
in_process_teammate: 't',
local_workflow: 'w',
monitor_mcp: 'm',
dream: 'd',
}
随机 8 字符 suffix 从 36^8 ≈ 2.8 万亿组合——源码注释:“sufficient to resist brute-force symlink attacks”。
为什么 ID 要抗 symlink 攻击:task 的输出文件路径来自 ID(getTaskOutputPath(id))。如果攻击者能预测 ID,就能预先创建 symlink 指向任意文件,让 task 的 stdout 写入该文件。36^8 的熵足够让这种攻击不现实。
给自研 agent 的启示:任何用 ID 作为文件路径的系统必须考虑 ID 可预测性——生产事故里这种”ID 不够随机导致 race condition 或攻击”的案例不少见。
Resume:claude --resume 和 session storage
前面 compaction 提到过:“Background jobs that summarize previous conversations for the claude --resume feature”——resume 时的压缩是后台预算好的。
机制拆解:
- Session storage:每次对话都写入
~/.claude/projects/<project>/sessions/<sessionId>.jsonl - 后台 summary agent:退出 Claude Code 后,一个后台 job 读取 session 文件,产出 summary
- Resume 时:读 session + summary,重建 State,进入 queryLoop
这样 resume 近乎即时——summary 已经算好。注释里明说 “subscribers can use /stats to view usage patterns”——usage 数据持久化了,跨 session 可见。
Abort 的全链路路径
toolUseContext.abortController 是贯穿整条调用链的中央取消点:
用户按 Esc
→ abortController.abort()
→ signal.aborted = true
→ LLM 流中断(AbortSignal 传到 fetch)
→ 所有在跑的工具收到 signal(tool.execute 的第二参数)
→ 各工具自己的 cleanup(bwrap 进程 kill、文件锁释放、...)
→ queryLoop 检查 signal.aborted → return { reason: 'aborted' }
关键设计:每一层都自己观察 signal,不用等上层通知。LLM fetch 原生支持 AbortSignal;工具 execute 的第二参数总含 signal;Bash 进程 wait 监听 signal——取消是全链路的广播,不是逐层转发。
给自研 agent 的启示:AbortController 必须从入口贯穿到每一个叶子操作。半截的 abort 支持比没有更糟——用户以为取消了,实际上某层还在跑。
QueryEngine:单次 LLM 调用的层级
QueryEngine.ts(1295 行)是 callModel 函数的底层,专门处理一次 LLM 调用的流式消费:
- 解析 SSE event(content_block_start / content_block_delta / content_block_stop / message_delta / …)
- 构建 assistant message
- 处理 max_tokens / stop_reason / 各种 API error
- Streaming fallback(上面讲过)
- Thinking block 的特殊处理(signature 验证)
- Usage tracking(包括 cache read / cache creation 各自的 token 数)
这一层的复杂度来自需要把 API 的”low-level event 流”组装成”high-level assistant message”,同时保持 cancel-safe / error-safe / partial-state-safe。不是玩具——生产级的 streaming API 消费端逻辑至少是 1000+ 行。
给自研 agent 的要点
- 主循环用 async generator,不是 Promise
——UI 实时流式消费是 agent 产品的基线体验 - Terminal 返回值要带 reason——不同终止原因引导不同后续 UX,黑盒 undefined 没法做
- 依赖注入:callModel / microcompact / autocompact 等高频 mock 的依赖做成
deps对象——否则测试必须 spy 6-8 个模块 - State 字段清晰语义:14 个字段每个回答一个问题,远胜一个
anybucket - 每轮处理是明确的流水线:snip → microcompact → collapse → autocompact → blocking check → model call → tools。顺序是设计选择,要有注释解释为什么这个顺序
- 并发预取:skill / memory / cache params 都可以在 LLM 流式期间后台跑——用
Promise.all或using disposable管理生命周期 - Streaming tool executor:LLM 流到 tool_use block 就启动工具,不等整个 assistant message 完成——显著降低多工具场景延迟
- Tombstone message 作为 “已流出但作废” 的显式标记——streaming 输出无法客户端撤回,需要显式标记
- Model fallback + streaming fallback 是两层 —— 分别应对 API error 和 streaming 异常
- AbortController 贯穿到叶子——每一层自己观察 signal,不是层层转发。半截的 cancel 比没有更糟
- Task ID 要有足够熵——文件名用 ID 时必须考虑 symlink 攻击,36^8 是 Claude Code 的选择
- Resume 的压缩在后台预算——用户体验上 resume 是即时的,不是退出时没处理