OpenClaw 源代码分析
当用户在whatsapp, discord 等消息软件中发送了消息后,网关是如何获得的,再到回复,整个流程是如何运转的?
OpenClaw 消息处理完整流程
1. 消息接入 → 2. 路由决策 → 3. AI 处理 → 4. 回复发送
核心文件位置
| 模块 | 文件位置 | 功能 |
|---|---|---|
| 渠道实现 | extensions/*/src/channel.ts |
WhatsApp/Discord/Telegram 等渠道插件 |
| 渠道监听 | extensions/discord/src/monitor/listeners.ts |
监听渠道消息事件 |
| 消息分发 | src/auto-reply/dispatch.ts |
协调预处理、路由、回复 |
| 路由解析 | src/routing/resolve-route.ts |
根据 bindings 配置决定由哪个 agent 处理 |
| 网关服务 | src/gateway/server.impl.ts |
网关服务器主实现 |
| Agent 执行 | src/agents/pi-embedded-runner/ |
运行 AI agent |
| 消息发送 | src/infra/outbound/deliver.ts |
统一发送逻辑 |
详细流程示例(以 Discord 为例)
Discord 用户发送消息
↓
[extensions/discord/src/monitor/listeners.ts
DiscordMessageListener 监听消息事件
↓
预处理(权限检查、白名单验证
↓
解析消息内容、附件、用户信息
↓
src/auto-reply/dispatch.ts
dispatchInboundMessage()
↓
src/routing/resolve-route.ts
解析路由绑定(bindings)
├─ 匹配规则优先级:
├─ peer (用户)
├─ guild (服务器)
├─ channel (渠道)
└─ default (默认)
↓
生成 sessionKey,决定由哪个 agent 处理
↓
src/agents/pi-embedded-runner/
执行 AI agent,处理工具调用
↓
src/infra/outbound/deliver.ts
规范化回复内容
↓
根据渠道特性格式化消息
↓
extensions/discord/src/send.ts
发送回 Discord
关键设计点
- 渠道插件化 - 每个渠道在
extensions/*/独立实现 - 路由绑定 - `bindings 配置按优先级匹配
- 会话管理 - sessionKey 追踪会话状态
- 统一发送 - 各渠道格式统一处理
展示一下 resolveAgentRoute 函数的具体代码
resolveAgentRoute 函数代码详解
这是路由解析的核心函数(位于 src/routing/resolve-route.ts:295):
核心流程
1. 规范化输入 → 2. 获取 bindings → 3. 按优先级匹配 → 4. 返回路由结果
匹配优先级(tiers 数组)
| 优先级 | 匹配方式 | 条件 |
|---|---|---|
| 1 | binding.peer |
直接匹配用户 |
| 2 | binding.peer.parent |
继承父线程(帖子)的绑定 |
| 3 | binding.guild+roles |
Discord 服务器 + 角色 |
| 4 | binding.guild |
Discord 服务器 |
| 5 | binding.team |
Slack 团队 |
| 6 | binding.account |
账户 |
| 7 | binding.channel |
渠道 |
| 8 | default |
默认 agent |
关键返回值
{
agentId: string; // 目标 agent ID
channel: string; // 渠道
accountId: string; // 账户
sessionKey: string; // 会话密钥
mainSessionKey: string; // 主会话密钥(用于 DM 合并)
matchedBy: \"...\"; // 匹配方式(用于调试)
}
优化点
- 使用
WeakMap缓存evaluatedBindingsCacheByCfg,避免重复解析 bindings - 缓存最大 2000 个 channel+account 组合”
从 Web UI 进来的消息流程是?
Web UI 消息流程
完整链路图
┌─────────────────────────────────────────────────────────────────────────┐
│ Web UI (前端) │
│ ui/src/ui/controllers/chat.ts │
│ └─ sendChatMessage() │
│ └─ client.request(\"chat.send\", ...) │
└────────────────────────────────────┬────────────────────────────────────┘
│ WebSocket
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ GatewayBrowserClient │
│ ui/src/ui/gateway.ts │
│ └─ request() 发送 JSON-RPC 消息 │
└────────────────────────────────────┬────────────────────────────────────┘
│ WebSocket
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Gateway Server (网关服务端) │
│ src/gateway/server-methods/chat.ts │
│ └─ chatHandlers[\"chat.send\"] │
│ ├─ 验证参数 │
│ ├─ 解析附件 │
│ ├─ 加载 session │
│ ├─ 检查停止命令 │
│ └─ 调用 dispatchInboundMessage() │
└────────────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Auto-Reply (自动回复系统) │
│ src/auto-reply/dispatch.ts │
│ └─ dispatchInboundMessage() │
│ └─ dispatchReplyFromConfig() │
└────────────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Routing (路由解析) │
│ src/routing/resolve-route.ts │
│ └─ resolveAgentRoute() → 决定由哪个 agent 处理 │
└────────────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Agent Execution (Agent 执行) │
│ src/agents/pi-embedded-runner/ │
│ └─ 运行 AI agent,处理工具调用 │
└────────────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ 回复通过 chat 事件流返回 │
│ src/gateway/server-methods/chat.ts │
│ └─ broadcastChatFinal() / broadcastChatEvent() │
│ └─ 通过 WebSocket 发送 \"chat\" 事件 │
└────────────────────────────────────┬────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────┐
│ Web UI (前端) │
│ ui/src/ui/controllers/chat.ts │
│ └─ handleChatEvent() │
│ ├─ \"delta\" 状态: 更新流式文本 │
│ └─ \"final\" 状态: 加载完整历史,显示最终消息 │
└─────────────────────────────────────────────────────────────────────────┘
关键文件位置
| 环节 | 文件 | 说明 |
|---|---|---|
| 前端发送 | ui/src/ui/controllers/chat.ts:75 |
sendChatMessage() 函数 |
| WebSocket 客户端 | ui/src/ui/gateway.ts:289 |
GatewayBrowserClient.request() |
| 网关处理 | src/gateway/server-methods/chat.ts:704 |
chatHandlers[\"chat.send\"] |
| 消息分发 | src/auto-reply/dispatch.ts |
dispatchInboundMessage() |
| 路由解析 | src/routing/resolve-route.ts:295 |
resolveAgentRoute() |
| 事件接收 | ui/src/ui/controllers/chat.ts:185 |
handleChatEvent() |
与外部渠道(WhatsApp/Discord)的区别
| 特性 | Web UI | 外部渠道 |
|---|---|---|
| 通道 | WebSocket (JSON-RPC) | 渠道 API/SDK |
| 入口方法 | chat.send |
渠道监听器 |
| 回复方式 | WebSocket 事件流 | 渠道发送 API |
| Session Key | 直接使用 | 通过 bindings 解析 |
| 内部 channel | \"internal\" / \"webchat\" |
实际渠道名 |
Web UI 和 外部渠道 都要经过网关控制,然后进行路由?
路由机制的一致性
相同的路由解析函数
无论是 Web UI 还是外部渠道,都会调用同一个路由解析函数:
// 外部渠道(如 WhatsApp)在 src/web/auto-reply/monitor/on-message.ts 中调用
const route = resolveAgentRoute({
cfg: loadConfig(),
channel: \"whatsapp\",
accountId: msg.accountId,
peer: {
kind: msg.chatType === \"group\" ? \"group\" : \"direct\",
id: peerId,
},
});
// Web UI 在 src/gateway/server-methods/chat.ts 中调用
const agentId = resolveSessionAgentId({
sessionKey,
config: cfg,
});
// 而 resolveSessionAgentId 最终会使用与 resolveAgentRoute 相同的逻辑
共享的路由规则
路由规则在配置文件中定义,对所有渠道都是统一的:
// 路由解析的核心逻辑(src/routing/resolve-route.ts)
const tiers: Array<{
matchedBy: Exclude<ResolvedAgentRoute[\"matchedBy\"], \"default\">;
enabled: boolean;
scopePeer: RoutePeer | null;
predicate: (candidate: EvaluatedBinding) => boolean;
}> = [
{ matchedBy: \"binding.peer\", enabled: Boolean(peer), scopePeer: peer, predicate: (candidate) => candidate.match.peer.state === \"valid\" },
{ matchedBy: \"binding.peer.parent\", enabled: Boolean(parentPeer && parentPeer.id), scopePeer: parentPeer && parentPeer.id ? parentPeer : null, predicate: (candidate) => candidate.match.peer.state === \"valid\" },
{ matchedBy: \"binding.guild+roles\", enabled: Boolean(guildId && memberRoleIds.length > 0), scopePeer: peer, predicate: (candidate) => hasGuildConstraint(candidate.match) && hasRolesConstraint(candidate.match) },
{ matchedBy: \"binding.guild\", enabled: Boolean(guildId), scopePeer: peer, predicate: (candidate) => hasGuildConstraint(candidate.match) && !hasRolesConstraint(candidate.match) },
{ matchedBy: \"binding.team\", enabled: Boolean(teamId), scopePeer: peer, predicate: (candidate) => hasTeamConstraint(candidate.match) },
{ matchedBy: \"binding.account\", enabled: true, scopePeer: peer, predicate: (candidate) => candidate.match.accountPattern !== \"*\" },
{ matchedBy: \"binding.channel\", enabled: true, scopePeer: peer, predicate: (candidate) => candidate.match.accountPattern === \"*\" },
];
统一的回复处理
无论是 Web UI 还是外部渠道,最终都会使用相同的回复处理逻辑:
// Web UI 在 src/gateway/server-methods/chat.ts 中调用
const replyResult = await (params.replyResolver ?? getReplyFromConfig)(
ctx,
resolvedOpts,
cfg,
);
// 外部渠道在 src/web/auto-reply/monitor/process-message.ts 中调用
const replyResult = await params.replyResolver(
ctx,
replyOptions,
params.cfg,
);
处理流程的区别
虽然路由机制相同,但两者的入口点和处理流程略有不同:
外部渠道(如 WhatsApp)的流程
- 监听渠道消息(
monitorWebChannel) - 消息预处理(去重、验证等)
- 解析路由(
resolveAgentRoute) - 处理消息和生成回复(
processMessage) - 发送回复到渠道
Web UI 的流程
- 用户在浏览器中发送消息
- 前端通过 WebSocket 发送到网关(
chat.send方法) - 网关验证参数并解析会话
- 解析路由(
resolveSessionAgentId) - 处理消息和生成回复(
dispatchInboundMessage) - 通过 WebSocket 发送回复到前端
总结
Web UI 和外部渠道都要经过网关的控制和路由机制,使用相同的路由规则和回复处理逻辑,确保了系统行为的一致性。这种设计使得开发者可以统一管理所有渠道的消息处理,同时也为扩展新渠道提供了便利。
Pi Agent 如何调用 LLM 和本地执行命令
Pi Agent 调用 LLM 和本地执行命令的机制
1. LLM 调用机制
核心流程
Pi Agent 通过以下步骤调用 LLM:
// 1. 初始化 Agent 会话
const activeSession = createAgentSession({
session: sessionManager,
settings: settingsManager,
// ...其他配置
});
// 2. 配置模型和工具
if (params.model.api === \"ollama\") {
activeSession.agent.streamFn = createOllamaStreamFn(ollamaBaseUrl);
} else {
activeSession.agent.streamFn = streamSimple; // 使用 @mariozechner/pi-ai 库
}
// 3. 发送提示给 LLM
await activeSession.prompt(effectivePrompt, { images: imageResult.images });
关键实现位置
- LLM 配置:
src/agents/pi-embedded-runner/run/attempt.ts:612-634 - 模型解析:
src/agents/pi-embedded-runner/model.ts - 会话管理:使用
@mariozechner/pi-coding-agent库的SessionManager - Ollama 调用:
src/agents/ollama-stream.ts - 通用 LLM 调用:
@mariozechner/pi-ai库的streamSimple函数
支持的模型提供者
- OpenAI/GPT-4o
- Anthropic Claude 3
- Google Gemini
- Ollama(本地模型)
- GitHub Copilot
- 其他通过配置文件定义的模型
2. 本地命令执行机制
核心工具创建
// 在 src/agents/pi-tools.ts 中创建工具
const tools = createOpenClawCodingTools({
exec: {
...params.execOverrides,
elevated: params.bashElevated,
},
sandbox,
// ...其他配置
});
工具类型
- 编码工具:读取、写入、编辑文件
- 执行工具:执行 shell 命令(
bash-tools.exec.ts) - 进程工具:管理长期运行的进程(
bash-tools.process.ts) - OpenClaw 特定工具:浏览器操作、发送消息等(
openclaw-tools.ts)
命令执行流程
// 在 src/agents/bash-tools.exec-runtime.ts 中
export async function runExecProcess(opts: {
command: string;
workdir: string;
env: Record<string, string>;
sandbox?: BashSandboxConfig;
// ...其他配置
}): Promise<ExecProcessHandle> {
// 1. 安全检查和环境准备
validateHostEnv(env);
// 2. 执行命令
if (opts.sandbox) {
// 沙箱执行(Docker)
const argv = [\"docker\", ...buildDockerExecArgs({...})];
// ...
} else {
// 直接执行
const child = childProcess.spawn(command, options);
// ...
}
// 3. 输出处理和更新
child.stdout.on('data', handleStdout);
child.stderr.on('data', handleStderr);
// 4. 结果返回
return processHandle;
}
安全机制
- 白名单检查:
evaluateShellAllowlist()检查命令是否在安全白名单中 - 沙箱执行:支持 Docker 沙箱
-
权限分级:根据配置决定安全级别(deny allowlist full) - 审计日志:记录所有执行的命令
关键实现位置
- 工具创建:
src/agents/pi-tools.ts - 执行工具:
src/agents/bash-tools.exec.ts - 执行运行时:
src/agents/bash-tools.exec-runtime.ts - 进程管理:
src/agents/bash-tools.process.ts - 执行审批:
src/infra/exec-approvals.ts
3. 工具调用流程
Agent 与工具的交互
// 在 src/agents/pi-embedded-subscribe.subscribe-embedded-pi-session.ts 中
const subscription = subscribeEmbeddedPiSession({
session: activeSession,
runId: params.runId,
onToolResult: (toolResult) => {
// 处理工具结果
},
// ...其他回调
});
工具调用事件处理
- 工具执行开始:
handleToolExecutionStart() - 工具执行结束:
handleToolExecutionEnd() - 工具输出处理:通过回调函数实时更新 UI
- 错误处理:捕获和显示工具错误
4. 配置和策略
工具策略配置
# config.yaml 中的工具策略配置
tools:
exec:
security: allowlist # 安全模式
ask: on-miss # 审批模式
safeBins: # 安全命令白名单
- ls
- cat
- git
沙箱配置
sandbox:
enabled: true
containerName: openclaw-sandbox
containerWorkdir: /workspace
总结
Pi Agent 提供了一个完整的 AI 代理系统,支持通过配置文件灵活定义工具和模型行为。LLM 调用和本地命令执行通过统一的工具接口实现,同时提供了严格的安全机制(如沙箱、白名单)来确保系统安全。这种架构使得 OpenClaw 能够处理各种复杂的任务,从代码编写到系统管理。
定时任务 执行的流程
OpenClaw 定时任务执行流程
1. 任务定义与配置
定时任务支持三种调度类型:
// src/cron/types.ts
type CronSchedule =
| { kind: \"at\"; at: string } // 一次性任务(ISO时间或+duration)
| { kind: \"every\"; everyMs: number; anchorMs?: number } // 周期性任务(毫秒间隔)
| { kind: \"cron\"; expr: string; tz?: string }; // Cron表达式任务
任务存储在 ~/.openclaw/cron/ 目录的 JSON 文件中,通过 CronService 管理。
2. 调度器原理
核心调度器组件
- CronService:定时任务服务主类(
src/cron/service.ts) - 定时器管理:管理定时任务的唤醒和执行(
src/cron/service/timer.ts) - 作业管理:任务创建、更新、删除(
src/cron/service/jobs.ts)
调度策略
// src/cron/service/timer.ts
const MAX_TIMER_DELAY_MS = 60_000; // 最大定时器延迟(1分钟)
const MIN_REFIRE_GAP_MS = 2000; // 最小触发间隔(防止连续触发)
export function armTimer(state: CronServiceState) {
const nextAt = nextWakeAtMs(state); // 计算下一个任务的执行时间
const delay = Math.max(nextAt - now, 0);
const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS);
state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
state.deps.log.error({ err: String(err) }, \"cron: timer tick failed\");
});
}, clampedDelay);
}
3. 任务执行流程
定时器触发(onTimer)
// src/cron/service/timer.ts
export async function onTimer(state: CronServiceState) {
if (state.running) { // 防止重入
state.timer = setTimeout(() => void onTimer(state), MAX_TIMER_DELAY_MS);
return;
}
state.running = true;
try {
// 1. 查找到期任务
const dueJobs = await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const due = findDueJobs(state);
for (const job of due) {
job.state.runningAtMs = state.deps.nowMs();
}
await persist(state);
return due;
});
// 2. 执行任务
const results = [];
for (const { id, job } of dueJobs) {
const startedAt = state.deps.nowMs();
const result = await executeJobCore(state, job);
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
}
// 3. 处理执行结果
await locked(state, async () => {
for (const result of results) {
const job = state.store?.jobs.find((j) => j.id === result.jobId);
if (job) {
const shouldDelete = applyJobResult(state, job, result);
if (shouldDelete) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
}
}
}
await persist(state);
});
// 4. 会话清理
await sweepCronRunSessions({...});
} finally {
state.running = false;
armTimer(state); // 重新武装定时器
}
}
任务执行(executeJobCore)
// src/cron/service/timer.ts
async function executeJobCore(state: CronServiceState, job: CronJob): Promise<CronRunOutcome & CronRunTelemetry> {
if (job.sessionTarget === \"main\") {
// 主会话任务:发送系统事件
const text = resolveJobPayloadTextForMain(job);
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === \"now\") {
await state.deps.runHeartbeatOnce({...});
} else {
state.deps.requestHeartbeatNow({...});
}
} else if (job.sessionTarget === \"isolated\") {
// 隔离会话任务:执行AI代理对话
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
});
}
}
4. 执行入口
CLI 命令
# 查看任务状态
pnpm openclaw cron status
# 列出所有任务
pnpm openclaw cron list
# 添加任务
pnpm openclaw cron add --at \"2025-12-31T23:59:00\" --message \"Happy New Year!\"
# 立即运行任务
pnpm openclaw cron run <job-id>
网关 API
// src/gateway/server-methods/cron.ts
export const cronHandlers: GatewayRequestHandlers = {
\"cron.list\": async ({ params, respond, context }) => { ... },
\"cron.add\": async ({ params, respond, context }) => { ... },
\"cron.run\": async ({ params, respond, context }) => { ... },
\"cron.status\": async ({ params, respond, context }) => { ... },
};
5. 与其他系统组件的交互
系统事件
任务可以发送系统事件,在主会话中显示。
心跳系统
任务可以请求立即执行心跳,触发 AI 处理。
隔离代理执行
支持在隔离会话中执行 AI 任务,不会影响主会话的上下文。
6. 错误处理与稳定性
- 任务超时:默认 10 分钟超时
- 错误退避:指数退避策略(30s → 1min → 5min → 15min → 60min)
- 状态恢复:启动时清除过期的运行标记
- 存储锁定:确保操作原子性
总结
OpenClaw 的定时任务系统是一个设计完善的调度系统,支持多种任务类型和执行方式,具有高可靠性和良好的扩展性。
定时任务的整个生命周期
定时任务的完整生命周期
1. 任务创建阶段
创建入口
- CLI命令:
pnpm openclaw cron add - API调用:
cron.add网关方法 - UI操作:通过 Web UI 或应用程序界面
创建流程
// src/cron/service/jobs.ts
export function createJob(state: CronServiceState, input: CronJobCreate): CronJob {
const job: CronJob = {
id: input.id || generateUUID(),
schedule: input.schedule,
payload: input.payload,
agentId: input.agentId,
sessionKey: input.sessionKey,
sessionTarget: input.sessionTarget,
wakeMode: input.wakeMode,
deleteAfterRun: input.deleteAfterRun,
webhookTargets: input.webhookTargets,
tags: input.tags,
state: {
runningAtMs: undefined,
lastRunAtMs: undefined,
lastStatus: undefined,
lastDurationMs: undefined,
consecutiveErrors: 0,
nextRunAtMs: computeJobNextRunAtMs(input, state.deps.nowMs()),
lastErrorTimeMs: undefined,
},
};
state.store.jobs.push(job);
return job;
}
任务验证
// src/cron/normalize.ts
export function normalizeCronJobCreate(input: unknown): CronJobCreate | null;
// src/cron/validate-timestamp.ts
export function validateScheduleTimestamp(schedule: CronSchedule): { ok: boolean; message?: string };
2. 任务存储阶段
存储位置
- 默认路径:
~/.openclaw/cron/ - 文件格式:JSON
- 存储接口:
src/cron/store.ts
持久化流程
// src/cron/store.ts
async function saveCronStore(path: string, store: CronStore): Promise<void> {
const dir = path.dirname(path);
await fs.mkdir(dir, { recursive: true });
await fs.writeFile(path, JSON.stringify(store, null, 2), \"utf-8\");
}
3. 任务调度阶段
调度器初始化
// src/cron/service/ops.ts
export async function start(state: CronServiceState): Promise<void> {
await locked(state, async () => {
await ensureLoaded(state, { skipRecompute: true });
const jobs = state.store?.jobs ?? [];
// 清除过期的运行标记
for (const job of jobs) {
if (typeof job.state.runningAtMs === \"number\") {
job.state.runningAtMs = undefined;
}
}
await runMissedJobs(state); // 运行错过的任务
recomputeNextRuns(state); // 重新计算所有任务的下次运行时间
await persist(state); // 持久化状态
armTimer(state); // 启动定时器
});
}
定时器管理
// src/cron/service/timer.ts
export function armTimer(state: CronServiceState): void {
const nextAt = nextWakeAtMs(state); // 计算下一个任务的执行时间
const delay = Math.max(nextAt - now, 0);
const clampedDelay = Math.min(delay, MAX_TIMER_DELAY_MS); // 限制最大延迟
state.timer = setTimeout(() => {
void onTimer(state).catch((err) => {
state.deps.log.error({ err: String(err) }, \"cron: timer tick failed\");
});
}, clampedDelay);
}
4. 任务执行阶段
任务触发
// src/cron/service/timer.ts (onTimer)
export async function onTimer(state: CronServiceState) {
if (state.running) { // 防止重入
state.timer = setTimeout(() => void onTimer(state), MAX_TIMER_DELAY_MS);
return;
}
state.running = true;
try {
const dueJobs = await locked(state, async () => {
await ensureLoaded(state, { forceReload: true, skipRecompute: true });
const due = findDueJobs(state); // 查找到期任务
for (const job of due) {
job.state.runningAtMs = state.deps.nowMs();
}
await persist(state);
return due;
});
// 执行任务
const results = [];
for (const { id, job } of dueJobs) {
const startedAt = state.deps.nowMs();
const result = await executeJobCore(state, job);
results.push({ jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() });
}
// 处理执行结果
await locked(state, async () => {
for (const result of results) {
const job = state.store?.jobs.find((j) => j.id === result.jobId);
if (job) {
const shouldDelete = applyJobResult(state, job, result);
if (shouldDelete) {
state.store.jobs = state.store.jobs.filter((j) => j.id !== job.id);
}
}
}
await persist(state);
});
await sweepCronRunSessions({...});
} finally {
state.running = false;
armTimer(state); // 重新武装定时器
}
}
任务执行(executeJobCore)
// src/cron/service/timer.ts
async function executeJobCore(state: CronServiceState, job: CronJob): Promise<CronRunOutcome & CronRunTelemetry> {
if (job.sessionTarget === \"main\") {
// 主会话任务:发送系统事件
const text = resolveJobPayloadTextForMain(job);
state.deps.enqueueSystemEvent(text, {
agentId: job.agentId,
sessionKey: job.sessionKey,
contextKey: `cron:${job.id}`,
});
if (job.wakeMode === \"now\") {
await state.deps.runHeartbeatOnce({...});
} else {
state.deps.requestHeartbeatNow({...});
}
} else if (job.sessionTarget === \"isolated\") {
// 隔离会话任务:执行AI代理对话
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
});
}
}
5. 结果处理阶段
结果应用
// src/cron/service/timer.ts (applyJobResult)
function applyJobResult(
state: CronServiceState,
job: CronJob,
result: { status: CronRunStatus; error?: string; ... }
): boolean {
// 更新任务状态
job.state.runningAtMs = undefined;
job.state.lastRunAtMs = result.startedAt;
job.state.lastStatus = result.status;
job.state.lastDurationMs = result.endedAt - result.startedAt;
// 处理连续错误和退避
if (result.status === \"error\") {
job.state.consecutiveErrors = (job.state.consecutiveErrors ?? 0) + 1;
const backoff = errorBackoffMs(job.state.consecutiveErrors);
job.state.nextRunAtMs = Math.max(normalNext, result.endedAt + backoff);
}
// 处理一次性任务删除
const shouldDelete = job.schedule.kind === \"at\" &&
job.deleteAfterRun === true &&
result.status === \"ok\";
return shouldDelete;
}
结果记录
// src/cron/run-log.ts
export async function appendCronRunLog(logPath: string, entry: CronRunLogEntry): Promise<void> {
const dir = path.dirname(logPath);
await fs.mkdir(dir, { recursive: true });
const fd = await fs.open(logPath, \"a\");
try {
await fd.writeFile(`${JSON.stringify(entry)}\
`);
} finally {
await fd.close();
}
}
6. 任务更新与删除阶段
任务更新
// src/cron/service/jobs.ts
export function patchJob(state: CronServiceState, id: string, patch: CronJobPatch): CronJob | undefined {
const job = state.store.jobs.find((j) => j.id === id);
if (!job) {
return undefined;
}
// 合并更新
const next = mergeCronJobPatch(job, patch);
// 重新计算下次运行时间
next.state.nextRunAtMs = computeJobNextRunAtMs(next, state.deps.nowMs());
return next;
}
任务删除
// src/cron/service/ops.ts
export async function remove(state: CronServiceState, id: string): Promise<boolean> {
return await locked(state, async () => {
const index = state.store.jobs.findIndex((j) => j.id === id);
if (index === -1) {
return false;
}
state.store.jobs.splice(index, 1);
await persist(state);
// 可能需要重新武装定时器
if (!state.timer) {
armTimer(state);
}
return true;
});
}
7. 任务清理阶段
会话清理
// src/cron/session-reaper.ts
export async function sweepCronRunSessions(params: {
cronConfig?: CronConfig;
sessionStorePath: string;
nowMs: number;
log: Logger;
}) {
// 清理过期的会话(根据cronConfig.sessionRetention配置)
}
日志清理
// src/cron/run-log.ts
export async function pruneCronRunLog(logPath: string, opts: { maxAgeMs?: number; maxEntries?: number }): Promise<void> {
// 清理过时的日志条目
}
8. 错误处理与稳定性保障
错误类型
// src/cron/types.ts
type CronRunStatus = \"ok\" | \"error\" | \"skipped\";
type CronRunOutcome = {
status: CronRunStatus;
error?: string;
summary?: string;
sessionId?: string;
sessionKey?: string;
};
稳定性保障
- 任务超时:每个任务有执行超时限制(默认 10 分钟)
- 错误退避:指数退避策略防止重试风暴
- 状态恢复:启动时清除过期的运行标记
- 存储锁定:使用
locked()函数确保操作原子性 - 定时器保护:防止定时器泄漏和重入
- 任务重跑:支持手动重跑任务和运行错过的任务
完整流程图
任务创建 → 验证 → 存储 → 调度 → 执行 → 结果处理 → 更新/删除 → 清理
↓
任务创建 → 验证 → 存储 → 调度 → 执行 → 结果处理 → 更新/删除 → 清理
↑ ↑ ↑ ↑
│ │ │ │
└────────────┴────────────┴────────────┘
总结
OpenClaw 的定时任务系统实现了完整的任务生命周期管理,从创建到执行完成的所有阶段都有详细的处理逻辑。系统具有高可靠性和良好的扩展性,支持多种任务类型和执行方式,同时提供了完善的错误处理和稳定性保障机制。
定时任务开始会直接进入消息路由?
定时任务的执行路径
1. 会话管理
定时任务使用专门的会话管理机制:
// src/cron/isolated-agent/run.ts
const agentSessionKey = buildAgentMainSessionKey({
agentId,
mainKey: baseSessionKey,
});
const cronSession = resolveCronSession({
cfg: params.cfg,
sessionKey: agentSessionKey,
agentId,
nowMs: now,
});
- 使用
buildAgentMainSessionKey直接构建会话密钥 - 不通过
resolveAgentRoute解析路由 - 有专门的会话存储和恢复机制(
resolveCronSession)
2. 执行流程
任务执行时直接调用代理运行器:
// src/cron/isolated-agent/run.ts
const fallbackResult = await runWithModelFallback({
cfg: cfgWithAgentDefaults,
provider,
model,
agentDir,
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
run: (providerOverride, modelOverride) => {
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
return runCliAgent({...});
}
return runEmbeddedPiAgent({...});
},
});
- 直接调用
runEmbeddedPiAgent或runCliAgent - 不经过
dispatchInboundMessage入口点 - 有自己的模型选择和 fallback 机制
3. 交付机制
任务结果交付也有专门的流程:
// src/cron/isolated-agent/run.ts
if (deliveryRequested && !skipHeartbeatDelivery && !skipMessagingToolDelivery) {
if (resolvedDelivery.error) {
// 处理错误
}
const deliveryResults = await deliverOutboundPayloads({
cfg: cfgWithAgentDefaults,
channel: resolvedDelivery.channel,
to: resolvedDelivery.to,
accountId: resolvedDelivery.accountId,
threadId: resolvedDelivery.threadId,
payloads: deliveryPayloads,
agentId,
identity,
bestEffort: deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
});
delivered = deliveryResults.length > 0;
}
与常规消息路由的区别
| 特性 | 定时任务 | 常规消息路由 |
|---|---|---|
| 会话解析 | buildAgentMainSessionKey |
resolveAgentRoute |
| 入口点 | runCronIsolatedAgentTurn |
dispatchInboundMessage |
| 执行路径 | 直接调用代理运行器 | 经过路由解析和分发 |
| 会话管理 | 专门的 cron 会话机制 | 通用会话管理 |
| 交付方式 | 专门的交付计划 | 通道发送逻辑 |
总结
定时任务有自己独立的执行流程,从会话管理到任务执行再到结果交付,都不依赖常规的消息路由机制。这种设计使得定时任务能够在隔离的环境中运行,避免与常规对话任务相互干扰。