-
Notifications
You must be signed in to change notification settings - Fork 2
feat: add TURN-TCP support with server-driven config #116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4568496
a3875e9
ba2dcb0
ec9452f
a52d12c
60a57b8
1f1f721
6b41b1f
a12fd86
33974e4
14d25b3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -11,9 +11,10 @@ import type { | |
| PromptAckMessage, | ||
| SessionIdMessage, | ||
| SetImageAckMessage, | ||
| TurnConfigMessage, | ||
| } from "./types"; | ||
|
|
||
| const ICE_SERVERS: RTCIceServer[] = [{ urls: "stun:stun.l.google.com:19302" }]; | ||
| const DEFAULT_ICE_SERVERS: RTCIceServer[] = [{ urls: "stun:stun.l.google.com:19302" }]; | ||
| const AVATAR_SETUP_TIMEOUT_MS = 30_000; // 30 seconds | ||
|
|
||
| interface ConnectionCallbacks { | ||
|
|
@@ -28,13 +29,17 @@ interface ConnectionCallbacks { | |
| initialPrompt?: { text: string; enhance?: boolean }; | ||
| logger?: Logger; | ||
| onDiagnostic?: DiagnosticEmitter; | ||
| iceServers?: RTCIceServer[]; | ||
| expectTurnConfig?: boolean; | ||
| forceRelay?: boolean; | ||
| } | ||
|
|
||
| type WsMessageEvents = { | ||
| promptAck: PromptAckMessage; | ||
| setImageAck: SetImageAckMessage; | ||
| sessionId: SessionIdMessage; | ||
| generationTick: GenerationTickMessage; | ||
| turnConfig: TurnConfigMessage; | ||
| }; | ||
|
|
||
| const noopDiagnostic: DiagnosticEmitter = () => {}; | ||
|
|
@@ -46,6 +51,7 @@ export class WebRTCConnection { | |
| private connectionReject: ((error: Error) => void) | null = null; | ||
| private logger: Logger; | ||
| private emitDiagnostic: DiagnosticEmitter; | ||
| private turnServers: RTCIceServer[] = []; | ||
| state: ConnectionState = "disconnected"; | ||
| websocketMessagesEmitter = mitt<WsMessageEvents>(); | ||
| constructor(private callbacks: ConnectionCallbacks = {}) { | ||
|
|
@@ -159,6 +165,22 @@ export class WebRTCConnection { | |
| }); | ||
| } | ||
|
|
||
| // Phase 2.5: Wait for turn_config if not yet received. | ||
| // Only wait when the server is expected to send TURN config (iceTransportPolicy was set). | ||
| // turn_config arrives during Phase 2 but may race with set_image_ack. | ||
| if (this.callbacks.expectTurnConfig && this.turnServers.length === 0) { | ||
| let turnHandler: (() => void) | null = null; | ||
| await Promise.race([ | ||
| new Promise<void>((resolve) => { | ||
| turnHandler = () => resolve(); | ||
| this.websocketMessagesEmitter.on("turnConfig", turnHandler); | ||
| }), | ||
| new Promise<void>((resolve) => setTimeout(resolve, 2000)), | ||
| connectAbort, | ||
| ]); | ||
| if (turnHandler) this.websocketMessagesEmitter.off("turnConfig", turnHandler); | ||
| } | ||
|
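There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Event handler leak when `connectAbort` fires during Phase 2.5 (Low Severity): If `connectAbort` rejects while the `Promise.race` is still pending, the `await` throws and the `websocketMessagesEmitter.off("turnConfig", turnHandler)` cleanup that follows the race is never reached, so the `turnConfig` handler stays registered on the emitter. Reviewed by Cursor Bugbot for commit 1f1f721. Configure here. |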
||
|
|
||
| // Phase 3: WebRTC handshake | ||
| const handshakeStart = performance.now(); | ||
| await this.setupNewPeerConnection(); | ||
|
|
@@ -254,6 +276,12 @@ export class WebRTCConnection { | |
| return; | ||
| } | ||
|
|
||
| if (msg.type === "turn_config") { | ||
| this.turnServers = [{ urls: msg.urls, username: msg.username, credential: msg.credential }]; | ||
| this.websocketMessagesEmitter.emit("turnConfig", msg); | ||
| return; | ||
| } | ||
|
|
||
| // All other messages require peer connection | ||
| if (!this.pc) return; | ||
|
|
||
|
|
@@ -411,7 +439,11 @@ export class WebRTCConnection { | |
| }); | ||
| this.pc.close(); | ||
| } | ||
| this.pc = new RTCPeerConnection({ iceServers: ICE_SERVERS }); | ||
| const iceServers: RTCIceServer[] = [ | ||
| ...(this.callbacks.iceServers ?? DEFAULT_ICE_SERVERS), | ||
| ...this.turnServers, | ||
| ]; | ||
| this.pc = new RTCPeerConnection({ iceServers, ...(this.callbacks.forceRelay && { iceTransportPolicy: "relay" }) }); | ||
| this.setState("connecting"); | ||
|
|
||
| if (this.localStream) { | ||
|
|
@@ -557,6 +589,7 @@ export class WebRTCConnection { | |
| this.ws?.close(); | ||
| this.ws = null; | ||
| this.localStream = null; | ||
| this.turnServers = []; | ||
| this.setState("disconnected"); | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,176 @@ | ||
| declare const __DECART_API_KEY__: string; | ||
| declare const __WEBRTC_BASE_URL__: string; | ||
|
|
||
| import { | ||
| createDecartClient, | ||
| type CustomModelDefinition, | ||
| type DecartSDKError, | ||
| type SelectedCandidatePairEvent, | ||
| } from "@decartai/sdk"; | ||
| import { beforeAll, describe, expect, it } from "vitest"; | ||
|
|
||
| function createSyntheticStream(fps: number, width: number, height: number): MediaStream { | ||
| const canvas = document.createElement("canvas"); | ||
| canvas.width = width; | ||
| canvas.height = height; | ||
| return canvas.captureStream(fps); | ||
| } | ||
|
|
||
| const BIT_INVERT_MODEL: CustomModelDefinition = { | ||
| name: "bit_invert", | ||
| urlPath: "/ws", | ||
| fps: 25, | ||
| width: 512, | ||
| height: 512, | ||
| }; | ||
|
|
||
| const TURN_ICE_SERVERS: RTCIceServer[] = [ | ||
| { urls: "stun:stun.l.google.com:19302" }, | ||
| { urls: "turn:127.0.0.1:3478?transport=tcp", username: "turn", credential: "turn" }, | ||
| ]; | ||
|
|
||
| const TIMEOUT = 2 * 60 * 1000; // 2 minutes | ||
|
|
||
| /** | ||
| * Wraps the global RTCPeerConnection so every new instance uses | ||
| * the given iceTransportPolicy. Returns a cleanup function to restore. | ||
| */ | ||
| function overrideIceTransportPolicy(policy: RTCIceTransportPolicy): () => void { | ||
| const OriginalPC = globalThis.RTCPeerConnection; | ||
| globalThis.RTCPeerConnection = class extends OriginalPC { | ||
| constructor(config?: RTCConfiguration) { | ||
| super({ ...config, iceTransportPolicy: policy }); | ||
| } | ||
| } as typeof RTCPeerConnection; | ||
| return () => { | ||
| globalThis.RTCPeerConnection = OriginalPC; | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Collects the selectedCandidatePair diagnostic from a realtime client. | ||
| * The event is buffered during connect() and flushed via setTimeout(0) | ||
| * after connect() resolves, so registering immediately catches it. | ||
| */ | ||
| function collectSelectedCandidatePair( | ||
| realtimeClient: { on: (event: "diagnostic", handler: (e: { name: string; data: unknown }) => void) => void }, | ||
| ): Promise<SelectedCandidatePairEvent | null> { | ||
| return new Promise((resolve) => { | ||
| const handler = (event: { name: string; data: unknown }) => { | ||
| if (event.name === "selectedCandidatePair") { | ||
| resolve(event.data as SelectedCandidatePairEvent); | ||
| } | ||
| }; | ||
| realtimeClient.on("diagnostic", handler); | ||
| // Fallback: if the event was already emitted before we registered, resolve after a delay | ||
| setTimeout(() => resolve(null), 5000); | ||
| }); | ||
| } | ||
|
|
||
| describe("TURN-TCP E2E Tests", { timeout: TIMEOUT, retry: 2 }, () => { | ||
| let apiKey: string; | ||
| let webrtcBaseUrl: string; | ||
|
|
||
| beforeAll(() => { | ||
| apiKey = __DECART_API_KEY__; | ||
| webrtcBaseUrl = __WEBRTC_BASE_URL__; | ||
| if (!apiKey) { | ||
| throw new Error( | ||
| "DECART_API_KEY environment variable not set. Run with: DECART_API_KEY=your_key pnpm test:e2e:turn-tcp", | ||
| ); | ||
| } | ||
| if (!webrtcBaseUrl) { | ||
| throw new Error( | ||
| "WEBRTC_BASE_URL environment variable not set. Set it to your local k8s WebSocket URL.", | ||
| ); | ||
| } | ||
| }); | ||
|
|
||
| // Requires server-side aioice TURN-TCP allocation to work (server must produce relay candidates). | ||
| // Skip until server-side TURN candidate generation is verified. | ||
| it.skip("TURN-TCP relay only (iceTransportPolicy=relay)", async () => { | ||
| const restore = overrideIceTransportPolicy("relay"); | ||
|
|
||
| try { | ||
| const client = createDecartClient({ apiKey, realtimeBaseUrl: webrtcBaseUrl }); | ||
| const stream = createSyntheticStream(BIT_INVERT_MODEL.fps, BIT_INVERT_MODEL.width, BIT_INVERT_MODEL.height); | ||
|
|
||
| let remoteStreamReceived = false; | ||
|
|
||
| const realtimeClient = await client.realtime.connect(stream, { | ||
| model: BIT_INVERT_MODEL, | ||
| onRemoteStream: () => { | ||
| remoteStreamReceived = true; | ||
| }, | ||
| iceServers: TURN_ICE_SERVERS, | ||
| }); | ||
|
|
||
| // Register diagnostic listener immediately - buffered events flush on next macrotask | ||
| const candidatePairPromise = collectSelectedCandidatePair(realtimeClient); | ||
|
|
||
| const errors: DecartSDKError[] = []; | ||
| realtimeClient.on("error", (err) => errors.push(err)); | ||
|
|
||
| try { | ||
| expect(["connected", "generating"]).toContain(realtimeClient.getConnectionState()); | ||
| expect(realtimeClient.sessionId).toBeTruthy(); | ||
| expect(remoteStreamReceived).toBe(true); | ||
| expect(errors).toEqual([]); | ||
|
|
||
| // With relay-only policy, the selected candidate must be a relay (TURN) | ||
| const pair = await candidatePairPromise; | ||
| if (pair) { | ||
| expect(pair.local.candidateType).toBe("relay"); | ||
| } | ||
| } finally { | ||
| realtimeClient.disconnect(); | ||
| for (const track of stream.getTracks()) track.stop(); | ||
| } | ||
|
|
||
| expect(realtimeClient.getConnectionState()).toBe("disconnected"); | ||
| } finally { | ||
| restore(); | ||
| } | ||
| }); | ||
|
|
||
| it("Both UDP + TURN available (default iceTransportPolicy=all)", async () => { | ||
| const client = createDecartClient({ apiKey, realtimeBaseUrl: webrtcBaseUrl }); | ||
| const stream = createSyntheticStream(BIT_INVERT_MODEL.fps, BIT_INVERT_MODEL.width, BIT_INVERT_MODEL.height); | ||
|
|
||
| let remoteStreamReceived = false; | ||
|
|
||
| const realtimeClient = await client.realtime.connect(stream, { | ||
| model: BIT_INVERT_MODEL, | ||
| onRemoteStream: () => { | ||
| remoteStreamReceived = true; | ||
| }, | ||
| iceServers: TURN_ICE_SERVERS, | ||
| }); | ||
|
|
||
| // Register diagnostic listener immediately | ||
| const candidatePairPromise = collectSelectedCandidatePair(realtimeClient); | ||
|
|
||
| const errors: DecartSDKError[] = []; | ||
| realtimeClient.on("error", (err) => errors.push(err)); | ||
|
|
||
| try { | ||
| expect(["connected", "generating"]).toContain(realtimeClient.getConnectionState()); | ||
| expect(realtimeClient.sessionId).toBeTruthy(); | ||
| expect(remoteStreamReceived).toBe(true); | ||
| expect(errors).toEqual([]); | ||
|
|
||
| // With default policy, ICE should prefer direct UDP over relay. | ||
| // In Docker/NAT environments, the local candidate may appear as "prflx" | ||
| // (peer-reflexive) rather than "host", but it should NOT be "relay". | ||
| const pair = await candidatePairPromise; | ||
| if (pair) { | ||
| expect(pair.local.candidateType).not.toBe("relay"); | ||
| } | ||
| } finally { | ||
| realtimeClient.disconnect(); | ||
| for (const track of stream.getTracks()) track.stop(); | ||
| } | ||
|
|
||
| expect(realtimeClient.getConnectionState()).toBe("disconnected"); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,18 @@ | ||
| import { playwright } from "@vitest/browser-playwright"; | ||
| import { defineConfig } from "vitest/config"; | ||
|
|
||
| export default defineConfig({ | ||
| define: { | ||
| __DECART_API_KEY__: JSON.stringify(process.env.DECART_API_KEY), | ||
| __WEBRTC_BASE_URL__: JSON.stringify(process.env.WEBRTC_BASE_URL || "wss://slim-bit-invert.dev.localhost"), | ||
| }, | ||
| test: { | ||
| include: ["tests/e2e-turn-tcp.test.ts"], | ||
| browser: { | ||
| enabled: true, | ||
| provider: playwright(), | ||
| headless: true, | ||
| instances: [{ browser: "chromium" }], | ||
| }, | ||
| }, | ||
| }); |


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unnecessary 2-second wait when iceTransportPolicy is "udp"
Medium Severity
`expectTurnConfig: !!options.iceTransportPolicy` evaluates to `true` for all policy values, including `"udp"`. When `iceTransportPolicy` is `"udp"`, the server is unlikely to send a `turn_config` message (TURN is a TCP/relay fallback), so Phase 2.5 always hits the full 2-second timeout before proceeding. The condition needs to exclude `"udp"` to match the stated goal of "zero latency impact" when TURN isn't requested. Additional Locations (1):
packages/sdk/src/realtime/webrtc-connection.ts#L169-L181. Reviewed by Cursor Bugbot for commit 1f1f721. Configure here.