66 changes: 46 additions & 20 deletions sigma-agent/README.md
@@ -145,30 +145,55 @@ On a host with two A100s, `/metrics` will then contain (in addition to the usual
# TYPE sigma_gpu_utilization_percent gauge
sigma_gpu_utilization_percent{hostname="ml-01",gpu_index="0",gpu_name="NVIDIA A100"} 45
sigma_gpu_utilization_percent{hostname="ml-01",gpu_index="1",gpu_name="NVIDIA A100"} 30

# HELP sigma_gpu_memory_used_mib GPU memory used in MiB (nvidia-smi)
# TYPE sigma_gpu_memory_used_mib gauge
sigma_gpu_memory_used_mib{hostname="ml-01",gpu_index="0",gpu_name="NVIDIA A100"} 12000
sigma_gpu_memory_used_mib{hostname="ml-01",gpu_index="1",gpu_name="NVIDIA A100"} 8000

# HELP sigma_gpu_memory_total_mib GPU total memory in MiB (nvidia-smi)
# TYPE sigma_gpu_memory_total_mib gauge
sigma_gpu_memory_total_mib{hostname="ml-01",gpu_index="0",gpu_name="NVIDIA A100"} 81920
sigma_gpu_memory_total_mib{hostname="ml-01",gpu_index="1",gpu_name="NVIDIA A100"} 81920

# HELP sigma_gpu_temperature_celsius GPU temperature in Celsius (nvidia-smi)
# TYPE sigma_gpu_temperature_celsius gauge
sigma_gpu_temperature_celsius{hostname="ml-01",gpu_index="0",gpu_name="NVIDIA A100"} 65
sigma_gpu_temperature_celsius{hostname="ml-01",gpu_index="1",gpu_name="NVIDIA A100"} 60

# HELP sigma_gpu_power_watts GPU power draw in Watts (nvidia-smi)
# TYPE sigma_gpu_power_watts gauge
sigma_gpu_power_watts{hostname="ml-01",gpu_index="0",gpu_name="NVIDIA A100"} 250
sigma_gpu_power_watts{hostname="ml-01",gpu_index="1",gpu_name="NVIDIA A100"} 220
```

The same snapshot is also reachable via the MCP `query_gpu_metrics` tool (see below), which returns `{"enabled": false, "gpus": []}` on agents without `--gpu-metrics`.

## Backend Health Probing

When `--health-probe` is enabled, the agent runs a periodic **TCP-connect probe** against every active Envoy upstream defined in `envoy_routes` for this VPS. This provides **data plane validation independent of Envoy's own health checks**: if Envoy thinks an upstream is healthy but the agent can't reach the backend host:port from the same machine, something is wrong at L3/L4 (firewall, routing, DNS, container networking, backend service down).

### How It Works

1. Every `--health-probe-interval` seconds, the agent fetches active `envoy_nodes` for this VPS, then active `envoy_routes` for each node — reusing the same sigma-api endpoints the xDS subsystem polls.
2. For each route, it performs a **`tokio::net::TcpStream::connect`** against `backend_host:backend_port` wrapped in a `tokio::time::timeout` (per-probe ceiling: **3 seconds**).
3. Records `(reachable, latency_ms, error)` and writes the complete result vector to a shared `Arc<RwLock<Vec<_>>>` consumed by `/metrics` and the MCP tool surface.
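
The probe steps above can be sketched with the standard library alone. This is a blocking illustration under stated assumptions, not the agent's code: the agent uses `tokio::net::TcpStream::connect` wrapped in `tokio::time::timeout`, whereas this sketch substitutes `std::net::TcpStream::connect_timeout`. The names `probe_once` and `probe_all` are hypothetical.

```rust
use std::net::{SocketAddr, TcpStream};
use std::time::{Duration, Instant};

/// Per-probe ceiling from the text above (3 seconds).
const PROBE_TIMEOUT: Duration = Duration::from_secs(3);

/// Hypothetical helper: one blocking TCP-connect probe.
/// Returns `(reachable, latency_ms, error)`, mirroring the tuple in step 3.
fn probe_once(addr: SocketAddr, timeout: Duration) -> (bool, u64, Option<String>) {
    let start = Instant::now();
    let outcome = TcpStream::connect_timeout(&addr, timeout);
    let latency_ms = start.elapsed().as_millis() as u64;
    match outcome {
        // The stream is dropped right away; only connect success matters.
        Ok(_stream) => (true, latency_ms, None),
        Err(e) => (false, latency_ms, Some(e.to_string())),
    }
}

/// Hypothetical helper: probe every backend one at a time, in order.
fn probe_all(backends: &[SocketAddr]) -> Vec<(SocketAddr, bool, u64, Option<String>)> {
    backends
        .iter()
        .map(|&addr| {
            let (reachable, latency_ms, error) = probe_once(addr, PROBE_TIMEOUT);
            (addr, reachable, latency_ms, error)
        })
        .collect()
}
```

Because each probe finishes before the next one starts, at most one outbound connection attempt is in flight at any time.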

### Sequential by design

Routes are probed **one at a time** in a single loop; the agent does **not** spawn one task per backend. The reason is the resource budget: agents run on VPS instances as small as 1 vCPU / 512 MB alongside the primary VPN workload. Fan-out probing of hundreds of upstreams would create unbounded CPU bursts and connection churn, defeating the <1% steady-state CPU target. The trade-off is wall time: with the 3 s ceiling per probe, a cycle over 100 dead backends takes up to 100 × 3 s = 300 s (5 min), far longer than the default 30 s interval. Cycles never overlap, because the loop sleeps for the interval only after a cycle finishes, so on large fleets with many dead backends the snapshot is refreshed every cycle duration plus interval rather than every 30 s.
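
The worst-case timing can be checked with a small sketch. It assumes every probe runs into the per-probe timeout and that the loop sleeps for the interval only after a cycle completes (as `probe_loop` in this PR does); time spent fetching nodes and routes from sigma-api is ignored. Both function names are hypothetical.

```rust
/// Worst-case wall time for one sequential cycle, in seconds:
/// every one of `backends` probes runs into the per-probe timeout.
fn worst_case_cycle_secs(backends: u64, probe_timeout_secs: u64) -> u64 {
    backends * probe_timeout_secs
}

/// Time from the start of one cycle to the start of the next, assuming the
/// loop sleeps for `interval_secs` only after the cycle has finished.
fn worst_case_period_secs(backends: u64, probe_timeout_secs: u64, interval_secs: u64) -> u64 {
    worst_case_cycle_secs(backends, probe_timeout_secs) + interval_secs
}
```

For 100 dead backends with the 3 s ceiling and the default 30 s interval, this gives a 300 s cycle and a 330 s refresh period.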

### Configuration

| Env var | CLI flag | Default | Description |
|---------|----------|---------|-------------|
| `AGENT_HEALTH_PROBE` | `--health-probe` | `false` | Enable backend health probing |
| `AGENT_HEALTH_PROBE_INTERVAL` | `--health-probe-interval` | `30` | Interval between probe cycles (seconds) |

Requires successful VPS registration (`vps_id` known) — if registration failed at startup, the probe loop is not spawned and a warning is logged.

### Prometheus Metrics

Exposed on the existing `/metrics` endpoint when probing is enabled and the first cycle has completed:

```
# HELP sigma_backend_reachable Backend reachability from agent TCP probe (1=reachable, 0=unreachable)
# TYPE sigma_backend_reachable gauge
sigma_backend_reachable{hostname="relay-01",route="vmess-vn01",listen_port="11001",backend="10.0.0.5:443"} 1
sigma_backend_reachable{hostname="relay-01",route="trojan-vn02",listen_port="11002",backend="10.0.0.6:443"} 0

# HELP sigma_backend_probe_latency_ms Time taken to TCP-connect to the backend (milliseconds)
# TYPE sigma_backend_probe_latency_ms gauge
sigma_backend_probe_latency_ms{hostname="relay-01",route="vmess-vn01",listen_port="11001",backend="10.0.0.5:443"} 8
sigma_backend_probe_latency_ms{hostname="relay-01",route="trojan-vn02",listen_port="11002",backend="10.0.0.6:443"} 3001
```

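Unreachable backends still appear, with `sigma_backend_reachable` set to `0` and `sigma_backend_probe_latency_ms` reflecting either the time until the connect failed or the 3 s timeout. Routes with a missing or invalid backend host/port are reported as unreachable with a latency of `0`.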
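
To make the label layout concrete, here is a minimal sketch of how one `sigma_backend_reachable` sample line could be rendered. The function `render_reachable` is hypothetical and is not taken from the agent's `/metrics` handler; label-value escaping is left out for brevity.

```rust
/// Hypothetical renderer for one `sigma_backend_reachable` sample line.
/// Label values are written verbatim; escaping of backslashes, double
/// quotes, and newlines is deliberately omitted in this sketch.
fn render_reachable(
    hostname: &str,
    route: &str,
    listen_port: i32,
    backend: &str,
    reachable: bool,
) -> String {
    format!(
        "sigma_backend_reachable{{hostname=\"{}\",route=\"{}\",listen_port=\"{}\",backend=\"{}\"}} {}",
        hostname,
        route,
        listen_port,
        backend,
        u8::from(reachable) // true -> 1, false -> 0
    )
}
```

Calling it with the values of the first sample (`relay-01`, `vmess-vn01`, `11001`, `10.0.0.5:443`, `true`) reproduces that line.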

### MCP Tool

`query_backend_health` returns the latest snapshot. Pass `unreachable_only: true` to filter to failing probes only — useful for incident triage queries from sigma-api's LLM-assisted workflows.
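
The effect of `unreachable_only` can be sketched as a plain filter over the snapshot. `ProbeEntry` and `select_entries` are hypothetical stand-ins for illustration; the agent's actual result struct carries more fields and its tool handler is not shown in this section.

```rust
/// Minimal stand-in for one snapshot entry (assumption: the real struct
/// has more fields, such as latency and error text).
#[derive(Debug, Clone, PartialEq)]
struct ProbeEntry {
    route_name: String,
    reachable: bool,
}

/// Hypothetical filter: with `unreachable_only = true` keep only failing
/// probes, otherwise return the snapshot unchanged.
fn select_entries(snapshot: &[ProbeEntry], unreachable_only: bool) -> Vec<ProbeEntry> {
    snapshot
        .iter()
        .filter(|e| !unreachable_only || !e.reachable)
        .cloned()
        .collect()
}
```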

## MCP Tool Surface (LLM-callable control plane)

When `--mcp-enabled` is set, the agent runs a [Model Context Protocol](https://modelcontextprotocol.io) server at `POST /mcp` (JSON-RPC 2.0). This exposes agent capabilities as **tools that an external LLM can call** — e.g., an SRE assistant in `sigma-api` invoking `query_ebpf_traffic` during incident triage, or an automation calling `allocate_ports` when provisioning new Envoy routes.
@@ -187,6 +212,7 @@ When `--mcp-enabled` is set, the agent runs a [Model Context Protocol](https://m
| `query_envoy_routes` | `source?` (dynamic/static/all) | Envoy routes for this VPS via sigma-api |
| `query_dns_leaks` | `min_queries?` (default 1) | Processes sending UDP to port 53 |
| `query_gpu_metrics` | none | Per-GPU utilization/memory/temp/power (nvidia-smi) |
| `query_backend_health` | `unreachable_only?` (default false) | Latest TCP-probe snapshot of Envoy upstreams |

Tools that depend on capabilities (port scan, eBPF, registration) return a structured `isError` or `enabled=false` payload when their dependency is not configured — they do not break the MCP session.

8 changes: 8 additions & 0 deletions sigma-agent/src/config.rs
@@ -95,6 +95,14 @@ pub struct Config {
    #[arg(long, env = "AGENT_GPU_METRICS_INTERVAL", default_value = "60")]
    pub gpu_metrics_interval: u64,

    /// Enable backend health probing (TCP connect test on each Envoy upstream)
    #[arg(long, env = "AGENT_HEALTH_PROBE", default_value = "false")]
    pub health_probe: bool,

    /// Health probe interval in seconds
    #[arg(long, env = "AGENT_HEALTH_PROBE_INTERVAL", default_value = "30")]
    pub health_probe_interval: u64,

    /// Enable MCP (Model Context Protocol) server, exposing agent capabilities as LLM-callable tools
    #[arg(long, env = "AGENT_MCP_ENABLED", default_value = "false")]
    pub mcp_enabled: bool,
262 changes: 262 additions & 0 deletions sigma-agent/src/health_probe.rs
@@ -0,0 +1,262 @@
//! Backend health probing for Envoy upstreams.
//!
//! Periodically fetches the active `envoy_routes` for this VPS (via the same
//! sigma-api endpoints the xDS subsystem polls) and performs a **sequential**
//! TCP connect test against each route's `backend_host:backend_port`. Results
//! are written to a shared snapshot consumed by `/metrics` and the MCP tool
//! surface.
//!
//! ## Why sequential?
//!
//! The agent runs on VPS instances as small as 1 vCPU / 512 MB alongside the
//! primary VPN workload. Spawning one task per backend would create unbounded
//! CPU bursts and connection churn on hosts with hundreds of upstreams.
//! Instead, we iterate routes one at a time with a per-probe 3 s ceiling:
//! worst-case wall time for one cycle is `O(N * timeout)`, while CPU use stays
//! ≈ 0% because each probe spends its time awaiting the network.
//!
//! ## Failure model
//!
//! - Per-probe TCP connect timeout is bounded (default 3 s, never higher).
//! - On timeout / connection refused / DNS failure we record `reachable=false`
//! plus the error string and move on. The probe loop never aborts.
//! - This is **data plane validation** — independent of Envoy's own active
//! health checks. If Envoy thinks an upstream is healthy but the agent
//! can't TCP-connect from the same host, something is wrong at L3/L4
//! (firewall, route, DNS, container networking).

use std::sync::Arc;
use std::time::{Duration, Instant};

use serde::Serialize;
use tokio::net::TcpStream;
use tokio::sync::RwLock;
use tokio::time::timeout;
use tracing::{debug, info, warn};

use crate::client::SigmaClient;
use crate::models::{EnvoyNode, EnvoyRoute, PaginatedResponse};

/// Per-probe TCP connect timeout ceiling. The requested timeout is clamped to
/// this maximum (and to a 1 s minimum), because a longer timeout would let one
/// stuck backend block the entire sequential cycle for too long.
const MAX_PROBE_TIMEOUT_SECS: u64 = 3;

/// Snapshot of a single backend probe result.
#[derive(Serialize, Clone, Debug)]
pub struct BackendProbeResult {
    pub route_name: String,
    pub listen_port: i32,
    pub backend: String,
    pub reachable: bool,
    pub latency_ms: u64,
    pub checked_at: chrono::DateTime<chrono::Utc>,
    pub error: Option<String>,
}

/// Shared snapshot of the latest complete probe cycle.
pub type SharedProbeResults = Arc<RwLock<Vec<BackendProbeResult>>>;

/// Perform a single TCP-connect probe against `host:port`.
///
/// Returns `(reachable, latency_ms, error)`:
/// - `reachable=true` on a successful connect within `timeout_secs`.
/// - `reachable=false` with an error string on connect failure, DNS failure,
/// or timeout.
///
/// The timeout is clamped to `MAX_PROBE_TIMEOUT_SECS` regardless of input.
pub async fn probe_backend(
    host: &str,
    port: u16,
    timeout_secs: u64,
) -> (bool, u64, Option<String>) {
    // Clamp to [1, MAX_PROBE_TIMEOUT_SECS]; same result as `.min(MAX).max(1)`.
    let effective_timeout = timeout_secs.clamp(1, MAX_PROBE_TIMEOUT_SECS);
    let addr = format!("{}:{}", host, port);
    let start = Instant::now();

    match timeout(
        Duration::from_secs(effective_timeout),
        TcpStream::connect(&addr),
    )
    .await
    {
        Ok(Ok(_stream)) => {
            // Drop the stream immediately; we only care about connect success.
            let elapsed_ms = start.elapsed().as_millis() as u64;
            (true, elapsed_ms, None)
        }
        Ok(Err(e)) => {
            let elapsed_ms = start.elapsed().as_millis() as u64;
            (false, elapsed_ms, Some(e.to_string()))
        }
        Err(_) => {
            // tokio::time::timeout elapsed
            let elapsed_ms = start.elapsed().as_millis() as u64;
            (
                false,
                elapsed_ms,
                Some(format!("connect timeout after {}s", effective_timeout)),
            )
        }
    }
}

/// Probe loop entry point — runs forever.
///
/// Every `interval_secs`:
/// 1. Fetch active envoy_nodes for `vps_id`.
/// 2. For each node, fetch active envoy_routes.
/// 3. For each route, sequentially TCP-probe `backend_host:backend_port`
/// with the bounded timeout.
/// 4. Replace `shared` with the complete fresh result vector (single write).
///
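/// A failure to fetch the node list logs a warning and skips the cycle,
/// leaving the previous snapshot in place. A failure to fetch one node's
/// routes skips only that node, so that cycle's snapshot omits its routes.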
pub async fn probe_loop(
    client: Arc<SigmaClient>,
    vps_id: uuid::Uuid,
    shared: SharedProbeResults,
    interval_secs: u64,
) {
    info!(
        vps_id = %vps_id,
        interval = interval_secs,
        timeout_secs = MAX_PROBE_TIMEOUT_SECS,
        "Backend health probe loop starting (sequential)"
    );

    loop {
        match run_probe_cycle(&client, vps_id).await {
            Ok(results) => {
                let total = results.len();
                let reachable = results.iter().filter(|r| r.reachable).count();
                debug!(
                    total,
                    reachable,
                    unreachable = total - reachable,
                    "Probe cycle complete"
                );
                let mut guard = shared.write().await;
                *guard = results;
            }
            Err(e) => {
                warn!("Probe cycle failed: {:#}", e);
            }
        }

        tokio::time::sleep(Duration::from_secs(interval_secs)).await;
    }
}

/// Execute one probe cycle: fetch nodes → routes → probe each sequentially.
async fn run_probe_cycle(
    client: &SigmaClient,
    vps_id: uuid::Uuid,
) -> anyhow::Result<Vec<BackendProbeResult>> {
    let nodes: PaginatedResponse<EnvoyNode> = client
        .get(&format!(
            "/envoy-nodes?vps_id={}&status=active&per_page=100",
            vps_id
        ))
        .await?;

    let mut results = Vec::new();

    for node in &nodes.data {
        let routes: PaginatedResponse<EnvoyRoute> = match client
            .get(&format!(
                "/envoy-routes?envoy_node_id={}&status=active&per_page=1000",
                node.id
            ))
            .await
        {
            Ok(r) => r,
            Err(e) => {
                warn!(
                    node_id = %node.node_id,
                    "Failed to fetch routes for envoy node, skipping: {:#}",
                    e
                );
                continue;
            }
        };

        for route in &routes.data {
            // Routes without a usable backend (e.g. malformed entries) are not
            // probed; they are recorded as unreachable with an explanatory error.
            let (host, port) = match (route.backend_host.as_deref(), route.backend_port) {
                (Some(h), Some(p)) if !h.is_empty() && p > 0 && p <= u16::MAX as i32 => {
                    (h, p as u16)
                }
                _ => {
                    results.push(BackendProbeResult {
                        route_name: route.name.clone(),
                        listen_port: route.listen_port,
                        backend: format!(
                            "{}:{}",
                            route.backend_host.as_deref().unwrap_or("-"),
                            route.backend_port.unwrap_or(0)
                        ),
                        reachable: false,
                        latency_ms: 0,
                        checked_at: chrono::Utc::now(),
                        error: Some("missing or invalid backend_host/backend_port".to_string()),
                    });
                    continue;
                }
            };

            // SEQUENTIAL probe, bounded by MAX_PROBE_TIMEOUT_SECS.
            let (reachable, latency_ms, error) =
                probe_backend(host, port, MAX_PROBE_TIMEOUT_SECS).await;

            results.push(BackendProbeResult {
                route_name: route.name.clone(),
                listen_port: route.listen_port,
                backend: format!("{}:{}", host, port),
                reachable,
                latency_ms,
                checked_at: chrono::Utc::now(),
                error,
            });
        }
    }

    Ok(results)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn probe_backend_closed_port_returns_error() {
        // Port 1 is privileged and almost never has a listener on a dev/CI
        // machine, so connect should fail (ECONNREFUSED) or, if the kernel
        // silently drops, time out. Either way we expect reachable=false plus
        // an error message.
        let (reachable, latency_ms, error) = probe_backend("127.0.0.1", 1, 1).await;
        assert!(!reachable, "expected unreachable for closed port 1");
        assert!(error.is_some(), "expected an error message");
        // The timeout is 1 s; allow up to 2 s as generous slack for slow CI.
        assert!(
            latency_ms <= 2_000,
            "latency_ms={} exceeded reasonable bound",
            latency_ms
        );
    }

    #[tokio::test]
    async fn probe_backend_clamps_timeout() {
        // Even if the caller asks for 600s, we cap at MAX_PROBE_TIMEOUT_SECS.
        // Probe a closed port and verify we return well before 600s.
        // Note: when the port refuses the connection immediately, this passes
        // without exercising the clamp; it only catches an unclamped timeout
        // on hosts where the connect attempt is silently dropped.
        let start = Instant::now();
        let (reachable, _, _) = probe_backend("127.0.0.1", 1, 600).await;
        let elapsed = start.elapsed();
        assert!(!reachable);
        assert!(
            elapsed <= Duration::from_secs(MAX_PROBE_TIMEOUT_SECS + 1),
            "probe took {:?}, expected <= {}s",
            elapsed,
            MAX_PROBE_TIMEOUT_SECS + 1
        );
    }
}