I. Related structures
1. ChannelManager
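The type that manages all channels. Each channel pushes messages into the tx side of its pipe, and the rx side delivers them to the agent. The rx streams of all channels are merged into one large stream.
There is also an injection pipe, used by hot-added channels and by background tasks. A hot-added channel goes through the normal flow from the next start onward.
Apart from the methods shown below, every other operation simply delegates to the concrete channel.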
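The fan-in idea can be sketched with the standard library alone. This is a simplified illustration, not the project's code: it uses blocking `std::sync::mpsc` and threads instead of tokio streams, and `Msg` and `start_all_sketch` are hypothetical names introduced here.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for `IncomingMessage`.
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub channel: String,
    pub text: String,
}

/// Starts one producer thread per (name, messages) source and funnels all of
/// them into a single receiver. The returned sender plays the role of the
/// injection pipe: anything holding a clone can push into the same stream.
pub fn start_all_sketch(
    sources: Vec<(&str, Vec<&str>)>,
) -> (Sender<Msg>, Receiver<Msg>, Vec<JoinHandle<()>>) {
    let (tx, rx) = mpsc::channel();
    let mut handles = Vec::new();
    for (name, texts) in sources {
        let tx = tx.clone();
        let name = name.to_string();
        let texts: Vec<String> = texts.into_iter().map(String::from).collect();
        handles.push(thread::spawn(move || {
            for text in texts {
                // A send error means the consumer is gone, so stop producing.
                if tx.send(Msg { channel: name.clone(), text }).is_err() {
                    break;
                }
            }
        }));
    }
    (tx, rx, handles)
}
```

The consumer only ever sees one receiver, which is why a background task or a hot-added channel can join later without the agent loop having to know about it.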
/// Manages multiple input channels and merges their message streams.
///
/// Includes an injection channel so background tasks (e.g., job monitors) can
/// push messages into the agent loop without being a full `Channel` impl.
pub struct ChannelManager {
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel>>>>,
inject_tx: mpsc::Sender<IncomingMessage>,
/// Taken once in `start_all()` and merged into the stream.
inject_rx: tokio::sync::Mutex<Option<mpsc::Receiver<IncomingMessage>>>,
}
/// Hot-add a channel to a running agent.
///
/// Starts the channel, registers it in the channels map for `respond()`/`broadcast()`,
/// and spawns a task that forwards its stream messages through `inject_tx` into
/// the agent loop.
pub async fn hot_add(&self, channel: Box<dyn Channel>) -> Result<(), ChannelError> {
let name = channel.name().to_string();
// Shut down any existing channel with the same name to avoid parallel consumers.
// The old forwarding task will stop when the channel's stream ends after shutdown.
{
let channels = self.channels.read().await;
if let Some(existing) = channels.get(&name) {
tracing::debug!(channel = %name, "Shutting down existing channel before hot-add replacement");
let _ = existing.shutdown().await;
}
}
let stream = channel.start().await?;
// Register for respond/broadcast/send_status
self.channels
.write()
.await
.insert(name.clone(), Arc::from(channel));
// Forward stream messages through inject_tx
let tx = self.inject_tx.clone(); // the background task below forwards the hot-added channel's messages to the agent loop through the injection tx
tokio::spawn(async move {
use futures::StreamExt;
let mut stream = stream;
while let Some(msg) = stream.next().await {
if tx.send(msg).await.is_err() {
tracing::warn!(channel = %name, "Inject channel closed, stopping hot-added channel");
break;
}
}
tracing::debug!(channel = %name, "Hot-added channel stream ended");
});
Ok(())
}

/// Start all channels and return a merged stream of messages for the agent to consume.
///
/// Also merges the injection channel so background tasks can push messages
/// into the same stream.
pub async fn start_all(&self) -> Result<MessageStream, ChannelError> {
let channels = self.channels.read().await;
let mut streams: Vec<MessageStream> = Vec::new();
for (name, channel) in channels.iter() {
match channel.start().await {
Ok(stream) => {
tracing::debug!("Started channel: {}", name);
streams.push(stream);
}
Err(e) => {
tracing::error!("Failed to start channel {}: {}", name, e);
// Continue with other channels, don't fail completely
}
}
}
if streams.is_empty() {
return Err(ChannelError::StartupFailed {
name: "all".to_string(),
reason: "No channels started successfully".to_string(),
});
}
// Take the injection receiver (can only be taken once)
if let Some(inject_rx) = self.inject_rx.lock().await.take() {
let inject_stream = tokio_stream::wrappers::ReceiverStream::new(inject_rx);
streams.push(Box::pin(inject_stream));
tracing::debug!("Injection channel merged into message stream");
}
// Merge all streams into one
let merged = stream::select_all(streams);
Ok(Box::pin(merged))
}

2. Channel trait
pub trait Channel: Send + Sync {
/// Get the channel name (e.g., "cli", "slack", "telegram", "http").
fn name(&self) -> &str;
/// Start listening for messages.
///
/// Returns a stream of incoming messages. The channel should handle
/// reconnection and error recovery internally.
async fn start(&self) -> Result<MessageStream, ChannelError>;
/// Send a response back to the user.
///
/// The response is sent in the context of the original message
/// (same channel, same thread if applicable).
async fn respond(
&self,
msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError>;
/// Send a status update (thinking, tool execution, etc.).
///
/// The metadata contains channel-specific routing info (e.g., Telegram chat_id)
/// needed to deliver the status to the correct destination.
///
/// Default implementation does nothing (for channels that don't support status).
async fn send_status(
&self,
_status: StatusUpdate,
_metadata: &serde_json::Value,
) -> Result<(), ChannelError> {
Ok(())
}
/// Send a proactive message without a prior incoming message.
///
/// Used for alerts, heartbeat notifications, and other agent-initiated communication.
/// The user_id helps target a specific user within the channel.
///
/// Default implementation does nothing (for channels that don't support broadcast).
async fn broadcast(
&self,
_user_id: &str,
_response: OutgoingResponse,
) -> Result<(), ChannelError> {
Ok(())
}
/// Check if the channel is healthy.
async fn health_check(&self) -> Result<(), ChannelError>;
/// Get conversation context from message metadata for system prompt.
///
/// Returns key-value pairs like "sender", "sender_uuid", "group" that
/// help the LLM understand who it's talking to.
///
/// Default implementation returns empty map.
fn conversation_context(&self, _metadata: &serde_json::Value) -> HashMap<String, String> {
HashMap::new()
}
/// Gracefully shut down the channel.
async fn shutdown(&self) -> Result<(), ChannelError> {
Ok(())
}
}

3. Gateway channel
/// Web gateway channel implementing the Channel trait.
pub struct GatewayChannel {
config: GatewayConfig,
state: Arc<GatewayState>,
/// Combined auth state: env-var tokens + optional DB-backed tokens.
auth: CombinedAuthState,
}

/// Shared state for all gateway handlers.
pub struct GatewayState {
/// Channel to send messages to the agent loop.
pub msg_tx: tokio::sync::RwLock<Option<mpsc::Sender<IncomingMessage>>>, // sender side of the pipe that feeds the agent loop
/// SSE broadcast manager (Arc-wrapped so extension manager can hold a reference).
pub sse: Arc<SseManager>, // handles SSE connections
/// Workspace for memory API (single-user fallback).
pub workspace: Option<Arc<Workspace>>,
/// Optional per-user workspace resolver/pool.
///
/// This is independent of `multi_tenant_mode`: the runtime may provide a
/// per-user workspace pool even in single-user mode for plumbing or test
/// harnesses.
pub workspace_pool: Option<Arc<WorkspacePool>>,
/// Whether the gateway started in multi-tenant mode.
///
/// This is intentionally separate from `workspace_pool.is_some()`: the
/// runtime may still use a per-user workspace resolver in single-user mode,
/// but the unauthenticated bootstrap routes (`/`, `/style.css`) only need
/// to suppress workspace-driven frontend customizations when startup
/// actually determined that multiple tenants exist.
pub multi_tenant_mode: bool,
/// Session manager for thread info.
pub session_manager: Option<Arc<SessionManager>>,
/// Log broadcaster for the logs SSE endpoint.
pub log_broadcaster: Option<Arc<LogBroadcaster>>,
/// Handle for changing the tracing log level at runtime.
pub log_level_handle: Option<Arc<crate::channels::web::log_layer::LogLevelHandle>>,
/// Extension manager for extension management API.
pub extension_manager: Option<Arc<ExtensionManager>>,
/// Tool registry for listing registered tools.
pub tool_registry: Option<Arc<ToolRegistry>>,
/// Database store for sandbox job persistence.
pub store: Option<Arc<dyn Database>>,
/// Cached settings store. When present, settings reads/writes go through
/// the cache layer for consistency with the agent loop. Concrete type so
/// handlers can also call `invalidate_user()` / `flush()`.
pub settings_cache: Option<Arc<crate::db::cached_settings::CachedSettingsStore>>,
/// Container job manager for sandbox operations.
pub job_manager: Option<Arc<ContainerJobManager>>,
/// Prompt queue for Claude Code follow-up prompts.
pub prompt_queue: Option<PromptQueue>,
/// Durable owner scope for persistence and unauthenticated callback flows.
pub owner_id: String,
/// Shutdown signal sender.
pub shutdown_tx: tokio::sync::RwLock<Option<oneshot::Sender<()>>>,
/// WebSocket connection tracker.
pub ws_tracker: Option<Arc<crate::channels::web::ws::WsConnectionTracker>>,
/// LLM provider for OpenAI-compatible API proxy.
pub llm_provider: Option<Arc<dyn ironclaw_llm::LlmProvider>>,
/// Hot-reload controller for the LLM provider chain. Populated at
/// startup when the chain is built from config (not in test harnesses
/// that inject a provider directly).
pub llm_reload: Option<Arc<ironclaw_llm::LlmReloadHandle>>,
/// LLM session manager handed through to `LlmReloadHandle::reload` so
/// the rebuilt chain keeps using the same (potentially authenticated)
/// NEAR AI / OAuth session without forcing a re-login.
pub llm_session_manager: Option<Arc<ironclaw_llm::SessionManager>>,
/// Optional TOML config path that produced the current `LlmConfig`.
/// Needed so a hot-reload reads the same precedence layers
/// (TOML → DB overlay) as startup.
pub config_toml_path: Option<std::path::PathBuf>,
/// Skill registry for skill management API.
pub skill_registry: Option<Arc<std::sync::RwLock<ironclaw_skills::SkillRegistry>>>,
/// Skill catalog for searching the ClawHub registry.
pub skill_catalog: Option<Arc<ironclaw_skills::catalog::SkillCatalog>>,
/// Shared auth manager for gateway auth submission and readiness checks.
pub auth_manager: Option<Arc<crate::auth::extension::AuthManager>>,
/// Scheduler for sending follow-up messages to running agent jobs.
pub scheduler: Option<crate::tools::builtin::SchedulerSlot>,
/// Per-user rate limiter for chat endpoints (30 messages per 60 seconds per user).
pub chat_rate_limiter: PerUserRateLimiter,
/// Per-IP rate limiter for OAuth/auth endpoints (20 requests per 60 seconds per IP).
pub oauth_rate_limiter: PerUserRateLimiter,
/// Rate limiter for webhook trigger endpoints (10 requests per 60 seconds).
pub webhook_rate_limiter: RateLimiter,
/// Registry catalog entries for the available extensions API.
/// Populated at startup from `registry/` manifests, independent of extension manager.
pub registry_entries: Vec<crate::extensions::RegistryEntry>,
/// Cost guard for token/cost tracking.
pub cost_guard: Option<Arc<crate::agent::cost_guard::CostGuard>>,
/// Routine engine slot for manual routine triggering (filled at runtime).
pub routine_engine: RoutineEngineSlot,
/// Server startup time for uptime calculation.
pub startup_time: std::time::Instant,
/// Snapshot of active (resolved) configuration for the frontend.
pub active_config: Arc<tokio::sync::RwLock<ActiveConfigSnapshot>>,
/// Secrets store for admin secret provisioning.
pub secrets_store: Option<Arc<dyn crate::secrets::SecretsStore + Send + Sync>>,
/// DB auth cache for invalidation on security-critical actions.
pub db_auth: Option<Arc<crate::channels::web::auth::DbAuthenticator>>,
/// Shared pairing store (one instance per server, not per request).
pub pairing_store: Option<Arc<crate::pairing::PairingStore>>,
/// OAuth providers for social login (None when OAuth is disabled).
pub oauth_providers: Option<
Arc<
std::collections::HashMap<
String,
Arc<dyn crate::channels::web::oauth::providers::OAuthProvider>,
>,
>,
>,
/// In-memory store for pending OAuth flows (CSRF + PKCE state).
pub oauth_state_store: Option<Arc<crate::channels::web::oauth::state_store::OAuthStateStore>>,
/// Base URL for constructing OAuth callback URLs.
pub oauth_base_url: Option<String>,
/// Email domains allowed for OAuth/OIDC login. Empty means allow all.
pub oauth_allowed_domains: Vec<String>,
/// NEAR wallet auth nonce store (None when NEAR auth is disabled).
pub near_nonce_store: Option<Arc<crate::channels::web::oauth::near::NearNonceStore>>,
/// NEAR RPC endpoint URL for access key verification.
pub near_rpc_url: Option<String>,
/// NEAR network name (mainnet/testnet) for the frontend wallet connector.
pub near_network: Option<String>,
/// Shutdown signal for OAuth/NEAR sweep background tasks.
/// When this sender is dropped, the sweep loops exit gracefully.
#[allow(dead_code)]
pub oauth_sweep_shutdown: Option<tokio::sync::watch::Sender<()>>,
/// Cache for the assembled frontend HTML served from `/`.
///
/// The cache key is derived from the `updated_at` of
/// `.system/gateway/layout.json` and the `.system/gateway/widgets/`
/// directory — both returned by a single cheap `list(".system/gateway/")`
/// call. A hit skips reading the layout, every widget manifest, every
/// widget JS file, and every widget CSS file. A miss (or absent cache)
/// falls through to the full `build_frontend_html()` path.
pub frontend_html_cache: Arc<tokio::sync::RwLock<Option<FrontendHtmlCache>>>,
/// Channel-agnostic tool dispatcher for routing handler operations through
/// the tool pipeline with audit trail.
pub tool_dispatcher: Option<Arc<crate::tools::dispatch::ToolDispatcher>>,
}

async fn start(&self) -> Result<MessageStream, ChannelError> {
let (tx, rx) = mpsc::channel(256);
*self.state.msg_tx.write().await = Some(tx);
let addr: SocketAddr = format!("{}:{}", self.config.host, self.config.port)
.parse()
.map_err(|e| ChannelError::StartupFailed {
name: "gateway".to_string(),
reason: format!(
"Invalid address '{}:{}': {}",
self.config.host, self.config.port, e
),
})?;
// The warning bridge forwards WARN/ERROR log lines into the
// shared SSE stream as verbose-only `AppEvent::Warning` frames.
// The `tracing` layer that feeds it captures log context at the
// global subscriber scope, not at request scope — a WARN
// emitted inside tenant A's request handler is indistinguishable
// from a global gateway warning. Scoping the whole bridge to
// the gateway `owner_id` in multi-tenant mode would deliver
// tenant A's warnings to the admin/owner account instead of
// tenant A, misrouting per-request diagnostics across
// accounts. Until per-request provenance is threaded through
// every `warn!` / `error!` call site, keep the bridge off in
// multi-tenant deployments entirely.
if let Some(log_broadcaster) = self.state.log_broadcaster.as_ref() {
if self.state.multi_tenant_mode {
tracing::debug!(
"warning bridge disabled in multi-tenant mode: \
WARN/ERROR log forwarding to debug panel requires \
per-request tenant provenance that is not yet \
wired through the tracing layer"
);
} else {
log_layer::spawn_warning_bridge(
Arc::clone(log_broadcaster),
Arc::clone(&self.state.sse),
None,
);
}
}
// Start a web server that listens on the port for HTTP requests;
// its handlers feed the agent loop through `tx`.
platform::router::start_server(addr, self.state.clone(), self.auth.clone()).await?;
Ok(Box::pin(ReceiverStream::new(rx)))
}

async fn respond(
&self,
msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
let thread_id = match &msg.thread_id { // conversation id on the external channel
Some(tid) => tid.as_str().to_string(),
None => {
return Err(ChannelError::MissingRoutingTarget {
name: "gateway".to_string(),
reason: "respond() requires a thread_id on the incoming message".to_string(),
});
}
};
self.state.sse.broadcast_for_user( // broadcast the reply back through the SSE manager
&msg.user_id,
AppEvent::Response {
content: response.content,
thread_id,
},
);
Ok(())
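}

Call chain
The complete GatewayChannel usage chain
Below is the whole path from a browser request to the response flowing back. Every reference comes from code that was actually read (src/channels/web/, src/channels/, src/agent/, src/bridge/, src/main.rs).
1. Startup: GatewayChannel::new + with_* + Channel::start
src/main.rs:887 calls GatewayChannel::new(gw_config, owner_id) and then injects dependencies through a long chain of with_* calls: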
GatewayChannel::new
├─ creates state: Arc<GatewayState>
│   ├─ sse: SseManager (broadcast::Sender<ScopedEvent>)
│   ├─ msg_tx: RwLock<Option<mpsc::Sender<IncomingMessage>>>
│   ├─ shutdown_tx, ws_tracker, log_broadcaster...
│   └─ Option placeholders for each subsystem
└─ creates auth: CombinedAuthState (env_auth + optional db_auth + optional OIDC)
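See mod.rs:115-207. Each with_* (with_workspace, with_session_manager, with_tool_registry, with_db_auth, with_job_manager, with_skill_registry, with_llm_provider, with_oauth, and so on) rebuilds Arc<GatewayState> through rebuild_state(). Note: these must be called before start(); once clients are connected, the sse sender can no longer be changed.
Channel::start() (mod.rs:697-743) proceeds as follows:
1. Creates the mpsc channel (tx, rx) and stores tx in state.msg_tx. From then on every HTTP handler takes this sender to push messages to the agent.
2. Parses the SocketAddr and binds the TcpListener.
3. Leaves the warning bridge off in multi-tenant mode (the code comment explains why: the tracing layer carries no per-request tenant information).
4. Calls platform::router::start_server(addr, state, auth) to start axum.
5. Returns Box::pin(ReceiverStream::new(rx)) to ChannelManager.
2. axum router composition (platform/router.rs::start_server)
Four router groups, finished with .with_state(state.clone()):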
┌───────────┬──────────────────────────────────────────────────────────────────────────┬─────────────────┐
│ Router    │ Routes                                                                   │ Auth            │
├───────────┼──────────────────────────────────────────────────────────────────────────┼─────────────────┤
│ public    │ /api/health, /oauth/callback, /auth/*, /api/webhooks/*, /relay/events    │ none            │
├───────────┼──────────────────────────────────────────────────────────────────────────┼─────────────────┤
│ protected │ all of /api/chat/*, /api/memory/*, /api/jobs/*, /api/extensions/*,       │ auth_middleware │
│           │ /api/skills/*, /api/settings/*, /api/admin/*, /api/profile, /api/tokens, │                 │
│           │ /v1/chat/completions, /api/v1/responses, /v1/responses, etc.             │                 │
├───────────┼──────────────────────────────────────────────────────────────────────────┼─────────────────┤
│ statics   │ /, /app.js, /style.css, /theme.css, /admin*, /i18n/*, /favicon.ico,      │ none            │
│           │ /theme-init.js, etc.                                                     │                 │
├───────────┼──────────────────────────────────────────────────────────────────────────┼─────────────────┤
│ projects  │ /projects/{id}/*                                                         │ auth_middleware │
└───────────┴──────────────────────────────────────────────────────────────────────────┴─────────────────┘
Cross-cutting layers (outermost first):
- DefaultBodyLimit::max(14 MiB)
- catch_panic (falls back to a 500 response, truncated to 200 bytes)
- CorsLayer (same origin plus cross-origin from localhost)
- X-Content-Type-Options: nosniff
- X-Frame-Options: DENY
- Content-Security-Policy: BASE_CSP_HEADER
auth_middleware is backed by platform/auth.rs::CombinedAuthState; a request is accepted if any one of env_token, DbAuthenticator, or OIDC JWT passes.
3. Inbound: browser → auth → handler
Take the most common case: the user sends a message to POST /api/chat/send.
features/chat/mod.rs:82-164 chat_send_handler:
1. Extracts State<Arc<GatewayState>> + AuthenticatedUser(user) + Json<SendMessageRequest>.
2. Rate limiting: state.chat_rate_limiter.check(&user.user_id), a 30 req/60s sliding window (state.rs::PerUserRateLimiter::new(30, 60)).
3. web_incoming_message("gateway", &user.user_id, content, thread_id) builds the IncomingMessage (metadata.user_id is injected automatically) and attaches the attachments and time zone.
4. Clones the sender out of state.msg_tx (a short-lived lock, so no lock is held across an await).
5. tx.send(msg).await pushes the message into the agent main loop.
6. Returns 202 ACCEPTED + SendMessageResponse { message_id, status: "accepted" }.
▎ Key point: the HTTP entry point has already returned 202 at this step. The real agent work happens asynchronously.
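The rate-limit step can be illustrated with a small per-user sliding window. This is a minimal sketch under assumptions, not the actual `PerUserRateLimiter`: the type `SlidingWindowLimiter` and its `check_at` method (which takes the current time explicitly so it can be tested) are hypothetical.

```rust
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, Instant};

/// Hypothetical per-user sliding-window limiter (e.g. 30 requests per 60 s).
pub struct SlidingWindowLimiter {
    max_requests: usize,
    window: Duration,
    hits: HashMap<String, VecDeque<Instant>>,
}

impl SlidingWindowLimiter {
    pub fn new(max_requests: usize, window_secs: u64) -> Self {
        Self {
            max_requests,
            window: Duration::from_secs(window_secs),
            hits: HashMap::new(),
        }
    }

    /// Returns true and records the hit if `user_id` is still under the limit at `now`.
    pub fn check_at(&mut self, user_id: &str, now: Instant) -> bool {
        let (window, max) = (self.window, self.max_requests);
        let queue = self.hits.entry(user_id.to_string()).or_default();
        // Forget timestamps that have slid out of the window.
        while let Some(&oldest) = queue.front() {
            if now.duration_since(oldest) >= window {
                queue.pop_front();
            } else {
                break;
            }
        }
        if queue.len() < max {
            queue.push_back(now);
            true
        } else {
            false
        }
    }
}
```

Keeping one timestamp queue per user means a noisy user exhausts only their own budget, which matches the "per user" wording above.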
ChannelManager, for its part, merges the MessageStream returned by GatewayChannel::start() into the single merged MessageStream (manager.rs:99-133, start_all plus the inject_rx merge).
4. Consumption by the agent main loop (src/agent/agent_loop.rs::run)
At agent_loop.rs:1080, self.channels.start_all().await? obtains the merged stream, and then the main loop begins:
loop {
select! {
Ctrl-C => break,
msg = message_stream.next() => { message = msg }
}
// middleware: audio transcription, document extraction
match self.handle_message(&message).await { ... }
}
See agent_loop.rs:1486-1502 (the main loop of run()).
handle_message (agent_loop.rs:1713) dispatches to:
- v1 path: dispatcher.rs::ChatDelegate, which shares the run_agentic_loop() engine and calls send_status() extensively with StatusUpdate::Thinking / ToolStarted / ToolCompleted / ToolResult / ReasoningUpdate / TurnMetrics / ImageGenerated (see dispatcher.rs:210, 641, 789, 872, 903, 1086, 1108, 1139, 1163, 1261, 1288, 1312, 1625).
- v2 path: bridge::router::handle_with_engine(), which goes through the ConversationManager / ThreadManager of ironclaw_engine.
5. Outbound: agent → ChannelManager → GatewayChannel
respond_then_done (agent_loop.rs:686-714) guarantees the order:
self.channels.respond(message, response).await; // 1. send the body first
self.channels.send_status(channel, StatusUpdate::Status("Done"), &metadata).await; // 2. then send Done
ChannelManager::respond (manager.rs:136-150) uses the msg.channel field (here it is "gateway") to find the registered channel instance and forwards to GatewayChannel::respond() (mod.rs:745-769):
self.state.sse.broadcast_for_user(
&msg.user_id,
AppEvent::Response { content: response.content, thread_id },
);
ChannelManager::send_status is routed the same way to GatewayChannel::send_status (mod.rs:771-1003). This is a very long match that converts 30+ StatusUpdate variants into AppEvent (for example Thinking → AppEvent::Thinking, ApprovalNeeded → AppEvent::ApprovalNeeded) and then calls dispatch_status_event (mod.rs:664-689).
dispatch_status_event is the core of the SSE routing policy:
- Some(user_id) → sse.broadcast_for_user(uid, event) (carries the comment // projection-exempt: bridge dispatcher, scoped status update, in line with .claude/rules/gateway-events.md).
- None + multi_tenant_mode → logs a WARN and drops the event (broadcasting across tenants is not allowed).
- None + single-tenant → sse.broadcast(event) (global broadcast).
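The three routing cases above reduce to a small decision function. The following is only a sketch of the policy as described here; `Route` and `route_status_event` are hypothetical names, and the real dispatch_status_event performs the broadcast itself rather than returning a value.

```rust
/// Hypothetical description of where a status event ends up.
#[derive(Debug, PartialEq)]
pub enum Route {
    /// Delivered only to subscribers of this user.
    ForUser(String),
    /// Delivered to every subscriber (single-tenant only).
    Global,
    /// Logged and discarded, to avoid leaking across tenants.
    Dropped,
}

/// Mirrors the policy: a scoped event always goes to its user; an unscoped
/// event is broadcast globally only when the gateway is single-tenant.
pub fn route_status_event(user_id: Option<&str>, multi_tenant_mode: bool) -> Route {
    match (user_id, multi_tenant_mode) {
        (Some(uid), _) => Route::ForUser(uid.to_string()),
        (None, true) => Route::Dropped,
        (None, false) => Route::Global,
    }
}
```

Expressing the policy as a pure function makes the multi-tenant drop case easy to test without an SSE connection.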
6. SSE / WebSocket push to the browser
SseManager (platform/sse.rs:44-148) is built on a tokio::sync::broadcast::Sender<ScopedEvent>:
pub fn broadcast_for_user(&self, user_id: &str, event: AppEvent) {
let _ = self.tx.send(self.next_scoped_event(Some(user_id.into()), event));
}
Each ScopedEvent carries an id ({boot_uuid}:{seq}), a user_id, and an AppEvent.
On the subscriber side (/api/chat/events → chat_events_handler, mod.rs:474-499):
let sse = state.sse.subscribe(Some(user.user_id), verbose, last_event_id)?;
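Internally, subscribe calls tx.subscribe() to obtain a BroadcastStream and filters it (sse.rs:188-200):
- Event user_id=None (global event) → every subscriber receives it.
- Subscriber user_id=None → receives everything.
- Subscriber user_id=Some(sub), event user_id=Some(ev), and sub == ev → receives it.
- Anything else → skipped.
- Verbose-only events (ToolResultFull, TurnMetrics) go only to connections with verbose=true (this avoids copying about 50KB per tool call to ordinary connections).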
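The filter rules can be written as a single predicate. This is a sketch under assumptions: `should_deliver` is a hypothetical name, and applying the verbose check before the user check is my own choice, since the notes do not state the order.

```rust
/// Decides whether one subscriber should receive one event.
///
/// `subscriber` and `event_user` are the optional user ids on each side;
/// `verbose_only` marks events such as full tool results, and
/// `verbose_conn` says whether this connection asked for verbose output.
pub fn should_deliver(
    subscriber: Option<&str>,
    event_user: Option<&str>,
    verbose_only: bool,
    verbose_conn: bool,
) -> bool {
    // Verbose-only events never reach a non-verbose connection.
    if verbose_only && !verbose_conn {
        return false;
    }
    match (subscriber, event_user) {
        // Global events reach every subscriber.
        (_, None) => true,
        // An unscoped subscriber receives everything.
        (None, _) => true,
        // Scoped on both sides: the ids must match.
        (Some(sub), Some(ev)) => sub == ev,
    }
}
```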
axum wraps the stream in an SSE response (30s keepalive; headers X-Accel-Buffering: no and Cache-Control: no-cache). The event: field is the AppEvent variant name, the id: field is <boot>:<seq>, and the data: field is JSON (#[serde(tag = "type")]).
WebSocket path (/api/chat/ws → chat_ws_handler, mod.rs:501-547):
1. Checks that Origin is localhost / 127.0.0.1 / [::1] (is_local_origin).
2. axum WebSocketUpgrade::on_upgrade → platform::ws::handle_ws_connection.
3. Two tasks: the sender encodes the AppEvent stream obtained from subscribe_raw() as {"type":"event","event_type":"...","data":...} and pushes it to the client; the receiver decodes client messages into WsClientMessage (message / approval) and then follows the same path as chat_send_handler (writing to msg_tx or calling the inline gate resolver).
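The Origin check in step 1 can be approximated as follows. This is a hypothetical re-implementation for illustration (named `is_local_origin_sketch` to make that clear); the real `is_local_origin` may parse the header differently, for example with a URL library or case-insensitive matching.

```rust
/// Returns true only for http(s) origins whose host is exactly
/// `localhost`, `127.0.0.1`, or `[::1]`, with an optional numeric port.
pub fn is_local_origin_sketch(origin: &str) -> bool {
    let Some((scheme, rest)) = origin.split_once("://") else {
        return false;
    };
    if scheme != "http" && scheme != "https" {
        return false;
    }
    // An Origin has no path, but cut at '/' defensively.
    let authority = rest.split('/').next().unwrap_or("");
    // Split host from the optional ":port" suffix; IPv6 literals keep their brackets.
    let (host, port_part) = if authority.starts_with('[') {
        match authority.find(']') {
            Some(end) => authority.split_at(end + 1),
            None => return false,
        }
    } else {
        match authority.find(':') {
            Some(i) => authority.split_at(i),
            None => (authority, ""),
        }
    };
    // Anything after the host must be empty or ":<digits>", which rejects
    // tricks such as "[::1]evil.com" or "localhost:80@evil.com".
    let port_ok = port_part.is_empty()
        || port_part
            .strip_prefix(':')
            .map_or(false, |p| !p.is_empty() && p.chars().all(|c| c.is_ascii_digit()));
    port_ok && matches!(host, "localhost" | "127.0.0.1" | "[::1]")
}
```

Matching the host exactly, rather than by prefix, is what keeps lookalike hosts such as `localhost.evil.com` out.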
7. SSE projection on the v2 engine path
When ENGINE_V2=true (bridge/router.rs:64-68, is_engine_v2_enabled), the agent calls handle_with_engine() and treats the ironclaw_engine event log as the single source of truth. The projection layer has two routes:
1. Main conversation: bridge/router.rs:6418 thread_event_to_app_events(event, thread_id) -> Vec<AppEvent>, the only function that translates a ThreadEvent into AppEvent values (EventKind::StepStarted → Thinking, ActionExecuted → ToolStarted + ToolCompleted, and so on).
2. After a gate pause: spawn_post_park_continuation (bridge/router.rs:4922-5060) subscribes to thread_manager.subscribe_events(), runs thread_event_to_app_events + redact_code_executed_secrets on events whose thread_id matches, and then calls sse.broadcast_for_user(&user_id, app_event), carrying the comment // projection-exempt: bridge dispatcher, post-park event forwarding.
This rule is enforced by .claude/rules/gateway-events.md: apart from the source logs (bridge dispatcher / sandbox JobEvent / channel-lifecycle) and the transport-only allow-list (Heartbeat, StreamChunk), every other sse.broadcast* call must carry a // projection-exempt: <category>, <detail> comment.
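The shape of such a projection function can be sketched as a one-to-many mapping. The enums below are hypothetical, heavily reduced stand-ins: only the two mappings named above (StepStarted → Thinking, ActionExecuted → ToolStarted + ToolCompleted) come from the notes; the field names and the `Other` variant are assumptions.

```rust
/// Hypothetical, reduced version of the engine's event kinds.
#[derive(Debug, Clone, PartialEq)]
pub enum EventKind {
    StepStarted,
    ActionExecuted { tool: String },
    /// Stand-in for every kind this sketch does not model.
    Other,
}

/// Hypothetical, reduced version of the UI-facing events.
#[derive(Debug, Clone, PartialEq)]
pub enum AppEvent {
    Thinking { thread_id: String },
    ToolStarted { name: String, thread_id: String },
    ToolCompleted { name: String, thread_id: String },
}

/// Projects one engine event into zero or more UI events.
pub fn project(kind: &EventKind, thread_id: &str) -> Vec<AppEvent> {
    let thread_id = thread_id.to_string();
    match kind {
        EventKind::StepStarted => vec![AppEvent::Thinking { thread_id }],
        // One engine event fans out into a start/complete pair.
        EventKind::ActionExecuted { tool } => vec![
            AppEvent::ToolStarted { name: tool.clone(), thread_id: thread_id.clone() },
            AppEvent::ToolCompleted { name: tool.clone(), thread_id },
        ],
        EventKind::Other => Vec::new(),
    }
}
```

Returning a `Vec` is what lets a single log entry become several frames, or none, without the caller special-casing anything.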
8. The state.broadcast / state.sse path for subsystems
GatewayChannel::broadcast(user_id, response) (mod.rs:1005-1045) is used for proactive pushes (heartbeats, self-repair, job results). If response.thread_id is missing, it falls back to store.get_or_create_assistant_conversation(user_id, "gateway"). HealthCheck / Shutdown inspect state.msg_tx.read().await / state.shutdown_tx.
9. End-to-end sequence
Browser POST /api/chat/send
  └─ axum router → auth_middleware → chat_send_handler
      └─ rate-limit check
      └─ build IncomingMessage (with metadata.user_id, thread_id, attachments)
      └─ tx.send(msg) → state.msg_tx
      └─ return 202 ACCEPTED + message_id
[agent_loop.rs main loop]
  └─ message_stream.next() → receives msg
  └─ middleware (transcription / document extraction)
  └─ handle_message(msg)
      ├─ v1: ChatDelegate.run_agentic_loop()
      │   └─ send_status(Thinking/ToolStarted/ToolResult/...)
      │   └─ LLM call + tool execution
      │   └─ respond_then_done(response, Status("Done"))
      └─ v2: handle_with_engine()
          └─ project ThreadEvent → AppEvent → sse.broadcast_for_user
[ChannelManager routing]
  └─ respond / send_status → GatewayChannel.respond / send_status
      └─ sse.broadcast_for_user(user_id, AppEvent::Response/...)
[SSE/WS subscribers]
  └─ BroadcastStream filtering from SseManager.subscribe()
  └─ axum SSE / WebSocket frames sent to the browser
Browser EventSource / WebSocket receives the frame → renders
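10. Key invariants (aligned with .claude/rules/*.md)
- with_* must come before start(): rebuild_state replaces Arc<GatewayState>. SSE senders of connections that are already established stay valid (the sse field is preserved), but the other fields have no effect on behavior that is already running.
- is_internal cannot be forged (see the comment at channel.rs:62), but this channel does not depend on it.
- Multi-tenant isolation: dispatch_status_event + SseManager's user_id filter + with_metadata overwriting the user_id string (which stops a WASM channel from forging the owner).
- Audit first: UI-initiated mutations go through ToolDispatcher (this is not on GatewayChannel's main path; it is the handlers' responsibility).
- Agent/UI consistency: state.sse.broadcast* may only be called from the source logs (bridge dispatcher, sandbox JobEvent, channel-lifecycle), and an exemption comment must state its category.
- Ordering guarantee: respond_then_done ensures the Response frame arrives before Done (fixing bug #2079, where SSE closed the turn early).
11. Key file index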
┌────────────────────────────────────────┬──────────────────────────────────────────────────────┐
│ Concern                                │ File                                                 │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Channel abstraction                    │ src/channels/channel.rs:894-960                      │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ ChannelManager                         │ src/channels/manager.rs                              │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ GatewayChannel definition + trait impl │ src/channels/web/mod.rs:103-1064                     │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ axum router composition                │ src/channels/web/platform/router.rs:113-624          │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ SSE broadcast/subscribe                │ src/channels/web/platform/sse.rs:44-280              │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ WebSocket                              │ src/channels/web/platform/ws.rs:64-                  │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Auth middleware                        │ src/channels/web/platform/auth.rs                    │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Chat HTTP handlers                     │ src/channels/web/features/chat/mod.rs                │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Event-log projection                   │ src/bridge/router.rs:6418 thread_event_to_app_events │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Agent main loop                        │ src/agent/agent_loop.rs:1070-1620                    │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Agentic loop                           │ src/agent/agentic_loop.rs, src/agent/dispatcher.rs   │
├────────────────────────────────────────┼──────────────────────────────────────────────────────┤
│ Startup wiring                         │ src/main.rs:887+                                     │
└────────────────────────────────────────┴──────────────────────────────────────────────────────┘

Yes. `chat_events_handler` is what handles the long-lived `GET /api/chat/events` subscription opened by the browser.
## The full subscription flow
### 1. The browser issues a GET request
```js
// Front end (uses EventSource, which cannot set custom request headers)
const es = new EventSource('/api/chat/events?token=<gateway_token>');
es.addEventListener('response', e => { /* final reply */ });
es.addEventListener('stream_chunk', e => { /* streamed tokens */ });
es.addEventListener('tool_started', e => { /* tool started */ });
es.addEventListener('tool_completed', e => { /* tool finished */ });
es.addEventListener('thinking', e => { /* thinking */ });
es.addEventListener('status', e => { /* status */ });
es.addEventListener('error', e => { /* error; also fires when a reconnect is triggered */ });
```
WebSocket (`/api/chat/ws`) is also available, with the same semantics.
### 2. The route hits `chat_events_handler`
Registered at `platform/router.rs:187`:
```rust
.route("/api/chat/events", get(chat_events_handler))
```
What the handler at `features/chat/mod.rs:474-499` does:
```rust
let verbose = params.debug && user.role == "admin"; // 1. decide whether the verbose stream is visible
let sse = state.sse.subscribe(
    Some(user.user_id),                       // 2. filter by user_id
    verbose,
    extract_last_event_id(&params, &headers), // 3. incremental catch-up of missed frames
).ok_or((503, "Too many connections"))?;
Ok((
    [("X-Accel-Buffering", "no"), ("Cache-Control", "no-cache")],
    sse,                                      // 4. the assembled axum::response::Sse stream
))
```
### 3. Authentication
`/api/chat/events` sits under the `protected` router, which has `auth_middleware` attached (`router.rs:486-489`). The middleware checks, in order:
1. `Authorization: Bearer <token>` (env-var token)
2. DB token (`DbAuthenticator`)
3. OIDC JWT header
`EventSource` cannot set headers, so this endpoint additionally accepts a `?token=xxx` query string (allow-listed by `platform/auth.rs::allows_query_token_auth()`).
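Where the token is read from can be sketched as below. This is an illustration under assumptions, not the middleware itself: `extract_token` and `QUERY_TOKEN_PATHS` are hypothetical, the allow-list contains only the one endpoint the notes confirm, and the query value is not percent-decoded.

```rust
/// Hypothetical allow-list of paths that may carry the token in the query string.
const QUERY_TOKEN_PATHS: &[&str] = &["/api/chat/events"];

/// Finds the caller's token: the Authorization header wins; the `?token=`
/// query parameter is honored only on allow-listed paths.
pub fn extract_token(
    path: &str,
    authorization: Option<&str>,
    query: Option<&str>,
) -> Option<String> {
    if let Some(token) = authorization.and_then(|h| h.strip_prefix("Bearer ")) {
        let token = token.trim();
        if !token.is_empty() {
            return Some(token.to_string());
        }
    }
    if !QUERY_TOKEN_PATHS.contains(&path) {
        return None;
    }
    // Scan "k1=v1&k2=v2" pairs for a non-empty `token` value.
    query?
        .split('&')
        .filter_map(|pair| pair.split_once('='))
        .find(|(k, v)| *k == "token" && !v.is_empty())
        .map(|(_, v)| v.to_string())
}
```

Restricting the query fallback to an allow-list keeps tokens out of URLs (and therefore out of access logs) for every endpoint that can use a header instead.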
### 4. axum writes the SSE response
`axum::response::Sse` turns the stream into:
```
HTTP/1.1 200 OK
Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no

event: stream_chunk
id: <boot_uuid>:<seq>
data: {"type":"stream_chunk","content":"Hel","thread_id":"..."}

event: tool_started
id: <boot_uuid>:<seq>
data: {"type":"tool_started","name":"http","detail":"GET https://...","call_id":"...","thread_id":"..."}

event: response
id: <boot_uuid>:<seq>
data: {"type":"response","content":"Here is the answer.","thread_id":"..."}

(a `: keepalive` comment line is sent after 30s without events)
```
The `event:` field is the `AppEvent` variant name (`#[serde(tag = "type")]`) and the `id:` field is `<boot>:<seq>`, which lets `EventSource` catch up incrementally after a dropped connection through the `Last-Event-ID` header or `?last_event_id=`.
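The wire format and the `<boot>:<seq>` id can be illustrated with a few helpers. These are hypothetical functions written for this note; in particular, the rule in `resume_from` (resume only when the boot id still matches) is my assumption about how catch-up could work, not something the source states.

```rust
/// Formats one SSE frame. `data_json` must be a single line, because a
/// newline would start a new field in the SSE wire format.
pub fn format_sse_frame(event: &str, id: &str, data_json: &str) -> String {
    format!("event: {event}\nid: {id}\ndata: {data_json}\n\n")
}

/// Splits an event id of the form `<boot>:<seq>` into its two parts.
pub fn parse_event_id(id: &str) -> Option<(&str, u64)> {
    let (boot, seq) = id.rsplit_once(':')?;
    if boot.is_empty() {
        return None;
    }
    Some((boot, seq.parse().ok()?))
}

/// Assumed resume rule: if the client's last id belongs to the current
/// server boot, continue from the next sequence number; otherwise there is
/// nothing to resume from.
pub fn resume_from(last_event_id: Option<&str>, current_boot: &str) -> Option<u64> {
    let (boot, seq) = parse_event_id(last_event_id?)?;
    if boot == current_boot {
        seq.checked_add(1)
    } else {
        None
    }
}
```

Including the boot id in every event id is what lets a server tell a stale id from a previous process apart from one it can still serve.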
### 5. How events get here
The handler does not push events itself. From the moment `subscribe()` is called, the connection is **already** attached to the `SseManager.tx` broadcast bus:
```
Upstream publishers:
  agent_loop / bridge router / dispatcher
    └─► GatewayChannel.respond() / send_status() / broadcast()
          └─► state.sse.broadcast_for_user(user_id, AppEvent)
                └─► broadcast::Sender.send(ScopedEvent { id, user_id, event })

Downstream subscribers (several coexist):
  ┌─ /api/chat/events  browser A  (subscribe → own rx + user_id filter)
  ├─ /api/chat/events  browser B  (subscribe → own rx + user_id filter)
  ├─ /api/chat/ws      browser C  (subscribe_raw → own rx)
  └─ JobMonitor internal subscription  (subscribe_raw → own rx)
```
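`tokio::sync::broadcast` guarantees that:
- **Each subscriber has its own lag position.** A slow browser that stalls or disconnects only drops its own frames through `RecvError::Lagged`; it does not hold back other subscribers.
- **A newly connected browser only sees events sent after it subscribed.** The history it "missed" has to be fetched from `/api/chat/history`; it is not replayed over SSE.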
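Both guarantees can be demonstrated with a tiny, single-threaded model of a bounded broadcast buffer. This is a teaching sketch, not tokio's implementation: `Bus`, `Cursor`, and `Recv` are hypothetical types that only imitate the lag and subscribe-from-now behavior.

```rust
use std::collections::VecDeque;

/// Minimal model of a bounded broadcast buffer.
pub struct Bus<T> {
    capacity: usize,
    buf: VecDeque<T>,
    /// Sequence number the next sent item will get.
    next_seq: u64,
}

/// A subscriber's private read position.
pub struct Cursor {
    next: u64,
}

#[derive(Debug, PartialEq)]
pub enum Recv<T> {
    Item(T),
    /// The subscriber fell behind and this many items were overwritten.
    Lagged(u64),
    Empty,
}

impl<T: Clone> Bus<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { capacity, buf: VecDeque::new(), next_seq: 0 }
    }

    /// Sending never blocks: the oldest item is overwritten when full.
    pub fn send(&mut self, item: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
        }
        self.buf.push_back(item);
        self.next_seq += 1;
    }

    /// New subscribers start at "now" and see no history.
    pub fn subscribe(&self) -> Cursor {
        Cursor { next: self.next_seq }
    }

    pub fn recv(&self, cur: &mut Cursor) -> Recv<T> {
        let oldest = self.next_seq - self.buf.len() as u64;
        if cur.next < oldest {
            // Only this cursor is affected; it skips ahead to the oldest item.
            let missed = oldest - cur.next;
            cur.next = oldest;
            return Recv::Lagged(missed);
        }
        if cur.next == self.next_seq {
            return Recv::Empty;
        }
        let idx = (cur.next - oldest) as usize;
        cur.next += 1;
        Recv::Item(self.buf[idx].clone())
    }
}
```

Because the sender overwrites instead of waiting, a stalled reader costs the system nothing except that reader's own missed frames.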
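## Answering your question
**Yes.** `chat_events_handler` is the entry point for browser SSE subscriptions. When a request arrives:
1. The axum route table matches `GET /api/chat/events`.
2. `auth_middleware` validates the token.
3. `chat_events_handler` calls `state.sse.subscribe(...)` and gets an `Sse` stream **bound to that user's user_id**.
4. axum immediately writes the response headers (`200 OK` + `text/event-stream`) back to the client and **keeps the connection open**.
5. From then on, whenever `broadcast_for_user` sends an event for this user_id, the browser receives the `data:` frame in real time, until the browser closes the connection or the server shuts down.
Key point: the handler **returns** right after `subscribe()`; the actual data push is driven asynchronously in the background by `SseManager` + `broadcast::Sender`.

4. WasmChannel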
① Types and call chain
pub struct WasmChannel {
/// Channel name.
name: String,
/// Runtime for WASM execution.
runtime: Arc<WasmChannelRuntime>,
/// Prepared module (compiled WASM).
prepared: Arc<PreparedChannelModule>,
/// Channel capabilities.
capabilities: ChannelCapabilities,
/// Channel configuration JSON (passed to on_start).
/// Wrapped in RwLock to allow updating before start.
config_json: RwLock<String>,
/// Channel configuration returned by on_start.
channel_config: RwLock<Option<ChannelConfig>>,
/// Message sender (for emitting messages to the stream).
/// Wrapped in Arc for sharing with the polling task.
message_tx: Arc<RwLock<Option<mpsc::Sender<IncomingMessage>>>>,
/// Pending responses (for synchronous response handling).
pending_responses: RwLock<HashMap<Uuid, oneshot::Sender<String>>>,
/// Rate limiter for message emission.
/// Wrapped in Arc for sharing with the polling task.
rate_limiter: Arc<RwLock<ChannelEmitRateLimiter>>,
/// Shutdown signal sender.
shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
/// Polling shutdown signal sender (keeps polling alive while held).
poll_shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
/// Join handle for the active polling task so restarts can wait for the
/// previous long-poll to exit before starting a replacement.
poll_task: RwLock<Option<tokio::task::JoinHandle<()>>>,
/// Websocket runtime shutdown signal sender.
websocket_shutdown_tx: RwLock<Option<oneshot::Sender<()>>>,
/// Host-managed websocket outbound sender used by `on_respond` and
/// `on_poll` to emit protocol-specific websocket frames.
websocket_outbound_tx: Arc<RwLock<Option<mpsc::Sender<String>>>>,
/// Serializes websocket-triggered poll executions.
websocket_poll_lock: Arc<Mutex<()>>,
/// Registered HTTP endpoints.
endpoints: RwLock<Vec<RegisteredEndpoint>>,
/// Injected credentials for HTTP requests (e.g., bot tokens).
/// Keys are placeholder names like "TELEGRAM_BOT_TOKEN".
/// Wrapped in Arc for sharing with the polling task.
credentials: Arc<RwLock<HashMap<String, String>>>,
/// Background task that repeats typing indicators every 4 seconds.
/// Telegram's "typing..." indicator expires after ~5s, so we refresh it.
typing_task: RwLock<Option<tokio::task::JoinHandle<()>>>,
/// Generated images staged from status updates until the final channel
/// response can deliver them together with the assistant's text.
pending_generated_image_attachments: Arc<Mutex<HashMap<String, Vec<String>>>>,
/// Pairing store for DM pairing (guest access control).
pairing_store: Arc<PairingStore>,
/// In-memory workspace store persisting writes across callback invocations.
/// Ensures WASM channels can maintain state (e.g., polling offsets) between ticks.
workspace_store: Arc<ChannelWorkspaceStore>,
/// Serializes callback execution for a single channel instance.
///
/// Some channel state is read-modify-written through the shared workspace
/// store, so overlapping callbacks can otherwise lose updates.
callback_lock: Arc<tokio::sync::Mutex<()>>,
/// Last-seen message metadata (contains chat_id for broadcast routing).
/// Populated from incoming messages so `broadcast()` knows where to send.
last_broadcast_metadata: Arc<tokio::sync::RwLock<Option<String>>>,
/// Settings store for persisting broadcast metadata across restarts.
settings_store: Option<Arc<dyn crate::db::SettingsStore>>,
/// Stable owner scope for persistent data and owner-target routing.
owner_scope_id: String,
/// Channel-specific actor ID that maps to the instance owner on this channel.
/// Wrapped in `Arc` so spawned polling/websocket tasks can read the current
/// value after pairing approval without capturing a stale clone.
owner_actor_id: Arc<tokio::sync::RwLock<Option<String>>>,
/// User bound to a single-login channel such as WeChat.
channel_bound_user_id: Arc<RwLock<Option<String>>>,
/// Secrets store for host-based credential injection.
/// Used to pre-resolve credentials before each WASM callback.
secrets_store: Option<Arc<dyn SecretsStore + Send + Sync>>,
}
---
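# WasmChannel must-know field checklist (blog edition)
> Files: `src/channels/wasm/wrapper.rs:782-885`, `mod.rs:120`, `capabilities.rs:24-59`
> In one sentence: WasmChannel is the host-side Rust wrapper around **one WASM channel module**, and it carries every interaction of the Channel trait. All of its fields revolve around one question: how to feed external messages to a sandboxed plugin safely, controllably, and observably.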
## 1. Identity: which channel is this
| Field | Type | Purpose | Blog talking point |
| --- | --- | --- | --- |
| `name` | `String` | Channel name (= WASM module name), exposed to upper-layer routing through the Channel trait | Determines the key used for registration in ChannelManager; dispatch finds the channel via `msg.channel` |
| `capabilities` | `ChannelCapabilities` | **The core security boundary**: declares what this channel may do | See §6 |
| `prepared` | `Arc<PreparedChannelModule>` | The compiled WASM component | Compiled once, instantiated many times; each callback does `new_store` + `instantiate` to get a fresh, isolated instance |
## 2. Runtime carriers: how messages get in and out
| Field | Type | Purpose |
| --- | --- | --- |
| `runtime` | `Arc<WasmChannelRuntime>` | wasmtime engine + linker + host functions (logging, http, workspace, ...) |
| `message_tx` | `Arc<RwLock<Option<mpsc::Sender<IncomingMessage>>>>` | Messages written by `emit_message` inside WASM are converted to IncomingMessage and sent to the agent main loop |
| `pending_responses` | `RwLock<HashMap<Uuid, oneshot::Sender<String>>>` | **Synchronous response channel**: a one-shot registered when `emit_message` uses `request_response` mode, waiting for the host to hand back a reply |
| `rate_limiter` | `Arc<RwLock<ChannelEmitRateLimiter>>` | Emit throttling (default 100/min, 5000/h) so that a malicious or buggy WASM module cannot flood the agent |
> Writing angle: `message_tx`, `pending_responses`, and `rate_limiter` are the **only conversation window** between a sandboxed plugin and the main process. The `Arc<RwLock<Option<...>>>` pattern lets the polling and websocket tasks clone their own sender.
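The emit throttling in the table can be pictured as a two-window limiter. The sketch below is a minimal illustration, not the real `ChannelEmitRateLimiter`: the type name `EmitRateLimiter`, the method `check_and_record`, and the choice to pass time in as plain seconds are all assumptions made so that the example stays std-only and deterministic.

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window limiter with a per-minute and a per-hour cap.
struct EmitRateLimiter {
    per_minute: usize,
    per_hour: usize,
    /// Timestamps (in seconds) of accepted emits, oldest first.
    stamps: VecDeque<u64>,
}

impl EmitRateLimiter {
    fn new(per_minute: usize, per_hour: usize) -> Self {
        Self { per_minute, per_hour, stamps: VecDeque::new() }
    }

    /// Returns true and records the emit if both windows have room.
    fn check_and_record(&mut self, now_s: u64) -> bool {
        // Forget emits that are at least one hour old.
        while let Some(&oldest) = self.stamps.front() {
            if now_s.saturating_sub(oldest) >= 3600 {
                self.stamps.pop_front();
            } else {
                break;
            }
        }
        let last_minute = self
            .stamps
            .iter()
            .filter(|&&t| now_s.saturating_sub(t) < 60)
            .count();
        if last_minute >= self.per_minute || self.stamps.len() >= self.per_hour {
            return false; // over budget: the emit is rejected
        }
        self.stamps.push_back(now_s);
        true
    }
}
```

With `EmitRateLimiter::new(100, 5000)`, the 101st emit inside the same minute is rejected, and emits are accepted again once the minute window has moved on.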
## 3. Long-lived task handles: what runs in the background
| Field | Type | Started when | How it is shut down |
| --- | --- | --- | --- |
| `shutdown_tx` | `RwLock<Option<oneshot::Sender<()>>>` | In `Channel::start()`, right after `message_tx` is set | Fired by `Channel::shutdown()` |
| `poll_shutdown_tx` | `RwLock<Option<oneshot::Sender<()>>>` | When polling starts | Only when polling is shut down |
| `poll_task` | `RwLock<Option<JoinHandle<()>>>` (polling task handle) | After `start_polling` | **Key point**: when replacing the polling configuration, `await` the old task's exit first (avoids overlap, the #2557 class of problem) |
| `websocket_shutdown_tx` / `websocket_outbound_tx` / `websocket_poll_lock` | oneshot + mpsc + Mutex | When the ws runtime starts | **Never started** on `WebsocketStartDecision::MissingAuth` (the comment cites the #2557 lesson: after a Discord 4003 rejection the runtime must not retry forever) |
> Writing angle: this is where the **"installed ≠ active"** rule actually lands. Discovery only loads the module; activation creates these background task handles; on auth failure they stay `None` until the user has written the token and `refresh_active_channel` reactivates the channel.
## 4. Configuration and credentials: the channel's own state
| Field | What it holds | Purpose |
| --- | --- | --- |
| `config_json: RwLock<String>` | Configuration JSON (editable before start, fixed once started) | Initial configuration passed to `on_start`; dynamic values such as `tunnel_url` can be injected via `with_runtime_config` before `start()` runs |
| `channel_config: RwLock<Option<ChannelConfig>>` | Configuration returned by `on_start` (HTTP endpoints + poll + ws decision) | Obtained in `start()`; drives endpoint registration, polling, and the ws runtime |
| `credentials: Arc<RwLock<HashMap<String, String>>>` | Placeholder values for URL injection | Placeholders such as `{TELEGRAM_BOT_TOKEN}` are substituted in URLs and headers; **WASM never sees the plaintext** |
| `secrets_store: Option<Arc<dyn SecretsStore>>` | Host-side secrets storage | Injected via `with_secrets_store`; **at callback time** credentials are pre-resolved by host_patterns and injected into outbound HTTP automatically |
| Pre-resolved result derived from `secrets_store` (not a struct field) | `Vec<ResolvedHostCredential>` | host + path match → header/query injection; multiple matches are ordered by path specificity |
> Writing angle: split "credentials" into two layers. **`credentials`**: the placeholder the plugin author writes inside WASM, such as `{TELEGRAM_BOT_TOKEN}` (URL/header template level). **`secrets_store`**: the host injects automatically by host match before the call (a system-level capability). Together these two layers form the zero-exposure model in which the host **never lets WASM see the plaintext**.
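To make the placeholder layer concrete, here is a minimal sketch of host-side substitution. It assumes placeholders have the literal form `{NAME}`; the function name `inject_placeholders` is hypothetical and is not claimed to match the real implementation.

```rust
use std::collections::HashMap;

/// Replace every `{NAME}` placeholder whose NAME is a known credential key.
/// Unknown placeholders are left untouched so the caller can detect them.
fn inject_placeholders(template: &str, credentials: &HashMap<String, String>) -> String {
    let mut out = template.to_string();
    for (name, secret) in credentials {
        // `{{{name}}}` renders as an opening brace, the key, and a closing brace.
        out = out.replace(&format!("{{{name}}}"), secret);
    }
    out
}
```

The point of the design is that this function runs on the host: the WASM side only ever handles the template string, never the substituted result.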
## 5. Routing and state
| Field | What it holds | Purpose |
| --- | --- | --- |
| `endpoints: RwLock<Vec<RegisteredEndpoint>>` | HTTP webhook paths registered with the host | `start()` iterates over `channel_config.http_endpoints` and registers the ones that pass `capabilities.is_path_allowed` |
| `workspace_store: Arc<ChannelWorkspaceStore>` | **Key-value store shared between callbacks** | Keeps offsets, tokens, and similar state across `on_poll` / `on_http_request` |
| `callback_lock: Arc<Mutex<()>>` | Serializes all callbacks of one channel instance | Prevents workspace read/write races |
| `last_broadcast_metadata: Arc<RwLock<Option<String>>>` | Metadata (chat_id etc.) of the most recent message | When `Channel::broadcast()` does not know where to send, it takes the last destination from here |
| `settings_store: Option<Arc<dyn SettingsStore>>` | Persists the broadcast metadata | Recovery across restarts |
| `owner_scope_id: String` | Stable owner scope | Determines the settings key and broadcast routing |
| `owner_actor_id: Arc<RwLock<Option<String>>>` | Channel-side actor id ↔ host owner | **Updated after pairing is approved**; the `Arc<RwLock<...>>` lets long-running polling/WS tasks read the new value instead of capturing a stale clone |
| `channel_bound_user_id: Arc<RwLock<Option<String>>>` | User bound to a single-login channel (such as WeChat) | Parsed from config_json |
| `pending_generated_image_attachments: Arc<Mutex<HashMap<String, Vec<String>>>>` | Tool-generated images, bundled into the "final reply" | See the notes in §7 |
| `pairing_store: Arc<PairingStore>` | DM pairing (guest access control) | WebChat can approve through `/api/pairing/{channel}/approve` |
| `typing_task: RwLock<Option<JoinHandle<()>>>` | Resends the typing indicator every 4s (Telegram's expires after ~5s) | Started on `send_status(Thinking)`; cancelled on a terminal status |
> Writing angle: this group is everything that makes a channel self-consistent: polling picks up a message, emits it to the agent, the agent replies, the reply comes back through on_respond, and images may need to be sent. **`workspace_store` + `callback_lock` + `owner_actor_id: Arc<RwLock>`** are the three that are easiest to get wrong.
## 6. ChannelCapabilities (the part most worth screenshotting for the blog)
`capabilities.rs:24-59`. **This section determines the security posture of the whole WASM channel system**; consider screenshotting all of it for the blog:
| Field | Default | Meaning |
| --- | --- | --- |
| `tool_capabilities` | `ToolCapabilities::default()` | Reused tool capabilities (HTTP allowlist, secrets access, workspace_read, ...) |
| `allowed_paths` | `[]` | HTTP webhook path allowlist. `is_path_allowed` matches by strict equality; the **default of zero entries** means no webhook is exposed |
| `allow_polling` | `false` | Whether long polling is allowed |
| `min_poll_interval_ms` | `30_000` (30s) | Minimum polling interval. `validate_poll_interval` clamps the value; it **cannot go below 30s** (guards against runaway polling) |
| `workspace_prefix` | `""` / `channels/<name>/` | Namespace: every write is prefixed automatically |
| `durable_workspace_paths` | `[]` | Allowlist of paths that are **persisted across restarts**. Empty by default, meaning the callback workspace lives entirely in memory and is lost on restart (design intent: avoid persisting tokens/secrets by accident) |
| `emit_rate_limit` | 100/min, 5000/h | emit_message throttling |
| `max_message_size` | 64 KB | Upper bound for a single emit |
| `callback_timeout` | 30s | Maximum execution time of each WASM callback |
> **Quotable line**: the three rejection rules of `validate_workspace_path` (absolute paths, `..`, NUL bytes) plus the `is_durable_workspace_path` allowlist add up to "deny by default, allow explicitly".
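A minimal sketch of the "deny by default, allow explicitly" rule for workspace paths follows. The struct `WorkspaceRules` and its method names are hypothetical stand-ins for the capability fields above; the exact checks in `validate_workspace_path` may differ in detail.

```rust
/// Hypothetical stand-in for the workspace-related capability fields.
struct WorkspaceRules {
    prefix: String,
    durable_paths: Vec<String>,
}

impl WorkspaceRules {
    /// Reject absolute paths, parent traversal, and NUL bytes, then namespace the path.
    fn validate(&self, path: &str) -> Result<String, String> {
        if path.starts_with('/') {
            return Err("absolute paths are not allowed".into());
        }
        if path.split('/').any(|segment| segment == "..") {
            return Err("parent traversal is not allowed".into());
        }
        if path.contains('\0') {
            return Err("NUL bytes are not allowed".into());
        }
        Ok(format!("{}{}", self.prefix, path))
    }

    /// Only explicitly listed paths survive a restart.
    fn is_durable(&self, path: &str) -> bool {
        self.durable_paths.iter().any(|p| p == path)
    }
}
```

Everything that is not on `durable_paths` stays memory-only, which matches the stated intent of not persisting tokens by accident.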
## 7. The 6 methods of the Channel trait implementation (behavioral skeleton)
`wrapper.rs:3681-3930`:
| Method | Key steps | Design point |
| --- | --- | --- |
| `name()` | Returns `&self.name` | Trivial |
| `start()` | Call `on_start` **first** (on failure return immediately, **without creating** message_tx) → then `mpsc::channel(256)` → store message_tx and shutdown_tx → register endpoints (filtered by capabilities) → start polling → decide on ws (`MissingAuth` does **not** start it) → return the MessageStream | **Order matters**: a failed `on_start` must not leave an orphaned tx behind (stated in the comment at 3693-3695) |
| `respond()` | Wraps `call_on_respond` | Receives the agent's reply |
| `broadcast()` | Wraps `call_on_broadcast` | Proactive push; the destination is derived from `last_broadcast_metadata` |
| `send_status()` | Wraps `call_on_status` | Thinking → start the 4s typing refresh; terminal → cancel typing; StreamChunk → no-op (avoids spamming) |
| `shutdown()` | Fire `shutdown_tx` / `websocket_shutdown_tx` / `poll_shutdown_tx` → await task exit | |
## 8. "Must-know" cheat sheet (paste straight into the blog)
```
┌─ Identity ───────┐
│ name │ key in ChannelManager
│ capabilities │ security boundary (must read §6)
│ prepared/runtime │ WASM engine + compiled artifact
└──────────────────┘
┌─ Conversation window ┐
│ message_tx │ WASM → agent
│ pending_responses│ synchronous reply
│ rate_limiter │ traffic guardrail
└──────────────────┘
┌─ Long-lived tasks ┐
│ *_shutdown_tx │ three kinds of one-shot
│ poll_task │ must await the old one on replacement
│ websocket_* │ not started on MissingAuth
└──────────────────┘
┌─ Credentials ────┐
│ credentials │ placeholder substitution (URL/header template level)
│ secrets_store │ host injects by host_patterns
└──────────────────┘
┌─ Self-consistent state ┐
│ workspace_store + callback_lock │ serialized KV across callbacks
│ last_broadcast_metadata │ who to push to proactively
│ owner_actor_id: Arc<RwLock> │ mutable after pairing
│ typing_task │ Telegram 4s refresh
│ durable_workspace_paths │ empty by default = nothing persisted
└──────────────────┘
```
Startup
async fn start(&self) -> Result<MessageStream, ChannelError> {
// Restore broadcast metadata from settings (survives restarts)
self.load_broadcast_metadata().await;
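This restores the chat_id that the previous run last sent messages to. It is a cross-restart detail:
- For a proactive push (broadcast), the WASM plugin does not know where to send.
- The fix: on every inbound message, store the target chat_id in last_broadcast_metadata and persist it to settings_store.
- After a restart it is read back from settings_store, so the new run can broadcast to the right place immediately.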
// Call on_start BEFORE creating message_tx so a failed start
// doesn't leave an orphaned sender with a dropped receiver.
// (If on_start fails, the rx would be dropped on error return
// but message_tx would keep the tx alive — causing is_closed=true
// on any subsequent polling attempt.)
let config = self
.call_on_start()
.await
.map_err(|e| ChannelError::StartupFailed {
name: self.name.clone(),
reason: e.to_string(),
})?;
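This is the heaviest step of the whole start flow:
- Create a new wasmtime Store (isolated linear memory + HostState).
- Load the host functions (logging, http, workspace, ...).
- Instantiate the prepared module (the compiled artifact).
- Pass config_json into WASM and call its exported on_start function.
- Get back ChannelConfig { http_endpoints, poll, websocket, display_name, ... }.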
// Create message channel — only after on_start succeeds
let (tx, rx) = mpsc::channel(256);
*self.message_tx.write().await = Some(tx);
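3. Create the mpsc channel and store the sender in self.message_tx
- tx is stored in the WasmChannel field for the polling and websocket tasks to use.
- rx stays local and is wrapped in a ReceiverStream that is later returned to ChannelManager.
- Buffer of 256: once 256 unconsumed messages have piled up, further emits wait until the consumer catches up (backpressure).
- This is why the message_tx field is an Arc<RwLock<Option<...>>>: the polling task, the websocket task, and a later refresh_active_channel all need to be able to clone the sender.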
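The backpressure behaviour of a bounded queue can be shown with the standard library alone. This is only an analogy: the source uses `tokio::sync::mpsc::channel(256)`, whose `send` waits asynchronously when the buffer is full, while the sketch below uses `std::sync::mpsc::sync_channel` and the non-blocking `try_send` to make the "full" condition visible. The helper name `fill_until_full` is made up for the example.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Try to enqueue `n` messages into a bounded queue that nobody drains.
/// Returns how many were accepted before the queue reported that it was full.
fn fill_until_full(capacity: usize, n: usize) -> usize {
    // `_rx` is kept alive on purpose so the channel stays connected.
    let (tx, _rx) = sync_channel::<usize>(capacity);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            // The buffer is full: a blocking or async sender would wait here.
            Err(TrySendError::Full(_)) => break,
            Err(TrySendError::Disconnected(_)) => break,
        }
    }
    accepted
}
```

With a capacity of 256 and no consumer, exactly 256 messages are accepted; the 257th is where an emitting task would start to wait.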
// Create shutdown channel
let (shutdown_tx, _shutdown_rx) = oneshot::channel();
*self.shutdown_tx.write().await = Some(shutdown_tx);
// Store the config
*self.channel_config.write().await = Some(config.clone());
// Register HTTP endpoints
let mut endpoints = Vec::new();
for endpoint in &config.http_endpoints {
// Validate path is allowed
if !self.capabilities.is_path_allowed(&endpoint.path) {
tracing::warn!(
channel = %self.name,
path = %endpoint.path,
"HTTP endpoint path not allowed by capabilities"
);
continue;
}
endpoints.push(RegisteredEndpoint {
channel_name: self.name.clone(),
path: endpoint.path.clone(),
methods: endpoint.methods.clone(),
require_secret: endpoint.require_secret,
});
}
*self.endpoints.write().await = endpoints;
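5. Register HTTP endpoints (filtered by capabilities)
for endpoint in &config.http_endpoints {
if !self.capabilities.is_path_allowed(&endpoint.path) {
tracing::warn!(...);
continue; // not on the allowlist → skip
}
endpoints.push(RegisteredEndpoint { ... });
}
In on_start the WASM plugin declares which paths it wants to register, but the host filters them a second time against capabilities.allowed_paths.
- A plugin cannot sneak in /admin/* or /internal/*.
- A path that is not on the allowlist only produces a warning, not an error (design intent: a plugin may "declare wide, use narrow", but the host only ever exposes what the capabilities allow).
The result is stored in endpoints: RwLock<Vec<RegisteredEndpoint>>. When the host's HTTP server starts (the webhook routes in platform/router.rs), it consults this table to dispatch requests to the matching channel.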
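The filtering step can be reduced to a few lines. The sketch below assumes exact-match allowlisting as described earlier; `Caps`, `Endpoint`, and `register` are simplified, hypothetical stand-ins for `ChannelCapabilities`, `RegisteredEndpoint`, and the loop inside `start()`.

```rust
#[derive(Debug, Clone, PartialEq)]
struct Endpoint {
    path: String,
    require_secret: bool,
}

/// Simplified capabilities: only the webhook path allowlist.
struct Caps {
    allowed_paths: Vec<String>,
}

impl Caps {
    /// Strict equality match, no prefixes or wildcards.
    fn is_path_allowed(&self, path: &str) -> bool {
        self.allowed_paths.iter().any(|p| p == path)
    }
}

/// Split the plugin's declared endpoints into registered ones and skipped paths.
fn register(caps: &Caps, declared: &[Endpoint]) -> (Vec<Endpoint>, Vec<String>) {
    let mut registered = Vec::new();
    let mut skipped = Vec::new();
    for endpoint in declared {
        if caps.is_path_allowed(&endpoint.path) {
            registered.push(endpoint.clone());
        } else {
            // The real code logs a warning here and continues.
            skipped.push(endpoint.path.clone());
        }
    }
    (registered, skipped)
}
```

A declared `/admin/reset` simply ends up in the skipped list; startup still succeeds with the remaining endpoints.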
// Start polling if configured
if let Some(poll_config) = &config.poll
&& poll_config.enabled
{
let interval = self
.capabilities
.validate_poll_interval(poll_config.interval_ms)
.map_err(|e| ChannelError::StartupFailed {
name: self.name.clone(),
reason: e,
})?;
// Create shutdown channel for polling and store the sender to keep it alive
let (poll_shutdown_tx, poll_shutdown_rx) = oneshot::channel();
*self.poll_shutdown_tx.write().await = Some(poll_shutdown_tx);
let handle = self.start_polling(
Duration::from_millis(interval as u64),
poll_shutdown_rx,
Arc::clone(&self.owner_actor_id),
);
*self.poll_task.write().await = Some(handle);
}
match websocket_start_decision(
&self.capabilities,
self.secrets_store.as_deref(),
&self.owner_scope_id,
)
.await
{
WebsocketStartDecision::NotConfigured => {}
WebsocketStartDecision::Spawn(websocket_config) => {
let (websocket_shutdown_tx, websocket_shutdown_rx) = oneshot::channel();
*self.websocket_shutdown_tx.write().await = Some(websocket_shutdown_tx);
self.start_websocket_runtime(
websocket_config,
websocket_shutdown_rx,
Arc::clone(&self.owner_actor_id),
);
}
WebsocketStartDecision::MissingAuth { credential_name } => {
// Skip the spawn entirely: the runtime would connect, be
// rejected (e.g. Discord 4003), and retry forever (#2557).
// Recovery happens through the normal activation/restart
// path once the credential is written.
tracing::warn!(
channel = %self.name,
credential_name = %credential_name,
"Skipping websocket runtime start: required credential is not present"
);
}
WebsocketStartDecision::MalformedConfig { reason } => {
tracing::warn!(
channel = %self.name,
reason = %reason,
"Skipping websocket runtime start: malformed capability configuration"
);
}
}
tracing::info!(
channel = %self.name,
display_name = %config.display_name,
endpoints = config.http_endpoints.len(),
"WASM channel started"
);
Ok(Box::pin(ReceiverStream::new(rx)))
}
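# Webhook / Polling / WebSocket: the three inbound paths of a WASM channel
> Everything here was read out of `src/channels/wasm/`. These three mechanisms are **not for the host's own use**; they are the three communication models a WASM channel plugin can use to get messages from an external IM platform.
## One-line comparison
| Mechanism | Direction | Suitable platforms | Core trait |
| --- | --- | --- | --- |
| **Webhook** | External → host | Platforms that support a callback URL (Slack, Telegram, GitHub) | The host **exposes an HTTP path** and the platform pushes to it |
| **Polling** | Host → platform | Platforms that do **not** support callbacks (some older IMs without webhooks) | The host **periodically calls** the platform's list API |
| **WebSocket** | Bidirectional long-lived connection | Platforms that provide a ws gateway (Discord v10, Slack apps in Socket Mode) | The host **keeps one long-lived connection** open and pushes both ways |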
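## 1. Webhook: the host opens a door for the outside to knock on
### Flow
```
User sends a message to the Telegram bot
│
▼
Telegram's server POSTs to https://your-domain/webhook/telegram
│
▼
WasmChannelRouter receives the request
├─ looks up path_to_channel and finds the "telegram" channel
├─ checks that X-Telegram-Bot-Api-Secret-Token matches
└─ call_on_http_request(channel, body) ──► handled by the WASM plugin
│
▼
emit_message(...) ──► sent into the mpsc channel
```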
### Key code
`src/channels/wasm/wrapper.rs:3715-3735` (endpoint registration inside `start()`):
```rust
for endpoint in &config.http_endpoints {
if !self.capabilities.is_path_allowed(&endpoint.path) { // ← second filter
continue; // what WASM declares is not final
}
endpoints.push(RegisteredEndpoint { ... });
}
```
`RegisteredEndpoint` in `src/channels/wasm/router.rs:24-33`:
```rust
pub struct RegisteredEndpoint {
pub channel_name: String,
pub path: String, // e.g., "/webhook/slack"
pub methods: Vec<String>, // GET / POST
pub require_secret: bool, // ← request must be authenticated
}
```
**Three kinds of secret verification**:
- Ordinary channels: checked via the `X-Webhook-Secret: <shared_secret>` header.
- Telegram: checked via the `X-Telegram-Bot-Api-Secret-Token: <token>` header (the comments at `router.rs:62-67, 91, 154` state that Telegram uses a dedicated header).
- Signature-based platforms: `signature_keys` for Ed25519 verification (`router.rs:64`, the scheme Discord uses for interactions) or `hmac_secrets` for HMAC-SHA256 verification (`router.rs:67`, the scheme Slack uses for request signing).
### What the WASM plugin has to do
**Declare** the endpoint in `ChannelConfig.http_endpoints` returned from `on_start`:
```rust
ChannelConfig {
http_endpoints: vec![HttpEndpointConfig {
path: "/webhook/slack".into(),
methods: vec!["POST".into()],
require_secret: true,
}],
..Default::default()
}
```
Then implement `on_http_request(payload) -> Vec<EmitMessage>`, which parses the raw webhook JSON into emitted messages that the host turns into IncomingMessage values.
### Limitations
- Requires a **publicly reachable** domain (in production, `tunnel.rs::start_managed_tunnel` goes through cloudflared/ngrok/tailscale).
- The path must be on the `capabilities.allowed_paths` allowlist.
- Verification **must be on** (`require_secret: true` is the default).
## 2. Polling: the host goes out and asks
### Flow
```
tokio::time::interval(30s) fires
│
▼
the spawned polling task ticks
├─ resolve_channel_host_credentials(...) ← pre-resolve host credentials
├─ execute_poll → create a new wasmtime Store
│ instantiate the prepared module
│ call the WASM export on_poll
│ ← get back Vec<EmitMessage>
└─ dispatch_emitted_messages → message_tx.send
│
▼
agent main loop
```
### Key code
`start_polling` in `src/channels/wasm/wrapper.rs:3291-3370` (abridged):
```rust
tokio::spawn(async move {
let mut interval_timer = tokio::time::interval(interval);
loop {
tokio::select! {
_ = interval_timer.tick() => {
// 1. Pre-resolve host credentials
let host_credentials = resolve_channel_host_credentials(...);
// 2. Run on_poll in a fresh instance
let result = Self::execute_poll(...).await;
// 3. Forward the emitted messages to message_tx
if !emitted_messages.is_empty() {
Self::dispatch_emitted_messages(...);
}
}
}
}
});
```
### Hard constraints
- `capabilities.allow_polling: false` is the **default** (`capabilities.rs:33, 67`). A channel that wants polling must opt in **explicitly** with `with_polling(min_ms)`.
- `min_poll_interval_ms` is forced to be ≥ 30_000 ms (`MIN_POLL_INTERVAL_MS = 30s`, `capabilities.rs:14, 96`); `validate_poll_interval` clamps the value.
- The interval a plugin requests **cannot go below** the capability minimum. The host sets the floor for the cadence; WASM does not get to pick an arbitrarily short interval.
- **Every tick creates a new WASM instance** (the comment at `wrapper.rs:3288-3290` explicitly describes the "fresh instance per callback" pattern). The state of a polling instance therefore does **not** stay in wasmtime linear memory; it has to be stored in `ChannelWorkspaceStore` (the `workspace_store` at `wrapper.rs:854-856`).
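The interval rule can be sketched as follows. This is an assumption-laden illustration: the source only shows that `validate_poll_interval` returns a `Result` whose error is a `String` and describes it as clamping; whether the real function clamps, rejects, or does both is not visible here. `PollCaps` is a hypothetical reduction of `ChannelCapabilities`.

```rust
const MIN_POLL_INTERVAL_MS: u32 = 30_000;

/// Hypothetical subset of the capabilities that matters for polling.
struct PollCaps {
    allow_polling: bool,
    min_poll_interval_ms: u32,
}

impl PollCaps {
    /// Assumed behaviour: error if polling is disabled, otherwise raise the
    /// requested interval to the effective minimum.
    fn validate_poll_interval(&self, requested_ms: u32) -> Result<u32, String> {
        if !self.allow_polling {
            return Err("polling is not allowed for this channel".to_string());
        }
        // The capability minimum itself can never undercut the global floor.
        let floor = self.min_poll_interval_ms.max(MIN_POLL_INTERVAL_MS);
        Ok(requested_ms.max(floor))
    }
}
```

Under these assumptions a plugin asking for 5 seconds gets 30 seconds, and a plugin asking for 60 seconds keeps its 60 seconds.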
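### What the WASM plugin has to do
**Declare** polling in `ChannelConfig.poll` returned from `on_start`:
```rust
ChannelConfig {
poll: Some(PollConfig {
enabled: true,
interval_ms: 60_000,
}),
..Default::default()
}
```
Implement `on_poll() -> Vec<EmitMessage>` and use the `http_request` host function to pull messages from the platform, keeping the cursor / offset in the workspace.
### Limitations
- **High latency**: with the minimum interval, a message can wait up to 30s before it is seen.
- **Resource cost**: every tick instantiates WASM once.
- But it is the **most general** option: any platform with a list API can be wrapped.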
## 3. WebSocket: a bidirectional long-lived connection
### Flow
```
the spawned ws runtime task starts
│
▼
open a tokio TcpStream → wss://platform/gateway
│
▼
authenticate (HELLO/Identify/handshake protocol, differs per platform)
│
┌────┴────────────────────────────┐
│ │
▼ messages from the platform ▼ frames sent by the channel
dispatch_incoming (triggers on_poll) via websocket_outbound_tx ──► ws frame
│ │
└─ emit_message ─► message_tx ──► agent
│
▼ (agent replies)
on_respond(agent_response) ──► websocket_outbound_tx.send(json)
```
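### Key code
`start_websocket_runtime` in `src/channels/wasm/wrapper.rs:1501-1545`:
```rust
tokio::spawn(async move {
let mut reconnect_attempt = 0u32;
// ... persist the queue of unsent frames
// main loop: connect → authenticate → send/receive → reconnect on drop
});
```
The comment block at `wrapper.rs:1527-1536` makes the intent clear (paraphrased):
```rust
// websocket_outbound_tx is the host-managed outbound sender,
// so that both on_respond and on_poll can emit protocol-specific frames
// instead of only going through the agent main flow
```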
### Three special fields (absent from the other mechanisms)
| Field | Type | Role |
| --- | --- | --- |
| `websocket_shutdown_tx` | oneshot | Shuts down the ws runtime on its own (does not affect polling) |
| `websocket_outbound_tx: Arc<RwLock<Option<mpsc::Sender<String>>>>` | mpsc | The channel through which the WASM side sends frames |
| `websocket_poll_lock: Arc<Mutex<()>>` | Mutex | **Serializes** ws-triggered poll executions (prevents one channel instance from running on_poll concurrently) |
### Four start decisions (`wrapper.rs:3761-3796`)
| Decision | Trigger | Behavior |
| --- | --- | --- |
| `NotConfigured` | No websocket is configured | Skip |
| `Spawn(cfg)` | Configuration is fine | Start the ws runtime |
| `MissingAuth { credential_name }` | The credential has not been written | **Warn and skip** (**must not retry forever**, the #2557 lesson) |
| `MalformedConfig { reason }` | Configuration is broken | Warn and skip |
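The four outcomes map naturally onto an enum and a pure decision function. The sketch below is hypothetical throughout: `WsDecision`, `WsConfig`, the `wss://` check, and the `has_secret` callback are simplifications invented for illustration, not the signature of the real `websocket_start_decision`.

```rust
#[derive(Debug, PartialEq)]
enum WsDecision {
    NotConfigured,
    Spawn { url: String },
    MissingAuth { credential_name: String },
    MalformedConfig { reason: String },
}

/// Hypothetical, reduced websocket configuration.
struct WsConfig {
    url: String,
    credential_name: Option<String>,
}

/// Decide once, up front, whether the runtime may be spawned at all.
fn decide(config: Option<&WsConfig>, has_secret: impl Fn(&str) -> bool) -> WsDecision {
    let Some(cfg) = config else {
        return WsDecision::NotConfigured;
    };
    if !cfg.url.starts_with("wss://") {
        return WsDecision::MalformedConfig {
            reason: format!("unsupported url: {}", cfg.url),
        };
    }
    match &cfg.credential_name {
        // Required credential absent: do not spawn, so there is nothing to retry.
        Some(name) if !has_secret(name.as_str()) => WsDecision::MissingAuth {
            credential_name: name.clone(),
        },
        _ => WsDecision::Spawn { url: cfg.url.clone() },
    }
}
```

Making the decision before spawning is what avoids the connect, reject, retry loop: a `MissingAuth` result never creates a task in the first place.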
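### What the WASM plugin has to do
Declare the websocket settings (see `capabilities.rs` / `schema.rs` for the exact schema; note that the `start()` code shown earlier computes the start decision from `capabilities`, the secrets store, and the owner scope, not from the `ChannelConfig` returned by `on_start`), then implement `on_respond(agent_message)` and send frames through the `websocket_outbound` host function.
### Limitations
- Requires a **stable long-lived connection** (NAT traversal and proxy timeouts have to be accounted for).
- The platform's ws gateway may require heartbeats.
- **Must not start on MissingAuth**: this is the **most counterintuitive** design decision in the whole start() flow. The comment at `wrapper.rs:3778-3788` spells out the lesson: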
```rust
// Skip the spawn entirely: the runtime would connect, be
// rejected (e.g. Discord 4003), and retry forever (#2557).
// Recovery happens through the normal activation/restart
// path once the credential is written.
```
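## How to choose among the three (comparison section for the blog)
```
┌─ Does the platform support webhooks?
│ ├─ yes ──► Webhook ← first choice (low latency, no host resources burned)
│ └─ no ──► Does the platform support ws?
│ ├─ yes ──► WebSocket
│ └─ no ──► Polling ← fallback
▼
All of them are bound by the capabilities allowlist + a second check on the host
```
| Dimension | Webhook | WebSocket | Polling |
| --- | --- | --- | --- |
| Latency | Real time (<1s) | Real time (<1s) | 30s+ |
| Host resources | Very low (passively receives requests) | Medium (holds a long-lived connection) | Medium (instantiates WASM on a timer) |
| Platform requirement | Public URL | ws gateway | list API |
| Credential check | Host verifies (secret / Ed25519 / HMAC) | Authentication during the handshake | Token sent with every request |
| Rate limiting | Controlled by the platform | Controlled by the platform | **Host enforces ≥ 30s** |
| Error handling | 5xx lets the platform retry | Automatic reconnect + persisted unsent queue | Tick fails → continue on the next tick |
| Missing credential | Endpoint is not exposed | **ws is not started** (no retry) | Tick fails |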
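## The host-side security posture shared by all three
Whichever inbound path is used, the host **always** sits in the middle and does the following (for the blog these can be listed as "host invariants"):
1. **What WASM declares ≠ what the host grants**: `capabilities.allowed_paths`, `allow_polling`, and the ws decision are all a second filter.
2. **Every callback is a new WASM instance**: linear memory and handles do not linger (comment at `wrapper.rs:3288-3290`); state goes through `workspace_store`.
3. **Zero-exposure credentials**: WASM never gets the plaintext; it is either placeholder substitution or host-side injection into outbound HTTP by host_patterns.
4. **Errors are not retried to death**: webhook failure → 5xx lets the platform retry; ws without credentials → **not started**; polling tick failure → retried on the next tick (30s at the earliest).
## A tagline for the blog
> "The three inbound stances of a WASM channel: Webhook opens the door to visitors, WebSocket is a dedicated line for long chats, Polling is a scheduled check-in."
async fn execute_on_start_with_state( // runs on_start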
&self,
) -> Result<(Result<ChannelConfig, WasmChannelError>, ChannelHostState), WasmChannelError> {
let _callback_guard = self.callback_lock.lock().await;
self.load_durable_workspace_snapshot().await;
let runtime = Arc::clone(&self.runtime);
let prepared = Arc::clone(&self.prepared);
let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store);
let config_json = self.config_json.read().await.clone();
let timeout = self.runtime.config().callback_timeout;
let channel_name = self.name.clone();
let credentials = self.get_credentials().await;
let host_credentials = resolve_channel_host_credentials(
&self.capabilities,
self.secrets_store.as_deref(),
&self.owner_scope_id,
)
.await;
let pairing_store = self.pairing_store.clone();
let workspace_store = self.workspace_store.clone();
let websocket_outbound_tx = self.websocket_outbound_tx.read().await.clone();
let (config_result, host_state, committed_paths) =
tokio::time::timeout(timeout, async move {
tokio::task::spawn_blocking(move || {
let mut store = Self::create_store(
&runtime,
&prepared,
&capabilities,
credentials,
host_credentials,
pairing_store,
websocket_outbound_tx,
)?;
let instance = Self::instantiate_component(&runtime, &prepared, &mut store)?;
let channel_iface = instance.near_agent_channel();
let config_result = channel_iface
.call_on_start(&mut store, &config_json)
.map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel))
.and_then(|wasm_result| match wasm_result {
Ok(wit_config) => Ok(convert_channel_config(wit_config)),
Err(err_msg) => Err(WasmChannelError::CallbackFailed {
name: prepared.name.clone(),
reason: err_msg,
}),
});
let mut host_state =
Self::extract_host_state(&mut store, &prepared.name, &capabilities);
let committed_paths =
Self::commit_callback_workspace_writes(&mut host_state, &workspace_store);
Ok::<_, WasmChannelError>((config_result, host_state, committed_paths))
})
.await
.map_err(|e| WasmChannelError::ExecutionPanicked {
name: channel_name.clone(),
reason: e.to_string(),
})?
})
.await
.map_err(|_| WasmChannelError::Timeout {
name: self.name.clone(),
callback: "on_start".to_string(),
})??;
self.persist_durable_workspace_snapshot_if_needed(&committed_paths)
.await;
Ok((config_result, host_state))
}
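execute_on_start_with_state explained
This function lives at src/channels/wasm/wrapper.rs:2030 and belongs to the wrapper for WASM channels (not tools). Its job: instantiate the WASM component on a blocking thread, run its on_start callback, and obtain the ChannelConfig the channel needs at runtime (display_name, HTTP endpoints, polling configuration, and so on).
Overall flow
acquire the mutex
↓
load the persisted workspace snapshot
↓
prepare every Arc / credential / snapshot the callback needs
↓
tokio::time::timeout(callback_timeout, {
spawn_blocking {
create the Store
instantiate the WASM component
call on_start(config_json) → wit_config
convert wit_config → ChannelConfig
extract host_state
commit the workspace writes made during the callback
}
})
↓
persist the new workspace snapshot
↓
return (config_result, host_state)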
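Step by step
1. Mutual exclusion and snapshot restore (lines 2033–2034)
let _callback_guard = self.callback_lock.lock().await;
self.load_durable_workspace_snapshot().await;
- callback_lock guarantees that one channel never runs two callbacks concurrently (shared by on_start / on_http_request).
- load_durable_workspace_snapshot restores the previously persisted workspace state into the in-memory workspace store, so the callback sees a consistent environment.
2. Prepare the callback context (lines 2036–2051)
Collect, item by item, everything needed to call on_start:
- runtime / prepared: Arcs of the WASM engine and the compiled component.
- capabilities = inject_workspace_reader(...): injects workspace read access.
- config_json: the channel configuration (JSON passed in from outside).
- timeout: the callback timeout (from runtime.config().callback_timeout).
- credentials: the placeholder → plaintext map obtained from get_credentials() (used for substituting {TELEGRAM_BOT_TOKEN}).
- host_credentials: credentials resolved from the capabilities and injected per host (such as a Bearer token).
- pairing_store, workspace_store, websocket_outbound_tx: runtime dependencies available to the channel callback.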
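3. Run the callback on a blocking thread (lines 2053–2096)
tokio::time::timeout(timeout, async move {
tokio::task::spawn_blocking(move || {
// 1) create_store: build the wasmtime::Store and bind the host functions
// 2) instantiate_component: instantiate the wasm component
// 3) call_on_start(&config_json)
// 4) convert_channel_config: WIT config → Rust ChannelConfig
// 5) extract_host_state: pull out the state accumulated on the host side (logs etc.)
// 6) commit_callback_workspace_writes: commit the workspace writes made during the callback
}).await...
}).await...??
Key points:
- spawn_blocking: WASM executes synchronously, so it must not occupy a tokio worker thread.
- Three `?` operators in total: the `?` inside the async block maps a spawn_blocking join error to ExecutionPanicked; the first `?` of the trailing `??` handles the tokio::time::timeout error (→ WasmChannelError::Timeout); the second propagates a WasmChannelError raised while setting up the call (for example from create_store or instantiate_component). A failure of the on_start callback itself is not propagated here: it stays inside config_result.
- WIT → Rust conversion: on_start returns a Result whose Ok value is the WIT-side channel config and whose Err value is a String. The Ok case is converted to the host-side ChannelConfig with convert_channel_config; the Err case is wrapped as CallbackFailed.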
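The error layering is easier to see with the async machinery removed. The sketch below models the same shape with plain `Result` values; `Elapsed` and `JoinError` are local stand-ins for the tokio error types, `CallError` and its `Setup` variant are hypothetical, and `u32` stands in for `ChannelConfig`.

```rust
#[derive(Debug, PartialEq)]
enum CallError {
    Timeout,
    Panicked,
    Setup(String),
    CallbackFailed(String),
}

struct Elapsed; // stand-in for the timeout error
struct JoinError; // stand-in for the spawn_blocking join error

/// What the callback itself produced; its failure is data, not control flow.
type ConfigResult = Result<u32, CallError>;

/// Models the async block: the inner `?` only unwraps the join layer.
fn inner_block(
    joined: Result<Result<ConfigResult, CallError>, JoinError>,
) -> Result<ConfigResult, CallError> {
    joined.map_err(|_| CallError::Panicked)?
}

/// Models the trailing `??`: first the timeout layer, then the setup layer.
fn outer(
    timed: Result<Result<ConfigResult, CallError>, Elapsed>,
) -> Result<ConfigResult, CallError> {
    let config_result = timed.map_err(|_| CallError::Timeout)??;
    Ok(config_result)
}
```

Note the last case: when the callback itself fails, `outer` still returns `Ok(...)` and the failure travels inside `config_result`, which matches the `(config_result, host_state)` return shape of the real function.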
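4. Persist the new snapshot (lines 2098–2099)
If the callback wrote to the workspace, the changes have to be persisted so that load_durable_workspace_snapshot can restore them on the next start.
5. Return value
Ok((config_result, host_state))
- config_result: the Result produced by the callback; on success it holds the ChannelConfig (HTTP endpoints, polling configuration, display name).
- host_state: logs and similar state accumulated during the callback; the caller (call_on_start) flushes it to tracing with log_on_start_host_state.
Upstream caller
call_on_start (line 2112) calls it:
1. Gets (config_result, host_state);
2. Flushes the logs in host_state;
3. Unwraps config_result and returns it to start(), which uses it to register HTTP webhooks, set up polling, and so on.
Note: on the first start, when there is no component yet, call_on_start returns a default empty configuration directly (lines 2114–2123) and does not go through execute_on_start_with_state.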
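What is a WASM channel's workspace, and why does on_start write to it?
Concept
A WASM channel's workspace is not "the user's workspace". It is a private key-value store that the host keeps for the channel (the workspace_store field, an in-memory ChannelWorkspaceStore); think of something like a HashMap<String, Vec<u8>>. The channel's WASM code reads and writes it through host functions. It does not live in the wasmtime Store's linear memory, which is discarded after every callback.
You can picture it as a whiteboard handed to the channel: whatever the channel pins on it can be read back later.
// How a channel might use it from inside WASM (illustrative; the host function names are assumptions):
let n = workspace_read("pending_user_id"); // value recorded by an earlier callback
if n.is_none() {
workspace_write("pending_user_id", "u_123"); // record it
}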
Why it is needed
on_start runs only once, at activation. Over its lifetime, though, a channel handles many events (on_http_request, on_poll, on_respond, ...), and some state has to survive across callbacks and across process restarts.
The workspace provides local persistence between callbacks:
| Scenario | Role of the workspace |
| --- | --- |
| Two-step OAuth callback (redirect first, then token exchange) | Write the state on the first step, read it back and verify it on the second |
| Multi-step pairing flow | Record pending_user_id, pending_device |
| Incremental dedup / processed message IDs | Record last_seen_id |
| Caching upstream API responses | Short-term cache |
Why persist to disk
The workspace store lives in host process memory. Linear memory is even shorter-lived: it goes away with the Store after each callback.
If the host process crashes and restarts, or the worker container restarts, the in-memory workspace is wiped: a pairing flow would be interrupted and an OAuth state would become invalid.
Hence two storage tiers:
1. In-memory tier: read and written directly during each callback (fast).
2. Snapshot tier: at the end of each callback (on_start, on_http_request, on_poll, and so on), the workspace paths on the durable allowlist are serialized and persisted.
On the next activation, load_durable_workspace_snapshot first pours the persisted key-value pairs back into memory, and the channel carries on as if it had never died.
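Mapping this back to the function's code
async fn execute_on_start_with_state(&self) ... {
// ① Before the callback: restore the last snapshot into memory
self.load_durable_workspace_snapshot().await;
// ② Run WASM inside spawn_blocking
// - the capabilities carry an injected workspace reader (inject_workspace_reader)
// - WASM reads/writes the workspace through host functions
// - the extracted host_state carries the writes accumulated during the callback
let mut host_state = Self::extract_host_state(&mut store, ...);
let committed_paths =
Self::commit_callback_workspace_writes(&mut host_state, &workspace_store);
// → key step: commit the callback's writes to the in-memory workspace_store
// (not every write is persisted; only paths on the durable allowlist,
// durable_workspace_paths in the capabilities, survive a restart)
// ③ After the callback: persist the newly committed durable paths
self.persist_durable_workspace_snapshot_if_needed(&committed_paths).await;
Ok((config_result, host_state))
}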
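The commit-then-persist split can be sketched with two small methods. This is a simplified model under stated assumptions: `WorkspaceStore`, `commit`, and `durable_snapshot` are hypothetical names, values are plain strings, and "persisting" is represented by returning the snapshot that would be written rather than touching disk.

```rust
use std::collections::{BTreeMap, HashMap};

/// Hypothetical in-memory store shared across callbacks.
#[derive(Default)]
struct WorkspaceStore {
    entries: HashMap<String, String>,
}

impl WorkspaceStore {
    /// Move one callback's pending writes into the shared store and
    /// report which paths changed.
    fn commit(&mut self, pending: Vec<(String, String)>) -> Vec<String> {
        let mut committed = Vec::new();
        for (path, value) in pending {
            self.entries.insert(path.clone(), value);
            committed.push(path);
        }
        committed
    }

    /// Build the snapshot to persist: only allowlisted paths, and only if
    /// at least one of them was touched by this callback.
    fn durable_snapshot(
        &self,
        committed: &[String],
        durable: &[&str],
    ) -> Option<BTreeMap<String, String>> {
        if !committed.iter().any(|p| durable.contains(&p.as_str())) {
            return None; // nothing durable changed, skip the write
        }
        Some(
            self.entries
                .iter()
                .filter(|(path, _)| durable.contains(&path.as_str()))
                .map(|(path, value)| (path.clone(), value.clone()))
                .collect(),
        )
    }
}
```

A callback that only touches a cache path therefore commits to memory but produces no snapshot, which is the "if needed" part of `persist_durable_workspace_snapshot_if_needed`.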
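Security significance
- WASM sandbox: the workspace is the only state WASM can write; persistence is controlled by the host, and the channel cannot write to disk itself.
- Tenant isolation: workspace_store is isolated by owner scope key, so the snapshots of different users do not contaminate each other.
- Cleanable: when it is no longer needed, deleting the snapshot file amounts to resetting the channel's local state.
One-sentence summary
on_start writing to the workspace = at activation the channel records its local state (OAuth state, pairing progress, caches, and so on) and the host persists the durable part, so later callbacks and restarts can pick up where things left off instead of starting from zero each time.
Respond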
async fn respond(
&self,
msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
// Stop the typing indicator, we're about to send the actual response
self.cancel_typing_task().await;
// Check if there's a pending synchronous response waiter
if let Some(tx) = self.pending_responses.write().await.remove(&msg.id) {
let _ = tx.send(response.content.clone());
}
// Call WASM on_respond
// IMPORTANT: Use the ORIGINAL message's metadata, not the response's metadata.
// The original metadata contains channel-specific routing info (e.g., Telegram chat_id)
// that the WASM channel needs to send the reply to the correct destination.
let metadata_json = serde_json::to_string(&msg.metadata).unwrap_or_default();
// Store for owner-target routing (chat_id etc.) only when the configured
// owner is the actor in this conversation.
let owner_actor_id = self.owner_actor_id.read().await.clone();
if should_update_owner_broadcast_metadata(
&msg.user_id,
&msg.sender_id,
&self.owner_scope_id,
owner_actor_id.as_deref(),
) {
self.update_broadcast_metadata(&metadata_json).await;
}
let generated_image_attachments =
self.take_generated_image_attachments(&msg.metadata).await;
let mut attachments = response.attachments.clone();
let response_already_has_generated_image = attachments
.iter()
.any(|path| is_staged_generated_image_path(path))
|| !response.inline_attachments.is_empty();
if response_already_has_generated_image {
tracing::debug!(
channel = %self.name,
skipped = generated_image_attachments.len(),
"Skipping status-staged generated images because the final response already carries generated-image attachments"
);
} else {
attachments.extend(generated_image_attachments.iter().cloned());
}
let result = self
.call_on_respond(
msg.id,
&response.content,
response.thread_id.as_ref().map(|t| t.as_str()),
&metadata_json,
&attachments,
&response.inline_attachments,
)
.await;
remove_staged_generated_image_attachments(&generated_image_attachments);
result.map_err(|e| ChannelError::SendFailed {
name: self.name.clone(),
reason: e.to_string(),
})?;
Ok(())
}
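# `WasmChannel::respond()` walkthrough
`src/channels/wasm/wrapper.rs:3808-3873`. This is what the host does when the agent replies to a WASM channel. **It does not simply forward the reply.** It runs seven steps in order: stop the typing indicator, satisfy any synchronous waiter, pick the original message's metadata, cache the broadcast destination, merge staged generated images, call the WASM callback, and clean up the staged files.
## Overall flow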
```
┌─ 1. Stop the typing indicator (4 s resend) ──┐
└──────────────────────────────────────────────┘
                       ↓
┌─ 2. Synchronous waiter? Deliver the reply ───┐
│ Is msg.id present in pending_responses?      │
│ yes → oneshot.send(content)                  │
│ no  → skip; the rest still runs              │
└──────────────────────────────────────────────┘
                       ↓
┌─ 3. Use the ORIGINAL metadata ───────────────┐
│ not the response's metadata                  │
│ it carries chat_id, message_id and other     │
│ channel-specific routing info                │
└──────────────────────────────────────────────┘
                       ↓
┌─ 4. Cache the broadcast destination ─────────┐
│ only when the owner is the one talking       │
│ (should_update_owner_broadcast_metadata)     │
└──────────────────────────────────────────────┘
                       ↓
┌─ 5. Merge images staged during status ───────┐
│ skipped if the response already has one      │
└──────────────────────────────────────────────┘
                       ↓
┌─ 6. Call the WASM plugin's on_respond ───────┐
│ passes msg_id, content, thread_id,           │
│ metadata, attachments,                       │
│ inline_attachments                           │
└──────────────────────────────────────────────┘
                       ↓
┌─ 7. Remove staged images from staging ───────┐
└──────────────────────────────────────────────┘
```
## Step by step
### 1. `cancel_typing_task().await`
```rust
self.cancel_typing_task().await;
```
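The reply is about to go out, so the background task that re-sends "typing..." every 4 seconds **must stop**. Otherwise it races with the final reply: the user sees the typing indicator, then the content, then the indicator flickering again.
Background (`wrapper.rs:843-845, 2914-2922`):
- `typing_task: RwLock<Option<JoinHandle>>` holds the background task handle.
- `send_status(Thinking)` starts it; it calls `on_status` every 4 seconds to send "typing".
- `send_status(terminal)` and `send_status(approval_needed)` **cancel** it.
- `respond()` is a second cancellation path: **the real reply is imminent, so typing must stop**.
The code comment is blunt about it: `Stop the typing indicator, we're about to send the actual response`.
### 2. Synchronous receipt (`pending_responses`)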
```rust
if let Some(tx) = self.pending_responses.write().await.remove(&msg.id) {
let _ = tx.send(response.content.clone());
}
```
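`pending_responses: RwLock<HashMap<Uuid, oneshot::Sender<String>>>` is the **synchronous response channel** mentioned in the previous part.
How it works:
- A WASM plugin's `emit_message` call can opt into `request_response` mode. The host then parks a oneshot sender in `pending_responses` and waits for the agent's reply.
- The agent finishes a turn and hands the reply back to the host, which calls `respond()`.
- `remove(&msg.id)` takes the sender out and pushes the content into the oneshot, so **the code waiting on that emit receives the agent's reply immediately**.
Note that taking the sender out does **not** return early. The rest of `respond()` still runs (metadata selection, the `on_respond` call, cleanup); the synchronous waiter has simply been satisfied along the way. The two paths do not conflict.
If `msg.id` is not in the map (the common case, a non-synchronous emit), the branch is skipped silently.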
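The remove-then-send step can be sketched with the standard library alone. This is an illustration, not the project's code: `PendingResponses`, `register`, and `resolve` are hypothetical names, `u64` stands in for `Uuid`, and `std::sync::mpsc` stands in for `tokio::sync::oneshot`.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Stand-in for the host-side map of parked waiters, keyed by message id.
/// The real code uses `Uuid` keys and `tokio::sync::oneshot`; `u64` and
/// `std::sync::mpsc` are substitutes so the sketch needs no external crates.
#[derive(Default)]
pub struct PendingResponses {
    waiters: HashMap<u64, Sender<String>>,
}

impl PendingResponses {
    /// Park a waiter for `msg_id` and hand back the receiving end.
    pub fn register(&mut self, msg_id: u64) -> Receiver<String> {
        let (tx, rx) = channel();
        self.waiters.insert(msg_id, tx);
        rx
    }

    /// Mirrors `remove(&msg.id)` followed by `send`: returns true if a
    /// waiter existed. A send error (waiter already gone) is ignored,
    /// as in the source excerpt.
    pub fn resolve(&mut self, msg_id: u64, content: &str) -> bool {
        match self.waiters.remove(&msg_id) {
            Some(tx) => {
                let _ = tx.send(content.to_string());
                true
            }
            None => false,
        }
    }
}
```

Because the entry is removed before sending, a second `resolve` for the same id finds nothing, which matches the "silently skipped" behaviour for messages without a waiter.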
### 3. **Key invariant**: use the **original message's** metadata, not the response's
```rust
// IMPORTANT: Use the ORIGINAL message's metadata, not the response's metadata.
// The original metadata contains channel-specific routing info (e.g., Telegram chat_id)
// that the WASM channel needs to send the reply to the correct destination.
let metadata_json = serde_json::to_string(&msg.metadata).unwrap_or_default();
```
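This is the **design point most worth explaining** in `respond()`.
Intuition says "the response's metadata is newer, so use that", but that is **wrong**:
- The original `IncomingMessage.metadata` carries the routing information needed to reply: `chat_id` (Telegram), `channel_id` (Slack), `message_id`, and so on.
- That routing information is **only known at the moment the message arrives** (which chat the Telegram user is in, which channel the Slack event came from).
- `OutgoingResponse.metadata` describes the response itself and says nothing about where to deliver it.
With the wrong metadata, `on_respond` would receive something like `{"foo": "bar"}` (the response's own metadata). The WASM plugin would **have no way to tell which chat to reply to**, and the IM platform would most likely reject the request (for example with a 400).
### 4. Cache the broadcast destination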
```rust
let owner_actor_id = self.owner_actor_id.read().await.clone();
if should_update_owner_broadcast_metadata(
&msg.user_id,
&msg.sender_id,
&self.owner_scope_id,
owner_actor_id.as_deref(),
) {
self.update_broadcast_metadata(&metadata_json).await;
}
```
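**The destination is recorded only when the owner is the one using the channel.** The rule in `should_update_owner_broadcast_metadata` (`wrapper.rs:1090-1099`):
```rust
owner_actor_id.map_or(
user_id == owner_scope_id, // no actor bound: compare user_id with the owner scope
|owner_actor_id| sender_id == owner_actor_id, // actor bound: compare sender_id with it
)
```
Why be this selective? It extends the `installed ≠ active` rule.
Examples:
- An **unpaired** Telegram channel: the sender's `user_id` is the owner, so `chat_id=xxx` is recorded and later used for proactive pushes to the owner.
- A **paired** Telegram channel: several `sender_id`s may be using it (authorized users besides the owner), so the metadata is recorded **only** when the sender equals `owner_actor_id`. This keeps someone else's chat_id from being mistaken for the owner's.
The recorded value goes into `last_broadcast_metadata: Arc<RwLock<Option<String>>>` **and** is written to `settings_store` (inside `update_broadcast_metadata`), so it **survives restarts**. That is why the `load_broadcast_metadata().await` call at the top of `start()` can restore it.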
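The two-branch rule can be written as a standalone function and checked against the unpaired and paired cases above. The body is the excerpt quoted from `wrapper.rs`; the parameter names and order are inferred from the call site in `respond()` and are an assumption.

```rust
/// Reconstruction of the predicate from the excerpt above. The parameter
/// order follows the call site in `respond()`; treat it as an assumption.
pub fn should_update_owner_broadcast_metadata(
    user_id: &str,
    sender_id: &str,
    owner_scope_id: &str,
    owner_actor_id: Option<&str>,
) -> bool {
    owner_actor_id.map_or(
        // No actor bound yet: the message counts as the owner's if its
        // user_id equals the owner scope.
        user_id == owner_scope_id,
        // An actor is bound: only that exact sender counts as the owner.
        |owner_actor_id| sender_id == owner_actor_id,
    )
}
```

For example, with `owner_actor_id = Some("999")`, a message from sender `"555"` returns `false` even though its `user_id` equals the owner scope, so a guest's chat never overwrites the cached destination.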
### 5. Merge generated images (staged during status updates, sent with the final reply)
```rust
let generated_image_attachments =
self.take_generated_image_attachments(&msg.metadata).await;
let mut attachments = response.attachments.clone();
let response_already_has_generated_image = attachments
.iter()
.any(|path| is_staged_generated_image_path(path))
|| !response.inline_attachments.is_empty();
if response_already_has_generated_image {
tracing::debug!(...);
} else {
attachments.extend(generated_image_attachments.iter().cloned());
}
```
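**Problem**: one agent turn may produce several images (say 5), and each is staged to disk at the `StatusUpdate::ImageGenerated` stage. **At that point the final reply is not yet known**, and sending one message per image would flood the user. **The approach taken** is to attach the images to the current message and send them together with the final reply.
Implementation:
- `take_generated_image_attachments(&msg.metadata)` uses the metadata to fetch the images staged for this message.
- Check whether `response` **already** carries a generated image (an attachment path accepted by `is_staged_generated_image_path`, or any inline attachment).
- **Yes** → skip the staged images (avoids duplicates).
- **No** → append the staged images to `attachments`.
- Pass the merged list to `call_on_respond`.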
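A minimal sketch of that merge decision, assuming a hypothetical staging prefix: `STAGING_PREFIX`, `is_staged`, and `merge_attachments` are illustrative names, and the real predicate is `is_staged_generated_image_path`, whose rule is not shown in these notes.

```rust
/// Hypothetical staging prefix, used only for this sketch.
const STAGING_PREFIX: &str = "/tmp/staged-images/";

/// Stand-in for `is_staged_generated_image_path`.
fn is_staged(path: &str) -> bool {
    path.starts_with(STAGING_PREFIX)
}

/// Returns the attachment list to send: the response's own attachments,
/// plus the staged images unless the response already carries a generated
/// image (a staged path among its attachments, or any inline attachment).
pub fn merge_attachments(
    response_attachments: &[String],
    has_inline_attachments: bool,
    staged: &[String],
) -> Vec<String> {
    let mut attachments = response_attachments.to_vec();
    let already_has_image =
        attachments.iter().any(|p| is_staged(p)) || has_inline_attachments;
    if !already_has_image {
        attachments.extend(staged.iter().cloned());
    }
    attachments
}
```

The staged list is still returned by `take_generated_image_attachments` in the skip case, which is why the real code can report `skipped = generated_image_attachments.len()` and clean the files up afterwards.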
### 6. Call the WASM plugin's `on_respond`
```rust
let result = self.call_on_respond(
msg.id,
&response.content,
response.thread_id.as_ref().map(|t| t.as_str()),
&metadata_json,
&attachments,
&response.inline_attachments,
).await;
```
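This follows **exactly the same** flow as the other callbacks (`on_start` / `on_poll` / `on_http_request`): build a wasmtime Store, load the host functions, instantiate, call the exported function.
Key parameters:
| Parameter | Source | Purpose |
| ------------------------------ | ----------------------------- | ------------------------------ |
| `msg.id` | UUID of the original inbound message | Lets the plugin tie the reply to that message |
| `&response.content` | The agent's reply text | Always sent |
| `response.thread_id` | Optional `ExternalThreadId` | Keeps replies in the same thread (an IM-platform concept) |
| `&metadata_json` | Metadata of the **original message** | Routing (chat_id, channel_id, ...) |
| `&attachments` | response.attachments + staged images | Sent outbound after the host injects credentials |
| `&response.inline_attachments` | The response's own inline attachments | Sent directly without touching disk |
What the WASM plugin **actually does** in `on_respond`:
- Parse the metadata to get `chat_id`.
- Call the `http_request` host function (the host injects the Bearer token and sends the outbound HTTP request).
- Deliver the message to the Telegram API / Slack API.
### 7. Clean up staging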
```rust
remove_staged_generated_image_attachments(&generated_image_attachments);
```
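This step runs whether `call_on_respond` succeeded or failed, because it sits before the `?` applied to `result`. The staged images live exactly as long as this turn's reply; once the send has been attempted they can be removed.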
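The ordering (capture the result, clean up, then propagate the error) is a small pattern on its own. A sketch with closures standing in for `call_on_respond` and `remove_staged_generated_image_attachments`; `send_then_cleanup` is a hypothetical helper and `String` replaces the real error types.

```rust
/// Runs `send`, then `cleanup`, and only afterwards propagates a send error.
/// Both closures are stand-ins for the real calls in `respond()`.
pub fn send_then_cleanup<S, C>(send: S, cleanup: C) -> Result<(), String>
where
    S: FnOnce() -> Result<(), String>,
    C: FnOnce(),
{
    // Keep the result instead of using `?` here, so cleanup is not skipped.
    let result = send();
    cleanup(); // runs on both success and failure
    result.map_err(|e| format!("send failed: {e}"))?;
    Ok(())
}
```

Writing `send()?` on the first line would return early on failure and leave the staged files behind, which is the bug this ordering avoids.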
### Error handling
```rust
result.map_err(|e| ChannelError::SendFailed {
name: self.name.clone(),
reason: e.to_string(),
})?;
```
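A `call_on_respond` failure means the message could not be sent, which maps to `ChannelError::SendFailed`. `ChannelManager::respond` propagates the error to `agent_loop::respond_then_done` (`agent_loop.rs:686-714`):
```rust
let respond_result = self.channels.respond(message, response).await;
// ... remove staged images
// then send Done regardless of respond's outcome (the turn must be closed)
```
In other words, **a WASM send failure does not leave the agent stuck in the turn**: the Done signal still goes out and the UI knows the turn has ended.
## Takeaways worth quoting in a blog post
1. **"Use the original message's metadata, not the response's."** This is the **only** comment in the function marked IMPORTANT, and a trap for any channel abstraction: routing information belongs to the moment of arrival.
2. **"Record the broadcast destination only when the owner is talking."** The two-branch check in `should_update_owner_broadcast_metadata` keeps a multi-user channel from labeling a guest's chat_id as the owner's.
3. **"Stage generated images first, ship them with the final reply."** This explains why `pending_generated_image_attachments` is a `HashMap<msg_id, Vec<path>>` rather than one emit per image.
4. **"A failed respond does not block Done."** `respond_then_done` deliberately decouples "message delivered" from "turn finished", in line with the comment at `agent_loop.rs:681-685` (the fix for #2079, where SSE closed the turn early).
5. **"Synchronous receipts are the minority; most traffic goes through on_respond."** `pending_responses.remove(&msg.id)` is a no-op for most messages, but it is the essential path for the few synchronous calls.
## Cheat sheet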
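```
┌─ respond(): 7 steps ─────────────────────────────────┐
│ 1. Stop typing                  ← cancel_typing_task │
│ 2. Sync receipt (rare)          ← pending_responses  │
│ 3. Route with original metadata ← source of chat_id  │
│ 4. Cache broadcast destination  ← owner only         │
│ 5. Merge staged images          ← avoids flooding    │
│ 6. Call on_respond              ← the actual send    │
│ 7. Clean up staging             ← end of lifecycle   │
└──────────────────────────────────────────────────────┘
```
Full call chain
# WasmChannel full call chain (first contact, sending a message, receiving the reply)
> Scenario: a user sends `/start` to the bot on Telegram for the first time.
> Assume the WASM Telegram channel plugin runs in webhook mode (path = `/webhook/telegram`).
> All references come from code that was actually read.
## Overview (bird's-eye view)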
```
Telegram server ──POST──> WasmChannelRouter ──> WasmChannel::on_http_request
│
▼
call_on_http_request (fresh WASM instance)
│
▼
WASM plugin: parse + emit_message
│
▼
Agent main loop (channel merge)
│
▼
agent processing + tool calls
│
▼
Agent::respond → respond_then_done
│
▼
ChannelManager::respond
│
▼
WasmChannel::respond
│
▼
call_on_respond (fresh WASM instance)
│
▼
WASM plugin: parse chat_id + http_request
│
▼
Telegram API ◄──host injects Bearer token
│
▼
User sees the reply in Telegram
```
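## Phase 0: startup wiring (once, at program start)
```
src/main.rs
└─ let mut gw = GatewayChannel::new(...).with_workspace_pool(...);
└─ ... chained with_* calls inject every subsystem
```
`WasmChannel`, however, is **not** on the main.rs chain. It is created when `WasmChannelLoader` starts, from `bundled.rs` or a disk scan (`src/channels/wasm/mod.rs:106`):
```rust
WasmChannelLoader::new(runtime, pairing_store, settings_store, owner_scope_id)
└─ load_installed() // scans ~/.ironclaw/channels/*.wasm
├─ compile (first time) / load from cache (afterwards)
└─ construct the WasmChannel instance
└─ WasmChannel::start() for each one
└─ (the 5 steps covered in the previous part)
```
Once `start()` completes, the channel has:
- `message_tx: Arc<RwLock<Option<mpsc::Sender<IncomingMessage>>>>` ready
- `endpoints: RwLock<Vec<RegisteredEndpoint>>` registered
- `WasmChannelRouter.path_to_channel["/webhook/telegram"] = "telegram"` in place
Unlike `GatewayChannel`, `WasmChannel` is **not** registered with `ChannelManager` from main.rs; WASM channels take a **separate wiring path**. The outline below is tentative and the `???` hop was not traced (Phase 4 shows `ChannelManager::respond` finding the channel by name, so it does end up in the channels map):
```
GatewayChannel implements the Channel trait (channel name "gateway")
└─ ChannelManager::hot_add(GatewayChannel) ← registered from the start
WasmChannel also implements the Channel trait
└─ but it is started directly via start(), with message_tx wired to its own mpsc
└─ or via a WasmChannelLoader callback path that chains the WasmChannel's mpsc into an internal forwarding task
forwarding task: msg_rx → ??? → agent main loop
```
> Exactly how the WasmChannel's mpsc is merged into the stream from `agent.channels.start_all()` depends on the implementation of `WasmChannelLoader::load_installed`. The core loop was covered earlier; this part focuses on **one complete round trip of a single message**.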
## Phase 1: Telegram pushes the webhook
### 1.1 Telegram server
The user sends a message to the bot. Telegram works in **push (callback)** mode: it calls the URL we expose and POSTs the update JSON.
```
POST https://<your-domain>/webhook/telegram
X-Telegram-Bot-Api-Secret-Token: <shared secret>
Content-Type: application/json
{ "update_id": 12345, "message": { "chat": { "id": 999 }, "from": {...}, "text": "/start" } }
```
### 1.2 Route match
Inside `WasmChannelRouter`, these tables were registered beforehand:
```rust
path_to_channel["/webhook/telegram"] = "telegram"
path_methods["/webhook/telegram"] = ["POST"]
secrets["telegram"] = "telegram-shared-secret"
secret_headers["telegram"] = "X-Telegram-Bot-Api-Secret-Token"
```
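The comment at `router.rs:62-67` states that Telegram uses a dedicated header (other channels default to `X-Webhook-Secret`).
### 1.3 Secret verification
`router.rs:445-447` checks that `X-Telegram-Bot-Api-Secret-Token` matches the configured secret and **rejects the request immediately on mismatch** (noted as 403 here; the handler notes near the end of this document say 401 "Invalid secret"). The host performs this check itself, so forged requests are turned away **before WASM ever sees the raw webhook body**.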
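A shared-secret header check of this kind can be sketched as follows. This is an assumption-laden illustration, not the router's code: `constant_time_eq` and `secret_header_matches` are hypothetical helpers, and whether the real router compares in constant time or normalizes header case is not shown in these notes.

```rust
use std::collections::HashMap;

/// Compare two byte strings without returning early on the first mismatch,
/// so the time taken does not reveal how many leading bytes matched.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}

/// Returns true only if `header_name` is present (matched case-insensitively,
/// as HTTP header names are) and its value equals `expected`.
pub fn secret_header_matches(
    headers: &HashMap<String, String>,
    header_name: &str,
    expected: &str,
) -> bool {
    headers
        .iter()
        .find(|(name, _)| name.eq_ignore_ascii_case(header_name))
        .map_or(false, |(_, value)| {
            constant_time_eq(value.as_bytes(), expected.as_bytes())
        })
}
```

A missing header is treated the same as a wrong one, so a request that omits the secret is rejected rather than waved through.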
### 1.4 Find the channel instance
```rust
let channel: Arc<WasmChannel> = channels.get("telegram").unwrap();
```
Then call `channel.call_on_http_request(...)` with the method, path, headers, query, body, and the `secret_validated` flag.
## Phase 2: WasmChannel runs `on_http_request`
The function starts at `wrapper.rs:2142`. It follows **the same fresh-instance flow as `respond()` in the previous part**:
```rust
let _callback_guard = self.callback_lock.lock().await; // 1. serialize all callbacks
// 2. tokio::time::timeout(callback_timeout=30s)
// 3. tokio::task::spawn_blocking(|| {
// a. create_store(...) // installs host functions
// b. instantiate_component(...) // instantiates the prepared component
// c. let channel_iface = instance.near_agent_channel();
// d. channel_iface.call_on_http_request(&mut store, &wit_request)
// e. extract_host_state(...) and commit_callback_workspace_writes(...)
// })
```
### 2.1 `create_store` (`wrapper.rs:1874`)
Creates a new wasmtime `Store<ChannelStoreData>` and injects:
- `limiter: ResourceLimiter` (fuel + memory)
- `host_state: ChannelHostState { emitted_messages, pending_writes, base HostState { logger } }`
- `http_client: reqwest::Client` (used when pre-resolving host credentials)
- `credentials: HashMap<placeholder, value>` (placeholder level)
- `host_credentials: Vec<ResolvedHostCredential>` (injected automatically per host)
- `pairing_store` (guest access control)
- `workspace_store` (`Arc<ChannelWorkspaceStore>`, cross-callback KV)
- `runtime: Option<Runtime>` (WebSocket mode only)
**Host function registration** (code generated by `add_to_linker`):
```
logging::log(level, target, message)
http::http_request(method, url, headers_json, body, timeout_ms) -> response_json
http::http_request_with_secrets(method, url, headers_json, body, timeout_ms, ...) -> response_json
pairing::is_paired(actor_id) -> bool
pairing::request_pairing(actor_id, channel, metadata_json) -> bool
workspace::workspace_read(path) -> option<string>
workspace::workspace_write(path, value) -> result
websocket::websocket_send(message) -> result
```
### 2.2 `instantiate_component`
Takes the precompiled artifact from `prepared.component: Arc<Option<Component>>` and instantiates it. **Linear memory starts out empty**, which upholds the "fresh instance per callback" invariant.
### 2.3 Call `on_http_request`
The WASM side runs its own implementation. **Typical Telegram plugin logic**:
```rust
// Pseudocode: WASM plugin on_http_request
fn on_http_request(req: HttpRequest) -> Result<HttpResponse, String> {
let update: Update = serde_json::from_str(&req.body)?;
let text = update.message.text;
let chat_id = update.message.chat.id;
// Key point: put Telegram-private information into metadata
let metadata = json!({
"chat_id": chat_id,
"message_id": update.message.message_id,
"user_id": update.message.from.id, // integer, kept for channel-private routing
"telegram_update_id": update.update_id,
});
// emit to the agent
emit_message(EmitMessage {
content: text,
user_id: "alice".to_string(), // ← user_id of the owner / paired user
sender_id: update.message.from.id.to_string(), // ← the Telegram-side actor
thread_id: None,
metadata_json: metadata.to_string(),
is_internal: false,
// Key point: request_response is false (the vast majority of cases)
});
Ok(HttpResponse { status: 200 })
}
```
`emit_message` is a host function that **writes the message into `host_state.emitted_messages`**. The message is not sent at that moment; **everything is dispatched together after the callback returns**.
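The buffer-then-dispatch behaviour can be sketched like this. `HostState` here is a simplified, hypothetical stand-in for the per-callback state, and `run_callback` is an illustrative helper; the drop-on-failure rule comes from the `emit-message` notes later in this document.

```rust
/// Simplified stand-in for the per-callback host state.
#[derive(Default)]
pub struct HostState {
    emitted_messages: Vec<String>,
}

impl HostState {
    /// What the `emit_message` host function does in this sketch:
    /// it only buffers, nothing is sent yet.
    pub fn emit_message(&mut self, content: &str) {
        self.emitted_messages.push(content.to_string());
    }
}

/// Runs one callback against fresh state and returns the messages to
/// dispatch. If the callback fails, everything it buffered is dropped
/// together with the state.
pub fn run_callback<F>(callback: F) -> Result<Vec<String>, String>
where
    F: FnOnce(&mut HostState) -> Result<(), String>,
{
    let mut state = HostState::default(); // fresh state per callback
    callback(&mut state)?;
    Ok(state.emitted_messages)
}
```

Deferring the dispatch means a callback that fails halfway cannot leave the agent with a partial batch of messages.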
### 2.4 `extract_host_state`
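After the callback returns, `emitted_messages`, `pending_writes` (workspace writes), and the accumulated guest logs are taken out of the `Store`.
### 2.5 `commit_callback_workspace_writes`
Commits the workspace paths written during this callback to `ChannelWorkspaceStore` (`Arc<ChannelWorkspaceStore>`), where they persist across ticks. Paths listed in `durable_workspace_paths` are additionally written to settings_store so they survive restarts.
### 2.6 `dispatch_emitted_messages` (key step)
Converts `emitted_messages` into `IncomingMessage`s and sends them on `message_tx`: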
```rust
// wrapper.rs:3117
self.dispatch_emitted_messages(
EmitDispatchContext {
channel_name: &self.name,
capabilities: &self.capabilities,
owner_scope_id: &self.owner_scope_id,
owner_actor_id: current_owner.as_deref(),
...
},
emitted_messages,
).await?;
```
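Inside, `dispatch_emitted_messages`:
1. Applies rate limiting (`rate_limiter.check`).
2. Checks `max_message_size` (64 KB by default).
3. Builds `IncomingMessage::new(channel, user_id, content).with_metadata(metadata)`.
4. Crucially, `with_metadata` **forcibly** overwrites a string `metadata.user_id` with `self.user_id` (the security invariant at `channel.rs:231-251`, which stops WASM from impersonating the owner). A **non-string** user_id (such as Telegram's integer) is left untouched; the SSE routing layer reads it with `as_str()`, treats it as missing, and fails closed.
5. Calls `message_tx.send(msg).await` to enqueue on the mpsc.
## Phase 3: the message enters the agent main loop
`WasmChannel::start()` returns `Box::pin(ReceiverStream::new(rx))` to `WasmChannelLoader`, and the loader spawns a forwarding task that feeds the messages into the host's unified inbound stream.
> For how this is merged into the stream from `ChannelManager.start_all()`, see the end of `WasmChannelLoader::load_installed`.
Eventually the `message_stream` at `agent_loop.rs:1080` yields this message from `next()`: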
```rust
let mut message_stream = self.channels.start_all().await?;
loop {
select! {
msg = message_stream.next() => {
let message = msg.unwrap();
// ... transcription, document extraction
match self.handle_message(&message).await {
Ok(HandleOutcome::Respond(response)) => {
self.respond_then_done(&message, response).await;
}
...
}
}
}
}
```
`handle_message` (`agent_loop.rs:1713`) dispatches to:
- v1: `ChatDelegate::run_agentic_loop()` (shared `agentic_loop.rs`)
- v2: `bridge::router::handle_with_engine()` (engine v2)
Assume v1. `ChatDelegate` runs the agentic loop: LLM call → tool execution → repeat → final response.
Along the way the agent emits a series of `StatusUpdate`s (Thinking / ToolStarted / ToolResult / ReasoningUpdate / …). These go through `ChannelManager::send_status("telegram", status, &metadata)` → `WasmChannel::send_status` → the `on_status` callback (`wrapper.rs:2688-2773`), where the WASM plugin may choose to send Telegram's "typing..." (the **4-second resend mechanism** lives on the send_status path).
## Phase 4: the agent replies
`respond_then_done` at `agent_loop.rs:686-714`:
```rust
async fn respond_then_done(&self, message, response) {
// 1. Send the reply body first
self.channels.respond(message, response).await; // ★
// 2. Send Done whether or not that succeeded
self.channels.send_status(
&message.channel, // "telegram"
StatusUpdate::Status("Done".into()),
&message.metadata,
).await;
}
```
`★` leads to `ChannelManager::respond` (`manager.rs:136-150`):
```rust
pub async fn respond(&self, msg, response) {
if let Some(channel) = channels.get(&msg.channel) { // "telegram" resolves to the WasmChannel
channel.respond(msg, response).await
}
}
```
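## Phase 5: `WasmChannel::respond` (covered in detail in the previous part)
`wrapper.rs:3808-3873`. **Seven steps**:
1. `cancel_typing_task` (stops the 4-second typing resend)
2. Check `pending_responses` (synchronous receipt, the rare path)
3. **Use the original message's metadata** (not the response's), the source of the `chat_id` routing info
4. Cache `last_broadcast_metadata` (only when the owner is talking)
5. Merge `pending_generated_image_attachments` (images staged during status updates)
6. `call_on_respond(msg_id, content, thread_id, metadata_json, attachments, inline_attachments)`
7. `remove_staged_generated_image_attachments` (clean up the staged images)
## Phase 6: `call_on_respond` (`wrapper.rs:2386-2554`)
Same fresh-instance flow as `call_on_http_request`: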
```
1. callback_lock.lock() serializes callbacks
2. tokio::time::timeout(callback_timeout=30s)
3. spawn_blocking:
a. create_store (as before)
b. instantiate_component
c. wit_response = AgentResponse { message_id, content, thread_id, metadata_json, attachments }
d. channel_iface.call_on_respond(&mut store, &wit_response)
e. extract_host_state
f. commit_callback_workspace_writes
4. drain_guest_logs ← guest logs are kept whether the call succeeded or failed
5. persist_durable_workspace_snapshot_if_needed ← durable paths are written to settings_store
```
The WASM plugin's `on_respond` implementation:
```rust
// Pseudocode
fn on_respond(response: AgentResponse) -> Result<(), String> {
let metadata: serde_json::Value = serde_json::from_str(&response.metadata_json)?;
let chat_id = metadata["chat_id"].as_i64().ok_or("no chat_id")?;
let reply_url = "https://api.telegram.org/bot{BOT_TOKEN}/sendMessage";
// The host substitutes the {BOT_TOKEN} placeholder when it sends the request; the plugin never sees the real value
let body = json!({
"chat_id": chat_id,
"text": response.content,
"reply_to_message_id": metadata["message_id"],
});
let resp = http_request("POST", reply_url, "{}", &body.to_string(), 10000)?;
Ok(())
}
```
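**Key point**: WASM never sees the plaintext behind `{BOT_TOKEN}`. Either it is a placeholder from `credentials: HashMap<placeholder, value>` (`inject_credentials`, `wrapper.rs:154-172`), or the host automatically injects `Authorization: Bearer ...` on outbound HTTP according to `host_patterns` (`wrapper.rs:250-280`).
The host injects the credential and sends the request → Telegram receives it → the user sees the reply in Telegram.
## Phase 7: the result propagates back
The WASM callback returns `Ok(())` → `call_on_respond` returns `Ok(())` → `WasmChannel::respond` returns `Ok(())` → `ChannelManager::respond` returns `Ok(())` → `respond_then_done` goes on to send `Status("Done")` → `WasmChannel::send_status` → `on_status` (WASM sees a terminal status and stops resending typing) → the agent main loop moves to its next iteration.
## Full sequence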
```
Telegram WasmChannelRouter WasmChannel spawn_blocking Agent ChannelManager
│ │ │ │ │ │
│───POST /webhook──>│ │ │ │ │
│ │──verify secret──>│ │ │ │
│ │ │ │ │ │
│ │ │──callback_lock.lock()────>│ │
│ │ │──spawn_blocking──────────>│ │
│ │ │ create_store │ │
│ │ │ instantiate │ │
│ │ │ call_on_http_request────>│ │
│ │ │ ← written to host_state.emitted│ │
│ │ │ extract_host_state │ │
│ │ │ commit_workspace_writes │ │
│ │ │<─Ok()─── │ │
│ │ │ │ │ │
│ │ │──dispatch_emitted────────>│ │
│ │ │ │ message_tx.send │
│ │ │ │ │ │
│ │ │ │ message_stream.next()
│ │ │ │ │──handle_message
│ │ │ │ │ (LLM + tools)
│ │ │ │ │ respond(response)
│ │ │ │ │ │
│ │ │ │ │──channels.respond──>│
│ │ │ │ │ │ channel.respond
│ │ │ │ │ │
│ │ │──cancel_typing_task │ │
│ │ │──callback_lock.lock() │ │
│ │ │──spawn_blocking │ │
│ │ │ create_store │ │
│ │ │ instantiate │ │
│ │ │ call_on_respond──>WASM │ │
│ │ │ WASM parses chat_id │ │
│ │ │ http_request──────────────┼──>Telegram API
│ │ │ ← 200 OK │ │
│ │ │<─Ok()── │ │
│ │ │ drain_guest_logs │ │
│ │ │ persist_durable_workspace│ │
│ │ │<─Ok()── │ │
│ │ │ │ │ │
│ │ │ │ │<─Ok()──────│
│ │ │ │ │ │
│ │ │ │ │──send_status("Done")
│ │ │ │ │ │
│ │ │ call_on_status(Terminal)─│ │
│ │ │ WASM stops sending typing │ │
│ │ │ │ │ │
│<─user sees the bot's reply──────────────────────────────────────────────────│
```
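## Key invariants
| Invariant | Where | Why |
| ------------------------------------ | --------------------------------------------- | ------------------------------------------------ |
| **fresh instance per callback** | `create_store` + `instantiate_component` on every call | No leftover linear memory or handles (prevents tokens lingering) |
| **callback_lock** | Start of every `call_on_*` | Callbacks on one channel instance run serially, preventing workspace write races |
| **callback_timeout=30s** | `tokio::time::timeout` | Backstop for a runaway sandbox (infinite loop / CPU hogging) |
| **spawn_blocking** | Wraps wasmtime | wasmtime work is CPU-bound and should not stall the tokio scheduler |
| **host verifies the secret before WASM runs** | `WasmChannelRouter` verification | Forged webhooks never reach WASM |
| **host injects credentials before sending** | `http::http_request` host fn | WASM never gets the plaintext |
| **emit_message is processed afterwards** | `dispatch_emitted_messages` | WASM cannot await the network directly inside a callback |
| **original metadata, not the response's** | Comment in `WasmChannel::respond` | Routing info belongs to the moment of arrival |
| **owner-only caching of the broadcast destination** | `should_update_owner_broadcast_metadata` | Keeps a guest's chat_id from being labeled as the owner's |
| **Done is sent whether respond succeeds or fails** | `respond_then_done` | Prevents the early turn close over SSE from #2079 |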
## An opening for the blog post
> "The journey of one Telegram message through our system: on its way from Telegram's servers to your screen, the message passes 4 host-side barriers (Telegram API secret verification, the WASM sandbox, the agent loop, and host-side credential injection on the way out), and each barrier narrows what a plugin can break. **What matters is not how fast WASM runs, but where the host insists on stepping in.**"
③ wit
WASM channel security model
// Security Model:
// - WASM channels are untrusted and run in a sandbox
// - Fresh instance per callback (no shared mutable state)
// - All capabilities are opt-in (default: no access)
// - Secrets are NEVER exposed to WASM; credentials are injected at host boundary
// - Workspace writes are prefixed with channels/<name>/ to prevent escape
// - Message emission is rate-limited
The `channel-host` interface in `wit/channel.wit` contains **12 functions (func)** in three groups by responsibility, plus 5 type definitions. They are covered below in the order base capabilities → channel-specific capabilities → pairing capabilities.
---
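## I. Type definitions (5, not functions)
These are not functions, but `channel-host` embeds several data contracts; listing them first makes the function signatures easier to read:
| Type | Group | Purpose |
| -------------------------------- | ---- | -------------------------------------------------------------------------------- |
| `log-level` (enum) | Base | Log level: `trace`/`debug`/`info`/`warn`/`error` |
| `http-response` (record) | Base | HTTP response: status code + headers (JSON) + body |
| `inbound-attachment` (record) | Channel | Inbound attachment: file id, MIME type, file name, size, source URL, storage key, extracted text, `extras-json` |
| `emitted-message` (record) | Channel | Message sent to the agent: user-id/name, content, thread-id, metadata, attachments |
| `pairing-upsert-result` (record) | Pairing | Result of a pairing request: code + whether it was newly created |
---
## II. Base capabilities (5)
These originate from the generic `tool host` but are redeclared in `channel-host` to cover channel scenarios.
### 1. `log(level, message)`
Structured logging. **Limits**: at most 1000 entries per callback, each ≤ 4 KB; entries are flushed together after the callback ends.
### 2. `now-millis() -> u64`
Current Unix timestamp in milliseconds. Gives WASM a single time reference that does not depend on the host's time zone.
### 3. `workspace-read(path) -> option<string>`
Reads a file under `~/.ironclaw/workspace/channels/<name>/`.
- The path is **prefixed automatically**; it must not start with `/` and must not contain `..`
- Returns `none` if the file does not exist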
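The path rules above (no leading `/`, no `..`, automatic `channels/<name>/` prefix) can be sketched as a small validator. `scoped_workspace_path` is a hypothetical helper written for illustration; the host's actual validation code is not shown in these notes and may be stricter.

```rust
/// Validates a guest-supplied workspace path and returns it scoped under
/// `channels/<channel>/`. The two rejection rules are the ones listed
/// above; the error strings are made up for this sketch.
pub fn scoped_workspace_path(channel: &str, path: &str) -> Result<String, String> {
    if path.starts_with('/') {
        return Err("path must not start with '/'".to_string());
    }
    if path.contains("..") {
        return Err("path must not contain '..'".to_string());
    }
    Ok(format!("channels/{channel}/{path}"))
}
```

Because the prefix is added by the host after validation, a plugin named `telegram` can only ever address paths below `channels/telegram/`.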
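### 4. `http-request(method, url, headers-json, body, timeout-ms) -> result<http-response, string>`
Outbound HTTP restricted to an allowlist (**the channel's main exit to the outside network**).
- **Credentials are injected by the host**; WASM never touches a plaintext secret
- The response is scanned for leaked secrets before it is returned
- `timeout-ms` defaults to 30000 ms and is capped at `callback_timeout`, which accommodates cases such as Telegram long-polling
### 5. `secret-exists(name) -> bool`
Can only **report whether a secret exists**; the value is never returned. The actual credential is injected by the host, per the allowlist, when `http-request` is called.
---
## III. Channel-specific capabilities (4)
### 6. `store-attachment-data(attachment-id, data) -> result`
Stores downloaded binary data (voice, images, raw PDFs, and so on) on the host side first; `emit-message` then references it by `attachment-id`.
- Each attachment ≤ 20 MB, whole callback ≤ 50 MB
- **Cleared automatically** after the callback ends
### 7. `emit-message(msg)`
**Core capability**: pushes a user message into the `MessageStream` for the agent to process.
- Rate: ≤ 100 messages per callback; a channel-wide limit can be set in configuration
- Content ≤ 64 KB
- Messages are delivered only if the callback finishes successfully; on failure they are dropped (no half-finished output)
### 8. `workspace-write(path, content) -> result`
Writes a workspace file under the `channels/<name>/` namespace. `emit-message` can reference it through `metadata-json` so the agent sees channel-side context.
### 9. `websocket-send-text(payload) -> result`
Pushes a text frame through the WebSocket runtime maintained by the host. **Transport-agnostic**: the channel builds the protocol payload itself (for example Slack Events API, Discord gateway). Returns an error if the channel has no active WebSocket or is shutting down.
---
## IV. DM pairing capabilities (3)
Used for the allowlist flow for unknown senders.
### 10. `pairing-upsert-request(channel, id, meta-json) -> result<(code, created)>`
When an unknown user sends a first message, the host **idempotently creates** a pairing request and returns a short code.
- `created=true`: the channel should send the user a pairing prompt
- `created=false`: a request already exists and the old code is reused (avoids repeated prompts)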
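The idempotent upsert can be illustrated with an in-memory map. Everything here is hypothetical: `PairingStore`, the `PAIR-0001` code format, and the counter are stand-ins, since the real store and its code generation are not shown in these notes.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the host's pairing store.
#[derive(Default)]
pub struct PairingStore {
    requests: HashMap<(String, String), String>,
    next: u32,
}

impl PairingStore {
    /// Returns `(code, created)`. Calling it again for the same
    /// `(channel, external_id)` returns the existing code with
    /// `created = false` instead of issuing a new one.
    pub fn upsert_request(&mut self, channel: &str, external_id: &str) -> (String, bool) {
        let key = (channel.to_string(), external_id.to_string());
        if let Some(code) = self.requests.get(&key) {
            return (code.clone(), false);
        }
        self.next += 1;
        let code = format!("PAIR-{:04}", self.next); // hypothetical code format
        self.requests.insert(key, code.clone());
        (code, true)
    }
}
```

A plugin would send the pairing prompt only when `created` is true, so a user who messages the bot repeatedly before being approved is prompted once.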
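### 11. `pairing-resolve-identity(channel, external-id) -> result<option<string>>`
Resolves `(channel, external-id)` to the paired `owner-id`; the agent then uses the owner for permission checks and session ownership. **New channels should prefer this** over the legacy `allowFrom` mechanism.
### 12. `pairing-read-allow-from(channel) -> result<list<string>>`
Reads the list of external IDs previously paired for a channel, solely for **compatibility with the legacy allowFrom admission check**. New channels should not depend on it.
---
## V. Summary
| # | Function | Group | One-line purpose |
| --- | -------------------------- | ---- | --------------------------------- |
| 1 | `log` | Base | Structured logging |
| 2 | `now-millis` | Base | Uniform timestamp |
| 3 | `workspace-read` | Base | Read the channel workspace |
| 4 | `http-request` | Base | Outbound HTTP (with credential injection and leak scanning) |
| 5 | `secret-exists` | Base | Probe whether a secret exists |
| 6 | `store-attachment-data` | Channel | Upload attachment bytes to the host |
| 7 | `emit-message` | Channel | Deliver a message to the agent queue |
| 8 | `workspace-write` | Channel | Write the channel workspace |
| 9 | `websocket-send-text` | Channel | Push a frame through the host's WebSocket |
| 10 | `pairing-upsert-request` | Pairing | Create or reuse a pairing short code |
| 11 | `pairing-resolve-identity` | Pairing | Resolve the paired owner-id (recommended) |
| 12 | `pairing-read-allow-from` | Pairing | Read the legacy allowFrom list (compatibility) |
**Security model summary** (matches the file header comment): WASM is an untrusted sandbox, every callback gets a fresh instance, capabilities default to none, secrets never leak, the `channels/<name>/` prefix prevents escape, and both inbound and outbound messages have rate and size limits.
# `channel` interface (sandbox side) overview
Per the `interface channel` definition in `wit/channel.wit`, the sandbox **implements 6 numbered callbacks plus `on-shutdown` (7 funcs in total)** and uses the data types listed further down. The table lists the callable methods first; details follow.
| # | Method | Group | Triggered by | One-line responsibility |
| --- | --------------------------------------------------------- | -------- | ----------- | ---------------------------------------- |
| 1 | `on-start(config-json) -> result<channel-config, string>` | Lifecycle | Host, at load time | Report HTTP endpoints / polling config |
| 2 | `on-http-request(req) -> outgoing-http-response` | HTTP | HTTP router | Handle an inbound webhook |
| 3 | `on-poll()` | Polling | Poll scheduler | Periodic fetch (e.g. Telegram getUpdates) |
| 4 | `on-respond(response) -> result<_, string>` | Outbound | Response dispatch | Deliver the agent's reply to the channel |
| 5 | `on-status(update)` | Status | Status broadcast | Show UI states such as thinking / tool running / approval needed |
| 6 | `on-broadcast(user-id, response) -> result<_, string>` | Outbound | Background jobs / alerts | Proactive push with no preceding inbound message |
| - | `on-shutdown()` | Lifecycle | Host, at unload | Release resources |
> Note: `on-shutdown` appears at the end of the `interface channel` section (line 431). It is a cleanup callback, left unnumbered in the table above, but the host can still call it.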
---
## I. Data types (used on the sandbox side)
| Type | Kind | Purpose |
| ------------------------------- | ------ | ------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `http-endpoint-config` | record | Describes an endpoint to register with the host HTTP router |
| `poll-config` | record | Polling interval (≥ 30 000 ms) and on/off switch |
| `channel-config` | record | Return value of `on-start`: display-name + endpoints + poll |
| `incoming-http-request` | record | Inbound webhook: method/path/headers/query/body/`secret-validated` |
| `outgoing-http-response` | record | Synchronous response from `on-http-request` |
| `attachment` / `agent-response` | record | Outbound message body, including raw-byte attachments |
| `status-type` | enum | `thinking`/`done`/`interrupted`/`tool-started`/`tool-completed`/`tool-result`/`approval-needed`/`status`/`job-started`/`auth-required`/`auth-completed` |
| `status-update` | record | Status message received by `on-status` |
---
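## II. One by one
### 1. `on-start(config-json) -> result<channel-config, string>`
- **When it is called**: once, when the host first loads the WASM module
- **Input**: the raw JSON configuration from the channel's capabilities file
- **Output**: a `channel-config` that tells the host
  - the display name (`display-name`)
  - which HTTP endpoints to register (path, methods, whether `require-secret` verification is needed)
  - whether polling is enabled and its `interval-ms` (minimum 30 seconds; lower values are rejected by the host)
- **Failure semantics**: if it returns `Err(string)`, the host refuses to load the channel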
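The minimum-interval rule can be sketched as a host-side check on the returned config. `PollConfig`, `MIN_POLL_INTERVAL_MS`, and `validate_poll_config` are illustrative names; only the 30-second floor comes from the notes, and skipping the check when polling is disabled is an assumption.

```rust
/// The 30-second floor stated in the notes, in milliseconds.
pub const MIN_POLL_INTERVAL_MS: u32 = 30_000;

/// Simplified stand-in for the WIT `poll-config` record.
pub struct PollConfig {
    pub enabled: bool,
    pub interval_ms: u32,
}

/// Rejects an enabled poll config whose interval is below the floor.
/// Assumption: a disabled config is accepted whatever its interval.
pub fn validate_poll_config(poll: &PollConfig) -> Result<(), String> {
    if poll.enabled && poll.interval_ms < MIN_POLL_INTERVAL_MS {
        return Err(format!(
            "poll interval {} ms is below the minimum of {} ms",
            poll.interval_ms, MIN_POLL_INTERVAL_MS
        ));
    }
    Ok(())
}
```

Enforcing the floor on the host keeps a misconfigured or hostile plugin from turning the poll scheduler into a busy loop.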
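### 2. `on-http-request(req) -> outgoing-http-response`
- **When it is called**: the HTTP router forwards a request
- **Key field**: `secret-validated: bool` says the host has already verified the secret; the sandbox should not repeat the check (a sandbox-side check would not be trusted anyway)
- **Must return** an `outgoing-http-response` synchronously, otherwise the connection hangs
- **Usually paired with `emit-message`** to queue the inbound message for the agent
### 3. `on-poll()`
- **When it is called**: the host's poll scheduler fires every `poll.interval-ms`
- **No arguments, no return value**; discovered messages are reported through `channel-host.emit-message`
- **Use case**: channels without webhooks (Telegram getUpdates, IMAP polling, and so on)
### 4. `on-respond(response) -> result<_, string>`
- **When it is called**: the agent has produced a reply to a message this channel previously sent in with `emit-message`
- **Argument**: `agent-response` contains `message-id` (for correlation), `content`, `thread-id`, metadata, and optional `attachments` (**raw bytes**)
- **Responsibility**: actually deliver the reply to the external system (Slack `chat.postMessage`, Telegram `sendMessage`, and so on)
- **Credentials**: requests go through `channel-host.http-request`, and the host injects the secret
### 5. `on-status(update)`
- **When it is called**: the agent's state machine changes (the 11 events in `status-type`)
- **Typical uses**:
  - `thinking` → show "typing…" or an emoji in the UI
  - `tool-started`/`tool-completed`/`tool-result` → show tool progress
  - `approval-needed` → render approval buttons
  - `auth-required`/`auth-completed` → tell the user about the OAuth flow
  - `job-started` → indicate that a background sandbox job has started
- **No return value**, notification only; failures are logged by the host
### 6. `on-broadcast(user-id, response) -> result<_, string>`
- **When it is called**: the agent **proactively** messages a user, with no preceding inbound message
- **Use**: alerts, scheduled announcements, proactive notifications with attachments
- **Difference from `on-respond`**: there is no `message-id` to correlate with (the message is not a reply); the recipient is given directly by `user-id`
### 7. `on-shutdown()`
- **When it is called**: the host unloads or restarts the channel instance
- **Responsibility**: close WebSockets, release file handles, flush caches
- **No arguments, no return value**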
---
## III. Call direction summary
```
Host Sandbox (WASM channel)
│ │
│── on-start(config-json) ────────────▶│ (startup)
│◀─ channel-config ─────────────────── │
│ │
│── on-http-request(req) ─────────────▶│ (webhook arrives)
│◀─ outgoing-http-response ────────────│
│ │
│── on-poll() ────────────────────────▶│ (poll tick)
│ │
│── on-respond(response) ─────────────▶│ (agent reply)
│◀─ result<_, err> ────────────────────│
│ │
│── on-status(update) ────────────────▶│ (status broadcast)
│ │
│── on-broadcast(user-id, response) ──▶│ (proactive push)
│◀─ result<_, err> ────────────────────│
│ │
│── on-shutdown() ────────────────────▶│ (unload)
```
**The sandbox has no ambient access to the host.** Every capability in the reverse direction (outbound HTTP, file writes, emitting messages, sending WebSocket frames, pairing) goes through the 12 import functions of the `channel-host` interface described earlier.
---
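## IV. Key constraints at a glance
- **Every callback runs on a fresh WASM instance**; the sandbox cannot rely on in-memory state across calls. Use `workspace-read/write` for persistence
- **Credentials never enter WASM**: any HTTP call that needs a secret goes through `channel-host.http-request`, and the host injects it per the allowlist
- **Synchronous return**: only `on-http-request` must return a response synchronously; the other callbacks follow an "enqueue first, call back later" batch pattern
- **Status broadcast frequency**: `on-status` is triggered by the host per event; the sandbox should be idempotent (the same status may arrive more than once)
④ Call-chain code
Key distinction
A WASM channel is not the pipe that messages flow through; it is better seen as a sandbox inside the host that talks to the agent through host-side enqueue/dequeue channels. The full data flow:
External user
│ webhook / poll
▼
[Host HTTP router / poll scheduler] ← the real entry point, always on the Rust side
│ deserialize
▼
[WASM channel sandbox instance] ← on-http-request / on-poll is called
│ internal logic
▼
channel-host.emit-message() ← the sandbox *actively calls* a host import
│
▼
[Host MessageStream / queue] ← the key relay, not inside WASM
│
▼
[Agent engine] ← reads messages, reasons, calls tools
│
▼
[Host response dispatcher] ← receives the agent-response
│
▼
[WASM channel sandbox instance] ← on-respond / on-broadcast is called
│ uses http-request internally to send the message to Slack/Telegram…
▼
External user
main.rs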
// Start the unified webhook server if any routes were registered.
// (Note: one webhook server is started for WASM channels/tools.)
let webhook_server: Option<Arc<tokio::sync::Mutex<WebhookServer>>> = if !webhook_routes
.is_empty()
{
Inside, routes are added that listen for webhook requests addressed to WASM channels:
Router::new()
// Catch-all for webhook paths
.route("/webhook/{*path}", get(webhook_handler))
.route("/webhook/{*path}", post(webhook_handler))
.with_state(state)
/// Generic webhook handler that routes to the appropriate WASM channel.
async fn webhook_handler(
State(state): State<RouterState>,
method: Method,
Path(path): Path<String>,
Query(query): Query<HashMap<String, String>>,
headers: HeaderMap,
body: Bytes,
) -> impl IntoResponse {
// Find the channel for this path
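let channel = match state.router.get_channel_for_path(&full_path) // look up the channel registered for this path
// check that the method is on the allowlist
if !state
.router
.method_allowed_for_path(&full_path, &method)
.await
// secret / signature verification
If the channel registered a webhook secret:
- read the configured secret_header (the request header name, e.g. X-Hub-Signature-256)
- read the secret stored at registration time (secrets: HashMap<String, String>)
- compute the HMAC and compare
- on failure → return 401 "Invalid secret"
- on success → did_authenticate = true
// call the channel's WASM callback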
match channel
.call_on_http_request(
method.as_str(),
&full_path,
&headers_map,
&query,
&body,
secret_validated,
)
.await
/// Execute the on_http_request callback.
///
/// Called when an HTTP request arrives at a registered endpoint.
pub async fn call_on_http_request(
&self,
method: &str,
path: &str,
headers: &HashMap<String, String>,
query: &HashMap<String, String>,
body: &[u8],
secret_validated: bool,
) -> Result<HttpResponse, WasmChannelError> {
let _callback_guard = self.callback_lock.lock().await;
tracing::info!(
channel = %self.name,
method = method,
path = path,
body_len = body.len(),
secret_validated = secret_validated,
"call_on_http_request invoked (webhook received)"
);
// Log the body for debugging (truncated at char boundary)
if let Ok(body_str) = std::str::from_utf8(body) {
let truncated = if body_str.chars().count() > 1000 {
format!("{}...", body_str.chars().take(1000).collect::<String>())
} else {
body_str.to_string()
};
tracing::debug!(body = %truncated, "Webhook request body");
}
// Log credentials state (without values)
let creds = self.get_credentials().await;
tracing::info!(
credential_count = creds.len(),
credential_names = ?creds.keys().collect::<Vec<_>>(),
"Credentials available for on_http_request"
);
// If no WASM bytes, return 200 OK (for testing)
if self.prepared.component().is_none() {
tracing::debug!(
channel = %self.name,
method = method,
path = path,
"WASM channel on_http_request called (no WASM module)"
);
return Ok(HttpResponse::ok());
}
let runtime = Arc::clone(&self.runtime);
let prepared = Arc::clone(&self.prepared);
let capabilities = Self::inject_workspace_reader(&self.capabilities, &self.workspace_store);
let timeout = self.runtime.config().callback_timeout;
let credentials = self.get_credentials().await;
let host_credentials = resolve_channel_host_credentials(
&self.capabilities,
self.secrets_store.as_deref(),
&self.owner_scope_id,
)
.await;
let pairing_store = self.pairing_store.clone();
let workspace_store = self.workspace_store.clone();
let websocket_outbound_tx = self.websocket_outbound_tx.read().await.clone();
// Prepare request data
let method = method.to_string();
let path = path.to_string();
let headers_json = serde_json::to_string(&headers).unwrap_or_default();
let query_json = serde_json::to_string(&query).unwrap_or_default();
let body = body.to_vec();
let channel_name = self.name.clone();
// Execute in blocking task with timeout
let result = tokio::time::timeout(timeout, async move {
tokio::task::spawn_blocking(move || {
let mut store = Self::create_store(
&runtime,
&prepared,
&capabilities,
credentials,
host_credentials,
pairing_store,
websocket_outbound_tx,
)?;
let instance = Self::instantiate_component(&runtime, &prepared, &mut store)?;
// Build the WIT request type
let wit_request = wit_channel::IncomingHttpRequest {
method,
path,
headers_json,
query_json,
body,
secret_validated,
};
// Call on_http_request using the generated typed interface
let channel_iface = instance.near_agent_channel();
let wit_response = channel_iface
.call_on_http_request(&mut store, &wit_request) // invoke the method implemented by the WASM component
.map_err(|e| Self::map_wasm_error(e, &prepared.name, prepared.limits.fuel))?;
let response = convert_http_response(wit_response);
let mut host_state =
Self::extract_host_state(&mut store, &prepared.name, &capabilities);
// Commit pending workspace writes to the persistent store
let committed_paths =
Self::commit_callback_workspace_writes(&mut host_state, &workspace_store);
Ok((response, host_state, committed_paths))
})
.await
.map_err(|e| WasmChannelError::ExecutionPanicked {
name: channel_name.clone(),
reason: e.to_string(),
})?
})
.await;
let channel_name = self.name.clone();
match result {
Ok(Ok((response, mut host_state, committed_paths))) => {
self.persist_durable_workspace_snapshot_if_needed(&committed_paths)
.await;
// Process emitted messages
let emitted = host_state.take_emitted_messages();
self.process_emitted_messages(emitted).await?;
tracing::debug!(
channel = %channel_name,
status = response.status,
"WASM channel on_http_request completed"
);
Ok(response)
}
Ok(Err(e)) => Err(e),
Err(_) => Err(WasmChannelError::Timeout {
name: channel_name,
callback: "on_http_request".to_string(),
}),
}
}
Taking Feishu as an example:
fn on_http_request(req: IncomingHttpRequest) -> OutgoingHttpResponse {
// Parse the request body as UTF-8.
let body_str = match std::str::from_utf8(&req.body) {
Ok(s) => s,
Err(_) => {
return json_response(400, serde_json::json!({"error": "Invalid UTF-8 body"}));
}
};
// Parse as Feishu event envelope.
let event: FeishuEvent = match serde_json::from_str(body_str) {
Ok(e) => e,
Err(e) => {
channel_host::log(
channel_host::LogLevel::Error,
&format!("Failed to parse Feishu event: {}", e),
);
return json_response(200, serde_json::json!({}));
}
};
let configured_token =
channel_host::workspace_read(VERIFICATION_TOKEN_PATH).filter(|token| !token.is_empty());
if !is_authenticated_webhook(
req.secret_validated,
configured_token.as_deref(),
request_verification_token(&event),
) {
channel_host::log(
channel_host::LogLevel::Warn,
"Rejecting unauthenticated Feishu webhook request",
);
return json_response(
401,
serde_json::json!({"error": "Webhook authentication failed"}),
);
}
// Handle URL verification challenge (initial webhook setup).
if event.event_type.as_deref() == Some("url_verification") {
if let Some(challenge) = &event.challenge {
channel_host::log(
channel_host::LogLevel::Info,
"Handling URL verification challenge",
);
return json_response(200, serde_json::json!({ "challenge": challenge }));
}
}
// Handle v2.0 events.
if let Some(header) = &event.header {
match header.event_type.as_str() {
"im.message.receive_v1" => {
if let Some(event_data) = &event.event {
handle_message_event(event_data); // process the message payload
}
}
other => {
channel_host::log(
channel_host::LogLevel::Debug,
&format!("Ignoring event type: {}", other),
);
}
}
}
// Always respond 200 quickly (Feishu expects fast responses).
json_response(200, serde_json::json!({}))
}
******************
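What on_http_request does inside the Feishu sandbox
Location: channels-src/feishu/src/lib.rs:371-439
Complete processing flow
1. Parse the body as a UTF-8 string (lines 373-378)
let body_str = match std::str::from_utf8(&req.body) { ... };
// On failure → return 400 "Invalid UTF-8 body"
2. Parse it as a Feishu event envelope (lines 381-390)
let event: FeishuEvent = match serde_json::from_str(body_str) { ... };
// On failure → return 200 {} (no error status, to avoid annoying Feishu retries)
FeishuEvent is the standard envelope of Feishu Event Subscription v2.0, with the fields schema / header / event / challenge / token / type.
3. Verify the webhook origin (lines 392-407)
Two-tier authentication:
- Preferred: the host has already validated the secret (req.secret_validated == true)
- Fallback: compare the verification_token carried in the request with the locally configured token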
fn is_authenticated_webhook(secret_validated, configured_token, request_token) -> bool {
if secret_validated { return true; } // the host has already authenticated the request
// otherwise compare the verification tokens in constant time
bool::from(expected.as_bytes().ct_eq(provided.as_bytes()))
}
Either check passing → continue; both failing → return 401.
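A self-contained version of this two-tier check might look like the sketch below. It is an illustration under assumptions: the local `ct_eq` helper is a std-only stand-in for the `ct_eq` call in the excerpt above, and treating a missing token on either side as a failure is an assumption, since the excerpt elides how `expected` and `provided` are obtained.

```rust
/// Compare two byte strings without returning early on the first mismatch.
/// Std-only stand-in for the `ct_eq` call shown in the excerpt above.
fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; the result is 0 only if all match.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}

fn is_authenticated_webhook(
    secret_validated: bool,
    configured_token: Option<&str>,
    request_token: Option<&str>,
) -> bool {
    // Tier 1: the host already validated the webhook secret.
    if secret_validated {
        return true;
    }
    // Tier 2: fall back to the verification token.
    // Assumption: if either side has no token, the request is rejected.
    match (configured_token, request_token) {
        (Some(expected), Some(provided)) => ct_eq(expected.as_bytes(), provided.as_bytes()),
        _ => false,
    }
}
```

The length check leaks only the token length, which is usually considered acceptable for fixed-format tokens.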
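4. Handle the URL verification challenge (lines 410-418)
When a webhook is first configured, Feishu sends an event of type url_verification:
{"type": "url_verification", "challenge": "xxx", "token": "yyy"}
The sandbox must reply with {"challenge": "xxx"} to complete the handshake.
5. Handle v2.0 events (lines 421-435)
Dispatch on header.event_type:
- "im.message.receive_v1" → call handle_message_event(event_data) (the core logic)
- anything else → write a debug log and ignore
6. Call handle_message_event (lines 471-623)
This is where messages are actually processed, in 6 sub-steps:
6.1 Parse the message event structure
- sender_id.open_id → the sender ID
- message.message_id / chat_id / chat_type (p2p or group)
- message.content (a JSON string; for the text type it is {"text": "..."})
- message.mentions / parent_id / root_id
6.2 Owner restriction (lines 491-499)
If OWNER_ID_PATH is set in the workspace (written by on_start), only messages from the owner are accepted; everything else is silently ignored.
6.3 allow_from allowlist (lines 502-515)
If ALLOW_FROM_PATH holds a non-empty user list, only users on that list are accepted.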
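Steps 6.2 and 6.3 amount to two independent filters, each of which is disabled when its setting is empty. A minimal sketch, assuming the owner ID and allow list have already been read from the workspace (`sender_allowed` is a hypothetical helper name, not a function from the source):

```rust
/// Returns true when the sender passes both the owner restriction and the
/// allow_from allowlist.
fn sender_allowed(sender_id: &str, owner_id: Option<&str>, allow_from: &[String]) -> bool {
    // Owner restriction: a missing or empty owner ID disables the check.
    if let Some(owner) = owner_id {
        if !owner.is_empty() && sender_id != owner {
            return false;
        }
    }
    // allow_from: an empty list disables the check.
    if !allow_from.is_empty() && !allow_from.iter().any(|id| id == sender_id) {
        return false;
    }
    true
}
```

For example, `sender_allowed("u2", Some("u1"), &[])` is rejected by the owner restriction, while `sender_allowed("u1", None, &[])` passes because neither filter is configured.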
6.4 DM pairing flow (lines 525-582), p2p chats only
Branches on dm_policy ("open" or "pairing"):
┌────────────────────────────────────────────────┬────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Case                                           │ Behavior                                                                                               │
├────────────────────────────────────────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ dm_policy = "open"                             │ Skip the pairing check                                                                                 │
├────────────────────────────────────────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ dm_policy = "pairing" and sender is paired     │ user_id = owner_id (message is attributed to the owner)                                                │
├────────────────────────────────────────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ dm_policy = "pairing" and sender is not paired │ pairing_upsert_request creates a code → code is sent to the user via Feishu → return (no emit_message) │
├────────────────────────────────────────────────┼────────────────────────────────────────────────────────────────────────────────────────────────────────┤
│ Pairing lookup fails                           │ Log an error, return                                                                                   │
└────────────────────────────────────────────────┴────────────────────────────────────────────────────────────────────────────────────────────────────────┘
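The table above can be read as a small decision function. The sketch below models it with hypothetical names (`DmOutcome`, `decide_dm`); the `resolve` callback stands in for the host's identity lookup and is an assumption of this example, not the real host API.

```rust
/// What the channel does with an incoming message (hypothetical type).
#[derive(Debug, PartialEq)]
enum DmOutcome {
    /// Emit the message to the agent under this user ID.
    Emit { user_id: String },
    /// Create a pairing request, send the code to the sender, emit nothing.
    RequestPairing,
    /// The pairing lookup failed: log the error and drop the message.
    Drop,
}

fn decide_dm(
    chat_type: &str,
    dm_policy: &str,
    sender_id: &str,
    resolve: impl Fn(&str) -> Result<Option<String>, String>,
) -> DmOutcome {
    // Group chats and any policy other than "pairing" skip the pairing check.
    if chat_type != "p2p" || dm_policy != "pairing" {
        return DmOutcome::Emit { user_id: sender_id.to_string() };
    }
    match resolve(sender_id) {
        // Paired sender: the message is attributed to the owner.
        Ok(Some(owner_id)) => DmOutcome::Emit { user_id: owner_id },
        // Unknown sender: start the pairing flow instead of emitting.
        Ok(None) => DmOutcome::RequestPairing,
        // Lookup error: nothing is emitted.
        Err(_) => DmOutcome::Drop,
    }
}
```

Keeping the decision separate from its side effects (sending the code, logging) makes each row of the table easy to test in isolation.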
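6.5 Extract the text (lines 585-595)
- Only message_type == "text" is processed
- image / post / file and other types → write a debug log and ignore
Extraction logic (extract_text_content, lines 629-649):
1. Parse the "text" field from the content JSON string
2. Replace mention placeholders such as @_user_1 with the real user names
3. trim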
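Steps 2 and 3 of the extraction can be illustrated with a std-only sketch. `replace_mentions` is a hypothetical helper, not the body of `extract_text_content`; the `(placeholder, name)` pair representation and the choice to render a mention as `@name` are assumptions of this example.

```rust
/// Replace mention placeholders (e.g. "@_user_1") with display names, then trim.
/// `mentions` holds (placeholder key, display name) pairs.
fn replace_mentions(text: &str, mentions: &[(&str, &str)]) -> String {
    // Replace longer keys first so "@_user_1" does not clobber "@_user_10".
    let mut sorted: Vec<(&str, &str)> = mentions.to_vec();
    sorted.sort_by(|a, b| b.0.len().cmp(&a.0.len()));

    let mut out = text.to_string();
    for (key, name) in sorted {
        // Assumption: a mention is rendered as "@<name>".
        out = out.replace(key, &format!("@{}", name));
    }
    out.trim().to_string()
}
```

The longest-first ordering matters once a message mentions ten or more users, because the shorter placeholder is a prefix of the longer one.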
6.6 Build the metadata + emit_message (lines 598-622)
let metadata = FeishuMessageMetadata {
chat_id, message_id, chat_type,
};
let thread_id = root_id.or(parent_id); // Feishu's thread chain
channel_host::emit_message(&EmittedMessage {
user_id, // owner_id (paired) or sender_id
user_name: None,
content: text,
thread_id,
metadata_json, // the serialized FeishuMessageMetadata, read back when replying
attachments: vec![], // attachments are not supported by the current implementation
});
fn handle_message_event(event_data: &serde_json::Value) {
// 1. Parse the message structure → MessageReceiveEvent
// 2. Extract the sender's open_id → sender_id
// 3. Owner restriction filter → drops strangers messaging the owner's bot
// 4. allow_from allowlist → only specific users get through
// 5. DM pairing (p2p only) → pairing flow for new users
// 6. Extract the text content → filters out images, files, etc.
// 7. Build metadata + thread → context for the reply
// 8. emit_message → push to the agent
}
/// Handle an im.message.receive_v1 event.
fn handle_message_event(event_data: &serde_json::Value) {
let msg_event: MessageReceiveEvent = match serde_json::from_value(event_data.clone()) {
Ok(e) => e,
Err(e) => {
channel_host::log(
channel_host::LogLevel::Error,
&format!("Failed to parse message event: {}", e),
);
return;
}
};
let sender_id = msg_event
.sender
.sender_id
.open_id
.as_deref()
.unwrap_or("unknown");
// Owner restriction check.
if let Some(owner_id) = channel_host::workspace_read(OWNER_ID_PATH) {
if !owner_id.is_empty() && sender_id != owner_id {
channel_host::log(
channel_host::LogLevel::Debug,
&format!("Ignoring message from non-owner: {}", sender_id),
);
return;
}
}
// allow_from restriction: if configured, only listed user IDs may interact.
if let Some(allow_from_json) = channel_host::workspace_read(ALLOW_FROM_PATH) {
if let Ok(allow_list) = serde_json::from_str::<Vec<String>>(&allow_from_json) {
if !allow_list.is_empty() && !allow_list.iter().any(|id| id == sender_id) {
channel_host::log(
channel_host::LogLevel::Debug,
&format!(
"Ignoring message from user not in allow_from: {}",
sender_id
),
);
return;
}
}
}
// DM pairing check for p2p chats.
let chat_type = msg_event.message.chat_type.as_deref().unwrap_or("unknown");
// Resolved user_id for the emitted message. Defaults to sender_id but
// is overwritten with the owner_id when the sender is paired, ensuring
// the message is scoped to the correct owner/tenant.
let mut user_id = sender_id.to_string();
if chat_type == "p2p" {
let dm_policy =
channel_host::workspace_read(DM_POLICY_PATH).unwrap_or_else(|| "pairing".to_string());
if dm_policy == "pairing" {
match channel_host::pairing_resolve_identity("feishu", sender_id) {
Ok(Some(owner_id)) => {
// Sender is paired; scope message to owner.
user_id = owner_id;
}
Ok(None) => {
// Unknown sender — upsert a pairing request.
let meta = serde_json::json!({
"sender_id": sender_id,
"chat_id": msg_event.message.chat_id,
"chat_type": chat_type,
});
match channel_host::pairing_upsert_request(
"feishu",
sender_id,
&meta.to_string(),
) {
Ok(result) => {
channel_host::log(
channel_host::LogLevel::Info,
&format!(
"Pairing request created for {}: {}",
sender_id, result.code
),
);
let _ = send_message(
sender_id,
"open_id",
&format!(
"Enter this code in IronClaw to pair your feishu account: `{}`. CLI fallback: `ironclaw pairing approve feishu {}`",
result.code, result.code
),
);
}
Err(e) => {
channel_host::log(
channel_host::LogLevel::Error,
&format!("Pairing upsert failed: {}", e),
);
}
}
return;
}
Err(e) => {
channel_host::log(
channel_host::LogLevel::Error,
&format!("Pairing check failed: {}", e),
);
return;
}
}
}
}
// Extract text content.
let text = extract_text_content(&msg_event.message);
if text.is_empty() {
channel_host::log(
channel_host::LogLevel::Debug,
&format!(
"Ignoring non-text message type: {}",
msg_event.message.message_type
),
);
return;
}
// Build metadata for responding.
let metadata = FeishuMessageMetadata {
chat_id: msg_event.message.chat_id.clone(),
message_id: msg_event.message.message_id.clone(),
chat_type: chat_type.to_string(),
};
let metadata_json = serde_json::to_string(&metadata).unwrap_or_else(|_| "{}".to_string());
// Determine thread ID from reply chain.
let thread_id = msg_event
.message
.root_id
.as_deref()
.or(msg_event.message.parent_id.as_deref())
.map(|s| s.to_string());
// Emit message to the agent.
channel_host::emit_message(&EmittedMessage { // host capability
user_id,
user_name: None,
content: text,
thread_id,
metadata_json,
attachments: vec![],
});
}
src/channels/wasm/wrapper.rs:634-698
/// Store data for WASM channel execution.
///
/// Contains the resource limiter, channel-specific host state, and WASI context.
struct ChannelStoreData {
limiter: WasmResourceLimiter,
host_state: ChannelHostState, // emitted messages land here first
wasi: WasiCtx,
table: ResourceTable,
/// Injected credentials for URL substitution (e.g., bot tokens).
/// Keys are placeholder names like "TELEGRAM_BOT_TOKEN".
credentials: HashMap<String, String>,
/// Pre-resolved credentials for automatic host-based injection.
/// Applied per-request by matching the URL host against host_patterns.
host_credentials: Vec<ResolvedHostCredential>,
/// Pairing store for DM pairing (guest access control).
pairing_store: Arc<PairingStore>,
/// Optional websocket outbound sender for channels with a managed runtime.
websocket_outbound_tx: Option<mpsc::Sender<String>>,
/// Dedicated tokio runtime for HTTP requests, lazily initialized.
/// Reused across multiple `http_request` calls within one execution.
http_runtime: Option<tokio::runtime::Runtime>,
}
// Implement the generated Host trait for channel-host interface
impl near::agent::channel_host::Host for ChannelStoreData {
fn emit_message(&mut self, msg: near::agent::channel_host::EmittedMessage) {
// 1. Rate limit (MAX_EMITS_PER_EXECUTION, default 100)
// 2. Validate attachments (MIME type / size / count)
// 3. Truncate overlong content (MAX_MESSAGE_CONTENT_SIZE = 64KB)
// 4. Push onto the host state's emitted_messages: Vec<EmittedMessage>
// Note: nothing is delivered yet; the message is only staged inside ChannelStoreData
let attachments: Vec<crate::channels::wasm::host::Attachment> = msg
.attachments
.into_iter()
.map(|a| {
// Parse extras-json for well-known fields
let extras: serde_json::Value = if a.extras_json.is_empty() {
serde_json::Value::Null
} else {
serde_json::from_str(&a.extras_json).unwrap_or(serde_json::Value::Null)
};
let duration_secs = extras
.get("duration_secs")
.and_then(|v| v.as_u64())
.map(|v| v as u32);
// Merge stored binary data (from store-attachment-data host call)
let data = self
.host_state
.remove_attachment_data(&a.id)
.unwrap_or_default();
crate::channels::wasm::host::Attachment {
}
})
.collect();
let mut emitted = EmittedMessage::new(msg.user_id.clone(), msg.content.clone());
if let Some(name) = msg.user_name {
emitted = emitted.with_user_name(name);
}
if let Some(tid) = msg.thread_id {
emitted = emitted.with_thread_id(tid);
}
emitted = emitted.with_metadata(msg.metadata_json);
emitted = emitted.with_attachments(attachments);
match self.host_state.emit_message(emitted) { // append to the queue
}
}
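Feishu's emit_message lands in Feishu's ChannelStoreData not through "type matching" but through the name field of the WasmChannel instance. The WasmChannel is dedicated to Feishu (registered with name = "feishu"). In create_store, its call_on_http_request passes prepared.name = "feishu" to ChannelStoreData::new, which passes it on to ChannelHostState::new, so the self.host_state reached from the sandbox is Feishu-specific by construction. The sandbox code is unaware that it is "running as Feishu": it only calls channel_host::emit_message, and the import stub forwards the call to the right host state.
# How the WASM sandbox knows to call `ChannelStoreData`'s methods
The question has two layers:
1. **How WASM "knows" which Rust method to call**: determined by `wit-bindgen` + the wasmtime linker
2. **How Feishu's `ChannelStoreData` instance gets passed in**: determined by the state mechanism of wasmtime's `Linker` and `Store`
## Key point: the sandbox has no idea `ChannelStoreData` exists
**The sandbox only knows the signatures of abstract import functions**, corresponding to the 12 funcs declared in `interface channel-host` in `wit/channel.wit`. It does not "know" any Rust type.
## The full mechanism: 3 cooperating components
### Component 1: the import stub generated by `wit-bindgen` (sandbox side)
**Location**: generated by `wit-bindgen` at compile time
**`channels-src/feishu/src/lib.rs:30-33`**: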
```rust
wit_bindgen::generate!({
world: "sandboxed-channel",
path: "../../wit/channel.wit",
});
```
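`wit-bindgen` reads `wit/channel.wit` and generates:
- the 6 functions the sandbox **exports**, such as `on_start` / `on_http_request` (the `channel` interface)
- the 12 functions the sandbox **imports**, such as `emit_message` / `log` / `workspace_read` (the `channel-host` interface)
The generated `channel_host::emit_message` looks like an ordinary function call, but **in practice** it expands to roughly:
```rust
pub fn emit_message(msg: &EmittedMessage) {
unsafe {
// call the canonical ABI trampoline (serviced by wasmtime on the host)
// write the arguments into linear memory following the WIT encoding rules
// trigger the host import
}
}
```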
**Sandbox source** (`feishu/src/lib.rs:615`):
```rust
channel_host::emit_message(&EmittedMessage { ... });
// ↑
// this is a local function generated by wit-bindgen
// after compilation it becomes a wasm import
```
**Sandbox view**: calling an ordinary function.
**wasm binary view**: an `import` declaration whose import name is `near:agent/channel-host/emit-message`.
**Rust view**: a trampoline expanded by `wit-bindgen`.
### Component 2: the `Linker` registers the host implementation (host side)
**Location**: `src/channels/wasm/wrapper.rs:1486-1496`
```rust
// Add WASI support (required by the component adapter)
wasmtime_wasi::p2::add_to_linker_sync(linker).map_err(|e| {
WasmChannelError::Config(format!("Failed to add WASI functions: {}", e))
})?;
// Use the generated add_to_linker function from bindgen for our custom interface
SandboxedChannel::add_to_linker::<_, wasmtime::component::HasSelf<_>>(
linker,
|state: &mut ChannelStoreData| state, // ← the key part: tells the linker how to get the state
)
.map_err(|e| WasmChannelError::Config(format!("Failed to add host functions: {}", e)))?;
```
`SandboxedChannel::add_to_linker` is generated at compile time as well (on the host side it comes from wasmtime's component `bindgen!` macro, the counterpart of the guest-side `wit-bindgen`). **It does two things**:
1. **Registers the 12 host imports**: every WIT `channel-host` function gets a host implementation
2. **Specifies the state accessor**: `|state: &mut ChannelStoreData| state` tells wasmtime where to get the state when the sandbox calls a host function
**What actually happens when the sandbox calls `emit_message`**:
```
Sandbox code: channel_host::emit_message(msg)
↓ (WIT canonical ABI)
wasm import call
↓ (wasmtime router)
Linker resolves the import name "near:agent/channel-host/emit-message"
↓
Lookup: this import → ChannelStoreData::emit_message (impl Host for ChannelStoreData)
↓
Get state = |state: &mut ChannelStoreData| state (from Store<ChannelStoreData>)
↓
Call ChannelStoreData::emit_message(state, msg)
```
### Component 3: `Store<ChannelStoreData>` provides the state instance
**Location**: `src/channels/wasm/wrapper.rs:1897`
```rust
let store_data = ChannelStoreData::new(
limits.memory_bytes,
&prepared.name, // ← "feishu"
capabilities.clone(), // ← Feishu's capabilities
credentials, // ← Feishu's credentials
host_credentials,
pairing_store,
websocket_outbound_tx,
);
let mut store = Store::new(engine, store_data); // ← placed into the Store
```
`Store<ChannelStoreData>` is wasmtime's execution context:
- `engine`: the wasmtime engine (compiled wasm)
- `store_data`: **the state instance for this callback**
**When the sandbox calls `emit_message`**, wasmtime:
1. Takes the current `Store<ChannelStoreData>` (the Feishu-specific one)
2. Uses the `|state: &mut ChannelStoreData| state` accessor to obtain `&mut ChannelStoreData`
3. Calls `ChannelStoreData::emit_message(state, msg)` (that is, `impl Host for ChannelStoreData`)
**That is how Feishu's `ChannelStoreData` gets "passed in"**: not explicitly, but by wasmtime fetching it from the `Store` **automatically** whenever the sandbox calls an import.
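To make the division of labor concrete, here is a toy model of the three pieces in plain Rust. It does not use wasmtime at all: `Store`, `Linker`, `StoreData` and `build_linker` below are simplified stand-ins written for this example, and a host function is reduced to a plain `fn` pointer that takes a `String`.

```rust
use std::collections::HashMap;

/// Per-callback state (toy stand-in for `ChannelStoreData`).
struct StoreData {
    channel_name: String,
    emitted: Vec<String>,
}

/// Toy stand-in for wasmtime's `Store<T>`: owns the state of one execution.
struct Store<T> {
    data: T,
}

/// A host function receives the store's state plus the guest's argument.
type HostFn<T> = fn(&mut T, String);

/// Toy stand-in for wasmtime's `Linker<T>`: maps import names to host functions.
struct Linker<T> {
    imports: HashMap<&'static str, HostFn<T>>,
}

impl<T> Linker<T> {
    fn new() -> Self {
        Linker { imports: HashMap::new() }
    }

    fn register(&mut self, name: &'static str, f: HostFn<T>) {
        self.imports.insert(name, f);
    }

    /// Models a guest import call: resolve the function by name, then hand it
    /// the state that lives in *this* store.
    fn call_import(&self, store: &mut Store<T>, name: &str, arg: String) -> Result<(), String> {
        let f = self
            .imports
            .get(name)
            .ok_or_else(|| format!("unknown import: {name}"))?;
        f(&mut store.data, arg);
        Ok(())
    }
}

/// Host implementation of the import; it never checks which channel it serves.
fn emit_message(state: &mut StoreData, msg: String) {
    state.emitted.push(format!("[{}] {}", state.channel_name, msg));
}

/// One linker can serve every channel: the routing comes from the store.
fn build_linker() -> Linker<StoreData> {
    let mut linker = Linker::new();
    linker.register("near:agent/channel-host/emit-message", emit_message);
    linker
}
```

Calling `call_import` with a store whose `channel_name` is `"feishu"` records the message in that store only; a second store for `"telegram"` stays untouched even though both go through the same linker entry.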
## Full timeline
```
[Compile time]
channels-src/feishu/Cargo.toml
└─ wit_bindgen::generate!({ world: "sandboxed-channel", ... })
↓ reads wit/channel.wit
↓ generates:
│ - feishu/src/bindings.rs containing channel_host::emit_message (a local function)
│ - at compile time this local function expands into a wasm import call
src/channels/wasm/wrapper.rs
└─ pulls in the generated add_to_linker
↓ generated at compile time:
│ - registration of the 12 host function implementations with the Linker
│ - the state accessor: |state| state
[Runtime: a Feishu callback arrives]
Feishu POST /webhook/feishu
↓
webhook_handler matches the WasmChannel (name="feishu")
↓
WasmChannel::call_on_http_request(self = Feishu's Arc<WasmChannel>)
↓
create_store(prepared.name = "feishu")
↓
ChannelStoreData::new(name="feishu", ...) ← Feishu-specific state
↓
let mut store = Store::new(engine, store_data)
↓
instance = instantiate(prepared, &mut store)
↓
call the sandbox's on_http_request(wit_request)
↓ (wasmtime executes the sandbox)
↓ inside the sandbox: channel_host::emit_message(msg)
↓
↓ the local function generated by wit-bindgen expands to:
↓ import "near:agent/channel-host/emit-message" (canonical ABI)
↓
↓ wasmtime intercepts → looks up the Linker's registry
↓ match: ChannelStoreData::emit_message
↓
↓ get state = |s: &mut ChannelStoreData| s
↓ extracted from Store<ChannelStoreData> → Feishu's store_data
↓
↓ call ChannelStoreData::emit_message(&mut Feishu store_data, msg)
↓
↓ inside impl Host for ChannelStoreData
↓ self.host_state.channel_name = "feishu" ← Feishu's by construction
```
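## The sandbox's "ignorance"
**The sandbox** (`channels-src/feishu/src/lib.rs`) **knows nothing about**:
- which `ChannelStoreData` it is running against
- the existence of the Rust type `ChannelStoreData`
- the existence of `impl Host for ChannelStoreData`
- the existence of wasmtime
The sandbox only knows that:
- there is a `channel_host` module
- it can call `channel_host::emit_message(msg)`
- msg is a struct with fields such as `user_id` / `content`
**This is the core of the WIT (Wasm Interface Type) design: a language-neutral interface definition**. The same `wit/channel.wit` can generate:
- Rust imports (used by the Feishu sandbox)
- Go imports (for a possible future Go sandbox)
- Python imports (for a possible future Python sandbox)
Each language calls the imports differently, but the host side only needs `impl Host for WhateverState`.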
## Why `impl for ChannelStoreData` rather than `impl for WasmChannel`?
`ChannelStoreData` is **constructed afresh for every callback**:
- Feishu callback #1 → `ChannelStoreData` instance A (tagged "feishu")
- Feishu callback #2 → `ChannelStoreData` instance B (tagged "feishu")
- a concurrent Telegram callback → `ChannelStoreData` instance C (tagged "telegram")
`WasmChannel` is **long-lived** and shared across callbacks. With `impl for WasmChannel`:
- state would be shared between callbacks
- `emitted_messages: Vec` would **accumulate** and not be cleared when a callback ends
- rate-limit state would get mixed up
`ChannelStoreData` is **single-use**:
- new callback → new `ChannelStoreData` → new empty `Vec`
- callback ends → drop → everything is cleared
- state stays cleanly isolated
## In one sentence
**The WASM sandbox "knows" to call `ChannelStoreData`'s methods** because the abstract WIT interface is expanded at compile time into 4 concrete cooperating mechanisms:
1. **Sandbox side**: the generated local function `channel_host::emit_message` compiles into a wasm import call
2. **Host side**: the generated `add_to_linker` registers the 12 WIT functions as methods of `ChannelStoreData`
3. **State injection**: `Store::new(engine, ChannelStoreData_for_feishu)` places the Feishu-specific state into wasmtime
4. **Runtime routing**: the sandbox calls an import → wasmtime consults the Linker → extracts the state from the Store → calls `ChannelStoreData::emit_message`
**The sandbox code has no idea `ChannelStoreData` exists**: it only calls `channel_host::emit_message`, and WIT + wasmtime do all the work of "recognizing Feishu".
/// Host state for WASM channel callbacks.
///
/// Maintains all side effects during callback execution and enforces limits.
/// This is the channel-specific equivalent of HostState for tools.
pub struct ChannelHostState {
/// Base tool host state (logging, time, HTTP, etc.).
base: HostState,
/// Channel name (for error messages).
channel_name: String,
/// Channel capabilities.
capabilities: ChannelCapabilities,
/// Emitted messages (queued for delivery).
emitted_messages: Vec<EmittedMessage>,
/// Pending workspace writes.
pending_writes: Vec<PendingWorkspaceWrite>,
/// Emit count for rate limiting within this execution.
emit_count: u32,
/// Whether emit is still allowed (false after rate limit hit).
emit_enabled: bool,
/// Count of emits dropped due to rate limiting.
emits_dropped: usize,
/// Binary data stored for attachments via `store-attachment-data`.
/// Keyed by attachment ID, cleared after callback completes.
attachment_data: HashMap<String, Vec<u8>>,
/// Total bytes stored in attachment_data (for enforcing limits).
attachment_data_total: u64,
}
/// Emit a message from the channel.
///
/// Messages are queued and delivered after callback execution completes.
/// Rate limiting is enforced per-execution and globally.
/// Attachments are validated for count, total size, and MIME type.
pub fn emit_message(&mut self, msg: EmittedMessage) -> Result<(), WasmChannelError> {
// Check per-execution limit
if !self.emit_enabled {
self.emits_dropped += 1;
return Ok(()); // Silently drop, don't fail execution
}
self.emitted_messages.push(msg); // the key step: enqueue
self.emit_count += 1;
Ok(())
}
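The queue-then-deliver pattern of emit_message can be sketched in a few lines. This is an illustration under stated assumptions: `EmitQueue` and `forward` are hypothetical names, `std::sync::mpsc` stands in for the async channel used by the host, and the rule that flips `emit_enabled` once the cap is reached is an assumption, because the excerpt above elides where that flag changes.

```rust
use std::sync::mpsc::Sender;

/// Toy stand-in for `ChannelHostState`, reduced to the emit bookkeeping.
struct EmitQueue {
    max_emits: u32, // hypothetical field; the notes mention MAX_EMITS_PER_EXECUTION
    emitted: Vec<String>,
    emit_count: u32,
    emit_enabled: bool,
    emits_dropped: usize,
}

impl EmitQueue {
    fn new(max_emits: u32) -> Self {
        EmitQueue {
            max_emits,
            emitted: Vec::new(),
            emit_count: 0,
            emit_enabled: max_emits > 0,
            emits_dropped: 0,
        }
    }

    /// Phase 1 (inside the callback): only stage the message, never deliver it.
    fn emit(&mut self, msg: String) {
        if !self.emit_enabled {
            self.emits_dropped += 1; // silently drop, do not fail the callback
            return;
        }
        self.emitted.push(msg);
        self.emit_count += 1;
        // Assumption: emitting is disabled once the per-execution cap is hit.
        if self.emit_count >= self.max_emits {
            self.emit_enabled = false;
        }
    }

    /// Hand over the staged messages and leave the queue empty.
    fn take_emitted(&mut self) -> Vec<String> {
        std::mem::take(&mut self.emitted)
    }
}

/// Phase 2 (after the callback returns): forward the staged messages.
/// Returns how many were sent before the receiver went away.
fn forward(queue: &mut EmitQueue, tx: &Sender<String>) -> usize {
    let mut sent = 0;
    for msg in queue.take_emitted() {
        if tx.send(msg).is_err() {
            break; // receiver closed, stop forwarding
        }
        sent += 1;
    }
    sent
}
```

With a cap of 2, three `emit` calls leave two staged messages and one counted drop; `forward` then delivers the two in order and empties the queue.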
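The WASM sandbox runs inside a synchronous spawn_blocking task, and mpsc::Sender::send() is async, so the sandbox cannot await it directly. Messages are therefore staged in a Vec first and sent with tx.send().await once spawn_blocking finishes and control is back in the tokio world.
pub async fn call_on_http_request(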
&self,
method: &str,
path: &str,
headers: &HashMap<String, String>,
query: &HashMap<String, String>,
body: &[u8],
secret_validated: bool,
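) -> Result<HttpResponse, WasmChannelError> {
// After the WASM call finishes within the request, the tail of this function takes the messages queued by emit above and sends them over tx
// ...... same as the beginning shown earlier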
let channel_name = self.name.clone();
match result {
Ok(Ok((response, mut host_state, committed_paths))) => {
self.persist_durable_workspace_snapshot_if_needed(&committed_paths)
.await;
// Process emitted messages
let emitted = host_state.take_emitted_messages(); // take the queue and send it to the agent loop
self.process_emitted_messages(emitted).await?;
tracing::debug!(
channel = %channel_name,
status = response.status,
"WASM channel on_http_request completed"
);
/// Process emitted messages from a callback.
async fn process_emitted_messages(
&self,
messages: Vec<EmittedMessage>,
) -> Result<(), WasmChannelError> {
for emitted in messages {
if tx.send(msg).await.is_err() { // send to the agent loop
tracing::error!(
channel = %self.name,
"Failed to send emitted message, channel closed"
);
break;
}
When the agent has finished and wants to reply, it calls respond on the corresponding channel:
async fn respond(
&self,
msg: &IncomingMessage,
response: OutgoingResponse,
) -> Result<(), ChannelError> {
let result = self
.call_on_respond( // WASM capability
msg.id,
&response.content,
response.thread_id.as_ref().map(|t| t.as_str()),
&metadata_json,
&attachments,
&response.inline_attachments,
)
.await;
result.map_err(|e| ChannelError::SendFailed { // propagate execution errors upward
name: self.name.clone(),
reason: e.to_string(),
})?;
Ok(())
Sandbox side (channels-src):
fn on_respond(response: AgentResponse) -> Result<(), String> {
let metadata: FeishuMessageMetadata = serde_json::from_str(&response.metadata_json)
.map_err(|e| format!("Failed to parse metadata: {}", e))?;
send_reply(&metadata.message_id, &response.content)
}
fn send_reply(message_id: &str, content: &str) -> Result<(), String> {
let api_base = channel_host::workspace_read(API_BASE_PATH)
.unwrap_or_else(|| "https://open.feishu.cn".to_string());
let token = get_valid_token(&api_base)?;
let url = format!("{}/open-apis/im/v1/messages/{}/reply", api_base, message_id);
let body = ReplyMessageBody {
msg_type: "text".to_string(),
content: serde_json::json!({"text": content}).to_string(),
};
let body_json =
serde_json::to_string(&body).map_err(|e| format!("Failed to serialize body: {}", e))?;
let headers = serde_json::json!({
"Content-Type": "application/json; charset=utf-8",
"Authorization": format!("Bearer {}", token),
});
let result = channel_host::http_request( // uses the host's HTTP capability to reach the Feishu server
"POST",
&url,
&headers.to_string(),
Some(body_json.as_bytes()),
Some(10_000),
);
match result {
Ok(response) => {
if response.status != 200 {
let body_str = String::from_utf8_lossy(&response.body);
return Err(format!(
"Feishu API returned {}: {}",
response.status, body_str
));
}
// Check API-level error code.
if let Ok(api_resp) =
serde_json::from_slice::<FeishuApiResponse<serde_json::Value>>(&response.body)
{
if api_resp.code != 0 {
return Err(format!(
"Feishu API error {}: {}",
api_resp.code, api_resp.msg
));
}
}
Ok(())
}
Err(e) => Err(format!("HTTP request failed: {}", e)),
}
}
impl near::agent::channel_host::Host for ChannelStoreData {
fn http_request(
&mut self,
method: String,
url: String,
headers_json: String,
body: Option<Vec<u8>>,
timeout_ms: Option<u32>,
) -> Result<near::agent::channel_host::HttpResponse, String> {
// Inject credentials into URL (e.g., replace {TELEGRAM_BOT_TOKEN} with actual token)
let injected_url = self.inject_credentials(&url, "url");
// Log whether injection happened (without revealing the token)
let url_changed = injected_url != url;
tracing::info!(url_changed = url_changed, "URL after credential injection");
// Check if HTTP is allowed for this URL
self.host_state
.check_http_allowed(&injected_url, &method)
.map_err(|e| {
tracing::error!(error = %e, "HTTP not allowed");
format!("HTTP not allowed: {}", e)
})?;
// Record the request for rate limiting
self.host_state.record_http_request().map_err(|e| {
tracing::error!(error = %e, "Rate limit exceeded");
format!("Rate limit exceeded: {}", e)
})?;
// Parse headers from WASM and scan for leaks before credential injection.
// Host-injected tokens (e.g., xoxb- Slack bot token) would otherwise
// trigger the leak detector. The URL has template substitution applied
// (`injected_url`) but not yet host credential injection.
let raw_headers: std::collections::HashMap<String, String> = serde_json::from_str(
&headers_json,
)
.unwrap_or_else(|e| {
tracing::warn!(error = %e, "Malformed headers JSON from WASM; scanning empty headers");
std::collections::HashMap::new()
});
let mut logical_url = injected_url;
let leak_detector = LeakDetector::new();
let raw_header_vec: Vec<(String, String)> = raw_headers
.iter()
.map(|(k, v)| (k.clone(), v.clone()))
.collect();
leak_detector // scan the outgoing request for secret leaks
.scan_http_request(&logical_url, &raw_header_vec, body.as_deref())
.map_err(|e| format!("Potential secret leak blocked: {}", e))?;
// Now inject credentials into header values
// This allows patterns like "Authorization": "Bearer {WHATSAPP_ACCESS_TOKEN}"
let mut headers: std::collections::HashMap<String, String> = raw_headers
.into_iter()
.map(|(k, v)| {
(
k.clone(),
self.inject_credentials(&v, &format!("header:{}", k)),
)
})
.collect();
let headers_changed = headers
.values()
.any(|v| v.contains("Bearer ") && !v.contains('{'));
// Inject pre-resolved host credentials (Bearer tokens, API keys, etc.)
// after the leak scan so host-injected secrets don't trigger false positives.
if let Some(host) = extract_host_from_url(&logical_url) {
self.inject_host_credentials(&host, &mut headers, &mut logical_url);
}
// Get the max response size from capabilities (default 10MB).
let max_response_bytes = self
.host_state
.capabilities()
.tool_capabilities
.http
.as_ref()
.map(|h| h.max_response_bytes)
.unwrap_or(10 * 1024 * 1024);
// Resolve hostname and reject private/internal IPs to prevent DNS rebinding.
// Test/dev URL rewrites intentionally point at local fake servers.
if !allow_private_test_target {
reject_private_ip(&transport_url)?;
}
// Make the HTTP request using a dedicated single-threaded runtime.
// We're inside spawn_blocking, so we can't rely on the main runtime's
// I/O driver (it may be busy with WASM compilation or other startup work).
// A dedicated runtime gives us our own I/O driver and avoids contention.
// The runtime is lazily created and reused across calls within one execution.
if self.http_runtime.is_none() {
self.http_runtime = Some(
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.map_err(|e| format!("Failed to create HTTP runtime: {e}"))?,
);
}
let rt = self.http_runtime.as_ref().expect("just initialized");
let result = rt.block_on(async {
let client = ssrf_safe_client_builder()
.connect_timeout(Duration::from_secs(10))
.build()
.map_err(|e| format!("Failed to build HTTP client: {e}"))?;
let mut request = match method.to_uppercase().as_str() {
"GET" => client.get(&transport_url),
"POST" => client.post(&transport_url),
"PUT" => client.put(&transport_url),
"DELETE" => client.delete(&transport_url),
"PATCH" => client.patch(&transport_url),
"HEAD" => client.head(&transport_url),
_ => return Err(format!("Unsupported HTTP method: {}", method)),
};
// Add headers
for (key, value) in headers {
request = request.header(&key, &value);
}
// Add body if present
if let Some(body_bytes) = body {
request = request.body(body_bytes);
}
// Send request with caller-specified timeout (default 30s, max 5min).
let timeout_ms = timeout_ms.unwrap_or(30_000).min(300_000) as u64;
let timeout = std::time::Duration::from_millis(timeout_ms);
let response = request.timeout(timeout).send().await.map_err(|e| { // send the request
// Walk the full error chain so we get the actual root cause
// (DNS, TLS, connection refused, etc.) instead of just
// "error sending request for url (...)".
let mut chain = format!("HTTP request failed: {}", e);
let mut source = std::error::Error::source(&e);
while let Some(cause) = source {
chain.push_str(&format!(" -> {}", cause));
source = cause.source();
}
chain
})?;
let status = response.status().as_u16();
let response_headers: std::collections::HashMap<String, String> = response
.headers()
.iter()
.filter_map(|(k, v)| {
v.to_str()
.ok()
.map(|v| (k.as_str().to_string(), v.to_string()))
})
.collect();
let headers_json = serde_json::to_string(&response_headers).unwrap_or_default();
// Enforce max response body size to prevent memory exhaustion.
let max_response = max_response_bytes;
if let Some(cl) = response.content_length()
&& cl as usize > max_response
{
return Err(format!(
"Response body too large: {} bytes exceeds limit of {} bytes",
cl, max_response
));
}
let body = response
.bytes()
.await
.map_err(|e| format!("Failed to read response body: {}", e))?;
if body.len() > max_response {
return Err(format!(
"Response body too large: {} bytes exceeds limit of {} bytes",
body.len(),
max_response
));
}
let body = body.to_vec();
tracing::info!(
status = status,
body_len = body.len(),
"HTTP response received"
);
// Log response body for debugging (truncated at char boundary)
if let Ok(body_str) = std::str::from_utf8(&body) {
let truncated = if body_str.chars().count() > 500 {
format!("{}...", body_str.chars().take(500).collect::<String>())
} else {
body_str.to_string()
};
tracing::debug!(body = %truncated, "Response body");
}
// Leak detection on response body (best-effort).
//
// Telegram `getUpdates` is special: it is inbound polling data, so
// user-pasted secrets can legitimately appear in the response body.
// Those messages are still checked later by the inbound message
// safety layer before they reach the LLM, so we allow the polling
// response to continue here to avoid poisoning the offset state.
if let Ok(body_str) = std::str::from_utf8(&body)
&& !should_skip_response_leak_scan(&logical_url)
{
leak_detector
.scan_and_clean(body_str)
.map_err(|e| format!("Potential secret leak in response: {}", e))?;
}
Ok(near::agent::channel_host::HttpResponse {
status,
headers_json,
body,
})
});
// Scrub credential values from error messages before logging or returning
// to WASM. reqwest::Error includes the full URL (with injected credentials)
// in its Display output.
let result = result.map_err(|e| self.redact_credentials(&e));
match &result {
Ok(resp) => {
tracing::info!(status = resp.status, "http_request completed successfully");
}
Err(e) => {
tracing::error!(error = %e, "http_request failed");
}
}
result
}
⑤ Pairing logic
The end-to-end flow for Feishu DM pairing is as follows:
---
## Feishu DM pairing: full flow
### Overall sequence
```
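External user DMs the Feishu bot
→ Feishu server webhook
→ WebhookServer (axum)
→ WasmChannelRouter routing
→ signature verification
→ wasmtime sandbox: on_http_request
→ handle_message_event
├── paired? → user_id=owner_id → emit_message → handled by the agent
└── not paired? → pairing_upsert_request (generates a pairing code)
→ send_message (DMs the pairing code to the user)
→ return (no emit; the message never reaches the agent)
← user receives the pairing code
→ submits the pairing code via Web UI or CLI
→ PairingStore::approve (writes to the channel_identities table)
→ propagate_approval (updates the running channel's owner binding + restarts polling)
← user sends another DM
→ resolve_identity hits → emit_message → agent handles it normally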
```
---
### Stage 1: the webhook enters the sandbox, which checks for p2p
`channels-src/feishu/src/lib.rs:525-581`:
```rust
if chat_type == "p2p" {
let dm_policy = channel_host::workspace_read(DM_POLICY_PATH)
.unwrap_or_else(|| "pairing".to_string());
if dm_policy == "pairing" {
match channel_host::pairing_resolve_identity("feishu", sender_id) {
Ok(Some(owner_id)) => {
user_id = owner_id; // already paired: attribute the message to the owner
}
Ok(None) => {
// Not paired: create a pairing request and DM the pairing code to the sender
channel_host::pairing_upsert_request("feishu", sender_id, &meta);
send_message(sender_id, "open_id",
"Enter this code in IronClaw to pair your feishu account: `{code}`");
return; // no emit_message, so the message never reaches the agent
}
}
}
}
```
The key check is `chat_type == "p2p"` (Feishu's term for a private chat, i.e. a DM). Group chats (`chat_type == "group"`) are not subject to pairing.
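The branch above boils down to a small decision function. The following is a minimal sketch, not the channel's actual code: `DmDecision`, `decide_dm`, and the closure-based `resolve` lookup are hypothetical names introduced for illustration. It also assumes that messages outside the gated case keep the sender's own id, which the excerpt does not show.

```rust
/// Outcome of the DM gate (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
enum DmDecision {
    /// Forward the message to the agent under this user id.
    Emit { user_id: String },
    /// Sender is unknown: issue a pairing code and drop the message.
    RequestPairing,
}

/// Decide what to do with an incoming message.
///
/// `resolve` stands in for `pairing_resolve_identity`; it returns the owner id
/// if the sender has already been paired.
fn decide_dm(
    chat_type: &str,
    dm_policy: &str,
    sender_id: &str,
    resolve: impl Fn(&str) -> Option<String>,
) -> DmDecision {
    // Only DMs ("p2p") under the "pairing" policy are gated.
    if chat_type == "p2p" && dm_policy == "pairing" {
        return match resolve(sender_id) {
            Some(owner_id) => DmDecision::Emit { user_id: owner_id },
            None => DmDecision::RequestPairing,
        };
    }
    // Assumption: group chats and other policies pass through under the
    // sender's own id.
    DmDecision::Emit { user_id: sender_id.to_string() }
}
```

Keeping the decision separate from the side effects (sending the DM, emitting the message) makes the gate easy to unit-test without a sandbox.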
---
### Stage 2: host side, `pairing_resolve_identity` queries the DB
The Feishu sandbox calls a WIT host import, which lands in `src/channels/wasm/wrapper.rs:745`:
```rust
fn pairing_resolve_identity(&mut self, channel: String, external_id: String)
-> Result<Option<String>, String>
{
// block_in_place + block_on: run the async DB query on the synchronous thread used by spawn_blocking
let result = tokio::task::block_in_place(move || {
handle.block_on(async move {
store.resolve_identity(&channel, &external_id).await
})
});
// Convert the UserId back to a String for WASM
}
```
Then `src/pairing/store.rs:55`:
```rust
pub async fn resolve_identity(&self, channel: &str, external_id: &str)
-> Result<Option<UserId>, DatabaseError>
{
// 1. Check the in-memory cache (OwnershipCache) first
if let Some(identity) = self.cache.get(&channel, external_id) {
return Ok(Some(identity)); // hot path, zero DB queries
}
// 2. Cache miss: query the DB (channel_identities JOIN users)
let identity = db.resolve_channel_identity(&channel, external_id).await?;
// 3. On a hit, write the result into the cache
if let Some(ref id) = identity {
self.cache.insert(&channel, external_id, id.clone());
}
Ok(identity)
}
```
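**Returns `Some(UserId)`**: this `sender_id` has already been claimed by an IronClaw user, so the message may reach the agent.
**Returns `None`**: the sender is a stranger and must go through the pairing flow.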
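The cache-then-DB lookup described above is a read-through cache. Here is a self-contained sketch of the pattern under simplifying assumptions: `FakeDb` and `IdentityStore` are hypothetical stand-ins (plain `HashMap`s, synchronous, no error type), not the real `PairingStore` or `OwnershipCache`.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the `channel_identities` table (hypothetical).
struct FakeDb {
    rows: HashMap<(String, String), String>,
    queries: usize, // counts how often the "database" is hit
}

impl FakeDb {
    fn resolve(&mut self, channel: &str, external_id: &str) -> Option<String> {
        self.queries += 1;
        self.rows
            .get(&(channel.to_string(), external_id.to_string()))
            .cloned()
    }
}

/// Read-through cache in front of the database.
struct IdentityStore {
    cache: HashMap<(String, String), String>,
    db: FakeDb,
}

impl IdentityStore {
    fn resolve_identity(&mut self, channel: &str, external_id: &str) -> Option<String> {
        let key = (channel.to_string(), external_id.to_string());
        // 1. Hot path: answer from the cache without touching the DB.
        if let Some(owner) = self.cache.get(&key) {
            return Some(owner.clone());
        }
        // 2. Cache miss: fall back to the DB.
        let owner = self.db.resolve(channel, external_id);
        // 3. Only positive results are cached, so an unknown sender is looked
        //    up again next time and a later approval becomes visible.
        if let Some(ref id) = owner {
            self.cache.insert(key, id.clone());
        }
        owner
    }
}
```

Not caching `None` matters here: if misses were cached, a sender who pairs later would stay "unknown" until the cache entry was evicted.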
---
### Stage 3: host side, `pairing_upsert_request` generates the pairing code
`src/channels/wasm/wrapper.rs:715` → `PairingStore::upsert_request` → `src/pairing/store.rs:85`:
```rust
pub async fn upsert_request(&self, channel: &str, external_id: &str, meta: Option<Value>)
-> Result<PairingRequestRecord, DatabaseError>
{
db.upsert_pairing_request(&channel, external_id, meta).await
}
```
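The DB layer generates an 8-character pairing code (alphabet in `src/pairing/code.rs`: `ABCDEFGHJKLMNPQRSTUVWXYZ23456789`, which omits the easily confused characters I/1 and O/0) that is valid for 15 minutes. It returns `PairingRequestRecord { code, created, expires_at, ... }`.
Once the Feishu sandbox has the `code`, it DMs the pairing code to the user via `send_message` (the Feishu server-side API).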
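To make the code format concrete, here is one way such a code could be derived from random bytes. This is a sketch under stated assumptions, not the contents of `src/pairing/code.rs`: `encode_code` and `expires_at` are hypothetical helpers, the caller is assumed to supply 8 bytes from a cryptographically secure RNG, and the 15-minute lifetime is assumed to be counted from creation time.

```rust
use std::time::{Duration, SystemTime};

/// Alphabet quoted above: no I/1 and no O/0.
const ALPHABET: &[u8; 32] = b"ABCDEFGHJKLMNPQRSTUVWXYZ23456789";
const CODE_LEN: usize = 8;
const CODE_TTL: Duration = Duration::from_secs(15 * 60);

/// Map 8 random bytes to an 8-character code.
///
/// The alphabet has exactly 32 symbols and 256 is a multiple of 32, so
/// `byte % 32` is unbiased when the input bytes are uniform.
fn encode_code(random: [u8; CODE_LEN]) -> String {
    random
        .iter()
        .map(|b| ALPHABET[(b % 32) as usize] as char)
        .collect()
}

/// Expiry timestamp for a request created at `created`
/// (assumption: the TTL starts at creation).
fn expires_at(created: SystemTime) -> SystemTime {
    created + CODE_TTL
}
```

With 32 symbols and 8 positions there are 32^8 (about 1.1 × 10^12) possible codes, which together with the short lifetime keeps guessing impractical.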
---
### Stage 4: the user submits the pairing code, two entry points
**Web UI path**: `POST /api/pairing/{channel}/approve`
`src/channels/web/features/pairing/mod.rs:118` → `pairing_approve_handler`:
- Takes the channel name (e.g. `feishu`) from the URL path
- Takes `{ code, thread_id?, request_id? }` from the JSON body
- Calls `PairingStore::approve`
- On success, `ext_mgr.complete_pairing_approval` propagates the approval to the running channel
- On failure, `store.revert_approval` rolls the approval back
**CLI path**: `ironclaw pairing approve feishu XXXXXXXX`
`src/cli/pairing.rs:132` → `run_approve`:
- Reads owner_id directly from config
- Calls `PairingStore::approve`
- The CLI does not propagate to channels (it holds no reference to a running channel)
---
### Stage 5: `PairingStore::approve` writes the pairing relationship
`src/pairing/store.rs:115`:
```rust
pub async fn approve(&self, channel: &str, code: &str, owner_id: &UserId)
-> Result<PairingApprovalRecord, DatabaseError>
{
// Normalize the pairing code (case, whitespace)
let normalized = code.trim().to_ascii_uppercase();
// Atomic DB operation: validate the code and create the channel_identities row
db.approve_pairing(&channel, &normalized, owner_id.as_str()).await
}
```
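The DB-layer `approve_pairing` does the following, all within a single transaction:
1. Looks up `(channel, code)` in the `pairing_requests` table to get the `external_id`
2. Inserts `(channel, external_id, owner_id)` into the `channel_identities` table
3. Deletes the consumed pairing request row
**Subsequent `resolve_identity` calls now hit**: once the identity is cached, the hot path returns `Some(owner_id)` without even querying the DB.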
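The approval steps can be modelled with two in-memory maps. This is an illustrative sketch only: `PairingTables` is a hypothetical stand-in for the two tables, exclusive `&mut` access plays the role of the transaction, and the expiry check that a real implementation needs is left out.

```rust
use std::collections::HashMap;

/// In-memory stand-in for the two tables (hypothetical, for illustration).
#[derive(Default)]
struct PairingTables {
    /// (channel, code) -> external_id
    pairing_requests: HashMap<(String, String), String>,
    /// (channel, external_id) -> owner_id
    channel_identities: HashMap<(String, String), String>,
}

impl PairingTables {
    /// Consume a pairing code and bind the sender to `owner_id`.
    /// Returns the paired `external_id`, or `None` for an unknown code.
    fn approve(&mut self, channel: &str, code: &str, owner_id: &str) -> Option<String> {
        // Normalize what the user typed: trim whitespace, uppercase.
        let normalized = code.trim().to_ascii_uppercase();
        // Look up the request and delete it in one move, so a code can be
        // consumed at most once.
        let external_id = self
            .pairing_requests
            .remove(&(channel.to_string(), normalized))?;
        // Record the identity binding.
        self.channel_identities.insert(
            (channel.to_string(), external_id.clone()),
            owner_id.to_string(),
        );
        Some(external_id)
    }
}
```

Because the request row is removed as part of the lookup, replaying the same code returns `None` instead of creating a second binding.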
---
### Stage 6: `propagate_approval` notifies the running channel
`src/pairing/approval.rs:34` (Web UI path only; the CLI skips this):
```rust
pub async fn propagate_approval(channel, channel_name, external_id, deps) {
// 1. Record the old state (for rollback on failure)
let previous_owner = channel.owner_actor_id().await;
let previous_config = channel.config_json_snapshot().await;
// 2. Set the new owner
channel.set_owner_actor_id(Some(external_id)).await;
// 3. Update the runtime config (tunnel_url, owner_id, etc.)
channel.update_config(config_updates).await;
// 4. Call on_start() to restart polling
let config = channel.call_on_start().await?;
// 5. On failure: roll back to the old state
// On success: ensure_polling starts the new polling loop
channel.ensure_polling(&config).await;
}
```
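The snapshot-then-rollback idea in steps 1 to 5 can be isolated as follows. This is a minimal synchronous sketch with hypothetical names (`ChannelState`, `apply_with_rollback`); the `restart` closure stands in for `call_on_start()`, and the real function is async and operates on the channel object.

```rust
/// Minimal runtime state of a channel (hypothetical, for illustration).
#[derive(Clone, Debug, PartialEq)]
struct ChannelState {
    owner_actor_id: Option<String>,
    config_json: String,
}

/// Apply a new owner and config, then run `restart`. If `restart` fails,
/// put the previous state back and return the error.
fn apply_with_rollback(
    state: &mut ChannelState,
    new_owner: &str,
    new_config_json: &str,
    restart: impl FnOnce(&ChannelState) -> Result<(), String>,
) -> Result<(), String> {
    // 1. Snapshot the old state.
    let previous = state.clone();
    // 2-3. Apply the new owner and config.
    state.owner_actor_id = Some(new_owner.to_string());
    state.config_json = new_config_json.to_string();
    // 4. Restart with the new state; 5. roll back on failure.
    if let Err(e) = restart(&*state) {
        *state = previous;
        return Err(e);
    }
    Ok(())
}
```

The caller either observes the fully updated state or the untouched previous one, never a half-applied mix of new owner and old config.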
---
### Stage 7: the user sends another DM and the message reaches the agent
This time `resolve_identity("feishu", "ou_xxx")` hits the `OwnershipCache` and returns `Some("ironclaw-user-1")`.
The Feishu sandbox sets `user_id = "ironclaw-user-1"` and calls `emit_message`.
---
### Stage 8: secondary scope check in `process_emitted_messages`
`src/channels/wasm/wrapper.rs:3185`:
```rust
let (resolved_user_id, is_owner_sender) = resolve_message_scope_with_pairing(
&self.name,
&self.owner_scope_id,
owner_actor_id.as_deref(),
channel_bound_user_id.as_deref(),
&user_id, // passed out of the WASM sandbox
self.pairing_store.as_ref(), // query the pairing store once more
).await;
```
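`resolve_message_scope_with_pairing` here is a **defensive second check**: even though the WASM sandbox has passed a `user_id`, the host re-confirms ownership against `pairing_store`, so that a logic error inside the sandbox cannot scramble the message scope.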
---
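### One-sentence summary
**First DM** → the Feishu sandbox sees `p2p` + not paired → generates a pairing code → DMs it to the user → **the message does not reach the agent**.
**The user enters the pairing code in the Web UI/CLI** → `PairingStore::approve` writes `channel_identities(feishu, sender_id, owner_id)` → `propagate_approval` (Web UI path only) restarts channel polling.
**Second DM** → `resolve_identity` hits → `user_id = owner_id` → `emit_message` → the agent handles it normally.
**Feishu does not know where the agent is**: someone has to configure the webhook URL manually on the Feishu Open Platform.
## Full initialization flow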
```
[IronClaw starts]
→ loads feishu.wasm
→ on_start() → returns http_endpoints: [{ path: "/webhook/feishu", methods: ["POST"] }]
→ WasmChannelRouter registers route /webhook/feishu → POST
→ WebhookServer starts and listens on a local port (e.g. 127.0.0.1:8742)
→ Tunnel (cloudflared/ngrok) starts → public URL: https://xxx.tunnel.example.com
[Manual step] The developer goes to the Feishu Open Platform
→ creates a custom app → enables the "Bot" capability
→ Event Subscriptions → sets the request URL: https://xxx.tunnel.example.com/webhook/feishu
→ Feishu sends a URL verification request (challenge) to that URL
[IronClaw receives the verification]
→ webhook_handler → routes to feishu.wasm
→ on_http_request → event_type == "url_verification"
→ returns { challenge: "<the challenge value received>" }
→ Feishu confirms that verification passed ✓
[From then on] A user DMs the Feishu bot
→ Feishu server POSTs to https://xxx.tunnel.example.com/webhook/feishu
→ IronClaw WebhookServer receives it → sandbox processes it → pairing/emit
```
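The URL verification step is a plain echo: the handler returns the `challenge` value it received. The sketch below shows that shape with the standard library only. `WebhookEvent`, `challenge_response`, and `json_string` are hypothetical names, and the event is assumed to be already parsed from the request body; the real channel parses JSON inside `on_http_request`.

```rust
/// Event shapes relevant here, already parsed from the webhook body
/// (hypothetical types; JSON parsing is left out of this sketch).
enum WebhookEvent {
    UrlVerification { challenge: String },
    Other,
}

/// Encode `s` as a JSON string literal (quotes, backslashes, control chars).
fn json_string(s: &str) -> String {
    let mut out = String::with_capacity(s.len() + 2);
    out.push('"');
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out.push('"');
    out
}

/// Response body for a verification request, or `None` when the event is
/// not a verification and must be handled by the normal message path.
fn challenge_response(event: &WebhookEvent) -> Option<String> {
    match event {
        // Echo the challenge back so the platform accepts the endpoint.
        WebhookEvent::UrlVerification { challenge } => {
            Some(format!("{{\"challenge\":{}}}", json_string(challenge)))
        }
        WebhookEvent::Other => None,
    }
}
```

Handling verification before any pairing or signature-dependent logic keeps the endpoint registrable even when no user has been paired yet.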
## Key point: the `http_endpoints` returned by `on_start()` are the registered routes
`channels-src/feishu/src/lib.rs:360-368`:
```rust
fn on_start(config_json: String) -> Result<ChannelConfig, String> {
// ... store app_id, app_secret, verification_token in the workspace ...
// ... obtain tenant_access_token ...
Ok(ChannelConfig {
display_name: "Feishu".to_string(),
http_endpoints: vec![HttpEndpointConfig {
path: "/webhook/feishu".to_string(), // this path is registered on the axum Router
methods: vec!["POST".to_string()],
require_secret: false,
}],
poll: None, // Feishu uses webhooks, not polling
})
}
```
The host side uses this information to register the route in `src/channels/wasm/setup.rs:281`:
```rust
let webhook_path = format!("/webhook/{}", channel_name); // → "/webhook/feishu"
// Registered with WasmChannelRouter → eventually merged into the WebhookServer's axum Router
```
At the same time, the tunnel's public URL is injected through the runtime config:
```rust
let mut config_updates = build_runtime_config_updates(
config.tunnel.public_url.as_deref(), // ← https://xxx.tunnel.example.com
webhook_secret.as_deref(),
owner_actor_id.as_deref(),
);
channel.update_config(config_updates).await;
```
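This lets the Feishu sandbox read `tunnel_url` internally, e.g. to build callback links, but the webhook registration itself is **done on the host side**: the sandbox only declares "I need the path `/webhook/feishu`", and the host is responsible for mounting it on the axum Router.
## In one sentence
**Feishu does not know where the agent is: DevOps manually enters IronClaw's public tunnel URL (`https://<tunnel>/webhook/feishu`) on the "Event Subscriptions" page of the Feishu Open Platform, and Feishu only starts pushing messages after its challenge verification passes.** There is no automatic discovery or registration protocol; the configuration is entirely manual.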
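The "sandbox declares, host mounts" split can be pictured as a small routing table. The sketch below is an assumption-laden illustration, not the real `WasmChannelRouter`: `Router`, `register`, and `route` are hypothetical, and `HttpEndpointConfig` is reduced to the two fields that matter for routing.

```rust
use std::collections::HashMap;

/// What a channel declares from `on_start()` (reduced to path and methods).
struct HttpEndpointConfig {
    path: String,
    methods: Vec<String>,
}

/// Host-side routing table: path -> (channel name, allowed methods).
/// Hypothetical stand-in for the host router.
#[derive(Default)]
struct Router {
    routes: HashMap<String, (String, Vec<String>)>,
}

impl Router {
    /// Mount every endpoint a channel declared.
    fn register(&mut self, channel: &str, endpoints: Vec<HttpEndpointConfig>) {
        for ep in endpoints {
            self.routes.insert(ep.path, (channel.to_string(), ep.methods));
        }
    }

    /// Find the channel that should handle this request, if any.
    fn route(&self, method: &str, path: &str) -> Option<&str> {
        let (channel, methods) = self.routes.get(path)?;
        methods
            .iter()
            .any(|m| m.eq_ignore_ascii_case(method))
            .then(|| channel.as_str())
    }
}
```

The channel never touches the HTTP server; it only hands over a declaration, and the host decides whether and where to expose it.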