Skip to content

feat: Wire up native cancellation stats from Rust to Java#21802

Open
aravindsagar wants to merge 11 commits into
opensearch-project:mainfrom
aravindsagar:stats/cancellation-wiring
Open

feat: Wire up native cancellation stats from Rust to Java#21802
aravindsagar wants to merge 11 commits into
opensearch-project:mainfrom
aravindsagar:stats/cancellation-wiring

Conversation

@aravindsagar
Copy link
Copy Markdown
Contributor

Description

Implement the cancellation stats counters in the Rust native layer:

  • Add QueryType enum (Shard/Coordinator) to distinguish query origins
  • Record cancelled_at timestamp when cancel_query() fires
  • Compute current_count_post_cancel via live registry scan
  • Increment total_count_post_cancel on Drop when past threshold
  • Make threshold configurable via df_set_cancel_stats_threshold_ms
  • Thread QueryType through all QueryTrackingContext call sites

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Implement the cancellation stats counters in the Rust native layer:
- Add QueryType enum (Shard/Coordinator) to distinguish query origins
- Record cancelled_at timestamp when cancel_query() fires
- Compute current_count_post_cancel via live registry scan
- Increment total_count_post_cancel on Drop when past threshold
- Make threshold configurable via df_set_cancel_stats_threshold_ms
- Thread QueryType through all QueryTrackingContext call sites

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@aravindsagar aravindsagar requested a review from a team as a code owner May 22, 2026 07:33
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 22, 2026

PR Reviewer Guide 🔍

(Review updated until commit 5573fe8)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The coordinator_total calculation adds coordinator_current to the total counter value, which double-counts queries still running. A query that is currently running and past the threshold will appear in both coordinator_current (from the registry scan) and will have already incremented NATIVE_SEARCH_TASK_TOTAL if it dropped earlier. The comment claims this prevents double-counting, but the addition coordinator_total + coordinator_current at line 80 creates the opposite problem: queries that are still running get counted twice in the total.

let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
let (shard_current, coordinator_current) =
    query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
let vals: [i64; 4] = [
    coordinator_current,
    coordinator_total + coordinator_current,
    shard_current,
    shard_total + shard_current,
Race Condition

The count_cancelled_running function reads cancelled_at_nanos with Acquire ordering but the timestamp is set with Release ordering in cancel_query. However, there is a window between when cancelled_at_nanos is set (line 347) and when the query actually starts being counted as "running past threshold". If a query completes immediately after cancellation but before the threshold elapses, it will increment the total counter in Drop (lines 498-510) even though it never appeared in any count_cancelled_running scan. This creates a mismatch between current and total counts.

pub fn count_cancelled_running(threshold: Duration) -> (i64, i64) {
    let mut shard_count: i64 = 0;
    let mut coordinator_count: i64 = 0;
    let threshold_nanos = threshold.as_nanos() as u64;
    let now_nanos = PROCESS_START.elapsed().as_nanos() as u64;
    for entry in QUERY_REGISTRY.iter() {
        let tracker = entry.value();
        let cancelled_nanos = tracker.cancelled_at_nanos.load(Ordering::Acquire);
        if cancelled_nanos > 0 && (now_nanos - cancelled_nanos) >= threshold_nanos {
            match tracker.query_type {
                QueryType::Shard => shard_count += 1,
                QueryType::Coordinator => coordinator_count += 1,
            }
        }
    }
    (shard_count, coordinator_count)
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 22, 2026

PR Code Suggestions ✨

Latest suggestions up to 5573fe8

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Use acquire ordering for stats consistency

Reading totals before scanning the registry prevents double-counting dropped
queries, but a query completing between the total read and registry scan will be
undercounted (missing from both). Use Ordering::Acquire for total loads to ensure
visibility of concurrent increments from dropping queries.

sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs [74-77]

-let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
-let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
+let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Acquire);
+let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Acquire);
 let (shard_current, coordinator_current) =
     query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
Suggestion importance[1-10]: 5

__

Why: The suggestion correctly identifies a potential visibility issue, but the existing comment at lines 68-73 explicitly acknowledges this transient undercount as acceptable ("worst case: transient undercount by 1, self-corrects next read"). Using Acquire ordering would improve consistency slightly but contradicts the documented relaxed ordering design.

Low
Prevent timestamp overflow in cancellation tracking

