Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion modules/analysis/src/service/load/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ impl InnerService {
&self,
connection: &C,
query: GraphQuery<'_>,
concurrency: usize,
) -> Result<Vec<(Uuid, Arc<PackageGraph>)>, Error>
where
C: ConnectionTrait + Send + Sync,
Expand Down Expand Up @@ -396,7 +397,8 @@ impl InnerService {

log::debug!("test latest sbom ids: {:?}", matched_sbom_ids);

let mut ranked_sboms = resolve_sbom_cpes(cpe_search, connection, matched_sbom_ids).await?;
let mut ranked_sboms =
resolve_sbom_cpes(cpe_search, connection, matched_sbom_ids, concurrency).await?;

// apply rank
apply_rank(&mut ranked_sboms);
Expand Down
32 changes: 19 additions & 13 deletions modules/analysis/src/service/load/rank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn find_external_refs<C>(
visited: &mut HashSet<(Uuid, String)>,
) -> Result<Vec<ResolvedSbom>, Error>
where
C: ConnectionTrait + Send,
C: ConnectionTrait + Send + Sync,
{
if !visited.insert((sbom_id, node_id.clone())) {
log::debug!("cycle detected for SBOM {sbom_id} / {node_id}, skipping recursion");
Expand Down Expand Up @@ -132,7 +132,7 @@ where
///
#[instrument(skip(connection), err(level=tracing::Level::INFO))]
async fn describing_cpes(
connection: &(impl ConnectionTrait + Send),
connection: &(impl ConnectionTrait + Send + Sync),
sbom_id: Uuid,
) -> Result<Vec<Uuid>, Error> {
Ok(sbom_node_cpe_ref::Entity::find()
Expand Down Expand Up @@ -236,24 +236,30 @@ pub async fn find_node_ancestors<C: ConnectionTrait>(
#[instrument(skip(connection, rows), fields(rows=rows.len()))]
pub async fn resolve_sbom_cpes(
cpe_search: bool,
connection: &(impl ConnectionTrait + Send),
connection: &(impl ConnectionTrait + Send + Sync),
rows: Vec<Row>,
concurrency: usize,
) -> Result<Vec<RankedSbom>, Error> {
let mut matched_sboms = Vec::new();

for matched in rows {
matched_sboms.extend(resolve_sbom_cpe(matched, cpe_search, connection).await?);
}
use futures::{StreamExt, TryStreamExt, stream};

Ok(matched_sboms)
stream::iter(rows)
.map(|matched| async move {
resolve_sbom_cpe(matched, cpe_search, connection).await
})
.buffer_unordered(concurrency)
.try_fold(Vec::new(), |mut acc, sboms| async move {
acc.extend(sboms);
Ok(acc)
})
.await
}

/// Resolves direct CPE matches by joining external nodes to SBOM nodes.
/// (hopefully avoiding N+1 queries).
#[instrument(skip(connection), err(level=tracing::Level::INFO))]
async fn resolve_direct_cpe_matches(
matched: &Row,
connection: &(impl ConnectionTrait + Send),
connection: &(impl ConnectionTrait + Send + Sync),
) -> Result<Vec<RankedSbom>, Error> {
let direct = describing_cpes(connection, matched.sbom_id);
let direct_external = async {
Expand Down Expand Up @@ -311,7 +317,7 @@ async fn resolve_direct_cpe_matches(
#[instrument(skip(connection), err(level=tracing::Level::INFO))]
async fn resolve_ancestor_external_sboms(
matched: &Row,
connection: &(impl ConnectionTrait + Send),
connection: &(impl ConnectionTrait + Send + Sync),
) -> Result<Vec<ResolvedSbom>, Error> {
let top_packages =
find_node_ancestors(matched.sbom_id, matched.node_id.clone(), connection).await?;
Expand Down Expand Up @@ -345,7 +351,7 @@ async fn resolve_ancestor_external_sboms(
async fn enrich_external_sboms(
matched: &Row,
external_sboms: Vec<ResolvedSbom>,
connection: &(impl ConnectionTrait + Send),
connection: &(impl ConnectionTrait + Send + Sync),
) -> Result<Vec<RankedSbom>, Error> {
let mut results = Vec::new();

Expand Down Expand Up @@ -382,7 +388,7 @@ async fn enrich_external_sboms(
async fn resolve_sbom_cpe(
matched: Row,
cpe_search: bool,
connection: &(impl ConnectionTrait + Send),
connection: &(impl ConnectionTrait + Send + Sync),
) -> Result<Vec<RankedSbom>, Error> {
let mut results = Vec::new();

Expand Down
2 changes: 1 addition & 1 deletion modules/analysis/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,7 @@ impl AnalysisService {
// load only latest graphs
let graphs = self
.inner
.load_latest_graphs_query(connection, query)
.load_latest_graphs_query(connection, query, self.concurrency)
.await?;

log::debug!("graph sbom count: {:?}", graphs.len());
Expand Down
Loading