diff --git a/galaxy/client/support/status_manager.py b/galaxy/client/support/status_manager.py index a5b149c0e..3fd50c8fd 100644 --- a/galaxy/client/support/status_manager.py +++ b/galaxy/client/support/status_manager.py @@ -9,7 +9,8 @@ """ import logging -from typing import Dict, List, Optional, Any +import time +from typing import Any, Dict, List, Optional from ..device_manager import ConstellationDeviceManager from ..config_loader import ConstellationConfig @@ -39,6 +40,10 @@ def __init__( self.pending_task_tracker = pending_task_tracker or {} self.logger = logging.getLogger(f"{__name__}.StatusManager") + # Completed task tracking: task_id -> {success, execution_time, completed_at} + self._completed_tasks: Dict[str, Dict[str, Any]] = {} + self._failed_tasks: Dict[str, Dict[str, Any]] = {} + def get_device_status(self, device_id: Optional[str] = None) -> Dict[str, Any]: """ Get device status information. @@ -165,23 +170,62 @@ def get_device_health_summary(self) -> Dict[str, Any]: return health_summary + def record_task_completion( + self, + task_id: str, + success: bool, + execution_time: Optional[float] = None, + ) -> None: + """ + Record the outcome of a completed task. + + Call this after each task finishes so that get_task_statistics() + can return accurate success rates and timing information. + + :param task_id: Unique task identifier + :param success: True if the task succeeded, False if it failed + :param execution_time: Wall-clock seconds the task took, or None if unknown + """ + record: Dict[str, Any] = { + "task_id": task_id, + "completed_at": time.time(), + "execution_time": execution_time, + } + if success: + self._completed_tasks[task_id] = record + else: + self._failed_tasks[task_id] = record + def get_task_statistics(self) -> Dict[str, Any]: """ Get task execution statistics. :return: Task statistics """ - # Note: This is a basic implementation. In a full system, you'd track - # completed tasks, success rates, execution times, etc. + total_done = len(self._completed_tasks) + len(self._failed_tasks) + success_rate = ( + len(self._completed_tasks) / total_done if total_done > 0 else 0.0 + ) + + execution_times = [ + r["execution_time"] + for r in list(self._completed_tasks.values()) + + list(self._failed_tasks.values()) + if r.get("execution_time") is not None + ] + average_execution_time = ( + sum(execution_times) / len(execution_times) if execution_times else 0.0 + ) + return { "pending_tasks": len(self.pending_task_tracker), "task_queue_health": ( "healthy" if len(self.pending_task_tracker) < 100 else "overloaded" ), - # TODO: Add completed task tracking - "completed_tasks": 0, - "success_rate": 0.0, - "average_execution_time": 0.0, + "completed_tasks": len(self._completed_tasks), + "failed_tasks": len(self._failed_tasks), + "success_rate": round(success_rate, 3), + "average_execution_time": round(average_execution_time, 3), } def get_performance_metrics(self) -> Dict[str, Any]: