Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions src/fetchgraph/replay/export.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@
from pathlib import Path
from typing import Callable, Dict, Iterable

from fetchgraph.utils.path_layout import validate_run_relative_posix
from fetchgraph.utils.path_layout import (
safe_join_under_validated,
validate_run_relative_posix,
validate_safe_path_segment,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -613,26 +617,25 @@ def copy_resource_files(
fixture_stem: str,
) -> None:
planned: dict[Path, tuple[str, Path]] = {}
fixture_stem = validate_safe_path_segment(fixture_stem, what="fixture_stem")
for resource_id, resource in resources.items():
if not isinstance(resource_id, str) or not resource_id:
raise ValueError("resource_id must be a non-empty string")
if "/" in resource_id or "\\" in resource_id or ".." in Path(resource_id).parts:
raise ValueError(f"resource_id must be a safe path segment: {resource_id!r}")
resource_id = validate_safe_path_segment(resource_id, what="resource_id")
data_ref = resource.get("data_ref")
if not isinstance(data_ref, dict):
continue
file_name = data_ref.get("file")
if not isinstance(file_name, str) or not file_name:
continue
rel_path = validate_run_relative_posix(file_name)
src_path = run_dir / rel_path
src_path = safe_join_under_validated(run_dir, rel_path)
if not src_path.exists():
raise FileNotFoundError(
f"Missing resource file for rid={resource_id!r}: "
f"src={src_path} (run_dir={run_dir}, fixture={fixture_stem})"
)
dest_rel = Path("resources") / fixture_stem / resource_id / rel_path
dest_path = out_dir / dest_rel
dest_rel = validate_run_relative_posix(dest_rel.as_posix())
dest_path = safe_join_under_validated(out_dir, dest_rel)
dest_path.parent.mkdir(parents=True, exist_ok=True)
prev = planned.get(dest_path)
if prev is not None:
Expand Down
28 changes: 23 additions & 5 deletions src/fetchgraph/replay/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@
from pathlib import Path
from typing import Callable, Dict

from fetchgraph.utils.path_layout import (
safe_join_under_validated,
validate_run_relative_posix,
validate_safe_path_segment,
)

@dataclass(frozen=True)
class ReplayContext:
Expand All @@ -15,22 +20,35 @@ class ReplayContext:

def resolve_resource_path(self, resource_path: str | Path) -> Path:
path = Path(resource_path)
if path.is_absolute() or self.base_dir is None:
return path
if self.base_dir is None:
raise ValueError("base_dir is required for replay")
if path.is_absolute():
raise ValueError(f"resource path must be relative to base_dir: {path}")
resource_str = resource_path.as_posix() if isinstance(resource_path, Path) else str(resource_path)
rel_path = validate_run_relative_posix(resource_str)
if resource_str.startswith("resources/"):
return self.base_dir / path
if not self.fixture_stem:
raise ValueError("fixture_stem is required for resources/ paths")
fixture_stem = validate_safe_path_segment(self.fixture_stem, what="fixture_stem")
prefix = f"resources/{fixture_stem}/"
if not resource_str.startswith(prefix):
raise ValueError(f"resource path must be under {prefix}")
return safe_join_under_validated(self.base_dir, rel_path)
if self.fixture_stem and self.resources:
fixture_stem = validate_safe_path_segment(self.fixture_stem, what="fixture_stem")
for resource_id, resource in self.resources.items():
resource_id = validate_safe_path_segment(resource_id, what="resource_id")
if not isinstance(resource, dict):
continue
data_ref = resource.get("data_ref")
if not isinstance(data_ref, dict):
continue
file_name = data_ref.get("file")
if file_name == resource_str:
return self.base_dir / "resources" / self.fixture_stem / resource_id / Path(resource_str)
return self.base_dir / path
fixture_rel = Path("resources") / fixture_stem / resource_id / rel_path
fixture_rel = validate_run_relative_posix(fixture_rel.as_posix())
return safe_join_under_validated(self.base_dir, fixture_rel)
return safe_join_under_validated(self.base_dir, rel_path)


REPLAY_HANDLERS: Dict[str, Callable[[dict, ReplayContext], dict]] = {}
Expand Down
36 changes: 36 additions & 0 deletions src/fetchgraph/utils/path_layout.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,39 @@ def validate_run_relative_posix(path_str: str) -> Path:
return Path(*rel.parts)


def validate_safe_path_segment(name: str, *, what: str) -> str:
if not isinstance(name, str) or not name:
raise ValueError(f"{what} must be a non-empty string")
if "/" in name or "\\" in name:
raise ValueError(f"{what} must be a single path segment: {name!r}")
if name in {".", ".."}:
raise ValueError(f"{what} must not be '.' or '..': {name!r}")
if ":" in name:
raise ValueError(f"{what} must not contain ':': {name!r}")
return name


def safe_join_under(root: Path, rel_posix: str) -> Path:
if not isinstance(root, Path):
raise ValueError("root must be a Path")
rel_path = validate_run_relative_posix(rel_posix)
return safe_join_under_validated(root, rel_path)


def safe_join_under_validated(root: Path, rel_path: Path) -> Path:
if not isinstance(root, Path):
raise ValueError("root must be a Path")
if not isinstance(rel_path, Path):
raise ValueError("rel_path must be a Path")
if rel_path.is_absolute():
raise ValueError(f"path must be relative: {rel_path}")
root_resolved = root.resolve(strict=False)
candidate = (root / rel_path).resolve(strict=False)
if not candidate.is_relative_to(root_resolved):
raise ValueError(f"path escapes root: {rel_path} (root={root})")
return candidate


def _find_cases_dir(run_root: Path, cfg: LayoutConfig) -> Path:
direct = run_root / cfg.cases_dirname
if direct.exists():
Expand All @@ -182,5 +215,8 @@ def _find_cases_dir(run_root: Path, cfg: LayoutConfig) -> Path:
"run_relative_posix_path",
"run_root_from_case_dir",
"runs_root",
"safe_join_under",
"safe_join_under_validated",
"validate_safe_path_segment",
"validate_run_relative_posix",
]
63 changes: 63 additions & 0 deletions tests/test_replay_runtime.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
from pathlib import Path

import pytest
Expand Down Expand Up @@ -33,3 +34,65 @@ def test_resolve_resource_path_run_root_relative(tmp_path: Path) -> None:
resolved = ctx.resolve_resource_path(resource_path)
assert resolved == stored_path
assert resolved.read_text(encoding="utf-8") == "schema"


def test_resolve_resource_path_requires_base_dir() -> None:
ctx = ReplayContext()
with pytest.raises(ValueError, match="base_dir is required for replay"):
ctx.resolve_resource_path("ok/inside.txt")


def test_resolve_resource_path_rejects_absolute(tmp_path: Path) -> None:
base_dir = tmp_path / "fixtures"
base_dir.mkdir()
ctx = ReplayContext(base_dir=base_dir)
with pytest.raises(ValueError, match="relative to base_dir"):
ctx.resolve_resource_path("/etc/passwd")


def test_resolve_resource_path_rejects_parent_traversal(tmp_path: Path) -> None:
base_dir = tmp_path / "fixtures"
base_dir.mkdir()
ctx = ReplayContext(base_dir=base_dir)
with pytest.raises(ValueError, match="traverse parents"):
ctx.resolve_resource_path("../secrets.txt")


def test_resolve_resource_path_allows_inside_base_dir(tmp_path: Path) -> None:
base_dir = tmp_path / "fixtures"
base_dir.mkdir()
ctx = ReplayContext(base_dir=base_dir)
resolved = ctx.resolve_resource_path("ok/inside.txt")
assert resolved == base_dir / "ok" / "inside.txt"


def test_resolve_resource_path_rejects_symlink_escape(tmp_path: Path) -> None:
base_dir = tmp_path / "fixtures"
base_dir.mkdir()
outside_dir = tmp_path / "outside"
outside_dir.mkdir()
fixture_stem = "case_1__abcd1234"
resource_id = "schema_v1"
resource_path = "ok/inside.txt"
fixture_root = base_dir / "resources" / fixture_stem / resource_id
fixture_root.mkdir(parents=True, exist_ok=True)
try:
os.symlink(outside_dir, fixture_root / "ok")
except (OSError, NotImplementedError) as exc:
pytest.skip(f"symlink not supported: {exc}")

ctx = ReplayContext(
resources={resource_id: {"data_ref": {"file": resource_path}}},
base_dir=base_dir,
fixture_stem=fixture_stem,
)
with pytest.raises(ValueError, match="escapes root"):
ctx.resolve_resource_path(resource_path)


def test_resolve_resource_path_rejects_other_fixture_resources(tmp_path: Path) -> None:
base_dir = tmp_path / "fixtures"
base_dir.mkdir()
ctx = ReplayContext(base_dir=base_dir, fixture_stem="case_1__abcd1234")
with pytest.raises(ValueError, match="resources/case_1__abcd1234/"):
ctx.resolve_resource_path("resources/other_fixture/data.json")