Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/fresh-batch-spans.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@cleverbrush/otel': patch
'@cleverbrush/server': patch
---

Trace batched requests as a parent batch span with child spans for each
sub-request.
123 changes: 122 additions & 1 deletion libs/otel/src/middleware/tracing.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import {
SimpleSpanProcessor
} from '@opentelemetry/sdk-trace-base';
import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node';
import { afterAll, beforeAll, beforeEach, describe, expect, it } from 'vitest';
import {
afterAll,
afterEach,
beforeAll,
beforeEach,
describe,
expect,
it
} from 'vitest';
import { endpoint, mapHandlers } from '../../../server/src/Endpoint.js';
import { createServer, type Server } from '../../../server/src/Server.js';
import { OTEL_SPAN_ITEM_KEY, tracingMiddleware } from './tracing.js';

const exporter = new InMemorySpanExporter();
Expand All @@ -24,6 +34,13 @@ afterAll(async () => {

beforeEach(() => exporter.reset());

let server: Server | undefined;

afterEach(async () => {
await server?.close();
server = undefined;
});

function makeCtx(overrides: Partial<any> = {}): any {
const items = new Map<string, unknown>();
return {
Expand All @@ -36,6 +53,35 @@ function makeCtx(overrides: Partial<any> = {}): any {
};
}

const batchApi = {
items: {
a: endpoint.get('/api/a'),
b: endpoint.get('/api/b')
}
};

const batchHandlers = mapHandlers(batchApi, {
items: {
a: () => ({ ok: 'a' }),
b: () => ({ ok: 'b' })
}
});

async function postBatch(
port: number,
requests: Array<{
method: string;
url: string;
headers?: Record<string, string>;
}>
): Promise<Response> {
return fetch(`http://127.0.0.1:${port}/__batch`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify({ requests })
});
}

describe('tracingMiddleware', () => {
it('creates a SERVER span with HTTP semconv attributes', async () => {
const mw = tracingMiddleware();
Expand Down Expand Up @@ -209,4 +255,79 @@ describe('tracingMiddleware', () => {
const ctx = makeCtx();
await expect(mw(ctx, async () => {})).resolves.toBeUndefined();
});

it('parents batch sub-request spans under the outer batch request', async () => {
server = await createServer()
.use(tracingMiddleware())
.useBatching()
.handleAll(batchHandlers)
.listen(0);

const response = await postBatch(server.address!.port, [
{ method: 'GET', url: '/api/a', headers: {} },
{ method: 'GET', url: '/api/b', headers: {} }
]);
expect(response.status).toBe(200);

const spans = exporter.getFinishedSpans();
expect(spans).toHaveLength(3);

const batchSpan = spans.find(span => span.name === 'POST /__batch')!;
const childSpans = spans.filter(span => span.name !== 'POST /__batch');

expect(
childSpans.map(span => span.attributes['url.path']).sort()
).toEqual(['/api/a', '/api/b']);

for (const child of childSpans) {
expect(child.spanContext().traceId).toBe(
batchSpan.spanContext().traceId
);
expect(child.parentSpanContext?.spanId).toBe(
batchSpan.spanContext().spanId
);
expect(child.attributes['cleverbrush.batch.size']).toBe(2);
expect(child.attributes['cleverbrush.batch.path']).toBe('/__batch');
}
expect(
childSpans.map(span => span.attributes['cleverbrush.batch.index'])
).toEqual([0, 1]);
});

it('links sub-request traceparent while keeping the batch span as parent', async () => {
server = await createServer()
.use(tracingMiddleware())
.useBatching()
.handleAll(batchHandlers)
.listen(0);

await postBatch(server.address!.port, [
{
method: 'GET',
url: '/api/a',
headers: {
traceparent:
'00-0af7651916cd43dd8448eb211c80319c-b7ad6b7169203331-01'
}
}
]);

const spans = exporter.getFinishedSpans();
const batchSpan = spans.find(span => span.name === 'POST /__batch')!;
const childSpan = spans.find(
span => span.attributes['url.path'] === '/api/a'
)!;

expect(childSpan.spanContext().traceId).toBe(
batchSpan.spanContext().traceId
);
expect(childSpan.parentSpanContext?.spanId).toBe(
batchSpan.spanContext().spanId
);
expect(childSpan.links).toHaveLength(1);
expect(childSpan.links[0]!.context.traceId).toBe(
'0af7651916cd43dd8448eb211c80319c'
);
expect(childSpan.links[0]!.context.spanId).toBe('b7ad6b7169203331');
});
});
56 changes: 54 additions & 2 deletions libs/otel/src/middleware/tracing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import {
context,
propagation,
type Span,
type SpanContext,
SpanKind,
type SpanOptions,
SpanStatusCode,
type Tracer,
trace
Expand Down Expand Up @@ -88,6 +90,37 @@ function getEndpointMeta(ctx: any): any | undefined {
return items?.get('__endpoint_meta');
}

interface BatchSubrequestMeta {
index: number;
size: number;
path: string;
}

function getBatchSubrequestMeta(ctx: any): BatchSubrequestMeta | undefined {
const items: Map<string, unknown> | undefined = ctx?.items;
const meta = items?.get('__batch_subrequest') as
| Partial<BatchSubrequestMeta>
| undefined;
if (
typeof meta?.index !== 'number' ||
typeof meta.size !== 'number' ||
typeof meta.path !== 'string'
) {
return undefined;
}
return {
index: meta.index,
size: meta.size,
path: meta.path
};
}

function hasValidTraceparent(headers: Record<string, string>): boolean {
return /^00-[0-9a-f]{32}-[0-9a-f]{16}-[0-9a-f]{2}$/i.test(
headers.traceparent ?? ''
);
}

function getRouteTemplate(meta: any): string | undefined {
if (!meta) return undefined;
const base: string = meta.basePath ?? '';
Expand Down Expand Up @@ -171,7 +204,13 @@ export function tracingMiddleware(options?: TracingMiddlewareOptions) {
const headers: Record<string, string> = ctx.headers ?? {};

// Extract inbound trace context (W3C traceparent + baggage).
const parentCtx = propagation.extract(context.active(), headers);
const extractedCtx = propagation.extract(context.active(), headers);
const batchSubrequest = getBatchSubrequestMeta(ctx);
const parentCtx = batchSubrequest ? context.active() : extractedCtx;
const linkedSpanContext: SpanContext | undefined =
batchSubrequest && hasValidTraceparent(headers)
? trace.getSpanContext(extractedCtx)
: undefined;

const meta = getEndpointMeta(ctx);
const route = getRouteTemplate(meta);
Expand Down Expand Up @@ -211,10 +250,23 @@ export function tracingMiddleware(options?: TracingMiddlewareOptions) {
if (meta?.operationId) {
attributes['cleverbrush.endpoint.operationId'] = meta.operationId;
}
if (batchSubrequest) {
attributes['cleverbrush.batch.index'] = batchSubrequest.index;
attributes['cleverbrush.batch.size'] = batchSubrequest.size;
attributes['cleverbrush.batch.path'] = batchSubrequest.path;
}

const spanOptions: SpanOptions = {
kind: SpanKind.SERVER,
attributes,
...(linkedSpanContext
? { links: [{ context: linkedSpanContext }] }
: {})
};

await tracer.startActiveSpan(
spanName,
{ kind: SpanKind.SERVER, attributes },
spanOptions,
parentCtx,
async (span: Span) => {
ctx.items?.set?.(OTEL_SPAN_ITEM_KEY, span);
Expand Down
41 changes: 36 additions & 5 deletions libs/server/src/Server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,18 @@ export class Server {
const ctx = new RequestContext(req, res, this.#maxBodySize);
const urlPath = ctx.url.pathname;
const method = ctx.method;
const batchSubrequest = (
req as http.IncomingMessage & {
__cleverbrushBatchSubrequest?: {
index: number;
size: number;
path: string;
};
}
).__cleverbrushBatchSubrequest;
if (batchSubrequest) {
ctx.items.set('__batch_subrequest', batchSubrequest);
}

if (
this.#healthcheck &&
Expand All @@ -638,13 +650,26 @@ export class Server {
return;
}

// Batch endpoint — handled before routing and auth.
// Batch endpoint — it has no endpoint metadata, but should still
// pass through global middleware so observability/logging can wrap
// the physical batch request.
if (
this.#batchConfig !== null &&
method === 'POST' &&
urlPath === (this.#batchConfig.path ?? '/__batch')
) {
await this.#handleBatchRequest(req, res);
ctx.services = scope.serviceProvider;

const pipeline = new MiddlewarePipeline();
for (const mw of this.#globalMiddlewares) {
pipeline.add(mw);
}

await pipeline.execute(ctx, async () => {
if (ctx.responded) return;
await this.#handleBatchRequest(req, res);
ctx.responded = true;
});
return;
}

Expand Down Expand Up @@ -904,14 +929,20 @@ export class Server {
}

const execute = async (
item: BatchSubRequest
item: BatchSubRequest,
index: number
): Promise<BatchSubResponse> => {
const virtualReq = new VirtualIncomingMessage({
method: (item.method ?? 'GET').toUpperCase(),
url: item.url,
headers: item.headers ?? {},
body: item.body
});
virtualReq.__cleverbrushBatchSubrequest = {
index,
size: outerBody.requests.length,
path: config.path ?? '/__batch'
};
const virtualRes = new VirtualServerResponse();

await this.#handleRequest(
Expand All @@ -927,8 +958,8 @@ export class Server {
results = await Promise.all(outerBody.requests.map(execute));
} else {
results = [];
for (const item of outerBody.requests) {
results.push(await execute(item));
for (let i = 0; i < outerBody.requests.length; i++) {
results.push(await execute(outerBody.requests[i]!, i));
}
}

Expand Down
5 changes: 5 additions & 0 deletions libs/server/src/VirtualHttp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@ export class VirtualIncomingMessage extends Readable {
readonly method: string;
readonly url: string;
readonly headers: Record<string, string>;
__cleverbrushBatchSubrequest?: {
index: number;
size: number;
path: string;
};
// Satisfy the `socket` property that IncomingMessage exposes.
readonly socket: null = null;

Expand Down
20 changes: 20 additions & 0 deletions libs/server/tests/Batching.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,26 @@ describe('POST /__batch integration', () => {
expect(JSON.parse(responses[1].body)).toMatchObject({ name: 'Alice' });
});

it('runs global middleware for the outer batch request and sub-requests', async () => {
const paths: string[] = [];
server = await createServer()
.use(async (ctx, next) => {
paths.push(ctx.url.pathname);
await next();
})
.useBatching()
.handleAll(handlers)
.listen(0);
port = server.address!.port;

const { status } = await post(port, '/__batch', {
requests: [{ method: 'GET', url: '/api/todos', headers: {} }]
});

expect(status).toBe(200);
expect(paths).toEqual(['/__batch', '/api/todos']);
});

it('returns 404 sub-response for unknown sub-request route', async () => {
({ server, port } = await makeServer());

Expand Down
Loading