From 5f3793171346ece00fbc26217a562cd76ebb8eca Mon Sep 17 00:00:00 2001 From: James Fuller Date: Tue, 26 May 2026 14:48:25 +0200 Subject: [PATCH] analysis: perf exp --- modules/analysis/src/service/load/mod.rs | 4 ++- modules/analysis/src/service/load/rank.rs | 32 ++++++++++++++--------- modules/analysis/src/service/mod.rs | 2 +- 3 files changed, 23 insertions(+), 15 deletions(-) diff --git a/modules/analysis/src/service/load/mod.rs b/modules/analysis/src/service/load/mod.rs index 5ee93de4d..715da8b43 100644 --- a/modules/analysis/src/service/load/mod.rs +++ b/modules/analysis/src/service/load/mod.rs @@ -334,6 +334,7 @@ impl InnerService { &self, connection: &C, query: GraphQuery<'_>, + concurrency: usize, ) -> Result)>, Error> where C: ConnectionTrait + Send + Sync, @@ -396,7 +397,8 @@ impl InnerService { log::debug!("test latest sbom ids: {:?}", matched_sbom_ids); - let mut ranked_sboms = resolve_sbom_cpes(cpe_search, connection, matched_sbom_ids).await?; + let mut ranked_sboms = + resolve_sbom_cpes(cpe_search, connection, matched_sbom_ids, concurrency).await?; // apply rank apply_rank(&mut ranked_sboms); diff --git a/modules/analysis/src/service/load/rank.rs b/modules/analysis/src/service/load/rank.rs index 8c8910479..83ad05d9d 100644 --- a/modules/analysis/src/service/load/rank.rs +++ b/modules/analysis/src/service/load/rank.rs @@ -79,7 +79,7 @@ async fn find_external_refs( visited: &mut HashSet<(Uuid, String)>, ) -> Result, Error> where - C: ConnectionTrait + Send, + C: ConnectionTrait + Send + Sync, { if !visited.insert((sbom_id, node_id.clone())) { log::debug!("cycle detected for SBOM {sbom_id} / {node_id}, skipping recursion"); @@ -132,7 +132,7 @@ where /// #[instrument(skip(connection), err(level=tracing::Level::INFO))] async fn describing_cpes( - connection: &(impl ConnectionTrait + Send), + connection: &(impl ConnectionTrait + Send + Sync), sbom_id: Uuid, ) -> Result, Error> { Ok(sbom_node_cpe_ref::Entity::find() @@ -236,16 +236,22 @@ pub async fn find_node_ancestors( #[instrument(skip(connection, rows), fields(rows=rows.len()))] pub async fn resolve_sbom_cpes( cpe_search: bool, - connection: &(impl ConnectionTrait + Send), + connection: &(impl ConnectionTrait + Send + Sync), rows: Vec, + concurrency: usize, ) -> Result, Error> { - let mut matched_sboms = Vec::new(); - - for matched in rows { - matched_sboms.extend(resolve_sbom_cpe(matched, cpe_search, connection).await?); - } + use futures::{StreamExt, TryStreamExt, stream}; - Ok(matched_sboms) + stream::iter(rows) + .map(|matched| async move { + resolve_sbom_cpe(matched, cpe_search, connection).await + }) + .buffer_unordered(concurrency) + .try_fold(Vec::new(), |mut acc, sboms| async move { + acc.extend(sboms); + Ok(acc) + }) + .await } /// Resolves direct CPE matches by joining external nodes to SBOM nodes. @@ -253,7 +259,7 @@ pub async fn resolve_sbom_cpes( #[instrument(skip(connection), err(level=tracing::Level::INFO))] async fn resolve_direct_cpe_matches( matched: &Row, - connection: &(impl ConnectionTrait + Send), + connection: &(impl ConnectionTrait + Send + Sync), ) -> Result, Error> { let direct = describing_cpes(connection, matched.sbom_id); let direct_external = async { @@ -311,7 +317,7 @@ async fn resolve_direct_cpe_matches( #[instrument(skip(connection), err(level=tracing::Level::INFO))] async fn resolve_ancestor_external_sboms( matched: &Row, - connection: &(impl ConnectionTrait + Send), + connection: &(impl ConnectionTrait + Send + Sync), ) -> Result, Error> { let top_packages = find_node_ancestors(matched.sbom_id, matched.node_id.clone(), connection).await?; @@ -345,7 +351,7 @@ async fn resolve_ancestor_external_sboms( async fn enrich_external_sboms( matched: &Row, external_sboms: Vec, - connection: &(impl ConnectionTrait + Send), + connection: &(impl ConnectionTrait + Send + Sync), ) -> Result, Error> { let mut results = Vec::new(); @@ -382,7 +388,7 @@ async fn enrich_external_sboms( async fn resolve_sbom_cpe( matched: Row, cpe_search: bool, - connection: &(impl ConnectionTrait + Send), + connection: &(impl ConnectionTrait + Send + Sync), ) -> Result, Error> { let mut results = Vec::new(); diff --git a/modules/analysis/src/service/mod.rs b/modules/analysis/src/service/mod.rs index 406efa0b8..c09146e0f 100644 --- a/modules/analysis/src/service/mod.rs +++ b/modules/analysis/src/service/mod.rs @@ -633,7 +633,7 @@ impl AnalysisService { // load only latest graphs let graphs = self .inner - .load_latest_graphs_query(connection, query) + .load_latest_graphs_query(connection, query, self.concurrency) .await?; log::debug!("graph sbom count: {:?}", graphs.len());