Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 60 additions & 32 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -555,15 +555,20 @@ core.start = async (commander) => {
await refreshSlackData();
setInterval(refreshSlackData, refreshIntervalMinutes * 60 * 1000);

// 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行
if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) {
const lastTsMap = getLastSlackTsPerChannel(sqliteDb);
// 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制
const cutoff = Date.now() / 1000 - 24 * 3600;
const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff);
let isBackfilling = false;

const runBackfill = async (label) => {
if (isBackfilling || !sqliteDb) return;
isBackfilling = true;
try {
const lastTsMap = getLastSlackTsPerChannel(sqliteDb);
// 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制
const cutoff = Date.now() / 1000 - 24 * 3600;
const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff);

if (channelIds.length === 0) return;

if (channelIds.length > 0) {
process.stdout.write(`Backfilling messages for ${channelIds.length} channel(s) in background...\n`);
process.stdout.write(`${label} Backfilling messages for ${channelIds.length} channel(s) in background...\n`);
let totalFetched = 0;
let processedChannels = 0;
let earliestTs = null;
Expand Down Expand Up @@ -662,32 +667,55 @@ core.start = async (commander) => {
processedChannels++;
};

// バックグラウンドで実行 (RTM の起動を待たせない)
(async () => {
const progressInterval = setInterval(() => {
const range = earliestTs
? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}`
: "no messages yet";
process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`);
}, 60 * 1000);

for (let i = 0; i < channelIds.length; i += CONCURRENCY) {
await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel));
}
const progressInterval = setInterval(() => {
const range = earliestTs
? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}`
: "no messages yet";
process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`);
}, 60 * 1000);

clearInterval(progressInterval);
if (totalFetched > 0) {
process.stdout.write(`Backfill complete: ${totalFetched} message(s) fetched.\n`);
Object.entries(fetchedPerChannel)
.sort((a, b) => b[1] - a[1])
.forEach(([chId, count]) => {
process.stdout.write(` ${resolveChannelLabelKey(chId)}: ${count} message(s)\n`);
});
} else {
process.stdout.write("Backfill complete: no new messages.\n");
}
})();
for (let i = 0; i < channelIds.length; i += CONCURRENCY) {
await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel));
}

clearInterval(progressInterval);
if (totalFetched > 0) {
process.stdout.write(`Backfill complete: ${totalFetched} message(s) fetched.\n`);
Object.entries(fetchedPerChannel)
.sort((a, b) => b[1] - a[1])
.forEach(([chId, count]) => {
process.stdout.write(` ${resolveChannelLabelKey(chId)}: ${count} message(s)\n`);
});
} else {
process.stdout.write("Backfill complete: no new messages.\n");
}
} finally {
isBackfilling = false;
}
};

// 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行
if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) {
// バックグラウンドで実行 (RTM の起動を待たせない)
runBackfill("[Startup]").catch(() => {});
}

// レジューム検出: タイマードリフトでスリープからの復帰を検知し、5分以上経過していればバックフィル発動
if (sqliteDb) {
const SLEEP_DETECT_INTERVAL_MS = 30 * 1000;
const RESUME_THRESHOLD_MS = BACKFILL_GAP_THRESHOLD * 1000;
let lastSleepCheckAt = Date.now();

setInterval(() => {
const now = Date.now();
const elapsed = now - lastSleepCheckAt;
lastSleepCheckAt = now;

if (elapsed > RESUME_THRESHOLD_MS) {
process.stdout.write("System resume detected. Running backfill...\n");
runBackfill("[Resume]").catch(() => {});
}
}, SLEEP_DETECT_INTERVAL_MS);
}

// complete
Expand Down
Loading