Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,11 @@ def __init__(self):
self.kessel_auth_oidc_issuer = os.getenv("KESSEL_AUTH_OIDC_ISSUER", "https://sso.redhat.com/auth/realms/redhat-external")
self.kessel_insecure = strtobool(os.getenv("KESSEL_INSECURE", "TRUE"))

# Evaluator
self.insights_load_rule_cache_ttl_sec = int(
os.getenv("INSIGHTS_LOAD_RULE_CACHE_TTL_SEC", "0")
) # 0 -> local cache disabled -> always use DB cache


# pylint: disable=too-many-instance-attributes
class Config(BaseConfig, metaclass=Singleton):
Expand Down
1 change: 1 addition & 0 deletions conf/evaluator.env
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ DB_MAX_POOL_SIZE=30
MAX_LOADED_EVALUATOR_MSGS=20
USE_VMAAS_GO=true
INVENTORY_VIEWS_TOPIC=platform.inventory.host-apps
INSIGHTS_LOAD_RULE_CACHE_TTL_SEC=360
81 changes: 73 additions & 8 deletions evaluator/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

import json
from datetime import datetime
from datetime import timedelta
from datetime import timezone
from typing import Dict
from typing import List
from typing import Optional
Expand Down Expand Up @@ -55,6 +57,8 @@ def __init__(self, db_pool: AsyncConnectionPool):
self.module_cache: Dict[str, ModuleCache] = {}
self.vulnerable_package_cache: Dict[(int, int, Optional[int]), VulnerablePackageCache] = {}
self.skipped_rules = ["CVE_2017_5715_cpu_virt|VIRT_CVE_2017_5715_CPU_3_ONLYKERNEL", "CVE_2017_5715_cpu_virt"]
self.rule_cache: Dict[str, RuleCache] = {}
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
self.rule_cache_expires_at: Optional[datetime] = None

async def init(self):
"""Async constructor"""
Expand All @@ -64,6 +68,7 @@ async def init(self):
self.cpe_cache = await self._load_cpe_cache()
self.module_cache = await self._load_module_cache()
self.vulnerable_package_cache = await self._load_vulnerable_package_cache()
await self._get_rule_cache() # Rules cache warmup

async def _load_cve_impact_cache(self) -> Dict[str, CveImpactCache]:
"""Load cve impact cache from DB"""
Expand All @@ -86,14 +91,72 @@ async def _load_cve_cache(self) -> Dict[str, CveCache]:
return cache

async def _load_rule_cache(self, conn: AsyncConnection) -> Dict[str, RuleCache]:
"""Load rule cache from DB"""
"""Load fresh rule cache from DB. You can also load cache snapshot (see _get_rule_cache)"""
cache = {}
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute("""SELECT id, name, playbook_count FROM insights_rule""")
for rule in await cur.fetchall():
cache[rule["name"]] = RuleCache(rule["id"], rule["playbook_count"])
return cache

def _set_rule_cache_expiry(self) -> None:
"""Set rule cache expiry; only used when TTL > 0 (see _rule_cache_expired)"""
ttl = CFG.insights_load_rule_cache_ttl_sec
if ttl > 0:
self.rule_cache_expires_at = datetime.now(timezone.utc) + timedelta(seconds=ttl)
else:
self.rule_cache_expires_at = None

def _rule_cache_expired(self) -> bool:
"""TTL <= 0 disables caching (always reload on get from DB)"""
if CFG.insights_load_rule_cache_ttl_sec <= 0:
return True
if not self.rule_cache or self.rule_cache_expires_at is None:
return True
return datetime.now(timezone.utc) >= self.rule_cache_expires_at

async def _load_rule_cache_for_name(self, conn: AsyncConnection, name: str) -> Optional[RuleCache]:
"""Load one insight rule row by name for lazy cache misses during advisor merge"""
async with conn.cursor(row_factory=dict_row) as cur:
await cur.execute(
"""SELECT id, playbook_count FROM insights_rule WHERE name = %s""",
(name,),
)
rule = await cur.fetchone()
if not rule:
return None
return RuleCache(rule["id"], rule["playbook_count"])

async def _get_rule_cache(self, conn: Optional[AsyncConnection] = None) -> Dict[str, RuleCache]:
"""Return the canoical rule cache dict, refreshing from DB when TTL expired (see _load_rule_cache)"""
if self._rule_cache_expired():
if conn is None:
async with self.db_pool.connection() as conn:
self.rule_cache = await self._load_rule_cache(conn)
else:
self.rule_cache = await self._load_rule_cache(conn)
self._set_rule_cache_expiry()
return self.rule_cache

async def _get_rule_from_cache(
self,
conn: AsyncConnection,
rule_name: str,
missing_rules_not_found: Set[str],
) -> Optional[RuleCache]:
"""Resolve rule metadata from self.rule_cache, lazy-loading by name on miss"""
cached = self.rule_cache.get(rule_name)
if cached:
return cached
if rule_name in missing_rules_not_found:
return None
fetched = await self._load_rule_cache_for_name(conn, rule_name)
if fetched:
self.rule_cache[rule_name] = fetched
return fetched
missing_rules_not_found.add(rule_name)
return None

async def _load_package_name_cache(self) -> Dict[str, PackageNameCache]:
"""Load package name cache from DB"""
cache = {}
Expand Down Expand Up @@ -575,11 +638,15 @@ async def _evaluate_advisor_res(
self,
rule_results: dict,
sys_vuln_rows: Dict[str, SystemVulnerabilitiesRow],
rule_cache: Dict[str, RuleCache],
system_platform: SystemPlatform,
unpatched_cves: Set[str],
conn: AsyncConnection,
) -> Dict[str, SystemVulnerabilitiesRow]:
"""Merge results from vmaas package evaluation with advisor rule evaluation"""

await self._get_rule_cache(conn)
missing_rules_not_found: Set[str] = set()

for cve, hit_details in rule_results["rule_hits"].items():
if cve in unpatched_cves:
# skip cves that are unpatched to avoid duplicates
Expand All @@ -596,7 +663,7 @@ async def _evaluate_advisor_res(
if not isinstance(hit_details["details"], dict):
hit_details["details"] = json.loads(hit_details["details"])

rule_db = rule_cache.get(rule)
rule_db = await self._get_rule_from_cache(conn, rule, missing_rules_not_found)
if not rule_db:
continue

Expand Down Expand Up @@ -645,7 +712,7 @@ async def _evaluate_advisor_res(
continue

# system was marked vulnerable from vmaas but not from by rules -> abnv
rule_db = rule_cache.get(rule)
rule_db = await self._get_rule_from_cache(conn, rule, missing_rules_not_found)
if not rule_db:
continue

Expand Down Expand Up @@ -676,11 +743,9 @@ async def evaluate_vulnerabilities(self, system_platform: SystemPlatform, conn:
if system_platform.rule_results:
# set of unpatched cves from vmaas for exclusion from advisor evaluation
unpatched_cves_set = set(x.cve for x in unpatched_cves)
# unfortunately, rule cache can change meanwhile evaluator is running,
# so it cannot be static and needs to be loaded on each evaluation
# Rule metadata is cached in memory (warm in init), TTL controls refresh while evaluator runs
with RULES_EVAL_TIME.time():
rule_cache = await self._load_rule_cache(conn)
sys_vuln_rows = await self._evaluate_advisor_res(
system_platform.rule_results, sys_vuln_rows, rule_cache, system_platform, unpatched_cves_set
system_platform.rule_results, sys_vuln_rows, system_platform, unpatched_cves_set, conn
)
return sys_vuln_rows
Loading
Loading