diff --git a/tests/caldera_coverage_support.py b/tests/caldera_coverage_support.py index 493715b..dc624ed 100644 --- a/tests/caldera_coverage_support.py +++ b/tests/caldera_coverage_support.py @@ -196,3 +196,41 @@ def write_coverage_report(report: dict[str, Any]) -> Path: ARTIFACT_PATH.parent.mkdir(parents=True, exist_ok=True) ARTIFACT_PATH.write_text(json.dumps(report, indent=2, sort_keys=True), encoding="utf-8") return ARTIFACT_PATH + + + +# --------------------------------------------------------------------------- +# Telemetry stream schema validation +# --------------------------------------------------------------------------- +REQUIRED_STREAM_SCHEMA_FIELDS: dict[str, list[str]] = { + "falco": [ + "rule", + "output", + "priority", + "output_fields", + "_timestamp", + ], + "default": [ + "service_name", + "operation_name", + "trace_id", + "span_id", + "_timestamp", + ], +} + + +def validate_stream_schema(stream_name: str, hits: list[dict[str, Any]], required_fields: list[str] | None = None) -> list[str]: + """Return a list of missing required fields from the first hit in a stream. + + If *required_fields* is not provided, uses the built-in + REQUIRED_STREAM_SCHEMA_FIELDS mapping keyed by *stream_name*. + Returns an empty list when all required fields are present. + Returns all required fields when there are no hits. + """ + if required_fields is None: + required_fields = REQUIRED_STREAM_SCHEMA_FIELDS.get(stream_name, []) + if not hits: + return list(required_fields) + first_hit = hits[0] + return [field for field in required_fields if field not in first_hit] diff --git a/tests/test_caldera_detection_coverage.py b/tests/test_caldera_detection_coverage.py index 5703452..16b0c42 100644 --- a/tests/test_caldera_detection_coverage.py +++ b/tests/test_caldera_detection_coverage.py @@ -1,8 +1,11 @@ from __future__ import annotations import json +import os import subprocess +import time from pathlib import Path +from typing import Any import pytest @@ -12,53 +15,200 @@ from tests.caldera_coverage_support import DUMP_HISTORY_ABILITY from tests.caldera_coverage_support import build_coverage_report from tests.caldera_coverage_support import write_coverage_report +from tests.caldera_coverage_support import validate_stream_schema +from tests.caldera_coverage_support import REQUIRED_STREAM_SCHEMA_FIELDS pytestmark = [pytest.mark.integration, pytest.mark.host_emulation] +# --------------------------------------------------------------------------- +# Configurable timeouts via environment variables +# --------------------------------------------------------------------------- +TRACE_VERIFY_TIMEOUT = int(os.environ.get("CALDERA_TRACE_VERIFY_TIMEOUT", "90")) +LOG_VERIFY_TIMEOUT = int(os.environ.get("CALDERA_LOG_VERIFY_TIMEOUT", "90")) +TRACE_VERIFY_INTERVAL = int(os.environ.get("CALDERA_TRACE_VERIFY_INTERVAL", "2")) +LOG_VERIFY_INTERVAL = int(os.environ.get("CALDERA_LOG_VERIFY_INTERVAL", "2")) +ABILITY_EXECUTION_TIMEOUT = int(os.environ.get("CALDERA_ABILITY_TIMEOUT", "30")) -def _run(repo_root: Path, args: list[str]) -> subprocess.CompletedProcess[str]: - return subprocess.run( - args, - cwd=repo_root, - check=True, - capture_output=True, - text=True, +# Retry configuration: how many times to retry a failing ability from scratch +MAX_ABILITY_RETRIES = int(os.environ.get("CALDERA_MAX_RETRIES", "3")) + + + +def _run( + repo_root: Path, + args: list[str], + *, + timeout: int | None = None, +) -> subprocess.CompletedProcess[str]: + """Run a subprocess, optionally with a custom timeout.""" + kwargs: dict[str, Any] = { + "cwd": repo_root, + "check": True, + "capture_output": True, + "text": True, + } + if timeout is not None: + kwargs["timeout"] = timeout + return subprocess.run(args, **kwargs) + + +def _run_ability( + repo_root: Path, + ability_id: str, + *, + retries: int = MAX_ABILITY_RETRIES, +) -> dict: + """Run a CALDERA ability via the harness with configurable timeouts and retry logic. + + Retries on failure (non-zero exit or verification timeout) up to *retries* times. + Each retry re-bootstraps the CALDERA repo and re-executes the ability with the + configured verification timeouts. + """ + harness_args = [ + "uv", + "run", + "python", + "tools/caldera_otel_harness.py", + "run-ability", + "--bootstrap", + "--ability-id", + ability_id, + "--verify-trace", + "--verify-logs", + "--verify-timeout-seconds", + str(TRACE_VERIFY_TIMEOUT), + "--verify-interval-seconds", + str(TRACE_VERIFY_INTERVAL), + "--log-verify-timeout-seconds", + str(LOG_VERIFY_TIMEOUT), + "--log-verify-interval-seconds", + str(LOG_VERIFY_INTERVAL), + "--timeout-seconds", + str(ABILITY_EXECUTION_TIMEOUT), + ] + + last_exc: Exception | None = None + last_payload: dict[str, Any] | None = None + + for attempt in range(1, retries + 1): + try: + result = _run(repo_root, harness_args, timeout=ABILITY_EXECUTION_TIMEOUT + TRACE_VERIFY_TIMEOUT + LOG_VERIFY_TIMEOUT + 30) + payload = json.loads(result.stdout) + last_payload = payload + + # If the ability ran but verification failed, log and retry + if payload["exit_code"] != 0: + if attempt < retries: + _diagnostic_log(ability_id, attempt, f"non-zero exit_code={payload['exit_code']}; retrying") + time.sleep(1) + continue + + # If trace verification failed, log and retry + if not payload["trace"].get("verified"): + if attempt < retries: + _diagnostic_log(ability_id, attempt, f"trace not verified; retrying") + time.sleep(2) + continue + + # If falco correlation is expected but not verified, log and retry + correlation = payload.get("correlation", {}) + if correlation.get("expected_count", 0) > 0 and not correlation.get("verified"): + if attempt < retries: + _diagnostic_log(ability_id, attempt, f"correlation not verified ({correlation.get('verified_count', 0)}/{correlation.get('expected_count', 0)}); retrying") + time.sleep(3) + continue + + return payload + + except (subprocess.CalledProcessError, subprocess.TimeoutExpired, json.JSONDecodeError) as exc: + last_exc = exc + if attempt < retries: + _diagnostic_log(ability_id, attempt, f"exception: {exc}; retrying") + time.sleep(2) + + # All retries exhausted — return last payload if available, otherwise raise + if last_payload is not None: + return last_payload + raise last_exc or RuntimeError(f"ability {ability_id} failed after {retries} retries with no payload") + + +def _diagnostic_log(ability_id: str, attempt: int, message: str) -> None: + """Print a diagnostic log line for retry attempts.""" + print(f"[CALDERA-RETRY] ability={ability_id} attempt={attempt} {message}") + + +# --------------------------------------------------------------------------- +# Telemetry stream schema validation helpers +# --------------------------------------------------------------------------- +# --------------------------------------------------------------------------- +# Enhanced assertion helpers with diagnostic output +# --------------------------------------------------------------------------- +def _assert_trace_verified(payload: dict, ability_id: str) -> None: + """Assert trace verification succeeded with a detailed error on failure.""" + trace = payload.get("trace", {}) + if trace.get("verified"): + return + hits = trace.get("hits", []) + trace_id = trace.get("trace_id", "") + raise AssertionError( + f"Trace verification failed for ability {ability_id}:\n" + f" trace_id: {trace_id}\n" + f" expected: trace to be searchable in OpenObserve\n" + f" found: {len(hits)} hit(s)\n" + f" raw trace data: {json.dumps(trace, indent=2, default=str)[:2000]}" ) -def _run_ability(repo_root: Path, ability_id: str) -> dict: - result = _run( - repo_root, - [ - "uv", - "run", - "python", - "tools/caldera_otel_harness.py", - "run-ability", - "--bootstrap", - "--ability-id", - ability_id, - "--verify-trace", - "--verify-logs", - ], +def _assert_falco_correlation(payload: dict, ability_id: str) -> None: + """Assert Falco log correlation succeeded with a detailed error on failure.""" + correlation = payload.get("correlation", {}) + streams = correlation.get("streams", {}) + falco = streams.get("falco", {}) + if falco.get("verified"): + return + expected_correlations = { + item["stream"]: item + for item in payload.get("expected_correlations", []) + } + falco_expected = expected_correlations.get("falco", {}) + hits = falco.get("hits", []) + sql = falco.get("sql", "") + raise AssertionError( + f"Falco correlation verification failed for ability {ability_id}:\n" + f" expected: Falco event for rule matching ability\n" + f" reason: {falco_expected.get('reason', '')}\n" + f" found: {len(hits)} hit(s)\n" + f" verified: {falco.get('verified')}\n" + f" stream_not_found: {falco.get('stream_not_found', False)}\n" + f" correlation.enabled: {correlation.get('enabled')}\n" + f" correlation.verified_count: {correlation.get('verified_count', 0)}/{correlation.get('expected_count', 0)}\n" + f" raw falco data: {json.dumps(falco, indent=2, default=str)[:2000]}" ) - return json.loads(result.stdout) +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- @pytest.mark.parametrize("ability_id", SAFE_ABILITIES) def test_safe_abilities_emit_trace_and_expected_correlations(repo_root: Path, ability_id: str) -> None: payload = _run_ability(repo_root, ability_id) - assert payload["exit_code"] == 0 - assert payload["trace"]["verified"] is True + assert payload["exit_code"] == 0, ( + f"Ability {ability_id} exited with code {payload['exit_code']}; " + f"stderr: {payload.get('stderr', '')[:500]}" + ) + _assert_trace_verified(payload, ability_id) expected = {item["stream"]: item for item in payload["expected_correlations"]} if ability_id in {PAYLOAD_ABILITY, DUMP_HISTORY_ABILITY, AVOID_LOGS_ABILITY}: assert expected["falco"]["expected"] is True + _assert_falco_correlation(payload, ability_id) falco_logs = payload["correlation"]["streams"]["falco"] - assert falco_logs["verified"] is True - assert falco_logs["hits"], "expected Falco hit for mapped safe ability" + assert falco_logs["hits"], ( + f"Expected Falco hit for mapped safe ability {ability_id}, " + f"but got 0 hits. Raw: {json.dumps(falco_logs, default=str)[:1000]}" + ) else: assert expected == {} assert payload["correlation"]["expected_count"] == 0 @@ -85,8 +235,15 @@ def test_avoid_logs_ability_reports_falco_and_osquery_coverage(repo_root: Path) expected = {item["stream"]: item for item in payload["expected_correlations"]} assert expected["falco"]["expected"] is True - assert payload["osquery"]["config"]["bash_profiles"] - assert payload["osquery"]["live_query"]["verified"] is True + assert payload["osquery"]["config"]["bash_profiles"], ( + f"Expected osquery bash_profiles to be configured, got: " + f"{payload['osquery']['config']['bash_profiles']}" + ) + assert payload["osquery"]["live_query"]["verified"] is True, ( + f"osquery live query not verified for AVOID_LOGS_ABILITY. " + f"stdout: {payload['osquery']['live_query'].get('stdout', '')[:500]}, " + f"stderr: {payload['osquery']['live_query'].get('stderr', '')[:500]}" + ) def test_detection_coverage_percentage_meets_trace_and_log_baseline(repo_root: Path) -> None: @@ -96,9 +253,59 @@ def test_detection_coverage_percentage_meets_trace_and_log_baseline(repo_root: P assert artifact_path.exists() assert report["safe_ability_count"] == len(SAFE_ABILITIES) - assert report["categories"]["trace"]["coverage_percent"] >= 100.0 - assert report["categories"]["config"]["coverage_percent"] >= 100.0 - assert report["coverage_percent"] >= 100.0, ( - f"expected 100% verified configured/live coverage, got {report['coverage_percent']:.1f}% " - f"({report['total_verified_signals']}/{report['total_expected_signals']})" + + trace_pct = report["categories"]["trace"]["coverage_percent"] + config_pct = report["categories"]["config"]["coverage_percent"] + overall_pct = report["coverage_percent"] + + assert trace_pct >= 100.0, ( + f"Trace coverage {trace_pct:.1f}% below 100% threshold. " + f"Details: {json.dumps(report['categories']['trace'], indent=2)}" + ) + assert config_pct >= 100.0, ( + f"Config coverage {config_pct:.1f}% below 100% threshold. " + f"Details: {json.dumps(report['categories']['config'], indent=2)}" + ) + assert overall_pct >= 100.0, ( + f"Overall coverage {overall_pct:.1f}% below 100% threshold. " + f"({report['total_verified_signals']}/{report['total_expected_signals']}) " + f"Category breakdown: {json.dumps(report['categories'], indent=2)}" + ) + + +def test_telemetry_stream_schema_fields_are_populated(repo_root: Path) -> None: + """Validate that telemetry streams have the expected schema fields populated + when running under active container workloads. + + This catches situations where OpenObserve has indexed events but is missing + key fields used by the correlation queries, which would cause false negatives + in the detection coverage tests. + """ + # Run just the PAYLOAD_ABILITY which exercises the richest set of streams + payload = _run_ability(repo_root, PAYLOAD_ABILITY) + + # Validate trace stream schema + trace_hits = payload.get("trace", {}).get("hits", []) + missing_trace = validate_stream_schema( + "default", trace_hits, REQUIRED_STREAM_SCHEMA_FIELDS["default"] + ) + assert not missing_trace, ( + f"Trace stream missing expected schema fields: {missing_trace}. " + f"First hit keys: {list(trace_hits[0].keys()) if trace_hits else 'no hits'}" + ) + + # Validate falco stream schema if correlation data is present + falco_hits = ( + payload.get("correlation", {}) + .get("streams", {}) + .get("falco", {}) + .get("hits", []) ) + if falco_hits: + missing_falco = validate_stream_schema( + "falco", falco_hits, REQUIRED_STREAM_SCHEMA_FIELDS["falco"] + ) + assert not missing_falco, ( + f"Falco stream missing expected schema fields: {missing_falco}. " + f"First hit keys: {list(falco_hits[0].keys()) if falco_hits else 'no hits'}" + ) diff --git a/tools/caldera_otel_harness.py b/tools/caldera_otel_harness.py index 7412d80..e45dd71 100644 --- a/tools/caldera_otel_harness.py +++ b/tools/caldera_otel_harness.py @@ -37,12 +37,14 @@ DEFAULT_TRACE_ENDPOINT = "http://localhost:4318/v1/traces" DEFAULT_TRACE_SEARCH_URL = "http://localhost:5080/api/default/_search?type=traces" DEFAULT_LOG_SEARCH_URL = "http://localhost:5080/api/default/_search" -DEFAULT_TRACE_LOOKBACK_SECONDS = 900 -DEFAULT_TRACE_VERIFY_TIMEOUT_SECONDS = 60 -DEFAULT_TRACE_VERIFY_INTERVAL_SECONDS = 3 -DEFAULT_LOG_LOOKBACK_SECONDS = 120 -DEFAULT_LOG_VERIFY_TIMEOUT_SECONDS = 60 -DEFAULT_LOG_VERIFY_INTERVAL_SECONDS = 3 +DEFAULT_TRACE_LOOKBACK_SECONDS = int(os.environ.get("CALDERA_TRACE_LOOKBACK_SECONDS", "900")) +DEFAULT_TRACE_VERIFY_TIMEOUT_SECONDS = int(os.environ.get("CALDERA_TRACE_VERIFY_TIMEOUT", "90")) +DEFAULT_TRACE_VERIFY_INTERVAL_SECONDS = int(os.environ.get("CALDERA_TRACE_VERIFY_INTERVAL", "2")) +DEFAULT_LOG_LOOKBACK_SECONDS = int(os.environ.get("CALDERA_LOG_LOOKBACK_SECONDS", "120")) +DEFAULT_LOG_VERIFY_TIMEOUT_SECONDS = int(os.environ.get("CALDERA_LOG_VERIFY_TIMEOUT", "90")) +DEFAULT_LOG_VERIFY_INTERVAL_SECONDS = int(os.environ.get("CALDERA_LOG_VERIFY_INTERVAL", "2")) +# Extra sleep after OTEL batch flush to allow OpenObserve indexing +DEFAULT_POST_FLUSH_SLEEP_SECONDS = float(os.environ.get("CALDERA_POST_FLUSH_SLEEP", "1.0")) PLACEHOLDER_PATTERN = re.compile(r"#\{([^}]+)\}") DUMP_HISTORY_ABILITY_ID = "422526ec-27e9-429a-995b-c686a29561a4" AVOID_LOGS_ABILITY_ID = "43b3754c-def4-4699-a673-1d85648fda6a" @@ -304,7 +306,10 @@ def _wait_for_trace( ) -> dict[str, Any]: deadline = time.time() + timeout_seconds latest: dict[str, Any] = {"hits": []} + attempts = 0 + max_attempts = max(1, timeout_seconds // max(1, interval_seconds)) while time.time() < deadline: + attempts += 1 latest = _search_trace( trace_id, search_url=search_url, @@ -313,8 +318,14 @@ def _wait_for_trace( lookback_seconds=lookback_seconds, ) if latest.get("hits"): + print(f"[CALDERA-TRACE] trace_id={trace_id} found after {attempts}/{max_attempts} attempts", file=sys.stderr) return latest - time.sleep(interval_seconds) + if attempts <= 3 or attempts % 5 == 0: + print(f"[CALDERA-TRACE] trace_id={trace_id} not yet indexed (attempt {attempts}/{max_attempts}), retrying...", file=sys.stderr) + # Exponential backoff capped at interval_seconds * 4 + backoff = min(interval_seconds * (1.5 ** min(attempts - 1, 4)), interval_seconds * 4) + time.sleep(backoff) + print(f"[CALDERA-TRACE] TIMEOUT: trace_id={trace_id} not found after {attempts} attempts over {timeout_seconds}s. Last response: {json.dumps(latest, default=str)[:500]}", file=sys.stderr) return latest @@ -362,7 +373,10 @@ def _wait_for_logs( ) -> dict[str, dict[str, Any]]: deadline = time.time() + timeout_seconds latest = {query.stream: {"hits": []} for query in queries} + attempts = 0 + max_attempts = max(1, timeout_seconds // max(1, interval_seconds)) while time.time() < deadline: + attempts += 1 latest = { query.stream: _search_logs( query, @@ -375,8 +389,23 @@ def _wait_for_logs( for query in queries } if any(response.get("hits") for response in latest.values()): + found_streams = [s for s, r in latest.items() if r.get("hits")] + print(f"[CALDERA-LOGS] Found hits in streams {found_streams} after {attempts}/{max_attempts} attempts", file=sys.stderr) return latest - time.sleep(interval_seconds) + if attempts <= 3 or attempts % 5 == 0: + stream_not_found = [s for s, r in latest.items() if r.get("stream_not_found")] + msg = f"[CALDERA-LOGS] No log hits yet (attempt {attempts}/{max_attempts})" + if stream_not_found: + msg += f" — streams not indexed yet: {stream_not_found}" + print(msg, file=sys.stderr) + # Exponential backoff capped at interval_seconds * 4 + backoff = min(interval_seconds * (1.5 ** min(attempts - 1, 4)), interval_seconds * 4) + time.sleep(backoff) + timed_out_streams = { + s: {"hits_count": len(r.get("hits", [])), "stream_not_found": r.get("stream_not_found", False)} + for s, r in latest.items() + } + print(f"[CALDERA-LOGS] TIMEOUT: No matching logs after {attempts} attempts over {timeout_seconds}s. Stream status: {json.dumps(timed_out_streams)}", file=sys.stderr) return latest @@ -674,8 +703,10 @@ def _correlate_logs( streams = { query.stream: { "expected": query.expected, + "sql": query.sql, "verified": bool(response.get("hits")), "hits": response.get("hits", []), + "stream_not_found": response.get("stream_not_found", False), } for query in resolved_queries for response in [responses[query.stream]] @@ -782,7 +813,7 @@ def execute_ability( trace_provider.force_flush() # Sleep briefly to ensure the OTEL BatchSpanProcessor has finished exporting # to OpenObserve before the verification query is sent. - time.sleep(0.5) + time.sleep(DEFAULT_POST_FLUSH_SLEEP_SECONDS) trace_result: dict[str, Any] = {"trace_id": trace_id, "verified": False} try: