Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

简介

Awaken 是一个用 Rust 构建的模块化 AI 智能体运行时框架。它提供基于阶段的执行模型(含快照隔离与确定性重放)、带键作用域(thread / run)和合并策略(exclusive / commutative)的类型化状态引擎、用于可扩展性的插件生命周期系统,以及支持 AI SDK v6、AG-UI、A2A 和 MCP(HTTP 及 stdio)的多协议服务面,以及 ACP stdio 协议面。

Crate 概览

Crate说明
awaken-contract核心契约:类型、trait、状态模型、智能体规约
awaken-runtime执行引擎:阶段循环、插件系统、智能体循环、构建器
awaken-serverHTTP/SSE 网关与协议适配器
awaken-stores存储后端:内存、文件、PostgreSQL
awaken-tool-patternGlob/正则工具匹配,用于权限和提醒规则
awaken-ext-permission权限插件,支持 allow/deny/ask 策略
awaken-ext-observability基于 OpenTelemetry 的 LLM 和工具调用追踪
awaken-ext-mcpModel Context Protocol 客户端集成
awaken-ext-skills技能包发现与激活
awaken-ext-reminder声明式提醒规则,在工具执行后触发
awaken-ext-generative-ui声明式 UI 组件(A2UI 协议)
awaken-ext-deferred-tools基于概率模型的延迟工具加载
awaken门面 crate,重新导出核心模块

架构

应用代码
  注册 tool / model / provider / plugin / agent spec
        |
        v
AgentRuntime
  将 AgentSpec 解析为 ResolvedAgent
  从插件构建 ExecutionEnv
  执行 phase loop,并暴露 cancel / decision 控制面
        |
        v
服务与存储表面
  HTTP 路由、SSE 回放、mailbox、协议适配器、thread/run 持久化

核心原则

所有状态访问遵循快照隔离。阶段钩子看到的是不可变快照;变更收集在 MutationBatch 中,在收敛后原子性地应用。

本书内容

  • 快速上手 — 用最小可运行流程建立整体心智模型
  • 构建 Agent — 添加 Tool、Plugin、MCP、Skills、Reminder、Handoff 和 UI 能力
  • 服务与集成 — 暴露 HTTP 端点并接入 AI SDK 或 CopilotKit 前端
  • 状态与存储 — 选择持久化、上下文裁剪和状态访问模式
  • 运行与运维 — 用可观测性、权限、进度上报和测试加固生产行为
  • 参考 — API、协议、配置和 Schema 查阅页面
  • 架构 — 运行时分层、phase 执行与设计取舍

推荐阅读路径

如果你是第一次接触本项目,建议按以下顺序阅读:

  1. 先阅读 快速上手,并完成 第一个 Agent
  2. 需要扩展能力时,进入 构建 Agent
  3. 需要对接 HTTP 客户端或前端时,进入 服务与集成
  4. 从演示走向生产时,阅读 状态与存储运行与运维
  5. 需要精确契约或运行时内部细节时,回到 参考概览架构

仓库导航

从文档进入代码时,以下路径最为重要:

路径用途
crates/awaken-contract/核心契约:工具、事件、状态接口
crates/awaken-runtime/智能体运行时:执行引擎、插件、构建器
crates/awaken-server/HTTP/SSE 服务端
crates/awaken-stores/存储后端
crates/awaken/examples/小型运行时示例
examples/src/全栈服务端示例
docs/book/src/本文档源码

快速上手

如果你第一次接触 Awaken,先走这条路径,用最小可运行流程建立整体心智模型。

推荐顺序

  1. 阅读 第一个 Agent,先跑通最小 runtime。
  2. 阅读 第一个 Tool,理解 tool schema、执行流程和状态写入。
  3. 进入 构建 Agent,把示例整理成可复用的工程基线。
  4. 在写生产级工具前,补上 Tool Trait

何时离开这条路径

第一个 Agent

目标

端到端运行一个智能体,并确认你收到完整的事件流。

前置条件

[dependencies]
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1"

运行之前,请设置一个模型提供商的 API 密钥:

# OpenAI 兼容模型(用于 gpt-4o-mini)
export OPENAI_API_KEY=<your-key>

# 或 DeepSeek 模型
export DEEPSEEK_API_KEY=<your-key>

1. 创建 src/main.rs

use std::sync::Arc;
use serde_json::{json, Value};
use async_trait::async_trait;
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
use awaken::contract::message::Message;
use awaken::contract::event::AgentEvent;
use awaken::contract::event_sink::VecEventSink;
use awaken::engine::GenaiExecutor;
use awaken::registry_spec::AgentSpec;
use awaken::registry::ModelEntry;
use awaken::{AgentRuntimeBuilder, RunRequest};

struct EchoTool;

#[async_trait]
impl Tool for EchoTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("echo", "Echo", "Echo input back to the caller")
            .with_parameters(json!({
                "type": "object",
                "properties": { "text": { "type": "string" } },
                "required": ["text"]
            }))
    }

    async fn execute(
        &self,
        args: Value,
        _ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError> {
        let text = args["text"].as_str().unwrap_or_default();
        Ok(ToolResult::success("echo", json!({ "echoed": text })).into())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let agent_spec = AgentSpec::new("assistant")
        .with_model("gpt-4o-mini")
        .with_system_prompt("You are a helpful assistant. Use the echo tool when asked.")
        .with_max_rounds(5);

    let runtime = AgentRuntimeBuilder::new()
        .with_agent_spec(agent_spec)
        .with_tool("echo", Arc::new(EchoTool))
        .with_provider("openai", Arc::new(GenaiExecutor::new()))
        .with_model("gpt-4o-mini", ModelEntry {
            provider: "openai".into(),
            model_name: "gpt-4o-mini".into(),
        })
        .build()?;

    let request = RunRequest::new(
        "thread-1",
        vec![Message::user("Say hello using the echo tool")],
    )
    .with_agent_id("assistant");

    let sink = Arc::new(VecEventSink::new());
    runtime.run(request, sink.clone()).await?;

    let events = sink.take();
    println!("events: {}", events.len());

    let finished = events
        .iter()
        .any(|e| matches!(e, AgentEvent::RunFinish { .. }));
    println!("run_finish_seen: {}", finished);

    Ok(())
}

2. 运行

cargo run

3. 验证

预期输出包括:

  • events: <n>,其中 n > 0
  • run_finish_seen: true

事件流将至少包含 RunStart、一个或多个 TextDeltaToolCallStart/ToolCallDone 事件,以及最终的 RunFinish

你创建了什么

本示例创建了一个进程内的 AgentRuntime 并立即执行一个请求。

核心对象是:

let runtime = AgentRuntimeBuilder::new()
    .with_agent_spec(agent_spec)
    .with_tool("echo", Arc::new(EchoTool))
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model("gpt-4o-mini", ModelEntry {
        provider: "openai".into(),
        model_name: "gpt-4o-mini".into(),
    })
    .build()?;

之后,标准入口点是:

let sink = Arc::new(VecEventSink::new());
runtime.run(request, sink.clone()).await?;
let events = sink.take();

常见使用模式:

  • 一次性 CLI 程序:构造 RunRequest,通过 VecEventSink 收集事件,打印结果
  • 应用服务:将 runtime.run(...) 封装在你自己的应用逻辑中
  • HTTP 服务器:将 Arc<AgentRuntime> 存储在应用状态中,暴露协议路由

下一步阅读

根据你的需求选择下一页:

常见错误

  • 模型/提供商不匹配:gpt-4o-mini 需要兼容的 OpenAI 风格提供商配置。
  • 缺少密钥:在 cargo run 之前设置 OPENAI_API_KEYDEEPSEEK_API_KEY
  • 工具未被选中:确保提示词明确要求使用 echo
  • 没有 RunFinish 事件:检查 with_max_rounds 设置是否足够高,以便模型完成执行。

下一步

第一个 Tool

目标

实现一个在执行时从 ToolCallContext 读取类型化状态的工具。

状态是可选的。 许多工具(API 调用、搜索、Shell 命令)不需要状态——只需实现 execute 并返回 ToolResult 即可。

前置条件

[dependencies]
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde = { version = "1", features = ["derive"] }
serde_json = "1"

1. 定义 StateKey

StateKey 描述状态映射中的一个命名槽位,声明值类型、更新策略和生命周期范围。

use awaken::{StateKey, KeyScope, MergeStrategy};

/// 记录问候工具被调用的次数。
struct GreetCount;

impl StateKey for GreetCount {
    const KEY: &'static str = "greet_count";
    const MERGE: MergeStrategy = MergeStrategy::Commutative;
    const SCOPE: KeyScope = KeyScope::Run;

    type Value = u32;
    type Update = u32;

    fn apply(value: &mut Self::Value, update: Self::Update) {
        *value += update;
    }
}
fn main() {}

关键选项说明:

  • KeyScope::Run — 状态在每次运行开始时重置。使用 KeyScope::Thread 可跨运行持久化。
  • MergeStrategy::Commutative — 并发更新安全。当只有一个写入方时使用 Exclusive
  • apply 定义 Update 如何修改当前 Value,此处为递增。

2. 实现 Tool

该工具通过 ctx.state::<GreetCount>() 读取当前计数并返回个性化问候。

use awaken::{StateKey, KeyScope, MergeStrategy};
struct GreetCount;
impl StateKey for GreetCount {
    const KEY: &'static str = "greet_count";
    const MERGE: MergeStrategy = MergeStrategy::Commutative;
    const SCOPE: KeyScope = KeyScope::Run;
    type Value = u32;
    type Update = u32;
    fn apply(value: &mut Self::Value, update: Self::Update) { *value += update; }
}
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};

struct GreetTool;

#[async_trait]
impl Tool for GreetTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("greet", "Greet", "Greet a user by name")
            .with_parameters(json!({
                "type": "object",
                "properties": {
                    "name": { "type": "string", "description": "Name to greet" }
                },
                "required": ["name"]
            }))
    }

    fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
        args["name"]
            .as_str()
            .filter(|s| !s.is_empty())
            .ok_or_else(|| ToolError::InvalidArguments("name is required".into()))?;
        Ok(())
    }

    async fn execute(
        &self,
        args: Value,
        ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError> {
        let name = args["name"].as_str().unwrap_or("world");

        // 读取状态——如果该键尚未设置则返回 None。
        let count = ctx.state::<GreetCount>().copied().unwrap_or(0);

        Ok(ToolResult::success("greet", json!({
            "greeting": format!("Hello, {}!", name),
            "times_greeted": count,
        })).into())
    }
}
fn main() {}

3. 注册 Tool

use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use awaken::{StateKey, KeyScope, MergeStrategy};
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
struct GreetCount;
impl StateKey for GreetCount {
    const KEY: &'static str = "greet_count";
    const MERGE: MergeStrategy = MergeStrategy::Commutative;
    const SCOPE: KeyScope = KeyScope::Run;
    type Value = u32;
    type Update = u32;
    fn apply(value: &mut Self::Value, update: Self::Update) { *value += update; }
}
struct GreetTool;
#[async_trait]
impl Tool for GreetTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("greet", "Greet", "Greet a user by name")
    }
    async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        Ok(ToolResult::success("greet", json!({})).into())
    }
}
use awaken::registry_spec::AgentSpec;
use awaken::AgentRuntimeBuilder;

fn main() -> Result<(), Box<dyn std::error::Error>> {
let agent_spec = AgentSpec::new("assistant")
    .with_model("gpt-4o-mini")
    .with_system_prompt("You are a helpful assistant. Use the greet tool when asked.")
    .with_max_rounds(5);

let runtime = AgentRuntimeBuilder::new()
    .with_agent_spec(agent_spec)
    .with_tool("greet", Arc::new(GreetTool))
    .build()?;
Ok(())
}

4. 运行

use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use awaken::{StateKey, KeyScope, MergeStrategy};
use awaken::contract::tool::{Tool, ToolDescriptor, ToolResult, ToolOutput, ToolError, ToolCallContext};
use awaken::registry_spec::AgentSpec;
use awaken::AgentRuntimeBuilder;
struct GreetCount;
impl StateKey for GreetCount {
    const KEY: &'static str = "greet_count";
    const MERGE: MergeStrategy = MergeStrategy::Commutative;
    const SCOPE: KeyScope = KeyScope::Run;
    type Value = u32;
    type Update = u32;
    fn apply(value: &mut Self::Value, update: Self::Update) { *value += update; }
}
struct GreetTool;
#[async_trait]
impl Tool for GreetTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("greet", "Greet", "Greet a user by name")
    }
    async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        Ok(ToolResult::success("greet", json!({})).into())
    }
}
use awaken::contract::message::Message;
use awaken::contract::event_sink::VecEventSink;
use awaken::RunRequest;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = AgentRuntimeBuilder::new()
    .with_agent_spec(AgentSpec::new("assistant").with_model("gpt-4o-mini"))
    .with_tool("greet", Arc::new(GreetTool))
    .build()?;
let request = RunRequest::new(
    "thread-1",
    vec![Message::user("Greet Alice")],
)
.with_agent_id("assistant");

let sink = Arc::new(VecEventSink::new());
runtime.run(request, sink.clone()).await?;
Ok(())
}

5. 验证

检查收集到的事件中是否包含 name == "greet"ToolCallDone 事件:

use awaken::contract::event::AgentEvent;

let events = sink.take();
let tool_done = events.iter().any(|e| matches!(
    e,
    AgentEvent::ToolCallDone { id: _, message_id: _, result: _, outcome: _ }
));
println!("tool_call_done_seen: {}", tool_done);

预期结果:

  • tool_call_done_seen: true
  • ToolCallDone 中的 result 包含 greetingtimes_greeted 字段。

你创建了什么

一个工具,它:

  1. 通过 descriptor() 声明参数的 JSON Schema。
  2. 通过 validate_args() 在执行前验证参数。
  3. 通过 ctx.state::<K>() 从快照读取类型化状态。
  4. 通过 ToolResult::success() 返回结构化 JSON。

StateKey trait 提供了类型安全的、有范围的状态管理,无需手动操作原始 JSON。

下一步阅读

常见错误

  • ctx.state::<K>() 返回 None:该状态键在本次运行中尚未写入。对数值类型使用 .unwrap_or_default().copied().unwrap_or(0)
  • StateError::KeyEncode / StateError::KeyDecodeValue 类型无法通过 JSON 往返序列化。确保正确派生了 SerializeDeserialize
  • ToolError::InvalidArguments 未被触发:运行时在 execute 之前调用 validate_args。如果跳过验证,错误输入将到达 execute 并可能在 .unwrap() 处崩溃。
  • 范围不匹配:KeyScope::Run 状态在运行之间被清除。如果需要持久化,使用 KeyScope::Thread

构建 Agent 路径

当你已经理解基础运行流程,接下来就进入这条路径,把 Agent 能力逐步拼装完整。

推荐顺序

  1. 先读 构建 Agent,确定 runtime、model registry 和 agent spec 的骨架。
  2. 再读 添加 Tool添加 Plugin,用安全的方式扩展行为。
  3. 需要发现与委托能力时,继续阅读 使用 Skills 子系统使用 MCP Tools使用 Agent Handoff
  4. 需要更具体的能力时,再补上 使用 Reminder 插件使用 Generative UI使用延迟加载工具

建议搭配阅读

构建 Agent

当你需要把 agent spec、tools、provider 和持久化组装成一个可运行的 AgentRuntime 时,使用本页。

前置条件

  • 已在 Cargo.toml 中加入 awaken
  • 已有一个 LlmExecutor 实现
  • 了解 AgentSpecAgentRuntimeBuilder

步骤

  1. 定义 agent spec:
use awaken::engine::GenaiExecutor;
use awaken::{AgentSpec, AgentRuntimeBuilder, ModelSpec};

let spec = AgentSpec::new("assistant")
    .with_model("claude-sonnet")
    .with_system_prompt("You are a helpful assistant.")
    .with_max_rounds(10);
  1. 注册 tools:
use std::sync::Arc;

let builder = AgentRuntimeBuilder::new()
    .with_agent_spec(spec)
    .with_tool("search", Arc::new(SearchTool))
    .with_tool("calculator", Arc::new(CalculatorTool));
  1. 注册 provider 和 model:
let builder = builder
    .with_provider("anthropic", Arc::new(GenaiExecutor::new()))
    .with_model("claude-sonnet", ModelSpec {
        id: "claude-sonnet".into(),
        provider: "anthropic".into(),
        model: "claude-sonnet-4-20250514".into(),
    });
  1. 挂接持久化:
use awaken::stores::InMemoryStore;

let store = Arc::new(InMemoryStore::new());
let builder = builder.with_thread_run_store(store);
  1. 构建并校验:
let runtime = builder.build()?;

build() 会在启动时就解析并校验所有注册项,提前发现缺失的 model、provider 或 plugin。

  1. 执行一次 run:
use std::sync::Arc;
use awaken::RunRequest;
use awaken::contract::event_sink::VecEventSink;

let request = RunRequest::new("thread-1", vec![user_message])
    .with_agent_id("assistant");

let sink = Arc::new(VecEventSink::new());
let handle = runtime.run(request, sink.clone()).await?;

验证

如果启用了 server,可访问 /health;否则直接检查 AgentRunResult 是否成功完成。

常见错误

错误原因修复
BuildError::ValidationFailedspec 引用了未注册的 model/providerbuild() 前补齐注册
BuildError::State多个插件重复注册同一状态键保证状态键只注册一次
运行期 RuntimeErrorprovider 推理失败检查凭据和模型 ID

相关示例

examples/src/research/

关键文件

  • crates/awaken-runtime/src/builder.rs
  • crates/awaken-contract/src/registry_spec.rs
  • crates/awaken-runtime/src/runtime/agent_runtime/mod.rs

相关

添加 Tool

当你需要给 agent 暴露一个自定义能力时,使用本页。

前置条件

  • Cargo.toml 里已经加入 awaken
  • 已添加 async-traitserde_json

步骤

  1. 实现 Tool trait。
use async_trait::async_trait;
use serde_json::{Value, json};
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};

pub struct WeatherTool;

#[async_trait]
impl Tool for WeatherTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("get_weather", "Get Weather", "Fetch current weather for a city")
            .with_parameters(json!({
                "type": "object",
                "properties": {
                    "city": {
                        "type": "string",
                        "description": "City name"
                    }
                },
                "required": ["city"]
            }))
    }

    async fn execute(&self, args: Value, _ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        let city = args["city"]
            .as_str()
            .ok_or_else(|| ToolError::InvalidArguments("Missing 'city'".into()))?;

        let weather = fetch_weather(city).await?;

        Ok(ToolResult::success("get_weather", json!({ "forecast": weather })).into())
    }
}
  1. 如有需要,重写参数校验:
fn validate_args(&self, args: &Value) -> Result<(), ToolError> {
    if !args.get("city").and_then(|v| v.as_str()).is_some_and(|s| !s.is_empty()) {
        return Err(ToolError::InvalidArguments("'city' must be a non-empty string".into()));
    }
    Ok(())
}

validate_args 会在 execute 之前运行,可以让你提前拒绝格式错误的输入。

  1. 在 builder 中注册 tool:
use std::sync::Arc;
use awaken::AgentRuntimeBuilder;

let runtime = AgentRuntimeBuilder::new()
    .with_tool("get_weather", Arc::new(WeatherTool))
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(provider))
    .build()?;

with_tool 的字符串 ID 必须和 ToolDescriptor::new 里的 id 一致。

  1. 或者在插件中注册:

    工具也可以在 Plugin::register 方法中通过 PluginRegistrar 注册:

fn register(&self, registrar: &mut PluginRegistrar) -> Result<(), StateError> {
    registrar.register_tool("get_weather", Arc::new(WeatherTool))?;
    Ok(())
}

通过插件注册的工具仅对激活了该插件的 agent 可见。

验证

发送一条应当触发该 tool 的消息,确认 run 结果里出现了预期的 tool 调用和返回值。

常见错误

错误原因修复
ToolError::InvalidArgumentsLLM 传了错误 JSON收紧参数 schema,给模型更明确约束
tool 从未被调用descriptor 的 id 与注册 ID 不一致保证两者完全一致
ToolError::ExecutionFailedexecute 内部运行时错误返回清晰错误信息,让 agent 能据此调整

相关示例

examples/src/research/tools.rs

关键文件

  • crates/awaken-contract/src/contract/tool.rs
  • crates/awaken-runtime/src/builder.rs

相关

添加 Plugin

当你需要通过 state key、phase hook、scheduled action 或 effect handler 扩展 agent 生命周期时,使用本页。

前置条件

  • 已在 Cargo.toml 中添加 awaken
  • 已了解 PhaseStateKey

步骤

  1. 定义一个状态键:
use awaken::{StateKey, KeyScope, MergeStrategy, StateError, JsonValue};
use serde::{Serialize, Deserialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditLog {
    pub entries: Vec<String>,
}

pub struct AuditLogKey;

impl StateKey for AuditLogKey {
    type Value = AuditLog;
    const KEY: &'static str = "audit_log";
    const MERGE: MergeStrategy = MergeStrategy::Exclusive;

    type Update = AuditLog;

    fn apply(value: &mut Self::Value, update: Self::Update) {
        *value = update;
    }

    fn encode(value: &Self::Value) -> Result<JsonValue, StateError> {
        serde_json::to_value(value).map_err(|e| StateError::KeyEncode { key: Self::KEY.into(), message: e.to_string() })
    }

    fn decode(json: JsonValue) -> Result<Self::Value, StateError> {
        serde_json::from_value(json).map_err(|e| StateError::KeyDecode { key: Self::KEY.into(), message: e.to_string() })
    }
}
  1. 实现一个 phase hook:
use async_trait::async_trait;
use awaken::{PhaseHook, PhaseContext, StateCommand, StateError};

pub struct AuditHook;

#[async_trait]
impl PhaseHook for AuditHook {
    async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
        let mut log = ctx.state::<AuditLogKey>().cloned().unwrap_or(AuditLog {
            entries: Vec::new(),
        });
        log.entries.push(format!("Phase executed at {:?}", ctx.phase));
        let mut cmd = StateCommand::new();
        cmd.update::<AuditLogKey>(log);
        Ok(cmd)
    }
}
  1. 实现 Plugin trait:
use awaken::{Plugin, PluginDescriptor, PluginRegistrar, Phase, StateError, StateKeyOptions, KeyScope};

pub struct AuditPlugin;

impl Plugin for AuditPlugin {
    fn descriptor(&self) -> PluginDescriptor {
        PluginDescriptor { name: "audit" }
    }

    fn register(&self, registrar: &mut PluginRegistrar) -> Result<(), StateError> {
        registrar.register_key::<AuditLogKey>(StateKeyOptions {
            scope: KeyScope::Run,
            ..Default::default()
        })?;

        registrar.register_phase_hook(
            "audit",
            Phase::AfterInference,
            AuditHook,
        )?;

        Ok(())
    }
}
  1. 在 runtime 上注册插件,并在 agent 上激活它:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::{AgentSpec, AgentRuntimeBuilder, ModelSpec};

let spec = AgentSpec::new("assistant")
    .with_model("claude-sonnet")
    .with_system_prompt("You are a helpful assistant.")
    .with_hook_filter("audit");

let runtime = AgentRuntimeBuilder::new()
    .with_plugin("audit", Arc::new(AuditPlugin))
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(GenaiExecutor::new()))
    .with_model("claude-sonnet", ModelSpec {
        id: "claude-sonnet".into(),
        provider: "anthropic".into(),
        model: "claude-sonnet-4-20250514".into(),
    })
    .build()?;

with_hook_filter 会为该 agent 激活指定插件的 phase hook。

验证

运行 agent 后查看状态快照,确认 audit_log 中出现了 hook 写入的条目。

常见错误

错误原因修复
StateError::KeyAlreadyRegistered多个插件注册了同一个 key保证每个 StateKey::KEY 全局唯一
StateError::UnknownKey读取了未注册的状态键确保注册该 key 的插件已激活
hook 没有执行with_hook_filter 中没有对应插件 ID把插件 ID 加到 agent spec

相关示例

crates/awaken-ext-observability/

关键文件

  • crates/awaken-runtime/src/plugins/lifecycle.rs
  • crates/awaken-runtime/src/plugins/registry.rs
  • crates/awaken-runtime/src/hooks/phase_hook.rs

相关

使用 Skills 子系统

当你希望 agent 在运行时发现、激活并按需加载技能包(指令、资源、脚本)时,使用本页。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 skills
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["skills"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

步骤

  1. 创建技能目录:
skills/
  my-skill/
    SKILL.md
  another-skill/
    SKILL.md

示例 skills/my-skill/SKILL.md

---
name: my-skill
description: Helps with data analysis tasks
allowed-tools: read_file web_search
---
# My Skill

When this skill is active, focus on data analysis.
Use read_file to load datasets and web_search for reference material.
  1. 发现文件系统技能:
use std::sync::Arc;
use awaken::ext_skills::{FsSkill, InMemorySkillRegistry, SkillSubsystem};

let result = FsSkill::discover("./skills").expect("skill discovery failed");
for warning in &result.warnings {
    eprintln!("skill warning: {warning:?}");
}

let skills = FsSkill::into_arc_skills(result.skills);
let registry = Arc::new(InMemorySkillRegistry::from_skills(skills));
  1. 也可以嵌入编译期技能:
use std::sync::Arc;
use awaken::ext_skills::{EmbeddedSkill, EmbeddedSkillData, InMemorySkillRegistry};

const SKILL_MD: &str = "\
---
name: builtin-skill
description: A compiled-in skill
allowed-tools: read_file
---
Builtin Skill

Follow these instructions when active.
";

let skill = EmbeddedSkill::new(&EmbeddedSkillData {
    skill_md: SKILL_MD,
    references: &[],
    assets: &[],
}).expect("valid skill");

let registry = Arc::new(InMemorySkillRegistry::from_skills(vec![Arc::new(skill)]));
  1. 接入 runtime:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_skills::SkillSubsystem;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};

