diff --git a/docs/rulehound.md b/docs/rulehound.md new file mode 100644 index 0000000..00cb61b --- /dev/null +++ b/docs/rulehound.md @@ -0,0 +1,117 @@ +# Rulehound Detection Rule Import + +## What is Rulehound? + +[Rulehound](https://github.com/infosecB/Rulehound) is a catalog and search engine for publicly available, open-source threat detection rulesets. It indexes rules across multiple detection engines including **Sigma**, **Splunk Security Content**, **Elastic Detection Rules**, **Panther Rules**, and **Anvilogic Forge Rules**. + +Rulehound provides a centralized way to discover and reference community detection content mapped to MITRE ATT&CK techniques, making it easier to build comprehensive detection coverage. + +## How the Import Works + +The `scripts/import_rulehound_rules.py` script bridges Rulehound's indexed rulesets and LocalObserve's Falco-based detection pipeline: + +``` +[Sigma Rules (SigmaHQ — indexed by Rulehound)] + │ + ▼ +[scripts/import_rulehound_rules.py] ── Translates & converts + │ + ├─► [rules/rulehound/*.yaml] ← Individual Falco rules + └─► [rules/rulehound/rulehound_falco_rules.yaml] ← Combined rules file +``` + +### Conversion Logic + +The script fetches Sigma YAML rules from the SigmaHQ repository (the primary source indexed by Rulehound) and converts them into Falco-compatible YAML: + +| Sigma Category | Falco Event Mapping | +|---|---| +| `process_creation` | `spawned_process` | +| `file_event` | `open_write` | +| `network_connection` | `outbound` | +| `auditd` | `spawned_process` | +| `builtin` | `spawned_process` | + +MITRE ATT&CK tags are automatically translated: +- `attack.defense_evasion` → `mitre_defense_evasion` +- `attack.t1070.002` → `T1070.002` + +Sigma severity levels map to Falco priorities: +- `critical` / `high` → `CRITICAL` +- `medium` → `WARNING` +- `low` / `informational` → `INFO` + +## Usage + +### Basic Import + +```bash +# Fetch process_creation and file_event rules (default categories) +python 
scripts/import_rulehound_rules.py + +# Specify output directory and categories +python scripts/import_rulehound_rules.py --output rules/rulehound/ --categories process_creation,file_event,network_connection + +# Use a GitHub token to avoid rate limits +GITHUB_TOKEN=ghp_xxxx python scripts/import_rulehound_rules.py +``` + +### Options + +| Flag | Default | Description | +|---|---|---| +| `--output`, `-o` | `rules/rulehound/` | Output directory for converted Falco rules | +| `--categories`, `-c` | `process_creation,file_event` | Comma-separated Sigma rule categories to fetch | + +### Available Categories + +- `process_creation` — Process execution/spawning rules +- `file_event` — File modification/creation rules +- `network_connection` — Network activity rules +- `auditd` — Linux auditd-based rules +- `builtin` — Built-in detection rules + +## Output Structure + +After running the script, the `rules/rulehound/` directory will contain: + +``` +rules/rulehound/ +├── rulehound_falco_rules.yaml ← Combined file with all converted rules +├── linux_clear_log_attempts.yaml +├── linux_shred_file_deletion.yaml +├── linux_ssh_authorized_keys_modification.yaml +└── ... ← One file per converted rule +``` + +## Integrating with LocalObserve + +To add the imported rules to your Falco configuration: + +1. Run the import script to generate the rules +2. Append the combined rules file to your Falco local rules: + ```bash + cat rules/rulehound/rulehound_falco_rules.yaml >> falco_rules.local.yaml + ``` +3. Restart Falco to pick up the new rules + +## Alignment Validation + +The existing `scripts/import_rulehound_mappings.py` script validates that Rulehound-mapped rules in `docs/rulehound_mappings.md` are active in `falco_rules.local.yaml` and `osqueryd.conf`. The new import script complements this by pulling new rules directly from upstream sources. 
+ +## Testing + +```bash +# Unit tests for the conversion logic +pytest tests/test_rulehound_import.py -v + +# Integration test with live GitHub API (requires network) +python scripts/import_rulehound_rules.py --categories process_creation +``` + +## References + +- [Rulehound](https://github.com/infosecB/Rulehound) — Detection rules catalog +- [SigmaHQ/sigma](https://github.com/SigmaHQ/sigma) — Sigma rule repository +- [Rulehound Mappings](../docs/rulehound_mappings.md) — LocalObserve's existing alignment matrix +- [Falco Rules](https://falco.org/docs/rules/) — Falco rule documentation diff --git a/scripts/import_rulehound_rules.py b/scripts/import_rulehound_rules.py new file mode 100644 index 0000000..91a0bc2 --- /dev/null +++ b/scripts/import_rulehound_rules.py @@ -0,0 +1,417 @@ +#!/usr/bin/env python3 +""" +Rulehound Detection Rule Importer +=================================== +Fetches Linux detection rules from the SigmaHQ/sigma repository (the primary +ruleset indexed by Rulehound) and converts them into Falco-compatible YAML +rules that can be directly used by LocalObserve's Falco sidecar. + +Usage: + python scripts/import_rulehound_rules.py [--output rules/rulehound/] [--categories process_creation,file_event] + +What it does: + 1. Queries the SigmaHQ GitHub API for Linux rule files in the specified + categories (default: process_creation, file_event). + 2. Downloads each Sigma YAML rule. + 3. Converts Sigma rule definitions into Falco rule YAML format: + - process_creation -> spawned_process detection + - file_event -> open_write detection + - network_connection -> outbound detection + 4. Writes the converted Falco rules to rules/rulehound/ as individual YAML files + and a combined rules/rulehound/rulehound_falco_rules.yaml. + +Rulehound (https://github.com/infosecB/Rulehound) is a catalogue of public +threat-detection rulesets. 
This script uses the Sigma rules it indexes as the +canonical source and translates them into the Falco format LocalObserve uses. +""" + +from __future__ import annotations + +import argparse +import json +import os +import re +import sys +import urllib.request +import urllib.error +from pathlib import Path + +import yaml + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- + +REPO_ROOT = Path(__file__).resolve().parents[1] +DEFAULT_OUTPUT_DIR = REPO_ROOT / "rules" / "rulehound" + +SIGMA_GITHUB_API = "https://api.github.com/repos/SigmaHQ/sigma/contents/rules/linux" +SIGMA_RAW_URL = "https://raw.githubusercontent.com/SigmaHQ/sigma/master/rules/linux" + +CATEGORIES = [ + "process_creation", + "file_event", + "network_connection", + "auditd", + "builtin", +] + +# MITRE tag helpers +MITRE_TACTIC_MAP = { + "attack.defense_evasion": "mitre_defense_evasion", + "attack.privilege_escalation": "mitre_privilege_escalation", + "attack.persistence": "mitre_persistence", + "attack.credential_access": "mitre_credential_access", + "attack.discovery": "mitre_discovery", + "attack.lateral_movement": "mitre_lateral_movement", + "attack.execution": "mitre_execution", + "attack.exfiltration": "mitre_exfiltration", + "attack.impact": "mitre_impact", + "attack.command_and_control": "mitre_command_and_control", + "attack.initial_access": "mitre_initial_access", + "attack.collection": "mitre_collection", +} + +PRIORITY_MAP = { + "critical": "CRITICAL", + "high": "CRITICAL", + "medium": "WARNING", + "low": "INFO", + "informational": "INFO", +} + +# --------------------------------------------------------------------------- +# GitHub API helpers +# --------------------------------------------------------------------------- + +def _github_request(url: str) -> dict | list: + """Issue a GET request to the GitHub API with optional token auth.""" + headers = {"Accept": 
"application/vnd.github.v3+json"} + token = os.environ.get("GITHUB_TOKEN") + if token: + headers["Authorization"] = f"token {token}" + req = urllib.request.Request(url, headers=headers) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.HTTPError as exc: + if exc.code in (401, 403): + raise RuntimeError(f"GitHub API auth error {exc.code} fetching {url}: {exc.reason}") from exc + print(f"[!] HTTP {exc.code} fetching {url}: {exc.reason}", file=sys.stderr) + return [] + + +def _fetch_raw(url: str) -> str: + """Fetch raw text content from a URL.""" + req = urllib.request.Request(url) + try: + with urllib.request.urlopen(req, timeout=30) as resp: + return resp.read().decode("utf-8") + except urllib.error.URLError as exc: + print(f"[!] Failed to fetch {url}: {exc}", file=sys.stderr) + return "" + + +# --------------------------------------------------------------------------- +# Sigma detection block flattening +# --------------------------------------------------------------------------- + +def _flatten_detection(detection: dict) -> dict: + """ + Flatten a Sigma detection block by merging all selection groups + into a single flat dict. Sigma rules nest detection criteria under + named selection keys like 'selection', 'filter', etc., while the + 'condition' key describes how they combine. + + This extracts all key-value pairs from nested dicts (except 'condition' + and other meta-keys) so we can find process names, paths, etc. 
+ """ + flat: dict[str, list] = {} + for key, value in detection.items(): + if key == "condition": + continue + if isinstance(value, dict): + for k, v in value.items(): + existing = flat.get(k, []) + if isinstance(v, list): + existing.extend(v) + else: + existing.append(v) + flat[k] = existing + elif isinstance(value, list): + existing = flat.get(key, []) + existing.extend(value) + flat[key] = existing + else: + existing = flat.get(key, []) + existing.append(value) + flat[key] = existing + return flat + + +# --------------------------------------------------------------------------- +# Sigma -> Falco conversion +# --------------------------------------------------------------------------- + +def _mitre_tags(sigma_tags: list[str]) -> list[str]: + """Convert Sigma ATT&CK tag notation to Falco-friendly tags.""" + tags: list[str] = [] + for tag in sigma_tags: + tag_lower = tag.lower() + if tag_lower in MITRE_TACTIC_MAP: + tags.append(MITRE_TACTIC_MAP[tag_lower]) + # Extract technique IDs like attack.t1070.002 or attack.t1548.003 + match = re.match(r"attack\.t(\d+\.\d+)", tag_lower) + if match: + tags.append(f"T{match.group(1)}") + else: + match2 = re.match(r"attack\.t(\d+)$", tag_lower) + if match2: + tags.append(f"T{match2.group(1)}") + tags = list(dict.fromkeys(tags)) # dedupe preserving order + return tags + + +def _category_to_falco_event(category: str) -> str: + """Map a Sigma logsource category to a Falco event macro.""" + mapping = { + "process_creation": "spawned_process", + "file_event": "open_write", + "network_connection": "outbound", + "auditd": "spawned_process", + "builtin": "spawned_process", + } + return mapping.get(category, "spawned_process") + + +def _extract_procs(sigma_detection: dict) -> list[str]: + """ + Extract process/binary names from a Sigma detection block. + + Looks at both top-level and nested (selection group) keys for Image, + Image|endswith, ProcessName, etc., then filters out wildcards. 
+ """ + flat = _flatten_detection(sigma_detection) + procs: list[str] = [] + for key in ("Image", "Image|endswith", "ProcessName", "ProcessName|endswith"): + vals = flat.get(key, []) + for v in vals: + if not isinstance(v, str): + continue + basename = v.rsplit("/", 1)[-1] if "/" in v else v + # Strip leading slash for /rm -> rm + basename = basename.lstrip("/") + if basename and not any(c in basename for c in ("*", "?", "%")): + procs.append(basename) + return sorted(set(procs)) + + +def _extract_paths(sigma_detection: dict) -> list[str]: + """ + Extract file/path patterns from a Sigma detection block. + + Looks at TargetFilename, Path, and variants. + """ + flat = _flatten_detection(sigma_detection) + paths: list[str] = [] + for key in ("TargetFilename", "TargetFilename|contains", "TargetFilename|endswith", + "TargetFilename|startswith", "Path", "Path|contains", "Path|endswith"): + vals = flat.get(key, []) + for v in vals: + if isinstance(v, str) and v: + paths.append(v) + return sorted(set(paths)) + + +def sigma_to_falco_rule(sigma: dict) -> dict | None: + """ + Convert a single Sigma rule dict into a Falco rule dict. + + Returns None if the rule cannot be meaningfully converted. 
+ """ + title = sigma.get("title", "Untitled Sigma Rule") + description = sigma.get("description", "").strip() + level = sigma.get("level", "medium") + tags = _mitre_tags(sigma.get("tags", [])) + logsource = sigma.get("logsource", {}) + category = logsource.get("category", "process_creation") + detection = sigma.get("detection", {}) + + # Use category for the Falco event type + falco_event = _category_to_falco_event(category) + + procs = _extract_procs(detection) + paths = _extract_paths(detection) + + # Build condition parts — only add proc_name_exists when there are process conditions + condition_parts = [falco_event] + + if procs: + condition_parts.append("proc_name_exists") + if len(procs) == 1: + condition_parts.append(f'proc.name = "{procs[0]}"') + else: + procs_str = ", ".join(f'"{p}"' for p in procs[:10]) # Limit to avoid overly long rules + condition_parts.append(f"proc.name in ({procs_str})") + + if paths: + if len(paths) == 1: + condition_parts.append(f'fd.name endswith "{paths[0]}"') + else: + path_conds = " or ".join(f'fd.name endswith "{p}"' for p in paths[:5]) + condition_parts.append(f"({path_conds})") + + if not procs and not paths: + # Cannot build a meaningful condition; skip this rule + return None + + condition = "\n and ".join(condition_parts) + priority = PRIORITY_MAP.get(level, "WARNING") + output = (f"[Rulehound] {title} | user=%user.name command=%proc.cmdline " + f"process=%proc.name parent=%proc.pname") + + tags.extend(["linux", "rulehound"]) + tags = list(dict.fromkeys(tags)) + + rule = { + "rule": title, + "desc": description[:300] if description else f"Converted from Sigma rule: {title}", + "condition": condition, + "output": output, + "priority": priority, + "tags": tags, + } + return rule + + +# --------------------------------------------------------------------------- +# Main import logic +# --------------------------------------------------------------------------- + +def fetch_sigma_rules(categories: list[str] | None = None) -> 
def convert_and_write(rules: list[dict], output_dir: Path) -> list[Path]:
    """Convert Sigma rules to Falco format and write them to *output_dir*.

    Each convertible rule is written to its own ``<sanitised title>.yaml``
    file, and all of them together to ``rulehound_falco_rules.yaml``. If two
    rules sanitise to the same file name (or to the name of the combined
    file), later ones get a numeric suffix (``_2``, ``_3``, ...) so that no
    rule silently overwrites another.

    Parameters
    ----------
    rules : list[dict]
        Parsed Sigma rules.
    output_dir : Path
        Destination directory; created if missing.

    Returns
    -------
    list[Path]
        One path per converted rule, followed by the combined file path.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    combined_stem = "rulehound_falco_rules"
    used_stems = {combined_stem}  # reserved so a rule title cannot clobber it
    written: list[Path] = []
    combined: list[dict] = []

    for sigma in rules:
        falco = sigma_to_falco_rule(sigma)
        if falco is None:
            continue

        # Individual file
        base = re.sub(r"[^a-z0-9_]", "_", falco["rule"].lower())[:80]
        stem = base
        attempt = 2
        while stem in used_stems:
            stem = f"{base}_{attempt}"
            attempt += 1
        used_stems.add(stem)

        out_path = output_dir / f"{stem}.yaml"
        with open(out_path, "w", encoding="utf-8") as fh:
            yaml.dump([falco], fh, default_flow_style=False, sort_keys=False, allow_unicode=True)
        written.append(out_path)
        combined.append(falco)

    # Write combined file
    combined_path = output_dir / f"{combined_stem}.yaml"
    with open(combined_path, "w", encoding="utf-8") as fh:
        yaml.dump(combined, fh, default_flow_style=False, sort_keys=False, allow_unicode=True)
    written.append(combined_path)

    print(f"[+] Wrote {len(combined)} converted Falco rules and {len(written)} files to {output_dir}")
    return written
