Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 139 additions & 2 deletions lib/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let cli = require("./cli.js");
const path = require("path");
const fs = require("fs");
const exec = require("child_process").exec;
const { initSqliteDb, logMessageSqlite } = require("./sqlite-logger");
const { initSqliteDb, logMessageSqlite, getLastSlackTsPerChannel, updateAppHeartbeat, getLastAppHeartbeat } = require("./sqlite-logger");
const { startMcpServer } = require("./mcp-server");

let sqliteDb = null;
Expand Down Expand Up @@ -207,8 +207,17 @@ core.start = async (commander) => {
}
}

const BACKFILL_GAP_THRESHOLD = 5 * 60; // 5分以上のギャップがあればバックフィルを実行
let backfillGapSeconds = null;

if (options.logSqlite) {
sqliteDb = initSqliteDb(options.logSqlite);
const lastHeartbeat = getLastAppHeartbeat(sqliteDb);
if (lastHeartbeat !== null) {
backfillGapSeconds = Date.now() / 1000 - lastHeartbeat;
}
updateAppHeartbeat(sqliteDb);
setInterval(() => updateAppHeartbeat(sqliteDb), 60 * 1000);
}

const mcpPort = options.mcpPort
Expand Down Expand Up @@ -546,7 +555,135 @@ core.start = async (commander) => {
await refreshSlackData();
setInterval(refreshSlackData, refreshIntervalMinutes * 60 * 1000);

// complete
// 起動時差分バックフィル: 前回停止からのギャップが閾値を超えた場合のみ実行
if (sqliteDb && backfillGapSeconds !== null && backfillGapSeconds > BACKFILL_GAP_THRESHOLD) {
const lastTsMap = getLastSlackTsPerChannel(sqliteDb);
// 過去24時間以内に活動があったチャンネルのみ対象にすることで呼び出し数を抑制
const cutoff = Date.now() / 1000 - 24 * 3600;
const channelIds = Object.keys(lastTsMap).filter(id => parseFloat(lastTsMap[id]) > cutoff);

if (channelIds.length > 0) {
process.stdout.write(`Backfilling messages for ${channelIds.length} channel(s) in background...\n`);
let totalFetched = 0;
let processedChannels = 0;
let earliestTs = null;
let latestTs = null;
const CONCURRENCY = 5;

const processMessage = (message, channelId) => {
if (message.subtype) {
switch (message.subtype) {
case "message_deleted":
case "message_changed":
case "message_replied":
case "reply_broadcast":
return;
}
}

let text = util.parseText(message);
if (text === "" && message.attachments && message.attachments.length > 0) {
text = message.attachments[0].text || "";
}

let lines = typeof text === "string" ? text.split(/\r\n|\r|\n/) : [""];
let fullLines = lines;
if (lines.length > 8) {
lines = lines.slice(0, 5);
lines.push("--- snip ---");
}

if (options.user) {
const messageUser = util.users[message.user] ? util.users[message.user].name : "-";
if (options.user !== messageUser) return;
}

const data = {
bufferKey: resolveChannelLabelKey(channelId),
lines,
fullLines,
time: moment(parseFloat(message.ts) * 1000),
channel: channelId,
user: message.user,
slackTs: message.ts || null,
threadTs: message.thread_ts || null
};

util.addMessageBuffer(data);

// バックフィル時は標準出力を省略し、ログへの書き込みのみ行う
const plainDateFormat = data.time.format("YYYY-MM-DD HH:mm:ss");
const plainChannel = resolveChannelLabelKey(channelId);
const plainName = util.users[data.user]
? util.users[data.user].name
: (typeof data.user === "string" ? data.user : "-");
(data.fullLines || data.lines).forEach((line) => {
let l = emoji.emojify(line);
l = util.replaceSlackId(l);
l = util.decolateText(l);
const plainLine = removeEscapeSequences(l);
if (options.log) {
logMessage(options.log, plainDateFormat, plainChannel, plainName, plainLine);
}
if (options.logSqlite && sqliteDb) {
logMessageSqlite(sqliteDb, plainDateFormat, plainChannel, plainName, plainLine,
channelId, data.user || null, data.slackTs, data.threadTs);
}
});

totalFetched++;

const ts = parseFloat(message.ts);
if (earliestTs === null || ts < earliestTs) earliestTs = ts;
if (latestTs === null || ts > latestTs) latestTs = ts;
};

const fetchChannel = async (channelId) => {
const lastTs = lastTsMap[channelId];
let cursor;
let channelMessages = [];

try {
do {
const params = { channel: channelId, oldest: lastTs, limit: 200 };
if (cursor) params.cursor = cursor;
const res = await web.conversations.history(params);
channelMessages = channelMessages.concat(res.messages || []);
cursor = res.has_more ? res.response_metadata.next_cursor : null;
} while (cursor);
} catch (err) {
return;
}

// 古い順に並べ替えて表示
channelMessages.reverse().forEach(msg => processMessage(msg, channelId));
processedChannels++;
};

// バックグラウンドで実行 (RTM の起動を待たせない)
(async () => {
const progressInterval = setInterval(() => {
const range = earliestTs
? `${moment(earliestTs * 1000).format("YYYY-MM-DD HH:mm")} 〜 ${moment(latestTs * 1000).format("YYYY-MM-DD HH:mm")}`
: "no messages yet";
process.stdout.write(`Backfill progress: ${processedChannels}/${channelIds.length} channels, time range: ${range}\n`);
}, 60 * 1000);

for (let i = 0; i < channelIds.length; i += CONCURRENCY) {
await Promise.all(channelIds.slice(i, i + CONCURRENCY).map(fetchChannel));
}

clearInterval(progressInterval);
if (totalFetched > 0) {
process.stdout.write(`Backfill complete: ${totalFetched} message(s) fetched.\n`);
} else {
process.stdout.write("Backfill complete: no new messages.\n");
}
})();
}
}

// complete
rtm.start();
};

Expand Down
29 changes: 28 additions & 1 deletion lib/sqlite-logger.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ const initSqliteDb = (dbPath) => {
created_at TEXT DEFAULT (datetime('now', 'localtime'))
)
`);
db.exec(`
CREATE TABLE IF NOT EXISTS app_heartbeat (
id INTEGER PRIMARY KEY CHECK (id = 1),
last_seen_at REAL NOT NULL
)
`);
db.exec("CREATE INDEX IF NOT EXISTS idx_logged_at ON messages (logged_at)");
db.exec("CREATE INDEX IF NOT EXISTS idx_channel ON messages (channel)");
db.exec("CREATE INDEX IF NOT EXISTS idx_user ON messages (user)");
Expand Down Expand Up @@ -53,4 +59,25 @@ const logMessageSqlite = (db, time, channel, user, message, channelId, userId, s
);
};

module.exports = { initSqliteDb, logMessageSqlite };
const updateAppHeartbeat = (db) => {
db.prepare("INSERT OR REPLACE INTO app_heartbeat (id, last_seen_at) VALUES (1, ?)").run(Date.now() / 1000);
};

const getLastAppHeartbeat = (db) => {
const row = db.prepare("SELECT last_seen_at FROM app_heartbeat WHERE id = 1").get();
return row ? row.last_seen_at : null;
};

const getLastSlackTsPerChannel = (db) => {
const rows = db.prepare(`
SELECT channel_id, MAX(slack_ts) AS last_ts
FROM messages
WHERE channel_id IS NOT NULL AND slack_ts IS NOT NULL
GROUP BY channel_id
`).all();
const map = {};
rows.forEach(row => { map[row.channel_id] = row.last_ts; });
return map;
};

module.exports = { initSqliteDb, logMessageSqlite, getLastSlackTsPerChannel, updateAppHeartbeat, getLastAppHeartbeat };
Loading