let subsystem = SkillSubsystem::new(registry);
let agent_spec = AgentSpec::new("skills-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("Discover and activate skills when specialized help is useful.")
    .with_hook_filter("skills-discovery")
    .with_hook_filter("skills-active-instructions");

let runtime = AgentRuntimeBuilder::new()
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model(
        "gpt-4o-mini",
        ModelSpec {
            id: "gpt-4o-mini".into(),
            provider: "openai".into(),
            model: "gpt-4o-mini".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin(
        "skills-discovery",
        Arc::new(subsystem.discovery_plugin()) as Arc<dyn Plugin>,
    )
    .with_plugin(
        "skills-active-instructions",
        Arc::new(subsystem.active_instructions_plugin()) as Arc<dyn Plugin>,
    )
    .build()
    .expect("failed to build runtime");

SkillDiscoveryPlugin 会把技能目录注入到上下文中,并注册三个工具:

Tool用途
skill按名称激活技能
load_skill_resource读取技能资源或引用资料
skill_script运行技能脚本

验证

  1. 运行 agent,让它列出可用 skills
  2. LLM 应该能看到技能目录,并调用 skill 激活某个技能
  3. 激活后,后续推理会自动注入该 skill 的指令

常见错误

错误原因修复
没发现技能目录结构不对每个技能必须位于子目录里,并有 SKILL.md
SkillMaterializeErrorfrontmatter 无效至少提供 namedescription
skill tools 不可用相关插件未注册同时注册 discovery 和 active-instructions 两个插件
feature 不存在Cargo 没开 skillsCargo.toml 中启用

相关示例

  • crates/awaken-ext-skills/tests/

关键文件

  • crates/awaken-ext-skills/src/lib.rs
  • crates/awaken-ext-skills/src/registry.rs
  • crates/awaken-ext-skills/src/plugin/subsystem.rs
  • crates/awaken-ext-skills/src/plugin/discovery.rs
  • crates/awaken-ext-skills/src/embedded.rs
  • crates/awaken-ext-skills/src/tools.rs

相关

使用 MCP Tools

当你想连接外部 Model Context Protocol(MCP)server,并把它们的工具暴露给 awaken agent 时,使用本页。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 mcp
  • 有一个可连接的 MCP server(stdio 或 HTTP/SSE)
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["mcp"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

步骤

  1. 配置 MCP server 连接:
use awaken::ext_mcp::McpServerConnectionConfig;

let stdio_config = McpServerConnectionConfig::stdio(
    "my-mcp-server",
    "node",
    vec!["server.js".into()],
);

let http_config = McpServerConnectionConfig::http(
    "remote-server",
    "http://localhost:8080/sse",
);
  1. 创建 registry manager 并发现工具:
use awaken::ext_mcp::McpToolRegistryManager;

let manager = McpToolRegistryManager::connect(vec![stdio_config, http_config])
    .await
    .expect("failed to connect MCP servers");

let registry = manager.registry();
for id in registry.ids() {
    println!("discovered: {id}");
}

MCP 工具的 ID 格式通常是 mcp__{server}__{tool}

  1. 把工具注册进 runtime:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_mcp::McpPlugin;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};

let agent_spec = AgentSpec::new("mcp-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("Use MCP tools when they help answer the user.")
    .with_hook_filter("mcp");

let mut builder = AgentRuntimeBuilder::new()
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model(
        "gpt-4o-mini",
        ModelSpec {
            id: "gpt-4o-mini".into(),
            provider: "openai".into(),
            model: "gpt-4o-mini".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin("mcp", Arc::new(McpPlugin) as Arc<dyn Plugin>);

for (id, tool) in registry.snapshot() {
    builder = builder.with_tool(&id, tool);
}

let runtime = builder.build().expect("failed to build runtime");
  1. 如有需要,开启周期性刷新:
use std::time::Duration;

manager.start_periodic_refresh(Duration::from_secs(60));

验证

  1. 运行 agent,并让它调用来自 MCP server 的工具
  2. 检查后端日志里的 MCP 工具调用
  3. 返回结果中应带有 mcp.servermcp.tool 元数据

常见错误

错误原因修复
McpError::TransportErrorMCP server 未启动或不可达检查进程和 URL / 命令
没发现任何工具server 返回空工具列表确认 server 实现了 tools/list
调用超时server 响应太慢调大 transport timeout
feature 不存在没开 cargo feature启用 mcp
找不到 mcp__server__tool发现的工具没注册到 builder遍历 registry 并逐个 with_tool

相关示例

  • crates/awaken-ext-mcp/tests/

关键文件

  • crates/awaken-ext-mcp/src/lib.rs
  • crates/awaken-ext-mcp/src/manager.rs
  • crates/awaken-ext-mcp/src/config.rs
  • crates/awaken-ext-mcp/src/plugin.rs
  • crates/awaken-ext-mcp/src/transport.rs
  • crates/awaken-ext-mcp/tests/mcp_tests.rs

相关

使用 Reminder 插件

当你希望 agent 在工具执行后,根据模式匹配自动收到上下文提示时,使用本页。例如:修改 .toml 后提醒执行 cargo check,或在危险命令后给出警告。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 reminder
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["reminder"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

步骤

  1. 用规则注册 reminder 插件:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_reminder::{ReminderPlugin, ReminderRulesConfig};
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};

let json = r#"{
    "rules": [
        {
            "tool": "Bash(command ~ 'rm *')",
            "output": { "status": "success" },
            "message": {
                "target": "suffix_system",
                "content": "A deletion command just succeeded. Verify the result."
            }
        }
    ]
}"#;

let config = ReminderRulesConfig::from_str(json, Some("json"))
    .expect("failed to parse reminder config");
let rules = config.into_rules().expect("invalid rules");
let agent_spec = AgentSpec::new("my-agent")
    .with_model("claude-sonnet")
    .with_system_prompt("You are a helpful assistant.")
    .with_hook_filter("reminder");

let runtime = AgentRuntimeBuilder::new()
    .with_provider("anthropic", Arc::new(GenaiExecutor::new()))
    .with_model(
        "claude-sonnet",
        ModelSpec {
            id: "claude-sonnet".into(),
            provider: "anthropic".into(),
            model: "claude-3-7-sonnet-latest".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin("reminder", Arc::new(ReminderPlugin::new(rules)) as Arc<dyn Plugin>)
    .build()
    .expect("failed to build runtime");

该插件会在 AfterToolExecute 阶段检查工具名、参数和结果;一旦命中规则,就会调度 AddContextMessage 注入提示。

  1. 用工具模式定义规则:
Pattern匹配
"Bash"精确工具名
"*"任意工具
"mcp__*"所有 MCP 工具
"Bash(command ~ 'rm *')"主参数 glob
"Edit(file_path ~ '*.toml')"命名字段 glob
  1. 配置 output 匹配:

output 可以是:

  • "any"
  • { "status": ..., "content": ... }

status 支持:"success""error""pending""any"

content 支持两类:

  • 文本 glob
  • JSON 字段匹配
  1. 选择消息注入目标:
Target位置
"system"system prompt 前部
"suffix_system"system prompt 后部
"session"session 级 system message
"conversation"conversation 级 system message
  1. 用 cooldown 避免重复注入:
{
  "message": {
    "target": "system",
    "content": "Remember to be careful with file operations.",
    "cooldown_turns": 5
  }
}
  1. 也可以从文件加载规则:
use awaken::ext_reminder::ReminderRulesConfig;

let config = ReminderRulesConfig::from_file("reminders.json")
    .expect("failed to load reminder config");
  1. 在 agent spec 上激活插件:
let agent_spec = AgentSpec::new("my-agent")
    .with_model("claude-sonnet")
    .with_system_prompt("You are a helpful assistant.")
    .with_hook_filter("reminder");

验证

  1. 配一个容易命中的规则,例如 "*" + "any"
  2. 运行 agent 并调用一个工具
  3. 打开 debug tracing,应该看到 reminder 命中的日志
  4. 确认下一轮推理 prompt 中出现了提醒消息

常见错误

错误原因修复
InvalidPattern工具模式写错按 DSL 语法检查引号和通配规则
InvalidTarget消息目标无效只能用 system / suffix_system / session / conversation
InvalidOutputoutput 结构无效使用 "any" 或结构化对象
InvalidOp字段匹配操作符未知使用 glob / exact / regex / not_* 系列
规则从不触发插件没激活在 agent spec 上加 with_hook_filter("reminder")
规则触发太频繁没设置 cooldown设置正数 cooldown_turns

相关示例

  • crates/awaken-ext-reminder/src/config.rs

关键文件

路径作用
crates/awaken-ext-reminder/src/lib.rs模块根及公共再导出
crates/awaken-ext-reminder/src/config.rsReminderRulesConfig、JSON 加载、ReminderConfigKey
crates/awaken-ext-reminder/src/rule.rsReminderRule 结构定义
crates/awaken-ext-reminder/src/output_matcher.rsOutputMatcherContentMatcher、状态/内容匹配逻辑
crates/awaken-ext-reminder/src/plugin/plugin.rsReminderPlugin 注册(AfterToolExecute hook)
crates/awaken-ext-reminder/src/plugin/hook.rsReminderHook——每次工具调用的模式与输出评估
crates/awaken-tool-pattern/共享 glob/regex 模式匹配库

相关

使用 Generative UI(A2UI)

当你希望 agent 把声明式 UI 组件发送给前端,而不是只返回文本时,使用本页。

前置条件

  • 已有可运行的 runtime
  • 前端能够消费 A2UI 消息(例如 CopilotKit 或 AI SDK 集成)
  • 前端已经注册了组件目录(catalog)
[dependencies]
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

步骤

  1. 注册 A2UI 插件:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_generative_ui::A2uiPlugin;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};

let plugin = A2uiPlugin::with_catalog_id("my-catalog");
let agent_spec = AgentSpec::new("ui-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("Render structured UI when visual output helps.")
    .with_hook_filter("generative-ui");

let runtime = AgentRuntimeBuilder::new()
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model(
        "gpt-4o-mini",
        ModelSpec {
            id: "gpt-4o-mini".into(),
            provider: "openai".into(),
            model: "gpt-4o-mini".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin("generative-ui", Arc::new(plugin) as Arc<dyn Plugin>)
    .build()
    .expect("failed to build runtime");

插件会注册一个 render_a2ui 工具,LLM 通过它把 A2UI 消息数组发给前端。

  1. 理解 A2UI v0.8 消息类型:
消息类型作用
createSurface创建渲染 surface
updateComponents定义或更新组件树
updateDataModel写入或更新数据模型
deleteSurface删除 surface

消息顺序通常是:先创建 surface,再定义组件,最后填充数据。

  1. 创建 surface:
let messages = serde_json::json!({
    "messages": [
        {
            "version": "v0.8",
            "createSurface": {
                "surfaceId": "order-form-1",
                "catalogId": "my-catalog"
            }
        }
    ]
});
  1. 定义组件树:

组件列表是扁平的,通过 child / children 表示父子关系。必须有一个 "id": "root" 作为入口。

  1. 用 JSON path 绑定数据:

组件属性里可以写 {"path": "/json/pointer"},前端会在渲染时从 data model 里解析。

  1. 删除 surface:
let messages = serde_json::json!({
    "messages": [
        {
            "version": "v0.8",
            "deleteSurface": {
                "surfaceId": "order-form-1"
            }
        }
    ]
});
  1. 一次 tool call 可以携带多条消息:

render_a2ui 接收的是一个消息数组,所以可以在一次调用中同时创建 surface、更新组件树和写入数据。

  1. 自定义插件指令:
let plugin = A2uiPlugin::with_catalog_and_examples(
    "my-catalog",
    "Example: create a card with a title and a button..."
);

let plugin = A2uiPlugin::with_custom_instructions(
    "You can render UI by calling render_a2ui...".to_string()
);

验证

  1. 注册插件后,给 agent 一个“请以可视化方式展示内容”的提示
  2. 确认 agent 调用了 render_a2ui
  3. 事件流里应出现成功结果:{"a2ui": [...], "rendered": true}
  4. 前端上应看到对应 surface 和组件

常见错误

错误原因修复
缺少 messages 字段tool 调用格式不对{"messages": [...]}
messages array must not be empty消息数组为空至少传一条 A2UI 消息
unsupported version版本不是 v0.8每条消息都设为 "version": "v0.8"
单条消息里混入多个类型键一条消息同时含 createSurfaceupdateComponents一条消息只允许一个类型键
组件缺少 idcomponent组件结构不完整补齐必需字段
LLM 不调用工具插件已注册但未激活或 prompt 指令不足检查 hook filter 和插件指令

相关示例

  • crates/awaken-ext-generative-ui/src/a2ui/tests.rs

关键文件

  • crates/awaken-ext-generative-ui/src/a2ui/mod.rs
  • crates/awaken-ext-generative-ui/src/a2ui/plugin.rs
  • crates/awaken-ext-generative-ui/src/a2ui/tool.rs
  • crates/awaken-ext-generative-ui/src/a2ui/types.rs
  • crates/awaken-ext-generative-ui/src/a2ui/validation.rs

相关

使用 Agent Handoff

当你需要在同一个 run、同一条 thread 内,动态切换 agent 的 system prompt、model 或工具集,而不终止 run 时,使用本页。

前置条件

  • 已添加 awaken
  • 了解 PluginStateKeyAgentRuntimeBuilder

概览

handoff 的核心不是启动一个全新 agent run,而是在当前 run 内应用 AgentOverlay,覆盖基础 agent 的一部分配置。

关键类型:

  • HandoffPlugin
  • AgentOverlay
  • HandoffState
  • HandoffAction

步骤

  1. 定义 overlay:
use awaken::extensions::handoff::AgentOverlay;

let researcher = AgentOverlay {
    system_prompt: Some("You are a research specialist. Find and cite sources.".into()),
    model: Some("claude-sonnet".into()),
    allowed_tools: Some(vec!["web_search".into(), "read_document".into()]),
    excluded_tools: None,
};

let writer = AgentOverlay {
    system_prompt: Some("You are a technical writer. Produce clear documentation.".into()),
    model: None,
    allowed_tools: None,
    excluded_tools: Some(vec!["web_search".into()]),
};

model 字段使用的也是模型注册表 ID。

  1. 用 overlays 构建 HandoffPlugin
use std::collections::HashMap;
use awaken::extensions::handoff::HandoffPlugin;

let mut overlays = HashMap::new();
overlays.insert("researcher".to_string(), researcher);
overlays.insert("writer".to_string(), writer);

let handoff = HandoffPlugin::new(overlays);
  1. 把插件注册进 runtime:
use std::sync::Arc;
use awaken::AgentRuntimeBuilder;

let runtime = AgentRuntimeBuilder::new()
    .with_plugin("agent_handoff", Arc::new(handoff))
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(provider))
    .build()?;
  1. 在 tool 或 hook 中请求 handoff:
use awaken::extensions::handoff::{request_handoff, activate_handoff, clear_handoff, ActiveAgentKey};
use awaken::state::StateCommand;

let mut cmd = StateCommand::new();
cmd.update::<ActiveAgentKey>(request_handoff("researcher"));

let mut cmd = StateCommand::new();
cmd.update::<ActiveAgentKey>(activate_handoff("writer"));

let mut cmd = StateCommand::new();
cmd.update::<ActiveAgentKey>(clear_handoff());
  1. 从插件状态里读取 overlay:
let overlay = handoff.overlay("researcher");

它是如何工作的

HandoffState 有两部分:

  • active_agent
  • requested_agent

内部同步 hook 会在 RunStartStepEnd 检测 requested_agent,并在安全边界上把它提升为 active_agent

Handoff vs Delegation

HandoffDelegation
Thread同一 thread、同一 run通常会产生子 agent 执行上下文
状态共享,原地覆盖一般隔离
适用场景切换角色、人设或工具集拆分独立子任务
开销很低更高

常见错误

错误原因修复
overlay 没生效request_handoff 的名字和 overlays map 不一致保证字符串完全一致
StateError::KeyAlreadyRegistered其他插件也注册了 ActiveAgentKey每个 runtime 只保留一个 HandoffPlugin
hook 没有执行插件未激活"agent_handoff" 加到 hook filter

关键文件

  • crates/awaken-runtime/src/extensions/handoff/mod.rs
  • crates/awaken-runtime/src/extensions/handoff/plugin.rs
  • crates/awaken-runtime/src/extensions/handoff/types.rs
  • crates/awaken-runtime/src/extensions/handoff/state.rs
  • crates/awaken-runtime/src/extensions/handoff/action.rs

相关

使用延迟加载工具

当你的代理拥有大量工具,且希望通过在 LLM 需要时才暴露工具 schema 来减少上下文窗口占用时,可以使用此功能。延迟加载工具插件将工具分为 Eager(始终发送)和 Deferred(隐藏直到被请求)两类。LLM 可通过 ToolSearch 工具按需发现延迟加载的工具。

前置条件

  • 一个可运行的 awaken 代理运行时(参见 第一个代理
  • awaken-ext-deferred-tools crate
[dependencies]
awaken-ext-deferred-tools = { version = "0.1" }
awaken = { package = "awaken-agent", version = "0.1" }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

步骤

  1. 创建插件并注册。

收集代理暴露的所有工具描述符,然后将它们传给 DeferredToolsPlugin::new。插件会在激活时使用这些描述符进行工具分类并填充延迟注册表。

use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
use awaken_ext_deferred_tools::DeferredToolsPlugin;

// Collect descriptors from all tools registered on the agent.
let seed_tools = vec![
    weather_tool.descriptor(),
    search_tool.descriptor(),
    debug_tool.descriptor(),
    // ... all tool descriptors
];
let agent_spec = AgentSpec::new("deferred-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("Search for tools only when needed.")
    .with_hook_filter("ext-deferred-tools");

let runtime = AgentRuntimeBuilder::new()
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model(
        "gpt-4o-mini",
        ModelSpec {
            id: "gpt-4o-mini".into(),
            provider: "openai".into(),
            model: "gpt-4o-mini".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin(
        "ext-deferred-tools",
        Arc::new(DeferredToolsPlugin::new(seed_tools)) as Arc<dyn Plugin>,
    )
    .build()
    .expect("failed to build runtime");
  1. 配置工具加载规则。

通过代理规格中的 deferred_tools 配置键设置规则。规则按顺序求值,首次匹配即生效。未匹配任何规则的工具将回退到 default_mode

use awaken_ext_deferred_tools::{
    DeferredToolsConfig, DeferralRule, ToolLoadMode,
};

let config = DeferredToolsConfig {
    rules: vec![
        DeferralRule { tool: "get_weather".into(), mode: ToolLoadMode::Eager },
        DeferralRule { tool: "debug_*".into(), mode: ToolLoadMode::Deferred },
    ],
    default_mode: ToolLoadMode::Deferred,
    ..Default::default()
};

tool 字段支持精确名称和 glob 模式(通过 wildcard_match 实现)。常见模式如下:

模式匹配对象
get_weather精确工具 ID
debug_*所有以 debug_ 开头的工具
mcp__github__*所有 GitHub MCP 工具
  1. 了解自动启用机制。

DeferredToolsConfig 上的 enabled 字段控制激活行为:

行为
Some(true)始终启用延迟加载工具
Some(false)始终禁用
None(默认)当所有延迟工具的总 token 节省量超过 beta_overhead(默认 1136 tokens)时自动启用

在自动启用模式下,插件将每个工具的 schema 成本估算为 len(parameters_json) / 4 tokens,并对所有可延迟工具的节省量求和。如果总节省量超过维护 ToolSearch 工具及上下文中延迟工具列表的开销,则自动激活延迟加载。

  1. 了解 ToolSearch 的工作原理。

插件会自动注册一个 ToolSearch 工具。LLM 通过传入查询字符串来发现延迟加载的工具:

查询格式行为
"select:Tool1,Tool2"按精确 ID 获取指定工具
"+required rest terms"要求包含某个关键词,按其余词项排序
"plain keywords"在 id、名称、描述中进行通用关键词搜索

ToolSearch 返回结果时,匹配的工具会在当前会话的后续过程中被提升为 Eager。该工具最多返回 max_results(默认 5)个匹配的工具 schema,格式为 <functions> 块。

LLM: I need to check the weather. Let me search for relevant tools.
     -> calls ToolSearch { query: "weather forecast" }

ToolSearch returns matching schemas, promotes get_weather to Eager.
Next inference: get_weather schema is included in the tool list.
  1. 配置自动重新延迟(DiscBeta)。

插件使用折扣 Beta 分布跟踪每个工具的使用统计。通过 ToolSearch 提升但不再被使用的工具会自动重新延迟。此过程是自适应的,无需手动调优。

重新延迟在以下条件全部满足时触发:

  • 工具当前为 Eager(从 Deferred 提升而来)
  • 工具未在规则中配置为始终 Eager
  • 工具已空闲至少 defer_after
  • 后验上可信区间(90%)低于 breakeven_p * thresh_mult

盈亏平衡频率为 (c - c_bar) / gamma,其中 c 为完整 schema 成本,c_bar 为仅名称成本。只有当工具的使用频率足够高,使得避免 ToolSearch 调用带来的节省超过每轮开销时,保持 Eager 才是划算的。

DiscBetaParams 的关键参数:

参数默认值用途
omega0.95每轮折扣因子。有效记忆约为 1/(1-omega) = 20 轮
n05.0先验强度,以等价观测数表示
defer_after5考虑重新延迟前的最小空闲轮数
thresh_mult0.5盈亏平衡频率的延迟阈值乘数
gamma2000.0ToolSearch 调用的估计 token 成本

这些参数位于 DeferredToolsConfig.disc_beta 下:

use awaken_ext_deferred_tools::{DeferredToolsConfig, DiscBetaParams};

let config = DeferredToolsConfig {
    disc_beta: DiscBetaParams {
        omega: 0.95,
        n0: 5.0,
        defer_after: 5,
        thresh_mult: 0.5,
        gamma: 2000.0,
    },
    beta_overhead: 1136.0,
    ..Default::default()
};
  1. 启用跨会话学习。

通过 AgentToolPriors(一个 ProfileKey),使用频率通过 EWMA(指数加权移动平均)在会话间持久化。会话结束时,PersistPriorsHook 将每个工具的存在频率写入配置存储;下次会话开始时,LoadPriorsHook 读取这些数据,并用学习到的先验(而非默认的 0.01)初始化 Beta 分布。

这需要在运行时配置 ProfileStore。除了插件注册外无需额外代码——钩子会自动接入。

EWMA 平滑因子为 lambda = max(0.1, 1/(n+1)),其中 n 为会话计数。早期会话贡献相等;10 次会话后因子稳定在 0.1,即 90% 的权重来自历史数据。

验证

  1. 运行代理并触发一次推理。检查日志中的 deferred_tools.list 上下文消息,其中列出了所有延迟加载的工具名称。

  2. 从运行时快照中读取 DeferralState,查看每个工具的当前模式:

use awaken_ext_deferred_tools::state::{DeferralState, DeferralStateValue};

let state: &DeferralStateValue = snapshot.state::<DeferralState>()
    .expect("DeferralState not found");

for (tool_id, mode) in &state.modes {
    println!("{tool_id}: {mode:?}");
}
  1. 向 LLM 提一个需要使用延迟工具的问题。确认 ToolSearch 被调用,且该工具在后续轮次中被提升为 Eager。

  2. 经过数轮不活跃后,通过检查快照中工具是否恢复为 Deferred 模式来验证重新延迟功能。

常见错误

症状原因修复方法
所有工具都发送给 LLM(无延迟加载)enabled: Some(false) 或总节省量低于 beta_overhead设置 enabled: Some(true) 或添加更多工具使节省量超过开销
插件已注册但无工具被延迟所有规则都解析为 Eager设置 default_mode: ToolLoadMode::Deferred 或添加 Deferred 规则
LLM 无法使用 ToolSearch插件未注册使用种子工具描述符注册 DeferredToolsPlugin
工具从未被重新延迟defer_after 过高或工具使用频繁降低 defer_after 或增大 thresh_mult
跨会话先验未加载未配置 ProfileStore在运行时接入配置存储
ToolSearch 无返回结果工具不在延迟注册表中检查该工具是否包含在传给插件的 seed_tools 列表中

关键文件

路径用途
crates/awaken-ext-deferred-tools/src/lib.rs模块根及公共再导出
crates/awaken-ext-deferred-tools/src/config.rsDeferredToolsConfigDeferralRuleToolLoadModeDiscBetaParams
crates/awaken-ext-deferred-tools/src/plugin/plugin.rsDeferredToolsPlugin 注册
crates/awaken-ext-deferred-tools/src/plugin/hooks.rs阶段钩子(BeforeInference、AfterToolExecute、AfterInference、RunStart、RunEnd)
crates/awaken-ext-deferred-tools/src/tool_search.rsToolSearchTool 实现与查询解析
crates/awaken-ext-deferred-tools/src/policy.rsConfigOnlyPolicyDiscBetaEvaluator
crates/awaken-ext-deferred-tools/src/state.rs状态键:DeferralStateDeferralRegistryDiscBetaStateToolUsageStatsAgentToolPriors

相关文档

服务与集成

这条路径面向需要把本地 runtime 暴露给其他系统调用的场景。

从这里开始

  1. 阅读 通过 SSE 暴露 HTTP,先把 runtime 放到 HTTP 和流式端点后面。
  2. 阅读 集成 AI SDK 前端,对接 React + AI SDK v6。
  3. 阅读 集成 CopilotKit(AG-UI),对接 CopilotKit 前端。

建议同时查阅

通过 SSE 暴露 HTTP

当你需要通过 HTTP + Server-Sent Events 对外提供 agent 服务,并挂上多种协议适配器(AI SDK、AG-UI、A2A、MCP)时,使用本页。

前置条件

  • awaken 启用了 server feature
  • tokio 启用了 rt-multi-threadsignal
  • 已构建好一个 AgentRuntime

步骤

  1. 添加依赖:
[dependencies]
awaken = { package = "awaken-agent", version = "...", features = ["server"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "signal"] }
  1. 构建 runtime:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::{AgentRuntimeBuilder, AgentSpec, ModelSpec};
use awaken::stores::InMemoryStore;

let store = Arc::new(InMemoryStore::new());

let runtime = AgentRuntimeBuilder::new()
    .with_agent_spec(
        AgentSpec::new("assistant")
            .with_model("claude-sonnet")
            .with_system_prompt("You are a helpful assistant."),
    )
    .with_tool("search", Arc::new(SearchTool))
    .with_provider("anthropic", Arc::new(GenaiExecutor::new()))
    .with_model("claude-sonnet", ModelSpec {
        id: "claude-sonnet".into(),
        provider: "anthropic".into(),
        model: "claude-sonnet-4-20250514".into(),
    })
    .with_thread_run_store(store.clone())
    .build()?;

let runtime = Arc::new(runtime);
  1. 创建应用状态:
use awaken::server::app::{AppState, ServerConfig};
use awaken::server::mailbox::{Mailbox, MailboxConfig};
use awaken::stores::InMemoryMailboxStore;

let mailbox_store = Arc::new(InMemoryMailboxStore::new());
let mailbox = Arc::new(Mailbox::new(
    runtime.clone(),
    mailbox_store,
    "default-consumer".to_string(),
    MailboxConfig::default(),
));

let state = AppState::new(
    runtime.clone(),
    mailbox,
    store,
    runtime.resolver_arc(),
    ServerConfig::default(),
);
  1. 构建 router:
use awaken::server::routes::build_router;

let app = build_router().with_state(state);

build_router() 会注册:

  • /health
  • /v1/threads
  • /v1/runs
  • /v1/config/*/v1/capabilities
  • AI SDK v6、AG-UI、A2A、MCP 路由
  1. 启动服务:
let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await?;
axum::serve(listener, app).await?;
  1. 配置 SSE buffer:
let config = ServerConfig {
    address: "0.0.0.0:8080".into(),
    sse_buffer_size: 128,
    ..ServerConfig::default()
};

验证

curl http://localhost:3000/health

应返回 200 OK。然后可以创建 thread 并启动 run。

常见错误

错误原因修复
端口已占用3000 已被其他进程使用ServerConfig.addressTcpListener::bind
SSE 立即断开客户端不支持 text/event-streamcurl --no-buffer 或标准 SSE 客户端
路由缺失没有启用 server feature确保 awaken 开启 features = ["server"]

相关示例

crates/awaken-server/tests/run_api.rs

关键文件

  • crates/awaken-server/src/app.rs
  • crates/awaken-server/src/routes.rs
  • crates/awaken-server/src/http_sse.rs
  • crates/awaken-server/src/mailbox.rs

相关

集成 AI SDK 前端

当你有一个基于 Vercel AI SDK v6 的 React 前端,并希望把它接到 awaken agent server 上时,使用本页。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 server
  • Node.js 项目中已安装 @ai-sdk/react
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["server"] }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1"
tracing-subscriber = "0.3"

步骤

  1. 先启动后端 server:
use std::sync::Arc;

use awaken::engine::GenaiExecutor;
use awaken::contract::storage::ThreadRunStore;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::stores::{InMemoryMailboxStore, InMemoryStore};
use awaken::AgentRuntimeBuilder;
use awaken::server::app::{AppState, ServerConfig};
use awaken::server::mailbox::{Mailbox, MailboxConfig};
use awaken::server::routes::build_router;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().with_target(true).init();

    let agent_spec = AgentSpec::new("my-agent")
        .with_model("gpt-4o-mini")
        .with_system_prompt("You are a helpful assistant.")
        .with_max_rounds(10);

    let runtime = AgentRuntimeBuilder::new()
        .with_provider("openai", Arc::new(GenaiExecutor::new()))
        .with_model(
            "gpt-4o-mini",
            ModelSpec {
                id: "gpt-4o-mini".into(),
                provider: "openai".into(),
                model: "gpt-4o-mini".into(),
            },
        )
        .with_agent_spec(agent_spec)
        .build()
        .expect("failed to build runtime");
    let runtime = Arc::new(runtime);

    let store = Arc::new(InMemoryStore::new());
    let resolver = runtime.resolver_arc();

    let mailbox_store = Arc::new(InMemoryMailboxStore::new());
    let mailbox = Arc::new(Mailbox::new(
        runtime.clone(),
        mailbox_store as Arc<dyn awaken::contract::MailboxStore>,
        format!("ai-sdk:{}", std::process::id()),
        MailboxConfig::default(),
    ));

    let state = AppState::new(
        runtime,
        mailbox,
        store as Arc<dyn ThreadRunStore>,
        resolver,
        ServerConfig {
            address: "127.0.0.1:3000".into(),
            ..Default::default()
        },
    );

    let app = build_router().with_state(state);
    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

AI SDK v6 相关路由:

  • POST /v1/ai-sdk/chat
  • GET /v1/ai-sdk/chat/:thread_id/stream
  • GET /v1/ai-sdk/threads/:thread_id/stream
  • GET /v1/ai-sdk/threads/:id/messages
  1. 安装前端依赖:
npm install ai @ai-sdk/react
  1. 在前端里使用 useChat
import { useChat } from "@ai-sdk/react";

export default function Chat() {
  const { messages, input, handleInputChange, handleSubmit } = useChat({
    api: "http://localhost:3000/v1/ai-sdk/chat",
    id: "thread-1",
  });

  return (
    <div>
      {messages.map((m) => (
        <div key={m.id}>
          <strong>{m.role}:</strong> {m.content}
        </div>
      ))}
      <form onSubmit={handleSubmit}>
        <input value={input} onChange={handleInputChange} />
        <button type="submit">Send</button>
      </form>
    </div>
  );
}
  1. 分别启动后端和前端。

验证

  1. 打开前端页面
  2. 发送一条消息
  3. 确认文本是流式出现的
  4. 确认后端日志中出现 RunStart / RunFinish

常见错误

错误原因修复
浏览器 CORS 错误未配置 CORS 中间件给 axum router 加 tower-http CORS
useChat 收不到事件URL 配错确认 api 指向 /v1/ai-sdk/chat
stream closed unexpectedlySSE 缓冲溢出增大 ServerConfig.sse_buffer_size
/v1/ai-sdk/chat 返回 404没开 server featureCargo.toml 里启用

相关示例

  • examples/ai-sdk-starter/agent/src/main.rs

关键文件

路径作用
crates/awaken-server/src/protocols/ai_sdk_v6/http.rsAI SDK v6 路由
crates/awaken-server/src/protocols/ai_sdk_v6/encoder.rsAI SDK v6 SSE encoder
crates/awaken-server/src/routes.rs总路由
crates/awaken-server/src/app.rsAppState / ServerConfig
examples/ai-sdk-starter/agent/src/main.rsAI SDK starter 后端入口

相关

集成 CopilotKit(AG-UI)

当你有一个 CopilotKit React 前端,并想通过 AG-UI 协议接入 awaken agent server 时,使用本页。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 server
  • Node.js 项目中已安装 @copilotkit/react-core@copilotkit/react-ui
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["server"] }
tokio = { version = "1", features = ["full"] }
async-trait = "0.1"
serde_json = "1"
tracing-subscriber = "0.3"

步骤

  1. 启动后端 server:
use std::sync::Arc;

use awaken::engine::GenaiExecutor;
use awaken::contract::storage::ThreadRunStore;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::stores::{InMemoryMailboxStore, InMemoryStore};
use awaken::AgentRuntimeBuilder;
use awaken::server::app::{AppState, ServerConfig};
use awaken::server::mailbox::{Mailbox, MailboxConfig};
use awaken::server::routes::build_router;

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt().with_target(true).init();

    let agent_spec = AgentSpec::new("copilotkit-agent")
        .with_model("gpt-4o-mini")
        .with_system_prompt(
            "You are a CopilotKit-powered assistant. \
             Update shared state and suggest actions when appropriate.",
        )
        .with_max_rounds(10);

    let runtime = AgentRuntimeBuilder::new()
        .with_provider("openai", Arc::new(GenaiExecutor::new()))
        .with_model(
            "gpt-4o-mini",
            ModelSpec {
                id: "gpt-4o-mini".into(),
                provider: "openai".into(),
                model: "gpt-4o-mini".into(),
            },
        )
        .with_agent_spec(agent_spec)
        .build()
        .expect("failed to build runtime");
    let runtime = Arc::new(runtime);

    let store = Arc::new(InMemoryStore::new());
    let resolver = runtime.resolver_arc();

    let mailbox_store = Arc::new(InMemoryMailboxStore::new());
    let mailbox = Arc::new(Mailbox::new(
        runtime.clone(),
        mailbox_store as Arc<dyn awaken::contract::MailboxStore>,
        format!("copilotkit:{}", std::process::id()),
        MailboxConfig::default(),
    ));

    let state = AppState::new(
        runtime,
        mailbox,
        store as Arc<dyn ThreadRunStore>,
        resolver,
        ServerConfig {
            address: "127.0.0.1:3000".into(),
            ..Default::default()
        },
    );

    let app = build_router().with_state(state);

    let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
        .await
        .expect("failed to bind");
    axum::serve(listener, app).await.expect("server crashed");
}

服务器会自动在以下路径注册 AG-UI 路由:

AG-UI 路由包括:

  • POST /v1/ag-ui/run
  • POST /v1/ag-ui/threads/:thread_id/runs
  • POST /v1/ag-ui/agents/:agent_id/runs
  • POST /v1/ag-ui/threads/:thread_id/interrupt
  • GET /v1/ag-ui/threads/:id/messages
  1. 安装 CopilotKit:
npm install @copilotkit/react-core @copilotkit/react-ui
  1. CopilotKit provider 包裹应用:
import { CopilotKit } from "@copilotkit/react-core";
import { CopilotChat } from "@copilotkit/react-ui";
import "@copilotkit/react-ui/styles.css";

export default function App() {
  return (
    <CopilotKit runtimeUrl="http://localhost:3000/v1/ag-ui">
      <CopilotChat
        labels={{ title: "Agent", initial: "How can I help?" }}
      />
    </CopilotKit>
  );
}
  1. 分别启动后端和前端。

验证

  1. 打开页面
  2. 在 CopilotChat 中发送消息
  3. 确认聊天 UI 中有流式回复
  4. 查看后端日志里的 RunStart / RunFinish

常见错误

错误原因修复
浏览器 CORS 错误未配置 CORS 中间件给 axum router 加 CORS
CopilotKit 提示 connection failedruntimeUrl 错了指向 http://localhost:3000/v1/ag-ui
有事件但 UI 不更新AG-UI 事件格式不兼容确认 CopilotKit 版本匹配
/v1/ag-ui/run 返回 404没开 server featureCargo.toml 里启用

相关示例

  • examples/copilotkit-starter/agent/src/main.rs

关键文件

路径作用
crates/awaken-server/src/protocols/ag_ui/http.rsAG-UI 路由
crates/awaken-server/src/protocols/ag_ui/encoder.rsAG-UI encoder
crates/awaken-server/src/routes.rs总路由
crates/awaken-server/src/app.rsAppState / ServerConfig
examples/copilotkit-starter/agent/src/main.rsCopilotKit starter 后端入口

相关

状态与存储

这条路径面向已经不满足无状态演示、需要认真设计状态与持久化的团队。

你可以在这里决定

  • thread / run 数据放在哪里
  • 状态键和合并策略怎么组织
  • 每一轮究竟把多少上下文送给模型

推荐顺序

  1. 使用文件存储使用 Postgres 存储 开始,先确定持久化后端。
  2. 阅读 状态键线程模型,理解状态布局和生命周期。
  3. 当上下文规模开始成为问题时,再阅读 优化上下文窗口

相关内部机制

使用文件存储

当你希望在不引入外部数据库的情况下,用文件系统持久化 threads、runs 和 messages 时,使用本页。

前置条件

  • awaken-stores 启用了 file feature

步骤

  1. 添加依赖:
[dependencies]
awaken-stores = { version = "...", features = ["file"] }

如果使用 awaken 门面 crate,也建议直接加 awaken-stores 来启用 file feature。

  1. 创建 FileStore
use std::sync::Arc;
use awaken::stores::FileStore;

let store = Arc::new(FileStore::new("./data"));

目录会在首次写入时自动创建,布局如下:

./data/
  threads/<thread_id>.json
  messages/<thread_id>.json
  runs/<run_id>.json
  1. 接入 runtime:
use awaken::AgentRuntimeBuilder;

let runtime = AgentRuntimeBuilder::new()
    .with_thread_run_store(store)
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(provider))
    .build()?;
  1. 生产环境建议使用绝对路径:
use std::path::PathBuf;

let data_dir = PathBuf::from("/var/lib/myapp/awaken");
let store = Arc::new(FileStore::new(data_dir));

验证

运行 agent 后检查目录,应该看到 threads/messages/runs/ 下生成了 JSON 文件。

常见错误

错误原因修复
StorageError::Io目录没有读写权限确保进程对目标路径有权限
StorageError::Io 且 ID 为空或非法thread/run ID 包含非法字符使用 UUID 风格或简单字母数字 ID
重启后找不到数据相对路径在不同启动目录下解析不同改用绝对路径

相关示例

crates/awaken-stores/src/file.rs

关键文件

  • crates/awaken-stores/Cargo.toml
  • crates/awaken-stores/src/file.rs
  • crates/awaken-stores/src/lib.rs

相关

使用 Postgres 存储

当你需要可持久化、可多实例共享的存储后端时,使用 PostgreSQL。

前置条件

  • awaken-stores 启用了 postgres feature
  • 有一个可连接的 PostgreSQL 实例
  • sqlx 所需 tokio 运行时依赖已就绪

步骤

  1. 添加依赖:
[dependencies]
awaken-stores = { version = "...", features = ["postgres"] }
  1. 创建连接池:
use sqlx::PgPool;

let pool = PgPool::connect("postgres://user:pass@localhost:5432/mydb").await?;
  1. 创建 PostgresStore
use std::sync::Arc;
use awaken::stores::PostgresStore;

let store = Arc::new(PostgresStore::new(pool));

默认表名:

  • awaken_threads
  • awaken_runs
  • awaken_messages
  1. 使用自定义前缀:
let store = Arc::new(PostgresStore::with_prefix(pool, "myapp"));
  1. 接入 runtime:
use awaken::AgentRuntimeBuilder;

let runtime = AgentRuntimeBuilder::new()
    .with_thread_run_store(store)
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(provider))
    .build()?;
  1. Schema 初始化:

表会在首次访问时自动通过 ensure_schema() 创建。初始接入无需手动 migration。

验证

执行 agent 后,在数据库中查询:

SELECT id, updated_at FROM awaken_threads;
SELECT id, updated_at FROM awaken_runs;

应该能看到对应记录。

常见错误

错误原因修复
sqlx::Error connection refusedPostgreSQL 未启动或连接串错误检查 DATABASE_URL 和数据库状态
首次写入报 StorageError数据库用户权限不足授予建表和写入权限
表名冲突其他应用共用了默认表名with_prefix() 做命名空间隔离

相关示例

crates/awaken-stores/src/postgres.rs

关键文件

  • crates/awaken-stores/Cargo.toml
  • crates/awaken-stores/src/postgres.rs
  • crates/awaken-stores/src/lib.rs

相关

优化上下文窗口

当你需要控制运行时如何管理会话历史,以避免超过模型上下文上限时,使用本页。

前置条件

  • 已添加 awaken
  • 已有一个 AgentSpec

ContextWindowPolicy

每个 agent 都可以配置 ContextWindowPolicy

use awaken::ContextWindowPolicy;

let policy = ContextWindowPolicy {
    max_context_tokens: 200_000,
    max_output_tokens: 16_384,
    min_recent_messages: 10,
    enable_prompt_cache: true,
    autocompact_threshold: Some(100_000),
    compaction_mode: ContextCompactionMode::KeepRecentRawSuffix,
    compaction_raw_suffix_messages: 2,
};

字段

字段类型默认值说明
max_context_tokensusize200_000模型上下文总窗口(token 数)
max_output_tokensusize16_384为模型输出预留的 token
min_recent_messagesusize10始终保留的最近消息数,即使超出预算
enable_prompt_cachebooltrue是否启用 prompt cache
autocompact_thresholdOption<usize>None达到该 token 数时触发自动压缩,None 表示禁用
compaction_modeContextCompactionModeKeepRecentRawSuffix自动压缩策略
compaction_raw_suffix_messagesusize2后缀压缩模式下保留的原始消息数

截断(Truncation)

当消息总量超过预算时,运行时会自动丢弃最旧消息。预算大致为:

available = max_context_tokens - max_output_tokens - tool_schema_tokens

截断时会保留什么

  • 所有 system messages
  • 至少 min_recent_messages 条最近消息
  • 成对的 tool call / tool result
  • 不会留下悬挂的 tool calls

Artifact 压缩

在真正截断前,过大的 tool result 会先被压缩成 preview,以减少 token 占用。system / user / assistant 普通消息不会做这种压缩。

压缩(Compaction)

压缩不会简单丢消息,而是把更早的历史总结成一条摘要消息。

启用自动压缩

let policy = ContextWindowPolicy {
    autocompact_threshold: Some(100_000),
    compaction_mode: ContextCompactionMode::KeepRecentRawSuffix,
    compaction_raw_suffix_messages: 4,
    ..Default::default()
};

ContextCompactionMode

  • KeepRecentRawSuffix:保留最近 N 条原始消息,其余压缩
  • CompactToSafeFrontier:压缩到安全边界为止

安全边界的含义是:不会把某条 tool call 和它对应的结果拆开。

CompactionConfig

压缩子系统通过 CompactionConfig 配置:

use awaken::CompactionConfig;

let config = CompactionConfig {
    summarizer_system_prompt: "You are a conversation summarizer. \
        Preserve all key facts, decisions, tool results, and action items. \
        Be concise but complete.".into(),
    summarizer_user_prompt: "Summarize the following conversation:\n\n{messages}".into(),
    summary_max_tokens: Some(1024),
    summary_model: Some("claude-3-haiku".into()),
    min_savings_ratio: 0.3,
};
字段类型默认值说明
summarizer_system_promptString内置摘要 prompt摘要 LLM 的 system prompt
summarizer_user_promptString"Summarize...\n\n{messages}"摘要用户 prompt 模板;{messages} 会被替换为对话记录
summary_max_tokensOption<u32>None摘要响应的最大 token 数
summary_modelOption<String>None摘要所用模型,默认沿用 agent 模型
min_savings_ratiof640.3接受一次压缩所需的最低 token 节省比例(0.0-1.0)

DefaultSummarizer

内置 DefaultSummarizer 会读取 CompactionConfig,并支持“在已有摘要上继续增量总结”。

摘要存储

压缩结果会被保存成带 <conversation-summary> 标签的内部 system message。重新加载时,历史中较早的已摘要部分不会再被重新放回上下文窗口。

截断恢复

如果 LLM 因 MaxTokens 截断,而且生成到一半的 tool call 参数不完整,运行时可以自动注入 continuation prompt 并重试,直到达到最大重试次数。

关键文件

  • crates/awaken-contract/src/contract/inference.rs
  • crates/awaken-runtime/src/context/transform/mod.rs
  • crates/awaken-runtime/src/context/transform/compaction.rs
  • crates/awaken-runtime/src/context/compaction.rs
  • crates/awaken-runtime/src/context/summarizer.rs
  • crates/awaken-runtime/src/context/plugin.rs
  • crates/awaken-runtime/src/context/truncation.rs

相关

状态键

Awaken 的状态系统提供类型化、带作用域、可持久化的键值存储。插件和工具在编译期声明状态键,运行时负责快照、持久化和并行合并语义。

StateKey trait

每个状态槽位都由一个实现了 StateKey 的类型来标识。

pub trait StateKey: 'static + Send + Sync {
    const KEY: &'static str;
    const MERGE: MergeStrategy = MergeStrategy::Exclusive;
    const SCOPE: KeyScope = KeyScope::Run;

    type Value: Clone + Default + Serialize + DeserializeOwned + Send + Sync + 'static;
    type Update: Send + 'static;

    fn apply(value: &mut Self::Value, update: Self::Update);
    fn encode(value: &Self::Value) -> Result<JsonValue, StateError>;
    fn decode(value: JsonValue) -> Result<Self::Value, StateError>;
}

示例

struct Counter;

impl StateKey for Counter {
    const KEY: &'static str = "counter";
    type Value = usize;
    type Update = usize;

    fn apply(value: &mut Self::Value, update: Self::Update) {
        *value += update;
    }
}

KeyScope

控制键值在 run 边界上的生命周期:

pub enum KeyScope {
    Run,
    Thread,
}

MergeStrategy

决定并行执行下多个 MutationBatch 如何合并:

pub enum MergeStrategy {
    Exclusive,
    Commutative,
}

StateMap

状态值的类型擦除容器。

pub struct StateMap { /* ... */ }

方法

fn contains<K: StateKey>(&self) -> bool
fn get<K: StateKey>(&self) -> Option<&K::Value>
fn get_mut<K: StateKey>(&mut self) -> Option<&mut K::Value>
fn insert<K: StateKey>(&mut self, value: K::Value)
fn remove<K: StateKey>(&mut self) -> Option<K::Value>
fn get_or_insert_default<K: StateKey>(&mut self) -> &mut K::Value

Snapshot

传给 hook 和 ToolCallContext 的不可变、带 revision 的状态视图:

pub struct Snapshot {
    pub revision: u64,
    pub ext: Arc<StateMap>,
}

方法

fn new(revision: u64, ext: Arc<StateMap>) -> Self
fn revision(&self) -> u64
fn get<K: StateKey>(&self) -> Option<&K::Value>
fn ext(&self) -> &StateMap

StateKeyOptions

注册状态键时的选项:

pub struct StateKeyOptions {
    pub persistent: bool,
    pub retain_on_uninstall: bool,
    pub scope: KeyScope,
}

PersistedState

存储后端使用的序列化状态格式:

pub struct PersistedState {
    pub revision: u64,
    pub extensions: HashMap<String, JsonValue>,
}

相关

线程模型

Thread 表示持久化会话。Thread 本身只保存 thread 元信息;消息和 run 历史通过存储 trait 单独管理。

Thread

pub struct Thread {
    pub id: String,
    pub metadata: ThreadMetadata,
}

构造函数

fn new() -> Self
fn with_id(id: impl Into<String>) -> Self

Builder 方法

fn with_title(self, title: impl Into<String>) -> Self

ThreadMetadata

pub struct ThreadMetadata {
    pub created_at: Option<u64>,
    pub updated_at: Option<u64>,
    pub title: Option<String>,
    pub custom: HashMap<String, Value>,
}

存储

消息不直接嵌在 Thread 里,而是通过 ThreadStore 读写:

#[async_trait]
pub trait ThreadStore: Send + Sync {
    async fn load_thread(&self, thread_id: &str) -> Result<Option<Thread>, StorageError>;
    async fn save_thread(&self, thread: &Thread) -> Result<(), StorageError>;
    async fn delete_thread(&self, thread_id: &str) -> Result<(), StorageError>;
    async fn list_threads(&self, offset: usize, limit: usize) -> Result<Vec<String>, StorageError>;
    async fn load_messages(&self, thread_id: &str) -> Result<Option<Vec<Message>>, StorageError>;
    async fn save_messages(&self, thread_id: &str, messages: &[Message]) -> Result<(), StorageError>;
    async fn delete_messages(&self, thread_id: &str) -> Result<(), StorageError>;
    async fn update_thread_metadata(&self, id: &str, metadata: ThreadMetadata) -> Result<(), StorageError>;
}

ThreadRunStore

ThreadRunStoreThreadStore + RunStore 基础上增加了原子 checkpoint:

#[async_trait]
pub trait ThreadRunStore: ThreadStore + RunStore + Send + Sync {
    async fn checkpoint(
        &self,
        thread_id: &str,
        messages: &[Message],
        run: &RunRecord,
    ) -> Result<(), StorageError>;
}

RunStore

#[async_trait]
pub trait RunStore: Send + Sync {
    async fn create_run(&self, record: &RunRecord) -> Result<(), StorageError>;
    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, StorageError>;
    async fn latest_run(&self, thread_id: &str) -> Result<Option<RunRecord>, StorageError>;
    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, StorageError>;
}

RunRecord

pub struct RunRecord {
    pub run_id: String,
    pub thread_id: String,
    pub agent_id: String,
    pub parent_run_id: Option<String>,
    pub status: RunStatus,
    pub termination_code: Option<String>,
    pub created_at: u64,
    pub updated_at: u64,
    pub steps: usize,
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub state: Option<PersistedState>,
}

相关

运行与运维

当 happy path 已经跑通,这条路径用于把 Agent 服务加固到可运维状态。

推荐顺序

  1. 先启用 可观测性,把 run、tool 和 provider 变得可见。
  2. 再启用 工具权限 HITL,为工具执行增加审批控制。
  3. 通过 配置停止策略 把 agent loop 约束在可预测范围内。
  4. 上报 Tool 进度测试策略 提升可观测性和上线信心。

建议搭配阅读

启用工具权限 HITL

当你需要控制 agent 能调用哪些工具,并对敏感操作启用人工审批时,使用本页。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 permission
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["permission"] }
tokio = { version = "1", features = ["full"] }
serde_json = "1"

步骤

  1. 注册 permission 插件:
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_permission::PermissionPlugin;
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};

let agent_spec = AgentSpec::new("my-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("You are a helpful assistant.")
    .with_hook_filter("permission");

let runtime = AgentRuntimeBuilder::new()
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model(
        "gpt-4o-mini",
        ModelSpec {
            id: "gpt-4o-mini".into(),
            provider: "openai".into(),
            model: "gpt-4o-mini".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin("permission", Arc::new(PermissionPlugin) as Arc<dyn Plugin>)
    .build()
    .expect("failed to build runtime");
  1. 以内联方式定义规则:
use awaken::ext_permission::{PermissionRulesConfig, PermissionRuleEntry, ToolPermissionBehavior};

let config = PermissionRulesConfig {
    default_behavior: ToolPermissionBehavior::Ask,
    rules: vec![
        PermissionRuleEntry {
            tool: "read_file".into(),
            behavior: ToolPermissionBehavior::Allow,
            scope: Default::default(),
        },
        PermissionRuleEntry {
            tool: "file_*".into(),
            behavior: ToolPermissionBehavior::Ask,
            scope: Default::default(),
        },
        PermissionRuleEntry {
            tool: "delete_*".into(),
            behavior: ToolPermissionBehavior::Deny,
            scope: Default::default(),
        },
    ],
};
  1. 也可以从 YAML 文件加载:
default_behavior: ask
rules:
  - tool: "read_file"
    behavior: allow
  - tool: "Bash(npm *)"
    behavior: allow
  - tool: "file_*"
    behavior: ask
  - tool: "delete_*"
    behavior: deny
  1. 通过 agent spec 激活:
let agent_spec = AgentSpec::new("my-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("You are a helpful assistant.")
    .with_hook_filter("permission");
  1. 理解规则优先级:

  2. Deny

  3. Allow

  4. Ask

匹配 DSL 支持:

Pattern匹配方式
read_file精确匹配工具名
file_*工具名 glob
mcp__github__*MCP 工具 glob
Bash(npm *)主参数 glob
Edit(file_path ~ "src/**")命名字段 glob
Bash(command =~ "(?i)rm")命名字段 regex
/mcp__(gh|gl)__.*/工具名 regex

验证

  1. 用一个命中 deny 的工具测试,调用应被拦截
  2. 用一个命中 ask 的工具测试,run 应进入等待审批状态
  3. 通过 mailbox 接口提交审批
  4. 确认 run 恢复执行

常见错误

错误原因修复
所有工具都被拦住default_behavior: deny 且无 allow 规则显式给安全工具加 allow
规则没有生效插件没注册注册 PermissionPlugin 并激活
pattern 无效glob / regex 语法错对照 DSL 语法检查
ask 一直不恢复没有 mailbox consumer让前端或 API 客户端消费审批请求

相关示例

  • crates/awaken-ext-permission/tests/

关键文件

  • crates/awaken-ext-permission/src/lib.rs
  • crates/awaken-ext-permission/src/config.rs
  • crates/awaken-ext-permission/src/rules.rs
  • crates/awaken-ext-permission/src/plugin/plugin.rs
  • crates/awaken-ext-permission/src/plugin/checker.rs
  • crates/awaken-tool-pattern/

相关

配置停止策略

当你需要根据轮数、token 用量、耗时或连续错误来决定 agent run 何时终止时,使用本页。

前置条件

  • 已添加 awaken
  • 了解 PluginAgentRuntimeBuilder

概览

stop policy 会在每次推理步骤结束后判断 run 是否继续。内置策略包括:

策略触发条件
MaxRoundsPolicy步数超过上限
TokenBudgetPolicy输入+输出 token 超过预算
TimeoutPolicy墙钟时间超过上限
ConsecutiveErrorsPolicy连续推理错误达到阈值

步骤

  1. 以编程方式构造策略:
use std::sync::Arc;
use awaken::policies::{
    MaxRoundsPolicy, TokenBudgetPolicy, TimeoutPolicy,
    ConsecutiveErrorsPolicy, StopPolicy,
};

let policies: Vec<Arc<dyn StopPolicy>> = vec![
    Arc::new(MaxRoundsPolicy::new(25)),
    Arc::new(TokenBudgetPolicy::new(100_000)),
    Arc::new(TimeoutPolicy::new(300_000)),
    Arc::new(ConsecutiveErrorsPolicy::new(3)),
];
  1. StopConditionPlugin 注册到 runtime:
use awaken::policies::StopConditionPlugin;
use awaken::AgentRuntimeBuilder;

let runtime = AgentRuntimeBuilder::new()
    .with_plugin("stop-condition", Arc::new(StopConditionPlugin::new(policies)))
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(provider))
    .build()?;

如果只需要限制轮数,也可以直接用 MaxRoundsPlugin

use awaken::policies::MaxRoundsPlugin;

let runtime = AgentRuntimeBuilder::new()
    .with_plugin("stop-condition:max-rounds", Arc::new(MaxRoundsPlugin::new(10)))
    .with_agent_spec(spec)
    .with_provider("anthropic", Arc::new(provider))
    .build()?;
  1. 用声明式 StopConditionSpec
use awaken_contract::contract::lifecycle::StopConditionSpec;
use awaken::policies::{policies_from_specs, StopConditionPlugin};

let specs = vec![
    StopConditionSpec::MaxRounds { rounds: 10 },
    StopConditionSpec::Timeout { seconds: 300 },
    StopConditionSpec::TokenBudget { max_total: 100_000 },
    StopConditionSpec::ConsecutiveErrors { max: 3 },
];

let policies = policies_from_specs(&specs);
let plugin = StopConditionPlugin::new(policies);

完整的 StopConditionSpec 还包含 StopOnToolContentMatchLoopDetection,但这些目前只有契约定义,尚未在 policies_from_specs 中实现。

StopPolicy Trait

你也可以实现自定义策略:

use awaken::policies::{StopPolicy, StopDecision, StopPolicyStats};

pub struct MyCustomPolicy {
    pub threshold: u64,
}

impl StopPolicy for MyCustomPolicy {
    fn id(&self) -> &str {
        "my_custom"
    }

    fn evaluate(&self, stats: &StopPolicyStats) -> StopDecision {
        if stats.total_output_tokens > self.threshold {
            StopDecision::Stop {
                code: "my_custom".into(),
                detail: format!("output tokens {} exceeded {}", stats.total_output_tokens, self.threshold),
            }
        } else {
            StopDecision::Continue
        }
    }
}

StopPolicyStats

内置 StopConditionHook 会为每次评估准备一份 StopPolicyStats

字段类型说明
step_countu32已完成推理步数
total_input_tokensu64累计输入 token
total_output_tokensu64累计输出 token
elapsed_msu64从第一步开始经过的毫秒数
consecutive_errorsu32连续推理错误计数
last_tool_namesVec<String>最近一轮里调用的工具名
last_response_textString最近一次推理返回的文本

StopDecision

pub enum StopDecision {
    Continue,
    Stop { code: String, detail: String },
}

任何一个 policy 返回 Stop,该 run 就会以 TerminationReason::Stopped 结束。

Stop policy 如何进入 agent loop

  1. StopConditionPluginPhase::AfterInference 注册 hook
  2. 每轮推理结束后,hook 会更新统计状态
  3. 构造 StopPolicyStats
  4. 依次调用每个策略的 evaluate
  5. 如果有策略返回 Stop,则写入 RunLifecycleUpdate::Done

maxmax_total 设为 0 的策略视为禁用,始终返回 Continue

常见错误

错误原因修复
run 永远不结束没注册 stop policy,LLM 一直在调工具至少加一个 MaxRoundsPolicy
StateError::KeyAlreadyRegistered同时注册了 StopConditionPluginMaxRoundsPlugin二选一
Timeout 提前触发TimeoutPolicy::new() 传的是毫秒,StopConditionSpec::Timeout 是秒注意单位

关键文件

  • crates/awaken-runtime/src/policies/mod.rs
  • crates/awaken-runtime/src/policies/policy.rs
  • crates/awaken-runtime/src/policies/plugin.rs
  • crates/awaken-runtime/src/policies/state.rs
  • crates/awaken-runtime/src/policies/hook.rs
  • crates/awaken-contract/src/contract/lifecycle.rs

相关

启用可观测性

当你需要用 OpenTelemetry 兼容的遥测方式追踪 LLM 推理和工具执行时,使用本页。

前置条件

  • 已有可运行的 awaken runtime
  • awaken 启用了 observability
  • 如果要导出 OTel:awaken-ext-observability 需要启用 otel,并准备好 collector
[dependencies]
awaken = { package = "awaken-agent", version = "0.1", features = ["observability"] }
tokio = { version = "1", features = ["full"] }

步骤

  1. 先用内存 sink(开发环境):
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_observability::{ObservabilityPlugin, InMemorySink};
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};

let sink = InMemorySink::new();
let obs_plugin = ObservabilityPlugin::new(sink.clone());
let agent_spec = AgentSpec::new("observed-agent")
    .with_model("gpt-4o-mini")
    .with_system_prompt("You are a helpful assistant.")
    .with_hook_filter("observability");

let runtime = AgentRuntimeBuilder::new()
    .with_provider("openai", Arc::new(GenaiExecutor::new()))
    .with_model(
        "gpt-4o-mini",
        ModelSpec {
            id: "gpt-4o-mini".into(),
            provider: "openai".into(),
            model: "gpt-4o-mini".into(),
        },
    )
    .with_agent_spec(agent_spec)
    .with_plugin("observability", Arc::new(obs_plugin) as Arc<dyn Plugin>)
    .build()
    .expect("failed to build runtime");

run 结束后,可以直接读取 sink.metrics()

  1. 换成 OTel sink(生产环境):
use std::sync::Arc;
use awaken::engine::GenaiExecutor;
use awaken::ext_observability::{ObservabilityPlugin, OtelMetricsSink};
use awaken::registry_spec::{AgentSpec, ModelSpec};
use awaken::{AgentRuntimeBuilder, Plugin};
use opentelemetry_sdk::trace::SdkTracerProvider;

let provider = SdkTracerProvider::builder().build();
let tracer = provider.tracer("awaken");
let obs_plugin = ObservabilityPlugin::new(OtelMetricsSink::new(tracer));
  1. 如果需要,也可以实现自定义 sink:
use awaken::ext_observability::{MetricsSink, GenAISpan, ToolSpan, AgentMetrics};

struct MySink;

impl MetricsSink for MySink {
    fn on_inference(&self, span: &GenAISpan) {}
    fn on_tool(&self, span: &ToolSpan) {}
    fn on_run_end(&self, metrics: &AgentMetrics) {}
}
  1. 插件会在这些 phase 采集数据:
Phase采集内容
RunStartsession 起始时间
BeforeInferencemodel、provider、开始时间
AfterInferencetoken usage、finish reason、耗时
BeforeToolExecutetool 开始时间
AfterToolExecutetool 耗时、失败状态
RunEndsession 总时长

验证

  1. InMemorySink 跑一个 agent
  2. 执行结束后调用 sink.metrics()
  3. 确认 inferences 非空且 token 统计有值
  4. 如果用 OTel,去 collector / Jaeger 确认 span 已上报

常见错误

错误原因修复
metrics 全是 0插件没注册通过 builder 注册 ObservabilityPlugin
找不到 OtelMetricsSink缺少 otel featureawaken-ext-observabilityotel
collector 里没有 spanexporter 没配置或 tracer provider 被提前释放检查 exporter 和 provider 生命周期
token 统计缺失provider 没返回 usage确保 LlmExecutor 产生 TokenUsage

相关示例

  • crates/awaken-ext-observability/tests/

关键文件

  • crates/awaken-ext-observability/src/lib.rs
  • crates/awaken-ext-observability/src/plugin/plugin.rs
  • crates/awaken-ext-observability/src/plugin/hooks.rs
  • crates/awaken-ext-observability/src/metrics.rs
  • crates/awaken-ext-observability/src/sink.rs
  • crates/awaken-ext-observability/src/otel.rs

相关

上报 Tool 进度

当你需要在工具执行过程中,把进度或活动快照实时流回前端时,使用本页。

前置条件

  • 已有一个 Tool 实现,并能访问 ToolCallContext
  • 已添加 awaken

步骤

  1. report_progress 上报结构化进度:
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
use awaken::contract::progress::ProgressStatus;
use async_trait::async_trait;
use serde_json::{Value, json};

struct IndexTool;

#[async_trait]
impl Tool for IndexTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("index_docs", "Index Docs", "Index a document set")
    }

    async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        ctx.report_progress(ProgressStatus::Running, Some("Starting indexing"), None).await;

        for i in 0..10 {
            let fraction = (i + 1) as f64 / 10.0;
            ctx.report_progress(
                ProgressStatus::Running,
                Some(&format!("Indexed batch {}/10", i + 1)),
                Some(fraction),
            ).await;
        }

        ctx.report_progress(ProgressStatus::Done, Some("Indexing complete"), Some(1.0)).await;
        Ok(ToolResult::success("index_docs", json!({"indexed": 10})).into())
    }
}
  1. report_activity 上报完整活动快照:
ctx.report_activity("code-generation", "fn hello() {\n    println!(\"hi\");\n}").await;
  1. report_activity_delta 上报增量补丁:
use serde_json::json;

ctx.report_activity_delta(
    "code-generation",
    json!([
        { "op": "add", "path": "/line", "value": "    println!(\"world\");" }
    ]),
).await;
  1. ProgressStatus 反映 tool call 生命周期:
变体含义
Pending已排队,尚未开始
Running正在执行
Done成功完成
Failed执行失败
Cancelled被取消

原理

report_progress 内部会构造 ToolCallProgressState,再由运行时包装成 AgentEvent::ActivitySnapshot

pub struct ToolCallProgressState {
    pub schema: String,
    pub node_id: String,
    pub call_id: String,
    pub tool_name: String,
    pub status: ProgressStatus,
    pub progress: Option<f64>,
    pub loaded: Option<u64>,
    pub total: Option<u64>,
    pub message: Option<String>,
    pub parent_node_id: Option<String>,
    pub parent_call_id: Option<String>,
}

活动类型常量是 "tool-call-progress"

验证

订阅 SSE 事件流,确认在工具执行期间能收到 ActivitySnapshot,并且 status"running" 变成 "done"

常见错误

错误原因修复
没有事件activity_sink 为空确保运行时配置了事件 sink
前端没有更新进度前端未处理该活动类型过滤 activity_type == "tool-call-progress"

关键文件

  • crates/awaken-contract/src/contract/progress.rs
  • crates/awaken-contract/src/contract/tool.rs

相关

测试策略

当你需要测试工具、插件、状态键或完整的代理运行,且不依赖真实 LLM 时,可以使用本指南。

前置条件

  • Cargo.toml 中添加 awaken crate(包含运行时的 re-exports)
  • tokio 需启用 rtmacros features 以支持异步测试
  • serde_json 用于构造工具参数和断言

1. 单元测试 Tool

创建一个带有测试快照的 ToolCallContext,调用 tool.execute(),然后对返回的 ToolOutput 进行断言。

use async_trait::async_trait;
use serde_json::{Value, json};
use awaken::contract::tool::{
    Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};

struct GreetTool;

#[async_trait]
impl Tool for GreetTool {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("greet", "greet", "Greet a user by name")
    }

    async fn execute(&self, args: Value, _ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        let name = args["name"]
            .as_str()
            .ok_or_else(|| ToolError::InvalidArguments("missing 'name'".into()))?;
        Ok(ToolResult::success("greet", json!({ "greeting": format!("Hello, {name}!") })).into())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn greet_tool_returns_greeting() {
        let tool = GreetTool;
        let ctx = ToolCallContext::test_default();

        let output = tool.execute(json!({"name": "Alice"}), &ctx).await.unwrap();

        assert!(output.result.is_success());
        assert_eq!(output.result.data["greeting"], "Hello, Alice!");
    }

    #[tokio::test]
    async fn greet_tool_rejects_missing_name() {
        let tool = GreetTool;
        let ctx = ToolCallContext::test_default();

        let err = tool.execute(json!({}), &ctx).await.unwrap_err();

        assert!(matches!(err, ToolError::InvalidArguments(_)));
    }
}

当你的工具通过 ToolOutput::with_command() 返回副作用时,对 command 字段进行断言:

#[tokio::test]
async fn tool_produces_state_command() {
    let tool = CounterMutationTool { increment: 5 };
    let ctx = ToolCallContext::test_default();

    let output = tool.execute(json!({}), &ctx).await.unwrap();

    assert!(output.result.is_success());
    // The command is opaque at this level; integration tests (section 4)
    // verify that commands are applied correctly to the StateStore.
    assert!(!output.command.is_empty());
}

2. 单元测试 Plugin

通过创建 PluginRegistrar 并调用 plugin.register(),验证插件注册了预期的状态键和钩子。

use awaken::contract::StateError;
use awaken::state::{StateKey, MergeStrategy, StateKeyOptions, StateCommand};
use awaken::plugins::{Plugin, PluginDescriptor, PluginRegistrar};
use serde::{Serialize, Deserialize};

// -- State key --

struct Counter;

impl StateKey for Counter {
    const KEY: &'static str = "test.counter";
    const MERGE: MergeStrategy = MergeStrategy::Commutative;
    type Value = usize;
    type Update = usize;

    fn apply(value: &mut Self::Value, update: Self::Update) {
        *value += update;
    }
}

// -- Plugin --

struct CounterPlugin;

impl Plugin for CounterPlugin {
    fn descriptor(&self) -> PluginDescriptor {
        PluginDescriptor { name: "counter" }
    }

    fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
        r.register_key::<Counter>(StateKeyOptions::default())?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use awaken::state::StateStore;

    #[test]
    fn counter_plugin_registers_key() {
        let store = StateStore::new();
        // install_plugin calls register() internally
        store.install_plugin(CounterPlugin).unwrap();

        // The key should now be readable (returns the default value)
        let val = store.read::<Counter>().unwrap_or_default();
        assert_eq!(val, 0);
    }

    #[test]
    fn counter_plugin_rejects_double_registration() {
        let store = StateStore::new();
        store.install_plugin(CounterPlugin).unwrap();

        // Registering the same key again should fail
        let result = store.install_plugin(CounterPlugin);
        assert!(result.is_err());
    }
}

要直接测试阶段钩子,请构建一个最小的 PhaseContext 并检查返回的 StateCommand:

#[tokio::test]
async fn audit_hook_appends_entry() {
    let hook = AuditHook;
    let ctx = PhaseContext::test_default();

    let cmd = hook.run(&ctx).await.unwrap();

    // Commit the command to a test store and verify
    let store = StateStore::new();
    store.install_plugin(AuditPlugin).unwrap();
    store.commit(cmd).unwrap();

    let log = store.read::<AuditLogKey>().unwrap();
    assert!(!log.entries.is_empty());
}

3. 单元测试 StateKey

直接测试 apply() 变更操作,无需任何运行时开销:

use awaken::state::{StateKey, MergeStrategy};

struct HitCounter;

impl StateKey for HitCounter {
    const KEY: &'static str = "test.hit_counter";
    const MERGE: MergeStrategy = MergeStrategy::Commutative;
    type Value = u64;
    type Update = u64;

    fn apply(value: &mut Self::Value, update: Self::Update) {
        *value += update;
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn apply_increments_counter() {
        let mut value = u64::default();
        HitCounter::apply(&mut value, 5);
        assert_eq!(value, 5);
        HitCounter::apply(&mut value, 3);
        assert_eq!(value, 8);
    }

    #[test]
    fn apply_from_default_is_identity_free() {
        let mut value = u64::default();
        HitCounter::apply(&mut value, 0);
        assert_eq!(value, 0);
    }
}

对于具有复杂值类型的状态键,测试边界情况如空集合或合并冲突:

#[test]
fn apply_merge_replaces_entries() {
    let mut log = AuditLog { entries: vec!["old".into()] };
    AuditLogKey::apply(&mut log, AuditLog { entries: vec!["new".into()] });
    // Exclusive merge replaces the entire value
    assert_eq!(log.entries, vec!["new"]);
}

4. 使用模拟 LLM 进行集成测试

使用脚本化的 LlmExecutor 构建完整的代理运行时,返回预设的响应。这是 awaken-runtime 集成测试中使用的主要模式。

use std::sync::{Arc, Mutex};
use async_trait::async_trait;
use serde_json::json;

use awaken::contract::content::ContentBlock;
use awaken::contract::event_sink::VecEventSink;
use awaken::contract::executor::{InferenceExecutionError, InferenceRequest, LlmExecutor};
use awaken::contract::identity::{RunIdentity, RunOrigin};
use awaken::contract::inference::{StopReason, StreamResult};
use awaken::contract::message::{Message, ToolCall};
use awaken::contract::tool::{
    Tool, ToolCallContext, ToolDescriptor, ToolError, ToolOutput, ToolResult,
};
use awaken::loop_runner::{AgentLoopParams, LoopStatePlugin, build_agent_env, run_agent_loop};
use awaken::registry::{AgentResolver, ResolvedAgent};
use awaken::state::StateStore;
use awaken::phase::PhaseRuntime;
use awaken::RuntimeError;

// -- 脚本化 LLM 执行器 --

struct ScriptedLlm {
    responses: Mutex<Vec<StreamResult>>,
}

impl ScriptedLlm {
    fn new(responses: Vec<StreamResult>) -> Self {
        Self {
            responses: Mutex::new(responses),
        }
    }
}

#[async_trait]
impl LlmExecutor for ScriptedLlm {
    async fn execute(
        &self,
        _request: InferenceRequest,
    ) -> Result<StreamResult, InferenceExecutionError> {
        let mut responses = self.responses.lock().unwrap();
        if responses.is_empty() {
            // Fallback: end the conversation
            Ok(StreamResult {
                content: vec![ContentBlock::text("Done.")],
                tool_calls: vec![],
                usage: None,
                stop_reason: Some(StopReason::EndTurn),
                has_incomplete_tool_calls: false,
            })
        } else {
            Ok(responses.remove(0))
        }
    }

    fn name(&self) -> &str {
        "scripted"
    }
}

// -- 解析器 --

struct FixedResolver {
    agent: ResolvedAgent,
}

impl AgentResolver for FixedResolver {
    fn resolve(&self, _agent_id: &str) -> Result<ResolvedAgent, RuntimeError> {
        let mut agent = self.agent.clone();
        agent.env = build_agent_env(&[], &agent)?;
        Ok(agent)
    }
}

// -- 辅助函数 --

fn tool_step(calls: Vec<ToolCall>) -> StreamResult {
    StreamResult {
        content: vec![],
        tool_calls: calls,
        usage: None,
        stop_reason: Some(StopReason::ToolUse),
        has_incomplete_tool_calls: false,
    }
}

fn text_step(text: &str) -> StreamResult {
    StreamResult {
        content: vec![ContentBlock::text(text)],
        tool_calls: vec![],
        usage: None,
        stop_reason: Some(StopReason::EndTurn),
        has_incomplete_tool_calls: false,
    }
}

fn test_identity() -> RunIdentity {
    RunIdentity::new(
        "thread-test".into(),
        None,
        "run-test".into(),
        None,
        "agent".into(),
        RunOrigin::User,
    )
}

// -- 测试 --

#[tokio::test]
async fn tool_call_flow_end_to_end() {
    // Script: LLM calls get_weather, then responds with text
    let llm = Arc::new(ScriptedLlm::new(vec![
        tool_step(vec![ToolCall::new("c1", "get_weather", json!({"city": "Tokyo"}))]),
        text_step("The weather in Tokyo is sunny."),
    ]));

    let agent = ResolvedAgent::new("test", "model", "You are helpful.", llm)
        .with_tool(Arc::new(GetWeatherTool));

    let store = StateStore::new();
    let runtime = PhaseRuntime::new(store.clone()).unwrap();
    store.install_plugin(LoopStatePlugin).unwrap();

    let resolver = FixedResolver { agent };
    let sink = Arc::new(VecEventSink::new());

    let result = run_agent_loop(AgentLoopParams {
        resolver: &resolver,
        agent_id: "test",
        runtime: &runtime,
        sink: sink.clone(),
        checkpoint_store: None,
        messages: vec![Message::user("What's the weather?")],
        run_identity: test_identity(),
        cancellation_token: None,
        decision_rx: None,
        overrides: None,
        frontend_tools: Vec::new(),
    })
    .await
    .unwrap();

    assert_eq!(result.response, "The weather in Tokyo is sunny.");
    assert_eq!(result.steps, 2); // tool step + text step
}

对于更简单的场景,可以使用内置的 MockLlmExecutor,它仅返回纯文本响应:

use awaken::engine::MockLlmExecutor;

let llm = Arc::new(MockLlmExecutor::new().with_responses(vec!["Hello!".into()]));
let agent = ResolvedAgent::new("test", "model", "system prompt", llm);

5. 测试事件流

使用 VecEventSink 捕获运行期间发出的所有事件,并对其顺序和内容进行断言。

use awaken::contract::event::AgentEvent;
use awaken::contract::event_sink::VecEventSink;
use awaken::contract::lifecycle::TerminationReason;

#[tokio::test]
async fn events_follow_expected_lifecycle() {
    // ... set up runtime and run agent (see section 4) ...

    let events = sink.take();

    // Verify ordering: RunStart -> StepStart -> ... -> StepEnd -> RunFinish
    assert!(matches!(events.first(), Some(AgentEvent::RunStart { .. })));
    assert!(matches!(events.last(), Some(AgentEvent::RunFinish { .. })));

    // Count specific event types
    let step_starts = events.iter().filter(|e| matches!(e, AgentEvent::StepStart { .. })).count();
    let step_ends = events.iter().filter(|e| matches!(e, AgentEvent::StepEnd)).count();
    assert_eq!(step_starts, step_ends, "every StepStart needs a StepEnd");

    // Verify termination reason
    if let Some(AgentEvent::RunFinish { termination, .. }) = events.last() {
        assert_eq!(*termination, TerminationReason::NaturalEnd);
    }

    // Check that tool call events appear in the correct order
    let has_tool_start = events.iter().any(|e| matches!(e, AgentEvent::ToolCallStart { .. }));
    let has_tool_done = events.iter().any(|e| matches!(e, AgentEvent::ToolCallDone { .. }));
    if has_tool_start {
        assert!(has_tool_done, "ToolCallStart without ToolCallDone");
        let start_idx = events.iter().position(|e| matches!(e, AgentEvent::ToolCallStart { .. })).unwrap();
        let done_idx = events.iter().position(|e| matches!(e, AgentEvent::ToolCallDone { .. })).unwrap();
        assert!(start_idx < done_idx);
    }
}

一个可复用的事件类型提取辅助函数(在运行时测试套件中使用):

fn event_type(e: &AgentEvent) -> &'static str {
    match e {
        AgentEvent::RunStart { .. } => "run_start",
        AgentEvent::RunFinish { .. } => "run_finish",
        AgentEvent::StepStart { .. } => "step_start",
        AgentEvent::StepEnd => "step_end",
        AgentEvent::TextDelta { .. } => "text_delta",
        AgentEvent::ToolCallStart { .. } => "tool_call_start",
        AgentEvent::ToolCallDone { .. } => "tool_call_done",
        AgentEvent::InferenceComplete { .. } => "inference_complete",
        AgentEvent::StateSnapshot { .. } => "state_snapshot",
        _ => "other",
    }
}

let types: Vec<&str> = events.iter().map(event_type).collect();
assert_eq!(types[0], "run_start");
assert_eq!(*types.last().unwrap(), "run_finish");

6. 使用真实 LLM 测试(在线测试)

要针对真实的提供商进行端到端验证,可使用 GenaiExecutor 并通过环境变量传递凭据。将这些测试标记为 #[ignore],使其仅在显式请求时运行。

use awaken::engine::GenaiExecutor;

#[tokio::test]
#[ignore] // Run with: cargo test -- --ignored
async fn live_llm_responds() {
    // Requires: OPENAI_API_KEY or (LLM_BASE_URL + LLM_API_KEY)
    let model = std::env::var("LLM_MODEL").unwrap_or_else(|_| "gpt-4o-mini".into());
    let llm = Arc::new(GenaiExecutor::new());

    let agent = ResolvedAgent::new(
        "live-test",
        &model,
        "You are a test assistant. Answer in one word.",
        llm,
    );

    // ... set up resolver, store, runtime, sink as in section 4 ...

    let result = run_agent_loop(AgentLoopParams {
        resolver: &resolver,
        agent_id: "live-test",
        runtime: &runtime,
        sink: sink.clone(),
        checkpoint_store: None,
        messages: vec![Message::user("What is 2+2? Answer in one word.")],
        run_identity: test_identity(),
        cancellation_token: None,
        decision_rx: None,
        overrides: None,
        frontend_tools: Vec::new(),
    })
    .await
    .unwrap();

    assert!(!result.response.is_empty());
}

运行在线测试:

# OpenAI 兼容的提供商
OPENAI_API_KEY=<your-key> LLM_MODEL=gpt-4o-mini cargo test -- --ignored

# 自定义端点(例如 BigModel)
LLM_BASE_URL=https://open.bigmodel.cn/api/paas/v4/ \
  LLM_API_KEY=<key> \
  LLM_MODEL=GLM-4.7-Flash \
  cargo test -- --ignored

完整的可运行示例(含控制台输出)请参见 examples/live_test.rsexamples/tool_call_live.rs

关键文件

  • crates/awaken-contract/src/contract/tool.rs – Tool trait、ToolCallContext::test_default()、ToolResult、ToolOutput
  • crates/awaken-contract/src/contract/event_sink.rs – VecEventSink
  • crates/awaken-runtime/src/engine/mock.rs – MockLlmExecutor
  • crates/awaken-runtime/src/state/mod.rs – StateStore、StateCommand
  • crates/awaken-runtime/src/loop_runner/mod.rsrun_agent_loop、AgentLoopParams、AgentRunResult
  • crates/awaken-runtime/tests/ – 集成测试套件(事件生命周期、工具副作用)

相关文档

概览

awaken crate 是 Awaken 的公开门面。它把 awaken-contractawaken-runtimeawaken-stores 以及若干扩展 crate 的公共 API 重新导出为一个统一依赖面。

模块再导出

门面路径来源 crate内容
awaken::contractawaken-contracttool、event、message、suspension、lifecycle 等契约
awaken::modelawaken-contractPhaseEffectSpecScheduledActionSpecJsonValue
awaken::registry_specawaken-contractAgentSpecModelSpecProviderSpecMcpServerSpecPluginConfigKey
awaken::stateawaken-contract + awaken-runtimeStateKeyStateMapSnapshotStateStoreMutationBatch
awaken::agentawaken-runtimeagent 配置与状态
awaken::builderawaken-runtimeAgentRuntimeBuilderBuildError
awaken::contextawaken-runtimePhaseContext
awaken::engineawaken-runtimeLLM 执行层抽象
awaken::executionawaken-runtimeExecutionEnv
awaken::extensionsawaken-runtime内置扩展基础设施
awaken::loop_runnerawaken-runtimeagent loop 执行器
awaken::phaseawaken-runtimePhaseRuntimePhaseHook
awaken::pluginsawaken-runtimePluginPluginRegistrar
awaken::policiesawaken-runtimecontext window / retry policy
awaken::registryawaken-runtimeAgentResolverResolvedAgent
awaken::runtimeawaken-runtimeAgentRuntime
awaken::storesawaken-storesfile / postgres / memory store

受 feature flag 控制的模块

门面路径feature flag来源 crate
awaken::ext_permissionpermissionawaken-ext-permission
awaken::ext_observabilityobservabilityawaken-ext-observability
awaken::ext_mcpmcpawaken-ext-mcp
awaken::ext_skillsskillsawaken-ext-skills
awaken::ext_generative_uigenerative-uiawaken-ext-generative-ui
awaken::ext_reminderreminderawaken-ext-reminder
awaken::serverserverawaken-server

根级再导出

常用类型还会直接从 crate root 导出,例如:

  • 来自 awaken-contractAgentSpecKeyScopeMergeStrategyPhaseStateKeyStateMapSnapshot
  • 来自 awaken-runtimeAgentRuntimeAgentRuntimeBuilderBuildErrorRunRequestRuntimeErrorPhaseHook

Feature Flags

Flag默认开启说明
permissionyes工具级权限控制与 HITL
observabilityyestracing 与 metrics
mcpyesMCP 工具桥接
skillsyes技能子系统
reminderyes工具执行后的提醒注入
serveryesHTTP / SSE / protocol server
generative-uiyes生成式 UI 组件流
fullyes上述功能全集

独立工作区扩展 crate 也可能存在但未接到门面 feature 上;当前包括 awaken-ext-deferred-tools

相关

Tool Trait

Tool 是 Awaken 暴露能力给 LLM 的主扩展点。tool 接收 JSON 参数和只读上下文,返回 ToolOutput

Trait 定义

#[async_trait]
pub trait Tool: Send + Sync {
    fn descriptor(&self) -> ToolDescriptor;

    fn validate_args(&self, _args: &Value) -> Result<(), ToolError> {
        Ok(())
    }

    async fn execute(
        &self,
        args: Value,
        ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError>;
}

ToolDescriptor

描述 tool 的 ID、名称、说明和参数 schema。它会被注册到运行时,并在推理请求里暴露给 LLM。

pub struct ToolDescriptor {
    pub id: String,
    pub name: String,
    pub description: String,
    pub parameters: Value,
    pub category: Option<String>,
}

Builder 方法:

ToolDescriptor::new(id, name, description) -> Self
    .with_parameters(schema: Value) -> Self
    .with_category(category: impl Into<String>) -> Self

ToolResult

Tool::execute 返回的结构化结果。

pub struct ToolResult {
    pub tool_name: String,
    pub status: ToolStatus,
    pub data: Value,
    pub message: Option<String>,
    pub suspension: Option<Box<SuspendTicket>>,
}

ToolStatus

pub enum ToolStatus {
    Success,
    Pending,
    Error,
}

构造函数

方法状态用途
ToolResult::success(name, data)Success正常完成
ToolResult::success_with_message(name, data, msg)Success带补充说明的成功
ToolResult::error(name, message)Error可恢复失败
ToolResult::error_with_code(name, code, message)Error带 code 的结构化失败
ToolResult::suspended(name, message)PendingHITL 挂起
ToolResult::suspended_with(name, message, ticket)PendingSuspendTicket 的挂起

判定方法

  • is_success()
  • is_pending()
  • is_error()
  • to_json()

ToolError

ToolErrorToolResult::error(...) 的区别是:前者直接终止该次 tool call,后者会把失败信息回传给 LLM。

pub enum ToolError {
    InvalidArguments(String),
    ExecutionFailed(String),
    Denied(String),
    NotFound(String),
    Internal(String),
}

ToolCallContext

tool 执行期拿到的只读上下文:

pub struct ToolCallContext {
    pub call_id: String,
    pub tool_name: String,
    pub run_identity: RunIdentity,
    pub agent_spec: Arc<AgentSpec>,
    pub snapshot: Snapshot,
    pub activity_sink: Option<Arc<dyn EventSink>>,
}

方法

fn state<K: StateKey>(&self) -> Option<&K::Value>
async fn report_activity(&self, activity_type: &str, content: &str)
async fn report_activity_delta(&self, activity_type: &str, patch: Value)
async fn report_progress(
    &self,
    status: ProgressStatus,
    message: Option<&str>,
    progress: Option<f64>,
)

示例

最小 tool

use async_trait::async_trait;
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
use serde_json::{Value, json};

struct Greet;

#[async_trait]
impl Tool for Greet {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("greet", "greet", "Greet a user by name")
            .with_parameters(json!({
                "type": "object",
                "properties": {
                    "name": { "type": "string" }
                },
                "required": ["name"]
            }))
    }

    async fn execute(
        &self,
        args: Value,
        _ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError> {
        let name = args["name"]
            .as_str()
            .ok_or_else(|| ToolError::InvalidArguments("name required".into()))?;
        Ok(ToolResult::success("greet", json!({ "greeting": format!("Hello, {name}!") })).into())
    }
}

从上下文读取状态

use async_trait::async_trait;
use awaken::contract::tool::{Tool, ToolCallContext, ToolDescriptor, ToolError, ToolResult, ToolOutput};
use awaken::state::StateKey;
use serde_json::{Value, json};

struct GetPreferences;

#[async_trait]
impl Tool for GetPreferences {
    fn descriptor(&self) -> ToolDescriptor {
        ToolDescriptor::new("get_prefs", "get_preferences", "Get user preferences")
    }

    async fn execute(
        &self,
        _args: Value,
        ctx: &ToolCallContext,
    ) -> Result<ToolOutput, ToolError> {
        // let prefs = ctx.state::<UserPreferences>().cloned().unwrap_or_default();
        Ok(ToolResult::success("get_prefs", json!({})).into())
    }
}

Tool 执行钩子

每次 tool call 在真正执行前后都会经过插件钩子。

完整生命周期

LLM 选择 tool
  -> validate_args()
  -> BeforeToolExecute
     插件可调度 ToolInterceptAction:
       Block
       Suspend
       SetResult
  -> execute()
  -> AfterToolExecute

BeforeToolExecute

参数校验后运行。插件可以在这里阻断、挂起,或直接提供预构造结果。

拦截优先级:

Block > Suspend > SetResult

AfterToolExecute

tool 完成后运行。插件可以观察 ToolResult、更新状态、追加事件或调度后续 action。

ToolCallStatus 转移

New -> Running -> Succeeded
                  Failed
                  Suspended -> Resuming -> Running -> ...
                  Cancelled

终态(SucceededFailedCancelled)不能再向前转移。

相关

Scheduled Actions

Scheduled action 是插件、tool 和运行时在 phase 收敛循环里发起副作用的主要机制。任何 hook、tool 或内部模块都可以通过 StateCommand::schedule_action::<A>(payload) 调度一个 action,运行时会在目标 phase 的 EXECUTE 阶段把它交给对应 handler。

工作方式

Hook / Tool                    Runtime
    |                            |
    |-- StateCommand ----------->|  (包含 scheduled_actions)
    |                            |-- commit state updates
    |                            |-- dispatch to handler(A, p)
    |                            |      |
    |                            |      |-- handler returns StateCommand
    |                            |<-----'
    |                            |-- commit handler results

从 hook 调度

use awaken_runtime::agent::state::ExcludeTool;

async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
    let mut cmd = StateCommand::new();
    cmd.schedule_action::<ExcludeTool>("dangerous_tool".into())?;
    Ok(cmd)
}

从 tool 调度

use awaken_runtime::agent::state::AddContextMessage;
use awaken_contract::contract::context_message::ContextMessage;

async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
    let mut cmd = StateCommand::new();
    cmd.schedule_action::<AddContextMessage>(
        ContextMessage::system("my_tool.hint", "Remember to check the docs."),
    )?;
    Ok(ToolOutput::with_command(
        ToolResult::success("my_tool", json!({"ok": true})),
        cmd,
    ))
}

核心 Actions(awaken-runtime)

AddContextMessage

Keyruntime.add_context_message
PhaseBeforeInference
PayloadContextMessage

向当前步骤的推理上下文注入一条 context message。

SetInferenceOverride

Keyruntime.set_inference_override
PhaseBeforeInference
PayloadInferenceOverride

覆盖当前步骤的推理参数,如 model、temperature、max_tokens 等。

ExcludeTool

Keyruntime.exclude_tool
PhaseBeforeInference
PayloadString(tool ID)

把某个 tool 从当前步骤提供给 LLM 的工具集合中移除。

IncludeOnlyTools

Keyruntime.include_only_tools
PhaseBeforeInference
PayloadVec<String>

把当前步骤的工具集合限制为指定白名单。

ToolInterceptAction

Keytool_intercept
PhaseBeforeToolExecute
PayloadToolInterceptPayload

在 tool 真正执行前拦截:

变体效果
Block { reason }阻断 tool,run 终止
Suspend(SuspendTicket)挂起 tool,等待外部决策
SetResult(ToolResult)直接使用预构造结果,跳过执行

优先级:

Block > Suspend > SetResult

Deferred Tools Actions(awaken-ext-deferred-tools)

DeferToolAction

Keydeferred_tools.defer
PhaseBeforeInference
PayloadVec<String>

把工具切换到 Deferred 模式,从 LLM 工具列表里移除,由 ToolSearch 间接暴露。

PromoteToolAction

Keydeferred_tools.promote
PhaseBeforeInference
PayloadVec<String>

把工具从 Deferred 提升回 Eager 模式。

插件 Action 使用矩阵

插件AddContextSetOverrideExcludeIncludeOnlyInterceptDeferPromote
permissionXX
skillsX
reminderX
deferred-toolsXXXX
observability
mcp
generative-ui

定义自定义 action

插件可以通过实现 ScheduledActionSpec 来定义自己的 action:

use awaken_contract::model::{Phase, ScheduledActionSpec};

pub struct MyCustomAction;

impl ScheduledActionSpec for MyCustomAction {
    const KEY: &'static str = "my_plugin.custom_action";
    const PHASE: Phase = Phase::BeforeInference;
    type Payload = MyPayload;
}

register() 中挂上 handler:

fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
    r.register_scheduled_action::<MyCustomAction, _>(MyHandler)?;
    Ok(())
}

收敛与级联

scheduled actions 在 phase 收敛循环内部执行。某个 handler 可以再为同一 phase 调度新的 action,于是运行时会继续下一轮 dispatch,直到没有新 action 产生。

循环工作方式

Phase EXECUTE stage:
  round 1: dispatch queued actions -> handlers return StateCommands
           commit state, collect newly scheduled actions
  round 2: dispatch new actions
           ...
  round N: no new actions -> phase converges

限制

循环上限是 DEFAULT_MAX_PHASE_ROUNDS(当前默认 16)。如果超过上限仍不断产生 action,会返回 StateError::PhaseRunLoopExceeded

失败 action

handler 返回错误时不会重试,失败会被写入 FailedScheduledActions

let failed = store.read::<FailedScheduledActions>().unwrap_or_default();
assert!(failed.is_empty(), "expected no failed actions");

Effects

Effect 是类型化的、fire-and-forget 的副作用事件。和会在 phase 收敛循环内部执行、还能继续级联的 scheduled actions 不同,effect 在 commit 之后才分发,而且 handler 不能再返回新的 StateCommand

常见用途:审计日志、外部 webhook、指标上报、通知投递。

EffectSpec trait

pub trait EffectSpec: 'static + Send + Sync {
    const KEY: &'static str;
    type Payload: Serialize + DeserializeOwned + Send + Sync + 'static;
}

约定 KEY 采用 "<plugin>.<effect_name>" 这类全局唯一名字。

发出 effect

通过 StateCommand::emit::<E>(payload) 发出:

use awaken::{StateCommand, StateError};

async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
    let mut cmd = StateCommand::new();
    cmd.emit::<AuditEffect>(AuditPayload {
        action: "user_login".into(),
        actor: "agent-1".into(),
    })?;
    Ok(cmd)
}

TypedEffect 封装

运行时内部用 TypedEffect 做类型擦除:

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TypedEffect {
    pub key: String,
    pub payload: JsonValue,
}
  • TypedEffect::from_spec::<E>(payload):把 typed payload 序列化成 TypedEffect
  • TypedEffect::decode::<E>():把 JSON 反序列化回具体 payload

注册 effect handler

#[async_trait]
pub trait TypedEffectHandler<E>: Send + Sync + 'static
where
    E: EffectSpec,
{
    async fn handle_typed(
        &self,
        payload: E::Payload,
        snapshot: &Snapshot,
    ) -> Result<(), String>;
}

关键点:

  • handler 收到的是 post-commit Snapshot
  • 返回值是 Result<(), String>,不是 StateError
  • handler 失败会被记录,但不会回滚已提交的状态

注册方式:

fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
    r.register_effect::<AuditEffect, _>(AuditEffectHandler)?;
    Ok(())
}

分发生命周期

  1. hook / action handler / tool 调用 emit::<E>()
  2. submit_command 校验所有 effect key 是否都有 handler
  3. 状态变更提交到 store
  4. 依次调用每个 handler
  5. handler 失败只记录,不影响后续 effect
Hook / Tool                         Runtime
    |                                 |
    |-- StateCommand (with effects) ->|
    |                                 |-- validate all effect keys
    |                                 |-- commit state mutations
    |                                 |-- dispatch effects sequentially
    |<--------------------------------|

示例

定义 effect:

use awaken::EffectSpec;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuditPayload {
    pub action: String,
    pub actor: String,
}

pub struct AuditEffect;

impl EffectSpec for AuditEffect {
    const KEY: &'static str = "audit.record";
    type Payload = AuditPayload;
}

在 hook 中发出:

use async_trait::async_trait;
use awaken::{PhaseContext, PhaseHook, StateCommand, StateError};

pub struct AuditHook;

#[async_trait]
impl PhaseHook for AuditHook {
    async fn run(&self, ctx: &PhaseContext) -> Result<StateCommand, StateError> {
        let mut cmd = StateCommand::new();
        cmd.emit::<AuditEffect>(AuditPayload {
            action: "phase_entered".into(),
            actor: "system".into(),
        })?;
        Ok(cmd)
    }
}

处理 effect:

use async_trait::async_trait;
use awaken::{Snapshot, TypedEffectHandler};

pub struct AuditEffectHandler;

#[async_trait]
impl TypedEffectHandler<AuditEffect> for AuditEffectHandler {
    async fn handle_typed(
        &self,
        payload: AuditPayload,
        _snapshot: &Snapshot,
    ) -> Result<(), String> {
        tracing::info!(
            action = %payload.action,
            actor = %payload.actor,
            "audit effect dispatched"
        );
        Ok(())
    }
}

Effects 与 Scheduled Actions 的区别

EffectsScheduled Actions
执行时机commit 后phase 收敛循环内
是否可级联
能否产出 StateCommand
失败处理记录日志,不阻塞错误会上抛
状态可见性post-commit snapshotpre-commit context
适用场景外部 I/O、日志、指标内部控制流、状态变更

另见

事件

agent loop 在执行过程中会持续发出 AgentEvent。这些事件既会被协议编码器消费,也会通过 SSE 发给客户端。

AgentEvent

所有变体在 JSON 中都使用 event_type 作为标签字段(#[serde(tag = "event_type", rename_all = "snake_case")])。

pub enum AgentEvent {
    RunStart {
        thread_id: String,
        run_id: String,
        parent_run_id: Option<String>,    // 为 None 时省略
    },

    RunFinish {
        thread_id: String,
        run_id: String,
        result: Option<Value>,            // 为 None 时省略
        termination: TerminationReason,
    },

    TextDelta { delta: String },

    ReasoningDelta { delta: String },

    ToolCallStart { id: String, name: String },

    ToolCallDelta { id: String, args_delta: String },

    ToolCallReady {
        id: String,
        name: String,
        arguments: Value,
    },

    ToolCallDone {
        id: String,
        message_id: String,
        result: ToolResult,
        outcome: ToolCallOutcome,
    },

    ToolCallStreamDelta {
        id: String,
        name: String,
        delta: String,
    },

    ReasoningEncryptedValue { encrypted_value: String },

    MessagesSnapshot { messages: Vec<Value> },

    ActivitySnapshot {
        message_id: String,
        activity_type: String,
        content: Value,
        replace: Option<bool>,            // 为 None 时省略
    },

    ActivityDelta {
        message_id: String,
        activity_type: String,
        patch: Vec<Value>,
    },

    ToolCallResumed { target_id: String, result: Value },

    StepStart { message_id: String },

    StepEnd,

    InferenceComplete {
        model: String,
        usage: Option<TokenUsage>,        // 为 None 时省略
        duration_ms: u64,
    },

    StateSnapshot { snapshot: Value },

    StateDelta { delta: Vec<Value> },

    Error {
        message: String,
        code: Option<String>,             // 为 None 时省略
    },
}

Crate 路径: awaken::contract::event::AgentEvent

Helper

impl AgentEvent {
    /// 从 RunFinish 的 result 中提取响应文本。
    pub fn extract_response(result: &Option<Value>) -> String
}

SSE 传输格式

事件通过 HTTP Server-Sent Events 传输。每帧使用 SSE 的 id 字段作为单调递增的序列号,支持断点续连:

id: {seq}
data: {json}

其中 {json}AgentEvent 的 JSON 序列化结果。序列号从 0 开始,run 内每发出一个事件递增 1。

当流空闲时,服务器还会定期发送注释行(: heartbeat),以防止客户端或代理因长时间推理暂停而超时。客户端应忽略这些注释行,它们不携带任何事件数据。

该功能由 awaken-server::http_sse 中的 format_sse_data_with_id 实现。

Run 输入

向 run 推送消息时,POST 到 /v1/runs/{run_id}/inputs,请求体反序列化为 PushRunInputsPayload

struct PushRunInputsPayload {
    messages: Vec<RunMessage>,
}

struct RunMessage {
    role: String,
    content: String,
}

至少需要一条消息。成功时服务器返回 202 Accepted

RunOutput

run 返回的事件流类型别名:

pub type RunOutput = futures::stream::BoxStream<'static, AgentEvent>;

TerminationReason

run 终止的原因。使用 #[serde(tag = "type", content = "value", rename_all = "snake_case")] 序列化。

  • 无负载的单元变体序列化为 { "type": "natural_end" },不会输出 "value" 键。
  • 带负载的元组变体序列化为 { "type": "stopped", "value": { ... } }
pub enum TerminationReason {
    NaturalEnd,
    BehaviorRequested,
    Stopped(StoppedReason),
    Cancelled,
    Blocked(String),
    Suspended,
    Error(String),
}

StoppedReason

TerminationReason::Stopped 携带的负载。

pub struct StoppedReason {
    pub code: String,
    pub detail: Option<String>,    // 为 None 时省略
}

ToolCallOutcome

pub enum ToolCallOutcome {
    Succeeded,
    Failed,
    Suspended,
}

ToolResult

工具执行结果,在 ToolCallDone 中携带。定义于 awaken::contract::tool::ToolResult

pub struct ToolResult {
    pub tool_name: String,
    pub status: ToolStatus,        // "success" | "pending" | "error"
    pub data: Value,
    pub message: Option<String>,
    pub suspension: Option<Box<SuspendTicket>>,   // 为 None 时省略
    pub metadata: HashMap<String, Value>,          // 为空时省略
}

TokenUsage

pub struct TokenUsage {
    pub prompt_tokens: Option<i32>,
    pub completion_tokens: Option<i32>,
    pub total_tokens: Option<i32>,
    pub cache_read_tokens: Option<i32>,
    pub cache_creation_tokens: Option<i32>,
    pub thinking_tokens: Option<i32>,
}

所有字段为 None 时均从 JSON 中省略。TokenUsage::default() 产生所有字段均为 None 的值。

相关

HTTP API

启用 server feature 后,awaken-server 会通过 Axum 暴露 HTTP API。大多数接口返回 JSON,流式接口返回 SSE。

本页对应当前代码里的路由树:crates/awaken-server/src/routes.rscrates/awaken-server/src/config_routes.rs

健康检查与指标

方法路径说明
GET/health就绪探针;检查 store 连通性,返回 200503
GET/health/live存活探针;始终返回 200
GET/metricsPrometheus 指标抓取口

Threads

方法路径说明
GET/v1/threads列出 thread ID
POST/v1/threads创建 thread
GET/v1/threads/summaries列出 thread 摘要
GET/v1/threads/:id获取 thread
PATCH/v1/threads/:id更新 thread 元信息
DELETE/v1/threads/:id删除 thread
POST/v1/threads/:id/cancel取消该 thread 上排队或运行中的某个 job;返回 cancel_requested
POST/v1/threads/:id/decision向该 thread 上等待中的 run 提交 HITL decision
POST/v1/threads/:id/interrupt中断该 thread:递增 thread generation、取消所有待执行 job、中止活动 run;返回 interrupt_requestedsuperseded_jobs 计数。与 /cancel 不同,此接口通过 mailbox.interrupt() 执行完整的“清空并中断“操作
PATCH/v1/threads/:id/metadata更新 metadata 的别名接口
GET/v1/threads/:id/messages列出消息
POST/v1/threads/:id/messages作为后台 run 提交消息
POST/v1/threads/:id/mailbox向 mailbox 推送消息载荷
GET/v1/threads/:id/mailbox查看该 thread 的 mailbox job
GET/v1/threads/:id/runs列出该 thread 的 runs
GET/v1/threads/:id/runs/latest获取最新 run

Runs

方法路径说明
GET/v1/runs列出 runs
POST/v1/runs启动 run,并通过 SSE 返回事件
GET/v1/runs/:id获取 run 记录
POST/v1/runs/:id/inputs向同一 thread 追加后续输入
POST/v1/runs/:id/cancel按 run ID 取消
POST/v1/runs/:id/decision按 run ID 提交 HITL decision

Config 与 Capabilities

这些接口由 config_routes() 提供。读取与 schema 接口要求 AppState 挂接 config store;写接口还要求挂接 config runtime manager,才能在写入后 校验并发布新的 registry snapshot。缺少这些配置时会返回 400,错误为 config management API not enabled

方法路径说明
GET/v1/capabilities列出 agents、tools、plugins、models、providers 和 config namespaces
GET/v1/config/:namespace列出某个 namespace 下的配置项
POST/v1/config/:namespace创建配置项,body 必须含 "id"
GET/v1/config/:namespace/:id获取单个配置项
PUT/v1/config/:namespace/:id整体替换配置项
DELETE/v1/config/:namespace/:id删除配置项
GET/v1/config/:namespace/$schema获取该 namespace 的 JSON Schema
GET/v1/agents/v1/config/agents 的便捷别名
GET/v1/agents/:id/v1/config/agents/:id 的便捷别名

当前内置 namespace:

  • agents
  • models
  • providers
  • mcp-servers

AI SDK v6 路由

方法路径说明
POST/v1/ai-sdk/chat启动 chat run,并流式返回 AI SDK 编码事件
POST/v1/ai-sdk/threads/:thread_id/runs在指定 thread 上启动 run
POST/v1/ai-sdk/agents/:agent_id/runs在指定 agent 上启动 run
GET/v1/ai-sdk/chat/:thread_id/stream按 thread ID 续接 SSE
GET/v1/ai-sdk/threads/:thread_id/stream同上别名
GET/v1/ai-sdk/threads/:thread_id/messages列出 thread 消息
POST/v1/ai-sdk/threads/:thread_id/cancel取消该 thread 上活动或排队中的 run
POST/v1/ai-sdk/threads/:thread_id/interrupt中断 thread(递增 generation、取消待执行 job、中止活动 run)

AG-UI 路由

方法路径说明
POST/v1/ag-ui/run启动 AG-UI run,并流式返回 AG-UI 事件
POST/v1/ag-ui/threads/:thread_id/runs在指定 thread 上启动 run
POST/v1/ag-ui/agents/:agent_id/runs在指定 agent 上启动 run
POST/v1/ag-ui/threads/:thread_id/interrupt中断 thread
GET/v1/ag-ui/threads/:id/messages列出 thread 消息

A2A 路由

方法路径说明
GET/.well-known/agent-card.json获取公共/默认 agent card
POST/v1/a2a/message:send向公共/默认 A2A agent 发送消息
POST/v1/a2a/message:stream通过 SSE 进行流式发送
GET/v1/a2a/tasks列出 A2A 任务
GET/v1/a2a/tasks/:task_id查询任务状态
POST/v1/a2a/tasks/:task_id:cancel取消任务
POST/v1/a2a/tasks/:task_id:subscribe通过 SSE 订阅任务更新
POST/v1/a2a/tasks/:task_id/pushNotificationConfigs创建推送通知配置
GET/v1/a2a/tasks/:task_id/pushNotificationConfigs列出推送通知配置
GET/v1/a2a/tasks/:task_id/pushNotificationConfigs/:config_id获取推送通知配置
DELETE/v1/a2a/tasks/:task_id/pushNotificationConfigs/:config_id删除推送通知配置
GET/v1/a2a/extendedAgentCard获取扩展 agent card;未启用时返回 501
POST/v1/a2a/:tenant/message:send向 tenant 作用域 agent 发送消息
POST/v1/a2a/:tenant/message:streamtenant 作用域流式发送
GET/v1/a2a/:tenant/tasks列出 tenant 作用域任务
GET/v1/a2a/:tenant/tasks/:task_id查询 tenant 作用域任务状态
POST/v1/a2a/:tenant/tasks/:task_id:cancel取消 tenant 作用域任务
POST/v1/a2a/:tenant/tasks/:task_id:subscribe订阅 tenant 作用域任务更新
GET/v1/a2a/:tenant/extendedAgentCard获取 tenant 作用域扩展 agent card

MCP HTTP 路由

方法路径说明
POST/v1/mcpMCP JSON-RPC 请求/响应入口
GET/v1/mcp为 MCP 服务端主动 SSE 预留;当前返回 405

常见查询参数

  • offset:跳过的条数
  • limit:返回上限,范围会被限制在 1..=200
  • cursor:消息历史分页游标;提供后会优先于 offset,历史消息接口响应会返回 next_cursor
  • status:按 run 状态过滤,支持 runningwaitingdone
  • visibility:消息可见性过滤;省略时只看外部消息,all 表示包含内部消息

错误格式

大多数接口返回:

{ "error": "human-readable message" }

MCP 接口返回 JSON-RPC 错误对象,而不是上面的通用形状。

相关

配置

AgentSpec

AgentSpec 是可序列化的 agent 定义。它既可以从 JSON / YAML 加载,也可以用 builder 方法在代码里构造。

pub struct AgentSpec {
    pub id: String,
    pub model: String,
    pub system_prompt: String,
    pub max_rounds: usize,
    pub max_continuation_retries: usize,
    pub context_policy: Option<ContextWindowPolicy>,
    pub reasoning_effort: Option<ReasoningEffort>,
    pub plugin_ids: Vec<String>,
    pub active_hook_filter: HashSet<String>,
    pub allowed_tools: Option<Vec<String>>,
    pub excluded_tools: Option<Vec<String>>,
    pub endpoint: Option<RemoteEndpoint>,
    pub delegates: Vec<String>,
    pub sections: HashMap<String, Value>,
    pub registry: Option<String>,
}

Crate 路径: awaken::registry_spec::AgentSpec(在 awaken::AgentSpec 重新导出)

Builder 方法

AgentSpec::new(id) -> Self
    .with_model(model) -> Self
    .with_system_prompt(prompt) -> Self
    .with_max_rounds(n) -> Self
    .with_reasoning_effort(effort) -> Self
    .with_hook_filter(plugin_id) -> Self
    .with_config::<K>(config) -> Result<Self, StateError>
    .with_delegate(agent_id) -> Self
    .with_endpoint(endpoint) -> Self
    .with_section(key, value: Value) -> Self

类型化配置访问

fn config<K: PluginConfigKey>(&self) -> Result<K::Config, StateError>
fn set_config<K: PluginConfigKey>(&mut self, config: K::Config) -> Result<(), StateError>

ContextWindowPolicy

控制上下文窗口和自动压缩行为。

pub struct ContextWindowPolicy {
    pub max_context_tokens: usize,
    pub max_output_tokens: usize,
    pub min_recent_messages: usize,
    pub enable_prompt_cache: bool,
    pub autocompact_threshold: Option<usize>,
    pub compaction_mode: ContextCompactionMode,
    pub compaction_raw_suffix_messages: usize,
}

ContextCompactionMode

pub enum ContextCompactionMode {
    KeepRecentRawSuffix,
    CompactToSafeFrontier,
}

InferenceOverride

用于单次推理的参数覆盖。所有字段都是 Option,多插件同时写时按字段 last-wins 合并。

pub struct InferenceOverride {
    pub model: Option<String>,
    pub fallback_models: Option<Vec<String>>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<u32>,
    pub top_p: Option<f64>,
    pub reasoning_effort: Option<ReasoningEffort>,
}

方法

fn is_empty(&self) -> bool
fn merge(&mut self, other: InferenceOverride)

ReasoningEffort

pub enum ReasoningEffort {
    None,
    Low,
    Medium,
    High,
    Max,
    Budget(u32),
}

PluginConfigKey trait

把配置 section 名称和 Rust 配置结构绑定在一起:

pub trait PluginConfigKey: 'static + Send + Sync {
    const KEY: &'static str;
    type Config: Default + Clone + Serialize + DeserializeOwned
        + schemars::JsonSchema + Send + Sync + 'static;
}

RemoteEndpoint

远程 backend agent 的配置。当前内置的是 "a2a" backend,backend 专有参数放在 options 中:

pub struct RemoteEndpoint {
    pub backend: String,
    pub base_url: String,
    pub auth: Option<RemoteAuth>,
    pub target: Option<String>,
    pub timeout_ms: u64,
    pub options: BTreeMap<String, Value>,
}

pub struct RemoteAuth {
    pub r#type: String,
    // backend 专有认证字段,例如 bearer 用 { "token": "..." }
}

ServerConfig

HTTP server 配置。需启用 server feature。

pub struct ServerConfig {
    pub address: String,                   // default: "0.0.0.0:3000"
    pub sse_buffer_size: usize,            // default: 64
    pub replay_buffer_capacity: usize,     // default: 1024
    pub shutdown: ShutdownConfig,
    pub max_concurrent_requests: usize,    // default: 100
    pub a2a_extended_card_bearer_token: Option<String>,
}

pub struct ShutdownConfig {
    pub timeout_secs: u64,                 // default: 30
}

Crate 路径: awaken_server::app::ServerConfig

字段类型默认值说明
addressString"0.0.0.0:3000"服务器绑定的 socket 地址
sse_buffer_sizeusize64单连接 SSE 通道最大缓冲帧数
replay_buffer_capacityusize1024每次 run 用于断线续接的最大 replay buffer 帧数
max_concurrent_requestsusize100最大并发请求数;超出时返回 503
a2a_extended_card_bearer_tokenOption<String>None设置后启用带认证的 GET /v1/a2a/extendedAgentCard
shutdown.timeout_secsu6430强制退出前等待飞行中请求排空的秒数

MailboxConfig

mailbox 持久化队列配置。控制租约计时、扫描/GC 间隔以及失败任务的重试行为。

pub struct MailboxConfig {
    pub lease_ms: u64,                          // default: 30_000
    pub suspended_lease_ms: u64,                // default: 600_000
    pub lease_renewal_interval: Duration,       // default: 10s
    pub sweep_interval: Duration,               // default: 30s
    pub gc_interval: Duration,                  // default: 60s
    pub gc_ttl: Duration,                       // default: 24h
    pub default_max_attempts: u32,              // default: 5
    pub default_retry_delay_ms: u64,            // default: 250
    pub max_retry_delay_ms: u64,                // default: 30_000
}

Crate 路径: awaken_server::mailbox::MailboxConfig

字段类型默认值说明
lease_msu6430_000活跃 run 的租约时长(毫秒)
suspended_lease_msu64600_000等待人工输入的挂起 run 的租约时长(毫秒)
lease_renewal_intervalDuration10sworker 续约频率
sweep_intervalDuration30s扫描过期租约、回收孤儿任务的频率
gc_intervalDuration60s对已终止(完成/失败)任务进行垃圾回收的频率
gc_ttlDuration24h已终止任务在被清除前的保留时长
default_max_attemptsu325任务进入死信队列前的最大投递次数
default_retry_delay_msu64250两次重试之间的基础延迟(毫秒)
max_retry_delay_msu6430_000指数退避的最大延迟上限(毫秒)

LlmRetryPolicy

LLM 推理失败后的重试与 fallback model 策略,支持指数退避。可通过 AgentSpec"retry" section 按 agent 配置。

pub struct LlmRetryPolicy {
    pub max_retries: u32,              // default: 2
    pub fallback_models: Vec<String>,  // default: []
    pub backoff_base_ms: u64,          // default: 500
}

Crate 路径: awaken_runtime::engine::retry::LlmRetryPolicy

字段类型默认值说明
max_retriesu322初次调用后的最大重试次数(0 表示不重试)
fallback_modelsVec<String>[]主模型耗尽重试后依次尝试的备用模型列表
backoff_base_msu64500指数退避的基础延迟(毫秒);实际延迟 = min(base × 2^attempt, 8000ms)。设为 0 可禁用退避

AgentSpec 集成

通过 "retry" section 配置:

use awaken_runtime::engine::retry::RetryConfigKey;

let spec = AgentSpec::new("my-agent")
    .with_config::<RetryConfigKey>(LlmRetryPolicy {
        max_retries: 3,
        fallback_models: vec!["claude-sonnet-4-20250514".into()],
        backoff_base_ms: 1000,
    })?;

CircuitBreakerConfig

每个模型单独维护的熔断器配置。通过短路对失败过多的模型的请求,防止级联故障。冷却期过后熔断器进入半开状态,允许有限的探测请求;成功后完全关闭。

pub struct CircuitBreakerConfig {
    pub failure_threshold: u32,    // default: 5
    pub cooldown: Duration,        // default: 30s
    pub half_open_max: u32,        // default: 1
}

Crate 路径: awaken_runtime::engine::circuit_breaker::CircuitBreakerConfig

字段类型默认值说明
failure_thresholdu325触发熔断器打开并拒绝请求所需的连续失败次数
cooldownDuration30s熔断器从打开状态过渡到半开状态前的等待时长
half_open_maxu321半开状态下允许的最大探测请求数;失败则重新打开,成功则完全关闭

Feature flags 及其效果

Flag运行时行为
permission注册权限插件,可对工具启用 HITL 审批
observability注册观测插件,发出 traces / metrics
mcp启用 MCP 工具桥接
skills启用技能子系统
reminder注册 reminder 插件,在工具执行后根据模式规则注入上下文消息
server启用 HTTP / SSE server 与协议适配层
generative-ui启用生成式 UI 组件流

工作区还包含不通过门面 feature 暴露的扩展 crate,当前包括 awaken-ext-deferred-tools

自定义插件配置

插件通过 PluginConfigKey 声明类型化配置 section,并通过 config_schemas() 提供 JSON Schema,用于 resolve 阶段校验。

声明 schema 用于校验

fn config_schemas(&self) -> Vec<ConfigSchema> {
    vec![ConfigSchema {
        key: RateLimitConfigKey::KEY,
        json_schema: schemars::schema_for!(RateLimitConfig),
    }]
}

在运行时读取配置

let cfg = ctx.agent_spec().config::<RateLimitConfigKey>()?;

示例

use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use awaken::PluginConfigKey;

#[derive(Debug, Clone, Default, Serialize, Deserialize, JsonSchema)]
pub struct RateLimitConfig {
    pub max_calls_per_step: u32,
    pub cooldown_ms: u64,
}

pub struct RateLimitConfigKey;

impl PluginConfigKey for RateLimitConfigKey {
    const KEY: &'static str = "rate_limit";
    type Config = RateLimitConfig;
}

校验行为

  • section 存在但不合法:resolve 失败
  • section 存在但没有插件声明:记录 warning
  • section 缺失:返回 Config::default()

相关

错误

Awaken 的错误类型统一基于 thiserror,并实现 std::error::ErrorDisplay

StateError

状态系统相关错误,定义在 awaken-contract

pub enum StateError {
    RevisionConflict { expected: u64, actual: u64 },
    MutationBaseRevisionMismatch { left: u64, right: u64 },
    PluginAlreadyInstalled { name: String },
    PluginNotInstalled { type_name: &'static str },
    KeyAlreadyRegistered { key: String },
    UnknownKey { key: String },
    KeyDecode { key: String, message: String },
    KeyEncode { key: String, message: String },
    HandlerAlreadyRegistered { key: String },
    EffectHandlerAlreadyRegistered { key: String },
    PhaseRunLoopExceeded { phase: Phase, max_rounds: usize },
    UnknownScheduledActionHandler { key: String },
    UnknownEffectHandler { key: String },
    ParallelMergeConflict { key: String },
    ToolAlreadyRegistered { tool_id: String },
    Cancelled,
}

ToolError

Tool::validate_argsTool::execute 返回的错误。和 ToolResult::error(...) 不同,ToolError 会直接中止该次 tool call。

pub enum ToolError {
    InvalidArguments(String),
    ExecutionFailed(String),
    Denied(String),
    NotFound(String),
    Internal(String),
}

BuildError

AgentRuntimeBuilder::build() 阶段的错误。

pub enum BuildError {
    State(StateError),
    AgentRegistryConflict(String),
    ToolRegistryConflict(String),
    ModelRegistryConflict(String),
    ProviderRegistryConflict(String),
    PluginRegistryConflict(String),
    ValidationFailed(String),
    DiscoveryFailed(DiscoveryError),
}

RuntimeError

运行时执行错误,例如 agent 无法解析、同一 thread 重入运行等。

pub enum RuntimeError {
    State(StateError),
    ThreadAlreadyRunning { thread_id: String },
    AgentNotFound { agent_id: String },
    ResolveFailed { message: String },
}

InferenceExecutionError

LLM 执行层错误。

pub enum InferenceExecutionError {
    Provider(String),
    RateLimited(String),
    Timeout(String),
    Cancelled,
}

StorageError

ThreadStoreRunStoreThreadRunStore 返回的错误。

pub enum StorageError {
    NotFound(String),
    AlreadyExists(String),
    VersionConflict { expected: u64, actual: u64 },
    Io(String),
    Serialization(String),
}

ResolveError

agent 解析管线中的错误。

pub enum ResolveError {
    AgentNotFound(String),
    ModelNotFound(String),
    ProviderNotFound(String),
    PluginNotFound(String),
    InvalidPluginConfig { plugin: String, key: String, message: String },
    RemoteAgentNotDirectlyRunnable(String),
    ToolIdConflict { tool_id: String, source_a: String, source_b: String },
    EnvBuild(StateError),
}

UnknownKeyPolicy

反序列化未知状态键时的策略。

pub enum UnknownKeyPolicy {
    Error,
    Skip,
}

相关

AI SDK v6 协议

AI SDK v6 适配层会把 Awaken 的 AgentEvent 流转换成 Vercel AI SDK v6 UI Message Stream 格式。这样 useChatuseAssistant 等前端无需自定义解析器就能直接消费 Awaken 的输出。

入口

POST /v1/ai-sdk/chat

请求体

{
  "messages": [{ "role": "user", "content": "Hello" }],
  "threadId": "optional-thread-id",
  "agentId": "optional-agent-id"
}
字段类型必填说明
messagesAiSdkMessage[]输入消息。内容可以是字符串或 content parts 数组。
threadIdstring继续已有 thread。省略时会创建新 thread。
agentIdstring指定 agent。省略时使用默认 agent。

响应

SSE 流(text/event-stream),每一行是一个 JSON 编码的 UIStreamEvent

辅助路由

路由方法说明
/v1/ai-sdk/threads/:thread_id/runsPOST在指定 thread 上启动 run
/v1/ai-sdk/agents/:agent_id/runsPOST在指定 agent 上启动 run
/v1/ai-sdk/chat/:thread_id/streamGET按 thread ID 续接 SSE
/v1/ai-sdk/threads/:thread_id/streamGET同上,thread 路由别名
/v1/ai-sdk/threads/:thread_id/messagesGET读取 thread 消息历史
/v1/ai-sdk/threads/:thread_id/cancelPOST取消 thread
/v1/ai-sdk/threads/:thread_id/interruptPOST中断 thread

事件映射

AiSdkEncoder 会把 AgentEvent 映射到 UIStreamEvent

AgentEventUIStreamEvent
RunStartMessageStart + Data("run-info", ...)
TextDeltaTextStart(如果 block 未打开)+ TextDelta
ReasoningDeltaReasoningStart(如果 block 未打开)+ ReasoningDelta
ReasoningEncryptedValueReasoningStart(如果未打开)+ ReasoningDelta
ToolCallStart关闭当前 text/reasoning block,然后发 ToolCallStart
ToolCallDeltaToolCallDelta
ToolCallDoneToolCallEnd
StepStart(无直接映射)
StepEnd(无直接映射)
InferenceCompleteData("inference-complete", ...)
MessagesSnapshotData("messages-snapshot", ...)
StateSnapshotData("state-snapshot", ...)
StateDeltaData("state-delta", ...)
ActivitySnapshotData("activity-snapshot", ...)
ActivityDeltaData("activity-delta", ...)
RunFinish关闭当前 block,然后发 Data("finish", ...)Finish

UIStreamEvent 类型

  • start
  • text-start / text-delta / text-end
  • reasoning-start / reasoning-delta / reasoning-end
  • tool-call-start / tool-call-delta / tool-call-end
  • data
  • finish

文本块生命周期

编码器会自动管理文本块边界:

  1. 第一个 TextDelta 会打开一个 text block。
  2. 后续 delta 追加到同一个 block。
  3. 遇到 ToolCallStart 会先关闭当前 block。
  4. 工具结束后的新文本会重新开一个新 block。

相关

AG-UI 协议

AG-UI 适配层会把 Awaken 的 AgentEvent 流转换成 AG-UI / CopilotKit 事件格式,使 CopilotKit 前端可以直接驱动 Awaken agent。

入口

POST /v1/ag-ui/run

请求体

{
  "threadId": "optional-thread-id",
  "agentId": "optional-agent-id",
  "messages": [{ "role": "user", "content": "Hello" }],
  "context": {}
}
字段类型必填说明
messagesAgUiMessage[]聊天消息,使用 rolecontent
threadIdstring继续已有 thread
agentIdstring指定目标 agent
contextobject前端上下文透传

响应

SSE 流(text/event-stream),每个 frame 是一个 JSON 编码的 AG-UI Event

辅助路由

路由方法说明
/v1/ag-ui/threads/:thread_id/runsPOST在指定 thread 上启动 run
/v1/ag-ui/agents/:agent_id/runsPOST在指定 agent 上启动 run
/v1/ag-ui/threads/:thread_id/interruptPOST中断指定 thread
/v1/ag-ui/threads/:id/messagesGET读取 thread 消息历史

事件映射

AgentEventAG-UI Event
RunStartRUN_STARTED
TextDeltaTEXT_MESSAGE_START + TEXT_MESSAGE_CONTENT
ReasoningDeltaREASONING_MESSAGE_START + REASONING_MESSAGE_CONTENT
ToolCallStart关闭当前 text/reasoning,然后发 STEP_STARTEDTOOL_CALL_START
ToolCallDeltaTOOL_CALL_ARGS
ToolCallDoneTOOL_CALL_ENDSTEP_FINISHED
StateSnapshotSTATE_SNAPSHOT
StateDeltaSTATE_DELTA
RunFinishRUN_FINISHEDRUN_ERROR

AG-UI 事件类型

  • RUN_STARTED / RUN_FINISHED / RUN_ERROR
  • TEXT_MESSAGE_START / TEXT_MESSAGE_CONTENT / TEXT_MESSAGE_END
  • REASONING_MESSAGE_START / REASONING_MESSAGE_CONTENT / REASONING_MESSAGE_END
  • STEP_STARTED / STEP_FINISHED
  • TOOL_CALL_START / TOOL_CALL_ARGS / TOOL_CALL_END
  • STATE_SNAPSHOT / STATE_DELTA
  • MESSAGES_SNAPSHOT

角色

AG-UI 消息角色使用小写字符串:systemuserassistanttool

文本消息生命周期

  1. 第一个 TextDelta 会发 TEXT_MESSAGE_STARTTEXT_MESSAGE_CONTENT
  2. 后续 delta 只追加 TEXT_MESSAGE_CONTENT
  3. 遇到 ToolCallStartRunFinish 时,当前消息会以 TEXT_MESSAGE_END 关闭。

reasoning 消息采用同样模式。

相关

A2A 协议

A2A 适配器实现了官方 A2A 协议,用于远程 agent 发现、任务委托与 agent 间通信。

Feature gateserver

端点

路径方法说明
/.well-known/agent-card.jsonGET公共/默认 agent card 发现端点。
/v1/a2a/message:sendPOST向公共/默认 A2A agent 发送消息,返回 task 包装结果。
/v1/a2a/message:streamPOST通过 SSE 进行流式发送。
/v1/a2a/tasksGET列出 A2A 任务。
/v1/a2a/tasks/:task_idGET按 task ID 查询状态。
/v1/a2a/tasks/:task_id:cancelPOST取消运行中的任务。
/v1/a2a/tasks/:task_id:subscribePOST通过 SSE 订阅任务更新。
/v1/a2a/tasks/:task_id/pushNotificationConfigsPOST创建推送通知配置。
/v1/a2a/tasks/:task_id/pushNotificationConfigsGET列出推送通知配置。
/v1/a2a/tasks/:task_id/pushNotificationConfigs/:config_idGET / DELETE读取或删除推送通知配置。
/v1/a2a/extendedAgentCardGET扩展 agent card;只有 capabilities.extendedAgentCard=true 时才受支持。

租户/agent 作用域的等价路由位于 /v1/a2a/:tenant/...,例如 /v1/a2a/research/message:send/v1/a2a/research/tasks/:task_id

Agent Card

发现端点返回描述接口与能力的 AgentCard

{
  "name": "My Agent",
  "description": "A helpful assistant",
  "supportedInterfaces": [
    {
      "url": "https://example.com/v1/a2a",
      "protocolBinding": "HTTP+JSON",
      "protocolVersion": "1.0"
    }
  ],
  "version": "1.0.0",
  "capabilities": {
    "streaming": true,
    "pushNotifications": true,
    "stateTransitionHistory": false,
    "extendedAgentCard": false
  },
  "defaultInputModes": ["text/plain"],
  "defaultOutputModes": ["text/plain"],
  "skills": [
    {
      "id": "general",
      "name": "General Q&A",
      "description": "Answer general questions",
      "tags": ["qa"],
      "inputModes": ["text/plain"],
      "outputModes": ["text/plain"]
    }
  ]
}

Agent card 由已注册的 AgentSpec 生成。旧版顶层 url / id 字段不会再输出。

Message Send

{
  "message": {
    "taskId": "optional-client-provided-id",
    "contextId": "optional-client-provided-id",
    "messageId": "msg-123",
    "role": "ROLE_USER",
    "parts": [{ "text": "Summarize this document" }]
  },
  "configuration": {
    "returnImmediately": true
  }
}

服务端会把 A2A task 映射到 Awaken 的 thread / mailbox 执行链路。响应使用 v1 的 task 包装结构:

{
  "task": {
    "id": "optional-client-provided-id",
    "contextId": "optional-client-provided-id",
    "status": {
      "state": "TASK_STATE_SUBMITTED"
    }
  }
}

如果未设置 returnImmediately 或传入 false,适配器会等待任务进入终态或中断态后再返回。

Task 状态

GET /v1/a2a/tasks/:task_id 返回 Task 资源:

{
  "id": "abc-123",
  "contextId": "abc-123",
  "status": {
    "state": "TASK_STATE_COMPLETED",
    "message": {
      "messageId": "msg-response",
      "role": "ROLE_AGENT",
      "parts": [{ "text": "..." }]
    }
  },
  "history": []
}

任务状态使用 v1 枚举名,例如 TASK_STATE_SUBMITTEDTASK_STATE_WORKINGTASK_STATE_COMPLETEDTASK_STATE_FAILEDTASK_STATE_CANCELED

可选能力默认值

Awaken 当前默认启用以下 A2A 能力:

  • streaming = true
  • pushNotifications = true

extendedAgentCard 仍然是可选能力,只有在配置 ServerConfig.a2a_extended_card_bearer_token 后才会启用。未启用时,对应端点会返回符合规范的 unsupported 错误。

远程 Agent 委托

Awaken agent 可以通过 AgentTool::remote() 委托到远程 A2A agent。A2aBackend 会向远端发送 message:send 请求,读取返回的 task.id,再轮询 /tasks/:task_id 直到完成。从 LLM 视角看,这仍然只是一次普通工具调用。

远程 agent 配置写在 AgentSpec 中。RemoteEndpoint 是通用结构,A2A 通过 backend: "a2a" 指定:

{
  "id": "remote-researcher",
  "endpoint": {
    "backend": "a2a",
    "base_url": "https://remote-agent.example.com/v1/a2a",
    "auth": { "type": "bearer", "token": "..." },
    "target": "researcher",
    "timeout_ms": 300000,
    "options": {
      "poll_interval_ms": 1000
    }
  }
}

endpoint 的 agent 会按远程 backend agent 解析。目前内置的委托 backend 是 A2A;没有 endpoint 的 agent 仍在本地运行。

另见

取消

Awaken 使用协作式取消机制来中断 agent run、流式输出和长时间运行的操作。

CancellationToken

CancellationToken 是一个可克隆的句柄,底层由共享的 AtomicBooltokio::sync::Notify 组成。任何一个 clone 调用 cancel(),其他 clone 都会立刻观察到取消状态。

use awaken::CancellationToken;

let token = CancellationToken::new();

方法

pub fn new() -> Self
pub fn cancel(&self)
pub fn is_cancelled(&self) -> bool
pub async fn cancelled(&self)

Trait

  • Clone
  • Default

同步轮询

在同步代码或紧密循环里,可以使用 is_cancelled()

let token = CancellationToken::new();

while !token.is_cancelled() {
    // do work
}

配合 tokio::select! 的异步等待

let token = CancellationToken::new();

tokio::select! {
    result = some_async_work() => {
        // work completed before cancellation
    }
    _ = token.cancelled() => {
        // cancellation was signalled
    }
}

协作式语义

取消不会强杀任务。cancel() 只会设置标志位并唤醒等待者,具体执行逻辑仍需要主动检查 is_cancelled() 或在 select! 中监听 cancelled()

关键特性:

  • 幂等:重复调用 cancel() 是安全的。
  • 共享:所有 clone 看到的是同一份状态。
  • 可见性:原子标志使用强顺序,跨线程立即可见。
  • 立即唤醒:cancel() 会通知所有等待者。

运行时里的用法

运行时会给每个 run 传入 CancellationToken,用于:

  • 中断流式推理
  • 在 phase 之间提前停止 run
  • 把 HTTP / SSE / mailbox 层的取消请求传播到核心运行时
let token = CancellationToken::new();
let clone = token.clone();

tokio::select! {
    _ = async {
        while let Some(chunk) = stream.next().await {
            // process chunk
        }
    } => {}
    _ = clone.cancelled() => {
        // stop processing, clean up
    }
}

新消息到来时的自动取消

同一 thread 上已经有活动 run 时,如果又来了新的输入,mailbox 会先取消旧 run,再启动新 run。这样可以避免 ThreadAlreadyRunning,也避免挂起 run 与新输入互相干扰。

大致顺序是:

  1. Mailbox::submit() 发现该 thread 已有活动 run。
  2. 调用 cancel_and_wait_by_thread() 触发取消并等待线程槽位释放。
  3. 旧 run 以 TerminationReason::Cancelled 结束。
  4. 新 run 启动前清理不成对的 tool call 历史,避免污染上下文。

关键文件

  • crates/awaken-runtime/src/cancellation.rs
  • crates/awaken-runtime/src/runtime/agent_runtime/active_registry.rs
  • crates/awaken-runtime/src/runtime/agent_runtime/runner.rs

工具执行模式

ToolExecutionMode 决定 LLM 在同一轮推理里返回多个 tool call 时,运行时如何执行这些调用。

ToolExecutionMode

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ToolExecutionMode {
    #[default]
    Sequential,
    ParallelBatchApproval,
    ParallelStreaming,
}

默认值是 Sequential

模式

Sequential

按顺序逐个执行 tool call。

  • 每个调用之间都会刷新状态快照。
  • 遇到挂起会在第一个挂起点停止后续调用。
  • 适合 tool 之间有数据依赖或顺序很重要的场景。

ParallelBatchApproval

并发执行所有 tool call,所有调用看到的是同一份冻结快照。

  • 对挂起决策采用批量回放。
  • 会做并行补丁冲突检查。
  • 不会因单个失败或挂起而提前停止其它调用。

ParallelStreaming

同样并发执行全部调用,但挂起决策一到就立刻回放,不等待其他挂起调用。

  • 适合独立工具很多、希望尽快恢复执行的场景。

对比

行为SequentialParallelBatchApprovalParallelStreaming
执行顺序串行全并发全并发
状态可见性每次调用前刷新冻结快照冻结快照
遇挂起是否停止
遇失败是否停止
decision 回放不适用批量即时
冲突检查

Executor trait

#[async_trait]
pub trait ToolExecutor: Send + Sync {
    async fn execute(
        &self,
        tools: &HashMap<String, Arc<dyn Tool>>,
        calls: &[ToolCall],
        base_ctx: &ToolCallContext,
    ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError>;

    fn name(&self) -> &'static str;

    fn requires_incremental_state(&self) -> bool { false }
}

DecisionReplayPolicy

并行模式下,DecisionReplayPolicy 决定挂起 tool call 的恢复决策何时回放:

pub enum DecisionReplayPolicy {
    Immediate,
    BatchAllSuspended,
}

关键文件

  • crates/awaken-contract/src/contract/executor.rs
  • crates/awaken-runtime/src/execution/executor.rs

相关

架构

Awaken 围绕一个运行时核心及三个外围层面组织:契约类型、服务器/存储适配器以及可选扩展。真正重要的不是 crate 的边界,而是决策在哪里做出。

应用组装
  注册 tool / model / provider / plugin / AgentSpec
        |
        v
AgentRuntime
  解析 AgentSpec -> ResolvedAgent
  从插件构建 ExecutionEnv
  执行 phase loop
  暴露 cancel / decision 等活跃 run 的控制面
        |
        v
服务器与存储表面
  HTTP 路由、mailbox、SSE 回放、协议适配器、
  thread/run 持久化、profile storage

契约层awaken-contract 定义全系统共用的类型:AgentSpecModelSpecProviderSpecToolAgentEvent、transport trait 以及类型化状态模型。这是系统其余部分共同使用的“词汇表“。

运行时核心awaken-runtime 是编排层。它将 agent ID 解析为完整配置(ResolvedAgent),从插件构建 ExecutionEnv,管理活跃 run,并将执行委托给循环运行器和阶段引擎。

服务器与持久化表面awaken-server 将运行时转化为 HTTP 和 SSE 端点、基于 mailbox 的后台执行以及协议适配器。awaken-stores 提供 thread 和 run 的具体持久化后端。awaken-ext-* crate 在阶段和工具边界扩展运行时行为,而不改动核心循环。

请求时序

下图展示一个典型请求在系统中的流转过程:

sequenceDiagram
    participant Client
    participant Server
    participant Runtime
    participant LLM
    participant Tool

    Client->>Server: POST /v1/ai-sdk/chat
    Server->>Runtime: RunRequest (agent_id, thread_id, messages)
    Runtime->>Runtime: Resolve agent (AgentSpec -> ResolvedAgent)
    Runtime->>Runtime: Load thread history
    Runtime->>Runtime: RunStart phase
    loop Step loop
        Runtime->>Runtime: StepStart phase
        Runtime->>Runtime: BeforeInference phase
        Runtime->>LLM: Inference request (messages + tools)
        LLM-->>Runtime: Response (text + tool_calls)
        Runtime->>Runtime: AfterInference phase
        opt Tool calls present
            Runtime->>Runtime: BeforeToolExecute phase
            Runtime->>Tool: execute(args, ctx)
            Tool-->>Runtime: ToolResult
            Runtime->>Runtime: AfterToolExecute phase
        end
        Runtime->>Runtime: StepEnd phase (checkpoint)
    end
    Runtime->>Runtime: RunEnd phase
    Runtime-->>Server: AgentEvent stream
    Server-->>Client: SSE events (protocol-specific encoding)

阶段驱动的执行循环

每次 run 都经历固定的阶段序列。插件注册在各阶段边界运行的钩子,从而控制推理参数、工具执行、状态变更和终止逻辑。

RunStart -> [StepStart -> BeforeInference -> AfterInference
             -> BeforeToolExecute -> AfterToolExecute -> StepEnd]* -> RunEnd

步骤循环持续执行,直到以下任一条件触发:

  • LLM 返回不含工具调用的响应(NaturalEnd)。
  • 插件或停止条件请求终止(StoppedBehaviorRequested)。
  • 工具调用挂起等待外部输入(Suspended)。
  • run 被外部取消(Cancelled)。
  • 发生错误(Error)。

在每个阶段边界,循环在继续执行前会检查取消令牌和 run 的生命周期状态。

仓库结构

awaken
├─ awaken-contract
│  ├─ registry specs
│  ├─ tool / executor / event / transport contracts
│  └─ state model
├─ awaken-runtime
│  ├─ builder + registries + resolve pipeline
│  ├─ AgentRuntime control plane
│  ├─ loop_runner + phase engine
│  ├─ execution / context / policies / profile
│  └─ runtime extensions (handoff, local A2A, background)
├─ awaken-server
│  ├─ routes + config API + mailbox + services
│  ├─ protocols: ai_sdk_v6 / ag_ui / a2a / mcp / acp-stdio
│  └─ transport: SSE relay / replay buffer / transcoder
├─ awaken-stores
└─ awaken-ext-*

设计意图

三项原则指导整体架构:

快照隔离 – 阶段钩子永远看不到部分应用的状态。它们从不可变快照中读取,并向 MutationBatch 写入。在一个阶段的所有钩子收敛后,批次被原子性地提交。这消除了并发钩子之间的数据竞争,使钩子的执行顺序对正确性无关紧要。

追加式持久化 – Thread 消息仅追加,不修改。状态在步骤边界做检查点。这使得从任意检查点重放 run 成为可能,并产生确定性的审计轨迹。

传输无关性 – 运行时通过 EventSink trait 发出 AgentEvent 值。协议适配器(AiSdkEncoderAgUiEncoder)将这些事件转码为具体的传输格式。运行时对 HTTP、SSE 或任何具体协议一无所知。添加新协议意味着实现一个新的编码器——运行时本身无需改变。

另见

智能体解析

当调用 runtime.run(request) 时,请求中的 agent_id 必须被解析为一个完全装配好的 ResolvedAgent – 一个持有 LLM 执行器、工具、插件和执行环境的活引用的结构体。每次调用 resolve() 都会重新执行解析;不会在运行之间缓存或共享任何内容。本文描述三阶段解析流水线及其输入构建器。

流水线概览

解析是一个纯函数:(RegistrySet, agent_id) -> ResolvedAgent。它按顺序经过三个阶段:

flowchart LR
    subgraph Stage1["Stage 1: Lookup"]
        A1[Fetch AgentSpec] --> A2[Resolve ModelSpec]
        A2 --> A3[Get LlmExecutor]
        A3 --> A4{Retry config?}
        A4 -- yes --> A5[Wrap in RetryingExecutor]
        A4 -- no --> A6[Use executor directly]
    end

    subgraph Stage2["Stage 2: Plugin Pipeline"]
        B1[Resolve plugin IDs] --> B2[Inject default plugins]
        B2 --> B3{context_policy set?}
        B3 -- yes --> B4[Add CompactionPlugin + ContextTransformPlugin]
        B3 -- no --> B5[Skip]
        B4 --> B6[Validate config sections]
        B5 --> B6
        B6 --> B7[Build ExecutionEnv]
    end

    subgraph Stage3["Stage 3: Tool Pipeline"]
        C1[Collect global tools] --> C2[Merge delegate agent tools]
        C2 --> C3[Merge plugin-registered tools]
        C3 --> C4[Check for ID conflicts]
        C4 --> C5[Apply allowed/excluded filters]
    end

    Stage1 --> Stage2 --> Stage3

任何阶段的失败都会产生 ResolveError 并中止流程。流水线永远不会返回部分结果。

阶段 1:查找

第一阶段从注册表中获取原始数据:

  1. AgentSpec – 通过 agent_idAgentSpecRegistry 中查找。如果规格包含 endpoint 字段(远程 backend 智能体),解析会以 RemoteAgentNotDirectlyRunnable 失败 – 远程智能体只能作为委托使用,不能直接运行。

  2. ModelSpec – 规格的 model 字段(一个字符串 ID,如 "gpt-4")通过 ModelRegistry 解析为 ModelSpec,将其映射到一个 provider ID 和实际 API 模型名(例如,provider "openai",模型 "gpt-4o")。

  3. LlmExecutor – 模型条目中的提供者 ID 通过 ProviderRegistry 解析为一个活的 LlmExecutor 实例。

  4. 重试装饰 – 如果智能体规格包含 RetryConfigKey 配置段,且 max_retries > 0fallback_models 非空,则执行器会被包装在 RetryingExecutor 装饰器中。

阶段 2:插件流水线

第二阶段组装插件链并构建执行环境。

插件解析

AgentSpec.plugin_ids 中列出的插件通过 ID 从 PluginSource 解析。缺失的插件会产生 ResolveError::PluginNotFound

默认插件注入

解析用户声明的插件后,流水线会注入运行时必需的默认插件。无论智能体如何配置,这些插件始终存在:

  • LoopActionHandlersPlugin – 注册运行时循环用于处理工具调用、发射事件和管理步骤转换的核心动作处理器。没有此插件,循环无法运行。

  • MaxRoundsPlugin – 强制执行智能体规格上配置的 max_rounds 停止条件。使用规格的 max_rounds 值注入。防止失控循环。

条件插件

仅当设置了 AgentSpec.context_policy 时才添加以下插件:

  • CompactionPlugin – 管理上下文窗口压缩(当上下文增长过大时对旧消息进行摘要)。使用规格中的 CompactionConfigKey 配置段创建,缺失时回退到默认值。

  • ContextTransformPlugin – 在每次推理请求前应用上下文窗口策略变换(token 计数、截断、提示缓存)。使用 context_policy 值创建。

构建 ExecutionEnv

插件列表确定后,ExecutionEnv::from_plugins() 对每个插件调用其 register() 方法并传入 PluginRegistrar。插件通过注册器声明:

  • 阶段钩子(按阶段回调)
  • 调度动作处理器
  • 效果处理器
  • 请求变换
  • 状态键注册
  • 工具

结果是一个 ExecutionEnv – 见下文 ExecutionEnv

配置验证

插件可以声明 config_schemas(),返回一组 ConfigSchema 条目。每个条目将一个配置段键与一个 JSON Schema 关联。解析期间,每个声明的 schema 都会针对 AgentSpec.sections 中的对应条目进行验证:

  • 配置段存在 – 针对 JSON Schema 进行验证。失败会产生 ResolveError::InvalidPluginConfig
  • 配置段缺失 – 允许。插件应使用合理的默认值。
  • 配置段存在但无插件认领 – 没有插件为其声明 schema。流水线记录一条警告(可能是配置中的拼写错误)。

阶段 3:工具流水线

第三阶段从所有来源收集工具并生成最终工具集。

工具来源

工具按以下顺序合并:

  1. 全局工具 – 所有通过构建器在 ToolRegistry 中注册的工具(例如 builder.with_tool("search", search_tool))。

  2. 委托智能体工具 – 对于 AgentSpec.delegates 中的每个智能体 ID,流水线创建一个 AgentTool。如果委托有 endpoint(远程),流水线会选择配置的远程 backend。目前内置的远程委托 backend 是 A2A;本地委托仍然创建由解析器支持的本地工具。委托工具需要 a2a 功能标志;没有该标志时,委托会被静默忽略并记录警告。

  3. 插件注册的工具 – 插件在 register() 期间声明的工具,存储在 ExecutionEnv.tools 中。

冲突检测

如果插件注册的工具与全局工具具有相同的 ID,解析会以 ResolveError::ToolIdConflict 失败。这是有意为之的 – 静默覆盖会成为难以调试的问题来源。

过滤

合并后,应用规格的 allowed_toolsexcluded_tools 字段:

  • allowed_tools = None – 保留所有工具。
  • allowed_tools = Some(list) – 仅保留 ID 出现在列表中的工具。其余全部丢弃。
  • excluded_tools – 任何 ID 出现在此列表中的工具都会被移除,即使它在允许列表中。

ExecutionEnv

ExecutionEnv 是插件流水线的每次解析产物。它不是全局或共享的 – 每次 resolve() 调用都会构建一个全新的实例。其内容:

字段类型用途
phase_hooksHashMap<Phase, Vec<TaggedPhaseHook>>在每个阶段边界调用的钩子
scheduled_action_handlersHashMap<String, ScheduledActionHandlerArc>用于调度/延迟动作的命名处理器
effect_handlersHashMap<String, EffectHandlerArc>用于副作用的命名处理器
request_transformsVec<TaggedRequestTransform>LLM 调用前应用于推理请求的变换
key_registrationsVec<KeyRegistration>运行开始时安装到状态存储的状态键
toolsHashMap<String, Arc<dyn Tool>>插件提供的工具(在阶段 3 合并到主工具集)
pluginsVec<Arc<dyn Plugin>>用于生命周期钩子的插件引用(on_activate/on_deactivate

每个 TaggedPhaseHookTaggedRequestTransform 都携带其所属插件的 ID,用于诊断和过滤。

AgentRuntimeBuilder

构建器(AgentRuntimeBuilder)是构造 AgentRuntime 的标准方式。它累积五个注册表:

注册表构建器方法用途
MapAgentSpecRegistrywith_agent_spec() / with_agent_specs()智能体定义
MapToolRegistrywith_tool()全局工具
MapModelRegistrywith_model()模型 ID 到提供者 + 模型名称的映射
MapProviderRegistrywith_provider()LLM 执行器实例
MapPluginSourcewith_plugin()插件实例

错误处理

构建器使用延迟错误收集。每个检测到冲突(重复 ID)的 with_* 调用会将 BuildError 推入内部错误列表,而不是返回 Result。第一个收集到的错误在调用 build()build_unchecked() 时浮现。

验证

build() 在构造运行时后对每个已注册的智能体规格执行一次试运行解析。如果任何智能体解析失败(缺失模型、缺失提供者、缺失插件),错误会被收集并作为 BuildError::ValidationFailed 返回。这在启动时而非首次请求时捕获配置错误。

build_unchecked() 跳过此验证。仅在需要延迟解析或智能体将在构造后动态添加时使用。

远程智能体 (A2A)

当启用 a2a 功能标志时,构建器支持 with_remote_agents() 来注册远程 A2A 端点。这些端点被包装在 CompositeAgentSpecRegistry 中,该注册表组合了本地和远程智能体来源。远程智能体通过 build_and_discover() 异步发现。

另见

状态与快照模型

Awaken 使用带快照隔离的类型化状态引擎。本页解释状态原语、作用域、合并策略以及状态变更生命周期。

StateKey Trait

每一类运行时状态都由一个实现了 StateKey 的零大小类型来声明:

pub trait StateKey: 'static + Send + Sync {
    const KEY: &'static str;
    const MERGE: MergeStrategy = MergeStrategy::Exclusive;
    const SCOPE: KeyScope = KeyScope::Run;

    type Value: Clone + Default + Serialize + DeserializeOwned + Send + Sync + 'static;
    type Update: Send + 'static;

    fn apply(value: &mut Self::Value, update: Self::Update);
}

StateKey 同时把字符串键名、值类型、更新类型和合并规则绑定在一起。

KeyScope

pub enum KeyScope {
    Run,
    Thread,
}
  • Run:run 开始时重置
  • Thread:在同一 thread 的多次 run 间保留

MergeStrategy

pub enum MergeStrategy {
    Exclusive,
    Commutative,
}
  • Exclusive:并发写同一键会冲突
  • Commutative:更新可交换,允许并行合并

Snapshot

Snapshot 是某个时刻的不可变状态视图。phase hook 和 tool 只读取 Snapshot,不能直接修改它。

Phase hook 读取: &Snapshot
Phase hook 写入: MutationBatch

这样同一 phase 内的所有 hook 都能看到一致状态。

MutationBatch

MutationBatch 收集一次 hook 执行产生的状态更新:

pub struct MutationBatch {
    base_revision: Option<u64>,
    ops: Vec<Box<dyn MutationOp>>,
    touched_keys: Vec<String>,
}

touched_keys 用于并发冲突检测。

变更生命周期

1. Phase 开始
2. 运行时拍一份 Snapshot
3. 每个 hook 读取 Snapshot,产出 MutationBatch
4. gather 完成
5. 运行时检查 MutationBatch 是否冲突
6. 合并后的 batch 原子提交到 live state
7. 为下一个 phase 创建新 Snapshot

StateMap

StateMap 是类型擦除的状态容器,内部保存所有状态键对应的值。

  • 持久化键会在 checkpoint 时序列化
  • 非持久化键只在内存里存在

StateStore

StateStoreStateMap 之上提供:

  • 快照创建
  • 带 revision 的 batch apply
  • commit hooks
  • StateCommand 处理

另见

Run 生命周期与 Phases

本页描述 run 和 tool call 的状态机、8 个 phase、终止条件、checkpoint 触发点,以及挂起 / 恢复如何桥接 run 层与 tool-call 层。

RunStatus

Running --+--> Waiting --+--> Running (resume)
          |              |
          +--> Done      +--> Done
pub enum RunStatus {
    Running,
    Waiting,
    Done,
}

ToolCallStatus

New --> Running --+--> Succeeded
                  +--> Failed
                  +--> Cancelled
                  +--> Suspended --> Resuming --+--> Running
                                                +--> Suspended
                                                +--> Succeeded/Failed/Cancelled
pub enum ToolCallStatus {
    New,
    Running,
    Suspended,
    Resuming,
    Succeeded,
    Failed,
    Cancelled,
}

Phase Enum

Awaken 的执行顺序由 8 个 phase 固定下来:

pub enum Phase {
    RunStart,
    StepStart,
    BeforeInference,
    AfterInference,
    BeforeToolExecute,
    AfterToolExecute,
    StepEnd,
    RunEnd,
}
  • RunStart:run 级初始化
  • StepStart:每轮推理开始
  • BeforeInference:最后修改推理请求
  • AfterInference:观察 LLM 返回,修改工具列表或请求终止
  • BeforeToolExecute:权限检查、拦截、挂起
  • AfterToolExecute:消费工具结果并触发副作用
  • StepEnd:checkpoint 和 stop policy
  • RunEnd:清理与最终持久化

TerminationReason

pub enum TerminationReason {
    NaturalEnd,
    BehaviorRequested,
    Stopped(StoppedReason),
    Cancelled,
    Blocked(String),
    Suspended,
    Error(String),
}

只有 Suspended 会映射到 RunStatus::Waiting;其他都映射为 Done

Stop Conditions

可通过配置声明 stop 条件,例如:

  • MaxRounds
  • Timeout
  • TokenBudget
  • ConsecutiveErrors
  • StopOnTool
  • ContentMatch
  • LoopDetection

这些条件在 StepEnd 评估。

Checkpoint Triggers

StepEnd 会把以下内容写入 checkpoint:

  • thread messages
  • run 生命周期状态
  • 持久化状态键
  • 挂起的 tool call 状态

从 ToolCall 状态推导 RunStatus

run 的状态本质上是所有 tool call 状态的聚合投影:

fn derive_run_status(calls: &HashMap<String, ToolCallState>) -> RunStatus {
    let mut has_suspended = false;
    for state in calls.values() {
        match state.status {
            ToolCallStatus::Running | ToolCallStatus::Resuming => {
                return RunStatus::Running;
            }
            ToolCallStatus::Suspended => {
                has_suspended = true;
            }
            _ => {}
        }
    }
    if has_suspended { RunStatus::Waiting } else { RunStatus::Done }
}

并行 tool call 时间线

Time  tool_A(需审批)  tool_B(需审批)  tool_C(正常)   → Run Status
────────────────────────────────────────────────────────────────
t0    Created        Created        Created        Running
t1    Suspended      Created        Running        Running
t2    Suspended      Suspended      Running        Running
t3    Suspended      Suspended      Succeeded      Waiting
t4    Resuming       Suspended      Succeeded      Running
t5    Succeeded      Suspended      Succeeded      Waiting
t6    Succeeded      Resuming       Succeeded      Running
t7    Succeeded      Succeeded      Succeeded      Done

挂起如何桥接 run 层与 tool-call 层

当前执行模型(串行 phases)

当前 execute_tools_with_interception 基本分两段:

Phase 1 - Intercept:
  BeforeToolExecute hooks
  可能得到 Suspend / Block / SetResult

Phase 2 - Execute:
  对允许执行的调用做串行或并行执行

如果任一调用挂起,step 会返回 StepOutcome::Suspended,然后:

  1. checkpoint 持久化
  2. 发出 RunFinish(Suspended)
  3. 进入 wait_for_resume_or_cancel

wait_for_resume_or_cancel 循环

loop {
    let decisions = decision_rx.next().await;
    emit_decision_events_and_messages(decisions);
    prepare_resume(decisions);
    detect_and_replay_resume();
    if !has_suspended_calls() {
        return WaitOutcome::Resumed;
    }
}

Resume replay

恢复时会扫描 status == Resuming 的 tool call,并按 ToolCallResumeMode 回放:

Resume ModeReplay 参数行为
ReplayToolCall原始参数完整重跑
UseDecisionAsToolResultdecision 结果直接作为 tool result
PassDecisionToTooldecision 结果作为新参数传给 tool

局限:执行中到达的 decision

在当前串行模型里,decision 即使更早到达,也要等当前 Phase 2 工具执行结束后才会被消费,因此恢复存在额外延迟。

并发执行模型(未来方向)

理想模型会让“等待 decision”和“执行允许的工具”并发进行,使某个工具一旦得到决策就能立刻恢复。

架构

Phase 1 - Intercept

Phase 2 - Concurrent execution:
  execute(tool_C)
  execute(tool_D)
  wait_decision(tool_A) -> replay(tool_A)
  wait_decision(tool_B) -> replay(tool_B)
  barrier: 所有 task 进入终态

按调用分发 decision

共享 decision_rx 需要先 demux 到每个 call 自己的等待通道。

状态转移时机

并发模型下,状态会随着事件实时前进,而不是整批推进。

协议适配器:SSE 重连

长生命周期 run 可能跨多个前端 SSE 连接,尤其是 AI SDK v6 这类“一次 HTTP 请求对应一次 SSE 流”的协议。

问题

Turn 1:
  HTTP POST -> SSE 1 -> tool suspend -> stream 关闭
  但 run 还活着,正在 wait_for_resume_or_cancel

Turn 2:
  新 HTTP POST 带 decision 到来
  如果事件仍然发往旧 channel,就会丢失

解决方案:ReconnectableEventSink

用一个可替换底层 sender 的 event sink 包装原始 channel,新连接到来时先 reconnect() 再投递 decision。

重连流程

Turn 1:
  submit() -> 创建 event_tx1 / event_rx1
  run suspend -> SSE 1 结束

Turn 2:
  新请求创建 event_tx2 / event_rx2
  sink.reconnect(event_tx2)
  send_decision
  后续事件都发往 SSE 2

协议层差异

协议挂起信号恢复机制
AI SDK v6finish(finishReason: "tool-calls")新 HTTP POST -> reconnect -> send_decision
AG-UIRUN_FINISHED(outcome: "interrupt")新 HTTP POST -> reconnect -> send_decision
CopilotKitrenderAndWaitForResponse UI同一 SSE 或新请求恢复

另见

HITL 与 Mailbox

本页解释 Awaken 如何通过 tool call 挂起和 mailbox 队列来实现 human-in-the-loop(HITL)。

SuspendTicket

当 tool call 需要外部审批或输入时,会产出一个 SuspendTicket

pub struct SuspendTicket {
    pub suspension: Suspension,
    pub pending: PendingToolCall,
    pub resume_mode: ToolCallResumeMode,
}

其中:

  • suspension:外部可见的动作描述、提示语、参数 schema
  • pending:事件流里暴露给前端的待处理 tool call 投影
  • resume_mode:decision 到来后如何恢复

ToolCallResumeMode

pub enum ToolCallResumeMode {
    ReplayToolCall,
    UseDecisionAsToolResult,
    PassDecisionToTool,
}
  • ReplayToolCall:用原始参数重跑
  • UseDecisionAsToolResult:直接把 decision 结果当 tool 结果
  • PassDecisionToTool:把 decision 结果作为新参数传入工具

ResumeDecisionAction

pub enum ResumeDecisionAction {
    Resume,
    Cancel,
}

ToolCallResume

恢复载荷:

pub struct ToolCallResume {
    pub decision_id: String,
    pub action: ResumeDecisionAction,
    pub result: Value,
    pub reason: Option<String>,
    pub updated_at: u64,
}

Permission 插件的 Ask 模式

awaken-ext-permission 利用挂起来实现审批:

  1. tool call 命中 behavior: ask
  2. permission checker 生成 SuspendTicket
  3. tool call 进入 Suspended
  4. run 进入 Waiting
  5. 前端提示用户审批
  6. 用户提交 ResumeCancel
  7. Resume 时按 resume_mode 恢复;Cancel 时该 tool call 标记为取消

Mailbox 架构

Mailbox 是所有 run 请求的持久化队列。无论是 streaming、background、A2A 还是内部请求,最终都会变成一个 MailboxJob

MailboxJob

pub struct MailboxJob {
    pub job_id: String,
    pub mailbox_id: String,
    pub agent_id: String,
    pub messages: Vec<Message>,
    pub origin: MailboxJobOrigin,
    pub sender_id: Option<String>,
    pub parent_run_id: Option<String>,
    pub request_extras: Option<Value>,
    pub priority: u8,
    pub dedupe_key: Option<String>,
    pub generation: u64,
    pub status: MailboxJobStatus,
    pub available_at: u64,
    pub attempt_count: u32,
    pub max_attempts: u32,
    pub last_error: Option<String>,
    pub claim_token: Option<String>,
    pub claimed_by: Option<String>,
    pub lease_until: Option<u64>,
    pub created_at: u64,
    pub updated_at: u64,
}

MailboxJobStatus

Queued --claim--> Claimed --ack--> Accepted
  |                  |
  |               nack(retry) --> Queued
  |                  |
  |               nack(permanent) --> DeadLetter
  |
  |-- cancel --> Cancelled
  +-- interrupt(generation bump) --> Superseded
pub enum MailboxJobStatus {
    Queued,
    Claimed,
    Accepted,
    Cancelled,
    Superseded,
    DeadLetter,
}

MailboxJobOrigin

pub enum MailboxJobOrigin {
    User,
    A2A,
    Internal,
}

MailboxStore Trait

MailboxStore 负责 durable enqueue、原子 claim、ack/nack、cancel、lease 延长以及 interrupt。

实现必须保证:

  • enqueue 持久化
  • claim 原子化,且只能一个消费者成功
  • ack/nack 校验 claim token
  • interrupt 与 generation bump 原子完成

MailboxInterrupt

pub struct MailboxInterrupt {
    pub new_generation: u64,
    pub active_job: Option<MailboxJob>,
    pub superseded_count: usize,
}

当更高优先级请求到来时,旧 job 会被 supersede,活动 run 需要被取消。

另见

多智能体模式

Awaken 支持多种 agent 组合方式,包括本地委托、远程 A2A agent、sub-agent 执行以及 handoff。

通过 AgentSpec.delegates 进行 agent 委托

agent 可以通过 delegates 声明它允许委托的子 agent:

{
  "id": "orchestrator",
  "model": "gpt-4o",
  "system_prompt": "You coordinate tasks across specialized agents.",
  "delegates": ["researcher", "writer", "reviewer"]
}

解析时,运行时会为每个 delegate 生成一个 AgentTool,LLM 看见的是普通工具,例如 agent_run_researcher

通过 A2A 使用远程 agent

如果 AgentSpec.endpoint 存在,该 delegate 会被当作远程 A2A agent:

{
  "id": "remote-analyst",
  "model": "unused-for-remote",
  "system_prompt": "",
  "endpoint": {
    "backend": "a2a",
    "base_url": "https://analyst.example.com/v1/a2a",
    "auth": { "type": "bearer", "token": "token-abc" },
    "target": "analyst",
    "timeout_ms": 300000,
    "options": {
      "poll_interval_ms": 1000
    }
  }
}

A2aBackend 会发送 message:send,读取返回的 task.id 并轮询任务状态,再把最终结果包装成 ToolResult 返回给父 agent。

Sub-Agent 模式

串行委托

Orchestrator -> researcher -> result
             -> writer     -> result
             -> reviewer   -> result

父 agent 按顺序调用子 agent,并根据前一步结果决定下一步。

并行委托

如果 LLM 在同一轮推理里一次返回多个 delegate tool call,运行时可并发执行这些子 agent。

嵌套委托

orchestrator
  -> team_lead (delegates: [dev_a, dev_b])
       -> dev_a
       -> dev_b

每一层都独立通过 AgentResolver 解析。理论上没有硬深度限制,但每层都会增加 token 和延迟成本。

Agent Handoff

handoff 会在同一 run 内把控制权切换给另一个 agent,而不是把它当成子任务调用。

机制:

  1. 插件或 handoff 扩展把新 agent ID 写入活动 agent 状态键
  2. loop runner 在下一个步骤边界检测到变化
  3. 重新通过 AgentResolver 解析 agent
  4. 在同一个 thread、同一条消息历史上继续执行

Handoff 与 Delegation 的区别

方面DelegationHandoff
控制流父 agent 调子 agent,拿回结果控制权直接切到新 agent
Thread 连续性子 agent 可以有独立上下文同一 thread、同一消息历史
返回路径结果回到父 agent不返回,后续由新 agent 接管
适用场景任务拆解、专长子任务角色切换、升级处理、路由

AgentBackend Trait

本地和远程委托都基于 AgentBackend

pub trait AgentBackend: Send + Sync {
    async fn execute(
        &self,
        agent_id: &str,
        messages: Vec<Message>,
        event_sink: Arc<dyn EventSink>,
        parent_run_id: Option<String>,
        parent_tool_call_id: Option<String>,
    ) -> Result<DelegateRunResult, AgentBackendError>;
}

这也是实现自定义委托后端的扩展点。

另见

Tool 与 Plugin 的边界

Awaken 明确区分“给 LLM 使用的能力”(tools)和“运行时生命周期扩展”(plugins)。本页解释两者的边界、适用场景以及交互方式。

Tools

tool 是暴露给 LLM 的能力:

pub trait Tool: Send + Sync {
    fn descriptor(&self) -> ToolDescriptor;
    fn validate_args(&self, args: &Value) -> Result<(), ToolError> { Ok(()) }
    async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError>;
}

特征:

  • 用 ID 注册
  • 对 LLM 可见
  • BeforeToolExecute / AfterToolExecute 窗口执行
  • 只能访问参数和 ToolCallContext
  • 适合文件操作、API 调用、数据库查询等业务能力

Plugins

plugin 是运行时级扩展,不会出现在 LLM 工具列表中:

pub trait Plugin: Send + Sync + 'static {
    fn descriptor(&self) -> PluginDescriptor;
    fn register(&self, registrar: &mut PluginRegistrar) -> Result<(), StateError>;
    fn config_schemas(&self) -> Vec<ConfigSchema> { vec![] }
}

特征:

  • 用插件 ID 注册
  • 对 LLM 不可见
  • 在 phase 边界运行
  • 可以访问更完整的 PhaseContext
  • 适合权限、观察、提醒、请求变换、状态同步等横切逻辑

PluginRegistrar

Plugin::register() 中,插件通过 PluginRegistrar 声明自己需要的能力:

注册方法用途
register_key::<K>()注册状态键
register_phase_hook()在指定 phase 添加 hook
register_tool()向运行时注入插件提供的工具
register_effect_handler()处理 effects
register_scheduled_action()处理 scheduled actions
register_request_transform()在请求到达 LLM 前变换推理请求

插件也可以注册 tool,例如 AgentTool、MCP 工具、skills 工具。对于 LLM 来说,这些和用户手写 tool 没有区别。

何时使用 Tool

适合以下情况:

  • 需要被 LLM 显式按名字调用
  • 是领域能力,如搜索、计算、文件 I/O、API 调用
  • 有结构化输入和可供 LLM 推理的结果

何时使用 Plugin

适合以下情况:

  • 希望逻辑在 phase 边界自动运行
  • 需要修改 inference request
  • 需要在 LLM 看见之前检查或转换 tool result
  • 属于权限、可观测性、提醒等横切关注点
  • 需要注册 state key、effect handler 或 request transform

Tool 和 Plugin 如何交互

插件可以影响 tool 执行,而 tool 自己并不需要感知:

  • permission 插件可在 BeforeToolExecute 阻断或挂起调用
  • observability 插件为 tool 执行包裹 trace
  • reminder 插件在工具完成后注入上下文提示
  • interception pipeline 可改参数、替换结果、甚至跳过执行

插件提供的 Tool 与用户 Tool

方面用户 Tool插件 Tool
注册方式AgentRuntimeBuilder::with_tool()PluginRegistrar::register_tool()
生命周期跟 runtime 一起存在跟插件激活状态绑定
配置来源直接构造来自插件配置或 agent spec
示例业务工具AgentTool、MCP tools、skill tools

另见

插件系统内部机制

本文介绍插件系统的内部运作机制:插件如何注册和激活、钩子如何执行和解决冲突、阶段收敛循环的工作原理,以及请求变换、效果、推理覆盖和工具拦截在运行时的行为。

关于工具与插件的高层边界,参见 Tool and Plugin Boundary。关于阶段生命周期,参见 Run Lifecycle and Phases

插件注册与激活

当插件被加载时,其 register() 方法会以 PluginRegistrar 为参数被调用。插件声明两类组件:

结构组件 始终可用,不受激活状态影响:

  • 状态键(register_key::<K>()
  • 调度动作处理器(register_scheduled_action::<A, H>()
  • 效果处理器(register_effect::<E, H>()

行为组件 仅在插件通过激活过滤器时才处于活跃状态:

  • 阶段钩子(register_phase_hook()
  • 工具(register_tool()
  • 请求变换(register_request_transform()

激活由 AgentSpec.active_hook_filter 控制:

active_hook_filter行为
空(默认)所有插件的行为组件均处于活跃状态
非空集合仅 ID 在集合中的插件贡献行为组件

这种分离使得基础设施插件(状态管理、动作处理器、效果处理器)可以存在而不影响执行流。例如,一个仅注册效果处理器的日志插件永远不需要出现在 active_hook_filter 中——只要任何插件发出对应的效果,其处理器就会触发。

过滤逻辑在 PhaseRuntime::filter_hooks() 中实现:

fn filter_hooks<'a>(env: &'a ExecutionEnv, ctx: &PhaseContext) -> Vec<&'a TaggedPhaseHook> {
    let hooks = env.hooks_for_phase(ctx.phase);
    let active_hook_filter = &ctx.agent_spec.active_hook_filter;
    hooks
        .iter()
        .filter(|tagged| {
            active_hook_filter.is_empty()
                || active_hook_filter.contains(&tagged.plugin_id)
        })
        .collect()
}

钩子排序与冲突解决

多个插件可以为同一 Phase 注册钩子。引擎通过 gather_and_commit_hooks()crates/awaken-runtime/src/phase/engine.rs)中实现的两阶段流程来处理它们。

快速路径:并行执行,单次提交

同一阶段的所有钩子针对一份冻结快照并行运行。每个钩子接收相同的快照并产生一个 StateCommand。如果没有钩子写入另一个钩子也写入的 MergeStrategy::Exclusive 键,则所有命令在单次批处理中合并并提交。

flowchart LR
    S[冻结快照] --> H1[钩子 A]
    S --> H2[钩子 B]
    S --> H3[钩子 C]
    H1 --> M[合并全部]
    H2 --> M
    H3 --> M
    M --> C[单次提交]

冲突回退:分区与串行重试

如果两个或更多钩子写入同一个 Exclusive 键,引擎检测到冲突并回退:

  1. 分区 —— 按注册顺序遍历命令。贪心地将每个命令添加到“兼容批次“中,前提是其 Exclusive 键与批次中已有的键不重叠。否则,延迟该钩子。
  2. 提交批次 —— 合并并提交兼容批次。
  3. 串行重新执行 —— 延迟的钩子逐个重新运行,每个钩子基于包含所有先前提交结果的新鲜快照执行。
flowchart TD
    PAR[并行运行所有钩子] --> CHECK{Exclusive 键重叠?}
    CHECK -- 否 --> FAST[合并全部,单次提交]
    CHECK -- 是 --> PART[分区:批次 + 延迟]
    PART --> CBATCH[提交兼容批次]
    CBATCH --> SERIAL[串行重新运行延迟的钩子]
    SERIAL --> DONE[所有钩子已提交]

由于钩子是纯函数(冻结快照输入,StateCommand 输出,无副作用),冲突时的重新执行始终是安全的。延迟的钩子能看到批次提交后的更新状态,因此即使原始并行执行会产生冲突,它们也能产生正确的结果。

关于 MergeStrategy 和快照隔离的详细信息,参见 State and Snapshot Model

阶段收敛循环

每个阶段运行一个 GATHER 然后 EXECUTE 的循环,当没有新工作时收敛。

GATHER 阶段

并行运行所有活跃钩子(按上述冲突解决方式处理)。钩子产生的 StateCommand 值可能包含:

  • 状态变更(键更新)
  • 调度动作(在 EXECUTE 阶段处理)
  • 效果(提交后立即派发)

EXECUTE 阶段

处理待执行的调度动作,这些动作的 Phase 匹配当前阶段且其动作键有已注册的处理器。每个动作处理器基于新鲜快照运行,并产生自己的 StateCommand,该命令可能为同一阶段调度新的动作。

如果处理后出现新的匹配动作,循环重复:

flowchart TD
    START[阶段入口] --> GATHER[GATHER:运行钩子,提交]
    GATHER --> EXEC[EXECUTE:处理匹配的动作]
    EXEC --> CHECK{当前阶段有新动作?}
    CHECK -- 是 --> EXEC
    CHECK -- 否 --> DONE[阶段完成]

循环受 DEFAULT_MAX_PHASE_ROUNDS(16)限制。如果动作数量在此限制内未收敛,引擎返回 StateError::PhaseRunLoopExceeded。这可以防止无限反应链,同时允许合法的多步动作级联。

这种收敛设计支持反应式模式:权限检查动作可以在同一个 BeforeToolExecute 阶段调度暂停动作,两者在阶段完成前均被处理。

请求变换钩子

插件可以通过 registrar.register_request_transform() 注册 InferenceRequestTransform 实现。变换在请求到达 LLM 执行器之前修改 InferenceRequest

用例:

  • 系统提示注入 —— 向系统消息追加上下文、指令或提醒
  • 工具列表修改 —— 过滤、重排序或增强发送给 LLM 的工具描述符
  • 参数覆盖 —— 调整温度、最大 token 数或其他推理参数

变换按注册顺序运行,且可组合:每个变换接收经前一个变换修改后的请求。

仅活跃插件的变换会被应用。如果 active_hook_filter 非空,则不在过滤器中的插件的变换会被跳过。

效果处理器

效果是通过 EffectSpec trait 定义的类型化事件:

pub trait EffectSpec: 'static + Send + Sync {
    const KEY: &'static str;
    type Payload: Serialize + DeserializeOwned + Send + Sync + 'static;
}

钩子和动作处理器通过在 StateCommand 上调用 command.emit::<E>(payload) 来发出效果。与调度动作(在特定阶段的收敛循环中执行)不同,效果在命令提交到存储之后派发。

插件通过 registrar.register_effect::<E, H>(handler) 注册效果处理器。当效果被派发时,引擎使用效果载荷和当前快照调用处理器。

关键特性:

  • 即发即忘 —— 处理器失败会被记录日志,但不会阻塞执行或回滚提交。
  • 提交后执行 —— 效果看到的是发出它们的命令已被应用后的状态。
  • 提交时验证 —— 如果命令发出的效果没有已注册的处理器,submit_command 立即返回 StateError::UnknownEffectHandler,防止静默丢弃。

用例:审计日志、指标发送、跨插件通知、外部系统同步。

InferenceOverride 合并

多个插件可以通过调度 SetInferenceOverride 动作来影响推理参数。InferenceOverride 结构体使用按字段后写入者胜出的合并语义:

pub struct InferenceOverride {
    pub model: Option<String>,
    pub fallback_models: Option<Vec<String>>,
    pub temperature: Option<f64>,
    pub max_tokens: Option<u32>,
    pub top_p: Option<f64>,
    pub reasoning_effort: Option<ReasoningEffort>,
}

当两个覆盖被合并时,每个字段独立取最后一个非 None 的值:

pub fn merge(&mut self, other: InferenceOverride) {
    if other.temperature.is_some() {
        self.temperature = other.temperature;
    }
    if other.max_tokens.is_some() {
        self.max_tokens = other.max_tokens;
    }
    // ... 所有字段同理
}

这允许插件覆盖特定参数而不影响其他参数。成本控制插件可以设置 max_tokens,而质量插件设置 temperature,两者互不干扰。如果两者设置同一字段,最后一次合并胜出。

工具拦截优先级

BeforeToolExecute 阶段,插件可以调度 ToolInterceptAction 来控制工具执行流。动作载荷是以下三个变体之一:

pub enum ToolInterceptPayload {
    Block { reason: String },
    Suspend(SuspendTicket),
    SetResult(ToolResult),
}

当多个拦截被调度给同一个工具调用时,按隐式优先级解决:

优先级变体行为
3(最高)Block终止工具执行,使调用失败
2Suspend暂停执行,等待外部决策
1(最低)SetResult以预定义结果短路返回

最高优先级的拦截胜出。如果两个拦截具有相同优先级(例如,两个插件都调度了 Block),第一个被处理的生效,冲突被记录为错误。

如果没有调度拦截,工具正常执行(隐式放行)。

flowchart TD
    BTE[BeforeToolExecute 钩子运行] --> INT{有调度的拦截?}
    INT -- 否 --> EXEC[正常执行工具]
    INT -- 是 --> PRI[按优先级解决]
    PRI --> B[Block:使调用失败]
    PRI --> S[Suspend:等待决策]
    PRI --> R[SetResult:返回预定义结果]

另见

设计取舍

本页总结 Awaken 几个关键架构决策及其权衡。

Snapshot Isolation vs Mutable State

决策:phase hook 只读不可变 Snapshot,写入 MutationBatch,所有变更在 gather 结束后原子提交。

备选:hook 直接修改共享可变状态。

Snapshot IsolationMutable State
正确性不依赖 hook 执行顺序容易受锁粒度和顺序影响
并发性hook 可安全并行需要精细锁或强制串行
复杂度需要 batch / merge / conflict machinery实现直观
可调试性每个 phase 边界都是清晰状态跃迁状态变化交织,难追踪
代价每次 phase 快照需额外 Arc clone无快照开销

为什么选 Snapshot:插件组合不应该依赖隐藏顺序。快照模型让 phase 更接近“状态到变更”的纯函数,便于推理和重放。

Phase-Based Execution vs Event-Driven

决策:执行遵循固定 phase 顺序。

备选:完全事件驱动,插件异步订阅事件。

Phase-BasedEvent-Driven
可预测性
插件组合边界明确易产生隐式耦合
可测试性容易对 phase 序列做单测需要模拟异步事件流
灵活性插入新能力通常要加 phase事件扩展自由
性能顺序 phase 执行引入额外开销可并发处理

为什么选 Phase:agent 执行天然是“推理 -> 工具 -> 检查终止”的顺序流程。phase 模型更适合在固定节点上插入权限、观察、提醒和请求变换。

Typed State Keys vs Dynamic State

决策:状态键是 Rust 类型,必须实现 StateKey

备选HashMap<String, Value> 风格的动态状态。

Typed KeysDynamic State
类型安全编译期保证运行期才发现错误
合并语义每个键明确声明 MergeStrategy需要外部约定
可发现性IDE 可跳转只能 grep 字符串
代价需要定义类型使用方便
可扩展性新键需要修改代码并重新编译运行时可动态添加新键

为什么选 Typed Keys:状态错误往往非常隐蔽。把键名、值类型和更新语义统一固化在类型上,更适合作为运行时核心机制。

Plugin System vs Middleware Chain

决策:使用 PluginRegistrar 做结构化注册,而不是用 middleware 链包裹整条执行路径。

备选:类似 HTTP middleware 的包裹链。

Plugin SystemMiddleware Chain
粒度能分别注册 hook / key / tool / effect只能包整个执行
组合多插件可共享同一 phase顺序高度敏感
选择性激活可按 agent 启停插件需要改链结构
复杂度注册动作较多心智模型简单
横切关注点天然契合——每个插件处理一个关注点每个 middleware 处理一个关注点,但会看到全部流量

为什么选 Plugin System:Awaken 的扩展点并不只在单一调用链上,而是散落在 phase、tool 拦截、state、effects、actions 等多个边界。

Multi-Protocol Server vs Single Protocol

决策:同一个 server 同时提供 AI SDK v6、AG-UI、A2A、MCP HTTP,ACP 则作为独立 stdio 协议模块存在。

备选:只支持一种标准协议。

Multi-ProtocolSingle Protocol
前端兼容性可直接接多种生态客户端需自写适配
维护成本多套 encoder / route一套协议面
测试面更大更小
运行时耦合运行时保持协议无关容易耦合到唯一协议
复杂度多套路由、encoder 类型和事件映射一套路由、一套 encoder

为什么选 Multi-Protocol:当前 agent 生态没有统一协议。把协议复杂度压在 server 层,比要求用户自写桥接更实用。

另见

术语表

术语中文说明
Thread会话线程持久化的对话与状态历史。
Run运行针对某个 thread 的一次执行尝试。
Phase阶段执行循环中的命名阶段。
Snapshot快照传给 hook / tool 的不可变状态视图(struct Snapshot { revision: u64, ext: Arc<StateMap> })。
StateKey状态键带作用域、合并策略和值类型的类型化键。
MutationBatch变更批次在提交前收集的一组状态变更。
AgentRuntime智能体运行时负责解析 agent 和执行 run 的核心运行时。
AgentSpec智能体规约可序列化的 agent 定义。
AgentEvent智能体事件运行时统一事件流。
Plugin插件通过 PluginRegistrar 注册的系统级扩展。
PluginRegistrar插件注册器注册 phase hook、tool、state key、handler 的入口。
PhaseHook阶段钩子绑定到某个 phase 的异步 hook。
PhaseContext阶段上下文hook 执行时拿到的只读上下文。
StateCommand状态命令hook 返回值,包含变更、action、effect。
Tool工具暴露给 agent 的能力接口。
ToolDescriptor工具描述符tool 的 ID、名称、描述、参数 schema。
ToolResult工具结果tool 执行成功、失败或挂起后的结构化结果。
ToolCallContext工具调用上下文tool 执行期可读取状态和上报活动的上下文。
TerminationReason终止原因run 结束的原因。
SuspendTicket挂起票据描述挂起原因、恢复模式和待决 tool call。
MailboxJob邮箱任务后台执行与 HITL 的持久化作业项。
RunRequest运行请求启动 run 的输入。
MergeStrategy合并策略并行状态写入如何合并。
KeyScope键作用域状态键生命周期:RunThread
StateMap状态映射Snapshot 背后的类型安全异构 map。
RunStatus运行状态粗粒度 run 状态:RunningWaitingDone
ToolCallStatus工具调用状态单个 tool call 的状态。
ResolvedAgent已解析智能体从 registries 解析完成、可直接运行的 agent。
AgentResolver智能体解析器把 agent ID 解析成 ResolvedAgent 的组件。
BuildError构建错误AgentRuntimeBuilder::build() 阶段的错误。
RuntimeError运行时错误agent loop 执行中的错误。
InferenceOverride推理覆盖针对单次推理的 model / temperature 等覆盖。
ContextWindowPolicy上下文窗口策略token 预算和压缩策略。
StreamEvent流事件AgentEvent 的带序号封装。
TokenUsage令牌用量LLM 推理返回的 token 统计。
ExecutionEnv执行环境解析后组装出来的 hook、tool、handler 集合。
CommitHook提交钩子状态提交到存储时触发的 hook。

常见问题

支持哪些 LLM provider?

任何兼容 genai 的 provider 都可以,包括 OpenAI、Anthropic、DeepSeek、Google Gemini、Ollama 等。当前做法不是在 AgentSpec 里直接写 provider 名,而是把 AgentSpec.model 写成模型注册表里的 ID,再由 ModelSpec 解析到 provider 和真实模型名。

如何添加新的存储后端?

实现 awaken-contract 里的 ThreadRunStore trait。它要求你支持 thread、message、run、checkpoint 的读写。可以参考 awaken-stores 里的 InMemoryStoreFileStorePostgresStore

不启用 server 能用 awaken 吗?

可以。AgentRuntime 本身就是独立运行时。你可以自己构造 RunRequest,再直接调用 runtime.run(request, sink)awaken-server 只是附加的 HTTP / SSE / mailbox / protocol gateway。

如何运行多个 agent?

两种主要方式:

  • 本地委托:在 AgentSpec.delegates 里声明可委托的 agent ID,由运行时在步骤边界完成委托切换。
  • 远程 A2A:给 AgentSpec.endpoint 配置远端地址,或注册远程 agent,让它通过 A2A 以“工具调用”的形式执行。

Run scope 和 Thread scope 的区别是什么?

  • Run:只在一次 run 生命周期内有效。run 结束即清空。适合步骤计数、预算、临时上下文。
  • Thread:在同一 thread 的多次 run 之间持续存在。适合用户偏好、会话记忆、长期状态。

如何处理工具错误?

可恢复错误返回 ToolResult::error(...),这样错误会以 tool 响应消息的形式回写给 LLM,LLM 可以继续决定下一步。如果是参数校验失败或真正要中止工具执行的错误,则返回 ToolError

工具可以并行执行吗?

可以。通过 ToolExecutionMode 配置。并行模式下,运行时会并发执行同一步里的多个 tool call,并在下一轮推理前收集结果、做冲突检查和状态合并。

run 卡住时怎么排查?

先看 run 的状态:

  • Waiting:通常是 HITL 决策未完成,检查 SuspendTicket、mailbox 和待处理 decision。
  • Running:检查 max_rounds、timeout、工具是否阻塞,以及是否有流式调用未结束。

如果需要细节,启用 observability 插件查看 phase、tool、inference 级别的 tracing。

不连真实 LLM 怎么测试?

实现一个返回固定响应的 LlmExecutor 即可。详细模式见测试策略

并行工具同时写同一个状态键会怎样?

如果该键是 MergeStrategy::Exclusive,并行合并会失败,相关执行会回退到串行或报冲突。若该键天然支持交换律,应使用 MergeStrategy::Commutative

request transform 是怎么工作的?

插件可以注册 InferenceRequestTransform。它会在请求真正发给 LLM 前修改 system prompt、工具列表、推理参数等。只有当前 agent 激活的插件 transform 才会生效。

可以自定义存储后端吗?

可以。状态与消息持久化实现 ThreadRunStore;如果还要支持 HITL / 后台队列,再实现 MailboxStore

context compaction 是怎么做的?

ContextWindowPolicy 启用自动压缩时,运行时会在超过预算后寻找安全边界,把较早消息总结成 <conversation-summary>,再保留最近一段原始上下文。

AI SDK v6、AG-UI、A2A 该怎么选?

  • AI SDK v6:适合 Vercel AI SDK / useChat 前端。
  • AG-UI:适合 CopilotKit 和带生成式 UI 的前端。
  • A2A:适合 agent 到 agent 的服务间编排和远程委托。

它们共享同一个 AgentRuntime,差别主要在协议编码层和前端生态。

从 Tirea 迁移

Awaken 不是对 tirea 的增量升级,而是一次从头重写。本页帮助你把 tirea 0.5 的核心概念映射到 Awaken。

Crate 映射

tirea 0.5awaken说明
tirea-stateawaken-contract状态引擎并入 contract crate
tirea-state-derive移除;直接实现 StateKey
tirea-contractawaken-contract契约统一到一个 crate
tirea-agentosawaken-runtime运行时改名
tirea-store-adaptersawaken-stores存储适配层改名
tirea-agentos-serverawaken-serverserver 改名,协议已并入
tirea-protocol-ai-sdk-v6awaken-server::protocols::ai_sdk_v6合并到 server
tirea-protocol-ag-uiawaken-server::protocols::ag_ui合并到 server
tirea-protocol-acpawaken-server::protocols::acp合并到 server
tirea-extension-permissionawaken-ext-permission改名
tirea-extension-observabilityawaken-ext-observability改名
tirea-extension-skillsawaken-ext-skills改名
tirea-extension-mcpawaken-ext-mcp改名
tirea-extension-handoffawaken-runtime::extensions::handoff并入 runtime
tirea-extension-a2uiawaken-ext-generative-ui改名
tireaawaken门面 crate 改名

状态模型

tirea 使用 derive-macro 状态系统;Awaken 改为 trait-based:

// tirea 0.5
#[derive(StateSlot)]
#[state(key = "my_plugin.counter", scope = "run")]
struct Counter(u32);

// awaken
use awaken::prelude::*;

pub struct Counter;

impl StateKey for Counter {
    const KEY: &'static str = "my_plugin.counter";
    const MERGE: MergeStrategy = MergeStrategy::Exclusive;
    type Value = u32;
    type Update = u32;

    fn apply(value: &mut u32, update: u32) {
        *value = update;
    }
}
tirea 概念Awaken 对应
StateSlot deriveStateKey trait
ScopeDomain::RunKeyScope::Run
ScopeDomain::ThreadKeyScope::Thread
ScopeDomain::Global已移除,通常改为 Thread + ProfileStore
可变 state 直接读写不可变快照 + StateCommand

Action 系统

tirea 使用按 phase 分类的 action enum;Awaken 统一改为 ScheduledActionSpec + handler:

// tirea 0.5
BeforeInferenceAction::AddContextMessage(
    ContextMessage::system("key", "content")
)

// awaken
let mut cmd = StateCommand::new();
cmd.schedule_action::<AddContextMessage>(
    ContextMessage::system("key", "content"),
)?;

Plugin Trait

// tirea 0.5
impl Extension for MyPlugin {
    fn name(&self) -> &str { "my_plugin" }
    fn register(&self, ctx: &mut ExtensionContext) -> Result<()> {
        ctx.add_hook(Phase::BeforeInference, MyHook);
        Ok(())
    }
}

// awaken
impl Plugin for MyPlugin {
    fn descriptor(&self) -> PluginDescriptor {
        PluginDescriptor { name: "my_plugin" }
    }
    fn register(&self, r: &mut PluginRegistrar) -> Result<(), StateError> {
        r.register_phase_hook("my_plugin", Phase::BeforeInference, MyHook)?;
        r.register_key::<MyState>(StateKeyOptions::default())?;
        Ok(())
    }
}

Tool Trait

// tirea 0.5
#[async_trait]
impl TypedTool for MyTool {
    type Args = MyArgs;
    async fn call(&self, args: Self::Args, ctx: ToolContext) -> ToolOutput {
        ToolOutput::success(json!({"result": 42}))
    }
}

// awaken
#[async_trait]
impl Tool for MyTool {
    fn descriptor(&self) -> ToolDescriptor { ... }
    async fn execute(&self, args: Value, ctx: &ToolCallContext) -> Result<ToolOutput, ToolError> {
        Ok(ToolResult::success("my_tool", json!({"result": 42})).into())
    }
}

Runtime Builder

// tirea 0.5
let runtime = AgentOsBuilder::new()
    .agent(agent_spec)
    .tool("search", SearchTool)
    .extension(PermissionExtension::new(rules))
    .build()?;

// awaken
let runtime = AgentRuntimeBuilder::new()
    .with_agent_spec(agent_spec)
    .with_tool("search", Arc::new(SearchTool))
    .with_plugin("permissions", Arc::new(PermissionPlugin::new(rules)))
    .build()?;

被移除的概念

  • StateSlot derive macro
  • Global scope
  • RuntimeEffect
  • EffectLog / ScheduledActionLog
  • ConfigStore / ConfigSlot
  • AgentProfile
  • ExtensionContext 的动态激活

新增的概念

  • PluginRegistrar
  • StateCommandToolOutput
  • ToolInterceptAction
  • CircuitBreaker
  • Mailbox
  • EventReplayBuffer
  • DeferredToolsPlugin
  • ProfileStore

快速检查清单

  • Cargo.toml 里把 tirea 替换成 awaken
  • use tirea::* 改成 use awaken::prelude::*
  • #[derive(StateSlot)] 改成 impl StateKey
  • Extension 改成 Plugin
  • TypedTool 改成 Tool
  • action enum 改成 cmd.schedule_action::<ActionType>(...)
  • AgentOsBuilder 改成 AgentRuntimeBuilder
  • store 和 server import 切到 awaken-storesawaken-server