From bfecc89cfff02301e2bd25dda9112fee7b80c08c Mon Sep 17 00:00:00 2001 From: Andrew DiZenzo Date: Thu, 4 Jun 2026 13:51:35 +0000 Subject: [PATCH] fix(child_process): support stream-backed stdio --- crates/perry-runtime/src/child_process/mod.rs | 31 ++++- .../child_process/async/stdio-stream.ts | 86 ++++++++++++++ .../child_process/sync/sync-stdio-stream.ts | 111 ++++++++++++++++++ 3 files changed, 222 insertions(+), 6 deletions(-) create mode 100644 test-parity/node-suite/child_process/async/stdio-stream.ts create mode 100644 test-parity/node-suite/child_process/sync/sync-stdio-stream.ts diff --git a/crates/perry-runtime/src/child_process/mod.rs b/crates/perry-runtime/src/child_process/mod.rs index 803ec4c14e..0fab99b44a 100644 --- a/crates/perry-runtime/src/child_process/mod.rs +++ b/crates/perry-runtime/src/child_process/mod.rs @@ -1633,9 +1633,9 @@ pub(super) enum CpStdio { Fd(i32), } -fn cp_stdio_kind(value: f64) -> CpStdio { +fn cp_stdio_number_fd(value: f64) -> Option { let js_value = JSValue::from_bits(value.to_bits()); - let fd = if js_value.is_int32() { + if js_value.is_int32() { Some(js_value.as_int32()) } else if js_value.is_number() { let n = js_value.as_number(); @@ -1646,8 +1646,27 @@ fn cp_stdio_kind(value: f64) -> CpStdio { } } else { None + } +} + +fn cp_stdio_stream_fd(value: f64, fd_index: usize) -> Option { + let expected_stream = match fd_index { + 0 => crate::fs::is_fs_stream_instance_value(value, "ReadStream"), + 1 | 2 => crate::fs::is_fs_stream_instance_value(value, "WriteStream"), + _ => false, }; - if let Some(fd) = fd { + if !expected_stream { + return None; + } + let fd = cp_get_field(value, b"fd"); + cp_stdio_number_fd(fd).filter(|fd| crate::fs::fd_is_registered(*fd)) +} + +fn cp_stdio_kind(value: f64, fd_index: usize) -> CpStdio { + if let Some(fd) = cp_stdio_number_fd(value) { + return CpStdio::Fd(fd); + } + if let Some(fd) = cp_stdio_stream_fd(value, fd_index) { return CpStdio::Fd(fd); } @@ -1659,8 +1678,8 @@ fn cp_stdio_kind(value: f64) -> CpStdio { } /// Read the deterministic live-stdio subset: `pipe` (default), `ignore`, -/// `inherit`, and numeric fd entries. Custom stream handles intentionally -/// remain in #2555. +/// `inherit`, numeric fd entries, and opened fs stream objects backed by a +/// registered fd. pub(super) fn cp_read_stdio(opts_val: f64, fds: usize) -> Vec { let mut out = vec![CpStdio::Pipe; fds]; if cp_object_ptr(opts_val).is_none() { @@ -1671,7 +1690,7 @@ pub(super) fn cp_read_stdio(opts_val: f64, fds: usize) -> Vec { if let Some(arr) = cp_array_ptr(stdio) { let n = crate::array::js_array_length(arr).min(fds as u32); for i in 0..n { - out[i as usize] = cp_stdio_kind(crate::array::js_array_get_f64(arr, i)); + out[i as usize] = cp_stdio_kind(crate::array::js_array_get_f64(arr, i), i as usize); } return out; } diff --git a/test-parity/node-suite/child_process/async/stdio-stream.ts b/test-parity/node-suite/child_process/async/stdio-stream.ts new file mode 100644 index 0000000000..4c69127e4a --- /dev/null +++ b/test-parity/node-suite/child_process/async/stdio-stream.ts @@ -0,0 +1,86 @@ +import { fork, spawn } from "node:child_process"; +import * as fs from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +function slot(value: any): string { + return value === null ? "null" : typeof value; +} + +function waitOpen(stream: any): Promise { + if (typeof stream.fd === "number") return Promise.resolve(); + return new Promise((resolve, reject) => { + stream.once("open", () => resolve()); + stream.once("error", reject); + }); +} + +function waitClose(child: any): Promise { + return new Promise((resolve) => child.on("close", (code: number | null) => resolve(code))); +} + +function read(path: string): string { + return fs.readFileSync(path, "utf8"); +} + +const base = join(tmpdir(), `perry-stdio-stream-${process.pid}`); +const inPath = `${base}-in.txt`; +const outPath = `${base}-out.txt`; +const errPath = `${base}-err.txt`; +const childFile = `${base}-child.js`; +const forkOutPath = `${base}-fork-out.txt`; + +fs.writeFileSync(inPath, "stream-stdin"); +fs.writeFileSync(outPath, ""); +fs.writeFileSync(errPath, ""); +fs.writeFileSync(forkOutPath, ""); +fs.writeFileSync( + childFile, + [ + "process.stdout.write('fork-stream-out');", + "process.on('message', () => { if (process.send) process.send({ ok: true }); process.exit(0); });", + ].join(""), +); + +const stdin = fs.createReadStream(inPath); +const stdout = fs.createWriteStream(outPath); +const stderr = fs.createWriteStream(errPath); +await Promise.all([waitOpen(stdin), waitOpen(stdout), waitOpen(stderr)]); + +const child = spawn("sh", ["-c", "cat; printf stream-err >&2"], { + stdio: [stdin, stdout, stderr], +}); +console.log("spawn props:", slot(child.stdin), slot(child.stdout), slot(child.stderr)); +console.log("spawn stdio:", child.stdio.map(slot).join(",")); +console.log("spawn close:", await waitClose(child)); +stdout.end(); +stderr.end(); +stdin.close(); +await Promise.all([ + new Promise((resolve) => stdout.on("close", resolve)), + new Promise((resolve) => stderr.on("close", resolve)), +]); +console.log("spawn stdout file:", read(outPath)); +console.log("spawn stderr file:", read(errPath)); + +const forkStdout = fs.createWriteStream(forkOutPath); +await waitOpen(forkStdout); +const forked = fork(childFile, [], { stdio: ["ignore", forkStdout, "ignore", "ipc"] }); +console.log("fork props:", slot(forked.stdin), slot(forked.stdout), slot(forked.stderr)); +console.log("fork stdio:", forked.stdio.map(slot).join(",")); +console.log("fork channel:", typeof forked.channel); +const message: any = await new Promise((resolve) => { + forked.on("message", resolve); + forked.send({ ping: true }); +}); +console.log("fork ipc:", message.ok); +console.log("fork close:", await waitClose(forked)); +forkStdout.end(); +await new Promise((resolve) => forkStdout.on("close", resolve)); +console.log("fork stdout file:", read(forkOutPath)); + +for (const path of [inPath, outPath, errPath, childFile, forkOutPath]) { + try { + fs.unlinkSync(path); + } catch {} +} diff --git a/test-parity/node-suite/child_process/sync/sync-stdio-stream.ts b/test-parity/node-suite/child_process/sync/sync-stdio-stream.ts new file mode 100644 index 0000000000..8a003c9d43 --- /dev/null +++ b/test-parity/node-suite/child_process/sync/sync-stdio-stream.ts @@ -0,0 +1,111 @@ +import { execFileSync, execSync, spawnSync } from "node:child_process"; +import * as fs from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; + +function slot(value: any): string { + if (value === null) return "null"; + if (Buffer.isBuffer(value)) return `buffer:${value.toString("utf8")}`; + return `${typeof value}:${String(value)}`; +} + +function waitOpen(stream: any): Promise { + if (typeof stream.fd === "number") return Promise.resolve(); + return new Promise((resolve, reject) => { + stream.once("open", () => resolve()); + stream.once("error", reject); + }); +} + +function closeStream(stream: any): Promise { + if (typeof stream.end === "function") { + stream.end(); + } else if (typeof stream.close === "function") { + stream.close(); + } + return new Promise((resolve) => stream.once("close", () => resolve())); +} + +function reportThrow(label: string, action: () => any) { + try { + console.log(`${label} value:`, slot(action())); + } catch (err: any) { + console.log(`${label} status:`, err.status); + console.log(`${label} stdout:`, slot(err.stdout)); + console.log(`${label} stderr:`, slot(err.stderr)); + console.log(`${label} output:`, err.output.map(slot).join("|")); + } +} + +const base = join(tmpdir(), `perry-sync-stdio-stream-${process.pid}`); +const spawnInPath = `${base}-spawn-in.txt`; +const spawnOutPath = `${base}-spawn-out.txt`; +const spawnErrPath = `${base}-spawn-err.txt`; +const execOutPath = `${base}-exec-out.txt`; +const execErrPath = `${base}-exec-err.txt`; +const fileOutPath = `${base}-file-out.txt`; +const fileErrPath = `${base}-file-err.txt`; + +fs.writeFileSync(spawnInPath, "sync-stdin"); +for (const path of [spawnOutPath, spawnErrPath, execOutPath, execErrPath, fileOutPath, fileErrPath]) { + fs.writeFileSync(path, ""); +} + +const spawnIn = fs.createReadStream(spawnInPath); +const spawnOut = fs.createWriteStream(spawnOutPath); +const spawnErr = fs.createWriteStream(spawnErrPath); +await Promise.all([waitOpen(spawnIn), waitOpen(spawnOut), waitOpen(spawnErr)]); +const spawnResult = spawnSync("sh", ["-c", "cat; printf sync-err >&2"], { + encoding: "utf8", + stdio: [spawnIn, spawnOut, spawnErr], +}); +console.log("spawnSync status:", spawnResult.status); +console.log("spawnSync stdout:", slot(spawnResult.stdout)); +console.log("spawnSync stderr:", slot(spawnResult.stderr)); +console.log("spawnSync output:", spawnResult.output.map(slot).join("|")); +await Promise.all([closeStream(spawnIn), closeStream(spawnOut), closeStream(spawnErr)]); +console.log("spawnSync stdout file:", fs.readFileSync(spawnOutPath, "utf8")); +console.log("spawnSync stderr file:", fs.readFileSync(spawnErrPath, "utf8")); + +const execOut = fs.createWriteStream(execOutPath); +const execErr = fs.createWriteStream(execErrPath); +await Promise.all([waitOpen(execOut), waitOpen(execErr)]); +console.log( + "execSync value:", + slot( + execSync("printf exec-out; printf exec-err >&2", { + encoding: "utf8", + stdio: ["ignore", execOut, execErr], + }), + ), +); +await Promise.all([closeStream(execOut), closeStream(execErr)]); +console.log("execSync stdout file:", fs.readFileSync(execOutPath, "utf8")); +console.log("execSync stderr file:", fs.readFileSync(execErrPath, "utf8")); + +const fileOut = fs.createWriteStream(fileOutPath); +const fileErr = fs.createWriteStream(fileErrPath); +await Promise.all([waitOpen(fileOut), waitOpen(fileErr)]); +reportThrow("execFileSync throw", () => + execFileSync("sh", ["-c", "printf file-out; printf file-err >&2; exit 6"], { + encoding: "utf8", + stdio: ["ignore", fileOut, fileErr], + }), +); +await Promise.all([closeStream(fileOut), closeStream(fileErr)]); +console.log("execFileSync stdout file:", fs.readFileSync(fileOutPath, "utf8")); +console.log("execFileSync stderr file:", fs.readFileSync(fileErrPath, "utf8")); + +for (const path of [ + spawnInPath, + spawnOutPath, + spawnErrPath, + execOutPath, + execErrPath, + fileOutPath, + fileErrPath, +]) { + try { + fs.unlinkSync(path); + } catch {} +}