Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions sigma-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,49 @@ Unreachable backends still appear with `reachable=0` and a `latency_ms` reflecti

`query_backend_health` returns the latest snapshot. Pass `unreachable_only: true` to filter to failing probes only — useful for incident triage queries from sigma-api's LLM-assisted workflows.

## Self-Update Watchdog

This watchdog **DETECTS** available updates. It **never** downloads, restarts, or applies anything. Operators are notified via Prometheus + MCP and act consciously.

The rationale is operator safety: a misconfigured auto-updater on a VPN fleet can take every node offline simultaneously. Detection-only keeps the human in the loop — the agent merely signals that a newer version exists; the operator chooses when and how to roll out.

### Configuration

| Env var | CLI flag | Default | Description |
|---------|----------|---------|-------------|
| `AGENT_UPDATE_CHECK` | `--update-check` | `false` | Enable the update-detection watchdog |
| `AGENT_UPDATE_MANIFEST_URL` | `--update-manifest-url` | `https://lai3d.github.io/sigma/agent-version.json` | URL of the version manifest JSON |
| `AGENT_UPDATE_CHECK_INTERVAL` | `--update-check-interval` | `3600` | Poll interval in seconds (default 1 hour) |

### Manifest JSON schema

```json
{
"version": "3.2.0",
"binary_url": "https://example.com/sigma-agent-3.2.0",
"sha256": "abc123def456..."
}
```

The agent compares `version` against its compiled-in `CARGO_PKG_VERSION` using component-wise integer parse on `.` (e.g. `3.2.0` vs `3.2.1`). Anything unparseable, or a component-count mismatch (`3.2` vs `3.2.0`), is treated as "no update" rather than guessing.

### Prometheus metrics

Exposed on the existing `/metrics` endpoint when enabled:

- `sigma_agent_update_available{hostname,current_version,latest_version}` — `1` if an update is detected, `0` otherwise. `0` also means "unknown" after a network or parse failure (graceful degradation).
- `sigma_agent_update_last_check_timestamp{hostname}` — Unix timestamp of the last poll attempt (success or failure).

A simple Grafana alert: `max by (hostname) (sigma_agent_update_available) == 1` for the fleet's "agents-behind-latest" panel; `time() - sigma_agent_update_last_check_timestamp > 86400` to alert on stale watchdogs.

### MCP tool

When MCP is also enabled, the tool `agent_check_update` returns the cached snapshot, or — with `{"force": true}` — triggers an immediate manifest poll. The response includes `detection_only: true` to make the contract explicit to LLM callers.

### Graceful degradation

Network errors, non-2xx responses, and malformed JSON all set `last_error`, force `update_available = false` in the gauge, and log a `warn!`. The watchdog never panics and never blocks the heartbeat loop. HTTP timeout is 5 seconds per check.

## MCP Tool Surface (LLM-callable control plane)

