diff --git a/bin/slack-cli-stream b/bin/slack-cli-stream index 2222e02..652608b 100644 --- a/bin/slack-cli-stream +++ b/bin/slack-cli-stream @@ -26,6 +26,7 @@ program .option("--refresh-interval ", "Refresh Slack metadata interval in minutes (default: 15)") .option("--mcp-port ", "Enable MCP server on the given port (e.g. 3737)") .option("--status", "Show Slack usage status for the past week and exit") + .option("--backfill-from ", "Force backfill from the specified date/time (e.g. \"2026-05-28\" or \"2026-05-28 09:00\")") .usage("[options] ") .parse(process.argv); diff --git a/lib/core.js b/lib/core.js index 6aa4804..89a4cef 100644 --- a/lib/core.js +++ b/lib/core.js @@ -209,11 +209,23 @@ core.start = async (commander) => { const BACKFILL_GAP_THRESHOLD = 5 * 60; // 5分以上のギャップがあればバックフィルを実行 let backfillGapSeconds = null; + let lastHeartbeatTs = null; // 前回ハートビートのUnixタイムスタンプ(秒) + + let forceSince = null; // --backfill-from で指定された開始時刻(Unixタイムスタンプ秒) + if (options.backfillFrom) { + const parsed = moment(options.backfillFrom, ["YYYY-MM-DD HH:mm", "YYYY-MM-DD"], true); + if (!parsed.isValid()) { + console.error(`Error: --backfill-from の日時形式が不正です: "${options.backfillFrom}" (例: "2026-05-28" または "2026-05-28 09:00")`); + process.exit(1); + } + forceSince = parsed.unix(); + } if (options.logSqlite) { sqliteDb = initSqliteDb(options.logSqlite); const lastHeartbeat = getLastAppHeartbeat(sqliteDb); if (lastHeartbeat !== null) { + lastHeartbeatTs = lastHeartbeat; backfillGapSeconds = Date.now() / 1000 - lastHeartbeat; } updateAppHeartbeat(sqliteDb); @@ -557,18 +569,30 @@ core.start = async (commander) => { let isBackfilling = false; - const runBackfill = async (label) => { + const runBackfill = async (label, manualSince) => { if (isBackfilling || !sqliteDb) return; isBackfilling = true; try { const lastTsMap = getLastSlackTsPerChannel(sqliteDb); - // 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制 - const cutoff = Date.now() / 1000 - 24 * 3600; - const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff); + + let channelIds; + if (manualSince !== undefined && manualSince !== null) { + // 手動バックフィル: カットオフフィルターをスキップして全チャンネルを対象にする + channelIds = Object.keys(lastTsMap); + } else { + // 前回ハートビート時刻を基準に24時間前をカットオフとする。 + // 固定の「現在-24h」にすると停止期間が24h超の場合に全チャンネルが除外されるため。 + const anchor = lastHeartbeatTs !== null ? lastHeartbeatTs : Date.now() / 1000; + const cutoff = anchor - 24 * 3600; + channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff); + } if (channelIds.length === 0) return; - process.stdout.write(`${label} Backfilling messages for ${channelIds.length} channel(s) in background...\n`); + const sinceLabel = manualSince !== undefined && manualSince !== null + ? ` from ${moment(manualSince * 1000).format("YYYY-MM-DD HH:mm")}` + : ""; + process.stdout.write(`${label} Backfilling messages for ${channelIds.length} channel(s)${sinceLabel} in background...\n`); let totalFetched = 0; let processedChannels = 0; let earliestTs = null; @@ -645,14 +669,25 @@ core.start = async (commander) => { if (latestTs === null || ts > latestTs) latestTs = ts; }; + const printProgress = () => { + const pct = Math.round(processedChannels / channelIds.length * 100); + process.stdout.write( + `Backfill progress: ${processedChannels}/${channelIds.length} channels (${pct}%), ${totalFetched} messages fetched\n` + ); + }; + const fetchChannel = async (channelId) => { - const lastTs = lastTsMap[channelId]; + const dbLastTs = lastTsMap[channelId]; + // 手動指定時: max(manualSince, dbLastTs) で重複挿入を防ぐ + const oldest = (manualSince !== undefined && manualSince !== null) + ? (dbLastTs && parseFloat(dbLastTs) > manualSince ? dbLastTs : String(manualSince)) + : dbLastTs; let cursor; let channelMessages = []; try { do { - const params = { channel: channelId, oldest: lastTs, limit: 200 }; + const params = { channel: channelId, oldest, limit: 200 }; if (cursor) params.cursor = cursor; const res = await web.conversations.history(params); channelMessages = channelMessages.concat(res.messages || []); @@ -667,15 +702,11 @@ core.start = async (commander) => { processedChannels++; }; - const progressInterval = setInterval(() => { - const range = earliestTs - ? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}` - : "no messages yet"; - process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`); - }, 60 * 1000); + const progressInterval = setInterval(printProgress, 30 * 1000); for (let i = 0; i < channelIds.length; i += CONCURRENCY) { await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel)); + printProgress(); } clearInterval(progressInterval); @@ -694,9 +725,15 @@ core.start = async (commander) => { } }; - // 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行 - if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) { - // バックグラウンドで実行 (RTM の起動を待たせない) + // 手動バックフィル: --backfill-from が指定された場合は強制実行 + if (sqliteDb && forceSince !== null) { + if (!options.logSqlite) { + console.error("Error: --backfill-from requires --log-sqlite"); + process.exit(1); + } + runBackfill("[Manual]", forceSince).catch(() => {}); + } else if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) { + // 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行 runBackfill("[Startup]").catch(() => {}); } @@ -712,6 +749,7 @@ core.start = async (commander) => { lastSleepCheckAt = now; if (elapsed > RESUME_THRESHOLD_MS) { + lastHeartbeatTs = (now - elapsed) / 1000; process.stdout.write("System resume detected. Running backfill...\n"); runBackfill("[Resume]").catch(() => {}); }