From baa12dbdd32472a945f08495b2e43e258a7ab2ce Mon Sep 17 00:00:00 2001 From: Andrej Luptak Date: Tue, 21 Apr 2026 16:57:58 +0200 Subject: [PATCH 1/5] perf: faster json vmaas response parse The orjson library bills itself 'as the fastest Python library'. This gave a small performance improvement; however, it still looks like only a single piece of the puzzle here. RHINENG-24321 --- common/vmaas_client.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/common/vmaas_client.py b/common/vmaas_client.py index 5a6792ce6..8fe4211e9 100644 --- a/common/vmaas_client.py +++ b/common/vmaas_client.py @@ -6,6 +6,7 @@ import ssl import aiohttp +import orjson from common.config import Config from common.logging import get_logger @@ -17,6 +18,13 @@ CFG = Config() +def _json_loads(body: bytes): + """Parse a UTF-8 JSON HTTP body (orjson: lower CPU than stdlib json on large VMaaS responses).""" + if not body: + return None + return orjson.loads(body) + + async def vmaas_request(endpoint, data_json=None, method="POST"): """Sends request to VMAAS""" headers = {"Content-type": "application/json", "Accept": "application/json"} @@ -29,7 +37,8 @@ async def vmaas_request(endpoint, data_json=None, method="POST"): async with aiohttp.ClientSession() as session: async with session.request(method, endpoint, json=data_json, headers=headers, ssl=ssl_ctx) as response: if response.status == 200: - return await response.json() + # Read raw bytes once; decode with orjson (hot path for large VMaaS responses).
+ return _json_loads(await response.read()) if response.status == 503: LOGGER.info("VMAAS temporarily unavailable, retrying...") await asyncio.sleep(1) From e43878eb79dd6441b9c6b55e3ce391d5812bace9 Mon Sep 17 00:00:00 2001 From: Andrej Luptak Date: Wed, 29 Apr 2026 15:43:17 +0200 Subject: [PATCH 2/5] chore: orjson updated requirements --- requirements.txt | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/requirements.txt b/requirements.txt index f24694f73..98a53c879 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1300,6 +1300,22 @@ oauthlib==3.3.1 ; python_version >= "3.12" and python_version < "3.14" \ packaging==26.1 ; python_version >= "3.12" and python_version < "3.14" \ --hash=sha256:5d9c0669c6285e491e0ced2eee587eaf67b670d94a19e94e3984a481aba6802f \ --hash=sha256:f042152b681c4bfac5cae2742a55e103d27ab2ec0f3d88037136b6bfe7c9c5de +orjson==3.11.8 ; python_version >= "3.12" and python_version < "3.14" \ + --hash=sha256:093d489fa039ddade2db541097dbb484999fcc65fc2b0ff9819141e2ab364f25 \ + --hash=sha256:14778ffd0f6896aa613951a7fbf4690229aa7a543cb2bfbe9f358e08aafa9546 \ + --hash=sha256:1cd0b77e77c95758f8e1100139844e99f3ccc87e71e6fc8e1c027e55807c549f \ + --hash=sha256:3cf17c141617b88ced4536b2135c552490f07799f6ad565948ea07bef0dcb9a6 \ + --hash=sha256:3f262401086a3960586af06c054609365e98407151f5ea24a62893a40d80dbbb \ + --hash=sha256:469ac2125611b7c5741a0b3798cd9e5786cbad6345f9f400c77212be89563bec \ + --hash=sha256:48854463b0572cc87dac7d981aa72ed8bf6deedc0511853dc76b8bbd5482d36d \ + --hash=sha256:53a0f57e59a530d18a142f4d4ba6dfc708dc5fdedce45e98ff06b44930a2a48f \ + --hash=sha256:54153d21520a71a4c82a0dbb4523e468941d549d221dc173de0f019678cf3813 \ + --hash=sha256:6a3d159d5ffa0e3961f353c4b036540996bf8b9697ccc38261c0eac1fd3347a6 \ + --hash=sha256:76070a76e9c5ae661e2d9848f216980d8d533e0f8143e6ed462807b242e3c5e8 \ + --hash=sha256:8e8c6218b614badf8e229b697865df4301afa74b791b6c9ade01d19a9953a942 \ + 
--hash=sha256:9b48e274f8824567d74e2158199e269597edf00823a1b12b63d48462bbf5123e \ + --hash=sha256:e0950ed1bcb9893f4293fd5c5a7ee10934fbf82c4101c70be360db23ce24b7d2 \ + --hash=sha256:ea56a955056a6d6c550cf18b3348656a9d9a4f02e2d0c02cabf3c73f1055d506 peewee==3.19.0 ; python_version >= "3.12" and python_version < "3.14" \ --hash=sha256:de220b94766e6008c466e00ce4ba5299b9a832117d9eb36d45d0062f3cfd7417 \ --hash=sha256:f88292a6f0d7b906cb26bca9c8599b8f4d8920ebd36124400d0cbaaaf915511f From 427908281e5c50cc3e01fe9e901c236b833f7194 Mon Sep 17 00:00:00 2001 From: Andrej Luptak Date: Wed, 29 Apr 2026 15:49:17 +0200 Subject: [PATCH 3/5] perf: reduce the number of vmaas sessions We do not need to create new session for each vmaas call but rather reuse a single one. RHINENG-24321 --- common/vmaas_client.py | 67 +++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/common/vmaas_client.py b/common/vmaas_client.py index 8fe4211e9..8db275a53 100644 --- a/common/vmaas_client.py +++ b/common/vmaas_client.py @@ -25,34 +25,61 @@ def _json_loads(body: bytes): return orjson.loads(body) +class _VmaasConnectionManager: + """ + Encapsulates all state, SSL contexts, and sessions + """ + + def __init__(self): + self._session: aiohttp.ClientSession | None = None + self._session_lock: asyncio.Lock | None = None + self._ssl_context = ssl.create_default_context(cafile=CFG.tls_ca_path) + + # Ref: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request + async def get_session(self) -> aiohttp.ClientSession: + if self._session_lock is None: + self._session_lock = asyncio.Lock() + async with self._session_lock: + if self._session is None or self._session.closed: + # Ref: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.TCPConnector + connector = aiohttp.TCPConnector(ssl=self._ssl_context) + self._session = aiohttp.ClientSession(connector=connector) + return self._session + + +# Instantiate a single, private instance of the 
manager +_connection_manager = _VmaasConnectionManager() + + async def vmaas_request(endpoint, data_json=None, method="POST"): """Sends request to VMAAS""" headers = {"Content-type": "application/json", "Accept": "application/json"} tries = 0 + + # Grab the session from the private manager + session = await _connection_manager.get_session() + while True: if tries >= CFG.request_retries: break - ssl_ctx = ssl.create_default_context(cafile=CFG.tls_ca_path) try: - async with aiohttp.ClientSession() as session: - async with session.request(method, endpoint, json=data_json, headers=headers, ssl=ssl_ctx) as response: - if response.status == 200: - # Read raw bytes once; decode with orjson (hot path for large VMaaS responses). - return _json_loads(await response.read()) - if response.status == 503: - LOGGER.info("VMAAS temporarily unavailable, retrying...") - await asyncio.sleep(1) - else: - tries += 1 - VMAAS_RETURN_ERR.inc() - LOGGER.error( - "Error during request to VMaaS endpoint %s: HTTP %s, %s", endpoint, response.status, await response.text() - ) - if data_json: - LOGGER.debug("JSON: %s", str(data_json)) - # Do not retry for 4xx HTTP codes - if 400 <= response.status < 500: - break + async with session.request(method, endpoint, json=data_json, headers=headers) as response: + if response.status == 200: + return _json_loads(await response.read()) + + if response.status == 503: + LOGGER.info("VMAAS temporarily unavailable, retrying...") + await asyncio.sleep(1) + else: + tries += 1 + VMAAS_RETURN_ERR.inc() + LOGGER.error("Error during request to VMaaS endpoint %s: HTTP %s, %s", endpoint, response.status, await response.text()) + if data_json: + LOGGER.debug("JSON: %s", str(data_json)) + # Do not retry for 4xx HTTP codes + if 400 <= response.status < 500: + break + except aiohttp.ClientError: tries += 1 VMAAS_CNX_ERR.inc() From 4b7ba98ca39df6eeb34a4624de35c06c7d0ece79 Mon Sep 17 00:00:00 2001 From: Andrej Luptak Date: Wed, 29 Apr 2026 15:58:01 +0200 Subject: [PATCH 
4/5] perf: use list comprehensions to parse vmaas response Using list comprehensions to parse vmaas responses should have overall better performance than nested for loops. RHINENG-24321 --- evaluator/logic.py | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/evaluator/logic.py b/evaluator/logic.py index 11bd3b171..bc4773808 100644 --- a/evaluator/logic.py +++ b/evaluator/logic.py @@ -321,21 +321,27 @@ async def _perform_vmaas_request(self, vmaas_json: dict) -> (List[CveAdvisories] vmaas_json["latest_only"] = True vmaas_response = await vmaas_request(CFG.vmaas_vulnerabilities_endpoint, vmaas_json) if vmaas_response: - for cve in vmaas_response["cve_list"]: - playbook_cves.append(CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None)) - for cve in vmaas_response["manually_fixable_cve_list"]: - manually_fixable_cves.append(CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None)) - for cve in vmaas_response["unpatched_cve_list"]: - for affected_package in cve["affected"]: - unpatched_cves.append( - CveUnpatched( - cve["cve"], - affected_package["package_name"], - affected_package["cpe"], - affected_package["module_name"], - affected_package["module_stream"], - ) - ) + playbook_cves = [ + CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None) for cve in vmaas_response.get("cve_list", []) + ] + + manually_fixable_cves = [ + CveAdvisories(cve["cve"], ",".join(sorted(cve["errata"] or [])) or None) + for cve in vmaas_response.get("manually_fixable_cve_list", []) + ] + + # Nested list comprehension for unpatched CVEs (flatten should be the fastest way to do) + unpatched_cves = [ + CveUnpatched( + cve["cve"], + affected_package["package_name"], + affected_package["cpe"], + affected_package["module_name"], + affected_package["module_stream"], + ) + for cve in vmaas_response.get("unpatched_cve_list", []) + for affected_package in cve.get("affected", []) + ] else: raise
VmaasErrorException("cannot evaluate system, unable to contact vmaas for evaluation") return playbook_cves, manually_fixable_cves, unpatched_cves From 10bad0d14bdd63bc4a711bbb4183d779bd248836 Mon Sep 17 00:00:00 2001 From: Andrej Luptak Date: Wed, 29 Apr 2026 16:03:33 +0200 Subject: [PATCH 5/5] perf: prepare data for psycopg By pre-packing the row data into tuples we reduce psycopg's per-row binding cost. RHINENG-24321 --- evaluator/processor.py | 81 +++++++++++++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 21 deletions(-) diff --git a/evaluator/processor.py b/evaluator/processor.py index 5a9344730..9926e3cb5 100644 --- a/evaluator/processor.py +++ b/evaluator/processor.py @@ -194,26 +194,45 @@ async def _load_db_system_vulnerabilities(self, system_platform: SystemPlatform, @time(EVAL_PART_TIME.labels(part="insert_vulnerabilities")) async def _insert_vulnerabilities(self, to_insert: [Dict], conn: AsyncConnection) -> List[Tuple]: """Insert given system_vulnerabilities""" + if not to_insert: + return [] + + # Pre-pack dicts into tuples, this helps psycopg from having to do expensive dictionary key lookups per row + tuple_data = [ + ( + r["state"].value if isinstance(r["state"], VulnerabilityState) else r["state"], + r["cve_id"], + r["rh_account_id"], + r["system_id"], + r["advisories"], + r["advisory_available"], + r["when_mitigated"], + r["rule_id"], + r["rule_hit_details"], + r["mitigation_reason"], + r["remediation_type_id"], + ) + for r in to_insert + ] + new_sys_vulns = [] async with conn.cursor(row_factory=dict_row) as cur: await cur.executemany( """ INSERT INTO system_vulnerabilities_active (state, cve_id, rh_account_id, system_id, - advisories, advisory_available, when_mitigated, rule_id, - rule_hit_details, mitigation_reason, remediation_type_id) - VALUES - (%(state)s, %(cve_id)s, %(rh_account_id)s, %(system_id)s, - %(advisories)s, %(advisory_available)s, %(when_mitigated)s, - %(rule_id)s, %(rule_hit_details)s, - %(mitigation_reason)s,
%(remediation_type_id)s) + advisories, advisory_available, when_mitigated, rule_id, + rule_hit_details, mitigation_reason, remediation_type_id) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id, cve_id - """, - to_insert, + """, + tuple_data, returning=True, ) + for inserted in await executemany_fetchall(cur): new_sys_vulns.append((inserted["id"], inserted["cve_id"])) + return new_sys_vulns @time(EVAL_PART_TIME.labels(part="update_vulnerabilities")) @@ -221,23 +240,43 @@ async def _update_vulnerabilities(self, to_update: [Dict], conn: AsyncConnection """Update given system_vulnerabilities""" # TODO: think if it would be possible to do EXECUTEMANY # where the update queries would update only changed fields + if not to_update: + return + + tuple_data = [ + ( + r["state"].value if isinstance(r["state"], VulnerabilityState) else r["state"], + r["advisories"], + r["advisory_available"], + r["when_mitigated"], + r["rule_id"], + r["rule_hit_details"], + r["mitigation_reason"], + r["remediation_type_id"], + r["rh_account_id"], + r["system_id"], + r["cve_id"], + ) + for r in to_update + ] + async with conn.cursor() as cur: await cur.executemany( """ UPDATE system_vulnerabilities_active AS sv - SET state = %(state)s, - advisories = %(advisories)s, - advisory_available = %(advisory_available)s, - when_mitigated = %(when_mitigated)s, - rule_id = %(rule_id)s, - rule_hit_details = %(rule_hit_details)s, - mitigation_reason = %(mitigation_reason)s, - remediation_type_id = %(remediation_type_id)s - WHERE sv.rh_account_id = %(rh_account_id)s - AND sv.system_id = %(system_id)s - AND sv.cve_id = %(cve_id)s + SET state = %s, + advisories = %s, + advisory_available = %s, + when_mitigated = %s, + rule_id = %s, + rule_hit_details = %s, + mitigation_reason = %s, + remediation_type_id = %s + WHERE sv.rh_account_id = %s + AND sv.system_id = %s + AND sv.cve_id = %s """, - to_update, + tuple_data, ) @time(EVAL_PART_TIME.labels(part="delete_vulnerabilities"))