Skip to content

Fix signed/unsigned long overflow on sum() using Neumaier summation#21871

Open
aasom143 wants to merge 1 commit into
opensearch-project:mainfrom
aasom143:sum-overflow-fix
Open

Fix signed/unsigned long overflow on sum() using Neumaier summation#21871
aasom143 wants to merge 1 commit into
opensearch-project:mainfrom
aasom143:sum-overflow-fix

Conversation

@aasom143
Copy link
Copy Markdown
Contributor

Replace DataFusion's built-in SUM with a custom UDAF that casts all numeric inputs to f64 and uses Neumaier's improved Kahan compensated summation algorithm. This prevents i64 wrapping overflow when summing large values.

Key changes:

  • New custom sum UDAF (udaf/sum.rs) returning Float64
  • Neumaier compensated summation for cross-batch precision
  • Vectorized GroupsAccumulator for GROUP BY queries (zero perf regression)
  • Arrow SIMD-optimized intra-batch sum + Neumaier for cross-batch merge

Performed the performance benchmarking and perf numbers are comparable without this change.

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@aasom143 aasom143 requested a review from a team as a code owner May 28, 2026 18:01
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 28, 2026

PR Reviewer Guide 🔍

(Review updated until commit 12562ab)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In merge_batch, the count field from partial states is read but never actually used to track the total count. Line 233 reads counts.value(i) but only increments self.count by the count from that state. However, the count represents how many non-null values contributed to that partial sum. If a partial state has count=0 (all nulls), we still call neumaier_add on its sum and compensation (lines 231-232), potentially adding zeros or uninitialized values. This could corrupt the final sum when merging states where some workers saw only nulls.

fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
    let sums = states[0].as_any().downcast_ref::<Float64Array>().unwrap();
    let comps = states[1].as_any().downcast_ref::<Float64Array>().unwrap();
    let counts = states[2].as_any().downcast_ref::<UInt64Array>().unwrap();

    for i in 0..sums.len() {
        if sums.is_null(i) {
            continue;
        }
        self.neumaier_add(sums.value(i));
        self.neumaier_add(comps.value(i));
        self.count += counts.value(i);
    }
    Ok(())
}
Possible Issue

In NeumaierGroupsAccumulator::merge_batch, line 373 checks if counts.value(i) > 0 to set null_state[group] = true, but this happens after already calling neumaier_add_to_group twice (lines 371-372). If counts.value(i) is 0 (meaning the partial state had no non-null values), we've already added its sum and compensation to the group. This means groups can accumulate garbage values from empty partial states. The count check should gate the neumaier_add calls, not just the null_state update.

for (i, &group) in group_indices.iter().enumerate() {
    if opt_filter.map_or(false, |f| !f.value(i)) || sums.is_null(i) {
        continue;
    }
    self.neumaier_add_to_group(group, sums.value(i));
    self.neumaier_add_to_group(group, comps.value(i));
    if counts.value(i) > 0 {
        self.null_state[group] = true;
    }
}
Incorrect State

The state() method in NeumaierGroupsAccumulator (lines 422-432) constructs a count array where each element is 1 if null_state[group] is true, else 0. This loses the actual count of values accumulated per group. When these states are later merged via merge_batch, the receiving accumulator will see count=1 for every non-null group regardless of how many values it actually summed. This breaks distributed aggregation correctness when partial results are shuffled and re-aggregated.

let null_buffer = NullBuffer::from(nulls.clone());
let sum_array = Float64Array::new(sums.into(), Some(null_buffer.clone()));
let comp_array = Float64Array::new(comps.into(), Some(null_buffer.clone()));
let count_array: Vec<u64> = nulls.iter().map(|&v| if v { 1 } else { 0 }).collect();
let count_array = UInt64Array::from(count_array);

Ok(vec![
    Arc::new(sum_array),
    Arc::new(comp_array),
    Arc::new(count_array),
])

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 28, 2026

PR Code Suggestions ✨

Latest suggestions up to 12562ab

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Handle NaN and infinity values

