Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 62 additions & 2 deletions src/telegram/adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,20 @@ export interface TelegramMessageInfo {
message_thread_id?: number;
}

/** In-memory conversation history per chat thread */
interface ChatHistory {
messages: NormalizedMessage[];
lastUpdated: number;
}

const MAX_HISTORY_PER_CHAT = 50;
const MAX_CHATS = 500;
const HISTORY_TTL_MS = 2 * 60 * 60 * 1000; // 2 hours

export class TelegramAdapter {
private client: TelegramClient;
private botInfo: TelegramUser | null = null;
private chatHistories = new Map<string, ChatHistory>();

constructor(client: TelegramClient) {
this.client = client;
Expand Down Expand Up @@ -70,7 +81,39 @@ export class TelegramAdapter {
};
}

/** Build ConversationContext from a Telegram message */
/** Add a message to chat history */
addToHistory(threadId: string, message: NormalizedMessage): void {
const now = Date.now();
this.cleanupExpired();

let history = this.chatHistories.get(threadId);
if (!history) {
// Evict oldest chat if at capacity
if (this.chatHistories.size >= MAX_CHATS) {
let oldestKey: string | null = null;
let oldestTime = Infinity;
for (const [key, h] of this.chatHistories.entries()) {
if (h.lastUpdated < oldestTime) {
oldestTime = h.lastUpdated;
oldestKey = key;
}
}
if (oldestKey) this.chatHistories.delete(oldestKey);
}
history = { messages: [], lastUpdated: now };
this.chatHistories.set(threadId, history);
}

history.messages.push(message);
history.lastUpdated = now;

// Trim to max history size
if (history.messages.length > MAX_HISTORY_PER_CHAT) {
history.messages = history.messages.slice(-MAX_HISTORY_PER_CHAT);
}
}

/** Build ConversationContext from a Telegram message, including chat history */
buildConversationContext(msg: TelegramMessageInfo): ConversationContext {
const chatId = String(msg.chat.id);
const threadId = msg.message_thread_id ? `${chatId}:${msg.message_thread_id}` : chatId;
Expand All @@ -84,13 +127,20 @@ export class TelegramAdapter {
.trim();
}

// Add current user message to history
this.addToHistory(threadId, current);

// Get full conversation history for this chat
const history = this.chatHistories.get(threadId);
const messages = history ? [...history.messages] : [current];

return {
transport: 'telegram',
thread: {
id: threadId,
url: msg.chat.username ? `https://t.me/${msg.chat.username}/${msg.message_id}` : undefined,
},
messages: [current],
messages,
current,
attributes: {
chat_id: chatId,
Expand All @@ -103,4 +153,14 @@ export class TelegramAdapter {
},
};
}

/** Remove expired chat histories */
private cleanupExpired(): void {
const now = Date.now();
for (const [key, history] of this.chatHistories.entries()) {
if (now - history.lastUpdated > HISTORY_TTL_MS) {
this.chatHistories.delete(key);
}
}
}
}
55 changes: 48 additions & 7 deletions src/telegram/polling-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ export class TelegramPollingRunner {
if (!isMentioned && !isReplyToBot) return;
}

// 6. Build conversation context
// 6. Build conversation context (includes chat history)
const conversation = this.adapter.buildConversationContext(msg);

// 7. Build webhook data map (same pattern as Slack)
Expand Down Expand Up @@ -193,11 +193,15 @@ export class TelegramPollingRunner {
const allChecks = Object.keys(this.cfg.checks || {});
if (allChecks.length === 0) return;

const chatTrackKey = this.trackChat(String(msg.chat.id));
// Derive thread ID for history tracking
const chatId = String(msg.chat.id);
const threadId = msg.message_thread_id ? `${chatId}:${msg.message_thread_id}` : chatId;

const chatTrackKey = this.trackChat(chatId);
try {
const runEngine = new StateMachineExecutionEngine();

// Inject Telegram client into execution context
// Inject Telegram client and conversation into execution context
try {
const parentCtx: any = (this.engine as any).getExecutionContext?.() || {};
const prevCtx: any = (runEngine as any).getExecutionContext?.() || {};
Expand All @@ -206,11 +210,12 @@ export class TelegramPollingRunner {
...prevCtx,
telegram: this.client,
telegramClient: this.client,
conversation,
});
} catch {}

logger.info(
`[TelegramPolling] Dispatching engine run for chat ${msg.chat.id} (${msg.chat.type})`
`[TelegramPolling] Dispatching engine run for chat ${msg.chat.id} (${msg.chat.type}), history: ${conversation.messages.length} messages`
);

const execFn = () =>
Expand All @@ -223,26 +228,40 @@ export class TelegramPollingRunner {
debug: process.env.VISOR_DEBUG === 'true',
} as any);

let result: any;
if (this.taskStore) {
const { trackExecution } = await import('../agent-protocol/track-execution');
await trackExecution(
result = await trackExecution(
{
taskStore: this.taskStore,
source: 'telegram',
workflowId: allChecks.join(','),
configPath: this.configPath,
messageText: String(msg.text || msg.caption || 'Telegram message'),
metadata: {
telegram_chat_id: String(msg.chat.id),
telegram_chat_id: chatId,
telegram_chat_type: msg.chat.type,
telegram_user: msg.from ? String(msg.from.id) : 'unknown',
},
},
execFn
);
} else {
await execFn();
result = await execFn();
}

// Capture bot response and add to conversation history
try {
const botResponse = this.extractBotResponse(result);
if (botResponse) {
this.adapter.addToHistory(threadId, {
role: 'bot',
text: botResponse,
timestamp: String(Math.floor(Date.now() / 1000)),
origin: 'visor',
});
}
} catch {}
} catch (e) {
logger.error(
`[TelegramPolling] Engine execution failed: ${e instanceof Error ? e.message : String(e)}`
Expand All @@ -267,6 +286,28 @@ export class TelegramPollingRunner {
}
}

/** Extract bot response text from engine execution result */
private extractBotResponse(result: any): string | null {
if (!result) return null;
try {
// Try workflow output 'text' field from history
const history = result?.reviewSummary?.history;
if (history) {
for (const entry of Object.values(history) as any[]) {
if (entry?.text && typeof entry.text === 'string') return entry.text;
}
}
// Try parsing from formatted issue message
const msg = result?.reviewSummary?.issues?.[0]?.message || '';
const textMatch = msg.match(/^text:\s*(.+?)(?:\nintent:|\ntags:|\ntopic:|\n\n|$)/ms);
if (textMatch?.[1]) return textMatch[1].trim();
// Try raw response
const raw = result?.reviewSummary?.debug?.rawResponse;
if (raw && typeof raw === 'string') return raw;
} catch {}
return null;
}

/** Deduplication: track processed updates by chat_id:message_id */
private isDuplicate(messageId: number, chatId: number): boolean {
const key = chatId * 1000000 + messageId; // simple composite key
Expand Down
Loading