diff --git a/pi/extensions/heartbeat.test.mjs b/pi/extensions/heartbeat.test.mjs index 11c63df..1aea812 100644 --- a/pi/extensions/heartbeat.test.mjs +++ b/pi/extensions/heartbeat.test.mjs @@ -63,6 +63,82 @@ function parseTodo(content) { } } +function hasReplyLogEntry(replyLogContent, threadTs) { + const lines = replyLogContent.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const entry = JSON.parse(trimmed); + if (entry?.thread_ts === threadTs) return true; + } catch { + // Ignore malformed JSONL lines. + } + } + return false; +} + +function hasOutboundSendCommand(sessionJsonlContent, threadTs) { + const escapedThreadTs = threadTs.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapedThreadTs}["']`); + + for (const line of sessionJsonlContent.split("\n")) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let parsed; + try { + parsed = JSON.parse(trimmed); + } catch { + continue; + } + + if (parsed?.type !== "message") continue; + if (parsed?.message?.role !== "assistant") continue; + const items = parsed?.message?.content; + if (!Array.isArray(items)) continue; + + for (const item of items) { + if (item?.type !== "toolCall") continue; + if (item?.name !== "bash") continue; + const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; + if (!command.includes("curl")) continue; + if (!command.includes("/send")) continue; + if (!threadTsPattern.test(command)) continue; + return true; + } + } + + return false; +} + +function slackTsToMs(ts) { + const parsed = Number.parseFloat(ts); + if (!Number.isFinite(parsed) || parsed <= 0) return null; + return Math.floor(parsed * 1000); +} + +function extractMentionThreadTs(logTail) { + const mentionThreadTs = new Set(); + + for (const line of logTail.split("\n")) { + if (!line.includes("app_mention")) continue; + + const threadMatch = line.match(/\bthread_ts:\s*(\d+\.\d+)/); + if (threadMatch?.[1]) { + mentionThreadTs.add(threadMatch[1]); + continue; + } + + const tsMatch = line.match(/\bts:\s*(\d+\.\d+)/); + if (tsMatch?.[1]) { + mentionThreadTs.add(tsMatch[1]); + } + } + + return [...mentionThreadTs]; +} + // ── Test helpers ──────────────────────────────────────────────────────────── // ── Tests ─────────────────────────────────────────────────────────────────── @@ -312,6 +388,95 @@ Not part of JSON.`; }); }); +describe("heartbeat v2: unanswered mention log parsing", () => { + it("extracts app_mention ts from broker-bridge log format", () => { + const log = + "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: app_mention, thread_ts: 1772313000.000001, ts: 1772313000.123456)"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313000.000001"]); + }); + + it("falls back to message ts when thread_ts is absent", () => { + const log = "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: app_mention, ts: 1772313000.123456)"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313000.123456"]); + }); + + it("extracts app_mention ts from socket-mode bridge log format", () => { + const log = "📣 app_mention from <@U123> in C123 ts: 1772313001.654321"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313001.654321"]); + }); + + it("prefers thread_ts over message ts when both are present", () => { + const log = + "📣 app_mention from <@U123> in C123 thread_ts: 1772313000.000001 ts: 1772313001.654321"; + assert.deepEqual(extractMentionThreadTs(log), ["1772313000.000001"]); + }); + + it("ignores non-app_mention log lines", () => { + const log = [ + "💬 from <@U123>: hello", + "[2026-02-28T21:10:00.000Z] 👤 message from <@U123> in C123 (type: message, ts: 1772313000.123456)", + "🧵 Registered thread-1 → channel=C123 thread_ts=1772313000.123456", + ].join("\n"); + assert.deepEqual(extractMentionThreadTs(log), []); + }); + + it("converts slack ts to milliseconds", () => { + assert.equal(slackTsToMs("1772313000.123456"), 1772313000123); + assert.equal(slackTsToMs("0"), null); + assert.equal(slackTsToMs("not-a-ts"), null); + }); +}); + +describe("heartbeat v2: unanswered mention reply detection", () => { + it("matches exact thread_ts entries in reply log jsonl", () => { + const log = [ + '{"thread_ts":"1234.5678","replied_at":"2026-02-27T00:00:00Z"}', + '{"thread_ts":"2345.6789","replied_at":"2026-02-27T00:05:00Z"}', + ].join("\n"); + + assert.equal(hasReplyLogEntry(log, "1234.5678"), true); + assert.equal(hasReplyLogEntry(log, "9999.0000"), false); + }); + + it("ignores malformed reply-log lines", () => { + const log = ['{"thread_ts":"1234.5678"}', 'not-json', '{"thread_ts":"2345.6789"}'].join("\n"); + assert.equal(hasReplyLogEntry(log, "2345.6789"), true); + }); + + it("detects outbound curl /send with matching thread_ts", () => { + const session = JSON.stringify({ + type: "message", + message: { + role: "assistant", + content: [ + { + type: "toolCall", + name: "bash", + arguments: { + command: + "curl -s -X POST http://127.0.0.1:7890/send -H 'Content-Type: application/json' -d '{\"channel\":\"C123\",\"text\":\"hi\",\"thread_ts\":\"1234.5678\"}'", + }, + }, + ], + }, + }); + + assert.equal(hasOutboundSendCommand(session, "1234.5678"), true); + }); + + it("does not treat inbound text containing thread_ts as a reply", () => { + const inboundOnly = JSON.stringify({ + type: "message", + message: { + role: "user", + content: [{ type: "text", text: "inbound event metadata: thread_ts=1234.5678" }], + }, + }); + + assert.equal(hasOutboundSendCommand(inboundOnly, "1234.5678"), false); + }); +}); + describe("heartbeat v2: hasMatchingInProgressTodo logic", () => { // Replicate the matching logic from the extension function matchesWorktree(content, worktreeName) { diff --git a/pi/extensions/heartbeat.ts b/pi/extensions/heartbeat.ts index c529160..f41cfbc 100644 --- a/pi/extensions/heartbeat.ts +++ b/pi/extensions/heartbeat.ts @@ -9,11 +9,13 @@ * 2. Slack bridge — HTTP POST to localhost:7890/send returns 400 * 3. Stale worktrees — ~/workspace/worktrees/ has dirs with no matching in-progress todo * 4. Stuck todos — in-progress for >2 hours with no matching dev-agent session + * 5. Unanswered Slack mentions — app_mention events in bridge log with no reply within 5 min * * Configuration (env vars): * HEARTBEAT_INTERVAL_MS — interval between heartbeats (default: 600000 = 10 min) * HEARTBEAT_ENABLED — set to "0" or "false" to disable (default: enabled) * HEARTBEAT_EXPECTED_SESSIONS — comma-separated session aliases to check (default: "sentry-agent") + * HEARTBEAT_CHECK_UNANSWERED_MENTIONS — enabled by default, set to "0", "false", or "no" to disable * * When all checks pass, zero LLM tokens are consumed. When something fails, * a targeted prompt is injected describing only the failures so the control-agent @@ -37,6 +39,9 @@ const SOCKET_DIR = join(homedir(), ".pi", "session-control"); const WORKTREES_DIR = join(homedir(), "workspace", "worktrees"); const TODOS_DIR = join(homedir(), ".pi", "todos"); const BRIDGE_URL = "http://127.0.0.1:7890/send"; +const BRIDGE_LOG = join(homedir(), ".pi", "agent", "logs", "slack-bridge.log"); +const SESSION_DIR = join(homedir(), ".pi", "agent", "sessions"); +const UNANSWERED_MENTION_THRESHOLD_MS = 5 * 60 * 1000; // 5 minutes type HeartbeatState = { enabled: boolean; @@ -72,6 +77,12 @@ function getExpectedSessions(): string[] { return ["sentry-agent"]; } +function isUnansweredMentionsCheckEnabled(): boolean { + const val = process.env.HEARTBEAT_CHECK_UNANSWERED_MENTIONS?.trim().toLowerCase(); + // Default to enabled unless explicitly disabled + return val !== "0" && val !== "false" && val !== "no"; +} + // ── Health Check Functions ────────────────────────────────────────────────── function checkSessions(): CheckResult[] { @@ -300,6 +311,170 @@ function checkStuckTodos(): CheckResult[] { return results; } +function checkUnansweredMentions(): CheckResult[] { + const results: CheckResult[] = []; + const now = Date.now(); + + if (!existsSync(BRIDGE_LOG)) return results; + + try { + // Read the last 500 lines of the bridge log to find recent app_mention events. + // Support both bridge implementations: + // - broker-bridge.mjs: "... (type: app_mention, ts: 1234.5678)" + // - bridge.mjs: "app_mention ... ts: 1234.5678" + const { execSync } = require("node:child_process"); + const logTail = execSync(`tail -500 "${BRIDGE_LOG}"`, { encoding: "utf-8" }); + + const mentionThreadTsSet = new Set(extractMentionThreadTs(logTail)); + + const oneHourAgo = now - 60 * 60 * 1000; + + // For each recent mention, check if we replied to it. + for (const threadTs of mentionThreadTsSet) { + const mentionTime = slackTsToMs(threadTs); + if (mentionTime == null || mentionTime <= oneHourAgo) continue; + + const age = now - mentionTime; + + // Skip very recent mentions (< 5 min) - agent might still be processing. + if (age < UNANSWERED_MENTION_THRESHOLD_MS) continue; + + // Check if we sent a reply to this thread_ts. + const replied = hasRepliedToThread(threadTs); + + if (!replied) { + const minutesAgo = Math.round(age / (60 * 1000)); + results.push({ + name: `unanswered:${threadTs}`, + ok: false, + detail: `Slack mention at ts ${threadTs} (${minutesAgo} min ago) has no reply — may have been lost during restart`, + }); + } + } + } catch { + // Log read failure or exec error - non-fatal. + // Don't report this as a failure unless we have a specific problem to report. + } + + return results; +} + +function extractMentionThreadTs(logTail: string): string[] { + const mentionThreadTsSet = new Set(); + + for (const line of logTail.split("\n")) { + if (!line.includes("app_mention")) continue; + + const threadMatch = line.match(/\bthread_ts:\s*(\d+\.\d+)/); + if (threadMatch?.[1]) { + mentionThreadTsSet.add(threadMatch[1]); + continue; + } + + const tsMatch = line.match(/\bts:\s*(\d+\.\d+)/); + if (tsMatch?.[1]) { + mentionThreadTsSet.add(tsMatch[1]); + } + } + + return [...mentionThreadTsSet]; +} + +function slackTsToMs(ts: string): number | null { + const parsed = Number.parseFloat(ts); + if (!Number.isFinite(parsed) || parsed <= 0) return null; + return Math.floor(parsed * 1000); +} + +function hasRepliedToThread(threadTs: string): boolean { + // Check multiple sources for evidence of a reply to this thread_ts. + + // 1. Check the reply tracking log (most reliable — written by the agent). + // File: ~/.pi/agent/slack-reply-log.jsonl + // Each line: {"thread_ts":"...","replied_at":"..."} + const replyLogPath = join(homedir(), ".pi", "agent", "slack-reply-log.jsonl"); + if (existsSync(replyLogPath)) { + try { + const content = readFileSync(replyLogPath, "utf-8"); + const lines = content.split("\n"); + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + try { + const entry = JSON.parse(trimmed); + if (entry?.thread_ts === threadTs) { + return true; + } + } catch { + // Ignore malformed JSONL lines and keep scanning. + } + } + } catch { + // File read error — fall through to other checks + } + } + + // 2. Check recent control-agent session logs for explicit outbound /send calls. + // Session files are in ~/.pi/agent/sessions/--home-baudbot_agent--/ + // and named _.jsonl. + const controlAgentSessionDir = join(SESSION_DIR, "--home-baudbot_agent--"); + if (existsSync(controlAgentSessionDir)) { + const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + const threadTsPattern = new RegExp(`["']thread_ts["']\\s*:\\s*["']${escapeRegExp(threadTs)}["']`); + + try { + const sessionFiles = readdirSync(controlAgentSessionDir) + .filter((f) => f.endsWith(".jsonl")) + .sort() + .reverse() + .slice(0, 3); // Check last 3 sessions + + for (const file of sessionFiles) { + try { + const content = readFileSync(join(controlAgentSessionDir, file), "utf-8"); + const lines = content.split("\n"); + + for (const line of lines) { + const trimmed = line.trim(); + if (!trimmed) continue; + + let parsed: any; + try { + parsed = JSON.parse(trimmed); + } catch { + continue; + } + + if (parsed?.type !== "message") continue; + if (parsed?.message?.role !== "assistant") continue; + + const items = parsed?.message?.content; + if (!Array.isArray(items)) continue; + + for (const item of items) { + if (item?.type !== "toolCall") continue; + if (item?.name !== "bash") continue; + + const command = typeof item?.arguments?.command === "string" ? item.arguments.command : ""; + if (!command.includes("curl")) continue; + if (!command.includes("/send")) continue; + if (!threadTsPattern.test(command)) continue; + + return true; + } + } + } catch { + // File read error - skip + } + } + } catch { + // Dir read error + } + } + + return false; +} + // ── Helper Functions ──────────────────────────────────────────────────────── function hasMatchingTodo(devAgentName: string): boolean { @@ -412,12 +587,16 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { const bridgeResult = await checkBridge(); const worktreeResults = checkWorktrees(); const stuckTodoResults = checkStuckTodos(); + const unansweredMentionResults = isUnansweredMentionsCheckEnabled() + ? checkUnansweredMentions() + : []; const allResults: CheckResult[] = [ ...sessionResults, bridgeResult, ...worktreeResults, ...stuckTodoResults, + ...unansweredMentionResults, ]; const failures = allResults.filter((r) => !r.ok); @@ -550,12 +729,16 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { const bridgeResult = await checkBridge(); const worktreeResults = checkWorktrees(); const stuckTodoResults = checkStuckTodos(); + const unansweredMentionResults = isUnansweredMentionsCheckEnabled() + ? checkUnansweredMentions() + : []; const allResults: CheckResult[] = [ ...sessionResults, bridgeResult, ...worktreeResults, ...stuckTodoResults, + ...unansweredMentionResults, ]; const failures = allResults.filter((r) => !r.ok); @@ -592,6 +775,7 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { case "config": { const expected = getExpectedSessions(); + const checkUnanswered = isUnansweredMentionsCheckEnabled(); return { content: [ { @@ -604,11 +788,15 @@ export default function heartbeatExtension(pi: ExtensionAPI): void { ` Backoff multiplier: ${BACKOFF_MULTIPLIER}x per error`, ` Max backoff: ${MAX_BACKOFF_MS / 1000}s`, ` Expected sessions: ${expected.join(", ")} (env: HEARTBEAT_EXPECTED_SESSIONS)`, + ` Check unanswered mentions: ${checkUnanswered ? "enabled" : "disabled"} (env: HEARTBEAT_CHECK_UNANSWERED_MENTIONS)`, + ` Unanswered mention threshold: ${UNANSWERED_MENTION_THRESHOLD_MS / (60 * 1000)} min`, ` Stuck todo threshold: ${STUCK_TODO_THRESHOLD_MS / (60 * 60 * 1000)}h`, ` Bridge URL: ${BRIDGE_URL}`, + ` Bridge log: ${BRIDGE_LOG}`, ` Socket dir: ${SOCKET_DIR}`, ` Worktrees dir: ${WORKTREES_DIR}`, ` Todos dir: ${TODOS_DIR}`, + ` Session dir: ${SESSION_DIR}`, ].join("\n"), }, ], diff --git a/slack-bridge/bridge.mjs b/slack-bridge/bridge.mjs index cb436e6..665f80e 100644 --- a/slack-bridge/bridge.mjs +++ b/slack-bridge/bridge.mjs @@ -304,8 +304,10 @@ async function handleMessage(userMessage, event, say) { console.warn(`👀 eyes reaction failed: ${err.message}`); }); + const threadTs = event.thread_ts || event.ts; + // Track this message so we can add ✅ when the agent replies. - const threadKey = `${event.channel}:${event.thread_ts || event.ts}`; + const threadKey = `${event.channel}:${threadTs}`; pendingAckReactions.set(threadKey, { channel: event.channel, messageTs: event.ts, @@ -328,11 +330,11 @@ async function handleMessage(userMessage, event, say) { source: "Slack", user: event.user, channel: event.channel, - threadTs: event.ts, + threadTs, }); // Enrich with friendly thread ID so the agent can use /reply endpoint - const threadId = getThreadId(event.channel, event.thread_ts || event.ts); + const threadId = getThreadId(event.channel, threadTs); const contextMessage = `${wrappedMessage}\n[Bridge-Thread-ID: ${threadId}]`; // Fire-and-forget: deliver to agent, which will reply to Slack itself via /send API. @@ -346,6 +348,11 @@ async function handleMessage(userMessage, event, say) { // Handle @mentions app.event("app_mention", async ({ event, say }) => { + const threadTs = event.thread_ts || event.ts; + console.log( + `📣 app_mention from <@${event.user || "unknown"}> in ${event.channel || "n/a"} thread_ts: ${threadTs} ts: ${event.ts}` + ); + const userMessage = cleanMessage(event.text); if (!userMessage) { await say({ text: "👋 I'm here! Send me a message.", thread_ts: event.ts }); diff --git a/slack-bridge/broker-bridge.mjs b/slack-bridge/broker-bridge.mjs index 0306e0a..33ea2b2 100755 --- a/slack-bridge/broker-bridge.mjs +++ b/slack-bridge/broker-bridge.mjs @@ -716,7 +716,10 @@ function sanitizeOutboundMessage(text, contextLabel) { } async function handleUserMessage(userMessage, event) { - logInfo(`👤 message from <@${event.user}> in ${event.channel} (type: ${event.type}, ts: ${event.ts})`); + const threadTs = event.thread_ts || event.ts; + logInfo( + `👤 message from <@${event.user}> in ${event.channel} (type: ${event.type}, thread_ts: ${threadTs}, ts: ${event.ts})` + ); if (!isAllowed(event.user, ALLOWED_USERS)) { logWarn(`🚫 user <@${event.user}> not in allowed list — rejecting`); @@ -742,7 +745,7 @@ async function handleUserMessage(userMessage, event) { }); // Track this message so we can add ✅ when the agent replies. - const threadKey = `${ackChannel}:${event.thread_ts || ackMessageTs}`; + const threadKey = `${ackChannel}:${threadTs}`; pendingAckReactions.set(threadKey, { channel: ackChannel, messageTs: ackMessageTs, @@ -763,10 +766,10 @@ async function handleUserMessage(userMessage, event) { source: "Slack (broker)", user: event.user, channel: event.channel, - threadTs: event.ts, + threadTs, }); - const threadId = getThreadId(event.channel, event.thread_ts || event.ts); + const threadId = getThreadId(event.channel, threadTs); const contextMessage = `${wrappedMessage}\n[Bridge-Thread-ID: ${threadId}]`; // Fire-and-forget: deliver to agent, which will reply to Slack itself via /send API.