diff --git a/docs/coverage.md b/docs/coverage.md
new file mode 100644
index 0000000..ac346ce
--- /dev/null
+++ b/docs/coverage.md
@@ -0,0 +1,102 @@
+# Code Coverage (Python)
+
+The Python SDK collects per-test code coverage during Tusk Drift replay using `coverage.py`. Unlike Node.js (which uses V8's built-in coverage), Python requires the `coverage` package to be installed.
+
+## Requirements
+
+```bash
+pip install coverage
+# or
+pip install tusk-drift[coverage]
+```
+
+If `coverage` is not installed when coverage is enabled, the SDK logs a warning and coverage is skipped. Tests still run normally.
+
+## How It Works
+
+### coverage.py Integration
+
+When coverage is enabled (via `--show-coverage`, `--coverage-output`, or `coverage.enabled: true` in config), the CLI sets `TUSK_COVERAGE=true`. The SDK detects this during initialization and starts coverage.py:
+
+```python
+# What the SDK does internally:
+import os
+import coverage
+cov = coverage.Coverage(
+    source=[os.path.realpath(os.getcwd())],
+    branch=True,
+    omit=["*/site-packages/*", "*/venv/*", "*/.venv/*", "*/tests/*", "*/test_*.py", "*/__pycache__/*"],
+)
+cov.start()
+```
+
+Key points:
+- `branch=True` enables branch coverage (arc-based tracking)
+- `source` is set to the real path of the working directory (symlinks resolved)
+- Third-party code (site-packages, venv) is excluded by default
+
+### Snapshot Flow
+
+1. **Baseline**: CLI sends `CoverageSnapshotRequest(baseline=true)`. The SDK:
+   - Calls `cov.stop()`
+   - Uses `cov.analysis2(filename)` for each measured file to get all coverable lines (executable statements, with the missing ones identified)
+   - Returns lines with count=0 for uncovered, count=1 for covered
+   - Calls `cov.erase()` then `cov.start()` to reset counters
+
+2. **Per-test**: CLI sends `CoverageSnapshotRequest(baseline=false)`. The SDK:
+   - Calls `cov.stop()`
+   - Uses `cov.get_data().lines(filename)` to get only the lines executed since the last reset
+   - Returns only covered lines (count=1)
+   - Calls `cov.erase()` then `cov.start()` to reset counters
+
+3. **Communication**: Results are sent back to the CLI over the existing protobuf channel, the same socket used for replay. No HTTP server or extra ports are needed.
+
+### Branch Coverage
+
+Branch coverage uses coverage.py's arc tracking. The SDK extracts per-line branch data using:
+
+```python
+analysis = cov._analyze(filename)  # Private API
+missing_arcs = analysis.missing_branch_arcs()
+executed_arcs = set(data.arcs(filename) or [])
+```
+
+For each branch point (a line with multiple execution paths), the SDK reports:
+- `total`: number of branch paths from that line
+- `covered`: number of paths that were actually taken
+
+**Note:** `_analyze()` is a private coverage.py API. The SDK relies on it for per-line branch arc data because the public `analysis2()` call reports only statement-level data (executable, excluded, and missing lines). As a result, branch coverage may break when coverage.py is upgraded.
+
+### Path Handling
+
+The SDK uses `os.path.realpath()` for the source root to handle symlinked project directories. File paths reported by coverage.py are also resolved via `realpath` before comparison. This prevents the silent failure where all files get filtered out because symlink paths don't match.
+
+## Environment Variables
+
+Set automatically by the CLI. You should not set these manually.
+
+| Variable | Description |
+|----------|-------------|
+| `TUSK_COVERAGE` | Set to `true` by the CLI when coverage is enabled. The SDK checks this to decide whether to start coverage.py. |
+
+Note: `NODE_V8_COVERAGE` is also set by the CLI (for Node.js), but the Python SDK ignores it and only checks `TUSK_COVERAGE`.
+
+## Thread Safety
+
+Coverage collection uses a module-level lock (`threading.Lock`) to ensure thread safety:
+
+- `start_coverage_collection()`: Acquires the lock while initializing. Guards against double initialization: if called twice, it stops the existing instance first.
+- `take_coverage_snapshot()`: Acquires the lock for the entire stop/read/erase/start cycle.
+- `stop_coverage_collection()`: Acquires the lock while stopping and cleaning up.
+
+This is important because the protobuf communicator runs coverage handlers in a background thread.
+
+## Limitations
+
+- **`coverage` package required**: Unlike Node.js (V8 coverage is built-in), Python needs `pip install coverage`. If it is not installed, the SDK logs a warning and skips coverage.
+- **Performance overhead**: coverage.py uses `sys.settrace()` which adds 10-30% execution overhead. This only applies during coverage replay runs.
+- **Multi-process servers**: gunicorn with `--workers > 1` forks worker processes. The SDK starts coverage.py in the main process; forked workers don't inherit it. Use `--workers 1` during coverage runs.
+- **Private API for branches**: `_analyze()` is not part of coverage.py's public API. Branch coverage detail may break on future coverage.py versions.
+- **Python 3.12+ recommended for async**: coverage.py's `sys.settrace`-based tracing can miss some async lines on Python < 3.12. On Python 3.12+, coverage.py can use `sys.monitoring` instead (depending on the coverage.py version and its configured core), which may track async code more reliably.
+- **Startup ordering**: coverage.py starts during SDK initialization. Code that executes before `TuskDrift.initialize()` (e.g., module-level code in `tusk_drift_init.py`) isn't tracked. This is why `tusk_drift_init.py` typically shows 0% coverage.
+- **C extensions invisible**: coverage.py can't track C extensions (numpy, Cython modules). This rarely matters for typical web API servers.
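Note (illustration only, not part of the patch): the per-line `total`/`covered` counts described under "Branch Coverage" can be sketched without coverage.py installed. The function name `summarize_branches` and the sample arcs are hypothetical. The sketch assumes executed arcs arrive as `(from_line, to_line)` pairs and missing arcs as a mapping from a line to its untaken destinations, which is the shape the doc's snippet suggests.

```python
from collections import defaultdict


def summarize_branches(executed_arcs, missing_arcs):
    """Compute per-line branch totals from executed and missing arcs.

    executed_arcs: iterable of (from_line, to_line) pairs.
    missing_arcs: dict mapping from_line -> list of untaken to_lines.
    """
    executed_by_line = defaultdict(set)
    for from_line, to_line in executed_arcs:
        if from_line < 0:
            # A negative from_line marks an entry arc, not a branch.
            continue
        executed_by_line[from_line].add(to_line)

    # A line is a branch point if a path from it was missed,
    # or if more than one path from it was executed.
    branch_points = set(missing_arcs)
    branch_points.update(
        line for line, dests in executed_by_line.items() if len(dests) > 1
    )

    summary = {}
    for line in sorted(branch_points):
        taken = len(executed_by_line.get(line, ()))
        missed = len(missing_arcs.get(line, ()))
        summary[str(line)] = {"total": taken + missed, "covered": taken}
    return summary


# Hypothetical data: line 2 took both paths, line 5 took one of two.
executed = [(-1, 1), (1, 2), (2, 3), (2, 5), (5, 6)]
missing = {5: [8]}
result = summarize_branches(executed, missing)
print(result)
```

Line 1 has a single executed arc and no missing arcs, so it is not reported as a branch point.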
diff --git a/docs/environment-variables.md b/docs/environment-variables.md
index 7e1e365..0425f1b 100644
--- a/docs/environment-variables.md
+++ b/docs/environment-variables.md
@@ -174,7 +174,20 @@ These variables configure how the SDK connects to the Tusk CLI during replay:
 
 These are typically set automatically by the Tusk CLI and do not need to be configured manually.
 
+## Coverage Variables
+
+Set automatically by the CLI when `tusk drift run --coverage` is used. You should **not** set them manually.
+
+| Variable | Description |
+|----------|-------------|
+| `TUSK_COVERAGE` | Set to `true` when coverage is enabled. The SDK checks this to start coverage.py. |
+
+Note: `NODE_V8_COVERAGE` is also set by the CLI (for Node.js) but is ignored by the Python SDK.
+
+See [Coverage Guide](./coverage.md) for details on how coverage collection works.
+
 ## Related Docs
 
 - [Initialization Guide](./initialization.md) - SDK initialization parameters and config file settings
 - [Quick Start Guide](./quickstart.md) - Record and replay your first trace
+- [Coverage Guide](./coverage.md) - Code coverage during test replay
diff --git a/drift/core/communication/communicator.py b/drift/core/communication/communicator.py
index 6ae4361..91da868 100644
--- a/drift/core/communication/communicator.py
+++ b/drift/core/communication/communicator.py
@@ -16,8 +16,11 @@
 from ..span_serialization import clean_span_to_proto
 from ..types import CleanSpanData, calling_library_context
 from .types import (
+    BranchInfo,
     CliMessage,
     ConnectRequest,
+    CoverageSnapshotResponse,
+    FileCoverageData,
     GetMockRequest,
     InstrumentationVersionMismatchAlert,
     MessageType,
@@ -750,6 +753,10 @@ def _background_read_loop(self) -> None:
                     self._handle_set_time_travel_sync(cli_message)
                     continue
 
+                if cli_message.type == MessageType.COVERAGE_SNAPSHOT:
+                    self._handle_coverage_snapshot_sync(cli_message)
+                    continue
+
                 # Route responses to waiting callers by request_id
                 request_id = cli_message.request_id
                 if request_id:
@@ -774,8 +781,8 @@ def _background_read_loop(self) -> None:
 
     def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None:
         """Handle SetTimeTravel request from CLI and send response."""
-        request = cli_message.set_time_travel_request
-        if not request:
+        request = getattr(cli_message, "set_time_travel_request", None)
+        if request is None:
             return
 
         logger.debug(
@@ -809,6 +816,57 @@ def _handle_set_time_travel_sync(self, cli_message: CliMessage) -> None:
         except Exception as e:
             logger.error(f"Failed to send SetTimeTravel response: {e}")
 
+    def _handle_coverage_snapshot_sync(self, cli_message: CliMessage) -> None:
+        """Handle CoverageSnapshot request from CLI and send response."""
+        request = getattr(cli_message, "coverage_snapshot_request", None)
+        if request is None:
+            return
+
+        logger.debug(f"Received CoverageSnapshot request: baseline={request.baseline}")
+
+        try:
+            from ..coverage_server import take_coverage_snapshot
+
+            result = take_coverage_snapshot(request.baseline)
+
+            # Convert to protobuf
+            coverage: dict[str, FileCoverageData] = {}
+            for file_path, file_data in result.items():
+                branches: dict[str, BranchInfo] = {}
+                for line, branch_info in file_data.get("branches", {}).items():
+                    branches[line] = BranchInfo(
+                        total=branch_info.get("total", 0),
+                        covered=branch_info.get("covered", 0),
+                    )
+
+                coverage[file_path] = FileCoverageData(
+                    lines=file_data.get("lines", {}),
+                    total_branches=file_data.get("totalBranches", 0),
+                    covered_branches=file_data.get("coveredBranches", 0),
+                    branches=branches,
+                )
+
+            response = CoverageSnapshotResponse(
+                success=True,
+                error="",
+                coverage=coverage,
+            )
+        except Exception as e:
+            logger.error(f"Failed to take coverage snapshot: {e}")
+            response = CoverageSnapshotResponse(success=False, error=str(e))
+
+        sdk_message = SdkMessage(
+            type=MessageType.COVERAGE_SNAPSHOT,
+            request_id=cli_message.request_id,
+            coverage_snapshot_response=response,
+        )
+
+        try:
+            self._send_message_sync(sdk_message)
+            logger.debug(f"Sent CoverageSnapshot response: success={response.success}")
+        except Exception as e:
+            logger.error(f"[coverage] Failed to send response: {e}")
+
     def _send_message_sync(self, message: SdkMessage) -> None:
         """Send a message synchronously on the main socket."""
         if not self._socket:
diff --git a/drift/core/communication/types.py b/drift/core/communication/types.py
index 0ecd7aa..298a203 100644
--- a/drift/core/communication/types.py
+++ b/drift/core/communication/types.py
@@ -43,7 +43,10 @@
 from typing import Any
 
 from tusk.drift.core.v1 import (
+    BranchInfo,
     CliMessage,
+    CoverageSnapshotResponse,
+    FileCoverageData,
     InstrumentationVersionMismatchAlert,
     MessageType,
     Runtime,
diff --git a/drift/core/coverage_server.py b/drift/core/coverage_server.py
new file mode 100644
index 0000000..53ea8aa
--- /dev/null
+++ b/drift/core/coverage_server.py
@@ -0,0 +1,211 @@
+"""Coverage collection for Python SDK.
+
+Manages coverage.py for collecting per-test code coverage data.
+Coverage data is accessed via take_coverage_snapshot() which is called
+from the protobuf communicator handler.
+
+Flow:
+1. start_coverage_collection() initializes coverage.py at SDK startup
+2. Between tests, CLI sends CoverageSnapshotRequest via protobuf
+3. Communicator calls take_coverage_snapshot(baseline)
+4. For baseline: returns ALL coverable lines (including uncovered at count=0)
+5. For per-test: returns only executed lines since last snapshot, then resets
+"""
+
+from __future__ import annotations
+
+import logging
+import os
+import threading
+
+logger = logging.getLogger("TuskDrift")
+
+# Shared state for coverage collection
+_cov_instance = None
+_source_root: str | None = None
+_lock = threading.Lock()
+
+
+def start_coverage_collection() -> bool:
+    """Initialize coverage.py collection if TUSK_COVERAGE is set.
+
+    TUSK_COVERAGE is set by the CLI when coverage is enabled.
+    This is the language-agnostic signal (Node uses NODE_V8_COVERAGE additionally).
+
+    Returns True if coverage was started, False otherwise.
+    """
+    global _cov_instance, _source_root
+
+    # TUSK_COVERAGE is the language-agnostic signal from the CLI
+    if not os.environ.get("TUSK_COVERAGE"):
+        return False
+
+    try:
+        import coverage as coverage_module
+    except ImportError:
+        logger.warning(
+            "Coverage requested but 'coverage' package is not installed. "
+            "Install it with: pip install coverage"
+        )
+        return False
+
+    with _lock:
+        # Guard against double initialization — stop existing instance first
+        if _cov_instance is not None:
+            try:
+                _cov_instance.stop()
+            except Exception:
+                pass
+
+        _source_root = os.path.realpath(os.getcwd())
+
+        _cov_instance = coverage_module.Coverage(
+            source=[_source_root],
+            branch=True,
+            omit=[
+                "*/site-packages/*",
+                "*/venv/*",
+                "*/.venv/*",
+                "*/tests/*",
+                "*/test_*.py",
+                "*/__pycache__/*",
+            ],
+        )
+        _cov_instance.start()
+
+    logger.info("Coverage collection started")
+    return True
+
+
+def stop_coverage_collection() -> None:
+    """Stop coverage collection and clean up. Thread-safe."""
+    global _cov_instance
+    with _lock:
+        if _cov_instance is not None:
+            try:
+                _cov_instance.stop()
+            except Exception:
+                pass
+            _cov_instance = None
+
+
+def take_coverage_snapshot(baseline: bool = False) -> dict:
+    """Take a coverage snapshot.
+
+    Called from the protobuf communicator handler between tests.
+
+    Args:
+        baseline: If True, returns ALL coverable lines (including uncovered at count=0)
+            for computing the total coverage denominator.
+            If False, returns only lines executed since the last snapshot.
+
+    Returns:
+        dict of { filePath: { "lines": {...}, "totalBranches": N, "coveredBranches": N, "branches": {...} } }
+    """
+    with _lock:
+        if _cov_instance is None:
+            raise RuntimeError("Coverage not initialized")
+        _cov_instance.stop()
+        coverage = {}
+
+        if baseline:
+            data = _cov_instance.get_data()
+            for filename in data.measured_files():
+                if not _is_user_file(filename):
+                    continue
+                try:
+                    _, statements, _, missing, _ = _cov_instance.analysis2(filename)
+                    missing_set = set(missing)
+                    lines_map = {}
+                    for line in statements:
+                        lines_map[str(line)] = 0 if line in missing_set else 1
+                    branch_data = _get_branch_data(data, filename)
+                    if lines_map:
+                        coverage[filename] = {"lines": lines_map, **branch_data}
+                except Exception as e:
+                    logger.debug(f"Failed to analyze {filename}: {e}")
+                    continue
+        else:
+            data = _cov_instance.get_data()
+            for filename in data.measured_files():
+                if not _is_user_file(filename):
+                    continue
+                lines = data.lines(filename)
+                if lines:
+                    branch_data = _get_branch_data(data, filename)
+                    coverage[filename] = {
+                        "lines": {str(line): 1 for line in lines},
+                        **branch_data,
+                    }
+
+        _cov_instance.erase()
+        _cov_instance.start()
+
+    return coverage
+
+
+def _is_user_file(filename: str) -> bool:
+    """Check if a file is a user source file (not third-party)."""
+    if "site-packages" in filename or "lib/python" in filename:
+        return False
+    # Resolve symlinks for consistent path comparison
+    resolved = os.path.realpath(filename)
+    # Use trailing separator to avoid prefix collisions (/app matching /application)
+    if _source_root and not (resolved.startswith(_source_root + os.sep) or resolved == _source_root):
+        return False
+    return True
+
+
+def _get_branch_data(data, filename: str) -> dict:
+    """Extract branch coverage data for a file.
+
+    Uses coverage.py's arc tracking (from_line, to_line) to compute
+    per-line branch coverage.
+    """
+    try:
+        if not data.has_arcs():
+            return {"totalBranches": 0, "coveredBranches": 0, "branches": {}}
+
+        analysis = _cov_instance._analyze(filename)
+        numbers = analysis.numbers
+
+        total_branches = numbers.n_branches
+        covered_branches = max(0, total_branches - numbers.n_missing_branches)
+
+        missing_arcs = analysis.missing_branch_arcs()
+        executed_arcs = set(data.arcs(filename) or [])
+
+        # Group executed arcs by from_line (skip negative entry arcs)
+        executed_by_line: dict[int, list[int]] = {}
+        for from_line, to_line in executed_arcs:
+            if from_line < 0:
+                continue
+            executed_by_line.setdefault(from_line, []).append(to_line)
+
+        # A line is a branch point if:
+        # - it appears in missing_arcs (at least one path wasn't taken), OR
+        # - it has multiple executed arcs (multiple paths from same line)
+        branch_point_lines = set(missing_arcs.keys())
+        for from_line, to_lines in executed_by_line.items():
+            if len(to_lines) > 1:
+                branch_point_lines.add(from_line)
+
+        branch_lines: dict[int, dict] = {}
+
+        for from_line in branch_point_lines:
+            executed_count = len(executed_by_line.get(from_line, []))
+            missing_count = len(missing_arcs.get(from_line, []))
+            branch_lines[from_line] = {
+                "total": executed_count + missing_count,
+                "covered": executed_count,
+            }
+
+        branches = {str(line): info for line, info in branch_lines.items()}
+
+        return {
+            "totalBranches": total_branches,
+            "coveredBranches": covered_branches,
+            "branches": branches,
+        }
+    except Exception:
+        return {"totalBranches": 0, "coveredBranches": 0, "branches": {}}
diff --git a/drift/core/drift_sdk.py b/drift/core/drift_sdk.py
index 67927f4..a420547 100644
--- a/drift/core/drift_sdk.py
+++ b/drift/core/drift_sdk.py
@@ -160,6 +160,11 @@ def initialize(
 
         configure_logger(log_level=log_level, prefix="TuskDrift")
 
+        # Start coverage collection early (before any SDK mode checks that might return early).
+        # Coverage data is accessed via protobuf channel (communicator handles requests).
+        from .coverage_server import start_coverage_collection
+        start_coverage_collection()
+
         instance._init_params = {
             "api_key": api_key,
             "env": env,
@@ -827,6 +832,8 @@ def shutdown(self) -> None:
         """Shutdown the SDK."""
         import asyncio
 
+        from .coverage_server import stop_coverage_collection
+
         # Shutdown OpenTelemetry tracer provider
         if self._td_span_processor is not None:
             self._td_span_processor.shutdown()
@@ -847,3 +854,5 @@ def shutdown(self) -> None:
             TraceBlockingManager.get_instance().shutdown()
         except Exception as e:
             logger.error(f"Error shutting down trace blocking manager: {e}")
+
+        stop_coverage_collection()
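Note (illustration only, not part of the patch): `_is_user_file` in `drift/core/coverage_server.py` appends a trailing separator to the source root before its prefix check. A minimal sketch of why that matters, using a hypothetical helper `is_under_root` and made-up paths. Unlike the SDK code, it skips `os.path.realpath` so the example does not depend on the local filesystem.

```python
import os


def is_under_root(path: str, root: str) -> bool:
    """Return True if path is the root itself or lies inside it."""
    root = root.rstrip(os.sep)
    return path == root or path.startswith(root + os.sep)


# Hypothetical project root and two candidate files.
root = os.sep + "app"
inside = os.path.join(root, "main.py")
sibling = os.sep + os.path.join("application", "main.py")

# A bare prefix test wrongly accepts the sibling directory.
naive_match = sibling.startswith(root)

print(is_under_root(inside, root))
print(naive_match, is_under_root(sibling, root))
```

The bare `startswith(root)` check accepts the `application` directory because it shares the `app` prefix, while the separator-aware check rejects it.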