一. agent_loop.rs
1. 整体流程
/// Run the agent main loop.
pub async fn run(self) -> Result<(), Error> {
// Start channels
let mut message_stream = self.channels.start_all().await?;开启所有channel,得到接受流
Agent 启动时 spawn 出来的一个后台定时清理任务,负责周期性地把"长时间无活动的 session"从内存中清理掉,避免 SessionManager 里的几张 in-memory map 无限膨胀。
// Spawn session pruning task
let session_mgr = self.session_manager.clone();
let session_idle_timeout = self.config.session_idle_timeout;
let pruning_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); // Every 10 min
interval.tick().await; // Skip immediate first tick
loop {
interval.tick().await;
session_mgr.prune_stale_sessions(session_idle_timeout).await;
}
});
每 10 分钟一次:
1. 找 last_active_at < (now - session_idle_timeout) 且 try_lock 成功的 sessions
2. 收集它们的 thread_ids
3. 对每个 stale session spawn 一个 OnSessionEnd hook(不等)
4. 从 sessions / thread_map / undo_managers 三个 map 里清掉相关条目
5. 打 info 日志,返回清理数量
Agent 启动时 spawn 的一个 5 分钟一次的后台 worker,负责把本地排队中的脱敏对话 trace 推送到 Trace Commons 远端,并把"贡献被引用"的致谢通知通过用户可用的 channel 投递出去。
脱敏对话 trace = agent 跑完一轮对话后,把这次对话的 turns 打包成 envelope,过一遍 deterministic redactor(移除/替换 PII、密钥、邮箱、IP、内部路径等敏感字段),再排进本地队列、最终推到 Trace Commons
远端的那份"对话记录"。
它是 Trace Commons 共享语料库的基本单位——别人可以下载、研究、微调,但永远看不到原始用户名、token、个人信息。
let trace_queue_worker_handle = spawn_trace_queue_flush_worker(
self.owner_id().to_string(),
self.deps.store.clone(),
self.channels.clone(),
);
// Spawn heartbeat if enabled
let heartbeat_handle
Agent 启动时 spawn 的心跳后台任务,周期性读取 HEARTBEAT.md 让 agent 主动检查用户/工作区状态并把发现通过 channel 通知用户——也就是"在用户没说话时也保持主动"的定时器。默认 30 分钟一次,频率由 hb_config
控制。
Agent 启动时 spawn 的定时/事件触发引擎,按 Routine 配置里的 cron、事件或系统信号去自动执行用户预设的轻量或完整任务——比如"每天早上 9 点整理邮件"、"收到某类事件时跑一次全 agent 流程"。
// Spawn routine engine if enabled
let routine_handle