Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 140 additions & 11 deletions codeframe/core/adapters/verification_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,53 @@
"""Verification gate wrapper for agent adapters."""
"""Verification gate wrapper for agent adapters.

Wraps any AgentAdapter with post-execution verification gates, quick fixes,
fix attempt tracking, and escalation to blockers. This gives all execution
engines (built-in and external) the same self-correction capabilities that
ReactAgent has internally.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Callable, Optional

from codeframe.core.adapters.agent_adapter import AgentAdapter, AgentEvent, AgentResult
from codeframe.core import blockers
from codeframe.core.fix_tracker import (
EscalationDecision,
FixAttemptTracker,
FixOutcome,
build_escalation_question,
)
from codeframe.core.gates import GateStatus
from codeframe.core.gates import run as run_gates
from codeframe.core.quick_fixes import apply_quick_fix, find_quick_fix
from codeframe.core.workspace import Workspace

logger = logging.getLogger(__name__)


class VerificationWrapper:
"""Wraps any AgentAdapter with post-execution verification gates.

After the inner adapter completes, runs verification gates (pytest, ruff, etc.).
If gates fail, re-invokes the adapter with error context for self-correction,
up to max_correction_rounds times.
If gates fail:
1. Try a pattern-based quick fix (no LLM needed)
2. If quick fix applied, re-run gates immediately
3. If no quick fix, re-invoke adapter with error context for self-correction
4. Track all fix attempts to detect loops
5. Escalate to blocker when fix tracker recommends it or retries exhausted

