Skip to content

Commit 6b7fa82

Browse files
committed
Address review findings: comment accuracy, test hardening, trio reap pin
Source and docs (comment/docstring-level; one dead-code removal): - Correct the shutdown shield comment: a native task.cancel() delivered while cleanup is in progress can abort it regardless of count; the consumed-at-the-yield arithmetic only covered cancellation that initiated teardown. - Scope the pipe-gated process.wait() claims to asyncio on Python 3.11+ (3.10 and trio resolve on process exit alone), in the _wait_for_process_exit docstring and the migration guide. - Add sunset notes to the cpython#106749 tracer workarounds (3.11-only; revert when 3.11 support is dropped). - Document stdio_client's raised exceptions (Raises: section). - Document the Job Object pre-assignment window in create_windows_process and at the assignment call site; document the _process_jobs WeakKeyDictionary's two load-bearing invariants (PyHANDLE values, identity-hashed weakref keys); fix a wrong comment about the CloseHandle failure path in _maybe_assign_process_to_job. - Drop FallbackProcess.stdin_raw/stdout_raw (write-only attributes). Tests: - Fix the inert leak canary in the OSError spawn-failure test: the pytest.raises ExceptionInfo pinned the leaked streams across gc.collect(); drop it before collecting (mutation-verified). - Correct the EPERM escalation test's pre-fix description (the old code fell back to a leader-only kill, leaking other group members - it did not hang forever) and trim an unasserted claim from the write-after-death docstring. - Add a CRLF-framed message to the chunk-reframing test, pinning the trailing-CR tolerance Windows servers rely on, on every platform. - Pin the grace wait's returncode read (what reaps the leader's zombie on trio) with a read-counting stub, parametrized over asyncio and trio; deleting the read fails the test on both backends. - Add the missing width-justification comment to one fail_after(10.0).
1 parent fb960d9 commit 6b7fa82

4 files changed

Lines changed: 124 additions & 32 deletions

File tree

docs/migration.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,10 @@ child processes the server leaves behind are no longer killed on POSIX — their
120120
lifetime is the server's business. The old behavior was a side effect of a shutdown
121121
wait gated on the stdio pipes closing rather than on process exit: a child holding
122122
an inherited pipe made a well-behaved server look hung, so its whole process tree
123-
was killed. A server that does not exit within the grace period is still terminated
123+
was killed. (That gating is an asyncio behavior specific to Python 3.11+ — on
124+
Python 3.10 and the trio backend the old wait already resolved on process exit, so
125+
the spurious kill never fired there.) A server that does not exit within the grace
126+
period is still terminated
124127
along with its entire process group. On Windows, children stay in the server's Job
125128
Object and are still killed at shutdown — now deterministically when the job handle
126129
is closed, rather than whenever the handle happened to be garbage-collected.

src/mcp/client/stdio.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,12 @@ class StdioServerParameters(BaseModel):
126126
async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr):
127127
"""Client transport for stdio: this will connect to a server by spawning a
128128
process and communicating with it over stdin/stdout.
129+
130+
Raises:
131+
OSError: If the server process cannot be spawned (for example, the command
132+
does not exist or is not executable).
133+
ValueError: If the spawn parameters are invalid (for example, an embedded
134+
NUL byte in the command or an argument).
129135
"""
130136
read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]
131137
read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception]
@@ -168,7 +174,8 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder
168174
# Shutdown's final cancellation targets these instead of the task group's own
169175
# scope: cancelling the host scope would deliver the cancellation by throwing
170176
# through the caller's suspended frames, and Python 3.11's tracer loses
171-
# coverage events after such a throw() traversal (python/cpython#106749).
177+
# coverage events after such a throw() traversal (python/cpython#106749;
178+
# 3.11-only — see the sunset note in `_wait_for_process_exit`).
172179
# These scope exactly the work the pipe tasks own.
173180
reader_scope = anyio.CancelScope()
174181
writer_scope = anyio.CancelScope()
@@ -261,9 +268,9 @@ async def stdin_writer() -> None:
261268
# being cancelled — a cancellation that skipped it would leak the server
262269
# process (and its children) and could block forever on the way out.
263270
# Every wait inside the shield is time-bounded. The shield holds against
264-
# anyio-level cancellation and one native task.cancel(); a second native
265-
# cancel delivered mid-cleanup can still abort it — there is no
266-
# backend-neutral way to refuse repeated native cancellation.
271+
# anyio-level cancellation; a native task.cancel() delivered while the
272+
# cleanup is in progress can still abort it — there is no backend-neutral
273+
# way to refuse native cancellation.
267274
with anyio.CancelScope(shield=True):
268275
# Let the writer hand any message the transport already accepted to
269276
# the server's stdin before that stdin closes; a zero-buffer send
@@ -377,19 +384,22 @@ def _close_subprocess_transport(process: Process | FallbackProcess) -> None:
377384
async def _wait_for_process_exit(process: Process | FallbackProcess, timeout: float) -> bool:
378385
"""Wait for the process itself to die, returning whether it did within `timeout`.
379386
380-
Deliberately does not use `process.wait()`: on the asyncio backend that resolves
381-
only once the process has exited *and* every one of its pipes has closed — and
382-
pipes are inherited by the server's own children, so a well-behaved server that
383-
exits instantly but leaves a background child alive would be misclassified as
384-
hung and get its whole tree terminated. `returncode` reflects process death
385-
alone.
387+
Deliberately does not use `process.wait()`: on asyncio under Python 3.11+ it
388+
resolves only once the process has exited *and* every one of its pipes has
389+
closed (3.10 and trio resolve on exit alone) — and pipes are inherited by the
390+
server's own children, so a well-behaved server that exits instantly but leaves
391+
a background child alive would be misclassified as hung and get its whole tree
392+
terminated. `returncode` reflects process death alone on every backend.
386393
"""
387394
# Implemented as a plain deadline loop rather than `anyio.move_on_after()`: a
388395
# cancel scope's deadline fires by throwing a cancellation through every frame
389396
# suspended in the await chain, including the caller's, and Python 3.11's tracer
390397
# loses coverage events in a frame resumed after such a throw() traversal
391398
# (python/cpython#106749). With no cancel scope, the timeout path completes a
392-
# normal `sleep()` and returns, so no frame is ever thrown through.
399+
# normal `sleep()` and returns, so no frame is ever thrown through. The tracer
400+
# bug is 3.11-only (fixed in 3.12, wontfix on 3.11): revert this and the other
401+
# workaround sites that cite it to their natural cancel-scope forms when
402+
# Python 3.11 support is dropped.
393403
deadline = anyio.current_time() + timeout
394404
while process.returncode is None:
395405
if anyio.current_time() >= deadline:

src/mcp/os/win32/utilities.py

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,20 @@
3333
_EXIT_POLL_INTERVAL = 0.01
3434

3535
# The Job Object each spawned process was assigned to, so the process tree can be
36-
# terminated through it later. Keyed weakly: if a process object goes away without
37-
# explicit termination, the entry goes with it.
36+
# terminated through it later.
37+
#
38+
# Values must stay the pywin32 `PyHANDLE` returned by `CreateJobObject`, never a
39+
# detached int: on abandoned-shutdown paths where neither pop site runs, the dying
40+
# weak entry drops the last reference to the `PyHANDLE`, whose destructor closes
41+
# the OS handle — and `KILL_ON_JOB_CLOSE` then reaps the orphaned tree. That
42+
# destructor-close is the only backstop on those paths; storing anything but the
43+
# `PyHANDLE` would turn it into a permanent handle leak.
44+
#
45+
# Keys rely on anyio's `Process` being weakref-able and identity-hashed (it is a
46+
# `dataclass(eq=False)` without `__slots__`); if that ever changes, registration
47+
# fails loudly with a `TypeError` rather than silently. Entries are written once,
48+
# after assignment can no longer fail, and consumed via `pop()` on the event
49+
# loop — no locking needed.
3850
_process_jobs: "weakref.WeakKeyDictionary[Process | FallbackProcess, object]" = weakref.WeakKeyDictionary()
3951

4052

@@ -80,11 +92,11 @@ class FallbackProcess:
8092

8193
def __init__(self, popen_obj: subprocess.Popen[bytes]):
8294
self.popen: subprocess.Popen[bytes] = popen_obj
83-
self.stdin_raw = popen_obj.stdin
84-
self.stdout_raw = popen_obj.stdout
95+
stdin = popen_obj.stdin
96+
stdout = popen_obj.stdout
8597

86-
self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None
87-
self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None
98+
self.stdin = FileWriteStream(cast(BinaryIO, stdin)) if stdin else None
99+
self.stdout = FileReadStream(cast(BinaryIO, stdout)) if stdout else None
88100

89101
async def wait(self) -> int:
90102
"""Wait for process exit by polling.
@@ -133,8 +145,11 @@ async def create_windows_process(
133145
when using the SelectorEventLoop, which does not support async subprocesses.
134146
In that case, we fall back to using subprocess.Popen.
135147
136-
The process is automatically added to a Job Object to ensure all child
137-
processes are terminated when the parent is terminated.
148+
The process is added to a Job Object so that child processes are terminated
149+
with it. Children the server spawns before the assignment completes — a
150+
window of two API calls against the server's interpreter cold start — are
151+
not captured: job membership is inherited at process creation, never
152+
acquired retroactively.
138153
139154
Args:
140155
command (str): The executable to run
@@ -160,7 +175,11 @@ async def create_windows_process(
160175
process = await _create_windows_fallback_process(command, args, env, errlog, cwd)
161176

162177
# Created only after a successful spawn: a failed spawn raises before any job
163-
# exists, so there is no handle to leak on that path.
178+
# exists, so there is no handle to leak on that path. Children the server
179+
# spawns before AssignProcessToJobObject completes land outside the job
180+
# (membership is inherited at CreateProcess, never acquired retroactively);
181+
# the window is two API calls racing the server's interpreter cold start. If
182+
# it ever bites, the fix is a CREATE_SUSPENDED spawn -> assign -> resume.
164183
job = _create_job_object()
165184
_maybe_assign_process_to_job(process, job)
166185
return process
@@ -239,8 +258,11 @@ def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: object
239258
win32job.AssignProcessToJobObject(job, process_handle)
240259
finally:
241260
win32api.CloseHandle(process_handle)
242-
# Recorded only once nothing else can fail, so the failure branch below can
243-
# assume the job is unregistered and just close it.
261+
# Recorded only after the process-handle close above. If that close failed
262+
# post-assignment, the except below would close the job handle and
263+
# KILL_ON_JOB_CLOSE would take the just-assigned healthy server with it —
264+
# accepted, because CloseHandle cannot realistically fail on a handle
265+
# OpenProcess just returned.
244266
_process_jobs[process] = job
245267
except pywintypes.error:
246268
logger.warning("Failed to assign process %d to Job Object", process.pid, exc_info=True)

tests/client/test_stdio.py

Lines changed: 66 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -210,29 +210,34 @@ async def _next_message(read_stream: MemoryObjectReceiveStream[SessionMessage |
210210

211211
@pytest.mark.anyio
212212
async def test_messages_split_and_packed_across_chunks_are_reframed(monkeypatch: pytest.MonkeyPatch) -> None:
213-
"""Framing survives arbitrary chunk boundaries: a message split mid-byte and two
214-
messages packed into one chunk are each delivered exactly once, and a trailing
215-
line without a newline is not delivered.
213+
"""Framing survives arbitrary chunk boundaries: a message split mid-byte, two
214+
messages packed into one chunk, and a CRLF-terminated message are each delivered
215+
exactly once, and a trailing line without a newline is not delivered.
216216
217217
A real pipe delivers short writes as whole lines, so only an in-process stdout
218218
can pin the reassembly buffer deterministically.
219219
"""
220220
ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")
221221
pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={})
222+
ping2 = JSONRPCRequest(jsonrpc="2.0", id=2, method="ping")
222223
process = FakeProcess(on_stdin_close=lambda: process.exit(0))
223224

224225
install_fake_process(monkeypatch, process)
225226

226227
with anyio.fail_after(5):
227228
async with stdio_client(FAKE_PARAMS) as (read_stream, _):
228229
# Split the first message mid-bytes, pack the rest of it together with
229-
# the whole second message and a partial third into one chunk.
230+
# the whole second message, a CRLF-framed third (the SDK's own server
231+
# emits \r\n on Windows; jiter treats the \r as JSON whitespace), and a
232+
# partial fourth into one chunk.
230233
wire = _line(ping)
234+
crlf_wire = ping2.model_dump_json(by_alias=True, exclude_unset=True).encode() + b"\r\n"
231235
await process.feed(wire[:7])
232-
await process.feed(wire[7:] + _line(pong) + b'{"jsonrpc": "2.0", "id": 99')
236+
await process.feed(wire[7:] + _line(pong) + crlf_wire + b'{"jsonrpc": "2.0", "id": 99')
233237

234238
assert await _next_message(read_stream) == ping
235239
assert await _next_message(read_stream) == pong
240+
assert await _next_message(read_stream) == ping2
236241

