Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 23 additions & 14 deletions lib/nodejs/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,28 @@ defmodule NodeJS.Supervisor do
timeout = Keyword.get(opts, :timeout, @timeout)
esm = Keyword.get(opts, :esm, module |> elem(0) |> to_string |> String.ends_with?(".mjs"))

func = fn pid ->
try do
GenServer.call(pid, {module, args, [binary: binary, timeout: timeout, esm: esm]}, timeout)
catch
:exit, {:timeout, _} ->
{:error, "Call timed out."}

:exit, error ->
{:error, {:node_js_worker_exit, error}}
end
end

pool_name = supervisor_pool(opts)
:poolboy.transaction(pool_name, func, timeout)
pid = :poolboy.checkout(pool_name, true, timeout)

try do
result = GenServer.call(pid, {module, args, [binary: binary, esm: esm]}, timeout)
:poolboy.checkin(pool_name, pid)
result
catch
:exit, {:timeout, _} ->
# Kill the stuck worker — don't check it back in. The worker's Node.js
# runtime may be wedged (infinite loop, blocked event loop), and
# returning it to the pool would hand the next caller a broken runtime.
# Poolboy links to workers, so it sees the EXIT and spawns a fresh
# replacement on its own; checking in would race that cleanup and
# temporarily re-add a dead pid to the pool.
Process.exit(pid, :kill)
{:error, :timeout}

:exit, error ->
:poolboy.checkin(pool_name, pid)
{:error, {:node_js_worker_exit, error}}
end
end

defp supervisor_name(opts) do
Expand All @@ -69,7 +77,7 @@ defmodule NodeJS.Supervisor do
run_in_transaction(module, args, opts)
catch
:exit, {:timeout, _} ->
{:error, "Call timed out."}
{:error, :timeout}
end
end

Expand All @@ -78,6 +86,7 @@ defmodule NodeJS.Supervisor do
|> call(args, opts)
|> case do
{:ok, result} -> result
{:error, :timeout} -> raise NodeJS.Error, message: "Call timed out."
{:error, message} -> raise NodeJS.Error, message: message
end
end
Expand Down
18 changes: 7 additions & 11 deletions lib/nodejs/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,14 @@ defmodule NodeJS.Worker do
]
end

defp get_response(data, timeout, port, expected_uid) do
defp get_response(data, port, expected_uid) do
receive do
{^port, {:data, {flag, chunk}}} ->
data = data ++ chunk

case flag do
:noeol ->
get_response(data, timeout, port, expected_uid)
get_response(data, port, expected_uid)

:eol ->
case data do
Expand All @@ -87,19 +87,16 @@ defmodule NodeJS.Worker do
{:ok, response_data}

{_stale_uid, _response_data} ->
# Response from a different (likely timed-out) request — discard it
get_response(~c"", timeout, port, expected_uid)
get_response(~c"", port, expected_uid)
end

_ ->
get_response(~c"", timeout, port, expected_uid)
get_response(~c"", port, expected_uid)
end
end

{^port, {:exit_status, status}} when status != 0 ->
{:error, {:exit, status}}
after
timeout -> {:error, :timeout}
end
end

Expand All @@ -123,7 +120,6 @@ defmodule NodeJS.Worker do
@doc false
def handle_call({module, args, opts}, _from, %{port: port, uid_counter: uid_counter} = state)
when is_tuple(module) do
timeout = Keyword.get(opts, :timeout)
binary = Keyword.get(opts, :binary)
esm = Keyword.get(opts, :esm, false)
uid = Integer.to_string(uid_counter)
Expand All @@ -132,7 +128,7 @@ defmodule NodeJS.Worker do

state = %{state | uid_counter: uid_counter + 1}

case get_response(~c"", timeout, port, uid) do
case get_response(~c"", port, uid) do
{:ok, response} ->
decoded_response =
response
Expand All @@ -141,8 +137,8 @@ defmodule NodeJS.Worker do

{:reply, decoded_response, state}

{:error, :timeout} ->
{:reply, {:error, :timeout}, state}
{:error, _} = error ->
{:reply, error, state}
end
end

Expand Down
49 changes: 41 additions & 8 deletions test/nodejs_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,10 @@ defmodule NodeJS.Test do
assert_receive :received_timeout_3, 10
assert_receive :received_timeout_4, 10

assert {:error, "Call timed out."} = Task.await(task1)
assert {:error, "Call timed out."} = Task.await(task2)
assert {:error, "Call timed out."} = Task.await(task3)
assert {:error, "Call timed out."} = Task.await(task4)
assert {:error, :timeout} = Task.await(task1)
assert {:error, :timeout} = Task.await(task2)
assert {:error, :timeout} = Task.await(task3)
assert {:error, :timeout} = Task.await(task4)

# We should still get an answer here, before the timeout
assert {:ok, 1115} = NodeJS.call("slow-async-echo", [1115, 1])
Expand All @@ -190,7 +190,7 @@ defmodule NodeJS.Test do

# Send a call that takes 300ms but times out after 50ms.
# After timeout, the worker is free but the Node.js response is still pending.
assert {:error, "Call timed out."} =
assert {:error, :timeout} =
NodeJS.call("slow-async-echo", [9999, 300],
timeout: 50,
name: NodeJS.RaceTest
Expand All @@ -207,9 +207,42 @@ defmodule NodeJS.Test do
end
end

describe "CPU-bound JS timing out" do
test "doesn't leave the worker pool permanently wedged" do
# A single-worker pool makes the failure mode unambiguous: if the timed-out
# call doesn't tear down the worker, the follow-up has nowhere to run.
path = __ENV__.file |> Path.dirname() |> Path.join("js")

start_supervised!(
Supervisor.child_spec(
{NodeJS.Supervisor, path: path, name: NodeJS.WedgedTest, pool_size: 1},
id: NodeJS.WedgedTest
)
)

# block-event-loop busy-waits for 10s, long enough that a wedged Node
# runtime could not possibly service the follow-up call below in time.
# The call is aborted at 50ms; the worker must die so its Node process
# dies with it, freeing the pool for a fresh worker.
assert {:error, :timeout} =
NodeJS.call("block-event-loop", [10_000],
timeout: 50,
name: NodeJS.WedgedTest
)

# Must succeed on a freshly spawned worker. If it does not, it means it received
# broken runtime from poolboy.
assert {:ok, 42} =
NodeJS.call("slow-async-echo", [42, 1],
timeout: 2_000,
name: NodeJS.WedgedTest
)
end
end

describe "overriding call timeout" do
test "works, and you can tell because the slow function will time out" do
assert {:error, "Call timed out."} = NodeJS.call("slow-async-echo", [1111], timeout: 0)
assert {:error, :timeout} = NodeJS.call("slow-async-echo", [1111], timeout: 0)
assert_raise NodeJS.Error, fn -> NodeJS.call!("slow-async-echo", [1111], timeout: 0) end
assert {:ok, 1111} = NodeJS.call("slow-async-echo", [1111])
end
Expand All @@ -223,9 +256,9 @@ defmodule NodeJS.Test do

describe "Implementation details shouldn't leak:" do
test "Timeouts do not send stray messages to calling process" do
assert {:error, "Call timed out."} = NodeJS.call("slow-async-echo", [1111], timeout: 0)
assert {:error, :timeout} = NodeJS.call("slow-async-echo", [1111], timeout: 0)

refute_receive {_ref, {:error, "Call timed out."}}, 50
refute_receive {_ref, {:error, :timeout}}, 50
end

test "Crashes do not bring down the calling process" do
Expand Down