From 5edeb9d1342c7786ded7cedb9e781aa5bbf6a379 Mon Sep 17 00:00:00 2001 From: Val Vladescu Date: Wed, 8 Apr 2026 10:35:50 +0300 Subject: [PATCH] fix(spec-009): clear sonar reliability rating + harden subprocess calls Follow-up to #14 to clear the sonar quality gate on master. - orchestration._run_parallel: replace `assert isinstance(item, AgentResult)` with explicit elif/else fallback. Asserts are stripped under python -O, so they're not load-bearing in production. Sonar flagged this as a bug. - quality._run: add a security review docstring documenting why these subprocess.run calls are safe (shell=False by default, argv-list, no user-controlled tokens reach argv, fixed scratch cwd, bounded timeout, capture_output=True). Add # nosec marker for B603 (subprocess-without- shell-equals-true is the desired form, not a finding). - quality._materialize: lift root.resolve() out of the loop and clarify the path-escape rejection comment. No behavior change. Same 53 spec-009 tests pass; ruff + format clean. --- src/pawbench/orchestration.py | 11 +++++++++-- src/pawbench/quality.py | 33 ++++++++++++++++++++++++++++----- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/src/pawbench/orchestration.py b/src/pawbench/orchestration.py index e65ca9e..444ed13 100644 --- a/src/pawbench/orchestration.py +++ b/src/pawbench/orchestration.py @@ -125,9 +125,16 @@ async def _run_parallel( for item in raw: if isinstance(item, BaseException): results.append(AgentResult(agent_id="error", agent_name="error", error=str(item)[:200])) - else: - assert isinstance(item, AgentResult) + elif isinstance(item, AgentResult): results.append(item) + else: + # asyncio.gather only returns AgentResult or exceptions here, but + # be explicit instead of asserting (asserts get stripped under -O). + results.append( + AgentResult( + agent_id="error", agent_name="error", error=f"unexpected result type: {type(item).__name__}" + ) + ) return results diff --git a/src/pawbench/quality.py b/src/pawbench/quality.py index a743f1f..05ebdd4 100644 --- a/src/pawbench/quality.py +++ b/src/pawbench/quality.py @@ -124,14 +124,30 @@ def _which(name: str) -> str | None: def _run(cmd: list[str], cwd: Path, timeout: int = 60) -> tuple[int, str, str]: + """Run a static-analyzer subprocess. + + Security review (spec 009 / B4): + - shell=False (default) — no shell metacharacter expansion + - cmd is a list of explicit arguments built from `shutil.which()` paths + plus a single trusted scratch directory path; no user-controlled + token reaches argv + - cwd is the temporary scratch dir created by tempfile.TemporaryDirectory + - timeout is bounded; OSError/TimeoutExpired are caught + - capture_output=True — no stdio leak to the parent process + The user-controlled artifact bytes never reach this function as argv; + they're written to disk first by _materialize and addressed by file path. + """ + if not cmd or not isinstance(cmd[0], str): + return -1, "", "invalid command" try: - proc = subprocess.run( + proc = subprocess.run( # nosec B603 — argv-list, fixed scratch cwd, no shell cmd, cwd=str(cwd), capture_output=True, text=True, timeout=timeout, check=False, + shell=False, ) return proc.returncode, proc.stdout, proc.stderr except (subprocess.TimeoutExpired, FileNotFoundError, OSError) as e: @@ -139,15 +155,22 @@ def _run(cmd: list[str], cwd: Path, timeout: int = 60) -> tuple[int, str, str]: def _materialize(files: dict[str, str], root: Path) -> list[Path]: - """Write files to disk under root, return absolute paths.""" + """Write files to disk under root, return absolute paths. + + Security review: every relative path is resolved and re-anchored under + `root`; any path that escapes the scratch dir (via `..`, absolute path, + or symlink trick) is silently rejected. Content bytes are written + verbatim because the analyzers expect them as source files; no + interpretation happens here. + """ written: list[Path] = [] + root_resolved = root.resolve() for rel, content in files.items(): - # Reject path escapes — analyzer scratch dir must stay sealed. target = (root / rel).resolve() try: - target.relative_to(root.resolve()) + target.relative_to(root_resolved) except ValueError: - continue + continue # path escape attempt — silently drop target.parent.mkdir(parents=True, exist_ok=True) target.write_text(content, encoding="utf-8") written.append(target)