Convert and write + written = convert_and_write(sigma_rules, output_dir) + + print() + print("[+] Import complete!") + print(f" Sigma rules fetched: {len(sigma_rules)}") + print(f" Falco rules written: {len(written) - 1}") # -1 for combined file + print(f" Combined rules file: {output_dir / 'rulehound_falco_rules.yaml'}") + return 0 + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/test_rulehound_import.py b/tests/test_rulehound_import.py new file mode 100644 index 0000000..b406bd1 --- /dev/null +++ b/tests/test_rulehound_import.py @@ -0,0 +1,351 @@ +"""Tests for scripts/import_rulehound_rules.py + +Validates the import script runs without errors and produces valid output, +including unit tests for the Sigma-to-Falco conversion logic. +""" + +from __future__ import annotations + +import importlib +import sys +from pathlib import Path + +import pytest +import yaml + +REPO_ROOT = Path(__file__).resolve().parents[1] +SCRIPT_PATH = REPO_ROOT / "scripts" / "import_rulehound_rules.py" +RULES_DIR = REPO_ROOT / "rules" / "rulehound" + +# --------------------------------------------------------------------------- +# Ensure the script is importable +# --------------------------------------------------------------------------- + +sys.path.insert(0, str(SCRIPT_PATH.parent)) +import_rulehound_rules = importlib.import_module("import_rulehound_rules") + + +# =========================================================================== +# Unit tests – flattening helper +# =========================================================================== + + +class TestFlattenDetection: + """Test _flatten_detection correctly merges selection groups.""" + + def test_nested_selection(self): + detection = { + "selection": { + "Image|endswith": ["/rm", "/shred"], + "CommandLine|contains": "/var/log", + }, + "condition": "selection", + } + flat = import_rulehound_rules._flatten_detection(detection) + assert "Image|endswith" in flat + assert "/rm" in flat["Image|endswith"] + 
assert "/shred" in flat["Image|endswith"] + # condition should be excluded + assert "condition" not in flat + + def test_top_level_list(self): + detection = { + "Image|endswith": ["/bin/bash"], + } + flat = import_rulehound_rules._flatten_detection(detection) + assert "/bin/bash" in flat.get("Image|endswith", []) + + def test_empty(self): + assert import_rulehound_rules._flatten_detection({}) == {} + + def test_multiple_groups_merged(self): + detection = { + "selection": {"Image|endswith": ["/rm"]}, + "filter": {"Image|endswith": ["/shred"]}, + "condition": "selection and not filter", + } + flat = import_rulehound_rules._flatten_detection(detection) + assert "/rm" in flat["Image|endswith"] + assert "/shred" in flat["Image|endswith"] + + +# =========================================================================== +# Unit tests – conversion helpers +# =========================================================================== + + +class TestMitreTags: + """Test _mitre_tags conversion.""" + + def test_tactic_mapping(self): + tags = import_rulehound_rules._mitre_tags(["attack.defense_evasion"]) + assert "mitre_defense_evasion" in tags + + def test_technique_id(self): + tags = import_rulehound_rules._mitre_tags(["attack.t1070.002"]) + assert "T1070.002" in tags + + def test_bare_technique_id(self): + tags = import_rulehound_rules._mitre_tags(["attack.t1059"]) + assert "T1059" in tags + + def test_deduplication(self): + tags = import_rulehound_rules._mitre_tags( + ["attack.defense_evasion", "attack.defense_evasion"] + ) + assert tags.count("mitre_defense_evasion") == 1 + + def test_empty_input(self): + assert import_rulehound_rules._mitre_tags([]) == [] + + +class TestCategoryToFalcoEvent: + """Test _category_to_falco_event mapping.""" + + def test_process_creation(self): + assert import_rulehound_rules._category_to_falco_event("process_creation") == "spawned_process" + + def test_file_event(self): + assert import_rulehound_rules._category_to_falco_event("file_event") 
class TestExtractProcs:
    """Test _extract_procs helper.

    _extract_procs returns a sorted, de-duplicated list, so the expected
    result is asserted exactly; membership-only checks would also pass if
    unrelated names leaked into the result.
    """

    def test_single_image(self):
        detection = {"selection": {"Image": "/usr/bin/shred"}, "condition": "selection"}
        procs = import_rulehound_rules._extract_procs(detection)
        assert procs == ["shred"]

    def test_endswith_nested(self):
        detection = {
            "selection": {"Image|endswith": ["/rm", "/shred", "/unlink"]},
            "condition": "selection",
        }
        procs = import_rulehound_rules._extract_procs(detection)
        assert procs == ["rm", "shred", "unlink"]

    def test_wildcard_filtered(self):
        detection = {
            "selection": {"Image|endswith": ["/clear", "/bash*"]},
            "condition": "selection",
        }
        procs = import_rulehound_rules._extract_procs(detection)
        # The wildcard entry is dropped while the literal one is kept; checking
        # only for the absence of "*" would also pass on an empty result.
        assert procs == ["clear"]

    def test_empty(self):
        assert import_rulehound_rules._extract_procs({}) == []

    def test_top_level_image(self):
        detection = {"Image": "/usr/bin/whoami"}
        procs = import_rulehound_rules._extract_procs(detection)
        assert procs == ["whoami"]
