From 494ff8742ce155e46936eeb0ee69e22a626a3d80 Mon Sep 17 00:00:00 2001 From: Hideaki Terai Date: Sat, 23 May 2026 09:10:31 +0900 Subject: [PATCH 1/2] =?UTF-8?q?=E8=B5=B7=E5=8B=95=E6=99=82=E3=81=AB?= =?UTF-8?q?=E6=9C=AA=E5=8F=96=E5=BE=97=E3=83=A1=E3=83=83=E3=82=BB=E3=83=BC?= =?UTF-8?q?=E3=82=B8=E3=82=92=E3=83=90=E3=83=83=E3=82=AF=E3=83=95=E3=82=A3?= =?UTF-8?q?=E3=83=AB=E3=81=99=E3=82=8B=E6=A9=9F=E8=83=BD=E3=82=92=E8=BF=BD?= =?UTF-8?q?=E5=8A=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 前回停止からのギャップが5分以上の場合のみ、過去24時間以内に活動があったチャンネルを対象に conversations.history API で差分を取得する - app_heartbeat テーブルを SQLite に追加し、起動時・1分ごとにハートビートを記録してギャップを判定する - バックフィルは RTM 起動をブロックしないようバックグラウンドで実行し、1分ごとに進捗・時間帯を標準出力に表示する - 通常の再起動(5分未満)では API 呼び出しゼロ Co-Authored-By: Claude Sonnet 4.6 --- lib/core.js | 121 ++++++++++++++++++++++++++++++++++++++++++- lib/sqlite-logger.js | 29 ++++++++++- 2 files changed, 147 insertions(+), 3 deletions(-) diff --git a/lib/core.js b/lib/core.js index 7259dcf..5f4698b 100644 --- a/lib/core.js +++ b/lib/core.js @@ -9,7 +9,7 @@ let cli = require("./cli.js"); const path = require("path"); const fs = require("fs"); const exec = require("child_process").exec; -const { initSqliteDb, logMessageSqlite } = require("./sqlite-logger"); +const { initSqliteDb, logMessageSqlite, getLastSlackTsPerChannel, updateAppHeartbeat, getLastAppHeartbeat } = require("./sqlite-logger"); const { startMcpServer } = require("./mcp-server"); let sqliteDb = null; @@ -207,8 +207,17 @@ core.start = async (commander) => { } } + const BACKFILL_GAP_THRESHOLD = 5 * 60; // 5分以上のギャップがあればバックフィルを実行 + let backfillGapSeconds = null; + if (options.logSqlite) { sqliteDb = initSqliteDb(options.logSqlite); + const lastHeartbeat = getLastAppHeartbeat(sqliteDb); + if (lastHeartbeat !== null) { + backfillGapSeconds = Date.now() / 1000 - lastHeartbeat; + } + updateAppHeartbeat(sqliteDb); + setInterval(() => updateAppHeartbeat(sqliteDb), 60 * 1000); } const mcpPort = options.mcpPort @@ -546,7 +555,115 @@ core.start = async (commander) => { await refreshSlackData(); setInterval(refreshSlackData, refreshIntervalMinutes * 60 * 1000); - // complete + // 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行 + if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) { + const lastTsMap = getLastSlackTsPerChannel(sqliteDb); + // 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制 + const cutoff = Date.now() / 1000 - 24 * 3600; + const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff); + + if (channelIds.length > 0) { + process.stdout.write(`Backfilling messages for ${channelIds.length} channel(s) in background...\n`); + let totalFetched = 0; + let processedChannels = 0; + let earliestTs = null; + let latestTs = null; + const CONCURRENCY = 5; + + const processMessage = (message, channelId) => { + if (message.subtype) { + switch (message.subtype) { + case "message_deleted": + case "message_changed": + case "message_replied": + case "reply_broadcast": + return; + } + } + + let text = util.parseText(message); + if (text === "" && message.attachments && message.attachments.length > 0) { + text = message.attachments[0].text || ""; + } + + let lines = typeof text === "string" ? text.split(/\r\n|\r|\n/) : [""]; + let fullLines = lines; + if (lines.length > 8) { + lines = lines.slice(0, 5); + lines.push("--- snip ---"); + } + + if (options.user) { + const messageUser = util.users[message.user] ? util.users[message.user].name : "-"; + if (options.user !== messageUser) return; + } + + const data = { + bufferKey: resolveChannelLabelKey(channelId), + lines, + fullLines, + time: moment(parseFloat(message.ts) * 1000), + channel: channelId, + user: message.user, + slackTs: message.ts || null, + threadTs: message.thread_ts || null + }; + + util.addMessageBuffer(data); + core.display(data, options); + totalFetched++; + + const ts = parseFloat(message.ts); + if (earliestTs === null || ts < earliestTs) earliestTs = ts; + if (latestTs === null || ts > latestTs) latestTs = ts; + }; + + const fetchChannel = async (channelId) => { + const lastTs = lastTsMap[channelId]; + let cursor; + let channelMessages = []; + + try { + do { + const params = { channel: channelId, oldest: lastTs, limit: 200 }; + if (cursor) params.cursor = cursor; + const res = await web.conversations.history(params); + channelMessages = channelMessages.concat(res.messages || []); + cursor = res.has_more ? res.response_metadata.next_cursor : null; + } while (cursor); + } catch (err) { + return; + } + + // 古い順に並べ替えて表示 + channelMessages.reverse().forEach(msg => processMessage(msg, channelId)); + processedChannels++; + }; + + // バックグラウンドで実行 (RTM の起動を待たせない) + (async () => { + const progressInterval = setInterval(() => { + const range = earliestTs + ? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}` + : "no messages yet"; + process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`); + }, 60 * 1000); + + for (let i = 0; i < channelIds.length; i += CONCURRENCY) { + await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel)); + } + + clearInterval(progressInterval); + if (totalFetched > 0) { + process.stdout.write(`Backfill complete: ${totalFetched} message(s) fetched.\n`); + } else { + process.stdout.write("Backfill complete: no new messages.\n"); + } + })(); + } + } + + // complete rtm.start(); }; diff --git a/lib/sqlite-logger.js b/lib/sqlite-logger.js index 3c3ebfe..66ce727 100644 --- a/lib/sqlite-logger.js +++ b/lib/sqlite-logger.js @@ -16,6 +16,12 @@ const initSqliteDb = (dbPath) => { created_at TEXT DEFAULT (datetime('now', 'localtime')) ) `); + db.exec(` + CREATE TABLE IF NOT EXISTS app_heartbeat ( + id INTEGER PRIMARY KEY CHECK (id = 1), + last_seen_at REAL NOT NULL + ) + `); db.exec("CREATE INDEX IF NOT EXISTS idx_logged_at ON messages (logged_at)"); db.exec("CREATE INDEX IF NOT EXISTS idx_channel ON messages (channel)"); db.exec("CREATE INDEX IF NOT EXISTS idx_user ON messages (user)"); @@ -53,4 +59,25 @@ const logMessageSqlite = (db, time, channel, user, message, channelId, userId, s ); }; -module.exports = { initSqliteDb, logMessageSqlite }; +const updateAppHeartbeat = (db) => { + db.prepare("INSERT OR REPLACE INTO app_heartbeat (id, last_seen_at) VALUES (1, ?)").run(Date.now() / 1000); +}; + +const getLastAppHeartbeat = (db) => { + const row = db.prepare("SELECT last_seen_at FROM app_heartbeat WHERE id = 1").get(); + return row ? row.last_seen_at : null; +}; + +const getLastSlackTsPerChannel = (db) => { + const rows = db.prepare(` + SELECT channel_id, MAX(slack_ts) AS last_ts + FROM messages + WHERE channel_id IS NOT NULL AND slack_ts IS NOT NULL + GROUP BY channel_id + `).all(); + const map = {}; + rows.forEach(row => { map[row.channel_id] = row.last_ts; }); + return map; +}; + +module.exports = { initSqliteDb, logMessageSqlite, getLastSlackTsPerChannel, updateAppHeartbeat, getLastAppHeartbeat }; From 4fc7469ae2367ce173d322c59a20fc6a57b00a23 Mon Sep 17 00:00:00 2001 From: Hideaki Terai Date: Sat, 23 May 2026 09:15:36 +0900 Subject: [PATCH 2/2] =?UTF-8?q?=E3=83=90=E3=83=83=E3=82=AF=E3=83=95?= =?UTF-8?q?=E3=82=A3=E3=83=AB=E6=99=82=E3=81=AE=E3=83=A1=E3=83=83=E3=82=BB?= =?UTF-8?q?=E3=83=BC=E3=82=B8=E3=82=92=E6=A8=99=E6=BA=96=E5=87=BA=E5=8A=9B?= =?UTF-8?q?=E3=81=97=E3=81=AA=E3=81=84=E3=82=88=E3=81=86=E4=BF=AE=E6=AD=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit バックフィル中の個々のメッセージ表示を抑制し、SQLite/TSV へのログ書き込みのみ行う。 進捗・完了メッセージは引き続き標準出力に表示する。 Co-Authored-By: Claude Sonnet 4.6 --- lib/core.js | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/lib/core.js b/lib/core.js index 5f4698b..1ff8bb4 100644 --- a/lib/core.js +++ b/lib/core.js @@ -610,7 +610,27 @@ core.start = async (commander) => { }; util.addMessageBuffer(data); - core.display(data, options); + + // バックフィル時は標準出力を省略し、ログへの書き込みのみ行う + const plainDateFormat = data.time.format("YYYY-MM-DD HH:mm:ss"); + const plainChannel = resolveChannelLabelKey(channelId); + const plainName = util.users[data.user] + ? util.users[data.user].name + : (typeof data.user === "string" ? data.user : "-"); + (data.fullLines || data.lines).forEach((line) => { + let l = emoji.emojify(line); + l = util.replaceSlackId(l); + l = util.decolateText(l); + const plainLine = removeEscapeSequences(l); + if (options.log) { + logMessage(options.log, plainDateFormat, plainChannel, plainName, plainLine); + } + if (options.logSqlite && sqliteDb) { + logMessageSqlite(sqliteDb, plainDateFormat, plainChannel, plainName, plainLine, + channelId, data.user || null, data.slackTs, data.threadTs); + } + }); + totalFetched++; const ts = parseFloat(message.ts);