From 0bc29590a16761367d9b379ed61a804184a3a1ec Mon Sep 17 00:00:00 2001 From: Andrew Zolotukhin Date: Tue, 12 May 2026 20:41:35 +0000 Subject: [PATCH] fix: add nested otel spans for __batch requests --- .changeset/fresh-batch-spans.md | 7 ++ libs/otel/src/middleware/tracing.test.ts | 123 ++++++++++++++++++++++- libs/otel/src/middleware/tracing.ts | 56 ++++++++++- libs/server/src/Server.ts | 41 +++++++- libs/server/src/VirtualHttp.ts | 5 + libs/server/tests/Batching.test.ts | 20 ++++ 6 files changed, 244 insertions(+), 8 deletions(-) create mode 100644 .changeset/fresh-batch-spans.md diff --git a/.changeset/fresh-batch-spans.md b/.changeset/fresh-batch-spans.md new file mode 100644 index 00000000..a1dd4acb --- /dev/null +++ b/.changeset/fresh-batch-spans.md @@ -0,0 +1,7 @@ +--- +'@cleverbrush/otel': patch +'@cleverbrush/server': patch +--- + +Trace batched requests as a parent batch span with child spans for each +sub-request. diff --git a/libs/otel/src/middleware/tracing.test.ts b/libs/otel/src/middleware/tracing.test.ts index 38fc650a..bae9f7ec 100644 --- a/libs/otel/src/middleware/tracing.test.ts +++ b/libs/otel/src/middleware/tracing.test.ts @@ -5,7 +5,17 @@ import { SimpleSpanProcessor } from '@opentelemetry/sdk-trace-base'; import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; -import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest'; +import { + afterAll, + afterEach, + beforeAll, + beforeEach, + describe, + expect, + it +} from 'vitest'; +import { endpoint, mapHandlers } from '../../../server/src/Endpoint.js'; +import { createServer, type Server } from '../../../server/src/Server.js'; import { OTEL_SPAN_ITEM_KEY, tracingMiddleware } from './tracing.js'; const exporter = new InMemorySpanExporter(); @@ -24,6 +34,13 @@ afterAll(async () => { beforeEach(() => exporter.reset()); +let server: Server | undefined; + +afterEach(async () => { + await server?.close(); + server = undefined; +}); + function makeCtx(overrides: Partial = {}): any { const items = new Map(); return { @@ -36,6 +53,35 @@ function makeCtx(overrides: Partial = {}): any { }; } +const batchApi = { + items: { + a: endpoint.get('/api/a'), + b: endpoint.get('/api/b') + } +}; + +const batchHandlers = mapHandlers(batchApi, { + items: { + a: () => ({ ok: 'a' }), + b: () => ({ ok: 'b' }) + } +}); + +async function postBatch( + port: number, + requests: Array<{ + method: string; + url: string; + headers?: Record; + }> +): Promise { + return fetch(`http://127.0.0.1:${port}/__batch`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ requests }) + }); +} + describe('tracingMiddleware', () => { it('creates a SERVER span with HTTP semconv attributes', async () => { const mw = tracingMiddleware(); @@ -209,4 +255,79 @@ describe('tracingMiddleware', () => { const ctx = makeCtx(); await expect(mw(ctx, async () => {})).resolves.toBeUndefined(); }); + + it('parents batch sub-request spans under the outer batch request', async () => { + server = await createServer() + .use(tracingMiddleware()) + .useBatching() + .handleAll(batchHandlers) + .listen(0); + + const response = await postBatch(server.address!.port, [ + { method: 'GET', url: '/api/a', headers: {} }, + { method: 'GET', url: '/api/b', headers: {} } + ]); + expect(response.status).toBe(200); + + const spans = exporter.getFinishedSpans(); + expect(spans).toHaveLength(3); + + const batchSpan = spans.find(span => span.name === 'POST /__batch')!; + const childSpans = spans.filter(span => span.name !== 'POST /__batch'); + + expect( + childSpans.map(span => span.attributes['url.path']).sort() + ).toEqual(['/api/a', '/api/b']); + + for (const child of childSpans) { + expect(child.spanContext().traceId).toBe( + batchSpan.spanContext().traceId + ); + expect(child.parentSpanContext?.spanId).toBe( + batchSpan.spanContext().spanId + ); + expect(child.attributes['cleverbrush.batch.size']).toBe(2); + expect(child.attributes['cleverbrush.batch.path']).toBe('/__batch'); + } + expect( + childSpans.map(span => span.attributes['cleverbrush.batch.index']) + ).toEqual([0, 1]); + }); + + it('links sub-request traceparent while keeping the batch span as parent', async () => { + server = await createServer() + .use(tracingMiddleware()) + .useBatching() + .handleAll(batchHandlers) + .listen(0); + + await postBatch(server.address!.port, [ + { + method: 'GET', + url: '/api/a', + headers: { + traceparent: + '00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01' + } + } + ]); + + const spans = exporter.getFinishedSpans(); + const batchSpan = spans.find(span => span.name === 'POST /__batch')!; + const childSpan = spans.find( + span => span.attributes['url.path'] === '/api/a' + )!; + + expect(childSpan.spanContext().traceId).toBe( + batchSpan.spanContext().traceId + ); + expect(childSpan.parentSpanContext?.spanId).toBe( + batchSpan.spanContext().spanId + ); + expect(childSpan.links).toHaveLength(1); + expect(childSpan.links[0]!.context.traceId).toBe( + '0af7651916cd43dd8448eb211c80319c' + ); + expect(childSpan.links[0]!.context.spanId).toBe('b7ad6b7169203331'); + }); }); diff --git a/libs/otel/src/middleware/tracing.ts b/libs/otel/src/middleware/tracing.ts index f0c0fd5f..eed71344 100644 --- a/libs/otel/src/middleware/tracing.ts +++ b/libs/otel/src/middleware/tracing.ts @@ -2,7 +2,9 @@ import { context, propagation, type Span, + type SpanContext, SpanKind, + type SpanOptions, SpanStatusCode, type Tracer, trace @@ -88,6 +90,37 @@ function getEndpointMeta(ctx: any): any | undefined { return items?.get('__endpoint_meta'); } +interface BatchSubrequestMeta { + index: number; + size: number; + path: string; +} + +function getBatchSubrequestMeta(ctx: any): BatchSubrequestMeta | undefined { + const items: Map | undefined = ctx?.items; + const meta = items?.get('__batch_subrequest') as + | Partial + | undefined; + if ( + typeof meta?.index !== 'number' || + typeof meta.size !== 'number' || + typeof meta.path !== 'string' + ) { + return undefined; + } + return { + index: meta.index, + size: meta.size, + path: meta.path + }; +} + +function hasValidTraceparent(headers: Record): boolean { + return /^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$/i.test( + headers.traceparent ?? '' + ); +} + function getRouteTemplate(meta: any): string | undefined { if (!meta) return undefined; const base: string = meta.basePath ?? ''; @@ -171,7 +204,13 @@ export function tracingMiddleware(options?: TracingMiddlewareOptions) { const headers: Record = ctx.headers ?? {}; // Extract inbound trace context (W3C traceparent + baggage). - const parentCtx = propagation.extract(context.active(), headers); + const extractedCtx = propagation.extract(context.active(), headers); + const batchSubrequest = getBatchSubrequestMeta(ctx); + const parentCtx = batchSubrequest ? context.active() : extractedCtx; + const linkedSpanContext: SpanContext | undefined = + batchSubrequest && hasValidTraceparent(headers) + ? trace.getSpanContext(extractedCtx) + : undefined; const meta = getEndpointMeta(ctx); const route = getRouteTemplate(meta); @@ -211,10 +250,23 @@ export function tracingMiddleware(options?: TracingMiddlewareOptions) { if (meta?.operationId) { attributes['cleverbrush.endpoint.operationId'] = meta.operationId; } + if (batchSubrequest) { + attributes['cleverbrush.batch.index'] = batchSubrequest.index; + attributes['cleverbrush.batch.size'] = batchSubrequest.size; + attributes['cleverbrush.batch.path'] = batchSubrequest.path; + } + + const spanOptions: SpanOptions = { + kind: SpanKind.SERVER, + attributes, + ...(linkedSpanContext + ? { links: [{ context: linkedSpanContext }] } + : {}) + }; await tracer.startActiveSpan( spanName, - { kind: SpanKind.SERVER, attributes }, + spanOptions, parentCtx, async (span: Span) => { ctx.items?.set?.(OTEL_SPAN_ITEM_KEY, span); diff --git a/libs/server/src/Server.ts b/libs/server/src/Server.ts index 13dcf23a..9c4aa8f4 100644 --- a/libs/server/src/Server.ts +++ b/libs/server/src/Server.ts @@ -627,6 +627,18 @@ export class Server { const ctx = new RequestContext(req, res, this.#maxBodySize); const urlPath = ctx.url.pathname; const method = ctx.method; + const batchSubrequest = ( + req as http.IncomingMessage & { + __cleverbrushBatchSubrequest?: { + index: number; + size: number; + path: string; + }; + } + ).__cleverbrushBatchSubrequest; + if (batchSubrequest) { + ctx.items.set('__batch_subrequest', batchSubrequest); + } if ( this.#healthcheck && @@ -638,13 +650,26 @@ export class Server { return; } - // Batch endpoint — handled before routing and auth. + // Batch endpoint — it has no endpoint metadata, but should still + // pass through global middleware so observability/logging can wrap + // the physical batch request. if ( this.#batchConfig !== null && method === 'POST' && urlPath === (this.#batchConfig.path ?? '/__batch') ) { - await this.#handleBatchRequest(req, res); + ctx.services = scope.serviceProvider; + + const pipeline = new MiddlewarePipeline(); + for (const mw of this.#globalMiddlewares) { + pipeline.add(mw); + } + + await pipeline.execute(ctx, async () => { + if (ctx.responded) return; + await this.#handleBatchRequest(req, res); + ctx.responded = true; + }); return; } @@ -904,7 +929,8 @@ export class Server { } const execute = async ( - item: BatchSubRequest + item: BatchSubRequest, + index: number ): Promise => { const virtualReq = new VirtualIncomingMessage({ method: (item.method ?? 'GET').toUpperCase(), @@ -912,6 +938,11 @@ export class Server { headers: item.headers ?? {}, body: item.body }); + virtualReq.__cleverbrushBatchSubrequest = { + index, + size: outerBody.requests.length, + path: config.path ?? '/__batch' + }; const virtualRes = new VirtualServerResponse(); await this.#handleRequest( @@ -927,8 +958,8 @@ export class Server { results = await Promise.all(outerBody.requests.map(execute)); } else { results = []; - for (const item of outerBody.requests) { - results.push(await execute(item)); + for (let i = 0; i < outerBody.requests.length; i++) { + results.push(await execute(outerBody.requests[i]!, i)); } } diff --git a/libs/server/src/VirtualHttp.ts b/libs/server/src/VirtualHttp.ts index 96d11f1e..e7d18b47 100644 --- a/libs/server/src/VirtualHttp.ts +++ b/libs/server/src/VirtualHttp.ts @@ -56,6 +56,11 @@ export class VirtualIncomingMessage extends Readable { readonly method: string; readonly url: string; readonly headers: Record; + __cleverbrushBatchSubrequest?: { + index: number; + size: number; + path: string; + }; // Satisfy the `socket` property that IncomingMessage exposes. readonly socket: null = null; diff --git a/libs/server/tests/Batching.test.ts b/libs/server/tests/Batching.test.ts index d062ca04..55c0027a 100644 --- a/libs/server/tests/Batching.test.ts +++ b/libs/server/tests/Batching.test.ts @@ -116,6 +116,26 @@ describe('POST /__batch integration', () => { expect(JSON.parse(responses[1].body)).toMatchObject({ name: 'Alice' }); }); + it('runs global middleware for the outer batch request and sub-requests', async () => { + const paths: string[] = []; + server = await createServer() + .use(async (ctx, next) => { + paths.push(ctx.url.pathname); + await next(); + }) + .useBatching() + .handleAll(handlers) + .listen(0); + port = server.address!.port; + + const { status } = await post(port, '/__batch', { + requests: [{ method: 'GET', url: '/api/todos', headers: {} }] + }); + + expect(status).toBe(200); + expect(paths).toEqual(['/__batch', '/api/todos']); + }); + it('returns 404 sub-response for unknown sub-request route', async () => { ({ server, port } = await makeServer());