Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 55 additions & 19 deletions common/vmaas_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ssl

import aiohttp
import orjson

from common.config import Config
from common.logging import get_logger
Expand All @@ -17,33 +18,68 @@
CFG = Config()


def _json_loads(body: bytes):
"""Parse a UTF-8 JSON HTTP body (orjson: lower CPU than stdlib json on large VMaaS responses)."""
if not body:
return None
return orjson.loads(body)
Comment on lines +21 to +25
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): Consider whether returning None on an empty 200 OK body matches existing expectations.

Previously await response.json() would raise on an empty JSON body; _json_loads now returns None for an empty 200 response. If callers assume a dict/list and don’t handle None, this can cause subtle downstream errors. Consider either preserving the old behavior (raise on empty) or returning a schema-appropriate default value ({}/[]) so unexpected VMaaS responses fail fast.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

empty 200 body becomes None -> VmaasErrorException in logic.py, not a random AttributeError.
Is is true, that it would be more clear to catch this earlier. Thats more like a schema decission. I will find out.



class _VmaasConnectionManager:
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
"""
Encapsulates all state, SSL contexts, and sessions
"""

def __init__(self):
self._session: aiohttp.ClientSession | None = None
self._session_lock: asyncio.Lock | None = None
self._ssl_context = ssl.create_default_context(cafile=CFG.tls_ca_path)

# Ref: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
async def get_session(self) -> aiohttp.ClientSession:
if self._session_lock is None:
self._session_lock = asyncio.Lock()
async with self._session_lock:
if self._session is None or self._session.closed:
# Ref: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.TCPConnector
connector = aiohttp.TCPConnector(ssl=self._ssl_context)
self._session = aiohttp.ClientSession(connector=connector)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
return self._session


# Instantiate a single, private instance of the manager
_connection_manager = _VmaasConnectionManager()


async def vmaas_request(endpoint, data_json=None, method="POST"):
"""Sends request to VMAAS"""
headers = {"Content-type": "application/json", "Accept": "application/json"}
tries = 0

# Grab the session from the private manager
session = await _connection_manager.get_session()

while True:
if tries >= CFG.request_retries:
break
ssl_ctx = ssl.create_default_context(cafile=CFG.tls_ca_path)
try:
async with aiohttp.ClientSession() as session:
async with session.request(method, endpoint, json=data_json, headers=headers, ssl=ssl_ctx) as response:
if response.status == 200:
return await response.json()
if response.status == 503:
LOGGER.info("VMAAS temporarily unavailable, retrying...")
await asyncio.sleep(1)
else:
tries += 1
VMAAS_RETURN_ERR.inc()
LOGGER.error(
"Error during request to VMaaS endpoint %s: HTTP %s, %s", endpoint, response.status, await response.text()
)
if data_json:
LOGGER.debug("JSON: %s", str(data_json))
# Do not retry for 4xx HTTP codes
if 400 <= response.status < 500:
break
async with session.request(method, endpoint, json=data_json, headers=headers) as response:
if response.status == 200:
return _json_loads(await response.read())

if response.status == 503:
LOGGER.info("VMAAS temporarily unavailable, retrying...")
await asyncio.sleep(1)
else:
tries += 1
VMAAS_RETURN_ERR.inc()
LOGGER.error("Error during request to VMaaS endpoint %s: HTTP %s, %s", endpoint, response.status, await response.text())
if data_json:
LOGGER.debug("JSON: %s", str(data_json))
# Do not retry for 4xx HTTP codes
if 400 <= response.status < 500:
break

except aiohttp.ClientError:
tries += 1
VMAAS_CNX_ERR.inc()
Expand Down
36 changes: 21 additions & 15 deletions evaluator/logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,21 +321,27 @@ async def _perform_vmaas_request(self, vmaas_json: dict) -> (List[CveAdvisories]
vmaas_json["latest_only"] = True
vmaas_response = await vmaas_request(CFG.vmaas_vulnerabilities_endpoint, vmaas_json)
if vmaas_response:
for cve in vmaas_response["cve_list"]:
playbook_cves.append(CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None))
for cve in vmaas_response["manually_fixable_cve_list"]:
manually_fixable_cves.append(CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None))
for cve in vmaas_response["unpatched_cve_list"]:
for affected_package in cve["affected"]:
unpatched_cves.append(
CveUnpatched(
cve["cve"],
affected_package["package_name"],
affected_package["cpe"],
affected_package["module_name"],
affected_package["module_stream"],
)
)
playbook_cves = [
CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None) for cve in vmaas_response.get("cve_list", [])
]

manually_fixable_cves = [
CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None)
for cve in vmaas_response.get("manually_fixable_cve_list", [])
]

# Nested list comprehension for unpatched CVEs (flatten should be the fastest way to do)
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
unpatched_cves = [
CveUnpatched(
cve["cve"],
affected_package["package_name"],
affected_package["cpe"],
affected_package["module_name"],
affected_package["module_stream"],
)
for cve in vmaas_response.get("unpatched_cve_list", [])
for affected_package in cve.get("affected", [])
]
else:
raise VmaasErrorException("cannot evaluate system, unable to contact vmaas for evaluation")
return playbook_cves, manually_fixable_cves, unpatched_cves
Expand Down
81 changes: 60 additions & 21 deletions evaluator/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,50 +194,89 @@ async def _load_db_system_vulnerabilities(self, system_platform: SystemPlatform,
@time(EVAL_PART_TIME.labels(part="insert_vulnerabilities"))
async def _insert_vulnerabilities(self, to_insert: [Dict], conn: AsyncConnection) -> List[Tuple]:
"""Insert given system_vulnerabilities"""
if not to_insert:
return []

# Pre-pack dicts into tuples, this helps psycopg from having to do expensive dictionary key lookups per row
tuple_data = [
(
r["state"].value if isinstance(r["state"], VulnerabilityState) else r["state"],
r["cve_id"],
r["rh_account_id"],
r["system_id"],
r["advisories"],
r["advisory_available"],
r["when_mitigated"],
r["rule_id"],
Comment thread
sourcery-ai[bot] marked this conversation as resolved.
r["rule_hit_details"],
r["mitigation_reason"],
r["remediation_type_id"],
)
for r in to_insert
]

new_sys_vulns = []
async with conn.cursor(row_factory=dict_row) as cur:
await cur.executemany(
"""
INSERT INTO system_vulnerabilities_active
(state, cve_id, rh_account_id, system_id,
advisories, advisory_available, when_mitigated, rule_id,
rule_hit_details, mitigation_reason, remediation_type_id)
VALUES
(%(state)s, %(cve_id)s, %(rh_account_id)s, %(system_id)s,
%(advisories)s, %(advisory_available)s, %(when_mitigated)s,
%(rule_id)s, %(rule_hit_details)s,
%(mitigation_reason)s, %(remediation_type_id)s)
advisories, advisory_available, when_mitigated, rule_id,
rule_hit_details, mitigation_reason, remediation_type_id)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
RETURNING id, cve_id
""",
to_insert,
""",
tuple_data,
returning=True,
)

for inserted in await executemany_fetchall(cur):
new_sys_vulns.append((inserted["id"], inserted["cve_id"]))

return new_sys_vulns

@time(EVAL_PART_TIME.labels(part="update_vulnerabilities"))
async def _update_vulnerabilities(self, to_update: [Dict], conn: AsyncConnection):
"""Update given system_vulnerabilities"""
# TODO: think if it would be possible to do EXECUTEMANY
# where the update queries would update only changed fields
if not to_update:
return

tuple_data = [
(
r["state"].value if isinstance(r["state"], VulnerabilityState) else r["state"],
r["advisories"],
r["advisory_available"],
r["when_mitigated"],
r["rule_id"],
r["rule_hit_details"],
r["mitigation_reason"],
r["remediation_type_id"],
r["rh_account_id"],
r["system_id"],
r["cve_id"],
)
for r in to_update
]

async with conn.cursor() as cur:
await cur.executemany(
"""
UPDATE system_vulnerabilities_active AS sv
SET state = %(state)s,
advisories = %(advisories)s,
advisory_available = %(advisory_available)s,
when_mitigated = %(when_mitigated)s,
rule_id = %(rule_id)s,
rule_hit_details = %(rule_hit_details)s,
mitigation_reason = %(mitigation_reason)s,
remediation_type_id = %(remediation_type_id)s
WHERE sv.rh_account_id = %(rh_account_id)s
AND sv.system_id = %(system_id)s
AND sv.cve_id = %(cve_id)s
SET state = %s,
advisories = %s,
advisory_available = %s,
when_mitigated = %s,
rule_id = %s,
rule_hit_details = %s,
mitigation_reason = %s,
remediation_type_id = %s
WHERE sv.rh_account_id = %s
AND sv.system_id = %s
AND sv.cve_id = %s
""",
to_update,
tuple_data,
)

@time(EVAL_PART_TIME.labels(part="delete_vulnerabilities"))
Expand Down
16 changes: 16 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,22 @@ oauthlib==3.3.1 ; python_version >= "3.12" and python_version < "3.14" \
packaging==26.1 ; python_version >= "3.12" and python_version < "3.14" \
--hash=sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f \
--hash=sha256:f042152b681c4bfac5cae2742a55e103d27ab2ec0f3d88037136b6bfe7c9c5de
orjson==3.11.8 ; python_version >= "3.12" and python_version < "3.14" \
--hash=sha256:093d489fa039ddade2db541097dbb484999fcc65fc2b0ff9819141e2ab364f25 \
--hash=sha256:14778ffd0f6896aa613951a7fbf4690229aa7a543cb2bfbe9f358e08aafa9546 \
--hash=sha256:1cd0b77e77c95758f8e1100139844e99f3ccc87e71e6fc8e1c027e55807c549f \
--hash=sha256:3cf17c141617b88ced4536b2135c552490f07799f6ad565948ea07bef0dcb9a6 \
--hash=sha256:3f262401086a3960586af06c054609365e98407151f5ea24a62893a40d80dbbb \
--hash=sha256:469ac2125611b7c5741a0b3798cd9e5786cbad6345f9f400c77212be89563bec \
--hash=sha256:48854463b0572cc87dac7d981aa72ed8bf6deedc0511853dc76b8bbd5482d36d \
--hash=sha256:53a0f57e59a530d18a142f4d4ba6dfc708dc5fdedce45e98ff06b44930a2a48f \
--hash=sha256:54153d21520a71a4c82a0dbb4523e468941d549d221dc173de0f019678cf3813 \
--hash=sha256:6a3d159d5ffa0e3961f353c4b036540996bf8b9697ccc38261c0eac1fd3347a6 \
--hash=sha256:76070a76e9c5ae661e2d9848f216980d8d533e0f8143e6ed462807b242e3c5e8 \
--hash=sha256:8e8c6218b614badf8e229b697865df4301afa74b791b6c9ade01d19a9953a942 \
--hash=sha256:9b48e274f8824567d74e2158199e269597edf00823a1b12b63d48462bbf5123e \
--hash=sha256:e0950ed1bcb9893f4293fd5c5a7ee10934fbf82c4101c70be360db23ce24b7d2 \
--hash=sha256:ea56a955056a6d6c550cf18b3348656a9d9a4f02e2d0c02cabf3c73f1055d506
peewee==3.19.0 ; python_version >= "3.12" and python_version < "3.14" \
--hash=sha256:de220b94766e6008c466e00ce4ba5299b9a832117d9eb36d45d0062f3cfd7417 \
--hash=sha256:f88292a6f0d7b906cb26bca9c8599b8f4d8920ebd36124400d0cbaaaf915511f
Expand Down
Loading