From 20c79877fe679c5489e53495e4d25c5d3302693d Mon Sep 17 00:00:00 2001 From: rayshrey Date: Mon, 25 May 2026 09:58:46 +0530 Subject: [PATCH 1/2] Introduce memory pool on write/merge side Signed-off-by: rayshrey --- .../dataformat-native/rust/common/src/lib.rs | 1 + .../rust/common/src/memory_pool.rs | 566 ++++++++++++++++++ .../dataformat-native/rust/lib/Cargo.toml | 3 + .../src/main/rust/Cargo.toml | 1 + .../src/main/rust/src/ffm.rs | 55 +- .../src/main/rust/src/lib.rs | 1 + .../src/main/rust/src/memory.rs | 44 ++ .../src/main/rust/src/merge/context.rs | 42 +- .../src/main/rust/src/merge/cursor.rs | 44 +- .../src/main/rust/src/merge/mod.rs | 1 + .../src/main/rust/src/merge/sorted.rs | 47 +- .../src/main/rust/src/merge/unsorted.rs | 28 + .../src/main/rust/src/writer.rs | 175 ++++-- 13 files changed, 942 insertions(+), 66 deletions(-) create mode 100644 sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs create mode 100644 sandbox/plugins/parquet-data-format/src/main/rust/src/memory.rs diff --git a/sandbox/libs/dataformat-native/rust/common/src/lib.rs b/sandbox/libs/dataformat-native/rust/common/src/lib.rs index 0f4b8c132407f..c44fa871c4fb3 100644 --- a/sandbox/libs/dataformat-native/rust/common/src/lib.rs +++ b/sandbox/libs/dataformat-native/rust/common/src/lib.rs @@ -11,6 +11,7 @@ pub mod error; pub mod logger; pub mod allocator; +pub mod memory_pool; // Re-export the proc macro so plugins use `#[native_bridge_common::ffm_safe]` pub use native_bridge_macros::ffm_safe; diff --git a/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs b/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs new file mode 100644 index 0000000000000..495bc13d9ab84 --- /dev/null +++ b/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs @@ -0,0 +1,566 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source 
license.
+ */
+
+//! Memory pool for tracking native memory usage across write and merge operations.
+//!
+//! Provides a simple atomic counter with an optional limit. Operations that allocate
+//! significant memory (RecordBatch buffering, sort read-back, merge cursors) reserve
+//! bytes through a `MemoryReservation` (`request` / `reserve_estimated`) before
+//! allocating and call `shrink` / `free` after freeing. Depending on the reservation's
+//! `PoolBehavior`, a request that would exceed the configured limit is either rejected
+//! immediately or blocks until memory is released or a timeout expires.
+//!
+//! `MemoryReservation` is an RAII handle that automatically returns memory to the
+//! pool on drop, preventing leaks even on error paths.
+
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::{Arc, Condvar, Mutex};
+use std::time::{Duration, Instant};
+use std::fmt;
+use crate::{log_info, log_error};
+
+/// Default timeout for blocking wait (300 seconds).
+pub const DEFAULT_WAIT_TIMEOUT: Duration = Duration::from_secs(300);
+
+/// Merge operations can wait longer (600 seconds).
+pub const MERGE_WAIT_TIMEOUT: Duration = Duration::from_secs(600);
+
+/// Controls whether an allocation blocks or rejects immediately.
+///
+/// The behavior is fixed per `MemoryReservation` and consulted by
+/// `MemoryReservation::request`.
+#[derive(Debug, Clone)]
+pub enum PoolBehavior {
+    /// Block until memory is available, up to the given timeout.
+    /// Returns `PoolTimeout` on expiry.
+    Wait(Duration),
+    /// Fail immediately if pool is full.
+    /// Returns `PoolExhausted`.
+    Reject,
+}
+
+/// Error returned when a pool cannot satisfy an allocation request.
+#[derive(Debug, Clone)]
+pub struct PoolExhausted {
+    /// Name of the pool that rejected the request.
+    pub pool_name: &'static str,
+    /// Number of bytes the caller asked for.
+    pub requested: usize,
+    /// Pool usage in bytes, sampled right after the rejection (other threads
+    /// may have changed it since the decision was made).
+    pub used: usize,
+    /// Pool limit in bytes that was in effect for the rejected request.
+    pub limit: usize,
+}
+
+impl fmt::Display for PoolExhausted {
+    /// Formats the error as a single line prefixed with the pool name.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "[{}] memory limit exceeded: requested {} bytes, used {}, limit {}",
+            self.pool_name, self.requested, self.used, self.limit
+        )
+    }
+}
+
+impl std::error::Error for PoolExhausted {}
+
+/// Error returned when wait_and_grow times out.
+#[derive(Debug, Clone)]
+pub struct PoolTimeout {
+    /// Name of the pool the caller was waiting on.
+    pub pool_name: &'static str,
+    /// Number of bytes the caller asked for.
+    pub requested: usize,
+    /// Pool usage in bytes, sampled when the wait was abandoned.
+    pub used: usize,
+    /// Pool limit in bytes, sampled when the wait was abandoned.
+    pub limit: usize,
+    /// Time spent waiting before giving up.
+    pub waited: Duration,
+}
+
+impl fmt::Display for PoolTimeout {
+    /// Formats the error as a single line prefixed with the pool name.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(
+            f,
+            "[{}] timed out waiting for {} bytes after {:?} (used: {}, limit: {})",
+            self.pool_name, self.requested, self.waited, self.used, self.limit
+        )
+    }
+}
+
+impl std::error::Error for PoolTimeout {}
+
+/// A node-level memory pool backed by an atomic counter with blocking wait support.
+///
+/// When the pool is full, callers can block via `wait_and_grow` until other
+/// reservations free memory. A `Condvar` is notified on every `shrink`.
+pub struct MemoryPool {
+    /// Pool name; used as the `[name]` prefix of every log line and error.
+    name: &'static str,
+    /// Bytes currently accounted to this pool.
+    used: AtomicUsize,
+    /// Maximum bytes that fallible reservations may reach; 0 means unlimited.
+    limit: AtomicUsize,
+    /// Highest value `used` has reached since the pool was created.
+    peak: AtomicUsize,
+    /// Condvar notified when memory is freed (shrink/free).
+    notify: Condvar,
+    /// Mutex paired with the condvar (holds no meaningful state).
+    notify_lock: Mutex<()>,
+}
+
+impl fmt::Debug for MemoryPool {
+    /// Prints a snapshot of the name and counters; the condvar and its mutex are omitted.
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MemoryPool")
+            .field("name", &self.name)
+            .field("used", &self.used.load(Ordering::Relaxed))
+            .field("limit", &self.limit.load(Ordering::Relaxed))
+            .field("peak", &self.peak.load(Ordering::Relaxed))
+            .finish()
+    }
+}
+
+impl MemoryPool {
+    /// Create a new pool. `limit = 0` means unlimited.
+    ///
+    /// The pool starts with zero usage and a zero peak.
+    pub fn new(name: &'static str, limit: usize) -> Self {
+        Self {
+            name,
+            used: AtomicUsize::new(0),
+            limit: AtomicUsize::new(limit),
+            peak: AtomicUsize::new(0),
+            notify: Condvar::new(),
+            notify_lock: Mutex::new(()),
+        }
+    }
+
+    /// Attempt to reserve `bytes`. Returns error if it would exceed the limit.
+    fn try_grow(&self, bytes: usize, consumer: &str) -> Result<(), PoolExhausted> {
+        // Zero-byte requests never touch the counter or the log.
+        if bytes == 0 {
+            return Ok(());
+        }
+
+        // The limit is sampled once; a concurrent `set_limit` is only seen by later calls.
+        let limit = self.limit.load(Ordering::Relaxed);
+
+        // Candidate new total, or `None` on arithmetic overflow or when a non-zero
+        // limit would be exceeded (a limit of 0 means unlimited).
+        let reserve = |current: usize| {
+            current
+                .checked_add(bytes)
+                .filter(|&total| limit == 0 || total <= limit)
+        };
+
+        let previous = match self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, reserve) {
+            Ok(previous) => previous,
+            Err(_) => {
+                // Re-read usage purely for reporting; it may differ from the value
+                // the rejection was based on.
+                let used = self.used.load(Ordering::Relaxed);
+                log_info!(
+                    "[{}] REJECTED +{} bytes for '{}' (used: {}, limit: {})",
+                    self.name, bytes, consumer, used, limit
+                );
+                return Err(PoolExhausted {
+                    pool_name: self.name,
+                    requested: bytes,
+                    used,
+                    limit,
+                });
+            }
+        };
+
+        // Cannot overflow: the closure already validated this sum with `checked_add`.
+        let new_used = previous + bytes;
+        self.peak.fetch_max(new_used, Ordering::Relaxed);
+        log_info!(
+            "[{}] +{} bytes for '{}' (used: {}, limit: {})",
+            self.name, bytes, consumer, new_used, limit
+        );
+        Ok(())
+    }
+
+    /// Blocks until `bytes` can be reserved, or timeout expires.
+    /// On each failed attempt, waits for a notification (triggered by shrink/free).
+    ///
+    /// Returns `PoolTimeout` once `timeout` has elapsed without a successful
+    /// reservation. A zero-byte request succeeds immediately.
+    fn wait_and_grow(&self, bytes: usize, consumer: &str, timeout: Duration) -> Result<(), PoolTimeout> {
+        if bytes == 0 {
+            return Ok(());
+        }
+        // Fast path: try without waiting
+        if self.try_grow(bytes, consumer).is_ok() {
+            return Ok(());
+        }
+
+        // Slow path: wait for memory to be freed
+        let start = Instant::now();
+        log_info!(
+            "[{}] WAITING for {} bytes for '{}' (used: {}, limit: {})",
+            self.name, bytes, consumer, self.used.load(Ordering::Relaxed), self.limit.load(Ordering::Relaxed)
+        );
+
+        loop {
+            let elapsed = start.elapsed();
+            if elapsed >= timeout {
+                let used = self.used.load(Ordering::Relaxed);
+                let limit = self.limit.load(Ordering::Relaxed);
+                log_error!(
+                    "[{}] TIMEOUT waiting for {} bytes for '{}' after {:?} (used: {}, limit: {})",
+                    self.name, bytes, consumer, elapsed, used, limit
+                );
+                return Err(PoolTimeout {
+                    pool_name: self.name,
+                    requested: bytes,
+                    used,
+                    limit,
+                    waited: elapsed,
+                });
+            }
+
+            // `elapsed < timeout` was just checked, so this subtraction cannot underflow.
+            // Each wait is capped at one second: the counter is updated outside the
+            // mutex, so a notification can slip in between the failed attempt and the
+            // wait below; the cap bounds how long such a missed wakeup can delay us.
+            let poll = (timeout - elapsed).min(Duration::from_secs(1));
+            {
+                // The mutex guards no data, so a poisoned lock (another thread
+                // panicked while holding it) is harmless; recover the guard instead
+                // of propagating the panic to every waiter.
+                let guard = self
+                    .notify_lock
+                    .lock()
+                    .unwrap_or_else(std::sync::PoisonError::into_inner);
+                let _woken = self
+                    .notify
+                    .wait_timeout(guard, poll)
+                    .unwrap_or_else(std::sync::PoisonError::into_inner);
+                // The guard is released at the end of this scope, before retrying.
+            }
+
+            // Retry after wakeup
+            if self.try_grow(bytes, consumer).is_ok() {
+                log_info!(
+                    "[{}] WAIT RESOLVED for '{}' after {:?}",
+                    self.name, consumer, start.elapsed()
+                );
+                return Ok(());
+            }
+        }
+    }
+
+    /// Infallible grow — use when the allocation has already happened and must be tracked.
+    ///
+    /// The limit is not consulted, so usage may exceed it after this call.
+    pub fn grow(&self, bytes: usize, consumer: &str) {
+        if bytes == 0 {
+            return;
+        }
+        // `fetch_add` wraps on overflow; mirror that for the derived value so a
+        // debug build does not panic on the plain `+` where the atomic itself wrapped.
+        let new_used = self.used.fetch_add(bytes, Ordering::Relaxed).wrapping_add(bytes);
+        self.peak.fetch_max(new_used, Ordering::Relaxed);
+        log_info!(
+            "[{}] +{} bytes for '{}' (used: {}, limit: {})",
+            self.name, bytes, consumer, new_used, self.limit.load(Ordering::Relaxed)
+        );
+    }
+
+    /// Release `bytes` back to the pool. Notifies any waiting threads.
+    ///
+    /// Releasing more than is currently tracked is an accounting bug in the caller:
+    /// it is logged as an error and usage is clamped to zero.
+    pub fn shrink(&self, bytes: usize, consumer: &str) {
+        if bytes == 0 {
+            return;
+        }
+        // Saturating decrement. A plain `fetch_sub` would wrap the counter to a huge
+        // value on underflow, after which every fallible reservation is rejected.
+        // The closure always returns `Some`, so both arms carry the previous value.
+        let old = match self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
+            Some(used.saturating_sub(bytes))
+        }) {
+            Ok(previous) | Err(previous) => previous,
+        };
+        if old < bytes {
+            log_error!(
+                "[{}] UNDERFLOW: shrink {} bytes for '{}' but only {} was tracked",
+                self.name, bytes, consumer, old
+            );
+        } else {
+            log_info!(
+                "[{}] -{} bytes for '{}' (used: {}, limit: {})",
+                self.name, bytes, consumer, old - bytes, self.limit.load(Ordering::Relaxed)
+            );
+        }
+        // Wake up any threads waiting for memory
+        self.notify.notify_all();
+    }
+
+    /// Bytes currently accounted to this pool.
+    pub fn used(&self) -> usize {
+        self.used.load(Ordering::Relaxed)
+    }
+
+    /// Highest usage observed since the pool was created.
+    pub fn peak(&self) -> usize {
+        self.peak.load(Ordering::Relaxed)
+    }
+
+    /// Current limit in bytes; 0 means unlimited.
+    pub fn limit(&self) -> usize {
+        self.limit.load(Ordering::Relaxed)
+    }
+
+    /// Name given to the pool at construction.
+    pub fn name(&self) -> &'static str {
+        self.name
+    }
+
+    /// Replace the limit at runtime (0 = unlimited).
+    ///
+    /// Existing reservations are untouched even if usage now exceeds the new limit.
+    pub fn set_limit(&self, new_limit: usize) {
+        let old = self.limit.swap(new_limit, Ordering::Relaxed);
+        log_info!("[{}] limit changed: {} -> {}", self.name, old, new_limit);
+        // A raised (or removed) limit may make room for blocked requests; wake the
+        // waiters so they retry now instead of on their next poll interval.
+        self.notify.notify_all();
+    }
+}
+
+/// RAII handle that tracks a portion of memory reserved from a [`MemoryPool`].
+/// Automatically releases all held memory on drop.
+pub struct MemoryReservation {
+    /// Pool this reservation draws from.
+    pool: Arc<MemoryPool>,
+    /// Label attached to every log line and error produced for this reservation.
+    consumer: &'static str,
+    /// Bytes currently held by this reservation.
+    size: usize,
+    /// Whether `request` blocks or rejects when the pool is full.
+    behavior: PoolBehavior,
+}
+
+impl MemoryReservation {
+    /// Create an empty reservation (size 0) against `pool`.
+    pub fn new(pool: &Arc<MemoryPool>, consumer: &'static str, behavior: PoolBehavior) -> Self {
+        Self {
+            pool: Arc::clone(pool),
+            consumer,
+            size: 0,
+            behavior,
+        }
+    }
+
+    /// Try to grow this reservation. On failure, the reservation is unchanged.
+    fn try_grow(&mut self, bytes: usize) -> Result<(), PoolExhausted> {
+        self.pool.try_grow(bytes, self.consumer)?;
+        self.size += bytes;
+        Ok(())
+    }
+
+    /// Blocks until `bytes` can be reserved, or timeout expires.
+    /// On timeout the reservation is unchanged.
+    fn wait_and_grow(&mut self, bytes: usize, timeout: Duration) -> Result<(), PoolTimeout> {
+        self.pool.wait_and_grow(bytes, self.consumer, timeout)?;
+        self.size += bytes;
+        Ok(())
+    }
+
+    /// Grow based on the reservation's behavior: either block (Wait) or reject immediately (Reject).
+    ///
+    /// # Errors
+    /// Boxes a `PoolExhausted` (Reject) or a `PoolTimeout` (Wait). On error the
+    /// reservation is unchanged.
+    // NOTE(review): the trait-object bounds were lost from the patch text;
+    // `dyn Error + Send + Sync` is assumed here — confirm against the original.
+    pub fn request(&mut self, bytes: usize) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
+        // `PoolBehavior` is matched by place; the `Duration` is copied out, so no
+        // borrow of `self.behavior` is held across the `&mut self` calls.
+        match self.behavior {
+            PoolBehavior::Reject => self
+                .try_grow(bytes)
+                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
+            PoolBehavior::Wait(timeout) => self
+                .wait_and_grow(bytes, timeout)
+                .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>),
+        }
+    }
+
+    /// Reserve an estimated amount using the reservation's behavior.
+    /// Returns the estimated amount for later use with `reconcile()`.
+    pub fn reserve_estimated(&mut self, estimated: usize) -> Result<usize, Box<dyn std::error::Error + Send + Sync>> {
+        self.request(estimated)?;
+        Ok(estimated)
+    }
+
+    /// Reconcile a previous estimate with the actual measured size.
+    /// Logs (at info level) if the estimate was significantly off (>2× or <0.5×).
+    ///
+    /// Growing to the actual size is infallible, so the pool may end up above
+    /// its limit when the estimate was too low.
+    pub fn reconcile(&mut self, estimated: usize, actual: usize) {
+        if actual > estimated {
+            self.grow(actual - estimated);
+        } else if actual < estimated {
+            self.shrink(estimated - actual);
+        }
+        // Report if estimate was significantly off. `saturating_mul` keeps the
+        // comparison well-defined for very large estimates, where `estimated * 2`
+        // would overflow.
+        if estimated > 0 && (actual > estimated.saturating_mul(2) || actual < estimated / 2) {
+            crate::log_info!(
+                "[{}] ESTIMATE DRIFT for '{}': estimated={}, actual={}, ratio={:.2}x",
+                self.pool.name, self.consumer, estimated, actual, actual as f64 / estimated as f64
+            );
+        }
+    }
+
+    /// Infallible grow.
+    ///
+    /// Records `bytes` without consulting the pool limit.
+    pub fn grow(&mut self, bytes: usize) {
+        self.pool.grow(bytes, self.consumer);
+        self.size += bytes;
+    }
+
+    /// Release `bytes` from this reservation.
+    ///
+    /// The amount is clamped to the reservation's current size, so a reservation
+    /// can never return more to the pool than it holds.
+    pub fn shrink(&mut self, bytes: usize) {
+        let actual = bytes.min(self.size);
+        self.pool.shrink(actual, self.consumer);
+        self.size -= actual;
+    }
+
+    /// Resize to a new total. Grows or shrinks as needed.
+    ///
+    /// Growth uses the infallible path, so the pool limit is not enforced here.
+    pub fn resize(&mut self, new_total: usize) {
+        if new_total > self.size {
+            self.grow(new_total - self.size);
+        } else if new_total < self.size {
+            self.shrink(self.size - new_total);
+        }
+    }
+
+    /// Release all memory back to the pool. Returns bytes freed.
+    pub fn free(&mut self) -> usize {
+        let s = self.size;
+        if s > 0 {
+            self.pool.shrink(s, self.consumer);
+            self.size = 0;
+        }
+        s
+    }
+
+    /// Bytes currently held by this reservation.
+    pub fn size(&self) -> usize {
+        self.size
+    }
+
+    /// Consumer label given at construction.
+    pub fn consumer(&self) -> &'static str {
+        self.consumer
+    }
+
+    /// Create a sibling reservation from the same pool and behavior, with a different consumer name.
+    ///
+    /// The new reservation starts empty; nothing is transferred from `self`.
+    pub fn child(&self, consumer: &'static str) -> Self {
+        Self {
+            pool: Arc::clone(&self.pool),
+            consumer,
+            size: 0,
+            behavior: self.behavior.clone(),
+        }
+    }
+}
+
+impl Drop for MemoryReservation {
+    /// Returns any bytes still held to the pool.
+    fn drop(&mut self) {
+        if self.size > 0 {
+            log_info!(
+                "[{}] reservation '{}' dropped, releasing {} bytes",
+                self.pool.name, self.consumer, self.size
+            );
+            self.pool.shrink(self.size, self.consumer);
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn test_pool(limit: usize) -> Arc<MemoryPool> {
+        Arc::new(MemoryPool::new("TEST", limit))
+    }
+
+    #[test]
+    fn basic_grow_shrink() {
+        let pool = test_pool(0); // unlimited
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        res.grow(100);
+        assert_eq!(res.size(), 100);
+        assert_eq!(pool.used(), 100);
+        res.shrink(40);
+        assert_eq!(res.size(), 60);
+        assert_eq!(pool.used(), 60);
+        res.free();
+        assert_eq!(pool.used(), 0);
+    }
+
+    #[test]
+    fn try_grow_within_limit() {
+        let pool = test_pool(1000);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        assert!(res.try_grow(500).is_ok());
+        assert!(res.try_grow(400).is_ok());
+        assert_eq!(pool.used(), 900);
+    }
+
+    #[test]
+    fn try_grow_exceeds_limit() {
+        let pool = test_pool(1000);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        assert!(res.try_grow(500).is_ok());
+        let err = res.try_grow(600).unwrap_err();
+        assert_eq!(err.requested, 600);
+        assert_eq!(err.used, 500);
+        assert_eq!(err.limit, 1000);
+        assert_eq!(res.size(), 500); // unchanged
+        assert_eq!(pool.used(), 500);
+    }
+
+    #[test]
+    fn drop_releases_memory() {
+        let pool = test_pool(0);
+        {
+            let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+            res.grow(200);
+            assert_eq!(pool.used(), 200);
+        } // res dropped here
+        assert_eq!(pool.used(), 0);
+    }
+
+    #[test]
+    fn resize() {
+        let pool = test_pool(0);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        res.resize(100);
+        assert_eq!(res.size(), 100);
+        assert_eq!(pool.used(), 100);
+        res.resize(50);
+        assert_eq!(res.size(), 50);
+        assert_eq!(pool.used(), 50);
+        res.resize(200);
+        assert_eq!(res.size(), 200);
+        assert_eq!(pool.used(), 200);
+    }
+
+    #[test]
+    fn multiple_reservations_share_pool() {
+        let pool = test_pool(1000);
+        let mut r1 = MemoryReservation::new(&pool, "writer1", PoolBehavior::Reject);
+        let mut r2 = MemoryReservation::new(&pool, "writer2", PoolBehavior::Reject);
+        r1.try_grow(400).unwrap();
+        r2.try_grow(400).unwrap();
+        assert_eq!(pool.used(), 800);
+        // Third allocation that would exceed
+        assert!(r2.try_grow(300).is_err());
+        assert_eq!(pool.used(), 800);
+        drop(r1);
+        assert_eq!(pool.used(), 400);
+        // Now it fits
+        assert!(r2.try_grow(300).is_ok());
+        assert_eq!(pool.used(), 700);
+    }
+
+    #[test]
+    fn peak_tracking() {
+        let pool = test_pool(0);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        res.grow(100);
+        res.grow(200);
+        assert_eq!(pool.peak(), 300);
+        res.shrink(250);
+        assert_eq!(pool.peak(), 300); // peak unchanged
+        assert_eq!(pool.used(), 50);
+    }
+
+    #[test]
+    fn set_limit_at_runtime() {
+        let pool = test_pool(100);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        assert!(res.try_grow(80).is_ok());
+        assert!(res.try_grow(30).is_err()); // 80+30 > 100
+        pool.set_limit(200);
+        assert!(res.try_grow(30).is_ok()); // 80+30 < 200
+        assert_eq!(pool.used(), 110);
+    }
+
+    #[test]
+    fn zero_bytes_is_noop() {
+        let pool = test_pool(100);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        assert!(res.try_grow(0).is_ok());
+        res.grow(0);
+        res.shrink(0);
+        assert_eq!(pool.used(), 0);
+        assert_eq!(res.size(), 0);
+    }
+
+    #[test]
+    fn wait_and_grow_succeeds_immediately_when_under_limit() {
+        let pool = test_pool(1000);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        assert!(res.wait_and_grow(500, Duration::from_secs(1)).is_ok());
+        assert_eq!(res.size(), 500);
+        assert_eq!(pool.used(), 500);
+    }
+
+    #[test]
+    fn wait_and_grow_times_out_when_pool_full() {
+        let pool = test_pool(100);
+        let mut res = MemoryReservation::new(&pool, "test", PoolBehavior::Reject);
+        res.grow(100); // fill the pool
+        // Try to grow more — should timeout quickly
+        let result = res.wait_and_grow(50, Duration::from_millis(100));
+        assert!(result.is_err());
+        assert_eq!(res.size(), 100); // unchanged
+    }
+
+    #[test]
+    fn wait_and_grow_unblocks_when_memory_freed() {
+        use std::thread;
+
+        let pool = test_pool(100);
+
+        let mut holder = MemoryReservation::new(&pool, "holder", PoolBehavior::Reject);
+        holder.grow(100); // fill the pool
+
+        // Spawn a thread that will free memory after 50ms. The release goes through
+        // the reservation itself (moved into the thread) so its own bookkeeping stays
+        // in sync with the pool; shrinking the pool directly would make the
+        // reservation's drop release bytes it no longer owns and underflow the counter.
+        let handle = thread::spawn(move || {
+            thread::sleep(Duration::from_millis(50));
+            holder.shrink(60); // free 60 bytes
+        }); // holder is dropped when the thread finishes, releasing the remaining 40
+
+        // This should block, then succeed after the other thread frees memory
+        let mut res2 = MemoryReservation::new(&pool, "waiter", PoolBehavior::Wait(Duration::from_secs(5)));
+        let result = res2.wait_and_grow(50, Duration::from_secs(5));
+        assert!(result.is_ok());
+        assert_eq!(res2.size(), 50);
+
+        handle.join().unwrap();
+        // Only the waiter's 50 bytes remain once the holder is gone.
+        assert_eq!(pool.used(), 50);
+    }
+}
diff --git a/sandbox/libs/dataformat-native/rust/lib/Cargo.toml b/sandbox/libs/dataformat-native/rust/lib/Cargo.toml
index
6eadb23e82a21..bc23046a898cb 100644 --- a/sandbox/libs/dataformat-native/rust/lib/Cargo.toml +++ b/sandbox/libs/dataformat-native/rust/lib/Cargo.toml @@ -9,6 +9,9 @@ license = "Apache-2.0" name = "opensearch_native" crate-type = ["cdylib"] +[features] +test-limits = ["opensearch-parquet-format/test-limits"] + [dependencies] opensearch-datafusion = { path = "../../../../plugins/analytics-backend-datafusion/rust" } opensearch-parquet-format = { path = "../../../../plugins/parquet-data-format/src/main/rust" } diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml b/sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml index 365f571c62c5d..3020d4f5cfc25 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml +++ b/sandbox/plugins/parquet-data-format/src/main/rust/Cargo.toml @@ -7,6 +7,7 @@ workspace = "../../../../../libs/dataformat-native/rust" [features] test-utils = [] +test-limits = [] [lib] name = "opensearch_parquet_format" diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs index e96d6b70d5b00..0ca50c55dbf6d 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs @@ -14,7 +14,7 @@ use std::slice; use std::str; -use native_bridge_common::{ffm_safe, log_debug}; +use native_bridge_common::ffm_safe; use crate::native_settings::NativeSettings; use crate::field_config::FieldConfig; @@ -694,3 +694,56 @@ pub unsafe extern "C" fn parquet_free_row_id_mapping( let _ = Box::from_raw(slice::from_raw_parts_mut(mapping_ptr as *mut i64, mapping_len as usize)); } } + +// --------------------------------------------------------------------------- +// Memory pool management +// --------------------------------------------------------------------------- + +/// Initialize write and merge memory pools with limits (bytes). 0 = unlimited. 
+/// Negative limits are treated as 0. Always returns 0.
+#[no_mangle]
+pub extern "C" fn parquet_init_memory_pools(write_limit: i64, merge_limit: i64) -> i64 {
+    crate::memory::init_pools(
+        pool_limit_from_ffi(write_limit),
+        pool_limit_from_ffi(merge_limit),
+    );
+    0
+}
+
+/// Converts a limit received over FFI into a pool limit.
+///
+/// Non-positive values map to 0 (unlimited). Values that do not fit in `usize`
+/// (only possible on targets narrower than 64 bits) saturate to `usize::MAX`
+/// instead of being silently truncated by an `as` cast.
+fn pool_limit_from_ffi(limit: i64) -> usize {
+    if limit <= 0 {
+        0
+    } else if (limit as u64) > (usize::MAX as u64) {
+        usize::MAX
+    } else {
+        limit as usize
+    }
+}
+
+/// Converts a byte count into the `i64` handed back over FFI.
+///
+/// Saturates at `i64::MAX` so an out-of-range counter is never reported to the
+/// caller as a negative number.
+fn pool_bytes_to_ffi(bytes: usize) -> i64 {
+    if (bytes as u64) > (i64::MAX as u64) {
+        i64::MAX
+    } else {
+        bytes as i64
+    }
+}
+
+/// Set write pool limit at runtime.
+///
+/// Negative limits are treated as 0 (unlimited). Always returns 0.
+#[no_mangle]
+pub extern "C" fn parquet_set_write_pool_limit(limit: i64) -> i64 {
+    crate::memory::write_pool().set_limit(pool_limit_from_ffi(limit));
+    0
+}
+
+/// Set merge pool limit at runtime.
+///
+/// Negative limits are treated as 0 (unlimited). Always returns 0.
+#[no_mangle]
+pub extern "C" fn parquet_set_merge_pool_limit(limit: i64) -> i64 {
+    crate::memory::merge_pool().set_limit(pool_limit_from_ffi(limit));
+    0
+}
+
+/// Returns current write pool usage in bytes.
+#[no_mangle]
+pub extern "C" fn parquet_get_write_pool_used() -> i64 {
+    pool_bytes_to_ffi(crate::memory::write_pool().used())
+}
+
+/// Returns current merge pool usage in bytes.
+#[no_mangle]
+pub extern "C" fn parquet_get_merge_pool_used() -> i64 {
+    pool_bytes_to_ffi(crate::memory::merge_pool().used())
+}
+
+/// Returns peak write pool usage in bytes.
+#[no_mangle]
+pub extern "C" fn parquet_get_write_pool_peak() -> i64 {
+    pool_bytes_to_ffi(crate::memory::write_pool().peak())
+}
+
+/// Returns peak merge pool usage in bytes.
+#[no_mangle] +pub extern "C" fn parquet_get_merge_pool_peak() -> i64 { + crate::memory::merge_pool().peak() as i64 +} diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/lib.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/lib.rs index 2ce15506f12c4..72d27d421a919 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/lib.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/lib.rs @@ -20,6 +20,7 @@ pub mod writer_properties_builder; pub mod rate_limited_writer; pub mod crc_writer; pub mod merge; +pub mod memory; pub use native_settings::NativeSettings; pub use field_config::FieldConfig; diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/memory.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/memory.rs new file mode 100644 index 0000000000000..d03265131f0e2 --- /dev/null +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/memory.rs @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +//! Global memory pool instances for write and merge operations. + +use std::sync::{Arc, OnceLock}; +use native_bridge_common::memory_pool::MemoryPool; + +static WRITE_POOL: OnceLock> = OnceLock::new(); +static MERGE_POOL: OnceLock> = OnceLock::new(); + +/// Default: 0 (unlimited). Set via FFI at runtime. +/// For testing limit enforcement, build with: cargo build --features test-limits +#[cfg(not(feature = "test-limits"))] +const DEFAULT_WRITE_LIMIT: usize = 0; +#[cfg(feature = "test-limits")] +const DEFAULT_WRITE_LIMIT: usize = 35 * 1024 * 1024; + +/// Default: 0 (unlimited). Set via FFI at runtime. +#[cfg(not(feature = "test-limits"))] +const DEFAULT_MERGE_LIMIT: usize = 0; +#[cfg(feature = "test-limits")] +const DEFAULT_MERGE_LIMIT: usize = 25 * 1024 * 1024; + +/// Returns the node-level write memory pool. 
+pub fn write_pool() -> &'static Arc { + WRITE_POOL.get_or_init(|| Arc::new(MemoryPool::new("WRITE", DEFAULT_WRITE_LIMIT))) +} + +/// Returns the node-level merge memory pool. +pub fn merge_pool() -> &'static Arc { + MERGE_POOL.get_or_init(|| Arc::new(MemoryPool::new("MERGE", DEFAULT_MERGE_LIMIT))) +} + +/// Initialize both pools with limits. Called from FFI at node startup. +pub fn init_pools(write_limit: usize, merge_limit: usize) { + write_pool().set_limit(write_limit); + merge_pool().set_limit(merge_limit); +} diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs index e71707b21b9a0..47b22feffc593 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs @@ -22,7 +22,8 @@ use tokio::sync::{mpsc as tokio_mpsc, oneshot}; use crate::crc_writer::CrcWriter; use crate::rate_limited_writer::RateLimitedWriter; use crate::writer_properties_builder::WriterPropertiesBuilder; -use crate::{log_debug, SETTINGS_STORE}; +use crate::{log_debug, log_info, SETTINGS_STORE}; +use native_bridge_common::memory_pool::MemoryReservation; use super::error::{MergeError, MergeResult}; use super::io_task::{ @@ -45,6 +46,7 @@ pub struct MergeContext { next_row_id: i64, total_rows_written: usize, rayon_threads: Option, + reservation: MemoryReservation, } impl MergeContext { @@ -59,6 +61,7 @@ impl MergeContext { rayon_threads: Option, io_threads: Option, output_writer_generation: i64, + reservation: MemoryReservation, ) -> MergeResult { if let Some(parent) = Path::new(output_path).parent() { if !parent.exists() { @@ -121,6 +124,7 @@ impl MergeContext { next_row_id: 0, total_rows_written: 0, rayon_threads, + reservation, }) } @@ -131,6 +135,16 @@ impl MergeContext { /// Buffers a batch (already padded to data_schema) and auto-flushes when /// the row count threshold is reached. 
pub fn push_batch(&mut self, batch: RecordBatch) -> MergeResult<()> { + let batch_bytes = batch.get_array_memory_size(); + let num_rows = batch.num_rows(); + let num_cols = batch.num_columns(); + log_info!( + "[ALLOC] merge push_batch: batch_bytes={}, rows={}, cols={}, buffered_chunks={}, total_buffered_rows={}", + batch_bytes, num_rows, num_cols, self.output_chunks.len(), self.output_row_count + ); + if let Err(e) = self.reservation.request(batch_bytes) { + return Err(MergeError::Logic(format!("Merge memory limit exceeded: {}", e))); + } self.output_row_count += batch.num_rows(); self.output_chunks.push(batch); if self.output_row_count >= self.output_flush_rows { @@ -155,8 +169,22 @@ impl MergeContext { }; let n = merged.num_rows(); + // Track temporary spike: merged + with_id coexist briefly + let merged_bytes = merged.get_array_memory_size(); + if let Err(e) = self.reservation.request(merged_bytes) { + return Err(MergeError::Logic(format!("Merge memory limit exceeded during flush (concat): {}", e))); + } + let with_id = append_row_id(&merged, self.next_row_id, &self.output_schema)?; + let with_id_bytes = with_id.get_array_memory_size(); + if let Err(e) = self.reservation.request(with_id_bytes) { + return Err(MergeError::Logic(format!("Merge memory limit exceeded during flush (with_id): {}", e))); + } + log_info!( + "[ALLOC] merge flush: merged_bytes={}, with_id_bytes={}, rows={}", merged_bytes, with_id_bytes, n + ); drop(merged); + self.reservation.shrink(merged_bytes); let col_writers = self .rg_writer_factory @@ -189,15 +217,27 @@ impl MergeContext { encoded_chunks.push(r?); } + // Track encoded column memory (exists between encoding and IO send) + let encoded_bytes: usize = encoded_chunks.iter() + .map(|c| c.close().bytes_written as usize) + .sum(); + self.reservation.grow(encoded_bytes); + self.io_tx .blocking_send(IoCommand::WriteRowGroup(encoded_chunks)) .map_err(|_| MergeError::Logic("IO task terminated unexpectedly".into()))?; + // Release: with_id + 
encoded chunks (moved to IO) + self.reservation.shrink(with_id_bytes + encoded_bytes); + self.row_group_index += 1; self.next_row_id += n as i64; self.total_rows_written += n; self.output_row_count = 0; + // Release buffered batch memory — data has been encoded and sent to IO + self.reservation.free(); + log_debug!( "[RUST] Flushed row group {}: {} rows (total: {})", self.row_group_index - 1, diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs index e39b5eb8ff3a1..8b3687a2f2a26 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs @@ -14,15 +14,14 @@ use arrow::datatypes::{DataType as ArrowDataType, Schema as ArrowSchema}; use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; use parquet::schema::types::SchemaDescriptor; +use native_bridge_common::memory_pool::MemoryReservation; + use super::error::{MergeError, MergeResult}; use super::heap::{get_sort_values, SortKey}; use super::io_task::get_merge_pool; use super::schema::projection_indices_excluding_row_id; /// A cursor over a single sorted Parquet input file. -/// -/// Each cursor reads batches sequentially and prefetches the next batch on the -/// shared Rayon pool to overlap IO with merge computation. pub struct FileCursor { reader: Arc>, prefetch_rx: std::sync::mpsc::Receiver>>, @@ -34,6 +33,8 @@ pub struct FileCursor { pub sort_col_indices: Vec, pub sort_col_types: Vec, pub nulls_first: Vec, + /// Bytes currently tracked in the reservation for this cursor's batches. 
+ current_batch_bytes: usize, } impl FileCursor { @@ -47,6 +48,7 @@ impl FileCursor { sort_columns: &[String], nulls_first: &[bool], batch_size: usize, + reservation: &mut MemoryReservation, ) -> MergeResult<(Self, Arc, SchemaDescriptor, i64, usize)> { let file = File::open(path)?; let builder = ParquetRecordBatchReaderBuilder::try_new(file)?; @@ -81,6 +83,13 @@ impl FileCursor { .with_projection(projection) .build()?; + // Estimate cursor memory: batch_size rows × avg_row_bytes × 2 (current + prefetch) + // Use uncompressed size from metadata as estimate + let total_uncompressed: usize = parquet_schema_descr.root_schema().get_fields().len() * batch_size * 8; // rough estimate + let cursor_estimate = total_uncompressed * 2; + let estimate = reservation.reserve_estimated(cursor_estimate) + .map_err(|e| MergeError::Logic(format!("Merge memory limit exceeded (cursor {}): {}", file_id, e)))?; + let first_batch = match reader.next() { Some(Ok(b)) if b.num_rows() > 0 => b, Some(Err(e)) => return Err(e.into()), @@ -92,6 +101,11 @@ impl FileCursor { } }; + // Reconcile with actual batch size × 2 (for prefetch) + let actual_batch_bytes = first_batch.get_array_memory_size(); + let actual_cursor_bytes = actual_batch_bytes * 2; + reservation.reconcile(estimate, actual_cursor_bytes); + let projected_schema = first_batch.schema(); let mut sort_col_indices = Vec::with_capacity(sort_columns.len()); @@ -125,6 +139,7 @@ impl FileCursor { sort_col_indices, sort_col_types, nulls_first: nulls_first.to_vec(), + current_batch_bytes: actual_batch_bytes, }; cursor.start_prefetch(); @@ -152,23 +167,36 @@ impl FileCursor { }); } - pub fn load_next_batch(&mut self) -> MergeResult { + pub fn load_next_batch(&mut self, reservation: &mut MemoryReservation) -> MergeResult { + let old_bytes = self.current_batch_bytes; self.current_batch = None; match self.prefetch_rx.recv() { Ok(Some(Ok(batch))) => { + let new_bytes = batch.get_array_memory_size(); self.current_batch = Some(batch); self.row_idx = 
0; self.prefetch_pending = false; self.start_prefetch(); + // Adjust reservation: shrink old, grow new (net delta) + if new_bytes > old_bytes { + reservation.grow(new_bytes - old_bytes); + } else if new_bytes < old_bytes { + reservation.shrink(old_bytes - new_bytes); + } + self.current_batch_bytes = new_bytes; Ok(true) } Ok(Some(Err(e))) => { self.prefetch_pending = false; + reservation.shrink(old_bytes); + self.current_batch_bytes = 0; Err(e) } Ok(None) | Err(_) => { self.prefetch_pending = false; + reservation.shrink(old_bytes); + self.current_batch_bytes = 0; Ok(false) } } @@ -208,20 +236,20 @@ impl FileCursor { self.current_batch.as_ref().unwrap().slice(start, len) } - pub fn advance(&mut self) -> MergeResult { + pub fn advance(&mut self, reservation: &mut MemoryReservation) -> MergeResult { if self.current_batch.is_none() { return Ok(false); } self.row_idx += 1; if self.row_idx >= self.current_batch.as_ref().unwrap().num_rows() { self.current_batch = None; - return self.load_next_batch(); + return self.load_next_batch(reservation); } Ok(true) } - pub fn advance_past_batch(&mut self) -> MergeResult { + pub fn advance_past_batch(&mut self, reservation: &mut MemoryReservation) -> MergeResult { self.current_batch = None; - self.load_next_batch() + self.load_next_batch(reservation) } } diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/mod.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/mod.rs index 6df699d2db3b7..204d2ec69cde8 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/mod.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/mod.rs @@ -17,6 +17,7 @@ mod unsorted; pub use error::{MergeError, MergeResult}; pub use sorted::merge_sorted; +pub use sorted::merge_sorted_with_pool; pub use unsorted::merge_unsorted; /// Output of a merge operation. 
Carries both the row-ID mapping (for remapping diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs index 000fff97b2bde..6176afcd3d916 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs @@ -14,6 +14,8 @@ use arrow::datatypes::Schema as ArrowSchema; use parquet::schema::types::SchemaDescriptor; use crate::log_debug; +use crate::log_info; +use native_bridge_common::memory_pool::{MemoryPool, MemoryReservation, PoolBehavior}; use super::context::MergeContext; use super::cursor::FileCursor; @@ -21,7 +23,9 @@ use super::heap::{cmp_sort_values, get_sort_values, HeapItem}; use super::io_task::get_merge_pool; use super::schema::ColumnMapping; + /// Performs a streaming k-way merge with an explicit sort direction per column. +/// Defaults to merge pool with reject behavior. pub fn merge_sorted( input_files: &[String], output_path: &str, @@ -30,6 +34,26 @@ pub fn merge_sorted( reverse_sorts: &[bool], nulls_first: &[bool], output_writer_generation: i64, +) -> super::MergeResult { + let mut reservation = MemoryReservation::new( + crate::memory::merge_pool(), "merge:cursors_and_mapping", PoolBehavior::Reject); + merge_sorted_with_pool( + input_files, output_path, index_name, sort_columns, + reverse_sorts, nulls_first, output_writer_generation, + &mut reservation, + ) +} + +/// Performs a streaming k-way merge with caller-specified pool and behavior. 
+pub fn merge_sorted_with_pool( + input_files: &[String], + output_path: &str, + index_name: &str, + sort_columns: &[String], + reverse_sorts: &[bool], + nulls_first: &[bool], + output_writer_generation: i64, + reservation: &mut MemoryReservation, ) -> super::MergeResult { let config = crate::writer::SETTINGS_STORE .get(index_name) @@ -51,7 +75,7 @@ pub fn merge_sorted( )); } - let pool = get_merge_pool(rayon_threads); + let rayon_pool = get_merge_pool(rayon_threads); let direction_label = if reverse_sorts.iter().all(|&r| !r) { "ascending" } else if reverse_sorts.iter().all(|&r| r) { @@ -68,7 +92,7 @@ pub fn merge_sorted( sort_columns, batch_size, output_flush_rows, - pool.current_num_threads(), + rayon_pool.current_num_threads(), output_path ); @@ -82,7 +106,7 @@ pub fn merge_sorted( for (file_id, path) in input_files.iter().enumerate() { log_debug!("[RUST] Opening cursor {} for file: {}", file_id, path); let (cursor, projected_schema, parquet_descr, generation, row_count) = - FileCursor::new(path, file_id, sort_columns, nulls_first, batch_size)?; + FileCursor::new(path, file_id, sort_columns, nulls_first, batch_size, reservation)?; cursors.push(cursor); arrow_schemas.push(projected_schema.as_ref().clone()); parquet_descriptors.push(parquet_descr); @@ -93,6 +117,7 @@ pub fn merge_sorted( let num_cursors = cursors.len(); // ── Phase 2: Create MergeContext (union schemas, writer, IO task) ─── + let ctx_reservation = reservation.child("merge:output_buffer"); let mut ctx = MergeContext::new( arrow_schemas.clone(), &parquet_descriptors, @@ -102,6 +127,7 @@ pub fn merge_sorted( rayon_threads, io_threads, output_writer_generation, + ctx_reservation, )?; // Precompute column mappings per cursor (avoids per-batch name lookups) @@ -112,7 +138,16 @@ pub fn merge_sorted( // Row-ID mapping: pre-allocate the flat mapping array and compute offsets // from file metadata row counts (known before reading any data). 
let total_rows: usize = file_row_counts.iter().sum(); + // Track mapping vec allocation — use reserve_estimated so merge can be rejected before allocating + let mapping_bytes = total_rows * std::mem::size_of::(); + reservation.reserve_estimated(mapping_bytes) + .map_err(|e| super::MergeError::Logic(format!("Merge memory limit exceeded (mapping): {}", e)))?; + log_info!( + "[ALLOC] merge_sorted: mapping_vec={} bytes, total_rows={}, num_files={}, batch_size={}", + mapping_bytes, total_rows, input_files.len(), batch_size + ); let mut mapping: Vec = vec![0i64; total_rows]; + let mut gen_keys: Vec = Vec::with_capacity(num_cursors); let mut gen_offsets: Vec = Vec::with_capacity(num_cursors); let mut gen_sizes: Vec = Vec::with_capacity(num_cursors); @@ -168,7 +203,7 @@ pub fn merge_sorted( } ctx.push_batch(col_mapping.pad_batch(&slice)?)?; } - if !cursor.advance_past_batch()? { + if !cursor.advance_past_batch(reservation)? { break; } } @@ -195,7 +230,7 @@ pub fn merge_sorted( } ctx.push_batch(col_mapping.pad_batch(&slice)?)?; - if !cursor.advance_past_batch()? { + if !cursor.advance_past_batch(reservation)? { break; } // Check if cursor should yield after loading new batch @@ -249,7 +284,7 @@ pub fn merge_sorted( } cursor.row_idx = run_end; - if !cursor.advance()? { + if !cursor.advance(reservation)? 
{ break; } diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs index 61222edb504f3..92332cbfc6c04 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs @@ -14,6 +14,8 @@ use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchR use parquet::schema::types::SchemaDescriptor; use crate::log_debug; +use crate::log_info; +use native_bridge_common::memory_pool::{MemoryPool, MemoryReservation, PoolBehavior}; use super::context::MergeContext; use super::error::MergeResult; @@ -21,11 +23,28 @@ use super::schema::{projection_indices_excluding_row_id, ColumnMapping}; /// Unsorted merge: reads each input file sequentially, pads to union schema, /// rewrites `__row_id__` with globally sequential values. No sorting performed. +/// Defaults to merge pool with reject behavior. pub fn merge_unsorted( input_files: &[String], output_path: &str, index_name: &str, output_writer_generation: i64, +) -> MergeResult { + let mut reservation = MemoryReservation::new( + &crate::memory::merge_pool(), "merge:mapping", PoolBehavior::Reject); + merge_unsorted_with_pool( + input_files, output_path, index_name, output_writer_generation, + &mut reservation, + ) +} + +/// Unsorted merge with caller-specified reservation. 
+pub fn merge_unsorted_with_pool( + input_files: &[String], + output_path: &str, + index_name: &str, + output_writer_generation: i64, + reservation: &mut MemoryReservation, ) -> MergeResult { let config = crate::writer::SETTINGS_STORE .get(index_name) @@ -68,6 +87,7 @@ pub fn merge_unsorted( file_generations.push(generation); } + let ctx_reservation = reservation.child("merge:output_buffer"); let mut ctx = MergeContext::new( arrow_schemas.clone(), &parquet_descriptors, @@ -77,6 +97,7 @@ pub fn merge_unsorted( rayon_threads, io_threads, output_writer_generation, + ctx_reservation, )?; // Precompute column mappings per reader @@ -87,6 +108,13 @@ pub fn merge_unsorted( // Build row-ID mapping: for unsorted merge, files are concatenated sequentially. // old_row_id maps directly to new_row_id with a per-file offset. let total_rows: usize = file_row_counts.iter().sum(); + let mapping_bytes = total_rows * std::mem::size_of::(); + reservation.reserve_estimated(mapping_bytes) + .map_err(|e| super::MergeError::Logic(format!("Merge memory limit exceeded (mapping): {}", e)))?; + log_info!( + "[ALLOC] merge_unsorted: mapping_vec={} bytes, total_rows={}, num_files={}", + mapping_bytes, total_rows, input_files.len() + ); let mut mapping: Vec = vec![0i64; total_rows]; let mut gen_keys: Vec = Vec::with_capacity(input_files.len()); let mut gen_offsets: Vec = Vec::with_capacity(input_files.len()); diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs index 104fdc0a054bf..3828bec4ce06e 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs @@ -22,9 +22,17 @@ use std::sync::{Arc, Mutex}; use crate::{log_error, log_debug, log_info}; use crate::crc_writer::CrcWriter; -use crate::merge::{merge_sorted, schema::ROW_ID_COLUMN_NAME}; +use crate::memory::write_pool; +use crate::merge::{merge_sorted_with_pool, 
schema::ROW_ID_COLUMN_NAME}; use crate::native_settings::NativeSettings; use crate::writer_properties_builder::WriterPropertiesBuilder; +use native_bridge_common::memory_pool::{MemoryReservation, DEFAULT_WAIT_TIMEOUT}; + +/// Write path timeout: 300s in production, 10s for test-limits builds. +#[cfg(not(feature = "test-limits"))] +const WRITE_TIMEOUT: std::time::Duration = DEFAULT_WAIT_TIMEOUT; +#[cfg(feature = "test-limits")] +const WRITE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); /// Result from finalizing a writer: Parquet metadata + whole-file CRC32 + optional sort permutation. #[derive(Debug)] @@ -148,7 +156,7 @@ impl SortingChunkedWriter { Ok(()) } - fn write(&mut self, batch: &RecordBatch) -> Result<(), Box> { + fn write(&mut self, batch: &RecordBatch, reservation: &mut MemoryReservation) -> Result<(), Box> { if self.current_ipc_writer.is_none() { return Ok(()); } @@ -156,13 +164,10 @@ impl SortingChunkedWriter { let incoming_batch_bytes = batch.get_array_memory_size() as u64; // Check if adding this batch would breach the memory threshold. - // If the current chunk already has data and the combined size exceeds - // the budget, flush (sort + write) the current chunk first, then - // write the new batch into a fresh IPC staging file. if self.current_chunk_bytes > 0 && self.current_chunk_bytes + incoming_batch_bytes > self.memory_threshold_bytes { - self.flush_and_sort_chunk()?; + self.flush_and_sort_chunk(reservation)?; } // If the batch itself fits within the threshold, write it directly. @@ -174,11 +179,8 @@ impl SortingChunkedWriter { self.current_rows += batch.num_rows(); self.total_rows += batch.num_rows(); } else { - // The batch alone exceeds the memory budget — slice it into pieces - // that each fit within the threshold, flushing after each piece. let num_rows = batch.num_rows(); let bytes_per_row = incoming_batch_bytes / num_rows as u64; - // Compute how many rows fit within the threshold (at least 1 to make progress). 
let rows_per_slice = std::cmp::max( 1, (self.memory_threshold_bytes / bytes_per_row) as usize, @@ -198,23 +200,21 @@ impl SortingChunkedWriter { self.total_rows += len; offset += len; - // Flush after each slice that fills the budget. if self.current_chunk_bytes >= self.memory_threshold_bytes { - self.flush_and_sort_chunk()?; + self.flush_and_sort_chunk(reservation)?; } } } - // Safety net: flush if we ended up at or above the threshold. if self.current_chunk_bytes >= self.memory_threshold_bytes { - self.flush_and_sort_chunk()?; + self.flush_and_sort_chunk(reservation)?; } Ok(()) } /// Close the current IPC file, read it back, sort, write as sorted Parquet chunk. - fn flush_and_sort_chunk(&mut self) -> Result<(), Box> { + fn flush_and_sort_chunk(&mut self, reservation: &mut MemoryReservation) -> Result<(), Box> { use arrow::array::Int64Array; log_debug!( @@ -241,21 +241,36 @@ impl SortingChunkedWriter { } if batches.is_empty() { - // Nothing to sort, just reopen let _ = std::fs::remove_file(&ipc_path); self.open_new_ipc()?; return Ok(()); } - // Concat and sort + // Pre-reserve estimated sort memory (blocks if pool is full) + // Estimate = 2× data size (covers peak when two copies coexist: read+concat or concat+sort) + let estimated = self.estimate_sort_chunk_memory(); + reservation.request(estimated) + .map_err(|e| -> Box { + format!("Write memory pool timeout during sort: {}", e).into() + })?; + + log_info!( + "[ALLOC] flush_and_sort_chunk START: chunk_idx={}, data_bytes={}, estimated_reserve={}, num_batches={}, rows={}", + self.chunk_idx, self.current_chunk_bytes, estimated, + batches.len(), batches.iter().map(|b| b.num_rows()).sum::() + ); + + // Concat all batches into one let combined = concat_batches(&self.schema, &batches)?; - drop(batches); // free memory before sort allocates + drop(batches); + + // Sort let sorted_batch = NativeParquetWriter::sort_batch( &combined, &self.sort_columns, &self.reverse_sorts, &self.nulls_first, )?; - drop(combined); // free 
unsorted data + drop(combined); - // Capture original row IDs for permutation building, then rewrite to sequential 0..N + // Capture original row IDs for permutation building let row_id_col_idx = self.schema.fields().iter().position(|f| f.name() == ROW_ID_COLUMN_NAME); let final_batch = if let Some(idx) = row_id_col_idx { let row_id_array = sorted_batch.column(idx) @@ -291,13 +306,25 @@ impl SortingChunkedWriter { // Delete the IPC staging file and open a fresh one let _ = std::fs::remove_file(&ipc_path); self.open_new_ipc()?; + + // Release sort-phase memory — data is now on disk as Parquet chunk + reservation.shrink(estimated); + // Track chunk_row_ids growth (long-lived until finish()) + let row_ids_bytes = self.memory_size(); + let current = reservation.size(); + if row_ids_bytes > current { + reservation.grow(row_ids_bytes - current); + } + log_info!( + "[ALLOC] flush_and_sort_chunk DONE: chunk_idx={}, chunk_row_ids_bytes={}", self.chunk_idx - 1, row_ids_bytes + ); Ok(()) } /// Finalize: flush remaining IPC data (sort + write) and return chunk paths + row IDs + CRCs. - fn finish(mut self) -> Result<(Vec, Vec>, Vec), Box> { + fn finish(mut self, reservation: &mut MemoryReservation) -> Result<(Vec, Vec>, Vec), Box> { if self.current_rows > 0 { - self.flush_and_sort_chunk()?; + self.flush_and_sort_chunk(reservation)?; } // Close and remove the trailing IPC staging file if let Some(mut writer) = self.current_ipc_writer.take() { @@ -326,6 +353,13 @@ impl SortingChunkedWriter { .map(|ids| ids.len() * std::mem::size_of::()) .sum() } + + /// Estimates memory needed for flush_and_sort_chunk based on jemalloc findings. + /// Real memory ≈ 2× the data size (IPC read-back + concat coexist briefly). + /// The data size is approximated by current_chunk_bytes (tracked during IPC writes). 
+ fn estimate_sort_chunk_memory(&self) -> usize { + (self.current_chunk_bytes as usize) * 2 + } } /// Bundles all per-writer resources so a single `DashMap::remove` atomically @@ -335,6 +369,7 @@ struct WriterState { settings: NativeSettings, crc_handle: Option, writer_generation: i64, + reservation: MemoryReservation, } /// Path suffix for the intermediate Arrow IPC file used during sort-on-close. @@ -389,8 +424,9 @@ impl NativeParquetWriter { let temp_filename = Self::temp_filename(&filename); if WRITERS.contains_key(&temp_filename) { - log_error!("ERROR: Writer already exists for file: {}", temp_filename); - return Err("Writer already exists for this file".into()); + // Stale writer from a failed engine — remove it so recovery can proceed. + log_info!("Removing stale writer for file: {} (likely from failed engine)", temp_filename); + WRITERS.remove(&temp_filename); } let arrow_schema = unsafe { FFI_ArrowSchema::from_raw(schema_address as *mut _) }; @@ -434,11 +470,19 @@ impl NativeParquetWriter { (WriterVariant::Parquet(Arc::new(Mutex::new(writer))), Some(crc_handle)) }; + let consumer = if matches!(&variant, WriterVariant::Ipc(_)) { + "parquet_writer_sorted" + } else { + "parquet_writer_unsorted" + }; + let reservation = MemoryReservation::new(write_pool(), consumer, native_bridge_common::memory_pool::PoolBehavior::Wait(WRITE_TIMEOUT)); + WRITERS.insert(temp_filename, WriterState { variant, settings, crc_handle, writer_generation, + reservation, }); Ok(()) @@ -464,18 +508,54 @@ impl NativeParquetWriter { let record_batch = RecordBatch::try_new(schema, struct_array.columns().to_vec())?; log_debug!("Created RecordBatch with {} rows and {} columns", record_batch.num_rows(), record_batch.num_columns()); - if let Some(state) = WRITERS.get_mut(&temp_filename) { - match &state.variant { - WriterVariant::Ipc(writer_arc) => { - log_debug!("Writing RecordBatch to IPC staging file"); - let mut writer = writer_arc.lock().unwrap(); - writer.write(&record_batch)?; - } - 
WriterVariant::Parquet(writer_arc) => { - log_debug!("Writing RecordBatch to Parquet file"); - let mut writer = writer_arc.lock().unwrap(); - writer.write(&record_batch)?; - } + if let Some(mut state) = WRITERS.get_mut(&temp_filename) { + let is_ipc = matches!(&state.variant, WriterVariant::Ipc(_)); + if is_ipc { + let writer_arc = match &state.variant { + WriterVariant::Ipc(w) => Arc::clone(w), + _ => unreachable!(), + }; + let batch_bytes = record_batch.get_array_memory_size(); + let num_cols = record_batch.num_columns(); + let num_rows = record_batch.num_rows(); + log_info!( + "[ALLOC] write_data IPC: batch_bytes={}, rows={}, cols={}, file={}", + batch_bytes, num_rows, num_cols, temp_filename + ); + let mut writer = writer_arc.lock().unwrap(); + writer.write(&record_batch, &mut state.reservation)?; + } else { + let writer_arc = match &state.variant { + WriterVariant::Parquet(w) => Arc::clone(w), + _ => unreachable!(), + }; + let batch_bytes = record_batch.get_array_memory_size(); + let num_cols = record_batch.num_columns(); + let num_rows = record_batch.num_rows(); + + // Pre-reserve estimated writer memory growth (4× batch size) + // Blocks if pool is full — before any allocation happens + let estimated_growth = batch_bytes * 4; + let before_mem = state.reservation.size(); // = previous writer.memory_size() + state.reservation.reserve_estimated(estimated_growth) + .map_err(|e| -> Box { + format!("Write memory pool timeout: {}", e).into() + })?; + + // Now write (memory is pre-reserved) + let mut writer = writer_arc.lock().unwrap(); + writer.write(&record_batch)?; + let actual_mem = writer.memory_size(); + drop(writer); + + let actual_growth = actual_mem.saturating_sub(before_mem); + log_info!( + "[ALLOC] write_data Parquet: batch_bytes={}, rows={}, cols={}, writer_memory={}, estimated_growth={}, actual_growth={}, file={}", + batch_bytes, num_rows, num_cols, actual_mem, estimated_growth, actual_growth, temp_filename + ); + + // Reconcile: adjust delta between 
estimate and actual growth + state.reservation.reconcile(estimated_growth, actual_growth); } Ok(()) } else { @@ -494,7 +574,7 @@ impl NativeParquetWriter { log_debug!("finalize_writer called for file: {} (temp: {})", filename, temp_filename); if let Some((_, state)) = WRITERS.remove(&temp_filename) { - let WriterState { variant, settings, crc_handle, writer_generation } = state; + let WriterState { variant, settings, crc_handle, writer_generation, mut reservation } = state; let index_name = settings.index_name.as_deref().unwrap_or(""); match variant { @@ -504,7 +584,7 @@ impl NativeParquetWriter { let chunked_writer = mutex.into_inner().unwrap(); let total_rows = chunked_writer.total_rows(); let schema = chunked_writer.schema.clone(); - let (chunk_paths, chunk_row_ids, chunk_crcs) = chunked_writer.finish()?; + let (chunk_paths, chunk_row_ids, chunk_crcs) = chunked_writer.finish(&mut reservation)?; log_info!( "Successfully closed sorting chunked writer for: {}, total_rows={}, chunks={}", temp_filename, total_rows, chunk_paths.len() @@ -630,6 +710,7 @@ impl NativeParquetWriter { } } Some(mapping) + // mapping_reservation dropped here — memory transferred to Java via Box::into_raw } else { None }; @@ -644,7 +725,10 @@ impl NativeParquetWriter { chunk_paths.len(), output_filename ); - let merge_output = merge_sorted( + let mut merge_reservation = MemoryReservation::new( + write_pool(), "parquet_writer_sorted:k_way_merge", + native_bridge_common::memory_pool::PoolBehavior::Wait(WRITE_TIMEOUT)); + let merge_output = merge_sorted_with_pool( chunk_paths, output_filename, index_name, @@ -652,6 +736,7 @@ impl NativeParquetWriter { reverse_sorts, nulls_first, writer_generation, + &mut merge_reservation, ) .map_err(|e| -> Box { format!("Streaming merge failed: {}", e).into() @@ -782,18 +867,8 @@ impl NativeParquetWriter { let mut total_memory = 0; for entry in WRITERS.iter() { if entry.key().starts_with(&path_prefix) { - match &entry.value().variant { - 
WriterVariant::Parquet(writer_arc) => { - if let Ok(writer) = writer_arc.lock() { - total_memory += writer.memory_size(); - } - } - WriterVariant::Ipc(writer_arc) => { - if let Ok(writer) = writer_arc.lock() { - total_memory += writer.memory_size(); - } - } - } + // WriterState.reservation tracks memory for both variants + total_memory += entry.value().reservation.size(); } } Ok(total_memory) From 5e7f83006a8ed8ee1c30dfdbd7946414ae2ebd5e Mon Sep 17 00:00:00 2001 From: rayshrey Date: Tue, 26 May 2026 23:33:58 +0530 Subject: [PATCH 2/2] Integrate Memory Pool with IMC and NativeAllocator Signed-off-by: rayshrey --- .../opensearch/arrow/spi/NativeAllocator.java | 13 ++ .../arrow/allocator/ArrowNativeAllocator.java | 77 ++++++++++ .../rust/common/src/memory_pool.rs | 6 + .../CompositeIndexingExecutionEngine.java | 9 ++ .../parquet/ParquetDataFormatPlugin.java | 37 +++++ .../opensearch/parquet/ParquetSettings.java | 11 ++ .../opensearch/parquet/bridge/RustBridge.java | 48 ++++++ .../src/main/rust/src/ffm.rs | 29 ++++ .../src/main/rust/src/merge/context.rs | 40 ++--- .../src/main/rust/src/merge/cursor.rs | 17 ++- .../src/main/rust/src/merge/sorted.rs | 8 +- .../src/main/rust/src/merge/unsorted.rs | 4 +- .../src/main/rust/src/writer.rs | 35 +++-- server/build.gradle | 1 + .../common/settings/ClusterSettings.java | 1 + .../index/engine/DataFormatAwareEngine.java | 7 +- .../dataformat/IndexingExecutionEngine.java | 9 ++ .../index/engine/exec/IndexerStatistics.java | 7 + .../opensearch/index/shard/IndexShard.java | 13 ++ .../indices/IndexingMemoryController.java | 140 ++++++++++++------ .../opensearch/indices/IndicesService.java | 6 + .../main/java/org/opensearch/node/Node.java | 4 + 22 files changed, 441 insertions(+), 81 deletions(-) diff --git a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java index 425cc5795d358..bbde4fcd6e4e1 100644 --- 
a/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java +++ b/libs/arrow-spi/src/main/java/org/opensearch/arrow/spi/NativeAllocator.java @@ -8,6 +8,8 @@ package org.opensearch.arrow.spi; +import org.opensearch.common.annotation.PublicApi; + import java.io.Closeable; /** @@ -23,6 +25,7 @@ * * @opensearch.api */ +@PublicApi(since = "3.0.0") public interface NativeAllocator extends Closeable { /** @@ -57,6 +60,16 @@ public interface NativeAllocator extends Closeable { */ NativeAllocatorPoolStats stats(); + /** + * Returns the virtual pool handle for a Rust-side memory pool, or null if not registered. + * + * @param poolName logical pool name (e.g., "write", "merge") + * @return the pool handle, or null + */ + default PoolHandle getVirtualPool(String poolName) { + return null; + } + /** * Opaque handle to a memory pool. Plugins downcast to the concrete type * (e.g., Arrow's {@code BufferAllocator}) in the implementation layer. diff --git a/plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java b/plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java index afab06a249d59..b179d237adf62 100644 --- a/plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java +++ b/plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java @@ -46,10 +46,12 @@ public class ArrowNativeAllocator implements NativeAllocator { private final RootAllocator root; private final ConcurrentMap pools = new ConcurrentHashMap<>(); + private final ConcurrentMap virtualPools = new ConcurrentHashMap<>(); private final ConcurrentMap poolMins = new ConcurrentHashMap<>(); private final ConcurrentMap poolMaxes = new ConcurrentHashMap<>(); private final ScheduledExecutorService rebalancer; private volatile ScheduledFuture rebalanceTask; + private volatile Runnable virtualPoolStatsRefresher; /** * True iff the rebalancer is configured to run periodically. 
Used by * {@link #getOrCreatePool} to decide each pool's initial child-allocator @@ -178,6 +180,11 @@ public void setRootLimit(long limit) { @Override public NativeAllocatorPoolStats stats() { + // Refresh Rust-side stats before collecting + Runnable refresher = this.virtualPoolStatsRefresher; + if (refresher != null) { + refresher.run(); + } List poolStats = new ArrayList<>(); for (var entry : pools.entrySet()) { BufferAllocator alloc = entry.getValue().allocator; @@ -191,6 +198,11 @@ public NativeAllocatorPoolStats stats() { ) ); } + // Include Rust-side virtual pools + for (var entry : virtualPools.entrySet()) { + VirtualPoolHandle vp = entry.getValue(); + poolStats.add(new NativeAllocatorPoolStats.PoolStats(entry.getKey(), vp.allocatedBytes(), vp.peakBytes(), vp.limit(), 0)); + } return new NativeAllocatorPoolStats(root.getAllocatedMemory(), root.getPeakMemoryAllocation(), root.getLimit(), poolStats); } @@ -352,4 +364,69 @@ public BufferAllocator getAllocator() { return allocator; } } + + /** + * A virtual pool handle for Rust-side memory pools that report stats back to Java + * without using Arrow's BufferAllocator. + */ + public static class VirtualPoolHandle implements PoolHandle { + private final String name; + private final long limit; + private volatile long allocatedBytes; + private volatile long peakBytes; + + VirtualPoolHandle(String name, long limit) { + this.name = name; + this.limit = limit; + } + + /** Called by the stats refresher to report current Rust-side usage. 
*/ + public void updateStats(long allocated, long peak) { + this.allocatedBytes = allocated; + this.peakBytes = peak; + } + + @Override + public PoolHandle newChild(String childName, long childLimit) { + throw new UnsupportedOperationException("Virtual pool [" + name + "] does not support children"); + } + + @Override + public long allocatedBytes() { + return allocatedBytes; + } + + @Override + public long peakBytes() { + return peakBytes; + } + + @Override + public long limit() { + return limit; + } + + @Override + public void close() {} + } + + /** + * Registers a virtual pool for a Rust-side memory pool. + */ + public VirtualPoolHandle registerVirtualPool(String poolName, long limit) { + VirtualPoolHandle handle = new VirtualPoolHandle(poolName, limit); + virtualPools.put(poolName, handle); + return handle; + } + + /** Returns the virtual pool handle for a given name, or null if not registered. */ + @Override + public VirtualPoolHandle getVirtualPool(String poolName) { + return virtualPools.get(poolName); + } + + /** Registers a callback that refreshes virtual pool stats from the native layer. */ + public void setVirtualPoolStatsRefresher(Runnable refresher) { + this.virtualPoolStatsRefresher = refresher; + } } diff --git a/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs b/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs index 495bc13d9ab84..7b68e767ec721 100644 --- a/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs +++ b/sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs @@ -374,6 +374,12 @@ impl MemoryReservation { self.consumer } + /// Returns a reference to the underlying pool (for direct grow/shrink when memory + /// outlives this reservation, e.g., FFI transfers to Java). + pub fn pool(&self) -> &Arc { + &self.pool + } + /// Create a sibling reservation from the same pool and behavior, with a different consumer name. 
pub fn child(&self, consumer: &'static str) -> Self { Self { diff --git a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java index e932b7214dc6b..c1f311a711834 100644 --- a/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java +++ b/sandbox/plugins/composite-engine/src/main/java/org/opensearch/composite/CompositeIndexingExecutionEngine.java @@ -263,6 +263,15 @@ public long getNativeBytesUsed() { return total; } + @Override + public long getHeapBytesUsed() { + long total = primaryEngine.getHeapBytesUsed(); + for (IndexingExecutionEngine engine : secondaryEngines) { + total += engine.getHeapBytesUsed(); + } + return total; + } + /** * Deletes files from all per-format engines. If any engine fails, the first exception * is thrown with subsequent failures added as suppressed exceptions. 
diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java index 5a368f9753cb8..eb69b4b8913b0 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetDataFormatPlugin.java @@ -64,6 +64,10 @@ */ public class ParquetDataFormatPlugin extends Plugin implements DataFormatPlugin { + private static final org.apache.logging.log4j.Logger logger = org.apache.logging.log4j.LogManager.getLogger( + ParquetDataFormatPlugin.class + ); + /** * Current parquet writer format version, long-encoded (plugin-defined namespace; the * encoding happens to reuse {@code major * 1_000_000 + minor * 1_000 + patch} but is @@ -112,6 +116,39 @@ public Collection createComponents( .addSettingsUpdateConsumer(ParquetSettings.MAX_PER_VSR_ALLOCATION_DIVISOR, v -> this.maxPerVsrAllocationDivisor = v); this.nativeAllocator = pluginComponentRegistry.getComponent(ArrowNativeAllocator.class) .orElseThrow(() -> new IllegalStateException("ArrowNativeAllocator not available; arrow-base plugin must be installed")); + + // Register virtual pools for Rust-side write and merge memory tracking + // Budget = parquet.native_memory_percent of available native memory, split 60/40 write/merge + double nativePercent = ParquetSettings.NATIVE_MEMORY_PERCENT.get(settings); + long totalNativeMemory = org.opensearch.monitor.os.OsProbe.getInstance().getTotalPhysicalMemorySize() + - org.opensearch.monitor.jvm.JvmInfo.jvmInfo().getConfiguredMaxHeapSize(); + long rustBudget = (long) (totalNativeMemory * nativePercent / 100.0); + long writeLimit = (long) (rustBudget * 0.6); + long mergeLimit = (long) (rustBudget * 0.4); + var writePool = this.nativeAllocator.registerVirtualPool("write", writeLimit); + var mergePool = 
this.nativeAllocator.registerVirtualPool("merge", mergeLimit); + + // Initialize Rust-side pool limits (0 = unlimited for testing) + org.opensearch.parquet.bridge.RustBridge.initMemoryPools(0, 0); + logger.info( + "Native memory pools initialized: total_native={}MB, budget={}MB ({}%), write={}MB, merge={}MB", + totalNativeMemory / (1024 * 1024), + rustBudget / (1024 * 1024), + nativePercent, + writeLimit / (1024 * 1024), + mergeLimit / (1024 * 1024) + ); + this.nativeAllocator.setVirtualPoolStatsRefresher(() -> { + try { + long[] stats = org.opensearch.parquet.bridge.RustBridge.getPoolStats(); + // stats: [write_limit, write_used, write_peak, merge_limit, merge_used, merge_peak] + writePool.updateStats(stats[1], stats[2]); + mergePool.updateStats(stats[4], stats[5]); + } catch (Exception e) { + // Best-effort — stats may be stale if native lib is unavailable + } + }); + return Collections.emptyList(); } diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java index 2e8039b00be70..1c0af097acbf2 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/ParquetSettings.java @@ -37,6 +37,16 @@ private ParquetSettings() {} public static final String DEFAULT_MAX_NATIVE_ALLOCATION = "10%"; public static final int DEFAULT_MAX_ROWS_PER_VSR = 65536; + /** Percentage of available native memory (physical - JVM heap) allocated for Rust write+merge pools. + * Split 60/40 between write and merge. Default: 5%. */ + public static final Setting NATIVE_MEMORY_PERCENT = Setting.doubleSetting( + "parquet.native_memory_percent", + 5.0, + 0.0, + 100.0, + Setting.Property.NodeScope + ); + /** Data page size limit in bytes (default 1MB). 
*/ public static final Setting PAGE_SIZE_BYTES = Setting.byteSizeSetting( "index.parquet.page_size_bytes", @@ -668,6 +678,7 @@ public static void validateFieldConfigurations( /** Returns all settings defined by the Parquet plugin. */ public static List> getSettings() { return List.of( + NATIVE_MEMORY_PERCENT, PAGE_SIZE_BYTES, PAGE_ROW_LIMIT, DICT_SIZE_BYTES, diff --git a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java index e59a3549a0dd1..fc696fe30af3e 100644 --- a/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java +++ b/sandbox/plugins/parquet-data-format/src/main/java/org/opensearch/parquet/bridge/RustBridge.java @@ -45,6 +45,8 @@ public class RustBridge { private static final MethodHandle FREE_MERGE_RESULT; private static final MethodHandle READ_AS_JSON; private static final MethodHandle FREE_ROW_ID_MAPPING; + private static final MethodHandle GET_POOL_STATS; + private static final MethodHandle INIT_MEMORY_POOLS; static { SymbolLookup lib = NativeLibraryLoader.symbolLookup(); @@ -251,6 +253,14 @@ public class RustBridge { ValueLayout.JAVA_LONG // mapping_len ) ); + GET_POOL_STATS = linker.downcallHandle( + lib.find("parquet_get_pool_stats").orElseThrow(), + FunctionDescriptor.of(ValueLayout.JAVA_LONG, ValueLayout.ADDRESS, ValueLayout.JAVA_LONG) + ); + INIT_MEMORY_POOLS = linker.downcallHandle( + lib.find("parquet_init_memory_pools").orElseThrow(), + FunctionDescriptor.of(ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG, ValueLayout.JAVA_LONG) + ); } public static void initLogger() {} @@ -688,5 +698,43 @@ private static LongMapArrays toLongMapArrays(NativeCall call, Map return new LongMapArrays(call.strArray(keys), seg); } + /** + * Initializes Rust-side write and merge memory pool limits. 
+ */ + public static void initMemoryPools(long writeLimit, long mergeLimit) { + try { + long rc = (long) INIT_MEMORY_POOLS.invokeExact(writeLimit, mergeLimit); + if (rc != 0) { + throw new RuntimeException("parquet_init_memory_pools returned error: " + rc); + } + } catch (Throwable t) { + throw new RuntimeException("parquet_init_memory_pools failed", t); + } + } + + /** + * Fetches write and merge pool stats from Rust in a single FFM call. + * Returns [writeLimit, writeUsed, writePeak, mergeLimit, mergeUsed, mergePeak]. + */ + public static long[] getPoolStats() { + try (var arena = java.lang.foreign.Arena.ofConfined()) { + var buf = arena.allocate(ValueLayout.JAVA_LONG, 6); + long rc; + try { + rc = (long) GET_POOL_STATS.invokeExact(buf, 6L); + } catch (Throwable t) { + throw new RuntimeException("parquet_get_pool_stats failed", t); + } + if (rc != 0) { + throw new RuntimeException("parquet_get_pool_stats returned error: " + rc); + } + long[] stats = new long[6]; + for (int i = 0; i < 6; i++) { + stats[i] = buf.getAtIndex(ValueLayout.JAVA_LONG, i); + } + return stats; + } + } + private RustBridge() {} } diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs index 0ca50c55dbf6d..7ff912548a2b4 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs @@ -559,6 +559,9 @@ pub unsafe extern "C" fn parquet_merge_files( // Write row-ID mapping into out-pointers as heap-allocated arrays. // Java reads them and then calls parquet_free_merge_result to deallocate. 
let mapping = result.mapping.into_boxed_slice(); + // POOL: track mapping memory from now until Java calls parquet_free_merge_result + let mapping_track_bytes = mapping.len() * std::mem::size_of::<i64>(); + crate::memory::merge_pool().grow(mapping_track_bytes, "merge:mapping_to_java"); *out_mapping_len = mapping.len() as i64; *out_mapping_ptr = Box::into_raw(mapping) as *mut i64 as i64; @@ -585,6 +588,9 @@ pub unsafe extern "C" fn parquet_free_merge_result( gen_count: i64, ) { if mapping_ptr != 0 && mapping_len > 0 { + // POOL: shrink merge pool — mapping memory being freed by Java + let mapping_bytes = mapping_len as usize * std::mem::size_of::<i64>(); + crate::memory::merge_pool().shrink(mapping_bytes, "merge:mapping_to_java"); let _ = Box::from_raw(slice::from_raw_parts_mut(mapping_ptr as *mut i64, mapping_len as usize)); } let n = gen_count as usize; @@ -691,6 +697,9 @@ pub unsafe extern "C" fn parquet_free_row_id_mapping( mapping_len: i64, ) { if mapping_ptr != 0 && mapping_len > 0 { + // POOL: shrink write pool — mapping from finalize_sorted_chunks being freed by Java + let mapping_bytes = mapping_len as usize * std::mem::size_of::<i64>(); + crate::memory::write_pool().shrink(mapping_bytes, "write:mapping_to_java"); let _ = Box::from_raw(slice::from_raw_parts_mut(mapping_ptr as *mut i64, mapping_len as usize)); } } @@ -708,6 +717,26 @@ pub extern "C" fn parquet_init_memory_pools(write_limit: i64, merge_limit: i64) 0 } +/// Write pool stats into caller-provided buffer: [write_limit, write_used, write_peak, merge_limit, merge_used, merge_peak]. +/// `out` must point to at least `out_len` i64 slots (minimum 6).
+#[no_mangle] +pub extern "C" fn parquet_get_pool_stats(out: *mut i64, out_len: i64) -> i64 { + if out.is_null() || out_len < 6 { + return -1; + } + let wp = crate::memory::write_pool(); + let mp = crate::memory::merge_pool(); + unsafe { + *out.add(0) = wp.limit() as i64; + *out.add(1) = wp.used() as i64; + *out.add(2) = wp.peak() as i64; + *out.add(3) = mp.limit() as i64; + *out.add(4) = mp.used() as i64; + *out.add(5) = mp.peak() as i64; + } + 0 +} + /// Set write pool limit at runtime. #[no_mangle] pub extern "C" fn parquet_set_write_pool_limit(limit: i64) -> i64 { diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs index 47b22feffc593..bd84436b95c2f 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/context.rs @@ -134,18 +134,12 @@ impl MergeContext { /// Buffers a batch (already padded to data_schema) and auto-flushes when /// the row count threshold is reached. + /// NOTE: Batches are often slices of a larger cursor batch and share the parent's + /// memory buffers. We do NOT track them in the reservation here — actual memory + /// allocation only happens in flush() when concat_batches creates a new contiguous batch. 
pub fn push_batch(&mut self, batch: RecordBatch) -> MergeResult<()> { - let batch_bytes = batch.get_array_memory_size(); let num_rows = batch.num_rows(); - let num_cols = batch.num_columns(); - log_info!( - "[ALLOC] merge push_batch: batch_bytes={}, rows={}, cols={}, buffered_chunks={}, total_buffered_rows={}", - batch_bytes, num_rows, num_cols, self.output_chunks.len(), self.output_row_count - ); - if let Err(e) = self.reservation.request(batch_bytes) { - return Err(MergeError::Logic(format!("Merge memory limit exceeded: {}", e))); - } - self.output_row_count += batch.num_rows(); + self.output_row_count += num_rows; self.output_chunks.push(batch); if self.output_row_count >= self.output_flush_rows { self.flush()?; @@ -160,7 +154,8 @@ impl MergeContext { return Ok(()); } - let merged = if self.output_chunks.len() == 1 { + let num_chunks = self.output_chunks.len(); + let merged = if num_chunks == 1 { self.output_chunks.pop().unwrap() } else { let m = concat_batches(&self.data_schema, self.output_chunks.as_slice())?; @@ -169,7 +164,8 @@ impl MergeContext { }; let n = merged.num_rows(); - // Track temporary spike: merged + with_id coexist briefly + // RESERVATION: request for concat result — first real allocation in this flush cycle. + // Fallible (try_grow or wait_and_grow depending on pool behavior). let merged_bytes = merged.get_array_memory_size(); if let Err(e) = self.reservation.request(merged_bytes) { return Err(MergeError::Logic(format!("Merge memory limit exceeded during flush (concat): {}", e))); @@ -177,12 +173,17 @@ impl MergeContext { let with_id = append_row_id(&merged, self.next_row_id, &self.output_schema)?; let with_id_bytes = with_id.get_array_memory_size(); + // RESERVATION: request for with_id batch — coexists briefly with merged. 
if let Err(e) = self.reservation.request(with_id_bytes) { return Err(MergeError::Logic(format!("Merge memory limit exceeded during flush (with_id): {}", e))); } + log_info!( - "[ALLOC] merge flush: merged_bytes={}, with_id_bytes={}, rows={}", merged_bytes, with_id_bytes, n + "[ALLOC] merge flush: merged_bytes={}, with_id_bytes={}, rows={}, chunks_merged={}", + merged_bytes, with_id_bytes, n, num_chunks ); + + // RESERVATION: shrink merged_bytes — merged batch dropped, with_id holds the data now. drop(merged); self.reservation.shrink(merged_bytes); @@ -217,7 +218,8 @@ impl MergeContext { encoded_chunks.push(r?); } - // Track encoded column memory (exists between encoding and IO send) + // RESERVATION: infallible grow for encoded column chunks — memory exists between + // encoding completion and IO send. Cannot reject since encoding already happened. let encoded_bytes: usize = encoded_chunks.iter() .map(|c| c.close().bytes_written as usize) .sum(); @@ -227,7 +229,9 @@ impl MergeContext { .blocking_send(IoCommand::WriteRowGroup(encoded_chunks)) .map_err(|_| MergeError::Logic("IO task terminated unexpectedly".into()))?; - // Release: with_id + encoded chunks (moved to IO) + // RESERVATION: shrink (with_id + encoded) — both transferred to IO task. + // Net effect of this flush: reservation.size returns to 0. 
+ // +merged_bytes +with_id_bytes -merged_bytes +encoded_bytes -(with_id_bytes + encoded_bytes) = 0 self.reservation.shrink(with_id_bytes + encoded_bytes); self.row_group_index += 1; @@ -235,8 +239,10 @@ impl MergeContext { self.total_rows_written += n; self.output_row_count = 0; - // Release buffered batch memory — data has been encoded and sent to IO - self.reservation.free(); + log_info!( + "[ALLOC] merge flush complete: row_group={}, rows={}, reservation_size={}", + self.row_group_index - 1, n, self.reservation.size() + ); log_debug!( "[RUST] Flushed row group {}: {} rows (total: {})", diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs index 8b3687a2f2a26..52bc1915cd851 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/cursor.rs @@ -84,7 +84,8 @@ impl FileCursor { .build()?; // Estimate cursor memory: batch_size rows × avg_row_bytes × 2 (current + prefetch) - // Use uncompressed size from metadata as estimate + // RESERVATION: reserve_estimated for cursor memory (current batch + prefetch buffer). + // Estimate = 2× (num_fields × batch_size × 8). Reconciled after first batch read. let total_uncompressed: usize = parquet_schema_descr.root_schema().get_fields().len() * batch_size * 8; // rough estimate let cursor_estimate = total_uncompressed * 2; let estimate = reservation.reserve_estimated(cursor_estimate) @@ -101,7 +102,8 @@ impl FileCursor { } }; - // Reconcile with actual batch size × 2 (for prefetch) + // RESERVATION: reconcile estimate with actual first batch size × 2 (current + prefetch). + // If actual < estimate: shrinks the over-reservation. If actual > estimate: grows (infallible). 
let actual_batch_bytes = first_batch.get_array_memory_size(); let actual_cursor_bytes = actual_batch_bytes * 2; reservation.reconcile(estimate, actual_cursor_bytes); @@ -167,6 +169,13 @@ impl FileCursor { }); } + /// Loads the next batch from the prefetch channel, replacing the current batch. + /// + /// # Reservation accounting (infallible grow/shrink on parent reservation): + /// - On success: adjusts by net delta (new_bytes - old_bytes). Uses infallible `grow`/`shrink` + /// because the memory is already allocated by the prefetch thread — we're just tracking it. + /// This means pool.used can temporarily exceed pool.limit without rejection. + /// - On exhaustion/error: shrinks by old_bytes (cursor no longer holds any batch). pub fn load_next_batch(&mut self, reservation: &mut MemoryReservation) -> MergeResult { let old_bytes = self.current_batch_bytes; self.current_batch = None; @@ -178,7 +187,7 @@ impl FileCursor { self.row_idx = 0; self.prefetch_pending = false; self.start_prefetch(); - // Adjust reservation: shrink old, grow new (net delta) + // RESERVATION: infallible delta adjustment — memory already exists if new_bytes > old_bytes { reservation.grow(new_bytes - old_bytes); } else if new_bytes < old_bytes { @@ -189,12 +198,14 @@ impl FileCursor { } Ok(Some(Err(e))) => { self.prefetch_pending = false; + // RESERVATION: cursor error — release all batch memory reservation.shrink(old_bytes); self.current_batch_bytes = 0; Err(e) } Ok(None) | Err(_) => { self.prefetch_pending = false; + // RESERVATION: cursor exhausted — release all batch memory reservation.shrink(old_bytes); self.current_batch_bytes = 0; Ok(false) diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs index 6176afcd3d916..1de3aedd83ad3 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/sorted.rs @@ 
-117,6 +117,8 @@ pub fn merge_sorted_with_pool( let num_cursors = cursors.len(); // ── Phase 2: Create MergeContext (union schemas, writer, IO task) ─── + // RESERVATION: child("merge:output_buffer") — sibling reservation on the same pool. + // Tracks memory for concat/encode operations in flush(). Dropped when MergeContext is consumed. let ctx_reservation = reservation.child("merge:output_buffer"); let mut ctx = MergeContext::new( arrow_schemas.clone(), @@ -138,10 +140,10 @@ pub fn merge_sorted_with_pool( // Row-ID mapping: pre-allocate the flat mapping array and compute offsets // from file metadata row counts (known before reading any data). let total_rows: usize = file_row_counts.iter().sum(); - // Track mapping vec allocation — use reserve_estimated so merge can be rejected before allocating + // Mapping vec: total_rows × 8 bytes. NOT tracked in the reservation here because + // the mapping is passed to the caller (and ultimately to Java via FFI). The caller + // is responsible for pool tracking at the point of FFI transfer. let mapping_bytes = total_rows * std::mem::size_of::(); - reservation.reserve_estimated(mapping_bytes) - .map_err(|e| super::MergeError::Logic(format!("Merge memory limit exceeded (mapping): {}", e)))?; log_info!( "[ALLOC] merge_sorted: mapping_vec={} bytes, total_rows={}, num_files={}, batch_size={}", mapping_bytes, total_rows, input_files.len(), batch_size diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs index 92332cbfc6c04..7f717ebdf233b 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/merge/unsorted.rs @@ -109,8 +109,8 @@ pub fn merge_unsorted_with_pool( // old_row_id maps directly to new_row_id with a per-file offset. 
let total_rows: usize = file_row_counts.iter().sum(); let mapping_bytes = total_rows * std::mem::size_of::(); - reservation.reserve_estimated(mapping_bytes) - .map_err(|e| super::MergeError::Logic(format!("Merge memory limit exceeded (mapping): {}", e)))?; + // Mapping vec: NOT tracked in reservation — passed to caller/Java via FFI. + // Pool tracking happens at the FFI boundary (parquet_merge_files / parquet_free_merge_result). log_info!( "[ALLOC] merge_unsorted: mapping_vec={} bytes, total_rows={}, num_files={}", mapping_bytes, total_rows, input_files.len() diff --git a/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs b/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs index 3828bec4ce06e..5923bbb536cb2 100644 --- a/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs +++ b/sandbox/plugins/parquet-data-format/src/main/rust/src/writer.rs @@ -246,8 +246,9 @@ impl SortingChunkedWriter { return Ok(()); } - // Pre-reserve estimated sort memory (blocks if pool is full) - // Estimate = 2× data size (covers peak when two copies coexist: read+concat or concat+sort) + // RESERVATION: request for sort-phase memory. Estimate = 2× chunk data size + // (covers peak when IPC read-back + concat coexist, or concat + sort coexist). + // Fallible — blocks (Wait behavior) until pool has capacity. let estimated = self.estimate_sort_chunk_memory(); reservation.request(estimated) .map_err(|e| -> Box { @@ -307,9 +308,11 @@ impl SortingChunkedWriter { let _ = std::fs::remove_file(&ipc_path); self.open_new_ipc()?; - // Release sort-phase memory — data is now on disk as Parquet chunk + // RESERVATION: shrink(estimated) — sort phase complete, data written to disk as Parquet chunk. reservation.shrink(estimated); - // Track chunk_row_ids growth (long-lived until finish()) + // RESERVATION: infallible grow for chunk_row_ids delta — these Vec are long-lived + // (held until finish() returns). Each chunk adds its own row IDs; we track the cumulative size. 
+ // grow(new_total - current) ensures we only add the delta from this chunk. let row_ids_bytes = self.memory_size(); let current = reservation.size(); if row_ids_bytes > current { @@ -533,10 +536,12 @@ impl NativeParquetWriter { let num_cols = record_batch.num_columns(); let num_rows = record_batch.num_rows(); - // Pre-reserve estimated writer memory growth (4× batch size) - // Blocks if pool is full — before any allocation happens + // RESERVATION: reserve_estimated(4× batch_bytes) — pre-reserves before + // the Parquet writer allocates internal buffers. The 4× multiplier covers: + // raw batch + column encoding buffers + dictionary overhead + page buffer. + // Fallible (Wait behavior) — blocks until pool has capacity. let estimated_growth = batch_bytes * 4; - let before_mem = state.reservation.size(); // = previous writer.memory_size() + let before_mem = state.reservation.size(); state.reservation.reserve_estimated(estimated_growth) .map_err(|e| -> Box { format!("Write memory pool timeout: {}", e).into() @@ -554,7 +559,9 @@ impl NativeParquetWriter { batch_bytes, num_rows, num_cols, actual_mem, estimated_growth, actual_growth, temp_filename ); - // Reconcile: adjust delta between estimate and actual growth + // RESERVATION: reconcile(estimated_growth, actual_growth) — corrects the + // over/under-estimate. If actual < estimated (typical): shrinks the excess. + // If actual > estimated: infallible grow (memory already allocated by writer). 
state.reservation.reconcile(estimated_growth, actual_growth); } Ok(()) @@ -710,11 +717,16 @@ impl NativeParquetWriter { } } Some(mapping) - // mapping_reservation dropped here — memory transferred to Java via Box::into_raw + // POOL: tracked via write_pool().grow below — freed by parquet_free_row_id_mapping } else { None }; + // POOL: track mapping memory on write pool until Java calls parquet_free_row_id_mapping + if let Some(ref m) = row_id_mapping { + crate::memory::write_pool().grow(m.len() * std::mem::size_of::(), "write:mapping_to_java"); + } + return Ok((crc32, row_id_mapping)); } @@ -770,6 +782,11 @@ impl NativeParquetWriter { None }; + // POOL: track mapping memory on write pool until Java calls parquet_free_row_id_mapping + if let Some(ref m) = row_id_mapping { + crate::memory::write_pool().grow(m.len() * std::mem::size_of::(), "write:mapping_to_java"); + } + log_info!( "finalize_sorted_chunks: DONE file={}, chunks={}, merge_duration={:?}", output_filename, chunk_paths.len(), merge_duration diff --git a/server/build.gradle b/server/build.gradle index 4833c8b059991..24894b398304c 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -73,6 +73,7 @@ dependencies { api project(":libs:opensearch-geo") api project(":libs:opensearch-telemetry") api project(":libs:opensearch-task-commons") + api project(":libs:opensearch-arrow-spi") compileOnly project(":libs:agent-sm:bootstrap") diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 3306e612e3b61..75020f9915248 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -649,6 +649,7 @@ public void apply(Settings value, Settings current, Settings previous) { IndexingMemoryController.MAX_INDEX_BUFFER_SIZE_SETTING, IndexingMemoryController.SHARD_INACTIVE_TIME_SETTING, 
IndexingMemoryController.SHARD_MEMORY_INTERVAL_TIME_SETTING, + IndexingMemoryController.NATIVE_INDEX_BUFFER_PERCENT_SETTING, ResourceWatcherService.ENABLED, ResourceWatcherService.RELOAD_INTERVAL_HIGH, ResourceWatcherService.RELOAD_INTERVAL_MEDIUM, diff --git a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java index a1a9f910aeb72..b1cf4065948fc 100644 --- a/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/DataFormatAwareEngine.java @@ -1118,9 +1118,14 @@ public void forceMerge( } /** {@inheritDoc} Returns the RAM bytes used by the indexing execution engine. */ + @Override + public long getHeapBytesUsed() { + return indexingExecutionEngine.getHeapBytesUsed(); + } + @Override public long getIndexBufferRAMBytesUsed() { - return indexingExecutionEngine.getNativeBytesUsed(); + return getHeapBytesUsed(); } /** {@inheritDoc} Activates write throttling when merge pressure increases. */ diff --git a/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingExecutionEngine.java b/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingExecutionEngine.java index 46cb8abb6f2cb..4816b529e7e21 100644 --- a/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingExecutionEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/dataformat/IndexingExecutionEngine.java @@ -78,6 +78,15 @@ default long getNativeBytesUsed() { return 0; } + /** + * Returns JVM heap bytes used by indexing buffers that would be freed by a refresh. + * + * @return heap usage in bytes + */ + default long getHeapBytesUsed() { + return 0; + } + /** * Deletes the specified files grouped by directory. 
* diff --git a/server/src/main/java/org/opensearch/index/engine/exec/IndexerStatistics.java b/server/src/main/java/org/opensearch/index/engine/exec/IndexerStatistics.java index 2bb4f953e20a5..8a8870918286a 100644 --- a/server/src/main/java/org/opensearch/index/engine/exec/IndexerStatistics.java +++ b/server/src/main/java/org/opensearch/index/engine/exec/IndexerStatistics.java @@ -105,6 +105,13 @@ default long getNativeBytesUsed() { return 0; } + /** + * Returns JVM heap bytes used by indexing buffers. + */ + default long getHeapBytesUsed() { + return 0; + } + /** * Returns per-segment metadata for this indexer. * Each segment includes committed/search state, doc count, and size. diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 7444460d694cc..9f4e2a2f4b753 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3400,6 +3400,19 @@ public long getIndexBufferRAMBytesUsed() { } } + /** Returns native (off-heap) bytes used by indexing buffers for this shard, or 0 if closed. 
*/ + public long getNativeBytesUsed() { + Indexer engine = getIndexerOrNull(); + if (engine == null) { + return 0; + } + try { + return engine.getNativeBytesUsed(); + } catch (AlreadyClosedException ex) { + return 0; + } + } + public void addShardFailureCallback(Consumer onShardFailure) { this.shardEventListener.delegates.add(onShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/IndexingMemoryController.java b/server/src/main/java/org/opensearch/indices/IndexingMemoryController.java index 93b75218fd1c6..0d31b507e349f 100644 --- a/server/src/main/java/org/opensearch/indices/IndexingMemoryController.java +++ b/server/src/main/java/org/opensearch/indices/IndexingMemoryController.java @@ -122,6 +122,19 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos private final TimeValue inactiveTime; private final TimeValue interval; + /** Percentage of (ingest pool + write pool) to use as native memory budget. */ + public static final Setting NATIVE_INDEX_BUFFER_PERCENT_SETTING = Setting.doubleSetting( + "indices.memory.native_index_buffer_percent", + 80.0, + 0.0, + 100.0, + Property.NodeScope + ); + + /** Native allocator for querying pool limits. Set after plugins load. 
*/ + private volatile org.opensearch.arrow.spi.NativeAllocator nativeAllocator; + private final double nativeBufferPercent; + /** Contains shards currently being throttled because we can't write segments quickly enough */ private final Set throttled = new HashSet<>(); @@ -159,6 +172,8 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos // we need to have this relatively small to free up heap quickly enough this.interval = SHARD_MEMORY_INTERVAL_TIME_SETTING.get(settings); + this.nativeBufferPercent = NATIVE_INDEX_BUFFER_PERCENT_SETTING.get(settings); + this.statusChecker = new ShardsIndicesStatusChecker(); logger.debug( @@ -175,6 +190,16 @@ public class IndexingMemoryController implements IndexingOperationListener, Clos this.threadPool = threadPool; } + /** Sets the native allocator after plugins are loaded. */ + public void setNativeAllocator(org.opensearch.arrow.spi.NativeAllocator allocator) { + this.nativeAllocator = allocator; + } + + /** Returns how much native (off-heap) memory this shard is using for its indexing buffer. 
*/ + protected long getNativeBytesUsed(IndexShard shard) { + return shard.getNativeBytesUsed(); + } + protected Cancellable scheduleTask(ThreadPool threadPool) { // it's fine to run it on the scheduler thread, no busy work return threadPool.scheduleWithFixedDelay(statusChecker, interval, Names.SAME); @@ -333,9 +358,26 @@ private void runUnlocked() { // NOTE: even if we hit an errant exc here, our ThreadPool.scheduledWithFixedDelay will log the exception and re-invoke us // again, on schedule - // First pass to sum up how much heap all shards' indexing buffers are using now, and how many bytes they are currently moving - // to disk: - long totalBytesUsed = 0; + // Compute native budget dynamically from allocator pool limits + long nativeBudget = 0; + org.opensearch.arrow.spi.NativeAllocator na = nativeAllocator; + if (na != null) { + try { + org.opensearch.arrow.spi.NativeAllocatorPoolStats stats = na.stats(); + long ingestLimit = 0, writeLimit = 0; + for (var ps : stats.getPools()) { + if ("ingest".equals(ps.getName())) ingestLimit = ps.getLimitBytes(); + if ("write".equals(ps.getName())) writeLimit = ps.getLimitBytes(); + } + nativeBudget = (long) ((ingestLimit + writeLimit) * nativeBufferPercent / 100); + } catch (Exception e) { + logger.debug("Failed to compute native budget from allocator", e); + } + } + + // First pass: sum heap and native usage across all shards + long totalHeapBytesUsed = 0; + long totalNativeUsed = 0; long totalBytesWriting = 0; for (IndexShard shard : availableShards()) { @@ -351,87 +393,93 @@ private void runUnlocked() { shardBytesUsed -= shardWritingBytes; totalBytesWriting += shardWritingBytes; - // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could - // have a negative value here. 
So we just skip this shard since that means it's now using very little heap: if (shardBytesUsed < 0) { - continue; + shardBytesUsed = 0; } - totalBytesUsed += shardBytesUsed; + totalHeapBytesUsed += shardBytesUsed; + totalNativeUsed += getNativeBytesUsed(shard); } if (logger.isTraceEnabled()) { logger.trace( - "total indexing heap bytes used [{}] vs {} [{}], currently writing bytes [{}]", - new ByteSizeValue(totalBytesUsed), - INDEX_BUFFER_SIZE_SETTING.getKey(), + "total heap used [{}] vs heap budget [{}], native used [{}] vs native budget [{}], writing [{}]", + new ByteSizeValue(totalHeapBytesUsed), indexingBuffer, + new ByteSizeValue(totalNativeUsed), + new ByteSizeValue(nativeBudget), new ByteSizeValue(totalBytesWriting) ); } - // If we are using more than 50% of our budget across both indexing buffer and bytes we are still moving to disk, then we now - // throttle the top shards to send back-pressure to ongoing indexing: - boolean doThrottle = (totalBytesWriting + totalBytesUsed) > 1.5 * indexingBuffer.getBytes(); + boolean heapOverBudget = totalHeapBytesUsed > indexingBuffer.getBytes(); + boolean nativeOverBudget = nativeBudget > 0 && totalNativeUsed > nativeBudget; + + // Throttle if significantly over budget + boolean doThrottle = (totalBytesWriting + totalHeapBytesUsed) > 1.5 * indexingBuffer.getBytes() + || (nativeBudget > 0 && totalNativeUsed > 1.5 * nativeBudget); + + if (heapOverBudget || nativeOverBudget) { + if (nativeOverBudget) { + logger.info( + "IMC native over budget: native_used={}, native_budget={}, triggering refresh", + new ByteSizeValue(totalNativeUsed), + new ByteSizeValue(nativeBudget) + ); + } - if (totalBytesUsed > indexingBuffer.getBytes()) { - // OK we are now over-budget; fill the priority queue and ask largest shard(s) to refresh: + // Build priority queue PriorityQueue queue = new PriorityQueue<>(); for (IndexShard shard : availableShards()) { - // How many bytes this shard is currently (async'd) moving from heap to disk: long 
shardWritingBytes = getShardWritingBytes(shard); - - // How many heap bytes this shard is currently using long shardBytesUsed = getIndexBufferRAMBytesUsed(shard); - - // Only count up bytes not already being refreshed: shardBytesUsed -= shardWritingBytes; - - // If the refresh completed just after we pulled shardWritingBytes and before we pulled shardBytesUsed, then we could - // have a negative value here. So we just skip this shard since that means it's now using very little heap: if (shardBytesUsed < 0) { - continue; + shardBytesUsed = 0; } - if (shardBytesUsed > 0) { - if (logger.isTraceEnabled()) { - if (shardWritingBytes != 0) { - logger.trace( - "shard [{}] is using [{}] heap, writing [{}] heap", - shard.shardId(), - shardBytesUsed, - shardWritingBytes - ); - } else { - logger.trace("shard [{}] is using [{}] heap, not writing any bytes", shard.shardId(), shardBytesUsed); - } - } - queue.add(new ShardAndBytesUsed(shardBytesUsed, shard)); + long shardNativeUsed = getNativeBytesUsed(shard); + + // Sort key depends on which budget is exceeded + long sortKey; + if (heapOverBudget && nativeOverBudget) { + sortKey = shardBytesUsed + shardNativeUsed; + } else if (nativeOverBudget) { + sortKey = shardNativeUsed; + } else { + sortKey = shardBytesUsed; + } + + if (sortKey > 0) { + queue.add(new ShardAndBytesUsed(sortKey, shard)); } } logger.debug( - "now write some indexing buffers: total indexing heap bytes used [{}] vs {} [{}], " - + "currently writing bytes [{}], [{}] shards with non-zero indexing buffer", - new ByteSizeValue(totalBytesUsed), - INDEX_BUFFER_SIZE_SETTING.getKey(), + "now write some indexing buffers: heap used [{}] vs budget [{}], " + + "native used [{}] vs budget [{}], writing [{}], [{}] shards queued", + new ByteSizeValue(totalHeapBytesUsed), indexingBuffer, + new ByteSizeValue(totalNativeUsed), + new ByteSizeValue(nativeBudget), new ByteSizeValue(totalBytesWriting), queue.size() ); - while (totalBytesUsed > indexingBuffer.getBytes() && 
queue.isEmpty() == false) { + while ((totalHeapBytesUsed > indexingBuffer.getBytes() || (nativeBudget > 0 && totalNativeUsed > nativeBudget)) + && queue.isEmpty() == false) { ShardAndBytesUsed largest = queue.poll(); logger.debug( - "write indexing buffer to disk for shard [{}] to free up its [{}] indexing buffer", + "write indexing buffer to disk for shard [{}] to free up [{}] bytes", largest.shard.shardId(), new ByteSizeValue(largest.bytesUsed) ); writeIndexingBufferAsync(largest.shard); - totalBytesUsed -= largest.bytesUsed; + totalHeapBytesUsed -= getIndexBufferRAMBytesUsed(largest.shard); + totalNativeUsed -= getNativeBytesUsed(largest.shard); if (doThrottle && throttled.contains(largest.shard) == false) { - logger.info("now throttling indexing for shard [{}]: segment writing can't keep up", largest.shard.shardId()); + logger.info("now throttling indexing for shard [{}]: memory pressure", largest.shard.shardId()); throttled.add(largest.shard); activateThrottling(largest.shard); } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 81763849eb507..bd072df18270e 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -446,6 +446,12 @@ public class IndicesService extends AbstractLifecycleComponent private final MapperRegistry mapperRegistry; private final NamedWriteableRegistry namedWriteableRegistry; private final IndexingMemoryController indexingMemoryController; + + /** Sets the native allocator on the IMC after plugins are loaded. 
*/ + public void setNativeAllocator(org.opensearch.arrow.spi.NativeAllocator allocator) { + indexingMemoryController.setNativeAllocator(allocator); + } + private final TimeValue cleanInterval; // clean interval for the field data cache final IndicesRequestCache indicesRequestCache; // pkg-private for testing private final IndicesQueryCache indicesQueryCache; diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 240f7fa9463f5..8578509a70c1b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -1176,6 +1176,10 @@ protected Node(final Environment initialEnvironment, Collection clas } pluginComponents.addAll(components); } + // Wire native allocator to IMC for native memory budget tracking + pluginComponentRegistry.getComponent(org.opensearch.arrow.spi.NativeAllocator.class) + .ifPresent(na -> indicesService.setNativeAllocator(na)); + pluginComponentRegistry.seal(); Collection telemetryAwarePluginComponents = pluginsService.filterPlugins(TelemetryAwarePlugin.class)