Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions backend/src/events/event-bus.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,21 @@
import type { DomainEventType, EventHandler, StoredEvent } from './event-types.js';
import type { AgenticPayWebSocketServer } from '../websocket/server.js';

type WildcardHandler = (event: StoredEvent) => void | Promise<void>;

const handlers = new Map<string, Set<EventHandler>>();
let wildcardHandlers: Set<WildcardHandler> = new Set();
let websocketServer: AgenticPayWebSocketServer | undefined;

const channelByEventPrefix: Array<{ prefix: string; channel: string }> = [
{ prefix: 'payment.', channel: 'payment.events' },
{ prefix: 'dispute.', channel: 'dispute.updates' },
{ prefix: 'project.disputed', channel: 'dispute.updates' },
];

export function bindWebSocketServer(server: AgenticPayWebSocketServer): void {
websocketServer = server;
}

export function subscribe<T = unknown>(type: DomainEventType, handler: EventHandler<T>): () => void {
const set = handlers.get(type) ?? new Set<EventHandler>();
Expand All @@ -29,9 +41,19 @@ export async function publish(event: StoredEvent): Promise<void> {
if (wildcardHandlers.size > 0) {
await Promise.all(Array.from(wildcardHandlers).map((h) => h(event)));
}

const channel = channelByEventPrefix.find(({ prefix }) => event.type.startsWith(prefix))?.channel;
if (channel) {
websocketServer?.broadcastToChannel(channel, {
type: event.type,
payload: event,
priority: channel === 'dispute.updates' ? 'high' : 'normal',
});
}
}

export function clearHandlers(): void {
handlers.clear();
wildcardHandlers = new Set();
websocketServer = undefined;
}
4 changes: 3 additions & 1 deletion backend/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import { disputeService } from './disputes/disputeService.js';
import http from 'node:http';
import { attachWebSocketServer } from './websocket/server.js';
import { createWebSocketRouter } from './routes/websocket.js';
import { bindWebSocketServer } from './events/event-bus.js';
import { receiptsRouter } from './routes/receipts.js';
import { eventsRouter } from './routes/events.js';
import { threatDetectionRouter } from './routes/threat-detection.js';
Expand Down Expand Up @@ -358,12 +359,13 @@ setInterval(async () => {

const server = http.createServer(app);
const wsServer = attachWebSocketServer({ server, options: { path: '/ws' } });
bindWebSocketServer(wsServer);
app.use('/api/v1/websocket', createWebSocketRouter(wsServer));
app.use('/api/v1/analytics', createAnalyticsRouter(wsServer));

// Broadcast analytics snapshot every 30 seconds to all connected WebSocket clients
const analyticsInterval = setInterval(() => {
wsServer.broadcast({ type: 'analytics:update', payload: analyticsService.snapshot() });
wsServer.broadcastToChannel('analytics.updates', { type: 'analytics:update', payload: analyticsService.snapshot() });
}, 30_000);

server.listen(config.server.port, () => {
Expand Down
69 changes: 69 additions & 0 deletions backend/src/routes/receipts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,14 @@ import {
getReceiptByTxHash,
getReceiptsByWallet,
getAllReceipts,
verifyReceiptProof,
getReceiptByMerkleRoot,
searchReceipts,
archiveReceipts,
generateReceiptPdf,
} from '../services/receipts.js';
import {
archiveReceiptSchema,
mintReceiptSchema,
batchMintReceiptSchema,
transferReceiptSchema,
Expand Down Expand Up @@ -72,6 +78,31 @@ receiptsRouter.delete(
})
);

receiptsRouter.get(
'/search',
cacheControl({ maxAge: CacheTTL.SHORT }),
asyncHandler(async (req, res) => {
res.json(searchReceipts({
paymentId: req.query.paymentId as string | undefined,
txHash: req.query.txHash as string | undefined,
wallet: req.query.wallet as string | undefined,
currency: req.query.currency as string | undefined,
from: req.query.from as string | undefined,
to: req.query.to as string | undefined,
includeArchived: req.query.includeArchived === 'true',
}));
})
);

receiptsRouter.post(
'/archive',
validate(archiveReceiptSchema),
asyncHandler(async (req, res) => {
const archived = archiveReceipts(req.body.retentionBefore);
res.json({ archived, count: archived.length });
})
);

receiptsRouter.get(
'/',
cacheControl({ maxAge: CacheTTL.SHORT }),
Expand All @@ -91,6 +122,16 @@ receiptsRouter.get(
})
);

receiptsRouter.get(
'/by-root/:root',
cacheControl({ maxAge: CacheTTL.SHORT }),
asyncHandler(async (req, res) => {
const receipt = getReceiptByMerkleRoot(req.params.root);
if (!receipt) throw new AppError(404, 'Receipt not found', 'NOT_FOUND');
res.json(receipt);
})
);

receiptsRouter.get(
'/by-tx/:txHash',
cacheControl({ maxAge: CacheTTL.SHORT }),
Expand Down Expand Up @@ -119,6 +160,34 @@ receiptsRouter.get(
})
);

receiptsRouter.get(
'/:tokenId/verify',
cacheControl({ maxAge: CacheTTL.SHORT }),
asyncHandler(async (req, res) => {
const receipt = getReceiptByTokenId(req.params.tokenId);
if (!receipt) throw new AppError(404, 'Receipt not found', 'NOT_FOUND');
res.json({
tokenId: receipt.tokenId,
merkleRoot: receipt.merkleRoot,
valid: verifyReceiptProof(receipt),
proof: receipt.merkleProof,
});
})
);

receiptsRouter.get(
'/:tokenId/pdf',
cacheControl({ maxAge: CacheTTL.LONG }),
asyncHandler(async (req, res) => {
const receipt = getReceiptByTokenId(req.params.tokenId);
if (!receipt) throw new AppError(404, 'Receipt not found', 'NOT_FOUND');
const pdf = generateReceiptPdf(receipt);
res.setHeader('Content-Type', 'application/pdf');
res.setHeader('Content-Disposition', `attachment; filename="${receipt.tokenId}.pdf"`);
res.send(pdf);
})
);

receiptsRouter.get(
'/:tokenId/metadata',
cacheControl({ maxAge: CacheTTL.LONG }),
Expand Down
9 changes: 8 additions & 1 deletion backend/src/schemas/receipts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ export const mintReceiptSchema = z.object({
sender: z.string().min(1, 'Sender address is required'),
recipient: z.string().min(1, 'Recipient address is required'),
amount: z.number().positive('Amount must be positive'),
asset: z.string().min(1, 'Asset is required'),
asset: z.string().min(1, 'Asset is required').optional(),
currency: z.string().min(1, 'Currency is required').optional(),
timestamp: z.string().datetime().optional(),
retentionUntil: z.string().datetime().optional(),
});

export const batchMintReceiptSchema = z.object({
Expand All @@ -16,3 +19,7 @@ export const batchMintReceiptSchema = z.object({
export const transferReceiptSchema = z.object({
newOwner: z.string().min(1, 'New owner address is required'),
});

export const archiveReceiptSchema = z.object({
retentionBefore: z.string().datetime('Retention cutoff must be an ISO timestamp'),
});
Loading
Loading