The Neumaier summation algorithm should handle special floating-point values (NaN,
infinity) explicitly. Currently, adding NaN or infinity can corrupt the compensation
term and produce incorrect results. Add checks to skip or handle these special cases
before performing the compensated addition.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [165-173]

 fn neumaier_add(&mut self, value: f64) {
+    if !value.is_finite() {
+        self.sum += value;
+        return;
+    }
     let t = self.sum + value;
     if self.sum.abs() >= value.abs() {
         self.compensation += (self.sum - t) + value;
     } else {
         self.compensation += (value - t) + self.sum;
     }
     self.sum = t;
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that special floating-point values (NaN, infinity) can corrupt the Neumaier compensation term. Adding explicit handling for non-finite values improves robustness and prevents incorrect results in edge cases.

Medium
Handle special float values in groups

The group accumulator's Neumaier addition should also handle special floating-point
values (NaN, infinity) to prevent corruption of the compensation term. This ensures
consistency with the single accumulator and prevents incorrect results when
aggregating groups containing special values.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [287-297]

 fn neumaier_add_to_group(&mut self, group: usize, value: f64) {
+    if !value.is_finite() {
+        self.sums[group] += value;
+        self.null_state[group] = true;
+        return;
+    }
     let sum = self.sums[group];
     let t = sum + value;
     if sum.abs() >= value.abs() {
         self.compensations[group] += (sum - t) + value;
     } else {
         self.compensations[group] += (value - t) + sum;
     }
     self.sums[group] = t;
     self.null_state[group] = true;
 }
Suggestion importance[1-10]: 7

__

Why: Similar to the first suggestion, this addresses the same issue in the GroupsAccumulator implementation. Handling special floating-point values consistently across both accumulator types prevents corruption of compensation terms and ensures correct aggregation results.

Medium

Previous suggestions

Suggestions up to commit 577faaa
CategorySuggestion                                                                                                                                    Impact
Possible issue
Add bounds checking for array access

Add bounds checking for the values array before indexing. If values has fewer than 3
elements, accessing values[0], values[1], or values[2] will panic at runtime.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [309-321]

 fn merge_batch(
     &mut self,
     values: &[ArrayRef],
     group_indices: &[usize],
     opt_filter: Option<&BooleanArray>,
     total_num_groups: usize,
 ) -> Result<()> {
+    if values.len() < 3 {
+        return Err(datafusion::common::DataFusionError::Internal(
+            format!("Expected 3 state arrays, got {}", values.len())
+        ));
+    }
     self.ensure_capacity(total_num_groups);
 
-    let sums = values[0].as_any().downcast_ref::<Float64Array>().unwrap();
-    let comps = values[1].as_any().downcast_ref::<Float64Array>().unwrap();
-    let counts = values[2].as_any().downcast_ref::<UInt64Array>().unwrap();
+    let sums = values[0].as_any().downcast_ref::<Float64Array>()
+        .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast sums to Float64Array".to_string()))?;
+    let comps = values[1].as_any().downcast_ref::<Float64Array>()
+        .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast comps to Float64Array".to_string()))?;
+    let counts = values[2].as_any().downcast_ref::<UInt64Array>()
+        .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast counts to UInt64Array".to_string()))?;
Suggestion importance[1-10]: 8

__

Why: This is a valid concern about potential out-of-bounds access. The merge_batch method in GroupsAccumulator expects exactly 3 state arrays, and accessing them without bounds checking could cause a panic. Adding validation improves robustness.

Medium
Add bounds checking for state arrays

Add bounds checking before accessing states[0], states[1], and states[2]. If the
states slice has fewer than 3 elements, the code will panic at runtime.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [187-191]

 fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-    let sums = states[0].as_any().downcast_ref::<Float64Array>().unwrap();
-    let comps = states[1].as_any().downcast_ref::<Float64Array>().unwrap();
-    let counts = states[2].as_any().downcast_ref::<UInt64Array>().unwrap();
+    if states.len() < 3 {
+        return Err(datafusion::common::DataFusionError::Internal(
+            format!("Expected 3 state arrays, got {}", states.len())
+        ));
+    }
+    let sums = states[0].as_any().downcast_ref::<Float64Array>()
+        .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast sums to Float64Array".to_string()))?;
+    let comps = states[1].as_any().downcast_ref::<Float64Array>()
+        .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast comps to Float64Array".to_string()))?;
+    let counts = states[2].as_any().downcast_ref::<UInt64Array>()
+        .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast counts to UInt64Array".to_string()))?;
Suggestion importance[1-10]: 8

__

Why: Similar to suggestion 2, this identifies a potential panic from out-of-bounds array access in the Accumulator::merge_batch method. The suggestion to add bounds checking and replace unwrap() with proper error handling is valid and improves code safety.

Medium
Replace unwrap with error handling

Replace unwrap() calls with proper error handling using ok_or_else() or pattern
matching. If downcasting fails, it indicates a critical type mismatch that should
return a descriptive error rather than panicking.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [111-129]

 fn native_sum_as_f64(col: &ArrayRef) -> Result<Option<f64>> {
     use datafusion::arrow::array::*;
     match col.data_type() {
-        DataType::Float64 => Ok(compute::sum(col.as_any().downcast_ref::<Float64Array>().unwrap())),
-        DataType::Float32 => Ok(compute::sum(col.as_any().downcast_ref::<Float32Array>().unwrap()).map(|v| v as f64)),
+        DataType::Float64 => {
+            let arr = col.as_any().downcast_ref::<Float64Array>()
+                .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast to Float64Array".to_string()))?;
+            Ok(compute::sum(arr))
+        }
+        DataType::Float32 => {
+            let arr = col.as_any().downcast_ref::<Float32Array>()
+                .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast to Float32Array".to_string()))?;
+            Ok(compute::sum(arr).map(|v| v as f64))
+        }
         ...
         _ => {
             let f64_array = compute::cast(col, &DataType::Float64)?;
-            let f64_typed = f64_array.as_any().downcast_ref::<Float64Array>().unwrap();
+            let f64_typed = f64_array.as_any().downcast_ref::<Float64Array>()
+                .ok_or_else(|| datafusion::common::DataFusionError::Internal("Failed to downcast casted array to Float64Array".to_string()))?;
             Ok(compute::sum(f64_typed))
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that unwrap() calls can panic and should be replaced with proper error handling. However, since these are internal type conversions that should always succeed given the match on data_type(), the risk is moderate rather than critical.

Medium
Suggestions up to commit ea148b3
CategorySuggestion                                                                                                                                    Impact
Possible issue
Preserve actual group counts

The count_array construction loses the actual count information from groups. Each
group should preserve its actual count of non-null values, not just a binary 0/1
flag. This could cause incorrect results when merging partial aggregates across
multiple nodes.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [358-362]

 let null_buffer = NullBuffer::from(nulls.clone());
 let sum_array = Float64Array::new(sums.into(), Some(null_buffer.clone()));
 let comp_array = Float64Array::new(comps.into(), Some(null_buffer.clone()));
-let count_array: Vec<u64> = nulls.iter().map(|&v| if v { 1 } else { 0 }).collect();
-let count_array = UInt64Array::from(count_array);
+let count_array = UInt64Array::new(counts.into(), Some(null_buffer));
Suggestion importance[1-10]: 9

__

Why: This is a critical bug. The state() method creates a count array with only 0/1 values instead of preserving actual counts, which would break distributed aggregation when merging states. The improved_code references an undefined counts variable, but the core issue identification is correct.

High
Add counts field for groups

The NeumaierGroupsAccumulator lacks a counts field to track the actual number of
non-null values per group. The current implementation only tracks whether a group
has seen any value (null_state), but doesn't maintain counts needed for proper state
serialization in state() method.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [214-218]

 pub struct NeumaierGroupsAccumulator {
     sums: Vec<f64>,
     compensations: Vec<f64>,
+    counts: Vec<u64>,
     null_state: Vec<bool>,
 }
Suggestion importance[1-10]: 9

__

Why: This correctly identifies that NeumaierGroupsAccumulator needs a counts field to properly track per-group counts for state serialization. This is directly related to the bug in suggestion 2 and is necessary for correct distributed aggregation behavior.

High
General
Handle NaN and infinity values

The Neumaier summation algorithm should handle special floating-point values (NaN,
infinity) explicitly. If value is NaN or infinite, the compensation logic may
produce incorrect results. Consider adding early checks to handle these cases before
performing the compensation arithmetic.

sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs [122-130]

 fn neumaier_add(&mut self, value: f64) {
+    if !value.is_finite() {
+        self.sum += value;
+        return;
+    }
     let t = self.sum + value;
     if self.sum.abs() >= value.abs() {
         self.compensation += (self.sum - t) + value;
     } else {
         self.compensation += (value - t) + self.sum;
     }
     self.sum = t;
 }
Suggestion importance[1-10]: 5

__

Why: While handling special floating-point values is good practice, the suggestion doesn't account for the fact that self.sum might also be non-finite. The early return could bypass necessary state updates, and the compensation logic may still need adjustment for edge cases.

Low

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for ea148b3: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 28, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.45%. Comparing base (dad63c0) to head (12562ab).
⚠️ Report is 5 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21871      +/-   ##
============================================
- Coverage     73.51%   73.45%   -0.07%     
- Complexity    75582    75584       +2     
============================================
  Files          6034     6034              
  Lines        342661   342661              
  Branches      49294    49294              
============================================
- Hits         251918   251696     -222     
- Misses        70712    70974     +262     
+ Partials      20031    19991      -40     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 577faaa

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit f058a69.

PathLineSeverityDescription
sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs82mediumThe custom UDAF registers itself under the name 'sum', silently overriding DataFusion's built-in aggregate. This changes the return type from Int64 to Float64 for all SUM queries in sessions that call register_all(), which can break downstream consumers expecting Int64 results without any indication that the behavior has changed.
sandbox/plugins/analytics-backend-datafusion/rust/src/udaf/sum.rs447lowNeumaierGroupsAccumulator::state() emits count values of 0 or 1 (presence flags) rather than the actual non-null value count as NeumaierSumAccumulator does. In distributed merge scenarios, merge_batch only checks counts.value(i) > 0 to set null_state, so partial aggregation across nodes works, but the semantics differ from the scalar path and could produce incorrect null handling if a coordinator relies on count values for averaging or other derived computations.

The table above displays the top 10 most important findings.

Total: 2 | Critical: 0 | High: 0 | Medium: 1 | Low: 1


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Replace DataFusion's built-in SUM with a custom UDAF that casts all
numeric inputs to f64 except the batch and uses Neumaier's improved
Kahan compensated summation algorithm. This prevents i64 wrapping
overflow when summing large values.

Key changes:
- New custom sum UDAF (udaf/sum.rs) returning Float64
- Neumaier compensated summation for cross-batch precision
- Vectorized GroupsAccumulator for GROUP BY queries (zero perf regression)
- Arrow SIMD-optimized intra-batch sum + Neumaier for cross-batch merge

Signed-off-by: Somesh Gupta <someshgupta987@gmail.com>
@aasom143 aasom143 force-pushed the sum-overflow-fix branch from f058a69 to 12562ab Compare May 31, 2026 10:44
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 12562ab

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for 12562ab: SUCCESS

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant