From 67c60bb223e684bf24b8902c090b657f38c1119b Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 00:45:35 -0700 Subject: [PATCH 01/13] feat: add coverage snapshot server for code coverage POC When TUSK_COVERAGE_PORT env var is set, the SDK starts a tiny HTTP server that manages coverage.py. On each /snapshot request: - Stop coverage, get data, erase (reset), restart - Returns per-file line counts with clean per-test data - No diffing needed (coverage.py supports stop/erase/start cycle) Works with Flask, FastAPI, Django, gunicorn, uvicorn - any framework, because the SDK runs inside the app process. Requires: pip install coverage (or pip install tusk-drift[coverage]) --- drift/core/coverage_server.py | 135 ++++++++++++++++++++++++++++++++++ drift/core/drift_sdk.py | 4 + 2 files changed, 139 insertions(+) create mode 100644 drift/core/coverage_server.py diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py new file mode 100644 index 0000000..3d95daf --- /dev/null +++ b/drift/core/coverage_server.py @@ -0,0 +1,135 @@ +"""Coverage snapshot HTTP server for Python SDK. + +When TUSK_COVERAGE_PORT is set, starts a tiny HTTP server that manages +coverage.py. On each /snapshot request: +1. Stop coverage collection +2. Get coverage data (which lines were executed since last snapshot) +3. Erase coverage data (reset for next test) +4. Restart coverage collection +5. Return per-file line counts as JSON + +This gives clean per-test coverage data - no diffing needed. +""" + +from __future__ import annotations + +import json +import logging +import os +import threading +from http.server import HTTPServer, BaseHTTPRequestHandler + +logger = logging.getLogger("TuskDrift") + + +class CoverageSnapshotHandler(BaseHTTPRequestHandler): + """HTTP handler for coverage snapshot requests.""" + + # Shared state set by start_coverage_server + cov_instance = None + source_root = None + + def do_GET(self): + if self.path == "/snapshot": + self._handle_snapshot() + else: + self.send_response(404) + self.end_headers() + + def _handle_snapshot(self): + try: + cov = self.__class__.cov_instance + source_root = self.__class__.source_root + + if cov is None: + self.send_response(500) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"ok": False, "error": "coverage not initialized"}).encode()) + return + + # Stop coverage, get data, erase (reset), restart + cov.stop() + data = cov.get_data() + + # Extract per-file line counts + coverage = {} + for filename in data.measured_files(): + # Filter to user source files + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + + lines = data.lines(filename) + if lines: + # Convert to { "lineNumber": 1 } format (1 = covered) + coverage[filename] = {str(line): 1 for line in lines} + + # Erase data and restart for next test + cov.erase() + cov.start() + + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"ok": True, "coverage": coverage}).encode()) + + except Exception as e: + self.send_response(500) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"ok": False, "error": str(e)}).encode()) + + def log_message(self, format, *args): + """Suppress default HTTP server logging.""" + pass + + +def start_coverage_server(port: int | None = None) -> bool: + """Start the coverage snapshot server if TUSK_COVERAGE_PORT is set. + + Returns True if the server was started, False otherwise. + """ + port_str = os.environ.get("TUSK_COVERAGE_PORT") + if not port_str and port is None: + return False + + actual_port = port or int(port_str) + + # Try to import coverage + try: + import coverage as coverage_module + except ImportError: + logger.warning( + "TUSK_COVERAGE_PORT is set but 'coverage' package is not installed. " + "Install it with: pip install coverage" + ) + return False + + source_root = os.getcwd() + + # Start coverage collection + cov = coverage_module.Coverage( + source=[source_root], + omit=[ + "*/site-packages/*", + "*/venv/*", + "*/.venv/*", + "*/test*", + "*/__pycache__/*", + ], + ) + cov.start() + + # Set shared state on the handler class + CoverageSnapshotHandler.cov_instance = cov + CoverageSnapshotHandler.source_root = source_root + + # Start HTTP server in a daemon thread + http_server = HTTPServer(("127.0.0.1", actual_port), CoverageSnapshotHandler) + thread = threading.Thread(target=http_server.serve_forever, daemon=True) + thread.start() + + logger.info(f"Coverage snapshot server listening on port {actual_port}") + return True diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index 67927f4..ffacf0b 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -160,6 +160,10 @@ def initialize( configure_logger(log_level=log_level, prefix="TuskDrift") + # Start coverage server early (before any SDK mode checks that might return early) + from .coverage_server import start_coverage_server + start_coverage_server() + instance._init_params = { "api_key": api_key, "env": env, From 3261a400396f4f7bdca1d5eb2e968c5099294b80 Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 01:16:34 -0700 Subject: [PATCH 02/13] feat: add ?baseline=true parameter using coverage.py analysis2 When /snapshot?baseline=true is called, uses coverage.analysis2() to get ALL coverable statements (including uncovered) for the denominator. Regular /snapshot calls only return executed lines (for per-test data). --- drift/core/coverage_server.py | 57 ++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 17 deletions(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 3d95daf..4e3351b 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -30,13 +30,17 @@ class CoverageSnapshotHandler(BaseHTTPRequestHandler): source_root = None def do_GET(self): - if self.path == "/snapshot": - self._handle_snapshot() + from urllib.parse import urlparse, parse_qs + parsed = urlparse(self.path) + if parsed.path == "/snapshot": + params = parse_qs(parsed.query) + is_baseline = params.get("baseline", ["false"])[0] == "true" + self._handle_snapshot(is_baseline) else: self.send_response(404) self.end_headers() - def _handle_snapshot(self): + def _handle_snapshot(self, is_baseline: bool = False): try: cov = self.__class__.cov_instance source_root = self.__class__.source_root @@ -48,23 +52,42 @@ def _handle_snapshot(self): self.wfile.write(json.dumps({"ok": False, "error": "coverage not initialized"}).encode()) return - # Stop coverage, get data, erase (reset), restart + # Stop coverage to read data cov.stop() - data = cov.get_data() - # Extract per-file line counts coverage = {} - for filename in data.measured_files(): - # Filter to user source files - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): - continue - - lines = data.lines(filename) - if lines: - # Convert to { "lineNumber": 1 } format (1 = covered) - coverage[filename] = {str(line): 1 for line in lines} + + if is_baseline: + # Baseline: return ALL coverable lines (including uncovered at count=0) + # This provides the denominator for coverage percentage. + # analysis2() returns (filename, statements, excluded, missing, formatted) + data = cov.get_data() + for filename in data.measured_files(): + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + try: + _, statements, _, missing, _ = cov.analysis2(filename) + missing_set = set(missing) + lines_map = {} + for line in statements: + lines_map[str(line)] = 0 if line in missing_set else 1 + if lines_map: + coverage[filename] = lines_map + except Exception: + continue + else: + # Regular snapshot: only executed lines since last reset + data = cov.get_data() + for filename in data.measured_files(): + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + lines = data.lines(filename) + if lines: + coverage[filename] = {str(line): 1 for line in lines} # Erase data and restart for next test cov.erase() From 2a546647b7c295d46c82435914d8c0b5a246c07d Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 01:22:39 -0700 Subject: [PATCH 03/13] chore: add thread safety lock and clean shutdown for coverage server - Threading lock protects stop/get_data/erase/start sequence - stop_coverage_server() for clean shutdown, integrated into SDK shutdown() - Module-level server reference for proper cleanup --- drift/core/coverage_server.py | 116 +++++++++++++++++++--------------- drift/core/drift_sdk.py | 4 ++ 2 files changed, 70 insertions(+), 50 deletions(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 4e3351b..df4119d 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -28,6 +28,7 @@ class CoverageSnapshotHandler(BaseHTTPRequestHandler): # Shared state set by start_coverage_server cov_instance = None source_root = None + _lock = threading.Lock() def do_GET(self): from urllib.parse import urlparse, parse_qs @@ -42,56 +43,57 @@ def do_GET(self): def _handle_snapshot(self, is_baseline: bool = False): try: - cov = self.__class__.cov_instance - source_root = self.__class__.source_root - - if cov is None: - self.send_response(500) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"ok": False, "error": "coverage not initialized"}).encode()) - return - - # Stop coverage to read data - cov.stop() - - coverage = {} - - if is_baseline: - # Baseline: return ALL coverable lines (including uncovered at count=0) - # This provides the denominator for coverage percentage. - # analysis2() returns (filename, statements, excluded, missing, formatted) - data = cov.get_data() - for filename in data.measured_files(): - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): - continue - try: - _, statements, _, missing, _ = cov.analysis2(filename) - missing_set = set(missing) - lines_map = {} - for line in statements: - lines_map[str(line)] = 0 if line in missing_set else 1 - if lines_map: - coverage[filename] = lines_map - except Exception: - continue - else: - # Regular snapshot: only executed lines since last reset - data = cov.get_data() - for filename in data.measured_files(): - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): - continue - lines = data.lines(filename) - if lines: - coverage[filename] = {str(line): 1 for line in lines} - - # Erase data and restart for next test - cov.erase() - cov.start() + with self.__class__._lock: + cov = self.__class__.cov_instance + source_root = self.__class__.source_root + + if cov is None: + self.send_response(500) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"ok": False, "error": "coverage not initialized"}).encode()) + return + + # Stop coverage to read data + cov.stop() + + coverage = {} + + if is_baseline: + # Baseline: return ALL coverable lines (including uncovered at count=0) + # This provides the denominator for coverage percentage. + # analysis2() returns (filename, statements, excluded, missing, formatted) + data = cov.get_data() + for filename in data.measured_files(): + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + try: + _, statements, _, missing, _ = cov.analysis2(filename) + missing_set = set(missing) + lines_map = {} + for line in statements: + lines_map[str(line)] = 0 if line in missing_set else 1 + if lines_map: + coverage[filename] = lines_map + except Exception: + continue + else: + # Regular snapshot: only executed lines since last reset + data = cov.get_data() + for filename in data.measured_files(): + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + lines = data.lines(filename) + if lines: + coverage[filename] = {str(line): 1 for line in lines} + + # Erase data and restart for next test + cov.erase() + cov.start() self.send_response(200) self.send_header("Content-Type", "application/json") @@ -109,11 +111,16 @@ def log_message(self, format, *args): pass +_coverage_server: HTTPServer | None = None + + def start_coverage_server(port: int | None = None) -> bool: """Start the coverage snapshot server if TUSK_COVERAGE_PORT is set. Returns True if the server was started, False otherwise. """ + global _coverage_server + port_str = os.environ.get("TUSK_COVERAGE_PORT") if not port_str and port is None: return False @@ -151,8 +158,17 @@ def start_coverage_server(port: int | None = None) -> bool: # Start HTTP server in a daemon thread http_server = HTTPServer(("127.0.0.1", actual_port), CoverageSnapshotHandler) + _coverage_server = http_server thread = threading.Thread(target=http_server.serve_forever, daemon=True) thread.start() logger.info(f"Coverage snapshot server listening on port {actual_port}") return True + + +def stop_coverage_server() -> None: + """Shut down the coverage snapshot server if running.""" + global _coverage_server + if _coverage_server is not None: + _coverage_server.shutdown() + _coverage_server = None diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index ffacf0b..f8ad573 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -831,6 +831,8 @@ def shutdown(self) -> None: """Shutdown the SDK.""" import asyncio + from .coverage_server import stop_coverage_server + # Shutdown OpenTelemetry tracer provider if self._td_span_processor is not None: self._td_span_processor.shutdown() @@ -851,3 +853,5 @@ def shutdown(self) -> None: TraceBlockingManager.get_instance().shutdown() except Exception as e: logger.error(f"Error shutting down trace blocking manager: {e}") + + stop_coverage_server() From da89503f9788ddbd050ffa4eed8e2c8310debe12 Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 17:33:41 -0700 Subject: [PATCH 04/13] feat: add branch coverage tracking - Enable branch=True in coverage.py initialization - Extract branch data via cov._analyze(filename) API: - numbers.n_branches, n_missing_branches for totals - missing_branch_arcs() for per-line branch detail - Return branch data in /snapshot response alongside line coverage - Python shows accurate branch coverage (93.3% for demo app) --- drift/core/coverage_server.py | 71 +++++++++++++++++++++++++++++++++-- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index df4119d..709c14d 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -61,8 +61,7 @@ def _handle_snapshot(self, is_baseline: bool = False): if is_baseline: # Baseline: return ALL coverable lines (including uncovered at count=0) - # This provides the denominator for coverage percentage. - # analysis2() returns (filename, statements, excluded, missing, formatted) + # plus branch coverage data. data = cov.get_data() for filename in data.measured_files(): if "site-packages" in filename or "lib/python" in filename: @@ -75,8 +74,15 @@ def _handle_snapshot(self, is_baseline: bool = False): lines_map = {} for line in statements: lines_map[str(line)] = 0 if line in missing_set else 1 + + # Branch data from coverage.py + branch_data = _get_branch_data(cov, data, filename) + if lines_map: - coverage[filename] = lines_map + coverage[filename] = { + "lines": lines_map, + **branch_data, + } except Exception: continue else: @@ -89,7 +95,11 @@ def _handle_snapshot(self, is_baseline: bool = False): continue lines = data.lines(filename) if lines: - coverage[filename] = {str(line): 1 for line in lines} + branch_data = _get_branch_data(cov, data, filename) + coverage[filename] = { + "lines": {str(line): 1 for line in lines}, + **branch_data, + } # Erase data and restart for next test cov.erase() @@ -111,6 +121,58 @@ def log_message(self, format, *args): pass +def _get_branch_data(cov, data, filename: str) -> dict: + """Extract branch coverage data for a file. + + Returns dict with totalBranches, coveredBranches, and per-line branch detail. + Uses coverage.py's analysis API which tracks branches as arcs (from_line, to_line). + """ + try: + if not data.has_arcs(): + return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} + + # Use internal _analyze for full branch analysis + analysis = cov._analyze(filename) + numbers = analysis.numbers + + total_branches = numbers.n_branches + covered_branches = total_branches - numbers.n_missing_branches + + # Get per-line branch detail from missing_branch_arcs + missing_arcs = analysis.missing_branch_arcs() + executed_arcs = set(data.arcs(filename) or []) + + # Build per-line branch info + # Collect all branch source lines from both executed and missing + branch_lines: dict[int, dict] = {} # from_line -> {total, covered} + + # Count executed arcs by source line + for from_line, to_line in executed_arcs: + if from_line < 0: # negative = entry/exit arcs, skip + continue + if from_line not in branch_lines: + branch_lines[from_line] = {"total": 0, "covered": 0} + branch_lines[from_line]["total"] += 1 + branch_lines[from_line]["covered"] += 1 + + # Count missing arcs by source line + for from_line, to_lines in missing_arcs.items(): + if from_line not in branch_lines: + branch_lines[from_line] = {"total": 0, "covered": 0} + branch_lines[from_line]["total"] += len(to_lines) + + # Convert to string keys + branches = {str(line): info for line, info in branch_lines.items()} + + return { + "totalBranches": total_branches, + "coveredBranches": covered_branches, + "branches": branches, + } + except Exception: + return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} + + _coverage_server: HTTPServer | None = None @@ -142,6 +204,7 @@ def start_coverage_server(port: int | None = None) -> bool: # Start coverage collection cov = coverage_module.Coverage( source=[source_root], + branch=True, # Enable branch coverage tracking omit=[ "*/site-packages/*", "*/venv/*", From 020e2afe04839e424f85bf99c55ff7ecc2db9cfb Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 19:19:28 -0700 Subject: [PATCH 05/13] wip: migrate coverage to protobuf channel (Python handler timing out - needs debug) --- drift/core/communication/communicator.py | 58 +++++++++++++ drift/core/communication/types.py | 3 + drift/core/coverage_server.py | 101 +++++++++++++++++++---- drift/core/drift_sdk.py | 7 +- 4 files changed, 152 insertions(+), 17 deletions(-) diff --git a/drift/core/communication/communicator.py b/drift/core/communication/communicator.py index 6ae4361..e6afebc 100644 --- a/drift/core/communication/communicator.py +++ b/drift/core/communication/communicator.py @@ -16,8 +16,11 @@ from ..span_serialization import clean_span_to_proto from ..types import CleanSpanData, calling_library_context from .types import ( + BranchInfo, CliMessage, ConnectRequest, + CoverageSnapshotResponse, + FileCoverageData, GetMockRequest, InstrumentationVersionMismatchAlert, MessageType, @@ -750,6 +753,10 @@ def _background_read_loop(self) -> None: self._handle_set_time_travel_sync(cli_message) continue + if cli_message.type == MessageType.COVERAGE_SNAPSHOT: + self._handle_coverage_snapshot_sync(cli_message) + continue + # Route responses to waiting callers by request_id request_id = cli_message.request_id if request_id: @@ -809,6 +816,57 @@ def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None: except Exception as e: logger.error(f"Failed to send SetTimeTravel response: {e}") + def _handle_coverage_snapshot_sync(self, cli_message: CliMessage) -> None: + """Handle CoverageSnapshot request from CLI and send response.""" + request = cli_message.coverage_snapshot_request + if not request: + return + + logger.debug(f"Received CoverageSnapshot request: baseline={request.baseline}") + + try: + from ..coverage_server import take_coverage_snapshot + + result = take_coverage_snapshot(request.baseline) + + # Convert to protobuf + coverage: dict[str, FileCoverageData] = {} + for file_path, file_data in result.items(): + branches: dict[str, BranchInfo] = {} + for line, branch_info in file_data.get("branches", {}).items(): + branches[line] = BranchInfo( + total=branch_info.get("total", 0), + covered=branch_info.get("covered", 0), + ) + + coverage[file_path] = FileCoverageData( + lines=file_data.get("lines", {}), + total_branches=file_data.get("totalBranches", 0), + covered_branches=file_data.get("coveredBranches", 0), + branches=branches, + ) + + response = CoverageSnapshotResponse( + success=True, + error="", + coverage=coverage, + ) + except Exception as e: + logger.error(f"Failed to take coverage snapshot: {e}") + response = CoverageSnapshotResponse(success=False, error=str(e)) + + sdk_message = SdkMessage( + type=MessageType.COVERAGE_SNAPSHOT, + request_id=cli_message.request_id, + coverage_snapshot_response=response, + ) + + try: + self._send_message_sync(sdk_message) + logger.debug(f"Sent CoverageSnapshot response: success={response.success}") + except Exception as e: + logger.error(f"Failed to send CoverageSnapshot response: {e}") + def _send_message_sync(self, message: SdkMessage) -> None: """Send a message synchronously on the main socket.""" if not self._socket: diff --git a/drift/core/communication/types.py b/drift/core/communication/types.py index 0ecd7aa..298a203 100644 --- a/drift/core/communication/types.py +++ b/drift/core/communication/types.py @@ -43,7 +43,10 @@ from typing import Any from tusk.drift.core.v1 import ( + BranchInfo, CliMessage, + CoverageSnapshotResponse, + FileCoverageData, InstrumentationVersionMismatchAlert, MessageType, Runtime, diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 709c14d..1a1fd5b 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -173,38 +173,90 @@ def _get_branch_data(cov, data, filename: str) -> dict: return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} +def take_coverage_snapshot(baseline: bool = False) -> dict: + """Take a coverage snapshot (callable from both HTTP handler and protobuf handler). + + Returns dict of { filePath: { "lines": {...}, "totalBranches": N, ... } } + """ + cov = CoverageSnapshotHandler.cov_instance + source_root = CoverageSnapshotHandler.source_root + + if cov is None: + raise RuntimeError("Coverage not initialized") + + with CoverageSnapshotHandler._lock: + cov.stop() + coverage = {} + + if baseline: + data = cov.get_data() + for filename in data.measured_files(): + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + try: + _, statements, _, missing, _ = cov.analysis2(filename) + missing_set = set(missing) + lines_map = {} + for line in statements: + lines_map[str(line)] = 0 if line in missing_set else 1 + branch_data = _get_branch_data(cov, data, filename) + if lines_map: + coverage[filename] = {"lines": lines_map, **branch_data} + except Exception: + continue + else: + data = cov.get_data() + for filename in data.measured_files(): + if "site-packages" in filename or "lib/python" in filename: + continue + if source_root and not filename.startswith(source_root): + continue + lines = data.lines(filename) + if lines: + branch_data = _get_branch_data(cov, data, filename) + coverage[filename] = { + "lines": {str(line): 1 for line in lines}, + **branch_data, + } + + cov.erase() + cov.start() + + return coverage + + _coverage_server: HTTPServer | None = None -def start_coverage_server(port: int | None = None) -> bool: - """Start the coverage snapshot server if TUSK_COVERAGE_PORT is set. +def start_coverage_collection() -> bool: + """Initialize coverage.py collection if NODE_V8_COVERAGE is set. - Returns True if the server was started, False otherwise. - """ - global _coverage_server + Coverage data is accessed via take_coverage_snapshot() which can be called + from the protobuf handler or HTTP server. - port_str = os.environ.get("TUSK_COVERAGE_PORT") - if not port_str and port is None: + Returns True if coverage was started, False otherwise. + """ + # NODE_V8_COVERAGE is set by the CLI when coverage is enabled. + # Python doesn't use V8 but we use the same env var as the signal. + if not os.environ.get("NODE_V8_COVERAGE"): return False - actual_port = port or int(port_str) - - # Try to import coverage try: import coverage as coverage_module except ImportError: logger.warning( - "TUSK_COVERAGE_PORT is set but 'coverage' package is not installed. " + "Coverage requested but 'coverage' package is not installed. " "Install it with: pip install coverage" ) return False source_root = os.getcwd() - # Start coverage collection cov = coverage_module.Coverage( source=[source_root], - branch=True, # Enable branch coverage tracking + branch=True, omit=[ "*/site-packages/*", "*/venv/*", @@ -215,10 +267,31 @@ def start_coverage_server(port: int | None = None) -> bool: ) cov.start() - # Set shared state on the handler class CoverageSnapshotHandler.cov_instance = cov CoverageSnapshotHandler.source_root = source_root + logger.info("Coverage collection started") + return True + + +def start_coverage_server(port: int | None = None) -> bool: + """Start the coverage HTTP snapshot server (legacy, for non-protobuf mode). + + Returns True if the server was started, False otherwise. + """ + global _coverage_server + + port_str = os.environ.get("TUSK_COVERAGE_PORT") + if not port_str and port is None: + return False + + actual_port = port or int(port_str) + + # Ensure coverage is initialized + if CoverageSnapshotHandler.cov_instance is None: + if not start_coverage_collection(): + return False + # Start HTTP server in a daemon thread http_server = HTTPServer(("127.0.0.1", actual_port), CoverageSnapshotHandler) _coverage_server = http_server diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index f8ad573..16787b7 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -160,9 +160,10 @@ def initialize( configure_logger(log_level=log_level, prefix="TuskDrift") - # Start coverage server early (before any SDK mode checks that might return early) - from .coverage_server import start_coverage_server - start_coverage_server() + # Start coverage collection early (before any SDK mode checks that might return early). + # Coverage data is accessed via protobuf channel (communicator handles requests). + from .coverage_server import start_coverage_collection + start_coverage_collection() instance._init_params = { "api_key": api_key, From b145c08fdb4ae777797323ea2016ae4da3601e05 Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 19:30:30 -0700 Subject: [PATCH 06/13] fix: Python protobuf coverage handler - use 'is None' not truthiness betterproto treats messages with all default values as falsy. CoverageSnapshotRequest(baseline=False) was falsy, causing per-test snapshots to be skipped. Changed 'if not request' to 'if request is None'. Also separated coverage initialization from HTTP server so coverage.py starts via start_coverage_collection() for the protobuf channel. Extracted take_coverage_snapshot() as reusable function. --- drift/core/communication/communicator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/drift/core/communication/communicator.py b/drift/core/communication/communicator.py index e6afebc..b7247c7 100644 --- a/drift/core/communication/communicator.py +++ b/drift/core/communication/communicator.py @@ -819,7 +819,7 @@ def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None: def _handle_coverage_snapshot_sync(self, cli_message: CliMessage) -> None: """Handle CoverageSnapshot request from CLI and send response.""" request = cli_message.coverage_snapshot_request - if not request: + if request is None: return logger.debug(f"Received CoverageSnapshot request: baseline={request.baseline}") @@ -865,7 +865,7 @@ def _handle_coverage_snapshot_sync(self, cli_message: CliMessage) -> None: self._send_message_sync(sdk_message) logger.debug(f"Sent CoverageSnapshot response: success={response.success}") except Exception as e: - logger.error(f"Failed to send CoverageSnapshot response: {e}") + logger.error(f"[coverage] Failed to send response: {e}") def _send_message_sync(self, message: SdkMessage) -> None: """Send a message synchronously on the main socket.""" From e0e69459401132849806182a0ac6e90bc9f00520 Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 19:41:00 -0700 Subject: [PATCH 07/13] refactor: remove HTTP server, clean up coverage module - Remove HTTP server code (CoverageSnapshotHandler, start_coverage_server, _coverage_server global, HTTPServer import) - Replace with clean module-level state (_cov_instance, _source_root, _lock) - Extract _is_user_file() helper - stop_coverage_server() -> stop_coverage_collection() - Update module docstring to reflect protobuf-only architecture --- drift/core/coverage_server.py | 355 +++++++++++----------------------- drift/core/drift_sdk.py | 4 +- 2 files changed, 118 insertions(+), 241 deletions(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 1a1fd5b..8ce361b 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -1,310 +1,187 @@ -"""Coverage snapshot HTTP server for Python SDK. - -When TUSK_COVERAGE_PORT is set, starts a tiny HTTP server that manages -coverage.py. On each /snapshot request: -1. Stop coverage collection -2. Get coverage data (which lines were executed since last snapshot) -3. Erase coverage data (reset for next test) -4. Restart coverage collection -5. Return per-file line counts as JSON - -This gives clean per-test coverage data - no diffing needed. +"""Coverage collection for Python SDK. + +Manages coverage.py for collecting per-test code coverage data. +Coverage data is accessed via take_coverage_snapshot() which is called +from the protobuf communicator handler. + +Flow: +1. start_coverage_collection() initializes coverage.py at SDK startup +2. Between tests, CLI sends CoverageSnapshotRequest via protobuf +3. Communicator calls take_coverage_snapshot(baseline) +4. For baseline: returns ALL coverable lines (including uncovered at count=0) +5. For per-test: returns only executed lines since last snapshot, then resets """ from __future__ import annotations -import json import logging import os import threading -from http.server import HTTPServer, BaseHTTPRequestHandler logger = logging.getLogger("TuskDrift") +# Shared state for coverage collection +_cov_instance = None +_source_root: str | None = None +_lock = threading.Lock() -class CoverageSnapshotHandler(BaseHTTPRequestHandler): - """HTTP handler for coverage snapshot requests.""" - # Shared state set by start_coverage_server - cov_instance = None - source_root = None - _lock = threading.Lock() - - def do_GET(self): - from urllib.parse import urlparse, parse_qs - parsed = urlparse(self.path) - if parsed.path == "/snapshot": - params = parse_qs(parsed.query) - is_baseline = params.get("baseline", ["false"])[0] == "true" - self._handle_snapshot(is_baseline) - else: - self.send_response(404) - self.end_headers() +def start_coverage_collection() -> bool: + """Initialize coverage.py collection if NODE_V8_COVERAGE is set. - def _handle_snapshot(self, is_baseline: bool = False): - try: - with self.__class__._lock: - cov = self.__class__.cov_instance - source_root = self.__class__.source_root - - if cov is None: - self.send_response(500) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"ok": False, "error": "coverage not initialized"}).encode()) - return - - # Stop coverage to read data - cov.stop() - - coverage = {} - - if is_baseline: - # Baseline: return ALL coverable lines (including uncovered at count=0) - # plus branch coverage data. - data = cov.get_data() - for filename in data.measured_files(): - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): - continue - try: - _, statements, _, missing, _ = cov.analysis2(filename) - missing_set = set(missing) - lines_map = {} - for line in statements: - lines_map[str(line)] = 0 if line in missing_set else 1 - - # Branch data from coverage.py - branch_data = _get_branch_data(cov, data, filename) - - if lines_map: - coverage[filename] = { - "lines": lines_map, - **branch_data, - } - except Exception: - continue - else: - # Regular snapshot: only executed lines since last reset - data = cov.get_data() - for filename in data.measured_files(): - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): - continue - lines = data.lines(filename) - if lines: - branch_data = _get_branch_data(cov, data, filename) - coverage[filename] = { - "lines": {str(line): 1 for line in lines}, - **branch_data, - } - - # Erase data and restart for next test - cov.erase() - cov.start() - - self.send_response(200) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"ok": True, "coverage": coverage}).encode()) - - except Exception as e: - self.send_response(500) - self.send_header("Content-Type", "application/json") - self.end_headers() - self.wfile.write(json.dumps({"ok": False, "error": str(e)}).encode()) - - def log_message(self, format, *args): - """Suppress default HTTP server logging.""" - pass - - -def _get_branch_data(cov, data, filename: str) -> dict: - """Extract branch coverage data for a file. + NODE_V8_COVERAGE is set by the CLI when coverage is enabled. + Python doesn't use V8 but we use the same env var as the signal. - Returns dict with totalBranches, coveredBranches, and per-line branch detail. - Uses coverage.py's analysis API which tracks branches as arcs (from_line, to_line). + Returns True if coverage was started, False otherwise. """ - try: - if not data.has_arcs(): - return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} + global _cov_instance, _source_root - # Use internal _analyze for full branch analysis - analysis = cov._analyze(filename) - numbers = analysis.numbers - - total_branches = numbers.n_branches - covered_branches = total_branches - numbers.n_missing_branches + if not os.environ.get("NODE_V8_COVERAGE"): + return False - # Get per-line branch detail from missing_branch_arcs - missing_arcs = analysis.missing_branch_arcs() - executed_arcs = set(data.arcs(filename) or []) + try: + import coverage as coverage_module + except ImportError: + logger.warning( + "Coverage requested but 'coverage' package is not installed. " + "Install it with: pip install coverage" + ) + return False - # Build per-line branch info - # Collect all branch source lines from both executed and missing - branch_lines: dict[int, dict] = {} # from_line -> {total, covered} + _source_root = os.getcwd() - # Count executed arcs by source line - for from_line, to_line in executed_arcs: - if from_line < 0: # negative = entry/exit arcs, skip - continue - if from_line not in branch_lines: - branch_lines[from_line] = {"total": 0, "covered": 0} - branch_lines[from_line]["total"] += 1 - branch_lines[from_line]["covered"] += 1 + _cov_instance = coverage_module.Coverage( + source=[_source_root], + branch=True, + omit=[ + "*/site-packages/*", + "*/venv/*", + "*/.venv/*", + "*/test*", + "*/__pycache__/*", + ], + ) + _cov_instance.start() - # Count missing arcs by source line - for from_line, to_lines in missing_arcs.items(): - if from_line not in branch_lines: - branch_lines[from_line] = {"total": 0, "covered": 0} - branch_lines[from_line]["total"] += len(to_lines) + logger.info("Coverage collection started") + return True - # Convert to string keys - branches = {str(line): info for line, info in branch_lines.items()} - return { - "totalBranches": total_branches, - "coveredBranches": covered_branches, - "branches": branches, - } - except Exception: - return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} +def stop_coverage_collection() -> None: + """Stop coverage collection and clean up.""" + global _cov_instance + if _cov_instance is not None: + try: + _cov_instance.stop() + except Exception: + pass + _cov_instance = None def take_coverage_snapshot(baseline: bool = False) -> dict: - """Take a coverage snapshot (callable from both HTTP handler and protobuf handler). + """Take a coverage snapshot. - Returns dict of { filePath: { "lines": {...}, "totalBranches": N, ... } } - """ - cov = CoverageSnapshotHandler.cov_instance - source_root = CoverageSnapshotHandler.source_root + Called from the protobuf communicator handler between tests. - if cov is None: + Args: + baseline: If True, returns ALL coverable lines (including uncovered at count=0) + for computing the total coverage denominator. + If False, returns only lines executed since the last snapshot. + + Returns: + dict of { filePath: { "lines": {...}, "totalBranches": N, "coveredBranches": N, "branches": {...} } } + """ + if _cov_instance is None: raise RuntimeError("Coverage not initialized") - with CoverageSnapshotHandler._lock: - cov.stop() + with _lock: + _cov_instance.stop() coverage = {} if baseline: - data = cov.get_data() + data = _cov_instance.get_data() for filename in data.measured_files(): - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): + if not _is_user_file(filename): continue try: - _, statements, _, missing, _ = cov.analysis2(filename) + _, statements, _, missing, _ = _cov_instance.analysis2(filename) missing_set = set(missing) lines_map = {} for line in statements: lines_map[str(line)] = 0 if line in missing_set else 1 - branch_data = _get_branch_data(cov, data, filename) + branch_data = _get_branch_data(data, filename) if lines_map: coverage[filename] = {"lines": lines_map, **branch_data} except Exception: continue else: - data = cov.get_data() + data = _cov_instance.get_data() for filename in data.measured_files(): - if "site-packages" in filename or "lib/python" in filename: - continue - if source_root and not filename.startswith(source_root): + if not _is_user_file(filename): continue lines = data.lines(filename) if lines: - branch_data = _get_branch_data(cov, data, filename) + branch_data = _get_branch_data(data, filename) coverage[filename] = { "lines": {str(line): 1 for line in lines}, **branch_data, } - cov.erase() - cov.start() + _cov_instance.erase() + _cov_instance.start() return coverage -_coverage_server: HTTPServer | None = None - - -def start_coverage_collection() -> bool: - """Initialize coverage.py collection if NODE_V8_COVERAGE is set. - - Coverage data is accessed via take_coverage_snapshot() which can be called - from the protobuf handler or HTTP server. - - Returns True if coverage was started, False otherwise. - """ - # NODE_V8_COVERAGE is set by the CLI when coverage is enabled. - # Python doesn't use V8 but we use the same env var as the signal. - if not os.environ.get("NODE_V8_COVERAGE"): +def _is_user_file(filename: str) -> bool: + """Check if a file is a user source file (not third-party).""" + if "site-packages" in filename or "lib/python" in filename: return False - - try: - import coverage as coverage_module - except ImportError: - logger.warning( - "Coverage requested but 'coverage' package is not installed. " - "Install it with: pip install coverage" - ) + if _source_root and not filename.startswith(_source_root): return False - - source_root = os.getcwd() - - cov = coverage_module.Coverage( - source=[source_root], - branch=True, - omit=[ - "*/site-packages/*", - "*/venv/*", - "*/.venv/*", - "*/test*", - "*/__pycache__/*", - ], - ) - cov.start() - - CoverageSnapshotHandler.cov_instance = cov - CoverageSnapshotHandler.source_root = source_root - - logger.info("Coverage collection started") return True -def start_coverage_server(port: int | None = None) -> bool: - """Start the coverage HTTP snapshot server (legacy, for non-protobuf mode). +def _get_branch_data(data, filename: str) -> dict: + """Extract branch coverage data for a file. - Returns True if the server was started, False otherwise. + Uses coverage.py's arc tracking (from_line, to_line) to compute + per-line branch coverage. """ - global _coverage_server + try: + if not data.has_arcs(): + return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} - port_str = os.environ.get("TUSK_COVERAGE_PORT") - if not port_str and port is None: - return False + analysis = _cov_instance._analyze(filename) + numbers = analysis.numbers - actual_port = port or int(port_str) + total_branches = numbers.n_branches + covered_branches = max(0, total_branches - numbers.n_missing_branches) - # Ensure coverage is initialized - if CoverageSnapshotHandler.cov_instance is None: - if not start_coverage_collection(): - return False + missing_arcs = analysis.missing_branch_arcs() + executed_arcs = set(data.arcs(filename) or []) - # Start HTTP server in a daemon thread - http_server = HTTPServer(("127.0.0.1", actual_port), CoverageSnapshotHandler) - _coverage_server = http_server - thread = threading.Thread(target=http_server.serve_forever, daemon=True) - thread.start() + branch_lines: dict[int, dict] = {} - logger.info(f"Coverage snapshot server listening on port {actual_port}") - return True + for from_line, to_line in executed_arcs: + if from_line < 0: + continue + if from_line not in branch_lines: + branch_lines[from_line] = {"total": 0, "covered": 0} + branch_lines[from_line]["total"] += 1 + branch_lines[from_line]["covered"] += 1 + + for from_line, to_lines in missing_arcs.items(): + if from_line not in branch_lines: + branch_lines[from_line] = {"total": 0, "covered": 0} + branch_lines[from_line]["total"] += len(to_lines) + branches = {str(line): info for line, info in branch_lines.items()} -def stop_coverage_server() -> None: - """Shut down the coverage snapshot server if running.""" - global _coverage_server - if _coverage_server is not None: - _coverage_server.shutdown() - _coverage_server = None + return { + "totalBranches": total_branches, + "coveredBranches": covered_branches, + "branches": branches, + } + except Exception: + return {"totalBranches": 0, "coveredBranches": 0, "branches": {}} diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py index 16787b7..a420547 100644 --- a/drift/core/drift_sdk.py +++ b/drift/core/drift_sdk.py @@ -832,7 +832,7 @@ def shutdown(self) -> None: """Shutdown the SDK.""" import asyncio - from .coverage_server import stop_coverage_server + from .coverage_server import stop_coverage_collection # Shutdown OpenTelemetry tracer provider if self._td_span_processor is not None: @@ -855,4 +855,4 @@ def shutdown(self) -> None: except Exception as e: logger.error(f"Error shutting down trace blocking manager: {e}") - stop_coverage_server() + stop_coverage_collection() From 5e4aa978df0401993a963c9bfd042be6aeb4b72c Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 19:46:37 -0700 Subject: [PATCH 08/13] fix: prod readiness - thread-safe coverage shutdown Add _lock protection to stop_coverage_collection() to prevent race condition where shutdown sets _cov_instance=None while a snapshot is in progress on the background reader thread. --- drift/core/coverage_server.py | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 8ce361b..16f935b 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -68,14 +68,15 @@ def start_coverage_collection() -> bool: def stop_coverage_collection() -> None: - """Stop coverage collection and clean up.""" + """Stop coverage collection and clean up. Thread-safe.""" global _cov_instance - if _cov_instance is not None: - try: - _cov_instance.stop() - except Exception: - pass - _cov_instance = None + with _lock: + if _cov_instance is not None: + try: + _cov_instance.stop() + except Exception: + pass + _cov_instance = None def take_coverage_snapshot(baseline: bool = False) -> dict: From 18e8349edf5655a1cbc62d748a1c1622815901ed Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Wed, 1 Apr 2026 20:10:20 -0700 Subject: [PATCH 09/13] feat: use TUSK_COVERAGE instead of NODE_V8_COVERAGE for Python --- drift/core/coverage_server.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 16f935b..505c12a 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -36,7 +36,8 @@ def start_coverage_collection() -> bool: """ global _cov_instance, _source_root - if not os.environ.get("NODE_V8_COVERAGE"): + # TUSK_COVERAGE is the language-agnostic signal from the CLI + if not os.environ.get("TUSK_COVERAGE"): return False try: From 5d438df3b7e0293d436181b07fe7da31e0d07215 Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Fri, 3 Apr 2026 14:36:05 -0700 Subject: [PATCH 10/13] docs: add code coverage documentation Add docs/coverage.md explaining coverage.py integration, branch coverage via arc tracking, thread safety, and limitations. Update environment-variables.md with coverage env vars section. --- docs/coverage.md | 101 ++++++++++++++++++++++++++++++++++ docs/environment-variables.md | 13 +++++ 2 files changed, 114 insertions(+) create mode 100644 docs/coverage.md diff --git a/docs/coverage.md b/docs/coverage.md new file mode 100644 index 0000000..5252657 --- /dev/null +++ b/docs/coverage.md @@ -0,0 +1,101 @@ +# Code Coverage (Python) + +The Python SDK collects per-test code coverage during Tusk Drift replay using `coverage.py`. Unlike Node.js (which uses V8's built-in coverage), Python requires the `coverage` package to be installed. + +## Requirements + +```bash +pip install coverage +# or +pip install tusk-drift[coverage] +``` + +If `coverage` is not installed and `--coverage` is used, the SDK logs a warning and coverage is skipped — tests still run normally. + +## How It Works + +### coverage.py Integration + +When coverage is enabled (via `--show-coverage`, `--coverage-output`, or `coverage.enabled: true` in config), the CLI sets `TUSK_COVERAGE=true`. The SDK detects this during initialization and starts coverage.py: + +```python +# What the SDK does internally: +import coverage +cov = coverage.Coverage( + source=[os.path.realpath(os.getcwd())], + branch=True, + omit=["*/site-packages/*", "*/venv/*", "*/.venv/*", "*/test*", "*/__pycache__/*"], +) +cov.start() +``` + +Key points: +- `branch=True` enables branch coverage (arc-based tracking) +- `source` is set to the real path of the working directory (symlinks resolved) +- Third-party code (site-packages, venv) is excluded by default + +### Snapshot Flow + +1. **Baseline**: CLI sends `CoverageSnapshotRequest(baseline=true)`. The SDK: + - Calls `cov.stop()` + - Uses `cov.analysis2(filename)` for each measured file to get ALL coverable lines (statements + missing) + - Returns lines with count=0 for uncovered, count=1 for covered + - Calls `cov.erase()` then `cov.start()` to reset counters + +2. **Per-test**: CLI sends `CoverageSnapshotRequest(baseline=false)`. The SDK: + - Calls `cov.stop()` + - Uses `cov.get_data().lines(filename)` to get only executed lines since last reset + - Returns only covered lines (count=1) + - Calls `cov.erase()` then `cov.start()` to reset + +3. **Communication**: Results are sent back to the CLI via the existing protobuf channel — same socket used for replay. No HTTP server or extra ports. + +### Branch Coverage + +Branch coverage uses coverage.py's arc tracking. The SDK extracts per-line branch data using: + +```python +analysis = cov._analyze(filename) # Private API +missing_arcs = analysis.missing_branch_arcs() +executed_arcs = set(data.arcs(filename) or []) +``` + +For each branch point (line with multiple execution paths), the SDK reports: +- `total`: number of branch paths from that line +- `covered`: number of paths that were actually taken + +**Note:** `_analyze()` is a private coverage.py API. It's the only way to get per-line branch arc data. The public API (`analysis2()`) only provides aggregate branch counts. This means branch coverage may break on major coverage.py version upgrades. + +### Path Handling + +The SDK uses `os.path.realpath()` for the source root to handle symlinked project directories. File paths reported by coverage.py are also resolved via `realpath` before comparison. This prevents the silent failure where all files get filtered out because symlink paths don't match. + +## Environment Variables + +Set automatically by the CLI. You should not set these manually. + +| Variable | Description | +|----------|-------------| +| `TUSK_COVERAGE` | Set to `true` by the CLI when coverage is enabled. The SDK checks this to decide whether to start coverage.py. | + +Note: `NODE_V8_COVERAGE` is also set by the CLI (for Node.js), but the Python SDK ignores it — it only checks `TUSK_COVERAGE`. + +## Thread Safety + +Coverage collection uses a module-level lock (`threading.Lock`) to ensure thread safety: + +- `start_coverage_collection()`: Acquires lock while initializing. Guards against double initialization — if called twice, stops the existing instance first. +- `take_coverage_snapshot()`: Acquires lock for the entire stop/read/erase/start cycle. +- `stop_coverage_collection()`: Acquires lock while stopping and cleaning up. + +This is important because the protobuf communicator runs coverage handlers in a background thread. + +## Limitations + +- **`coverage` package required**: Unlike Node.js (V8 coverage is built-in), Python needs `pip install coverage`. If not installed, coverage silently doesn't work (warning logged). +- **Performance overhead**: coverage.py uses `sys.settrace()` which adds 10-30% execution overhead. V8 coverage is near-zero. This overhead only applies during `--coverage` replay runs. +- **Multi-process servers**: gunicorn with `--workers > 1` forks worker processes. The SDK starts coverage.py in the main process; forked workers don't inherit it. Use `--workers 1` during coverage runs. +- **Private API for branches**: `_analyze()` is not part of coverage.py's public API. Branch coverage detail may break on future coverage.py versions. +- **Python 3.12+ recommended for async**: coverage.py's `sys.settrace` can miss some async lines on Python < 3.12. Python 3.12+ uses `sys.monitoring` for better async tracking. +- **Startup ordering**: coverage.py starts during SDK initialization. Code that executes before `TuskDrift.initialize()` (e.g., module-level code in `tusk_drift_init.py`) isn't tracked. This is why `tusk_drift_init.py` typically shows 0% coverage. +- **C extensions invisible**: coverage.py can't track C extensions (numpy, Cython modules). Not relevant for typical web API servers. diff --git a/docs/environment-variables.md b/docs/environment-variables.md index 7e1e365..0425f1b 100644 --- a/docs/environment-variables.md +++ b/docs/environment-variables.md @@ -174,7 +174,20 @@ These variables configure how the SDK connects to the Tusk CLI during replay: These are typically set automatically by the Tusk CLI and do not need to be configured manually. +## Coverage Variables + +Set automatically by the CLI when `tusk drift run --coverage` is used. You should **not** set them manually. + +| Variable | Description | +|----------|-------------| +| `TUSK_COVERAGE` | Set to `true` when coverage is enabled. The SDK checks this to start coverage.py. | + +Note: `NODE_V8_COVERAGE` is also set by the CLI (for Node.js) but is ignored by the Python SDK. + +See [Coverage Guide](./coverage.md) for details on how coverage collection works. + ## Related Docs - [Initialization Guide](./initialization.md) - SDK initialization parameters and config file settings - [Quick Start Guide](./quickstart.md) - Record and replay your first trace +- [Coverage Guide](./coverage.md) - Code coverage during test replay From 2a383b0bab42930764c5bba30514fb9f502e0953 Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Fri, 3 Apr 2026 14:40:19 -0700 Subject: [PATCH 11/13] fix: coverage code quality improvements - Use getattr() for betterproto oneof field access (prevents AttributeError) - Fix _is_user_file path prefix collision (/app matching /application) - Add os.path.realpath() for symlink-safe path comparison - Add thread lock to start_coverage_collection() - Add double-init guard (stop existing instance before creating new) - Narrow */test* omit pattern to */tests/* and */test_*.py - Log failed file analysis at debug level instead of silent swallow --- drift/core/communication/communicator.py | 6 +-- drift/core/coverage_server.py | 51 +++++++++++++++--------- 2 files changed, 35 insertions(+), 22 deletions(-) diff --git a/drift/core/communication/communicator.py b/drift/core/communication/communicator.py index b7247c7..91da868 100644 --- a/drift/core/communication/communicator.py +++ b/drift/core/communication/communicator.py @@ -781,8 +781,8 @@ def _background_read_loop(self) -> None: def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None: """Handle SetTimeTravel request from CLI and send response.""" - request = cli_message.set_time_travel_request - if not request: + request = getattr(cli_message, "set_time_travel_request", None) + if request is None: return logger.debug( @@ -818,7 +818,7 @@ def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None: def _handle_coverage_snapshot_sync(self, cli_message: CliMessage) -> None: """Handle CoverageSnapshot request from CLI and send response.""" - request = cli_message.coverage_snapshot_request + request = getattr(cli_message, "coverage_snapshot_request", None) if request is None: return diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 505c12a..02ddc4a 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -27,10 +27,10 @@ def start_coverage_collection() -> bool: - """Initialize coverage.py collection if NODE_V8_COVERAGE is set. + """Initialize coverage.py collection if TUSK_COVERAGE is set. - NODE_V8_COVERAGE is set by the CLI when coverage is enabled. - Python doesn't use V8 but we use the same env var as the signal. + TUSK_COVERAGE is set by the CLI when coverage is enabled. + This is the language-agnostic signal (Node uses NODE_V8_COVERAGE additionally). Returns True if coverage was started, False otherwise. """ @@ -49,20 +49,29 @@ def start_coverage_collection() -> bool: ) return False - _source_root = os.getcwd() - - _cov_instance = coverage_module.Coverage( - source=[_source_root], - branch=True, - omit=[ - "*/site-packages/*", - "*/venv/*", - "*/.venv/*", - "*/test*", - "*/__pycache__/*", - ], - ) - _cov_instance.start() + with _lock: + # Guard against double initialization — stop existing instance first + if _cov_instance is not None: + try: + _cov_instance.stop() + except Exception: + pass + + _source_root = os.path.realpath(os.getcwd()) + + _cov_instance = coverage_module.Coverage( + source=[_source_root], + branch=True, + omit=[ + "*/site-packages/*", + "*/venv/*", + "*/.venv/*", + "*/tests/*", + "*/test_*.py", + "*/__pycache__/*", + ], + ) + _cov_instance.start() logger.info("Coverage collection started") return True @@ -114,7 +123,8 @@ def take_coverage_snapshot(baseline: bool = False) -> dict: branch_data = _get_branch_data(data, filename) if lines_map: coverage[filename] = {"lines": lines_map, **branch_data} - except Exception: + except Exception as e: + logger.debug(f"Failed to analyze {filename}: {e}") continue else: data = _cov_instance.get_data() @@ -139,7 +149,10 @@ def _is_user_file(filename: str) -> bool: """Check if a file is a user source file (not third-party).""" if "site-packages" in filename or "lib/python" in filename: return False - if _source_root and not filename.startswith(_source_root): + # Resolve symlinks for consistent path comparison + resolved = os.path.realpath(filename) + # Use trailing separator to avoid prefix collisions (/app matching /application) + if _source_root and not (resolved.startswith(_source_root + os.sep) or resolved == _source_root): return False return True From 5b3354be752c1a67b86daa55161cbe51f37431cd Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Fri, 3 Apr 2026 15:02:45 -0700 Subject: [PATCH 12/13] docs: clean up AI writing patterns in coverage doc --- docs/coverage.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/coverage.md b/docs/coverage.md index 5252657..ac346ce 100644 --- a/docs/coverage.md +++ b/docs/coverage.md @@ -10,7 +10,7 @@ pip install coverage pip install tusk-drift[coverage] ``` -If `coverage` is not installed and `--coverage` is used, the SDK logs a warning and coverage is skipped — tests still run normally. +If `coverage` is not installed when coverage is enabled, the SDK logs a warning and coverage is skipped. Tests still run normally. ## How It Works @@ -24,7 +24,7 @@ import coverage cov = coverage.Coverage( source=[os.path.realpath(os.getcwd())], branch=True, - omit=["*/site-packages/*", "*/venv/*", "*/.venv/*", "*/test*", "*/__pycache__/*"], + omit=["*/site-packages/*", "*/venv/*", "*/.venv/*", "*/tests/*", "*/test_*.py", "*/__pycache__/*"], ) cov.start() ``` @@ -93,7 +93,7 @@ This is important because the protobuf communicator runs coverage handlers in a ## Limitations - **`coverage` package required**: Unlike Node.js (V8 coverage is built-in), Python needs `pip install coverage`. If not installed, coverage silently doesn't work (warning logged). -- **Performance overhead**: coverage.py uses `sys.settrace()` which adds 10-30% execution overhead. V8 coverage is near-zero. This overhead only applies during `--coverage` replay runs. +- **Performance overhead**: coverage.py uses `sys.settrace()` which adds 10-30% execution overhead. This only applies during coverage replay runs. - **Multi-process servers**: gunicorn with `--workers > 1` forks worker processes. The SDK starts coverage.py in the main process; forked workers don't inherit it. Use `--workers 1` during coverage runs. - **Private API for branches**: `_analyze()` is not part of coverage.py's public API. Branch coverage detail may break on future coverage.py versions. - **Python 3.12+ recommended for async**: coverage.py's `sys.settrace` can miss some async lines on Python < 3.12. Python 3.12+ uses `sys.monitoring` for better async tracking. From 4d156c4f3b5411d77175af74ba91559922ec63dc Mon Sep 17 00:00:00 2001 From: Sohil Kshirsagar Date: Fri, 3 Apr 2026 15:27:49 -0700 Subject: [PATCH 13/13] fix: address bugbot review feedback - Move _cov_instance None check inside lock (TOCTOU race fix) - Fix branch counting to only include actual branch points, not all arcs --- drift/core/coverage_server.py | 37 ++++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py index 02ddc4a..53ea8aa 100644 --- a/drift/core/coverage_server.py +++ b/drift/core/coverage_server.py @@ -102,10 +102,9 @@ def take_coverage_snapshot(baseline: bool = False) -> dict: Returns: dict of { filePath: { "lines": {...}, "totalBranches": N, "coveredBranches": N, "branches": {...} } } """ - if _cov_instance is None: - raise RuntimeError("Coverage not initialized") - with _lock: + if _cov_instance is None: + raise RuntimeError("Coverage not initialized") _cov_instance.stop() coverage = {} @@ -176,20 +175,30 @@ def _get_branch_data(data, filename: str) -> dict: missing_arcs = analysis.missing_branch_arcs() executed_arcs = set(data.arcs(filename) or []) - branch_lines: dict[int, dict] = {} - + # Group executed arcs by from_line (skip negative entry arcs) + executed_by_line: dict[int, list[int]] = {} for from_line, to_line in executed_arcs: if from_line < 0: continue - if from_line not in branch_lines: - branch_lines[from_line] = {"total": 0, "covered": 0} - branch_lines[from_line]["total"] += 1 - branch_lines[from_line]["covered"] += 1 - - for from_line, to_lines in missing_arcs.items(): - if from_line not in branch_lines: - branch_lines[from_line] = {"total": 0, "covered": 0} - branch_lines[from_line]["total"] += len(to_lines) + executed_by_line.setdefault(from_line, []).append(to_line) + + # A line is a branch point if: + # - it appears in missing_arcs (at least one path wasn't taken), OR + # - it has multiple executed arcs (multiple paths from same line) + branch_point_lines = set(missing_arcs.keys()) + for from_line, to_lines in executed_by_line.items(): + if len(to_lines) > 1: + branch_point_lines.add(from_line) + + branch_lines: dict[int, dict] = {} + + for from_line in branch_point_lines: + executed_count = len(executed_by_line.get(from_line, [])) + missing_count = len(missing_arcs.get(from_line, [])) + branch_lines[from_line] = { + "total": executed_count + missing_count, + "covered": executed_count, + } branches = {str(line): info for line, info in branch_lines.items()}