Skip to content

Add bloom filter pruning on the indexed parquet read path#21904

Draft
Bukhtawar wants to merge 3 commits into
opensearch-project:mainfrom
Bukhtawar:bloom-filter-read-pruning
Draft

Add bloom filter pruning on the indexed parquet read path#21904
Bukhtawar wants to merge 3 commits into
opensearch-project:mainfrom
Bukhtawar:bloom-filter-read-pruning

Conversation

@Bukhtawar
Copy link
Copy Markdown
Contributor

Integrate per-row-group bloom filter checks into the indexed execution prefetch phase. When enabled, equality predicates (=, IN) are checked against parquet bloom filters before invoking the Lucene collector FFM call. If the bloom filter proves the queried value is absent from a row group, the entire RG is skipped — saving the FFM round-trip, page pruning, and parquet decode.

Java:

  • Add datafusion.indexed.bloom_filter_on_read (Dynamic, NodeScope, default true) to DatafusionSettings
  • Wire through WireConfigSnapshot at offset 72 (i32, 0/1)

Rust:

  • Add bloom_filter_on_read to WireDatafusionQueryConfig + DatafusionQueryConfig
  • New module: indexed_table/bloom_pruner.rs
    • extract_equality_checks: walks PhysicalExpr tree for col=lit and IN
    • read_bloom_filter: fetches SBBF bytes from object store per RG/column
    • scalar_to_bloom_check: converts ScalarValue to Sbbf::check input
    • bloom_prune_rg: top-level entry — returns true if RG can be skipped
  • SingleCollectorEvaluator: accepts store/path/metadata/schema; calls bloom_prune_rg at the top of prefetch_rg (before page pruning)
  • indexed_executor: captures store + schema + bloom_filter_on_read in the evaluator factory closure

Design:

  • AND semantics only: if any single column's equality proves absence, skip the RG. OR predicates are not extracted (future enhancement).
  • Conservative: any IO error or unsupported type → do NOT prune.
  • Runs synchronously via futures::executor::block_on since prefetch_rg is already on a worker thread; bloom filter reads are small (~8-32KB).

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@Bukhtawar Bukhtawar requested a review from a team as a code owner May 30, 2026 09:35
@Bukhtawar Bukhtawar marked this pull request as draft May 30, 2026 09:35
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 30, 2026

PR Reviewer Guide 🔍

(Review updated until commit c6b93d9)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Blocking I/O

bloom_prune_rg is async but called via futures::executor::block_on in prefetch_rg (line 252 of single_collector.rs). This blocks the worker thread during bloom filter reads. If multiple RGs are processed concurrently on the same thread pool, this can cause thread starvation and degrade throughput. The PR description acknowledges this is intentional because prefetch_rg is already on a worker thread, but blocking async I/O in a thread pool can still cause contention if the pool is shared or if bloom filter reads are slower than expected (network latency, object store throttling).

