Skip to content

Fix filter delegation concurrency bug by threading context_id through FFM upcalls#21845

Open
aravindsagar wants to merge 6 commits into
opensearch-project:mainfrom
aravindsagar:fix/filter-delegation-concurrency
Open

Fix filter delegation concurrency bug by threading context_id through FFM upcalls#21845
aravindsagar wants to merge 6 commits into
opensearch-project:mainfrom
aravindsagar:fix/filter-delegation-concurrency

Conversation

@aravindsagar
Copy link
Copy Markdown
Contributor

@aravindsagar aravindsagar commented May 27, 2026

Description

FilterTreeCallbacks used global AtomicReference singletons for HANDLE and TRACKER. Under concurrent indexed-path queries, these were overwritten by the last query to enter startFragment, causing:

  • Query failures: collectDocs routed to wrong query's Lucene handle -> -1
  • Tracking mis-attribution: trackEnd routed to wrong task -> AssertionError

Fix: pass context_id (= OpenSearch task ID, already available in Rust from QueryTrackingContext) as the first parameter of every FFM upcall. Java uses it to look up the correct (handle, tracker) pair from a ConcurrentHashMap keyed by contextId. Each query gets isolated bindings.

Additionally guards trackStart against same-thread double-tracking (the original Slack-reported variant) by checking isThreadTrackedForTask before calling taskExecutionStartedOnThread.

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@aravindsagar aravindsagar requested a review from a team as a code owner May 27, 2026 07:12
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit d2def46.

PathLineSeverityDescription
server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java236mediumPreviously private method `isCurrentThreadWorkingOnTask` is refactored and re-exposed as the public method `isThreadTrackedForTask(Task task, long threadId)`. This widens the API surface of the task tracking subsystem, allowing any caller with a Task reference to probe whether an arbitrary thread ID is actively tracked — information that could be used to infer monitoring state or evade detection in adversarial code running within the same JVM.
sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java223lowA new hard `throw new IllegalStateException` is introduced when `task == null` during filter delegation setup. This silently changes the behavior for any prior code path where task could legitimately be null (previously it would proceed without tracking). If any production caller reaches this path with a null task, it becomes an unchecked runtime failure rather than a graceful no-op, creating a potential availability issue. The intent appears legitimate (enforcing invariants for per-query isolation), but the behavioral change is not backward-compatible.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 1 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 27, 2026

PR Reviewer Guide 🔍

(Review updated until commit c05f752)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The code checks if (task == null) after already dereferencing task.getId() on line 324 (in the old code path). The new code moves the null check earlier (line 331), but the old code at line 324 would have thrown NullPointerException if task was null. This suggests the null check may be unreachable in practice, or the old code had a latent bug. If task can actually be null when delegation is present, the old code would have crashed before reaching the new validation.

if (task == null) {
    throw new IllegalStateException("Filter delegation requires a tracked task for per-query isolation");
}
Resource Leak Risk

The reordered close sequence (stream, engine, rowIdVector, then onClose) means that if stream.close() or engine.close() throws an exception, onClose still runs (via the finally-like addSuppressed logic). However, if onClose.run() itself throws and there is no prior exception, the new code sets first = e only after closing stream/engine/rowIdVector. This is correct. But if rowIdVector close fails after engine close fails, the rowIdVector exception is added as suppressed to the engine exception, and then onClose exception (if any) is added as suppressed to that chain. The original code ran onClose first, so any onClose exception became the primary exception. The new order makes stream or engine exceptions primary. This changes exception precedence and may hide onClose failures (like unregister errors) if an earlier close also fails. If onClose is critical (e.g., unregistering the context binding to prevent leaks), a failure in stream.close() could mask the unregister failure, leaving a stale binding in FilterTreeCallbacks.BINDINGS.

