Fix filter delegation concurrency bug by threading context_id through FFM upcalls#21845
Fix filter delegation concurrency bug by threading context_id through FFM upcalls#21845aravindsagar wants to merge 6 commits into
Conversation
PR Code Analyzer ❗AI-powered 'Code-Diff-Analyzer' found issues on commit d2def46.
The table above displays the top 10 most important findings. Pull Requests Author(s): Please update your Pull Request according to the report above. Repository Maintainer(s): You can Thanks. |
PR Reviewer Guide 🔍(Review updated until commit c05f752)Here are some key observations to aid the review process:
|
PR Code Suggestions ✨Latest suggestions up to c05f752 Explore these optional code suggestions:
Previous suggestionsSuggestions up to commit c05f752
Suggestions up to commit c05f752
Suggestions up to commit c05f752
Suggestions up to commit c05f752
Suggestions up to commit 6badb56
|
|
❌ Gradle check result for 65055f8: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
gradle-check failing due to Maven throttling. For example, |
|
Persistent review updated to latest commit e028959 |
|
❌ Gradle check result for e028959: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
… FFM upcalls FilterTreeCallbacks used global AtomicReference singletons for HANDLE and TRACKER. Under concurrent indexed-path queries, these were overwritten by the last query to enter startFragment, causing: - Query failures: collectDocs routed to wrong query's Lucene handle -> -1 - Tracking mis-attribution: trackEnd routed to wrong task -> AssertionError Fix: pass context_id (= OpenSearch task ID, already available in Rust from QueryTrackingContext) as the first parameter of every FFM upcall. Java uses it to look up the correct (handle, tracker) pair from a ConcurrentHashMap keyed by contextId. Each query gets isolated bindings. Additionally guards trackStart against same-thread double-tracking (the original Slack-reported variant) by checking isThreadTrackedForTask before calling taskExecutionStartedOnThread. Signed-off-by: Aravind Sagar <sagarara@amazon.com> Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The isThreadTrackedForTask check guarded against the same-thread double-tracking scenario from the original Slack report. That scenario is structurally impossible on current main: cpu_executor.spawn(...).await in df_execute_with_context (commit 19b99d8) ensures all FFM upcalls fire on datafusion-cpu workers, never on the runTask thread that TaskAwareRunnable pre-tracked. Removing the guard so the assertion will fire if a future Rust change reintroduces the synchronous path — silent no-op would hide the bug. Also reverts the public isThreadTrackedForTask method on TaskResourceTrackingService that was added to support the guard. Signed-off-by: Aravind Sagar <sagarara@amazon.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Three changes from code review: 1. Fix cleanup ordering in FragmentResources.close() The previous order ran onClose (which calls FilterTreeCallbacks.unregister) BEFORE closing the result stream. Closing the stream is what triggers Rust to drop ProviderHandle/FfmSegmentCollector, which fire releaseProvider/ releaseCollector upcalls. With unregister already done, those upcalls found no binding and skipped the eager Lucene Weight/Scorer release. Reorder: close stream → engine → reader first, then run onClose. Release upcalls now find their binding and call handle.releaseProvider/Collector as intended. 2. Move null-task check before handle creation in AnalyticsSearchService Previously the IllegalStateException for null task fired AFTER getFilterDelegationHandle had already been called, leaking the handle. Move the check to the top of the delegation block so we never allocate resources we won't track. 3. Close FilterDelegationHandle in cleanup DataFusionAnalyticsBackendPlugin.configureFilterDelegation now returns a cleanup that both unregisters from FilterTreeCallbacks and closes the handle. Previously nothing closed the handle eagerly — its internal ConcurrentHashMaps relied on GC. Also clarifies the testSharedContextIdCausesDataCorruption javadoc to explain that each thread re-registers its own handle at the shared contextId, simulating the old AtomicReference race. Signed-off-by: Aravind Sagar <sagarara@amazon.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The header comment said "Four callback slots" but lists five (and the code defines five). Pre-existing typo, easy to fix while in this area. Signed-off-by: Aravind Sagar <sagarara@amazon.com>
e028959 to
6badb56
Compare
Codecov Report✅ All modified and coverable lines are covered by tests. Additional details and impacted files@@ Coverage Diff @@
## main #21845 +/- ##
============================================
+ Coverage 73.51% 74.10% +0.59%
- Complexity 75582 76140 +558
============================================
Files 6034 6034
Lines 342661 342661
Branches 49294 49294
============================================
+ Hits 251918 253942 +2024
+ Misses 70712 69099 -1613
+ Partials 20031 19620 -411 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
himshikhagupta
left a comment
There was a problem hiding this comment.
Minor comment, mostly LGTM
Bukhtawar
left a comment
There was a problem hiding this comment.
Thanks for the fix. One suggestion: the current code silently returns -1 when a binding is missing, which makes contract violations invisible in testing. Adding assertions would catch lifecycle bugs (double-register, leaked bindings, premature unregister) during development while keeping production behavior safe.
Production keeps its silent fallbacks (return -1 / no-op) so a misuse never crashes the JVM through an FFM upcall. With assertions enabled (-ea, default in tests and ./gradlew run), the callbacks now also fail loudly on lifecycle violations so they surface in development: - register: asserts no prior binding exists for the contextId. Catches leaked bindings from earlier queries (missing unregister) and duplicate register calls. - createProvider/createCollector/collectDocs/release*: assert a binding exists for the contextId. Catches premature unregister and stale Rust handles outliving their query. The upcall methods catch Throwable but re-throw AssertionError so the assertion isn't swallowed by the surrounding error-handling block. In production (no -ea), these branches never execute. Replaces testNoHandleReturnsNegativeOne / testReleaseWithNoHandleIsSafe with testUnregisteredContextIdAsserts (now expects AssertionError). Replaces testSharedContextIdCausesDataCorruption (probabilistic race test) with testDoubleRegisterAsserts (deterministic — the assertion catches the bug at the API boundary). Signed-off-by: Aravind Sagar <sagarara@amazon.com> Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Thanks Bukhtawar, added |
|
Persistent review updated to latest commit c05f752 |
Not able to reproduce, the same test with the same seed succeeds locally. Seems like a flaky test |
|
Persistent review updated to latest commit c05f752 |
|
|
Persistent review updated to latest commit c05f752 |
flaky test again, not failing in local |
|
Persistent review updated to latest commit c05f752 |
|
❌ Gradle check result for c05f752: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
|
Persistent review updated to latest commit c05f752 |
Description
FilterTreeCallbacks used global AtomicReference singletons for HANDLE and TRACKER. Under concurrent indexed-path queries, these were overwritten by the last query to enter startFragment, causing:
Fix: pass context_id (= OpenSearch task ID, already available in Rust from QueryTrackingContext) as the first parameter of every FFM upcall. Java uses it to look up the correct (handle, tracker) pair from a ConcurrentHashMap keyed by contextId. Each query gets isolated bindings.
Additionally guards trackStart against same-thread double-tracking (the original Slack-reported variant) by checking isThreadTrackedForTask before calling taskExecutionStartedOnThread.
Related Issues
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.