237242
# The partial trailing message is dropped at EOF rather than delivered
238243
# broken: EOF ends the read stream with nothing further on it.
@@ -411,8 +416,8 @@ async def run_client_until_cancelled() -> None:
411416
@pytest.mark.anyio
412417
async def test_writing_after_the_server_dies_reports_clean_closure(monkeypatch: pytest.MonkeyPatch) -> None:
413418
"""A send racing the server's death must not surface a raw backend exception
414-
(ConnectionResetError in an exception group); the death is reported through the
415-
read stream's closure and the transport still shuts down cleanly."""
419+
(ConnectionResetError in an exception group) out of the context manager; the
420+
transport still shuts down cleanly."""
416421
ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping")
417422
process = FakeProcess(on_stdin_close=lambda: process.exit(0))
418423

@@ -471,6 +476,9 @@ async def failing_spawn(
471476
pass # pragma: no cover
472477

473478
assert exc_info.value.errno == errno.EACCES
479+
# Drop the ExceptionInfo before collecting: its traceback references the suspended
480+
# stdio_client frame, which would keep leaked streams alive across the collect.
481+
del exc_info
474482
gc.collect()
475483

476484

@@ -822,8 +830,8 @@ async def test_an_eperm_group_that_outlives_the_grace_period_is_still_sigkilled(
822830
) -> None:
823831
"""Even when every probe reports EPERM, the SIGKILL escalation still fires after
824832
the grace period (and its own EPERM is tolerated) — pre-fix, EPERM at SIGTERM
825-
returned early and a SIGTERM-ignoring server survived shutdown forever. The tiny
826-
timeout is the time-based grace period under test."""
833+
abandoned the group escalation for a leader-only kill, leaking every other group
834+
member. The tiny timeout is the time-based grace period under test."""
827835
calls: list[tuple[int, int]] = []
828836

829837
def fake_killpg(pgid: int, sig: int) -> None:
@@ -843,6 +851,53 @@ def fake_killpg(pgid: int, sig: int) -> None:
843851
assert set(calls[1:-1]) == {(stub.pid, 0)}
844852

845853

854+
@pytest.mark.anyio
855+
@pytest.mark.parametrize("anyio_backend", ["asyncio", "trio"])
856+
@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics")
857+
# Excluded from coverage for the same Windows-runner reason as above.
858+
async def test_the_grace_wait_reads_returncode_so_trio_can_reap_the_leaders_zombie( # pragma: lax no cover
859+
monkeypatch: pytest.MonkeyPatch,
860+
) -> None:
861+
"""The wait between SIGTERM and SIGKILL reads `process.returncode` while it polls:
862+
on trio that property calls `Popen.poll()`, and that reap is what stops the
863+
leader's zombie from keeping the process group alive for the full timeout (see the
864+
comment in `terminate_posix_process_tree`). Regression pin for the read itself, on
865+
both backends — the reaping side effect is trio's documented `returncode`
866+
behaviour, deliberately not re-tested here."""
867+
868+
calls: list[tuple[int, int]] = []
869+
870+
def fake_killpg(pgid: int, sig: int) -> None:
871+
# SIGTERM is accepted and every liveness probe reports survivors, so the
872+
# grace wait runs to its (tiny) timeout and the SIGKILL escalation fires.
873+
calls.append((pgid, sig))
874+
875+
class _ReadCountingProcess:
876+
"""A live-forever leader whose `returncode` property counts its reads."""
877+
878+
pid = 54321
879+
880+
def __init__(self) -> None:
881+
self.returncode_reads = 0
882+
883+
@property
884+
def returncode(self) -> int | None:
885+
self.returncode_reads += 1
886+
return None
887+
888+
monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg)
889+
stub = _ReadCountingProcess()
890+
891+
with anyio.fail_after(5):
892+
await terminate_posix_process_tree(cast(anyio.abc.Process, stub), timeout_seconds=0.05)
893+
894+
# The wait ran to its deadline (the escalation fired)...
895+
assert calls[0] == (stub.pid, signal.SIGTERM)
896+
assert calls[-1] == (stub.pid, signal.SIGKILL)
897+
# ...and `returncode` was read while it polled — the read that reaps on trio.
898+
assert stub.returncode_reads >= 1
899+
900+
846901
# ---------------------------------------------------------------------------
847902
# Real-process tests: the OS facts no fake can certify
848903
# ---------------------------------------------------------------------------
@@ -1056,6 +1111,8 @@ async def test_terminating_an_already_exited_process_is_a_no_op() -> None: # pr
10561111
proc = await _create_platform_compatible_process(sys.executable, ["-c", "pass"])
10571112
assert isinstance(proc, anyio.abc.Process)
10581113

1114+
# The bound covers one interpreter cold start on a loaded runner; a healthy run
1115+
# takes well under a second.
10591116
with anyio.fail_after(10.0):
10601117
await _wait_until_exited(proc)
10611118
await _terminate_process_tree(proc)

0 commit comments

Comments
 (0)