Cpu performance improvements#2305
Conversation
orjson library claims itself 'as the fastest Python library' This made a little performance improvement, however, this still looks like a single piece of puzzle here. RHINENG-24321
We do not need to create new session for each vmaas call but rather reuse a single one. RHINENG-24321
Using list comprehentions to parse vmaas responses should have overall better performance than nested for loops. RHINENG-24321
By preparing data into tuples we help the psycopg binding cost per row. RHINENG-24321
Reviewer's GuideOptimizes CPU usage when talking to VMaaS and the database by reusing a single aiohttp ClientSession, parsing JSON responses with orjson and list comprehensions, and pre-packing vulnerability rows into tuples for faster psycopg executemany operations. Class diagram for _VmaasConnectionManager and vmaas_request integrationclassDiagram
class _VmaasConnectionManager {
-aiohttp.ClientSession _session
-asyncio.Lock _session_lock
-ssl.SSLContext _ssl_context
+_VmaasConnectionManager()
+get_session() aiohttp.ClientSession
}
class vmaas_request_fn {
+vmaas_request(endpoint, data_json, method) dict
}
_VmaasConnectionManager <.. vmaas_request_fn : uses
aiohttp.ClientSession <.. _VmaasConnectionManager : creates
ssl.SSLContext <.. _VmaasConnectionManager : uses
orjson <.. vmaas_request_fn : parses JSON
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
SC Environment Impact AssessmentOverall Impact: ⚪ NONE No SC Environment-specific impacts detected in this PR. What was checkedThis PR was automatically scanned for:
|
There was a problem hiding this comment.
Hey - I've found 5 issues
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location path="common/vmaas_client.py" line_range="21-25" />
<code_context>
CFG = Config()
+def _json_loads(body: bytes):
+ """Parse a UTF-8 JSON HTTP body (orjson: lower CPU than stdlib json on large VMaaS responses)."""
+ if not body:
+ return None
+ return orjson.loads(body)
+
+
</code_context>
<issue_to_address>
**issue (bug_risk):** Consider whether returning None on an empty 200 OK body matches existing expectations.
Previously `await response.json()` would raise on an empty JSON body; `_json_loads` now returns `None` for an empty 200 response. If callers assume a dict/list and don’t handle `None`, this can cause subtle downstream errors. Consider either preserving the old behavior (raise on empty) or returning a schema-appropriate default value (`{}`/`[]`) so unexpected VMaaS responses fail fast.
</issue_to_address>
### Comment 2
<location path="common/vmaas_client.py" line_range="39-46" />
<code_context>
+ self._ssl_context = ssl.create_default_context(cafile=CFG.tls_ca_path)
+
+ # Ref: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
+ async def get_session(self) -> aiohttp.ClientSession:
+ if self._session_lock is None:
+ self._session_lock = asyncio.Lock()
+ async with self._session_lock:
+ if self._session is None or self._session.closed:
+ # Ref: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.TCPConnector
+ connector = aiohttp.TCPConnector(ssl=self._ssl_context)
+ self._session = aiohttp.ClientSession(connector=connector)
+ return self._session
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** The shared ClientSession is never closed, which can lead to resource leaks on shutdown.
This long-lived `ClientSession` has no lifecycle hook to close it, so connections/file descriptors may be leaked and graceful shutdown becomes harder. Consider adding an explicit `close()`/`aclose()` on `_VmaasConnectionManager` and integrating it into the service shutdown path so `self._session.close()` (and connector cleanup) are reliably called.
Suggested implementation:
```python
# Ref: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
async def get_session(self) -> aiohttp.ClientSession:
if self._session_lock is None:
self._session_lock = asyncio.Lock()
async with self._session_lock:
if self._session is None or self._session.closed:
# Ref: https://docs.aiohttp.org/en/stable/client_reference.html#aiohttp.TCPConnector
connector = aiohttp.TCPConnector(ssl=self._ssl_context)
self._session = aiohttp.ClientSession(connector=connector)
return self._session
async def aclose(self) -> None:
"""
Close the underlying aiohttp.ClientSession, if it exists.
This should be called from the service shutdown path to ensure that
connections and file descriptors are released cleanly.
"""
if self._session is not None and not self._session.closed:
await self._session.close()
self._session = None
```
To fully implement the lifecycle management, you should:
1. Identify where `_VmaasConnectionManager` is instantiated and ensure that `await vmaas_connection_manager.aclose()` (or equivalent) is called from your service/app shutdown hook (e.g., FastAPI/Lifespan event, aiohttp app cleanup signal, or custom shutdown handler).
2. If the surrounding code is not async-aware at shutdown, consider adding a thin synchronous wrapper that schedules `aclose()` on the running event loop, or adapt your shutdown path to be async so it can `await` this method directly.
</issue_to_address>
### Comment 3
<location path="evaluator/logic.py" line_range="324-333" />
<code_context>
+ playbook_cves = [
</code_context>
<issue_to_address>
**question (bug_risk):** Using `.get(..., [])` changes behavior when VMaaS omits expected keys, which may mask schema issues.
Direct access to `vmaas_response[...]` previously surfaced missing keys as errors, making schema changes obvious. Using `.get(..., [])` instead will turn those cases into empty lists, potentially hiding VMaaS or schema problems and misleading consumers. If the VMaaS schema is meant to be strict, consider failing fast (or at least logging) when these keys are absent rather than defaulting to empty lists.
</issue_to_address>
### Comment 4
<location path="evaluator/processor.py" line_range="201-210" />
<code_context>
+ return []
+
+ # Pre-pack dicts into tuples, this helps psycopg from having to do expensive dictionary key lookups per row
+ tuple_data = [
+ (
+ r["state"].value if isinstance(r["state"], VulnerabilityState) else r["state"],
+ r["cve_id"],
+ r["rh_account_id"],
+ r["system_id"],
+ r["advisories"],
+ r["advisory_available"],
+ r["when_mitigated"],
+ r["rule_id"],
+ r["rule_hit_details"],
+ r["mitigation_reason"],
+ r["remediation_type_id"],
+ )
+ for r in to_insert
+ ]
+
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Tuple packing logic could be centralized to avoid duplication and reduce the risk of column/parameter misalignment.
This manual tuple must stay perfectly aligned with the SQL column list, and `_update_vulnerabilities` appears to duplicate the same ordering. If a column is added/removed/reordered, multiple tuple literals and SQL strings must be updated in sync, which is error-prone. Consider small serializers (e.g. `_serialize_vuln_for_insert/for_update`) or at least a shared column-order definition to avoid subtle misalignment bugs.
Suggested implementation:
```python
def _serialize_vulnerability_for_insert(self, r: Dict) -> Tuple:
"""Serialize a vulnerability dict into a tuple aligned with the INSERT column order."""
return (
r["state"].value if isinstance(r["state"], VulnerabilityState) else r["state"],
r["cve_id"],
r["rh_account_id"],
r["system_id"],
r["advisories"],
r["advisory_available"],
r["when_mitigated"],
r["rule_id"],
r["rule_hit_details"],
r["mitigation_reason"],
r["remediation_type_id"],
)
@time(EVAL_PART_TIME.labels(part="insert_vulnerabilities"))
async def _insert_vulnerabilities(self, to_insert: [Dict], conn: AsyncConnection) -> List[Tuple]:
"""Insert given system_vulnerabilities"""
if not to_insert:
return []
# Pre-pack dicts into tuples, this helps psycopg from having to do expensive dictionary key lookups per row
tuple_data = [
self._serialize_vulnerability_for_insert(r)
```
To fully implement the suggestion and avoid column/parameter misalignment across the codebase, you should also:
1. Update `_update_vulnerabilities` to use a shared serializer rather than its own manual tuple literal. Depending on the UPDATE column order, either:
- Reuse `_serialize_vulnerability_for_insert` if the order matches, or
- Introduce a `_serialize_vulnerability_for_update` that uses the appropriate column order for the UPDATE query.
2. If the INSERT and UPDATE column lists are defined as SQL strings elsewhere, consider centralizing the column order in a single constant (e.g. `VULN_COLUMN_ORDER`) and have both SQL generation and serializer(s) derive their ordering from that shared definition.
</issue_to_address>
### Comment 5
<location path="common/vmaas_client.py" line_range="28" />
<code_context>
+ return orjson.loads(body)
+
+
+class _VmaasConnectionManager:
+ """
+ Encapsulates all state, SSL contexts, and sessions
</code_context>
<issue_to_address>
**issue (complexity):** Consider replacing the custom `_VmaasConnectionManager` class with simple module-level globals and a `_get_session` helper to manage the shared `ClientSession` more directly and transparently.
You can drop the `_VmaasConnectionManager` indirection and keep the shared-session behavior with less state and branching. This also makes it easier to ensure a closed session is recreated on retries.
For example, replace the manager with simple module-level globals and a helper:
```python
CFG = Config()
LOGGER = get_logger(__name__)
_SESSION: aiohttp.ClientSession | None = None
_SESSION_LOCK = asyncio.Lock()
_SSL_CONTEXT = ssl.create_default_context(cafile=CFG.tls_ca_path)
async def _get_session() -> aiohttp.ClientSession:
async with _SESSION_LOCK:
global _SESSION
if _SESSION is None or _SESSION.closed:
connector = aiohttp.TCPConnector(ssl=_SSL_CONTEXT)
_SESSION = aiohttp.ClientSession(connector=connector)
return _SESSION
```
Then in `vmaas_request` acquire the session on each retry, so a closed session is automatically recreated:
```python
async def vmaas_request(endpoint, data_json=None, method="POST"):
headers = {"Content-type": "application/json", "Accept": "application/json"}
tries = 0
while True:
if tries >= CFG.request_retries:
break
session = await _get_session()
try:
async with session.request(method, endpoint, json=data_json, headers=headers) as response:
if response.status == 200:
return _json_loads(await response.read())
if response.status == 503:
LOGGER.info("VMAAS temporarily unavailable, retrying...")
await asyncio.sleep(1)
else:
tries += 1
VMAAS_RETURN_ERR.inc()
LOGGER.error(
"Error during request to VMaaS endpoint %s: HTTP %s, %s",
endpoint,
response.status,
await response.text(),
)
if data_json:
LOGGER.debug("JSON: %s", str(data_json))
if 400 <= response.status < 500:
break
except aiohttp.ClientError:
tries += 1
VMAAS_CNX_ERR.inc()
LOGGER.exception("Error calling VMAAS: ")
return None
```
This keeps:
- A single shared `ClientSession` with a `TCPConnector` using your SSL context.
- The same retry and logging behavior.
- The orjson-based `_json_loads` optimization.
But removes the custom manager class, lazy lock creation, and makes the lifecycle of the session more obvious.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def _json_loads(body: bytes): | ||
| """Parse a UTF-8 JSON HTTP body (orjson: lower CPU than stdlib json on large VMaaS responses).""" | ||
| if not body: | ||
| return None | ||
| return orjson.loads(body) |
There was a problem hiding this comment.
issue (bug_risk): Consider whether returning None on an empty 200 OK body matches existing expectations.
Previously await response.json() would raise on an empty JSON body; _json_loads now returns None for an empty 200 response. If callers assume a dict/list and don’t handle None, this can cause subtle downstream errors. Consider either preserving the old behavior (raise on empty) or returning a schema-appropriate default value ({}/[]) so unexpected VMaaS responses fail fast.
There was a problem hiding this comment.
empty 200 body becomes None -> VmaasErrorException in logic.py, not a random AttributeError.
Is is true, that it would be more clear to catch this earlier. Thats more like a schema decission. I will find out.
do not create new session per vmaas call. This adds an additional overhead. Even, in case this does not add much CPU active time, this should be considered as improvement, please see the docs note: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request
using orjson to parse the vmaas response which is faster than traditional json_loads.
If the additional dependency is OK, we can further use this in evaluator as well.
(https://github.com/ijl/orjson?tab=readme-ov-file#performance)
Parse vmaas response using list comprehensions. These should perform slightly better than nested for loops.
prepare data for psycopg.
psycopg binds each row without repeated dict key lookups or named-parameter resolution, which cuts client-side CPU on large executemany batches.
For even better performance, we can use unnest but then we are limited by locks limit due to a transaction.
Sorted from the most significant improvement.
Summary by Sourcery
Optimize VMaaS communication and database vulnerability operations to reduce client-side CPU usage.
Enhancements:
Build: