Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions packages/control-api/src/webhook-gateway.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,122 @@ describe('webhook gateway HTTP endpoint', () => {
}
});

it('deduplicates GitHub webhook retries by delivery id', async () => {
registerWebhookGatewaySource(workspacePath, {
key: 'github-main',
provider: 'github',
secret: 'github-secret',
actor: 'github-bot',
});

const handle = await startWorkgraphServer({
workspacePath,
host: '127.0.0.1',
port: 0,
});
try {
const payload = JSON.stringify({
action: 'synchronize',
pull_request: {
number: 42,
},
});
const signature = signGithub(payload, 'github-secret');
const first = await fetch(`${handle.baseUrl}/webhook-gateway/github-main`, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-github-event': 'pull_request',
'x-github-delivery': 'delivery-dup-1',
'x-hub-signature-256': signature,
},
body: payload,
});
const firstBody = await first.json() as Record<string, unknown>;
expect(first.status).toBe(202);
expect(firstBody.accepted).toBe(true);

const second = await fetch(`${handle.baseUrl}/webhook-gateway/github-main`, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-github-event': 'pull_request',
'x-github-delivery': 'delivery-dup-1',
'x-hub-signature-256': signature,
},
body: payload,
});
const secondBody = await second.json() as Record<string, unknown>;
expect(second.status).toBe(200);
expect(secondBody.accepted).toBe(false);
expect(secondBody.reason).toBe('duplicate');
expect(secondBody.duplicateBy).toBe('deliveryId');

const recent = ledger.recent(workspacePath, 20);
const gatewayEntries = recent.filter((entry) => entry.target.includes('.workgraph/webhook-gateway/github-main/delivery-dup-1'));
expect(gatewayEntries).toHaveLength(1);
} finally {
await handle.close();
}
});

it('deduplicates GitHub webhook retries by payload digest', async () => {
registerWebhookGatewaySource(workspacePath, {
key: 'github-main',
provider: 'github',
secret: 'github-secret',
actor: 'github-bot',
});

const handle = await startWorkgraphServer({
workspacePath,
host: '127.0.0.1',
port: 0,
});
try {
const payload = JSON.stringify({
action: 'opened',
issue: {
number: 77,
},
});
const signature = signGithub(payload, 'github-secret');
const first = await fetch(`${handle.baseUrl}/webhook-gateway/github-main`, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-github-event': 'issues',
'x-github-delivery': 'delivery-digest-1',
'x-hub-signature-256': signature,
},
body: payload,
});
expect(first.status).toBe(202);

const second = await fetch(`${handle.baseUrl}/webhook-gateway/github-main`, {
method: 'POST',
headers: {
'content-type': 'application/json',
'x-github-event': 'issues',
'x-github-delivery': 'delivery-digest-2',
'x-hub-signature-256': signature,
},
body: payload,
});
const secondBody = await second.json() as Record<string, unknown>;
expect(second.status).toBe(200);
expect(secondBody.accepted).toBe(false);
expect(secondBody.reason).toBe('duplicate');
expect(secondBody.duplicateBy).toBe('payloadDigest');

const recent = ledger.recent(workspacePath, 20);
const gatewayEntries = recent.filter((entry) => entry.target.includes('.workgraph/webhook-gateway/github-main/'));
expect(gatewayEntries).toHaveLength(1);
} finally {
await handle.close();
}
});

