hyperflow-wms/execution-sentinel

Execution Sentinel

Event-driven monitor for HyperFlow workflows running on Kubernetes. Polls K8s Jobs and Pods, detects anomalies, and streams structured reports into an asyncio.Queue.
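The poll-detect-stream cycle described above can be pictured as a loop like the one below. This is a minimal sketch under stated assumptions, not Execution Sentinel's code: poll_once is a hypothetical coroutine standing in for one round of kubectl queries, Report is a hypothetical report type, and stopping after CONNECTIVITY_LOSS or COMPLETION is an assumption made for illustration.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class Report:
    # Hypothetical report shape: an event name plus free-form details.
    event: str
    details: dict = field(default_factory=dict)


async def poll_loop(
    poll_once: Callable[[], Awaitable[list[Report]]],
    reports: asyncio.Queue,
    poll_interval: float,
    timeout: float,
    max_consecutive_errors: int = 3,
) -> None:
    """Run poll_once every poll_interval seconds until timeout expires.

    Any exception raised by poll_once counts as a kubectl error. After
    max_consecutive_errors failures in a row, CONNECTIVITY_LOSS is queued
    and the loop stops (an assumption of this sketch).
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    consecutive_errors = 0
    while loop.time() < deadline:
        try:
            new_reports = await poll_once()
        except Exception as exc:
            consecutive_errors += 1
            if consecutive_errors >= max_consecutive_errors:
                await reports.put(Report("CONNECTIVITY_LOSS", {"error": str(exc)}))
                return
        else:
            consecutive_errors = 0  # a successful poll resets the error streak
            for report in new_reports:
                await reports.put(report)
                if report.event == "COMPLETION":
                    return
        await asyncio.sleep(poll_interval)
```

Because the queue is unbounded, put() never blocks, so detection keeps running at its own pace regardless of how quickly reports are read.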

What it detects

Event               Description
PROGRESS            Change in completed task count; DAG phase transitions (started / completed)
ANOMALY_OOMKILL     Pod terminated with reason OOMKilled; also a proactive risk warning for other active jobs of the same task type
ANOMALY_STRAGGLER   Job running longer than straggler_multiplier × its task's expected_duration_seconds
MASS_FAILURE        Failed-task ratio exceeded mass_failure_threshold
CONNECTIVITY_LOSS   Three consecutive kubectl errors
COMPLETION          Workflow finished (.workflow-exit-code signal file detected in the engine pod)
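The threshold rules in the table reduce to simple comparisons. The sketch below restates them with the documented defaults (straggler_multiplier = 3.0, mass_failure_threshold = 0.3). The ActiveJob type and all function names are hypothetical, and the strict > comparisons are an assumption based on the wording "longer than" and "exceeded".

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class ActiveJob:
    # Hypothetical view of a running K8s Job, for illustration only.
    name: str
    task_type: str
    elapsed_seconds: float
    expected_duration_seconds: float


def is_straggler(job: ActiveJob, straggler_multiplier: float = 3.0) -> bool:
    """ANOMALY_STRAGGLER: running longer than multiplier x expected duration."""
    return job.elapsed_seconds > straggler_multiplier * job.expected_duration_seconds


def is_mass_failure(failed: int, total: int, mass_failure_threshold: float = 0.3) -> bool:
    """MASS_FAILURE: the failed/total ratio exceeds the threshold."""
    if total == 0:
        return False  # no tasks yet, so no ratio to evaluate
    return failed / total > mass_failure_threshold


def oom_risk_targets(oom_job: ActiveJob, active_jobs: list[ActiveJob]) -> list[str]:
    """Other active jobs sharing the task type of the job whose pod was OOMKilled."""
    return [
        job.name
        for job in active_jobs
        if job.task_type == oom_job.task_type and job.name != oom_job.name
    ]
```

For example, a sifting job expected to take 45 s becomes a straggler once it has run for more than 3.0 × 45 = 135 s.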

Installation

cd execution-sentinel
pip install -e .

With dev dependencies (pytest, mypy, ruff):

pip install -e ".[dev]"

Usage as a library

workflow-conductor uses Execution Sentinel in its monitoring phase to watch a running HyperFlow workflow; it translates the resulting reports into natural language and displays them to the user.

import asyncio

from execution_sentinel.config import SentinelSettings
from execution_sentinel.models import MonitoringContext, TaskSpec
from execution_sentinel.sentinel import Sentinel

context = MonitoringContext(
    namespace="wf-1000g-20260221",
    engine_pod_name="hyperflow-engine-0",
    engine_container="hyperflow",
    task_inventory=[
        TaskSpec(name="sifting-0", task_type="sifting", dag_order=2, expected_duration_seconds=45.0),
        TaskSpec(name="frequency-0", task_type="frequency", dag_order=4, expected_duration_seconds=15.0),
    ],
    dag_structure={"sifting": [], "frequency": ["sifting"]},
)


async def main() -> None:
    settings = SentinelSettings(poll_interval=10, timeout=3600)
    sentinel = Sentinel(context, settings)

    # Reports are streamed into sentinel.reports (an asyncio.Queue) while watch() runs
    summary = await sentinel.watch()
    print(f"Completed: {summary.completed_tasks}/{summary.total_tasks}")


# watch() is a coroutine, so it has to run inside an event loop
asyncio.run(main())
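Because watch() only returns once monitoring ends, reports pile up in sentinel.reports unless something drains the queue while it runs. A common asyncio pattern is to run a consumer task alongside watch(). The sketch below uses a hypothetical FakeSentinel that only mimics what the example above relies on (a reports queue and an async watch() returning a summary with completed_tasks and total_tasks); the string reports are placeholders, since the real report type is not documented here. With the library installed, Sentinel(context, settings) would take FakeSentinel's place.

```python
import asyncio
import contextlib
from dataclasses import dataclass


@dataclass
class Summary:
    # Only the two fields printed in the README example.
    completed_tasks: int
    total_tasks: int


class FakeSentinel:
    """Hypothetical stand-in exposing .reports and an async .watch()."""

    def __init__(self) -> None:
        self.reports: asyncio.Queue = asyncio.Queue()

    async def watch(self) -> Summary:
        # Emit a few placeholder reports, yielding to the event loop in between.
        for done in (1, 2):
            await self.reports.put(f"PROGRESS {done}/2")
            await asyncio.sleep(0)
        await self.reports.put("COMPLETION")
        return Summary(completed_tasks=2, total_tasks=2)


async def consume(queue: asyncio.Queue, handled: list) -> None:
    # Handle reports as they arrive; a real consumer might translate or display them.
    while True:
        report = await queue.get()
        handled.append(report)
        queue.task_done()


async def main() -> tuple:
    sentinel = FakeSentinel()  # with the library: Sentinel(context, settings)
    handled: list = []
    consumer = asyncio.create_task(consume(sentinel.reports, handled))
    summary = await sentinel.watch()
    await sentinel.reports.join()  # wait until every queued report was handled
    consumer.cancel()
    with contextlib.suppress(asyncio.CancelledError):
        await consumer
    return summary, handled


if __name__ == "__main__":
    summary, handled = asyncio.run(main())
    print(handled)
    print(f"Completed: {summary.completed_tasks}/{summary.total_tasks}")
```

Calling join() before cancelling the consumer ensures that reports queued just before watch() returned, such as the final COMPLETION, are still handled.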

CLI

# From a JSON file
execution-sentinel watch --context context.json

# From stdin
cat context.json | execution-sentinel watch --context -

# Override poll interval and timeout
execution-sentinel watch --context context.json --poll 5 --timeout 7200
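The --context - form follows the usual CLI convention of treating - as standard input. A hypothetical loader with that behaviour might look like the following. Treating all five top-level fields of the example context.json as required is an assumption; the real MonitoringContext model may supply defaults for some of them.

```python
import json
import sys
from typing import Optional, TextIO

# Top-level fields shown in the example context.json; treating all of them
# as required is an assumption of this sketch.
REQUIRED_FIELDS = {"namespace", "engine_pod_name", "engine_container", "task_inventory", "dag_structure"}


def load_context(path: str, stdin: Optional[TextIO] = None) -> dict:
    """Hypothetical loader: '-' reads JSON from stdin, anything else is a file path."""
    if path == "-":
        data = json.load(stdin if stdin is not None else sys.stdin)
    else:
        with open(path, encoding="utf-8") as fh:
            data = json.load(fh)
    missing = REQUIRED_FIELDS - set(data)
    if missing:
        raise ValueError(f"context is missing fields: {sorted(missing)}")
    return data
```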

context.json is a serialized MonitoringContext:

{
  "namespace": "wf-1000g-20260221",
  "engine_pod_name": "hyperflow-engine-0",
  "engine_container": "hyperflow",
  "task_inventory": [
    {"name": "sifting-0", "task_type": "sifting", "dag_order": 2, "expected_duration_seconds": 45.0}
  ],
  "dag_structure": {"sifting": []}
}
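dag_structure appears to map each task type to the task types it depends on (in the library example, frequency lists sifting, and its dag_order is higher). Assuming that reading, Python's standard graphlib module can derive a phase order and reject cycles before monitoring starts. phase_order and missing_from_dag are hypothetical helpers, not part of Execution Sentinel.

```python
from graphlib import CycleError, TopologicalSorter


def phase_order(dag_structure: dict) -> list:
    """Order task types so each comes after the types it depends on.

    Assumes dag_structure maps a task type to its upstream task types,
    e.g. {"sifting": [], "frequency": ["sifting"]}.
    """
    try:
        return list(TopologicalSorter(dag_structure).static_order())
    except CycleError as exc:
        # CycleError's second argument is the list of nodes forming the cycle.
        raise ValueError(f"dag_structure has a cycle: {exc.args[1]}") from exc


def missing_from_dag(dag_structure: dict, task_inventory: list) -> set:
    """Task types used in task_inventory that dag_structure does not mention."""
    return {task["task_type"] for task in task_inventory} - set(dag_structure)
```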

Configuration

Settings are read from environment variables (prefix SENTINEL_) or from a .env file:

Variable                          Default   Description
SENTINEL_POLL_INTERVAL            10        Polling interval in seconds
SENTINEL_TIMEOUT                  3600      Maximum watch duration in seconds
SENTINEL_STRAGGLER_MULTIPLIER     3.0       Straggler threshold (× expected duration)
SENTINEL_MASS_FAILURE_THRESHOLD   0.3       Failed-task ratio above which MASS_FAILURE is raised
SENTINEL_LOG_LEVEL                INFO      Log level
SENTINEL_KUBECONFIG               ""        Path to kubeconfig (empty = default)
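Each variable is the SENTINEL_ prefix plus the upper-cased setting name (poll_interval becomes SENTINEL_POLL_INTERVAL). The stdlib sketch below shows that mapping with the defaults from the table. It is a hypothetical illustration rather than how SentinelSettings is implemented: it ignores .env files, and the int/float types chosen for each field are assumptions.

```python
import os
from typing import Mapping

# (type, default) per setting; defaults come from the table above,
# the types are assumptions and the real SentinelSettings may differ.
SETTINGS_SPEC = {
    "poll_interval": (int, 10),
    "timeout": (int, 3600),
    "straggler_multiplier": (float, 3.0),
    "mass_failure_threshold": (float, 0.3),
    "log_level": (str, "INFO"),
    "kubeconfig": (str, ""),
}


def load_settings(env: Mapping[str, str] = os.environ, prefix: str = "SENTINEL_") -> dict:
    """Hypothetical loader: read prefix + NAME for each setting, else use its default."""
    settings = {}
    for name, (cast, default) in SETTINGS_SPEC.items():
        raw = env.get(prefix + name.upper())
        settings[name] = default if raw is None else cast(raw)
    return settings
```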

Tests

# Unit tests — no cluster required
pytest tests/unit/ -v

# Integration tests — require a running cluster
pytest tests/integration/ -v
