diff --git a/docs/migration.md b/docs/migration.md index 9850f74cd4..f0facf1816 100644 --- a/docs/migration.md +++ b/docs/migration.md @@ -105,6 +105,41 @@ The `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters have been re Note: `sse_client` retains its `headers`, `timeout`, `sse_read_timeout`, and `auth` parameters — only the streamable HTTP transport changed. +### `terminate_windows_process` removed + +The deprecated `mcp.os.win32.utilities.terminate_windows_process` function has been +removed. Process termination is handled internally by the `stdio_client` context +manager; there is no replacement API. The Windows tree-termination helper +`terminate_windows_process_tree` no longer accepts a `timeout_seconds` argument — +the value was never used (Job Object termination is immediate). + +### `stdio_client` no longer kills children of a gracefully-exited server on POSIX + +When a server exits on its own after `stdio_client` closes its stdin, background +child processes the server leaves behind are no longer killed on POSIX — their +lifetime is the server's business. The old behavior was a side effect of a shutdown +wait gated on the stdio pipes closing rather than on process exit: a child holding +an inherited pipe made a well-behaved server look hung, so its whole process tree +was killed. (That gating is an asyncio behavior specific to Python 3.11+ — on +Python 3.10 and the trio backend the old wait already resolved on process exit, so +the spurious kill never fired there.) A server that does not exit within the grace +period is still terminated +along with its entire process group. On Windows, children stay in the server's Job +Object and are still killed at shutdown — now deterministically when the job handle +is closed, rather than whenever the handle happened to be garbage-collected. 
+ +If you relied on `stdio_client` killing everything the server spawned, make the +server terminate its own children on shutdown (its stdin reaching EOF is the +shutdown signal), or clean up the process tree from the host application after +`stdio_client` exits. + +Two related shutdown refinements: `stdio_client` now closes its end of the pipes +deterministically at shutdown, so a surviving child that keeps writing to an +inherited stdout receives `EPIPE`/`SIGPIPE` once the client is gone (previously the +pipe lingered until garbage collection); and a failed write to a server that is +still running now surfaces as a closed connection (`CONNECTION_CLOSED`) on the read +side instead of leaving requests waiting indefinitely. + ### Removed type aliases and classes The following deprecated type aliases and classes have been removed from `mcp.types`: diff --git a/src/mcp/client/stdio.py b/src/mcp/client/stdio.py index 902dc8576c..cc502ae0c7 100644 --- a/src/mcp/client/stdio.py +++ b/src/mcp/client/stdio.py @@ -1,3 +1,4 @@ +import asyncio import logging import os import sys @@ -7,7 +8,7 @@ import anyio import anyio.lowlevel -from anyio.abc import Process +from anyio.abc import AsyncResource, Process from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream from anyio.streams.text import TextReceiveStream from pydantic import BaseModel, Field @@ -16,6 +17,7 @@ from mcp.os.posix.utilities import terminate_posix_process_tree from mcp.os.win32.utilities import ( FallbackProcess, + close_process_job, create_windows_process, get_windows_executable_command, terminate_windows_process_tree, @@ -44,9 +46,28 @@ else ["HOME", "LOGNAME", "PATH", "SHELL", "TERM", "USER"] ) -# Timeout for process termination before falling back to force kill +# How long the server gets to exit on its own after its stdin is closed, before its +# process tree is terminated. 
PROCESS_TERMINATION_TIMEOUT = 2.0 +# How long the process tree gets to die after a graceful termination request +# (SIGTERM on POSIX) before it is force-killed. Windows tree termination is an +# immediate hard kill, so this only stretches the POSIX path. +FORCE_KILL_TIMEOUT = 2.0 + +# How long to wait for the event loop to observe the death of a killed process. +# Kill-death is prompt, so this normally takes one poll interval; the bound only +# matters for a process that even SIGKILL cannot collect (uninterruptible I/O). +_KILL_REAP_TIMEOUT = 2.0 + +# How long the writer task gets to hand already-accepted outbound messages to the +# server's stdin before shutdown closes it. Normally one scheduling round; only a +# wedged pipe (full, with its reader gone) makes it run out. +_WRITER_FLUSH_TIMEOUT = 0.5 + +# How often to poll for process death while waiting out the grace period. +_EXIT_POLL_INTERVAL = 0.01 + def get_default_environment() -> dict[str, str]: """Returns a default environment object including only environment variables deemed @@ -105,6 +126,12 @@ class StdioServerParameters(BaseModel): async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stderr): """Client transport for stdio: this will connect to a server by spawning a process and communicating with it over stdin/stdout. + + Raises: + OSError: If the server process cannot be spawned (for example, the command + does not exist or is not executable). + ValueError: If the spawn parameters are invalid (for example, an embedded + NUL byte in the command or an argument). 
""" read_stream: MemoryObjectReceiveStream[SessionMessage | Exception] read_stream_writer: MemoryObjectSendStream[SessionMessage | Exception] @@ -115,6 +142,7 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder read_stream_writer, read_stream = anyio.create_memory_object_stream(0) write_stream, write_stream_reader = anyio.create_memory_object_stream(0) + spawned = False try: command = _get_executable_command(server.command) @@ -126,89 +154,258 @@ async def stdio_client(server: StdioServerParameters, errlog: TextIO = sys.stder errlog=errlog, cwd=server.cwd, ) - except OSError: - # Clean up streams if process creation fails - await read_stream.aclose() - await write_stream.aclose() - await read_stream_writer.aclose() - await write_stream_reader.aclose() - raise - - async def stdout_reader(): + spawned = True + finally: + if not spawned: + # The spawn can fail with more than OSError: a cancellation delivered + # while the interpreter cold-starts, or ValueError from a NUL byte in + # the command. Close the streams on every failure or they leak to a + # GC-time ResourceWarning. Shielded so a pending cancellation cannot + # interrupt the closes. + with anyio.CancelScope(shield=True): + await _aclose_all(read_stream, write_stream, read_stream_writer, write_stream_reader) + + # Flipped by the shutdown sequence so the pipe tasks can tell expected + # teardown noise from genuine mid-session transport failures. + shutting_down = False + # Set when stdin_writer finishes, so shutdown can wait (bounded) for messages + # the transport already accepted to be flushed before it closes stdin. 
+ writer_done = anyio.Event() + # Shutdown's final cancellation targets these instead of the task group's own + # scope: cancelling the host scope would deliver the cancellation by throwing + # through the caller's suspended frames, and Python 3.11's tracer loses + # coverage events after such a throw() traversal (python/cpython#106749; + # 3.11-only — see the sunset note in `_wait_for_process_exit`). + # These scope exactly the work the pipe tasks own. + reader_scope = anyio.CancelScope() + writer_scope = anyio.CancelScope() + + async def stdout_reader() -> None: assert process.stdout, "Opened process is missing stdout" - try: - async with read_stream_writer: - buffer = "" - async for chunk in TextReceiveStream( - process.stdout, - encoding=server.encoding, - errors=server.encoding_error_handler, - ): - lines = (buffer + chunk).split("\n") - buffer = lines.pop() - - for line in lines: - try: - message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) - except Exception as exc: # pragma: no cover - logger.exception("Failed to parse JSONRPC message from server") - await read_stream_writer.send(exc) + with reader_scope: + # Once the read stream is gone, keep consuming stdout without delivering: + # a server flushing its remaining output during shutdown must not block on + # a full pipe and miss its chance to exit before the grace period ends. 
+ delivering = True + try: + async with read_stream_writer: + buffer = "" + async for chunk in TextReceiveStream( + process.stdout, + encoding=server.encoding, + errors=server.encoding_error_handler, + ): + if not delivering: continue - session_message = SessionMessage(message) - await read_stream_writer.send(session_message) - except anyio.ClosedResourceError: # pragma: lax no cover - await anyio.lowlevel.checkpoint() - - async def stdin_writer(): + lines = (buffer + chunk).split("\n") + buffer = lines.pop() + + for line in lines: + item: SessionMessage | Exception + try: + message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) + except ValueError as exc: + logger.exception("Failed to parse JSONRPC message from server") + item = exc + else: + item = SessionMessage(message) + + try: + await read_stream_writer.send(item) + except (anyio.ClosedResourceError, anyio.BrokenResourceError): + # The read stream is gone — shutdown closed it, or the + # caller did. Stop delivering but keep draining. + delivering = False + break + except anyio.ClosedResourceError: + # Our own shutdown closed/poisoned the stdout stream under the read. + await anyio.lowlevel.checkpoint() + except (anyio.BrokenResourceError, ConnectionError): + # The stdout pipe itself failed. During shutdown that's expected (the + # process may be killed mid-read, which the proactor backend surfaces + # as ConnectionResetError); mid-session it's a real transport failure + # worth a log line. Either way the session observes a clean closure + # when this task's exit closes the read stream. 
+ if not shutting_down: + logger.exception("Reading from the MCP server's stdout failed mid-session") + await anyio.lowlevel.checkpoint() + + async def stdin_writer() -> None: assert process.stdin, "Opened process is missing stdin" - try: - async with write_stream_reader: - async for session_message in write_stream_reader: - json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True) - await process.stdin.send( - (json + "\n").encode( - encoding=server.encoding, - errors=server.encoding_error_handler, + with writer_scope: + try: + async with write_stream_reader: + async for session_message in write_stream_reader: + json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True) + await process.stdin.send( + (json + "\n").encode( + encoding=server.encoding, + errors=server.encoding_error_handler, + ) ) - ) - except anyio.ClosedResourceError: # pragma: no cover - await anyio.lowlevel.checkpoint() - - async with anyio.create_task_group() as tg, process: + except (anyio.ClosedResourceError, anyio.BrokenResourceError, OSError): + # The server stopped reading — its process died, it closed its stdin, + # or shutdown closed the pipe under a racing write. The exact exception + # depends on platform and backend; all of them just mean the pipe is + # gone. The server may well still be alive, so close the read stream to + # tell the session the connection is over — a silently swallowed write + # would otherwise leave a request waiting forever for its response. + await read_stream_writer.aclose() + finally: + writer_done.set() + + async with anyio.create_task_group() as tg: tg.start_soon(stdout_reader) tg.start_soon(stdin_writer) try: yield read_stream, write_stream finally: - # MCP spec: stdio shutdown sequence - # 1. Close input stream to server - # 2. Wait for server to exit, or send SIGTERM if it doesn't exit in time - # 3. 
Send SIGKILL if still not exited - if process.stdin: # pragma: no branch - try: - await process.stdin.aclose() - except Exception: # pragma: no cover - # stdin might already be closed, which is fine - pass + shutting_down = True + # The shutdown sequence must run to completion even when the caller is + # being cancelled — a cancellation that skipped it would leak the server + # process (and its children) and could block forever on the way out. + # Every wait inside the shield is time-bounded. The shield holds against + # anyio-level cancellation; a native task.cancel() delivered while the + # cleanup is in progress can still abort it — there is no backend-neutral + # way to refuse native cancellation. + with anyio.CancelScope(shield=True): + # Let the writer hand any message the transport already accepted to + # the server's stdin before that stdin closes; a zero-buffer send + # completes at the rendezvous, before the writer has written. + write_stream.close() + flush_deadline = anyio.current_time() + _WRITER_FLUSH_TIMEOUT + while not writer_done.is_set() and anyio.current_time() < flush_deadline: + await anyio.sleep(_EXIT_POLL_INTERVAL) + # Unblock the reader from any undelivered message so it drains + # stdout for the rest of shutdown (see the drain note in + # stdout_reader). + read_stream.close() + await _shutdown_process_tree(process) + await _aclose_all(read_stream, write_stream, read_stream_writer, write_stream_reader) + # Give tasks unblocked by the closes above one scheduling pass so + # they run their exit/except paths (deterministic + # ClosedResourceError handling) before the cancellation below. + await anyio.lowlevel.checkpoint() + # Nothing the pipe tasks could still do matters now; cancel them so the + # task-group join cannot hang on a write into a pipe whose read end a + # kill-surviving descendant still holds. Cancelling the pipe tasks' own + # scopes, not the task group's: see the note where they are created. 
+ # (On the Windows fallback path a reader thread parked in a synchronous + # ReadFile ignores cancellation and anyio waits for it; there is no + # portable way to abandon it here.) + reader_scope.cancel() + writer_scope.cancel() + + +async def _aclose_all(*streams: AsyncResource) -> None: + """Close every given stream.""" + for stream in streams: + await stream.aclose() + + +async def _shutdown_process_tree(process: Process | FallbackProcess) -> None: + """Shut the server process down per the MCP spec stdio sequence. + + 1. Close the server's stdin so it can exit on its own. + 2. Give it `PROCESS_TERMINATION_TIMEOUT` seconds to exit. + 3. Otherwise terminate its whole process tree (SIGTERM then SIGKILL on POSIX, + Job Object hard kill on Windows) and wait (bounded) for the death to be + observed, logging if even that fails. + 4. Release the OS-level pipe/transport resources deterministically. + """ + if process.stdin: # pragma: no branch + try: + await process.stdin.aclose() + except (OSError, anyio.BrokenResourceError, anyio.ClosedResourceError): + # stdin is already closed or the pipe is already gone, which is fine + await anyio.lowlevel.checkpoint() + + exited = await _wait_for_process_exit(process, PROCESS_TERMINATION_TIMEOUT) + if not exited: + # Process didn't exit from stdin closure; use platform-specific termination, + # which kills the entire process tree, not just the spawned process. + await _terminate_process_tree(process) + # Wait (bounded) for the kill to be observed by the event loop: on asyncio, + # `returncode` flips only once the child watcher's callback has been + # delivered, and that delivery is also what lets the subprocess transport + # close instead of leaking a ResourceWarning into whatever runs next. + if not await _wait_for_process_exit(process, _KILL_REAP_TIMEOUT): + # SIGKILL/job termination cannot be refused, but it can stall + # (uninterruptible I/O, an unsignalable group member). 
Leave a trace + # rather than abandoning the process silently. + logger.warning("MCP server process %d is still alive after the kill escalation; abandoning it", process.pid) + + # On Windows, drop the process's Job Object handle now: the job is configured to + # kill its remaining members when the handle closes, so closing it here makes + # that reaping deterministic instead of GC-timed. (POSIX deliberately leaves a + # well-behaved server's surviving children alive; no-op there.) + close_process_job(process) + + # The process is dead, but its stdout pipe can still be held open by something + # that inherited it (an orphaned grandchild, say), in which case the reader task + # would never see EOF. Closing our wrapper poisons the Python-level reader so the + # reader task finishes either way; the OS-level pipe end itself lives until the + # subprocess transport is closed below. + if process.stdout: # pragma: no branch + try: + await process.stdout.aclose() + except (OSError, anyio.BrokenResourceError, anyio.ClosedResourceError): + # The pipe may already be broken or contended (the Windows fallback + # closes it in a worker thread); the reader is poisoned either way. + await anyio.lowlevel.checkpoint() + + _close_subprocess_transport(process) - try: - # Give the process time to exit gracefully after stdin closes - with anyio.fail_after(PROCESS_TERMINATION_TIMEOUT): - await process.wait() - except TimeoutError: - # Process didn't exit from stdin closure, use platform-specific termination - # which handles SIGTERM -> SIGKILL escalation - await _terminate_process_tree(process) - except ProcessLookupError: # pragma: no cover - # Process already exited, which is fine - pass - await read_stream.aclose() - await write_stream.aclose() - await read_stream_writer.aclose() - await write_stream_reader.aclose() + +def _close_subprocess_transport(process: Process | FallbackProcess) -> None: + """Deterministically release the asyncio subprocess transport, if there is one. 
+ + On asyncio the transport — and the OS-level pipe fds it owns — is otherwise + closed only once the process has exited *and* every pipe has reported EOF. A + surviving descendant that inherited a pipe end (deliberately left alive on + POSIX when the server exited gracefully) would keep the client's fds and the + transport open until garbage collection, which warns. Nothing public exposes + the transport, hence the private attribute walk. trio and the Windows fallback + close the real fds in their stream wrappers' `aclose()` and take the early + return here. + """ + transport = getattr(getattr(process, "_process", None), "_transport", None) + if isinstance(transport, asyncio.SubprocessTransport): + # If unflushed stdin bytes remain (their reader never drained them), the + # write-pipe close stays deferred until that holder exits — close() still + # marks the transport closed, so nothing warns at GC, and the residual fd + # is bounded by the survivor's lifetime. + transport.close() + + +async def _wait_for_process_exit(process: Process | FallbackProcess, timeout: float) -> bool: + """Wait for the process itself to die, returning whether it did within `timeout`. + + Deliberately does not use `process.wait()`: on asyncio under Python 3.11+ it + resolves only once the process has exited *and* every one of its pipes has + closed (3.10 and trio resolve on exit alone) — and pipes are inherited by the + server's own children, so a well-behaved server that exits instantly but leaves + a background child alive would be misclassified as hung and get its whole tree + terminated. `returncode` reflects process death alone on every backend. 
+ """ + # Implemented as a plain deadline loop rather than `anyio.move_on_after()`: a + # cancel scope's deadline fires by throwing a cancellation through every frame + # suspended in the await chain, including the caller's, and Python 3.11's tracer + # loses coverage events in a frame resumed after such a throw() traversal + # (python/cpython#106749). With no cancel scope, the timeout path completes a + # normal `sleep()` and returns, so no frame is ever thrown through. The tracer + # bug is 3.11-only (fixed in 3.12, wontfix on 3.11): revert this and the other + # workaround sites that cite it to their natural cancel-scope forms when + # Python 3.11 support is dropped. + deadline = anyio.current_time() + timeout + while process.returncode is None: + if anyio.current_time() >= deadline: + return False + await anyio.sleep(_EXIT_POLL_INTERVAL) + return True def _get_executable_command(command: str) -> str: @@ -232,16 +429,16 @@ async def _create_platform_compatible_process( env: dict[str, str] | None = None, errlog: TextIO = sys.stderr, cwd: Path | str | None = None, -): +) -> Process | FallbackProcess: """Creates a subprocess in a platform-compatible way. 
Unix: Creates process in a new session/process group for killpg support Windows: Creates process in a Job Object for reliable child termination """ if sys.platform == "win32": # pragma: no cover - process = await create_windows_process(command, args, env, errlog, cwd) + return await create_windows_process(command, args, env, errlog, cwd) else: # pragma: lax no cover - process = await anyio.open_process( + return await anyio.open_process( [command, *args], env=env, stderr=errlog, @@ -249,22 +446,16 @@ async def _create_platform_compatible_process( start_new_session=True, ) - return process - -async def _terminate_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None: +async def _terminate_process_tree(process: Process | FallbackProcess) -> None: """Terminate a process and all its children using platform-specific methods. - Unix: Uses os.killpg() for atomic process group termination - Windows: Uses Job Objects via pywin32 for reliable child process cleanup - - Args: - process: The process to terminate - timeout_seconds: Timeout in seconds before force killing (default: 2.0) + Unix: SIGTERM to the process group, escalating to SIGKILL after + `FORCE_KILL_TIMEOUT`. Windows: immediate Job Object termination. """ if sys.platform == "win32": # pragma: no cover - await terminate_windows_process_tree(process, timeout_seconds) + await terminate_windows_process_tree(process) else: # pragma: lax no cover - # FallbackProcess should only be used for Windows compatibility + # Windows-only FallbackProcess never reaches the POSIX path. 
assert isinstance(process, Process) - await terminate_posix_process_tree(process, timeout_seconds) + await terminate_posix_process_tree(process, FORCE_KILL_TIMEOUT) diff --git a/src/mcp/os/posix/utilities.py b/src/mcp/os/posix/utilities.py index 0e9d74cf3c..5131ae8d8e 100644 --- a/src/mcp/os/posix/utilities.py +++ b/src/mcp/os/posix/utilities.py @@ -9,49 +9,87 @@ logger = logging.getLogger(__name__) +# How often to probe for surviving process-group members between SIGTERM and SIGKILL. +_GROUP_POLL_INTERVAL = 0.01 + async def terminate_posix_process_tree(process: Process, timeout_seconds: float = 2.0) -> None: - """Terminate a process and all its children on POSIX systems. + """Terminate a process and all its descendants on POSIX systems. + + The process was spawned with `start_new_session=True`, so it leads its own + process group and its pgid equals its pid. `os.killpg` on that group reaches + every descendant in one atomic call — including descendants whose parent (even + the group leader itself) has already exited, which a walk of the process tree + would miss. - Uses os.killpg() for atomic process group termination. + Sends SIGTERM to the group, waits up to `timeout_seconds` for the group to + disappear, then SIGKILLs whatever remains. - Args: - process: The process to terminate - timeout_seconds: Timeout in seconds before force killing (default: 2.0) + Descendants that move themselves into a new session or process group + (daemonizers) escape a group kill by design. And a process group only + disappears once every member is dead *and reaped*: if this client runs as + PID 1 or a subreaper without reaping orphans, dead descendants reparent to + it as zombies that keep the group occupied, so the wait below always runs + to its full timeout. Run such clients under an init shim (e.g. + `docker run --init`) to get the fast path back. 
""" - pid = getattr(process, "pid", None) or getattr(getattr(process, "popen", None), "pid", None) - if not pid: - # No PID means there's no process to terminate - it either never started, - # already exited, or we have an invalid process object - return + # start_new_session=True at spawn makes the leader's pid the pgid; do not ask the + # OS via getpgid(), which fails with ProcessLookupError once the leader has been + # reaped even while other group members are still alive. + pgid = process.pid try: - pgid = os.getpgid(pid) os.killpg(pgid, signal.SIGTERM) + except ProcessLookupError: + # The entire group is already gone; nothing to terminate. + return + except PermissionError: + # What EPERM proves differs by platform. Linux killpg(2): no "permission to + # send the signal to any of the target processes" — every member was denied, + # but those members are still alive. macOS kill(2): "when signaling a process + # group, this error is returned if any members of the group could not be + # signaled" — one foreign-euid member is enough, and the rest of the group may + # well have been signalled (current XNU also raises it when only unreaped + # zombies remain, where Linux would succeed). On no platform does it mean the + # group is gone, so fall through to the grace wait and SIGKILL escalation — + # both tolerate EPERM — instead of giving up: members that exit (or get + # reaped) end the wait early, and permitted members still get the KILL + # wherever the platform delivers it. 
+ logger.exception("No permission to signal some of process group %d; waiting for it to exit anyway", pgid) - with anyio.move_on_after(timeout_seconds): - while True: - try: - # Check if process group still exists (signal 0 = check only) - os.killpg(pgid, 0) - await anyio.sleep(0.1) - except ProcessLookupError: - return - - try: - os.killpg(pgid, signal.SIGKILL) - except ProcessLookupError: - pass - - except (ProcessLookupError, PermissionError, OSError) as e: - logger.warning(f"Process group termination failed for PID {pid}: {e}, falling back to simple terminate") - try: - process.terminate() - with anyio.fail_after(timeout_seconds): - await process.wait() - except Exception: - logger.warning(f"Process termination failed for PID {pid}, attempting force kill") + with anyio.move_on_after(timeout_seconds): + while True: try: - process.kill() - except Exception: - logger.exception(f"Failed to kill process {pid}") + # Probe for surviving group members (signal 0 checks without + # signalling). Only ESRCH proves the group is gone: on Linux the + # probe keeps succeeding while live members or unreaped zombies + # remain (so it waits out reaping rather than racing it), and EPERM + # is ambiguous on every platform. + os.killpg(pgid, 0) + except ProcessLookupError: + return + except PermissionError: + # Live members we may not signal (Linux), or a group with foreign + # members or nothing but zombies (macOS). Keep waiting: reaping + # turns an all-zombie group into ESRCH above, and unsignalable + # survivors may still exit on their own within the timeout. + pass + # Touching returncode reaps the leader on trio (the property calls + # Popen.poll()); without it nothing reaps during this loop and the + # leader's zombie keeps the group alive for the full timeout. On + # asyncio it is a cheap attribute read. 
Dead non-leader descendants + # are reaped by init once orphaned — except under a non-reaping + # PID-1/subreaper client, where their zombies hold the group here for + # the full timeout (see docstring). + _ = process.returncode + await anyio.sleep(_GROUP_POLL_INTERVAL) + + try: + os.killpg(pgid, signal.SIGKILL) + except ProcessLookupError: + # The group died between the last probe and the kill. + pass + except PermissionError: + # Same per-platform ambiguity as the SIGTERM above: whatever the platform + # let us signal has now been KILLed; the rest is not ours to touch. + pass diff --git a/src/mcp/os/win32/utilities.py b/src/mcp/os/win32/utilities.py index 6f68405f78..953c476d8f 100644 --- a/src/mcp/os/win32/utilities.py +++ b/src/mcp/os/win32/utilities.py @@ -4,16 +4,15 @@ import shutil import subprocess import sys +import weakref from pathlib import Path from typing import BinaryIO, TextIO, cast import anyio -from anyio import to_thread from anyio.abc import Process from anyio.streams.file import FileReadStream, FileWriteStream -from typing_extensions import deprecated -logger = logging.getLogger("client.stdio.win32") +logger = logging.getLogger(__name__) # Windows-specific imports for Job Objects if sys.platform == "win32": @@ -28,7 +27,27 @@ win32job = None pywintypes = None -JobHandle = int +# How often FallbackProcess polls the underlying Popen for exit. Polling keeps the +# wait cancellable: a thread blocked in Popen.wait() cannot be cancelled by anyio, +# which would make every timeout around it ineffective. +_EXIT_POLL_INTERVAL = 0.01 + +# The Job Object each spawned process was assigned to, so the process tree can be +# terminated through it later. 
+# +# Values must stay the pywin32 `PyHANDLE` returned by `CreateJobObject`, never a +# detached int: on abandoned-shutdown paths where neither pop site runs, the dying +# weak entry drops the last reference to the `PyHANDLE`, whose destructor closes +# the OS handle — and `KILL_ON_JOB_CLOSE` then reaps the orphaned tree. That +# destructor-close is the only backstop on those paths; storing anything but the +# `PyHANDLE` would turn it into a permanent handle leak. +# +# Keys rely on anyio's `Process` being weakref-able and identity-hashed (it is a +# `dataclass(eq=False)` without `__slots__`); if that ever changes, registration +# fails loudly with a `TypeError` rather than silently. Entries are written once, +# after assignment can no longer fail, and consumed via `pop()` on the event +# loop — no locking needed. +_process_jobs: "weakref.WeakKeyDictionary[Process | FallbackProcess, object]" = weakref.WeakKeyDictionary() def get_windows_executable_command(command: str) -> str: @@ -73,50 +92,29 @@ class FallbackProcess: def __init__(self, popen_obj: subprocess.Popen[bytes]): self.popen: subprocess.Popen[bytes] = popen_obj - self.stdin_raw = popen_obj.stdin # type: ignore[assignment] - self.stdout_raw = popen_obj.stdout # type: ignore[assignment] - self.stderr = popen_obj.stderr # type: ignore[assignment] - - self.stdin = FileWriteStream(cast(BinaryIO, self.stdin_raw)) if self.stdin_raw else None - self.stdout = FileReadStream(cast(BinaryIO, self.stdout_raw)) if self.stdout_raw else None - - async def __aenter__(self): - """Support async context manager entry.""" - return self - - async def __aexit__( - self, - exc_type: BaseException | None, - exc_val: BaseException | None, - exc_tb: object | None, - ) -> None: - """Terminate and wait on process exit inside a thread.""" + stdin = popen_obj.stdin + stdout = popen_obj.stdout + + self.stdin = FileWriteStream(cast(BinaryIO, stdin)) if stdin else None + self.stdout = FileReadStream(cast(BinaryIO, stdout)) if stdout else 
None + + async def wait(self) -> int: + """Wait for process exit by polling. + + `Popen.wait()` in a worker thread cannot be cancelled by anyio, which would + defeat every timeout placed around this call; polling keeps it cancellable. + """ + while (returncode := self.popen.poll()) is None: + await anyio.sleep(_EXIT_POLL_INTERVAL) + return returncode + + def terminate(self) -> None: + """Terminate the subprocess.""" self.popen.terminate() - await to_thread.run_sync(self.popen.wait) - - # Close the file handles to prevent ResourceWarning - if self.stdin: - await self.stdin.aclose() - if self.stdout: - await self.stdout.aclose() - if self.stdin_raw: - self.stdin_raw.close() - if self.stdout_raw: - self.stdout_raw.close() - if self.stderr: - self.stderr.close() - - async def wait(self): - """Async wait for process completion.""" - return await to_thread.run_sync(self.popen.wait) - - def terminate(self): - """Terminate the subprocess immediately.""" - return self.popen.terminate() def kill(self) -> None: - """Kill the subprocess immediately (alias for terminate).""" - self.terminate() + """Kill the subprocess (on Windows this is the same hard kill as terminate).""" + self.popen.kill() @property def pid(self) -> int: @@ -125,13 +123,12 @@ def pid(self) -> int: @property def returncode(self) -> int | None: - """Return the exit code, or ``None`` if the process has not yet terminated.""" - return self.popen.returncode + """Return the exit code, or `None` if the process has not yet terminated. - -# ------------------------ -# Updated function -# ------------------------ + Polls the underlying `Popen` so the value updates as soon as the process + dies, without anyone having to call `wait()`. + """ + return self.popen.poll() async def create_windows_process( @@ -148,8 +145,11 @@ async def create_windows_process( when using the SelectorEventLoop, which does not support async subprocesses. In that case, we fall back to using subprocess.Popen. 
- The process is automatically added to a Job Object to ensure all child - processes are terminated when the parent is terminated. + The process is added to a Job Object so that child processes are terminated + with it. Children the server spawns before the assignment completes — a + window of two API calls against the server's interpreter cold start — are + not captured: job membership is inherited at process creation, never + acquired retroactively. Args: command (str): The executable to run @@ -161,33 +161,26 @@ async def create_windows_process( Returns: Process | FallbackProcess: Async-compatible subprocess with stdin and stdout streams """ - job = _create_job_object() - process = None - try: - # First try using anyio with Windows-specific flags to hide console window process = await anyio.open_process( [command, *args], env=env, # Ensure we don't create console windows for each process - creationflags=subprocess.CREATE_NO_WINDOW # type: ignore - if hasattr(subprocess, "CREATE_NO_WINDOW") - else 0, + creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0), stderr=errlog, cwd=cwd, ) except NotImplementedError: - # If Windows doesn't support async subprocess creation, use fallback + # Windows event loops without async subprocess support (SelectorEventLoop) process = await _create_windows_fallback_process(command, args, env, errlog, cwd) - except Exception: - # Try again without creation flags - process = await anyio.open_process( - [command, *args], - env=env, - stderr=errlog, - cwd=cwd, - ) + # Created only after a successful spawn: a failed spawn raises before any job + # exists, so there is no handle to leak on that path. Children the server + # spawns before AssignProcessToJobObject completes land outside the job + # (membership is inherited at CreateProcess, never acquired retroactively); + # the window is two API calls racing the server's interpreter cold start. If + # it ever bites, the fix is a CREATE_SUSPENDED spawn -> assign -> resume. 
+ job = _create_job_object() _maybe_assign_process_to_job(process, job) return process @@ -203,37 +196,25 @@ async def _create_windows_fallback_process( This function wraps the sync subprocess.Popen in an async-compatible interface. """ - try: - # Try launching with creationflags to avoid opening a new console window - popen_obj = subprocess.Popen( - [command, *args], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=errlog, - env=env, - cwd=cwd, - bufsize=0, # Unbuffered output - creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0), - ) - except Exception: - # If creationflags failed, fallback without them - popen_obj = subprocess.Popen( - [command, *args], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=errlog, - env=env, - cwd=cwd, - bufsize=0, - ) + popen_obj = subprocess.Popen( + [command, *args], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=errlog, + env=env, + cwd=cwd, + bufsize=0, # Unbuffered output + creationflags=getattr(subprocess, "CREATE_NO_WINDOW", 0), + ) return FallbackProcess(popen_obj) -def _create_job_object() -> int | None: +def _create_job_object() -> object | None: """Create a Windows Job Object configured to terminate all processes when closed.""" - if sys.platform != "win32" or not win32job: + if sys.platform != "win32" or not win32api or not win32job: return None + job = None try: job = win32job.CreateJobObject(None, "") extended_info = win32job.QueryInformationJobObject(job, win32job.JobObjectExtendedLimitInformation) @@ -241,17 +222,26 @@ def _create_job_object() -> int | None: extended_info["BasicLimitInformation"]["LimitFlags"] |= win32job.JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE win32job.SetInformationJobObject(job, win32job.JobObjectExtendedLimitInformation, extended_info) return job - except Exception as e: - logger.warning(f"Failed to create Job Object for process tree management: {e}") + except pywintypes.error: + logger.warning("Failed to create Job Object for process tree management", 
exc_info=True) + # If creation succeeded but configuration failed, close the handle rather + # than leaving it to be reclaimed whenever the GC gets to it. + if job is not None: + try: + win32api.CloseHandle(job) + except pywintypes.error: + pass return None -def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHandle | None) -> None: +def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: object | None) -> None: """Try to assign a process to a job object. + On success the job is recorded for the process so that + `terminate_windows_process_tree` can terminate the whole tree through it. If assignment fails for any reason, the job handle is closed. """ - if not job: + if job is None: return if sys.platform != "win32" or not win32api or not win32con or not win32job: @@ -262,72 +252,77 @@ def _maybe_assign_process_to_job(process: Process | FallbackProcess, job: JobHan win32con.PROCESS_SET_QUOTA | win32con.PROCESS_TERMINATE, False, process.pid ) if not process_handle: - raise Exception("Failed to open process handle") + raise pywintypes.error(0, "OpenProcess", "Failed to open process handle") try: win32job.AssignProcessToJobObject(job, process_handle) - process._job_object = job finally: win32api.CloseHandle(process_handle) - except Exception as e: - logger.warning(f"Failed to assign process {process.pid} to Job Object: {e}") - if win32api: + # Recorded only after the process-handle close above. If that close failed + # post-assignment, the except below would close the job handle and + # KILL_ON_JOB_CLOSE would take the just-assigned healthy server with it — + # accepted, because CloseHandle cannot realistically fail on a handle + # OpenProcess just returned. 
+ _process_jobs[process] = job + except pywintypes.error: + logger.warning("Failed to assign process %d to Job Object", process.pid, exc_info=True) + try: win32api.CloseHandle(job) + except pywintypes.error: + pass -async def terminate_windows_process_tree(process: Process | FallbackProcess, timeout_seconds: float = 2.0) -> None: - """Terminate a process and all its children on Windows. +def close_process_job(process: Process | FallbackProcess) -> None: + """Close the process's Job Object handle, if it still has one. - If the process has an associated job object, it will be terminated. - Otherwise, falls back to basic process termination. + The job is created with `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`, so closing the + handle also kills any job members that are still alive. Calling this at the end + of shutdown makes that reaping deterministic — otherwise it would happen whenever + the handle happens to be garbage-collected. This is a deliberate divergence from + POSIX, where a well-behaved server's surviving background children are left + alive. No-op on POSIX and when no job was assigned (or it was already closed by + tree termination). + """ + if sys.platform != "win32": + return - Args: - process: The process to terminate - timeout_seconds: Timeout in seconds before force killing (default: 2.0) + job = _process_jobs.pop(process, None) + if job is not None and win32api: + try: + win32api.CloseHandle(job) + except pywintypes.error: + pass + + +async def terminate_windows_process_tree(process: Process | FallbackProcess) -> None: + """Terminate a process and all its children on Windows. + + If the process was assigned to a Job Object at spawn, the job is terminated, + which kills every process in it immediately. Otherwise only the process itself + is terminated. 
Both are immediate hard kills: Windows offers no portable + equivalent of SIGTERM for a whole tree, so unlike POSIX there is no graceful + phase here — the stdin-close grace period in the client shutdown is the + server's opportunity to exit cleanly. """ if sys.platform != "win32": return - job = getattr(process, "_job_object", None) - if job and win32job: + job = _process_jobs.pop(process, None) + if job is not None and win32job: try: win32job.TerminateJobObject(job, 1) - except Exception: + except pywintypes.error: # Job might already be terminated pass finally: if win32api: try: win32api.CloseHandle(job) - except Exception: + except pywintypes.error: pass # Always try to terminate the process itself as well try: process.terminate() - except Exception: + except OSError: pass - - -@deprecated( - "terminate_windows_process is deprecated and will be removed in a future version. " - "Process termination is now handled internally by the stdio_client context manager." -) -async def terminate_windows_process(process: Process | FallbackProcess): - """Terminate a Windows process. - - Note: On Windows, terminating a process with process.terminate() doesn't - always guarantee immediate process termination. - If the process does not exit within 2 seconds, process.kill() is called - to send a SIGKILL-equivalent signal. - - Args: - process: The process to terminate - """ - try: - process.terminate() - with anyio.fail_after(2.0): - await process.wait() - except TimeoutError: - # Force kill if it doesn't terminate - process.kill() diff --git a/tests/client/test_stdio.py b/tests/client/test_stdio.py index 06e2cba4b1..ce9a475d29 100644 --- a/tests/client/test_stdio.py +++ b/tests/client/test_stdio.py @@ -1,252 +1,923 @@ +"""Tests for the stdio client transport. 
+ +The transport's logic — framing, parse errors, the shutdown escalation decisions — is +tested in process against a fake process injected through the spawn seam; only the +properties that *are* OS behaviour (process-group kill semantics, SIGKILL after an +ignored SIGTERM, exec failure) use real subprocesses, with kernel-level liveness +sockets as their only synchronization. The full client↔server round trip over a real +process boundary is pinned by tests/interaction/transports/test_stdio.py. +""" + import errno -import shutil +import gc +import logging +import math +import os +import signal import sys -import textwrap -import time +from collections.abc import Callable from contextlib import AsyncExitStack, suppress +from pathlib import Path +from typing import TextIO, cast import anyio import anyio.abc +import anyio.lowlevel import pytest +import trio +import trio.testing +from anyio.streams.memory import MemoryObjectReceiveStream +from mcp.client import stdio from mcp.client.session import ClientSession from mcp.client.stdio import ( + _EXIT_POLL_INTERVAL, StdioServerParameters, _create_platform_compatible_process, _terminate_process_tree, stdio_client, ) +from mcp.os.posix import utilities as posix_utilities +from mcp.os.posix.utilities import terminate_posix_process_tree from mcp.os.win32.utilities import FallbackProcess from mcp.shared.exceptions import MCPError from mcp.shared.message import SessionMessage from mcp.types import CONNECTION_CLOSED, JSONRPCMessage, JSONRPCRequest, JSONRPCResponse -# Timeout for cleanup of processes that ignore SIGTERM -# This timeout ensures the test fails quickly if the cleanup logic doesn't have -# proper fallback mechanisms (SIGINT/SIGKILL) for processes that ignore SIGTERM -SIGTERM_IGNORING_PROCESS_TIMEOUT = 5.0 +# --------------------------------------------------------------------------- +# In-process fake of the spawned server process +# --------------------------------------------------------------------------- +# +# 
`stdio_client` touches its process through a narrow surface: `stdin.send/aclose`, +# `stdout` (a byte receive stream), and `returncode`. Everything between the spawn +# and the OS kill — framing, parse errors, the stdin-close grace period, the +# escalation decision — is pure SDK logic, so it is tested against this fake by +# monkeypatching the spawn and terminate seams. The OS half is tested separately +# below with real processes. + + +class _FakeStdin: + """The fake process's stdin: records what the client writes, signals closure.""" + + def __init__(self, process: "FakeProcess") -> None: + self._process = process + + async def send(self, data: bytes) -> None: + if self._process.stdin_send_blocks: + # A pipe whose reader stopped reading: the write never completes. + await anyio.sleep_forever() + if self._process.stdin_send_error is not None: + raise self._process.stdin_send_error + if self._process.returncode is not None: + # What the asyncio backend surfaces when writing to a dead child's pipe. + raise ConnectionResetError("Connection lost") + self._process.written.append(data) + + async def aclose(self) -> None: + self._process.stdin_closed.set() + if self._process.on_stdin_close is not None: + self._process.on_stdin_close() + if self._process.stdin_aclose_error is not None: + raise self._process.stdin_aclose_error + + +class _FakeStdout: + """The fake process's stdout: delegates to the in-memory stream, optionally + surfacing the abrupt-death or close-time errors a real pipe can.""" + + def __init__( + self, + inner: MemoryObjectReceiveStream[bytes], + *, + eof_error: Exception | None = None, + aclose_error: Exception | None = None, + ) -> None: + self._inner = inner + self._eof_error = eof_error + self._aclose_error = aclose_error + + async def receive(self) -> bytes: + try: + return await self._inner.receive() + except anyio.EndOfStream: + if self._eof_error is not None: + # A hard-killed pipe surfaces a reset, not EOF, on the proactor loop. 
+ raise self._eof_error from None + raise + + async def aclose(self) -> None: + await self._inner.aclose() + if self._aclose_error is not None: + raise self._aclose_error + # Real async closes yield; keeps the fake honest and shutdown scheduling realistic. + await anyio.lowlevel.checkpoint() + + +class FakeProcess: + """In-memory stand-in for the spawned server process. + + The test feeds server-to-client bytes with `feed`/`close_stdout`, reads what the + client wrote to the server's stdin from `written`, and controls process death via + `exit` (or an `on_stdin_close` callback playing a server that exits when its + stdin closes). The error/blocking knobs replay specific pipe failure modes. + """ + + def __init__( + self, + on_stdin_close: Callable[[], None] | None = None, + stdin_aclose_error: Exception | None = None, + stdin_send_error: Exception | None = None, + stdin_send_blocks: bool = False, + stdout_eof_error: Exception | None = None, + stdout_aclose_error: Exception | None = None, + ) -> None: + self._stdout_send, stdout_receive = anyio.create_memory_object_stream[bytes](math.inf) + self.stdout = _FakeStdout(stdout_receive, eof_error=stdout_eof_error, aclose_error=stdout_aclose_error) + self.pid = 424242 + self.written: list[bytes] = [] + self.stdin_closed = anyio.Event() + self.returncode: int | None = None + self.on_stdin_close = on_stdin_close + self.stdin_aclose_error = stdin_aclose_error + self.stdin_send_error = stdin_send_error + self.stdin_send_blocks = stdin_send_blocks + self.stdin = _FakeStdin(self) + + async def feed(self, data: bytes) -> None: + """Make `data` readable on the fake process's stdout.""" + await self._stdout_send.send(data) + + def close_stdout(self) -> None: + """End the fake process's stdout, as the kernel does when it dies.""" + self._stdout_send.close() + + def exit(self, code: int = 0) -> None: + """Die: set the exit code and EOF stdout, as the kernel does.""" + self.returncode = code + self.close_stdout() + + def 
pending_stdout_chunks(self) -> int: + """How many fed chunks the client has not yet pulled off the fake stdout.""" + return self._stdout_send.statistics().current_buffer_used + + +def install_fake_process( + monkeypatch: pytest.MonkeyPatch, process: FakeProcess, *, grace_period: float | None = 0.2 +) -> list[FakeProcess]: + """Route stdio_client's spawn and terminate seams to `process`. + + Returns the list that records every process the (fake) tree termination was + invoked on; terminating marks the fake dead and EOFs its stdout, as the real + kill does. `grace_period` shortens the stdin-close grace so wall-clock tests + stay fast; `None` keeps the production value (the virtual-clock test can + afford it, and pins it). + """ + terminated: list[FakeProcess] = [] + + async def fake_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + return process + + async def fake_terminate_tree(proc: FakeProcess) -> None: + terminated.append(proc) + proc.exit(-15) + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", fake_spawn) + monkeypatch.setattr(stdio, "_terminate_process_tree", fake_terminate_tree) + if grace_period is not None: + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", grace_period) + return terminated + + +FAKE_PARAMS = StdioServerParameters(command="fake-server") + + +def _line(message: JSONRPCMessage) -> bytes: + """The wire form of `message`: one JSON document on its own line.""" + return (message.model_dump_json(by_alias=True, exclude_unset=True) + "\n").encode() + + +async def _next_message(read_stream: MemoryObjectReceiveStream[SessionMessage | Exception]) -> JSONRPCMessage: + received = await read_stream.receive() + assert isinstance(received, SessionMessage) + return received.message + -tee = shutil.which("tee") +@pytest.mark.anyio +async def test_messages_split_and_packed_across_chunks_are_reframed(monkeypatch: 
pytest.MonkeyPatch) -> None: + """Framing survives arbitrary chunk boundaries: a message split mid-byte, two + messages packed into one chunk, and a CRLF-terminated message are each delivered + exactly once, and a trailing line without a newline is not delivered. + + A real pipe delivers short writes as whole lines, so only an in-process stdout + can pin the reassembly buffer deterministically. + """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + ping2 = JSONRPCRequest(jsonrpc="2.0", id=2, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, _): + # Split the first message mid-bytes, pack the rest of it together with + # the whole second message, a CRLF-framed third (the SDK's own server + # emits \r\n on Windows; jiter treats the \r as JSON whitespace), and a + # partial fourth into one chunk. + wire = _line(ping) + crlf_wire = ping2.model_dump_json(by_alias=True, exclude_unset=True).encode() + b"\r\n" + await process.feed(wire[:7]) + await process.feed(wire[7:] + _line(pong) + crlf_wire + b'{"jsonrpc": "2.0", "id": 99') + + assert await _next_message(read_stream) == ping + assert await _next_message(read_stream) == pong + assert await _next_message(read_stream) == ping2 + + # The partial trailing message is dropped at EOF rather than delivered + # broken: EOF ends the read stream with nothing further on it. + # (no branch: coverage mis-traces the exit arc of a `with` whose body + # raises inside a nested async context.) 
+ with pytest.raises(anyio.EndOfStream): # pragma: no branch + process.close_stdout() + await read_stream.receive() @pytest.mark.anyio -@pytest.mark.skipif(tee is None, reason="could not find tee command") -async def test_stdio_context_manager_exiting(): - assert tee is not None - async with stdio_client(StdioServerParameters(command=tee)) as (_, _): - pass +async def test_each_outgoing_message_is_written_as_exactly_one_line(monkeypatch: pytest.MonkeyPatch) -> None: + """Client -> server framing: every sent message reaches the server's stdin as + exactly one newline-terminated JSON document.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + await write_stream.send(SessionMessage(ping)) + await write_stream.send(SessionMessage(pong)) + # The zero-buffer handoff resumes this task before the writer task has + # necessarily written; wait until it has gone back to blocking on the + # next message, at which point both writes have landed. 
+ await anyio.wait_all_tasks_blocked() + assert process.written == [_line(ping), _line(pong)] @pytest.mark.anyio -@pytest.mark.skipif(tee is None, reason="could not find tee command") -async def test_stdio_client(): - assert tee is not None - server_parameters = StdioServerParameters(command=tee) - - async with stdio_client(server_parameters) as (read_stream, write_stream): - # Test sending and receiving messages - messages = [ - JSONRPCRequest(jsonrpc="2.0", id=1, method="ping"), - JSONRPCResponse(jsonrpc="2.0", id=2, result={}), - ] - - async with write_stream: - for message in messages: - session_message = SessionMessage(message) - await write_stream.send(session_message) - - read_messages: list[JSONRPCMessage] = [] - async with read_stream: - async for message in read_stream: - if isinstance(message, Exception): # pragma: no cover - raise message - - read_messages.append(message.message) - if len(read_messages) == 2: - break - - assert len(read_messages) == 2 - assert read_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - assert read_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) +async def test_invalid_json_from_the_server_surfaces_as_an_in_stream_exception( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A line that fails JSON-RPC validation is delivered as an Exception on the read + stream, and the messages after it still come through.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, _): + await process.feed(b"not json\n" + _line(ping)) + + error = await read_stream.receive() + # The transport surfaces parse failures as the underlying validation error. 
+ assert isinstance(error, ValueError) + assert await _next_message(read_stream) == ping @pytest.mark.anyio -async def test_stdio_client_bad_path(): - """Check that the connection doesn't hang if process errors.""" - server_params = StdioServerParameters(command=sys.executable, args=["-c", "non-existent-file.py"]) - async with stdio_client(server_params) as (read_stream, write_stream): - async with ClientSession(read_stream, write_stream) as session: - # The session should raise an error when the connection closes +async def test_a_server_that_dies_before_responding_fails_initialize_with_connection_closed( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Server death (stdout EOF) is reported to the session as a closed connection + instead of hanging the in-flight initialize.""" + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + process.exit(1) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with ( + stdio_client(FAKE_PARAMS) as (read_stream, write_stream), + ClientSession(read_stream, write_stream) as session, + ): with pytest.raises(MCPError) as exc_info: await session.initialize() - # Check that we got a connection closed error assert exc_info.value.error.code == CONNECTION_CLOSED - assert "Connection closed" in exc_info.value.error.message + assert exc_info.value.error.message == "Connection closed" + + +@pytest.mark.anyio +async def test_a_server_that_exits_on_stdin_close_is_never_terminated(monkeypatch: pytest.MonkeyPatch) -> None: + """The shutdown's first step — closing stdin — suffices for a well-behaved server: + the escalation is never invoked. 
The fake's stdin also raises on close (it is + already closing), which the shutdown must tolerate.""" + + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdin_aclose_error=anyio.ClosedResourceError(), + ) + terminated = install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass + + assert terminated == [] + assert process.stdin_closed.is_set() + + +def test_escalation_fires_once_and_only_after_the_grace_period(monkeypatch: pytest.MonkeyPatch) -> None: + """A server that ignores stdin closure is terminated at the grace deadline exactly: + no earlier than the production `PROCESS_TERMINATION_TIMEOUT` on the runtime clock, + and by the first `returncode` poll after it. + + The suite's only direct trio use: anyio's pytest plugin cannot hand the backend a + clock, so the test calls `trio.run` itself with a `MockClock` that jumps whenever + all tasks are blocked. Every time primitive — the grace deadline loop, `fail_after` + — rides that one virtual clock, so the production grace elapses instantly and the + bound can be two-sided, which a wall-clock version cannot afford (a real upper + bound flakes under load). That virtual seconds correspond to wall seconds is the + runtime clock's contract, deliberately not re-tested here. 
+ """ + + class ClockedFakeProcess(FakeProcess): + """Records the virtual time of each death; `exit` is only called by the + (fake) tree termination here, so these are the escalation timestamps.""" + + def __init__(self) -> None: + super().__init__() + self.exit_times: list[float] = [] + + def exit(self, code: int = 0) -> None: + self.exit_times.append(trio.current_time()) + super().exit(code) + + process = ClockedFakeProcess() + terminated = install_fake_process(monkeypatch, process, grace_period=None) + + async def run_client() -> float: + with anyio.fail_after(stdio.PROCESS_TERMINATION_TIMEOUT + 5): # virtual seconds + async with stdio_client(FAKE_PARAMS): + # Evaluated just before the context exits: the moment cleanup begins. + return trio.current_time() + + cleanup_started = trio.run(run_client, clock=trio.testing.MockClock(autojump_threshold=0)) + + assert terminated == [process] + virtual_elapsed = process.exit_times[0] - cleanup_started + # Two-sided: never before the grace deadline, and within one poll interval past + # it — consumed by shutdown's writer-flush poll; the grace wait itself fires + # exactly at its deadline. (The epsilon absorbs float accumulation across the + # virtual sleeps.) + assert ( + stdio.PROCESS_TERMINATION_TIMEOUT + <= virtual_elapsed + <= stdio.PROCESS_TERMINATION_TIMEOUT + _EXIT_POLL_INTERVAL + 1e-9 + ), virtual_elapsed @pytest.mark.anyio -async def test_stdio_client_nonexistent_command(): - """Test that stdio_client raises an error for non-existent commands.""" - # Create a server with a non-existent command +async def test_cancelling_the_client_still_runs_the_full_shutdown(monkeypatch: pytest.MonkeyPatch) -> None: + """Cancellation (a client timeout, app shutdown) must not skip the shutdown + sequence: stdin is still closed and a server ignoring it is still terminated. 
+ Without the shielded shutdown this leaks the process and can deadlock.""" + process = FakeProcess() + terminated = install_fake_process(monkeypatch, process, grace_period=0.05) + entered = anyio.Event() + # Cancel a scope owned by the client's task, not the test's task group: a host + # self-cancel is delivered by throwing through this test function's suspended + # frames, and Python 3.11's tracer loses coverage events after such a throw() + # traversal (python/cpython#106749). + cancel_scope = anyio.CancelScope() + + async def run_client_until_cancelled() -> None: + with cancel_scope: + async with stdio_client(FAKE_PARAMS): + entered.set() + await anyio.sleep_forever() + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client_until_cancelled) + await entered.wait() + cancel_scope.cancel() + + assert process.stdin_closed.is_set() + assert terminated == [process] + + +@pytest.mark.anyio +async def test_writing_after_the_server_dies_reports_clean_closure(monkeypatch: pytest.MonkeyPatch) -> None: + """A send racing the server's death must not surface a raw backend exception + (ConnectionResetError in an exception group) out of the context manager; the + transport still shuts down cleanly.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + process.exit(1) + # The fake's stdin now raises ConnectionResetError, as a dead child's pipe does. + await write_stream.send(SessionMessage(ping)) + + assert process.written == [] + + +@pytest.mark.anyio +async def test_exiting_with_an_unconsumed_server_message_does_not_raise(monkeypatch: pytest.MonkeyPatch) -> None: + """Exiting while a server message is still undelivered must be a clean exit. 
+ + Shutdown closes the read stream out from under the reader task blocked on + delivering the message; that closure must not escape to the caller as a + BrokenResourceError in an exception group.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + # Feed a message and never receive it: the reader parses it and blocks + # delivering into the zero-buffer read stream until shutdown breaks the send. + await process.feed(_line(ping)) + # Wait until the reader task is genuinely parked on its blocked send + # before shutdown closes the stream out from under it. + await anyio.wait_all_tasks_blocked() + + +@pytest.mark.anyio +async def test_spawn_failure_propagates_the_error_and_leaks_no_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """When the spawn itself fails, the OSError reaches the caller and the transport's + internal streams are all closed (an unclosed stream would fail the test through + its GC-time ResourceWarning under filterwarnings=error).""" + + async def failing_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + raise OSError(errno.EACCES, "Permission denied") + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", failing_spawn) + + with pytest.raises(OSError) as exc_info: + async with stdio_client(FAKE_PARAMS): + pass # pragma: no cover + + assert exc_info.value.errno == errno.EACCES + # Drop the ExceptionInfo before collecting: its traceback references the suspended + # stdio_client frame, which would keep leaked streams alive across the collect. 
+ del exc_info + gc.collect() + + +@pytest.mark.anyio +async def test_a_command_that_cannot_be_execed_raises_enoent() -> None: + """A command that cannot be exec'd raises OSError(ENOENT) out of stdio_client.""" server_params = StdioServerParameters( command="/path/to/nonexistent/command", args=["--help"], ) - # Should raise an error when trying to start the process with pytest.raises(OSError) as exc_info: - async with stdio_client(server_params) as (_, _): + async with stdio_client(server_params): pass # pragma: no cover - # The error should indicate the command was not found (ENOENT: No such file or directory) assert exc_info.value.errno == errno.ENOENT @pytest.mark.anyio -async def test_stdio_client_universal_cleanup(): - """Test that stdio_client completes cleanup within reasonable time - even when connected to processes that exit slowly. - """ +async def test_cancellation_during_spawn_leaks_no_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """A caller timeout that fires while the spawn is still in flight (interpreter + cold start) must not leak the transport's internal streams: an unclosed stream + would fail the test through its GC-time ResourceWarning under + filterwarnings=error.""" + spawn_started = anyio.Event() + + async def hanging_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + spawn_started.set() + await anyio.sleep_forever() + raise NotImplementedError("unreachable: the spawn is cancelled while parked") + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", hanging_spawn) + + # Cancel a scope owned by the client's task, not the test's task group: a host + # self-cancel is delivered by throwing through this test function's suspended + # frames, and Python 3.11's tracer loses coverage events after such a throw() + # traversal (python/cpython#106749). 
+ cancel_scope = anyio.CancelScope() + + async def run_client() -> None: + with cancel_scope: + async with stdio_client(FAKE_PARAMS): + pass # pragma: no cover + + with anyio.fail_after(5): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client) + await spawn_started.wait() + cancel_scope.cancel() + + gc.collect() - # Use a Python script that simulates a long-running process - # This ensures consistent behavior across platforms - long_running_script = textwrap.dedent( - """ - import time - import sys - - # Simulate a long-running process - for i in range(100): - time.sleep(0.1) - # Flush to ensure output is visible - sys.stdout.flush() - sys.stderr.flush() - """ - ) - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", long_running_script], +@pytest.mark.anyio +async def test_a_non_oserror_spawn_failure_propagates_and_leaks_no_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """Spawning can fail with more than OSError (e.g. ValueError for a NUL byte in + the command); the error reaches the caller and the transport's internal streams + are still all closed (checked through GC-time ResourceWarnings, as above).""" + + async def failing_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> FakeProcess: + raise ValueError("embedded null byte") + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", failing_spawn) + + with pytest.raises(ValueError, match="embedded null byte"): + async with stdio_client(FAKE_PARAMS): + pass # pragma: no cover + + gc.collect() + + +@pytest.mark.anyio +async def test_a_message_sent_just_before_exit_is_flushed_to_the_server(monkeypatch: pytest.MonkeyPatch) -> None: + """A message the transport accepted must reach the server even when the caller + exits immediately after sending it. 
The first-ever send parks the sender until + the writer takes the message, but once the writer is parked waiting, a send is + a pure handoff that returns before the writer has written — so the second + message here is the one shutdown must let the writer flush before it closes the + server's stdin.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess(on_stdin_close=lambda: process.exit(0)) + + install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + await write_stream.send(SessionMessage(ping)) + await write_stream.send(SessionMessage(pong)) + + assert process.written == [_line(ping), _line(pong)] + + +@pytest.mark.anyio +async def test_a_failed_write_to_a_live_server_closes_the_read_stream_instead_of_hanging( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """When a write fails but the server is still alive (it closed its own stdin, so + stdout never EOFs), the transport must end the read stream — that closure is what + lets a session map the loss to CONNECTION_CLOSED instead of waiting forever for a + response. EIO also pins that plain OSError, not just ConnectionError, is handled. + + Steps: + 1. A send fails with EIO while the server is alive; the writer signals the + read side, so the read stream ends. + 2. Output the server produces afterwards is still drained (the reader keeps + consuming without delivering), so the server cannot wedge on a full pipe. 
+ """ + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdin_send_error=OSError(errno.EIO, "I/O error"), ) + terminated = install_fake_process(monkeypatch, process) - start_time = time.time() + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, write_stream): + await write_stream.send(SessionMessage(ping)) - with anyio.move_on_after(8.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Immediately exit - this triggers cleanup while process is still running - pass + with pytest.raises(anyio.EndOfStream): + await read_stream.receive() - end_time = time.time() - elapsed = end_time - start_time + await process.feed(_line(pong)) + await anyio.wait_all_tasks_blocked() + assert process.pending_stdout_chunks() == 0 - # On Windows: 2s (stdin wait) + 2s (terminate wait) + overhead = ~5s expected - assert elapsed < 6.0, ( - f"stdio_client cleanup took {elapsed:.1f} seconds, expected < 6.0 seconds. " - f"This suggests the timeout mechanism may not be working properly." - ) + assert process.written == [] + assert terminated == [] - # Check if we timed out - if cancel_scope.cancelled_caught: # pragma: no cover - pytest.fail( - "stdio_client cleanup timed out after 8.0 seconds. " - "This indicates the cleanup mechanism is hanging and needs fixing." - ) + +@pytest.mark.anyio +async def test_exit_completes_when_a_write_is_wedged_in_a_pipe_no_one_reads( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Exiting must stay bounded even when the writer task is parked in a write that + can never complete (the pipe's reader is gone but the pipe is not broken — e.g. a + kill-surviving descendant holds the read end without reading). 
The flush window + expires and the post-shutdown cancellation unparks the writer.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + process = FakeProcess(on_stdin_close=lambda: process.exit(0), stdin_send_blocks=True) + terminated = install_fake_process(monkeypatch, process) + monkeypatch.setattr(stdio, "_WRITER_FLUSH_TIMEOUT", 0.05) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (_, write_stream): + await write_stream.send(SessionMessage(ping)) + # Wait until the writer task is genuinely parked inside the wedged send. + await anyio.wait_all_tasks_blocked() + + assert process.written == [] + assert terminated == [] + assert process.stdin_closed.is_set() @pytest.mark.anyio -@pytest.mark.skipif(sys.platform == "win32", reason="Windows signal handling is different") -async def test_stdio_client_sigint_only_process(): # pragma: lax no cover - """Test cleanup with a process that ignores SIGTERM but responds to SIGINT.""" - # Create a Python script that ignores SIGTERM but handles SIGINT - script_content = textwrap.dedent( - """ - import signal - import sys - import time - - # Ignore SIGTERM (what process.terminate() sends) - signal.signal(signal.SIGTERM, signal.SIG_IGN) - - # Handle SIGINT (Ctrl+C signal) by exiting cleanly - def sigint_handler(signum, frame): - sys.exit(0) - - signal.signal(signal.SIGINT, sigint_handler) - - # Keep running until SIGINT received - while True: - time.sleep(0.1) - """ +async def test_undelivered_server_output_is_drained_at_shutdown_so_the_server_can_exit( + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Output the caller never received is consumed during the stdin-close grace + period rather than left to back up: a real server flushing its remaining output + on the way out would otherwise block on a full pipe, never reach its stdin read, + and be killed despite being well-behaved. 
The fake here ignores stdin closure + (so it is ultimately terminated); what the test pins is that its backlog was + drained during the grace window, when a flushing server needs it.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + pong = JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + process = FakeProcess() + terminated = install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + # Three separate chunks: the reader parks delivering the first; the other + # two sit unconsumed in the pipe when shutdown begins. + await process.feed(_line(ping)) + await process.feed(_line(pong)) + await process.feed(_line(ping)) + await anyio.wait_all_tasks_blocked() + assert process.pending_stdout_chunks() == 2 + + assert terminated == [process] + assert process.pending_stdout_chunks() == 0 + + +@pytest.mark.anyio +async def test_a_kill_racing_a_pending_stdout_read_is_swallowed_during_shutdown( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """The shutdown escalation can hard-kill the server while a stdout read is + pending; the proactor backend surfaces that as ConnectionResetError from the + read. 
It must not escape the context manager as an exception group, and being + expected teardown noise, it is not logged as an error either.""" + process = FakeProcess(stdout_eof_error=ConnectionResetError("read torn down by kill")) + terminated = install_fake_process(monkeypatch, process) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass # the fake ignores stdin closure, so shutdown must escalate + + assert terminated == [process] + assert not [record for record in caplog.records if record.levelno >= logging.ERROR] + + +@pytest.mark.anyio +async def test_a_mid_session_stdout_failure_is_logged_and_surfaces_as_clean_closure( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """A stdout read failure in the middle of a session (not during shutdown) ends + the read stream cleanly — no raw exception out of the context manager — and + leaves an error log identifying the failure, unlike the silent shutdown case.""" + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdout_eof_error=ConnectionResetError("pipe failed mid-session"), ) + install_fake_process(monkeypatch, process) - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", script_content], + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS) as (read_stream, _): + process.exit(1) + # (no branch: coverage mis-traces the exit arc of a `with` whose body + # raises inside a nested async context.) 
+ with pytest.raises(anyio.EndOfStream): # pragma: no branch + await read_stream.receive() + + assert "stdout failed mid-session" in caplog.text + + +@pytest.mark.anyio +async def test_a_failing_stdout_close_still_closes_the_transport_streams(monkeypatch: pytest.MonkeyPatch) -> None: + """A close-time error on the process's stdout (a contended pipe handle on the + Windows fallback) must not abort the rest of the shutdown: the context exits + cleanly and the internal streams are all closed (checked through GC-time + ResourceWarnings under filterwarnings=error).""" + process = FakeProcess( + on_stdin_close=lambda: process.exit(0), + stdout_aclose_error=OSError(errno.EBADF, "Bad file descriptor"), ) + terminated = install_fake_process(monkeypatch, process) - start_time = time.time() - - try: - # Use anyio timeout to prevent test from hanging forever - with anyio.move_on_after(5.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Let the process start and begin ignoring SIGTERM - await anyio.sleep(0.5) - # Exit context triggers cleanup - this should not hang - pass - - if cancel_scope.cancelled_caught: # pragma: no cover - raise TimeoutError("Test timed out") - - end_time = time.time() - elapsed = end_time - start_time - - # Should complete quickly even with SIGTERM-ignoring process - # This will fail if cleanup only uses process.terminate() without fallback - assert elapsed < SIGTERM_IGNORING_PROCESS_TIMEOUT, ( - f"stdio_client cleanup took {elapsed:.1f} seconds with SIGTERM-ignoring process. " - f"Expected < {SIGTERM_IGNORING_PROCESS_TIMEOUT} seconds. " - "This suggests the cleanup needs SIGINT/SIGKILL fallback." - ) - except (TimeoutError, Exception) as e: # pragma: no cover - if isinstance(e, TimeoutError) or "timed out" in str(e): - pytest.fail( - f"stdio_client cleanup timed out after {SIGTERM_IGNORING_PROCESS_TIMEOUT} seconds " - "with SIGTERM-ignoring process. 
" - "This confirms the cleanup needs SIGINT/SIGKILL fallback for processes that ignore SIGTERM." - ) - else: - raise + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass + + assert terminated == [] + gc.collect() + + +@pytest.mark.anyio +async def test_a_process_surviving_the_kill_escalation_is_logged_and_abandoned( + monkeypatch: pytest.MonkeyPatch, + caplog: pytest.LogCaptureFixture, +) -> None: + """If the process is still alive after the whole escalation (D-state, an + unsignalable survivor), shutdown completes — bounded — and leaves a warning + instead of silently leaking a live process.""" + process = FakeProcess() # ignores stdin closure and survives "termination" + install_fake_process(monkeypatch, process, grace_period=0.05) + + stubborn: list[FakeProcess] = [] + + async def stubborn_terminate(proc: FakeProcess) -> None: + stubborn.append(proc) # the kill has no effect + + monkeypatch.setattr(stdio, "_terminate_process_tree", stubborn_terminate) + monkeypatch.setattr(stdio, "_KILL_REAP_TIMEOUT", 0.05) + + with anyio.fail_after(5): + async with stdio_client(FAKE_PARAMS): + pass + + assert stubborn == [process] + assert process.returncode is None + assert "still alive after the kill escalation" in caplog.text + # The fake "survived", so nothing ever EOF'd its stdout pipe; release it here + # or its GC-time ResourceWarning would fail a later test. + process.close_stdout() # --------------------------------------------------------------------------- -# TestChildProcessCleanup — socket-based deterministic child liveness probe +# POSIX tree-termination policy, tested through the sanctioned killpg seam # --------------------------------------------------------------------------- # -# These tests verify that `_terminate_process_tree()` kills the *entire* process -# tree (not just the immediate child), which is critical for cleaning up tools -# like `npx` that spawn their own subprocesses. 
-# -# Mechanism: each subprocess in the tree connects a TCP socket back to a -# listener owned by the test. We then use two kernel-guaranteed blocking-I/O -# signals — neither requires any `sleep()` or polling loop: +# `mcp.os.posix.utilities` is coverage-omitted and explicitly the place where OS +# calls are monkeypatched: these tests pin the EPERM policy (macOS raises EPERM +# from killpg when *any* group member cannot be signalled, even though permitted +# members were) without needing a foreign-euid process. + + +class _StubPosixProcess: + """The two attributes `terminate_posix_process_tree` touches: the pgid source + and the reap-progress probe.""" + + pid = 54321 + returncode: int | None = None + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics") +# Excluded from coverage (lax: exempt from strict-no-cover) because coverage is enforced +# per CI job at 100%, including on Windows runners, where this test is skipped. +async def test_an_eperm_group_that_dies_during_the_grace_period_is_not_sigkilled( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """EPERM from the SIGTERM killpg no longer short-circuits termination: the grace + wait still runs, and a group observed to be gone during it is never SIGKILLed.""" + calls: list[tuple[int, int]] = [] + probes = 0 + + def fake_killpg(pgid: int, sig: int) -> None: + nonlocal probes + calls.append((pgid, sig)) + if sig == signal.SIGTERM: + raise PermissionError("one group member has a foreign euid") + if sig == 0: + probes += 1 + if probes == 1: + raise PermissionError("survivors we may not signal") + raise ProcessLookupError("group is gone") + raise NotImplementedError("no other signal should be sent") + + monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg) + stub = _StubPosixProcess() + + with anyio.fail_after(5): + await terminate_posix_process_tree(cast(anyio.abc.Process, stub)) + + assert calls == [(stub.pid, signal.SIGTERM), 
(stub.pid, 0), (stub.pid, 0)] + + +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics") +# Excluded from coverage for the same Windows-runner reason as above. +async def test_an_eperm_group_that_outlives_the_grace_period_is_still_sigkilled( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Even when every probe reports EPERM, the SIGKILL escalation still fires after + the grace period (and its own EPERM is tolerated) — pre-fix, EPERM at SIGTERM + abandoned the group escalation for a leader-only kill, leaking every other group + member. The tiny timeout is the time-based grace period under test.""" + calls: list[tuple[int, int]] = [] + + def fake_killpg(pgid: int, sig: int) -> None: + calls.append((pgid, sig)) + if sig in (signal.SIGTERM, 0, signal.SIGKILL): + raise PermissionError("a foreign-euid member never goes away") + raise NotImplementedError("no other signal should be sent") + + monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg) + stub = _StubPosixProcess() + + with anyio.fail_after(5): + await terminate_posix_process_tree(cast(anyio.abc.Process, stub), timeout_seconds=0.05) + + assert calls[0] == (stub.pid, signal.SIGTERM) + assert calls[-1] == (stub.pid, signal.SIGKILL) + assert set(calls[1:-1]) == {(stub.pid, 0)} + + +@pytest.mark.anyio +@pytest.mark.parametrize("anyio_backend", ["asyncio", "trio"]) +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX killpg semantics") +# Excluded from coverage for the same Windows-runner reason as above. 
+async def test_the_grace_wait_reads_returncode_so_trio_can_reap_the_leaders_zombie( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """The wait between SIGTERM and SIGKILL reads `process.returncode` while it polls: + on trio that property calls `Popen.poll()`, and that reap is what stops the + leader's zombie from keeping the process group alive for the full timeout (see the + comment in `terminate_posix_process_tree`). Regression pin for the read itself, on + both backends — the reaping side effect is trio's documented `returncode` + behaviour, deliberately not re-tested here.""" + + calls: list[tuple[int, int]] = [] + + def fake_killpg(pgid: int, sig: int) -> None: + # SIGTERM is accepted and every liveness probe reports survivors, so the + # grace wait runs to its (tiny) timeout and the SIGKILL escalation fires. + calls.append((pgid, sig)) + + class _ReadCountingProcess: + """A live-forever leader whose `returncode` property counts its reads.""" + + pid = 54321 + + def __init__(self) -> None: + self.returncode_reads = 0 + + @property + def returncode(self) -> int | None: + self.returncode_reads += 1 + return None + + monkeypatch.setattr(posix_utilities.os, "killpg", fake_killpg) + stub = _ReadCountingProcess() + + with anyio.fail_after(5): + await terminate_posix_process_tree(cast(anyio.abc.Process, stub), timeout_seconds=0.05) + + # The wait ran to its deadline (the escalation fired)... + assert calls[0] == (stub.pid, signal.SIGTERM) + assert calls[-1] == (stub.pid, signal.SIGKILL) + # ...and `returncode` was read while it polled — the read that reaps on trio. + assert stub.returncode_reads >= 1 + + +# --------------------------------------------------------------------------- +# Real-process tests: the OS facts no fake can certify +# --------------------------------------------------------------------------- # -# 1. `await listener.accept()` blocks until the subprocess connects, -# proving it is running. -# 2. 
After `_terminate_process_tree()`, `await stream.receive(1)` raises -# `EndOfStream` (clean close / FIN) or `BrokenResourceError` (abrupt -# close / RST — typical on Windows after TerminateJobObject) because the -# kernel closes all file descriptors when a process terminates. Either -# is the direct, OS-level proof that the child is dead. +# These pin kernel behaviour — process-group kill semantics and SIGKILL delivery — +# using a socket-based liveness probe with two kernel-guaranteed signals, neither of +# which needs a sleep or a poll: # -# This replaces an older file-growth-watching approach whose fixed `sleep()` -# durations raced against slow Python interpreter startup on loaded CI runners. +# 1. `await listener.accept()` blocks until the subprocess connects, proving it is +# running (and that the setup lines before the connect, such as installing a +# signal handler, have executed). +# 2. After cleanup, `await stream.receive(1)` raises `EndOfStream` (clean close / +# FIN) or `BrokenResourceError` (abrupt close / RST, typical of SIGKILL and +# Windows job termination) because the kernel closes all of a process's file +# descriptors when it terminates. def _connect_back_script(port: int) -> str: """Return a ``python -c`` script body that connects to the given port, - sends ``b'alive'``, then blocks forever. Used by TestChildProcessCleanup + sends `b'alive'`, then blocks forever. Used by the cleanup tests' subprocesses as a liveness probe.""" return ( f"import socket, time\n" @@ -256,15 +927,6 @@ def _connect_back_script(port: int) -> str: ) -def _spawn_then_block(child_script: str) -> str: - """Return a ``python -c`` script body that spawns ``child_script`` as a - subprocess, then blocks forever. 
The ``!r`` injection avoids nested-quote - escaping for arbitrary child script content.""" - return ( - f"import subprocess, sys, time\nsubprocess.Popen([sys.executable, '-c', {child_script!r}])\ntime.sleep(3600)\n" - ) - - async def _open_liveness_listener() -> tuple[anyio.abc.SocketListener, int]: """Open a TCP listener on localhost and return it along with its port.""" multi = await anyio.create_tcp_listener(local_host="127.0.0.1") @@ -290,53 +952,31 @@ async def _accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStrea async def _assert_stream_closed(stream: anyio.abc.SocketStream) -> None: - """Assert the peer holding the other end of ``stream`` has terminated. + """Assert the peer holding the other end of `stream` has terminated.""" + with anyio.fail_after(5.0), pytest.raises((anyio.EndOfStream, anyio.BrokenResourceError)): + await stream.receive(1) - When a process dies, the kernel closes its file descriptors including - sockets. The next ``receive()`` on the peer socket unblocks with one of: - - ``anyio.EndOfStream`` — clean close (FIN), typical after graceful exit - or POSIX ``SIGTERM``. - - ``anyio.BrokenResourceError`` — abrupt close (RST), typical after - Windows ``TerminateJobObject`` or POSIX ``SIGKILL``. +# Excluded from coverage (lax: exempt from strict-no-cover) like their only callers, the +# win32-skipped tests below: Windows CI jobs enforce 100% coverage per job, where these +# helpers never execute. +async def _wait_until_exited(proc: anyio.abc.Process) -> None: # pragma: lax no cover + """Wait for the process itself to die, by polling `returncode`. - Either is a deterministic, kernel-level signal that the process is dead — - no sleeps or polling required. 
- """ - with anyio.fail_after(5.0), pytest.raises((anyio.EndOfStream, anyio.BrokenResourceError)): - await stream.receive(1) + Not `proc.wait()`: on asyncio that resolves only once the process has exited + *and* all its pipes have closed, conflating process death with pipe state.""" + while proc.returncode is None: + await anyio.sleep(0.01) -async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None: - """Terminate the process tree, reap, and tear down pipe transports. - - ``_terminate_process_tree`` kills the OS process group / Job Object but does - not call ``process.wait()`` or clean up the asyncio pipe transports. On - Windows those transports leak and emit ``ResourceWarning`` when GC'd in a - later test, causing ``PytestUnraisableExceptionWarning`` knock-on failures. - - Production ``stdio.py`` avoids this via its ``stdout_reader`` task which - reads stdout to EOF (triggering ``_ProactorReadPipeTransport._eof_received`` - → ``close()``) plus ``async with process:`` which waits and closes stdin. - These tests call ``_terminate_process_tree`` directly, so they replicate - both parts here: ``wait()`` + close stdin + drain stdout to EOF. - - The stdout drain is the non-obvious part: anyio's ``StreamReaderWrapper.aclose()`` - only marks the Python-level reader closed — it never touches the underlying - ``_ProactorReadPipeTransport``. That transport starts paused and only detects - pipe EOF when someone reads, so without a drain it lives until ``__del__``. - - Idempotent: the ``returncode`` guard skips termination if already reaped - (avoids spurious WARNING/ERROR logs from ``terminate_posix_process_tree``'s - fallback path, visible because ``log_cli = true``); ``wait()`` and stream - ``aclose()`` no-op on subsequent calls; the drain raises ``ClosedResourceError`` - on the second call, caught by the suppress. The tests call this explicitly - as the action under test and ``AsyncExitStack`` calls it again on exit as a - safety net. 
Bounded by ``move_on_after`` to prevent hangs. - """ +async def _reap(proc: anyio.abc.Process) -> None: # pragma: lax no cover + """Reap an already-killed process and release its pipe transports. + + Draining stdout to EOF is what lets the asyncio pipe transport observe the + closure instead of leaking until GC (and warning there). The bound silently + swallows a hung cleanup on purpose: reaping is a safety net, and the test's + own assertions have already passed or failed by the time it runs.""" with anyio.move_on_after(5.0): - if proc.returncode is None: - await _terminate_process_tree(proc) await proc.wait() assert proc.stdin is not None assert proc.stdout is not None @@ -346,215 +986,217 @@ async def _terminate_and_reap(proc: anyio.abc.Process | FallbackProcess) -> None await proc.stdout.aclose() -class TestChildProcessCleanup: - """Integration tests for ``_terminate_process_tree`` covering basic, - nested, and early-parent-exit process tree scenarios. See module-level - comment above for the socket-based liveness probe mechanism. - """ - - @pytest.mark.anyio - async def test_basic_child_process_cleanup(self): - """Parent spawns one child; terminating the tree kills both.""" - async with AsyncExitStack() as stack: - sock, port = await _open_liveness_listener() - stack.push_async_callback(sock.aclose) - - # Parent spawns a child; the child connects back to us. - parent_script = _spawn_then_block(_connect_back_script(port)) - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - stack.push_async_callback(_terminate_and_reap, proc) - - # Deterministic: accept() blocks until the child connects. No sleep. - with anyio.fail_after(10.0): - stream = await _accept_alive(sock) - stack.push_async_callback(stream.aclose) - - # Terminate, reap and close transports (wraps _terminate_process_tree, - # the behavior under test). 
- await _terminate_and_reap(proc) +def _record_spawned_processes(monkeypatch: pytest.MonkeyPatch) -> list[anyio.abc.Process | FallbackProcess]: + """Record every process `stdio_client` spawns (the real spawn still runs), so a + test can inspect it afterwards and tear its process group down on failure.""" + spawned: list[anyio.abc.Process | FallbackProcess] = [] + + async def recording_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> anyio.abc.Process | FallbackProcess: + process = await _create_platform_compatible_process(command, args, env, errlog, cwd) + spawned.append(process) + return process + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", recording_spawn) + return spawned + + +# Excluded from coverage (lax: exempt from strict-no-cover): registered on every +# platform but a no-op on Windows, whose runners enforce 100% coverage per job. +def _kill_spawn_groups(spawned: list[anyio.abc.Process | FallbackProcess]) -> None: # pragma: lax no cover + """Failure-path safety net: SIGKILL each spawn-time process group, so a test + failing mid-body cannot orphan its sleep-forever descendants for an hour. The + groups are already gone when the test passed, making this a no-op. On Windows + there is no process group to signal (the Job Object covers strays).""" + if sys.platform == "win32": + return + for process in spawned: + with suppress(ProcessLookupError): + os.killpg(process.pid, signal.SIGKILL) - # Deterministic: kernel closed child's socket when it died. - await _assert_stream_closed(stream) - @pytest.mark.anyio - async def test_nested_process_tree(self): - """Parent → child → grandchild; terminating the tree kills all three.""" - async with AsyncExitStack() as stack: - sock, port = await _open_liveness_listener() - stack.push_async_callback(sock.aclose) - - # Build a three-level chain: parent spawns child, child spawns - # grandchild. 
Every level connects back to our socket. - grandchild = _connect_back_script(port) - child = ( - f"import subprocess, sys\n" - f"subprocess.Popen([sys.executable, '-c', {grandchild!r}])\n" + _connect_back_script(port) - ) - parent_script = ( - f"import subprocess, sys\n" - f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + _connect_back_script(port) - ) - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - stack.push_async_callback(_terminate_and_reap, proc) - - # Deterministic: three blocking accepts, one per tree level. - streams: list[anyio.abc.SocketStream] = [] - with anyio.fail_after(10.0): - for _ in range(3): - stream = await _accept_alive(sock) +@pytest.mark.anyio +async def test_exiting_the_context_terminates_the_entire_process_tree(monkeypatch: pytest.MonkeyPatch) -> None: + """Exiting `stdio_client` kills the server's whole process tree, not just the + server: a parent that exits instantly on SIGTERM (so the group must outlive its + leader), a child, and a grandchild — every level's death observed through the + kernel closing its liveness socket. + + The grace period is shortened to keep the test fast. The escalation decision — + terminate only after the configured grace elapses — is pinned in process by + test_escalation_fires_once_and_only_after_the_grace_period; the production + constant's value is deliberately unpinned. 
+ """ + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 0.2) + spawned = _record_spawned_processes(monkeypatch) + + async with AsyncExitStack() as stack: + stack.callback(_kill_spawn_groups, spawned) + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + grandchild = _connect_back_script(port) + child = ( + f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {grandchild!r}])\n" + + _connect_back_script(port) + ) + # The parent exits immediately on SIGTERM and never reads stdin, so cleanup + # must escalate, and the group kill must work even as its leader dies first. + parent = ( + f"import signal, subprocess, sys, time\n" + f"signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))\n" + f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" + _connect_back_script(port) + ) + server_params = StdioServerParameters(command=sys.executable, args=["-c", parent]) + + # The bound covers three Python interpreter cold starts on a loaded runner; + # a healthy run takes well under a second. + with anyio.fail_after(15.0): + async with stdio_client(server_params): + streams = [await _accept_alive(sock) for _ in range(3)] + for stream in streams: stack.push_async_callback(stream.aclose) - streams.append(stream) - - # Terminate the entire tree (wraps _terminate_process_tree). - await _terminate_and_reap(proc) - - # Every level of the tree must be dead: three kernel-level EOFs. - for stream in streams: - await _assert_stream_closed(stream) - - @pytest.mark.anyio - async def test_early_parent_exit(self): - """Parent exits immediately on SIGTERM; process-group termination still - catches the child (exercises the race where the parent dies mid-cleanup). - """ - async with AsyncExitStack() as stack: - sock, port = await _open_liveness_listener() - stack.push_async_callback(sock.aclose) - - # Parent installs a SIGTERM handler that exits immediately, spawns a - # child that connects back to us, then blocks. 
- child = _connect_back_script(port) - parent_script = ( - f"import signal, subprocess, sys, time\n" - f"signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))\n" - f"subprocess.Popen([sys.executable, '-c', {child!r}])\n" - f"time.sleep(3600)\n" - ) - proc = await _create_platform_compatible_process(sys.executable, ["-c", parent_script]) - stack.push_async_callback(_terminate_and_reap, proc) - - # Deterministic: child connected means both parent and child are up. - with anyio.fail_after(10.0): - stream = await _accept_alive(sock) - stack.push_async_callback(stream.aclose) - # Parent will sys.exit(0) on SIGTERM, but the process-group kill - # (POSIX killpg / Windows Job Object) must still terminate the child. - await _terminate_and_reap(proc) - - # Child must be dead despite parent's early exit. + for stream in streams: await _assert_stream_closed(stream) @pytest.mark.anyio -async def test_stdio_client_graceful_stdin_exit(): - """Test that a process exits gracefully when stdin is closed, - without needing SIGTERM or SIGKILL. - """ - # Create a Python script that exits when stdin is closed - script_content = textwrap.dedent( - """ - import sys - - # Read from stdin until it's closed - try: - while True: - line = sys.stdin.readline() - if not line: # EOF/stdin closed - break - except: - pass - - # Exit gracefully - sys.exit(0) - """ - ) - - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", script_content], - ) - - start_time = time.time() +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX process-group semantics") +# Excluded from coverage (lax: exempt from strict-no-cover) because coverage is enforced +# per CI job at 100%, including on Windows runners, where this test is skipped and its +# body would otherwise count as uncovered lines. 
+async def test_tree_kill_reaches_children_after_the_leader_has_already_exited() -> None: # pragma: lax no cover + """Killing the tree of a process that has already exited must still reach its + surviving children: the process group outlives its leader, and the group ID is + the leader's pid by construction (start_new_session), not something to look up + from the (reaped) leader.""" + async with AsyncExitStack() as stack: + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + child = _connect_back_script(port) + # The parent spawns the child and exits immediately: the group leader is dead + # (and reaped) by the time the tree is terminated. + parent = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\n" + proc = await _create_platform_compatible_process(sys.executable, ["-c", parent]) + assert isinstance(proc, anyio.abc.Process) + stack.callback(_kill_spawn_groups, [proc]) + stack.push_async_callback(_reap, proc) + + # Two interpreter cold starts on a loaded runner; healthy runs take ~0.2s. + with anyio.fail_after(10.0): + stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) + # The child connecting proves the parent ran; wait for the leader itself + # to be gone so the kill exercises the dead-leader path. + await _wait_until_exited(proc) - # Use anyio timeout to prevent test from hanging forever - with anyio.move_on_after(5.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Let the process start and begin reading stdin - await anyio.sleep(0.2) - # Exit context triggers cleanup - process should exit from stdin closure - pass + await _terminate_process_tree(proc) - if cancel_scope.cancelled_caught: - pytest.fail( - "stdio_client cleanup timed out after 5.0 seconds. " - "Process should have exited gracefully when stdin was closed." 
- ) # pragma: no cover + await _assert_stream_closed(stream) - end_time = time.time() - elapsed = end_time - start_time - # Should complete quickly with just stdin closure (no signals needed) - assert elapsed < 3.0, ( - f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-aware process. " - f"Expected < 3.0 seconds since process should exit on stdin closure." - ) +@pytest.mark.anyio +@pytest.mark.skipif(sys.platform == "win32", reason="POSIX process-group semantics") +# Excluded from coverage for the same Windows-runner reason as above. +async def test_terminating_an_already_exited_process_is_a_no_op() -> None: # pragma: lax no cover + """Once the whole group is gone, tree termination returns without error (and + without falling back to signalling a reaped pid).""" + proc = await _create_platform_compatible_process(sys.executable, ["-c", "pass"]) + assert isinstance(proc, anyio.abc.Process) + + # The bound covers one interpreter cold start on a loaded runner; a healthy run + # takes well under a second. + with anyio.fail_after(10.0): + await _wait_until_exited(proc) + await _terminate_process_tree(proc) + await _reap(proc) @pytest.mark.anyio -async def test_stdio_client_stdin_close_ignored(): - """Test that when a process ignores stdin closure, the shutdown sequence - properly escalates to SIGTERM. +@pytest.mark.skipif(sys.platform == "win32", reason="Windows signal handling is different") +# Excluded from coverage (lax: exempt from strict-no-cover) because coverage is enforced +# per CI job at 100%, including on Windows runners, where this test is skipped and its +# body would otherwise count as uncovered lines. +async def test_escalation_kills_a_process_that_ignores_sigterm( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """Cleanup escalates past SIGTERM and kills a process that ignores it. 
+ + The child installs SIG_IGN for SIGTERM *before* connecting to the liveness socket, so + by the time the test proceeds the ignore is guaranteed to be in place. SIGKILL cannot + be observed by the child; its delivery is proven by the kernel closing the socket. + Both timeouts are shortened to keep the test fast: the escalation *decision* is + pinned in process with a shortened grace, and this is the only test that exercises + the SIGTERM-then-SIGKILL escalation itself — the production constants' values are + deliberately unpinned. """ - # Create a Python script that ignores stdin closure but responds to SIGTERM - script_content = textwrap.dedent( - """ - import signal - import sys - import time - - # Set up SIGTERM handler to exit cleanly - def sigterm_handler(signum, frame): - sys.exit(0) - - signal.signal(signal.SIGTERM, sigterm_handler) - - # Close stdin immediately to simulate ignoring it - sys.stdin.close() - - # Keep running until SIGTERM - while True: - time.sleep(0.1) - """ - ) - - server_params = StdioServerParameters( - command=sys.executable, - args=["-c", script_content], - ) - - start_time = time.time() - - # Use anyio timeout to prevent test from hanging forever - with anyio.move_on_after(7.0) as cancel_scope: - async with stdio_client(server_params) as (_, _): - # Let the process start - await anyio.sleep(0.2) - # Exit context triggers cleanup - pass + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 0.2) + monkeypatch.setattr(stdio, "FORCE_KILL_TIMEOUT", 0.2) + spawned = _record_spawned_processes(monkeypatch) + + async with AsyncExitStack() as stack: + stack.callback(_kill_spawn_groups, spawned) + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + script = "import signal\nsignal.signal(signal.SIGTERM, signal.SIG_IGN)\n" + _connect_back_script(port) + server_params = StdioServerParameters(command=sys.executable, args=["-c", script]) + + # The bound covers an interpreter cold start on a loaded runner 
plus the two + # shortened escalation waits; a healthy run takes well under a second. + with anyio.fail_after(15.0): + async with stdio_client(server_params): + stream = await _accept_alive(sock) + stack.push_async_callback(stream.aclose) - if cancel_scope.cancelled_caught: - pytest.fail( - "stdio_client cleanup timed out after 7.0 seconds. " - "Process should have been terminated via SIGTERM escalation." - ) # pragma: no cover + await _assert_stream_closed(stream) - end_time = time.time() - elapsed = end_time - start_time - # Should take ~2 seconds (stdin close timeout) before SIGTERM is sent - # Total time should be between 2-4 seconds - assert 1.5 < elapsed < 4.5, ( - f"stdio_client cleanup took {elapsed:.1f} seconds for stdin-ignoring process. " - f"Expected between 2-4 seconds (2s stdin timeout + termination time)." - ) +@pytest.mark.anyio +@pytest.mark.skipif(not Path("/proc/self/fd").is_dir(), reason="needs procfs to enumerate open file descriptors") +# Excluded from coverage (lax: exempt from strict-no-cover) because coverage is enforced +# per CI job at 100%, including on Windows runners, which have no procfs and skip this. +async def test_a_graceful_exit_with_a_surviving_child_leaks_no_pipe_fds( # pragma: lax no cover + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A server may exit cleanly on stdin closure while leaving a child behind (the + POSIX policy: survivors are the server's business), and that child inherits the + stdio pipe ends. 
The client must still release its own pipe fds and subprocess + transport at shutdown — on asyncio nothing else ever closes them while the + orphan holds the pipe — instead of leaking them for the orphan's lifetime.""" + spawned = _record_spawned_processes(monkeypatch) + + async with AsyncExitStack() as stack: + stack.callback(_kill_spawn_groups, spawned) + sock, port = await _open_liveness_listener() + stack.push_async_callback(sock.aclose) + + child = _connect_back_script(port) + # The server hands its inherited pipes to a child, then exits as soon as its + # stdin closes — the well-behaved graceful path, so no kill ever happens. + server = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\nsys.stdin.read()\n" + server_params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + gc.collect() # settle earlier garbage so its collection cannot close fds mid-test + baseline = set(os.listdir("/proc/self/fd")) + + # Two interpreter cold starts on a loaded runner; healthy runs take ~0.3s. + with anyio.fail_after(15.0): + async with stdio_client(server_params): + stream = await _accept_alive(sock) + await stream.aclose() + + leader = spawned[0] + assert isinstance(leader, anyio.abc.Process) + # The graceful path: exited on stdin closure, no termination involved. + assert leader.returncode == 0 + # Subset, not equality: other machinery may close fds, but never open new + # ones — a leaked pipe fd would show up as an extra entry. 
+ assert set(os.listdir("/proc/self/fd")) <= baseline diff --git a/tests/interaction/transports/test_stdio.py b/tests/interaction/transports/test_stdio.py index 27cc65de42..a6b77e163c 100644 --- a/tests/interaction/transports/test_stdio.py +++ b/tests/interaction/transports/test_stdio.py @@ -26,6 +26,7 @@ import pytest from inline_snapshot import snapshot +from mcp.client import stdio from mcp.client.client import Client from mcp.client.stdio import StdioServerParameters, stdio_client from mcp.server.stdio import stdio_server @@ -51,10 +52,20 @@ @requirement("transport:stdio") @requirement("transport:stdio:clean-shutdown") @requirement("transport:stdio:stderr-passthrough") -async def test_tool_call_and_notification_round_trip_over_a_stdio_subprocess() -> None: +async def test_tool_call_and_notification_round_trip_over_a_stdio_subprocess( + monkeypatch: pytest.MonkeyPatch, +) -> None: """A Client connected over stdio initializes, calls a tool with arguments, receives the server's log notification before the call returns, and the server exits when the transport closes its stdin.""" + # After shutdown closes the child's stdin, the child must unwind its run loop, write the + # clean-exit line asserted below, and let coverage's atexit hook persist the subprocess data + # file (enabled by the COVERAGE_ passthrough below) before the grace period expires. The + # production 2s default proved too tight on slow Windows runners: the escalation killed the + # child mid-atexit — after the asserted stderr line, so the test stayed green — and the + # silently missing data file tripped the 100% coverage gate. The timeout is not under test. 
+ monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 10.0) + received: list[LoggingMessageNotificationParams] = [] async def collect(params: LoggingMessageNotificationParams) -> None: @@ -69,7 +80,12 @@ async def collect(params: LoggingMessageNotificationParams) -> None: # stdio_client deliberately filters the inherited environment to a safe minimum, # which drops the variables coverage.py's subprocess support uses; pass them through # so the server module is measured. Empty when not running under coverage. - env={key: value for key, value in os.environ.items() if key.startswith("COVERAGE_")}, + # SyntaxWarning is suppressed because the child compiles dependencies from source + # (pytest's pyc tag doesn't match a plain python child's): at the anyio>=4.9 floor, + # Python 3.14 emits a compile-time warning for anyio's return-in-finally, which + # would land on the snapshot-asserted stderr below. + env={key: value for key, value in os.environ.items() if key.startswith("COVERAGE_")} + | {"PYTHONWARNINGS": "ignore::SyntaxWarning"}, ), errlog=errlog, ) diff --git a/tests/issues/test_1027_win_unreachable_cleanup.py b/tests/issues/test_1027_win_unreachable_cleanup.py deleted file mode 100644 index c59c5aecae..0000000000 --- a/tests/issues/test_1027_win_unreachable_cleanup.py +++ /dev/null @@ -1,240 +0,0 @@ -"""Regression test for issue #1027: Ensure cleanup procedures run properly during shutdown - -Issue #1027 reported that cleanup code after "yield" in lifespan was unreachable when -processes were terminated. This has been fixed by implementing the MCP spec-compliant -stdio shutdown sequence that closes stdin first, allowing graceful exit. - -These tests verify the fix continues to work correctly across all platforms. 
-""" - -import sys -import tempfile -import textwrap -from pathlib import Path - -import anyio -import pytest - -from mcp import ClientSession, StdioServerParameters -from mcp.client.stdio import _create_platform_compatible_process, stdio_client -from tests.shared.test_win32_utils import escape_path_for_python - - -@pytest.mark.anyio -async def test_lifespan_cleanup_executed(): - """Regression test ensuring MCP server cleanup code runs during shutdown. - - This test verifies that the fix for issue #1027 works correctly by: - 1. Starting an MCP server that writes a marker file on startup - 2. Shutting down the server normally via stdio_client - 3. Verifying the cleanup code (after yield) executed and wrote its marker file - - The fix implements proper stdin closure before termination, giving servers - time to run their cleanup handlers. - """ - - # Create marker files to track server lifecycle - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - startup_marker = f.name - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - cleanup_marker = f.name - - # Remove the files so we can detect when they're created - Path(startup_marker).unlink() - Path(cleanup_marker).unlink() - - # Create a minimal MCP server using MCPServer that tracks lifecycle - server_code = textwrap.dedent(f""" - import asyncio - import sys - from pathlib import Path - from contextlib import asynccontextmanager - from mcp.server.mcpserver import MCPServer - - STARTUP_MARKER = {escape_path_for_python(startup_marker)} - CLEANUP_MARKER = {escape_path_for_python(cleanup_marker)} - - @asynccontextmanager - async def lifespan(server): - # Write startup marker - Path(STARTUP_MARKER).write_text("started") - try: - yield {{"started": True}} - finally: - # This cleanup code now runs properly during shutdown - Path(CLEANUP_MARKER).write_text("cleaned up") - - mcp = MCPServer("test-server", lifespan=lifespan) - - @mcp.tool() - def echo(text: str) -> str: - 
return text - - if __name__ == "__main__": - mcp.run() - """) - - # Write the server script to a temporary file - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".py") as f: - server_script = f.name - f.write(server_code) - - try: - # Launch the MCP server - params = StdioServerParameters(command=sys.executable, args=[server_script]) - - async with stdio_client(params) as (read, write): - async with ClientSession(read, write) as session: - # Initialize the session - result = await session.initialize() - assert result.protocol_version in ["2024-11-05", "2025-06-18", "2025-11-25"] - - # Verify startup marker was created - assert Path(startup_marker).exists(), "Server startup marker not created" - assert Path(startup_marker).read_text() == "started" - - # Make a test request to ensure server is working - response = await session.call_tool("echo", {"text": "hello"}) - assert response.content[0].type == "text" - assert getattr(response.content[0], "text") == "hello" - - # Session will be closed when exiting the context manager - - # Give server a moment to complete cleanup - with anyio.move_on_after(5.0): - while not Path(cleanup_marker).exists(): # pragma: lax no cover - await anyio.sleep(0.1) - - # Verify cleanup marker was created - this works now that stdio_client - # properly closes stdin before termination, allowing graceful shutdown - assert Path(cleanup_marker).exists(), "Server cleanup marker not created - regression in issue #1027 fix" - assert Path(cleanup_marker).read_text() == "cleaned up" - - finally: - # Clean up files - for path in [server_script, startup_marker, cleanup_marker]: - try: # pragma: lax no cover - Path(path).unlink() - except FileNotFoundError: # pragma: lax no cover - pass - - -@pytest.mark.anyio -@pytest.mark.filterwarnings("ignore::ResourceWarning" if sys.platform == "win32" else "default") -async def test_stdin_close_triggers_cleanup(): - """Regression test verifying the stdin-based graceful shutdown mechanism. 
- - This test ensures the core fix for issue #1027 continues to work by: - 1. Manually managing a server process - 2. Closing stdin to trigger graceful shutdown - 3. Verifying cleanup handlers run before the process exits - - This mimics the behavior now implemented in stdio_client's shutdown sequence. - - Note on Windows ResourceWarning: - On Windows, we may see ResourceWarning about unclosed file descriptors. - This is expected behavior because: - - We're manually managing the process lifecycle - - Windows file handle cleanup works differently than Unix - - The warning doesn't indicate a real issue - cleanup still works - We filter this warning on Windows only to avoid test noise. - """ - - # Create marker files to track server lifecycle - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - startup_marker = f.name - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".txt") as f: - cleanup_marker = f.name - - # Remove the files so we can detect when they're created - Path(startup_marker).unlink() - Path(cleanup_marker).unlink() - - # Create an MCP server that handles stdin closure gracefully - server_code = textwrap.dedent(f""" - import asyncio - import sys - from pathlib import Path - from contextlib import asynccontextmanager - from mcp.server.mcpserver import MCPServer - - STARTUP_MARKER = {escape_path_for_python(startup_marker)} - CLEANUP_MARKER = {escape_path_for_python(cleanup_marker)} - - @asynccontextmanager - async def lifespan(server): - # Write startup marker - Path(STARTUP_MARKER).write_text("started") - try: - yield {{"started": True}} - finally: - # This cleanup code runs when stdin closes, enabling graceful shutdown - Path(CLEANUP_MARKER).write_text("cleaned up") - - mcp = MCPServer("test-server", lifespan=lifespan) - - @mcp.tool() - def echo(text: str) -> str: - return text - - if __name__ == "__main__": - # The server should exit gracefully when stdin closes - try: - mcp.run() - except Exception: - # 
Server might get EOF or other errors when stdin closes - pass - """) - - # Write the server script to a temporary file - with tempfile.NamedTemporaryFile(mode="w", delete=False, suffix=".py") as f: - server_script = f.name - f.write(server_code) - - try: - # This test manually manages the process to verify stdin-based shutdown - # Start the server process - process = await _create_platform_compatible_process( - command=sys.executable, args=[server_script], env=None, errlog=sys.stderr, cwd=None - ) - - # Wait for server to start - with anyio.move_on_after(10.0): - while not Path(startup_marker).exists(): - await anyio.sleep(0.1) - - # Check if process is still running - if hasattr(process, "returncode") and process.returncode is not None: # pragma: lax no cover - pytest.fail(f"Server process exited with code {process.returncode}") - - assert Path(startup_marker).exists(), "Server startup marker not created" - - # Close stdin to signal shutdown - if process.stdin: # pragma: no branch - await process.stdin.aclose() - - # Wait for process to exit gracefully - try: - with anyio.fail_after(5.0): # Increased from 2.0 to 5.0 - await process.wait() - except TimeoutError: # pragma: lax no cover - # If it doesn't exit after stdin close, terminate it - process.terminate() - await process.wait() - - # Check if cleanup ran - with anyio.move_on_after(5.0): - while not Path(cleanup_marker).exists(): # pragma: lax no cover - await anyio.sleep(0.1) - - # Verify the cleanup ran - stdin closure enables graceful shutdown - assert Path(cleanup_marker).exists(), "Server cleanup marker not created - stdin-based shutdown failed" - assert Path(cleanup_marker).read_text() == "cleaned up" - - finally: - # Clean up files - for path in [server_script, startup_marker, cleanup_marker]: - try: # pragma: lax no cover - Path(path).unlink() - except FileNotFoundError: # pragma: lax no cover - pass diff --git a/tests/issues/test_552_windows_hang.py b/tests/issues/test_552_windows_hang.py index 
1adb5d80cb..fdb134da7e 100644 --- a/tests/issues/test_552_windows_hang.py +++ b/tests/issues/test_552_windows_hang.py @@ -1,5 +1,6 @@ """Test for issue #552: stdio_client hangs on Windows.""" +import json import sys from textwrap import dedent @@ -8,41 +9,36 @@ from mcp import ClientSession, StdioServerParameters from mcp.client.stdio import stdio_client +from mcp.types import LATEST_PROTOCOL_VERSION, InitializeResult @pytest.mark.skipif(sys.platform != "win32", reason="Windows-specific test") # pragma: no cover @pytest.mark.anyio -async def test_windows_stdio_client_with_session(): - """Test the exact scenario from issue #552: Using ClientSession with stdio_client. - - This reproduces the original bug report where stdio_client hangs on Windows 11 - when used with ClientSession. +async def test_initialize_succeeds_and_shutdown_returns_after_the_server_exits_mid_session(): + """Initialize completes (and shutdown returns) against a server that responds and + then exits mid-session — the proactor pipe scenario that hung on Windows 11 in + issue #552. The positive assertion matters: a session that *errors* quickly would + also "not hang", so finishing fast alone proves nothing. """ - # Create a minimal MCP server that responds to initialization - server_script = dedent(""" + # A minimal server: answer initialize correctly, then exit. 
+ server_script = dedent(f""" import json import sys - # Read initialization request line = sys.stdin.readline() + request = json.loads(line) - # Send initialization response - response = { + response = {{ "jsonrpc": "2.0", - "id": 1, - "result": { - "protocolVersion": "1.0", - "capabilities": {}, - "serverInfo": {"name": "test-server", "version": "1.0"} - } - } + "id": request["id"], + "result": {{ + "protocolVersion": {json.dumps(LATEST_PROTOCOL_VERSION)}, + "capabilities": {{}}, + "serverInfo": {{"name": "test-server", "version": "1.0"}} + }} + }} print(json.dumps(response)) sys.stdout.flush() - - # Exit after a short delay - import time - time.sleep(0.1) - sys.exit(0) """).strip() params = StdioServerParameters( @@ -50,14 +46,11 @@ async def test_windows_stdio_client_with_session(): args=["-c", server_script], ) - # This is the exact pattern from the bug report with anyio.fail_after(10): - try: - async with stdio_client(params) as (read, write): - async with ClientSession(read, write) as session: - await session.initialize() - # Should exit ClientSession without hanging - # Should exit stdio_client without hanging - except Exception: - # Connection errors are expected when process exits - pass + async with stdio_client(params) as (read, write): + async with ClientSession(read, write) as session: + result = await session.initialize() + assert isinstance(result, InitializeResult) + assert result.server_info.name == "test-server" + # Exiting ClientSession and stdio_client must not hang even though the + # server process is already gone. 
diff --git a/tests/server/mcpserver/test_elicitation.py b/tests/server/mcpserver/test_elicitation.py index 679fb848f5..9292586b32 100644 --- a/tests/server/mcpserver/test_elicitation.py +++ b/tests/server/mcpserver/test_elicitation.py @@ -1,4 +1,4 @@ -"""Test the elicitation feature using stdio transport.""" +"""Test the elicitation feature over the in-memory client transport.""" from typing import Any @@ -58,9 +58,9 @@ async def call_tool_and_assert( @pytest.mark.anyio -async def test_stdio_elicitation(): - """Test the elicitation feature using stdio transport.""" - mcp = MCPServer(name="StdioElicitationServer") +async def test_elicitation_accept_returns_the_users_answer_to_the_tool(): + """An accepted elicitation delivers the user's content back to the requesting tool.""" + mcp = MCPServer(name="ElicitationServer") create_ask_user_tool(mcp) # Create a custom handler for elicitation requests @@ -76,9 +76,9 @@ async def elicitation_callback(context: RequestContext[ClientSession], params: E @pytest.mark.anyio -async def test_stdio_elicitation_decline(): - """Test elicitation with user declining.""" - mcp = MCPServer(name="StdioElicitationDeclineServer") +async def test_elicitation_decline_reaches_the_tool_without_content(): + """A declined elicitation reports the decline to the tool, with no content attached.""" + mcp = MCPServer(name="ElicitationDeclineServer") create_ask_user_tool(mcp) async def elicitation_callback(context: RequestContext[ClientSession], params: ElicitRequestParams): diff --git a/tests/server/test_stdio.py b/tests/server/test_stdio.py index 677a993567..d5025bbe67 100644 --- a/tests/server/test_stdio.py +++ b/tests/server/test_stdio.py @@ -1,17 +1,23 @@ import io import sys +import threading +from collections.abc import AsyncIterator +from contextlib import asynccontextmanager from io import TextIOWrapper import anyio import pytest +from mcp.server.mcpserver import MCPServer from mcp.server.stdio import stdio_server from mcp.shared.message import 
SessionMessage from mcp.types import JSONRPCMessage, JSONRPCRequest, JSONRPCResponse, jsonrpc_message_adapter @pytest.mark.anyio -async def test_stdio_server(): +async def test_stdio_server_round_trips_messages_over_injected_streams() -> None: + """stdio_server parses one JSON-RPC message per stdin line and writes each + outgoing message as exactly one line, driven over injected in-process streams.""" stdin = io.StringIO() stdout = io.StringIO() @@ -24,47 +30,41 @@ async def test_stdio_server(): stdin.write(message.model_dump_json(by_alias=True, exclude_none=True) + "\n") stdin.seek(0) - async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( - read_stream, - write_stream, - ): - received_messages: list[JSONRPCMessage] = [] - async with read_stream: - async for message in read_stream: - if isinstance(message, Exception): # pragma: no cover - raise message - received_messages.append(message.message) - if len(received_messages) == 2: - break - - # Verify received messages - assert len(received_messages) == 2 - assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") - assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) - - # Test sending responses from the server - responses = [ - JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), - JSONRPCResponse(jsonrpc="2.0", id=4, result={}), - ] - - async with write_stream: + with anyio.fail_after(5): + async with stdio_server(stdin=anyio.AsyncFile(stdin), stdout=anyio.AsyncFile(stdout)) as ( + read_stream, + write_stream, + ): + async with read_stream: + received_messages: list[JSONRPCMessage] = [] + for _ in range(2): + received = await read_stream.receive() + assert not isinstance(received, Exception) + received_messages.append(received.message) + + assert received_messages[0] == JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + assert received_messages[1] == JSONRPCResponse(jsonrpc="2.0", id=2, result={}) + + responses = [ + 
JSONRPCRequest(jsonrpc="2.0", id=3, method="ping"), + JSONRPCResponse(jsonrpc="2.0", id=4, result={}), + ] + for response in responses: - session_message = SessionMessage(response) - await write_stream.send(session_message) + await write_stream.send(SessionMessage(response)) + await write_stream.aclose() stdout.seek(0) output_lines = stdout.readlines() assert len(output_lines) == 2 received_responses = [jsonrpc_message_adapter.validate_json(line.strip()) for line in output_lines] - assert len(received_responses) == 2 assert received_responses[0] == JSONRPCRequest(jsonrpc="2.0", id=3, method="ping") assert received_responses[1] == JSONRPCResponse(jsonrpc="2.0", id=4, result={}) @pytest.mark.anyio -async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch): +async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch) -> None: """Non-UTF-8 bytes on stdin must not crash the server. Invalid bytes are replaced with U+FFFD, which then fails JSON parsing and @@ -92,3 +92,77 @@ async def test_stdio_server_invalid_utf8(monkeypatch: pytest.MonkeyPatch): second = await read_stream.receive() assert isinstance(second, SessionMessage) assert second.message == valid + + +class _KeepOpenBytesIO(io.BytesIO): + """A BytesIO that survives its TextIOWrapper being closed, so the test can read + what was written after `run()` has torn the wrapper down.""" + + def close(self) -> None: + pass + + +def _run_stdio_bounded(server: MCPServer) -> None: + """Call the blocking `server.run("stdio")` with a deadline, failing instead of hanging. + + `run()` creates its own event loop, so a sync test has no async frame to arm + `anyio.fail_after` from; a daemon thread joined with the suite's standard 5s + bound is the sync analogue. `join()` returns as soon as `run()` does — the + timeout only fires if the run loop regresses into never returning on stdin EOF, + turning a silent CI hang into a red test. 
An exception escaping `run()` fails + the test too: pytest reports it as `PytestUnhandledThreadExceptionWarning`, + which `filterwarnings = ["error"]` escalates. + """ + + def target() -> None: + server.run("stdio") + + thread = threading.Thread(target=target, daemon=True) + thread.start() + thread.join(5) + assert not thread.is_alive(), 'run("stdio") did not return after stdin EOF' + + +def test_mcpserver_run_stdio_serves_until_stdin_closes(monkeypatch: pytest.MonkeyPatch) -> None: + """`MCPServer.run("stdio")` answers a request over the process's stdio and returns + when stdin reaches EOF, rather than serving forever.""" + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + stdin_bytes = io.BytesIO(ping.model_dump_json(by_alias=True, exclude_none=True).encode() + b"\n") + captured = _KeepOpenBytesIO() + monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin_bytes, encoding="utf-8")) + monkeypatch.setattr(sys, "stdout", TextIOWrapper(captured, encoding="utf-8")) + + _run_stdio_bounded(MCPServer(name="RunStdioServer")) + + response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) + assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) + + +def test_mcpserver_run_stdio_runs_lifespan_cleanup_after_stdin_closes(monkeypatch: pytest.MonkeyPatch) -> None: + """Code after `yield` in a lifespan runs when stdin EOF ends `run("stdio")`. + + Regression lock for the shutdown chain behind issue #1027: the run loop must end + on stdin EOF and unwind the lifespan — were the process killed before the run + loop returned, the cleanup entry would never be appended. 
+ """ + events: list[str] = [] + + @asynccontextmanager + async def lifespan(server: MCPServer) -> AsyncIterator[None]: + events.append("setup") + try: + yield + finally: + events.append("cleanup") + + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + stdin_bytes = io.BytesIO(ping.model_dump_json(by_alias=True, exclude_none=True).encode() + b"\n") + captured = _KeepOpenBytesIO() + monkeypatch.setattr(sys, "stdin", TextIOWrapper(stdin_bytes, encoding="utf-8")) + monkeypatch.setattr(sys, "stdout", TextIOWrapper(captured, encoding="utf-8")) + + _run_stdio_bounded(MCPServer(name="LifespanStdioServer", lifespan=lifespan)) + + assert events == ["setup", "cleanup"] + response = jsonrpc_message_adapter.validate_json(captured.getvalue().decode().strip()) + assert response == JSONRPCResponse(jsonrpc="2.0", id=1, result={}) diff --git a/tests/shared/test_win32_utils.py b/tests/shared/test_win32_utils.py deleted file mode 100644 index e0f9cb4995..0000000000 --- a/tests/shared/test_win32_utils.py +++ /dev/null @@ -1,10 +0,0 @@ -"""Windows-specific test utilities.""" - - -def escape_path_for_python(path: str) -> str: - """Escape a file path for use in Python code strings. - - Converts backslashes to forward slashes which work on all platforms - and don't need escaping in Python strings. - """ - return repr(path.replace("\\", "/")) diff --git a/tests/transports/__init__.py b/tests/transports/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/transports/stdio/__init__.py b/tests/transports/stdio/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/transports/stdio/_liveness.py b/tests/transports/stdio/_liveness.py new file mode 100644 index 0000000000..a17d32d252 --- /dev/null +++ b/tests/transports/stdio/_liveness.py @@ -0,0 +1,98 @@ +"""Kernel-synchronized liveness probes for the real-subprocess stdio lifecycle suite. + +A spawned (grand)child connects back to a test-owned TCP listener and sends +`b'alive'`. 
From there the kernel provides every signal a test needs, with no +sleeps or polling anywhere: + +1. `accept_alive` blocks until the subprocess connects, proving it is running (and + that the script lines before the connect have executed). +2. `assert_stream_closed` proves the peer terminated: the kernel closes all of a + process's file descriptors on exit, surfacing EOF (clean close / FIN) or + `BrokenResourceError` (abrupt close / RST, typical of SIGKILL and Windows job + termination). +3. `assert_peer_echoes` proves the peer is *alive*: only a running process can + answer an echo, so a positive reply cannot race a kill the way a "no FIN yet" + observation could. + +These helpers are extracted from the real-process section of +tests/client/test_stdio.py; the two copies on this branch are deliberate — +consolidating that file onto this module is follow-up work. +""" + +import anyio +import anyio.abc +import pytest + + +def connect_back_script(port: int, *, echo: bool = False) -> str: + """Return a `python -c` script body that connects to 127.0.0.1:`port` and + sends `b'alive'`. + + By default the process then blocks forever, serving as a pure liveness beacon + for kill/termination tests. With `echo=True` it instead echoes every received + chunk back (the recv parks it just as indefinitely), so a survival test can + prove the process is still running after the client is gone — see + `assert_peer_echoes`. + """ + # Excluded from coverage (lax: exempt from strict-no-cover): echo mode is + # used only by POSIX-gated tests, and Windows runners enforce 100% per job. 
+ if echo: # pragma: lax no cover + tail = "while True:\n data = s.recv(65536)\n if not data:\n break\n s.sendall(data)\n" + else: + tail = "time.sleep(3600)\n" + return f"import socket, time\ns = socket.create_connection(('127.0.0.1', {port}))\ns.sendall(b'alive')\n" + tail + + +async def open_liveness_listener() -> tuple[anyio.abc.SocketListener, int]: + """Open a TCP listener on localhost and return it along with its port.""" + multi = await anyio.create_tcp_listener(local_host="127.0.0.1") + sock = multi.listeners[0] + assert isinstance(sock, anyio.abc.SocketListener) + addr = sock.extra(anyio.abc.SocketAttribute.local_address) + # IPv4 local_address is (host: str, port: int) + assert isinstance(addr, tuple) and len(addr) >= 2 and isinstance(addr[1], int) + return sock, addr[1] + + +async def accept_alive(sock: anyio.abc.SocketListener) -> anyio.abc.SocketStream: + """Accept one connection and assert the peer sent `b'alive'`. + + Blocks deterministically until a subprocess connects (no polling), reading + until the full 5-byte banner has arrived — TCP may legally split even a tiny + send. The calling test bounds this with `anyio.fail_after` to catch the case + where the subprocess chain failed to start. + """ + stream = await sock.accept() + msg = b"" + while len(msg) < 5: + msg += await stream.receive(5 - len(msg)) + assert msg == b"alive", f"expected b'alive', got {msg!r}" + return stream + + +async def assert_stream_closed(stream: anyio.abc.SocketStream) -> None: + """Assert the peer holding the other end of `stream` has terminated.""" + with anyio.fail_after(5.0), pytest.raises((anyio.EndOfStream, anyio.BrokenResourceError)): + await stream.receive(1) + + +async def assert_peer_echoes(stream: anyio.abc.SocketStream) -> None: # pragma: lax no cover + """Assert the peer holding the other end of `stream` is still running, by + round-tripping one echo through it (the peer must use `echo=True`). 
+ + A dead process can never answer, so under a regression that kills the peer this + raises (EOF/reset) or times out via the bound — it cannot pass spuriously; the + sub-millisecond window between a kill being issued and taking effect is dwarfed + by the socket round trip that must complete after it. + + Excluded from coverage (lax: exempt from strict-no-cover) like `connect_back_script`'s + echo mode: only POSIX-gated survival tests call this, and Windows runners enforce + 100% coverage per job. + """ + with anyio.fail_after(5.0): + await stream.send(b"ping") + # Read until the full echo has arrived: TCP may legally split even a tiny send. + echoed = b"" + while len(echoed) < 4: + echoed += await stream.receive(4 - len(echoed)) + assert echoed == b"ping", f"expected b'ping', got {echoed!r}" diff --git a/tests/transports/stdio/conftest.py b/tests/transports/stdio/conftest.py new file mode 100644 index 0000000000..56918f730b --- /dev/null +++ b/tests/transports/stdio/conftest.py @@ -0,0 +1,82 @@ +"""Fixtures for the stdio lifecycle suite: recording seams around the spawn and +tree-termination internals of `stdio_client` (the real implementations still run), +plus the failure-path safety net that keeps a crashed test from orphaning its +sleep-forever subprocesses. +""" + +import os +import signal +import sys +from collections.abc import Generator +from contextlib import suppress +from pathlib import Path +from typing import TextIO + +import anyio.abc +import pytest + +from mcp.client import stdio +from mcp.client.stdio import _create_platform_compatible_process, _terminate_process_tree +from mcp.os.win32.utilities import FallbackProcess + + +@pytest.fixture +def spawned_processes( + monkeypatch: pytest.MonkeyPatch, +) -> Generator[list[anyio.abc.Process | FallbackProcess]]: + """Record every process `stdio_client` spawns; the real spawn still runs. + + Tests inspect the recorded processes afterwards (exit codes, concrete type on + the Windows fallback path). 
Teardown SIGKILLs each spawn-time process group on + POSIX, in both of its roles: failure-path safety net (a test that dies mid-body + cannot orphan its sleep-forever descendants for an hour) and the reaper for + tests that deliberately leave a survivor running, like the POSIX survival + test's echo child. On Windows there is no process group to signal (the Job + Object covers strays). + """ + spawned: list[anyio.abc.Process | FallbackProcess] = [] + + async def recording_spawn( + command: str, + args: list[str], + env: dict[str, str] | None = None, + errlog: TextIO = sys.stderr, + cwd: Path | str | None = None, + ) -> anyio.abc.Process | FallbackProcess: + process = await _create_platform_compatible_process(command, args, env, errlog, cwd) + spawned.append(process) + return process + + monkeypatch.setattr(stdio, "_create_platform_compatible_process", recording_spawn) + yield spawned + _kill_spawn_groups(spawned) + + +@pytest.fixture +def terminate_calls(monkeypatch: pytest.MonkeyPatch) -> list[anyio.abc.Process | FallbackProcess]: + """Record every invocation of `stdio_client`'s tree-termination seam; the real + termination still runs. + + An empty list after the context exits proves the graceful path: the server was + never escalated against, which a socket signal alone cannot establish (a FIN + looks the same whether the peer exited on stdin closure or was killed). + """ + terminated: list[anyio.abc.Process | FallbackProcess] = [] + + async def recording_terminate(process: anyio.abc.Process | FallbackProcess) -> None: + terminated.append(process) + await _terminate_process_tree(process) + + monkeypatch.setattr(stdio, "_terminate_process_tree", recording_terminate) + return terminated + + +# Excluded from coverage (lax: exempt from strict-no-cover): registered on every +# platform but a no-op on Windows, whose runners enforce 100% coverage per job. 
+def _kill_spawn_groups(spawned: list[anyio.abc.Process | FallbackProcess]) -> None: # pragma: lax no cover + """SIGKILL each spawn-time process group; see `spawned_processes`.""" + if sys.platform == "win32": + return + for process in spawned: + with suppress(ProcessLookupError): + os.killpg(process.pid, signal.SIGKILL) diff --git a/tests/transports/stdio/test_lifecycle.py b/tests/transports/stdio/test_lifecycle.py new file mode 100644 index 0000000000..077c287ad0 --- /dev/null +++ b/tests/transports/stdio/test_lifecycle.py @@ -0,0 +1,282 @@ +"""Real-subprocess stdio lifecycle tests that hold on both POSIX and Windows. + +The four `stdio_client` tests each launch a real server process through the public +API and pin one lifecycle behaviour — the happy path, cancellation, mid-session +death, stderr routing — with kernel-level liveness sockets as the only +synchronization. The two `FallbackProcess` tests wrap a raw `subprocess.Popen` +directly (the wrapper holds a plain Popen, so they need no Windows machinery; one +is gated on `os.waitid` availability). The Windows-only SelectorEventLoop path that +*selects* the fallback is pinned in test_windows.py, and platform-divergent +shutdown policy lives in test_posix.py / test_windows.py. + +Division of labour: the full protocol round trip over a real subprocess is pinned +by tests/interaction/transports/test_stdio.py, and in-process shutdown logic by +tests/client/test_stdio.py; this folder owns lifecycle mechanics against real +subprocesses. 
+""" + +import os +import subprocess +import sys +import threading +from contextlib import AsyncExitStack +from pathlib import Path + +import anyio +import anyio.abc +import pytest + +from mcp.client import stdio +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.os.win32.utilities import FallbackProcess +from tests.transports.stdio._liveness import ( + accept_alive, + assert_stream_closed, + connect_back_script, + open_liveness_listener, +) + + +@pytest.mark.anyio +async def test_a_server_that_exits_on_stdin_close_is_reaped_and_never_terminated( + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """The happy path: closing stdin alone shuts a well-behaved server down — it + exits with code 0 and the escalation seam is never invoked. The socket only + proves death; the exit code and empty terminate log attribute its cause.""" + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + # The server connects, then exits on its own as soon as its stdin reaches + # EOF — the well-behaved response to shutdown's first step. + server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.stdin.read()\n" + ) + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # The bound covers one interpreter cold start on a loaded runner; a healthy + # run takes well under a second. 
+ with anyio.fail_after(10.0): + async with stdio_client(params): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + + await assert_stream_closed(stream) + + assert spawned_processes[0].returncode == 0 + assert terminate_calls == [] + + +@pytest.mark.anyio +async def test_cancelling_the_client_mid_session_terminates_the_whole_server_tree( + monkeypatch: pytest.MonkeyPatch, + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """Cancellation (a client timeout, app shutdown) still runs the full shutdown + against a real process tree: a server that ignores stdin closure is escalated + against, and its child dies with it.""" + monkeypatch.setattr(stdio, "PROCESS_TERMINATION_TIMEOUT", 0.2) + + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + child = connect_back_script(port) + # The parent never reads stdin and blocks forever, so only the escalation + # can end it — under cancellation, which must not skip that escalation. + parent = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\n" + connect_back_script( + port + ) + params = StdioServerParameters(command=sys.executable, args=["-c", parent]) + + entered = anyio.Event() + # Cancel a scope owned by the client's task, not the test's task group: a + # host self-cancel is delivered by throwing through this test function's + # suspended frames, and Python 3.11's tracer loses coverage events after + # such a throw() traversal (python/cpython#106749). 
+ cancel_scope = anyio.CancelScope() + + async def run_client_until_cancelled() -> None: + with cancel_scope: + async with stdio_client(params): + entered.set() + await anyio.sleep_forever() + + streams: list[anyio.abc.SocketStream] = [] + # The bound covers two interpreter cold starts on a loaded runner plus the + # shortened escalation wait; a healthy run takes around a second. + with anyio.fail_after(10.0): + async with anyio.create_task_group() as tg: + tg.start_soon(run_client_until_cancelled) + await entered.wait() + for _ in range(2): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + streams.append(stream) + cancel_scope.cancel() + + for stream in streams: + await assert_stream_closed(stream) + + assert terminate_calls == spawned_processes + + +@pytest.mark.anyio +async def test_a_server_that_exits_mid_session_keeps_its_own_exit_code( + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """A server that dies on its own mid-session is reaped with the exit code it + chose: the client surfaces the child's true status rather than synthesizing + one, and the escalation seam confirms nothing was terminated along the way.""" + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.exit(7)\n" + ) + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # The bound covers one interpreter cold start on a loaded runner; a healthy + # run takes well under a second. + with anyio.fail_after(10.0): + # (no branch: coverage mis-traces the exit arcs of a nested `async with` + # on Python 3.11+.) 
+ async with stdio_client(params): # pragma: no branch + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + # The server is already gone before shutdown begins. + await assert_stream_closed(stream) + + assert spawned_processes[0].returncode == 7 + assert terminate_calls == [] + + +@pytest.mark.anyio +async def test_server_stderr_output_reaches_the_errlog_file( + tmp_path: Path, + spawned_processes: list[anyio.abc.Process | FallbackProcess], +) -> None: + """What the server writes to stderr lands in the file passed as `errlog`. + errlog becomes the child's stderr at the OS level (the spawn hands over its + file descriptor), so it must be a real file — an in-memory StringIO has no + fileno.""" + marker = "stdio-lifecycle stderr marker 4242" + + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.stderr.write({marker!r} + '\\n')\n" + f"sys.stderr.flush()\n" + f"sys.stdin.read()\n" + ) + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + with (tmp_path / "errlog.txt").open("w+", encoding="utf-8") as errlog: + # The bound covers one interpreter cold start on a loaded runner; a + # healthy run takes well under a second. + with anyio.fail_after(10.0): + async with stdio_client(params, errlog=errlog): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + + # The server exited on stdin EOF, so every stderr write it made has + # reached the file descriptor. 
+ errlog.seek(0) + content = errlog.read() + + assert marker in content + assert spawned_processes[0].returncode == 0 + + +@pytest.mark.skipif( + not hasattr(os, "waitid"), reason="needs os.waitid(WNOWAIT); absent on Windows and macOS before 3.13" +) +# Excluded from coverage (lax: exempt from strict-no-cover) because coverage is enforced +# per CI job at 100%, including on Windows runners, which lack os.waitid and skip this +# test. There the property is exercised for real by test_windows.py's SelectorEventLoop +# lifecycle test. +def test_fallback_process_reports_death_through_returncode_without_a_wait_call() -> None: # pragma: lax no cover + """`FallbackProcess.returncode` observes process death on its own — pre-fix it + returned Popen's cached value, which stays None until someone calls + wait()/poll(), so the client's returncode-keyed grace wait never saw the + Windows-fallback server die. + + `os.waitid(WEXITED | WNOWAIT)` is the only way to wait for the child to become + reapable without reaping it or priming Popen's cache (which would mask the + regression): after it returns, the new property's poll() deterministically + collects the status while the pre-fix cached read still sees None. (stdout EOF + is NOT such a signal: the kernel closes the pipes before the exit status is + published, so an EOF-then-assert version flakes.) + """ + popen = subprocess.Popen( + [sys.executable, "-c", "pass"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + assert popen.stdin is not None and popen.stdout is not None + try: + process = FallbackProcess(popen) + + os.waitid(os.P_PID, popen.pid, os.WEXITED | os.WNOWAIT) + assert process.returncode == 0 + finally: + popen.stdin.close() + popen.stdout.close() + # The WNOWAIT above left the child unreaped; reap it so no zombie (and no + # Popen ResourceWarning) outlives the test. 
+ popen.wait() + + +@pytest.mark.anyio +async def test_fallback_process_wait_is_cancellable_while_the_child_lives() -> None: + """`FallbackProcess.wait()` honours cancellation while the child is still + running — pre-fix it parked `Popen.wait()` in a worker thread anyio will not + abandon, which also blocks every cancellation aimed at it; the watchdog below + turns that regression into a clean failure instead of a hang. Runs everywhere: + the wrapper holds a plain Popen.""" + popen = subprocess.Popen( + [sys.executable, "-c", "import sys; sys.stdin.read()"], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + assert popen.stdin is not None and popen.stdout is not None + # Under the pre-fix implementation no timeout below can fire while the worker + # thread is parked in Popen.wait(); killing the child unparks it, the pending + # cancellation lands, and fail_after raises a clean TimeoutError. The healthy + # path cancels the timer unfired. + watchdog = threading.Timer(8.0, popen.kill) + watchdog.start() + try: + process = FallbackProcess(popen) + + # move_on_after's short deadline is the time-based feature under test — + # cancellability — not a wait for an async condition. + with anyio.fail_after(5): + with anyio.move_on_after(0.1) as scope: + await process.wait() + + assert scope.cancelled_caught + # Only the wait was cancelled; the child itself is untouched. + assert popen.poll() is None + finally: + watchdog.cancel() + popen.kill() + popen.wait() + popen.stdin.close() + popen.stdout.close() diff --git a/tests/transports/stdio/test_posix.py b/tests/transports/stdio/test_posix.py new file mode 100644 index 0000000000..0c1ec100b0 --- /dev/null +++ b/tests/transports/stdio/test_posix.py @@ -0,0 +1,126 @@ +"""POSIX-only stdio lifecycle tests: what happens to a well-behaved server's +background children when the client shuts down. 
+ +The policy under test is SDK-defined, not spec-mandated (docs/migration.md, +"`stdio_client` no longer kills children of a gracefully-exited server on +POSIX"): a server that exits on its own after stdin closes keeps its surviving +children — their lifetime is the server's business. The same scenario on +Windows has the opposite documented outcome (the Job Object reaps survivors at +shutdown); see tests/transports/stdio/test_windows.py. +""" + +import errno +import sys +from contextlib import suppress + +import anyio +import anyio.abc +import pytest + +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.os.win32.utilities import FallbackProcess +from tests.transports.stdio._liveness import ( + accept_alive, + assert_peer_echoes, + connect_back_script, + open_liveness_listener, +) + +pytestmark = pytest.mark.skipif(sys.platform == "win32", reason="POSIX process-group semantics") + + +@pytest.mark.anyio +# Excluded from coverage (lax: exempt from strict-no-cover) because coverage is enforced +# per CI job at 100%, including on Windows runners, where this file is skipped. +async def test_a_gracefully_exiting_servers_child_survives_the_client_shutdown( # pragma: lax no cover + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """A server that exits on stdin closure keeps its background child: the client + never escalates, and the child is still running after `stdio_client` returns. + + SDK-defined policy pinned per docs/migration.md; the pre-fix client misread the + child's inherited pipes as the server hanging and tree-killed it. The Windows + twin in test_windows.py pins the opposite documented outcome. + """ + sock, port = await open_liveness_listener() + async with sock: + child = connect_back_script(port, echo=True) + # The server hands its inherited pipes to a child, then exits as soon as + # its stdin closes — the well-behaved graceful path. 
+ server = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\nsys.stdin.read()\n" + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # Two interpreter cold starts on a loaded runner; healthy runs take ~0.3s. + with anyio.fail_after(10.0): + async with stdio_client(params): + child_stream = await accept_alive(sock) + async with child_stream: + # Only a live process answers an echo: the child survived shutdown. + await assert_peer_echoes(child_stream) + + # A FIN-shaped probe could not tell graceful exit from a kill; the seam can: + # the escalation was never invoked, and the leader exited 0 on stdin closure. + assert terminate_calls == [] + leader = spawned_processes[0] + assert leader.returncode == 0 + # The child is deliberately left running; the spawned_processes teardown + # SIGKILLs the spawn-time process group to reap it. + + +@pytest.mark.anyio +@pytest.mark.usefixtures("spawned_processes") # failure-path safety net for the parked child +# Excluded from coverage for the same Windows-runner reason as above. +async def test_a_surviving_childs_write_to_the_inherited_stdout_fails_with_epipe() -> None: # pragma: lax no cover + """Once the client is gone, a surviving child writing to the stdout pipe it + inherited from the server gets EPIPE: the pipe's only read end was the + client's, and shutdown closed it deterministically rather than at GC time. + + Pins the docs/migration.md claim "a surviving child that keeps writing to an + inherited stdout receives EPIPE/SIGPIPE once the client is gone" (SDK-defined; + documented but previously unproven). + + Steps: + 1. The server hands its stdio pipes to a child and exits on stdin closure. + 2. The child parks on its socket until the test signals that `stdio_client` + has fully exited, so the write cannot race the transport teardown. + 3. The child writes one byte to its inherited fd 1 and reports the errno + (0 on success) back over the socket. 
+ """ + sock, port = await open_liveness_listener() + async with sock: + # The child pins SIGPIPE to SIG_IGN explicitly (CPython already starts + # that way) so the write observably fails with EPIPE instead of the test + # depending on interpreter startup details for the child's survival. + child = ( + f"import os, signal, socket\n" + f"signal.signal(signal.SIGPIPE, signal.SIG_IGN)\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"s.recv(4)\n" + f"try:\n" + f" os.write(1, b'x')\n" + f" result = b'0'\n" + f"except OSError as e:\n" + f" result = str(e.errno).encode()\n" + f"s.sendall(result)\n" + ) + server = f"import subprocess, sys\nsubprocess.Popen([sys.executable, '-c', {child!r}])\nsys.stdin.read()\n" + params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # Two interpreter cold starts on a loaded runner; healthy runs take ~0.3s. + with anyio.fail_after(10.0): + async with stdio_client(params): + child_stream = await accept_alive(sock) + async with child_stream: + # The context has fully exited: the transport, and with it the + # pipe's only read end, is closed. Release the child's write. + await child_stream.send(b"go") + # The child sends its errno report and exits, so read to EOF: the + # complete reply is everything before the kernel's FIN. + reply = b"" + with suppress(anyio.EndOfStream): + while True: + reply += await child_stream.receive(16) + + assert int(reply) == errno.EPIPE, f"child reported errno {reply!r}, expected EPIPE" diff --git a/tests/transports/stdio/test_windows.py b/tests/transports/stdio/test_windows.py new file mode 100644 index 0000000000..b9695cadd6 --- /dev/null +++ b/tests/transports/stdio/test_windows.py @@ -0,0 +1,253 @@ +"""Windows-only stdio lifecycle behaviors, against real subprocesses. 
+ +Each test pins a contract that exists only on Windows: Job-Object reaping of a +gracefully-exited server's children (the deliberate divergence from the POSIX +policy in test_posix.py), the SelectorEventLoop fallback wrapper, and the CRLF +line endings a native text-mode server emits. Synchronization is kernel-level +only (liveness sockets); see `_liveness`. + +These bodies run solely on windows-latest CI legs, so each test function carries +the same no-cover exclusion as tests/issues/test_552_windows_hang.py: the per-job +100% coverage gate on non-Windows runners would otherwise count them as uncovered, +and strict-no-cover (which would object to an executed excluded line) is skipped +on the Windows runners where they do execute. +""" + +import asyncio +import sys +from contextlib import AsyncExitStack +from pathlib import Path + +import anyio +import anyio.abc +import pytest + +from mcp.client.stdio import StdioServerParameters, stdio_client +from mcp.os.win32.utilities import FallbackProcess +from mcp.shared.message import SessionMessage +from mcp.types import JSONRPCRequest, JSONRPCResponse +from tests.transports.stdio._liveness import ( + accept_alive, + assert_stream_closed, + connect_back_script, + open_liveness_listener, +) + +pytestmark = [ + pytest.mark.anyio, + pytest.mark.skipif(sys.platform != "win32", reason="Windows Job Object / event-loop semantics"), +] + + +async def test_a_gracefully_exited_servers_child_is_reaped_when_the_job_handle_closes( # pragma: no cover + tmp_path: Path, + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """A server that exits cleanly on stdin closure leaves a child behind; on Windows + that child is killed when shutdown closes the server's Job Object handle + (`close_process_job` + `JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE`) — deterministically, + not whenever the handle happens to be garbage-collected. 
This is the documented + divergence from POSIX, where the identical scenario leaves the child alive + (docs/migration.md, "stdio_client no longer kills children of a gracefully-exited + server on POSIX"; the POSIX twin is + test_posix.py::test_a_gracefully_exiting_servers_child_survives_the_client_shutdown). + + `terminate_calls == []` is the load-bearing distinction: it proves the child died + through the graceful path's job-handle close and not through the escalation's + `TerminateJobObject` — the two kills are indistinguishable on the socket. + + The server connects back too (not just the child), the child's stderr is routed + into the server's, and both are captured through `errlog`; the child prints a + startup marker there, and the server reports the child's `poll()` status after + stdin EOF ends it. A timeout failure then reports how many connections arrived + (so which process never showed), how long the spawn took, and the captured + stderr verbatim — including the child's fate — since xdist swallows subprocess + stderr on CI, and without the capture a broken spawn chain is undiagnosable + there. + """ + async with AsyncExitStack() as stack: + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + # The startup marker (and any child traceback, via the Popen's + # stderr=sys.stderr below) lands in errlog, splitting "child never + # spawned/started" from "child started but could not connect". + child = "import sys\nprint('child-started', file=sys.stderr, flush=True)\n" + connect_back_script(port) + # The server spawns a child (its Popen failure, if any, is surfaced on + # stderr), connects back itself, then exits as soon as its stdin closes — + # the well-behaved graceful path, so the escalation never runs. 
The child + # inherits Job membership because the SDK assigns the server to the Job + # synchronously after the spawn returns, while the server's interpreter is + # still cold-starting — long before it can Popen the child (job membership + # is inherited at CreateProcess, never acquired retroactively). + # + # The child's stdin must be decoupled from the server's (DEVNULL): CPython + # startup queries fd 0, the server's stdin is a synchronous pipe object, + # and Windows serializes that query behind the server's pending blocking + # `sys.stdin.read()` — with an inherited stdin the child freezes at + # interpreter startup until the next inbound byte or EOF. (The same goes + # for any Windows stdio server spawning Python children without + # redirecting their stdin.) After stdin EOF ends the server, it reports + # the child's `poll()` status — `None` means the child was alive when the + # server exited; an exit or NTSTATUS code names whatever killed it. + server = ( + f"import socket, subprocess, sys\n" + f"try:\n" + f" p = subprocess.Popen([sys.executable, '-c', {child!r}], " + f"stdin=subprocess.DEVNULL, stderr=sys.stderr)\n" + f"except BaseException as exc:\n" + f" print(exc, file=sys.stderr, flush=True)\n" + f" raise\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.stdin.read()\n" + f"print('child-rc:%s' % p.poll(), file=sys.stderr, flush=True)\n" + ) + server_params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + with (tmp_path / "errlog.txt").open("w+", encoding="utf-8") as errlog: + + def server_stderr() -> str: + errlog.seek(0) + return errlog.read() + + streams: list[anyio.abc.SocketStream] = [] + spawn_started = anyio.current_time() + entered_at: float | None = None + try: + # The bound covers two Python interpreter cold starts on a loaded + # runner; a healthy run takes well under a second. 
+ with anyio.fail_after(15.0): + async with stdio_client(server_params, errlog=errlog): + entered_at = anyio.current_time() + # The server and child race to connect; accept both, + # order-agnostic (accept_alive verifies each banner). + for _ in range(2): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + streams.append(stream) + except TimeoutError: + # By the time this clause runs, `stdio_client.__aexit__` has already + # completed its shielded shutdown on the way out of the `async + # with`: stdin closed, the server printed its `child-rc` line and + # exited. The stderr read below therefore carries the child's fate, + # not a mid-flight snapshot. + missing_leg = "the server never ran its connect line" if not streams else "the child never connected" + spawn_split = ( + "the context never entered" + if entered_at is None + else f"the context entered {entered_at - spawn_started:.1f}s after spawn began" + ) + pytest.fail( + f"{len(streams)}/2 liveness connections arrived ({missing_leg}); " + f"{spawn_split}; server stderr: {server_stderr()!r}" + ) + + # Both peers connected and the context has fully exited, closing the + # job handle. KILL_ON_JOB_CLOSE must have killed the child, and the + # server died with its graceful exit: both sockets close. The + # `spawned_processes` recording is load-bearing here beyond + # observability: `_process_jobs` is weak-keyed, and the recorded strong + # reference pins the process object (and with it the job-handle entry) + # across this assertion window — without it, a GC between context exit + # and this assert could close the handle itself and mask a regression + # in the deterministic close. 
+ try: + for stream in streams: + await assert_stream_closed(stream) + except TimeoutError: + pytest.fail(f"a socket stayed open after shutdown; server stderr: {server_stderr()!r}") + + leader = spawned_processes[0] + # The graceful path: the server exited on stdin closure with code 0, + # and the tree-termination escalation was never invoked. + assert leader.returncode == 0, server_stderr() + assert terminate_calls == [], server_stderr() + + +# Overrides the suite-wide plain-"asyncio" anyio_backend fixture for this test only: +# a selector event loop cannot run asyncio subprocesses, which is exactly the +# environment that forces stdio_client onto the FallbackProcess path. +@pytest.mark.parametrize("anyio_backend", [("asyncio", {"loop_factory": asyncio.SelectorEventLoop})]) +async def test_a_selector_event_loop_session_uses_the_fallback_process_and_exits_cleanly( # pragma: no cover + spawned_processes: list[anyio.abc.Process | FallbackProcess], + terminate_calls: list[anyio.abc.Process | FallbackProcess], +) -> None: + """Under a `SelectorEventLoop` (no asyncio subprocess support), `stdio_client` + falls back to the Popen-based `FallbackProcess` wrapper and a well-behaved + server still completes the full clean lifecycle: spawn, liveness, exit on stdin + closure, reaped, never escalated against. + + The `isinstance` check is the engagement proof: if a future anyio gains selector + subprocess support, the spawn silently returns a normal Process and this test + would otherwise stop testing the fallback stack without failing. A hang here + (a `fail_after` TimeoutError — or, if the reader thread is truly parked in a + synchronous `ReadFile`, a hard hang that `fail_after` cannot interrupt) most + likely indicates the known fallback hazard documented in `stdio_client`'s + shutdown comment — which is why this test pins only the clean-exit path, never + a kill path.
+ """ + async with AsyncExitStack() as stack: + # Everything pushed on the exit stack (`sock` here, the accepted `stream` + # below) is closed on the way out, even when an assertion fails. + sock, port = await open_liveness_listener() + stack.push_async_callback(sock.aclose) + + # Connect back for liveness, then exit as soon as stdin closes: the + # well-behaved server, so shutdown's first step suffices. + server = ( + f"import socket, sys\n" + f"s = socket.create_connection(('127.0.0.1', {port}))\n" + f"s.sendall(b'alive')\n" + f"sys.stdin.read()\n" + ) + server_params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # One interpreter cold start on a loaded runner; healthy runs take ~0.3s. + with anyio.fail_after(10.0): + async with stdio_client(server_params): + stream = await accept_alive(sock) + stack.push_async_callback(stream.aclose) + # The engagement proof, asserted while the session is live. + assert isinstance(spawned_processes[0], FallbackProcess) + + # The server exited on stdin closure: socket closed, exit code 0, and the + # escalation never fired. + await assert_stream_closed(stream) + assert spawned_processes[0].returncode == 0 + assert terminate_calls == [] + + +async def test_a_native_server_emitting_crlf_line_endings_round_trips_messages() -> None: # pragma: no cover + """A text-mode Windows server frames its output with \\r\\n (`TextIOWrapper`'s + `newline=None` translates "\\n" to `os.linesep`), and the client still parses + each line: the reader splits on "\\n" only, so the trailing "\\r" reaches the + JSON parser and is tolerated as whitespace. The SDK's own server writes through + exactly such a wrapper, so this tolerance is load-bearing for Windows interop. + + tests/issues/test_552_windows_hang.py exercises the same wire form implicitly + through `initialize()`; this test is the explicit owner of the framing claim, + driving `stdio_client`'s public streams with no session on top. + """ + # Read one request, answer it via print() — which emits \r\n on Windows — then + # exit when stdin closes. json.loads/dumps keep the script free of SDK imports.
+ server = ( + "import json, sys\n" + "line = sys.stdin.readline()\n" + "request = json.loads(line)\n" + "print(json.dumps({'jsonrpc': '2.0', 'id': request['id'], 'result': {}}))\n" + "sys.stdout.flush()\n" + "sys.stdin.read()\n" + ) + server_params = StdioServerParameters(command=sys.executable, args=["-c", server]) + + # A bare request: the scripted server copies its `id` into an empty-result reply. + ping = JSONRPCRequest(jsonrpc="2.0", id=1, method="ping") + + # One interpreter cold start on a loaded runner; healthy runs take ~0.3s. + with anyio.fail_after(10.0): + async with stdio_client(server_params) as (read_stream, write_stream): + await write_stream.send(SessionMessage(ping)) + received = await read_stream.receive() + # A reader that choked on the trailing \r would deliver a ValueError + # here instead of a parsed message. + assert isinstance(received, SessionMessage) + # The reply's id matching the request's shows the outbound line was parsed + # by the server too, so both directions round-tripped. + assert received.message == JSONRPCResponse(jsonrpc="2.0", id=1, result={})