it('rejects invalid GitHub signatures', async () => {
registerWebhookGatewaySource(workspacePath, {
key: 'github-main',
Expand Down
90 changes: 90 additions & 0 deletions packages/control-api/src/webhook-gateway.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ const DEFAULT_LOG_LIMIT = 50;
const MAX_LOG_LIMIT = 1_000;
const MAX_WEBHOOK_BODY_BYTES = 2 * 1024 * 1024;
const SLACK_SIGNATURE_MAX_AGE_SECONDS = 60 * 5;
const WEBHOOK_DEDUP_TTL_MS = 5 * 60_000;
const WEBHOOK_DEDUP_MAX_ENTRIES = 1_000;

const recentWebhookDedup = new Map<string, number>();

export type WebhookGatewayProvider = 'github' | 'linear' | 'slack' | 'generic';
type LogStatus = 'accepted' | 'rejected';
Expand Down Expand Up @@ -339,6 +343,41 @@ export function registerWebhookGatewayEndpoint(app: any, workspacePath: string):
return;
}

const duplicateBy = detectRecentWebhookDuplicate(
workspacePath,
source.key,
adapted.deliveryId,
payloadDigest,
);
if (duplicateBy) {
const duplicateLog: WebhookGatewayLogEntry = {
id: randomUUID(),
ts: new Date().toISOString(),
sourceKey: source.key,
provider: source.provider,
eventType: adapted.eventType,
actor,
status: 'accepted',
statusCode: 200,
signatureVerified: verification.verified,
message: `Duplicate webhook ignored (${duplicateBy}).`,
deliveryId: adapted.deliveryId,
payloadDigest,
};
appendWebhookGatewayLog(workspacePath, duplicateLog);
writeWebhookGatewayHttpResponse(res, 200, {
ok: true,
accepted: false,
reason: 'duplicate',
duplicateBy,
source: source.key,
provider: source.provider,
eventType: adapted.eventType,
deliveryId: adapted.deliveryId,
});
return;
}

appendWebhookGatewayLedgerEvent(workspacePath, source, {
eventType: adapted.eventType,
deliveryId: adapted.deliveryId,
Expand Down Expand Up @@ -378,6 +417,57 @@ export function registerWebhookGatewayEndpoint(app: any, workspacePath: string):
});
}

function detectRecentWebhookDuplicate(
workspacePath: string,
sourceKey: string,
deliveryId: string,
payloadDigest: string,
): 'deliveryId' | 'payloadDigest' | null {
const nowMs = Date.now();
evictExpiredWebhookDedupEntries(nowMs);
const deliveryKey = `${workspacePath}|${sourceKey}|delivery|${deliveryId}`;
if (isWebhookDedupHit(deliveryKey, nowMs)) {
return 'deliveryId';
}
const payloadKey = `${workspacePath}|${sourceKey}|digest|${payloadDigest}`;
if (isWebhookDedupHit(payloadKey, nowMs)) {
return 'payloadDigest';
}
rememberWebhookDedupKey(deliveryKey, nowMs);
rememberWebhookDedupKey(payloadKey, nowMs);
return null;
}

function isWebhookDedupHit(key: string, nowMs: number): boolean {
const expiresAt = recentWebhookDedup.get(key);
if (!expiresAt) return false;
if (expiresAt <= nowMs) {
recentWebhookDedup.delete(key);
return false;
}
// Re-insert to refresh LRU order while keeping original expiration.
recentWebhookDedup.delete(key);
recentWebhookDedup.set(key, expiresAt);
return true;
}

function rememberWebhookDedupKey(key: string, nowMs: number): void {
recentWebhookDedup.set(key, nowMs + WEBHOOK_DEDUP_TTL_MS);
while (recentWebhookDedup.size > WEBHOOK_DEDUP_MAX_ENTRIES) {
const oldest = recentWebhookDedup.keys().next().value as string | undefined;
if (!oldest) break;
recentWebhookDedup.delete(oldest);
}
}

function evictExpiredWebhookDedupEntries(nowMs: number): void {
for (const [key, expiresAt] of recentWebhookDedup.entries()) {
if (expiresAt <= nowMs) {
recentWebhookDedup.delete(key);
}
}
}

function readWebhookGatewayStore(workspacePath: string): WebhookGatewayStoreFile {
const filePath = webhookGatewayStorePath(workspacePath);
if (!fs.existsSync(filePath)) {
Expand Down
1 change: 1 addition & 0 deletions packages/kernel/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ export * as ledger from './ledger.js';
export * as auth from './auth.js';
export * as store from './store.js';
export * as thread from './thread.js';
export * as threadContext from './thread-context.js';
export * as mission from './mission.js';
export * as missionOrchestrator from './mission-orchestrator.js';
export * as capability from './capability.js';
Expand Down
126 changes: 126 additions & 0 deletions packages/kernel/src/thread-context.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import fs from 'node:fs';
import os from 'node:os';
import path from 'node:path';
import matter from 'gray-matter';
import { afterEach, beforeEach, describe, expect, it } from 'vitest';
import { loadRegistry, saveRegistry } from './registry.js';
import * as thread from './thread.js';
import * as threadContext from './thread-context.js';

let workspacePath: string;
let threadPath: string;

beforeEach(() => {
workspacePath = fs.mkdtempSync(path.join(os.tmpdir(), 'wg-thread-context-'));
const registry = loadRegistry(workspacePath);
saveRegistry(workspacePath, registry);
threadPath = thread.createThread(
workspacePath,
'Thread Context Fixture',
'Validate thread context operations',
'agent-seed',
).path;
});

afterEach(() => {
fs.rmSync(workspacePath, { recursive: true, force: true });
});

describe('thread context store', () => {
it('adds entries and lists metadata/content', () => {
const first = threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Architecture note',
content: 'We should keep context local to a thread for retrieval.',
source: 'adr-001',
addedBy: 'agent-a',
relevanceScore: 0.9,
});
const second = threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Research snippet',
content: 'BM25 ranking is robust for keyword retrieval tasks.',
addedBy: 'agent-b',
relevanceScore: 0.4,
});

expect(first.path).toContain('.workgraph/thread-context/thread-context-fixture/context/');
expect(second.path).toContain('.workgraph/thread-context/thread-context-fixture/context/');

const listed = threadContext.listThreadContextEntries(workspacePath, threadPath);
expect(listed).toHaveLength(2);
expect(listed[0].title).toBe('Architecture note');
expect(listed[0].source).toBe('adr-001');
expect(listed[1].title).toBe('Research snippet');
});

it('searches entries with BM25 ranking', () => {
threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Webhook dedup design',
content: 'Use payload digest and delivery id dedup in the webhook gateway.',
addedBy: 'agent-a',
relevanceScore: 0.8,
});
threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Standalone threads',
content: 'Add a top-level thread creation tool with idempotency support.',
addedBy: 'agent-a',
relevanceScore: 0.6,
});
threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Unrelated note',
content: 'Color palette and spacing tweaks for dashboard.',
addedBy: 'agent-a',
relevanceScore: 0.9,
});

