Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 128 additions & 24 deletions src/runtime/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ import { getVenvBinDir, getVenvPythonExe } from '../utils/runtime.js';
import { globalCache } from '../utils/cache.js';

import { BridgeProtocol, type BridgeProtocolOptions } from './bridge-protocol.js';
import { BridgeProtocolError } from './errors.js';
import { BridgeExecutionError, BridgeProtocolError } from './errors.js';
import { ProcessIO } from './process-io.js';
import { PooledTransport } from './pooled-transport.js';
import type { CodecOptions } from './safe-codec.js';
import { PROTOCOL_ID } from './transport.js';
import type { PooledWorker } from './worker-pool.js';

// =============================================================================
Expand Down Expand Up @@ -73,7 +74,8 @@ export interface NodeBridgeOptions {

/** Commands to run on each process at startup for warming up. */
warmupCommands?: Array<
{ module: string; functionName: string; args?: unknown[] } | { method: string; params: unknown } // Legacy format for backwards compatibility
| { module: string; functionName: string; args?: unknown[] }
| { method: string; params: unknown } // Legacy shape preserved so runtime can surface a migration error
>;

// ===========================================================================
Expand Down Expand Up @@ -119,9 +121,13 @@ interface ResolvedOptions {
enableCache: boolean;
env: Record<string, string | undefined>;
codec?: CodecOptions;
warmupCommands: Array<
{ module: string; functionName: string; args?: unknown[] } | { method: string; params: unknown }
>;
warmupCommands: WarmupCommand[];
}

interface WarmupCommand {
module: string;
functionName: string;
args?: unknown[];
}

// =============================================================================
Expand Down Expand Up @@ -190,6 +196,53 @@ function assertSafeEnvOverrideKey(key: string): void {
}
}

function normalizeWarmupCommands(commands: NodeBridgeOptions['warmupCommands']): WarmupCommand[] {
if (commands === undefined) {
return [];
}
if (!Array.isArray(commands)) {
throw new BridgeProtocolError('warmupCommands must be an array when provided');
}

const warmups = commands;
return warmups.map((command, index) => {
if (!command || typeof command !== 'object' || Array.isArray(command)) {
throw new BridgeProtocolError(
`Invalid warmup command at index ${index + 1}: expected { module, functionName, args? }`
);
}

const candidate = command as Record<string, unknown>;
if ('method' in candidate || 'params' in candidate) {
throw new BridgeProtocolError(
`Invalid warmup command at index ${index + 1}: legacy { method, params } format is no longer supported. Use { module, functionName, args? }.`
);
}

if (typeof candidate.module !== 'string' || candidate.module.trim().length === 0) {
throw new BridgeProtocolError(
`Invalid warmup command at index ${index + 1}: "module" must be a non-empty string`
);
}
if (typeof candidate.functionName !== 'string' || candidate.functionName.trim().length === 0) {
throw new BridgeProtocolError(
`Invalid warmup command at index ${index + 1}: "functionName" must be a non-empty string`
);
}
if (candidate.args !== undefined && !Array.isArray(candidate.args)) {
throw new BridgeProtocolError(
`Invalid warmup command at index ${index + 1}: "args" must be an array when provided`
);
}

return {
module: candidate.module,
functionName: candidate.functionName,
args: candidate.args as unknown[] | undefined,
};
});
}

// =============================================================================
// NODE BRIDGE
// =============================================================================
Expand Down Expand Up @@ -250,6 +303,8 @@ export class NodeBridge extends BridgeProtocol {
const maxProcesses = options.maxProcesses ?? 1;
const minProcesses = Math.min(options.minProcesses ?? 1, maxProcesses);

const warmupCommands = normalizeWarmupCommands(options.warmupCommands);

const resolvedOptions: ResolvedOptions = {
minProcesses,
maxProcesses,
Expand All @@ -264,7 +319,7 @@ export class NodeBridge extends BridgeProtocol {
enableCache: options.enableCache ?? false,
env: options.env ?? {},
codec: options.codec,
warmupCommands: options.warmupCommands ?? [],
warmupCommands,
};

// Build environment for ProcessIO
Expand Down Expand Up @@ -485,8 +540,8 @@ export class NodeBridge extends BridgeProtocol {
* Simple request ID generator for warmup commands.
*/
let warmupRequestId = 0;
function generateWarmupId(): string {
return `warmup_${++warmupRequestId}_${Date.now()}`;
function generateWarmupId(): number {
return ++warmupRequestId;
}

/**
Expand All @@ -496,32 +551,81 @@ function generateWarmupId(): string {
* bypassing the pool to ensure each worker gets warmed up individually.
*/
function createWarmupCallback(
warmupCommands: Array<
{ module: string; functionName: string; args?: unknown[] } | { method: string; params: unknown }
>,
warmupCommands: WarmupCommand[],
timeoutMs: number
): (worker: PooledWorker) => Promise<void> {
return async (worker: PooledWorker) => {
for (const cmd of warmupCommands) {
for (const [index, cmd] of warmupCommands.entries()) {
const commandLabel = `${cmd.module}.${cmd.functionName}`;
const requestId = generateWarmupId();
let message: string;
try {
// Handle both new and legacy warmup command formats
if ('module' in cmd && 'functionName' in cmd) {
// Build the protocol message
const message = JSON.stringify({
id: generateWarmupId(),
type: 'call',
message = JSON.stringify({
id: requestId,
protocol: PROTOCOL_ID,
method: 'call',
params: {
module: cmd.module,
functionName: cmd.functionName,
args: cmd.args ?? [],
kwargs: {},
});
},
});
} catch (error) {
throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) failed to encode request: ${error instanceof Error ? error.message : String(error)}`,
{ cause: error instanceof Error ? error : undefined }
);
}

// Send directly to this worker's transport
await worker.transport.send(message, timeoutMs);
let response: string;
try {
response = await worker.transport.send(message, timeoutMs);
} catch (error) {
throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) failed to send: ${error instanceof Error ? error.message : String(error)}`,
{ cause: error instanceof Error ? error : undefined }
);
}

let parsed: unknown;
try {
parsed = JSON.parse(response);
} catch (error) {
throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) returned invalid JSON response`,
{ cause: error instanceof Error ? error : undefined }
);
}

if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) {
throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) returned malformed response envelope for request ${requestId}`
);
}

const envelope = parsed as { result?: unknown; error?: unknown };
if ('error' in envelope && envelope.error !== undefined && envelope.error !== null) {
const errorPayload = envelope.error;
if (errorPayload && typeof errorPayload === 'object' && !Array.isArray(errorPayload)) {
const err = errorPayload as { type?: unknown; message?: unknown };
const errType = typeof err.type === 'string' ? err.type : 'Error';
const errMessage =
typeof err.message === 'string' ? err.message : 'Unknown warmup error';
throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) failed: ${errType}: ${errMessage}`
);
}
// Legacy format { method, params } is ignored as it's not supported
} catch {
// Ignore warmup errors - they're not critical

throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) failed with malformed error payload`
);
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if (!('result' in envelope)) {
throw new BridgeExecutionError(
`Warmup command #${index + 1} (${commandLabel}) returned malformed response envelope for request ${requestId}`
);
}
}
};
Expand Down
18 changes: 13 additions & 5 deletions src/runtime/worker-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,13 +409,21 @@ export class WorkerPool extends BoundedContext {
inFlightCount: 0,
};

this.workers.push(worker);

// Call onWorkerReady callback if provided
if (this.options.onWorkerReady) {
await this.options.onWorkerReady(worker);
try {
// Call onWorkerReady callback if provided
if (this.options.onWorkerReady) {
await this.options.onWorkerReady(worker);
}
} catch (error) {
// Ensure partially initialized workers do not leak when warmup fails.
await transport.dispose().catch(() => {
// Ignore disposal failures during warmup failure handling.
});
throw error;
}

this.workers.push(worker);

return worker;
}

Expand Down
28 changes: 26 additions & 2 deletions test/optimized-node.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { existsSync } from 'node:fs';
// OptimizedNodeBridge is now an alias for NodeBridge with pool configuration
import { NodeBridge as OptimizedNodeBridge } from '../src/runtime/node.js';
import { isNodejs, getPythonExecutableName } from '../src/utils/runtime.js';
import { BridgeCodecError } from '../src/runtime/errors.js';
import { BridgeCodecError, BridgeProtocolError } from '../src/runtime/errors.js';

const describeNodeOnly = isNodejs() ? describe : describe.skip;
const BRIDGE_SCRIPT = 'runtime/python_bridge.py';
Expand Down Expand Up @@ -71,11 +71,35 @@ describe('OptimizedNodeBridge', () => {

it('should accept warmup commands', () => {
bridge = new OptimizedNodeBridge({
warmupCommands: [{ method: 'import', params: { module: 'os' } }],
warmupCommands: [{ module: 'math', functionName: 'sqrt', args: [16] }],
});
expect(bridge).toBeInstanceOf(OptimizedNodeBridge);
});

it('should reject legacy warmup command format', () => {
const createBridge = (): OptimizedNodeBridge =>
new OptimizedNodeBridge({
warmupCommands: [{ method: 'import', params: { module: 'os' } }],
});

expect(createBridge).toThrow(BridgeProtocolError);
expect(createBridge).toThrow(/legacy \{ method, params \} format is no longer supported/i);
});

it('should reject non-array warmupCommands', () => {
const createBridge = (): OptimizedNodeBridge =>
new OptimizedNodeBridge({
warmupCommands: {
module: 'math',
functionName: 'sqrt',
args: [16],
} as unknown as Array<{ module: string; functionName: string; args?: unknown[] }>,
});

expect(createBridge).toThrow(BridgeProtocolError);
expect(createBridge).toThrow(/warmupCommands must be an array/i);
});

it('should accept custom environment variables', () => {
bridge = new OptimizedNodeBridge({
env: {
Expand Down
Loading
Loading