Skip to content

Latest commit

 

History

History
403 lines (299 loc) · 17.5 KB

File metadata and controls

403 lines (299 loc) · 17.5 KB

thread - event queue management CLI command

A CLI tool for managing event queues (Threads). It persists events and subscription state in SQLite, and leverages notifier for async, non-blocking task scheduling.

决策记录

  1. Thread 目录即 Thread ID:每个 thread 的全部数据存放在一个目录下,通过 thread init <path> 初始化(类似 git init),后续命令通过 --thread <path> 指定。路径经 path.resolve() 规范化后作为 thread id,天然唯一、无需注册。清理时直接删除目录即可,无需专用清理命令。
  2. 直接使用 SQLite:不依赖 xdb 服务,使用 better-sqlite3 直接操作。每个 thread 目录下一个独立的 events.db,数据隔离。
  3. 双轨存储:SQLite 作为查询/订阅状态的主存储;同时维护一份 events.jsonl(只追加),供人类调试浏览,无需 SQLite 客户端。
  4. Event Structure:见第 4 节。凡需要在订阅过滤/分发前判断的字段,统一提到 event 顶层结构中。
  5. Batch 支持push --batch 从 stdin 读 NDJSON(每行一个 payload);pop --limit 默认 100。
  6. Filter 设计:订阅时通过 --filter 指定 SQL WHERE 子句片段(施加在 events 表上)。filter 同时作用于 dispatch(决定是否触发 handler)和 pop(consumer 只拿到匹配的事件)。调用者均为内部命令,无用户输入注入风险。

1. Role

  • Event Storage: Persist events in SQLite + JSONL (dual-track).
  • Subscription & Dispatch: Manage consumer subscriptions; trigger notifier tasks on push.
  • Progress Tracking: Record per-consumer ACK progress for monitoring.

2. Tech Stack & Project Structure

遵循 pai repo 约定:

  • TypeScript + ESM (Node 20+)
  • 构建: tsup (ESM, shebang banner)
  • 测试: vitest (unit, pbt, fixtures)
  • CLI 解析: commander
  • SQLite: better-sqlite3
thread/
├── src/
│   ├── index.ts              # 入口,CLI 解析与分发
│   ├── commands/             # 子命令实现
│   │   ├── init.ts
│   │   ├── push.ts
│   │   ├── pop.ts
│   │   ├── peek.ts
│   │   ├── dispatch.ts
│   │   ├── subscribe.ts
│   │   ├── unsubscribe.ts
│   │   └── info.ts
│   ├── db/
│   │   ├── init.ts           # SQLite 初始化、WAL 模式、schema 迁移
│   │   └── queries.ts        # 封装所有 SQL 操作
│   ├── event-log.ts          # JSONL 追加写入工具
│   ├── notifier-client.ts    # 调用 notifier CLI 的核心逻辑
│   ├── help.ts               # --help / --help --verbose 输出
│   ├── logger.ts             # 运行日志工具
│   └── types.ts              # 共享类型定义
├── vitest/
│   ├── unit/
│   ├── pbt/
│   ├── fixtures/
│   └── helpers/
├── package.json
├── tsconfig.json
├── tsup.config.ts
├── vitest.config.ts
├── SPEC.md
└── USAGE.md

3. Data Directory Layout

每个 thread 的数据完全自包含于其目录下:

<thread-dir>/          # thread id (absolute path after path.resolve())
├── events.db          # SQLite database (WAL mode)
├── events.jsonl       # Append-only event log for human debugging
├── events-<timestamp>.jsonl  # Rotated historical event logs
├── run/               # Consumer runtime .lock files
└── logs/
    ├── thread.log     # Current runtime log
    └── thread-<timestamp>.log  # Rotated historical logs

<thread-dir> 由调用者在命令行通过 --thread <path> 指定,thread 工具在首次写入时自动创建目录结构。

4. Data Protocol

4.1 Event Structure

Field Type Description
id INTEGER Auto-increment primary key, generated by SQLite
created_at TEXT ISO 8601 timestamp, auto-generated on write
source TEXT Structured source address (see 4.3 Source Address Format)
type TEXT Event type, see enum below
subtype TEXT | null Event subtype, see enum below
content TEXT Event payload as string (may contain serialized JSON; thread does not parse it)

