diff --git a/codeframe/core/adapters/verification_wrapper.py b/codeframe/core/adapters/verification_wrapper.py index d76ab972..a571d775 100644 --- a/codeframe/core/adapters/verification_wrapper.py +++ b/codeframe/core/adapters/verification_wrapper.py @@ -1,24 +1,45 @@ -"""Verification gate wrapper for agent adapters.""" +"""Verification gate wrapper for agent adapters. + +Wraps any AgentAdapter with post-execution verification gates, quick fixes, +fix attempt tracking, and escalation to blockers. This gives all execution +engines (built-in and external) the same self-correction capabilities that +ReactAgent has internally. +""" from __future__ import annotations +import logging from pathlib import Path from typing import Callable, Optional from codeframe.core.adapters.agent_adapter import AgentAdapter, AgentEvent, AgentResult +from codeframe.core import blockers +from codeframe.core.fix_tracker import ( + EscalationDecision, + FixAttemptTracker, + FixOutcome, + build_escalation_question, +) from codeframe.core.gates import GateStatus from codeframe.core.gates import run as run_gates +from codeframe.core.quick_fixes import apply_quick_fix, find_quick_fix from codeframe.core.workspace import Workspace +logger = logging.getLogger(__name__) + class VerificationWrapper: """Wraps any AgentAdapter with post-execution verification gates. After the inner adapter completes, runs verification gates (pytest, ruff, etc.). - If gates fail, re-invokes the adapter with error context for self-correction, - up to max_correction_rounds times. + If gates fail: + 1. Try a pattern-based quick fix (no LLM needed) + 2. If quick fix applied, re-run gates immediately + 3. If no quick fix, re-invoke adapter with error context for self-correction + 4. Track all fix attempts to detect loops + 5. 
Escalate to blocker when fix tracker recommends it or retries exhausted - This is the same self-correction loop that ReactAgent._run_final_verification() + This is the same self-correction pattern that ReactAgent._run_final_verification() uses, but decoupled from any specific engine so it wraps any adapter. """ @@ -26,7 +47,7 @@ def __init__( self, inner: AgentAdapter, workspace: Workspace, - max_correction_rounds: int = 3, + max_correction_rounds: int = 5, gate_names: Optional[list[str]] = None, verbose: bool = False, ) -> None: @@ -35,6 +56,7 @@ def __init__( self._max_correction_rounds = max_correction_rounds self._gate_names = gate_names # None = use default gates self._verbose = verbose + self.fix_tracker = FixAttemptTracker() @property def name(self) -> str: @@ -78,7 +100,12 @@ def run( on_event(AgentEvent(type="verification_passed", data={})) return result - # Gates failed -- build correction prompt and re-invoke + # Gates failed — get structured error summary + error_summary = ( + gate_result.get_error_summary() or + self._format_gate_errors(gate_result) + ) + if on_event: on_event(AgentEvent( type="verification_failed", @@ -88,12 +115,40 @@ def run( }, )) - error_summary = self._format_gate_errors(gate_result) + # 1. Record the attempt (outcome deferred until we know if fix works) + self.fix_tracker.record_attempt(error_summary, "verification_gate") + + # 2. Check escalation based on prior history + escalation = self.fix_tracker.should_escalate(error_summary) + if escalation.should_escalate: + self.fix_tracker.record_outcome( + error_summary, "verification_gate", FixOutcome.FAILED, + ) + return self._create_escalation_blocker( + task_id, error_summary, escalation, + last_output=result.output, + ) + + # 3. 
Try quick fix first (no adapter re-invocation needed) + if self._try_quick_fix(error_summary): + self.fix_tracker.record_outcome( + error_summary, "verification_gate", FixOutcome.SUCCESS, + ) + self._verbose_print( + f"[VerificationWrapper] Quick fix applied (round {round_num + 1})" + ) + continue # Re-run gates without re-invoking adapter + + # 4. No quick fix — record failure and re-invoke adapter with error context + self.fix_tracker.record_outcome( + error_summary, "verification_gate", FixOutcome.FAILED, + ) + formatted_errors = self._format_gate_errors(gate_result) correction_prompt = ( f"{prompt}\n\n" f"## Verification Gate Failures (Correction Round {round_num + 1})\n\n" f"Your previous changes failed the following verification gates. " - f"Fix these issues:\n\n{error_summary}" + f"Fix these issues:\n\n{formatted_errors}" ) result = self._inner.run( @@ -113,17 +168,91 @@ def run( if gate_result.passed: return result - # All rounds exhausted, gates still failing + # All rounds exhausted — create blocker error_summary = self._format_gate_errors(gate_result) + return self._create_exhaustion_blocker( + task_id, error_summary, last_output=result.output, + ) + + def _try_quick_fix(self, error_summary: str) -> bool: + """Attempt a pattern-based quick fix for the gate error. + + Returns True if a fix was successfully applied. 
+ """ + fix = find_quick_fix(error_summary, repo_path=self._workspace.repo_path) + if fix is None: + return False + + success, msg = apply_quick_fix(fix, self._workspace.repo_path) + if success: + self._verbose_print(f"[VerificationWrapper] Quick fix: {msg}") + return success + + def _create_escalation_blocker( + self, + task_id: str, + error: str, + escalation: EscalationDecision, + last_output: str = "", + ) -> AgentResult: + """Create a blocker when fix tracker recommends escalation.""" + question = build_escalation_question( + error, escalation.reason, self.fix_tracker, + ) + + try: + blockers.create( + workspace=self._workspace, + question=question, + task_id=task_id, + ) + except Exception: + logger.warning("Failed to create escalation blocker", exc_info=True) + return AgentResult( - status="failed", - output=result.output, + status="blocked", + output=last_output, + blocker_question=question, + error=f"Escalated to blocker: {escalation.reason}", + ) + + def _create_exhaustion_blocker( + self, + task_id: str, + error_summary: str, + last_output: str = "", + ) -> AgentResult: + """Create a blocker when all correction rounds are exhausted.""" + question = ( + f"Verification gates still failing after " + f"{self._max_correction_rounds} correction rounds.\n\n" + f"Errors:\n{error_summary[:500]}\n\n" + f"Please investigate and provide guidance." 
+ ) + + try: + blockers.create( + workspace=self._workspace, + question=question, + task_id=task_id, + ) + except Exception: + logger.warning("Failed to create exhaustion blocker", exc_info=True) + + return AgentResult( + status="blocked", + output=last_output, + blocker_question=question, error=( f"Verification gates still failing after " f"{self._max_correction_rounds} correction rounds:\n{error_summary}" ), ) + def _verbose_print(self, msg: str) -> None: + if self._verbose: + print(msg) + @staticmethod def _format_gate_errors(gate_result) -> str: """Format gate check failures into a readable summary.""" diff --git a/codeframe/core/fix_tracker.py b/codeframe/core/fix_tracker.py index 170b7b4c..28154071 100644 --- a/codeframe/core/fix_tracker.py +++ b/codeframe/core/fix_tracker.py @@ -453,3 +453,28 @@ def from_dict(cls, data: dict) -> "FixAttemptTracker": tracker._file_counts = dict(data.get("file_counts", {})) return tracker + + +def build_escalation_question( + error: str, + escalation_reason: str, + fix_tracker: FixAttemptTracker, +) -> str: + """Build a human-readable blocker question for escalation. + + Shared by VerificationWrapper and ReactAgent to produce consistent + escalation blocker messages. + """ + context = fix_tracker.get_blocker_context(error) + attempted = context.get("attempted_fixes", []) + attempted_str = ( + "\n".join(f" - {f}" for f in attempted) if attempted else " (none)" + ) + return ( + f"Verification keeps failing and automated fixes are not working.\n\n" + f"Error: {error[:300]}\n\n" + f"Reason for escalation: {escalation_reason}\n\n" + f"Fixes already attempted:\n{attempted_str}\n\n" + f"Total failures in this run: {context.get('total_run_failures', 0)}\n\n" + f"Please investigate and provide guidance." 
+ ) diff --git a/codeframe/core/react_agent.py b/codeframe/core/react_agent.py index 5bddafbb..df31e0de 100644 --- a/codeframe/core/react_agent.py +++ b/codeframe/core/react_agent.py @@ -22,7 +22,12 @@ from codeframe.core.blocker_detection import classify_error_for_blocker from codeframe.core.context import ContextLoader, TaskContext from codeframe.core.events import EventType -from codeframe.core.fix_tracker import EscalationDecision, FixAttemptTracker, FixOutcome +from codeframe.core.fix_tracker import ( + EscalationDecision, + FixAttemptTracker, + FixOutcome, + build_escalation_question, +) from codeframe.core.stall_detector import StallAction, StallDetectedError from codeframe.core.stall_monitor import StallEvent, StallMonitor from codeframe.core.models import AgentPhase, CompletionEvent, ErrorEvent, ProgressEvent @@ -1029,17 +1034,8 @@ def _create_escalation_blocker( the run record to the blocker. If creation fails the exception propagates — callers in ``run()`` catch it and return FAILED. """ - context = self.fix_tracker.get_blocker_context(error) - attempted = context.get("attempted_fixes", []) - attempted_str = "\n".join(f" - {f}" for f in attempted) if attempted else " (none)" - - question = ( - f"Verification keeps failing and automated fixes are not working.\n\n" - f"Error: {error[:300]}\n\n" - f"Reason for escalation: {escalation.reason}\n\n" - f"Fixes already attempted:\n{attempted_str}\n\n" - f"Total failures in this run: {context.get('total_run_failures', 0)}\n\n" - f"Please investigate and provide guidance." 
+ question = build_escalation_question( + error, escalation.reason, self.fix_tracker, ) blocker = blockers.create( workspace=self.workspace, diff --git a/codeframe/core/runtime.py b/codeframe/core/runtime.py index 39cbcfcc..4b413823 100644 --- a/codeframe/core/runtime.py +++ b/codeframe/core/runtime.py @@ -692,7 +692,7 @@ def on_adapter_event(event: AdapterEvent) -> None: adapter = get_external_adapter(engine) wrapper = VerificationWrapper( - adapter, workspace, max_correction_rounds=3, verbose=verbose, + adapter, workspace, max_correction_rounds=5, verbose=verbose, ) result = wrapper.run( diff --git a/tests/core/adapters/test_verification_wrapper.py b/tests/core/adapters/test_verification_wrapper.py index 2e5d8b20..bb1860df 100644 --- a/tests/core/adapters/test_verification_wrapper.py +++ b/tests/core/adapters/test_verification_wrapper.py @@ -6,6 +6,7 @@ from codeframe.core.adapters.verification_wrapper import VerificationWrapper from codeframe.core.adapters.agent_adapter import AgentAdapter, AgentEvent, AgentResult +from codeframe.core.fix_tracker import FixAttemptTracker, FixOutcome, build_escalation_question from codeframe.core.gates import GateStatus @@ -41,6 +42,7 @@ def failing_gate_result(): result = MagicMock() result.passed = False result.checks = [check] + result.get_error_summary.return_value = "test_main.py:1:1: E001 test failure" return result @@ -95,21 +97,31 @@ def test_self_correction_on_gate_failure( second_prompt = mock_inner_adapter.run.call_args_list[1][0][1] assert "Verification Gate Failures" in second_prompt - def test_exhausted_correction_rounds( + def test_exhausted_correction_rounds_creates_blocker( self, mock_inner_adapter, mock_workspace, failing_gate_result, ): - """If all correction rounds fail, return failed result.""" - with patch( - "codeframe.core.adapters.verification_wrapper.run_gates", - return_value=failing_gate_result, + """If all correction rounds fail, create blocker and return blocked.""" + with ( + patch( + 
"codeframe.core.adapters.verification_wrapper.run_gates", + return_value=failing_gate_result, + ), + patch( + "codeframe.core.adapters.verification_wrapper.blockers", + ) as mock_blockers, ): + mock_blocker = MagicMock() + mock_blocker.id = "b-exhaust" + mock_blockers.create.return_value = mock_blocker + wrapper = VerificationWrapper( mock_inner_adapter, mock_workspace, max_correction_rounds=2, ) result = wrapper.run("t1", "prompt", Path("/tmp")) - assert result.status == "failed" + assert result.status == "blocked" assert "still failing after 2 correction rounds" in result.error + mock_blockers.create.assert_called_once() def test_emits_verification_events( self, mock_inner_adapter, mock_workspace, passing_gate_result, @@ -178,3 +190,293 @@ def test_custom_gate_names( mock_gates.assert_called_once_with( mock_workspace, gates=["ruff"], verbose=False, ) + + +class TestQuickFixIntegration: + """Tests for quick fix integration in VerificationWrapper.""" + + def test_quick_fix_applied_before_adapter_reinvocation( + self, mock_inner_adapter, mock_workspace, failing_gate_result, passing_gate_result, + ): + """Quick fix should be tried before re-invoking the adapter.""" + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + ) as mock_gates, + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + ) as mock_find, + patch( + "codeframe.core.adapters.verification_wrapper.apply_quick_fix", + return_value=(True, "Fixed"), + ), + ): + # First gate check fails, quick fix applied, second gate check passes + mock_gates.side_effect = [failing_gate_result, passing_gate_result] + mock_find.return_value = MagicMock() # Non-None = fix found + + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=3, + ) + result = wrapper.run("t1", "prompt", Path("/tmp")) + + assert result.status == "completed" + # Adapter should NOT be re-invoked — quick fix handled it + assert mock_inner_adapter.run.call_count == 1 + + 
def test_quick_fix_failure_falls_through_to_adapter( + self, mock_inner_adapter, mock_workspace, failing_gate_result, passing_gate_result, + ): + """When no quick fix is found, fall through to adapter re-invocation.""" + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + ) as mock_gates, + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + return_value=None, + ), + ): + mock_gates.side_effect = [failing_gate_result, passing_gate_result] + + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=3, + ) + result = wrapper.run("t1", "prompt", Path("/tmp")) + + assert result.status == "completed" + # No quick fix found, so adapter re-invoked + assert mock_inner_adapter.run.call_count == 2 + + def test_quick_fix_apply_failure_falls_through( + self, mock_inner_adapter, mock_workspace, failing_gate_result, passing_gate_result, + ): + """When quick fix is found but apply fails, fall through to adapter.""" + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + ) as mock_gates, + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + ) as mock_find, + patch( + "codeframe.core.adapters.verification_wrapper.apply_quick_fix", + return_value=(False, "apply failed"), + ), + ): + mock_gates.side_effect = [failing_gate_result, passing_gate_result] + mock_find.return_value = MagicMock() + + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=3, + ) + result = wrapper.run("t1", "prompt", Path("/tmp")) + + assert result.status == "completed" + # Quick fix apply failed, so adapter re-invoked + assert mock_inner_adapter.run.call_count == 2 + + +class TestFixTrackerIntegration: + """Tests for FixAttemptTracker integration in VerificationWrapper.""" + + def test_fix_tracker_records_gate_failures( + self, mock_inner_adapter, mock_workspace, failing_gate_result, + ): + """Gate failures should be recorded in the fix tracker.""" +
failing_gate_result.get_error_summary = MagicMock( + return_value="test.py:1:1: E501 Line too long" + ) + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + return_value=failing_gate_result, + ), + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + return_value=None, + ), + ): + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=1, + ) + wrapper.run("t1", "prompt", Path("/tmp")) + + assert wrapper.fix_tracker.get_total_failures() > 0 + + +class TestEscalationIntegration: + """Tests for escalation blocker creation in VerificationWrapper.""" + + def test_escalation_creates_blocker_and_returns_blocked( + self, mock_inner_adapter, mock_workspace, failing_gate_result, + ): + """When fix tracker recommends escalation, create blocker and return blocked.""" + failing_gate_result.get_error_summary = MagicMock( + return_value="SyntaxError: invalid syntax" + ) + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + return_value=failing_gate_result, + ), + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + return_value=None, + ), + patch( + "codeframe.core.adapters.verification_wrapper.blockers", + ) as mock_blockers, + ): + mock_blocker = MagicMock() + mock_blocker.id = "blocker-123" + mock_blockers.create.return_value = mock_blocker + + # Use high max_correction_rounds but force escalation via tracker + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=10, + ) + # Pre-fill tracker to trigger escalation (3+ same-error failures) + error_text = "SyntaxError: invalid syntax" + for _ in range(4): + wrapper.fix_tracker.record_attempt(error_text, "gate_failure") + wrapper.fix_tracker.record_outcome( + error_text, "gate_failure", + FixOutcome.FAILED, + ) + + result = wrapper.run("t1", "prompt", Path("/tmp")) + + assert result.status == "blocked" + assert result.blocker_question is not None + assert 
"escalation" in result.blocker_question.lower() or "failing" in result.blocker_question.lower() + mock_blockers.create.assert_called_once() + + def test_max_retries_exhausted_creates_blocker( + self, mock_inner_adapter, mock_workspace, failing_gate_result, + ): + """When all correction rounds exhausted, create blocker instead of returning failed.""" + failing_gate_result.get_error_summary = MagicMock( + return_value="FAILED test_main.py::test_foo" + ) + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + return_value=failing_gate_result, + ), + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + return_value=None, + ), + patch( + "codeframe.core.adapters.verification_wrapper.blockers", + ) as mock_blockers, + ): + mock_blocker = MagicMock() + mock_blocker.id = "blocker-456" + mock_blockers.create.return_value = mock_blocker + + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=2, + ) + result = wrapper.run("t1", "prompt", Path("/tmp")) + + assert result.status == "blocked" + assert result.blocker_question is not None + mock_blockers.create.assert_called_once() + + +class TestDefaultMaxRetries: + """Test default max_correction_rounds is 5.""" + + def test_default_max_correction_rounds_is_five( + self, mock_inner_adapter, mock_workspace, + ): + wrapper = VerificationWrapper(mock_inner_adapter, mock_workspace) + assert wrapper._max_correction_rounds == 5 + + +class TestBuildEscalationQuestion: + """Tests for the shared build_escalation_question helper.""" + + def test_includes_error_and_reason(self): + tracker = FixAttemptTracker() + question = build_escalation_question( + "SyntaxError: invalid syntax", + "Same error 3+ times", + tracker, + ) + assert "SyntaxError" in question + assert "Same error 3+ times" in question + assert "automated fixes are not working" in question + + def test_includes_attempted_fixes(self): + tracker = FixAttemptTracker() + tracker.record_attempt("err", 
"fix_1") + tracker.record_attempt("err", "fix_2") + question = build_escalation_question("err", "reason", tracker) + assert "fix_1" in question + assert "fix_2" in question + + def test_truncates_long_errors(self): + tracker = FixAttemptTracker() + long_error = "x" * 500 + question = build_escalation_question(long_error, "reason", tracker) + assert len(question) < len(long_error) + 500 # Error truncated to 300 + + +class TestIntegrationScenario: + """Integration test: engine succeeds → gates fail → retry → gates pass.""" + + def test_full_correction_flow( + self, mock_inner_adapter, mock_workspace, + ): + """Simulate: adapter completes, gates fail, no quick fix, adapter + re-invoked with error context, gates pass on second check.""" + passing = MagicMock() + passing.passed = True + passing.checks = [] + + failing = MagicMock() + failing.passed = False + failing.get_error_summary.return_value = "test.py:1:1: E501 Line too long" + check = MagicMock() + check.name = "ruff" + check.status = GateStatus.FAILED + check.output = "test.py:1:1: E501 Line too long" + failing.checks = [check] + + with ( + patch( + "codeframe.core.adapters.verification_wrapper.run_gates", + ) as mock_gates, + patch( + "codeframe.core.adapters.verification_wrapper.find_quick_fix", + return_value=None, + ), + ): + # Gate sequence: fail → pass (after adapter correction) + mock_gates.side_effect = [failing, passing] + + events_captured: list[AgentEvent] = [] + wrapper = VerificationWrapper( + mock_inner_adapter, mock_workspace, max_correction_rounds=5, + ) + result = wrapper.run( + "t1", "implement feature X", Path("/tmp"), + on_event=events_captured.append, + ) + + assert result.status == "completed" + assert mock_inner_adapter.run.call_count == 2 + + # Verify event sequence + event_types = [e.type for e in events_captured] + assert "verification" in event_types + assert "verification_failed" in event_types + assert "verification_passed" in event_types + + # Verify error context was included in 
correction prompt + correction_prompt = mock_inner_adapter.run.call_args_list[1][0][1] + assert "Correction Round 1" in correction_prompt + assert "ruff" in correction_prompt