简介
Awaken 是一个用 Rust 构建的模块化 AI 智能体运行时框架。它提供基于阶段的执行模型(含快照隔离与确定性重放)、带键作用域(thread / run)和合并策略(exclusive / commutative)的类型化状态引擎、用于可扩展性的插件生命周期系统,以及支持 AI SDK v6、AG-UI、A2A 和 MCP(HTTP 及 stdio)的多协议服务面,以及 ACP stdio 协议面。
Crate 概览
| Crate | 说明 |
|---|---|
awaken-contract | 核心契约:类型、trait、状态模型、智能体规约 |
awaken-runtime | 执行引擎:阶段循环、插件系统、智能体循环、构建器 |
awaken-server | HTTP/SSE 网关与协议适配器 |
awaken-stores | 存储后端:内存、文件、PostgreSQL |
awaken-tool-pattern | Glob/正则工具匹配,用于权限和提醒规则 |
awaken-ext-permission | 权限插件,支持 allow/deny/ask 策略 |
awaken-ext-observability | 基于 OpenTelemetry 的 LLM 和工具调用追踪 |
awaken-ext-mcp | Model Context Protocol 客户端集成 |
awaken-ext-skills | 技能包发现与激活 |
awaken-ext-reminder | 声明式提醒规则,在工具执行后触发 |
awaken-ext-generative-ui | 声明式 UI 组件(A2UI 协议) |
awaken-ext-deferred-tools | 基于概率模型的延迟工具加载 |
awaken | 门面 crate,重新导出核心模块 |
架构
应用代码
注册 tool / model / provider / plugin / agent spec
|
v
AgentRuntime
将 AgentSpec 解析为 ResolvedAgent
从插件构建 ExecutionEnv
执行 phase loop,并暴露 cancel / decision 控制面
|
v
服务与存储表面
HTTP 路由、SSE 回放、mailbox、协议适配器、thread/run 持久化
核心原则
所有状态访问遵循快照隔离。阶段钩子看到的是不可变快照;变更收集在 MutationBatch 中,在收敛后原子性地应用。
本书内容
- 快速上手 — 用最小可运行流程建立整体心智模型
- 构建 Agent — 添加 Tool、Plugin、MCP、Skills、Reminder、Handoff 和 UI 能力
- 服务与集成 — 暴露 HTTP 端点并接入 AI SDK 或 CopilotKit 前端
- 状态与存储 — 选择持久化、上下文裁剪和状态访问模式
- 运行与运维 — 用可观测性、权限、进度上报和测试加固生产行为
- 参考 — API、协议、配置和 Schema 查阅页面
- 架构 — 运行时分层、phase 执行与设计取舍
推荐阅读路径
如果你是第一次接触本项目,建议按以下顺序阅读:
- 先阅读 快速上手,并完成 第一个 Agent。
- 需要扩展能力时,进入 构建 Agent。
- 需要对接 HTTP 客户端或前端时,进入 服务与集成。
- 从演示走向生产时,阅读 状态与存储 和 运行与运维。
- 需要精确契约或运行时内部细节时,回到 参考概览 和 架构。
仓库导航
从文档进入代码时,以下路径最为重要:
| 路径 | 用途 |
|---|---|
crates/awaken-contract/ | 核心契约:工具、事件、状态接口 |
crates/awaken-runtime/ | 智能体运行时:执行引擎、插件、构建器 |
crates/awaken-server/ | HTTP/SSE 服务端 |
crates/awaken-stores/ | 存储后端 |
crates/awaken/examples/ | 小型运行时示例 |
examples/src/ | 全栈服务端示例 |
docs/book/src/ | 本文档源码 |
快速上手
如果你第一次接触 Awaken,先走这条路径,用最小可运行流程建立整体心智模型。
推荐顺序
- 阅读 第一个 Agent,先跑通最小 runtime。
- 阅读 第一个 Tool,理解 tool schema、执行流程和状态写入。
- 进入 构建 Agent,把示例整理成可复用的工程基线。
- 在写生产级工具前,补上 Tool Trait。
何时离开这条路径
- 需要更多 Agent 能力时,进入 构建 Agent 路径。
- 需要 HTTP 或前端集成时,进入 服务与集成。
- 需要持久化和运行控制时,进入 状态与存储 或 运行与运维。
第一个 Agent
目标
端到端运行一个智能体,并确认你收到完整的事件流。
前置条件
[dependencies]
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1"
运行之前,请设置一个模型提供商的 API 密钥:
# OpenAI 兼容模型(用于 gpt-4o-mini)
export OPENAI_API_KEY=<your-key>
# 或 DeepSeek 模型
export DEEPSEEK_API_KEY=<your-key>
1. 创建 src/main.rs
use std::sync::Arc;
use serde_json::{json, Value};
use async_trait::async_trait;
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
use awaken::contract::message::Message;
use awaken::contract::event::AgentEvent;
use awaken::contract::event_sink::VecEventSink;
use awaken::engine::GenaiExecutor;
use awaken::registry_spec::AgentSpec;
use awaken::registry::ModelEntry;
use awaken::{AgentRuntimeBuilder, RunRequest};
struct EchoTool;
#[async_trait]
impl Tool for EchoTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("echo", "Echo", "Echo input back to the caller")
.with_parameters(json!({
"type": "object",
"properties": { "text": { "type": "string" } },
"required": ["text"]
}))
}
async fn execute(
&self,
args: Value,
_ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
let text = args["text"].as_str().unwrap_or_default();
Ok(ToolResult::success("echo", json!({ "echoed": text })).into())
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let agent_spec = AgentSpec::new("assistant")
.with_model("gpt-4o-mini")
.with_system_prompt("You are a helpful assistant. Use the echo tool when asked.")
.with_max_rounds(5);
let runtime = AgentRuntimeBuilder::new()
.with_agent_spec(agent_spec)
.with_tool("echo", Arc::new(EchoTool))
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model("gpt-4o-mini", ModelEntry {
provider: "openai".into(),
model_name: "gpt-4o-mini".into(),
})
.build()?;
let request = RunRequest::new(
"thread-1",
vec![Message::user("Say hello using the echo tool")],
)
.with_agent_id("assistant");
let sink = Arc::new(VecEventSink::new());
runtime.run(request, sink.clone()).await?;
let events = sink.take();
println!("events: {}", events.len());
let finished = events
.iter()
.any(|e| matches!(e, AgentEvent::RunFinish { .. }));
println!("run_finish_seen: {}", finished);
Ok(())
}
2. 运行
cargo run
3. 验证
预期输出包括:
events: <n>,其中n > 0run_finish_seen: true
事件流将至少包含 RunStart、一个或多个 TextDelta 或 ToolCallStart/ToolCallDone 事件,以及最终的 RunFinish。
你创建了什么
本示例创建了一个进程内的 AgentRuntime 并立即执行一个请求。
核心对象是:
let runtime = AgentRuntimeBuilder::new()
.with_agent_spec(agent_spec)
.with_tool("echo", Arc::new(EchoTool))
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model("gpt-4o-mini", ModelEntry {
provider: "openai".into(),
model_name: "gpt-4o-mini".into(),
})
.build()?;
之后,标准入口点是:
let sink = Arc::new(VecEventSink::new());
runtime.run(request, sink.clone()).await?;
let events = sink.take();
常见使用模式:
- 一次性 CLI 程序:构造
RunRequest,通过VecEventSink收集事件,打印结果 - 应用服务:将
runtime.run(...)封装在你自己的应用逻辑中 - HTTP 服务器:将
Arc<AgentRuntime>存储在应用状态中,暴露协议路由
下一步阅读
根据你的需求选择下一页:
- 添加类型化状态和有状态工具:第一个 Tool
- 了解事件如何映射到智能体循环:事件参考
- 通过 HTTP 暴露智能体:暴露 HTTP SSE
常见错误
- 模型/提供商不匹配:
gpt-4o-mini需要兼容的 OpenAI 风格提供商配置。 - 缺少密钥:在
cargo run之前设置OPENAI_API_KEY或DEEPSEEK_API_KEY。 - 工具未被选中:确保提示词明确要求使用
echo。 - 没有
RunFinish事件:检查with_max_rounds设置是否足够高,以便模型完成执行。
下一步
第一个 Tool
目标
实现一个在执行时从 ToolCallContext 读取类型化状态的工具。
状态是可选的。 许多工具(API 调用、搜索、Shell 命令)不需要状态——只需实现
execute并返回ToolResult即可。
前置条件
[dependencies]
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"
1. 定义 StateKey
StateKey 描述状态映射中的一个命名槽位,声明值类型、更新策略和生命周期范围。
use awaken::{StateKey, KeyScope, MergeStrategy};
/// 记录问候工具被调用的次数。
struct GreetCount;
impl StateKey for GreetCount {
const KEY: &'static str = "greet_count";
const MERGE: MergeStrategy = MergeStrategy::Commutative;
const SCOPE: KeyScope = KeyScope::Run;
type Value = u32;
type Update = u32;
fn apply(value: &mut Self::Value, update: Self::Update) {
*value += update;
}
}
fn main() {}
关键选项说明:
KeyScope::Run— 状态在每次运行开始时重置。使用KeyScope::Thread可跨运行持久化。MergeStrategy::Commutative— 并发更新安全。当只有一个写入方时使用Exclusive。apply定义Update如何修改当前Value,此处为递增。
2. 实现 Tool
该工具通过 ctx.state::<GreetCount>() 读取当前计数并返回个性化问候。
use awaken::{StateKey, KeyScope, MergeStrategy};
struct GreetCount;
impl StateKey for GreetCount {
const KEY: &'static str = "greet_count";
const MERGE: MergeStrategy = MergeStrategy::Commutative;
const SCOPE: KeyScope = KeyScope::Run;
type Value = u32;
type Update = u32;
fn apply(value: &mut Self::Value, update: Self::Update) { *value += update; }
}
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
struct GreetTool;
#[async_trait]
impl Tool for GreetTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("greet", "Greet", "Greet a user by name")
.with_parameters(json!({
"type": "object",
"properties": {
"name": { "type": "string", "description": "Name to greet" }
},
"required": ["name"]
}))
}
fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
args["name"]
.as_str()
.filter(|s| !s.is_empty())
.ok_or_else(|| ToolError::InvalidArguments("name is required".into()))?;
Ok(())
}
async fn execute(
&self,
args: Value,
ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
let name = args["name"].as_str().unwrap_or("world");
// 读取状态——如果该键尚未设置则返回 None。
let count = ctx.state::<GreetCount>().copied().unwrap_or(0);
Ok(ToolResult::success("greet", json!({
"greeting": format!("Hello, {}!", name),
"times_greeted": count,
})).into())
}
}
fn main() {}
3. 注册 Tool
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use awaken::{StateKey, KeyScope, MergeStrategy};
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
struct GreetCount;
impl StateKey for GreetCount {
const KEY: &'static str = "greet_count";
const MERGE: MergeStrategy = MergeStrategy::Commutative;
const SCOPE: KeyScope = KeyScope::Run;
type Value = u32;
type Update = u32;
fn apply(value: &mut Self::Value, update: Self::Update) { *value += update; }
}
struct GreetTool;
#[async_trait]
impl Tool for GreetTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("greet", "Greet", "Greet a user by name")
}
async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
Ok(ToolResult::success("greet", json!({})).into())
}
}
use awaken::registry_spec::AgentSpec;
use awaken::AgentRuntimeBuilder;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let agent_spec = AgentSpec::new("assistant")
.with_model("gpt-4o-mini")
.with_system_prompt("You are a helpful assistant. Use the greet tool when asked.")
.with_max_rounds(5);
let runtime = AgentRuntimeBuilder::new()
.with_agent_spec(agent_spec)
.with_tool("greet", Arc::new(GreetTool))
.build()?;
Ok(())
}
4. 运行
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use awaken::{StateKey, KeyScope, MergeStrategy};
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
use awaken::registry_spec::AgentSpec;
use awaken::AgentRuntimeBuilder;
struct GreetCount;
impl StateKey for GreetCount {
const KEY: &'static str = "greet_count";
const MERGE: MergeStrategy = MergeStrategy::Commutative;
const SCOPE: KeyScope = KeyScope::Run;
type Value = u32;
type Update = u32;
fn apply(value: &mut Self::Value, update: Self::Update) { *value += update; }
}
struct GreetTool;
#[async_trait]
impl Tool for GreetTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("greet", "Greet", "Greet a user by name")
}
async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
Ok(ToolResult::success("greet", json!({})).into())
}
}
use awaken::contract::message::Message;
use awaken::contract::event_sink::VecEventSink;
use awaken::RunRequest;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = AgentRuntimeBuilder::new()
.with_agent_spec(AgentSpec::new("assistant").with_model("gpt-4o-mini"))
.with_tool("greet", Arc::new(GreetTool))
.build()?;
let request = RunRequest::new(
"thread-1",
vec![Message::user("Greet Alice")],
)
.with_agent_id("assistant");
let sink = Arc::new(VecEventSink::new());
runtime.run(request, sink.clone()).await?;
Ok(())
}
5. 验证
检查收集到的事件中是否包含 name == "greet" 的 ToolCallDone 事件:
use awaken::contract::event::AgentEvent;
let events = sink.take();
let tool_done = events.iter().any(|e| matches!(
e,
AgentEvent::ToolCallDone { id: _, message_id: _, result: _, outcome: _ }
));
println!("tool_call_done_seen: {}", tool_done);
预期结果:
tool_call_done_seen: trueToolCallDone中的result包含greeting和times_greeted字段。
你创建了什么
一个工具,它:
- 通过
descriptor()声明参数的 JSON Schema。 - 通过
validate_args()在执行前验证参数。 - 通过
ctx.state::<K>()从快照读取类型化状态。 - 通过
ToolResult::success()返回结构化 JSON。
StateKey trait 提供了类型安全的、有范围的状态管理,无需手动操作原始 JSON。
下一步阅读
- 了解完整工具生命周期:Tool Trait
- 添加跨运行管理状态的插件:添加插件
- 学习状态范围规则:State Keys
常见错误
ctx.state::<K>()返回None:该状态键在本次运行中尚未写入。对数值类型使用.unwrap_or_default()或.copied().unwrap_or(0)。StateError::KeyEncode/StateError::KeyDecode:Value类型无法通过 JSON 往返序列化。确保正确派生了Serialize和Deserialize。ToolError::InvalidArguments未被触发:运行时在execute之前调用validate_args。如果跳过验证,错误输入将到达execute并可能在.unwrap()处崩溃。- 范围不匹配:
KeyScope::Run状态在运行之间被清除。如果需要持久化,使用KeyScope::Thread。
构建 Agent 路径
当你已经理解基础运行流程,接下来就进入这条路径,把 Agent 能力逐步拼装完整。
推荐顺序
- 先读 构建 Agent,确定 runtime、model registry 和 agent spec 的骨架。
- 再读 添加 Tool 和 添加 Plugin,用安全的方式扩展行为。
- 需要发现与委托能力时,继续阅读 使用 Skills 子系统、使用 MCP Tools 和 使用 Agent Handoff。
- 需要更具体的能力时,再补上 使用 Reminder 插件、使用 Generative UI 和 使用延迟加载工具。
建议搭配阅读
- Tool Trait 用于核对精确契约。
- Tool 与 Plugin 的边界 用于判断扩展应该放在哪一层。
- 架构 用于理解完整运行时模型。
构建 Agent
当你需要把 agent spec、tools、provider 和持久化组装成一个可运行的 AgentRuntime 时,使用本页。
前置条件
- 已在
Cargo.toml中加入awaken - 已有一个
LlmExecutor实现 - 了解
AgentSpec和AgentRuntimeBuilder
步骤
- 定义 agent spec:
use awaken::engine::GenaiExecutor;
use awaken::{AgentSpec, AgentRuntimeBuilder, ModelSpec};
let spec = AgentSpec::new("assistant")
.with_model("claude-sonnet")
.with_system_prompt("You are a helpful assistant.")
.with_max_rounds(10);
- 注册 tools:
use std::sync::Arc;
let builder = AgentRuntimeBuilder::new()
.with_agent_spec(spec)
.with_tool("search", Arc::new(SearchTool))
.with_tool("calculator", Arc::new(CalculatorTool));
- 注册 provider 和 model:
let builder = builder
.with_provider("anthropic", Arc::new(GenaiExecutor::new()))
.with_model("claude-sonnet", ModelSpec {
id: "claude-sonnet".into(),
provider: "anthropic".into(),
model: "claude-sonnet-4-20250514".into(),
});
- 挂接持久化:
use awaken::stores::InMemoryStore;
let store = Arc::new(InMemoryStore::new());
let builder = builder.with_thread_run_store(store);
- 构建并校验:
let runtime = builder.build()?;
build() 会在启动时就解析并校验所有注册项,提前发现缺失的 model、provider 或 plugin。
- 执行一次 run:
use std::sync::Arc;
use awaken::RunRequest;
use awaken::contract::event_sink::VecEventSink;
let request = RunRequest::new("thread-1", vec![user_message])
.with_agent_id("assistant");
let sink = Arc::new(VecEventSink::new());
let handle = runtime.run(request, sink.clone()).await?;
验证
如果启用了 server,可访问 /health;否则直接检查 AgentRunResult 是否成功完成。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
BuildError::ValidationFailed | spec 引用了未注册的 model/provider | 在 build() 前补齐注册 |
BuildError::State | 多个插件重复注册同一状态键 | 保证状态键只注册一次 |
运行期 RuntimeError | provider 推理失败 | 检查凭据和模型 ID |
相关示例
examples/src/research/
关键文件
crates/awaken-runtime/src/builder.rscrates/awaken-contract/src/registry_spec.rscrates/awaken-runtime/src/runtime/agent_runtime/mod.rs
相关
添加 Tool
当你需要给 agent 暴露一个自定义能力时,使用本页。
前置条件
Cargo.toml里已经加入awaken- 已添加
async-trait和serde_json
步骤
- 实现
Tooltrait。
use async_trait::async_trait;
use serde_json::{Value, json};
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
pub struct WeatherTool;
#[async_trait]
impl Tool for WeatherTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("get_weather", "Get Weather", "Fetch current weather for a city")
.with_parameters(json!({
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "City name"
}
},
"required": ["city"]
}))
}
async fn execute(&self, args: Value, _ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
let city = args["city"]
.as_str()
.ok_or_else(|| ToolError::InvalidArguments("Missing 'city'".into()))?;
let weather = fetch_weather(city).await?;
Ok(ToolResult::success("get_weather", json!({ "forecast": weather })).into())
}
}
- 如有需要,重写参数校验:
fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
if !args.get("city").and_then(|v| v.as_str()).is_some_and(|s| !s.is_empty()) {
return Err(ToolError::InvalidArguments("'city' must be a non-empty string".into()));
}
Ok(())
}
validate_args 会在 execute 之前运行,可以让你提前拒绝格式错误的输入。
- 在 builder 中注册 tool:
use std::sync::Arc;
use awaken::AgentRuntimeBuilder;
let runtime = AgentRuntimeBuilder::new()
.with_tool("get_weather", Arc::new(WeatherTool))
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(provider))
.build()?;
with_tool 的字符串 ID 必须和 ToolDescriptor::new 里的 id 一致。
-
或者在插件中注册:
工具也可以在
Plugin::register方法中通过PluginRegistrar注册:
fn register(&self, registrar: &mut PluginRegistrar) -> Result<(), StateError> {
registrar.register_tool("get_weather", Arc::new(WeatherTool))?;
Ok(())
}
通过插件注册的工具仅对激活了该插件的 agent 可见。
验证
发送一条应当触发该 tool 的消息,确认 run 结果里出现了预期的 tool 调用和返回值。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
ToolError::InvalidArguments | LLM 传了错误 JSON | 收紧参数 schema,给模型更明确约束 |
| tool 从未被调用 | descriptor 的 id 与注册 ID 不一致 | 保证两者完全一致 |
ToolError::ExecutionFailed | execute 内部运行时错误 | 返回清晰错误信息,让 agent 能据此调整 |
相关示例
examples/src/research/tools.rs
关键文件
crates/awaken-contract/src/contract/tool.rscrates/awaken-runtime/src/builder.rs
相关
添加 Plugin
当你需要通过 state key、phase hook、scheduled action 或 effect handler 扩展 agent 生命周期时,使用本页。
前置条件
- 已在
Cargo.toml中添加awaken - 已了解
Phase与StateKey
步骤
- 定义一个状态键:
use awaken::{StateKey, KeyScope, MergeStrategy, StateError, JsonValue};
use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditLog {
pub entries: Vec<String>,
}
pub struct AuditLogKey;
impl StateKey for AuditLogKey {
type Value = AuditLog;
const KEY: &'static str = "audit_log";
const MERGE: MergeStrategy = MergeStrategy::Exclusive;
type Update = AuditLog;
fn apply(value: &mut Self::Value, update: Self::Update) {
*value = update;
}
fn encode(value: &Self::Value) -> Result<JsonValue, StateError> {
serde_json::to_value(value).map_err(|e| StateError::KeyEncode { key: Self::KEY.into(), message: e.to_string() })
}
fn decode(json: JsonValue) -> Result<Self::Value, StateError> {
serde_json::from_value(json).map_err(|e| StateError::KeyDecode { key: Self::KEY.into(), message: e.to_string() })
}
}
- 实现一个 phase hook:
use async_trait::async_trait;
use awaken::{PhaseHook, PhaseContext, StateCommand, StateError};
pub struct AuditHook;
#[async_trait]
impl PhaseHook for AuditHook {
async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
let mut log = ctx.state::<AuditLogKey>().cloned().unwrap_or(AuditLog {
entries: Vec::new(),
});
log.entries.push(format!("Phase executed at {:?}", ctx.phase));
let mut cmd = StateCommand::new();
cmd.update::<AuditLogKey>(log);
Ok(cmd)
}
}
- 实现
Plugintrait:
use awaken::{Plugin, PluginDescriptor, PluginRegistrar, Phase, StateError, StateKeyOptions, KeyScope};
pub struct AuditPlugin;
impl Plugin for AuditPlugin {
fn descriptor(&self) -> PluginDescriptor {
PluginDescriptor { name: "audit" }
}
fn register(&self, registrar: &mut PluginRegistrar) -> Result<(), StateError> {
registrar.register_key::<AuditLogKey>(StateKeyOptions {
scope: KeyScope::Run,
..Default::default()
})?;
registrar.register_phase_hook(
"audit",
Phase::AfterInference,
AuditHook,
)?;
Ok(())
}
}
- 在 runtime 上注册插件,并在 agent 上激活它:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::{AgentSpec, AgentRuntimeBuilder, ModelSpec};
let spec = AgentSpec::new("assistant")
.with_model("claude-sonnet")
.with_system_prompt("You are a helpful assistant.")
.with_hook_filter("audit");
let runtime = AgentRuntimeBuilder::new()
.with_plugin("audit", Arc::new(AuditPlugin))
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(GenaiExecutor::new()))
.with_model("claude-sonnet", ModelSpec {
id: "claude-sonnet".into(),
provider: "anthropic".into(),
model: "claude-sonnet-4-20250514".into(),
})
.build()?;
with_hook_filter 会为该 agent 激活指定插件的 phase hook。
验证
运行 agent 后查看状态快照,确认 audit_log 中出现了 hook 写入的条目。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
StateError::KeyAlreadyRegistered | 多个插件注册了同一个 key | 保证每个 StateKey::KEY 全局唯一 |
StateError::UnknownKey | 读取了未注册的状态键 | 确保注册该 key 的插件已激活 |
| hook 没有执行 | with_hook_filter 中没有对应插件 ID | 把插件 ID 加到 agent spec |
相关示例
crates/awaken-ext-observability/
关键文件
crates/awaken-runtime/src/plugins/lifecycle.rscrates/awaken-runtime/src/plugins/registry.rscrates/awaken-runtime/src/hooks/phase_hook.rs
相关
使用 Skills 子系统
当你希望 agent 在运行时发现、激活并按需加载技能包(指令、资源、脚本)时,使用本页。
前置条件
- 已有可运行的 awaken runtime
awaken启用了skills
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["skills"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
步骤
- 创建技能目录:
skills/
my-skill/
SKILL.md
another-skill/
SKILL.md
示例 skills/my-skill/SKILL.md:
---
name: my-skill
description: Helps with data analysis tasks
allowed-tools: read_file web_search
---
# My Skill
When this skill is active, focus on data analysis.
Use read_file to load datasets and web_search for reference material.
- 发现文件系统技能:
use std::sync::Arc;
use awaken::ext_skills::{FsSkill, InMemorySkillRegistry, SkillSubsystem};
let result = FsSkill::discover("./skills").expect("skill discovery failed");
for warning in &result.warnings {
eprintln!("skill warning: {warning:?}");
}
let skills = FsSkill::into_arc_skills(result.skills);
let registry = Arc::new(InMemorySkillRegistry::from_skills(skills));
- 也可以嵌入编译期技能:
use std::sync::Arc;
use awaken::ext_skills::{EmbeddedSkill, EmbeddedSkillData, InMemorySkillRegistry};
const SKILL_MD: &str = "\
---
name: builtin-skill
description: A compiled-in skill
allowed-tools: read_file
---
Builtin Skill
Follow these instructions when active.
";
let skill = EmbeddedSkill::new(&EmbeddedSkillData {
skill_md: SKILL_MD,
references: &[],
assets: &[],
}).expect("valid skill");
let registry = Arc::new(InMemorySkillRegistry::from_skills(vec![Arc::new(skill)]));
- 接入 runtime:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_skills::SkillSubsystem;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
let subsystem = SkillSubsystem::new(registry);
let agent_spec = AgentSpec::new("skills-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("Discover and activate skills when specialized help is useful.")
.with_hook_filter("skills-discovery")
.with_hook_filter("skills-active-instructions");
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin(
"skills-discovery",
Arc::new(subsystem.discovery_plugin()) as Arc<dyn Plugin>,
)
.with_plugin(
"skills-active-instructions",
Arc::new(subsystem.active_instructions_plugin()) as Arc<dyn Plugin>,
)
.build()
.expect("failed to build runtime");
SkillDiscoveryPlugin 会把技能目录注入到上下文中,并注册三个工具:
| Tool | 用途 |
|---|---|
skill | 按名称激活技能 |
load_skill_resource | 读取技能资源或引用资料 |
skill_script | 运行技能脚本 |
验证
- 运行 agent,让它列出可用 skills
- LLM 应该能看到技能目录,并调用
skill激活某个技能 - 激活后,后续推理会自动注入该 skill 的指令
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| 没发现技能 | 目录结构不对 | 每个技能必须位于子目录里,并有 SKILL.md |
SkillMaterializeError | frontmatter 无效 | 至少提供 name 和 description |
| skill tools 不可用 | 相关插件未注册 | 同时注册 discovery 和 active-instructions 两个插件 |
| feature 不存在 | Cargo 没开 skills | 在 Cargo.toml 中启用 |
相关示例
crates/awaken-ext-skills/tests/
关键文件
crates/awaken-ext-skills/src/lib.rscrates/awaken-ext-skills/src/registry.rscrates/awaken-ext-skills/src/plugin/subsystem.rscrates/awaken-ext-skills/src/plugin/discovery.rscrates/awaken-ext-skills/src/embedded.rscrates/awaken-ext-skills/src/tools.rs
相关
使用 MCP Tools
当你想连接外部 Model Context Protocol(MCP)server,并把它们的工具暴露给 awaken agent 时,使用本页。
前置条件
- 已有可运行的 awaken runtime
awaken启用了mcp- 有一个可连接的 MCP server(stdio 或 HTTP/SSE)
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["mcp"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
步骤
- 配置 MCP server 连接:
use awaken::ext_mcp::McpServerConnectionConfig;
let stdio_config = McpServerConnectionConfig::stdio(
"my-mcp-server",
"node",
vec!["server.js".into()],
);
let http_config = McpServerConnectionConfig::http(
"remote-server",
"http://localhost:8080/sse",
);
- 创建 registry manager 并发现工具:
use awaken::ext_mcp::McpToolRegistryManager;
let manager = McpToolRegistryManager::connect(vec![stdio_config, http_config])
.await
.expect("failed to connect MCP servers");
let registry = manager.registry();
for id in registry.ids() {
println!("discovered: {id}");
}
MCP 工具的 ID 格式通常是 mcp__{server}__{tool}。
- 把工具注册进 runtime:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_mcp::McpPlugin;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
let agent_spec = AgentSpec::new("mcp-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("Use MCP tools when they help answer the user.")
.with_hook_filter("mcp");
let mut builder = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin("mcp", Arc::new(McpPlugin) as Arc<dyn Plugin>);
for (id, tool) in registry.snapshot() {
builder = builder.with_tool(&id, tool);
}
let runtime = builder.build().expect("failed to build runtime");
- 如有需要,开启周期性刷新:
use std::time::Duration;
manager.start_periodic_refresh(Duration::from_secs(60));
验证
- 运行 agent,并让它调用来自 MCP server 的工具
- 检查后端日志里的 MCP 工具调用
- 返回结果中应带有
mcp.server与mcp.tool元数据
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
McpError::TransportError | MCP server 未启动或不可达 | 检查进程和 URL / 命令 |
| 没发现任何工具 | server 返回空工具列表 | 确认 server 实现了 tools/list |
| 调用超时 | server 响应太慢 | 调大 transport timeout |
| feature 不存在 | 没开 cargo feature | 启用 mcp |
找不到 mcp__server__tool | 发现的工具没注册到 builder | 遍历 registry 并逐个 with_tool |
相关示例
crates/awaken-ext-mcp/tests/
关键文件
crates/awaken-ext-mcp/src/lib.rscrates/awaken-ext-mcp/src/manager.rscrates/awaken-ext-mcp/src/config.rscrates/awaken-ext-mcp/src/plugin.rscrates/awaken-ext-mcp/src/transport.rscrates/awaken-ext-mcp/tests/mcp_tests.rs
相关
使用 Reminder 插件
当你希望 agent 在工具执行后,根据模式匹配自动收到上下文提示时,使用本页。例如:修改 .toml 后提醒执行 cargo check,或在危险命令后给出警告。
前置条件
- 已有可运行的 awaken runtime
awaken启用了reminder
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["reminder"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
步骤
- 用规则注册 reminder 插件:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_reminder::{ReminderPlugin, ReminderRulesConfig};
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
let json = r#"{
"rules": [
{
"tool": "Bash(command ~ 'rm *')",
"output": { "status": "success" },
"message": {
"target": "suffix_system",
"content": "A deletion command just succeeded. Verify the result."
}
}
]
}"#;
let config = ReminderRulesConfig::from_str(json, Some("json"))
.expect("failed to parse reminder config");
let rules = config.into_rules().expect("invalid rules");
let agent_spec = AgentSpec::new("my-agent")
.with_model("claude-sonnet")
.with_system_prompt("You are a helpful assistant.")
.with_hook_filter("reminder");
let runtime = AgentRuntimeBuilder::new()
.with_provider("anthropic", Arc::new(GenaiExecutor::new()))
.with_model(
"claude-sonnet",
ModelSpec {
id: "claude-sonnet".into(),
provider: "anthropic".into(),
model: "claude-3-7-sonnet-latest".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin("reminder", Arc::new(ReminderPlugin::new(rules)) as Arc<dyn Plugin>)
.build()
.expect("failed to build runtime");
该插件会在 AfterToolExecute 阶段检查工具名、参数和结果;一旦命中规则,就会调度 AddContextMessage 注入提示。
- 用工具模式定义规则:
| Pattern | 匹配 |
|---|---|
"Bash" | 精确工具名 |
"*" | 任意工具 |
"mcp__*" | 所有 MCP 工具 |
"Bash(command ~ 'rm *')" | 主参数 glob |
"Edit(file_path ~ '*.toml')" | 命名字段 glob |
- 配置 output 匹配:
output 可以是:
"any"{ "status": ..., "content": ... }
status 支持:"success"、"error"、"pending"、"any"
content 支持两类:
- 文本 glob
- JSON 字段匹配
- 选择消息注入目标:
| Target | 位置 |
|---|---|
"system" | system prompt 前部 |
"suffix_system" | system prompt 后部 |
"session" | session 级 system message |
"conversation" | conversation 级 system message |
- 用 cooldown 避免重复注入:
{
"message": {
"target": "system",
"content": "Remember to be careful with file operations.",
"cooldown_turns": 5
}
}
- 也可以从文件加载规则:
use awaken::ext_reminder::ReminderRulesConfig;
let config = ReminderRulesConfig::from_file("reminders.json")
.expect("failed to load reminder config");
- 在 agent spec 上激活插件:
let agent_spec = AgentSpec::new("my-agent")
.with_model("claude-sonnet")
.with_system_prompt("You are a helpful assistant.")
.with_hook_filter("reminder");
验证
- 配一个容易命中的规则,例如
"*"+"any" - 运行 agent 并调用一个工具
- 打开 debug tracing,应该看到 reminder 命中的日志
- 确认下一轮推理 prompt 中出现了提醒消息
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
InvalidPattern | 工具模式写错 | 按 DSL 语法检查引号和通配规则 |
InvalidTarget | 消息目标无效 | 只能用 system / suffix_system / session / conversation |
InvalidOutput | output 结构无效 | 使用 "any" 或结构化对象 |
InvalidOp | 字段匹配操作符未知 | 使用 glob / exact / regex / not_* 系列 |
| 规则从不触发 | 插件没激活 | 在 agent spec 上加 with_hook_filter("reminder") |
| 规则触发太频繁 | 没设置 cooldown | 设置正数 cooldown_turns |
相关示例
crates/awaken-ext-reminder/src/config.rs
关键文件
| 路径 | 作用 |
|---|---|
crates/awaken-ext-reminder/src/lib.rs | 模块根及公共再导出 |
crates/awaken-ext-reminder/src/config.rs | ReminderRulesConfig、JSON 加载、ReminderConfigKey |
crates/awaken-ext-reminder/src/rule.rs | ReminderRule 结构定义 |
crates/awaken-ext-reminder/src/output_matcher.rs | OutputMatcher、ContentMatcher、状态/内容匹配逻辑 |
crates/awaken-ext-reminder/src/plugin/plugin.rs | ReminderPlugin 注册(AfterToolExecute hook) |
crates/awaken-ext-reminder/src/plugin/hook.rs | ReminderHook——每次工具调用的模式与输出评估 |
crates/awaken-tool-pattern/ | 共享 glob/regex 模式匹配库 |
相关
- 启用工具权限 HITL – 使用相同的工具模式 DSL
- 添加 Plugin
- 构建 Agent
使用 Generative UI(A2UI)
当你希望 agent 把声明式 UI 组件发送给前端,而不是只返回文本时,使用本页。
前置条件
- 已有可运行的 runtime
- 前端能够消费 A2UI 消息(例如 CopilotKit 或 AI SDK 集成)
- 前端已经注册了组件目录(catalog)
[dependencies]
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
步骤
- 注册 A2UI 插件:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_generative_ui::A2uiPlugin;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
let plugin = A2uiPlugin::with_catalog_id("my-catalog");
let agent_spec = AgentSpec::new("ui-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("Render structured UI when visual output helps.")
.with_hook_filter("generative-ui");
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin("generative-ui", Arc::new(plugin) as Arc<dyn Plugin>)
.build()
.expect("failed to build runtime");
插件会注册一个 render_a2ui 工具,LLM 通过它把 A2UI 消息数组发给前端。
- 理解 A2UI v0.8 消息类型:
| 消息类型 | 作用 |
|---|---|
createSurface | 创建渲染 surface |
updateComponents | 定义或更新组件树 |
updateDataModel | 写入或更新数据模型 |
deleteSurface | 删除 surface |
消息顺序通常是:先创建 surface,再定义组件,最后填充数据。
- 创建 surface:
let messages = serde_json::json!({
"messages": [
{
"version": "v0.8",
"createSurface": {
"surfaceId": "order-form-1",
"catalogId": "my-catalog"
}
}
]
});
- 定义组件树:
组件列表是扁平的,通过 child / children 表示父子关系。必须有一个 "id": "root" 作为入口。
- 用 JSON path 绑定数据:
组件属性里可以写 {"path": "/json/pointer"},前端会在渲染时从 data model 里解析。
- 删除 surface:
let messages = serde_json::json!({
"messages": [
{
"version": "v0.8",
"deleteSurface": {
"surfaceId": "order-form-1"
}
}
]
});
- 一次 tool call 可以携带多条消息:
render_a2ui 接收的是一个消息数组,所以可以在一次调用中同时创建 surface、更新组件树和写入数据。
- 自定义插件指令:
let plugin = A2uiPlugin::with_catalog_and_examples(
"my-catalog",
"Example: create a card with a title and a button..."
);
let plugin = A2uiPlugin::with_custom_instructions(
"You can render UI by calling render_a2ui...".to_string()
);
验证
- 注册插件后,给 agent 一个“请以可视化方式展示内容”的提示
- 确认 agent 调用了
render_a2ui - 事件流里应出现成功结果:
{"a2ui": [...], "rendered": true} - 前端上应看到对应 surface 和组件
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
缺少 messages 字段 | tool 调用格式不对 | 传 {"messages": [...]} |
messages array must not be empty | 消息数组为空 | 至少传一条 A2UI 消息 |
unsupported version | 版本不是 v0.8 | 每条消息都设为 "version": "v0.8" |
| 单条消息里混入多个类型键 | 一条消息同时含 createSurface 和 updateComponents 等 | 一条消息只允许一个类型键 |
组件缺少 id 或 component | 组件结构不完整 | 补齐必需字段 |
| LLM 不调用工具 | 插件已注册但未激活或 prompt 指令不足 | 检查 hook filter 和插件指令 |
相关示例
crates/awaken-ext-generative-ui/src/a2ui/tests.rs
关键文件
crates/awaken-ext-generative-ui/src/a2ui/mod.rscrates/awaken-ext-generative-ui/src/a2ui/plugin.rscrates/awaken-ext-generative-ui/src/a2ui/tool.rscrates/awaken-ext-generative-ui/src/a2ui/types.rscrates/awaken-ext-generative-ui/src/a2ui/validation.rs
相关
使用 Agent Handoff
当你需要在同一个 run、同一条 thread 内,动态切换 agent 的 system prompt、model 或工具集,而不终止 run 时,使用本页。
前置条件
- 已添加
awaken - 了解
Plugin、StateKey和AgentRuntimeBuilder
概览
handoff 的核心不是启动一个全新 agent run,而是在当前 run 内应用 AgentOverlay,覆盖基础 agent 的一部分配置。
关键类型:
HandoffPluginAgentOverlayHandoffStateHandoffAction
步骤
- 定义 overlay:
use awaken::extensions::handoff::AgentOverlay;
let researcher = AgentOverlay {
system_prompt: Some("You are a research specialist. Find and cite sources.".into()),
model: Some("claude-sonnet".into()),
allowed_tools: Some(vec!["web_search".into(), "read_document".into()]),
excluded_tools: None,
};
let writer = AgentOverlay {
system_prompt: Some("You are a technical writer. Produce clear documentation.".into()),
model: None,
allowed_tools: None,
excluded_tools: Some(vec!["web_search".into()]),
};
model 字段使用的也是模型注册表 ID。
- 用 overlays 构建
HandoffPlugin:
use std::collections::HashMap;
use awaken::extensions::handoff::HandoffPlugin;
let mut overlays = HashMap::new();
overlays.insert("researcher".to_string(), researcher);
overlays.insert("writer".to_string(), writer);
let handoff = HandoffPlugin::new(overlays);
- 把插件注册进 runtime:
use std::sync::Arc;
use awaken::AgentRuntimeBuilder;
let runtime = AgentRuntimeBuilder::new()
.with_plugin("agent_handoff", Arc::new(handoff))
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(provider))
.build()?;
- 在 tool 或 hook 中请求 handoff:
use awaken::extensions::handoff::{request_handoff, activate_handoff, clear_handoff, ActiveAgentKey};
use awaken::state::StateCommand;
let mut cmd = StateCommand::new();
cmd.update::<ActiveAgentKey>(request_handoff("researcher"));
let mut cmd = StateCommand::new();
cmd.update::<ActiveAgentKey>(activate_handoff("writer"));
let mut cmd = StateCommand::new();
cmd.update::<ActiveAgentKey>(clear_handoff());
- 从插件状态里读取 overlay:
let overlay = handoff.overlay("researcher");
它是如何工作的
HandoffState 有两部分:
active_agentrequested_agent
内部同步 hook 会在 RunStart 和 StepEnd 检测 requested_agent,并在安全边界上把它提升为 active_agent。
Handoff vs Delegation
| Handoff | Delegation | |
|---|---|---|
| Thread | 同一 thread、同一 run | 通常会产生子 agent 执行上下文 |
| 状态 | 共享,原地覆盖 | 一般隔离 |
| 适用场景 | 切换角色、人设或工具集 | 拆分独立子任务 |
| 开销 | 很低 | 更高 |
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| overlay 没生效 | request_handoff 的名字和 overlays map 不一致 | 保证字符串完全一致 |
StateError::KeyAlreadyRegistered | 其他插件也注册了 ActiveAgentKey | 每个 runtime 只保留一个 HandoffPlugin |
| hook 没有执行 | 插件未激活 | 把 "agent_handoff" 加到 hook filter |
关键文件
crates/awaken-runtime/src/extensions/handoff/mod.rscrates/awaken-runtime/src/extensions/handoff/plugin.rscrates/awaken-runtime/src/extensions/handoff/types.rscrates/awaken-runtime/src/extensions/handoff/state.rscrates/awaken-runtime/src/extensions/handoff/action.rs
相关
使用延迟加载工具
当你的代理拥有大量工具,且希望通过在 LLM 需要时才暴露工具 schema 来减少上下文窗口占用时,可以使用此功能。延迟加载工具插件将工具分为 Eager(始终发送)和 Deferred(隐藏直到被请求)两类。LLM 可通过 ToolSearch 工具按需发现延迟加载的工具。
前置条件
- 一个可运行的 awaken 代理运行时(参见 第一个代理)
awaken-ext-deferred-toolscrate
[dependencies]
awaken-ext-deferred-tools = { version = "0.1" }
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
步骤
- 创建插件并注册。
收集代理暴露的所有工具描述符,然后将它们传给 DeferredToolsPlugin::new。插件会在激活时使用这些描述符进行工具分类并填充延迟注册表。
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
use awaken_ext_deferred_tools::DeferredToolsPlugin;
// Collect descriptors from all tools registered on the agent.
let seed_tools = vec![
weather_tool.descriptor(),
search_tool.descriptor(),
debug_tool.descriptor(),
// ... all tool descriptors
];
let agent_spec = AgentSpec::new("deferred-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("Search for tools only when needed.")
.with_hook_filter("ext-deferred-tools");
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin(
"ext-deferred-tools",
Arc::new(DeferredToolsPlugin::new(seed_tools)) as Arc<dyn Plugin>,
)
.build()
.expect("failed to build runtime");
- 配置工具加载规则。
通过代理规格中的 deferred_tools 配置键设置规则。规则按顺序求值,首次匹配即生效。未匹配任何规则的工具将回退到 default_mode。
use awaken_ext_deferred_tools::{
DeferredToolsConfig, DeferralRule, ToolLoadMode,
};
let config = DeferredToolsConfig {
rules: vec![
DeferralRule { tool: "get_weather".into(), mode: ToolLoadMode::Eager },
DeferralRule { tool: "debug_*".into(), mode: ToolLoadMode::Deferred },
],
default_mode: ToolLoadMode::Deferred,
..Default::default()
};
tool 字段支持精确名称和 glob 模式(通过 wildcard_match 实现)。常见模式如下:
| 模式 | 匹配对象 |
|---|---|
get_weather | 精确工具 ID |
debug_* | 所有以 debug_ 开头的工具 |
mcp__github__* | 所有 GitHub MCP 工具 |
- 了解自动启用机制。
DeferredToolsConfig 上的 enabled 字段控制激活行为:
| 值 | 行为 |
|---|---|
Some(true) | 始终启用延迟加载工具 |
Some(false) | 始终禁用 |
None(默认) | 当所有延迟工具的总 token 节省量超过 beta_overhead(默认 1136 tokens)时自动启用 |
在自动启用模式下,插件将每个工具的 schema 成本估算为 len(parameters_json) / 4 tokens,并对所有可延迟工具的节省量求和。如果总节省量超过维护 ToolSearch 工具及上下文中延迟工具列表的开销,则自动激活延迟加载。
- 了解 ToolSearch 的工作原理。
插件会自动注册一个 ToolSearch 工具。LLM 通过传入查询字符串来发现延迟加载的工具:
| 查询格式 | 行为 |
|---|---|
"select:Tool1,Tool2" | 按精确 ID 获取指定工具 |
"+required rest terms" | 要求包含某个关键词,按其余词项排序 |
"plain keywords" | 在 id、名称、描述中进行通用关键词搜索 |
当 ToolSearch 返回结果时,匹配的工具会在当前会话的后续过程中被提升为 Eager。该工具最多返回 max_results(默认 5)个匹配的工具 schema,格式为 <functions> 块。
LLM: I need to check the weather. Let me search for relevant tools.
-> calls ToolSearch { query: "weather forecast" }
ToolSearch returns matching schemas, promotes get_weather to Eager.
Next inference: get_weather schema is included in the tool list.
- 配置自动重新延迟(DiscBeta)。
插件使用折扣 Beta 分布跟踪每个工具的使用统计。通过 ToolSearch 提升但不再被使用的工具会自动重新延迟。此过程是自适应的,无需手动调优。
重新延迟在以下条件全部满足时触发:
- 工具当前为 Eager(从 Deferred 提升而来)
- 工具未在规则中配置为始终 Eager
- 工具已空闲至少
defer_after轮 - 后验上可信区间(90%)低于
breakeven_p * thresh_mult
盈亏平衡频率为 (c - c_bar) / gamma,其中 c 为完整 schema 成本,c_bar 为仅名称成本。只有当工具的使用频率足够高,使得避免 ToolSearch 调用带来的节省超过每轮开销时,保持 Eager 才是划算的。
DiscBetaParams 的关键参数:
| 参数 | 默认值 | 用途 |
|---|---|---|
omega | 0.95 | 每轮折扣因子。有效记忆约为 1/(1-omega) = 20 轮 |
n0 | 5.0 | 先验强度,以等价观测数表示 |
defer_after | 5 | 考虑重新延迟前的最小空闲轮数 |
thresh_mult | 0.5 | 盈亏平衡频率的延迟阈值乘数 |
gamma | 2000.0 | ToolSearch 调用的估计 token 成本 |
这些参数位于 DeferredToolsConfig.disc_beta 下:
use awaken_ext_deferred_tools::{DeferredToolsConfig, DiscBetaParams};
let config = DeferredToolsConfig {
disc_beta: DiscBetaParams {
omega: 0.95,
n0: 5.0,
defer_after: 5,
thresh_mult: 0.5,
gamma: 2000.0,
},
beta_overhead: 1136.0,
..Default::default()
};
- 启用跨会话学习。
通过 AgentToolPriors(一个 ProfileKey),使用频率通过 EWMA(指数加权移动平均)在会话间持久化。会话结束时,PersistPriorsHook 将每个工具的存在频率写入配置存储;下次会话开始时,LoadPriorsHook 读取这些数据,并用学习到的先验(而非默认的 0.01)初始化 Beta 分布。
这需要在运行时配置 ProfileStore。除了插件注册外无需额外代码——钩子会自动接入。
EWMA 平滑因子为 lambda = max(0.1, 1/(n+1)),其中 n 为会话计数。早期会话贡献相等;10 次会话后因子稳定在 0.1,即 90% 的权重来自历史数据。
验证
-
运行代理并触发一次推理。检查日志中的
deferred_tools.list上下文消息,其中列出了所有延迟加载的工具名称。 -
从运行时快照中读取
DeferralState,查看每个工具的当前模式:
use awaken_ext_deferred_tools::state::{DeferralState, DeferralStateValue};
let state: &DeferralStateValue = snapshot.state::<DeferralState>()
.expect("DeferralState not found");
for (tool_id, mode) in &state.modes {
println!("{tool_id}: {mode:?}");
}
-
向 LLM 提一个需要使用延迟工具的问题。确认
ToolSearch被调用,且该工具在后续轮次中被提升为 Eager。 -
经过数轮不活跃后,通过检查快照中工具是否恢复为
Deferred模式来验证重新延迟功能。
常见错误
| 症状 | 原因 | 修复方法 |
|---|---|---|
| 所有工具都发送给 LLM(无延迟加载) | enabled: Some(false) 或总节省量低于 beta_overhead | 设置 enabled: Some(true) 或添加更多工具使节省量超过开销 |
| 插件已注册但无工具被延迟 | 所有规则都解析为 Eager | 设置 default_mode: ToolLoadMode::Deferred 或添加 Deferred 规则 |
| LLM 无法使用 ToolSearch | 插件未注册 | 使用种子工具描述符注册 DeferredToolsPlugin |
| 工具从未被重新延迟 | defer_after 过高或工具使用频繁 | 降低 defer_after 或增大 thresh_mult |
| 跨会话先验未加载 | 未配置 ProfileStore | 在运行时接入配置存储 |
| ToolSearch 无返回结果 | 工具不在延迟注册表中 | 检查该工具是否包含在传给插件的 seed_tools 列表中 |
关键文件
| 路径 | 用途 |
|---|---|
crates/awaken-ext-deferred-tools/src/lib.rs | 模块根及公共再导出 |
crates/awaken-ext-deferred-tools/src/config.rs | DeferredToolsConfig、DeferralRule、ToolLoadMode、DiscBetaParams |
crates/awaken-ext-deferred-tools/src/plugin/plugin.rs | DeferredToolsPlugin 注册 |
crates/awaken-ext-deferred-tools/src/plugin/hooks.rs | 阶段钩子(BeforeInference、AfterToolExecute、AfterInference、RunStart、RunEnd) |
crates/awaken-ext-deferred-tools/src/tool_search.rs | ToolSearchTool 实现与查询解析 |
crates/awaken-ext-deferred-tools/src/policy.rs | ConfigOnlyPolicy 与 DiscBetaEvaluator |
crates/awaken-ext-deferred-tools/src/state.rs | 状态键:DeferralState、DeferralRegistry、DiscBetaState、ToolUsageStats、AgentToolPriors |
相关文档
服务与集成
这条路径面向需要把本地 runtime 暴露给其他系统调用的场景。
从这里开始
- 阅读 通过 SSE 暴露 HTTP,先把 runtime 放到 HTTP 和流式端点后面。
- 阅读 集成 AI SDK 前端,对接 React + AI SDK v6。
- 阅读 集成 CopilotKit(AG-UI),对接 CopilotKit 前端。
建议同时查阅
通过 SSE 暴露 HTTP
当你需要通过 HTTP + Server-Sent Events 对外提供 agent 服务,并挂上多种协议适配器(AI SDK、AG-UI、A2A、MCP)时,使用本页。
前置条件
awaken启用了serverfeaturetokio启用了rt-multi-thread和signal- 已构建好一个
AgentRuntime
步骤
- 添加依赖:
[dependencies]
awaken = { package = "awaken-agent", version = "...", features = ["server"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] }
- 构建 runtime:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::{AgentRuntimeBuilder, AgentSpec, ModelSpec};
use awaken::stores::InMemoryStore;
let store = Arc::new(InMemoryStore::new());
let runtime = AgentRuntimeBuilder::new()
.with_agent_spec(
AgentSpec::new("assistant")
.with_model("claude-sonnet")
.with_system_prompt("You are a helpful assistant."),
)
.with_tool("search", Arc::new(SearchTool))
.with_provider("anthropic", Arc::new(GenaiExecutor::new()))
.with_model("claude-sonnet", ModelSpec {
id: "claude-sonnet".into(),
provider: "anthropic".into(),
model: "claude-sonnet-4-20250514".into(),
})
.with_thread_run_store(store.clone())
.build()?;
let runtime = Arc::new(runtime);
- 创建应用状态:
use awaken::server::app::{AppState, ServerConfig};
use awaken::server::mailbox::{Mailbox, MailboxConfig};
use awaken::stores::InMemoryMailboxStore;
let mailbox_store = Arc::new(InMemoryMailboxStore::new());
let mailbox = Arc::new(Mailbox::new(
runtime.clone(),
mailbox_store,
"default-consumer".to_string(),
MailboxConfig::default(),
));
let state = AppState::new(
runtime.clone(),
mailbox,
store,
runtime.resolver_arc(),
ServerConfig::default(),
);
- 构建 router:
use awaken::server::routes::build_router;
let app = build_router().with_state(state);
build_router() 会注册:
/health/v1/threads/v1/runs/v1/config/*和/v1/capabilities- AI SDK v6、AG-UI、A2A、MCP 路由
- 启动服务:
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
- 配置 SSE buffer:
let config = ServerConfig {
address: "0.0.0.0:8080".into(),
sse_buffer_size: 128,
..ServerConfig::default()
};
验证
curl http://localhost:3000/health
应返回 200 OK。然后可以创建 thread 并启动 run。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| 端口已占用 | 3000 已被其他进程使用 | 改 ServerConfig.address 或 TcpListener::bind |
| SSE 立即断开 | 客户端不支持 text/event-stream | 用 curl --no-buffer 或标准 SSE 客户端 |
| 路由缺失 | 没有启用 server feature | 确保 awaken 开启 features = ["server"] |
相关示例
crates/awaken-server/tests/run_api.rs
关键文件
crates/awaken-server/src/app.rscrates/awaken-server/src/routes.rscrates/awaken-server/src/http_sse.rscrates/awaken-server/src/mailbox.rs
相关
集成 AI SDK 前端
当你有一个基于 Vercel AI SDK v6 的 React 前端,并希望把它接到 awaken agent server 上时,使用本页。
前置条件
- 已有可运行的 awaken runtime
awaken启用了server- Node.js 项目中已安装
@ai-sdk/react
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["server"] }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1"
tracing-subscriber = "0.3"
步骤
- 先启动后端 server:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::contract::storage::ThreadRunStore;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::stores::{InMemoryMailboxStore, InMemoryStore};
use awaken::AgentRuntimeBuilder;
use awaken::server::app::{AppState, ServerConfig};
use awaken::server::mailbox::{Mailbox, MailboxConfig};
use awaken::server::routes::build_router;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_target(true).init();
let agent_spec = AgentSpec::new("my-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("You are a helpful assistant.")
.with_max_rounds(10);
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.build()
.expect("failed to build runtime");
let runtime = Arc::new(runtime);
let store = Arc::new(InMemoryStore::new());
let resolver = runtime.resolver_arc();
let mailbox_store = Arc::new(InMemoryMailboxStore::new());
let mailbox = Arc::new(Mailbox::new(
runtime.clone(),
mailbox_store as Arc<dyn awaken::contract::MailboxStore>,
format!("ai-sdk:{}", std::process::id()),
MailboxConfig::default(),
));
let state = AppState::new(
runtime,
mailbox,
store as Arc<dyn ThreadRunStore>,
resolver,
ServerConfig {
address: "127.0.0.1:3000".into(),
..Default::default()
},
);
let app = build_router().with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
axum::serve(listener, app).await.unwrap();
}
AI SDK v6 相关路由:
POST /v1/ai-sdk/chatGET /v1/ai-sdk/chat/:thread_id/streamGET /v1/ai-sdk/threads/:thread_id/streamGET /v1/ai-sdk/threads/:id/messages
- 安装前端依赖:
npm install ai @ai-sdk/react
- 在前端里使用
useChat:
import { useChat } from "@ai-sdk/react";
export default function Chat() {
const { messages, input, handleInputChange, handleSubmit } = useChat({
api: "http://localhost:3000/v1/ai-sdk/chat",
id: "thread-1",
});
return (
<div>
{messages.map((m) => (
<div key={m.id}>
<strong>{m.role}:</strong> {m.content}
</div>
))}
<form onSubmit={handleSubmit}>
<input value={input} onChange={handleInputChange} />
<button type="submit">Send</button>
</form>
</div>
);
}
- 分别启动后端和前端。
验证
- 打开前端页面
- 发送一条消息
- 确认文本是流式出现的
- 确认后端日志中出现
RunStart/RunFinish
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| 浏览器 CORS 错误 | 未配置 CORS 中间件 | 给 axum router 加 tower-http CORS |
useChat 收不到事件 | URL 配错 | 确认 api 指向 /v1/ai-sdk/chat |
stream closed unexpectedly | SSE 缓冲溢出 | 增大 ServerConfig.sse_buffer_size |
/v1/ai-sdk/chat 返回 404 | 没开 server feature | 在 Cargo.toml 里启用 |
相关示例
examples/ai-sdk-starter/agent/src/main.rs
关键文件
| 路径 | 作用 |
|---|---|
crates/awaken-server/src/protocols/ai_sdk_v6/http.rs | AI SDK v6 路由 |
crates/awaken-server/src/protocols/ai_sdk_v6/encoder.rs | AI SDK v6 SSE encoder |
crates/awaken-server/src/routes.rs | 总路由 |
crates/awaken-server/src/app.rs | AppState / ServerConfig |
examples/ai-sdk-starter/agent/src/main.rs | AI SDK starter 后端入口 |
相关
集成 CopilotKit(AG-UI)
当你有一个 CopilotKit React 前端,并想通过 AG-UI 协议接入 awaken agent server 时,使用本页。
前置条件
- 已有可运行的 awaken runtime
awaken启用了server- Node.js 项目中已安装
@copilotkit/react-core和@copilotkit/react-ui
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["server"] }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1"
tracing-subscriber = "0.3"
步骤
- 启动后端 server:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::contract::storage::ThreadRunStore;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::stores::{InMemoryMailboxStore, InMemoryStore};
use awaken::AgentRuntimeBuilder;
use awaken::server::app::{AppState, ServerConfig};
use awaken::server::mailbox::{Mailbox, MailboxConfig};
use awaken::server::routes::build_router;
#[tokio::main]
async fn main() {
tracing_subscriber::fmt().with_target(true).init();
let agent_spec = AgentSpec::new("copilotkit-agent")
.with_model("gpt-4o-mini")
.with_system_prompt(
"You are a CopilotKit-powered assistant. \
Update shared state and suggest actions when appropriate.",
)
.with_max_rounds(10);
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.build()
.expect("failed to build runtime");
let runtime = Arc::new(runtime);
let store = Arc::new(InMemoryStore::new());
let resolver = runtime.resolver_arc();
let mailbox_store = Arc::new(InMemoryMailboxStore::new());
let mailbox = Arc::new(Mailbox::new(
runtime.clone(),
mailbox_store as Arc<dyn awaken::contract::MailboxStore>,
format!("copilotkit:{}", std::process::id()),
MailboxConfig::default(),
));
let state = AppState::new(
runtime,
mailbox,
store as Arc<dyn ThreadRunStore>,
resolver,
ServerConfig {
address: "127.0.0.1:3000".into(),
..Default::default()
},
);
let app = build_router().with_state(state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.expect("failed to bind");
axum::serve(listener, app).await.expect("server crashed");
}
服务器会自动在以下路径注册 AG-UI 路由:
AG-UI 路由包括:
POST /v1/ag-ui/runPOST /v1/ag-ui/threads/:thread_id/runsPOST /v1/ag-ui/agents/:agent_id/runsPOST /v1/ag-ui/threads/:thread_id/interruptGET /v1/ag-ui/threads/:id/messages
- 安装 CopilotKit:
npm install @copilotkit/react-core @copilotkit/react-ui
- 用
CopilotKitprovider 包裹应用:
import { CopilotKit } from "@copilotkit/react-core";
import { CopilotChat } from "@copilotkit/react-ui";
import "@copilotkit/react-ui/styles.css";
export default function App() {
return (
<CopilotKit runtimeUrl="http://localhost:3000/v1/ag-ui">
<CopilotChat
labels={{ title: "Agent", initial: "How can I help?" }}
/>
</CopilotKit>
);
}
- 分别启动后端和前端。
验证
- 打开页面
- 在 CopilotChat 中发送消息
- 确认聊天 UI 中有流式回复
- 查看后端日志里的
RunStart/RunFinish
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| 浏览器 CORS 错误 | 未配置 CORS 中间件 | 给 axum router 加 CORS |
| CopilotKit 提示 connection failed | runtimeUrl 错了 | 指向 http://localhost:3000/v1/ag-ui |
| 有事件但 UI 不更新 | AG-UI 事件格式不兼容 | 确认 CopilotKit 版本匹配 |
/v1/ag-ui/run 返回 404 | 没开 server feature | 在 Cargo.toml 里启用 |
相关示例
examples/copilotkit-starter/agent/src/main.rs
关键文件
| 路径 | 作用 |
|---|---|
crates/awaken-server/src/protocols/ag_ui/http.rs | AG-UI 路由 |
crates/awaken-server/src/protocols/ag_ui/encoder.rs | AG-UI encoder |
crates/awaken-server/src/routes.rs | 总路由 |
crates/awaken-server/src/app.rs | AppState / ServerConfig |
examples/copilotkit-starter/agent/src/main.rs | CopilotKit starter 后端入口 |
相关
状态与存储
这条路径面向已经不满足无状态演示、需要认真设计状态与持久化的团队。
你可以在这里决定
- thread / run 数据放在哪里
- 状态键和合并策略怎么组织
- 每一轮究竟把多少上下文送给模型
推荐顺序
- 从 使用文件存储 或 使用 Postgres 存储 开始,先确定持久化后端。
- 阅读 状态键 和 线程模型,理解状态布局和生命周期。
- 当上下文规模开始成为问题时,再阅读 优化上下文窗口。
相关内部机制
使用文件存储
当你希望在不引入外部数据库的情况下,用文件系统持久化 threads、runs 和 messages 时,使用本页。
前置条件
awaken-stores启用了filefeature
步骤
- 添加依赖:
[dependencies]
awaken-stores = { version = "...", features = ["file"] }
如果使用 awaken 门面 crate,也建议直接加 awaken-stores 来启用 file feature。
- 创建
FileStore:
use std::sync::Arc;
use awaken::stores::FileStore;
let store = Arc::new(FileStore::new("./data"));
目录会在首次写入时自动创建,布局如下:
./data/
threads/<thread_id>.json
messages/<thread_id>.json
runs/<run_id>.json
- 接入 runtime:
use awaken::AgentRuntimeBuilder;
let runtime = AgentRuntimeBuilder::new()
.with_thread_run_store(store)
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(provider))
.build()?;
- 生产环境建议使用绝对路径:
use std::path::PathBuf;
let data_dir = PathBuf::from("/var/lib/myapp/awaken");
let store = Arc::new(FileStore::new(data_dir));
验证
运行 agent 后检查目录,应该看到 threads/、messages/、runs/ 下生成了 JSON 文件。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
StorageError::Io | 目录没有读写权限 | 确保进程对目标路径有权限 |
StorageError::Io 且 ID 为空或非法 | thread/run ID 包含非法字符 | 使用 UUID 风格或简单字母数字 ID |
| 重启后找不到数据 | 相对路径在不同启动目录下解析不同 | 改用绝对路径 |
相关示例
crates/awaken-stores/src/file.rs
关键文件
crates/awaken-stores/Cargo.tomlcrates/awaken-stores/src/file.rscrates/awaken-stores/src/lib.rs
相关
使用 Postgres 存储
当你需要可持久化、可多实例共享的存储后端时,使用 PostgreSQL。
前置条件
awaken-stores启用了postgresfeature- 有一个可连接的 PostgreSQL 实例
sqlx所需 tokio 运行时依赖已就绪
步骤
- 添加依赖:
[dependencies]
awaken-stores = { version = "...", features = ["postgres"] }
- 创建连接池:
use sqlx::PgPool;
let pool = PgPool::connect("postgres://user:pass@localhost:5432/mydb").await?;
- 创建
PostgresStore:
use std::sync::Arc;
use awaken::stores::PostgresStore;
let store = Arc::new(PostgresStore::new(pool));
默认表名:
awaken_threadsawaken_runsawaken_messages
- 使用自定义前缀:
let store = Arc::new(PostgresStore::with_prefix(pool, "myapp"));
- 接入 runtime:
use awaken::AgentRuntimeBuilder;
let runtime = AgentRuntimeBuilder::new()
.with_thread_run_store(store)
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(provider))
.build()?;
- Schema 初始化:
表会在首次访问时自动通过 ensure_schema() 创建。初始接入无需手动 migration。
验证
执行 agent 后,在数据库中查询:
SELECT id, updated_at FROM awaken_threads;
SELECT id, updated_at FROM awaken_runs;
应该能看到对应记录。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
sqlx::Error connection refused | PostgreSQL 未启动或连接串错误 | 检查 DATABASE_URL 和数据库状态 |
首次写入报 StorageError | 数据库用户权限不足 | 授予建表和写入权限 |
| 表名冲突 | 其他应用共用了默认表名 | 用 with_prefix() 做命名空间隔离 |
相关示例
crates/awaken-stores/src/postgres.rs
关键文件
crates/awaken-stores/Cargo.tomlcrates/awaken-stores/src/postgres.rscrates/awaken-stores/src/lib.rs
相关
优化上下文窗口
当你需要控制运行时如何管理会话历史,以避免超过模型上下文上限时,使用本页。
前置条件
- 已添加
awaken - 已有一个
AgentSpec
ContextWindowPolicy
每个 agent 都可以配置 ContextWindowPolicy:
use awaken::ContextWindowPolicy;
let policy = ContextWindowPolicy {
max_context_tokens: 200_000,
max_output_tokens: 16_384,
min_recent_messages: 10,
enable_prompt_cache: true,
autocompact_threshold: Some(100_000),
compaction_mode: ContextCompactionMode::KeepRecentRawSuffix,
compaction_raw_suffix_messages: 2,
};
字段
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_context_tokens | usize | 200_000 | 模型上下文总窗口(token 数) |
max_output_tokens | usize | 16_384 | 为模型输出预留的 token |
min_recent_messages | usize | 10 | 始终保留的最近消息数,即使超出预算 |
enable_prompt_cache | bool | true | 是否启用 prompt cache |
autocompact_threshold | Option<usize> | None | 达到该 token 数时触发自动压缩,None 表示禁用 |
compaction_mode | ContextCompactionMode | KeepRecentRawSuffix | 自动压缩策略 |
compaction_raw_suffix_messages | usize | 2 | 后缀压缩模式下保留的原始消息数 |
截断(Truncation)
当消息总量超过预算时,运行时会自动丢弃最旧消息。预算大致为:
available = max_context_tokens - max_output_tokens - tool_schema_tokens
截断时会保留什么
- 所有 system messages
- 至少
min_recent_messages条最近消息 - 成对的 tool call / tool result
- 不会留下悬挂的 tool calls
Artifact 压缩
在真正截断前,过大的 tool result 会先被压缩成 preview,以减少 token 占用。system / user / assistant 普通消息不会做这种压缩。
压缩(Compaction)
压缩不会简单丢消息,而是把更早的历史总结成一条摘要消息。
启用自动压缩
let policy = ContextWindowPolicy {
autocompact_threshold: Some(100_000),
compaction_mode: ContextCompactionMode::KeepRecentRawSuffix,
compaction_raw_suffix_messages: 4,
..Default::default()
};
ContextCompactionMode
KeepRecentRawSuffix:保留最近 N 条原始消息,其余压缩CompactToSafeFrontier:压缩到安全边界为止
安全边界的含义是:不会把某条 tool call 和它对应的结果拆开。
CompactionConfig
压缩子系统通过 CompactionConfig 配置:
use awaken::CompactionConfig;
let config = CompactionConfig {
summarizer_system_prompt: "You are a conversation summarizer. \
Preserve all key facts, decisions, tool results, and action items. \
Be concise but complete.".into(),
summarizer_user_prompt: "Summarize the following conversation:\n\n{messages}".into(),
summary_max_tokens: Some(1024),
summary_model: Some("claude-3-haiku".into()),
min_savings_ratio: 0.3,
};
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
summarizer_system_prompt | String | 内置摘要 prompt | 摘要 LLM 的 system prompt |
summarizer_user_prompt | String | "Summarize...\n\n{messages}" | 摘要用户 prompt 模板;{messages} 会被替换为对话记录 |
summary_max_tokens | Option<u32> | None | 摘要响应的最大 token 数 |
summary_model | Option<String> | None | 摘要所用模型,默认沿用 agent 模型 |
min_savings_ratio | f64 | 0.3 | 接受一次压缩所需的最低 token 节省比例(0.0-1.0) |
DefaultSummarizer
内置 DefaultSummarizer 会读取 CompactionConfig,并支持“在已有摘要上继续增量总结”。
摘要存储
压缩结果会被保存成带 <conversation-summary> 标签的内部 system message。重新加载时,历史中较早的已摘要部分不会再被重新放回上下文窗口。
截断恢复
如果 LLM 因 MaxTokens 截断,而且生成到一半的 tool call 参数不完整,运行时可以自动注入 continuation prompt 并重试,直到达到最大重试次数。
关键文件
crates/awaken-contract/src/contract/inference.rscrates/awaken-runtime/src/context/transform/mod.rscrates/awaken-runtime/src/context/transform/compaction.rscrates/awaken-runtime/src/context/compaction.rscrates/awaken-runtime/src/context/summarizer.rscrates/awaken-runtime/src/context/plugin.rscrates/awaken-runtime/src/context/truncation.rs
相关
状态键
Awaken 的状态系统提供类型化、带作用域、可持久化的键值存储。插件和工具在编译期声明状态键,运行时负责快照、持久化和并行合并语义。
StateKey trait
每个状态槽位都由一个实现了 StateKey 的类型来标识。
pub trait StateKey: 'static + Send + Sync {
const KEY: &'static str;
const MERGE: MergeStrategy = MergeStrategy::Exclusive;
const SCOPE: KeyScope = KeyScope::Run;
type Value: Clone + Default + Serialize + DeserializeOwned + Send + Sync + 'static;
type Update: Send + 'static;
fn apply(value: &mut Self::Value, update: Self::Update);
fn encode(value: &Self::Value) -> Result<JsonValue, StateError>;
fn decode(value: JsonValue) -> Result<Self::Value, StateError>;
}
示例
struct Counter;
impl StateKey for Counter {
const KEY: &'static str = "counter";
type Value = usize;
type Update = usize;
fn apply(value: &mut Self::Value, update: Self::Update) {
*value += update;
}
}
KeyScope
控制键值在 run 边界上的生命周期:
pub enum KeyScope {
Run,
Thread,
}
MergeStrategy
决定并行执行下多个 MutationBatch 如何合并:
pub enum MergeStrategy {
Exclusive,
Commutative,
}
StateMap
状态值的类型擦除容器。
pub struct StateMap { /* ... */ }
方法
fn contains<K: StateKey>(&self) -> bool
fn get<K: StateKey>(&self) -> Option<&K::Value>
fn get_mut<K: StateKey>(&mut self) -> Option<&mut K::Value>
fn insert<K: StateKey>(&mut self, value: K::Value)
fn remove<K: StateKey>(&mut self) -> Option<K::Value>
fn get_or_insert_default<K: StateKey>(&mut self) -> &mut K::Value
Snapshot
传给 hook 和 ToolCallContext 的不可变、带 revision 的状态视图:
pub struct Snapshot {
pub revision: u64,
pub ext: Arc<StateMap>,
}
方法
fn new(revision: u64, ext: Arc<StateMap>) -> Self
fn revision(&self) -> u64
fn get<K: StateKey>(&self) -> Option<&K::Value>
fn ext(&self) -> &StateMap
StateKeyOptions
注册状态键时的选项:
pub struct StateKeyOptions {
pub persistent: bool,
pub retain_on_uninstall: bool,
pub scope: KeyScope,
}
PersistedState
存储后端使用的序列化状态格式:
pub struct PersistedState {
pub revision: u64,
pub extensions: HashMap<String, JsonValue>,
}
相关
线程模型
Thread 表示持久化会话。Thread 本身只保存 thread 元信息;消息和 run 历史通过存储 trait 单独管理。
Thread
pub struct Thread {
pub id: String,
pub metadata: ThreadMetadata,
}
构造函数
fn new() -> Self
fn with_id(id: impl Into<String>) -> Self
Builder 方法
fn with_title(self, title: impl Into<String>) -> Self
ThreadMetadata
pub struct ThreadMetadata {
pub created_at: Option<u64>,
pub updated_at: Option<u64>,
pub title: Option<String>,
pub custom: HashMap<String, Value>,
}
存储
消息不直接嵌在 Thread 里,而是通过 ThreadStore 读写:
#[async_trait]
pub trait ThreadStore: Send + Sync {
async fn load_thread(&self, thread_id: &str) -> Result<Option<Thread>, StorageError>;
async fn save_thread(&self, thread: &Thread) -> Result<(), StorageError>;
async fn delete_thread(&self, thread_id: &str) -> Result<(), StorageError>;
async fn list_threads(&self, offset: usize, limit: usize) -> Result<Vec<String>, StorageError>;
async fn load_messages(&self, thread_id: &str) -> Result<Option<Vec<Message>>, StorageError>;
async fn save_messages(&self, thread_id: &str, messages: &[Message]) -> Result<(), StorageError>;
async fn delete_messages(&self, thread_id: &str) -> Result<(), StorageError>;
async fn update_thread_metadata(&self, id: &str, metadata: ThreadMetadata) -> Result<(), StorageError>;
}
ThreadRunStore
ThreadRunStore 在 ThreadStore + RunStore 基础上增加了原子 checkpoint:
#[async_trait]
pub trait ThreadRunStore: ThreadStore + RunStore + Send + Sync {
async fn checkpoint(
&self,
thread_id: &str,
messages: &[Message],
run: &RunRecord,
) -> Result<(), StorageError>;
}
RunStore
#[async_trait]
pub trait RunStore: Send + Sync {
async fn create_run(&self, record: &RunRecord) -> Result<(), StorageError>;
async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, StorageError>;
async fn latest_run(&self, thread_id: &str) -> Result<Option<RunRecord>, StorageError>;
async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, StorageError>;
}
RunRecord
pub struct RunRecord {
pub run_id: String,
pub thread_id: String,
pub agent_id: String,
pub parent_run_id: Option<String>,
pub status: RunStatus,
pub termination_code: Option<String>,
pub created_at: u64,
pub updated_at: u64,
pub steps: usize,
pub input_tokens: u64,
pub output_tokens: u64,
pub state: Option<PersistedState>,
}
相关
运行与运维
当 happy path 已经跑通,这条路径用于把 Agent 服务加固到可运维状态。
推荐顺序
- 先启用 可观测性,把 run、tool 和 provider 变得可见。
- 再启用 工具权限 HITL,为工具执行增加审批控制。
- 通过 配置停止策略 把 agent loop 约束在可预测范围内。
- 用 上报 Tool 进度 和 测试策略 提升可观测性和上线信心。
建议搭配阅读
启用工具权限 HITL
当你需要控制 agent 能调用哪些工具,并对敏感操作启用人工审批时,使用本页。
前置条件
- 已有可运行的 awaken runtime
awaken启用了permission
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["permission"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"
步骤
- 注册 permission 插件:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_permission::PermissionPlugin;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
let agent_spec = AgentSpec::new("my-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("You are a helpful assistant.")
.with_hook_filter("permission");
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin("permission", Arc::new(PermissionPlugin) as Arc<dyn Plugin>)
.build()
.expect("failed to build runtime");
- 以内联方式定义规则:
use awaken::ext_permission::{PermissionRulesConfig, PermissionRuleEntry, ToolPermissionBehavior};
let config = PermissionRulesConfig {
default_behavior: ToolPermissionBehavior::Ask,
rules: vec![
PermissionRuleEntry {
tool: "read_file".into(),
behavior: ToolPermissionBehavior::Allow,
scope: Default::default(),
},
PermissionRuleEntry {
tool: "file_*".into(),
behavior: ToolPermissionBehavior::Ask,
scope: Default::default(),
},
PermissionRuleEntry {
tool: "delete_*".into(),
behavior: ToolPermissionBehavior::Deny,
scope: Default::default(),
},
],
};
- 也可以从 YAML 文件加载:
default_behavior: ask
rules:
- tool: "read_file"
behavior: allow
- tool: "Bash(npm *)"
behavior: allow
- tool: "file_*"
behavior: ask
- tool: "delete_*"
behavior: deny
- 通过 agent spec 激活:
let agent_spec = AgentSpec::new("my-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("You are a helpful assistant.")
.with_hook_filter("permission");
-
理解规则优先级:
-
Deny -
Allow -
Ask
匹配 DSL 支持:
| Pattern | 匹配方式 |
|---|---|
read_file | 精确匹配工具名 |
file_* | 工具名 glob |
mcp__github__* | MCP 工具 glob |
Bash(npm *) | 主参数 glob |
Edit(file_path ~ "src/**") | 命名字段 glob |
Bash(command =~ "(?i)rm") | 命名字段 regex |
/mcp__(gh|gl)__.*/ | 工具名 regex |
验证
- 用一个命中
deny的工具测试,调用应被拦截 - 用一个命中
ask的工具测试,run 应进入等待审批状态 - 通过 mailbox 接口提交审批
- 确认 run 恢复执行
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| 所有工具都被拦住 | default_behavior: deny 且无 allow 规则 | 显式给安全工具加 allow |
| 规则没有生效 | 插件没注册 | 注册 PermissionPlugin 并激活 |
| pattern 无效 | glob / regex 语法错 | 对照 DSL 语法检查 |
| ask 一直不恢复 | 没有 mailbox consumer | 让前端或 API 客户端消费审批请求 |
相关示例
crates/awaken-ext-permission/tests/
关键文件
crates/awaken-ext-permission/src/lib.rscrates/awaken-ext-permission/src/config.rscrates/awaken-ext-permission/src/rules.rscrates/awaken-ext-permission/src/plugin/plugin.rscrates/awaken-ext-permission/src/plugin/checker.rscrates/awaken-tool-pattern/
相关
配置停止策略
当你需要根据轮数、token 用量、耗时或连续错误来决定 agent run 何时终止时,使用本页。
前置条件
- 已添加
awaken - 了解
Plugin与AgentRuntimeBuilder
概览
stop policy 会在每次推理步骤结束后判断 run 是否继续。内置策略包括:
| 策略 | 触发条件 |
|---|---|
MaxRoundsPolicy | 步数超过上限 |
TokenBudgetPolicy | 输入+输出 token 超过预算 |
TimeoutPolicy | 墙钟时间超过上限 |
ConsecutiveErrorsPolicy | 连续推理错误达到阈值 |
步骤
- 以编程方式构造策略:
use std::sync::Arc;
use awaken::policies::{
MaxRoundsPolicy, TokenBudgetPolicy, TimeoutPolicy,
ConsecutiveErrorsPolicy, StopPolicy,
};
let policies: Vec<Arc<dyn StopPolicy>> = vec![
Arc::new(MaxRoundsPolicy::new(25)),
Arc::new(TokenBudgetPolicy::new(100_000)),
Arc::new(TimeoutPolicy::new(300_000)),
Arc::new(ConsecutiveErrorsPolicy::new(3)),
];
- 把
StopConditionPlugin注册到 runtime:
use awaken::policies::StopConditionPlugin;
use awaken::AgentRuntimeBuilder;
let runtime = AgentRuntimeBuilder::new()
.with_plugin("stop-condition", Arc::new(StopConditionPlugin::new(policies)))
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(provider))
.build()?;
如果只需要限制轮数,也可以直接用 MaxRoundsPlugin:
use awaken::policies::MaxRoundsPlugin;
let runtime = AgentRuntimeBuilder::new()
.with_plugin("stop-condition:max-rounds", Arc::new(MaxRoundsPlugin::new(10)))
.with_agent_spec(spec)
.with_provider("anthropic", Arc::new(provider))
.build()?;
- 用声明式
StopConditionSpec:
use awaken_contract::contract::lifecycle::StopConditionSpec;
use awaken::policies::{policies_from_specs, StopConditionPlugin};
let specs = vec![
StopConditionSpec::MaxRounds { rounds: 10 },
StopConditionSpec::Timeout { seconds: 300 },
StopConditionSpec::TokenBudget { max_total: 100_000 },
StopConditionSpec::ConsecutiveErrors { max: 3 },
];
let policies = policies_from_specs(&specs);
let plugin = StopConditionPlugin::new(policies);
完整的 StopConditionSpec 还包含 StopOnTool、ContentMatch、LoopDetection,但这些目前只有契约定义,尚未在 policies_from_specs 中实现。
StopPolicy Trait
你也可以实现自定义策略:
use awaken::policies::{StopPolicy, StopDecision, StopPolicyStats};
pub struct MyCustomPolicy {
pub threshold: u64,
}
impl StopPolicy for MyCustomPolicy {
fn id(&self) -> &str {
"my_custom"
}
fn evaluate(&self, stats: &StopPolicyStats) -> StopDecision {
if stats.total_output_tokens > self.threshold {
StopDecision::Stop {
code: "my_custom".into(),
detail: format!("output tokens {} exceeded {}", stats.total_output_tokens, self.threshold),
}
} else {
StopDecision::Continue
}
}
}
StopPolicyStats
内置 StopConditionHook 会为每次评估准备一份 StopPolicyStats:
| 字段 | 类型 | 说明 |
|---|---|---|
step_count | u32 | 已完成推理步数 |
total_input_tokens | u64 | 累计输入 token |
total_output_tokens | u64 | 累计输出 token |
elapsed_ms | u64 | 从第一步开始经过的毫秒数 |
consecutive_errors | u32 | 连续推理错误计数 |
last_tool_names | Vec<String> | 最近一轮里调用的工具名 |
last_response_text | String | 最近一次推理返回的文本 |
StopDecision
pub enum StopDecision {
Continue,
Stop { code: String, detail: String },
}
任何一个 policy 返回 Stop,该 run 就会以 TerminationReason::Stopped 结束。
Stop policy 如何进入 agent loop
StopConditionPlugin在Phase::AfterInference注册 hook- 每轮推理结束后,hook 会更新统计状态
- 构造
StopPolicyStats - 依次调用每个策略的
evaluate - 如果有策略返回
Stop,则写入RunLifecycleUpdate::Done
max 或 max_total 设为 0 的策略视为禁用,始终返回 Continue。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| run 永远不结束 | 没注册 stop policy,LLM 一直在调工具 | 至少加一个 MaxRoundsPolicy |
StateError::KeyAlreadyRegistered | 同时注册了 StopConditionPlugin 和 MaxRoundsPlugin | 二选一 |
| Timeout 提前触发 | TimeoutPolicy::new() 传的是毫秒,StopConditionSpec::Timeout 是秒 | 注意单位 |
关键文件
crates/awaken-runtime/src/policies/mod.rscrates/awaken-runtime/src/policies/policy.rscrates/awaken-runtime/src/policies/plugin.rscrates/awaken-runtime/src/policies/state.rscrates/awaken-runtime/src/policies/hook.rscrates/awaken-contract/src/contract/lifecycle.rs
相关
启用可观测性
当你需要用 OpenTelemetry 兼容的遥测方式追踪 LLM 推理和工具执行时,使用本页。
前置条件
- 已有可运行的 awaken runtime
awaken启用了observability- 如果要导出 OTel:
awaken-ext-observability需要启用otel,并准备好 collector
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["observability"] }
tokio = { version = "1", features = ["full"] }
步骤
- 先用内存 sink(开发环境):
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_observability::{ObservabilityPlugin, InMemorySink};
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
let sink = InMemorySink::new();
let obs_plugin = ObservabilityPlugin::new(sink.clone());
let agent_spec = AgentSpec::new("observed-agent")
.with_model("gpt-4o-mini")
.with_system_prompt("You are a helpful assistant.")
.with_hook_filter("observability");
let runtime = AgentRuntimeBuilder::new()
.with_provider("openai", Arc::new(GenaiExecutor::new()))
.with_model(
"gpt-4o-mini",
ModelSpec {
id: "gpt-4o-mini".into(),
provider: "openai".into(),
model: "gpt-4o-mini".into(),
},
)
.with_agent_spec(agent_spec)
.with_plugin("observability", Arc::new(obs_plugin) as Arc<dyn Plugin>)
.build()
.expect("failed to build runtime");
run 结束后,可以直接读取 sink.metrics()。
- 换成 OTel sink(生产环境):
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_observability::{ObservabilityPlugin, OtelMetricsSink};
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
use opentelemetry_sdk::trace::SdkTracerProvider;
let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("awaken");
let obs_plugin = ObservabilityPlugin::new(OtelMetricsSink::new(tracer));
- 如果需要,也可以实现自定义 sink:
use awaken::ext_observability::{MetricsSink, GenAISpan, ToolSpan, AgentMetrics};
struct MySink;
impl MetricsSink for MySink {
fn on_inference(&self, span: &GenAISpan) {}
fn on_tool(&self, span: &ToolSpan) {}
fn on_run_end(&self, metrics: &AgentMetrics) {}
}
- 插件会在这些 phase 采集数据:
| Phase | 采集内容 |
|---|---|
RunStart | session 起始时间 |
BeforeInference | model、provider、开始时间 |
AfterInference | token usage、finish reason、耗时 |
BeforeToolExecute | tool 开始时间 |
AfterToolExecute | tool 耗时、失败状态 |
RunEnd | session 总时长 |
验证
- 用
InMemorySink跑一个 agent - 执行结束后调用
sink.metrics() - 确认
inferences非空且 token 统计有值 - 如果用 OTel,去 collector / Jaeger 确认 span 已上报
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| metrics 全是 0 | 插件没注册 | 通过 builder 注册 ObservabilityPlugin |
找不到 OtelMetricsSink | 缺少 otel feature | 给 awaken-ext-observability 开 otel |
| collector 里没有 span | exporter 没配置或 tracer provider 被提前释放 | 检查 exporter 和 provider 生命周期 |
| token 统计缺失 | provider 没返回 usage | 确保 LlmExecutor 产生 TokenUsage |
相关示例
crates/awaken-ext-observability/tests/
关键文件
crates/awaken-ext-observability/src/lib.rscrates/awaken-ext-observability/src/plugin/plugin.rscrates/awaken-ext-observability/src/plugin/hooks.rscrates/awaken-ext-observability/src/metrics.rscrates/awaken-ext-observability/src/sink.rscrates/awaken-ext-observability/src/otel.rs
相关
上报 Tool 进度
当你需要在工具执行过程中,把进度或活动快照实时流回前端时,使用本页。
前置条件
- 已有一个
Tool实现,并能访问ToolCallContext - 已添加
awaken
步骤
- 用
report_progress上报结构化进度:
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
use awaken::contract::progress::ProgressStatus;
use async_trait::async_trait;
use serde_json::{Value, json};
struct IndexTool;
#[async_trait]
impl Tool for IndexTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("index_docs", "Index Docs", "Index a document set")
}
async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
ctx.report_progress(ProgressStatus::Running, Some("Starting indexing"), None).await;
for i in 0..10 {
let fraction = (i + 1) as f64 / 10.0;
ctx.report_progress(
ProgressStatus::Running,
Some(&format!("Indexed batch {}/10", i + 1)),
Some(fraction),
).await;
}
ctx.report_progress(ProgressStatus::Done, Some("Indexing complete"), Some(1.0)).await;
Ok(ToolResult::success("index_docs", json!({"indexed": 10})).into())
}
}
- 用
report_activity上报完整活动快照:
ctx.report_activity("code-generation", "fn hello() {\n println!(\"hi\");\n}").await;
- 用
report_activity_delta上报增量补丁:
use serde_json::json;
ctx.report_activity_delta(
"code-generation",
json!([
{ "op": "add", "path": "/line", "value": " println!(\"world\");" }
]),
).await;
- 用
ProgressStatus反映 tool call 生命周期:
| 变体 | 含义 |
|---|---|
Pending | 已排队,尚未开始 |
Running | 正在执行 |
Done | 成功完成 |
Failed | 执行失败 |
Cancelled | 被取消 |
原理
report_progress 内部会构造 ToolCallProgressState,再由运行时包装成 AgentEvent::ActivitySnapshot:
pub struct ToolCallProgressState {
pub schema: String,
pub node_id: String,
pub call_id: String,
pub tool_name: String,
pub status: ProgressStatus,
pub progress: Option<f64>,
pub loaded: Option<u64>,
pub total: Option<u64>,
pub message: Option<String>,
pub parent_node_id: Option<String>,
pub parent_call_id: Option<String>,
}
活动类型常量是 "tool-call-progress"。
验证
订阅 SSE 事件流,确认在工具执行期间能收到 ActivitySnapshot,并且 status 从 "running" 变成 "done"。
常见错误
| 错误 | 原因 | 修复 |
|---|---|---|
| 没有事件 | activity_sink 为空 | 确保运行时配置了事件 sink |
| 前端没有更新进度 | 前端未处理该活动类型 | 过滤 activity_type == "tool-call-progress" |
关键文件
crates/awaken-contract/src/contract/progress.rscrates/awaken-contract/src/contract/tool.rs
相关
测试策略
当你需要测试工具、插件、状态键或完整的代理运行,且不依赖真实 LLM 时,可以使用本指南。
前置条件
- 在
Cargo.toml中添加awakencrate(包含运行时的 re-exports) tokio需启用rt和macrosfeatures 以支持异步测试serde_json用于构造工具参数和断言
1. 单元测试 Tool
创建一个带有测试快照的 ToolCallContext,调用 tool.execute(),然后对返回的 ToolOutput 进行断言。
use async_trait::async_trait;
use serde_json::{Value, json};
use awaken::contract::tool::{
Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
struct GreetTool;
#[async_trait]
impl Tool for GreetTool {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("greet", "greet", "Greet a user by name")
}
async fn execute(&self, args: Value, _ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
let name = args["name"]
.as_str()
.ok_or_else(|| ToolError::InvalidArguments("missing 'name'".into()))?;
Ok(ToolResult::success("greet", json!({ "greeting": format!("Hello, {name}!") })).into())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn greet_tool_returns_greeting() {
let tool = GreetTool;
let ctx = ToolCallContext::test_default();
let output = tool.execute(json!({"name": "Alice"}), &ctx).await.unwrap();
assert!(output.result.is_success());
assert_eq!(output.result.data["greeting"], "Hello, Alice!");
}
#[tokio::test]
async fn greet_tool_rejects_missing_name() {
let tool = GreetTool;
let ctx = ToolCallContext::test_default();
let err = tool.execute(json!({}), &ctx).await.unwrap_err();
assert!(matches!(err, ToolError::InvalidArguments(_)));
}
}
当你的工具通过 ToolOutput::with_command() 返回副作用时,对 command 字段进行断言:
#[tokio::test]
async fn tool_produces_state_command() {
let tool = CounterMutationTool { increment: 5 };
let ctx = ToolCallContext::test_default();
let output = tool.execute(json!({}), &ctx).await.unwrap();
assert!(output.result.is_success());
// The command is opaque at this level; integration tests (section 4)
// verify that commands are applied correctly to the StateStore.
assert!(!output.command.is_empty());
}
2. 单元测试 Plugin
通过创建 PluginRegistrar 并调用 plugin.register(),验证插件注册了预期的状态键和钩子。
use awaken::contract::StateError;
use awaken::state::{StateKey, MergeStrategy, StateKeyOptions, StateCommand};
use awaken::plugins::{Plugin, PluginDescriptor, PluginRegistrar};
use serde::{Serialize, Deserialize};
// -- State key --
struct Counter;
impl StateKey for Counter {
const KEY: &'static str = "test.counter";
const MERGE: MergeStrategy = MergeStrategy::Commutative;
type Value = usize;
type Update = usize;
fn apply(value: &mut Self::Value, update: Self::Update) {
*value += update;
}
}
// -- Plugin --
struct CounterPlugin;
impl Plugin for CounterPlugin {
fn descriptor(&self) -> PluginDescriptor {
PluginDescriptor { name: "counter" }
}
fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
r.register_key::<Counter>(StateKeyOptions::default())?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
use awaken::state::StateStore;
#[test]
fn counter_plugin_registers_key() {
let store = StateStore::new();
// install_plugin calls register() internally
store.install_plugin(CounterPlugin).unwrap();
// The key should now be readable (returns the default value)
let val = store.read::<Counter>().unwrap_or_default();
assert_eq!(val, 0);
}
#[test]
fn counter_plugin_rejects_double_registration() {
let store = StateStore::new();
store.install_plugin(CounterPlugin).unwrap();
// Registering the same key again should fail
let result = store.install_plugin(CounterPlugin);
assert!(result.is_err());
}
}
要直接测试阶段钩子,请构建一个最小的 PhaseContext 并检查返回的 StateCommand:
#[tokio::test]
async fn audit_hook_appends_entry() {
let hook = AuditHook;
let ctx = PhaseContext::test_default();
let cmd = hook.run(&ctx).await.unwrap();
// Commit the command to a test store and verify
let store = StateStore::new();
store.install_plugin(AuditPlugin).unwrap();
store.commit(cmd).unwrap();
let log = store.read::<AuditLogKey>().unwrap();
assert!(!log.entries.is_empty());
}
3. 单元测试 StateKey
直接测试 apply() 变更操作,无需任何运行时开销:
use awaken::state::{StateKey, MergeStrategy};
struct HitCounter;
impl StateKey for HitCounter {
const KEY: &'static str = "test.hit_counter";
const MERGE: MergeStrategy = MergeStrategy::Commutative;
type Value = u64;
type Update = u64;
fn apply(value: &mut Self::Value, update: Self::Update) {
*value += update;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn apply_increments_counter() {
let mut value = u64::default();
HitCounter::apply(&mut value, 5);
assert_eq!(value, 5);
HitCounter::apply(&mut value, 3);
assert_eq!(value, 8);
}
#[test]
fn apply_from_default_is_identity_free() {
let mut value = u64::default();
HitCounter::apply(&mut value, 0);
assert_eq!(value, 0);
}
}
对于具有复杂值类型的状态键,测试边界情况如空集合或合并冲突:
#[test]
fn apply_merge_replaces_entries() {
let mut log = AuditLog { entries: vec!["old".into()] };
AuditLogKey::apply(&mut log, AuditLog { entries: vec!["new".into()] });
// Exclusive merge replaces the entire value
assert_eq!(log.entries, vec!["new"]);
}
4. 使用模拟 LLM 进行集成测试
使用脚本化的 LlmExecutor 构建完整的代理运行时,返回预设的响应。这是 awaken-runtime 集成测试中使用的主要模式。
use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use serde_json::json;
use awaken::contract::content::ContentBlock;
use awaken::contract::event_sink::VecEventSink;
use awaken::contract::executor::{InferenceExecutionError, InferenceRequest, LlmExecutor};
use awaken::contract::identity::{RunIdentity, RunOrigin};
use awaken::contract::inference::{StopReason, StreamResult};
use awaken::contract::message::{Message, ToolCall};
use awaken::contract::tool::{
Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
use awaken::loop_runner::{AgentLoopParams, LoopStatePlugin, build_agent_env, run_agent_loop};
use awaken::registry::{AgentResolver, ResolvedAgent};
use awaken::state::StateStore;
use awaken::phase::PhaseRuntime;
use awaken::RuntimeError;
// -- 脚本化 LLM 执行器 --
struct ScriptedLlm {
responses: Mutex<Vec<StreamResult>>,
}
impl ScriptedLlm {
fn new(responses: Vec<StreamResult>) -> Self {
Self {
responses: Mutex::new(responses),
}
}
}
#[async_trait]
impl LlmExecutor for ScriptedLlm {
async fn execute(
&self,
_request: InferenceRequest,
) -> Result<StreamResult, InferenceExecutionError> {
let mut responses = self.responses.lock().unwrap();
if responses.is_empty() {
// Fallback: end the conversation
Ok(StreamResult {
content: vec![ContentBlock::text("Done.")],
tool_calls: vec![],
usage: None,
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
})
} else {
Ok(responses.remove(0))
}
}
fn name(&self) -> &str {
"scripted"
}
}
// -- 解析器 --
struct FixedResolver {
agent: ResolvedAgent,
}
impl AgentResolver for FixedResolver {
fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, RuntimeError> {
let mut agent = self.agent.clone();
agent.env = build_agent_env(&[], &agent)?;
Ok(agent)
}
}
// -- 辅助函数 --
fn tool_step(calls: Vec<ToolCall>) -> StreamResult {
StreamResult {
content: vec![],
tool_calls: calls,
usage: None,
stop_reason: Some(StopReason::ToolUse),
has_incomplete_tool_calls: false,
}
}
fn text_step(text: &str) -> StreamResult {
StreamResult {
content: vec![ContentBlock::text(text)],
tool_calls: vec![],
usage: None,
stop_reason: Some(StopReason::EndTurn),
has_incomplete_tool_calls: false,
}
}
fn test_identity() -> RunIdentity {
RunIdentity::new(
"thread-test".into(),
None,
"run-test".into(),
None,
"agent".into(),
RunOrigin::User,
)
}
// -- 测试 --
#[tokio::test]
async fn tool_call_flow_end_to_end() {
// Script: LLM calls get_weather, then responds with text
let llm = Arc::new(ScriptedLlm::new(vec![
tool_step(vec![ToolCall::new("c1", "get_weather", json!({"city": "Tokyo"}))]),
text_step("The weather in Tokyo is sunny."),
]));
let agent = ResolvedAgent::new("test", "model", "You are helpful.", llm)
.with_tool(Arc::new(GetWeatherTool));
let store = StateStore::new();
let runtime = PhaseRuntime::new(store.clone()).unwrap();
store.install_plugin(LoopStatePlugin).unwrap();
let resolver = FixedResolver { agent };
let sink = Arc::new(VecEventSink::new());
let result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "test",
runtime: &runtime,
sink: sink.clone(),
checkpoint_store: None,
messages: vec![Message::user("What's the weather?")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
})
.await
.unwrap();
assert_eq!(result.response, "The weather in Tokyo is sunny.");
assert_eq!(result.steps, 2); // tool step + text step
}
对于更简单的场景,可以使用内置的 MockLlmExecutor,它仅返回纯文本响应:
use awaken::engine::MockLlmExecutor;
let llm = Arc::new(MockLlmExecutor::new().with_responses(vec!["Hello!".into()]));
let agent = ResolvedAgent::new("test", "model", "system prompt", llm);
5. 测试事件流
使用 VecEventSink 捕获运行期间发出的所有事件,并对其顺序和内容进行断言。
use awaken::contract::event::AgentEvent;
use awaken::contract::event_sink::VecEventSink;
use awaken::contract::lifecycle::TerminationReason;
#[tokio::test]
async fn events_follow_expected_lifecycle() {
// ... set up runtime and run agent (see section 4) ...
let events = sink.take();
// Verify ordering: RunStart -> StepStart -> ... -> StepEnd -> RunFinish
assert!(matches!(events.first(), Some(AgentEvent::RunStart { .. })));
assert!(matches!(events.last(), Some(AgentEvent::RunFinish { .. })));
// Count specific event types
let step_starts = events.iter().filter(|e| matches!(e, AgentEvent::StepStart { .. })).count();
let step_ends = events.iter().filter(|e| matches!(e, AgentEvent::StepEnd)).count();
assert_eq!(step_starts, step_ends, "every StepStart needs a StepEnd");
// Verify termination reason
if let Some(AgentEvent::RunFinish { termination, .. }) = events.last() {
assert_eq!(*termination, TerminationReason::NaturalEnd);
}
// Check that tool call events appear in the correct order
let has_tool_start = events.iter().any(|e| matches!(e, AgentEvent::ToolCallStart { .. }));
let has_tool_done = events.iter().any(|e| matches!(e, AgentEvent::ToolCallDone { .. }));
if has_tool_start {
assert!(has_tool_done, "ToolCallStart without ToolCallDone");
let start_idx = events.iter().position(|e| matches!(e, AgentEvent::ToolCallStart { .. })).unwrap();
let done_idx = events.iter().position(|e| matches!(e, AgentEvent::ToolCallDone { .. })).unwrap();
assert!(start_idx < done_idx);
}
}
一个可复用的事件类型提取辅助函数(在运行时测试套件中使用):
fn event_type(e: &AgentEvent) -> &'static str {
match e {
AgentEvent::RunStart { .. } => "run_start",
AgentEvent::RunFinish { .. } => "run_finish",
AgentEvent::StepStart { .. } => "step_start",
AgentEvent::StepEnd => "step_end",
AgentEvent::TextDelta { .. } => "text_delta",
AgentEvent::ToolCallStart { .. } => "tool_call_start",
AgentEvent::ToolCallDone { .. } => "tool_call_done",
AgentEvent::InferenceComplete { .. } => "inference_complete",
AgentEvent::StateSnapshot { .. } => "state_snapshot",
_ => "other",
}
}
let types: Vec<&str> = events.iter().map(event_type).collect();
assert_eq!(types[0], "run_start");
assert_eq!(*types.last().unwrap(), "run_finish");
6. 使用真实 LLM 测试(在线测试)
要针对真实的提供商进行端到端验证,可使用 GenaiExecutor 并通过环境变量传递凭据。将这些测试标记为 #[ignore],使其仅在显式请求时运行。
use awaken::engine::GenaiExecutor;
#[tokio::test]
#[ignore] // Run with: cargo test -- --ignored
async fn live_llm_responds() {
// Requires: OPENAI_API_KEY or (LLM_BASE_URL + LLM_API_KEY)
let model = std::env::var("LLM_MODEL").unwrap_or_else(|_| "gpt-4o-mini".into());
let llm = Arc::new(GenaiExecutor::new());
let agent = ResolvedAgent::new(
"live-test",
&model,
"You are a test assistant. Answer in one word.",
llm,
);
// ... set up resolver, store, runtime, sink as in section 4 ...
let result = run_agent_loop(AgentLoopParams {
resolver: &resolver,
agent_id: "live-test",
runtime: &runtime,
sink: sink.clone(),
checkpoint_store: None,
messages: vec![Message::user("What is 2+2? Answer in one word.")],
run_identity: test_identity(),
cancellation_token: None,
decision_rx: None,
overrides: None,
frontend_tools: Vec::new(),
})
.await
.unwrap();
assert!(!result.response.is_empty());
}
运行在线测试:
# OpenAI 兼容的提供商
OPENAI_API_KEY=<your-key> LLM_MODEL=gpt-4o-mini cargo test -- --ignored
# 自定义端点(例如 BigModel)
LLM_BASE_URL=https://open.bigmodel.cn/api/paas/v4/ \
LLM_API_KEY=<key> \
LLM_MODEL=GLM-4.7-Flash \
cargo test -- --ignored
完整的可运行示例(含控制台输出)请参见 examples/live_test.rs 和 examples/tool_call_live.rs。
关键文件
crates/awaken-contract/src/contract/tool.rs– Tool trait、ToolCallContext::test_default()、ToolResult、ToolOutputcrates/awaken-contract/src/contract/event_sink.rs– VecEventSinkcrates/awaken-runtime/src/engine/mock.rs– MockLlmExecutorcrates/awaken-runtime/src/state/mod.rs– StateStore、StateCommandcrates/awaken-runtime/src/loop_runner/mod.rs–run_agent_loop、AgentLoopParams、AgentRunResultcrates/awaken-runtime/tests/– 集成测试套件(事件生命周期、工具副作用)
相关文档
概览
awaken crate 是 Awaken 的公开门面。它把 awaken-contract、awaken-runtime、awaken-stores 以及若干扩展 crate 的公共 API 重新导出为一个统一依赖面。
模块再导出
| 门面路径 | 来源 crate | 内容 |
|---|---|---|
awaken::contract | awaken-contract | tool、event、message、suspension、lifecycle 等契约 |
awaken::model | awaken-contract | Phase、EffectSpec、ScheduledActionSpec、JsonValue |
awaken::registry_spec | awaken-contract | AgentSpec、ModelSpec、ProviderSpec、McpServerSpec、PluginConfigKey |
awaken::state | awaken-contract + awaken-runtime | StateKey、StateMap、Snapshot、StateStore、MutationBatch |
awaken::agent | awaken-runtime | agent 配置与状态 |
awaken::builder | awaken-runtime | AgentRuntimeBuilder、BuildError |
awaken::context | awaken-runtime | PhaseContext |
awaken::engine | awaken-runtime | LLM 执行层抽象 |
awaken::execution | awaken-runtime | ExecutionEnv |
awaken::extensions | awaken-runtime | 内置扩展基础设施 |
awaken::loop_runner | awaken-runtime | agent loop 执行器 |
awaken::phase | awaken-runtime | PhaseRuntime、PhaseHook |
awaken::plugins | awaken-runtime | Plugin、PluginRegistrar |
awaken::policies | awaken-runtime | context window / retry policy |
awaken::registry | awaken-runtime | AgentResolver、ResolvedAgent |
awaken::runtime | awaken-runtime | AgentRuntime |
awaken::stores | awaken-stores | file / postgres / memory store |
受 feature flag 控制的模块
| 门面路径 | feature flag | 来源 crate |
|---|---|---|
awaken::ext_permission | permission | awaken-ext-permission |
awaken::ext_observability | observability | awaken-ext-observability |
awaken::ext_mcp | mcp | awaken-ext-mcp |
awaken::ext_skills | skills | awaken-ext-skills |
awaken::ext_generative_ui | generative-ui | awaken-ext-generative-ui |
awaken::ext_reminder | reminder | awaken-ext-reminder |
awaken::server | server | awaken-server |
根级再导出
常用类型还会直接从 crate root 导出,例如:
- 来自
awaken-contract:AgentSpec、KeyScope、MergeStrategy、Phase、StateKey、StateMap、Snapshot - 来自
awaken-runtime:AgentRuntime、AgentRuntimeBuilder、BuildError、RunRequest、RuntimeError、PhaseHook
Feature Flags
| Flag | 默认开启 | 说明 |
|---|---|---|
permission | yes | 工具级权限控制与 HITL |
observability | yes | tracing 与 metrics |
mcp | yes | MCP 工具桥接 |
skills | yes | 技能子系统 |
reminder | yes | 工具执行后的提醒注入 |
server | yes | HTTP / SSE / protocol server |
generative-ui | yes | 生成式 UI 组件流 |
full | yes | 上述功能全集 |
独立工作区扩展 crate 也可能存在但未接到门面 feature 上;当前包括 awaken-ext-deferred-tools。
相关
Tool Trait
Tool 是 Awaken 暴露能力给 LLM 的主扩展点。tool 接收 JSON 参数和只读上下文,返回 ToolOutput。
Trait 定义
#[async_trait]
pub trait Tool: Send + Sync {
fn descriptor(&self) -> ToolDescriptor;
fn validate_args(&self, _args: &Value) -> Result<(), ToolError> {
Ok(())
}
async fn execute(
&self,
args: Value,
ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError>;
}
ToolDescriptor
描述 tool 的 ID、名称、说明和参数 schema。它会被注册到运行时,并在推理请求里暴露给 LLM。
pub struct ToolDescriptor {
pub id: String,
pub name: String,
pub description: String,
pub parameters: Value,
pub category: Option<String>,
}
Builder 方法:
ToolDescriptor::new(id, name, description) -> Self
.with_parameters(schema: Value) -> Self
.with_category(category: impl Into<String>) -> Self
ToolResult
Tool::execute 返回的结构化结果。
pub struct ToolResult {
pub tool_name: String,
pub status: ToolStatus,
pub data: Value,
pub message: Option<String>,
pub suspension: Option<Box<SuspendTicket>>,
}
ToolStatus
pub enum ToolStatus {
Success,
Pending,
Error,
}
构造函数
| 方法 | 状态 | 用途 |
|---|---|---|
ToolResult::success(name, data) | Success | 正常完成 |
ToolResult::success_with_message(name, data, msg) | Success | 带补充说明的成功 |
ToolResult::error(name, message) | Error | 可恢复失败 |
ToolResult::error_with_code(name, code, message) | Error | 带 code 的结构化失败 |
ToolResult::suspended(name, message) | Pending | HITL 挂起 |
ToolResult::suspended_with(name, message, ticket) | Pending | 带 SuspendTicket 的挂起 |
判定方法
is_success()is_pending()is_error()to_json()
ToolError
ToolError 和 ToolResult::error(...) 的区别是:前者直接终止该次 tool call,后者会把失败信息回传给 LLM。
pub enum ToolError {
InvalidArguments(String),
ExecutionFailed(String),
Denied(String),
NotFound(String),
Internal(String),
}
ToolCallContext
tool 执行期拿到的只读上下文:
pub struct ToolCallContext {
pub call_id: String,
pub tool_name: String,
pub run_identity: RunIdentity,
pub agent_spec: Arc<AgentSpec>,
pub snapshot: Snapshot,
pub activity_sink: Option<Arc<dyn EventSink>>,
}
方法
fn state<K: StateKey>(&self) -> Option<&K::Value>
async fn report_activity(&self, activity_type: &str, content: &str)
async fn report_activity_delta(&self, activity_type: &str, patch: Value)
async fn report_progress(
&self,
status: ProgressStatus,
message: Option<&str>,
progress: Option<f64>,
)
示例
最小 tool
use async_trait::async_trait;
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
use serde_json::{Value, json};
struct Greet;
#[async_trait]
impl Tool for Greet {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("greet", "greet", "Greet a user by name")
.with_parameters(json!({
"type": "object",
"properties": {
"name": { "type": "string" }
},
"required": ["name"]
}))
}
async fn execute(
&self,
args: Value,
_ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
let name = args["name"]
.as_str()
.ok_or_else(|| ToolError::InvalidArguments("name required".into()))?;
Ok(ToolResult::success("greet", json!({ "greeting": format!("Hello, {name}!") })).into())
}
}
从上下文读取状态
use async_trait::async_trait;
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
use awaken::state::StateKey;
use serde_json::{Value, json};
struct GetPreferences;
#[async_trait]
impl Tool for GetPreferences {
fn descriptor(&self) -> ToolDescriptor {
ToolDescriptor::new("get_prefs", "get_preferences", "Get user preferences")
}
async fn execute(
&self,
_args: Value,
ctx: &ToolCallContext,
) -> Result<ToolOutput, ToolError> {
// let prefs = ctx.state::<UserPreferences>().cloned().unwrap_or_default();
Ok(ToolResult::success("get_prefs", json!({})).into())
}
}
Tool 执行钩子
每次 tool call 在真正执行前后都会经过插件钩子。
完整生命周期
LLM 选择 tool
-> validate_args()
-> BeforeToolExecute
插件可调度 ToolInterceptAction:
Block
Suspend
SetResult
-> execute()
-> AfterToolExecute
BeforeToolExecute
参数校验后运行。插件可以在这里阻断、挂起,或直接提供预构造结果。
拦截优先级:
Block > Suspend > SetResult
AfterToolExecute
tool 完成后运行。插件可以观察 ToolResult、更新状态、追加事件或调度后续 action。
ToolCallStatus 转移
New -> Running -> Succeeded
Failed
Suspended -> Resuming -> Running -> ...
Cancelled
终态(Succeeded、Failed、Cancelled)不能再向前转移。
相关
Scheduled Actions
Scheduled action 是插件、tool 和运行时在 phase 收敛循环里发起副作用的主要机制。任何 hook、tool 或内部模块都可以通过 StateCommand::schedule_action::<A>(payload) 调度一个 action,运行时会在目标 phase 的 EXECUTE 阶段把它交给对应 handler。
工作方式
Hook / Tool Runtime
| |
|-- StateCommand ----------->| (包含 scheduled_actions)
| |-- commit state updates
| |-- dispatch to handler(A, p)
| | |
| | |-- handler returns StateCommand
| |<-----'
| |-- commit handler results
从 hook 调度
use awaken_runtime::agent::state::ExcludeTool;
async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
let mut cmd = StateCommand::new();
cmd.schedule_action::<ExcludeTool>("dangerous_tool".into())?;
Ok(cmd)
}
从 tool 调度
use awaken_runtime::agent::state::AddContextMessage;
use awaken_contract::contract::context_message::ContextMessage;
async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
let mut cmd = StateCommand::new();
cmd.schedule_action::<AddContextMessage>(
ContextMessage::system("my_tool.hint", "Remember to check the docs."),
)?;
Ok(ToolOutput::with_command(
ToolResult::success("my_tool", json!({"ok": true})),
cmd,
))
}
核心 Actions(awaken-runtime)
AddContextMessage
| Key | runtime.add_context_message |
| Phase | BeforeInference |
| Payload | ContextMessage |
向当前步骤的推理上下文注入一条 context message。
SetInferenceOverride
| Key | runtime.set_inference_override |
| Phase | BeforeInference |
| Payload | InferenceOverride |
覆盖当前步骤的推理参数,如 model、temperature、max_tokens 等。
ExcludeTool
| Key | runtime.exclude_tool |
| Phase | BeforeInference |
| Payload | String(tool ID) |
把某个 tool 从当前步骤提供给 LLM 的工具集合中移除。
IncludeOnlyTools
| Key | runtime.include_only_tools |
| Phase | BeforeInference |
| Payload | Vec<String> |
把当前步骤的工具集合限制为指定白名单。
ToolInterceptAction
| Key | tool_intercept |
| Phase | BeforeToolExecute |
| Payload | ToolInterceptPayload |
在 tool 真正执行前拦截:
| 变体 | 效果 |
|---|---|
Block { reason } | 阻断 tool,run 终止 |
Suspend(SuspendTicket) | 挂起 tool,等待外部决策 |
SetResult(ToolResult) | 直接使用预构造结果,跳过执行 |
优先级:
Block > Suspend > SetResult
Deferred Tools Actions(awaken-ext-deferred-tools)
DeferToolAction
| Key | deferred_tools.defer |
| Phase | BeforeInference |
| Payload | Vec<String> |
把工具切换到 Deferred 模式,从 LLM 工具列表里移除,由 ToolSearch 间接暴露。
PromoteToolAction
| Key | deferred_tools.promote |
| Phase | BeforeInference |
| Payload | Vec<String> |
把工具从 Deferred 提升回 Eager 模式。
插件 Action 使用矩阵
| 插件 | AddContext | SetOverride | Exclude | IncludeOnly | Intercept | Defer | Promote |
|---|---|---|---|---|---|---|---|
permission | X | X | |||||
skills | X | ||||||
reminder | X | ||||||
deferred-tools | X | X | X | X | |||
observability | |||||||
mcp | |||||||
generative-ui |
定义自定义 action
插件可以通过实现 ScheduledActionSpec 来定义自己的 action:
use awaken_contract::model::{Phase, ScheduledActionSpec};
pub struct MyCustomAction;
impl ScheduledActionSpec for MyCustomAction {
const KEY: &'static str = "my_plugin.custom_action";
const PHASE: Phase = Phase::BeforeInference;
type Payload = MyPayload;
}
在 register() 中挂上 handler:
fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
r.register_scheduled_action::<MyCustomAction, _>(MyHandler)?;
Ok(())
}
收敛与级联
scheduled actions 在 phase 收敛循环内部执行。某个 handler 可以再为同一 phase 调度新的 action,于是运行时会继续下一轮 dispatch,直到没有新 action 产生。
循环工作方式
Phase EXECUTE stage:
round 1: dispatch queued actions -> handlers return StateCommands
commit state, collect newly scheduled actions
round 2: dispatch new actions
...
round N: no new actions -> phase converges
限制
循环上限是 DEFAULT_MAX_PHASE_ROUNDS(当前默认 16)。如果超过上限仍不断产生 action,会返回 StateError::PhaseRunLoopExceeded。
失败 action
handler 返回错误时不会重试,失败会被写入 FailedScheduledActions:
let failed = store.read::<FailedScheduledActions>().unwrap_or_default();
assert!(failed.is_empty(), "expected no failed actions");
Effects
Effect 是类型化的、fire-and-forget 的副作用事件。和会在 phase 收敛循环内部执行、还能继续级联的 scheduled actions 不同,effect 在 commit 之后才分发,而且 handler 不能再返回新的 StateCommand。
常见用途:审计日志、外部 webhook、指标上报、通知投递。
EffectSpec trait
pub trait EffectSpec: 'static + Send + Sync {
const KEY: &'static str;
type Payload: Serialize + DeserializeOwned + Send + Sync + 'static;
}
约定 KEY 采用 "<plugin>.<effect_name>" 这类全局唯一名字。
发出 effect
通过 StateCommand::emit::<E>(payload) 发出:
use awaken::{StateCommand, StateError};
async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
let mut cmd = StateCommand::new();
cmd.emit::<AuditEffect>(AuditPayload {
action: "user_login".into(),
actor: "agent-1".into(),
})?;
Ok(cmd)
}
TypedEffect 封装
运行时内部用 TypedEffect 做类型擦除:
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TypedEffect {
pub key: String,
pub payload: JsonValue,
}
TypedEffect::from_spec::<E>(payload):把 typed payload 序列化成TypedEffectTypedEffect::decode::<E>():把 JSON 反序列化回具体 payload
注册 effect handler
#[async_trait]
pub trait TypedEffectHandler<E>: Send + Sync + 'static
where
E: EffectSpec,
{
async fn handle_typed(
&self,
payload: E::Payload,
snapshot: &Snapshot,
) -> Result<(), String>;
}
关键点:
- handler 收到的是 post-commit
Snapshot - 返回值是
Result<(), String>,不是StateError - handler 失败会被记录,但不会回滚已提交的状态
注册方式:
fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
r.register_effect::<AuditEffect, _>(AuditEffectHandler)?;
Ok(())
}
分发生命周期
- hook / action handler / tool 调用
emit::<E>() submit_command校验所有 effect key 是否都有 handler- 状态变更提交到 store
- 依次调用每个 handler
- handler 失败只记录,不影响后续 effect
Hook / Tool Runtime
| |
|-- StateCommand (with effects) ->|
| |-- validate all effect keys
| |-- commit state mutations
| |-- dispatch effects sequentially
|<--------------------------------|
示例
定义 effect:
use awaken::EffectSpec;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditPayload {
pub action: String,
pub actor: String,
}
pub struct AuditEffect;
impl EffectSpec for AuditEffect {
const KEY: &'static str = "audit.record";
type Payload = AuditPayload;
}
在 hook 中发出:
use async_trait::async_trait;
use awaken::{PhaseContext, PhaseHook, StateCommand, StateError};
pub struct AuditHook;
#[async_trait]
impl PhaseHook for AuditHook {
async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
let mut cmd = StateCommand::new();
cmd.emit::<AuditEffect>(AuditPayload {
action: "phase_entered".into(),
actor: "system".into(),
})?;
Ok(cmd)
}
}
处理 effect:
use async_trait::async_trait;
use awaken::{Snapshot, TypedEffectHandler};
pub struct AuditEffectHandler;
#[async_trait]
impl TypedEffectHandler<AuditEffect> for AuditEffectHandler {
async fn handle_typed(
&self,
payload: AuditPayload,
_snapshot: &Snapshot,
) -> Result<(), String> {
tracing::info!(
action = %payload.action,
actor = %payload.actor,
"audit effect dispatched"
);
Ok(())
}
}
Effects 与 Scheduled Actions 的区别
| Effects | Scheduled Actions | |
|---|---|---|
| 执行时机 | commit 后 | phase 收敛循环内 |
| 是否可级联 | 否 | 是 |
能否产出 StateCommand | 否 | 是 |
| 失败处理 | 记录日志,不阻塞 | 错误会上抛 |
| 状态可见性 | post-commit snapshot | pre-commit context |
| 适用场景 | 外部 I/O、日志、指标 | 内部控制流、状态变更 |
另见
事件
agent loop 在执行过程中会持续发出 AgentEvent。这些事件既会被协议编码器消费,也会通过 SSE 发给客户端。
AgentEvent
所有变体在 JSON 中都使用 event_type 作为标签字段(#[serde(tag = "event_type", rename_all = "snake_case")])。
pub enum AgentEvent {
RunStart {
thread_id: String,
run_id: String,
parent_run_id: Option<String>, // 为 None 时省略
},
RunFinish {
thread_id: String,
run_id: String,
result: Option<Value>, // 为 None 时省略
termination: TerminationReason,
},
TextDelta { delta: String },
ReasoningDelta { delta: String },
ToolCallStart { id: String, name: String },
ToolCallDelta { id: String, args_delta: String },
ToolCallReady {
id: String,
name: String,
arguments: Value,
},
ToolCallDone {
id: String,
message_id: String,
result: ToolResult,
outcome: ToolCallOutcome,
},
ToolCallStreamDelta {
id: String,
name: String,
delta: String,
},
ReasoningEncryptedValue { encrypted_value: String },
MessagesSnapshot { messages: Vec<Value> },
ActivitySnapshot {
message_id: String,
activity_type: String,
content: Value,
replace: Option<bool>, // 为 None 时省略
},
ActivityDelta {
message_id: String,
activity_type: String,
patch: Vec<Value>,
},
ToolCallResumed { target_id: String, result: Value },
StepStart { message_id: String },
StepEnd,
InferenceComplete {
model: String,
usage: Option<TokenUsage>, // 为 None 时省略
duration_ms: u64,
},
StateSnapshot { snapshot: Value },
StateDelta { delta: Vec<Value> },
Error {
message: String,
code: Option<String>, // 为 None 时省略
},
}
Crate 路径: awaken::contract::event::AgentEvent
Helper
impl AgentEvent {
/// 从 RunFinish 的 result 中提取响应文本。
pub fn extract_response(result: &Option<Value>) -> String
}
SSE 传输格式
事件通过 HTTP Server-Sent Events 传输。每帧使用 SSE 的 id 字段作为单调递增的序列号,支持断点续连:
id: {seq}
data: {json}
其中 {json} 是 AgentEvent 的 JSON 序列化结果。序列号从 0 开始,run 内每发出一个事件递增 1。
当流空闲时,服务器还会定期发送注释行(: heartbeat),以防止客户端或代理因长时间推理暂停而超时。客户端应忽略这些注释行,它们不携带任何事件数据。
该功能由 awaken-server::http_sse 中的 format_sse_data_with_id 实现。
Run 输入
向 run 推送消息时,POST 到 /v1/runs/{run_id}/inputs,请求体反序列化为 PushRunInputsPayload:
struct PushRunInputsPayload {
messages: Vec<RunMessage>,
}
struct RunMessage {
role: String,
content: String,
}
至少需要一条消息。成功时服务器返回 202 Accepted。
RunOutput
run 返回的事件流类型别名:
pub type RunOutput = futures::stream::BoxStream<'static, AgentEvent>;
TerminationReason
run 终止的原因。使用 #[serde(tag = "type", content = "value", rename_all = "snake_case")] 序列化。
- 无负载的单元变体序列化为
{ "type": "natural_end" },不会输出"value"键。 - 带负载的元组变体序列化为
{ "type": "stopped", "value": { ... } }。
pub enum TerminationReason {
NaturalEnd,
BehaviorRequested,
Stopped(StoppedReason),
Cancelled,
Blocked(String),
Suspended,
Error(String),
}
StoppedReason
TerminationReason::Stopped 携带的负载。
pub struct StoppedReason {
pub code: String,
pub detail: Option<String>, // 为 None 时省略
}
ToolCallOutcome
pub enum ToolCallOutcome {
Succeeded,
Failed,
Suspended,
}
ToolResult
工具执行结果,在 ToolCallDone 中携带。定义于 awaken::contract::tool::ToolResult。
pub struct ToolResult {
pub tool_name: String,
pub status: ToolStatus, // "success" | "pending" | "error"
pub data: Value,
pub message: Option<String>,
pub suspension: Option<Box<SuspendTicket>>, // 为 None 时省略
pub metadata: HashMap<String, Value>, // 为空时省略
}
TokenUsage
pub struct TokenUsage {
pub prompt_tokens: Option<i32>,
pub completion_tokens: Option<i32>,
pub total_tokens: Option<i32>,
pub cache_read_tokens: Option<i32>,
pub cache_creation_tokens: Option<i32>,
pub thinking_tokens: Option<i32>,
}
所有字段为 None 时均从 JSON 中省略。TokenUsage::default() 产生所有字段均为 None 的值。
相关
HTTP API
启用 server feature 后,awaken-server 会通过 Axum 暴露 HTTP API。大多数接口返回 JSON,流式接口返回 SSE。
本页对应当前代码里的路由树:crates/awaken-server/src/routes.rs 与 crates/awaken-server/src/config_routes.rs。
健康检查与指标
| 方法 | 路径 | 说明 |
|---|---|---|
GET | /health | 就绪探针;检查 store 连通性,返回 200 或 503 |
GET | /health/live | 存活探针;始终返回 200 |
GET | /metrics | Prometheus 指标抓取口 |
Threads
| 方法 | 路径 | 说明 |
|---|---|---|
GET | /v1/threads | 列出 thread ID |
POST | /v1/threads | 创建 thread |
GET | /v1/threads/summaries | 列出 thread 摘要 |
GET | /v1/threads/:id | 获取 thread |
PATCH | /v1/threads/:id | 更新 thread 元信息 |
DELETE | /v1/threads/:id | 删除 thread |
POST | /v1/threads/:id/cancel | 取消该 thread 上排队或运行中的某个 job;返回 cancel_requested |
POST | /v1/threads/:id/decision | 向该 thread 上等待中的 run 提交 HITL decision |
POST | /v1/threads/:id/interrupt | 中断该 thread:递增 thread generation、取消所有待执行 job、中止活动 run;返回 interrupt_requested 及 superseded_jobs 计数。与 /cancel 不同,此接口通过 mailbox.interrupt() 执行完整的“清空并中断“操作 |
PATCH | /v1/threads/:id/metadata | 更新 metadata 的别名接口 |
GET | /v1/threads/:id/messages | 列出消息 |
POST | /v1/threads/:id/messages | 作为后台 run 提交消息 |
POST | /v1/threads/:id/mailbox | 向 mailbox 推送消息载荷 |
GET | /v1/threads/:id/mailbox | 查看该 thread 的 mailbox job |
GET | /v1/threads/:id/runs | 列出该 thread 的 runs |
GET | /v1/threads/:id/runs/latest | 获取最新 run |
Runs
| 方法 | 路径 | 说明 |
|---|---|---|
GET | /v1/runs | 列出 runs |
POST | /v1/runs | 启动 run,并通过 SSE 返回事件 |
GET | /v1/runs/:id | 获取 run 记录 |
POST | /v1/runs/:id/inputs | 向同一 thread 追加后续输入 |
POST | /v1/runs/:id/cancel | 按 run ID 取消 |
POST | /v1/runs/:id/decision | 按 run ID 提交 HITL decision |
Config 与 Capabilities
这些接口由 config_routes() 提供。读取与 schema 接口要求 AppState
挂接 config store;写接口还要求挂接 config runtime manager,才能在写入后
校验并发布新的 registry snapshot。缺少这些配置时会返回 400,错误为
config management API not enabled。
| 方法 | 路径 | 说明 |
|---|---|---|
GET | /v1/capabilities | 列出 agents、tools、plugins、models、providers 和 config namespaces |
GET | /v1/config/:namespace | 列出某个 namespace 下的配置项 |
POST | /v1/config/:namespace | 创建配置项,body 必须含 "id" |
GET | /v1/config/:namespace/:id | 获取单个配置项 |
PUT | /v1/config/:namespace/:id | 整体替换配置项 |
DELETE | /v1/config/:namespace/:id | 删除配置项 |
GET | /v1/config/:namespace/$schema | 获取该 namespace 的 JSON Schema |
GET | /v1/agents | /v1/config/agents 的便捷别名 |
GET | /v1/agents/:id | /v1/config/agents/:id 的便捷别名 |
当前内置 namespace:
agentsmodelsprovidersmcp-servers
AI SDK v6 路由
| 方法 | 路径 | 说明 |
|---|---|---|
POST | /v1/ai-sdk/chat | 启动 chat run,并流式返回 AI SDK 编码事件 |
POST | /v1/ai-sdk/threads/:thread_id/runs | 在指定 thread 上启动 run |
POST | /v1/ai-sdk/agents/:agent_id/runs | 在指定 agent 上启动 run |
GET | /v1/ai-sdk/chat/:thread_id/stream | 按 thread ID 续接 SSE |
GET | /v1/ai-sdk/threads/:thread_id/stream | 同上别名 |
GET | /v1/ai-sdk/threads/:thread_id/messages | 列出 thread 消息 |
POST | /v1/ai-sdk/threads/:thread_id/cancel | 取消该 thread 上活动或排队中的 run |
POST | /v1/ai-sdk/threads/:thread_id/interrupt | 中断 thread(递增 generation、取消待执行 job、中止活动 run) |
AG-UI 路由
| 方法 | 路径 | 说明 |
|---|---|---|
POST | /v1/ag-ui/run | 启动 AG-UI run,并流式返回 AG-UI 事件 |
POST | /v1/ag-ui/threads/:thread_id/runs | 在指定 thread 上启动 run |
POST | /v1/ag-ui/agents/:agent_id/runs | 在指定 agent 上启动 run |
POST | /v1/ag-ui/threads/:thread_id/interrupt | 中断 thread |
GET | /v1/ag-ui/threads/:id/messages | 列出 thread 消息 |
A2A 路由
| 方法 | 路径 | 说明 |
|---|---|---|
GET | /.well-known/agent-card.json | 获取公共/默认 agent card |
POST | /v1/a2a/message:send | 向公共/默认 A2A agent 发送消息 |
POST | /v1/a2a/message:stream | 通过 SSE 进行流式发送 |
GET | /v1/a2a/tasks | 列出 A2A 任务 |
GET | /v1/a2a/tasks/:task_id | 查询任务状态 |
POST | /v1/a2a/tasks/:task_id:cancel | 取消任务 |
POST | /v1/a2a/tasks/:task_id:subscribe | 通过 SSE 订阅任务更新 |
POST | /v1/a2a/tasks/:task_id/pushNotificationConfigs | 创建推送通知配置 |
GET | /v1/a2a/tasks/:task_id/pushNotificationConfigs | 列出推送通知配置 |
GET | /v1/a2a/tasks/:task_id/pushNotificationConfigs/:config_id | 获取推送通知配置 |
DELETE | /v1/a2a/tasks/:task_id/pushNotificationConfigs/:config_id | 删除推送通知配置 |
GET | /v1/a2a/extendedAgentCard | 获取扩展 agent card;未启用时返回 501 |
POST | /v1/a2a/:tenant/message:send | 向 tenant 作用域 agent 发送消息 |
POST | /v1/a2a/:tenant/message:stream | tenant 作用域流式发送 |
GET | /v1/a2a/:tenant/tasks | 列出 tenant 作用域任务 |
GET | /v1/a2a/:tenant/tasks/:task_id | 查询 tenant 作用域任务状态 |
POST | /v1/a2a/:tenant/tasks/:task_id:cancel | 取消 tenant 作用域任务 |
POST | /v1/a2a/:tenant/tasks/:task_id:subscribe | 订阅 tenant 作用域任务更新 |
GET | /v1/a2a/:tenant/extendedAgentCard | 获取 tenant 作用域扩展 agent card |
MCP HTTP 路由
| 方法 | 路径 | 说明 |
|---|---|---|
POST | /v1/mcp | MCP JSON-RPC 请求/响应入口 |
GET | /v1/mcp | 为 MCP 服务端主动 SSE 预留;当前返回 405 |
常见查询参数
offset:跳过的条数limit:返回上限,范围会被限制在1..=200cursor:消息历史分页游标;提供后会优先于offset,历史消息接口响应会返回next_cursorstatus:按 run 状态过滤,支持running、waiting、donevisibility:消息可见性过滤;省略时只看外部消息,all表示包含内部消息
错误格式
大多数接口返回:
{ "error": "human-readable message" }
MCP 接口返回 JSON-RPC 错误对象,而不是上面的通用形状。
相关
配置
AgentSpec
AgentSpec 是可序列化的 agent 定义。它既可以从 JSON / YAML 加载,也可以用 builder 方法在代码里构造。
pub struct AgentSpec {
pub id: String,
pub model: String,
pub system_prompt: String,
pub max_rounds: usize,
pub max_continuation_retries: usize,
pub context_policy: Option<ContextWindowPolicy>,
pub reasoning_effort: Option<ReasoningEffort>,
pub plugin_ids: Vec<String>,
pub active_hook_filter: HashSet<String>,
pub allowed_tools: Option<Vec<String>>,
pub excluded_tools: Option<Vec<String>>,
pub endpoint: Option<RemoteEndpoint>,
pub delegates: Vec<String>,
pub sections: HashMap<String, Value>,
pub registry: Option<String>,
}
Crate 路径: awaken::registry_spec::AgentSpec(在 awaken::AgentSpec 重新导出)
Builder 方法
AgentSpec::new(id) -> Self
.with_model(model) -> Self
.with_system_prompt(prompt) -> Self
.with_max_rounds(n) -> Self
.with_reasoning_effort(effort) -> Self
.with_hook_filter(plugin_id) -> Self
.with_config::<K>(config) -> Result<Self, StateError>
.with_delegate(agent_id) -> Self
.with_endpoint(endpoint) -> Self
.with_section(key, value: Value) -> Self
类型化配置访问
fn config<K: PluginConfigKey>(&self) -> Result<K::Config, StateError>
fn set_config<K: PluginConfigKey>(&mut self, config: K::Config) -> Result<(), StateError>
ContextWindowPolicy
控制上下文窗口和自动压缩行为。
pub struct ContextWindowPolicy {
pub max_context_tokens: usize,
pub max_output_tokens: usize,
pub min_recent_messages: usize,
pub enable_prompt_cache: bool,
pub autocompact_threshold: Option<usize>,
pub compaction_mode: ContextCompactionMode,
pub compaction_raw_suffix_messages: usize,
}
ContextCompactionMode
pub enum ContextCompactionMode {
KeepRecentRawSuffix,
CompactToSafeFrontier,
}
InferenceOverride
用于单次推理的参数覆盖。所有字段都是 Option,多插件同时写时按字段 last-wins 合并。
pub struct InferenceOverride {
pub model: Option<String>,
pub fallback_models: Option<Vec<String>>,
pub temperature: Option<f64>,
pub max_tokens: Option<u32>,
pub top_p: Option<f64>,
pub reasoning_effort: Option<ReasoningEffort>,
}
方法
fn is_empty(&self) -> bool
fn merge(&mut self, other: InferenceOverride)
ReasoningEffort
pub enum ReasoningEffort {
None,
Low,
Medium,
High,
Max,
Budget(u32),
}
PluginConfigKey trait
把配置 section 名称和 Rust 配置结构绑定在一起:
pub trait PluginConfigKey: 'static + Send + Sync {
const KEY: &'static str;
type Config: Default + Clone + Serialize + DeserializeOwned
+ schemars::JsonSchema + Send + Sync + 'static;
}
RemoteEndpoint
远程 backend agent 的配置。当前内置的是 "a2a" backend,backend 专有参数放在 options 中:
pub struct RemoteEndpoint {
pub backend: String,
pub base_url: String,
pub auth: Option<RemoteAuth>,
pub target: Option<String>,
pub timeout_ms: u64,
pub options: BTreeMap<String, Value>,
}
pub struct RemoteAuth {
pub r#type: String,
// backend 专有认证字段,例如 bearer 用 { "token": "..." }
}
ServerConfig
HTTP server 配置。需启用 server feature。
pub struct ServerConfig {
pub address: String, // default: "0.0.0.0:3000"
pub sse_buffer_size: usize, // default: 64
pub replay_buffer_capacity: usize, // default: 1024
pub shutdown: ShutdownConfig,
pub max_concurrent_requests: usize, // default: 100
pub a2a_extended_card_bearer_token: Option<String>,
}
pub struct ShutdownConfig {
pub timeout_secs: u64, // default: 30
}
Crate 路径: awaken_server::app::ServerConfig
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
address | String | "0.0.0.0:3000" | 服务器绑定的 socket 地址 |
sse_buffer_size | usize | 64 | 单连接 SSE 通道最大缓冲帧数 |
replay_buffer_capacity | usize | 1024 | 每次 run 用于断线续接的最大 replay buffer 帧数 |
max_concurrent_requests | usize | 100 | 最大并发请求数;超出时返回 503 |
a2a_extended_card_bearer_token | Option<String> | None | 设置后启用带认证的 GET /v1/a2a/extendedAgentCard |
shutdown.timeout_secs | u64 | 30 | 强制退出前等待飞行中请求排空的秒数 |
MailboxConfig
mailbox 持久化队列配置。控制租约计时、扫描/GC 间隔以及失败任务的重试行为。
pub struct MailboxConfig {
pub lease_ms: u64, // default: 30_000
pub suspended_lease_ms: u64, // default: 600_000
pub lease_renewal_interval: Duration, // default: 10s
pub sweep_interval: Duration, // default: 30s
pub gc_interval: Duration, // default: 60s
pub gc_ttl: Duration, // default: 24h
pub default_max_attempts: u32, // default: 5
pub default_retry_delay_ms: u64, // default: 250
pub max_retry_delay_ms: u64, // default: 30_000
}
Crate 路径: awaken_server::mailbox::MailboxConfig
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
lease_ms | u64 | 30_000 | 活跃 run 的租约时长(毫秒) |
suspended_lease_ms | u64 | 600_000 | 等待人工输入的挂起 run 的租约时长(毫秒) |
lease_renewal_interval | Duration | 10s | worker 续约频率 |
sweep_interval | Duration | 30s | 扫描过期租约、回收孤儿任务的频率 |
gc_interval | Duration | 60s | 对已终止(完成/失败)任务进行垃圾回收的频率 |
gc_ttl | Duration | 24h | 已终止任务在被清除前的保留时长 |
default_max_attempts | u32 | 5 | 任务进入死信队列前的最大投递次数 |
default_retry_delay_ms | u64 | 250 | 两次重试之间的基础延迟(毫秒) |
max_retry_delay_ms | u64 | 30_000 | 指数退避的最大延迟上限(毫秒) |
LlmRetryPolicy
LLM 推理失败后的重试与 fallback model 策略,支持指数退避。可通过 AgentSpec 的 "retry" section 按 agent 配置。
pub struct LlmRetryPolicy {
pub max_retries: u32, // default: 2
pub fallback_models: Vec<String>, // default: []
pub backoff_base_ms: u64, // default: 500
}
Crate 路径: awaken_runtime::engine::retry::LlmRetryPolicy
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
max_retries | u32 | 2 | 初次调用后的最大重试次数(0 表示不重试) |
fallback_models | Vec<String> | [] | 主模型耗尽重试后依次尝试的备用模型列表 |
backoff_base_ms | u64 | 500 | 指数退避的基础延迟(毫秒);实际延迟 = min(base × 2^attempt, 8000ms)。设为 0 可禁用退避 |
AgentSpec 集成
通过 "retry" section 配置:
use awaken_runtime::engine::retry::RetryConfigKey;
let spec = AgentSpec::new("my-agent")
.with_config::<RetryConfigKey>(LlmRetryPolicy {
max_retries: 3,
fallback_models: vec!["claude-sonnet-4-20250514".into()],
backoff_base_ms: 1000,
})?;
CircuitBreakerConfig
每个模型单独维护的熔断器配置。通过短路对失败过多的模型的请求,防止级联故障。冷却期过后熔断器进入半开状态,允许有限的探测请求;成功后完全关闭。
pub struct CircuitBreakerConfig {
pub failure_threshold: u32, // default: 5
pub cooldown: Duration, // default: 30s
pub half_open_max: u32, // default: 1
}
Crate 路径: awaken_runtime::engine::circuit_breaker::CircuitBreakerConfig
| 字段 | 类型 | 默认值 | 说明 |
|---|---|---|---|
failure_threshold | u32 | 5 | 触发熔断器打开并拒绝请求所需的连续失败次数 |
cooldown | Duration | 30s | 熔断器从打开状态过渡到半开状态前的等待时长 |
half_open_max | u32 | 1 | 半开状态下允许的最大探测请求数;失败则重新打开,成功则完全关闭 |
Feature flags 及其效果
| Flag | 运行时行为 |
|---|---|
permission | 注册权限插件,可对工具启用 HITL 审批 |
observability | 注册观测插件,发出 traces / metrics |
mcp | 启用 MCP 工具桥接 |
skills | 启用技能子系统 |
reminder | 注册 reminder 插件,在工具执行后根据模式规则注入上下文消息 |
server | 启用 HTTP / SSE server 与协议适配层 |
generative-ui | 启用生成式 UI 组件流 |
工作区还包含不通过门面 feature 暴露的扩展 crate,当前包括 awaken-ext-deferred-tools。
自定义插件配置
插件通过 PluginConfigKey 声明类型化配置 section,并通过 config_schemas() 提供 JSON Schema,用于 resolve 阶段校验。
声明 schema 用于校验
fn config_schemas(&self) -> Vec<ConfigSchema> {
vec![ConfigSchema {
key: RateLimitConfigKey::KEY,
json_schema: schemars::schema_for!(RateLimitConfig),
}]
}
在运行时读取配置
let cfg = ctx.agent_spec().config::<RateLimitConfigKey>()?;
示例
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use awaken::PluginConfigKey;
#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct RateLimitConfig {
pub max_calls_per_step: u32,
pub cooldown_ms: u64,
}
pub struct RateLimitConfigKey;
impl PluginConfigKey for RateLimitConfigKey {
const KEY: &'static str = "rate_limit";
type Config = RateLimitConfig;
}
校验行为
- section 存在但不合法:resolve 失败
- section 存在但没有插件声明:记录 warning
- section 缺失:返回
Config::default()
相关
错误
Awaken 的错误类型统一基于 thiserror,并实现 std::error::Error 与 Display。
StateError
状态系统相关错误,定义在 awaken-contract。
pub enum StateError {
RevisionConflict { expected: u64, actual: u64 },
MutationBaseRevisionMismatch { left: u64, right: u64 },
PluginAlreadyInstalled { name: String },
PluginNotInstalled { type_name: &'static str },
KeyAlreadyRegistered { key: String },
UnknownKey { key: String },
KeyDecode { key: String, message: String },
KeyEncode { key: String, message: String },
HandlerAlreadyRegistered { key: String },
EffectHandlerAlreadyRegistered { key: String },
PhaseRunLoopExceeded { phase: Phase, max_rounds: usize },
UnknownScheduledActionHandler { key: String },
UnknownEffectHandler { key: String },
ParallelMergeConflict { key: String },
ToolAlreadyRegistered { tool_id: String },
Cancelled,
}
ToolError
由 Tool::validate_args 或 Tool::execute 返回的错误。和 ToolResult::error(...) 不同,ToolError 会直接中止该次 tool call。
pub enum ToolError {
InvalidArguments(String),
ExecutionFailed(String),
Denied(String),
NotFound(String),
Internal(String),
}
BuildError
AgentRuntimeBuilder::build() 阶段的错误。
pub enum BuildError {
State(StateError),
AgentRegistryConflict(String),
ToolRegistryConflict(String),
ModelRegistryConflict(String),
ProviderRegistryConflict(String),
PluginRegistryConflict(String),
ValidationFailed(String),
DiscoveryFailed(DiscoveryError),
}
RuntimeError
运行时执行错误,例如 agent 无法解析、同一 thread 重入运行等。
pub enum RuntimeError {
State(StateError),
ThreadAlreadyRunning { thread_id: String },
AgentNotFound { agent_id: String },
ResolveFailed { message: String },
}
InferenceExecutionError
LLM 执行层错误。
pub enum InferenceExecutionError {
Provider(String),
RateLimited(String),
Timeout(String),
Cancelled,
}
StorageError
ThreadStore、RunStore、ThreadRunStore 返回的错误。
pub enum StorageError {
NotFound(String),
AlreadyExists(String),
VersionConflict { expected: u64, actual: u64 },
Io(String),
Serialization(String),
}
ResolveError
agent 解析管线中的错误。
pub enum ResolveError {
AgentNotFound(String),
ModelNotFound(String),
ProviderNotFound(String),
PluginNotFound(String),
InvalidPluginConfig { plugin: String, key: String, message: String },
RemoteAgentNotDirectlyRunnable(String),
ToolIdConflict { tool_id: String, source_a: String, source_b: String },
EnvBuild(StateError),
}
UnknownKeyPolicy
反序列化未知状态键时的策略。
pub enum UnknownKeyPolicy {
Error,
Skip,
}
相关
AI SDK v6 协议
AI SDK v6 适配层会把 Awaken 的 AgentEvent 流转换成 Vercel AI SDK v6 UI Message Stream 格式。这样 useChat、useAssistant 等前端无需自定义解析器就能直接消费 Awaken 的输出。
入口
POST /v1/ai-sdk/chat
请求体
{
"messages": [{ "role": "user", "content": "Hello" }],
"threadId": "optional-thread-id",
"agentId": "optional-agent-id"
}
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
messages | AiSdkMessage[] | 是 | 输入消息。内容可以是字符串或 content parts 数组。 |
threadId | string | 否 | 继续已有 thread。省略时会创建新 thread。 |
agentId | string | 否 | 指定 agent。省略时使用默认 agent。 |
响应
SSE 流(text/event-stream),每一行是一个 JSON 编码的 UIStreamEvent。
辅助路由
| 路由 | 方法 | 说明 |
|---|---|---|
/v1/ai-sdk/threads/:thread_id/runs | POST | 在指定 thread 上启动 run |
/v1/ai-sdk/agents/:agent_id/runs | POST | 在指定 agent 上启动 run |
/v1/ai-sdk/chat/:thread_id/stream | GET | 按 thread ID 续接 SSE |
/v1/ai-sdk/threads/:thread_id/stream | GET | 同上,thread 路由别名 |
/v1/ai-sdk/threads/:thread_id/messages | GET | 读取 thread 消息历史 |
/v1/ai-sdk/threads/:thread_id/cancel | POST | 取消 thread |
/v1/ai-sdk/threads/:thread_id/interrupt | POST | 中断 thread |
事件映射
AiSdkEncoder 会把 AgentEvent 映射到 UIStreamEvent:
| AgentEvent | UIStreamEvent |
|---|---|
RunStart | MessageStart + Data("run-info", ...) |
TextDelta | TextStart(如果 block 未打开)+ TextDelta |
ReasoningDelta | ReasoningStart(如果 block 未打开)+ ReasoningDelta |
ReasoningEncryptedValue | ReasoningStart(如果未打开)+ ReasoningDelta |
ToolCallStart | 关闭当前 text/reasoning block,然后发 ToolCallStart |
ToolCallDelta | ToolCallDelta |
ToolCallDone | ToolCallEnd |
StepStart | (无直接映射) |
StepEnd | (无直接映射) |
InferenceComplete | Data("inference-complete", ...) |
MessagesSnapshot | Data("messages-snapshot", ...) |
StateSnapshot | Data("state-snapshot", ...) |
StateDelta | Data("state-delta", ...) |
ActivitySnapshot | Data("activity-snapshot", ...) |
ActivityDelta | Data("activity-delta", ...) |
RunFinish | 关闭当前 block,然后发 Data("finish", ...) 和 Finish |
UIStreamEvent 类型
starttext-start/text-delta/text-endreasoning-start/reasoning-delta/reasoning-endtool-call-start/tool-call-delta/tool-call-enddatafinish
文本块生命周期
编码器会自动管理文本块边界:
- 第一个
TextDelta会打开一个 text block。 - 后续 delta 追加到同一个 block。
- 遇到
ToolCallStart会先关闭当前 block。 - 工具结束后的新文本会重新开一个新 block。
相关
AG-UI 协议
AG-UI 适配层会把 Awaken 的 AgentEvent 流转换成 AG-UI / CopilotKit 事件格式,使 CopilotKit 前端可以直接驱动 Awaken agent。
入口
POST /v1/ag-ui/run
请求体
{
"threadId": "optional-thread-id",
"agentId": "optional-agent-id",
"messages": [{ "role": "user", "content": "Hello" }],
"context": {}
}
| 字段 | 类型 | 必填 | 说明 |
|---|---|---|---|
messages | AgUiMessage[] | 是 | 聊天消息,使用 role 和 content |
threadId | string | 否 | 继续已有 thread |
agentId | string | 否 | 指定目标 agent |
context | object | 否 | 前端上下文透传 |
响应
SSE 流(text/event-stream),每个 frame 是一个 JSON 编码的 AG-UI Event。
辅助路由
| 路由 | 方法 | 说明 |
|---|---|---|
/v1/ag-ui/threads/:thread_id/runs | POST | 在指定 thread 上启动 run |
/v1/ag-ui/agents/:agent_id/runs | POST | 在指定 agent 上启动 run |
/v1/ag-ui/threads/:thread_id/interrupt | POST | 中断指定 thread |
/v1/ag-ui/threads/:id/messages | GET | 读取 thread 消息历史 |
事件映射
| AgentEvent | AG-UI Event |
|---|---|
RunStart | RUN_STARTED |
TextDelta | TEXT_MESSAGE_START + TEXT_MESSAGE_CONTENT |
ReasoningDelta | REASONING_MESSAGE_START + REASONING_MESSAGE_CONTENT |
ToolCallStart | 关闭当前 text/reasoning,然后发 STEP_STARTED、TOOL_CALL_START |
ToolCallDelta | TOOL_CALL_ARGS |
ToolCallDone | TOOL_CALL_END、STEP_FINISHED |
StateSnapshot | STATE_SNAPSHOT |
StateDelta | STATE_DELTA |
RunFinish | RUN_FINISHED 或 RUN_ERROR |
AG-UI 事件类型
RUN_STARTED/RUN_FINISHED/RUN_ERRORTEXT_MESSAGE_START/TEXT_MESSAGE_CONTENT/TEXT_MESSAGE_ENDREASONING_MESSAGE_START/REASONING_MESSAGE_CONTENT/REASONING_MESSAGE_ENDSTEP_STARTED/STEP_FINISHEDTOOL_CALL_START/TOOL_CALL_ARGS/TOOL_CALL_ENDSTATE_SNAPSHOT/STATE_DELTAMESSAGES_SNAPSHOT
角色
AG-UI 消息角色使用小写字符串:system、user、assistant、tool。
文本消息生命周期
- 第一个
TextDelta会发TEXT_MESSAGE_START和TEXT_MESSAGE_CONTENT。 - 后续 delta 只追加
TEXT_MESSAGE_CONTENT。 - 遇到
ToolCallStart或RunFinish时,当前消息会以TEXT_MESSAGE_END关闭。
reasoning 消息采用同样模式。
相关
A2A 协议
A2A 适配器实现了官方 A2A 协议,用于远程 agent 发现、任务委托与 agent 间通信。
Feature gate:server
端点
| 路径 | 方法 | 说明 |
|---|---|---|
/.well-known/agent-card.json | GET | 公共/默认 agent card 发现端点。 |
/v1/a2a/message:send | POST | 向公共/默认 A2A agent 发送消息,返回 task 包装结果。 |
/v1/a2a/message:stream | POST | 通过 SSE 进行流式发送。 |
/v1/a2a/tasks | GET | 列出 A2A 任务。 |
/v1/a2a/tasks/:task_id | GET | 按 task ID 查询状态。 |
/v1/a2a/tasks/:task_id:cancel | POST | 取消运行中的任务。 |
/v1/a2a/tasks/:task_id:subscribe | POST | 通过 SSE 订阅任务更新。 |
/v1/a2a/tasks/:task_id/pushNotificationConfigs | POST | 创建推送通知配置。 |
/v1/a2a/tasks/:task_id/pushNotificationConfigs | GET | 列出推送通知配置。 |
/v1/a2a/tasks/:task_id/pushNotificationConfigs/:config_id | GET / DELETE | 读取或删除推送通知配置。 |
/v1/a2a/extendedAgentCard | GET | 扩展 agent card;只有 capabilities.extendedAgentCard=true 时才受支持。 |
租户/agent 作用域的等价路由位于 /v1/a2a/:tenant/...,例如 /v1/a2a/research/message:send 和 /v1/a2a/research/tasks/:task_id。
Agent Card
发现端点返回描述接口与能力的 AgentCard:
{
"name": "My Agent",
"description": "A helpful assistant",
"supportedInterfaces": [
{
"url": "https://example.com/v1/a2a",
"protocolBinding": "HTTP+JSON",
"protocolVersion": "1.0"
}
],
"version": "1.0.0",
"capabilities": {
"streaming": true,
"pushNotifications": true,
"stateTransitionHistory": false,
"extendedAgentCard": false
},
"defaultInputModes": ["text/plain"],
"defaultOutputModes": ["text/plain"],
"skills": [
{
"id": "general",
"name": "General Q&A",
"description": "Answer general questions",
"tags": ["qa"],
"inputModes": ["text/plain"],
"outputModes": ["text/plain"]
}
]
}
Agent card 由已注册的 AgentSpec 生成。旧版顶层 url / id 字段不会再输出。
Message Send
{
"message": {
"taskId": "optional-client-provided-id",
"contextId": "optional-client-provided-id",
"messageId": "msg-123",
"role": "ROLE_USER",
"parts": [{ "text": "Summarize this document" }]
},
"configuration": {
"returnImmediately": true
}
}
服务端会把 A2A task 映射到 Awaken 的 thread / mailbox 执行链路。响应使用 v1 的 task 包装结构:
{
"task": {
"id": "optional-client-provided-id",
"contextId": "optional-client-provided-id",
"status": {
"state": "TASK_STATE_SUBMITTED"
}
}
}
如果未设置 returnImmediately 或传入 false,适配器会等待任务进入终态或中断态后再返回。
Task 状态
GET /v1/a2a/tasks/:task_id 返回 Task 资源:
{
"id": "abc-123",
"contextId": "abc-123",
"status": {
"state": "TASK_STATE_COMPLETED",
"message": {
"messageId": "msg-response",
"role": "ROLE_AGENT",
"parts": [{ "text": "..." }]
}
},
"history": []
}
任务状态使用 v1 枚举名,例如 TASK_STATE_SUBMITTED、TASK_STATE_WORKING、TASK_STATE_COMPLETED、TASK_STATE_FAILED、TASK_STATE_CANCELED。
可选能力默认值
Awaken 当前默认启用以下 A2A 能力:
streaming = truepushNotifications = true
extendedAgentCard 仍然是可选能力,只有在配置 ServerConfig.a2a_extended_card_bearer_token 后才会启用。未启用时,对应端点会返回符合规范的 unsupported 错误。
远程 Agent 委托
Awaken agent 可以通过 AgentTool::remote() 委托到远程 A2A agent。A2aBackend 会向远端发送 message:send 请求,读取返回的 task.id,再轮询 /tasks/:task_id 直到完成。从 LLM 视角看,这仍然只是一次普通工具调用。
远程 agent 配置写在 AgentSpec 中。RemoteEndpoint 是通用结构,A2A 通过 backend: "a2a" 指定:
{
"id": "remote-researcher",
"endpoint": {
"backend": "a2a",
"base_url": "https://remote-agent.example.com/v1/a2a",
"auth": { "type": "bearer", "token": "..." },
"target": "researcher",
"timeout_ms": 300000,
"options": {
"poll_interval_ms": 1000
}
}
}
带 endpoint 的 agent 会按远程 backend agent 解析。目前内置的委托 backend 是 A2A;没有 endpoint 的 agent 仍在本地运行。
另见
取消
Awaken 使用协作式取消机制来中断 agent run、流式输出和长时间运行的操作。
CancellationToken
CancellationToken 是一个可克隆的句柄,底层由共享的 AtomicBool 与 tokio::sync::Notify 组成。任何一个 clone 调用 cancel(),其他 clone 都会立刻观察到取消状态。
use awaken::CancellationToken;
let token = CancellationToken::new();
方法
pub fn new() -> Self
pub fn cancel(&self)
pub fn is_cancelled(&self) -> bool
pub async fn cancelled(&self)
Trait
CloneDefault
同步轮询
在同步代码或紧密循环里,可以使用 is_cancelled():
let token = CancellationToken::new();
while !token.is_cancelled() {
// do work
}
配合 tokio::select! 的异步等待
let token = CancellationToken::new();
tokio::select! {
result = some_async_work() => {
// work completed before cancellation
}
_ = token.cancelled() => {
// cancellation was signalled
}
}
协作式语义
取消不会强杀任务。cancel() 只会设置标志位并唤醒等待者,具体执行逻辑仍需要主动检查 is_cancelled() 或在 select! 中监听 cancelled()。
关键特性:
- 幂等:重复调用
cancel()是安全的。 - 共享:所有 clone 看到的是同一份状态。
- 可见性:原子标志使用强顺序,跨线程立即可见。
- 立即唤醒:
cancel()会通知所有等待者。
运行时里的用法
运行时会给每个 run 传入 CancellationToken,用于:
- 中断流式推理
- 在 phase 之间提前停止 run
- 把 HTTP / SSE / mailbox 层的取消请求传播到核心运行时
let token = CancellationToken::new();
let clone = token.clone();
tokio::select! {
_ = async {
while let Some(chunk) = stream.next().await {
// process chunk
}
} => {}
_ = clone.cancelled() => {
// stop processing, clean up
}
}
新消息到来时的自动取消
同一 thread 上已经有活动 run 时,如果又来了新的输入,mailbox 会先取消旧 run,再启动新 run。这样可以避免 ThreadAlreadyRunning,也避免挂起 run 与新输入互相干扰。
大致顺序是:
Mailbox::submit()发现该 thread 已有活动 run。- 调用
cancel_and_wait_by_thread()触发取消并等待线程槽位释放。 - 旧 run 以
TerminationReason::Cancelled结束。 - 新 run 启动前清理不成对的 tool call 历史,避免污染上下文。
关键文件
crates/awaken-runtime/src/cancellation.rscrates/awaken-runtime/src/runtime/agent_runtime/active_registry.rscrates/awaken-runtime/src/runtime/agent_runtime/runner.rs
工具执行模式
ToolExecutionMode 决定 LLM 在同一轮推理里返回多个 tool call 时,运行时如何执行这些调用。
ToolExecutionMode
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ToolExecutionMode {
#[default]
Sequential,
ParallelBatchApproval,
ParallelStreaming,
}
默认值是 Sequential。
模式
Sequential
按顺序逐个执行 tool call。
- 每个调用之间都会刷新状态快照。
- 遇到挂起会在第一个挂起点停止后续调用。
- 适合 tool 之间有数据依赖或顺序很重要的场景。
ParallelBatchApproval
并发执行所有 tool call,所有调用看到的是同一份冻结快照。
- 对挂起决策采用批量回放。
- 会做并行补丁冲突检查。
- 不会因单个失败或挂起而提前停止其它调用。
ParallelStreaming
同样并发执行全部调用,但挂起决策一到就立刻回放,不等待其他挂起调用。
- 适合独立工具很多、希望尽快恢复执行的场景。
对比
| 行为 | Sequential | ParallelBatchApproval | ParallelStreaming |
|---|---|---|---|
| 执行顺序 | 串行 | 全并发 | 全并发 |
| 状态可见性 | 每次调用前刷新 | 冻结快照 | 冻结快照 |
| 遇挂起是否停止 | 是 | 否 | 否 |
| 遇失败是否停止 | 否 | 否 | 否 |
| decision 回放 | 不适用 | 批量 | 即时 |
| 冲突检查 | 否 | 是 | 是 |
Executor trait
#[async_trait]
pub trait ToolExecutor: Send + Sync {
async fn execute(
&self,
tools: &HashMap<String, Arc<dyn Tool>>,
calls: &[ToolCall],
base_ctx: &ToolCallContext,
) -> Result<Vec<ToolExecutionResult>, ToolExecutorError>;
fn name(&self) -> &'static str;
fn requires_incremental_state(&self) -> bool { false }
}
DecisionReplayPolicy
并行模式下,DecisionReplayPolicy 决定挂起 tool call 的恢复决策何时回放:
pub enum DecisionReplayPolicy {
Immediate,
BatchAllSuspended,
}
关键文件
crates/awaken-contract/src/contract/executor.rscrates/awaken-runtime/src/execution/executor.rs
相关
架构
Awaken 围绕一个运行时核心及三个外围层面组织:契约类型、服务器/存储适配器以及可选扩展。真正重要的不是 crate 的边界,而是决策在哪里做出。
应用组装
注册 tool / model / provider / plugin / AgentSpec
|
v
AgentRuntime
解析 AgentSpec -> ResolvedAgent
从插件构建 ExecutionEnv
执行 phase loop
暴露 cancel / decision 等活跃 run 的控制面
|
v
服务器与存储表面
HTTP 路由、mailbox、SSE 回放、协议适配器、
thread/run 持久化、profile storage
契约层 – awaken-contract 定义全系统共用的类型:AgentSpec、ModelSpec、ProviderSpec、Tool、AgentEvent、transport trait 以及类型化状态模型。这是系统其余部分共同使用的“词汇表“。
运行时核心 – awaken-runtime 是编排层。它将 agent ID 解析为完整配置(ResolvedAgent),从插件构建 ExecutionEnv,管理活跃 run,并将执行委托给循环运行器和阶段引擎。
服务器与持久化表面 – awaken-server 将运行时转化为 HTTP 和 SSE 端点、基于 mailbox 的后台执行以及协议适配器。awaken-stores 提供 thread 和 run 的具体持久化后端。awaken-ext-* crate 在阶段和工具边界扩展运行时行为,而不改动核心循环。
请求时序
下图展示一个典型请求在系统中的流转过程:
sequenceDiagram
participant Client
participant Server
participant Runtime
participant LLM
participant Tool
Client->>Server: POST /v1/ai-sdk/chat
Server->>Runtime: RunRequest (agent_id, thread_id, messages)
Runtime->>Runtime: Resolve agent (AgentSpec -> ResolvedAgent)
Runtime->>Runtime: Load thread history
Runtime->>Runtime: RunStart phase
loop Step loop
Runtime->>Runtime: StepStart phase
Runtime->>Runtime: BeforeInference phase
Runtime->>LLM: Inference request (messages + tools)
LLM-->>Runtime: Response (text + tool_calls)
Runtime->>Runtime: AfterInference phase
opt Tool calls present
Runtime->>Runtime: BeforeToolExecute phase
Runtime->>Tool: execute(args, ctx)
Tool-->>Runtime: ToolResult
Runtime->>Runtime: AfterToolExecute phase
end
Runtime->>Runtime: StepEnd phase (checkpoint)
end
Runtime->>Runtime: RunEnd phase
Runtime-->>Server: AgentEvent stream
Server-->>Client: SSE events (protocol-specific encoding)
阶段驱动的执行循环
每次 run 都经历固定的阶段序列。插件注册在各阶段边界运行的钩子,从而控制推理参数、工具执行、状态变更和终止逻辑。
RunStart -> [StepStart -> BeforeInference -> AfterInference
-> BeforeToolExecute -> AfterToolExecute -> StepEnd]* -> RunEnd
步骤循环持续执行,直到以下任一条件触发:
- LLM 返回不含工具调用的响应(
NaturalEnd)。 - 插件或停止条件请求终止(
Stopped、BehaviorRequested)。 - 工具调用挂起等待外部输入(
Suspended)。 - run 被外部取消(
Cancelled)。 - 发生错误(
Error)。
在每个阶段边界,循环在继续执行前会检查取消令牌和 run 的生命周期状态。
仓库结构
awaken
├─ awaken-contract
│ ├─ registry specs
│ ├─ tool / executor / event / transport contracts
│ └─ state model
├─ awaken-runtime
│ ├─ builder + registries + resolve pipeline
│ ├─ AgentRuntime control plane
│ ├─ loop_runner + phase engine
│ ├─ execution / context / policies / profile
│ └─ runtime extensions (handoff, local A2A, background)
├─ awaken-server
│ ├─ routes + config API + mailbox + services
│ ├─ protocols: ai_sdk_v6 / ag_ui / a2a / mcp / acp-stdio
│ └─ transport: SSE relay / replay buffer / transcoder
├─ awaken-stores
└─ awaken-ext-*
设计意图
三项原则指导整体架构:
快照隔离 – 阶段钩子永远看不到部分应用的状态。它们从不可变快照中读取,并向 MutationBatch 写入。在一个阶段的所有钩子收敛后,批次被原子性地提交。这消除了并发钩子之间的数据竞争,使钩子的执行顺序对正确性无关紧要。
追加式持久化 – Thread 消息仅追加,不修改。状态在步骤边界做检查点。这使得从任意检查点重放 run 成为可能,并产生确定性的审计轨迹。
传输无关性 – 运行时通过 EventSink trait 发出 AgentEvent 值。协议适配器(AiSdkEncoder、AgUiEncoder)将这些事件转码为具体的传输格式。运行时对 HTTP、SSE 或任何具体协议一无所知。添加新协议意味着实现一个新的编码器——运行时本身无需改变。
另见
- Run 生命周期与阶段 – 阶段执行模型
- 状态与快照模型 – 快照隔离详解
- 设计权衡 – 关键架构决策的理由
- 工具与插件边界 – 插件与工具的设计边界
智能体解析
当调用 runtime.run(request) 时,请求中的 agent_id 必须被解析为一个完全装配好的 ResolvedAgent – 一个持有 LLM 执行器、工具、插件和执行环境的活引用的结构体。每次调用 resolve() 都会重新执行解析;不会在运行之间缓存或共享任何内容。本文描述三阶段解析流水线及其输入构建器。
流水线概览
解析是一个纯函数:(RegistrySet, agent_id) -> ResolvedAgent。它按顺序经过三个阶段:
flowchart LR
subgraph Stage1["Stage 1: Lookup"]
A1[Fetch AgentSpec] --> A2[Resolve ModelSpec]
A2 --> A3[Get LlmExecutor]
A3 --> A4{Retry config?}
A4 -- yes --> A5[Wrap in RetryingExecutor]
A4 -- no --> A6[Use executor directly]
end
subgraph Stage2["Stage 2: Plugin Pipeline"]
B1[Resolve plugin IDs] --> B2[Inject default plugins]
B2 --> B3{context_policy set?}
B3 -- yes --> B4[Add CompactionPlugin + ContextTransformPlugin]
B3 -- no --> B5[Skip]
B4 --> B6[Validate config sections]
B5 --> B6
B6 --> B7[Build ExecutionEnv]
end
subgraph Stage3["Stage 3: Tool Pipeline"]
C1[Collect global tools] --> C2[Merge delegate agent tools]
C2 --> C3[Merge plugin-registered tools]
C3 --> C4[Check for ID conflicts]
C4 --> C5[Apply allowed/excluded filters]
end
Stage1 --> Stage2 --> Stage3
任何阶段的失败都会产生 ResolveError 并中止流程。流水线永远不会返回部分结果。
阶段 1:查找
第一阶段从注册表中获取原始数据:
-
AgentSpec – 通过
agent_id从AgentSpecRegistry中查找。如果规格包含endpoint字段(远程 backend 智能体),解析会以RemoteAgentNotDirectlyRunnable失败 – 远程智能体只能作为委托使用,不能直接运行。 -
ModelSpec – 规格的
model字段(一个字符串 ID,如"gpt-4")通过ModelRegistry解析为ModelSpec,将其映射到一个 provider ID 和实际 API 模型名(例如,provider"openai",模型"gpt-4o")。 -
LlmExecutor – 模型条目中的提供者 ID 通过
ProviderRegistry解析为一个活的LlmExecutor实例。 -
重试装饰 – 如果智能体规格包含
RetryConfigKey配置段,且max_retries > 0或fallback_models非空,则执行器会被包装在RetryingExecutor装饰器中。
阶段 2:插件流水线
第二阶段组装插件链并构建执行环境。
插件解析
AgentSpec.plugin_ids 中列出的插件通过 ID 从 PluginSource 解析。缺失的插件会产生 ResolveError::PluginNotFound。
默认插件注入
解析用户声明的插件后,流水线会注入运行时必需的默认插件。无论智能体如何配置,这些插件始终存在:
-
LoopActionHandlersPlugin– 注册运行时循环用于处理工具调用、发射事件和管理步骤转换的核心动作处理器。没有此插件,循环无法运行。 -
MaxRoundsPlugin– 强制执行智能体规格上配置的max_rounds停止条件。使用规格的max_rounds值注入。防止失控循环。
条件插件
仅当设置了 AgentSpec.context_policy 时才添加以下插件:
-
CompactionPlugin– 管理上下文窗口压缩(当上下文增长过大时对旧消息进行摘要)。使用规格中的CompactionConfigKey配置段创建,缺失时回退到默认值。 -
ContextTransformPlugin– 在每次推理请求前应用上下文窗口策略变换(token 计数、截断、提示缓存)。使用context_policy值创建。
构建 ExecutionEnv
插件列表确定后,ExecutionEnv::from_plugins() 对每个插件调用其 register() 方法并传入 PluginRegistrar。插件通过注册器声明:
- 阶段钩子(按阶段回调)
- 调度动作处理器
- 效果处理器
- 请求变换
- 状态键注册
- 工具
结果是一个 ExecutionEnv – 见下文 ExecutionEnv。
配置验证
插件可以声明 config_schemas(),返回一组 ConfigSchema 条目。每个条目将一个配置段键与一个 JSON Schema 关联。解析期间,每个声明的 schema 都会针对 AgentSpec.sections 中的对应条目进行验证:
- 配置段存在 – 针对 JSON Schema 进行验证。失败会产生
ResolveError::InvalidPluginConfig。 - 配置段缺失 – 允许。插件应使用合理的默认值。
- 配置段存在但无插件认领 – 没有插件为其声明 schema。流水线记录一条警告(可能是配置中的拼写错误)。
阶段 3:工具流水线
第三阶段从所有来源收集工具并生成最终工具集。
工具来源
工具按以下顺序合并:
-
全局工具 – 所有通过构建器在
ToolRegistry中注册的工具(例如builder.with_tool("search", search_tool))。 -
委托智能体工具 – 对于
AgentSpec.delegates中的每个智能体 ID,流水线创建一个AgentTool。如果委托有endpoint(远程),流水线会选择配置的远程 backend。目前内置的远程委托 backend 是 A2A;本地委托仍然创建由解析器支持的本地工具。委托工具需要a2a功能标志;没有该标志时,委托会被静默忽略并记录警告。 -
插件注册的工具 – 插件在
register()期间声明的工具,存储在ExecutionEnv.tools中。
冲突检测
如果插件注册的工具与全局工具具有相同的 ID,解析会以 ResolveError::ToolIdConflict 失败。这是有意为之的 – 静默覆盖会成为难以调试的问题来源。
过滤
合并后,应用规格的 allowed_tools 和 excluded_tools 字段:
allowed_tools = None– 保留所有工具。allowed_tools = Some(list)– 仅保留 ID 出现在列表中的工具。其余全部丢弃。excluded_tools– 任何 ID 出现在此列表中的工具都会被移除,即使它在允许列表中。
ExecutionEnv
ExecutionEnv 是插件流水线的每次解析产物。它不是全局或共享的 – 每次 resolve() 调用都会构建一个全新的实例。其内容:
| 字段 | 类型 | 用途 |
|---|---|---|
phase_hooks | HashMap<Phase, Vec<TaggedPhaseHook>> | 在每个阶段边界调用的钩子 |
scheduled_action_handlers | HashMap<String, ScheduledActionHandlerArc> | 用于调度/延迟动作的命名处理器 |
effect_handlers | HashMap<String, EffectHandlerArc> | 用于副作用的命名处理器 |
request_transforms | Vec<TaggedRequestTransform> | LLM 调用前应用于推理请求的变换 |
key_registrations | Vec<KeyRegistration> | 运行开始时安装到状态存储的状态键 |
tools | HashMap<String, Arc<dyn Tool>> | 插件提供的工具(在阶段 3 合并到主工具集) |
plugins | Vec<Arc<dyn Plugin>> | 用于生命周期钩子的插件引用(on_activate/on_deactivate) |
每个 TaggedPhaseHook 和 TaggedRequestTransform 都携带其所属插件的 ID,用于诊断和过滤。
AgentRuntimeBuilder
构建器(AgentRuntimeBuilder)是构造 AgentRuntime 的标准方式。它累积五个注册表:
| 注册表 | 构建器方法 | 用途 |
|---|---|---|
MapAgentSpecRegistry | with_agent_spec() / with_agent_specs() | 智能体定义 |
MapToolRegistry | with_tool() | 全局工具 |
MapModelRegistry | with_model() | 模型 ID 到提供者 + 模型名称的映射 |
MapProviderRegistry | with_provider() | LLM 执行器实例 |
MapPluginSource | with_plugin() | 插件实例 |
错误处理
构建器使用延迟错误收集。每个检测到冲突(重复 ID)的 with_* 调用会将 BuildError 推入内部错误列表,而不是返回 Result。第一个收集到的错误在调用 build() 或 build_unchecked() 时浮现。
验证
build() 在构造运行时后对每个已注册的智能体规格执行一次试运行解析。如果任何智能体解析失败(缺失模型、缺失提供者、缺失插件),错误会被收集并作为 BuildError::ValidationFailed 返回。这在启动时而非首次请求时捕获配置错误。
build_unchecked() 跳过此验证。仅在需要延迟解析或智能体将在构造后动态添加时使用。
远程智能体 (A2A)
当启用 a2a 功能标志时,构建器支持 with_remote_agents() 来注册远程 A2A 端点。这些端点被包装在 CompositeAgentSpecRegistry 中,该注册表组合了本地和远程智能体来源。远程智能体通过 build_and_discover() 异步发现。
另见
- 架构 – 系统分层与请求序列
- Run 生命周期与阶段 – 解析之后发生什么
- 工具与插件边界 – 何时使用工具 vs 插件
- 设计权衡 – 关键决策的理由
状态与快照模型
Awaken 使用带快照隔离的类型化状态引擎。本页解释状态原语、作用域、合并策略以及状态变更生命周期。
StateKey Trait
每一类运行时状态都由一个实现了 StateKey 的零大小类型来声明:
pub trait StateKey: 'static + Send + Sync {
const KEY: &'static str;
const MERGE: MergeStrategy = MergeStrategy::Exclusive;
const SCOPE: KeyScope = KeyScope::Run;
type Value: Clone + Default + Serialize + DeserializeOwned + Send + Sync + 'static;
type Update: Send + 'static;
fn apply(value: &mut Self::Value, update: Self::Update);
}
StateKey 同时把字符串键名、值类型、更新类型和合并规则绑定在一起。
KeyScope
pub enum KeyScope {
Run,
Thread,
}
Run:run 开始时重置Thread:在同一 thread 的多次 run 间保留
MergeStrategy
pub enum MergeStrategy {
Exclusive,
Commutative,
}
Exclusive:并发写同一键会冲突Commutative:更新可交换,允许并行合并
Snapshot
Snapshot 是某个时刻的不可变状态视图。phase hook 和 tool 只读取 Snapshot,不能直接修改它。
Phase hook 读取: &Snapshot
Phase hook 写入: MutationBatch
这样同一 phase 内的所有 hook 都能看到一致状态。
MutationBatch
MutationBatch 收集一次 hook 执行产生的状态更新:
pub struct MutationBatch {
base_revision: Option<u64>,
ops: Vec<Box<dyn MutationOp>>,
touched_keys: Vec<String>,
}
touched_keys 用于并发冲突检测。
变更生命周期
1. Phase 开始
2. 运行时拍一份 Snapshot
3. 每个 hook 读取 Snapshot,产出 MutationBatch
4. gather 完成
5. 运行时检查 MutationBatch 是否冲突
6. 合并后的 batch 原子提交到 live state
7. 为下一个 phase 创建新 Snapshot
StateMap
StateMap 是类型擦除的状态容器,内部保存所有状态键对应的值。
- 持久化键会在 checkpoint 时序列化
- 非持久化键只在内存里存在
StateStore
StateStore 在 StateMap 之上提供:
- 快照创建
- 带 revision 的 batch apply
- commit hooks
StateCommand处理
另见
Run 生命周期与 Phases
本页描述 run 和 tool call 的状态机、8 个 phase、终止条件、checkpoint 触发点,以及挂起 / 恢复如何桥接 run 层与 tool-call 层。
RunStatus
Running --+--> Waiting --+--> Running (resume)
| |
+--> Done +--> Done
pub enum RunStatus {
Running,
Waiting,
Done,
}
ToolCallStatus
New --> Running --+--> Succeeded
+--> Failed
+--> Cancelled
+--> Suspended --> Resuming --+--> Running
+--> Suspended
+--> Succeeded/Failed/Cancelled
pub enum ToolCallStatus {
New,
Running,
Suspended,
Resuming,
Succeeded,
Failed,
Cancelled,
}
Phase Enum
Awaken 的执行顺序由 8 个 phase 固定下来:
pub enum Phase {
RunStart,
StepStart,
BeforeInference,
AfterInference,
BeforeToolExecute,
AfterToolExecute,
StepEnd,
RunEnd,
}
RunStart:run 级初始化StepStart:每轮推理开始BeforeInference:最后修改推理请求AfterInference:观察 LLM 返回,修改工具列表或请求终止BeforeToolExecute:权限检查、拦截、挂起AfterToolExecute:消费工具结果并触发副作用StepEnd:checkpoint 和 stop policyRunEnd:清理与最终持久化
TerminationReason
pub enum TerminationReason {
NaturalEnd,
BehaviorRequested,
Stopped(StoppedReason),
Cancelled,
Blocked(String),
Suspended,
Error(String),
}
只有 Suspended 会映射到 RunStatus::Waiting;其他都映射为 Done。
Stop Conditions
可通过配置声明 stop 条件,例如:
MaxRoundsTimeoutTokenBudgetConsecutiveErrorsStopOnToolContentMatchLoopDetection
这些条件在 StepEnd 评估。
Checkpoint Triggers
StepEnd 会把以下内容写入 checkpoint:
- thread messages
- run 生命周期状态
- 持久化状态键
- 挂起的 tool call 状态
从 ToolCall 状态推导 RunStatus
run 的状态本质上是所有 tool call 状态的聚合投影:
fn derive_run_status(calls: &HashMap<String, ToolCallState>) -> RunStatus {
let mut has_suspended = false;
for state in calls.values() {
match state.status {
ToolCallStatus::Running | ToolCallStatus::Resuming => {
return RunStatus::Running;
}
ToolCallStatus::Suspended => {
has_suspended = true;
}
_ => {}
}
}
if has_suspended { RunStatus::Waiting } else { RunStatus::Done }
}
并行 tool call 时间线
Time tool_A(需审批) tool_B(需审批) tool_C(正常) → Run Status
────────────────────────────────────────────────────────────────
t0 Created Created Created Running
t1 Suspended Created Running Running
t2 Suspended Suspended Running Running
t3 Suspended Suspended Succeeded Waiting
t4 Resuming Suspended Succeeded Running
t5 Succeeded Suspended Succeeded Waiting
t6 Succeeded Resuming Succeeded Running
t7 Succeeded Succeeded Succeeded Done
挂起如何桥接 run 层与 tool-call 层
当前执行模型(串行 phases)
当前 execute_tools_with_interception 基本分两段:
Phase 1 - Intercept:
BeforeToolExecute hooks
可能得到 Suspend / Block / SetResult
Phase 2 - Execute:
对允许执行的调用做串行或并行执行
如果任一调用挂起,step 会返回 StepOutcome::Suspended,然后:
- checkpoint 持久化
- 发出
RunFinish(Suspended) - 进入
wait_for_resume_or_cancel
wait_for_resume_or_cancel 循环
loop {
let decisions = decision_rx.next().await;
emit_decision_events_and_messages(decisions);
prepare_resume(decisions);
detect_and_replay_resume();
if !has_suspended_calls() {
return WaitOutcome::Resumed;
}
}
Resume replay
恢复时会扫描 status == Resuming 的 tool call,并按 ToolCallResumeMode 回放:
| Resume Mode | Replay 参数 | 行为 |
|---|---|---|
ReplayToolCall | 原始参数 | 完整重跑 |
UseDecisionAsToolResult | decision 结果 | 直接作为 tool result |
PassDecisionToTool | decision 结果 | 作为新参数传给 tool |
局限:执行中到达的 decision
在当前串行模型里,decision 即使更早到达,也要等当前 Phase 2 工具执行结束后才会被消费,因此恢复存在额外延迟。
并发执行模型(未来方向)
理想模型会让“等待 decision”和“执行允许的工具”并发进行,使某个工具一旦得到决策就能立刻恢复。
架构
Phase 1 - Intercept
Phase 2 - Concurrent execution:
execute(tool_C)
execute(tool_D)
wait_decision(tool_A) -> replay(tool_A)
wait_decision(tool_B) -> replay(tool_B)
barrier: 所有 task 进入终态
按调用分发 decision
共享 decision_rx 需要先 demux 到每个 call 自己的等待通道。
状态转移时机
并发模型下,状态会随着事件实时前进,而不是整批推进。
协议适配器:SSE 重连
长生命周期 run 可能跨多个前端 SSE 连接,尤其是 AI SDK v6 这类“一次 HTTP 请求对应一次 SSE 流”的协议。
问题
Turn 1:
HTTP POST -> SSE 1 -> tool suspend -> stream 关闭
但 run 还活着,正在 wait_for_resume_or_cancel
Turn 2:
新 HTTP POST 带 decision 到来
如果事件仍然发往旧 channel,就会丢失
解决方案:ReconnectableEventSink
用一个可替换底层 sender 的 event sink 包装原始 channel,新连接到来时先 reconnect() 再投递 decision。
重连流程
Turn 1:
submit() -> 创建 event_tx1 / event_rx1
run suspend -> SSE 1 结束
Turn 2:
新请求创建 event_tx2 / event_rx2
sink.reconnect(event_tx2)
send_decision
后续事件都发往 SSE 2
协议层差异
| 协议 | 挂起信号 | 恢复机制 |
|---|---|---|
| AI SDK v6 | finish(finishReason: "tool-calls") | 新 HTTP POST -> reconnect -> send_decision |
| AG-UI | RUN_FINISHED(outcome: "interrupt") | 新 HTTP POST -> reconnect -> send_decision |
| CopilotKit | renderAndWaitForResponse UI | 同一 SSE 或新请求恢复 |
另见
HITL 与 Mailbox
本页解释 Awaken 如何通过 tool call 挂起和 mailbox 队列来实现 human-in-the-loop(HITL)。
SuspendTicket
当 tool call 需要外部审批或输入时,会产出一个 SuspendTicket:
pub struct SuspendTicket {
pub suspension: Suspension,
pub pending: PendingToolCall,
pub resume_mode: ToolCallResumeMode,
}
其中:
suspension:外部可见的动作描述、提示语、参数 schemapending:事件流里暴露给前端的待处理 tool call 投影resume_mode:decision 到来后如何恢复
ToolCallResumeMode
pub enum ToolCallResumeMode {
ReplayToolCall,
UseDecisionAsToolResult,
PassDecisionToTool,
}
ReplayToolCall:用原始参数重跑UseDecisionAsToolResult:直接把 decision 结果当 tool 结果PassDecisionToTool:把 decision 结果作为新参数传入工具
ResumeDecisionAction
pub enum ResumeDecisionAction {
Resume,
Cancel,
}
ToolCallResume
恢复载荷:
pub struct ToolCallResume {
pub decision_id: String,
pub action: ResumeDecisionAction,
pub result: Value,
pub reason: Option<String>,
pub updated_at: u64,
}
Permission 插件的 Ask 模式
awaken-ext-permission 利用挂起来实现审批:
- tool call 命中
behavior: ask - permission checker 生成
SuspendTicket - tool call 进入
Suspended - run 进入
Waiting - 前端提示用户审批
- 用户提交
Resume或Cancel Resume时按resume_mode恢复;Cancel时该 tool call 标记为取消
Mailbox 架构
Mailbox 是所有 run 请求的持久化队列。无论是 streaming、background、A2A 还是内部请求,最终都会变成一个 MailboxJob。
MailboxJob
pub struct MailboxJob {
pub job_id: String,
pub mailbox_id: String,
pub agent_id: String,
pub messages: Vec<Message>,
pub origin: MailboxJobOrigin,
pub sender_id: Option<String>,
pub parent_run_id: Option<String>,
pub request_extras: Option<Value>,
pub priority: u8,
pub dedupe_key: Option<String>,
pub generation: u64,
pub status: MailboxJobStatus,
pub available_at: u64,
pub attempt_count: u32,
pub max_attempts: u32,
pub last_error: Option<String>,
pub claim_token: Option<String>,
pub claimed_by: Option<String>,
pub lease_until: Option<u64>,
pub created_at: u64,
pub updated_at: u64,
}
MailboxJobStatus
Queued --claim--> Claimed --ack--> Accepted
| |
| nack(retry) --> Queued
| |
| nack(permanent) --> DeadLetter
|
|-- cancel --> Cancelled
+-- interrupt(generation bump) --> Superseded
pub enum MailboxJobStatus {
Queued,
Claimed,
Accepted,
Cancelled,
Superseded,
DeadLetter,
}
MailboxJobOrigin
pub enum MailboxJobOrigin {
User,
A2A,
Internal,
}
MailboxStore Trait
MailboxStore 负责 durable enqueue、原子 claim、ack/nack、cancel、lease 延长以及 interrupt。
实现必须保证:
- enqueue 持久化
- claim 原子化,且只能一个消费者成功
- ack/nack 校验 claim token
- interrupt 与 generation bump 原子完成
MailboxInterrupt
pub struct MailboxInterrupt {
pub new_generation: u64,
pub active_job: Option<MailboxJob>,
pub superseded_count: usize,
}
当更高优先级请求到来时,旧 job 会被 supersede,活动 run 需要被取消。
另见
多智能体模式
Awaken 支持多种 agent 组合方式,包括本地委托、远程 A2A agent、sub-agent 执行以及 handoff。
通过 AgentSpec.delegates 进行 agent 委托
agent 可以通过 delegates 声明它允许委托的子 agent:
{
"id": "orchestrator",
"model": "gpt-4o",
"system_prompt": "You coordinate tasks across specialized agents.",
"delegates": ["researcher", "writer", "reviewer"]
}
解析时,运行时会为每个 delegate 生成一个 AgentTool,LLM 看见的是普通工具,例如 agent_run_researcher。
通过 A2A 使用远程 agent
如果 AgentSpec.endpoint 存在,该 delegate 会被当作远程 A2A agent:
{
"id": "remote-analyst",
"model": "unused-for-remote",
"system_prompt": "",
"endpoint": {
"backend": "a2a",
"base_url": "https://analyst.example.com/v1/a2a",
"auth": { "type": "bearer", "token": "token-abc" },
"target": "analyst",
"timeout_ms": 300000,
"options": {
"poll_interval_ms": 1000
}
}
}
A2aBackend 会发送 message:send,读取返回的 task.id 并轮询任务状态,再把最终结果包装成 ToolResult 返回给父 agent。
Sub-Agent 模式
串行委托
Orchestrator -> researcher -> result
-> writer -> result
-> reviewer -> result
父 agent 按顺序调用子 agent,并根据前一步结果决定下一步。
并行委托
如果 LLM 在同一轮推理里一次返回多个 delegate tool call,运行时可并发执行这些子 agent。
嵌套委托
orchestrator
-> team_lead (delegates: [dev_a, dev_b])
-> dev_a
-> dev_b
每一层都独立通过 AgentResolver 解析。理论上没有硬深度限制,但每层都会增加 token 和延迟成本。
Agent Handoff
handoff 会在同一 run 内把控制权切换给另一个 agent,而不是把它当成子任务调用。
机制:
- 插件或 handoff 扩展把新 agent ID 写入活动 agent 状态键
- loop runner 在下一个步骤边界检测到变化
- 重新通过
AgentResolver解析 agent - 在同一个 thread、同一条消息历史上继续执行
Handoff 与 Delegation 的区别
| 方面 | Delegation | Handoff |
|---|---|---|
| 控制流 | 父 agent 调子 agent,拿回结果 | 控制权直接切到新 agent |
| Thread 连续性 | 子 agent 可以有独立上下文 | 同一 thread、同一消息历史 |
| 返回路径 | 结果回到父 agent | 不返回,后续由新 agent 接管 |
| 适用场景 | 任务拆解、专长子任务 | 角色切换、升级处理、路由 |
AgentBackend Trait
本地和远程委托都基于 AgentBackend:
pub trait AgentBackend: Send + Sync {
async fn execute(
&self,
agent_id: &str,
messages: Vec<Message>,
event_sink: Arc<dyn EventSink>,
parent_run_id: Option<String>,
parent_tool_call_id: Option<String>,
) -> Result<DelegateRunResult, AgentBackendError>;
}
这也是实现自定义委托后端的扩展点。
另见
Tool 与 Plugin 的边界
Awaken 明确区分“给 LLM 使用的能力”(tools)和“运行时生命周期扩展”(plugins)。本页解释两者的边界、适用场景以及交互方式。
Tools
tool 是暴露给 LLM 的能力:
pub trait Tool: Send + Sync {
fn descriptor(&self) -> ToolDescriptor;
fn validate_args(&self, args: &Value) -> Result<(), ToolError> { Ok(()) }
async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError>;
}
特征:
- 用 ID 注册
- 对 LLM 可见
- 在
BeforeToolExecute/AfterToolExecute窗口执行 - 只能访问参数和
ToolCallContext - 适合文件操作、API 调用、数据库查询等业务能力
Plugins
plugin 是运行时级扩展,不会出现在 LLM 工具列表中:
pub trait Plugin: Send + Sync + 'static {
fn descriptor(&self) -> PluginDescriptor;
fn register(&self, registrar: &mut PluginRegistrar) -> Result<(), StateError>;
fn config_schemas(&self) -> Vec<ConfigSchema> { vec![] }
}
特征:
- 用插件 ID 注册
- 对 LLM 不可见
- 在 phase 边界运行
- 可以访问更完整的
PhaseContext - 适合权限、观察、提醒、请求变换、状态同步等横切逻辑
PluginRegistrar
Plugin::register() 中,插件通过 PluginRegistrar 声明自己需要的能力:
| 注册方法 | 用途 |
|---|---|
register_key::<K>() | 注册状态键 |
register_phase_hook() | 在指定 phase 添加 hook |
register_tool() | 向运行时注入插件提供的工具 |
register_effect_handler() | 处理 effects |
register_scheduled_action() | 处理 scheduled actions |
register_request_transform() | 在请求到达 LLM 前变换推理请求 |
插件也可以注册 tool,例如 AgentTool、MCP 工具、skills 工具。对于 LLM 来说,这些和用户手写 tool 没有区别。
何时使用 Tool
适合以下情况:
- 需要被 LLM 显式按名字调用
- 是领域能力,如搜索、计算、文件 I/O、API 调用
- 有结构化输入和可供 LLM 推理的结果
何时使用 Plugin
适合以下情况:
- 希望逻辑在 phase 边界自动运行
- 需要修改 inference request
- 需要在 LLM 看见之前检查或转换 tool result
- 属于权限、可观测性、提醒等横切关注点
- 需要注册 state key、effect handler 或 request transform
Tool 和 Plugin 如何交互
插件可以影响 tool 执行,而 tool 自己并不需要感知:
- permission 插件可在
BeforeToolExecute阻断或挂起调用 - observability 插件为 tool 执行包裹 trace
- reminder 插件在工具完成后注入上下文提示
- interception pipeline 可改参数、替换结果、甚至跳过执行
插件提供的 Tool 与用户 Tool
| 方面 | 用户 Tool | 插件 Tool |
|---|---|---|
| 注册方式 | AgentRuntimeBuilder::with_tool() | PluginRegistrar::register_tool() |
| 生命周期 | 跟 runtime 一起存在 | 跟插件激活状态绑定 |
| 配置来源 | 直接构造 | 来自插件配置或 agent spec |
| 示例 | 业务工具 | AgentTool、MCP tools、skill tools |
另见
插件系统内部机制
本文介绍插件系统的内部运作机制:插件如何注册和激活、钩子如何执行和解决冲突、阶段收敛循环的工作原理,以及请求变换、效果、推理覆盖和工具拦截在运行时的行为。
关于工具与插件的高层边界,参见 Tool and Plugin Boundary。关于阶段生命周期,参见 Run Lifecycle and Phases。
插件注册与激活
当插件被加载时,其 register() 方法会以 PluginRegistrar 为参数被调用。插件声明两类组件:
结构组件 始终可用,不受激活状态影响:
- 状态键(
register_key::<K>()) - 调度动作处理器(
register_scheduled_action::<A, H>()) - 效果处理器(
register_effect::<E, H>())
行为组件 仅在插件通过激活过滤器时才处于活跃状态:
- 阶段钩子(
register_phase_hook()) - 工具(
register_tool()) - 请求变换(
register_request_transform())
激活由 AgentSpec.active_hook_filter 控制:
active_hook_filter 值 | 行为 |
|---|---|
| 空(默认) | 所有插件的行为组件均处于活跃状态 |
| 非空集合 | 仅 ID 在集合中的插件贡献行为组件 |
这种分离使得基础设施插件(状态管理、动作处理器、效果处理器)可以存在而不影响执行流。例如,一个仅注册效果处理器的日志插件永远不需要出现在 active_hook_filter 中——只要任何插件发出对应的效果,其处理器就会触发。
过滤逻辑在 PhaseRuntime::filter_hooks() 中实现:
fn filter_hooks<'a>(env: &'a ExecutionEnv, ctx: &PhaseContext) -> Vec<&'a TaggedPhaseHook> {
let hooks = env.hooks_for_phase(ctx.phase);
let active_hook_filter = &ctx.agent_spec.active_hook_filter;
hooks
.iter()
.filter(|tagged| {
active_hook_filter.is_empty()
|| active_hook_filter.contains(&tagged.plugin_id)
})
.collect()
}
钩子排序与冲突解决
多个插件可以为同一 Phase 注册钩子。引擎通过 gather_and_commit_hooks()(crates/awaken-runtime/src/phase/engine.rs)中实现的两阶段流程来处理它们。
快速路径:并行执行,单次提交
同一阶段的所有钩子针对一份冻结快照并行运行。每个钩子接收相同的快照并产生一个 StateCommand。如果没有钩子写入另一个钩子也写入的 MergeStrategy::Exclusive 键,则所有命令在单次批处理中合并并提交。
flowchart LR
S[冻结快照] --> H1[钩子 A]
S --> H2[钩子 B]
S --> H3[钩子 C]
H1 --> M[合并全部]
H2 --> M
H3 --> M
M --> C[单次提交]
冲突回退:分区与串行重试
如果两个或更多钩子写入同一个 Exclusive 键,引擎检测到冲突并回退:
- 分区 —— 按注册顺序遍历命令。贪心地将每个命令添加到“兼容批次“中,前提是其
Exclusive键与批次中已有的键不重叠。否则,延迟该钩子。 - 提交批次 —— 合并并提交兼容批次。
- 串行重新执行 —— 延迟的钩子逐个重新运行,每个钩子基于包含所有先前提交结果的新鲜快照执行。
flowchart TD
PAR[并行运行所有钩子] --> CHECK{Exclusive 键重叠?}
CHECK -- 否 --> FAST[合并全部,单次提交]
CHECK -- 是 --> PART[分区:批次 + 延迟]
PART --> CBATCH[提交兼容批次]
CBATCH --> SERIAL[串行重新运行延迟的钩子]
SERIAL --> DONE[所有钩子已提交]
由于钩子是纯函数(冻结快照输入,StateCommand 输出,无副作用),冲突时的重新执行始终是安全的。延迟的钩子能看到批次提交后的更新状态,因此即使原始并行执行会产生冲突,它们也能产生正确的结果。
关于 MergeStrategy 和快照隔离的详细信息,参见 State and Snapshot Model。
阶段收敛循环
每个阶段运行一个 GATHER 然后 EXECUTE 的循环,当没有新工作时收敛。
GATHER 阶段
并行运行所有活跃钩子(按上述冲突解决方式处理)。钩子产生的 StateCommand 值可能包含:
- 状态变更(键更新)
- 调度动作(在 EXECUTE 阶段处理)
- 效果(提交后立即派发)
EXECUTE 阶段
处理待执行的调度动作,这些动作的 Phase 匹配当前阶段且其动作键有已注册的处理器。每个动作处理器基于新鲜快照运行,并产生自己的 StateCommand,该命令可能为同一阶段调度新的动作。
如果处理后出现新的匹配动作,循环重复:
flowchart TD
START[阶段入口] --> GATHER[GATHER:运行钩子,提交]
GATHER --> EXEC[EXECUTE:处理匹配的动作]
EXEC --> CHECK{当前阶段有新动作?}
CHECK -- 是 --> EXEC
CHECK -- 否 --> DONE[阶段完成]
循环受 DEFAULT_MAX_PHASE_ROUNDS(16)限制。如果动作数量在此限制内未收敛,引擎返回 StateError::PhaseRunLoopExceeded。这可以防止无限反应链,同时允许合法的多步动作级联。
这种收敛设计支持反应式模式:权限检查动作可以在同一个 BeforeToolExecute 阶段调度暂停动作,两者在阶段完成前均被处理。
请求变换钩子
插件可以通过 registrar.register_request_transform() 注册 InferenceRequestTransform 实现。变换在请求到达 LLM 执行器之前修改 InferenceRequest。
用例:
- 系统提示注入 —— 向系统消息追加上下文、指令或提醒
- 工具列表修改 —— 过滤、重排序或增强发送给 LLM 的工具描述符
- 参数覆盖 —— 调整温度、最大 token 数或其他推理参数
变换按注册顺序运行,且可组合:每个变换接收经前一个变换修改后的请求。
仅活跃插件的变换会被应用。如果 active_hook_filter 非空,则不在过滤器中的插件的变换会被跳过。
效果处理器
效果是通过 EffectSpec trait 定义的类型化事件:
pub trait EffectSpec: 'static + Send + Sync {
const KEY: &'static str;
type Payload: Serialize + DeserializeOwned + Send + Sync + 'static;
}
钩子和动作处理器通过在 StateCommand 上调用 command.emit::<E>(payload) 来发出效果。与调度动作(在特定阶段的收敛循环中执行)不同,效果在命令提交到存储之后派发。
插件通过 registrar.register_effect::<E, H>(handler) 注册效果处理器。当效果被派发时,引擎使用效果载荷和当前快照调用处理器。
关键特性:
- 即发即忘 —— 处理器失败会被记录日志,但不会阻塞执行或回滚提交。
- 提交后执行 —— 效果看到的是发出它们的命令已被应用后的状态。
- 提交时验证 —— 如果命令发出的效果没有已注册的处理器,
submit_command立即返回StateError::UnknownEffectHandler,防止静默丢弃。
用例:审计日志、指标发送、跨插件通知、外部系统同步。
InferenceOverride 合并
多个插件可以通过调度 SetInferenceOverride 动作来影响推理参数。InferenceOverride 结构体使用按字段后写入者胜出的合并语义:
pub struct InferenceOverride {
pub model: Option<String>,
pub fallback_models: Option<Vec<String>>,
pub temperature: Option<f64>,
pub max_tokens: Option<u32>,
pub top_p: Option<f64>,
pub reasoning_effort: Option<ReasoningEffort>,
}
当两个覆盖被合并时,每个字段独立取最后一个非 None 的值:
pub fn merge(&mut self, other: InferenceOverride) {
if other.temperature.is_some() {
self.temperature = other.temperature;
}
if other.max_tokens.is_some() {
self.max_tokens = other.max_tokens;
}
// ... 所有字段同理
}
这允许插件覆盖特定参数而不影响其他参数。成本控制插件可以设置 max_tokens,而质量插件设置 temperature,两者互不干扰。如果两者设置同一字段,最后一次合并胜出。
工具拦截优先级
在 BeforeToolExecute 阶段,插件可以调度 ToolInterceptAction 来控制工具执行流。动作载荷是以下三个变体之一:
pub enum ToolInterceptPayload {
Block { reason: String },
Suspend(SuspendTicket),
SetResult(ToolResult),
}
当多个拦截被调度给同一个工具调用时,按隐式优先级解决:
| 优先级 | 变体 | 行为 |
|---|---|---|
| 3(最高) | Block | 终止工具执行,使调用失败 |
| 2 | Suspend | 暂停执行,等待外部决策 |
| 1(最低) | SetResult | 以预定义结果短路返回 |
最高优先级的拦截胜出。如果两个拦截具有相同优先级(例如,两个插件都调度了 Block),第一个被处理的生效,冲突被记录为错误。
如果没有调度拦截,工具正常执行(隐式放行)。
flowchart TD
BTE[BeforeToolExecute 钩子运行] --> INT{有调度的拦截?}
INT -- 否 --> EXEC[正常执行工具]
INT -- 是 --> PRI[按优先级解决]
PRI --> B[Block:使调用失败]
PRI --> S[Suspend:等待决策]
PRI --> R[SetResult:返回预定义结果]
另见
- Tool and Plugin Boundary —— 何时使用工具 vs 插件
- Run Lifecycle and Phases —— 阶段排序与终止
- State and Snapshot Model —— 合并策略、作用域、快照隔离
- Scheduled Actions Reference —— 动作处理器注册
- HITL and Mailbox —— 暂停与恢复流程
设计取舍
本页总结 Awaken 几个关键架构决策及其权衡。
Snapshot Isolation vs Mutable State
决策:phase hook 只读不可变 Snapshot,写入 MutationBatch,所有变更在 gather 结束后原子提交。
备选:hook 直接修改共享可变状态。
| Snapshot Isolation | Mutable State | |
|---|---|---|
| 正确性 | 不依赖 hook 执行顺序 | 容易受锁粒度和顺序影响 |
| 并发性 | hook 可安全并行 | 需要精细锁或强制串行 |
| 复杂度 | 需要 batch / merge / conflict machinery | 实现直观 |
| 可调试性 | 每个 phase 边界都是清晰状态跃迁 | 状态变化交织,难追踪 |
| 代价 | 每次 phase 快照需额外 Arc clone | 无快照开销 |
为什么选 Snapshot:插件组合不应该依赖隐藏顺序。快照模型让 phase 更接近“状态到变更”的纯函数,便于推理和重放。
Phase-Based Execution vs Event-Driven
决策:执行遵循固定 phase 顺序。
备选:完全事件驱动,插件异步订阅事件。
| Phase-Based | Event-Driven | |
|---|---|---|
| 可预测性 | 高 | 低 |
| 插件组合 | 边界明确 | 易产生隐式耦合 |
| 可测试性 | 容易对 phase 序列做单测 | 需要模拟异步事件流 |
| 灵活性 | 插入新能力通常要加 phase | 事件扩展自由 |
| 性能 | 顺序 phase 执行引入额外开销 | 可并发处理 |
为什么选 Phase:agent 执行天然是“推理 -> 工具 -> 检查终止”的顺序流程。phase 模型更适合在固定节点上插入权限、观察、提醒和请求变换。
Typed State Keys vs Dynamic State
决策:状态键是 Rust 类型,必须实现 StateKey。
备选:HashMap<String, Value> 风格的动态状态。
| Typed Keys | Dynamic State | |
|---|---|---|
| 类型安全 | 编译期保证 | 运行期才发现错误 |
| 合并语义 | 每个键明确声明 MergeStrategy | 需要外部约定 |
| 可发现性 | IDE 可跳转 | 只能 grep 字符串 |
| 代价 | 需要定义类型 | 使用方便 |
| 可扩展性 | 新键需要修改代码并重新编译 | 运行时可动态添加新键 |
为什么选 Typed Keys:状态错误往往非常隐蔽。把键名、值类型和更新语义统一固化在类型上,更适合作为运行时核心机制。
Plugin System vs Middleware Chain
决策:使用 PluginRegistrar 做结构化注册,而不是用 middleware 链包裹整条执行路径。
备选:类似 HTTP middleware 的包裹链。
| Plugin System | Middleware Chain | |
|---|---|---|
| 粒度 | 能分别注册 hook / key / tool / effect | 只能包整个执行 |
| 组合 | 多插件可共享同一 phase | 顺序高度敏感 |
| 选择性激活 | 可按 agent 启停插件 | 需要改链结构 |
| 复杂度 | 注册动作较多 | 心智模型简单 |
| 横切关注点 | 天然契合——每个插件处理一个关注点 | 每个 middleware 处理一个关注点,但会看到全部流量 |
为什么选 Plugin System:Awaken 的扩展点并不只在单一调用链上,而是散落在 phase、tool 拦截、state、effects、actions 等多个边界。
Multi-Protocol Server vs Single Protocol
决策:同一个 server 同时提供 AI SDK v6、AG-UI、A2A、MCP HTTP,ACP 则作为独立 stdio 协议模块存在。
备选:只支持一种标准协议。
| Multi-Protocol | Single Protocol | |
|---|---|---|
| 前端兼容性 | 可直接接多种生态 | 客户端需自写适配 |
| 维护成本 | 多套 encoder / route | 一套协议面 |
| 测试面 | 更大 | 更小 |
| 运行时耦合 | 运行时保持协议无关 | 容易耦合到唯一协议 |
| 复杂度 | 多套路由、encoder 类型和事件映射 | 一套路由、一套 encoder |
为什么选 Multi-Protocol:当前 agent 生态没有统一协议。把协议复杂度压在 server 层,比要求用户自写桥接更实用。
另见
- 架构 – 三层设计
- 状态与快照模型 – Snapshot Isolation 详解
- Run 生命周期与 Phases – phase 执行模型
- Tool 与 Plugin 的边界 – plugin 与 tool 的设计边界
术语表
| 术语 | 中文 | 说明 |
|---|---|---|
Thread | 会话线程 | 持久化的对话与状态历史。 |
Run | 运行 | 针对某个 thread 的一次执行尝试。 |
Phase | 阶段 | 执行循环中的命名阶段。 |
Snapshot | 快照 | 传给 hook / tool 的不可变状态视图(struct Snapshot { revision: u64, ext: Arc<StateMap> })。 |
StateKey | 状态键 | 带作用域、合并策略和值类型的类型化键。 |
MutationBatch | 变更批次 | 在提交前收集的一组状态变更。 |
AgentRuntime | 智能体运行时 | 负责解析 agent 和执行 run 的核心运行时。 |
AgentSpec | 智能体规约 | 可序列化的 agent 定义。 |
AgentEvent | 智能体事件 | 运行时统一事件流。 |
Plugin | 插件 | 通过 PluginRegistrar 注册的系统级扩展。 |
PluginRegistrar | 插件注册器 | 注册 phase hook、tool、state key、handler 的入口。 |
PhaseHook | 阶段钩子 | 绑定到某个 phase 的异步 hook。 |
PhaseContext | 阶段上下文 | hook 执行时拿到的只读上下文。 |
StateCommand | 状态命令 | hook 返回值,包含变更、action、effect。 |
Tool | 工具 | 暴露给 agent 的能力接口。 |
ToolDescriptor | 工具描述符 | tool 的 ID、名称、描述、参数 schema。 |
ToolResult | 工具结果 | tool 执行成功、失败或挂起后的结构化结果。 |
ToolCallContext | 工具调用上下文 | tool 执行期可读取状态和上报活动的上下文。 |
TerminationReason | 终止原因 | run 结束的原因。 |
SuspendTicket | 挂起票据 | 描述挂起原因、恢复模式和待决 tool call。 |
MailboxJob | 邮箱任务 | 后台执行与 HITL 的持久化作业项。 |
RunRequest | 运行请求 | 启动 run 的输入。 |
MergeStrategy | 合并策略 | 并行状态写入如何合并。 |
KeyScope | 键作用域 | 状态键生命周期:Run 或 Thread。 |
StateMap | 状态映射 | Snapshot 背后的类型安全异构 map。 |
RunStatus | 运行状态 | 粗粒度 run 状态:Running、Waiting、Done。 |
ToolCallStatus | 工具调用状态 | 单个 tool call 的状态。 |
ResolvedAgent | 已解析智能体 | 从 registries 解析完成、可直接运行的 agent。 |
AgentResolver | 智能体解析器 | 把 agent ID 解析成 ResolvedAgent 的组件。 |
BuildError | 构建错误 | AgentRuntimeBuilder::build() 阶段的错误。 |
RuntimeError | 运行时错误 | agent loop 执行中的错误。 |
InferenceOverride | 推理覆盖 | 针对单次推理的 model / temperature 等覆盖。 |
ContextWindowPolicy | 上下文窗口策略 | token 预算和压缩策略。 |
StreamEvent | 流事件 | 对 AgentEvent 的带序号封装。 |
TokenUsage | 令牌用量 | LLM 推理返回的 token 统计。 |
ExecutionEnv | 执行环境 | 解析后组装出来的 hook、tool、handler 集合。 |
CommitHook | 提交钩子 | 状态提交到存储时触发的 hook。 |
常见问题
支持哪些 LLM provider?
任何兼容 genai 的 provider 都可以,包括 OpenAI、Anthropic、DeepSeek、Google Gemini、Ollama 等。当前做法不是在 AgentSpec 里直接写 provider 名,而是把 AgentSpec.model 写成模型注册表里的 ID,再由 ModelSpec 解析到 provider 和真实模型名。
如何添加新的存储后端?
实现 awaken-contract 里的 ThreadRunStore trait。它要求你支持 thread、message、run、checkpoint 的读写。可以参考 awaken-stores 里的 InMemoryStore、FileStore、PostgresStore。
不启用 server 能用 awaken 吗?
可以。AgentRuntime 本身就是独立运行时。你可以自己构造 RunRequest,再直接调用 runtime.run(request, sink)。awaken-server 只是附加的 HTTP / SSE / mailbox / protocol gateway。
如何运行多个 agent?
两种主要方式:
- 本地委托:在
AgentSpec.delegates里声明可委托的 agent ID,由运行时在步骤边界完成委托切换。 - 远程 A2A:给
AgentSpec.endpoint配置远端地址,或注册远程 agent,让它通过 A2A 以“工具调用”的形式执行。
Run scope 和 Thread scope 的区别是什么?
Run:只在一次 run 生命周期内有效。run 结束即清空。适合步骤计数、预算、临时上下文。Thread:在同一 thread 的多次 run 之间持续存在。适合用户偏好、会话记忆、长期状态。
如何处理工具错误?
可恢复错误返回 ToolResult::error(...),这样错误会以 tool 响应消息的形式回写给 LLM,LLM 可以继续决定下一步。如果是参数校验失败或真正要中止工具执行的错误,则返回 ToolError。
工具可以并行执行吗?
可以。通过 ToolExecutionMode 配置。并行模式下,运行时会并发执行同一步里的多个 tool call,并在下一轮推理前收集结果、做冲突检查和状态合并。
run 卡住时怎么排查?
先看 run 的状态:
Waiting:通常是 HITL 决策未完成,检查SuspendTicket、mailbox 和待处理 decision。Running:检查max_rounds、timeout、工具是否阻塞,以及是否有流式调用未结束。
如果需要细节,启用 observability 插件查看 phase、tool、inference 级别的 tracing。
不连真实 LLM 怎么测试?
实现一个返回固定响应的 LlmExecutor 即可。详细模式见测试策略。
并行工具同时写同一个状态键会怎样?
如果该键是 MergeStrategy::Exclusive,并行合并会失败,相关执行会回退到串行或报冲突。若该键天然支持交换律,应使用 MergeStrategy::Commutative。
request transform 是怎么工作的?
插件可以注册 InferenceRequestTransform。它会在请求真正发给 LLM 前修改 system prompt、工具列表、推理参数等。只有当前 agent 激活的插件 transform 才会生效。
可以自定义存储后端吗?
可以。状态与消息持久化实现 ThreadRunStore;如果还要支持 HITL / 后台队列,再实现 MailboxStore。
context compaction 是怎么做的?
当 ContextWindowPolicy 启用自动压缩时,运行时会在超过预算后寻找安全边界,把较早消息总结成 <conversation-summary>,再保留最近一段原始上下文。
AI SDK v6、AG-UI、A2A 该怎么选?
- AI SDK v6:适合 Vercel AI SDK /
useChat前端。 - AG-UI:适合 CopilotKit 和带生成式 UI 的前端。
- A2A:适合 agent 到 agent 的服务间编排和远程委托。
它们共享同一个 AgentRuntime,差别主要在协议编码层和前端生态。
从 Tirea 迁移
Awaken 不是对 tirea 的增量升级,而是一次从头重写。本页帮助你把 tirea 0.5 的核心概念映射到 Awaken。
Crate 映射
| tirea 0.5 | awaken | 说明 |
|---|---|---|
tirea-state | awaken-contract | 状态引擎并入 contract crate |
tirea-state-derive | — | 移除;直接实现 StateKey |
tirea-contract | awaken-contract | 契约统一到一个 crate |
tirea-agentos | awaken-runtime | 运行时改名 |
tirea-store-adapters | awaken-stores | 存储适配层改名 |
tirea-agentos-server | awaken-server | server 改名,协议已并入 |
tirea-protocol-ai-sdk-v6 | awaken-server::protocols::ai_sdk_v6 | 合并到 server |
tirea-protocol-ag-ui | awaken-server::protocols::ag_ui | 合并到 server |
tirea-protocol-acp | awaken-server::protocols::acp | 合并到 server |
tirea-extension-permission | awaken-ext-permission | 改名 |
tirea-extension-observability | awaken-ext-observability | 改名 |
tirea-extension-skills | awaken-ext-skills | 改名 |
tirea-extension-mcp | awaken-ext-mcp | 改名 |
tirea-extension-handoff | awaken-runtime::extensions::handoff | 并入 runtime |
tirea-extension-a2ui | awaken-ext-generative-ui | 改名 |
tirea | awaken | 门面 crate 改名 |
状态模型
tirea 使用 derive-macro 状态系统;Awaken 改为 trait-based:
// tirea 0.5
#[derive(StateSlot)]
#[state(key = "my_plugin.counter", scope = "run")]
struct Counter(u32);
// awaken
use awaken::prelude::*;
pub struct Counter;
impl StateKey for Counter {
const KEY: &'static str = "my_plugin.counter";
const MERGE: MergeStrategy = MergeStrategy::Exclusive;
type Value = u32;
type Update = u32;
fn apply(value: &mut u32, update: u32) {
*value = update;
}
}
| tirea 概念 | Awaken 对应 |
|---|---|
StateSlot derive | StateKey trait |
ScopeDomain::Run | KeyScope::Run |
ScopeDomain::Thread | KeyScope::Thread |
ScopeDomain::Global | 已移除,通常改为 Thread + ProfileStore |
| 可变 state 直接读写 | 不可变快照 + StateCommand |
Action 系统
tirea 使用按 phase 分类的 action enum;Awaken 统一改为 ScheduledActionSpec + handler:
// tirea 0.5
BeforeInferenceAction::AddContextMessage(
ContextMessage::system("key", "content")
)
// awaken
let mut cmd = StateCommand::new();
cmd.schedule_action::<AddContextMessage>(
ContextMessage::system("key", "content"),
)?;
Plugin Trait
// tirea 0.5
impl Extension for MyPlugin {
fn name(&self) -> &str { "my_plugin" }
fn register(&self, ctx: &mut ExtensionContext) -> Result<()> {
ctx.add_hook(Phase::BeforeInference, MyHook);
Ok(())
}
}
// awaken
impl Plugin for MyPlugin {
fn descriptor(&self) -> PluginDescriptor {
PluginDescriptor { name: "my_plugin" }
}
fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
r.register_phase_hook("my_plugin", Phase::BeforeInference, MyHook)?;
r.register_key::<MyState>(StateKeyOptions::default())?;
Ok(())
}
}
Tool Trait
// tirea 0.5
#[async_trait]
impl TypedTool for MyTool {
type Args = MyArgs;
async fn call(&self, args: Self::Args, ctx: ToolContext) -> ToolOutput {
ToolOutput::success(json!({"result": 42}))
}
}
// awaken
#[async_trait]
impl Tool for MyTool {
fn descriptor(&self) -> ToolDescriptor { ... }
async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
Ok(ToolResult::success("my_tool", json!({"result": 42})).into())
}
}
Runtime Builder
// tirea 0.5
let runtime = AgentOsBuilder::new()
.agent(agent_spec)
.tool("search", SearchTool)
.extension(PermissionExtension::new(rules))
.build()?;
// awaken
let runtime = AgentRuntimeBuilder::new()
.with_agent_spec(agent_spec)
.with_tool("search", Arc::new(SearchTool))
.with_plugin("permissions", Arc::new(PermissionPlugin::new(rules)))
.build()?;
被移除的概念
StateSlotderive macroGlobalscopeRuntimeEffectEffectLog/ScheduledActionLogConfigStore/ConfigSlotAgentProfileExtensionContext的动态激活
新增的概念
PluginRegistrar- 带
StateCommand的ToolOutput ToolInterceptActionCircuitBreakerMailboxEventReplayBufferDeferredToolsPluginProfileStore
快速检查清单
-
Cargo.toml里把tirea替换成awaken -
use tirea::*改成use awaken::prelude::* -
#[derive(StateSlot)]改成impl StateKey -
Extension改成Plugin -
TypedTool改成Tool - action enum 改成
cmd.schedule_action::<ActionType>(...) -
AgentOsBuilder改成AgentRuntimeBuilder - store 和 server import 切到
awaken-stores、awaken-server