From 89c5d75e69f7c560dedc8f2e8f300157465000cb Mon Sep 17 00:00:00 2001 From: Marcel Date: Sat, 7 Feb 2026 19:00:04 +0100 Subject: [PATCH] feat: add stuck agent detection with 20-minute inactivity timeout Agents that hang without producing output are now automatically detected and killed after 20 minutes of inactivity. This prevents features from being stuck indefinitely when an agent hangs. Changes: - Add AGENT_INACTIVITY_TIMEOUT constant (1200 seconds) - Track last activity timestamp per agent - Kill and restart agents with no output for 20+ minutes - Clean up tracking on agent completion Co-Authored-By: Claude Opus 4.6 --- parallel_orchestrator.py | 80 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 80 insertions(+) diff --git a/parallel_orchestrator.py b/parallel_orchestrator.py index 856e33cb..cb6626ea 100644 --- a/parallel_orchestrator.py +++ b/parallel_orchestrator.py @@ -27,6 +27,7 @@ import subprocess import sys import threading +import time from datetime import datetime, timezone from pathlib import Path from typing import Any, Callable, Literal @@ -135,6 +136,7 @@ def _dump_database_state(feature_dicts: list[dict], label: str = ""): POLL_INTERVAL = 5 # seconds between checking for ready features MAX_FEATURE_RETRIES = 3 # Maximum times to retry a failed feature INITIALIZER_TIMEOUT = 1800 # 30 minutes timeout for initializer +AGENT_INACTIVITY_TIMEOUT = 1200 # 20 minutes - kill agents with no output activity class ParallelOrchestrator: @@ -199,6 +201,10 @@ def __init__( # Track feature failures to prevent infinite retry loops self._failure_counts: dict[int, int] = {} + # Track last activity time per agent for stuck detection + # Updated whenever an agent produces output + self._last_activity: dict[int, float] = {} + # Track recently tested feature IDs to avoid redundant re-testing. # Cleared when all passing features have been covered at least once. self._recently_tested: set[int] = set() @@ -867,6 +873,8 @@ def _spawn_coding_agent(self, feature_id: int) -> tuple[bool, str]: with self._lock: self.running_coding_agents[feature_id] = proc self.abort_events[feature_id] = abort_event + # Initialize activity timestamp for stuck detection + self._last_activity[feature_id] = time.time() # Start output reader thread threading.Thread( @@ -1027,6 +1035,8 @@ def _spawn_testing_agent(self) -> tuple[bool, str]: # when multiple agents test the same feature self.running_testing_agents[proc.pid] = (primary_feature_id, proc) testing_count = len(self.running_testing_agents) + # Initialize activity timestamp for stuck detection (negative PID to distinguish from coding) + self._last_activity[-proc.pid] = time.time() # Start output reader thread with primary feature ID for log attribution threading.Thread( @@ -1139,6 +1149,11 @@ def _read_output( if abort.is_set(): break line = line.rstrip() + # Update activity timestamp for stuck detection + if feature_id is not None: + activity_key = -proc.pid if agent_type == "testing" else feature_id + with self._lock: + self._last_activity[activity_key] = time.time() # Detect when a batch agent claims a new feature claim_match = self._CLAIM_FEATURE_PATTERN.search(line) if claim_match: @@ -1206,6 +1221,59 @@ async def _wait_for_agent_completion(self, timeout: float = POLL_INTERVAL): # Timeout reached without agent completion - this is normal, just check anyway pass + def _check_stuck_agents(self) -> list[int]: + """Check for and kill agents that have been inactive for too long. + + An agent is considered stuck if it hasn't produced any output for + AGENT_INACTIVITY_TIMEOUT seconds. This catches agents that hang without + crashing (e.g., waiting indefinitely for a response). + + Returns: + List of feature IDs that were killed due to inactivity. + """ + current_time = time.time() + killed_features = [] + + with self._lock: + # Check coding agents (keyed by feature_id) + for feature_id, proc in list(self.running_coding_agents.items()): + last_activity = self._last_activity.get(feature_id, current_time) + inactive_seconds = current_time - last_activity + + if inactive_seconds > AGENT_INACTIVITY_TIMEOUT: + inactive_minutes = int(inactive_seconds // 60) + print(f"WARNING: Feature #{feature_id} agent stuck - no output for {inactive_minutes} minutes. Killing...", flush=True) + debug_log.log("STUCK", f"Killing stuck coding agent for feature #{feature_id}", + inactive_minutes=inactive_minutes, + pid=proc.pid) + + # Kill the stuck agent + try: + kill_process_tree(proc, timeout=5.0) + except Exception as e: + debug_log.log("STUCK", f"Error killing stuck agent for feature #{feature_id}", error=str(e)) + + killed_features.append(feature_id) + + # Check testing agents (keyed by PID, activity keyed by -PID) + for pid, (feature_id, proc) in list(self.running_testing_agents.items()): + last_activity = self._last_activity.get(-pid, current_time) + inactive_seconds = current_time - last_activity + + if inactive_seconds > AGENT_INACTIVITY_TIMEOUT: + inactive_minutes = int(inactive_seconds // 60) + print(f"WARNING: Testing agent for feature #{feature_id} stuck - no output for {inactive_minutes} minutes. Killing...", flush=True) + debug_log.log("STUCK", f"Killing stuck testing agent for feature #{feature_id}", + inactive_minutes=inactive_minutes, + pid=proc.pid) + + try: + kill_process_tree(proc, timeout=5.0) + except Exception as e: + debug_log.log("STUCK", f"Error killing stuck testing agent for feature #{feature_id}", error=str(e)) + + return killed_features + def _on_agent_complete( self, feature_id: int | None, @@ -1228,6 +1296,8 @@ def _on_agent_complete( with self._lock: # Remove by PID self.running_testing_agents.pop(proc.pid, None) + # Clean up activity tracking (negative PID key for testing agents) + self._last_activity.pop(-proc.pid, None) status = "completed" if return_code == 0 else "failed" print(f"Feature #{feature_id} testing {status}", flush=True) @@ -1252,6 +1322,8 @@ def _on_agent_complete( self._feature_to_primary.pop(fid, None) self.running_coding_agents.pop(feature_id, None) self.abort_events.pop(feature_id, None) + # Clean up activity tracking + self._last_activity.pop(feature_id, None) all_feature_ids = batch_ids or [feature_id] @@ -1489,6 +1561,14 @@ async def run_loop(self): print("\nAll features complete!", flush=True) break + # Check for stuck agents (no output for AGENT_INACTIVITY_TIMEOUT) + stuck_features = self._check_stuck_agents() + if stuck_features: + debug_log.log("STUCK", f"Killed {len(stuck_features)} stuck agent(s)", + feature_ids=stuck_features) + # Brief pause to allow process cleanup before continuing + await asyncio.sleep(1) + # Maintain testing agents independently (runs every iteration) self._maintain_testing_agents(feature_dicts)