Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions etc/test-data/cyclonedx/fan-out-external/container.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
{
"bomFormat": "CycloneDX",
"specVersion": "1.5",
"serialNumber": "urn:cdx:cccccccc-cccc-cccc-cccc-cccccccccccc",
"version": 1,
"metadata": {
"timestamp": "1970-01-01T13:30:00Z",
"component": {
"bom-ref": "root",
"name": "fan-out-container",
"type": "application"
}
},
"components": [
{
"bom-ref": "c1",
"name": "C1",
"version": "1",
"purl": "pkg:rpm/redhat/C1@1.0.0?arch=src",
"type": "library"
},
{
"bom-ref": "c2",
"name": "C2",
"version": "1",
"purl": "pkg:rpm/redhat/C2@1.0.0?arch=src",
"type": "library"
},
{
"bom-ref": "c3",
"name": "C3",
"version": "1",
"purl": "pkg:rpm/redhat/C3@1.0.0?arch=src",
"type": "library"
}
],
"dependencies": [
{
"ref": "root",
"dependsOn": ["c1", "c2", "c3"]
},
{
"ref": "c1",
"dependsOn": ["urn:cdx:tttttttt-tttt-tttt-tttt-tttttttttttt/1#t1"]
},
{
"ref": "c2",
"dependsOn": ["urn:cdx:tttttttt-tttt-tttt-tttt-tttttttttttt/1#t1"]
},
{
"ref": "c3",
"dependsOn": ["urn:cdx:tttttttt-tttt-tttt-tttt-tttttttttttt/1#t1"]
}
]
}
29 changes: 29 additions & 0 deletions etc/test-data/cyclonedx/fan-out-external/target.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
{
"bomFormat": "CycloneDX",
"specVersion": "1.5",
"serialNumber": "urn:cdx:tttttttt-tttt-tttt-tttt-tttttttttttt",
"version": 1,
"metadata": {
"timestamp": "1970-01-01T13:30:00Z",
"component": {
"bom-ref": "root",
"name": "fan-out-target",
"type": "application"
}
},
"components": [
{
"bom-ref": "t1",
"name": "T1",
"version": "1",
"purl": "pkg:rpm/redhat/T1@1.0.0?arch=src",
"type": "library"
}
],
"dependencies": [
{
"ref": "root",
"dependsOn": ["t1"]
}
]
}
28 changes: 28 additions & 0 deletions etc/test-data/cyclonedx/loop-external/a.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"bomFormat": "CycloneDX",
"specVersion": "1.5",
"serialNumber": "urn:cdx:aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa",
"version": 1,
"metadata": {
"timestamp": "1970-01-01T13:30:00Z",
"component": {
"name": "loop-external-a",
"type": "application"
}
},
"components": [
{
"bom-ref": "x",
"name": "X",
"version": "1",
"purl": "pkg:rpm/redhat/X@0.0.0?arch=src",
"type": "library"
}
],
"dependencies": [
{
"ref": "x",
"dependsOn": ["urn:cdx:bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb/1#y"]
}
]
}
28 changes: 28 additions & 0 deletions etc/test-data/cyclonedx/loop-external/b.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"bomFormat": "CycloneDX",
"specVersion": "1.5",
"serialNumber": "urn:cdx:bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb",
"version": 1,
"metadata": {
"timestamp": "1970-01-01T13:30:00Z",
"component": {
"name": "loop-external-b",
"type": "application"
}
},
"components": [
{
"bom-ref": "y",
"name": "Y",
"version": "1",
"purl": "pkg:rpm/redhat/Y@0.0.0?arch=src",
"type": "library"
}
],
"dependencies": [
{
"ref": "y",
"dependsOn": ["urn:cdx:aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa/1#x"]
}
]
}
82 changes: 63 additions & 19 deletions modules/analysis/src/service/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,22 @@ use super::*;
use crate::model::graph::{ExternalNode, PackageNode};
use futures::stream::{self, StreamExt};
use parking_lot::Mutex;
use std::{collections::HashMap, sync::Arc};
use std::{
collections::{HashMap, hash_map::Entry},
sync::Arc,
};

/// Tracker for visited nodes, across graphs.
#[derive(Default, Clone)]
pub struct DiscoveredTracker {
cache: Arc<Mutex<HashMap<*const NodeGraph, FixedBitSet>>>,
cache: Arc<Mutex<HashMap<Uuid, FixedBitSet>>>,
}

impl DiscoveredTracker {
pub fn visit(&self, graph: &NodeGraph, node: NodeIndex) -> bool {
/// Check if a node was already visited, marking it as visited if not.
pub fn visit(&self, sbom_id: Uuid, graph: &NodeGraph, node: NodeIndex) -> bool {
let mut maps = self.cache.lock();
let map = maps
.entry(graph as *const Graph<_, _>)
.or_insert_with(|| graph.visit_map());
let map = maps.entry(sbom_id).or_insert_with(|| graph.visit_map());

map.visit(node)
}
Expand All @@ -28,11 +30,13 @@ impl DiscoveredTracker {
pub struct Collector<'a, C: ConnectionTrait> {
graph_cache: &'a Arc<GraphMap>,
graphs: &'a [(Uuid, Arc<PackageGraph>)],
sbom_id: Uuid,
graph: &'a NodeGraph,
node: NodeIndex,
direction: Direction,
depth: u64,
discovered: DiscoveredTracker,
loaded_graphs: Arc<Mutex<HashMap<Uuid, Arc<PackageGraph>>>>,
relationships: &'a HashSet<Relationship>,
connection: &'a C,
concurrency: usize,
Expand All @@ -44,11 +48,13 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
Collector {
graph_cache: self.graph_cache,
graphs: self.graphs,
sbom_id: self.sbom_id,
graph: self.graph,
node: self.node,
direction: self.direction,
depth: self.depth,
discovered: self.discovered.clone(),
loaded_graphs: self.loaded_graphs.clone(),
relationships: self.relationships,
connection: self.connection,
concurrency: self.concurrency,
Expand All @@ -61,6 +67,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
pub fn new(
graph_cache: &'a Arc<GraphMap>,
graphs: &'a [(Uuid, Arc<PackageGraph>)],
sbom_id: Uuid,
graph: &'a NodeGraph,
node: NodeIndex,
direction: Direction,
Expand All @@ -73,11 +80,13 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
Self {
graph_cache,
graphs,
sbom_id,
graph,
node,
direction,
depth,
discovered: Default::default(),
loaded_graphs: Default::default(),
relationships,
connection,
concurrency,
Expand All @@ -88,8 +97,9 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
/// Continue with another graph and node as an entry point.
///
/// Shares the visited set.
pub fn with(self, graph: &'a NodeGraph, node: NodeIndex) -> Self {
pub fn with(self, sbom_id: Uuid, graph: &'a NodeGraph, node: NodeIndex) -> Self {
Self {
sbom_id,
graph,
node,
..self
Expand All @@ -103,18 +113,41 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
Self {
graph_cache: self.graph_cache,
graphs: self.graphs,
sbom_id: self.sbom_id,
graph: self.graph,
node,
direction: self.direction,
depth: self.depth - 1,
discovered: self.discovered.clone(),
loaded_graphs: self.loaded_graphs.clone(),
relationships: self.relationships,
connection: self.connection,
concurrency: self.concurrency,
loader: self.loader,
}
}

/// Load an external SBOM graph, checking the local cache first.
async fn load_external_graph(&self, sbom_id: Uuid) -> Result<Option<Arc<PackageGraph>>, Error> {
if let Some(graph) = self.loaded_graphs.lock().get(&sbom_id).cloned() {
return Ok(Some(graph));
}

let Some(graph) = self.loader.load(self.connection, sbom_id).await? else {
return Ok(None);
};

let mut map = self.loaded_graphs.lock();
match map.entry(sbom_id) {
Entry::Occupied(e) => {
log::debug!("concurrent load of external SBOM {sbom_id}, reusing existing handle");
self.loader.redundant_loads.add(1, &[]);
Ok(Some(e.get().clone()))
}
Entry::Vacant(e) => Ok(Some(e.insert(graph).clone())),
}
}

/// Collect related nodes in the provided direction.
///
/// If the depth is zero, or the node was already processed, it will return [`None`], indicating
Expand All @@ -130,7 +163,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {

let node = self.graph.node_weight(self.node);

if !self.discovered.visit(self.graph, self.node) {
if !self.discovered.visit(self.sbom_id, self.graph, self.node) {
log::trace!("node got visited already");
return Ok((None, vec!["This node was already visited. Possible relationship loop. Skipping further processing.".into()]));
}
Expand Down Expand Up @@ -169,9 +202,8 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
));
};

// retrieve external sbom graph from graph_cache
let Some(external_graph) = self.loader.load(self.connection, external_sbom_id).await?
else {
// retrieve external sbom graph, checking local cache first
let Some(external_graph) = self.load_external_graph(external_sbom_id).await? else {
return Ok((
None,
vec![format!(
Expand All @@ -196,9 +228,13 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
// recurse into those descendent nodes
Ok((
Some(
self.with(external_graph.as_ref(), external_node_index)
.collect_graph()
.await?,
self.with(
external_sbom_id,
external_graph.as_ref(),
external_node_index,
)
.collect_graph()
.await?,
),
vec![],
))
Expand Down Expand Up @@ -244,9 +280,7 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {

// get the external sbom graph

let Some(external_graph) =
self.loader.load(self.connection, matched.sbom_id).await?
else {
let Some(external_graph) = self.load_external_graph(matched.sbom_id).await? else {
log::warn!(
"external sbom graph {} not found in graph cache or database",
matched.sbom_id
Expand All @@ -268,7 +302,11 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
// recurse into those external sbom nodes and save

collector
.with(external_graph.as_ref(), external_node_index)
.with(
matched.sbom_id,
external_graph.as_ref(),
external_node_index,
)
.collect_graph()
.await
})
Expand Down Expand Up @@ -344,13 +382,19 @@ impl<'a, C: ConnectionTrait> Collector<'a, C> {
#[derive(Clone)]
pub struct GraphLoader {
service: AnalysisService,
pub(crate) redundant_loads: Counter<u64>,
}

impl GraphLoader {
pub fn new(service: AnalysisService) -> Self {
Self { service }
let meter = global::meter("AnalysisService");
Self {
service,
redundant_loads: meter.u64_counter("collector_redundant_loads").build(),
}
}

#[instrument(skip(self, connection), err(level=tracing::Level::INFO))]
pub async fn load(
&self,
connection: &impl ConnectionTrait,
Expand Down
2 changes: 2 additions & 0 deletions modules/analysis/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,7 @@ impl AnalysisService {
let ancestors = Collector::new(
&graph_cache,
graphs,
node.sbom_id,
graph,
node_index,
Direction::Incoming,
Expand All @@ -518,6 +519,7 @@ impl AnalysisService {
let descendants = Collector::new(
&graph_cache,
graphs,
node.sbom_id,
graph,
node_index,
Direction::Outgoing,
Expand Down
Loading
Loading