The as u64 cast can silently truncate if elapsed().as_nanos() exceeds u64::MAX
(approximately 584 years). For long-running processes, consider using saturating_as
or explicitly handling overflow to prevent timestamp wraparound that could corrupt
cancellation duration calculations.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [346-347]

-let nanos = PROCESS_START.elapsed().as_nanos() as u64;
+let nanos = PROCESS_START.elapsed().as_nanos().min(u64::MAX as u128) as u64;
 tracker.cancelled_at_nanos.compare_exchange(0, nanos, Ordering::Release, Ordering::Relaxed).ok();
Suggestion importance[1-10]: 3

__

Why: While theoretically correct, the overflow scenario (584+ years) is practically impossible. The suggestion adds complexity for an edge case that won't occur in production. The min() check adds unnecessary overhead to every cancellation.

Low
Prevent underflow in cancellation duration calculation

Subtracting cancelled_nanos from PROCESS_START.elapsed().as_nanos() as u64 can
underflow if the cast truncates a large elapsed time. This would produce a massive
elapsed_since_cancel value, incorrectly triggering the threshold check. Use
saturating arithmetic or validate the cast result before subtraction.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [500-501]

-let elapsed_since_cancel = PROCESS_START.elapsed().as_nanos() as u64 - cancelled_nanos;
+let now_nanos = PROCESS_START.elapsed().as_nanos().min(u64::MAX as u128) as u64;
+let elapsed_since_cancel = now_nanos.saturating_sub(cancelled_nanos);
 if elapsed_since_cancel >= cancel_stats_threshold().as_nanos() as u64 {
Suggestion importance[1-10]: 3

__

Why: Similar to suggestion 1, this addresses a theoretical overflow after 584+ years. The saturating_sub is reasonable defensive programming, but the overflow scenario is unrealistic. The suggestion adds minimal value for production code.

Low

Previous suggestions

Suggestions up to commit 5573fe8
CategorySuggestion                                                                                                                                    Impact
General
Prevent overflow in stats aggregation

Adding coordinator_current and shard_current to their respective totals can overflow
if both values are near i64::MAX. This would wrap to negative values, corrupting the
stats output. Use saturating_add to prevent overflow.

sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs [74-82]

 let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
 let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
 let (shard_current, coordinator_current) =
     query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
 let vals: [i64; 4] = [
     coordinator_current,
-    coordinator_total + coordinator_current,
+    coordinator_total.saturating_add(coordinator_current),
     shard_current,
-    shard_total + shard_current,
+    shard_total.saturating_add(shard_current),
 ];
Suggestion importance[1-10]: 5

__

Why: Adding current to total could theoretically overflow if both approach i64::MAX, though this requires billions of cancelled queries. Using saturating_add is a low-cost defensive measure that prevents potential negative values in stats output, improving correctness.

Low
Prevent underflow in elapsed time calculation

Subtracting cancelled_nanos from PROCESS_START.elapsed().as_nanos() as u64 can
underflow if the cast truncates to a value smaller than cancelled_nanos. This would
wrap around to a very large u64 value, incorrectly triggering the threshold check.
Validate before subtraction.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [500]

-let elapsed_since_cancel = PROCESS_START.elapsed().as_nanos() as u64 - cancelled_nanos;
+let now_nanos = PROCESS_START.elapsed().as_nanos().min(u64::MAX as u128) as u64;
+let elapsed_since_cancel = now_nanos.saturating_sub(cancelled_nanos);
Suggestion importance[1-10]: 4

__

Why: The underflow concern is valid only if the cast truncates, which requires 584+ years of uptime. Using saturating_sub is a reasonable defensive practice with minimal cost, though the scenario is extremely unlikely. The suggestion improves robustness slightly.

Low
Prevent timestamp truncation overflow

The as u64 cast can silently truncate if elapsed().as_nanos() exceeds u64::MAX
(approximately 584 years). For long-running processes, this could cause incorrect
timestamp calculations. Use saturating_as or validate the range before casting.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [346-347]

-let nanos = PROCESS_START.elapsed().as_nanos() as u64;
+let nanos = PROCESS_START.elapsed().as_nanos().min(u64::MAX as u128) as u64;
 tracker.cancelled_at_nanos.compare_exchange(0, nanos, Ordering::Release, Ordering::Relaxed).ok();
Suggestion importance[1-10]: 3

__

Why: While theoretically correct, the overflow scenario (584+ years of uptime) is practically impossible. The suggestion adds complexity for an edge case that will never occur in production. The min() check has a small performance cost for no real benefit.

Low
Suggestions up to commit b44fe1c
CategorySuggestion                                                                                                                                    Impact
General
Handle clock adjustments in elapsed check

The threshold check uses elapsed() which can panic if the system clock moves
backward. Wrap the elapsed call in a check or use checked_duration_since to handle
clock adjustments gracefully and prevent potential panics in production.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [499-510]

 if let Some(cancelled_at) = *tracker.cancelled_at.lock() {
-    if cancelled_at.elapsed() >= cancel_stats_threshold() {
-        match tracker.query_type {
-            QueryType::Shard => {
-                crate::native_node_stats::inc_native_search_shard_task_total();
-            }
-            QueryType::Coordinator => {
-                crate::native_node_stats::inc_native_search_task_total();
+    if let Some(elapsed) = Instant::now().checked_duration_since(cancelled_at) {
+        if elapsed >= cancel_stats_threshold() {
+            match tracker.query_type {
+                QueryType::Shard => {
+                    crate::native_node_stats::inc_native_search_shard_task_total();
+                }
+                QueryType::Coordinator => {
+                    crate::native_node_stats::inc_native_search_task_total();
+                }
             }
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: Good catch on potential panic from elapsed() if system clock moves backward. Using checked_duration_since is a safer approach that handles clock adjustments gracefully. This is a real edge case that could cause production issues, though rare.

Medium
Prevent undercounting with blocking lock

Using try_lock silently skips contended entries, leading to undercounting when
cancel_query runs concurrently. Consider using a blocking lock or documenting that
the count is a lower bound estimate to avoid misleading stats during high
cancellation activity.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [362-384]

 pub fn count_cancelled_running(threshold: Duration) -> (i64, i64) {
     let mut shard_count: i64 = 0;
     let mut coordinator_count: i64 = 0;
     for entry in QUERY_REGISTRY.iter() {
         let tracker = entry.value();
-        if let Some(guard) = tracker.cancelled_at.try_lock() {
-            if let Some(cancelled_at) = *guard {
-                if cancelled_at.elapsed() >= threshold {
-                    match tracker.query_type {
-                        QueryType::Shard => shard_count += 1,
-                        QueryType::Coordinator => coordinator_count += 1,
-                    }
+        // Blocking lock ensures accurate count; brief contention is acceptable
+        // for stats collection (not on hot path).
+        let guard = tracker.cancelled_at.lock();
+        if let Some(cancelled_at) = *guard {
+            if cancelled_at.elapsed() >= threshold {
+                match tracker.query_type {
+                    QueryType::Shard => shard_count += 1,
+                    QueryType::Coordinator => coordinator_count += 1,
                 }
             }
         }
     }
     (shard_count, coordinator_count)
 }
Suggestion importance[1-10]: 5

__

Why: Valid concern about try_lock causing undercounting during contention. However, the current design explicitly uses try_lock to avoid blocking during iteration (as noted in the comment). Switching to blocking locks could cause performance issues. The suggestion is reasonable but trades off accuracy for potential latency.

Low
Use stronger memory ordering for totals

The ordering comment claims reading totals first prevents double-counting, but a
query can still drop between the total read and the registry scan, causing it to be
counted in the scan but not in the total (undercount). Consider using
Ordering::Acquire for the total loads to ensure visibility of concurrent drops.

sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs [74-77]

-let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
-let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
+let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Acquire);
+let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Acquire);
 let (shard_current, coordinator_current) =
     query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
Suggestion importance[1-10]: 3

__

Why: The suggestion to use Ordering::Acquire is technically more conservative, but the existing Ordering::Relaxed is sufficient here since the comment already acknowledges transient undercounting is acceptable. The ordering doesn't prevent the race condition described, and Acquire doesn't provide meaningful benefits for independent counters.

Low
Suggestions up to commit 6bfb2c1
CategorySuggestion                                                                                                                                    Impact
General
Cache threshold before stats snapshot

The comment explains reading totals first prevents double-counting, but the code
reads totals, then calls count_cancelled_running which internally reads the
threshold again. If the threshold changes between these operations, the current
count may use a different threshold than intended, causing inconsistent stats
snapshots.

sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs [74-77]

+let threshold = query_tracker::cancel_stats_threshold();
 let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
 let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
 let (shard_current, coordinator_current) =
-    query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
+    query_tracker::count_cancelled_running(threshold);
Suggestion importance[1-10]: 7

__

Why: Valid concern about threshold consistency during snapshot. Caching the threshold ensures count_cancelled_running uses the same threshold value that was intended for the snapshot, preventing inconsistency if the threshold changes mid-operation. However, the impact is limited since threshold changes are rare (primarily for testing).

Medium
Use blocking lock for accurate counts

Using try_lock silently skips contended entries, causing undercounting when
cancel_query runs concurrently. Since cancelled_at is only written once (on first
cancel) and reads are cheap, use a blocking lock to ensure accurate counts. The
brief lock hold time makes contention unlikely.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [362-384]

 pub fn count_cancelled_running(threshold: Duration) -> (i64, i64) {
     let mut shard_count: i64 = 0;
     let mut coordinator_count: i64 = 0;
     for entry in QUERY_REGISTRY.iter() {
         let tracker = entry.value();
-        if let Some(guard) = tracker.cancelled_at.try_lock() {
-            if let Some(cancelled_at) = *guard {
-                if cancelled_at.elapsed() >= threshold {
-                    match tracker.query_type {
-                        QueryType::Shard => shard_count += 1,
-                        QueryType::Coordinator => coordinator_count += 1,
-                    }
+        if let Some(cancelled_at) = *tracker.cancelled_at.lock() {
+            if cancelled_at.elapsed() >= threshold {
+                match tracker.query_type {
+                    QueryType::Shard => shard_count += 1,
+                    QueryType::Coordinator => coordinator_count += 1,
                 }
             }
         }
     }
     (shard_count, coordinator_count)
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that try_lock can cause undercounting during contention. However, the PR's design explicitly chose try_lock to avoid blocking during DashMap iteration (as noted in the comment "Uses try_lock... to avoid holding a lock during DashMap iteration"). The conservative undercount is acceptable for stats that self-correct on the next read, making this a design trade-off rather than a bug.

Low
Suggestions up to commit 65ee3dd
CategorySuggestion                                                                                                                                    Impact
General
Capture threshold at cancellation time

The threshold check uses cancel_stats_threshold() which can change at runtime via
FFI. A query cancelled just before threshold change could be counted or skipped
inconsistently. Capture the threshold value at cancellation time in QueryTracker to
ensure consistent evaluation at drop time.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [499-510]

 if let Some(cancelled_at) = *tracker.cancelled_at.lock() {
-    if cancelled_at.elapsed() >= cancel_stats_threshold() {
+    let threshold_at_cancel = tracker.cancel_threshold_at_time;
+    if cancelled_at.elapsed() >= threshold_at_cancel {
         match tracker.query_type {
             QueryType::Shard => {
                 crate::native_node_stats::inc_native_search_shard_task_total();
             }
             QueryType::Coordinator => {
                 crate::native_node_stats::inc_native_search_task_total();
             }
         }
     }
 }
Suggestion importance[1-10]: 6

__

Why: This identifies a legitimate edge case where runtime threshold changes could cause inconsistent counting. However, the improved_code references a non-existent field cancel_threshold_at_time that would need to be added to QueryTracker. The suggestion is valid but incomplete, and threshold changes during query execution are likely rare in practice.

Low
Replace try_lock with blocking lock

Using try_lock silently skips contended entries, leading to undercounting when
cancel_query runs concurrently. For stats accuracy, consider using a blocking lock
or an atomic flag to track cancellation state without requiring a mutex during
iteration.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [362-384]

 pub fn count_cancelled_running(threshold: Duration) -> (i64, i64) {
     let mut shard_count: i64 = 0;
     let mut coordinator_count: i64 = 0;
     for entry in QUERY_REGISTRY.iter() {
         let tracker = entry.value();
-        if let Some(guard) = tracker.cancelled_at.try_lock() {
-            if let Some(cancelled_at) = *guard {
-                if cancelled_at.elapsed() >= threshold {
-                    match tracker.query_type {
-                        QueryType::Shard => shard_count += 1,
-                        QueryType::Coordinator => coordinator_count += 1,
-                    }
+        let guard = tracker.cancelled_at.lock();
+        if let Some(cancelled_at) = *guard {
+            if cancelled_at.elapsed() >= threshold {
+                match tracker.query_type {
+                    QueryType::Shard => shard_count += 1,
+                    QueryType::Coordinator => coordinator_count += 1,
                 }
             }
         }
     }
     (shard_count, coordinator_count)
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion addresses a real concern about undercounting during contention. However, the PR explicitly chose try_lock to avoid blocking during DashMap iteration (as stated in the comment). Blocking locks could cause performance issues. The conservative undercount is an acceptable trade-off for non-blocking iteration.

Low
Use stronger memory ordering for totals

The comment explains that reading totals first prevents double-counting, but a query
dropping between the total read and registry scan could still be missed entirely
(not in total yet, not in scan). Consider using Ordering::Acquire for the total
loads to ensure visibility of concurrent updates, or document this transient
undercount behavior.

sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs [74-77]

-let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
-let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
+let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Acquire);
+let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Acquire);
 let (shard_current, coordinator_current) =
     query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
Suggestion importance[1-10]: 3

__

Why: While Ordering::Acquire provides stronger guarantees, the comment already acknowledges transient undercounting is acceptable. The current Relaxed ordering is sufficient for independent counters where exact synchronization isn't required. The suggestion is technically correct but offers minimal practical benefit.

Low
Suggestions up to commit ae384b9
CategorySuggestion                                                                                                                                    Impact
Possible issue
Fix double-counting in stats calculation

Adding coordinator_current to coordinator_total double-counts queries that are both
in the registry scan and already incremented the total counter. The comment claims
this prevents double-counting, but the addition creates it. Return totals directly
without adding current counts.

sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs [74-83]

 let coordinator_total = NATIVE_SEARCH_TASK_TOTAL.load(Ordering::Relaxed);
 let shard_total = NATIVE_SEARCH_SHARD_TASK_TOTAL.load(Ordering::Relaxed);
 let (shard_current, coordinator_current) =
     query_tracker::count_cancelled_running(query_tracker::cancel_stats_threshold());
 let vals: [i64; 4] = [
     coordinator_current,
-    coordinator_total + coordinator_current,
+    coordinator_total,
     shard_current,
-    shard_total + shard_current,
+    shard_total,
 ];
Suggestion importance[1-10]: 10

__

Why: This identifies a critical logic error. The code adds coordinator_current to coordinator_total (and same for shard), which double-counts queries currently running past the threshold. The comment claims the ordering prevents double-counting, but the addition itself creates it. The total counters should be returned directly without adding current counts.

High
General
Handle lock contention in count function

The function silently skips entries when try_lock() fails, leading to undercounting
during concurrent cancellations. Consider logging when locks are contended or using
a lock-free approach (e.g., AtomicU64 for cancelled_at) to ensure accurate counts
even under contention.

sandbox/plugins/analytics-backend-datafusion/rust/src/query_tracker.rs [225-242]

 pub fn count_cancelled_running(threshold: Duration) -> (i64, i64) {
     let mut shard_count: i64 = 0;
     let mut coordinator_count: i64 = 0;
     for entry in QUERY_REGISTRY.iter() {
         let tracker = entry.value();
         if let Some(guard) = tracker.cancelled_at.try_lock() {
             if let Some(cancelled_at) = *guard {
                 if cancelled_at.elapsed() >= threshold {
                     match tracker.query_type {
                         QueryType::Shard => shard_count += 1,
                         QueryType::Coordinator => coordinator_count += 1,
                     }
                 }
             }
+        } else {
+            log::trace!("Lock contention on cancelled_at for ctx={}", tracker.context_id);
         }
     }
     (shard_count, coordinator_count)
 }
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that try_lock() failures lead to undercounting, but the PR's comment already acknowledges this as acceptable ("conservative undercount"). Adding trace logging is a minor improvement for debugging, but doesn't change the fundamental design choice. The lock-free alternative mentioned would require significant refactoring.

Low

…ion stats

- Read total counters BEFORE scanning registry in df_native_node_stats.
  Prevents a query that drops mid-read from appearing in both the scan
  (current) and the incremented total counter simultaneously.

- Use try_lock instead of lock in count_cancelled_running to avoid
  holding a Mutex during DashMap iteration. If contended with a
  concurrent cancel_query call, the entry is skipped (conservative
  undercount for that instant) rather than risking lock ordering issues.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 78ef24e

Move the cancelled_at timestamp write after cancellation_token.cancel()
so the query is confirmed cancelled before being marked for stats.
Avoids relying on DashMap locking as an implicit ordering guarantee.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit f9e96ae

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for f9e96ae: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 22, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.35%. Comparing base (dad63c0) to head (5573fe8).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21802      +/-   ##
============================================
- Coverage     73.51%   73.35%   -0.17%     
+ Complexity    75582    75455     -127     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251918   251353     -565     
- Misses        70712    71283     +571     
+ Partials      20031    20025       -6     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit ae384b9

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for ae384b9: SUCCESS

Resolve conflicts:
- ffm.rs: keep both df_set_cancel_stats_threshold_ms and df_query_registry_top_n_by_current
- query_tracker.rs: add QueryType::Shard to new top-N snapshot tests

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 65ee3dd

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 65ee3dd: SUCCESS

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 6bfb2c1

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 6bfb2c1: SUCCESS

Comment on lines +855 to +860
/**
* Sets the cancellation stats threshold in milliseconds.
* Queries cancelled for less than this duration are not counted in stats.
* Primarily for testing — production uses the default (10 000 ms).
*/
public static void setCancelStatsThresholdMs(long millis) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should this setting be configurable?

Copy link
Copy Markdown
Contributor

@himshikhagupta himshikhagupta left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Comment on lines +41 to +44
/// Data-node shard fragment execution (AnalyticsShardTask on the Java side).
Shard,
/// Coordinator-side local reduce execution (AnalyticsQueryTask on the Java side).
Coordinator,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to fragment execution/reduce for consistency

/// CPU task abort handle, set after the stream is created.
pub abort_handle: OnceLock<AbortHandle>,
/// Instant when cancellation was signalled, or None if not cancelled.
pub cancelled_at: Mutex<Option<Instant>>,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we replace it with AtomicU64 and CAS to simplify locks

 let nanos = PROCESS_START.elapsed().as_nanos() as u64;
  tracker.cancelled_at_nanos.compare_exchange(0, nanos, Ordering::Release, Ordering::Relaxed).ok();

Basically 0 denotes not cancelled. >0 denotes nanos since PROCESS_START when cancel was fired

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 72f79f1.

PathLineSeverityDescription
sandbox/libs/dataformat-native/rust/Cargo.toml85mediumRelease profile hardening weakened: LTO disabled (lto = false) and codegen-units increased from 1 to 16. LTO enables dead-code elimination and cross-crate inlining that reduces binary attack surface; disabling it produces a larger, less-optimized binary. Combined with higher codegen-units, this rolls back standard Rust binary hardening for release builds without a stated reason.
sandbox/plugins/analytics-backend-datafusion/rust/src/ffm.rs350lowNew exported C FFI symbol df_set_cancel_stats_threshold_ms allows any caller with access to the native library to silently suppress or inflate cancellation statistics at runtime (threshold=0 inflates all cancels into stats; very large threshold hides all). The Java bridge exposes this as a public static method with a comment noting it is 'primarily for testing', but no enforcement prevents production callers from invoking it to manipulate observability data.
sandbox/plugins/analytics-backend-datafusion/rust/src/native_node_stats.rs72lowThe reported 'total' counters (vals[1] and vals[3]) are now computed as permanent_total + current_live, meaning currently-running cancelled queries are double-counted in the total. This changes the semantic contract of the stats buffer silently: callers reading the buffer who expect total to be a monotonically incrementing lifetime count will receive an inflated value whenever there are live queries. While documented in comments, this could mislead monitoring or alerting logic that relies on the previous semantics.

The table above displays the top 10 most important findings.

Total: 3 | Critical: 0 | High: 0 | Medium: 1 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

- Add QueryType::Shard to new QueryTrackingContext::new call in
  execute_query (benchmark path added on main)
- Revert Cargo.toml profile.release back to lto=true, codegen-units=1
  (accidentally committed as lto=false during merge)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit b44fe1c

Use lock-free AtomicU64 (nanos since process start) with CAS for
cancelled_at, eliminating the per-entry Mutex entirely. This removes
any deadlock concern between DashMap iteration and the per-entry lock.

- cancelled_at_nanos: 0 = not cancelled, >0 = nanos since PROCESS_START
- cancel_query uses compare_exchange(0, nanos, Release, Relaxed)
- count_cancelled_running is fully lock-free (atomic load per entry)
- Drop reads cancelled_at_nanos with Acquire ordering
- Remove threshold-sensitive unit tests that were flaky under parallel
  execution (Drop-to-total path covered by manual cluster testing)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Signed-off-by: Aravind Sagar <sagarara@amazon.com>
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5573fe8

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 5573fe8: null

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@aravindsagar
Copy link
Copy Markdown
Contributor Author

gradle-check timed out (Job not started yet. Waiting for 60 seconds before next attempt. for 2 hours)

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 5573fe8

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 5573fe8: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants