Event-driven monitor for HyperFlow workflows running on Kubernetes. Polls K8s Jobs and Pods, detects anomalies, and streams structured reports into an asyncio.Queue.
| Event | Description |
|---|---|
| `PROGRESS` | Change in completed task count; DAG phase transitions (started / completed) |
| `ANOMALY_OOMKILL` | Pod killed by OOMKill; proactive risk warning for other active jobs of the same type |
| `ANOMALY_STRAGGLER` | Job running longer than `straggler_multiplier` × `expected_duration` |
| `MASS_FAILURE` | Failed task ratio exceeded `mass_failure_threshold` |
| `CONNECTIVITY_LOSS` | Three consecutive kubectl errors |
| `COMPLETION` | Workflow finished (`.workflow-exit-code` signal file detected in the engine pod) |
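The straggler and mass-failure rules above reduce to simple threshold checks. A minimal sketch of that logic, using hypothetical helper names and a stand-in task record (the actual detector lives inside Sentinel):

```python
from dataclasses import dataclass


@dataclass
class RunningTask:
    """Stand-in for a task observed mid-run; field names are illustrative."""
    name: str
    elapsed_seconds: float
    expected_duration_seconds: float


def is_straggler(task: RunningTask, straggler_multiplier: float = 3.0) -> bool:
    # ANOMALY_STRAGGLER: runtime exceeds multiplier × expected duration
    return task.elapsed_seconds > straggler_multiplier * task.expected_duration_seconds


def is_mass_failure(failed: int, total: int, mass_failure_threshold: float = 0.3) -> bool:
    # MASS_FAILURE: failed task ratio exceeds the configured threshold
    return total > 0 and failed / total > mass_failure_threshold
```

With the defaults above, a `sifting` task expected to take 45 s is flagged once it passes 135 s, and 4 failures out of 10 tasks (ratio 0.4) trip the mass-failure escalation.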
```bash
cd execution-sentinel
pip install -e .
```

With dev dependencies (pytest, mypy, ruff):

```bash
pip install -e ".[dev]"
```

workflow-conductor uses Execution Sentinel in its monitoring phase to watch a running HyperFlow workflow, translate reports to natural language, and display them to the user.
```python
from execution_sentinel.config import SentinelSettings
from execution_sentinel.models import MonitoringContext, TaskSpec
from execution_sentinel.sentinel import Sentinel

context = MonitoringContext(
    namespace="wf-1000g-20260221",
    engine_pod_name="hyperflow-engine-0",
    engine_container="hyperflow",
    task_inventory=[
        TaskSpec(name="sifting-0", task_type="sifting", dag_order=2, expected_duration_seconds=45.0),
        TaskSpec(name="frequency-0", task_type="frequency", dag_order=4, expected_duration_seconds=15.0),
    ],
    dag_structure={"sifting": [], "frequency": ["sifting"]},
)

settings = SentinelSettings(poll_interval=10, timeout=3600)
sentinel = Sentinel(context, settings)

# Reports are streamed into sentinel.reports (asyncio.Queue)
summary = await sentinel.watch()
print(f"Completed: {summary.completed_tasks}/{summary.total_tasks}")
```
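Because `watch()` runs to completion while reports accumulate in `sentinel.reports`, a consumer coroutine is typically scheduled alongside it. The sketch below shows that generic producer/consumer pattern with a plain `asyncio.Queue` and a stand-in producer; the string events and the `None` terminator are illustrative assumptions, not part of the Sentinel API:

```python
import asyncio


async def consume(reports: asyncio.Queue) -> list:
    """Drain reports as they arrive until a None terminator is seen."""
    seen = []
    while True:
        report = await reports.get()
        if report is None:  # stand-in signal for "the watch has finished"
            break
        seen.append(report)
    return seen


async def main() -> list:
    reports: asyncio.Queue = asyncio.Queue()

    async def fake_watch():
        # Stand-in for sentinel.watch(): emits a few reports, then signals completion
        for event in ["PROGRESS", "ANOMALY_STRAGGLER", "COMPLETION"]:
            await reports.put(event)
        await reports.put(None)

    # Run producer and consumer concurrently, as you would with sentinel.watch()
    _, seen = await asyncio.gather(fake_watch(), consume(reports))
    return seen
```

In real use the producer side is `sentinel.watch()` itself, and the consumer would translate or display each report instead of collecting it.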
```bash
# From a JSON file
execution-sentinel watch --context context.json

# From stdin
cat context.json | execution-sentinel watch --context -

# Override poll interval and timeout
execution-sentinel watch --context context.json --poll 5 --timeout 7200
```

`context.json` is a serialized `MonitoringContext`:
```json
{
  "namespace": "wf-1000g-20260221",
  "engine_pod_name": "hyperflow-engine-0",
  "engine_container": "hyperflow",
  "task_inventory": [
    {"name": "sifting-0", "task_type": "sifting", "dag_order": 2, "expected_duration_seconds": 45.0}
  ],
  "dag_structure": {"sifting": []}
}
```

Configuration is via environment variables (prefix `SENTINEL_`) or a `.env` file:
| Variable | Default | Description |
|---|---|---|
| `SENTINEL_POLL_INTERVAL` | `10` | Polling interval in seconds |
| `SENTINEL_TIMEOUT` | `3600` | Maximum watch duration in seconds |
| `SENTINEL_STRAGGLER_MULTIPLIER` | `3.0` | Straggler threshold (× expected duration) |
| `SENTINEL_MASS_FAILURE_THRESHOLD` | `0.3` | Failed task ratio to trigger escalation |
| `SENTINEL_LOG_LEVEL` | `INFO` | Log level |
| `SENTINEL_KUBECONFIG` | `""` | Path to kubeconfig (empty = default) |
```bash
# Unit tests — no cluster required
pytest tests/unit/ -v

# Integration tests — require a running cluster
pytest tests/integration/ -v
```