pub async fn bloom_prune_rg(
    store: &dyn ObjectStore,
    path: &object_store::path::Path,
    metadata: &ParquetMetaData,
    _arrow_schema: &arrow::datatypes::Schema,
    rg_idx: usize,
    predicate: &PruningPredicate,
) -> bool {
    let rg_meta = match metadata.row_groups().get(rg_idx) {
        Some(m) => m,
        None => return false,
    };

    let literal_columns = predicate.literal_columns();
    if literal_columns.is_empty() {
        return false;
    }

    let schema_descr = metadata.file_metadata().schema_descr();
    let mut stats = BloomFilterStatistics::new();

    for col_name in &literal_columns {
        let col_idx = match (0..schema_descr.num_columns())
            .find(|&i| schema_descr.column(i).name() == col_name)
        {
            Some(idx) => idx,
            None => continue,
        };

        let col_meta = match rg_meta.columns().get(col_idx) {
            Some(m) => m,
            None => continue,
        };

        let bf_offset = match col_meta.bloom_filter_offset() {
            Some(o) => o as usize,
            None => continue,
        };

        let bf_length = match col_meta.bloom_filter_length() {
            Some(l) => l as usize,
            None => continue,
        };

        let opts = GetOptions {
            range: Some(GetRange::Bounded(std::ops::Range {
                start: bf_offset as u64,
                end: (bf_offset + bf_length) as u64,
            })),
            ..Default::default()
        };

        let bytes = match store.get_opts(path, opts).await {
            Ok(result) => match result.bytes().await {
                Ok(b) => b,
                Err(_) => continue,
            },
            Err(_) => continue,
        };

        let sbbf = match Sbbf::from_bytes(&bytes) {
            Ok(f) => f,
            Err(_) => continue,
        };

        let physical_type = schema_descr.column(col_idx).physical_type();
        stats.insert(col_name.clone(), sbbf, physical_type);
    }

    if stats.is_empty() {
        return false;
    }

    // Reuse PruningPredicate::prune — same logic DataFusion uses for min/max and bloom
    match predicate.prune(&stats) {
        Ok(results) => {
            // results[0] == false means "can prune this container"
            if let Some(&false) = results.first() {
                debug!("bloom_prune_rg: pruning rg={} via PruningPredicate", rg_idx);
                return true;
            }
            false
        }
        Err(_) => false,
    }
Silent Failure

Bloom filter pruning errors are silently ignored (bloom_pruner.rs lines 220-226, 229-231). If the object store is misconfigured, credentials expire, or the bloom filter is corrupted, the RG is not pruned and the query proceeds with the expensive collector call. No log, metric, or warning is emitted. Users cannot detect when bloom filtering is silently disabled due to transient failures, making performance regressions invisible.

if let (Some(bloom), Some(pp)) = (&self.bloom_config, &self.pruning_predicate) {
    let pruned = futures::executor::block_on(
        crate::indexed_table::bloom_pruner::bloom_prune_rg(
            &*bloom.store,
            &bloom.object_path,
            &bloom.metadata,
            &bloom.arrow_schema,
            rg.index,
            pp.as_ref(),
        )
    );
    if pruned {
        return Ok(None);
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 30, 2026

PR Code Suggestions ✨

Latest suggestions up to c6b93d9

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent potential async runtime deadlock

Using block_on in a synchronous context can cause deadlocks if called from within an
async runtime. Since prefetch_rg is synchronous but may be called from async
contexts, consider using tokio::task::block_in_place or restructuring to make
prefetch_rg async to avoid potential runtime conflicts.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/eval/single_collector.rs [252-261]

-let pruned = futures::executor::block_on(
-    crate::indexed_table::bloom_pruner::bloom_prune_rg(
-        &*bloom.store,
-        &bloom.object_path,
-        &bloom.metadata,
-        &bloom.arrow_schema,
-        rg.index,
-        pp.as_ref(),
+let pruned = tokio::task::block_in_place(|| {
+    futures::executor::block_on(
+        crate::indexed_table::bloom_pruner::bloom_prune_rg(
+            &*bloom.store,
+            &bloom.object_path,
+            &bloom.metadata,
+            &bloom.arrow_schema,
+            rg.index,
+            pp.as_ref(),
+        )
     )
-);
+});
Suggestion importance[1-10]: 7

__

Why: Using futures::executor::block_on in a synchronous context that may be called from async runtimes can cause deadlocks. The suggestion to use tokio::task::block_in_place is valid and addresses a real concurrency issue, though the impact depends on the actual runtime context.

Medium
General
Log bloom filter fetch errors

Silently ignoring errors when fetching bloom filter data can mask critical I/O
failures and lead to incorrect pruning decisions. Log errors at debug or warn level
to aid troubleshooting while maintaining the fallback behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [212-226]

 let bytes = match store.get_opts(path, opts).await {
     Ok(result) => match result.bytes().await {
         Ok(b) => b,
-        Err(_) => continue,
+        Err(e) => {
+            debug!("Failed to read bloom filter bytes for column {}: {}", col_name, e);
+            continue;
+        }
     },
-    Err(_) => continue,
+    Err(e) => {
+        debug!("Failed to fetch bloom filter for column {}: {}", col_name, e);
+        continue;
+    }
 };
Suggestion importance[1-10]: 5

__

Why: Adding debug logging for I/O errors when fetching bloom filters improves observability and troubleshooting. While the fallback behavior is correct (continuing without the bloom filter), logging helps diagnose issues. The score is moderate as this is a maintainability improvement rather than a correctness fix.

Low
Log bloom filter parsing failures

Silently ignoring bloom filter deserialization errors can hide data corruption or
format incompatibilities. Add debug logging to track when bloom filters fail to
parse, which is critical for diagnosing pruning effectiveness issues.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [228-232]

 let sbbf = match Sbbf::from_bytes(&bytes) {
     Ok(f) => f,
-    Err(_) => continue,
+    Err(e) => {
+        debug!("Failed to parse bloom filter for column {}: {}", col_name, e);
+        continue;
+    }
 };
Suggestion importance[1-10]: 5

__

Why: Similar to the previous suggestion, adding debug logging for deserialization failures improves observability. This helps track data corruption or format incompatibilities. The score is moderate as it's a maintainability enhancement rather than a critical fix.

Low

Previous suggestions

Suggestions up to commit 5fa55f5
CategorySuggestion                                                                                                                                    Impact
General
Avoid potential deadlock with block_on

Using block_on in a synchronous context can cause deadlocks if the async runtime is
already blocked or if nested blocking occurs. Since prefetch_rg is called from the
stream processing path, consider making it async or using a dedicated runtime handle
to avoid blocking the current thread.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/eval/single_collector.rs [251-265]

 if let (Some(bloom), Some(pp)) = (&self.bloom_config, &self.pruning_predicate) {
-    let pruned = futures::executor::block_on(
+    let handle = tokio::runtime::Handle::try_current()
+        .map_err(|e| format!("No tokio runtime available for bloom pruning: {}", e))?;
+    let pruned = handle.block_on(
         crate::indexed_table::bloom_pruner::bloom_prune_rg(
             &*bloom.store,
             &bloom.object_path,
             &bloom.metadata,
             &bloom.arrow_schema,
             rg.index,
             pp.as_ref(),
         )
     );
     if pruned {
         return Ok(None);
     }
 }
Suggestion importance[1-10]: 7

__

Why: Using block_on in a potentially async context can cause deadlocks. The suggestion to use Handle::try_current() is a safer approach, though the actual risk depends on the calling context. This is a valid concern for production code.

Medium
Log bloom filter fetch failures

The nested error handling silently ignores all I/O failures when fetching bloom
filter data. This can mask critical issues like network failures, permission errors,
or corrupted metadata. Consider logging errors at debug/warn level to aid
troubleshooting, especially since bloom filter failures should be transparent but
observable.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [212-226]

 let bytes = match store.get_opts(path, opts).await {
     Ok(result) => match result.bytes().await {
         Ok(b) => b,
-        Err(_) => continue,
+        Err(e) => {
+            debug!("Failed to read bloom filter bytes for column '{}' in rg={}: {}", col_name, rg_idx, e);
+            continue;
+        }
     },
-    Err(_) => continue,
+    Err(e) => {
+        debug!("Failed to fetch bloom filter for column '{}' in rg={}: {}", col_name, rg_idx, e);
+        continue;
+    }
 };
Suggestion importance[1-10]: 5

__

Why: Adding debug logging for bloom filter fetch failures would improve observability without changing behavior. However, the impact is moderate since bloom filter failures are designed to be transparent (the system continues without pruning).

Low
Log bloom filter deserialization failures

Silently ignoring bloom filter deserialization errors can hide data corruption or
format incompatibilities. Since bloom filters are critical for performance
optimization, log deserialization failures to help diagnose issues with parquet file
generation or version mismatches.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [228-231]

 let sbbf = match Sbbf::from_bytes(&bytes) {
     Ok(f) => f,
-    Err(_) => continue,
+    Err(e) => {
+        debug!("Failed to deserialize bloom filter for column '{}' in rg={}: {}", col_name, rg_idx, e);
+        continue;
+    }
 };
Suggestion importance[1-10]: 5

__

Why: Similar to suggestion 1, adding debug logging for deserialization failures would improve observability. The impact is moderate since the system gracefully handles these failures by continuing without bloom filter pruning.

Low
Suggestions up to commit 5d5ed91
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent potential runtime deadlocks

Using block_on in a synchronous context can cause deadlocks if called from within an
async runtime. Since prefetch_rg is synchronous but may be called from async
contexts, consider using tokio::task::block_in_place or restructuring to avoid
blocking.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [267-276]

-let pruned = futures::executor::block_on(
-    crate::indexed_table::bloom_pruner::bloom_prune_rg(
-        store.as_ref(),
-        path,
-        meta.as_ref(),
-        schema.as_ref(),
-        rg.index,
-        pp.as_ref(),
+let pruned = tokio::task::block_in_place(|| {
+    futures::executor::block_on(
+        crate::indexed_table::bloom_pruner::bloom_prune_rg(
+            store.as_ref(),
+            path,
+            meta.as_ref(),
+            schema.as_ref(),
+            rg.index,
+            pp.as_ref(),
+        )
     )
-);
+});
Suggestion importance[1-10]: 7

__

Why: Using block_on in sync code called from async contexts can cause deadlocks. tokio::task::block_in_place is the correct solution for this scenario, preventing potential runtime issues.

Medium
General
Log bloom filter fetch failures

Silently ignoring errors when fetching bloom filter data can mask critical I/O
failures or corruption. Log errors at debug/warn level to aid troubleshooting while
maintaining the fallback behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [219-227]

-let opts = GetOptions {
-    range: Some(GetRange::Bounded(std::ops::Range {
-        start: bf_offset as u64,
-        end: (bf_offset + bf_length) as u64,
-    })),
-    ..Default::default()
-};
-
 let bytes = match store.get_opts(path, opts).await {
     Ok(result) => match result.bytes().await {
         Ok(b) => b,
-        Err(_) => continue,
+        Err(e) => {
+            debug!("Failed to read bloom filter bytes for column {}: {}", col_name, e);
+            continue;
+        }
     },
-    Err(_) => continue,
+    Err(e) => {
+        debug!("Failed to fetch bloom filter for column {}: {}", col_name, e);
+        continue;
+    }
 };
Suggestion importance[1-10]: 5

__

Why: Adding debug logging for I/O failures improves troubleshooting without changing behavior. The suggestion correctly identifies that silently ignoring errors can mask issues, though the impact is moderate since the fallback behavior is intentional.

Low
Detect incomplete bloom filter configuration

The bloom filter check requires all five optional fields to be present. If any are
missing, pruning silently falls through. Consider logging when bloom_filter_on_read
is enabled but required fields are missing to detect configuration issues.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/eval/single_collector.rs [259-281]

 if self.bloom_filter_on_read {
-    if let (Some(store), Some(path), Some(meta), Some(schema), Some(pp)) = (
+    match (
         self.store.as_ref(),
         self.object_path.as_ref(),
         self.metadata.as_ref(),
         self.arrow_schema.as_ref(),
         self.pruning_predicate.as_ref(),
     ) {
-        let pruned = futures::executor::block_on(
-            crate::indexed_table::bloom_pruner::bloom_prune_rg(
-                ...
-            )
-        );
-        if pruned {
-            return Ok(None);
+        (Some(store), Some(path), Some(meta), Some(schema), Some(pp)) => {
+            let pruned = futures::executor::block_on(...);
+            if pruned {
+                return Ok(None);
+            }
+        }
+        _ => {
+            debug!("Bloom filter enabled but required fields missing for rg={}", rg.index);
         }
     }
 }
Suggestion importance[1-10]: 4

__

Why: Adding debug logging when bloom_filter_on_read is enabled but required fields are missing helps detect configuration issues. However, this is a minor improvement since missing fields may be intentional in some scenarios.

Low
Suggestions up to commit 2b97c12
CategorySuggestion                                                                                                                                    Impact
Possible issue
Avoid blocking executor with async I/O

Using futures::executor::block_on in a synchronous context can cause deadlocks or
performance issues if called from within an async runtime. The prefetch_rg method is
synchronous but performs async I/O. Consider making prefetch_rg async or using a
dedicated blocking thread pool for the bloom filter check to avoid blocking the
executor.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/eval/single_collector.rs [256-281]

 if self.bloom_filter_on_read {
     if let (Some(store), Some(path), Some(meta), Some(schema), Some(pred)) = (
         self.store.as_ref(),
         self.object_path.as_ref(),
         self.metadata.as_ref(),
         self.arrow_schema.as_ref(),
         self.residual_expr.as_ref(),
     ) {
-        let pruned = futures::executor::block_on(
-            crate::indexed_table::bloom_pruner::bloom_prune_rg(
-                store.as_ref(),
-                path,
-                meta.as_ref(),
-                schema.as_ref(),
-                rg.index,
-                pred,
+        let pruned = tokio::task::block_in_place(|| {
+            tokio::runtime::Handle::current().block_on(
+                crate::indexed_table::bloom_pruner::bloom_prune_rg(
+                    store.as_ref(),
+                    path,
+                    meta.as_ref(),
+                    schema.as_ref(),
+                    rg.index,
+                    pred,
+                )
             )
-        );
+        });
         if pruned {
             return Ok(None);
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: Using futures::executor::block_on in a potentially async context can cause performance issues or deadlocks. However, the suggested fix using tokio::task::block_in_place assumes a Tokio runtime is available, which may not always be the case. The suggestion is valid but the implementation needs verification against the actual runtime environment.

Medium
General
Parallelize bloom filter reads

The function performs sequential async I/O for each equality check, which could be
slow when multiple columns have bloom filters. Consider using
futures::future::join_all or similar to read all bloom filters concurrently, then
check them sequentially to improve latency for multi-column predicates.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [185-225]

 pub async fn bloom_prune_rg(
     store: &dyn ObjectStore,
     path: &object_store::path::Path,
     metadata: &ParquetMetaData,
     arrow_schema: &arrow::datatypes::Schema,
     rg_idx: usize,
     predicate: &Arc<dyn PhysicalExpr>,
 ) -> bool {
     let checks = extract_equality_checks(predicate, metadata, arrow_schema);
     if checks.is_empty() {
         return false;
     }
 
-    for check in &checks {
-        let sbbf = match read_bloom_filter(store, path, metadata, rg_idx, check.parquet_col_idx).await {
+    let bloom_futures: Vec<_> = checks.iter()
+        .map(|check| read_bloom_filter(store, path, metadata, rg_idx, check.parquet_col_idx))
+        .collect();
+    let blooms = futures::future::join_all(bloom_futures).await;
+
+    for (check, sbbf_opt) in checks.iter().zip(blooms.into_iter()) {
+        let sbbf = match sbbf_opt {
             Some(filter) => filter,
             None => continue,
         };
         ...
     }
Suggestion importance[1-10]: 6

__

Why: Parallelizing bloom filter reads could improve performance for multi-column predicates. However, the improvement is speculative and depends on the typical number of equality checks per query. The added complexity may not be justified for the common case of single-column predicates, and the current sequential approach is simpler and more maintainable.

Low
Log bloom filter read failures

The bloom filter read silently returns None on any I/O error, which could mask
transient network failures or permission issues. Consider logging errors at debug
level before returning None to aid troubleshooting in production environments where
bloom filter reads unexpectedly fail.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [169-175]

 let opts = GetOptions {
     range: Some(GetRange::Bounded(std::ops::Range { start: bf_offset as u64, end: (bf_offset + bf_length) as u64 })),
     ..Default::default()
 };
-let result = store.get_opts(path, opts).await.ok()?;
-let bytes = result.bytes().await.ok()?;
+let result = match store.get_opts(path, opts).await {
+    Ok(r) => r,
+    Err(e) => {
+        debug!("Failed to read bloom filter for rg={}, col={}: {}", rg_idx, col_idx, e);
+        return None;
+    }
+};
+let bytes = match result.bytes().await {
+    Ok(b) => b,
+    Err(e) => {
+        debug!("Failed to read bloom filter bytes for rg={}, col={}: {}", rg_idx, col_idx, e);
+        return None;
+    }
+};
Suggestion importance[1-10]: 5

__

Why: Adding debug logging for I/O errors would improve troubleshooting capabilities. However, this is a minor enhancement that doesn't address a critical issue. The current silent failure is intentional (conservative approach when bloom filters are unavailable), so the impact is moderate.

Low
Suggestions up to commit 55b2ccc
CategorySuggestion                                                                                                                                    Impact
Possible issue
Avoid blocking async executor

Using futures::executor::block_on in a potentially async context can cause deadlocks
or panics if called from within an async runtime. The prefetch_rg method should be
made async, or the bloom filter check should be moved to an earlier async stage to
avoid blocking the executor.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/eval/single_collector.rs [256-281]

-if self.bloom_filter_on_read {
-    if let (Some(store), Some(path), Some(meta), Some(schema), Some(pred)) = (
-        self.store.as_ref(),
-        self.object_path.as_ref(),
-        self.metadata.as_ref(),
-        self.arrow_schema.as_ref(),
-        self.residual_expr.as_ref(),
-    ) {
-        let pruned = futures::executor::block_on(
-            crate::indexed_table::bloom_pruner::bloom_prune_rg(
-                store.as_ref(),
-                path,
-                meta.as_ref(),
-                schema.as_ref(),
-                rg.index,
-                pred,
-            )
-        );
-        if pruned {
-            return Ok(None);
+// Make prefetch_rg async and await the bloom filter check:
+async fn prefetch_rg(
+    &self,
+    rg: &RowGroupInfo,
+    min_doc: i32,
+    max_doc: i32,
+) -> Result<Option<PrefetchedRg>, String> {
+    if self.bloom_filter_on_read {
+        if let (Some(store), Some(path), Some(meta), Some(schema), Some(pred)) = (...) {
+            let pruned = crate::indexed_table::bloom_pruner::bloom_prune_rg(
+                store.as_ref(), path, meta.as_ref(), schema.as_ref(), rg.index, pred
+            ).await;
+            if pruned {
+                return Ok(None);
+            }
         }
     }
+    ...
 }
Suggestion importance[1-10]: 9

__

Why: Using futures::executor::block_on within an async context can cause deadlocks or runtime panics. This is a critical issue that could lead to production failures. Making prefetch_rg async is the correct solution.

High
General
Log bloom filter read errors

The bloom filter read silently returns None on any IO error, which could mask
transient failures or misconfigurations. Consider logging errors at debug or warn
level before returning None to aid troubleshooting in production.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [169-175]

 let opts = GetOptions {
     range: Some(GetRange::Bounded(std::ops::Range { start: bf_offset as u64, end: (bf_offset + bf_length) as u64 })),
     ..Default::default()
 };
-let result = store.get_opts(path, opts).await.ok()?;
-let bytes = result.bytes().await.ok()?;
+let result = match store.get_opts(path, opts).await {
+    Ok(r) => r,
+    Err(e) => {
+        debug!("Failed to read bloom filter for rg={}, col={}: {}", rg_idx, col_idx, e);
+        return None;
+    }
+};
+let bytes = match result.bytes().await {
+    Ok(b) => b,
+    Err(e) => {
+        debug!("Failed to read bloom filter bytes for rg={}, col={}: {}", rg_idx, col_idx, e);
+        return None;
+    }
+};
Suggestion importance[1-10]: 6

__

Why: Adding debug logging for IO errors when reading bloom filters would significantly improve troubleshooting capabilities in production without affecting performance, as errors are currently silently swallowed.

Low
Remove unreachable fallback code

The fallback logic after the loop assigns _field_idx but never uses it, then
unconditionally returns None. This means columns that don't match by name in the
parquet schema will always fail resolution, even if the Arrow schema contains them.
Either implement the Arrow-to-parquet mapping or remove the dead code.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [116-131]

 fn resolve_parquet_col_idx(
     col_name: &str,
     metadata: &ParquetMetaData,
     arrow_schema: &arrow::datatypes::Schema,
 ) -> Option<usize> {
     let schema_descr = metadata.file_metadata().schema_descr();
     for i in 0..schema_descr.num_columns() {
         let col_descr = schema_descr.column(i);
         if col_descr.name() == col_name {
             return Some(i);
         }
     }
-    // Try matching via Arrow schema field name → parquet leaf index
-    let _field_idx = arrow_schema.index_of(col_name).ok()?;
     None
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies dead code where _field_idx is assigned but never used, and None is unconditionally returned. Removing this improves code clarity and maintainability.

Low
Suggestions up to commit 83e2fc5
CategorySuggestion                                                                                                                                    Impact
Possible issue
Avoid blocking async calls unsafely

Using block_on in a synchronous context can cause deadlocks or panics if called from
within an async runtime. Consider making prefetch_rg async or using a dedicated
blocking thread pool via tokio::task::spawn_blocking to safely execute the async
bloom filter check.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/eval/single_collector.rs [267-276]

-let pruned = futures::executor::block_on(
-    crate::indexed_table::bloom_pruner::bloom_prune_rg(
-        store.as_ref(),
-        path,
-        meta.as_ref(),
-        schema.as_ref(),
-        rg.index,
-        pred,
+let pruned = tokio::task::block_in_place(|| {
+    tokio::runtime::Handle::current().block_on(
+        crate::indexed_table::bloom_pruner::bloom_prune_rg(
+            store.as_ref(),
+            path,
+            meta.as_ref(),
+            schema.as_ref(),
+            rg.index,
+            pred,
+        )
     )
-);
+});
Suggestion importance[1-10]: 8

__

Why: Using futures::executor::block_on in a synchronous context within an async runtime can cause deadlocks or panics. The suggestion to use tokio::task::block_in_place is a valid improvement for safely executing async code from a sync context, addressing a potential runtime issue.

Medium
Security
Prevent integer overflow in range calculation

The range calculation bf_offset + bf_length can overflow if both values are near
usize::MAX. Use checked arithmetic to prevent potential panics or incorrect range
calculations that could read wrong data from the object store.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [169-172]

+let end_offset = bf_offset.checked_add(bf_length)?;
 let opts = GetOptions {
-    range: Some(GetRange::Bounded(std::ops::Range { start: bf_offset as u64, end: (bf_offset + bf_length) as u64 })),
+    range: Some(GetRange::Bounded(std::ops::Range { start: bf_offset as u64, end: end_offset as u64 })),
     ..Default::default()
 };
Suggestion importance[1-10]: 7

__

Why: The addition bf_offset + bf_length could theoretically overflow if both values are near usize::MAX. Using checked_add prevents potential panics and incorrect range calculations, improving robustness and safety of the bloom filter reading logic.

Medium
General
Validate row group index bounds

The function doesn't validate that rg_idx is within bounds of the row groups in
metadata. An out-of-bounds index will cause metadata.row_groups().get(rg_idx) to
return None silently, but this should be caught earlier to prevent logic errors.

sandbox/plugins/analytics-backend-datafusion/rust/src/indexed_table/bloom_pruner.rs [185-196]

 pub async fn bloom_prune_rg(
     store: &dyn ObjectStore,
     path: &object_store::path::Path,
     metadata: &ParquetMetaData,
     arrow_schema: &arrow::datatypes::Schema,
     rg_idx: usize,
     predicate: &Arc<dyn PhysicalExpr>,
 ) -> bool {
+    if rg_idx >= metadata.row_groups().len() {
+        return false;
+    }
     let checks = extract_equality_checks(predicate, metadata, arrow_schema);
     if checks.is_empty() {
         return false;
     }
Suggestion importance[1-10]: 6

__

Why: Adding bounds validation for rg_idx before accessing metadata.row_groups() improves defensive programming. While the current code handles None gracefully via ? operators, explicit validation makes the intent clearer and prevents potential logic errors from invalid indices.

Low

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 55b2ccc

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 55b2ccc: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 30, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.49%. Comparing base (dad63c0) to head (c6b93d9).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21904      +/-   ##
============================================
- Coverage     73.51%   73.49%   -0.03%     
+ Complexity    75582    75553      -29     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251918   251827      -91     
- Misses        70712    70810      +98     
+ Partials      20031    20024       -7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@Bukhtawar Bukhtawar force-pushed the bloom-filter-read-pruning branch from 55b2ccc to 2b97c12 Compare May 30, 2026 12:30
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 2b97c12

@Bukhtawar Bukhtawar force-pushed the bloom-filter-read-pruning branch from 2b97c12 to 7911c00 Compare May 30, 2026 12:54
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 7911c00.

'Diff too large, requires skip by maintainers after manual review'


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Integrate per-row-group bloom filter checks into the indexed execution
prefetch phase. Reuses DataFusion's PruningPredicate::prune() with a
BloomFilterStatistics PruningStatistics impl — same pattern DataFusion
uses internally for its vanilla ParquetExec bloom filter pruning.

When enabled, the PruningPredicate's literal_columns() identifies which
columns to check, bloom filter bytes are read from the object store,
and PruningPredicate::prune() evaluates whether the RG can be skipped.
If pruned, the FFM collector call, page pruning, and decode are all
avoided.

Java:
- Add datafusion.indexed.bloom_filter_on_read (Dynamic, NodeScope, default true)
- Wire through WireConfigSnapshot at offset 72 (i32, 0/1)

Rust:
- New module: indexed_table/bloom_pruner.rs
  - BloomFilterStatistics implements PruningStatistics (same as DataFusion)
  - check_scalar: type-aware bloom check (aligned with DataFusion's impl)
  - bloom_prune_rg: loads per-column Sbbf, calls predicate.prune()
- SingleCollectorEvaluator: calls bloom_prune_rg at top of prefetch_rg
- DatafusionQueryConfig: bloom_filter_on_read field in wire format

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
@Bukhtawar Bukhtawar force-pushed the bloom-filter-read-pruning branch from 7911c00 to 5d5ed91 Compare May 30, 2026 12:58
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5d5ed91

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5d5ed91: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Two benchmarks measuring bloom filter pruning performance:

1. bloom_filter_bench: Micro-benchmark of bloom_prune_rg in isolation.
   Measures IO + check cost per RG for present/absent/no-bloom scenarios.

2. bloom_filter_e2e_bench: End-to-end benchmark of full prefetch_rg
   pipeline with a mock collector simulating 1ms FFM latency.
   Demonstrates 7x speedup when bloom filter prunes an absent value.

Run: cargo bench --bench bloom_filter_bench
     cargo bench --bench bloom_filter_e2e_bench

Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
@Bukhtawar Bukhtawar force-pushed the bloom-filter-read-pruning branch from 5d5ed91 to 5fa55f5 Compare May 30, 2026 16:40
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5fa55f5

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5fa55f5: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c6b93d9

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c6b93d9: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant