Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions FORK.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ line in the PR that syncs it back.
| a9d47a12 | api: finalize codex turn.failed as failed_permanent + post failure notice to Slack; raise iron-proxy upstream header timeout to 300s (codex remote compaction); fold signoz/aws header allowlist into base.yaml | [#13](https://github.com/Tavus-Engineering/centaur/pull/13) |
| dcfd647c | slackbot: route in-thread Watch Agent mentions through DM and post results back only after approval | [#14](https://github.com/Tavus-Engineering/centaur/pull/14) |
| 964493b4 | sandbox: launch Codex with external-sandbox bypass so shell commands work in Kubernetes sandboxes | [#15](https://github.com/Tavus-Engineering/centaur/pull/15) |
| 06a05515 | slackbot: dedupe Slack messages by message identity (team:channel:ts) so an app_mention + message double-delivery collapses to one DM handoff | [#16](https://github.com/Tavus-Engineering/centaur/pull/16) |
49 changes: 39 additions & 10 deletions services/slackbot/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ describe('Slack event HTTP dedupe', () => {
}
})

it('routes addressed channel-thread mentions into a DM workflow handoff', async () => {
it('routes addressed channel-thread mentions into one DM workflow handoff', async () => {
process.env.SLACK_SIGNING_SECRET = 'test-signing-secret'
process.env.SLACK_BOT_TOKEN = 'xoxb-thread-route-test'
process.env.SLACK_EVENT_DEDUP_TTL_MS = '600000'
delete process.env.SLACKBOT_API_KEY
delete process.env.CENTAUR_API_KEY

Expand Down Expand Up @@ -197,9 +198,9 @@ describe('Slack event HTTP dedupe', () => {

try {
const { app } = await import(`./index.ts?thread_route=${Date.now()}`)
const body = JSON.stringify({
const mentionBody = JSON.stringify({
type: 'event_callback',
event_id: 'Ev-thread-route',
event_id: 'Ev-thread-route-app',
team_id: 'T123',
event: {
type: 'app_mention',
Expand All @@ -210,10 +211,33 @@ describe('Slack event HTTP dedupe', () => {
text: '<@UBOT> investigate this'
}
})
const messageBody = JSON.stringify({
type: 'event_callback',
event_id: 'Ev-thread-route-message',
team_id: 'T123',
event: {
type: 'message',
user: 'U123',
channel: 'C123',
thread_ts: '1778883000.000000',
ts: '1778883001.000000',
text: '<@UBOT> investigate this'
}
})
const waits: Promise<unknown>[] = []
const response = await app.request(
const mentionResponse = await app.request(
'/api/webhooks/slack',
signedJsonRequest(body, process.env.SLACK_SIGNING_SECRET),
signedJsonRequest(mentionBody, process.env.SLACK_SIGNING_SECRET),
{},
{
waitUntil: (promise: Promise<unknown>) => {
waits.push(promise)
}
} as any
)
const messageResponse = await app.request(
'/api/webhooks/slack',
signedJsonRequest(messageBody, process.env.SLACK_SIGNING_SECRET),
{},
{
waitUntil: (promise: Promise<unknown>) => {
Expand All @@ -222,8 +246,10 @@ describe('Slack event HTTP dedupe', () => {
} as any
)

expect(response.status).toBe(200)
expect(await response.json()).toEqual({ ok: true })
expect(mentionResponse.status).toBe(200)
expect(await mentionResponse.json()).toEqual({ ok: true })
expect(messageResponse.status).toBe(200)
expect(await messageResponse.json()).toEqual({ ok: true, duplicate: true })
await Promise.allSettled(waits)

expect(slackCalls.find(call => call.path === '/api/reactions.add')?.body).toMatchObject({
Expand All @@ -234,7 +260,9 @@ describe('Slack event HTTP dedupe', () => {
expect(slackCalls.find(call => call.path === '/api/conversations.open')?.body).toMatchObject({
users: 'U123'
})
const dmRoot = slackCalls.find(call => call.path === '/api/chat.postMessage')
const dmRootMessages = slackCalls.filter(call => call.path === '/api/chat.postMessage')
expect(dmRootMessages).toHaveLength(1)
const dmRoot = dmRootMessages[0]
expect(dmRoot?.body).toMatchObject({
channel: 'D123'
})
Expand All @@ -251,6 +279,7 @@ describe('Slack event HTTP dedupe', () => {
const workflow = centaurRequests.find(request => request.path === '/workflows/runs')?.body as
| { input?: Record<string, unknown> }
| undefined
expect(centaurRequests.filter(request => request.path === '/workflows/runs')).toHaveLength(1)
expect(workflow?.input).toMatchObject({
thread_key: 'slack:T123:D123:1778884000.000000',
delivery: {
Expand Down Expand Up @@ -330,9 +359,9 @@ describe('Slack event HTTP dedupe', () => {
expect(second.status).toBe(200)
expect(await second.json()).toEqual({ ok: true, duplicate: true })
expect(console.warn).toHaveBeenCalledWith(
'slack_duplicate_event_skipped',
'slack_duplicate_message_skipped',
expect.objectContaining({
dedupe_key: 'event:Ev-duplicate',
dedupe_key: 'message:T123:C123:1778883099.579529',
event_id: 'Ev-duplicate',
team_id: 'T123',
channel_id: 'C123',
Expand Down
9 changes: 6 additions & 3 deletions services/slackbot/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import {
import { AgentSessionRenderer, withAgentSessionLock } from './slack/agent-session'
import { authorizeSlackOrg } from './slack/authorization'
import { CodexSessionRenderer, hasActiveCodexSession } from './slack/codex-session'
import { EventDeduper, slackDedupKey } from './slack/dedup'
import { EventDeduper, slackDedupKeys } from './slack/dedup'
import { duplicateSlackAlertText, type DuplicateSlackEventDetails } from './slack/duplicate-alert'
import { publishWatchAgentHome } from './slack/home'
import { EnvSlackInstallationStore, SlackClientResolver } from './slack/installations'
Expand Down Expand Up @@ -149,13 +149,16 @@ const slackHandler = async (c: Context<{ Variables: Variables }>) => {
if (envelope.type === 'url_verification') return c.json({ challenge: envelope.challenge })

const event = envelope.event
const key = slackDedupKey({
const keys = slackDedupKeys({
eventId: envelope.event_id,
eventType: typeof event?.type === 'string' ? event.type : undefined,
teamId: envelope.team_id,
channelId: typeof event?.channel === 'string' ? event.channel : undefined,
messageTs: typeof event?.ts === 'string' ? event.ts : undefined
})
if (!deduper.checkAndRemember(key)) {
const dedupe = deduper.checkAndRememberAll(keys)
if (!dedupe.ok) {
const key = dedupe.key
const duplicate = duplicateSlackEventDetails(envelope, event, key)
logWarn(
key.startsWith('message:')
Expand Down
40 changes: 39 additions & 1 deletion services/slackbot/src/slack/dedup.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { describe, expect, it } from 'bun:test'
import { EventDeduper, slackDedupKey } from './dedup'
import { EventDeduper, slackDedupKey, slackDedupKeys } from './dedup'

describe('EventDeduper', () => {
it('rejects duplicate keys until the TTL expires', () => {
Expand All @@ -10,6 +10,25 @@ describe('EventDeduper', () => {
expect(deduper.checkAndRemember('event:Ev123', 1_101)).toBe(true)
})

it('rejects duplicate key batches if any key is still live', () => {
const deduper = new EventDeduper(100)

expect(deduper.checkAndRememberAll(['message:T123:C123:1', 'event:Ev-app'], 1_000)).toEqual({
ok: true
})
expect(deduper.checkAndRememberAll(['message:T123:C123:1', 'event:Ev-message'], 1_050)).toEqual(
{
ok: false,
key: 'message:T123:C123:1'
}
)
expect(deduper.checkAndRememberAll(['message:T123:C123:1', 'event:Ev-message'], 1_101)).toEqual(
{
ok: true
}
)
})

it('prefers Slack event IDs and falls back to message identity', () => {
expect(
slackDedupKey({
Expand All @@ -28,4 +47,23 @@ describe('EventDeduper', () => {
})
).toBe('message:T123:C123:1778883099.579529')
})

it('uses message identity before event ID for Slack message callbacks', () => {
expect(
slackDedupKeys({
eventId: 'Ev-app',
eventType: 'app_mention',
teamId: 'T123',
channelId: 'C123',
messageTs: '1778883099.579529'
})
).toEqual(['message:T123:C123:1778883099.579529', 'event:Ev-app'])

expect(
slackDedupKeys({
eventId: 'Ev-home',
eventType: 'app_home_opened'
})
).toEqual(['event:Ev-home'])
})
})
41 changes: 41 additions & 0 deletions services/slackbot/src/slack/dedup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,19 @@ export class EventDeduper {
return true
}

checkAndRememberAll(keys: string[], now = Date.now()): { ok: true } | { ok: false; key: string } {
this.prune(now)
for (const key of keys) {
const expiresAt = this.seen.get(key)
if (expiresAt && expiresAt > now) return { ok: false, key }
}
const expiresAt = now + this.ttlMs
for (const key of keys) {
this.seen.set(key, expiresAt)
}
return { ok: true }
}

private prune(now: number): void {
for (const [key, expiresAt] of this.seen) {
if (expiresAt <= now) this.seen.delete(key)
Expand All @@ -30,3 +43,31 @@ export function slackDedupKey(opts: {
if (opts.eventId) return `event:${opts.eventId}`
return `message:${opts.teamId ?? 'unknown'}:${opts.channelId ?? 'unknown'}:${opts.messageTs ?? 'unknown'}`
}

export function slackDedupKeys(opts: {
eventId?: string
eventType?: string
teamId?: string
channelId?: string
messageTs?: string
}): string[] {
const keys: string[] = []
if (isSlackMessageCallback(opts.eventType) && opts.channelId && opts.messageTs) {
keys.push(messageDedupKey(opts))
}
if (opts.eventId) keys.push(`event:${opts.eventId}`)
if (!keys.length) keys.push(messageDedupKey(opts))
return keys
}

function isSlackMessageCallback(eventType: string | undefined): boolean {
return eventType === 'app_mention' || eventType === 'message'
}

function messageDedupKey(opts: {
teamId?: string
channelId?: string
messageTs?: string
}): string {
return `message:${opts.teamId ?? 'unknown'}:${opts.channelId ?? 'unknown'}:${opts.messageTs ?? 'unknown'}`
}
Loading