sourcetypesubtype 是可用于订阅过滤的字段,统一置于顶层结构。

4.2 Event Type Enum

type subtype Description
message null Inter-actor communication message
record toolcall Atomic record of an actor's tool invocation
record decision Actor decision record for external observability

subtype 可随需求扩展(如 artifact 生命周期事件、thread 状态事件等)。

4.3 Source Address Format

source 字段是一个冒号分隔的结构化地址字符串,编码了事件来源的完整身份信息。根据前缀分为三类:

external — 来自网关外部(xgw 入站)

external:<channel_type>:<channel_id>:<session_type>:<session_id>:<peer_id>

6 段,标识外部渠道消息的完整来源。由 xgw 在写入 agent inbox 时构造。

说明 示例
external 固定前缀 external
channel_type 渠道类型 telegram, slack, discord
channel_id 渠道实例 ID tg-main, slack-work
session_type 会话类型 dm, group, channel
session_id 会话标识 alice, grp-123, grp-123/topic-456
peer_id 发送者标识 alice, bob

示例:

  • Telegram 单聊 Alice: external:telegram:tg-main:dm:alice:alice
  • Telegram 群聊 Bob: external:telegram:tg-main:group:grp-123:bob
  • Telegram 子会话: external:telegram:tg-main:group:grp-123/topic-456:alice

internal — 来自系统内部(agent 间通信)

internal:<session_type>:<session_id>:<agent_id>

4 段,标识内部 agent 间消息的来源。由发送方 agent 在写入目标 agent inbox 时构造。没有实际值的段使用 default

