Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 51 additions & 7 deletions galaxy/client/support/status_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
"""

import logging
from typing import Dict, List, Optional, Any
import time
from typing import Any, Dict, List, Optional

from ..device_manager import ConstellationDeviceManager
from ..config_loader import ConstellationConfig
Expand Down Expand Up @@ -39,6 +40,10 @@ def __init__(
self.pending_task_tracker = pending_task_tracker or {}
self.logger = logging.getLogger(f"{__name__}.StatusManager")

# Completed task tracking: task_id -> {success, execution_time, completed_at}
self._completed_tasks: Dict[str, Dict[str, Any]] = {}
self._failed_tasks: Dict[str, Dict[str, Any]] = {}

def get_device_status(self, device_id: Optional[str] = None) -> Dict[str, Any]:
"""
Get device status information.
Expand Down Expand Up @@ -165,23 +170,62 @@ def get_device_health_summary(self) -> Dict[str, Any]:

return health_summary

def record_task_completion(
self,
task_id: str,
success: bool,
execution_time: Optional[float] = None,
) -> None:
"""
Record the outcome of a completed task.

Call this after each task finishes so that get_task_statistics()
can return accurate success rates and timing information.

:param task_id: Unique task identifier
:param success: True if the task succeeded, False if it failed
:param execution_time: Wall-clock seconds the task took, or None if unknown
"""
record: Dict[str, Any] = {
"task_id": task_id,
"completed_at": time.time(),
"execution_time": execution_time,
}
if success:
self._completed_tasks[task_id] = record
else:
self._failed_tasks[task_id] = record

def get_task_statistics(self) -> Dict[str, Any]:
"""
Get task execution statistics.

:return: Task statistics
"""
# Note: This is a basic implementation. In a full system, you'd track
# completed tasks, success rates, execution times, etc.
total_done = len(self._completed_tasks) + len(self._failed_tasks)
success_rate = (
len(self._completed_tasks) / total_done if total_done > 0 else 0.0
)

execution_times = [
r["execution_time"]
for r in list(self._completed_tasks.values())
+ list(self._failed_tasks.values())
if r.get("execution_time") is not None
]
average_execution_time = (
sum(execution_times) / len(execution_times) if execution_times else 0.0
)

return {
"pending_tasks": len(self.pending_task_tracker),
"task_queue_health": (
"healthy" if len(self.pending_task_tracker) < 100 else "overloaded"
),
# TODO: Add completed task tracking
"completed_tasks": 0,
"success_rate": 0.0,
"average_execution_time": 0.0,
"completed_tasks": len(self._completed_tasks),
"failed_tasks": len(self._failed_tasks),
"success_rate": round(success_rate, 3),
"average_execution_time": round(average_execution_time, 3),
}

def get_performance_metrics(self) -> Dict[str, Any]:
Expand Down