A CLI tool for managing event queues (Threads). It persists events and subscription state in SQLite, and leverages notifier for async, non-blocking task scheduling.
- Thread 目录即 Thread ID:每个 thread 的全部数据存放在一个目录下,通过
thread init <path>初始化(类似git init),后续命令通过--thread <path>指定。路径经path.resolve()规范化后作为 thread id,天然唯一、无需注册。清理时直接删除目录即可,无需专用清理命令。 - 直接使用 SQLite:不依赖 xdb 服务,使用
better-sqlite3直接操作。每个 thread 目录下一个独立的events.db,数据隔离。 - 双轨存储:SQLite 作为查询/订阅状态的主存储;同时维护一份
events.jsonl(只追加),供人类调试浏览,无需 SQLite 客户端。 - Event Structure:见第 4 节。凡需要在订阅过滤/分发前判断的字段,统一提到 event 顶层结构中。
- Batch 支持:
push --batch从 stdin 读 NDJSON(每行一个 payload);pop --limit默认 100。 - Filter 设计:订阅时通过
--filter指定 SQL WHERE 子句片段(施加在events表上)。filter 同时作用于 dispatch(决定是否触发 handler)和 pop(consumer 只拿到匹配的事件)。调用者均为内部命令,无用户输入注入风险。
- Event Storage: Persist events in SQLite + JSONL (dual-track).
- Subscription & Dispatch: Manage consumer subscriptions; trigger
notifiertasks onpush. - Progress Tracking: Record per-consumer ACK progress for monitoring.
遵循 pai repo 约定:
- TypeScript + ESM (Node 20+)
- 构建: tsup (ESM, shebang banner)
- 测试: vitest (unit, pbt, fixtures)
- CLI 解析: commander
- SQLite: better-sqlite3
thread/
├── src/
│ ├── index.ts # 入口,CLI 解析与分发
│ ├── commands/ # 子命令实现
│ │ ├── init.ts
│ │ ├── push.ts
│ │ ├── pop.ts
│ │ ├── peek.ts
│ │ ├── dispatch.ts
│ │ ├── subscribe.ts
│ │ ├── unsubscribe.ts
│ │ └── info.ts
│ ├── db/
│ │ ├── init.ts # SQLite 初始化、WAL 模式、schema 迁移
│ │ └── queries.ts # 封装所有 SQL 操作
│ ├── event-log.ts # JSONL 追加写入工具
│ ├── notifier-client.ts # 调用 notifier CLI 的核心逻辑
│ ├── help.ts # --help / --help --verbose 输出
│ ├── logger.ts # 运行日志工具
│ └── types.ts # 共享类型定义
├── vitest/
│ ├── unit/
│ ├── pbt/
│ ├── fixtures/
│ └── helpers/
├── package.json
├── tsconfig.json
├── tsup.config.ts
├── vitest.config.ts
├── SPEC.md
└── USAGE.md
每个 thread 的数据完全自包含于其目录下:
<thread-dir>/ # thread id (absolute path after path.resolve())
├── events.db # SQLite database (WAL mode)
├── events.jsonl # Append-only event log for human debugging
├── events-<timestamp>.jsonl # Rotated historical event logs
├── run/ # Consumer runtime .lock files
└── logs/
├── thread.log # Current runtime log
└── thread-<timestamp>.log # Rotated historical logs
<thread-dir> 由调用者在命令行通过 --thread <path> 指定,thread 工具在首次写入时自动创建目录结构。
| Field | Type | Description |
|---|---|---|
id |
INTEGER | Auto-increment primary key, generated by SQLite |
created_at |
TEXT | ISO 8601 timestamp, auto-generated on write |
source |
TEXT | Structured source address (see 4.3 Source Address Format) |
type |
TEXT | Event type, see enum below |
subtype |
TEXT | null | Event subtype, see enum below |
content |
TEXT | Event payload as string (may contain serialized JSON; thread does not parse it) |
source、type、subtype 是可用于订阅过滤的字段,统一置于顶层结构。
| type | subtype | Description |
|---|---|---|
message |
null | Inter-actor communication message |
record |
toolcall |
Atomic record of an actor's tool invocation |
record |
decision |
Actor decision record for external observability |
subtype 可随需求扩展(如 artifact 生命周期事件、thread 状态事件等)。
source 字段是一个冒号分隔的结构化地址字符串,编码了事件来源的完整身份信息。根据前缀分为三类:
external — 来自网关外部(xgw 入站)
external:<channel_type>:<channel_id>:<session_type>:<session_id>:<peer_id>
6 段,标识外部渠道消息的完整来源。由 xgw 在写入 agent inbox 时构造。
| 段 | 说明 | 示例 |
|---|---|---|
external |
固定前缀 | external |
channel_type |
渠道类型 | telegram, slack, discord |
channel_id |
渠道实例 ID | tg-main, slack-work |
session_type |
会话类型 | dm, group, channel |
session_id |
会话标识 | alice, grp-123, grp-123/topic-456 |
peer_id |
发送者标识 | alice, bob |
示例:
- Telegram 单聊 Alice:
external:telegram:tg-main:dm:alice:alice - Telegram 群聊 Bob:
external:telegram:tg-main:group:grp-123:bob - Telegram 子会话:
external:telegram:tg-main:group:grp-123/topic-456:alice
internal — 来自系统内部(agent 间通信)
internal:<session_type>:<session_id>:<agent_id>
4 段,标识内部 agent 间消息的来源。由发送方 agent 在写入目标 agent inbox 时构造。没有实际值的段使用 default。
| 段 | 说明 | 示例 |
|---|---|---|
internal |
固定前缀 | internal |
session_type |
会话类型 | dm |
session_id |
会话标识(无特定会话时为 default) |
default, health-check-0320 |
agent_id |
发送方 agent ID | warden, maintainer |
示例:
- warden → admin 普通消息:
internal:dm:default:warden - maintainer 发起特定任务会话:
internal:dm:health-check-0320:maintainer
self — agent 自身产生的事件
self
1 段,用于 agent 写入自己的 thread 的 record 事件(toolcall、decision 等)。这些事件不来自任何通信渠道,是 agent 自身行为的记录。
Filter 示例:
subscription filter 可以利用 source 的结构化格式进行精确过滤:
source LIKE 'external:%'— 所有外部消息source LIKE 'external:telegram:%'— Telegram 来的消息source LIKE 'internal:%:warden'— warden 发的内部消息source = 'self'— agent 自己的 recordsource LIKE '%:alice'— Alice 发的(external 和 internal 都能匹配)
约定:所有段的值统一小写,不含冒号。session_id 中的子会话分隔符使用 /(与 xgw 子会话归一化约定一致),不与冒号冲突。
PRAGMA journal_mode=WAL; enabled for high-performance concurrent reads/writes.
CREATE TABLE events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
created_at TEXT NOT NULL DEFAULT (strftime('%Y-%m-%dT%H:%M:%fZ', 'now')),
source TEXT NOT NULL,
type TEXT NOT NULL,
subtype TEXT,
content TEXT NOT NULL
);
CREATE INDEX idx_events_source ON events(source);
CREATE INDEX idx_events_type ON events(type);CREATE TABLE subscriptions (
consumer_id TEXT NOT NULL,
handler_cmd TEXT NOT NULL,
filter TEXT, -- 可选的 SQL WHERE 子句片段,施加在 events 表上
-- 例: "type = 'message'" 或 "source LIKE 'internal:%:warden' AND type = 'message'"
-- 为 null 时表示订阅全部事件
PRIMARY KEY (consumer_id)
);CREATE TABLE consumer_progress (
consumer_id TEXT NOT NULL PRIMARY KEY,
last_acked_id INTEGER NOT NULL DEFAULT 0, -- 0 表示尚未消费任何事件;SQLite AUTOINCREMENT 从 1 开始,故 id > 0 可取到全部事件
updated_at TEXT NOT NULL
);All commands require --thread <path> to specify the target thread directory. push, pop, peek, subscribe, info support --json output; unsubscribe and dispatch do not.
thread 目录不存在或不是有效 thread 目录时,除 thread init 外所有命令均报错退出(退出码 1,提示先运行 thread init <path>)。判断是否为有效 thread 目录的依据:目录存在且包含 events.db。
Args: <path> (positional, required — target directory path)
Behavior:
- 创建目录结构(
run/、logs/)。 - 初始化
events.db(建表、WAL 模式)。 - 创建空的
events.jsonl。 - 若目录已存在且已是有效 thread 目录,报错退出(退出码 1)。若目录已存在但不是 thread 目录,在其中初始化(类似
git init)。
Args:
--thread <path>(required)--source <name>(required)--type <type>(required)--subtype <subtype>(optional)--content <data>(required in single-event mode)--batch(optional — read NDJSON from stdin, one event object per line; ignores--content)
Behavior:
- 将事件插入
events表(事务)。 - 同步 append 到
events.jsonl。 - 触发调度:执行
notifier task add --author <source> --task-id "dispatch-<thread_path_slug>" --command "thread dispatch --thread <path>"。<source>即本次 push 的--source参数值<thread_path_slug>为 thread 路径将所有非字母数字字符替换为连字符后的结果,超过 40 字符时取前 32 字符加-加路径 sha1 前 6 位,例如/home/user/my-project/thread→home-user-my-project-thread
- 若
notifier返回退出码1(任务已存在),视为成功正常退出(dispatch 已在队列中)。
--batch mode: stdin 每行为一个 JSON 对象,包含 source、type、content 字段(subtype 可选)。所有行在单个事务中批量插入。batch 模式下仍只触发一次 notifier dispatch(以最后一条事件的 source 作为 --author)。
Args:
--thread <path>(required)--consumer <id>(required)--last-event-id <id>(required — the max event id already processed; pass 0 for first consumption, since SQLite ids start at 1)--limit <n>(optional, default 100)
Behavior:
- 查询该 consumer 的
filter(从subscriptions表)。若 consumer 不存在于subscriptions,报错退出(退出码 1)。 - 更新
consumer_progress中的last_acked_id为--last-event-id参数值(upsert;首次 pop 时该记录不存在,正常插入)。 - Query:
SELECT * FROM events WHERE id > <last-event-id> AND (<filter clause, omitted when null>) ORDER BY id ASC LIMIT <limit>
- Output NDJSON to stdout (one event per line). Empty output if no new events.
ACK Semantics:
--last-event-idis the max event id from the previously processed batch, not the starting point of the current fetch. Typical consumer flow:pop(last=0)→ process →pop(last=<max processed id>)→ process → … → empty result → exit. If a consumer crashes mid-processing, it re-sends the last confirmed id on restart, and unprocessed events are re-delivered (at-least-once semantics).
Args:
--thread <path>(required)--last-event-id <id>(required — return events with id > this value; pass 0 to read from the beginning)--limit <n>(optional, default 100)--filter <sql-where>(optional — SQL WHERE clause fragment, e.g."type = 'message'")
Behavior:
- Query:
SELECT * FROM events WHERE id > <last-event-id> AND (<filter clause, omitted when null>) ORDER BY id ASC LIMIT <limit>
- Output NDJSON to stdout (one event per line). Empty output if no matching events.
与 pop 的区别:peek 是纯只读查询,不需要 --consumer 参数,不更新 consumer_progress,不要求调用者在 subscriptions 表中注册。适用于 agent 构建 LLM context 时从 thread 读取最近消息等"读取但不消费"的场景。
Args: --thread <path> (required)
Behavior:
- 遍历
subscriptions表中所有订阅。 - 对每个 consumer,从
consumer_progress取last_acked_id(若无记录则视为 0)。查询是否存在未消费的、符合 filter 条件的事件:若无匹配事件,跳过该 consumer。SELECT 1 FROM events WHERE id > <last_acked_id> AND (<filter clause, omitted when null>) LIMIT 1
- 尝试对
run/<consumer_id>.lock加互斥锁(跨平台方案,见下方说明)。 - 若加锁成功,以
shell: true分离模式 spawnhandler_cmd(作为单行 shell 命令执行,支持管道等 shell 特性);若加锁失败(consumer 正在运行),跳过。 - 记录调度详情到日志。
文件锁跨平台方案:优先使用通用 Node.js 方案(如
proper-lockfile或手动O_EXCL原子创建锁文件)。若无合适的通用方案,则分别实现 Linux(flock)和 macOS/Windows 的锁逻辑,统一封装在src/flock-utils.ts中,对外暴露tryLock(path): boolean/unlock(path): void接口。
注意:filter 同时决定 dispatch 是否触发 handler,以及
pop返回的事件范围(consumer 只拿到它关心的事件)。
Args:
--thread <path>(required)--consumer <id>(required)--handler <cmd>(required)--filter <sql-where>(optional — SQL WHERE clause fragment, e.g."type = 'message'"; subscribes to all events when omitted)
Behavior: 插入 subscriptions。若 consumer_id 已存在则报错退出(退出码 1,提示先 thread unsubscribe 再重新订阅)。
注意:
handler_cmd由订阅者自行构造,建议在命令中包含--thread <path>和--consumer <id>参数(或通过其他方式确保 handler 能定位到对应的 thread 和 consumer)。
Args: --thread <path> (required), --consumer <id> (required)
Behavior: 删除对应订阅。不存在时报错退出(退出码 1)。
Args: --thread <path> (required), --json (optional)
Behavior: Output thread summary including total event count, subscription list, and per-consumer progress.
stdout: Command result data (NDJSON event stream, info summary, etc.).stderr: Progress, debug, error, and warning messages.
- Default output is human-readable.
--jsonenables structured JSON/NDJSON output (push,pop,subscribe,info).
| Code | Meaning |
|---|---|
0 |
Success |
1 |
Logic error (subscription already exists, file not found, etc.) |
2 |
Usage/argument error (missing required args, invalid type, etc.) |
- Default (no
--json): human-readable error tostderr, formatError: <what went wrong> - <how to fix>. --jsonmode:{"error": "...", "suggestion": "..."}
Log file: <thread-dir>/logs/thread.log
记录内容:
push成功及 notifier 调用状态dispatch扫描结果:发现哪些 consumer、哪些因加锁失败跳过、哪些成功启动- SQLite 致命错误
Log line format:
[2026-03-18T10:30:00.123Z] [INFO] push: source=external:telegram:tg-main:dm:alice:alice type=message id=42
[2026-03-18T10:30:00.200Z] [INFO] dispatch: consumer=worker-1 spawned handler_cmd="pai chat ..."
[2026-03-18T10:30:00.201Z] [INFO] dispatch: consumer=worker-2 skipped (lock held)
events.jsonl 超过 10000 行时自动轮换(在下次 push 时检查),旧文件重命名为 events-<YYYYMMDD-HHmmss>.jsonl。SQLite 中的事件数据不受影响。
运行日志和 JSONL 事件日志均采用相同策略:超过 10000 行时自动轮换,旧文件重命名为 <name>-<YYYYMMDD-HHmmss>.<ext>,新建空文件继续写入。
- Push:非幂等,每次调用生成新 event id。但触发的 dispatch 调度在 notifier 层是幂等的(相同 task-id 不重复排队)。
- Dispatch 安全性:文件锁确保同一 consumer 不会被重复启动。
- 原子性:所有数据库写操作包裹在事务中。batch push 整批在单个事务中完成。
| 变量 | 说明 |
|---|---|
| 无全局 HOME 变量 | thread 目录由调用者通过 --thread <path> 显式指定 |