Skip to content

Commit dd722a9

Browse files
committed
Add Bedrock streaming integration tests for reasoning
Add integration tests for invoke-with-response-stream and converse-stream endpoints that verify thinking block events appear before text content events when reasoning is present. Includes a binary event stream frame decoder for parsing the AWS Event Stream wire format in tests.
1 parent 2e9a35e commit dd722a9

1 file changed

Lines changed: 236 additions & 0 deletions

File tree

src/__tests__/reasoning-all-providers.test.ts

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { describe, it, expect, beforeAll, afterAll } from "vitest";
22
import * as http from "node:http";
3+
import { crc32 } from "node:zlib";
34
import type { Fixture } from "../types.js";
45
import { createServer, type ServerInstance } from "../server.js";
56
import { buildBedrockStreamTextEvents } from "../bedrock.js";
@@ -45,6 +46,98 @@ function post(
4546
});
4647
}
4748

49+
function postRaw(
50+
path: string,
51+
body: unknown,
52+
): Promise<{ status: number; headers: http.IncomingHttpHeaders; body: Buffer }> {
53+
return new Promise((resolve, reject) => {
54+
const data = JSON.stringify(body);
55+
const parsed = new URL(baseUrl);
56+
const req = http.request(
57+
{
58+
hostname: parsed.hostname,
59+
port: parsed.port,
60+
path,
61+
method: "POST",
62+
headers: {
63+
"Content-Type": "application/json",
64+
"Content-Length": Buffer.byteLength(data),
65+
},
66+
},
67+
(res) => {
68+
const chunks: Buffer[] = [];
69+
res.on("data", (c: Buffer) => chunks.push(c));
70+
res.on("end", () => {
71+
resolve({
72+
status: res.statusCode ?? 0,
73+
headers: res.headers,
74+
body: Buffer.concat(chunks),
75+
});
76+
});
77+
},
78+
);
79+
req.on("error", reject);
80+
req.write(data);
81+
req.end();
82+
});
83+
}
84+
85+
/**
86+
* Decode AWS Event Stream binary frames from a Buffer.
87+
* Returns an array of { eventType, payload } objects.
88+
*/
89+
function decodeEventStreamFrames(buf: Buffer): Array<{ eventType: string; payload: object }> {
90+
const frames: Array<{ eventType: string; payload: object }> = [];
91+
let offset = 0;
92+
93+
while (offset < buf.length) {
94+
if (offset + 12 > buf.length) break;
95+
96+
const totalLength = buf.readUInt32BE(offset);
97+
const headersLength = buf.readUInt32BE(offset + 4);
98+
const preludeCrc = buf.readUInt32BE(offset + 8);
99+
100+
// Verify prelude CRC
101+
const computedPreludeCrc = crc32(buf.subarray(offset, offset + 8));
102+
if (computedPreludeCrc >>> 0 !== preludeCrc) {
103+
throw new Error("Prelude CRC mismatch");
104+
}
105+
106+
// Parse headers
107+
const headersStart = offset + 12;
108+
const headersEnd = headersStart + headersLength;
109+
const headers: Record<string, string> = {};
110+
let hOff = headersStart;
111+
while (hOff < headersEnd) {
112+
const nameLen = buf.readUInt8(hOff);
113+
hOff += 1;
114+
const name = buf.subarray(hOff, hOff + nameLen).toString("utf8");
115+
hOff += nameLen;
116+
hOff += 1; // skip header type byte (7 = STRING)
117+
const valueLen = buf.readUInt16BE(hOff);
118+
hOff += 2;
119+
const value = buf.subarray(hOff, hOff + valueLen).toString("utf8");
120+
hOff += valueLen;
121+
headers[name] = value;
122+
}
123+
124+
// Parse payload
125+
const payloadStart = headersEnd;
126+
const payloadEnd = offset + totalLength - 4; // minus message CRC
127+
const payloadBuf = buf.subarray(payloadStart, payloadEnd);
128+
const payload = payloadBuf.length > 0 ? JSON.parse(payloadBuf.toString("utf8")) : {};
129+
130+
frames.push({
131+
eventType: headers[":event-type"] ?? "",
132+
payload,
133+
});
134+
135+
offset += totalLength;
136+
}
137+
138+
return frames;
139+
}
140+
48141
interface SSEEvent {
49142
type?: string;
50143
choices?: {
@@ -325,6 +418,149 @@ describe("POST /model/{id}/converse (reasoning non-streaming)", () => {
325418
});
326419
});
327420