+# =========================================================================== +# Integration tests – sigma_to_falco_rule +# =========================================================================== + + +class TestSigmaToFalcoRule: + """Test the full Sigma->Falco conversion.""" + + def test_basic_process_creation_rule(self): + sigma = { + "title": "Test Clear Logs", + "description": "Detects log clearing", + "level": "medium", + "tags": ["attack.defense_evasion", "attack.t1070.002"], + "logsource": {"product": "linux", "category": "process_creation"}, + "detection": { + "selection": { + "Image|endswith": ["/rm", "/shred"], + "CommandLine|contains": "/var/log", + }, + "condition": "selection", + }, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + assert result is not None + assert result["rule"] == "Test Clear Logs" + assert "spawned_process" in result["condition"] + assert "rm" in result["condition"] + assert "shred" in result["condition"] + assert result["priority"] == "WARNING" + assert "mitre_defense_evasion" in result["tags"] + assert "T1070.002" in result["tags"] + assert "rulehound" in result["tags"] + + def test_file_event_rule(self): + sigma = { + "title": "SSH Authorized Keys Modification", + "description": "Detects writes to authorized_keys", + "level": "high", + "tags": ["attack.persistence", "attack.t1098.004"], + "logsource": {"product": "linux", "category": "file_event"}, + "detection": { + "selection": { + "TargetFilename|endswith": "/.ssh/authorized_keys", + }, + "condition": "selection", + }, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + assert result is not None + assert "open_write" in result["condition"] + assert result["priority"] == "CRITICAL" + assert "mitre_persistence" in result["tags"] + + def test_rule_with_no_detections_skipped(self): + sigma = { + "title": "Empty Rule", + "description": "No detection fields", + "level": "low", + "logsource": {"category": "process_creation"}, + "detection": 
{"condition": "selection"}, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + # Should return None since no procs/paths can be extracted + assert result is None + + def test_rulehound_tag_always_present(self): + sigma = { + "title": "Minimal Rule", + "description": "Minimal", + "level": "medium", + "logsource": {"category": "process_creation"}, + "detection": { + "selection": {"Image": "/usr/bin/whoami"}, + "condition": "selection", + }, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + assert result is not None + assert "rulehound" in result["tags"] + + def test_description_truncation(self): + very_long = "A" * 500 + sigma = { + "title": "Long Desc Rule", + "description": very_long, + "level": "medium", + "logsource": {"category": "process_creation"}, + "detection": {"selection": {"Image": "/bin/ls"}, "condition": "selection"}, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + assert result is not None + assert len(result["desc"]) <= 300 + + def test_network_connection_category(self): + sigma = { + "title": "Suspicious Outbound Connection", + "description": "Test", + "level": "high", + "logsource": {"category": "network_connection"}, + "detection": {"selection": {"Image": "/usr/bin/nc"}, "condition": "selection"}, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + assert result is not None + assert "outbound" in result["condition"] + + def test_linux_tag_always_present(self): + sigma = { + "title": "Linux Test", + "description": "Test", + "level": "medium", + "logsource": {"category": "process_creation"}, + "detection": {"selection": {"Image": "/bin/test"}, "condition": "selection"}, + } + result = import_rulehound_rules.sigma_to_falco_rule(sigma) + assert result is not None + assert "linux" in result["tags"] + + +# =========================================================================== +# End-to-end tests – output validity +# 
class TestOutputValidity:
    """Test that the output directory structure is valid when rules exist.

    Both tests are skipped when the import has not been run yet. Files are
    read as UTF-8 because the import script writes them with that encoding.
    """

    def test_output_dir_structure(self):
        """If the combined rules file exists, validate its structure."""
        if not RULES_DIR.exists():
            pytest.skip("No rulehound output directory yet (run import first)")

        combined = RULES_DIR / "rulehound_falco_rules.yaml"
        if not combined.exists():
            # Report a skip instead of passing without checking anything.
            pytest.skip("No combined rulehound rules file yet (run import first)")

        with open(combined, "r", encoding="utf-8") as fh:
            rules = yaml.safe_load(fh)
        assert isinstance(rules, list), "Combined file should contain a list"
        for rule in rules:
            assert isinstance(rule, dict), "Each rule must be a dict"
            assert "rule" in rule, "Each rule must have a 'rule' key"
            assert "condition" in rule, "Each rule must have a 'condition' key"
            assert "priority" in rule, "Each rule must have a 'priority' key"

    def test_individual_rule_files_valid_yaml(self):
        """Each individual rule YAML file should be valid."""
        if not RULES_DIR.exists():
            pytest.skip("No rulehound output directory yet (run import first)")

        # Sorted so the spot-check always covers the same files; the combined
        # file is validated separately.
        yml_files = sorted(
            f
            for pattern in ("*.yaml", "*.yml")
            for f in RULES_DIR.glob(pattern)
            if f.name != "rulehound_falco_rules.yaml"
        )

        for yml_file in yml_files[:5]:  # Spot-check up to 5 files
            with open(yml_file, "r", encoding="utf-8") as fh:
                content = yaml.safe_load(fh)
            assert isinstance(content, list), f"{yml_file.name} should contain a list"
= import_rulehound_rules.PRIORITY_MAP + assert "critical" in pm + assert "high" in pm + assert "medium" in pm + assert "low" in pm