Thread Model

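A Thread represents a persisted conversation. The Thread itself stores only thread metadata and a small run projection; messages, run history, and dispatch attempts are managed separately through storage traits.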

The persistence model is:

Thread 1 -> * MessageRecord
Thread 1 -> * RunRecord
RunRecord 1 -> * RunDispatch
RunRecord reads MessageRecord by range or by explicit id.
RunRecord produces assistant/tool messages through checkpoints.

pub struct Thread {
    pub id: String,
    pub resource_id: Option<String>,
    pub parent_thread_id: Option<String>,
    pub metadata: ThreadMetadata,
    pub active_run_id: Option<String>,
    pub open_run_id: Option<String>,
    pub latest_run_id: Option<String>,
}

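active_run_id points to the run currently executing on a worker. open_run_id points to the current unfinished user intent that can still be resumed. latest_run_id points to the most recent run. The supersede epoch of expired dispatches belongs to the RunDispatch/mailbox plane, not to thread truth.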
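One way to read the three pointers together is sketched below. The `ThreadRunPointers` struct, the `RunPhase` enum, and the `run_phase` helper are hypothetical names introduced only for illustration, and the precedence (active before open) is an assumption rather than documented behavior.

```rust
/// Trimmed-down stand-in for the documented `Thread`; only the three
/// run pointers are kept.
#[derive(Debug, Default, Clone)]
pub struct ThreadRunPointers {
    pub active_run_id: Option<String>,
    pub open_run_id: Option<String>,
    pub latest_run_id: Option<String>,
}

/// Hypothetical classification, not part of the documented API.
#[derive(Debug, PartialEq, Eq)]
pub enum RunPhase<'a> {
    /// A worker is executing this run right now.
    Executing(&'a str),
    /// No worker holds a run, but a user intent is unfinished and resumable.
    Resumable(&'a str),
    /// Nothing is in flight; `latest` is the most recent run, if any.
    Idle { latest: Option<&'a str> },
}

/// Assumed precedence: an active run wins over an open one.
pub fn run_phase(t: &ThreadRunPointers) -> RunPhase<'_> {
    if let Some(id) = t.active_run_id.as_deref() {
        RunPhase::Executing(id)
    } else if let Some(id) = t.open_run_id.as_deref() {
        RunPhase::Resumable(id)
    } else {
        RunPhase::Idle { latest: t.latest_run_id.as_deref() }
    }
}
```

Nothing here consults dispatch state, which matches the statement that supersede epochs live on the RunDispatch/mailbox plane.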

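parent_thread_id is normalized on assignment: leading and trailing whitespace is trimmed, and an empty string deserializes to None. resource_id is handled the same way. Thread hierarchy is tied to the run lifecycle: when a sub-agent run starts, RunActivationSnapshot.trace.parent_thread_id carries the parent thread; older records may also carry the same information through RunRequestSnapshot.parent_thread_id. The checkpoint projection fills in Thread.parent_thread_id the first time the child thread is materialized.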
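The normalization rule can be sketched as a small standalone function. `normalize_id` is a hypothetical helper name; the real crate may implement the rule inside its setters or deserializers instead.

```rust
/// Hypothetical helper: trims surrounding whitespace and maps an empty
/// (or whitespace-only) value to `None`.
pub fn normalize_id(raw: Option<&str>) -> Option<String> {
    raw.map(str::trim)
        .filter(|s| !s.is_empty())
        .map(str::to_owned)
}
```

For example, `normalize_id(Some("  parent-1 "))` yields `Some("parent-1")`, while `normalize_id(Some("   "))` yields `None`.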

fn new() -> Self
fn with_id(id: impl Into<String>) -> Self
fn with_title(self, title: impl Into<String>) -> Self
fn with_resource_id(self, resource_id: impl Into<String>) -> Self
fn with_parent_thread_id(self, parent_thread_id: impl Into<String>) -> Self

pub struct ThreadMetadata {
    pub created_at: Option<u64>,
    pub updated_at: Option<u64>,
    pub title: Option<String>,
    pub custom: HashMap<String, Value>,
}

Messages are not embedded directly in Thread; they are read and written through ThreadStore:

#[async_trait]
pub trait ThreadStore: Send + Sync {
    async fn load_thread(&self, thread_id: &str) -> Result<Option<Thread>, StorageError>;
    async fn save_thread(&self, thread: &Thread) -> Result<(), StorageError>;
    async fn save_thread_validated(&self, thread: &Thread) -> Result<(), StorageError>;
    async fn delete_thread(&self, thread_id: &str) -> Result<(), StorageError>;
    async fn delete_thread_with_strategy(
        &self,
        thread_id: &str,
        strategy: ChildThreadDeleteStrategy,
    ) -> Result<(), StorageError>;
    async fn list_threads(&self, offset: usize, limit: usize) -> Result<Vec<String>, StorageError>;
    async fn list_threads_query(&self, query: &ThreadQuery) -> Result<ThreadPage, StorageError>;
    async fn list_child_threads(&self, parent_thread_id: &str) -> Result<Vec<Thread>, StorageError>;
    async fn validate_thread_hierarchy(
        &self,
        thread_id: &str,
        parent_thread_id: Option<&str>,
    ) -> Result<(), StorageError>;
    async fn load_messages(&self, thread_id: &str) -> Result<Option<Vec<Message>>, StorageError>;
    async fn load_message_records(&self, thread_id: &str) -> Result<Option<Vec<MessageRecord>>, StorageError>;
    async fn save_messages(&self, thread_id: &str, messages: &[Message]) -> Result<(), StorageError>;
    async fn delete_messages(&self, thread_id: &str) -> Result<(), StorageError>;
    async fn update_thread_metadata(&self, id: &str, metadata: ThreadMetadata) -> Result<(), StorageError>;
}

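ThreadStore's default helper methods already cover lineage filtering, parent-thread existence and cycle checks, and the child-thread deletion strategies, so backends do not need to reimplement this logic.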
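A minimal sketch of the parent-existence and cycle check, assuming an in-memory map from thread id to parent id in place of a real backend. `HierarchyError` and `validate_hierarchy` are hypothetical names, and the sketch is synchronous, whereas the trait method is async and returns StorageError.

```rust
use std::collections::{HashMap, HashSet};

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum HierarchyError {
    ParentNotFound(String),
    Cycle(String),
}

/// `parents` maps thread id -> parent thread id (`None` for root threads).
/// Checks whether `thread_id` may be attached under `parent_thread_id`.
pub fn validate_hierarchy(
    parents: &HashMap<String, Option<String>>,
    thread_id: &str,
    parent_thread_id: Option<&str>,
) -> Result<(), HierarchyError> {
    let mut seen: HashSet<&str> = HashSet::new();
    seen.insert(thread_id);
    let mut cursor = parent_thread_id;
    // Walk up the ancestor chain until a root is reached.
    while let Some(id) = cursor {
        // Revisiting any id, including `thread_id` itself, means a loop.
        if !seen.insert(id) {
            return Err(HierarchyError::Cycle(id.to_string()));
        }
        match parents.get(id) {
            Some(next) => cursor = next.as_deref(),
            None => return Err(HierarchyError::ParentNotFound(id.to_string())),
        }
    }
    Ok(())
}
```

Walking the ancestor chain keeps the check proportional to the depth of the hierarchy rather than the total number of threads.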

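pub enum ChildThreadDeleteStrategy {
    /// Refuse to delete when direct child threads exist.
    Reject,
    /// Keep the child threads and clear their `parent_thread_id`. This is the default.
    Detach,
    /// Recursively delete all descendant threads, then delete the target thread.
    Cascade,
}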

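The default delete_thread_with_strategy implementation issues multiple low-level writes and is not atomic. Production-grade concurrent backends should override it with a transactional or fenced implementation; the file, PostgreSQL, and NATS-buffered backends already provide native overrides.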
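The three strategies, and the reason a naive implementation is not atomic, can be illustrated with an in-memory sketch. The `Parents` map, `DeleteError`, and `delete_with_strategy` are hypothetical stand-ins. The enum is redeclared locally so the block is self-contained, and the hierarchy is assumed to be acyclic.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChildThreadDeleteStrategy {
    Reject,
    Detach,
    Cascade,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum DeleteError {
    HasChildren(Vec<String>),
}

/// In-memory stand-in for a backend: thread id -> parent thread id.
pub type Parents = HashMap<String, Option<String>>;

fn direct_children(store: &Parents, id: &str) -> Vec<String> {
    let mut out: Vec<String> = store
        .iter()
        .filter(|(_, parent)| parent.as_deref() == Some(id))
        .map(|(child, _)| child.clone())
        .collect();
    out.sort(); // deterministic order for the example
    out
}

pub fn delete_with_strategy(
    store: &mut Parents,
    id: &str,
    strategy: ChildThreadDeleteStrategy,
) -> Result<(), DeleteError> {
    let children = direct_children(store, id);
    match strategy {
        ChildThreadDeleteStrategy::Reject if !children.is_empty() => {
            return Err(DeleteError::HasChildren(children));
        }
        ChildThreadDeleteStrategy::Reject => {}
        ChildThreadDeleteStrategy::Detach => {
            // One write per child: a crash mid-loop leaves some children attached.
            for child in &children {
                store.insert(child.clone(), None);
            }
        }
        ChildThreadDeleteStrategy::Cascade => {
            // Depth-first removal of descendants; assumes no cycles.
            for child in &children {
                delete_with_strategy(store, child, ChildThreadDeleteStrategy::Cascade)?;
            }
        }
    }
    store.remove(id);
    Ok(())
}
```

Each `insert` and `remove` stands for a separate low-level write, which is why a concurrent backend would want to wrap the whole operation in a transaction or fence.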

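The default list_threads_query scans list_threads in fixed-size steps and filters in memory; the file, PostgreSQL, and NATS-buffered backends each provide native pushdown. The cursor returned by ThreadQuery::encode_cursor is validated against the shape of the original query when it is decoded, so a pagination sequence cannot drift onto a different set of filters.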
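The idea of a cursor that remembers the query shape can be sketched as follows. `SketchQuery`, its two filter fields, the `offset|shape` text encoding, and `CursorError` are all assumptions made for illustration; the actual ThreadQuery fields and cursor format are not shown in this document.

```rust
/// Hypothetical query with two filters; the real `ThreadQuery` may differ.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SketchQuery {
    pub resource_id: Option<String>,
    pub parent_thread_id: Option<String>,
}

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum CursorError {
    Malformed,
    QueryMismatch,
}

impl SketchQuery {
    /// Canonical description of the filters, embedded in every cursor.
    fn shape(&self) -> String {
        format!("r={:?};p={:?}", self.resource_id, self.parent_thread_id)
    }

    /// Encodes the next offset together with the query shape.
    pub fn encode_cursor(&self, offset: usize) -> String {
        format!("{}|{}", offset, self.shape())
    }

    /// Decodes a cursor, rejecting it if it was issued for other filters.
    pub fn decode_cursor(&self, cursor: &str) -> Result<usize, CursorError> {
        let (offset, shape) = cursor.split_once('|').ok_or(CursorError::Malformed)?;
        if shape != self.shape() {
            return Err(CursorError::QueryMismatch);
        }
        offset.parse().map_err(|_| CursorError::Malformed)
    }
}
```

A cursor produced for one filter set then fails to decode against another, instead of silently continuing the scan at the wrong offset.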

Message is the protocol payload; MessageRecord is the persisted projection of the thread message log:

pub struct MessageRecord {
    pub message_id: String,
    pub thread_id: String,
    pub seq: u64,
    pub message: Message,
    pub produced_by_run_id: Option<String>,
    pub step_index: Option<u32>,
    pub tool_call_id: Option<String>,
    pub created_at: Option<u64>,
}

The default load_message_records implementation builds records from load_messages, assigns seq values starting at 1 in append order, and projects producer information from each Message's metadata.
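A rough sketch of that projection is shown below. The `Message` and `MessageMetadata` shapes are simplified assumptions: only `metadata.run_id` is named in this document, and placing `step_index` and `tool_call_id` in the metadata is a guess made for the example. `project_records` is a hypothetical function name.

```rust
/// Simplified, assumed metadata shape; only `run_id` is named by the docs.
#[derive(Debug, Clone, Default)]
pub struct MessageMetadata {
    pub run_id: Option<String>,
    pub step_index: Option<u32>,
    pub tool_call_id: Option<String>,
}

/// Simplified stand-in for the protocol `Message`.
#[derive(Debug, Clone)]
pub struct Message {
    pub id: String,
    pub role: String,
    pub metadata: Option<MessageMetadata>,
}

#[derive(Debug, Clone)]
pub struct MessageRecord {
    pub message_id: String,
    pub thread_id: String,
    pub seq: u64,
    pub message: Message,
    pub produced_by_run_id: Option<String>,
    pub step_index: Option<u32>,
    pub tool_call_id: Option<String>,
    pub created_at: Option<u64>,
}

/// Builds records in append order, numbering them from 1.
pub fn project_records(thread_id: &str, messages: &[Message]) -> Vec<MessageRecord> {
    messages
        .iter()
        .zip(1u64..)
        .map(|(m, seq)| {
            let md = m.metadata.as_ref();
            MessageRecord {
                message_id: m.id.clone(),
                thread_id: thread_id.to_string(),
                seq,
                message: m.clone(),
                produced_by_run_id: md.and_then(|md| md.run_id.clone()),
                step_index: md.and_then(|md| md.step_index),
                tool_call_id: md.and_then(|md| md.tool_call_id.clone()),
                created_at: None, // not recoverable from the message alone here
            }
        })
        .collect()
}
```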

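User and system messages usually have no produced_by_run_id. Assistant, tool, and internal messages should record the run that produced them through Message.metadata.run_id. A sub-agent result can be read from the child run's output message range; the result is the last non-tool assistant message in that range.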
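Reading a sub-agent result from an output range might look like the sketch below. `Record`, `Role`, and `sub_agent_result` are hypothetical, the range is assumed to be inclusive on both ends, and "non-tool assistant message" is interpreted here as a message whose role is Assistant.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Role {
    System,
    User,
    Assistant,
    Tool,
}

/// Minimal stand-in for a message record: sequence number, role, text.
#[derive(Debug, Clone)]
pub struct Record {
    pub seq: u64,
    pub role: Role,
    pub text: String,
}

/// Returns the text of the last assistant message whose `seq` lies in
/// `from_seq..=to_seq`, skipping tool messages. `records` is assumed to be
/// in append order.
pub fn sub_agent_result(records: &[Record], from_seq: u64, to_seq: u64) -> Option<&str> {
    records
        .iter()
        .filter(|r| r.seq >= from_seq && r.seq <= to_seq)
        .rev()
        .find(|r| r.role == Role::Assistant)
        .map(|r| r.text.as_str())
}
```

If the range contains only tool messages, the function returns `None`, which a caller would need to treat as "no result yet" or as an error depending on the run status.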

ThreadRunStore provides checkpoint helpers on top of ThreadStore + RunStore. In 0.6, production writes should go through CommitCoordinator::commit_checkpoint; the legacy checkpoint is kept for conformance tests and for internal use by the coordinator. checkpoint_append appends a message delta under an optional expected message count guard and returns the new committed message count.
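The expected-count guard on checkpoint_append behaves like an optimistic concurrency check. The sketch below models it with an in-memory log; `MessageLog` and `AppendError` are hypothetical, messages are reduced to strings, the run record argument is omitted, and the method is synchronous for brevity.

```rust
/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum AppendError {
    VersionConflict { expected: u64, actual: u64 },
}

/// In-memory stand-in for a thread's committed message log.
#[derive(Debug, Default)]
pub struct MessageLog {
    pub messages: Vec<String>,
}

impl MessageLog {
    /// Appends `delta` if the current message count matches
    /// `expected_version` (when one is given) and returns the new count.
    pub fn checkpoint_append(
        &mut self,
        delta: &[String],
        expected_version: Option<u64>,
    ) -> Result<u64, AppendError> {
        let actual = self.messages.len() as u64;
        if let Some(expected) = expected_version {
            if expected != actual {
                // Another writer committed first; nothing is appended.
                return Err(AppendError::VersionConflict { expected, actual });
            }
        }
        self.messages.extend_from_slice(delta);
        Ok(self.messages.len() as u64)
    }
}
```

A writer that receives a conflict can reload the log, rebase its delta, and retry with the fresh count.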

#[async_trait]
pub trait ThreadRunStore: ThreadStore + RunStore + Send + Sync {
    #[deprecated(since = "0.6.0", note = "use CommitCoordinator")]
    async fn checkpoint(
        &self,
        thread_id: &str,
        messages: &[Message],
        run: &RunRecord,
    ) -> Result<(), StorageError>;
    async fn checkpoint_append(
        &self,
        thread_id: &str,
        messages: &[Message],
        expected_version: Option<u64>,
        run: &RunRecord,
    ) -> Result<u64, StorageError>;
}

#[async_trait]
pub trait RunStore: Send + Sync {
    async fn create_run(&self, record: &RunRecord) -> Result<(), StorageError>;
    async fn load_run(&self, run_id: &str) -> Result<Option<RunRecord>, StorageError>;
    async fn latest_run(&self, thread_id: &str) -> Result<Option<RunRecord>, StorageError>;
    async fn list_runs(&self, query: &RunQuery) -> Result<RunPage, StorageError>;
}

pub struct RunRecord {
    pub run_id: String,
    pub thread_id: String,
    pub agent_id: String,
    pub parent_run_id: Option<String>,
    pub resolution_id: Option<String>,
    pub activation: Option<RunActivationSnapshot>,
    pub request: Option<RunRequestSnapshot>,
    pub input: Option<RunMessageInput>,
    pub output: Option<RunMessageOutput>,
    pub status: RunStatus,
    pub termination_reason: Option<TerminationReason>,
    pub final_output: Option<String>,
    pub error_payload: Option<Value>,
    pub dispatch_id: Option<String>,
    pub session_id: Option<String>,
    pub transport_request_id: Option<String>,
    pub waiting: Option<RunWaitingState>,
    pub outcome: Option<RunOutcome>,
    pub created_at: u64,
    pub started_at: Option<u64>,
    pub finished_at: Option<u64>,
    pub updated_at: u64,
    pub steps: usize,
    pub input_tokens: u64,
    pub output_tokens: u64,
    pub state: Option<PersistedState>,
}

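RunRecord is the source of truth for a single user intent. It stores request metadata, lifecycle status, waiting state, output and error results, and trace ids. When a run is bound to a published registry snapshot, resolution_id is an opaque binding id owned by the server; the runtime no longer embeds the registry manifest in the run record. RunRecord does not own message bodies.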

RunActivationSnapshot and RunRequestSnapshot

RunActivationSnapshot is the current, replayable activation projection:

pub struct RunActivationSnapshot {
    pub intent: RunIntent,
    pub input: RunInputSnapshot,
    pub options: RunOptions,
    pub trace: RunTraceContext,
    pub seeded_decisions: Vec<(String, ToolCallResume)>,
    pub resolution_id: Option<String>,
}

RunRequestSnapshot is the legacy request projection, which remains readable:

pub struct RunRequestSnapshot {
    pub origin: RunRequestOrigin,
    pub sender_id: Option<String>,
    pub input_message_ids: Vec<String>,
    pub input_message_count: u64,
    pub request_extras: Option<Value>,
    pub decisions: Vec<RunResumeDecision>,
    pub frontend_tools: Vec<ToolDescriptor>,
    pub parent_thread_id: Option<String>,
    pub transport_request_id: Option<String>,
}

Both snapshots only reference message records or ranges owned by the thread; they do not own message bodies.

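RunMessageInput describes the thread message range, or the explicit message selection, that a run reads; RunMessageOutput describes the messages a run produces. Both reference message records owned by the thread: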

pub struct RunMessageInput {
    pub thread_id: String,
    pub range: Option<MessageSeqRange>,
    pub trigger_message_ids: Vec<String>,
    pub selected_message_ids: Vec<String>,
    pub context_policy: Option<String>,
    pub compacted_snapshot_id: Option<String>,
}
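Because a run input only references thread-owned records, resolving it means looking records up in the thread log. The sketch below assumes that MessageSeqRange is an inclusive `start..=end` pair and that a range and an explicit selection are combined as a union; neither detail is stated in this document. `resolve_input` and the trimmed-down structs are hypothetical.

```rust
/// Assumed shape: inclusive sequence bounds.
#[derive(Debug, Clone)]
pub struct MessageSeqRange {
    pub start: u64,
    pub end: u64,
}

/// Trimmed-down stand-in keeping only the fields used here.
#[derive(Debug, Clone, Default)]
pub struct RunMessageInput {
    pub range: Option<MessageSeqRange>,
    pub selected_message_ids: Vec<String>,
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MessageRecord {
    pub message_id: String,
    pub seq: u64,
}

/// Returns references into the thread log, in log order, for every record
/// covered by the range or named explicitly. No message body is copied.
pub fn resolve_input<'a>(
    input: &RunMessageInput,
    log: &'a [MessageRecord],
) -> Vec<&'a MessageRecord> {
    log.iter()
        .filter(|r| {
            let in_range = input
                .range
                .as_ref()
                .map_or(false, |rg| rg.start <= r.seq && r.seq <= rg.end);
            let selected = input
                .selected_message_ids
                .iter()
                .any(|id| id == &r.message_id);
            in_range || selected
        })
        .collect()
}
```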

RunDispatch is responsible for delivery, leases, retries, and queue auditing. It is not the source of truth for whether the agent succeeded or failed.

Queued -> Claimed -> Acked | Cancelled | Superseded | DeadLetter

Acked only means that this dispatch has been consumed and will not be retried. To determine whether the agent succeeded, read RunRecord.status, RunRecord.waiting, and RunRecord.outcome.
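The dispatch lifecycle can be written down as a small state machine. This is a literal reading of the diagram above: `DispatchStatus` and its methods are hypothetical names, and transitions not drawn in the diagram (for example Queued directly to Cancelled) are treated as invalid here, which the real implementation may or may not do.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DispatchStatus {
    Queued,
    Claimed,
    Acked,
    Cancelled,
    Superseded,
    DeadLetter,
}

impl DispatchStatus {
    /// Terminal states end delivery; they say nothing about agent success.
    pub fn is_terminal(self) -> bool {
        matches!(
            self,
            DispatchStatus::Acked
                | DispatchStatus::Cancelled
                | DispatchStatus::Superseded
                | DispatchStatus::DeadLetter
        )
    }

    /// Allows only the edges drawn in the diagram.
    pub fn can_transition_to(self, next: DispatchStatus) -> bool {
        match (self, next) {
            (DispatchStatus::Queued, DispatchStatus::Claimed) => true,
            (DispatchStatus::Claimed, n) => n.is_terminal(),
            _ => false,
        }
    }
}
```

Note that the type deliberately has no notion of success or failure: a dispatch in `Acked` is merely finished as a delivery attempt, and the outcome still has to be read from the run record.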