diff --git a/FORK.md b/FORK.md index b9f06040c..ef99754ce 100644 --- a/FORK.md +++ b/FORK.md @@ -23,3 +23,4 @@ line in the PR that syncs it back. | a9d47a12 | api: finalize codex turn.failed as failed_permanent + post failure notice to Slack; raise iron-proxy upstream header timeout to 300s (codex remote compaction); fold signoz/aws header allowlist into base.yaml | [#13](https://github.com/Tavus-Engineering/centaur/pull/13) | | dcfd647c | slackbot: route in-thread Watch Agent mentions through DM and post results back only after approval | [#14](https://github.com/Tavus-Engineering/centaur/pull/14) | | 964493b4 | sandbox: launch Codex with external-sandbox bypass so shell commands work in Kubernetes sandboxes | [#15](https://github.com/Tavus-Engineering/centaur/pull/15) | +| 06a05515 | slackbot: dedupe Slack messages by message identity (team:channel:ts) so an app_mention + message double-delivery collapses to one DM handoff | [#16](https://github.com/Tavus-Engineering/centaur/pull/16) | diff --git a/services/slackbot/src/index.test.ts b/services/slackbot/src/index.test.ts index acb20e7d1..46a03e43b 100644 --- a/services/slackbot/src/index.test.ts +++ b/services/slackbot/src/index.test.ts @@ -137,9 +137,10 @@ describe('Slack event HTTP dedupe', () => { } }) - it('routes addressed channel-thread mentions into a DM workflow handoff', async () => { + it('routes addressed channel-thread mentions into one DM workflow handoff', async () => { process.env.SLACK_SIGNING_SECRET = 'test-signing-secret' process.env.SLACK_BOT_TOKEN = 'xoxb-thread-route-test' + process.env.SLACK_EVENT_DEDUP_TTL_MS = '600000' delete process.env.SLACKBOT_API_KEY delete process.env.CENTAUR_API_KEY @@ -197,9 +198,9 @@ describe('Slack event HTTP dedupe', () => { try { const { app } = await import(`./index.ts?thread_route=${Date.now()}`) - const body = JSON.stringify({ + const mentionBody = JSON.stringify({ type: 'event_callback', - event_id: 'Ev-thread-route', + event_id: 'Ev-thread-route-app', team_id: 'T123', event: { type: 'app_mention', @@ -210,10 +211,33 @@ describe('Slack event HTTP dedupe', () => { text: '<@UBOT> investigate this' } }) + const messageBody = JSON.stringify({ + type: 'event_callback', + event_id: 'Ev-thread-route-message', + team_id: 'T123', + event: { + type: 'message', + user: 'U123', + channel: 'C123', + thread_ts: '1778883000.000000', + ts: '1778883001.000000', + text: '<@UBOT> investigate this' + } + }) const waits: Promise[] = [] - const response = await app.request( + const mentionResponse = await app.request( '/api/webhooks/slack', - signedJsonRequest(body, process.env.SLACK_SIGNING_SECRET), + signedJsonRequest(mentionBody, process.env.SLACK_SIGNING_SECRET), + {}, + { + waitUntil: (promise: Promise) => { + waits.push(promise) + } + } as any + ) + const messageResponse = await app.request( + '/api/webhooks/slack', + signedJsonRequest(messageBody, process.env.SLACK_SIGNING_SECRET), {}, { waitUntil: (promise: Promise) => { @@ -222,8 +246,10 @@ describe('Slack event HTTP dedupe', () => { } as any ) - expect(response.status).toBe(200) - expect(await response.json()).toEqual({ ok: true }) + expect(mentionResponse.status).toBe(200) + expect(await mentionResponse.json()).toEqual({ ok: true }) + expect(messageResponse.status).toBe(200) + expect(await messageResponse.json()).toEqual({ ok: true, duplicate: true }) await Promise.allSettled(waits) expect(slackCalls.find(call => call.path === '/api/reactions.add')?.body).toMatchObject({ @@ -234,7 +260,9 @@ describe('Slack event HTTP dedupe', () => { expect(slackCalls.find(call => call.path === '/api/conversations.open')?.body).toMatchObject({ users: 'U123' }) - const dmRoot = slackCalls.find(call => call.path === '/api/chat.postMessage') + const dmRootMessages = slackCalls.filter(call => call.path === '/api/chat.postMessage') + expect(dmRootMessages).toHaveLength(1) + const dmRoot = dmRootMessages[0] expect(dmRoot?.body).toMatchObject({ channel: 'D123' }) @@ -251,6 +279,7 @@ describe('Slack event HTTP dedupe', () => { const workflow = centaurRequests.find(request => request.path === '/workflows/runs')?.body as | { input?: Record } | undefined + expect(centaurRequests.filter(request => request.path === '/workflows/runs')).toHaveLength(1) expect(workflow?.input).toMatchObject({ thread_key: 'slack:T123:D123:1778884000.000000', delivery: { @@ -330,9 +359,9 @@ describe('Slack event HTTP dedupe', () => { expect(second.status).toBe(200) expect(await second.json()).toEqual({ ok: true, duplicate: true }) expect(console.warn).toHaveBeenCalledWith( - 'slack_duplicate_event_skipped', + 'slack_duplicate_message_skipped', expect.objectContaining({ - dedupe_key: 'event:Ev-duplicate', + dedupe_key: 'message:T123:C123:1778883099.579529', event_id: 'Ev-duplicate', team_id: 'T123', channel_id: 'C123', diff --git a/services/slackbot/src/index.ts b/services/slackbot/src/index.ts index 54379dd1f..791a26308 100644 --- a/services/slackbot/src/index.ts +++ b/services/slackbot/src/index.ts @@ -19,7 +19,7 @@ import { import { AgentSessionRenderer, withAgentSessionLock } from './slack/agent-session' import { authorizeSlackOrg } from './slack/authorization' import { CodexSessionRenderer, hasActiveCodexSession } from './slack/codex-session' -import { EventDeduper, slackDedupKey } from './slack/dedup' +import { EventDeduper, slackDedupKeys } from './slack/dedup' import { duplicateSlackAlertText, type DuplicateSlackEventDetails } from './slack/duplicate-alert' import { publishWatchAgentHome } from './slack/home' import { EnvSlackInstallationStore, SlackClientResolver } from './slack/installations' @@ -149,13 +149,16 @@ const slackHandler = async (c: Context<{ Variables: Variables }>) => { if (envelope.type === 'url_verification') return c.json({ challenge: envelope.challenge }) const event = envelope.event - const key = slackDedupKey({ + const keys = slackDedupKeys({ eventId: envelope.event_id, + eventType: typeof event?.type === 'string' ? event.type : undefined, teamId: envelope.team_id, channelId: typeof event?.channel === 'string' ? event.channel : undefined, messageTs: typeof event?.ts === 'string' ? event.ts : undefined }) - if (!deduper.checkAndRemember(key)) { + const dedupe = deduper.checkAndRememberAll(keys) + if (!dedupe.ok) { + const key = dedupe.key const duplicate = duplicateSlackEventDetails(envelope, event, key) logWarn( key.startsWith('message:') diff --git a/services/slackbot/src/slack/dedup.test.ts b/services/slackbot/src/slack/dedup.test.ts index 71897ecba..caaa43eb0 100644 --- a/services/slackbot/src/slack/dedup.test.ts +++ b/services/slackbot/src/slack/dedup.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from 'bun:test' -import { EventDeduper, slackDedupKey } from './dedup' +import { EventDeduper, slackDedupKey, slackDedupKeys } from './dedup' describe('EventDeduper', () => { it('rejects duplicate keys until the TTL expires', () => { @@ -10,6 +10,25 @@ describe('EventDeduper', () => { expect(deduper.checkAndRemember('event:Ev123', 1_101)).toBe(true) }) + it('rejects duplicate key batches if any key is still live', () => { + const deduper = new EventDeduper(100) + + expect(deduper.checkAndRememberAll(['message:T123:C123:1', 'event:Ev-app'], 1_000)).toEqual({ + ok: true + }) + expect(deduper.checkAndRememberAll(['message:T123:C123:1', 'event:Ev-message'], 1_050)).toEqual( + { + ok: false, + key: 'message:T123:C123:1' + } + ) + expect(deduper.checkAndRememberAll(['message:T123:C123:1', 'event:Ev-message'], 1_101)).toEqual( + { + ok: true + } + ) + }) + it('prefers Slack event IDs and falls back to message identity', () => { expect( slackDedupKey({ @@ -28,4 +47,23 @@ describe('EventDeduper', () => { }) ).toBe('message:T123:C123:1778883099.579529') }) + + it('uses message identity before event ID for Slack message callbacks', () => { + expect( + slackDedupKeys({ + eventId: 'Ev-app', + eventType: 'app_mention', + teamId: 'T123', + channelId: 'C123', + messageTs: '1778883099.579529' + }) + ).toEqual(['message:T123:C123:1778883099.579529', 'event:Ev-app']) + + expect( + slackDedupKeys({ + eventId: 'Ev-home', + eventType: 'app_home_opened' + }) + ).toEqual(['event:Ev-home']) + }) }) diff --git a/services/slackbot/src/slack/dedup.ts b/services/slackbot/src/slack/dedup.ts index 059d64b07..c4dfc5ff5 100644 --- a/services/slackbot/src/slack/dedup.ts +++ b/services/slackbot/src/slack/dedup.ts @@ -14,6 +14,19 @@ export class EventDeduper { return true } + checkAndRememberAll(keys: string[], now = Date.now()): { ok: true } | { ok: false; key: string } { + this.prune(now) + for (const key of keys) { + const expiresAt = this.seen.get(key) + if (expiresAt && expiresAt > now) return { ok: false, key } + } + const expiresAt = now + this.ttlMs + for (const key of keys) { + this.seen.set(key, expiresAt) + } + return { ok: true } + } + private prune(now: number): void { for (const [key, expiresAt] of this.seen) { if (expiresAt <= now) this.seen.delete(key) @@ -30,3 +43,31 @@ export function slackDedupKey(opts: { if (opts.eventId) return `event:${opts.eventId}` return `message:${opts.teamId ?? 'unknown'}:${opts.channelId ?? 'unknown'}:${opts.messageTs ?? 'unknown'}` } + +export function slackDedupKeys(opts: { + eventId?: string + eventType?: string + teamId?: string + channelId?: string + messageTs?: string +}): string[] { + const keys: string[] = [] + if (isSlackMessageCallback(opts.eventType) && opts.channelId && opts.messageTs) { + keys.push(messageDedupKey(opts)) + } + if (opts.eventId) keys.push(`event:${opts.eventId}`) + if (!keys.length) keys.push(messageDedupKey(opts)) + return keys +} + +function isSlackMessageCallback(eventType: string | undefined): boolean { + return eventType === 'app_mention' || eventType === 'message' +} + +function messageDedupKey(opts: { + teamId?: string + channelId?: string + messageTs?: string +}): string { + return `message:${opts.teamId ?? 'unknown'}:${opts.channelId ?? 'unknown'}:${opts.messageTs ?? 'unknown'}` +}