const results = threadContext.searchThreadContextEntries(
workspacePath,
threadPath,
'webhook delivery dedup',
{ limit: 2 },
);

expect(results).toHaveLength(1);
expect(results[0].title).toBe('Webhook dedup design');
expect(results[0].bm25_score).toBeGreaterThan(0);
expect(results[0].snippet.toLowerCase()).toContain('dedup');
});

it('prunes stale and low relevance entries', () => {
const stale = threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Old context',
content: 'This note is no longer fresh.',
addedBy: 'agent-a',
relevanceScore: 0.9,
});
const low = threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Low signal',
content: 'This entry should be removed by relevance.',
addedBy: 'agent-a',
relevanceScore: 0.1,
});
const keep = threadContext.addThreadContextEntry(workspacePath, threadPath, {
title: 'Keep context',
content: 'High relevance and recent entry.',
addedBy: 'agent-a',
relevanceScore: 0.95,
});

const staleAbsPath = path.join(workspacePath, stale.path);
const staleParsed = matter(fs.readFileSync(staleAbsPath, 'utf-8'));
staleParsed.data.added_at = '2000-01-01T00:00:00.000Z';
fs.writeFileSync(staleAbsPath, matter.stringify(staleParsed.content, staleParsed.data), 'utf-8');

const pruned = threadContext.pruneThreadContextEntries(workspacePath, threadPath, {
maxAgeMinutes: 60,
minRelevance: 0.5,
now: new Date('2026-03-11T00:00:00.000Z'),
});

expect(pruned.removedCount).toBe(2);
expect(pruned.keptCount).toBe(1);
expect(pruned.removed.map((entry) => entry.path).sort()).toEqual([low.path, stale.path].sort());
const remaining = threadContext.listThreadContextEntries(workspacePath, threadPath);
expect(remaining).toHaveLength(1);
expect(remaining[0].path).toBe(keep.path);
});
});
Loading
Loading