When `--mcp-enabled` is set, the agent runs a [Model Context Protocol](https://modelcontextprotocol.io) server at `POST /mcp` (JSON-RPC 2.0). This exposes agent capabilities as **tools that an external LLM can call** — e.g., an SRE assistant in `sigma-api` invoking `query_ebpf_traffic` during incident triage, or an automation calling `allocate_ports` when provisioning new Envoy routes.
Expand All @@ -213,6 +256,7 @@ When `--mcp-enabled` is set, the agent runs a [Model Context Protocol](https://m
| `query_dns_leaks` | `min_queries?` (default 1) | Processes sending UDP to port 53 |
| `query_gpu_metrics` | none | Per-GPU utilization/memory/temp/power (nvidia-smi) |
| `query_backend_health` | `unreachable_only?` (default false) | Latest TCP-probe snapshot of Envoy upstreams |
| `agent_check_update` | `force?` (bool) | Detection-only update check — cached snapshot, or forced poll |

Tools that depend on capabilities (port scan, eBPF, registration) return a structured `isError` or `enabled=false` payload when their dependency is not configured — they do not break the MCP session.

Expand Down
12 changes: 12 additions & 0 deletions sigma-agent/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ pub struct Config {
#[arg(long, env = "AGENT_HEALTH_PROBE_INTERVAL", default_value = "30")]
pub health_probe_interval: u64,

/// Enable update-check watchdog (DETECTION ONLY — never auto-applies)
#[arg(long, env = "AGENT_UPDATE_CHECK", default_value = "false")]
pub update_check: bool,

/// URL of the version manifest JSON
#[arg(long, env = "AGENT_UPDATE_MANIFEST_URL", default_value = "https://lai3d.github.io/sigma/agent-version.json")]
pub update_manifest_url: String,

/// Update check interval in seconds (default 1 hour)
#[arg(long, env = "AGENT_UPDATE_CHECK_INTERVAL", default_value = "3600")]
pub update_check_interval: u64,

/// Enable MCP (Model Context Protocol) server — exposes agent capabilities as LLM-callable tools
#[arg(long, env = "AGENT_MCP_ENABLED", default_value = "false")]
pub mcp_enabled: bool,
Expand Down
38 changes: 34 additions & 4 deletions sigma-agent/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ mod metrics;
mod models;
mod port_scan;
mod system;
mod watchdog;
mod xds;
mod xds_resources;

Expand Down Expand Up @@ -116,8 +117,30 @@ async fn main() -> Result<()> {
None
};

// Conditionally start self-update detection watchdog (DETECTION ONLY — never auto-applies)
let update_info: Option<watchdog::SharedUpdateInfo> = if config.update_check {
let shared = Arc::new(RwLock::new(watchdog::UpdateInfo::new(
env!("CARGO_PKG_VERSION").to_string(),
)));
let manifest_url = config.update_manifest_url.clone();
let interval = config.update_check_interval;
let current = env!("CARGO_PKG_VERSION").to_string();
let task_shared = shared.clone();
info!(
manifest_url = %manifest_url,
interval_secs = interval,
"Update-detection watchdog enabled (DETECTION ONLY — never auto-applies)"
);
tokio::spawn(async move {
watchdog::watchdog_loop(task_shared, current, manifest_url, interval).await;
});
Some(shared)
} else {
None
};

// Metrics server is spawned after registration + health-probe setup so it
// can include both gpu_metrics and probe_results in its rendering state.
// can include gpu_metrics, probe_results, and update_info in its rendering state.

// Fetch public IP once at startup (the IP the world sees via default route)
let public_ip = match system::fetch_public_ip().await {
Expand Down Expand Up @@ -197,17 +220,18 @@ async fn main() -> Result<()> {
None
};

// Conditionally start metrics server (now that both gpu_metrics and
// probe_results are bound).
// Conditionally start metrics server (now that gpu_metrics, probe_results,
// and update_info are all bound).
if config.metrics_port > 0 {
let shared = scan_result.clone();
let port = config.metrics_port;
let hn = hostname.clone();
let ts = traffic_stats.clone();
let gm = gpu_metrics.clone();
let pr = probe_results.clone();
let ui = update_info.clone();
tokio::spawn(async move {
metrics::serve_metrics(port, shared, hn, port_range, ts, gm, pr).await;
metrics::serve_metrics(port, shared, hn, port_range, ts, gm, pr, ui).await;
});
}

Expand Down Expand Up @@ -262,6 +286,12 @@ async fn main() -> Result<()> {
traffic_stats: traffic_stats.clone(),
gpu_metrics: gpu_metrics.clone(),
probe_results: probe_results.clone(),
update_info: update_info.clone(),
update_manifest_url: if config.update_check {
Some(config.update_manifest_url.clone())
} else {
None
},
});
let bind = config.mcp_bind.clone();
info!(bind = %bind, "MCP server enabled");
Expand Down
61 changes: 61 additions & 0 deletions sigma-agent/src/mcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::gpu::SharedGpuMetrics;
use crate::health_probe::SharedProbeResults;
use crate::port_scan::{self, SharedScanResult};
use crate::system;
use crate::watchdog::{self, SharedUpdateInfo};

#[cfg(feature = "ebpf-traffic")]
use crate::ebpf_traffic::SharedTrafficStats;
Expand Down Expand Up @@ -125,6 +126,9 @@ pub struct McpState {
pub traffic_stats: Option<SharedTrafficStats>,
pub gpu_metrics: Option<SharedGpuMetrics>,
pub probe_results: Option<SharedProbeResults>,
pub update_info: Option<SharedUpdateInfo>,
/// Manifest URL — needed so `agent_check_update` can run a forced poll.
pub update_manifest_url: Option<String>,
}

// ---------- Tool schemas (returned by tools/list) ----------
Expand Down Expand Up @@ -187,6 +191,20 @@ fn tools_list_response() -> Value {
"additionalProperties": false
}
},
{
"name": "agent_check_update",
"description": "Check the version-manifest URL for an available sigma-agent update. DETECTION ONLY — does not download or apply. Returns current version, latest version, and update_available flag.",
"inputSchema": {
"type": "object",
"properties": {
"force": {
"type": "boolean",
"description": "If true, trigger an immediate poll of the manifest URL instead of returning the cached snapshot."
}
},
"additionalProperties": false
}
},
{
"name": "query_dns_leaks",
"description": "Detect potential DNS leaks: processes that emitted UDP packets to port 53 in the last eBPF harvest window. On a VPN node, any process bypassing the tunnel for DNS resolution is a security concern. Returns enabled=false if eBPF traffic monitoring is not configured.",
Expand Down Expand Up @@ -263,6 +281,7 @@ async fn handle_tool_call(state: Arc<McpState>, params: Value) -> Value {
"query_dns_leaks" => tool_query_dns_leaks(&state, args).await,
"query_gpu_metrics" => tool_query_gpu_metrics(&state).await,
"query_backend_health" => tool_query_backend_health(&state, args).await,
"agent_check_update" => tool_agent_check_update(&state, args).await,
other => return tool_err(format!("unknown tool: {}", other)),
};

Expand Down Expand Up @@ -558,6 +577,48 @@ async fn tool_query_backend_health(state: &McpState, args: Value) -> Result<Stri
.map_err(|e| e.to_string())
}

async fn tool_agent_check_update(state: &McpState, args: Value) -> Result<String, String> {
let Some(ref shared) = state.update_info else {
return Ok(json!({"enabled": false}).to_string());
};

let force = args
.get("force")
.and_then(|v| v.as_bool())
.unwrap_or(false);

let info = if force {
let Some(ref url) = state.update_manifest_url else {
return Ok(json!({
"enabled": false,
"note": "manifest URL not configured"
})
.to_string());
};
watchdog::check_once(
shared.clone(),
state.agent_version.to_string(),
url.clone(),
)
.await
} else {
shared.read().await.clone()
};

serde_json::to_string_pretty(&json!({
"enabled": true,
"detection_only": true,
"current_version": info.current_version,
"latest_version": info.latest_version,
"update_available": info.update_available,
"binary_url": info.binary_url,
"sha256": info.sha256,
"last_checked": info.last_checked.to_rfc3339(),
"last_error": info.last_error,
}))
.map_err(|e| e.to_string())
}

// ---------- HTTP handler ----------

async fn mcp_handler(
Expand Down
60 changes: 60 additions & 0 deletions sigma-agent/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tracing::{error, info};
use crate::gpu::{GpuMetrics, SharedGpuMetrics};
use crate::health_probe::{BackendProbeResult, SharedProbeResults};
use crate::port_scan::{self, PortScanResult, SharedScanResult};
use crate::watchdog::{SharedUpdateInfo, UpdateInfo};

#[cfg(feature = "ebpf-traffic")]
use crate::ebpf_traffic::SharedTrafficStats;
Expand All @@ -37,6 +38,7 @@ struct MetricsState {
traffic_stats: Option<SharedTrafficStats>,
gpu_metrics: Option<SharedGpuMetrics>,
probe_results: Option<SharedProbeResults>,
update_info: Option<SharedUpdateInfo>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -562,6 +564,56 @@ pub async fn render_probe_metrics(results: &[BackendProbeResult], hostname: &str
out
}

/// Render self-update watchdog metrics in Prometheus text format.
///
/// Emits two gauges:
/// - `sigma_agent_update_available{hostname,current_version,latest_version}`
/// — 1 if an update is detected, 0 otherwise (including the "unknown"
/// state after a network/parse failure).
/// - `sigma_agent_update_last_check_timestamp{hostname}` — unix timestamp
/// of the last check attempt (success or failure).
pub async fn render_update_metrics(info: &UpdateInfo, hostname: &str) -> String {
let mut out = String::with_capacity(512);

let latest = info.latest_version.as_deref().unwrap_or("unknown");
let available = if info.update_available { 1 } else { 0 };

writeln!(
out,
"# HELP sigma_agent_update_available Whether a sigma-agent update is detected (DETECTION ONLY — agent never auto-applies; 0 also means unknown after a check failure)"
)
.unwrap();
writeln!(out, "# TYPE sigma_agent_update_available gauge").unwrap();
writeln!(
out,
"sigma_agent_update_available{{hostname=\"{}\",current_version=\"{}\",latest_version=\"{}\"}} {}",
hostname, info.current_version, latest, available
)
.unwrap();

writeln!(out).unwrap();

writeln!(
out,
"# HELP sigma_agent_update_last_check_timestamp Unix timestamp of the last update check (success or failure)"
)
.unwrap();
writeln!(
out,
"# TYPE sigma_agent_update_last_check_timestamp gauge"
)
.unwrap();
writeln!(
out,
"sigma_agent_update_last_check_timestamp{{hostname=\"{}\"}} {}",
hostname,
info.last_checked.timestamp()
)
.unwrap();

out
}

async fn metrics_handler(State(state): State<Arc<MetricsState>>) -> impl IntoResponse {
let result = state.scan_result.read().await;

Expand Down Expand Up @@ -593,6 +645,12 @@ async fn metrics_handler(State(state): State<Arc<MetricsState>>) -> impl IntoRes
}
}

if let Some(ref ui) = state.update_info {
let snapshot = ui.read().await.clone();
body.push('\n');
body.push_str(&render_update_metrics(&snapshot, &state.hostname).await);
}

(
[(
header::CONTENT_TYPE,
Expand Down Expand Up @@ -654,6 +712,7 @@ pub async fn serve_metrics(
#[cfg(not(feature = "ebpf-traffic"))] _traffic_stats: Option<()>,
gpu_metrics: Option<SharedGpuMetrics>,
probe_results: Option<SharedProbeResults>,
update_info: Option<SharedUpdateInfo>,
) {
let state = Arc::new(MetricsState {
scan_result,
Expand All @@ -663,6 +722,7 @@ pub async fn serve_metrics(
traffic_stats,
gpu_metrics,
probe_results,
update_info,
});

let app = Router::new()
Expand Down
Loading
Loading