Skip to content

Commit f20dee9

Browse files
authored
feat: Windows TCP fallback for embedded gRPC server (#39)
* feat: Windows TCP fallback for embedded gRPC server On Windows, Unix domain sockets are not available. This change: - Adds _start_tcp() that spawns capiscio-core with --address flag using an ephemeral TCP port (same approach as capiscio-mcp-python) - Adds _start_unix_socket() to keep existing Unix socket behavior on macOS/Linux unchanged - Extracts _wait_grpc_ready() to share readiness probe logic - Adds _find_free_port() using socket.bind(('', 0)) - Fixes client.py fallback address to use localhost:50051 on Windows - Adds 7 unit tests covering the new code paths * fix: address Copilot review - pipe drain + defensive decode - Add _drain_pipes() to close stdout/stderr after successful startup, preventing OS pipe buffer fill on long-lived server processes - Use errors='replace' in .decode() calls to avoid masking real failures with UnicodeDecodeError - Call self.stop() before raising in _wait_grpc_ready and socket wait loop to keep singleton state consistent
1 parent 072cce8 commit f20dee9

3 files changed

Lines changed: 184 additions & 28 deletions

File tree

capiscio_sdk/_rpc/client.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""gRPC client wrapper for capiscio-core."""
22

3+
import sys
34
from typing import Generator, Optional
45

56
import grpc
@@ -95,7 +96,10 @@ def connect(self, timeout: float = 10.0) -> "CapiscioRPCClient":
9596
self._process_manager = get_process_manager()
9697
address = self._process_manager.ensure_running(timeout=timeout)
9798
elif address is None:
98-
address = "unix:///tmp/capiscio.sock"
99+
if sys.platform == "win32":
100+
address = "localhost:50051"
101+
else:
102+
address = "unix:///tmp/capiscio.sock"
99103

100104
# Create channel
101105
if address.startswith("unix://"):

capiscio_sdk/_rpc/process.py

Lines changed: 91 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import os
66
import platform
77
import shutil
8+
import socket
89
import stat
910
import subprocess
1011
import sys
@@ -245,6 +246,41 @@ def ensure_running(
245246
binary = self._download_binary()
246247
self._binary_path = binary
247248

249+
# Windows doesn't support Unix sockets — use TCP instead
250+
if sys.platform == "win32":
251+
return self._start_tcp(binary, timeout)
252+
else:
253+
return self._start_unix_socket(binary, socket_path, timeout)
254+
255+
def _start_tcp(self, binary: Path, timeout: float) -> str:
256+
"""Start the gRPC server with a TCP listener (used on Windows)."""
257+
port = self._find_free_port()
258+
addr = f"localhost:{port}"
259+
cmd = [str(binary), "rpc", "--address", addr]
260+
261+
try:
262+
popen_kwargs = {
263+
"stdout": subprocess.PIPE,
264+
"stderr": subprocess.PIPE,
265+
}
266+
if sys.platform == "win32":
267+
popen_kwargs["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
268+
else:
269+
popen_kwargs["start_new_session"] = True
270+
self._process = subprocess.Popen(cmd, **popen_kwargs)
271+
except Exception as e:
272+
raise RuntimeError(f"Failed to start capiscio server: {e}") from e
273+
274+
self._tcp_address = addr
275+
self._wait_grpc_ready(addr, timeout)
276+
self._drain_pipes()
277+
self._started = True
278+
return self.address
279+
280+
def _start_unix_socket(
281+
self, binary: Path, socket_path: Optional[Path], timeout: float
282+
) -> str:
283+
"""Start the gRPC server with a Unix socket listener."""
248284
# Set up socket path
249285
self._socket_path = socket_path or DEFAULT_SOCKET_PATH
250286

@@ -268,7 +304,7 @@ def ensure_running(
268304
except Exception as e:
269305
raise RuntimeError(f"Failed to start capiscio server: {e}") from e
270306

271-
# Wait for socket to appear
307+
# Wait for socket file to appear
272308
start_time = time.time()
273309
while time.time() - start_time < timeout:
274310
if self._socket_path.exists():
@@ -277,10 +313,11 @@ def ensure_running(
277313
# Check if process died
278314
if self._process.poll() is not None:
279315
stdout, stderr = self._process.communicate()
316+
self.stop()
280317
raise RuntimeError(
281318
f"capiscio server exited unexpectedly:\n"
282-
f"stdout: {stdout.decode()}\n"
283-
f"stderr: {stderr.decode()}"
319+
f"stdout: {stdout.decode(errors='replace') if stdout else ''}\n"
320+
f"stderr: {stderr.decode(errors='replace') if stderr else ''}"
284321
)
285322

286323
time.sleep(0.1)
@@ -294,33 +331,60 @@ def ensure_running(
294331

295332
# Socket exists — verify gRPC is actually accepting connections
296333
remaining = timeout - (time.time() - start_time)
297-
if remaining > 0:
298-
import grpc
299-
addr = f"unix://{self._socket_path}"
300-
deadline = time.time() + remaining
301-
while time.time() < deadline:
302-
time_left = deadline - time.time()
303-
if time_left <= 0:
304-
break
305-
channel = grpc.insecure_channel(addr)
306-
try:
307-
grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left))
308-
break
309-
except grpc.FutureTimeoutError:
310-
time.sleep(0.1)
311-
except Exception:
312-
time.sleep(0.1)
313-
finally:
314-
channel.close()
315-
else:
334+
addr = f"unix://{self._socket_path}"
335+
self._wait_grpc_ready(addr, remaining)
336+
self._drain_pipes()
337+
self._started = True
338+
return self.address
339+
340+
@staticmethod
341+
def _find_free_port() -> int:
342+
"""Find a free TCP port by binding to port 0."""
343+
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
344+
s.bind(("", 0))
345+
return s.getsockname()[1]
346+
347+
def _drain_pipes(self) -> None:
348+
"""Close piped stdout/stderr to prevent OS buffer fill on long-lived processes."""
349+
if self._process is not None:
350+
if self._process.stdout:
351+
self._process.stdout.close()
352+
if self._process.stderr:
353+
self._process.stderr.close()
354+
355+
def _wait_grpc_ready(self, addr: str, remaining: float) -> None:
356+
"""Wait for the gRPC server to accept connections."""
357+
if remaining <= 0:
358+
return
359+
import grpc
360+
deadline = time.time() + remaining
361+
while time.time() < deadline:
362+
# Check if process died
363+
if self._process is not None and self._process.poll() is not None:
364+
stdout, stderr = self._process.communicate()
316365
self.stop()
317366
raise RuntimeError(
318-
f"capiscio server socket appeared but gRPC not ready "
319-
f"within {timeout}s at {self._socket_path}"
367+
f"capiscio server exited unexpectedly:\n"
368+
f"stdout: {stdout.decode(errors='replace') if stdout else ''}\n"
369+
f"stderr: {stderr.decode(errors='replace') if stderr else ''}"
320370
)
321-
322-
self._started = True
323-
return self.address
371+
time_left = deadline - time.time()
372+
if time_left <= 0:
373+
break
374+
channel = grpc.insecure_channel(addr)
375+
try:
376+
grpc.channel_ready_future(channel).result(timeout=min(1.0, time_left))
377+
return
378+
except grpc.FutureTimeoutError:
379+
time.sleep(0.1)
380+
except Exception:
381+
time.sleep(0.1)
382+
finally:
383+
channel.close()
384+
self.stop()
385+
raise RuntimeError(
386+
f"capiscio server gRPC not ready within timeout at {addr}"
387+
)
324388

325389
def stop(self) -> None:
326390
"""Stop the gRPC server process."""

tests/unit/test_process.py

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import os
44
import platform
55
import pytest
6+
import sys
67
from pathlib import Path
78
from unittest.mock import MagicMock, patch, mock_open
89

@@ -216,3 +217,90 @@ def test_binary_download_triggered_when_not_found(self):
216217
# Just verify the method exists and can be mocked for integration
217218
assert hasattr(pm, "_download_binary")
218219
assert callable(pm._download_binary)
220+
221+
def test_find_free_port(self):
222+
"""Test that _find_free_port returns a valid port number."""
223+
port = ProcessManager._find_free_port()
224+
assert isinstance(port, int)
225+
assert 1 <= port <= 65535
226+
227+
def test_ensure_running_delegates_to_tcp_on_windows(self):
228+
"""Test ensure_running uses TCP when sys.platform is win32."""
229+
pm = ProcessManager()
230+
mock_binary = MagicMock()
231+
232+
with patch.object(pm, "find_binary", return_value=mock_binary):
233+
with patch("capiscio_sdk._rpc.process.sys.platform", "win32"):
234+
with patch.object(pm, "_start_tcp", return_value="localhost:9999") as mock_tcp:
235+
result = pm.ensure_running()
236+
mock_tcp.assert_called_once_with(mock_binary, 5.0)
237+
assert result == "localhost:9999"
238+
239+
def test_ensure_running_delegates_to_unix_socket_on_posix(self):
240+
"""Test ensure_running uses Unix socket when not on Windows."""
241+
pm = ProcessManager()
242+
mock_binary = MagicMock()
243+
244+
with patch.object(pm, "find_binary", return_value=mock_binary):
245+
with patch("capiscio_sdk._rpc.process.sys.platform", "darwin"):
246+
with patch.object(
247+
pm, "_start_unix_socket", return_value="unix:///tmp/test.sock"
248+
) as mock_unix:
249+
result = pm.ensure_running()
250+
mock_unix.assert_called_once_with(mock_binary, None, 5.0)
251+
assert result == "unix:///tmp/test.sock"
252+
253+
def test_start_tcp_spawns_with_address_flag(self):
254+
"""Test _start_tcp spawns the binary with --address flag."""
255+
pm = ProcessManager()
256+
mock_binary = Path("/tmp/capiscio")
257+
258+
with patch.object(ProcessManager, "_find_free_port", return_value=54321):
259+
with patch("subprocess.Popen") as mock_popen:
260+
mock_proc = MagicMock()
261+
mock_proc.poll.return_value = None
262+
mock_popen.return_value = mock_proc
263+
264+
with patch.object(pm, "_wait_grpc_ready"):
265+
pm._start_tcp(mock_binary, timeout=5.0)
266+
267+
# Verify it used --address, not --socket
268+
call_args = mock_popen.call_args
269+
cmd = call_args[0][0]
270+
assert cmd == ["/tmp/capiscio", "rpc", "--address", "localhost:54321"]
271+
assert pm._tcp_address == "localhost:54321"
272+
273+
def test_start_tcp_uses_platform_appropriate_process_isolation(self):
274+
"""Test _start_tcp uses correct process isolation per platform."""
275+
pm = ProcessManager()
276+
mock_binary = Path("/tmp/capiscio")
277+
278+
with patch.object(ProcessManager, "_find_free_port", return_value=12345):
279+
with patch("subprocess.Popen") as mock_popen:
280+
mock_proc = MagicMock()
281+
mock_proc.poll.return_value = None
282+
mock_popen.return_value = mock_proc
283+
284+
with patch.object(pm, "_wait_grpc_ready"):
285+
pm._start_tcp(mock_binary, timeout=5.0)
286+
287+
call_kwargs = mock_popen.call_args[1]
288+
if sys.platform == "win32":
289+
import subprocess
290+
assert call_kwargs["creationflags"] == subprocess.CREATE_NEW_PROCESS_GROUP
291+
else:
292+
assert call_kwargs["start_new_session"] is True
293+
294+
def test_address_property_returns_tcp_when_set(self):
295+
"""Test address property returns TCP address when set."""
296+
pm = ProcessManager()
297+
pm._tcp_address = "localhost:50051"
298+
assert pm.address == "localhost:50051"
299+
300+
def test_address_property_returns_unix_by_default(self):
301+
"""Test address property returns Unix socket by default."""
302+
pm = ProcessManager()
303+
pm._tcp_address = None
304+
pm._socket_path = None
305+
from capiscio_sdk._rpc.process import DEFAULT_SOCKET_PATH
306+
assert pm.address == f"unix://{DEFAULT_SOCKET_PATH}"

0 commit comments

Comments
 (0)