Exception first = closeQuietly(stream, null);
first = closeQuietly(engine, first);
first = closeQuietly(rowIdVector, first);
if (onClose != null) {
    try {
        onClose.run();
    } catch (Exception e) {
        if (first == null) first = e;
        else first.addSuppressed(e);
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 27, 2026

PR Code Suggestions ✨

Latest suggestions up to c05f752

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Propagate accumulated cleanup exceptions

The comment explains that onClose must run after stream/engine close to avoid
unregistering the binding while release upcalls are pending. However, if
onClose.run() throws, the exception handling doesn't rethrow first, potentially
swallowing critical cleanup failures. Ensure the accumulated exception is
propagated.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java [85-95]

 Exception first = closeQuietly(stream, null);
 first = closeQuietly(engine, first);
 first = closeQuietly(rowIdVector, first);
 if (onClose != null) {
     try {
         onClose.run();
     } catch (Exception e) {
         if (first == null) first = e;
         else first.addSuppressed(e);
     }
 }
+if (first != null) {
+    throw first;
+}
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that accumulated exceptions from close() are not rethrown, which could mask cleanup failures. Adding if (first != null) throw first; after the cleanup sequence would ensure failures are propagated to the caller, improving error visibility.

Medium
General
Propagate handle cleanup failures

The cleanup lambda logs but swallows exceptions from handle.close(). If handle
cleanup fails (e.g., resource leak in the accepting backend), the caller has no
visibility. Consider rethrowing or wrapping in an unchecked exception so the failure
surfaces in query execution monitoring.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java [825-833]

 FilterTreeCallbacks.register(contextId, handle, tracker);
 return () -> {
     FilterTreeCallbacks.unregister(contextId);
     try {
         handle.close();
     } catch (Exception e) {
-        LOGGER.warn(new ParameterizedMessage("FilterDelegationHandle.close() failed for contextId={}", contextId), e);
+        LOGGER.error(new ParameterizedMessage("FilterDelegationHandle.close() failed for contextId={}", contextId), e);
+        throw new RuntimeException("Failed to close FilterDelegationHandle for contextId=" + contextId, e);
     }
 };
Suggestion importance[1-10]: 5

__

Why: The suggestion to propagate handle.close() failures has merit for visibility, but throwing from a cleanup lambda (which runs in a finally block) could mask the original query exception. The current approach of logging at WARN level is reasonable, though upgrading to ERROR and considering suppressed exceptions would be better than a hard throw.

Low
Handle missing task gracefully

The validation occurs after DelegationDescriptor is checked but before any
delegation resources are allocated. However, task could be null in legitimate
scenarios (e.g., non-tracked queries). Consider whether this should be a hard
failure or if delegation should gracefully degrade when task tracking is
unavailable.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [331-334]

+long contextId;
 if (task == null) {
-    throw new IllegalStateException("Filter delegation requires a tracked task for per-query isolation");
+    LOGGER.warn("Filter delegation invoked without task tracking; using fallback contextId");
+    contextId = System.nanoTime(); // fallback: unique per-invocation
+} else {
+    contextId = task.getId();
 }
-long contextId = task.getId();
Suggestion importance[1-10]: 3

__

Why: The suggestion to use a fallback contextId when task is null could prevent hard failures, but the PR explicitly validates that task must exist for per-query isolation. The fallback approach using System.nanoTime() could introduce subtle concurrency bugs if multiple queries share the same timestamp. The current strict validation is safer.

Low

Previous suggestions

Suggestions up to commit c05f752
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent handle leak on registration failure

The code allocates FilterDelegationHandle after validating task != null, but if
getFilterDelegationHandle throws an exception, the handle is never registered and
cannot be cleaned up. Wrap the handle allocation and registration in a try-catch to
ensure any partially-allocated handle is closed on failure, preventing resource
leaks.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [327-363]

 if (delegation != null) {
-    // Filter delegation routes per-query state via taskId; without a task we cannot
-    // isolate concurrent queries from each other. Validate before allocating any
-    // delegation resources to avoid leaks.
     if (task == null) {
         throw new IllegalStateException("Filter delegation requires a tracked task for per-query isolation");
     }
     long contextId = task.getId();
 
     String acceptingBackendId = delegation.delegatedExpressions().getFirst().getAcceptingBackendId();
     AnalyticsSearchBackendPlugin acceptingBackend = backends.get(acceptingBackendId);
-    FilterDelegationHandle handle = acceptingBackend.getFilterDelegationHandle(delegation.delegatedExpressions(), ctx);
-    ...
+    FilterDelegationHandle handle = null;
+    try {
+        handle = acceptingBackend.getFilterDelegationHandle(delegation.delegatedExpressions(), ctx);
+        ...
+        trackerCleanup = backend.configureFilterDelegation(contextId, handle, tracker, backendContext);
+    } catch (Exception e) {
+        if (handle != null) {
+            try { handle.close(); } catch (Exception suppressed) { e.addSuppressed(suppressed); }
+        }
+        throw e;
+    }
 }
Suggestion importance[1-10]: 7

__

Why: Valid concern about resource leaks if getFilterDelegationHandle or subsequent operations throw exceptions before configureFilterDelegation returns the cleanup action. The suggestion correctly identifies that handle should be closed on failure to prevent leaks.

Medium
General
Eliminate duplicated exception handling logic

The onClose exception handling duplicates the logic of closeQuietly. Extract the
exception-chaining pattern into closeQuietly by accepting a Runnable overload,
eliminating the duplication and ensuring consistent suppression behavior across all
cleanup steps.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java [80-95]

 public void close() throws Exception {
-    // Close the stream and engine first so any in-flight release upcalls from native
-    // code (e.g. ProviderHandle::drop -> releaseProvider) can still find their
-    // per-query binding in FilterTreeCallbacks. Running onClose first would unregister
-    // the binding while release upcalls are still pending, causing the eager release
-    // of Lucene resources to be skipped.
     Exception first = closeQuietly(stream, null);
     first = closeQuietly(engine, first);
     first = closeQuietly(rowIdVector, first);
-    if (onClose != null) {
-        try {
-            onClose.run();
-        } catch (Exception e) {
-            if (first == null) first = e;
-            else first.addSuppressed(e);
-        }
-    }
+    first = closeQuietly(onClose, first);
     ...
 }
 
+private static Exception closeQuietly(Runnable action, Exception first) {
+    if (action == null) return first;
+    try {
+        action.run();
+    } catch (Exception e) {
+        if (first == null) return e;
+        first.addSuppressed(e);
+    }
+    return first;
+}
+
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies code duplication in exception handling. However, the proposed closeQuietly(Runnable, Exception) overload would require changes to the existing closeQuietly method signature or adding a new method, which is a minor refactoring improvement rather than a critical issue.

Low
Suggestions up to commit c05f752
CategorySuggestion                                                                                                                                    Impact
Possible issue
Use putIfAbsent to prevent registration race

The assertion after put creates a race window where a concurrent register call with
the same contextId could overwrite the binding before the assertion fires. Use
putIfAbsent to atomically check and insert, preventing the race and ensuring the
assertion always reflects the true state.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [78-81]

 public static void register(long contextId, FilterDelegationHandle handle, DelegationThreadTracker tracker) {
-    QueryBinding prev = BINDINGS.put(contextId, new QueryBinding(handle, tracker));
+    QueryBinding prev = BINDINGS.putIfAbsent(contextId, new QueryBinding(handle, tracker));
     assert prev == null : "FilterTreeCallbacks.register: binding already present for contextId=" + contextId;
 }
Suggestion importance[1-10]: 8

__

Why: This is a valid concurrency issue. Using putIfAbsent instead of put eliminates the race window where concurrent register calls could overwrite each other before the assertion fires. This is a correctness improvement that prevents potential data races in the per-query binding registration.

Medium
General
Prevent handle leak on registration failure

If getFilterDelegationHandle throws an exception after the task null-check, the
allocated handle may leak because trackerCleanup is never assigned. Wrap the handle
acquisition and registration in a try-catch that closes the handle on failure, or
defer the task null-check until after handle allocation succeeds.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [331-339]

 if (task == null) {
     throw new IllegalStateException("Filter delegation requires a tracked task for per-query isolation");
 }
 long contextId = task.getId();
 
 String acceptingBackendId = delegation.delegatedExpressions().getFirst().getAcceptingBackendId();
 AnalyticsSearchBackendPlugin acceptingBackend = backends.get(acceptingBackendId);
-FilterDelegationHandle handle = acceptingBackend.getFilterDelegationHandle(delegation.delegatedExpressions(), ctx);
+FilterDelegationHandle handle = null;
+try {
+    handle = acceptingBackend.getFilterDelegationHandle(delegation.delegatedExpressions(), ctx);
+    ...
+} catch (Exception e) {
+    if (handle != null) {
+        try { handle.close(); } catch (Exception ignored) {}
+    }
+    throw e;
+}
Suggestion importance[1-10]: 7

__

Why: Valid concern about potential resource leak if getFilterDelegationHandle succeeds but subsequent operations fail before trackerCleanup is assigned. The suggestion correctly identifies that the handle should be closed on failure. However, the improved code is incomplete (shows ...) and doesn't show the full try-catch structure needed to properly handle all failure paths.

Medium
Defer expensive keySet iteration until assertion fails

The assertion message concatenates BINDINGS.keySet() which can be expensive when
many queries are registered, and the snapshot may be stale by the time the assertion
fires. Consider logging the keySet only when the assertion fails, or use a simpler
message that doesn't iterate the map.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [118-126]

 private static void assertBindingExists(QueryBinding binding, String op, long contextId) {
-    assert binding != null : "FilterTreeCallbacks."
-        + op
-        + ": no binding for contextId="
-        + contextId
-        + " (registered: "
-        + BINDINGS.keySet()
-        + ")";
+    if (binding == null) {
+        String msg = "FilterTreeCallbacks." + op + ": no binding for contextId=" + contextId + " (registered: " + BINDINGS.keySet() + ")";
+        assert false : msg;
+    }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that BINDINGS.keySet() is evaluated eagerly in the assertion message, which could be expensive. However, the impact is limited since assertions are typically disabled in production, and the improved code still evaluates the keySet when the assertion fails. The optimization is minor and only affects development/test scenarios.

Low
Ensure onClose exceptions are properly chained

If onClose.run() throws an exception and first is already non-null, the new
exception is added as suppressed. However, if first is null, the exception becomes
the primary exception but any subsequent cleanup failures won't be captured.
Consider wrapping the entire cleanup sequence in a try-finally to ensure all
exceptions are properly chained.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java [85-95]

 Exception first = closeQuietly(stream, null);
 first = closeQuietly(engine, first);
 first = closeQuietly(rowIdVector, first);
-if (onClose != null) {
-    try {
+try {
+    if (onClose != null) {
         onClose.run();
-    } catch (Exception e) {
-        if (first == null) first = e;
-        else first.addSuppressed(e);
     }
+} catch (Exception e) {
+    if (first == null) first = e;
+    else first.addSuppressed(e);
 }
Suggestion importance[1-10]: 3

__

Why: The suggestion misunderstands the existing code. The current implementation already properly chains exceptions from onClose.run() with the first exception from prior cleanup steps. The suggested change wraps onClose.run() in try-catch but doesn't improve exception handling since the logic is identical to what's already there.

Low
Suggestions up to commit c05f752
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent binding overwrite race condition

The assertion after put creates a race window where a duplicate contextId briefly
overwrites the existing binding before the assertion fires. If assertions are
disabled in production, the old binding leaks. Check for existence before inserting
to prevent the overwrite entirely.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [78-81]

 public static void register(long contextId, FilterDelegationHandle handle, DelegationThreadTracker tracker) {
-    QueryBinding prev = BINDINGS.put(contextId, new QueryBinding(handle, tracker));
+    QueryBinding prev = BINDINGS.putIfAbsent(contextId, new QueryBinding(handle, tracker));
     assert prev == null : "FilterTreeCallbacks.register: binding already present for contextId=" + contextId;
 }
Suggestion importance[1-10]: 8

__

Why: This is a valid correctness issue. Using put followed by an assertion creates a race where a duplicate contextId overwrites the existing binding before the assertion fires. With assertions disabled in production, this leads to a leaked binding. Using putIfAbsent prevents the overwrite entirely, making the code safer in both development and production environments.

Medium
General
Optimize assertion diagnostic overhead

The assertion message concatenates BINDINGS.keySet() which can be expensive when
many queries are registered, and the snapshot may be stale by the time the assertion
fires. Consider logging the keySet only when the assertion fails, or limit the
diagnostic to a count of registered bindings to reduce overhead.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [118-126]

 private static void assertBindingExists(QueryBinding binding, String op, long contextId) {
-    assert binding != null : "FilterTreeCallbacks."
-        + op
-        + ": no binding for contextId="
-        + contextId
-        + " (registered: "
-        + BINDINGS.keySet()
-        + ")";
+    if (binding == null) {
+        String msg = "FilterTreeCallbacks." + op + ": no binding for contextId=" + contextId 
+            + " (registered count: " + BINDINGS.size() + ")";
+        assert false : msg;
+    }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that BINDINGS.keySet() in the assertion message could be expensive. However, the impact is limited since assertions are typically disabled in production, and the overhead only occurs when a lifecycle bug is detected during development. The improved code maintains the assertion's diagnostic value while reducing overhead.

Low
Ensure consistent exception chaining order

If onClose.run() throws an exception and first is already non-null, the new
exception is added as suppressed. However, if first is null, the exception from
onClose becomes the primary exception, potentially masking earlier close failures.
Ensure consistent exception chaining by always treating onClose exceptions as
suppressed when prior close operations failed.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java [85-95]

 Exception first = closeQuietly(stream, null);
 first = closeQuietly(engine, first);
 first = closeQuietly(rowIdVector, first);
 if (onClose != null) {
     try {
         onClose.run();
     } catch (Exception e) {
-        if (first == null) first = e;
-        else first.addSuppressed(e);
+        if (first != null) {
+            first.addSuppressed(e);
+        } else {
+            first = e;
+        }
     }
 }
Suggestion importance[1-10]: 3

__

Why: The existing code already handles exception chaining correctly - when first is null, the onClose exception becomes primary; when first is non-null, it's added as suppressed. The suggestion's claim that this "masks earlier close failures" is incorrect; the code properly chains exceptions. The improved code is functionally identical to the existing code, making this a low-value suggestion.

Low
Suggestions up to commit c05f752
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent binding leak on close failure

The close ordering relies on native code completing all release upcalls before
onClose unregisters the binding. However, if stream.close() or engine.close() throw
an exception, onClose may never run, leaving the binding leaked in BINDINGS. Wrap
the entire close sequence in a try-finally to guarantee onClose executes.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java [79-95]

 public void close() throws Exception {
-    // Close the stream and engine first so any in-flight release upcalls from native
-    // code (e.g. ProviderHandle::drop -> releaseProvider) can still find their
-    // per-query binding in FilterTreeCallbacks. Running onClose first would unregister
-    // the binding while release upcalls are still pending, causing the eager release
-    // of Lucene resources to be skipped.
-    Exception first = closeQuietly(stream, null);
-    first = closeQuietly(engine, first);
-    first = closeQuietly(rowIdVector, first);
-    if (onClose != null) {
-        try {
-            onClose.run();
-        } catch (Exception e) {
-            if (first == null) first = e;
-            else first.addSuppressed(e);
+    Exception first = null;
+    try {
+        first = closeQuietly(stream, null);
+        first = closeQuietly(engine, first);
+        first = closeQuietly(rowIdVector, first);
+    } finally {
+        if (onClose != null) {
+            try {
+                onClose.run();
+            } catch (Exception e) {
+                if (first == null) first = e;
+                else first.addSuppressed(e);
+            }
         }
     }
Suggestion importance[1-10]: 8

__

Why: This is a critical correctness issue. If stream.close() or engine.close() throw an exception, onClose (which unregisters the binding) may never execute, causing a binding leak in BINDINGS. The try-finally ensures cleanup always runs, preventing resource leaks across concurrent queries.

Medium
General
Validate task ID is positive

The validation ensures task is non-null before extracting contextId, but does not
verify that task.getId() returns a valid identifier. If task.getId() returns a
sentinel value (e.g., -1 or 0) that could collide across queries, concurrent queries
might share the same binding. Consider adding a check that contextId is positive.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [331-334]

 if (task == null) {
     throw new IllegalStateException("Filter delegation requires a tracked task for per-query isolation");
 }
 long contextId = task.getId();
+if (contextId <= 0) {
+    throw new IllegalStateException("Invalid task ID for filter delegation: " + contextId);
+}
Suggestion importance[1-10]: 6

__

Why: The suggestion to validate that contextId is positive is a reasonable defensive check. If task.getId() returns 0 or negative values, concurrent queries could collide. However, this assumes task.getId() can return invalid values, which may not be the case in the actual implementation.

Low
Elevate close failure logging severity

If handle.close() throws an exception, the cleanup action logs a warning but does
not propagate the exception. This silently swallows close failures that might
indicate resource leaks. Consider rethrowing the exception or at least using
LOGGER.error to ensure close failures are visible in production logs.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/DataFusionAnalyticsBackendPlugin.java [819-834]

 public Runnable configureFilterDelegation(
     long contextId,
     FilterDelegationHandle handle,
     DelegationThreadTracker tracker,
     BackendExecutionContext backendContext
 ) {
     FilterTreeCallbacks.register(contextId, handle, tracker);
     return () -> {
         FilterTreeCallbacks.unregister(contextId);
         try {
             handle.close();
         } catch (Exception e) {
-            LOGGER.warn(new ParameterizedMessage("FilterDelegationHandle.close() failed for contextId={}", contextId), e);
+            LOGGER.error(new ParameterizedMessage("FilterDelegationHandle.close() failed for contextId={}", contextId), e);
         }
     };
 }
Suggestion importance[1-10]: 5

__

Why: The suggestion to change LOGGER.warn to LOGGER.error for handle.close() failures is reasonable since close failures may indicate resource leaks. However, the current code already logs the exception, and changing severity is a minor improvement that doesn't fix a bug.

Low
Optimize assertion message construction

The assertion message construction concatenates BINDINGS.keySet() which can be
expensive when many queries are registered. Since this is an assertion that runs on
every upcall in test mode, consider caching the keySet or limiting the diagnostic
output to avoid performance overhead in assertion-enabled builds.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [118-126]

 private static void assertBindingExists(QueryBinding binding, String op, long contextId) {
     assert binding != null : "FilterTreeCallbacks."
         + op
         + ": no binding for contextId="
         + contextId
-        + " (registered: "
-        + BINDINGS.keySet()
+        + " (registered count: "
+        + BINDINGS.size()
         + ")";
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that BINDINGS.keySet() in the assertion message could be expensive, but the impact is limited since assertions are only enabled in tests. The proposed change to use BINDINGS.size() reduces diagnostic value. This is a minor optimization with limited benefit.

Low
Suggestions up to commit 6badb56
CategorySuggestion                                                                                                                                    Impact
General
Validate task earlier to prevent leaks

The validation occurs after backendContext is created but before delegation
resources are allocated. However, if task is null, the backendContext may already
hold resources that won't be cleaned up. Consider validating task earlier, before
creating backendContext, to avoid potential resource leaks.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/AnalyticsSearchService.java [330-332]

-if (task == null) {
+if (delegation != null && task == null) {
     throw new IllegalStateException("Filter delegation requires a tracked task for per-query isolation");
 }
 
+// ... existing backendContext creation code ...
+
+if (delegation != null) {
+    long contextId = task.getId();
+    // ... rest of delegation setup ...
+}
+
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that validating task before creating backendContext could prevent potential resource leaks. However, the impact is moderate since backendContext creation typically doesn't allocate significant resources that require cleanup, and the current code already validates before allocating delegation-specific resources.

Medium
Log warning for missing binding

When binding is null, returning -1 silently hides a potential bug where an FFM
upcall arrives after unregister was called. This could indicate a lifecycle ordering
issue (e.g., native code still holding references after Java cleanup). Consider
logging a warning when binding is missing to aid debugging of concurrency or cleanup
races.

sandbox/plugins/analytics-backend-datafusion/src/main/java/org/opensearch/be/datafusion/indexfilter/FilterTreeCallbacks.java [78-83]

 private static long trackStart(long contextId) {
     QueryBinding binding = BINDINGS.get(contextId);
-    if (binding == null) return -1;
+    if (binding == null) {
+        LOGGER.warn("trackStart called for unregistered contextId={}", contextId);
+        return -1;
+    }
     DelegationThreadTracker t = binding.tracker();
     return (t != null) ? t.trackStart() : -1;
 }
Suggestion importance[1-10]: 6

__

Why: Adding a warning log when binding is null could help debug lifecycle issues. However, this is a minor improvement since the existing error handling already returns -1, and the calling code logs errors when callbacks fail. The score is moderate because it's a defensive logging enhancement rather than a bug fix.

Low
Assert timeout after cleanup completes

If done.await times out and returns false, the test continues to unregister and stop
tracking, potentially masking the timeout failure. The assertion should fail the
test immediately if the timeout occurs, preventing cleanup code from running and
obscuring the root cause.

sandbox/plugins/analytics-backend-datafusion/src/test/java/org/opensearch/be/datafusion/indexfilter/DelegationTaskTrackingTests.java [246-251]

-assertTrue("All queries should complete within timeout", done.await(15, TimeUnit.SECONDS));
+boolean completed = done.await(15, TimeUnit.SECONDS);
 
 for (int q = 0; q < queryCount; q++) {
     FilterTreeCallbacks.unregister(contextIds[q]);
     trackingService.stopTracking(tasks[q]);
 }
 
+assertTrue("All queries should complete within timeout", completed);
+
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies that moving the assertion after cleanup would prevent masking timeout failures. However, the current code already fails immediately via assertTrue if the timeout occurs (line 246), so the issue described doesn't actually exist. The suggested change would delay the failure reporting but wouldn't change the test outcome.

Low
Ensure all cleanup runs despite exceptions

The comment explains that onClose must run after stream/engine close to allow FFM
release upcalls to find their binding. However, if onClose.run() throws an
exception, the remaining cleanup (rowIdVector, readerContext) won't execute. Wrap
the entire cleanup sequence in a try-finally or ensure all cleanup steps run even if
onClose fails.

sandbox/plugins/analytics-engine/src/main/java/org/opensearch/analytics/exec/FragmentResources.java [85-95]

 Exception first = closeQuietly(stream, null);
 first = closeQuietly(engine, first);
 first = closeQuietly(rowIdVector, first);
-if (onClose != null) {
-    try {
+try {
+    if (onClose != null) {
         onClose.run();
-    } catch (Exception e) {
-        if (first == null) first = e;
-        else first.addSuppressed(e);
+    }
+} catch (Exception e) {
+    if (first == null) first = e;
+    else first.addSuppressed(e);
+} finally {
+    // Ensure readerContext release always runs
+    if (readerContext != null) {
+        try {
+            readerContext.decRef();
+        } catch (Exception e) {
+            if (first == null) first = e;
+            else first.addSuppressed(e);
+        }
     }
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion misreads the code structure. The readerContext cleanup is already outside the onClose try-catch block (lines 97-103 in the new hunk), so it will execute even if onClose throws. The suggested change would duplicate the existing cleanup logic.

Low

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 65055f8: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@aravindsagar
Copy link
Copy Markdown
Contributor Author

gradle-check failing due to Maven throttling. For example,

> Could not GET 'https://repo.maven.apache.org/maven2/com/diffplug/durian/durian-core/1.2.0/durian-core-1.2.0.pom'. Received status code 429 from server: Too Many Requests

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit e028959

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for e028959: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

aravindsagar and others added 4 commits May 29, 2026 04:12
… FFM upcalls

FilterTreeCallbacks used global AtomicReference singletons for HANDLE and
TRACKER. Under concurrent indexed-path queries, these were overwritten by
the last query to enter startFragment, causing:
- Query failures: collectDocs routed to wrong query's Lucene handle -> -1
- Tracking mis-attribution: trackEnd routed to wrong task -> AssertionError

Fix: pass context_id (= OpenSearch task ID, already available in Rust from
QueryTrackingContext) as the first parameter of every FFM upcall. Java uses
it to look up the correct (handle, tracker) pair from a ConcurrentHashMap
keyed by contextId. Each query gets isolated bindings.

Additionally guards trackStart against same-thread double-tracking (the
original Slack-reported variant) by checking isThreadTrackedForTask before
calling taskExecutionStartedOnThread.

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The isThreadTrackedForTask check guarded against the same-thread
double-tracking scenario from the original Slack report. That scenario
is structurally impossible on current main: cpu_executor.spawn(...).await
in df_execute_with_context (commit 19b99d8) ensures all FFM upcalls
fire on datafusion-cpu workers, never on the runTask thread that
TaskAwareRunnable pre-tracked.

Removing the guard so the assertion will fire if a future Rust change
reintroduces the synchronous path — silent no-op would hide the bug.

Also reverts the public isThreadTrackedForTask method on
TaskResourceTrackingService that was added to support the guard.

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three changes from code review:

1. Fix cleanup ordering in FragmentResources.close()
   The previous order ran onClose (which calls FilterTreeCallbacks.unregister)
   BEFORE closing the result stream. Closing the stream is what triggers Rust
   to drop ProviderHandle/FfmSegmentCollector, which fire releaseProvider/
   releaseCollector upcalls. With unregister already done, those upcalls found
   no binding and skipped the eager Lucene Weight/Scorer release.

   Reorder: close stream → engine → reader first, then run onClose. Release
   upcalls now find their binding and call handle.releaseProvider/Collector
   as intended.

2. Move null-task check before handle creation in AnalyticsSearchService
   Previously the IllegalStateException for null task fired AFTER
   getFilterDelegationHandle had already been called, leaking the handle.
   Move the check to the top of the delegation block so we never allocate
   resources we won't track.

3. Close FilterDelegationHandle in cleanup
   DataFusionAnalyticsBackendPlugin.configureFilterDelegation now returns a
   cleanup that both unregisters from FilterTreeCallbacks and closes the
   handle. Previously nothing closed the handle eagerly — its internal
   ConcurrentHashMaps relied on GC.

Also clarifies the testSharedContextIdCausesDataCorruption javadoc to
explain that each thread re-registers its own handle at the shared
contextId, simulating the old AtomicReference race.

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The header comment said "Four callback slots" but lists five (and the
code defines five). Pre-existing typo, easy to fix while in this area.

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@aravindsagar aravindsagar force-pushed the fix/filter-delegation-concurrency branch from e028959 to 6badb56 Compare May 29, 2026 04:41
@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 6badb56: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 29, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.10%. Comparing base (dad63c0) to head (c05f752).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21845      +/-   ##
============================================
+ Coverage     73.51%   74.10%   +0.59%     
- Complexity    75582    76140     +558     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
+ Hits         251918   253942    +2024     
+ Misses        70712    69099    -1613     
+ Partials      20031    19620     -411     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Copy Markdown
Contributor

@himshikhagupta himshikhagupta left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor comment, mostly LGTM

Copy link
Copy Markdown
Contributor

@Bukhtawar Bukhtawar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix. One suggestion: the current code silently returns -1 when a binding is missing, which makes contract violations invisible in testing. Adding assertions would catch lifecycle bugs (double-register, leaked bindings, premature unregister) during development while keeping production behavior safe.

aravindsagar and others added 2 commits May 31, 2026 05:09
Production keeps its silent fallbacks (return -1 / no-op) so a misuse
never crashes the JVM through an FFM upcall. With assertions enabled
(-ea, default in tests and ./gradlew run), the callbacks now also
fail loudly on lifecycle violations so they surface in development:

- register: asserts no prior binding exists for the contextId. Catches
  leaked bindings from earlier queries (missing unregister) and
  duplicate register calls.
- createProvider/createCollector/collectDocs/release*: assert a
  binding exists for the contextId. Catches premature unregister and
  stale Rust handles outliving their query.

The upcall methods catch Throwable but re-throw AssertionError so
the assertion isn't swallowed by the surrounding error-handling block.
In production (no -ea), these branches never execute.

Replaces testNoHandleReturnsNegativeOne / testReleaseWithNoHandleIsSafe
with testUnregisteredContextIdAsserts (now expects AssertionError).
Replaces testSharedContextIdCausesDataCorruption (probabilistic race
test) with testDoubleRegisterAsserts (deterministic — the assertion
catches the bug at the API boundary).

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@aravindsagar
Copy link
Copy Markdown
Contributor Author

One suggestion: the current code silently returns -1 when a binding is missing, which makes contract violations invisible in testing. Adding assertions would catch lifecycle bugs (double-register, leaked bindings, premature unregister) during development while keeping production behavior safe.

Thanks Bukhtawar, added

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c05f752

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c05f752: SUCCESS

Copy link
Copy Markdown
Contributor

@himshikhagupta himshikhagupta left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@aravindsagar
Copy link
Copy Markdown
Contributor Author

> Task :sandbox:qa:analytics-engine-coordinator:internalClusterTest

ShardFailoverIT > testQuerySucceedsAfterPrimaryNodeIsolated FAILED
    java.lang.NullPointerException: Cannot invoke "java.lang.Number.longValue()" because "java.util.List.get(int)[idx]" is null
        at __randomizedtesting.SeedInfo.seed([F42EACCE0A7FBDEB:8ED8E607B1353152]:0)
        at org.opensearch.analytics.resilience.ShardFailoverIT.testQuerySucceedsAfterPrimaryNodeIsolated(ShardFailoverIT.java:187)
REPRODUCE WITH: ./gradlew ':sandbox:qa:analytics-engine-coordinator:internalClusterTest' --tests 'org.opensearch.analytics.resilience.ShardFailoverIT.testQuerySucceedsAfterPrimaryNodeIsolated' -Dtests.seed=F42EACCE0A7FBDEB -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=en-VG -Dtests.timezone=America/Anchorage -Druntime.java=25

Not able to reproduce, the same test with the same seed succeeds locally. Seems like a flaky test

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c05f752

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c05f752: SUCCESS

@aravindsagar
Copy link
Copy Markdown
Contributor Author

Execution failed for task ':sandbox:plugins:composite-engine:internalClusterTest'.
> Test process encountered an unexpected problem.
   > class org.gradle.api.internal.tasks.testing.LifecycleTrackingTestEventReporter cannot be cast to class org.gradle.api.internal.tasks.testing.GroupTestEventReporterInternal (org.gradle.api.internal.tasks.testing.LifecycleTrackingTestEventReporter and org.gradle.api.internal.tasks.testing.GroupTestEventReporterInternal are in unnamed module of loader org.gradle.initialization.MixInLegacyTypesClassLoader @50ad3bc1)

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c05f752

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c05f752: SUCCESS

@aravindsagar
Copy link
Copy Markdown
Contributor Author

REPRODUCE WITH: ./gradlew ':sandbox:plugins:analytics-backend-datafusion:test' --tests 'org.opensearch.be.datafusion.DatafusionReduceSinkTests.testCancelAfterFirstBatchUnwindsDrain' -Dtests.seed=C8895E83BA26F287 -Dtests.security.manager=true -Dtests.jvm.argline="-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m" -Dtests.locale=zu-Latn-ZA -Dtests.timezone=Australia/Melbourne -Druntime.java=25
> Task :sandbox:plugins:analytics-backend-datafusion:test

flaky test again, not failing in local

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c05f752

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for c05f752: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit c05f752

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for c05f752: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants