Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 25 additions & 6 deletions crates/perry-runtime/src/child_process/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1633,9 +1633,9 @@ pub(super) enum CpStdio {
Fd(i32),
}

fn cp_stdio_kind(value: f64) -> CpStdio {
fn cp_stdio_number_fd(value: f64) -> Option<i32> {
let js_value = JSValue::from_bits(value.to_bits());
let fd = if js_value.is_int32() {
if js_value.is_int32() {
Some(js_value.as_int32())
} else if js_value.is_number() {
let n = js_value.as_number();
Expand All @@ -1646,8 +1646,27 @@ fn cp_stdio_kind(value: f64) -> CpStdio {
}
} else {
None
}
}

fn cp_stdio_stream_fd(value: f64, fd_index: usize) -> Option<i32> {
let expected_stream = match fd_index {
0 => crate::fs::is_fs_stream_instance_value(value, "ReadStream"),
1 | 2 => crate::fs::is_fs_stream_instance_value(value, "WriteStream"),
_ => false,
};
if let Some(fd) = fd {
if !expected_stream {
return None;
}
let fd = cp_get_field(value, b"fd");
cp_stdio_number_fd(fd).filter(|fd| crate::fs::fd_is_registered(*fd))
}

fn cp_stdio_kind(value: f64, fd_index: usize) -> CpStdio {
if let Some(fd) = cp_stdio_number_fd(value) {
return CpStdio::Fd(fd);
}
if let Some(fd) = cp_stdio_stream_fd(value, fd_index) {
return CpStdio::Fd(fd);
}

Expand All @@ -1659,8 +1678,8 @@ fn cp_stdio_kind(value: f64) -> CpStdio {
}

/// Read the deterministic live-stdio subset: `pipe` (default), `ignore`,
/// `inherit`, and numeric fd entries. Custom stream handles intentionally
/// remain in #2555.
/// `inherit`, numeric fd entries, and opened fs stream objects backed by a
/// registered fd.
pub(super) fn cp_read_stdio(opts_val: f64, fds: usize) -> Vec<CpStdio> {
let mut out = vec![CpStdio::Pipe; fds];
if cp_object_ptr(opts_val).is_none() {
Expand All @@ -1671,7 +1690,7 @@ pub(super) fn cp_read_stdio(opts_val: f64, fds: usize) -> Vec<CpStdio> {
if let Some(arr) = cp_array_ptr(stdio) {
let n = crate::array::js_array_length(arr).min(fds as u32);
for i in 0..n {
out[i as usize] = cp_stdio_kind(crate::array::js_array_get_f64(arr, i));
out[i as usize] = cp_stdio_kind(crate::array::js_array_get_f64(arr, i), i as usize);
}
return out;
}
Expand Down
86 changes: 86 additions & 0 deletions test-parity/node-suite/child_process/async/stdio-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import { fork, spawn } from "node:child_process";
import * as fs from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

function slot(value: any): string {
return value === null ? "null" : typeof value;
}

function waitOpen(stream: any): Promise<void> {
if (typeof stream.fd === "number") return Promise.resolve();
return new Promise((resolve, reject) => {
stream.once("open", () => resolve());
stream.once("error", reject);
});
}

function waitClose(child: any): Promise<number | null> {
return new Promise((resolve) => child.on("close", (code: number | null) => resolve(code)));
}

function read(path: string): string {
return fs.readFileSync(path, "utf8");
}

const base = join(tmpdir(), `perry-stdio-stream-${process.pid}`);
const inPath = `${base}-in.txt`;
const outPath = `${base}-out.txt`;
const errPath = `${base}-err.txt`;
const childFile = `${base}-child.js`;
const forkOutPath = `${base}-fork-out.txt`;

fs.writeFileSync(inPath, "stream-stdin");
fs.writeFileSync(outPath, "");
fs.writeFileSync(errPath, "");
fs.writeFileSync(forkOutPath, "");
fs.writeFileSync(
childFile,
[
"process.stdout.write('fork-stream-out');",
"process.on('message', () => { if (process.send) process.send({ ok: true }); process.exit(0); });",
].join(""),
);

const stdin = fs.createReadStream(inPath);
const stdout = fs.createWriteStream(outPath);
const stderr = fs.createWriteStream(errPath);
await Promise.all([waitOpen(stdin), waitOpen(stdout), waitOpen(stderr)]);

const child = spawn("sh", ["-c", "cat; printf stream-err >&2"], {
stdio: [stdin, stdout, stderr],
});
console.log("spawn props:", slot(child.stdin), slot(child.stdout), slot(child.stderr));
console.log("spawn stdio:", child.stdio.map(slot).join(","));
console.log("spawn close:", await waitClose(child));
stdout.end();
stderr.end();
stdin.close();
await Promise.all([
new Promise((resolve) => stdout.on("close", resolve)),
new Promise((resolve) => stderr.on("close", resolve)),
]);
console.log("spawn stdout file:", read(outPath));
console.log("spawn stderr file:", read(errPath));

const forkStdout = fs.createWriteStream(forkOutPath);
await waitOpen(forkStdout);
const forked = fork(childFile, [], { stdio: ["ignore", forkStdout, "ignore", "ipc"] });
console.log("fork props:", slot(forked.stdin), slot(forked.stdout), slot(forked.stderr));
console.log("fork stdio:", forked.stdio.map(slot).join(","));
console.log("fork channel:", typeof forked.channel);
const message: any = await new Promise((resolve) => {
forked.on("message", resolve);
forked.send({ ping: true });
});
console.log("fork ipc:", message.ok);
console.log("fork close:", await waitClose(forked));
forkStdout.end();
await new Promise((resolve) => forkStdout.on("close", resolve));
console.log("fork stdout file:", read(forkOutPath));

for (const path of [inPath, outPath, errPath, childFile, forkOutPath]) {
try {
fs.unlinkSync(path);
} catch {}
}
111 changes: 111 additions & 0 deletions test-parity/node-suite/child_process/sync/sync-stdio-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { execFileSync, execSync, spawnSync } from "node:child_process";
import * as fs from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

function slot(value: any): string {
if (value === null) return "null";
if (Buffer.isBuffer(value)) return `buffer:${value.toString("utf8")}`;
return `${typeof value}:${String(value)}`;
}

function waitOpen(stream: any): Promise<void> {
if (typeof stream.fd === "number") return Promise.resolve();
return new Promise((resolve, reject) => {
stream.once("open", () => resolve());
stream.once("error", reject);
});
}

function closeStream(stream: any): Promise<void> {
if (typeof stream.end === "function") {
stream.end();
} else if (typeof stream.close === "function") {
stream.close();
}
return new Promise((resolve) => stream.once("close", () => resolve()));
}

function reportThrow(label: string, action: () => any) {
try {
console.log(`${label} value:`, slot(action()));
} catch (err: any) {
console.log(`${label} status:`, err.status);
console.log(`${label} stdout:`, slot(err.stdout));
console.log(`${label} stderr:`, slot(err.stderr));
console.log(`${label} output:`, err.output.map(slot).join("|"));
}
}

const base = join(tmpdir(), `perry-sync-stdio-stream-${process.pid}`);
const spawnInPath = `${base}-spawn-in.txt`;
const spawnOutPath = `${base}-spawn-out.txt`;
const spawnErrPath = `${base}-spawn-err.txt`;
const execOutPath = `${base}-exec-out.txt`;
const execErrPath = `${base}-exec-err.txt`;
const fileOutPath = `${base}-file-out.txt`;
const fileErrPath = `${base}-file-err.txt`;

fs.writeFileSync(spawnInPath, "sync-stdin");
for (const path of [spawnOutPath, spawnErrPath, execOutPath, execErrPath, fileOutPath, fileErrPath]) {
fs.writeFileSync(path, "");
}

const spawnIn = fs.createReadStream(spawnInPath);
const spawnOut = fs.createWriteStream(spawnOutPath);
const spawnErr = fs.createWriteStream(spawnErrPath);
await Promise.all([waitOpen(spawnIn), waitOpen(spawnOut), waitOpen(spawnErr)]);
const spawnResult = spawnSync("sh", ["-c", "cat; printf sync-err >&2"], {
encoding: "utf8",
stdio: [spawnIn, spawnOut, spawnErr],
});
console.log("spawnSync status:", spawnResult.status);
console.log("spawnSync stdout:", slot(spawnResult.stdout));
console.log("spawnSync stderr:", slot(spawnResult.stderr));
console.log("spawnSync output:", spawnResult.output.map(slot).join("|"));
await Promise.all([closeStream(spawnIn), closeStream(spawnOut), closeStream(spawnErr)]);
console.log("spawnSync stdout file:", fs.readFileSync(spawnOutPath, "utf8"));
console.log("spawnSync stderr file:", fs.readFileSync(spawnErrPath, "utf8"));

const execOut = fs.createWriteStream(execOutPath);
const execErr = fs.createWriteStream(execErrPath);
await Promise.all([waitOpen(execOut), waitOpen(execErr)]);
console.log(
"execSync value:",
slot(
execSync("printf exec-out; printf exec-err >&2", {
encoding: "utf8",
stdio: ["ignore", execOut, execErr],
}),
),
);
await Promise.all([closeStream(execOut), closeStream(execErr)]);
console.log("execSync stdout file:", fs.readFileSync(execOutPath, "utf8"));
console.log("execSync stderr file:", fs.readFileSync(execErrPath, "utf8"));

const fileOut = fs.createWriteStream(fileOutPath);
const fileErr = fs.createWriteStream(fileErrPath);
await Promise.all([waitOpen(fileOut), waitOpen(fileErr)]);
reportThrow("execFileSync throw", () =>
execFileSync("sh", ["-c", "printf file-out; printf file-err >&2; exit 6"], {
encoding: "utf8",
stdio: ["ignore", fileOut, fileErr],
}),
);
await Promise.all([closeStream(fileOut), closeStream(fileErr)]);
console.log("execFileSync stdout file:", fs.readFileSync(fileOutPath, "utf8"));
console.log("execFileSync stderr file:", fs.readFileSync(fileErrPath, "utf8"));

for (const path of [
spawnInPath,
spawnOutPath,
spawnErrPath,
execOutPath,
execErrPath,
fileOutPath,
fileErrPath,
]) {
try {
fs.unlinkSync(path);
} catch {}
}