diff --git a/lib/core.js b/lib/core.js index 7259dcf..1ff8bb4 100644 --- a/lib/core.js +++ b/lib/core.js @@ -9,7 +9,7 @@ let cli = require("./cli.js"); const path = require("path"); const fs = require("fs"); const exec = require("child_process").exec; -const { initSqliteDb, logMessageSqlite } = require("./sqlite-logger"); +const { initSqliteDb, logMessageSqlite, getLastSlackTsPerChannel, updateAppHeartbeat, getLastAppHeartbeat } = require("./sqlite-logger"); const { startMcpServer } = require("./mcp-server"); let sqliteDb = null; @@ -207,8 +207,17 @@ core.start = async (commander) => { } } + const BACKFILL_GAP_THRESHOLD = 5 * 60; // 5分以上のギャップがあればバックフィルを実行 + let backfillGapSeconds = null; + if (options.logSqlite) { sqliteDb = initSqliteDb(options.logSqlite); + const lastHeartbeat = getLastAppHeartbeat(sqliteDb); + if (lastHeartbeat !== null) { + backfillGapSeconds = Date.now() / 1000 - lastHeartbeat; + } + updateAppHeartbeat(sqliteDb); + setInterval(() => updateAppHeartbeat(sqliteDb), 60 * 1000); } const mcpPort = options.mcpPort @@ -546,7 +555,135 @@ core.start = async (commander) => { await refreshSlackData(); setInterval(refreshSlackData, refreshIntervalMinutes * 60 * 1000); - // complete + // 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行 + if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) { + const lastTsMap = getLastSlackTsPerChannel(sqliteDb); + // 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制 + const cutoff = Date.now() / 1000 - 24 * 3600; + const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff); + + if (channelIds.length > 0) { + process.stdout.write(`Backfilling messages for ${channelIds.length} channel(s) in background...\n`); + let totalFetched = 0; + let processedChannels = 0; + let earliestTs = null; + let latestTs = null; + const CONCURRENCY = 5; + + const processMessage = (message, channelId) => { + if (message.subtype) { + switch (message.subtype) { + case "message_deleted": + case "message_changed": + case "message_replied": + case "reply_broadcast": + return; + } + } + + let text = util.parseText(message); + if (text === "" && message.attachments && message.attachments.length > 0) { + text = message.attachments[0].text || ""; + } + + let lines = typeof text === "string" ? text.split(/\r\n|\r|\n/) : [""]; + let fullLines = lines; + if (lines.length > 8) { + lines = lines.slice(0, 5); + lines.push("--- snip ---"); + } + + if (options.user) { + const messageUser = util.users[message.user] ? util.users[message.user].name : "-"; + if (options.user !== messageUser) return; + } + + const data = { + bufferKey: resolveChannelLabelKey(channelId), + lines, + fullLines, + time: moment(parseFloat(message.ts) * 1000), + channel: channelId, + user: message.user, + slackTs: message.ts || null, + threadTs: message.thread_ts || null + }; + + util.addMessageBuffer(data); + + // バックフィル時は標準出力を省略し、ログへの書き込みのみ行う + const plainDateFormat = data.time.format("YYYY-MM-DD HH:mm:ss"); + const plainChannel = resolveChannelLabelKey(channelId); + const plainName = util.users[data.user] + ? util.users[data.user].name + : (typeof data.user === "string" ? data.user : "-"); + (data.fullLines || data.lines).forEach((line) => { + let l = emoji.emojify(line); + l = util.replaceSlackId(l); + l = util.decolateText(l); + const plainLine = removeEscapeSequences(l); + if (options.log) { + logMessage(options.log, plainDateFormat, plainChannel, plainName, plainLine); + } + if (options.logSqlite && sqliteDb) { + logMessageSqlite(sqliteDb, plainDateFormat, plainChannel, plainName, plainLine, + channelId, data.user || null, data.slackTs, data.threadTs); + } + }); + + totalFetched++; + + const ts = parseFloat(message.ts); + if (earliestTs === null || ts < earliestTs) earliestTs = ts; + if (latestTs === null || ts > latestTs) latestTs = ts; + }; + + const fetchChannel = async (channelId) => { + const lastTs = lastTsMap[channelId]; + let cursor; + let channelMessages = []; + + try { + do { + const params = { channel: channelId, oldest: lastTs, limit: 200 }; + if (cursor) params.cursor = cursor; + const res = await web.conversations.history(params); + channelMessages = channelMessages.concat(res.messages || []); + cursor = res.has_more ? res.response_metadata.next_cursor : null; + } while (cursor); + } catch (err) { + return; + } + + // 古い順に並べ替えて表示 + channelMessages.reverse().forEach(msg => processMessage(msg, channelId)); + processedChannels++; + }; + + // バックグラウンドで実行 (RTM の起動を待たせない) + (async () => { + const progressInterval = setInterval(() => { + const range = earliestTs + ? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}` + : "no messages yet"; + process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`); + }, 60 * 1000); + + for (let i = 0; i < channelIds.length; i += CONCURRENCY) { + await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel)); + } + + clearInterval(progressInterval); + if (totalFetched > 0) { + process.stdout.write(`Backfill complete: ${totalFetched} message(s) fetched.\n`); + } else { + process.stdout.write("Backfill complete: no new messages.\n"); + } + })(); + } + } + + // complete rtm.start(); }; diff --git a/lib/sqlite-logger.js b/lib/sqlite-logger.js index 3c3ebfe..66ce727 100644 --- a/lib/sqlite-logger.js +++ b/lib/sqlite-logger.js @@ -16,6 +16,12 @@ const initSqliteDb = (dbPath) => { created_at TEXT DEFAULT (datetime('now', 'localtime')) ) `); + db.exec(` + CREATE TABLE IF NOT EXISTS app_heartbeat ( + id INTEGER PRIMARY KEY CHECK (id = 1), + last_seen_at REAL NOT NULL + ) + `); db.exec("CREATE INDEX IF NOT EXISTS idx_logged_at ON messages (logged_at)"); db.exec("CREATE INDEX IF NOT EXISTS idx_channel ON messages (channel)"); db.exec("CREATE INDEX IF NOT EXISTS idx_user ON messages (user)"); @@ -53,4 +59,25 @@ const logMessageSqlite = (db, time, channel, user, message, channelId, userId, s ); }; -module.exports = { initSqliteDb, logMessageSqlite }; +const updateAppHeartbeat = (db) => { + db.prepare("INSERT OR REPLACE INTO app_heartbeat (id, last_seen_at) VALUES (1, ?)").run(Date.now() / 1000); +}; + +const getLastAppHeartbeat = (db) => { + const row = db.prepare("SELECT last_seen_at FROM app_heartbeat WHERE id = 1").get(); + return row ? row.last_seen_at : null; +}; + +const getLastSlackTsPerChannel = (db) => { + const rows = db.prepare(` + SELECT channel_id, MAX(slack_ts) AS last_ts + FROM messages + WHERE channel_id IS NOT NULL AND slack_ts IS NOT NULL + GROUP BY channel_id + `).all(); + const map = {}; + rows.forEach(row => { map[row.channel_id] = row.last_ts; }); + return map; +}; + +module.exports = { initSqliteDb, logMessageSqlite, getLastSlackTsPerChannel, updateAppHeartbeat, getLastAppHeartbeat };