421+
// ─── Bedrock InvokeModel Streaming: Reasoning ─────────────────────────────────
422+
423+
describe("POST /model/{id}/invoke-with-response-stream (reasoning streaming)", () => {
424+
it("emits thinking block events before text content events", async () => {
425+
const res = await postRaw(
426+
`/model/anthropic.claude-3-sonnet-20240229-v1:0/invoke-with-response-stream`,
427+
{
428+
messages: [{ role: "user", content: [{ type: "text", text: "think" }] }],
429+
max_tokens: 1024,
430+
anthropic_version: "bedrock-2023-05-31",
431+
},
432+
);
433+
434+
expect(res.status).toBe(200);
435+
const frames = decodeEventStreamFrames(res.body);
436+
const eventTypes = frames.map((f) => f.eventType);
437+
438+
// Should start with messageStart
439+
expect(eventTypes[0]).toBe("messageStart");
440+
441+
// Find thinking and text block starts
442+
const thinkingStartIdx = frames.findIndex(
443+
(f) =>
444+
f.eventType === "contentBlockStart" &&
445+
(f.payload as { start?: { type?: string } }).start?.type === "thinking",
446+
);
447+
const textStartIdx = frames.findIndex(
448+
(f) =>
449+
f.eventType === "contentBlockStart" &&
450+
(f.payload as { start?: { type?: string } }).start?.type === undefined,
451+
);
452+
453+
expect(thinkingStartIdx).toBeGreaterThan(0);
454+
expect(textStartIdx).toBeGreaterThan(thinkingStartIdx);
455+
456+
// Verify thinking content
457+
const thinkingDeltas = frames.filter(
458+
(f) =>
459+
f.eventType === "contentBlockDelta" &&
460+
(f.payload as { delta?: { type?: string } }).delta?.type === "thinking_delta",
461+
);
462+
const fullThinking = thinkingDeltas
463+
.map((f) => (f.payload as { delta: { thinking: string } }).delta.thinking)
464+
.join("");
465+
expect(fullThinking).toBe("Let me think step by step about this problem.");
466+
467+
// Verify text content
468+
const textDeltas = frames.filter(
469+
(f) =>
470+
f.eventType === "contentBlockDelta" &&
471+
(f.payload as { delta?: { type?: string } }).delta?.type === "text_delta",
472+
);
473+
const fullText = textDeltas
474+
.map((f) => (f.payload as { delta: { text: string } }).delta.text)
475+
.join("");
476+
expect(fullText).toBe("The answer is 42.");
477+
478+
// Should end with messageStop
479+
expect(eventTypes[eventTypes.length - 1]).toBe("messageStop");
480+
});
481+
482+
it("no thinking block when reasoning is absent", async () => {
483+
const res = await postRaw(
484+
`/model/anthropic.claude-3-sonnet-20240229-v1:0/invoke-with-response-stream`,
485+
{
486+
messages: [{ role: "user", content: [{ type: "text", text: "plain" }] }],
487+
max_tokens: 1024,
488+
anthropic_version: "bedrock-2023-05-31",
489+
},
490+
);
491+
492+
expect(res.status).toBe(200);
493+
const frames = decodeEventStreamFrames(res.body);
494+
495+
const thinkingDeltas = frames.filter(
496+
(f) =>
497+
f.eventType === "contentBlockDelta" &&
498+
(f.payload as { delta?: { type?: string } }).delta?.type === "thinking_delta",
499+
);
500+
expect(thinkingDeltas).toHaveLength(0);
501+
});
502+
});
503+
504+
// ─── Bedrock Converse Streaming: Reasoning ────────────────────────────────────
505+
506+
describe("POST /model/{id}/converse-stream (reasoning streaming)", () => {
507+
it("emits thinking block events before text content events", async () => {
508+
const res = await postRaw(`/model/anthropic.claude-3-sonnet-20240229-v1:0/converse-stream`, {
509+
messages: [{ role: "user", content: [{ text: "think" }] }],
510+
});
511+
512+
expect(res.status).toBe(200);
513+
const frames = decodeEventStreamFrames(res.body);
514+
const eventTypes = frames.map((f) => f.eventType);
515+
516+
expect(eventTypes[0]).toBe("messageStart");
517+
518+
// Find thinking and text block starts
519+
const thinkingStartIdx = frames.findIndex(
520+
(f) =>
521+
f.eventType === "contentBlockStart" &&
522+
(f.payload as { start?: { type?: string } }).start?.type === "thinking",
523+
);
524+
const textStartIdx = frames.findIndex(
525+
(f) =>
526+
f.eventType === "contentBlockStart" &&
527+
(f.payload as { start?: { type?: string } }).start?.type === undefined,
528+
);
529+
530+
expect(thinkingStartIdx).toBeGreaterThan(0);
531+
expect(textStartIdx).toBeGreaterThan(thinkingStartIdx);
532+
533+
// Verify reasoning content appears in the stream
534+
const thinkingDeltas = frames.filter(
535+
(f) =>
536+
f.eventType === "contentBlockDelta" &&
537+
(f.payload as { delta?: { type?: string } }).delta?.type === "thinking_delta",
538+
);
539+
const fullThinking = thinkingDeltas
540+
.map((f) => (f.payload as { delta: { thinking: string } }).delta.thinking)
541+
.join("");
542+
expect(fullThinking).toBe("Let me think step by step about this problem.");
543+
544+
expect(eventTypes[eventTypes.length - 1]).toBe("messageStop");
545+
});
546+
547+
it("no thinking block when reasoning is absent", async () => {
548+
const res = await postRaw(`/model/anthropic.claude-3-sonnet-20240229-v1:0/converse-stream`, {
549+
messages: [{ role: "user", content: [{ text: "plain" }] }],
550+
});
551+
552+
expect(res.status).toBe(200);
553+
const frames = decodeEventStreamFrames(res.body);
554+
555+
const thinkingDeltas = frames.filter(
556+
(f) =>
557+
f.eventType === "contentBlockDelta" &&
558+
(f.payload as { delta?: { type?: string } }).delta?.type === "thinking_delta",
559+
);
560+
expect(thinkingDeltas).toHaveLength(0);
561+
});
562+
});
563+
328564
// ─── Ollama /api/chat: Reasoning ────────────────────────────────────────────
329565

330566
function parseNDJSON(body: string): object[] {

0 commit comments

Comments
 (0)