说明 示例
internal 固定前缀 internal
session_type 会话类型 dm
session_id 会话标识(无特定会话时为 default default, health-check-0320
agent_id 发送方 agent ID warden, maintainer

示例:

  • warden → admin 普通消息: internal:dm:default:warden
  • maintainer 发起特定任务会话: internal:dm:health-check-0320:maintainer

self — agent 自身产生的事件

self

1 段,用于 agent 写入自己的 thread 的 record 事件(toolcall、decision 等)。这些事件不来自任何通信渠道,是 agent 自身行为的记录。

Filter 示例

subscription filter 可以利用 source 的结构化格式进行精确过滤:

  • source LIKE 'external:%' — 所有外部消息
  • source LIKE 'external:telegram:%' — Telegram 来的消息
  • source LIKE 'internal:%:warden' — warden 发的内部消息
  • source = 'self' — agent 自己的 record
  • source LIKE '%:alice' — Alice 发的(external 和 internal 都能匹配)

约定:所有段的值统一小写,不含冒号。session_id 中的子会话分隔符使用 /(与 xgw 子会话归一化约定一致),不与冒号冲突。

5. Data Model (SQLite)

PRAGMA journal_mode=WAL; enabled for high-performance concurrent reads/writes.

5.1 events

CREATE TABLE events (
  id         INTEGER PRIMARY KEY AUTOINCREMENT,
  created_at TEXT    NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
  source     TEXT    NOT NULL,
  type       TEXT    NOT NULL,
  subtype    TEXT,
  content    TEXT    NOT NULL
);
CREATE INDEX idx_events_source ON events(source);
CREATE INDEX idx_events_type   ON events(type);

5.2 subscriptions

CREATE TABLE subscriptions (
  consumer_id  TEXT NOT NULL,
  handler_cmd  TEXT NOT NULL,
  filter       TEXT,         -- 可选的 SQL WHERE 子句片段,施加在 events 表上
                             -- 例: "type = 'message'" 或 "source LIKE 'internal:%:warden' AND type = 'message'"
                             -- 为 null 时表示订阅全部事件
  PRIMARY KEY (consumer_id)
);

5.3 consumer_progress

CREATE TABLE consumer_progress (
  consumer_id   TEXT NOT NULL PRIMARY KEY,
  last_acked_id INTEGER NOT NULL DEFAULT 0,  -- 0 表示尚未消费任何事件;SQLite AUTOINCREMENT 从 1 开始,故 id > 0 可取到全部事件
  updated_at    TEXT NOT NULL
);

6. CLI Commands

All commands require --thread <path> to specify the target thread directory. push, pop, peek, subscribe, info support --json output; unsubscribe and dispatch do not.

thread 目录不存在或不是有效 thread 目录时,除 thread init 外所有命令均报错退出(退出码 1,提示先运行 thread init <path>)。判断是否为有效 thread 目录的依据:目录存在且包含 events.db

6.1 Core Operations

thread init

Args: <path> (positional, required — target directory path)

Behavior:

  1. 创建目录结构(run/logs/)。
  2. 初始化 events.db(建表、WAL 模式)。
  3. 创建空的 events.jsonl
  4. 若目录已存在且已是有效 thread 目录,报错退出(退出码 1)。若目录已存在但不是 thread 目录,在其中初始化(类似 git init)。

thread push

Args:

  • --thread <path> (required)
  • --source <name> (required)
  • --type <type> (required)
  • --subtype <subtype> (optional)
  • --content <data> (required in single-event mode)
  • --batch (optional — read NDJSON from stdin, one event object per line; ignores --content)

Behavior:

  1. 将事件插入 events 表(事务)。
  2. 同步 append 到 events.jsonl
  3. 触发调度:执行 notifier task add --author <source> --task-id "dispatch-<thread_path_slug>" --command "thread dispatch --thread <path>"
    • <source> 即本次 push 的 --source 参数值
    • <thread_path_slug> 为 thread 路径将所有非字母数字字符替换为连字符后的结果,超过 40 字符时取前 32 字符加 - 加路径 sha1 前 6 位,例如 /home/user/my-project/threadhome-user-my-project-thread
  4. notifier 返回退出码 1(任务已存在),视为成功正常退出(dispatch 已在队列中)。

--batch mode: stdin 每行为一个 JSON 对象,包含 sourcetypecontent 字段(subtype 可选)。所有行在单个事务中批量插入。batch 模式下仍只触发一次 notifier dispatch(以最后一条事件的 source 作为 --author)。

thread pop

Args:

  • --thread <path> (required)
  • --consumer <id> (required)
  • --last-event-id <id> (required — the max event id already processed; pass 0 for first consumption, since SQLite ids start at 1)
  • --limit <n> (optional, default 100)

Behavior:

  1. 查询该 consumer 的 filter(从 subscriptions 表)。若 consumer 不存在于 subscriptions,报错退出(退出码 1)。
  2. 更新 consumer_progress 中的 last_acked_id--last-event-id 参数值(upsert;首次 pop 时该记录不存在,正常插入)。
  3. Query:
    SELECT * FROM events
    WHERE id > <last-event-id>
      AND (<filter clause, omitted when null>)
    ORDER BY id ASC LIMIT <limit>
  4. Output NDJSON to stdout (one event per line). Empty output if no new events.

ACK Semantics: --last-event-id is the max event id from the previously processed batch, not the starting point of the current fetch. Typical consumer flow: pop(last=0) → process → pop(last=<max processed id>) → process → … → empty result → exit. If a consumer crashes mid-processing, it re-sends the last confirmed id on restart, and unprocessed events are re-delivered (at-least-once semantics).

thread peek

Args:

  • --thread <path> (required)
  • --last-event-id <id> (required — return events with id > this value; pass 0 to read from the beginning)
  • --limit <n> (optional, default 100)
  • --filter <sql-where> (optional — SQL WHERE clause fragment, e.g. "type = 'message'")

Behavior:

  1. Query:
    SELECT * FROM events
    WHERE id > <last-event-id>
      AND (<filter clause, omitted when null>)
    ORDER BY id ASC LIMIT <limit>
  2. Output NDJSON to stdout (one event per line). Empty output if no matching events.

pop 的区别:peek 是纯只读查询,不需要 --consumer 参数,不更新 consumer_progress,不要求调用者在 subscriptions 表中注册。适用于 agent 构建 LLM context 时从 thread 读取最近消息等"读取但不消费"的场景。

thread dispatch (Internal)

Args: --thread <path> (required)

Behavior:

  1. 遍历 subscriptions 表中所有订阅。
  2. 对每个 consumer,从 consumer_progresslast_acked_id(若无记录则视为 0)。查询是否存在未消费的、符合 filter 条件的事件:
    SELECT 1 FROM events
    WHERE id > <last_acked_id>
      AND (<filter clause, omitted when null>)
    LIMIT 1
    若无匹配事件,跳过该 consumer。
  3. 尝试对 run/<consumer_id>.lock 加互斥锁(跨平台方案,见下方说明)。
  4. 若加锁成功,以 shell: true 分离模式 spawn handler_cmd(作为单行 shell 命令执行,支持管道等 shell 特性);若加锁失败(consumer 正在运行),跳过。
  5. 记录调度详情到日志。

文件锁跨平台方案:优先使用通用 Node.js 方案(如 proper-lockfile 或手动 O_EXCL 原子创建锁文件)。若无合适的通用方案,则分别实现 Linux(flock)和 macOS/Windows 的锁逻辑,统一封装在 src/flock-utils.ts 中,对外暴露 tryLock(path): boolean / unlock(path): void 接口。

注意:filter 同时决定 dispatch 是否触发 handler,以及 pop 返回的事件范围(consumer 只拿到它关心的事件)。

6.2 Management Operations

thread subscribe

Args:

  • --thread <path> (required)
  • --consumer <id> (required)
  • --handler <cmd> (required)
  • --filter <sql-where> (optional — SQL WHERE clause fragment, e.g. "type = 'message'"; subscribes to all events when omitted)

Behavior: 插入 subscriptions。若 consumer_id 已存在则报错退出(退出码 1,提示先 thread unsubscribe 再重新订阅)。

注意handler_cmd 由订阅者自行构造,建议在命令中包含 --thread <path>--consumer <id> 参数(或通过其他方式确保 handler 能定位到对应的 thread 和 consumer)。

thread unsubscribe

Args: --thread <path> (required), --consumer <id> (required)

Behavior: 删除对应订阅。不存在时报错退出(退出码 1)。

thread info

Args: --thread <path> (required), --json (optional)

Behavior: Output thread summary including total event count, subscription list, and per-consumer progress.

7. Output Format

7.1 stdout / stderr Contract

  • stdout: Command result data (NDJSON event stream, info summary, etc.).
  • stderr: Progress, debug, error, and warning messages.

7.2 Human / Machine Readability

  • Default output is human-readable.
  • --json enables structured JSON/NDJSON output (push, pop, subscribe, info).

8. Error Handling & Exit Codes

8.1 Exit Codes

Code Meaning
0 Success
1 Logic error (subscription already exists, file not found, etc.)
2 Usage/argument error (missing required args, invalid type, etc.)

8.2 Error Output

  • Default (no --json): human-readable error to stderr, format Error: <what went wrong> - <how to fix>.
  • --json mode: {"error": "...", "suggestion": "..."}

9. Logging

9.1 Runtime Log

Log file: <thread-dir>/logs/thread.log

记录内容

  • push 成功及 notifier 调用状态
  • dispatch 扫描结果:发现哪些 consumer、哪些因加锁失败跳过、哪些成功启动
  • SQLite 致命错误

Log line format:

[2026-03-18T10:30:00.123Z] [INFO] push: source=external:telegram:tg-main:dm:alice:alice type=message id=42
[2026-03-18T10:30:00.200Z] [INFO] dispatch: consumer=worker-1 spawned handler_cmd="pai chat ..."
[2026-03-18T10:30:00.201Z] [INFO] dispatch: consumer=worker-2 skipped (lock held)

9.2 JSONL Event Log

events.jsonl 超过 10000 行时自动轮换(在下次 push 时检查),旧文件重命名为 events-<YYYYMMDD-HHmmss>.jsonl。SQLite 中的事件数据不受影响。

9.3 Rotation Policy

运行日志和 JSONL 事件日志均采用相同策略:超过 10000 行时自动轮换,旧文件重命名为 <name>-<YYYYMMDD-HHmmss>.<ext>,新建空文件继续写入。

10. Idempotency & Safety

  1. Push:非幂等,每次调用生成新 event id。但触发的 dispatch 调度在 notifier 层是幂等的(相同 task-id 不重复排队)。
  2. Dispatch 安全性:文件锁确保同一 consumer 不会被重复启动。
  3. 原子性:所有数据库写操作包裹在事务中。batch push 整批在单个事务中完成。

11. Environment Variables

变量 说明
无全局 HOME 变量 thread 目录由调用者通过 --thread <path> 显式指定