This is the same self-correction loop that ReactAgent._run_final_verification()
This is the same self-correction pattern that ReactAgent._run_final_verification()
uses, but decoupled from any specific engine so it wraps any adapter.
"""

def __init__(
self,
inner: AgentAdapter,
workspace: Workspace,
max_correction_rounds: int = 3,
max_correction_rounds: int = 5,
gate_names: Optional[list[str]] = None,
verbose: bool = False,
) -> None:
Expand All @@ -35,6 +56,7 @@ def __init__(
self._max_correction_rounds = max_correction_rounds
self._gate_names = gate_names # None = use default gates
self._verbose = verbose
self.fix_tracker = FixAttemptTracker()

Comment on lines +59 to 60
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Reset the fix tracker at the start of each run().

FixAttemptTracker is stored on the wrapper instance, so failures from a previous task can bleed into the next run() and trigger an immediate escalation or early exhaustion on a fresh task.

Suggested fix
 def run(
     self,
     task_id: str,
     prompt: str,
     workspace_path: Path,
     on_event: Callable[[AgentEvent], None] | None = None,
 ) -> AgentResult:
     """Run the inner adapter, then verify with gates. Self-correct on failure."""
+    self.fix_tracker.reset()
 
     # Initial run
     result = self._inner.run(task_id, prompt, workspace_path, on_event)

Also applies to: 65-75

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/verification_wrapper.py` around lines 59 - 60, Reset
the instance's FixAttemptTracker at the start of each run() to avoid bleed-over
from prior tasks: inside the VerificationWrapper.run method (or whichever class
defines run), reassign self.fix_tracker = FixAttemptTracker() at the start of
the method (and similarly before any per-task loop that may call escalation
logic) so each invocation begins with a fresh tracker and previous failures
won't trigger immediate escalation or exhaustion; update references to
self.fix_tracker accordingly.

@property
def name(self) -> str:
Expand Down Expand Up @@ -78,7 +100,12 @@ def run(
on_event(AgentEvent(type="verification_passed", data={}))
return result

# Gates failed -- build correction prompt and re-invoke
# Gates failed — get structured error summary
error_summary = (
gate_result.get_error_summary() or
self._format_gate_errors(gate_result)
)

if on_event:
on_event(AgentEvent(
type="verification_failed",
Expand All @@ -88,12 +115,40 @@ def run(
},
))

error_summary = self._format_gate_errors(gate_result)
# 1. Record the attempt (outcome deferred until we know if fix works)
self.fix_tracker.record_attempt(error_summary, "verification_gate")

# 2. Check escalation based on prior history
escalation = self.fix_tracker.should_escalate(error_summary)
if escalation.should_escalate:
self.fix_tracker.record_outcome(
error_summary, "verification_gate", FixOutcome.FAILED,
)
return self._create_escalation_blocker(
task_id, error_summary, escalation,
last_output=result.output,
)

# 3. Try quick fix first (no adapter re-invocation needed)
if self._try_quick_fix(error_summary):
self.fix_tracker.record_outcome(
error_summary, "verification_gate", FixOutcome.SUCCESS,
)
self._verbose_print(
f"[VerificationWrapper] Quick fix applied (round {round_num + 1})"
)
continue # Re-run gates without re-invoking adapter

# 4. No quick fix — record failure and re-invoke adapter with error context
self.fix_tracker.record_outcome(
error_summary, "verification_gate", FixOutcome.FAILED,
)
Comment on lines +118 to +145
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Don't mark a quick-fix path successful before the rerun actually passes.

Here a quick fix is counted as SUCCESS as soon as it applies, even though the next gate run may still fail with the same error. That leaves no failed record for the attempted quick fix, so the tracker can't reliably prevent retrying the same bad fix or escalate on repeated failures.

Based on learnings, self-correction attempts must use core/fix_tracker.py to prevent repeating failed fixes.

Also applies to: 177-189

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@codeframe/core/adapters/verification_wrapper.py` around lines 118 - 145, The
quick-fix path in verification_wrapper.py currently marks a fix as
FixOutcome.SUCCESS as soon as _try_quick_fix(error_summary) returns true;
instead, leave only the initial attempt recorded (use
fix_tracker.record_attempt) and defer calling fix_tracker.record_outcome(...,
FixOutcome.SUCCESS) until after the subsequent gate re-run actually succeeds; if
the re-run still fails, record FixOutcome.FAILED for that same error_summary and
verification_gate so the fix_tracker can prevent repeating bad fixes or
escalate; apply the same change to the identical logic around the other
quick-fix block referenced (the block at lines ~177-189) so both quick-fix
branches only record SUCCESS after a verified successful re-run.

formatted_errors = self._format_gate_errors(gate_result)
correction_prompt = (
f"{prompt}\n\n"
f"## Verification Gate Failures (Correction Round {round_num + 1})\n\n"
f"Your previous changes failed the following verification gates. "
f"Fix these issues:\n\n{error_summary}"
f"Fix these issues:\n\n{formatted_errors}"
)

result = self._inner.run(
Expand All @@ -113,17 +168,91 @@ def run(
if gate_result.passed:
return result

# All rounds exhausted, gates still failing
# All rounds exhausted — create blocker
error_summary = self._format_gate_errors(gate_result)
return self._create_exhaustion_blocker(
task_id, error_summary, last_output=result.output,
)

def _try_quick_fix(self, error_summary: str) -> bool:
"""Attempt a pattern-based quick fix for the gate error.

Returns True if a fix was successfully applied.
"""
fix = find_quick_fix(error_summary, repo_path=self._workspace.repo_path)
if fix is None:
return False

success, msg = apply_quick_fix(fix, self._workspace.repo_path)
if success:
self._verbose_print(f"[VerificationWrapper] Quick fix: {msg}")
return success

def _create_escalation_blocker(
self,
task_id: str,
error: str,
escalation: EscalationDecision,
last_output: str = "",
) -> AgentResult:
"""Create a blocker when fix tracker recommends escalation."""
question = build_escalation_question(
error, escalation.reason, self.fix_tracker,
)

try:
blockers.create(
workspace=self._workspace,
question=question,
task_id=task_id,
)
except Exception:
logger.warning("Failed to create escalation blocker", exc_info=True)

return AgentResult(
status="failed",
output=result.output,
status="blocked",
output=last_output,
blocker_question=question,
error=f"Escalated to blocker: {escalation.reason}",
)

def _create_exhaustion_blocker(
self,
task_id: str,
error_summary: str,
last_output: str = "",
) -> AgentResult:
"""Create a blocker when all correction rounds are exhausted."""
question = (
f"Verification gates still failing after "
f"{self._max_correction_rounds} correction rounds.\n\n"
f"Errors:\n{error_summary[:500]}\n\n"
f"Please investigate and provide guidance."
)

try:
blockers.create(
workspace=self._workspace,
question=question,
task_id=task_id,
)
except Exception:
logger.warning("Failed to create exhaustion blocker", exc_info=True)

return AgentResult(
status="blocked",
output=last_output,
blocker_question=question,
error=(
f"Verification gates still failing after "
f"{self._max_correction_rounds} correction rounds:\n{error_summary}"
),
)

def _verbose_print(self, msg: str) -> None:
if self._verbose:
print(msg)

@staticmethod
def _format_gate_errors(gate_result) -> str:
"""Format gate check failures into a readable summary."""
Expand Down
25 changes: 25 additions & 0 deletions codeframe/core/fix_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,28 @@ def from_dict(cls, data: dict) -> "FixAttemptTracker":
tracker._file_counts = dict(data.get("file_counts", {}))

return tracker


def build_escalation_question(
error: str,
escalation_reason: str,
fix_tracker: FixAttemptTracker,
) -> str:
"""Build a human-readable blocker question for escalation.

Shared by VerificationWrapper and ReactAgent to produce consistent
escalation blocker messages.
"""
context = fix_tracker.get_blocker_context(error)
attempted = context.get("attempted_fixes", [])
attempted_str = (
"\n".join(f" - {f}" for f in attempted) if attempted else " (none)"
)
return (
f"Verification keeps failing and automated fixes are not working.\n\n"
f"Error: {error[:300]}\n\n"
f"Reason for escalation: {escalation_reason}\n\n"
f"Fixes already attempted:\n{attempted_str}\n\n"
f"Total failures in this run: {context.get('total_run_failures', 0)}\n\n"
f"Please investigate and provide guidance."
)
20 changes: 8 additions & 12 deletions codeframe/core/react_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
from codeframe.core.blocker_detection import classify_error_for_blocker
from codeframe.core.context import ContextLoader, TaskContext
from codeframe.core.events import EventType
from codeframe.core.fix_tracker import EscalationDecision, FixAttemptTracker, FixOutcome
from codeframe.core.fix_tracker import (
EscalationDecision,
FixAttemptTracker,
FixOutcome,
build_escalation_question,
)
from codeframe.core.stall_detector import StallAction, StallDetectedError
from codeframe.core.stall_monitor import StallEvent, StallMonitor
from codeframe.core.models import AgentPhase, CompletionEvent, ErrorEvent, ProgressEvent
Expand Down Expand Up @@ -1029,17 +1034,8 @@ def _create_escalation_blocker(
the run record to the blocker. If creation fails the exception
propagates — callers in ``run()`` catch it and return FAILED.
"""
context = self.fix_tracker.get_blocker_context(error)
attempted = context.get("attempted_fixes", [])
attempted_str = "\n".join(f" - {f}" for f in attempted) if attempted else " (none)"

question = (
f"Verification keeps failing and automated fixes are not working.\n\n"
f"Error: {error[:300]}\n\n"
f"Reason for escalation: {escalation.reason}\n\n"
f"Fixes already attempted:\n{attempted_str}\n\n"
f"Total failures in this run: {context.get('total_run_failures', 0)}\n\n"
f"Please investigate and provide guidance."
question = build_escalation_question(
error, escalation.reason, self.fix_tracker,
)
blocker = blockers.create(
workspace=self.workspace,
Expand Down
2 changes: 1 addition & 1 deletion codeframe/core/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ def on_adapter_event(event: AdapterEvent) -> None:

adapter = get_external_adapter(engine)
wrapper = VerificationWrapper(
adapter, workspace, max_correction_rounds=3, verbose=verbose,
adapter, workspace, max_correction_rounds=5, verbose=verbose,
)

result = wrapper.run(
Expand Down
Loading
Loading