From 67d64cb0e74a4bde3acd0e7cf894fded64f8f3d5 Mon Sep 17 00:00:00 2001 From: Gary van der Merwe Date: Sat, 10 Jan 2026 14:52:26 +0200 Subject: [PATCH 1/3] pytestadapter: Remove unused code for connecting to a unix socket. --- python_files/tests/pytestadapter/helpers.py | 117 +++++--------------- 1 file changed, 26 insertions(+), 91 deletions(-) diff --git a/python_files/tests/pytestadapter/helpers.py b/python_files/tests/pytestadapter/helpers.py index 03f1187149df..8db048bc2714 100644 --- a/python_files/tests/pytestadapter/helpers.py +++ b/python_files/tests/pytestadapter/helpers.py @@ -144,57 +144,30 @@ def _listen_on_fifo(pipe_name: str, result: List[str], completed: threading.Even result.append(data) -def _listen_on_pipe_new(listener, result: List[str], completed: threading.Event): - """Listen on the named pipe or Unix domain socket for JSON data from the server. - - Created as a separate function for clarity in threading context. - """ - # Windows design - if sys.platform == "win32": - all_data: list = [] - stream = listener.wait() - while True: - # Read data from collection - close = stream.closed - if close: - break - data = stream.readlines() - if not data: - if completed.is_set(): - break # Exit loop if completed event is set - else: - try: - # Attempt to accept another connection if the current one closes unexpectedly - print("attempt another connection") - except socket.timeout: - # On timeout, append all collected data to result and return - # result.append("".join(all_data)) - return - data_decoded = "".join(data) - all_data.append(data_decoded) - # Append all collected data to result array - result.append("".join(all_data)) - else: # Unix design - connection, _ = listener.socket.accept() - listener.socket.settimeout(1) - all_data: list = [] - while True: - # Reading from connection - data: bytes = connection.recv(1024 * 1024) - if not data: - if completed.is_set(): - break # Exit loop if completed event is set - else: - try: - # Attempt to accept another connection if the current one closes unexpectedly - connection, _ = listener.socket.accept() - except socket.timeout: - # On timeout, append all collected data to result and return - result.append("".join(all_data)) - return - all_data.append(data.decode("utf-8")) - # Append all collected data to result array - result.append("".join(all_data)) +def _listen_win_named_pipe(listener, result: List[str], completed: threading.Event): + all_data: list = [] + stream = listener.wait() + while True: + # Read data from collection + close = stream.closed + if close: + break + data = stream.readlines() + if not data: + if completed.is_set(): + break # Exit loop if completed event is set + else: + try: + # Attempt to accept another connection if the current one closes unexpectedly + print("attempt another connection") + except socket.timeout: + # On timeout, append all collected data to result and return + # result.append("".join(all_data)) + return + data_decoded = "".join(data) + all_data.append(data_decoded) + # Append all collected data to result array + result.append("".join(all_data)) def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event): @@ -314,7 +287,7 @@ def runner_with_cwd_env( result = [] # result is a string array to store the data during threading t1: threading.Thread = threading.Thread( - target=_listen_on_pipe_new, args=(pipe, result, completed) + target=_listen_win_named_pipe, args=(pipe, result, completed) ) t1.start() @@ -340,13 +313,10 @@ def runner_with_cwd_env( # if additional environment variables are passed, add them to the environment if env_add: env.update(env_add) - # server = UnixPipeServer(pipe_name) - # server.start() - ################# + # Create the FIFO (named pipe) if it doesn't exist # if not pathlib.Path.exists(pipe_name): os.mkfifo(pipe_name) - ################# completed = threading.Event() @@ -432,38 +402,3 @@ def generate_random_pipe_name(prefix=""): return os.path.join(xdg_runtime_dir, f"{prefix}-{random_suffix}") # noqa: PTH118 else: return os.path.join(tempfile.gettempdir(), f"{prefix}-{random_suffix}") # noqa: PTH118 - - -class UnixPipeServer: - def __init__(self, name): - self.name = name - self.is_windows = sys.platform == "win32" - if self.is_windows: - raise NotImplementedError( - "This class is only intended for Unix-like systems, not Windows." - ) - else: - # For Unix-like systems, use a Unix domain socket. - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # Ensure the socket does not already exist - try: - os.unlink(self.name) # noqa: PTH108 - except OSError: - if os.path.exists(self.name): # noqa: PTH110 - raise - - def start(self): - if self.is_windows: - raise NotImplementedError( - "This class is only intended for Unix-like systems, not Windows." - ) - else: - # Bind the socket to the address and listen for incoming connections. - self.socket.bind(self.name) - self.socket.listen(1) - print(f"Server listening on {self.name}") - - def stop(self): - # Clean up the server socket. - self.socket.close() - print("Server stopped.") From 9724bbf788aa9a207acd1c80e1fbad0e76d4651b Mon Sep 17 00:00:00 2001 From: Gary van der Merwe Date: Sat, 10 Jan 2026 20:26:42 +0200 Subject: [PATCH 2/3] pytestadapter: Refactor test runner to make platform specific named pipe code encapuslated. --- python_files/tests/pytestadapter/helpers.py | 200 ++++++++------------ 1 file changed, 81 insertions(+), 119 deletions(-) diff --git a/python_files/tests/pytestadapter/helpers.py b/python_files/tests/pytestadapter/helpers.py index 8db048bc2714..805865b531c4 100644 --- a/python_files/tests/pytestadapter/helpers.py +++ b/python_files/tests/pytestadapter/helpers.py @@ -14,10 +14,6 @@ import uuid from typing import Any, Dict, List, Optional, Tuple -if sys.platform == "win32": - from namedpipe import NPopen - - script_dir = pathlib.Path(__file__).parent.parent.parent script_dir_child = pathlib.Path(__file__).parent.parent sys.path.append(os.fspath(script_dir)) @@ -128,52 +124,84 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]: print("json decode error") -def _listen_on_fifo(pipe_name: str, result: List[str], completed: threading.Event): - # Open the FIFO for reading - fifo_path = pathlib.Path(pipe_name) - with fifo_path.open() as fifo: - print("Waiting for data...") - while True: - if completed.is_set(): - break # Exit loop if completed event is set - data = fifo.read() # This will block until data is available - if len(data) == 0: - # If data is empty, assume EOF - break - print(f"Received: {data}") - result.append(data) - - -def _listen_win_named_pipe(listener, result: List[str], completed: threading.Event): - all_data: list = [] - stream = listener.wait() - while True: - # Read data from collection - close = stream.closed - if close: - break - data = stream.readlines() - if not data: - if completed.is_set(): - break # Exit loop if completed event is set - else: +if sys.platform == "win32": + from namedpipe import NPopen + + @contextlib.contextmanager + def pipe_setup_and_listen(pipe_name: str, result: List[str]): + # For Windows, named pipes have a specific naming convention. + pipe_path = f"\\\\.\\pipe\\{pipe_name}" + + with NPopen("r+t", name=pipe_name, bufsize=0) as pipe: + completed = threading.Event() + + def listen(): + all_data: list = [] + stream = pipe.wait() + while True: + # Read data from collection + close = stream.closed + if close: + break + data = stream.readlines() + if not data: + if completed.is_set(): + break # Exit loop if completed event is set + else: + try: + # Attempt to accept another connection if the current one closes unexpectedly + print("attempt another connection") + except socket.timeout: + # On timeout, append all collected data to result and return + # result.append("".join(all_data)) + return + data_decoded = "".join(data) + all_data.append(data_decoded) + # Append all collected data to result array + result.append("".join(all_data)) + + thread = threading.Thread(target=listen) + thread.start() try: - # Attempt to accept another connection if the current one closes unexpectedly - print("attempt another connection") - except socket.timeout: - # On timeout, append all collected data to result and return - # result.append("".join(all_data)) - return - data_decoded = "".join(data) - all_data.append(data_decoded) - # Append all collected data to result array - result.append("".join(all_data)) + yield pipe_path + finally: + completed.set() + thread.join() +else: + + @contextlib.contextmanager + def pipe_setup_and_listen(pipe_name: str, result: List[str]): + # For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory. + xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR") + pipe_path = pathlib.Path( + xdg_runtime_dir if xdg_runtime_dir else tempfile.gettempdir(), + pipe_name, + ) + os.mkfifo(pipe_path) + completed = threading.Event() -def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event): - result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd) - completed.set() - return result + def listen(): + # Open the FIFO for reading + with pipe_path.open() as fifo: + print("Waiting for data...") + while True: + if completed.is_set(): + break # Exit loop if completed event is set + data = fifo.read() # This will block until data is available + if len(data) == 0: + # If data is empty, assume EOF + break + print(f"Received: {data}") + result.append(data) + + thread = threading.Thread(target=listen) + thread.start() + try: + yield pipe_path + finally: + completed.set() + thread.join() def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]: @@ -266,77 +294,20 @@ def runner_with_cwd_env( *args, ] - # Generate pipe name, pipe name specific per OS type. - - # Windows design - if sys.platform == "win32": - with NPopen("r+t", name=pipe_name, bufsize=0) as pipe: - # Update the environment with the pipe name and PYTHONPATH. - env = os.environ.copy() - env.update( - { - "TEST_RUN_PIPE": pipe.path, - "PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent), - } - ) - # if additional environment variables are passed, add them to the environment - if env_add: - env.update(env_add) - - completed = threading.Event() - - result = [] # result is a string array to store the data during threading - t1: threading.Thread = threading.Thread( - target=_listen_win_named_pipe, args=(pipe, result, completed) - ) - t1.start() - - t2 = threading.Thread( - target=_run_test_code, - args=(process_args, env, path, completed), - ) - t2.start() - - t1.join() - t2.join() - - return process_data_received(result[0]) if result else None - else: # Unix design - # Update the environment with the pipe name and PYTHONPATH. + result = [] # result is a string array to store the data during threading + with pipe_setup_and_listen(pipe_name, result) as pipe_path: env = os.environ.copy() env.update( { - "TEST_RUN_PIPE": pipe_name, + "TEST_RUN_PIPE": pipe_path, "PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent), } ) # if additional environment variables are passed, add them to the environment if env_add: env.update(env_add) - - # Create the FIFO (named pipe) if it doesn't exist - # if not pathlib.Path.exists(pipe_name): - os.mkfifo(pipe_name) - - completed = threading.Event() - - result = [] # result is a string array to store the data during threading - t1: threading.Thread = threading.Thread( - target=_listen_on_fifo, args=(pipe_name, result, completed) - ) - t1.start() - - t2: threading.Thread = threading.Thread( - target=_run_test_code, - args=(process_args, env, path, completed), - ) - - t2.start() - - t1.join() - t2.join() - - return process_data_received(result[0]) if result else None + subprocess.run(process_args, env=env, cwd=path) + return process_data_received(result[0]) if result else None def find_test_line_number(test_name: str, test_file_path) -> str: @@ -392,13 +363,4 @@ def generate_random_pipe_name(prefix=""): if not prefix: prefix = "python-ext-rpc" - # For Windows, named pipes have a specific naming convention. - if sys.platform == "win32": - return f"\\\\.\\pipe\\{prefix}-{random_suffix}" - - # For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory. - xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR") - if xdg_runtime_dir: - return os.path.join(xdg_runtime_dir, f"{prefix}-{random_suffix}") # noqa: PTH118 - else: - return os.path.join(tempfile.gettempdir(), f"{prefix}-{random_suffix}") # noqa: PTH118 + return f"{prefix}-{random_suffix}" From 31e861a9d2f5f08ab70ee6300af055cfcda3dc90 Mon Sep 17 00:00:00 2001 From: Gary van der Merwe Date: Sat, 10 Jan 2026 21:33:10 +0200 Subject: [PATCH 3/3] pytestadapter: Read pipe in non-blocking mode so tests don't hang if the subprocess fails to open the pipe for writing. --- python_files/tests/pytestadapter/helpers.py | 30 ++++++++++++++------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/python_files/tests/pytestadapter/helpers.py b/python_files/tests/pytestadapter/helpers.py index 805865b531c4..e10db6118ec9 100644 --- a/python_files/tests/pytestadapter/helpers.py +++ b/python_files/tests/pytestadapter/helpers.py @@ -6,6 +6,7 @@ import json import os import pathlib +import select import socket import subprocess import sys @@ -182,18 +183,29 @@ def pipe_setup_and_listen(pipe_name: str, result: List[str]): completed = threading.Event() def listen(): - # Open the FIFO for reading - with pipe_path.open() as fifo: - print("Waiting for data...") + # When using blocking IO, open blocks forever if the subprocess compleates but never + # opens the pipe for writing (which may happen if there is an error early in the + # subprocess.) Hence we go to the effort of using non-blocking io so that we can + # break out of this function if that happens. + fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) + try: + all_data = bytearray() while True: if completed.is_set(): - break # Exit loop if completed event is set - data = fifo.read() # This will block until data is available - if len(data) == 0: - # If data is empty, assume EOF break - print(f"Received: {data}") - result.append(data) + + # Wait till the pipe has data to read, with a timeout. + rlist, _, _ = select.select([fd], [], [], 0.1) + if rlist: + # Data is available, read it. + data = os.read(fd, 1024) + if not data: + # Empty data indicates EOF. + break + all_data.extend(data) + result.append(all_data.decode()) + finally: + os.close(fd) thread = threading.Thread(target=listen) thread.start()