diff --git a/autorepro/cli.py b/autorepro/cli.py index 7f2dc9c..e6831fa 100644 --- a/autorepro/cli.py +++ b/autorepro/cli.py @@ -570,6 +570,73 @@ def _setup_report_parser(subparsers) -> argparse.ArgumentParser: return report_parser +def _setup_replay_parser(subparsers) -> argparse.ArgumentParser: + """Setup replay subcommand parser.""" + replay_parser = subparsers.add_parser( + "replay", + help="Re-execute JSONL runs from previous exec sessions", + description="Re-run a previous execution log (runs.jsonl) to reproduce execution results, compare outcomes, and emit a new replay.jsonl plus REPLAY_SUMMARY.json", + ) + + # Required input file + replay_parser.add_argument( + "--from", + dest="from_path", + required=True, + help="Path to input JSONL file containing 'type:run' records", + ) + + # Selection and filtering options + replay_parser.add_argument( + "--until-success", + action="store_true", + help="Stop after the first successful replay (exit_code == 0)", + ) + replay_parser.add_argument( + "--indexes", + help="Filter the selected set before replay (single indices and ranges, comma-separated, e.g., '0,2-3')", + ) + + # Execution options + replay_parser.add_argument( + "--timeout", + type=int, + help="Per-run timeout in seconds (applied to each command)", + ) + + # Output options + replay_parser.add_argument( + "--jsonl", + help="Write per-run replay records and final summary to JSONL file (default: replay.jsonl)", + ) + replay_parser.add_argument( + "--summary", + help="Write a standalone JSON summary file (default: replay_summary.json)", + ) + + # Common options + replay_parser.add_argument( + "--dry-run", + action="store_true", + help="Print actions without executing", + ) + replay_parser.add_argument( + "-q", + "--quiet", + action="store_true", + help="Show errors only", + ) + replay_parser.add_argument( + "-v", + "--verbose", + action="count", + default=0, + help="Increase verbosity (-v, -vv)", + ) + + return replay_parser + + def create_parser() -> argparse.ArgumentParser: """Create and configure the argument parser.""" parser = argparse.ArgumentParser( @@ -604,6 +671,7 @@ def create_parser() -> argparse.ArgumentParser: _setup_plan_parser(subparsers) _setup_exec_parser(subparsers) _setup_report_parser(subparsers) + _setup_replay_parser(subparsers) return parser @@ -947,6 +1015,87 @@ def _parse_indexes(self, indexes_str: str) -> list[int]: # noqa: C901 return sorted(set(result)) +@dataclass +class ReplayConfig: + """Configuration for replay command operations.""" + + from_path: str + until_success: bool = False + indexes: str | None = None + timeout: int = field(default_factory=lambda: config.timeouts.default_seconds) + jsonl_path: str | None = None + summary_path: str | None = None + dry_run: bool = False + + def validate(self) -> None: + """Validate replay configuration and raise descriptive errors.""" + # Required field validation + if not self.from_path: + raise FieldValidationError("from_path is required", field="from_path") + + # File existence validation + from pathlib import Path + + if not Path(self.from_path).exists(): + raise FieldValidationError( + f"input file does not exist: {self.from_path}", field="from_path" + ) + + # Validate indexes format if provided + if self.indexes: + try: + self._parse_indexes(self.indexes) + except ValueError as e: + raise FieldValidationError( + f"invalid indexes format: {e}", field="indexes" + ) from e + + # Field validation + if self.timeout <= 0: + raise FieldValidationError( + f"timeout must be positive, got: {self.timeout}", field="timeout" + ) + + def _parse_indexes(self, indexes_str: str) -> list[int]: # noqa: C901 + """Parse indexes string like '0,2-3' into list of integers.""" + if not indexes_str.strip(): + raise ValueError("indexes string cannot be empty") + + result: list[int] = [] + for part in indexes_str.split(","): + part = part.strip() + if not part: + continue + + if "-" in part: + # Range like "2-5" + try: + start, end = part.split("-", 1) + start_idx = int(start.strip()) + end_idx = int(end.strip()) + if start_idx < 0 or end_idx < 0: + raise ValueError("indexes must be non-negative") + if start_idx > end_idx: + raise ValueError(f"invalid range {part}: start > end") + result.extend(range(start_idx, end_idx + 1)) + except ValueError as e: + if "invalid range" in str(e): + raise + raise ValueError(f"invalid range format: {part}") from e + else: + # Single index + try: + idx = int(part) + if idx < 0: + raise ValueError("indexes must be non-negative") + result.append(idx) + except ValueError: + raise ValueError(f"invalid index: {part}") from None + + # Remove duplicates and sort + return sorted(set(result)) + + @dataclass @dataclass class InitConfig: @@ -2071,6 +2220,317 @@ def cmd_exec(config: ExecConfig | None = None, **kwargs) -> int: return 1 # Generic error for unexpected failures +def _parse_jsonl_file(file_path: str) -> list[dict]: + """Parse JSONL file and return list of run records.""" + import json + + log = logging.getLogger("autorepro") + run_records = [] + + try: + with open(file_path, encoding="utf-8") as f: + for line_num, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + + try: + record = json.loads(line) + if record.get("type") == "run": + run_records.append(record) + except json.JSONDecodeError as e: + log.warning(f"Skipping invalid JSON on line {line_num}: {e}") + continue + + except OSError as e: + log.error(f"Failed to read JSONL file {file_path}: {e}") + raise + + return run_records + + +def _filter_records_by_indexes( + records: list[dict], indexes_str: str | None +) -> list[dict]: + """Filter records by index string like '0,2-3'.""" + if not indexes_str: + return records + + config = ReplayConfig(from_path="") # Temporary instance for parsing + try: + selected_indexes = config._parse_indexes(indexes_str) + except ValueError as e: + raise ValueError(f"Invalid indexes format: {e}") from e + + # Filter records that match the selected indexes + filtered = [] + for record in records: + record_index = record.get("index") + if record_index is not None and record_index in selected_indexes: + filtered.append(record) + + return filtered + + +def _execute_replay_command( + cmd: str, timeout: int, repo_path: Path | None = None +) -> dict: + """Execute a command for replay and return results.""" + import subprocess + import time + from datetime import datetime + + log = logging.getLogger("autorepro") + + # Determine execution directory + exec_dir = repo_path if repo_path else Path.cwd() + + # Record start time + start_time = time.time() + start_dt = datetime.now() + + # Execute command with timeout + try: + result = subprocess.run( + cmd, + shell=True, + cwd=exec_dir, + capture_output=True, + text=True, + timeout=timeout, + ) + + end_time = time.time() + end_dt = datetime.now() + duration_ms = int((end_time - start_time) * 1000) + + # Prepare output previews (first 1KB for human inspection) + stdout_preview = result.stdout[:1024] if result.stdout else "" + stderr_preview = result.stderr[:1024] if result.stderr else "" + + return { + "exit_code": result.returncode, + "duration_ms": duration_ms, + "start_time": start_dt, + "end_time": end_dt, + "stdout_full": result.stdout, + "stderr_full": result.stderr, + "stdout_preview": stdout_preview, + "stderr_preview": stderr_preview, + } + + except subprocess.TimeoutExpired: + end_time = time.time() + end_dt = datetime.now() + duration_ms = int((end_time - start_time) * 1000) + + log.warning(f"Command timed out after {timeout}s: {cmd}") + return { + "exit_code": 124, # Standard timeout exit code + "duration_ms": duration_ms, + "start_time": start_dt, + "end_time": end_dt, + "stdout_full": "", + "stderr_full": f"Command timed out after {timeout}s", + "stdout_preview": "", + "stderr_preview": f"Command timed out after {timeout}s", + } + + except Exception as e: + end_time = time.time() + end_dt = datetime.now() + duration_ms = int((end_time - start_time) * 1000) + + log.error(f"Command execution failed: {e}") + return { + "exit_code": 1, + "duration_ms": duration_ms, + "start_time": start_dt, + "end_time": end_dt, + "stdout_full": "", + "stderr_full": f"Execution failed: {e}", + "stdout_preview": "", + "stderr_preview": f"Execution failed: {e}", + } + + +def _create_replay_run_record( + index: int, + cmd: str, + original_exit_code: int, + results: dict, +) -> dict: + """Create a replay run record for JSONL output.""" + return { + "type": "run", + "index": index, + "cmd": cmd, + "start_ts": results["start_time"].strftime("%Y-%m-%dT%H:%M:%SZ"), + "end_ts": results["end_time"].strftime("%Y-%m-%dT%H:%M:%SZ"), + "exit_code_original": original_exit_code, + "exit_code_replay": results["exit_code"], + "matched": original_exit_code == results["exit_code"], + "duration_ms": results["duration_ms"], + "stdout_preview": results["stdout_preview"], + "stderr_preview": results["stderr_preview"], + } + + +def _create_replay_summary_record( # noqa: PLR0913 + runs: int, + successes: int, + failures: int, + matches: int, + mismatches: int, + first_success_index: int | None, +) -> dict: + """Create a replay summary record for JSONL output.""" + return { + "type": "summary", + "schema_version": 1, + "tool": "autorepro", + "mode": "replay", + "runs": runs, + "successes": successes, + "failures": failures, + "matches": matches, + "mismatches": mismatches, + "first_success_index": first_success_index, + } + + +def cmd_replay(config: ReplayConfig) -> int: # noqa: PLR0915, C901, PLR0911, PLR0912 + """Handle the replay command.""" + log = logging.getLogger("autorepro") + + try: + # Validate configuration + config.validate() + + # Parse JSONL file to get run records + try: + run_records = _parse_jsonl_file(config.from_path) + except Exception as e: + log.error(f"Failed to parse JSONL file: {e}") + return 1 + + if not run_records: + log.error("No 'type:run' records found in JSONL file") + return 1 + + # Filter records by indexes if specified + if config.indexes: + try: + run_records = _filter_records_by_indexes(run_records, config.indexes) + except ValueError as e: + log.error(f"Index filtering failed: {e}") + return 1 + + if not run_records: + log.error("No records match the specified indexes") + return 1 + + # Sort records by index to ensure consistent execution order + run_records.sort(key=lambda x: x.get("index", 0)) + + if config.dry_run: + log.info(f"Would replay {len(run_records)} commands:") + for record in run_records: + index = record.get("index", "?") + cmd = record.get("cmd", "") + log.info(f" [{index}] {cmd}") + return 0 + + # Set default output paths if not specified + jsonl_path = config.jsonl_path or "replay.jsonl" + summary_path = config.summary_path or "replay_summary.json" + + # Execute replay + runs = 0 + successes = 0 + failures = 0 + matches = 0 + mismatches = 0 + first_success_index = None + + log.info(f"Starting replay of {len(run_records)} commands") + + for record in run_records: + index = record.get("index", runs) + cmd = record.get("cmd", "") + original_exit_code = record.get("exit_code", 0) + + if not cmd: + log.warning(f"Skipping record {index}: no command found") + continue + + log.info(f"Replaying [{index}]: {cmd}") + + # Execute the command + results = _execute_replay_command(cmd, config.timeout) + replay_exit_code = results["exit_code"] + + # Track statistics + runs += 1 + if replay_exit_code == 0: + successes += 1 + if first_success_index is None: + first_success_index = index + else: + failures += 1 + + if original_exit_code == replay_exit_code: + matches += 1 + else: + mismatches += 1 + + # Create and write replay run record + replay_record = _create_replay_run_record( + index, cmd, original_exit_code, results + ) + + if config.jsonl_path: + _write_jsonl_record(jsonl_path, replay_record) + + # Print output to console unless quiet + if results["stdout_full"]: + print(results["stdout_full"], end="") + if results["stderr_full"]: + print(results["stderr_full"], file=sys.stderr, end="") + + # Check for early stopping + if config.until_success and replay_exit_code == 0: + log.info(f"Stopping after first success (command {index})") + break + + # Create and write summary + summary_record = _create_replay_summary_record( + runs, successes, failures, matches, mismatches, first_success_index + ) + + if config.jsonl_path: + _write_jsonl_record(jsonl_path, summary_record) + + if config.summary_path: + try: + with open(summary_path, "w", encoding="utf-8") as f: + json.dump(summary_record, f, indent=2) + except OSError as e: + log.error(f"Failed to write summary file: {e}") + return 1 + + # Log final results + log.info( + f"Replay completed: {runs} runs, {successes} successes, {matches} matches" + ) + + return 0 + + except Exception as e: + log.error(f"Replay failed: {e}") + return 1 + + def _prepare_pr_config(config: PrConfig) -> PrConfig: """Validate and process PR configuration.""" log = logging.getLogger("autorepro") @@ -2506,6 +2966,25 @@ def _setup_logging(args, project_verbosity: str | None = None) -> None: configure_logging(level=level, fmt=None, stream=sys.stderr) +def _dispatch_replay_command(args) -> int: + """Dispatch replay command with parsed arguments.""" + global_config = get_config() + timeout_value = getattr(args, "timeout", None) + if timeout_value is None: + timeout_value = global_config.timeouts.default_seconds + + config = ReplayConfig( + from_path=args.from_path, + until_success=getattr(args, "until_success", False), + indexes=getattr(args, "indexes", None), + timeout=timeout_value, + jsonl_path=getattr(args, "jsonl", None), + summary_path=getattr(args, "summary", None), + dry_run=getattr(args, "dry_run", False), + ) + return cmd_replay(config) + + def _dispatch_command(args, parser) -> int: # noqa: PLR0911 """Dispatch command based on parsed arguments.""" if args.command == "scan": @@ -2520,6 +2999,8 @@ def _dispatch_command(args, parser) -> int: # noqa: PLR0911 return _dispatch_pr_command(args) elif args.command == "report": return _dispatch_report_command(args) + elif args.command == "replay": + return _dispatch_replay_command(args) return _dispatch_help_command(parser) diff --git a/autorepro/detect.py b/autorepro/detect.py index 9d1e86d..df8e36f 100644 --- a/autorepro/detect.py +++ b/autorepro/detect.py @@ -225,9 +225,11 @@ def _process_weighted_patterns( pattern=filename, path=f"./{filename}", kind=str(info["kind"]), - weight=int(info["weight"]) - if isinstance(info["weight"], int | str) - else 0, + weight=( + int(info["weight"]) + if isinstance(info["weight"], int | str) + else 0 + ), ), ) @@ -265,9 +267,11 @@ def _process_glob_pattern( pattern=pattern, path=f"./{basename}", kind=str(info["kind"]), - weight=int(info["weight"]) - if isinstance(info["weight"], int | str) - else 0, + weight=( + int(info["weight"]) + if isinstance(info["weight"], int | str) + else 0 + ), ), ) @@ -289,9 +293,9 @@ def _process_exact_filename( pattern=pattern, path=f"./{pattern}", kind=str(info["kind"]), - weight=int(info["weight"]) - if isinstance(info["weight"], int | str) - else 0, + weight=( + int(info["weight"]) if isinstance(info["weight"], int | str) else 0 + ), ), ) @@ -569,9 +573,11 @@ def collect_evidence( # noqa: C901 pattern=filename, path=rel_path, kind=str(info["kind"]), - weight=int(info["weight"]) - if isinstance(info["weight"], int | str) - else 0, + weight=( + int(info["weight"]) + if isinstance(info["weight"], int | str) + else 0 + ), ), ) @@ -595,9 +601,11 @@ def collect_evidence( # noqa: C901 pattern=pattern, path=rel_path, kind=str(info["kind"]), - weight=int(info["weight"]) - if isinstance(info["weight"], int | str) - else 0, + weight=( + int(info["weight"]) + if isinstance(info["weight"], int | str) + else 0 + ), ), ) else: diff --git a/autorepro/utils/error_handling.py b/autorepro/utils/error_handling.py index c317ae9..d20585a 100644 --- a/autorepro/utils/error_handling.py +++ b/autorepro/utils/error_handling.py @@ -295,9 +295,11 @@ def _safe_subprocess_run_impl( errno_val == 1 or "not permitted" in str(e).lower() ): te = subprocess.TimeoutExpired( - cmd - if isinstance(cmd, list) - else (cmd.split() if isinstance(cmd, str) else cmd), + ( + cmd + if isinstance(cmd, list) + else (cmd.split() if isinstance(cmd, str) else cmd) + ), config.timeout, ) error_msg = f"{operation_name} timed out after {config.timeout}s: {cmd_str}" diff --git a/autorepro/utils/logging.py b/autorepro/utils/logging.py index 307f923..9e65753 100644 --- a/autorepro/utils/logging.py +++ b/autorepro/utils/logging.py @@ -69,7 +69,11 @@ def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> s s = time.strftime(datefmt, ct) else: s = time.strftime(self.default_time_format, ct) - return self.default_msec_format % (s, record.msecs) + # Safe access to msecs with fallback calculation for compatibility + msecs = getattr( + record, "msecs", int((record.created - int(record.created)) * 1000) + ) + return self.default_msec_format % (s, msecs) def converter(self, timestamp: float | None): # Use UTC timestamps for easier aggregation in logs @@ -91,7 +95,7 @@ def format(self, record: logging.LogRecord) -> str: for key, value in record.__dict__.items(): if key not in reserved and key not in {"message", "asctime"}: try: - extras.append(f"{key}={json.dumps(value, separators=(',',':'))}") + extras.append(f"{key}={json.dumps(value, separators=(',', ':'))}") except Exception: extras.append(f'{key}="{value}"') if record.exc_info: @@ -104,9 +108,13 @@ def format(self, record: logging.LogRecord) -> str: return base + (" " + " ".join(extras) if extras else "") def formatTime(self, record: logging.LogRecord, datefmt: str | None = None) -> str: # noqa: N802 - # ISO8601-ish UTC time + # ISO8601-ish UTC time with robust msecs handling ct = time.gmtime(record.created) - return time.strftime("%Y-%m-%dT%H:%M:%S", ct) + f".{int(record.msecs):03d}Z" + # Safe access to msecs with fallback calculation for compatibility + msecs = getattr( + record, "msecs", int((record.created - int(record.created)) * 1000) + ) + return time.strftime("%Y-%m-%dT%H:%M:%S", ct) + f".{int(msecs):03d}Z" class ContextAdapter(logging.LoggerAdapter): diff --git a/tests/test_replay.py b/tests/test_replay.py new file mode 100644 index 0000000..c49645b --- /dev/null +++ b/tests/test_replay.py @@ -0,0 +1,396 @@ +"""Tests for replay command functionality.""" + +import json +import tempfile +from pathlib import Path + +import pytest + +from autorepro.cli import ( + ReplayConfig, + _create_replay_run_record, + _create_replay_summary_record, + _filter_records_by_indexes, + _parse_jsonl_file, + cmd_replay, +) +from autorepro.config.exceptions import FieldValidationError + + +class TestReplayConfig: + """Test ReplayConfig validation and parsing.""" + + def test_valid_config(self): + """Test valid replay configuration.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + test_file.write_text( + '{"type": "run", "index": 0, "cmd": "echo test", "exit_code": 0}\n' + ) + + config = ReplayConfig(from_path=str(test_file)) + config.validate() # Should not raise + + def test_missing_from_path(self): + """Test validation fails when from_path is missing.""" + config = ReplayConfig(from_path="") + with pytest.raises(FieldValidationError, match="from_path is required"): + config.validate() + + def test_nonexistent_file(self): + """Test validation fails when file doesn't exist.""" + config = ReplayConfig(from_path="/nonexistent/file.jsonl") + with pytest.raises(FieldValidationError, match="input file does not exist"): + config.validate() + + def test_invalid_timeout(self): + """Test validation fails with invalid timeout.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + test_file.write_text( + '{"type": "run", "index": 0, "cmd": "echo test", "exit_code": 0}\n' + ) + + config = ReplayConfig(from_path=str(test_file), timeout=0) + with pytest.raises(FieldValidationError, match="timeout must be positive"): + config.validate() + + def test_valid_indexes_parsing(self): + """Test valid indexes string parsing.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + test_file.write_text( + '{"type": "run", "index": 0, "cmd": "echo test", "exit_code": 0}\n' + ) + + config = ReplayConfig(from_path=str(test_file), indexes="0,2-4,7") + result = config._parse_indexes("0,2-4,7") + expected = [0, 2, 3, 4, 7] + assert result == expected + + def test_invalid_indexes_parsing(self): + """Test invalid indexes string parsing.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + test_file.write_text( + '{"type": "run", "index": 0, "cmd": "echo test", "exit_code": 0}\n' + ) + + config = ReplayConfig(from_path=str(test_file), indexes="invalid") + with pytest.raises(FieldValidationError, match="invalid indexes format"): + config.validate() + + +class TestJSONLParsing: + """Test JSONL file parsing functionality.""" + + def test_parse_valid_jsonl(self): + """Test parsing valid JSONL file with run records.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo test1", "exit_code": 0} +{"type": "run", "index": 1, "cmd": "echo test2", "exit_code": 1} +{"type": "summary", "runs": 2} +""" + test_file.write_text(content) + + records = _parse_jsonl_file(str(test_file)) + assert len(records) == 2 + assert records[0]["cmd"] == "echo test1" + assert records[1]["cmd"] == "echo test2" + + def test_parse_empty_file(self): + """Test parsing empty JSONL file.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "empty.jsonl" + test_file.write_text("") + + records = _parse_jsonl_file(str(test_file)) + assert records == [] + + def test_parse_no_run_records(self): + """Test parsing JSONL file with no run records.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "summary", "runs": 0} +{"type": "other", "data": "value"} +""" + test_file.write_text(content) + + records = _parse_jsonl_file(str(test_file)) + assert records == [] + + def test_parse_invalid_json_lines(self): + """Test parsing JSONL file with some invalid JSON lines.""" + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo test1", "exit_code": 0} +invalid json line +{"type": "run", "index": 1, "cmd": "echo test2", "exit_code": 1} +""" + test_file.write_text(content) + + records = _parse_jsonl_file(str(test_file)) + assert len(records) == 2 + assert records[0]["cmd"] == "echo test1" + assert records[1]["cmd"] == "echo test2" + + +class TestIndexFiltering: + """Test index filtering functionality.""" + + def test_filter_by_single_index(self): + """Test filtering by single index.""" + records = [ + {"type": "run", "index": 0, "cmd": "cmd0"}, + {"type": "run", "index": 1, "cmd": "cmd1"}, + {"type": "run", "index": 2, "cmd": "cmd2"}, + ] + + filtered = _filter_records_by_indexes(records, "1") + assert len(filtered) == 1 + assert filtered[0]["cmd"] == "cmd1" + + def test_filter_by_range(self): + """Test filtering by range.""" + records = [ + {"type": "run", "index": 0, "cmd": "cmd0"}, + {"type": "run", "index": 1, "cmd": "cmd1"}, + {"type": "run", "index": 2, "cmd": "cmd2"}, + {"type": "run", "index": 3, "cmd": "cmd3"}, + ] + + filtered = _filter_records_by_indexes(records, "1-2") + assert len(filtered) == 2 + assert filtered[0]["cmd"] == "cmd1" + assert filtered[1]["cmd"] == "cmd2" + + def test_filter_by_mixed_indexes(self): + """Test filtering by mixed indexes and ranges.""" + records = [ + {"type": "run", "index": 0, "cmd": "cmd0"}, + {"type": "run", "index": 1, "cmd": "cmd1"}, + {"type": "run", "index": 2, "cmd": "cmd2"}, + {"type": "run", "index": 3, "cmd": "cmd3"}, + {"type": "run", "index": 4, "cmd": "cmd4"}, + ] + + filtered = _filter_records_by_indexes(records, "0,2-3") + assert len(filtered) == 3 + assert filtered[0]["cmd"] == "cmd0" + assert filtered[1]["cmd"] == "cmd2" + assert filtered[2]["cmd"] == "cmd3" + + def test_filter_no_matches(self): + """Test filtering with no matching indexes.""" + records = [ + {"type": "run", "index": 0, "cmd": "cmd0"}, + {"type": "run", "index": 1, "cmd": "cmd1"}, + ] + + filtered = _filter_records_by_indexes(records, "5-7") + assert filtered == [] + + def test_filter_no_indexes_specified(self): + """Test filtering with no indexes specified returns all records.""" + records = [ + {"type": "run", "index": 0, "cmd": "cmd0"}, + {"type": "run", "index": 1, "cmd": "cmd1"}, + ] + + filtered = _filter_records_by_indexes(records, None) + assert filtered == records + + +class TestRecordCreation: + """Test replay record creation functions.""" + + def test_create_replay_run_record(self): + """Test creation of replay run records.""" + from datetime import datetime + + results = { + "exit_code": 1, + "duration_ms": 150, + "start_time": datetime(2025, 9, 15, 12, 0, 0), + "end_time": datetime(2025, 9, 15, 12, 0, 1), + "stdout_preview": "output", + "stderr_preview": "error", + } + + record = _create_replay_run_record(2, "test command", 0, results) + + assert record["type"] == "run" + assert record["index"] == 2 + assert record["cmd"] == "test command" + assert record["exit_code_original"] == 0 + assert record["exit_code_replay"] == 1 + assert record["matched"] is False + assert record["duration_ms"] == 150 + assert record["stdout_preview"] == "output" + assert record["stderr_preview"] == "error" + + def test_create_replay_summary_record(self): + """Test creation of replay summary records.""" + record = _create_replay_summary_record(5, 3, 2, 4, 1, 1) + + assert record["type"] == "summary" + assert record["schema_version"] == 1 + assert record["tool"] == "autorepro" + assert record["mode"] == "replay" + assert record["runs"] == 5 + assert record["successes"] == 3 + assert record["failures"] == 2 + assert record["matches"] == 4 + assert record["mismatches"] == 1 + assert record["first_success_index"] == 1 + + +class TestReplayExecution: + """Test end-to-end replay execution.""" + + def test_replay_dry_run(self): + """Test replay dry run functionality.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create test JSONL file + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo test1", "exit_code": 0} +{"type": "run", "index": 1, "cmd": "echo test2", "exit_code": 1} +""" + test_file.write_text(content) + + config = ReplayConfig(from_path=str(test_file), dry_run=True) + result = cmd_replay(config) + + assert result == 0 + + def test_replay_simple_execution(self): + """Test simple replay execution.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create test JSONL file + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo 'success'", "exit_code": 0} +{"type": "run", "index": 1, "cmd": "python -c 'import sys; sys.exit(1)'", "exit_code": 1} +""" + test_file.write_text(content) + + # Create output files + jsonl_file = Path(tmpdir) / "replay.jsonl" + summary_file = Path(tmpdir) / "summary.json" + + config = ReplayConfig( + from_path=str(test_file), + jsonl_path=str(jsonl_file), + summary_path=str(summary_file), + ) + result = cmd_replay(config) + + assert result == 0 + assert jsonl_file.exists() + assert summary_file.exists() + + # Verify JSONL output + jsonl_lines = jsonl_file.read_text().strip().split("\n") + assert len(jsonl_lines) == 3 # 2 run records + 1 summary + + # Verify summary + summary = json.loads(summary_file.read_text()) + assert summary["runs"] == 2 + assert summary["successes"] == 1 + assert summary["failures"] == 1 + + def test_replay_with_index_filtering(self): + """Test replay with index filtering.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create test JSONL file + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo 'cmd0'", "exit_code": 0} +{"type": "run", "index": 1, "cmd": "echo 'cmd1'", "exit_code": 0} +{"type": "run", "index": 2, "cmd": "echo 'cmd2'", "exit_code": 0} +""" + test_file.write_text(content) + + summary_file = Path(tmpdir) / "summary.json" + + config = ReplayConfig( + from_path=str(test_file), + indexes="0,2", + summary_path=str(summary_file), + ) + result = cmd_replay(config) + + assert result == 0 + + # Verify only 2 commands were run + summary = json.loads(summary_file.read_text()) + assert summary["runs"] == 2 + + def test_replay_until_success(self): + """Test replay with until-success flag.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create test JSONL file - first command fails, second succeeds + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "python -c 'import sys; sys.exit(1)'", "exit_code": 1} +{"type": "run", "index": 1, "cmd": "echo 'success'", "exit_code": 0} +{"type": "run", "index": 2, "cmd": "echo 'should not run'", "exit_code": 0} +""" + test_file.write_text(content) + + summary_file = Path(tmpdir) / "summary.json" + + config = ReplayConfig( + from_path=str(test_file), + until_success=True, + summary_path=str(summary_file), + ) + result = cmd_replay(config) + + assert result == 0 + + # Should stop after second command (first success) + summary = json.loads(summary_file.read_text()) + assert summary["runs"] == 2 + assert summary["successes"] == 1 + assert summary["first_success_index"] == 1 + + def test_replay_error_cases(self): + """Test replay error handling.""" + # Test with nonexistent file + config = ReplayConfig(from_path="/nonexistent/file.jsonl") + result = cmd_replay(config) + assert result == 1 + + # Test with empty JSONL file + with tempfile.TemporaryDirectory() as tmpdir: + test_file = Path(tmpdir) / "empty.jsonl" + test_file.write_text("") + + config = ReplayConfig(from_path=str(test_file)) + result = cmd_replay(config) + assert result == 1 + + def test_replay_invalid_indexes(self): + """Test replay with invalid indexes.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create test JSONL file + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo test", "exit_code": 0} +""" + test_file.write_text(content) + + config = ReplayConfig(from_path=str(test_file), indexes="invalid") + result = cmd_replay(config) + assert result == 1 + + def test_replay_no_matching_indexes(self): + """Test replay when no records match specified indexes.""" + with tempfile.TemporaryDirectory() as tmpdir: + # Create test JSONL file + test_file = Path(tmpdir) / "test.jsonl" + content = """{"type": "run", "index": 0, "cmd": "echo test", "exit_code": 0} +""" + test_file.write_text(content) + + config = ReplayConfig(from_path=str(test_file), indexes="5-7") + result = cmd_replay(config) + assert result == 1