Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions bin/slack-cli-stream
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ program
.option("--refresh-interval <minutes>", "Refresh Slack metadata interval in minutes (default: 15)")
.option("--mcp-port <port>", "Enable MCP server on the given port (e.g. 3737)")
.option("--status", "Show Slack usage status for the past week and exit")
.option("--backfill-from <datetime>", "Force backfill from the specified date/time (e.g. \"2026-05-28\" or \"2026-05-28 09:00\")")
.usage("[options] <parameters>")
.parse(process.argv);

Expand Down
70 changes: 54 additions & 16 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -209,11 +209,23 @@ core.start = async (commander) => {

const BACKFILL_GAP_THRESHOLD = 5 * 60; // 5分以上のギャップがあればバックフィルを実行
let backfillGapSeconds = null;
let lastHeartbeatTs = null; // 前回ハートビートのUnixタイムスタンプ(秒)

let forceSince = null; // --backfill-from で指定された開始時刻(Unixタイムスタンプ秒)
if (options.backfillFrom) {
const parsed = moment(options.backfillFrom, ["YYYY-MM-DD HH:mm", "YYYY-MM-DD"], true);
if (!parsed.isValid()) {
console.error(`Error: --backfill-from の日時形式が不正です: "${options.backfillFrom}" (例: "2026-05-28" または "2026-05-28 09:00")`);
process.exit(1);
}
forceSince = parsed.unix();
}

if (options.logSqlite) {
sqliteDb = initSqliteDb(options.logSqlite);
const lastHeartbeat = getLastAppHeartbeat(sqliteDb);
if (lastHeartbeat !== null) {
lastHeartbeatTs = lastHeartbeat;
backfillGapSeconds = Date.now() / 1000 - lastHeartbeat;
}
updateAppHeartbeat(sqliteDb);
Expand Down Expand Up @@ -557,18 +569,30 @@ core.start = async (commander) => {

let isBackfilling = false;

const runBackfill = async (label) => {
const runBackfill = async (label, manualSince) => {
if (isBackfilling || !sqliteDb) return;
isBackfilling = true;
try {
const lastTsMap = getLastSlackTsPerChannel(sqliteDb);
// 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制
const cutoff = Date.now() / 1000 - 24 * 3600;
const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff);

let channelIds;
if (manualSince !== undefined && manualSince !== null) {
// 手動バックフィル: カットオフフィルターをスキップして全チャンネルを対象にする
channelIds = Object.keys(lastTsMap);
} else {
// 前回ハートビート時刻を基準に24時間前をカットオフとする。
// 固定の「現在-24h」にすると停止期間が24h超の場合に全チャンネルが除外されるため。
const anchor = lastHeartbeatTs !== null ? lastHeartbeatTs : Date.now() / 1000;
const cutoff = anchor - 24 * 3600;
channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff);
}

if (channelIds.length === 0) return;

process.stdout.write(`${label} Backfilling messages for ${channelIds.length} channel(s) in background...\n`);
const sinceLabel = manualSince !== undefined && manualSince !== null
? ` from ${moment(manualSince * 1000).format("YYYY-MM-DD HH:mm")}`
: "";
process.stdout.write(`${label} Backfilling messages for ${channelIds.length} channel(s)${sinceLabel} in background...\n`);
let totalFetched = 0;
let processedChannels = 0;
let earliestTs = null;
Expand Down Expand Up @@ -645,14 +669,25 @@ core.start = async (commander) => {
if (latestTs === null || ts > latestTs) latestTs = ts;
};

const printProgress = () => {
const pct = Math.round(processedChannels / channelIds.length * 100);
process.stdout.write(
`Backfill progress: ${processedChannels}/${channelIds.length} channels (${pct}%), ${totalFetched} messages fetched\n`
);
};

const fetchChannel = async (channelId) => {
const lastTs = lastTsMap[channelId];
const dbLastTs = lastTsMap[channelId];
// 手動指定時: max(manualSince, dbLastTs) で重複挿入を防ぐ
const oldest = (manualSince !== undefined && manualSince !== null)
? (dbLastTs && parseFloat(dbLastTs) > manualSince ? dbLastTs : String(manualSince))
: dbLastTs;
let cursor;
let channelMessages = [];

try {
do {
const params = { channel: channelId, oldest: lastTs, limit: 200 };
const params = { channel: channelId, oldest, limit: 200 };
if (cursor) params.cursor = cursor;
const res = await web.conversations.history(params);
channelMessages = channelMessages.concat(res.messages || []);
Expand All @@ -667,15 +702,11 @@ core.start = async (commander) => {
processedChannels++;
};

const progressInterval = setInterval(() => {
const range = earliestTs
? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}`
: "no messages yet";
process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`);
}, 60 * 1000);
const progressInterval = setInterval(printProgress, 30 * 1000);

for (let i = 0; i < channelIds.length; i += CONCURRENCY) {
await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel));
printProgress();
}

clearInterval(progressInterval);
Expand All @@ -694,9 +725,15 @@ core.start = async (commander) => {
}
};

// 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行
if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) {
// バックグラウンドで実行 (RTM の起動を待たせない)
// 手動バックフィル: --backfill-from が指定された場合は強制実行
if (sqliteDb && forceSince !== null) {
if (!options.logSqlite) {
console.error("Error: --backfill-from requires --log-sqlite");
process.exit(1);
}
runBackfill("[Manual]", forceSince).catch(() => {});
} else if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) {
// 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行
runBackfill("[Startup]").catch(() => {});
}

Expand All @@ -712,6 +749,7 @@ core.start = async (commander) => {
lastSleepCheckAt = now;

if (elapsed > RESUME_THRESHOLD_MS) {
lastHeartbeatTs = (now - elapsed) / 1000;
process.stdout.write("System resume detected. Running backfill...\n");
runBackfill("[Resume]").catch(() => {});
}
Expand Down
Loading