Skip to content

Lucene as a driving backend for shard-local count fragments#21867

Open
alchemist51 wants to merge 6 commits into
opensearch-project:mainfrom
alchemist51:indexed-count
Open

Lucene as a driving backend for shard-local count fragments#21867
alchemist51 wants to merge 6 commits into
opensearch-project:mainfrom
alchemist51:indexed-count

Conversation

@alchemist51
Copy link
Copy Markdown
Contributor

@alchemist51 alchemist51 commented May 28, 2026

What changed

Lucene becomes a first-class driving backend for count fragments fully expressible against the inverted index (today: count(*) / count(col) over Lucene-indexable filters).

Capability surface

  • ScanCapability.InvertedIndex — a metadata-only scan that produces no row values. A backend declaring it can drive a stage iff every downstream op only needs document existence/cardinality, not column values.
  • OpenSearchTableScanRule two-phase viability:
    • Strict for value-producing backends: must cover every requested field (natively or via accepted delegation).
    • Permissive for the metadata-only driver (today: lucene): viable if it covers some indexed field. Downstream ops that need a value-producing column self-restrict, and PlanForker's chain-agreement filter drops the driver for chains that aren't end-to-end metadata-only. The only chain that survives is the count fast-path shape.

Coordinator-side alternative collapse

  • PlanAlternativeSelector runs on the coordinator after PlanForker + BackendPlanAdapter and before FragmentConversionDriver. When analytics.planner.prefer_metadata_driver (default true) is set and a stage has a lucene alternative, that alternative wins; the stage's alternative list collapses to a single StagePlan.
  • Convertor now runs once per stage; each FragmentExecutionRequest ships exactly one PlanAlternative; the data node skips alternative selection.

Lucene driver path

  • LuceneFragmentConvertor lowers the stage to [outputColumnNames][hasFilter?][BoolQueryBuilder] over NamedWriteable.
  • LuceneSearchExecEngine runs IndexSearcher.count(boolQuery); the count is exported through Arrow C-Data (Data.exportVectorSchemaRoot) so Flight's transferRoot round-trips it without zeroing — pure-Java setSafe-built buffers don't survive transfer.
  • Naming mirrors the DataFusion path: LuceneSearcherState, LuceneSearchExecEngine, LuceneResultStream.

Delegation combiner

  • New cluster setting analytics.delegation.fuse_dual_viable (default false): when on, an OR/NOT subtree where every leaf is dual-viable is fused into a single delegated boolean shipped to the peer driver, instead of carving the performance-delegated leaves back out.
  • Combiner had a latent break against AnnotatedPredicate markers nested inside an OR/NOT carve-out — fixed by an unwrapAnnotations(RexNode) pass before re-wrapping in a delegation_possible(...) placeholder.

Translation example

where userID = 'arpit' AND event_type = 'click' AND match(message, 'alpha') | stats count()

parse     Aggregate(COUNT)
            Filter(=userID,'arpit'  AND  =event_type,'click'  AND  match(message,'alpha'))
              TableScan(index)

mark      every node viable={lucene, datafusion}
          (all three predicates Lucene-indexable; count is a Lucene aggregate cap)

fork      stage alternatives = [lucene, datafusion]

select    prefer_metadata_driver=true  →  alternatives = [lucene]

convert   Lucene wire payload:
            outputColumnNames = [cnt]
            hasFilter = true
            BoolQueryBuilder{
              must = [ TermQuery(userID,'arpit'),
                       TermQuery(event_type,'click'),
                       MatchQuery(message,'alpha') ]
            }

execute   searcher.count(bool)  →  Arrow batch [cnt: Int64 = N]

If userID were a numeric field with no inverted index, the strict gate would drop lucene from the value-producing pool but the permissive gate keeps it for the count chain — PlanForker's chain-agreement still drops it because the = on userID can't run on the metadata-only driver, and the stage falls back to datafusion.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 28, 2026

PR Reviewer Guide 🔍

(Review updated until commit f5e771b)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
📝 TODO sections

🔀 No multiple PR themes
⚡ Recommended focus areas for review

Swallowed Exceptions

trackStart and trackEnd now catch all Throwable instances and log them, then continue. If an assertion failure or other critical error occurs (e.g., OutOfMemoryError), swallowing it silently disables resource tracking without surfacing the root cause. This can mask bugs during development and make debugging harder. Consider catching only Exception (not Throwable) or re-throwing errors after logging.

    // Must never throw — runs OUTSIDE the try/catch in each upcall target, so any
    // escaping exception (e.g. an `assert false` in TaskResourceTrackingService when
    // the thread is already tracked) crosses the FFM boundary and aborts the JVM
    // with `Unrecoverable uncaught exception encountered`. Swallow everything and
    // disable tracking for the remainder of this upcall by returning -1.
    try {
        DelegationThreadTracker t = TRACKER.get();
        return (t != null) ? t.trackStart() : -1;
    } catch (Throwable throwable) {
        LOGGER.warn("trackStart failed; resource attribution disabled for this upcall", throwable);
        return -1;
    }
}

private static void trackEnd(long threadId) {
    if (threadId < 0) return;
    // Same FFM safety rule as trackStart — runs in a `finally` block, so any
    // exception escaping here would mask the actual upcall result and abort the JVM.
    try {
        DelegationThreadTracker t = TRACKER.get();
        if (t != null) t.trackEnd(threadId);
    } catch (Throwable throwable) {
        LOGGER.warn("trackEnd failed", throwable);
    }
}
Capability Drift Risk

STANDARD_OPS is manually restricted to ScalarFunction.EQUALS with a TODO noting that declaring a capability without a matching serializer causes IllegalStateException at convert time. If a developer adds a new ScalarFunction to STANDARD_OPS without adding its serializer to QuerySerializerRegistry, the failure surfaces late (at query execution). The TODO suggests intersecting capabilities against the serializer keyset at startup, but this is not yet implemented. Until then, the manual sync is error-prone.

public class LuceneAnalyticsBackendPlugin implements AnalyticsSearchBackendPlugin {

    private static final String LUCENE_FORMAT = LuceneDataFormat.LUCENE_FORMAT_NAME;
    private static final Set<String> LUCENE_FORMATS = Set.of(LUCENE_FORMAT);

    // Lucene's STANDARD filter capabilities must stay in lockstep with the serializers
    // registered in QuerySerializerRegistry — declaring a capability without a matching
    // DelegatedPredicateSerializer makes the marking layer pick Lucene as viable for
    // operators it can't actually translate, and the failure surfaces at convert time as
    // an IllegalStateException ("No Lucene serializer for [..]"). Today only EQUALS has
    // a serializer; range ops, NOT_EQUALS, IS_NULL, IS_NOT_NULL, IN, LIKE are deferred
    // until their serializers land.
    // TODO: have CapabilityRegistry intersect declared FilterCapability against the
    // backend's serializer keyset at startup so this list can't drift again. The TODO in
    // OpenSearchFilterRule.resolveViableBackends references the same constraint.
    private static final Set<ScalarFunction> STANDARD_OPS = Set.of(ScalarFunction.EQUALS);
Resource Leak

If configureOn throws an exception after the FilterDelegationHandle is acquired but before ownership is transferred to the backend context, the handle is not closed. The finally block at line 346 only closes delegationSetup if it is non-null, but delegationSetup.close() is a no-op once ownershipTransferred is true. If configureOn fails partway through (e.g., installThreadTracker throws), the handle leaks. The fix is to call delegationSetup.close() in the catch block before wrapping it in FragmentResources.

    }

    engine = backend.getSearchExecEngineProvider().createSearchExecEngine(ctx, backendContext);
    stream = engine.execute(ctx);
    return new FragmentResources(
        readerContextStore,
        readerContext,
        engine,
        stream,
        delegationSetup == null ? null : delegationSetup.trackerCleanup()
    );
} catch (Exception e) {
    LOGGER.error(
        () -> new org.apache.logging.log4j.message.ParameterizedMessage(
            "startFragment failed [queryId={}, stageId={}, shardId={}]",
            resolved.queryId,
            resolved.stageId,
            resolved.shardIdStr
        ),
        e
    );
    Runnable trackerCleanupOnFailure = delegationSetup == null ? null : delegationSetup.trackerCleanup();
    try {
        new FragmentResources(readerContextStore, readerContext, engine, stream, trackerCleanupOnFailure).close();
    } catch (Exception suppressed) {
        e.addSuppressed(suppressed);
    }
    // Close the delegation handle if it was acquired but not yet wired into the backend context
    // (configureOn transfers ownership; close() before that frees the handle, after is a no-op).
    if (delegationSetup != null) {
        try {
            delegationSetup.close();
        } catch (Exception suppressed) {
            e.addSuppressed(suppressed);
        }
    }
    // Close the backend execution context as a safety net for failure paths that
Double-Close Risk

closeLastBatch only closes nextBatch if it is non-null, but the iterator contract allows hasNext to be called multiple times. If hasNext loads a batch, then close is called, then hasNext is called again (which is legal per Iterator contract), nextBatch could be non-null again and get closed twice. The second close would fail or corrupt state. The fix is to set a 'closed' flag in closeLastBatch and check it in loadNextBatch.

void closeLastBatch() {
    // Only close batches that were loaded but never handed to the caller. Caller
    // owns any batch returned by next(); closing it here would double-close after
    // Flight's transferTo or after row-path reads.
    if (nextBatch != null) {
        nextBatch.close();
        nextBatch = null;
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 28, 2026

PR Code Suggestions ✨

Latest suggestions up to f5e771b

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Avoid catching Throwable

Catching Throwable is overly broad and can mask critical JVM errors like
OutOfMemoryError or StackOverflowError. Restrict the catch to Exception to allow
fatal errors to propagate naturally, or explicitly handle only expected exceptions
from trackStart.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [65-71]

 try {
     DelegationThreadTracker t = TRACKER.get();
     return (t != null) ? t.trackStart() : -1;
-} catch (Throwable throwable) {
-    LOGGER.warn("trackStart failed; resource attribution disabled for this upcall", throwable);
+} catch (Exception e) {
+    LOGGER.warn("trackStart failed; resource attribution disabled for this upcall", e);
     return -1;
 }
Suggestion importance[1-10]: 7

__

Why: Catching Throwable can mask critical JVM errors like OutOfMemoryError. The suggestion to catch Exception instead is valid and improves error handling, though the impact is moderate since the code already logs and handles the error gracefully.

Medium
Preserve exception chain in cleanup

The nested finally block ensures arrowSchema.close() runs even if array.close()
throws, but exceptions from arrowSchema.close() will mask exceptions from
array.close(). Use try-with-resources or explicitly suppress secondary exceptions to
preserve the original failure.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneSearchExecEngine.java [90-96]

 } finally {
     if (transferred == false) {
         try {
             array.close();
-        } finally {
-            arrowSchema.close();
+        } catch (Exception arrayEx) {
+            try {
+                arrowSchema.close();
+            } catch (Exception schemaEx) {
+                arrayEx.addSuppressed(schemaEx);
+            }
+            throw arrayEx;
         }
+        arrowSchema.close();
     }
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that exceptions from arrowSchema.close() can mask exceptions from array.close(). However, the improved code has issues—it re-throws arrayEx which would escape the method's exception handling. A better approach would use addSuppressed without re-throwing.

Low
Use safer list access pattern

Calling getFirst() on a potentially empty list after checking isEmpty() is safe
here, but the logic is fragile if the loop structure changes. Consider using get(0)
with an explicit size check or restructuring to avoid the pattern.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFragmentConvertor.java [116-117]

 if (current.getInputs().isEmpty()) break;
-current = current.getInputs().getFirst();
+current = current.getInput(0);
Suggestion importance[1-10]: 3

__

Why: The existing code is already safe—it checks isEmpty() before calling getFirst(). The suggestion to use getInput(0) doesn't improve safety and getInput(0) isn't a standard Calcite API method. The concern about fragility is minor.

Low

Previous suggestions

Suggestions up to commit 3c7e5c7
CategorySuggestion                                                                                                                                    Impact
General
Build stream before closing handle

The fast path closes delegationSetup after successfully obtaining the count but
before building the result stream. If buildOneRowCountStream throws an exception,
the delegation handle is closed but the exception propagates without proper resource
cleanup of the readerContext. This could leak the reader context.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [516-526]

-private FragmentResources tryCountFastPath(
-    List<InstructionNode> instructions,
-    DelegationSetup delegationSetup,
-    ReaderContext readerContext
-) {
-    if (delegationSetup == null) return null;
-    if (instructions.size() != 1) return null;
-    if (!(instructions.get(0) instanceof ShardScanInstructionNode scanNode)) return null;
-    if (!scanNode.countQuery()) return null;
+OptionalLong fast = delegationSetup.handle().tryCountQuery(scanNode.countExistenceFields());
+if (fast.isEmpty()) return null;
 
-    OptionalLong fast = delegationSetup.handle().tryCountQuery(scanNode.countExistenceFields());
-    if (fast.isEmpty()) return null;
-
-    long count = fast.getAsLong();
-    try {
-        delegationSetup.close();
-    } catch (IOException e) {
-        LOGGER.warn("Failed to close delegation handle after count fast path", e);
-    }
-    ...
+long count = fast.getAsLong();
+EngineResultStream stream = buildOneRowCountStream(scanNode.partialCountColumnNames(), count);
+try {
+    delegationSetup.close();
+} catch (IOException e) {
+    LOGGER.warn("Failed to close delegation handle after count fast path", e);
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a potential resource leak if buildOneRowCountStream throws an exception after closing delegationSetup. Reordering operations ensures the stream is built before closing the handle, improving exception safety.

Medium
Handle concurrent reader closure

The method accesses directoryReader.numDeletedDocs() and searcher.count() without
verifying that the reader is still open. If the reader was closed concurrently
(e.g., by another thread or during query cancellation), these calls could throw
AlreadyClosedException. Add a check or wrap in a try-catch to handle this race
condition gracefully.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java [284-312]

 public OptionalLong tryCountQuery(List<String> existenceFields) {
-    if (directoryReader.numDeletedDocs() > 0) {
-        LOGGER.debug("[count-fast-path] decline: shard has {} deleted docs", directoryReader.numDeletedDocs());
-        return OptionalLong.empty();
-    }
-    if (queriesByAnnotationId.isEmpty() && existenceFields.isEmpty()) {
-        return OptionalLong.empty();
-    }
     try {
+        if (directoryReader.numDeletedDocs() > 0) {
+            LOGGER.debug("[count-fast-path] decline: shard has {} deleted docs", directoryReader.numDeletedDocs());
+            return OptionalLong.empty();
+        }
+        if (queriesByAnnotationId.isEmpty() && existenceFields.isEmpty()) {
+            return OptionalLong.empty();
+        }
         Query combined = buildCombinedQuery(existenceFields);
         long n = searcher.count(combined);
         ...
+    } catch (org.apache.lucene.store.AlreadyClosedException e) {
+        LOGGER.warn("[count-fast-path] reader closed during count, declining fast path", e);
+        return OptionalLong.empty();
+    } catch (IOException e) {
+        LOGGER.warn("[count-fast-path] IOException during searcher.count, declining fast path", e);
+        return OptionalLong.empty();
     }
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion addresses a valid concurrency concern where the reader could be closed during tryCountQuery. However, the existing code already has a catch for IOException at line 309, which would catch AlreadyClosedException (a subclass). The suggestion adds explicit handling but doesn't fundamentally change behavior.

Low
Log suppressed close exceptions

The delegation handle close operation in the exception handler may fail silently if
delegationSetup.close() throws an exception that gets suppressed. This could lead to
resource leaks when the primary exception path is taken. Consider logging the
suppressed exception before adding it to ensure visibility of close failures.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [365-372]

 if (delegationSetup != null) {
     try {
         delegationSetup.close();
     } catch (Exception suppressed) {
+        LOGGER.warn("Failed to close delegation setup during exception handling", suppressed);
         e.addSuppressed(suppressed);
     }
 }
Suggestion importance[1-10]: 5

__

Why: Adding logging for suppressed exceptions improves observability of resource cleanup failures. However, the exception is already being suppressed and added to the primary exception, so the impact is moderate—mainly helpful for debugging.

Low
Narrow exception catch scope

Catching Throwable in trackStart and trackEnd is overly broad and may mask critical
JVM errors like OutOfMemoryError or StackOverflowError. While the comment explains
the FFM safety requirement, consider catching only Exception to allow fatal errors
to propagate, or explicitly handle only expected exceptions like
IllegalStateException.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [59-71]

 private static long trackStart() {
     try {
         DelegationThreadTracker t = TRACKER.get();
         return (t != null) ? t.trackStart() : -1;
-    } catch (Throwable throwable) {
-        LOGGER.warn("trackStart failed; resource attribution disabled for this upcall", throwable);
+    } catch (Exception e) {
+        LOGGER.warn("trackStart failed; resource attribution disabled for this upcall", e);
         return -1;
     }
 }
Suggestion importance[1-10]: 3

__

Why: While catching Throwable is generally discouraged, the PR comments explicitly justify this choice for FFM safety—any escaping exception would abort the JVM. The suggestion to narrow to Exception contradicts the stated requirement and could introduce instability.

Low
Suggestions up to commit 8a49ea6
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent silent overflow in cast

The cast segment.max_doc as i32 can silently overflow if max_doc exceeds i32::MAX.
This would cause the invariant check to pass incorrectly when it should fail.
Validate max_doc fits in i32 before casting or use checked conversion.

sandbox/plugins/analytics-backend-datafusion/rust/src/count_executor.rs [392-400]

-let expected_max = segment.max_doc as i32;
+let expected_max = i32::try_from(segment.max_doc).map_err(|_| {
+    DataFusionError::Internal(format!(
+        "CountExec: segment.max_doc {} overflows i32 for segment_idx {}",
+        segment.max_doc, chunk.segment_idx
+    ))
+})?;
 if chunk.doc_min != 0 || chunk.doc_max != expected_max {
     return Err(DataFusionError::Internal(format!(
         "CountExec: chunk for segment_idx {} (writer_generation={}) must \
          cover the whole segment but doc_range=[{},{}) (expected [0,{}))",
         chunk.segment_idx, segment.writer_generation,
         chunk.doc_min, chunk.doc_max, expected_max
     )));
 }
Suggestion importance[1-10]: 7

__

Why: Valid concern about silent overflow when casting segment.max_doc to i32. The try_from approach prevents incorrect validation when max_doc exceeds i32::MAX, which could cause the whole-segment invariant check to pass incorrectly.

Medium
Synchronize shared iterator state access

The currentDoc field is mutated during iteration but ScorerHandle may be shared
across multiple countDocs calls. If countDocs is invoked concurrently or multiple
times on the same collectorKey, the iterator state becomes corrupted. Synchronize
access or document that countDocs must not be called concurrently for the same
collector.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java [343-356]

 if (handle.scorer == null) {
     return 0L;
 }
 int scanFrom = Math.max(minDoc, handle.partitionMinDoc);
 int scanTo = Math.min(maxDoc, handle.partitionMaxDoc);
 if (scanFrom >= scanTo) {
     return 0L;
 }
 long matches = 0L;
-try {
-    DocIdSetIterator iterator = handle.scorer.iterator();
-    int docId = handle.currentDoc;
+synchronized (handle) {
+    try {
+        DocIdSetIterator iterator = handle.scorer.iterator();
+        int docId = handle.currentDoc;
Suggestion importance[1-10]: 2

__

Why: The concern about concurrent access is misplaced. Each collectorKey maps to a unique ScorerHandle instance created per-segment in createCollector. The contract ensures countDocs is called sequentially per collector from a single DataFusion partition, so synchronization is unnecessary and would add overhead.

Low
General
Validate maxDoc is non-negative

The bounds check validates minDoc < 0 and maxDoc < minDoc but does not verify maxDoc
is non-negative. A negative maxDoc with minDoc=0 would pass validation but
represents invalid input. Add maxDoc < 0 to the condition to reject all negative
bounds.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [194-202]

-if (minDoc < 0 || maxDoc < minDoc) {
+if (minDoc < 0 || maxDoc < 0 || maxDoc < minDoc) {
     LOGGER.error(
         "countDocs received invalid bounds collectorKey={}, [{}, {})",
         collectorKey,
         minDoc,
         maxDoc
     );
     return -1L;
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that maxDoc < 0 is not explicitly checked. While maxDoc < minDoc would catch most negative maxDoc cases when minDoc >= 0, adding maxDoc < 0 makes the validation more explicit and defensive against edge cases like minDoc = -1, maxDoc = -2.

Low
Document ownership transfer to prevent leak

The execute function returns Result<i64, DataFusionError> but constructs a stream
handle and boxes it. This creates a memory leak if the caller never consumes the
stream. Consider returning the stream handle directly or documenting the ownership
transfer contract explicitly.

sandbox/plugins/analytics-backend-datafusion/rust/src/count_executor.rs [262-275]

 let (cross_rt_stream, abort_handle) =
     CrossRtStream::new_with_df_error_stream_cancellable(df_stream, cpu_executor);
 let context_id = query_context.context_id();
 if let Some(h) = abort_handle {
     crate::query_tracker::set_abort_handle(context_id, h);
 }
 
 let schema = cross_rt_stream.schema();
 let wrapped = RecordBatchStreamAdapter::new(schema, cross_rt_stream);
 let stream_handle =
     crate::api::QueryStreamHandle::with_session_context(wrapped, query_context, ctx);
+// SAFETY: Ownership of stream_handle is transferred to caller via raw pointer.
+// Caller MUST call the corresponding cleanup function to avoid memory leak.
 Ok(Box::into_raw(Box::new(stream_handle)) as i64)
Suggestion importance[1-10]: 3

__

Why: The suggestion correctly identifies that Box::into_raw transfers ownership, but the existing code already follows Rust's standard FFI pattern. Adding a comment is helpful but not critical since this pattern is well-established in the codebase (mirrored in indexed_executor.rs).

Low
Suggestions up to commit 3500bba
CategorySuggestion                                                                                                                                    Impact
Possible issue
Return scalar count instead of stream

The function signature returns Result<i64, DataFusionError> but the actual execution
creates a stream handle and returns it as a raw pointer. This mismatches the
COUNT_DELEGATION design which should return a scalar count, not a stream pointer.
The count should be extracted from the stream before returning.

sandbox/plugins/analytics-backend-datafusion/rust/src/count_executor.rs [262-275]

-let (cross_rt_stream, abort_handle) =
-    CrossRtStream::new_with_df_error_stream_cancellable(df_stream, cpu_executor);
-let context_id = query_context.context_id();
-if let Some(h) = abort_handle {
-    crate::query_tracker::set_abort_handle(context_id, h);
+let mut stream = datafusion::physical_plan::execute_stream(exec, ctx.task_ctx())
+    .map_err(|e| DataFusionError::Execution(format!("execute_stream: {}", e)))?;
+
+let mut total: i64 = 0;
+while let Some(batch) = stream.next().await {
+    let batch = batch?;
+    if batch.num_rows() > 0 {
+        let array = batch.column(0);
+        let count_array = array.as_any().downcast_ref::<Int64Array>()
+            .or_else(|| array.as_any().downcast_ref::<UInt64Array>().map(|a| a.value(0) as i64))
+            .ok_or_else(|| DataFusionError::Internal("Invalid count array type".into()))?;
+        total += count_array.value(0);
+    }
 }
+Ok(total)
 
-let schema = cross_rt_stream.schema();
-let wrapped = RecordBatchStreamAdapter::new(schema, cross_rt_stream);
-let stream_handle =
-    crate::api::QueryStreamHandle::with_session_context(wrapped, query_context, ctx);
-Ok(Box::into_raw(Box::new(stream_handle)) as i64)
-
Suggestion importance[1-10]: 10

__

Why: Critical design flaw: the function signature promises Result<i64, DataFusionError> (a scalar count) but returns a stream pointer. This breaks the COUNT_DELEGATION contract where callers expect a direct count value, not a stream handle requiring further consumption.

High
General
Explicit cast for integer overflow safety

The Weight.count() result is cast from int to long for return, but the iteration
fallback accumulates into a long matches variable. If Weight.count() returns
Integer.MAX_VALUE for a large segment, the cast preserves the value correctly, but
document iteration could theoretically exceed Integer.MAX_VALUE and overflow when
stored in int docId. Consider validating the fast-path result doesn't indicate
overflow.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java [321-340]

 if (fullSegment && handle.weight != null) {
     try {
         int fast = handle.weight.count(handle.leaf);
         if (fast >= 0) {
             LOGGER.info(...);
-            return fast;
+            return (long) fast;
         }
         LOGGER.info(...);
     } catch (IOException exception) {
         LOGGER.warn("Weight.count failed, falling back to iterate-and-count", exception);
     }
 }
Suggestion importance[1-10]: 3

__

Why: The cast from int to long at line 332 (return fast;) is implicit in Java and safe. The suggestion's concern about Integer.MAX_VALUE overflow is theoretical; Lucene's Weight.count() contract already handles this. The explicit cast adds no safety benefit.

Low
Suggestions up to commit 3ca715a
CategorySuggestion                                                                                                                                    Impact
Possible issue
Reset iterator state per call

The currentDoc field is being read and updated across multiple countDocs calls on
the same ScorerHandle, creating stateful iteration. If countDocs is called multiple
times with overlapping or non-sequential ranges, the iterator position from previous
calls will cause incorrect counts. Either document that countDocs must be called
exactly once per collector, or reset the iterator state at the start of each call.

sandbox/plugins/analytics-backend-lucene/src/main/java/org/opensearch/be/lucene/LuceneFilterDelegationHandle.java [341-353]

 if (handle.scorer == null) {
     return 0L;
 }
 int scanFrom = Math.max(minDoc, handle.partitionMinDoc);
 int scanTo = Math.min(maxDoc, handle.partitionMaxDoc);
 if (scanFrom >= scanTo) {
     return 0L;
 }
 long matches = 0L;
 try {
     DocIdSetIterator iterator = handle.scorer.iterator();
-    int docId = handle.currentDoc;
+    int docId = iterator.nextDoc();
+    if (docId == DocIdSetIterator.NO_MORE_DOCS) {
+        return 0L;
+    }
Suggestion importance[1-10]: 9

__

Why: This identifies a critical stateful iteration bug. The currentDoc field is read from handle.currentDoc and updated across calls, meaning multiple countDocs invocations on the same ScorerHandle will produce incorrect counts. The suggested fix to call iterator.nextDoc() instead of reading handle.currentDoc is correct and prevents this issue.

High
General
Guard against empty viable backends

Calling getFirst() on viableBackends will throw NoSuchElementException if the list
is empty. While the planner contract may guarantee non-empty viable backends, add a
defensive check or use isEmpty() guard before accessing the first element to prevent
runtime crashes if the contract is violated.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/planner/dag/CountDelegationDetector.java [128-138]

 private static boolean filterIsAllDelegatedLeaves(RexNode node, String drivingBackendId) {
     if (node instanceof AnnotatedPredicate predicate) {
-        boolean isCorrectness = !predicate.getViableBackends().getFirst().equals(drivingBackendId);
+        boolean isCorrectness = !predicate.getViableBackends().isEmpty() 
+            && !predicate.getViableBackends().getFirst().equals(drivingBackendId);
         boolean isPerformance = !predicate.getPerformanceDelegationBackends().isEmpty();
         boolean delegated = isCorrectness || isPerformance;
         ...
         return delegated;
     }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that getFirst() will throw if viableBackends is empty. While the planner contract may guarantee non-empty lists, adding a defensive check improves robustness and prevents potential runtime crashes if the contract is violated upstream.

Medium
Handle empty range early

The bounds check maxDoc < minDoc allows maxDoc == minDoc to proceed to the handle
dispatch, but this represents an empty range that should return 0 immediately. Add
an early return for this case to avoid unnecessary FFM boundary crossing and handle
invocation.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [194-202]

 public static long countDocs(int collectorKey, int minDoc, int maxDoc) {
     long tid = trackStart();
     try {
         if (minDoc < 0 || maxDoc < minDoc) {
             LOGGER.error(
                 "countDocs received invalid bounds collectorKey={}, [{}, {})",
                 collectorKey,
                 minDoc,
                 maxDoc
             );
             return -1L;
         }
+        if (maxDoc == minDoc) {
+            return 0L;
+        }
         ...
     }
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that maxDoc == minDoc represents an empty range. However, the Rust-side count_docs already handles this case (line 218-220 in ffm_callbacks.rs), so adding a Java-side early return is a minor optimization that avoids an FFM call but doesn't fix a bug.

Low
Defer provider creation after validation

If extract_single_count_collector returns an error but create_provider was already
called for a different annotation_id in a retry scenario, the provider handle may
leak. Ensure that provider creation happens only after all validation passes, or
implement proper cleanup on early returns to prevent resource leaks across the FFM
boundary.

sandbox/plugins/analytics-backend-datafusion/rust/src/count_executor.rs [199-222]

 let annotation_id = extract_single_count_collector(&extraction.tree)?;
 
-...
+let layouts: Vec<SegmentLayout> = segments
+    .iter()
+    .map(|seg| SegmentLayout {
+        row_groups: seg.row_groups.clone(),
+    })
+    .collect();
+let assignments =
+    SegmentGrouped.assign(&layouts, query_config.target_partitions.max(1));
 
 let provider = Arc::new(
     create_provider(annotation_id).map_err(|e| DataFusionError::External(e.into()))?,
 );
Suggestion importance[1-10]: 4

__

Why: The suggestion proposes moving create_provider after SegmentGrouped.assign to avoid leaking the provider if assignment fails. However, the current code already has provider wrapped in Arc and dropped on error paths, so the leak risk is minimal. The reordering is a minor improvement but not critical.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 3ca715a: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 28, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 69cd418.

PathLineSeverityDescription
sandbox/plugins/analytics-backend-lucene/build.gradle32highFour new Apache Arrow dependencies added (arrow-vector, arrow-memory-core, arrow-format, arrow-c-data) pinned to a version variable. Per mandatory policy, all dependency additions must be flagged regardless of apparent legitimacy — maintainers should verify artifact authenticity and that the resolved versions match expected checksums.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 1 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@alchemist51 alchemist51 force-pushed the indexed-count branch 2 times, most recently from 4fb86e5 to 3500bba Compare May 28, 2026 15:17
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3500bba

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 3500bba: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 8a49ea6

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 8a49ea6: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.49%. Comparing base (dad63c0) to head (f5e771b).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21867      +/-   ##
============================================
- Coverage     73.51%   73.49%   -0.03%     
+ Complexity    75582    75548      -34     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251918   251846      -72     
- Misses        70712    70803      +91     
+ Partials      20031    20012      -19     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3c7e5c7

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 3c7e5c7: SUCCESS

@alchemist51 alchemist51 changed the title [Draft] Fast path for count queries in case of delegation possible Lucene as a driving backend for shard-local count fragments May 30, 2026
trackStart runs outside the per-upcall try/catch and trackEnd in the
finally, so any escaping exception crosses the FFM boundary and aborts
the JVM. Wrap both in try/catch(Throwable); on failure, log warn and
return -1 from trackStart.

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
startFragment had grown three concerns inline: instruction-handler
fan-out, delegation-handle ownership transfer, and per-task thread
tracking. Pull each into a named helper. No behaviour change — same
ordering, same failure cleanup, same idempotent close semantics.

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Planner produces a Lucene StagePlan alternative for count(*)/count(col)
over Lucene-indexable filters. Data node prefers Lucene, runs
IndexSearcher.count, exports through Arrow C-Data so the result VSR
survives Flight's VectorTransfer (pure-Java setSafe buffers don't).

- ScanCapability.InvertedIndex variant: metadata-only scan declaration.
- TableScan viability: strict for value backends, permissive (any field)
  for metadata-only.
- LuceneFragmentConvertor: filter as NamedWriteable BoolQueryBuilder.
- LuceneResultStream mirrors DatafusionResultStream.BatchIterator for
  Flight-safe buffers.
- DelegatedPredicateCombiner.makePlaceholder no longer assumes
  Delegated.subtree() is a leaf AnnotatedPredicate — recursively unwraps
  markers from the bubble-up RexCall so Substrait conversion stays clean.

Tests: CountFastPathIT covers Lucene-only, fused all-keyword AND/OR/NOT,
DataFusion-only numeric, mixed AND/OR, MATCH, IN, NOT.

Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@alchemist51 alchemist51 added the skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis. label May 31, 2026
@alchemist51 alchemist51 marked this pull request as ready for review May 31, 2026 05:25
@alchemist51 alchemist51 requested a review from a team as a code owner May 31, 2026 05:25
# Conflicts:
#	sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/AnalyticsPlugin.java
Signed-off-by: Arpit Bandejiya <abandeji@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f5e771b

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for f5e771b: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

skip-diff-analyzer Maintainer to skip code-diff-analyzer check, after reviewing issues in AI analysis.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant