Skip to content

Introduce memory pool on write/merge side#21821

Draft
rayshrey wants to merge 2 commits into
opensearch-project:mainfrom
rayshrey:writer-memory-pool
Draft

Introduce memory pool on write/merge side#21821
rayshrey wants to merge 2 commits into
opensearch-project:mainfrom
rayshrey:writer-memory-pool

Conversation

@rayshrey
Copy link
Copy Markdown
Contributor

Description

[Describe what this change achieves]

Related Issues

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • Functionality includes testing.
  • API changes companion pull request created, if applicable.
  • Public documentation issue/PR created, if applicable.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@github-actions
Copy link
Copy Markdown
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 No relevant tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

In write_record_batch, memory reservation is checked after the batch is already written to the ArrowWriter. If the reservation fails, the function returns an error, but the data has already been written to the internal buffers. This creates an inconsistent state where the writer holds data that exceeds the memory limit. The comment acknowledges this ('the write already happened but we reject'), but this design means the limit is not enforced preventively—it only detects violations after the fact. If the intent is to prevent writes that would exceed the limit, the reservation should be attempted before calling writer.write(&record_batch).

let mut writer = writer_arc.lock().unwrap();
writer.write(&record_batch)?;
let actual_mem = writer.memory_size();
drop(writer);

// Now check: try to reserve the actual memory used.
// If over limit, the write already happened but we reject —
// the writer will be dropped (data replayed from translog).
let current = state.reservation.size();
if actual_mem > current {
    state.reservation.try_grow(actual_mem - current)
        .map_err(|e| -> Box<dyn std::error::Error> {
            format!("Write memory limit exceeded: {}", e).into()
        })?;
} else if actual_mem < current {
    state.reservation.shrink(current - actual_mem);
}

Underflow Risk
In shrink, if old < bytes, the function logs an underflow error but proceeds with fetch_sub, causing the atomic counter to wrap around to a very large value (unsigned underflow). This corrupts the pool's accounting. After logging the error, the function should either clamp the subtraction to zero or panic to prevent silent data corruption in the memory tracking.

@github-actions
Copy link
Copy Markdown
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent integer overflow in memory calculation

The multiplication batch_bytes * 3 can overflow for large batches, causing incorrect
memory reservation or panic. Use checked_mul to detect overflow and return an
appropriate error before attempting reservation.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [260-264]

-let sort_peak = batch_bytes * 3;
+let sort_peak = batch_bytes.checked_mul(3)
+    .ok_or_else(|| -> Box<dyn std::error::Error> {
+        format!("Sort memory calculation overflow: batch_bytes={}", batch_bytes).into()
+    })?;
 self.reservation.try_grow(sort_peak)
     .map_err(|e| -> Box<dyn std::error::Error> {
         format!("Write memory limit exceeded during sort (need {} bytes): {}", sort_peak, e).into()
     })?;
Suggestion importance[1-10]: 8

__

Why: The multiplication batch_bytes * 3 can overflow for large batches, causing incorrect memory reservation or panic. Using checked_mul prevents this critical issue and provides proper error handling before memory operations.

Medium
Prevent overflow in mapping size calculation

The multiplication total_rows * std::mem::size_of::() can overflow for very large
row counts, leading to incorrect memory tracking or allocation failure. Use
checked_mul to detect overflow before growing the reservation.

sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs [117-120]

 let total_rows: usize = file_row_counts.iter().sum();
 let mut reservation = MemoryReservation::new(merge_pool(), "merge_sorted");
 // Track mapping vec allocation
-reservation.grow(total_rows * std::mem::size_of::<i64>());
+let mapping_bytes = total_rows.checked_mul(std::mem::size_of::<i64>())
+    .ok_or_else(|| MergeError::Logic(format!("Mapping size overflow: {} rows", total_rows)))?;
+reservation.grow(mapping_bytes);
 let mut mapping: Vec<i64> = vec![0i64; total_rows];
Suggestion importance[1-10]: 8

__

Why: The multiplication total_rows * std::mem::size_of::<i64>() can overflow for very large row counts, leading to incorrect memory tracking. Using checked_mul prevents this potential bug and ensures accurate memory reservation.

Medium
Use stronger memory ordering for atomics

The fetch_update uses Ordering::Relaxed for both success and failure orderings,
which may allow memory reordering issues in concurrent scenarios. Use
Ordering::SeqCst or at minimum Ordering::AcqRel for the success case to ensure
proper synchronization when multiple threads are reserving memory simultaneously.

sandbox/plugins/parquet-data-format/src/main/rust/src/memory_pool.rs [71-78]

-let result = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
+let result = self.used.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
     let new_used = used.checked_add(bytes)?;
     if limit > 0 && new_used > limit {
         None
     } else {
         Some(new_used)
     }
 });
Suggestion importance[1-10]: 7

__

Why: Using Ordering::Relaxed in concurrent memory reservation scenarios can lead to race conditions. Stronger ordering like Ordering::SeqCst or Ordering::AcqRel ensures proper synchronization across threads, which is critical for a memory pool that tracks allocations across multiple operations.

Medium
General
Check memory limit before write operation

Memory limit check occurs after the write completes, allowing the limit to be
exceeded. Check and reserve memory before calling writer.write() to prevent
exceeding the limit. Use writer.memory_size() before the write, reserve the delta,
then write.

sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs [513-527]

 let mut writer = writer_arc.lock().unwrap();
+let mem_before = writer.memory_size();
 writer.write(&record_batch)?;
-let actual_mem = writer.memory_size();
+let mem_after = writer.memory_size();
 drop(writer);
 
-// Now check: try to reserve the actual memory used.
-// If over limit, the write already happened but we reject —
-// the writer will be dropped (data replayed from translog).
 let current = state.reservation.size();
-if actual_mem > current {
-    state.reservation.try_grow(actual_mem - current)
+if mem_after > current {
+    state.reservation.try_grow(mem_after - current)
         .map_err(|e| -> Box<dyn std::error::Error> {
             format!("Write memory limit exceeded: {}", e).into()
         })?;
+} else if mem_after < current {
+    state.reservation.shrink(current - mem_after);
+}
Suggestion importance[1-10]: 4

__

Why: The suggestion correctly identifies that memory checking happens after the write, but the improved code doesn't actually fix the issue—it still checks after writer.write(). The comment acknowledges this is intentional (data replayed from translog), so this is more of a design choice than a bug.

Low

@github-actions
Copy link
Copy Markdown
Contributor

✅ Gradle check result for b4117dd: SUCCESS

@codecov
Copy link
Copy Markdown

codecov Bot commented May 25, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 73.30%. Comparing base (71c0afb) to head (b4117dd).
⚠️ Report is 3 commits behind head on main.

Additional details and impacted files
@@             Coverage Diff              @@
##               main   #21821      +/-   ##
============================================
- Coverage     73.40%   73.30%   -0.11%     
+ Complexity    75366    75293      -73     
============================================
  Files          6029     6029              
  Lines        342164   342164              
  Branches      49204    49204              
============================================
- Hits         251178   250807     -371     
- Misses        71051    71415     +364     
- Partials      19935    19942       +7     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Signed-off-by: rayshrey <rayshrey@amazon.com>
@rayshrey rayshrey force-pushed the writer-memory-pool branch from b4117dd to 20c7987 Compare May 26, 2026 11:22
@github-actions
Copy link
Copy Markdown
Contributor

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 7bc820b.

PathLineSeverityDescription
server/build.gradle76highNew dependency added: 'compileOnly project(":libs:opensearch-arrow-spi")'. Per mandatory review policy, all dependency changes must be flagged for maintainer verification regardless of apparent legitimacy. This wires the arrow-spi library into the core server module.

The table above displays the top 10 most important findings.

Total: 1 | Critical: 0 | High: 1 | Medium: 0 | Low: 0


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

Signed-off-by: rayshrey <rayshrey@amazon.com>
@rayshrey rayshrey force-pushed the writer-memory-pool branch from 7bc820b to 5e7f830 Compare May 31, 2026 12:00
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant