Skip to content

Adding virtual memory pools#21885

Draft
rayshrey wants to merge 1 commit into
opensearch-project:mainfrom
rayshrey:combined-memory-pool
Draft

Adding virtual memory pools#21885
rayshrey wants to merge 1 commit into
opensearch-project:mainfrom
rayshrey:combined-memory-pool

Conversation

@rayshrey
Copy link
Copy Markdown
Contributor

@rayshrey rayshrey commented May 29, 2026

Unified Native Memory Pool Management

Description

Introduces a unified memory pool registry and elastic rebalancer for all native (off-heap) memory in OpenSearch. All memory pools — Arrow-backed (flight, ingest, query) and Rust virtual
(datafusion, write, merge) — are now managed under a single framework with configurable min/max limits and automatic redistribution.

Key Changes

Unified Pool Registry

  • Extended NativeAllocator SPI with virtual pool registration, unified stats, and limit-setting hooks
  • All 6 pools (flight, ingest, query, datafusion, write, merge) registered with ArrowNativeAllocator
  • Each pool has a PoolGroup (transport, search, indexing, merge) for aggregated customer-facing stats
  • Consolidated _nodes/stats/native_memory endpoint — shows grouped pools + jemalloc runtime stats
  • New _plugins/arrow_base/stats endpoint — detailed per-pool view with min/max/group for debugging
  • Removed ROOT_LIMIT_SETTING — per-pool limits are the real enforcement (Arrow root set to Long.MAX_VALUE)
  • Added NativeAllocator parameter to SearchBackEndPlugin.createComponents so DataFusion plugin can register its pool

Elastic Rebalancer

  • NativeMemoryRebalancer — periodic (5s default) algorithm that grows/shrinks pools based on utilization
  • Pools start at their configured min; rebalancer grows them when utilization > 75%
  • Idle pools (< 50% utilization) shrink to free headroom for pressured pools
  • Growth distributed proportionally to demand (pools with more room-to-grow get more)
  • Safety valve: clamps all pools if jemalloc RSS > 95% of budget
  • Invariant: sum(effective_limits) <= node.native_memory.limit at all times

Rust Memory Pool

  • MemoryPool + MemoryReservation RAII primitives in native_bridge_common
  • Supports try_grow (reject) and wait_and_grow (block with timeout + Condvar notification)
  • FFM bridge for Java ↔ Rust pool limit/stats communication

Settings

Setting Default Scope
native.allocator.rebalancer.enabled true NodeScope
native.allocator.rebalance.interval_seconds 5 NodeScope, Dynamic
native.allocator.pool.{flight,ingest,query}.min 2-4% of budget NodeScope
native.allocator.pool.{flight,ingest,query}.max 5-8% of budget NodeScope
parquet.native.pool.{write,merge}.min 1-2% of budget NodeScope
parquet.native.pool.{write,merge}.max 3-5% of budget NodeScope
datafusion.memory_pool_min_bytes 37% of budget NodeScope
datafusion.memory_pool_limit_bytes 75% of budget NodeScope

Validations

  • PRESSURE_THRESHOLD > IDLE_THRESHOLD
  • Per-pool: min >= 1MB, min <= max, max <= 90% of budget
  • sum(all mins) <= 60% of budget
  • sum(all maxes) <= budget when rebalancer is disabled
  • Rebalancer enabled requires budget > 0 and interval > 0

Stats Output

_nodes/stats/native_memory

{
  "native_memory": {
    "total_estimated_bytes": -1,
    "runtime": { "allocated_bytes": 5476512, "resident_bytes": 15335424 },
    "memory_pools": {
      "transport": { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 1866163290 },
      "search":    { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 29858612641 },
      "indexing":  { "allocated_bytes": 0, "peak_bytes": 303104, "limit_bytes": 4851861264 },
      "merge":     { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 1119697974 }
    }
  }
}

_plugins/arrow_base/stats:

{
  "memory_pools": {
    "runtime": { "allocated_bytes": 5476512, "resident_bytes": 15335424 },
    "pools": {
      "flight":     { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 1866163290, "min_bytes": 746465316, "group": "transport" },
      "query":      { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 1866163290, "min_bytes": 746465316, "group": "search" },
      "datafusion": { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 27992449351, "min_bytes": 13809401241, "group": "search" },
      "ingest":     { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 2985861264, "min_bytes": 1492930632, "group": "indexing" },
      "write":      { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 1866163290, "min_bytes": 746465316, "group": "indexing" },
      "merge":      { "allocated_bytes": 0, "peak_bytes": 0, "limit_bytes": 1119697974, "min_bytes": 373232658, "group": "merge" }
    }
  }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 29, 2026

PR Code Analyzer ❗

AI-powered 'Code-Diff-Analyzer' found issues on commit 3485d8a.

PathLineSeverityDescription
server/build.gradle80highDependency change: adds 'api project(":libs:opensearch-arrow-spi")' to the server module. This introduces a new compile-time dependency from the server core onto the Arrow SPI library. While it appears to be an internal project reference, maintainers must verify this does not introduce unintended transitive dependencies or widen the attack surface of the server module.
libs/arrow-spi/build.gradle14highDependency change: modifies the testImplementation dependency on ':test:framework' to add an exclusion for 'org.opensearch:opensearch-arrow-spi'. This alters test dependency resolution and could mask circular dependency issues or cause test isolation to differ from production behavior.
plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBaseStatsAction.java50mediumNew REST endpoint '_plugins/arrow_base/stats' is registered via ActionPlugin. The handler exposes per-pool native memory statistics including allocated bytes, peak bytes, limit bytes, and min bytes for all registered pools. No explicit authorization check is visible in this handler; it relies entirely on the downstream security plugin. If OpenSearch Security is not installed, this endpoint is unauthenticated and leaks internal memory topology.
sandbox/plugins/parquet-data-format/src/main/rust/src/ffm.rs721mediumThe unsafe extern 'C' function 'parquet_get_pool_stats' writes 6 i64 values to a raw *mut i64 pointer without bounds checking. The function assumes the caller has allocated at least 48 bytes. If the Java caller ever passes a buffer smaller than 6×8 bytes (e.g., due to a refactor of RustBridge.getPoolStats), this will cause out-of-bounds memory writes in the native process, potentially enabling memory corruption or code execution.
plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/NativeMemoryRebalancer.java196lowThe 'clampAll' safety valve method, triggered when jemalloc RSS exceeds 95% of the configured budget, immediately reduces effective pool limits for every registered pool to max(currentlyAllocated, min). This could precipitously starve in-flight queries and write operations cluster-wide. An operator or attacker who can influence jemalloc RSS reporting (e.g., via the setNativeMemoryStatsSupplier callback) could trigger a node-wide denial of service.
sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs168lowThe 'wait_and_grow' method in MemoryPool can block a calling thread for up to MERGE_WAIT_TIMEOUT (600 seconds) in a spin-wait loop that re-checks every 1 second. If merge operation threads are blocked here and thread pool sizing is not accounted for, this could exhaust the thread pool under memory pressure, contributing to a denial-of-service condition.

The table above displays the top 10 most important findings.

Total: 6 | Critical: 0 | High: 2 | Medium: 2 | Low: 2


Pull Requests Author(s): Please update your Pull Request according to the report above.

Repository Maintainer(s): You can bypass diff analyzer by adding label skip-diff-analyzer after reviewing the changes carefully, then re-run failed actions. To re-enable the analyzer, remove the label, then re-run all actions.


⚠️ Note: The Code-Diff-Analyzer helps protect against potentially harmful code patterns. Please ensure you have thoroughly reviewed the changes beforehand.

Thanks.

@rayshrey rayshrey force-pushed the combined-memory-pool branch from e063aec to c9c44b9 Compare May 30, 2026 20:21
@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 30, 2026

PR Reviewer Guide 🔍

(Review updated until commit 3995f96)

Here are some key observations to aid the review process:

🧪 PR contains tests
🔒 No security concerns identified
✅ No TODO sections
🔀 No multiple PR themes
⚡ Recommended focus areas for review

Possible Issue

The rebalancer is started unconditionally if rebalancerEnabled is true, but if intervalSeconds is 0 or negative after the check at lines 278-283, the scheduled task is never created. This leaves rebalanceTask null, but updateRebalanceInterval at line 311 attempts to cancel it without checking if it exists. If a dynamic update sets interval to 0 after startup, line 313 calls FutureUtils.cancel(existing) on a potentially null reference, which could throw NullPointerException.

boolean rebalancerEnabled = REBALANCER_ENABLED_SETTING.get(settings);
if (rebalancerEnabled) {
    long budget = ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings).getBytes();
    if (budget <= 0) {
        throw new IllegalArgumentException(
            "Rebalancer is enabled but node.native_memory.limit is not configured. "
                + "Set node.native_memory.limit or disable the rebalancer."
        );
    }
    long intervalSeconds = REBALANCE_INTERVAL_SETTING.get(settings);
    if (intervalSeconds <= 0) {
        throw new IllegalArgumentException(
            "Rebalancer is enabled but native.allocator.rebalance.interval_seconds is 0. "
                + "Set a positive interval or disable the rebalancer."
        );
    }
    validatePoolBudgetConstraints(settings, budget);

    NativeMemoryRebalancer nativeRebalancer = new NativeMemoryRebalancer(
        allocator,
        () -> ResourceTrackerSettings.NODE_NATIVE_MEMORY_LIMIT_SETTING.get(settings).getBytes()
    );
    this.rebalancer = nativeRebalancer;

    org.opensearch.threadpool.Scheduler.SafeScheduledThreadPoolExecutor executor =
        new org.opensearch.threadpool.Scheduler.SafeScheduledThreadPoolExecutor(1, r -> {
            Thread t = new Thread(r, "native-allocator-rebalancer");
            t.setDaemon(true);
            return t;
        });
    executor.setRemoveOnCancelPolicy(true);
    this.rebalancerScheduler = executor;

    rebalanceTask = rebalancerScheduler.scheduleAtFixedRate(nativeRebalancer, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);

    // Register dynamic consumer for interval changes
    cs.addSettingsUpdateConsumer(REBALANCE_INTERVAL_SETTING, this::updateRebalanceInterval);
}
Possible Issue

The method validateSumMaxesWithinBudget at line 80 iterates over poolMaxes entries and sums them, but it does not account for virtual pools registered via registerVirtualPool. Virtual pools are stored in virtualPools and their maxes are in poolMaxes, but the iteration only checks poolMaxes.entrySet(). If a virtual pool is registered after an Arrow pool, the sum could include both, but the logic does not distinguish between them. If the sum exceeds budget, the error message says "rebalancer is disabled" but the check runs even when rebalancer is enabled (line 81 checks rebalancerEnabled and returns early if true, so this is only an issue when rebalancer is disabled). However, the method is called from getOrCreatePool and registerVirtualPool, and if a virtual pool is registered when rebalancer is disabled, the sum could incorrectly include the new pool's max twice if the pool name already exists in poolMaxes from a prior registration attempt.

private void validateSumMaxesWithinBudget(String newPoolName, long newPoolMax) {
    if (rebalancerEnabled || budget == Long.MAX_VALUE || budget <= 0) {
        return;
    }
    long sumMaxes = newPoolMax;
    for (var entry : poolMaxes.entrySet()) {
        if (entry.getKey().equals(newPoolName) == false) {
            sumMaxes += entry.getValue();
        }
    }
    if (sumMaxes > budget) {
        throw new IllegalArgumentException(
            "Sum of pool max limits ("
                + sumMaxes
                + " bytes) exceeds native memory budget ("
                + budget
                + " bytes) and rebalancer is disabled. Reduce pool max settings or enable the rebalancer."
        );
    }
}
Possible Issue

In registerVirtualPool at line 220, the method checks if pools.containsKey(poolName) || virtualPools.containsKey(poolName) and throws if true. However, poolMins and poolMaxes are updated at lines 237-238 before the handle is created. If a concurrent call to getOrCreatePool or registerVirtualPool with the same name occurs between the containsKey check and the putIfAbsent at line 244, the maps could be in an inconsistent state. The poolMins and poolMaxes maps use put (not putIfAbsent), so a race could overwrite values. This is unlikely in practice but could cause subtle bugs under high concurrency.

public VirtualPoolHandle registerVirtualPool(
    String poolName,
    long min,
    long max,
    org.opensearch.arrow.spi.PoolGroup group,
    Consumer<Long> limitSetter
) {
    if (pools.containsKey(poolName) || virtualPools.containsKey(poolName)) {
        throw new IllegalStateException("Pool '" + poolName + "' already registered");
    }
    if (min < MIN_POOL_FLOOR_BYTES) {
        throw new IllegalArgumentException("Pool '" + poolName + "' min must be >= 1MB, got " + min + " bytes");
    }
    if (min > max) {
        throw new IllegalArgumentException("Pool '" + poolName + "' min (" + min + ") exceeds max (" + max + ")");
    }
    validateSumMaxesWithinBudget(poolName, max);
    poolMins.put(poolName, min);
    poolMaxes.put(poolName, max);
    if (group != null) {
        poolGroups.put(poolName, group);
    }
    long initialLimit = rebalancerEnabled ? min : max;
    VirtualPoolHandleImpl handle = new VirtualPoolHandleImpl(poolName, initialLimit, limitSetter);
    limitSetter.accept(initialLimit);
    virtualPools.put(poolName, handle);
    return handle;
Possible Issue

The rebalancer at line 143 updates states.get(entry.getKey()).effectiveLimit directly after calling allocator.setPoolEffectiveLimit. However, states is a local map created at line 102, and the update at line 144 modifies the PoolState object in that map. This is fine for the current tick, but if an exception occurs in setPoolEffectiveLimit (line 143), the state map is updated even though the allocator was not. The subsequent logic at lines 150-153 computes sumLimits from the state map, which could now be inconsistent with the actual allocator state. This could cause the rebalancer to think it has more headroom than it actually does, leading to over-allocation.

for (var entry : idle.entrySet()) {
    PoolState s = entry.getValue();
    long newLimit = Math.max((long) (s.effectiveLimit * SHRINK_FACTOR), s.min);
    newLimit = Math.max(newLimit, s.allocated);
    if (newLimit != s.effectiveLimit) {
        allocator.setPoolEffectiveLimit(entry.getKey(), newLimit);
        states.get(entry.getKey()).effectiveLimit = newLimit;
    }
}

// Compute available headroom
long sumLimits = 0;
for (PoolState s : states.values()) {
    sumLimits += s.effectiveLimit;
}
long availableHeadroom = Math.max(0, budget - sumLimits);
Possible Issue

The method validatePoolBudgetConstraints at line 321 checks if individual pool maxes exceed 90% of budget, and if sum of mins exceeds 60% of budget. However, it only checks the three Arrow pools (flight, ingest, query) and does not account for virtual pools that may be registered later by other plugins. If a virtual pool is registered with a large max or min, the total could exceed the budget, but this validation only runs at startup for the Arrow pools. The validation should either run after all pools are registered, or be enforced dynamically when virtual pools are added.

private static void validatePoolBudgetConstraints(Settings settings, long budget) {
    long flightMax = FLIGHT_MAX_SETTING.get(settings);
    long ingestMax = INGEST_MAX_SETTING.get(settings);
    long queryMax = QUERY_MAX_SETTING.get(settings);
    long ninetyPercent = (long) (budget * 0.9);

    if (flightMax != Long.MAX_VALUE && flightMax > ninetyPercent) {
        throw new IllegalArgumentException("Flight pool max (" + flightMax + ") exceeds 90% of budget (" + budget + ")");
    }
    if (ingestMax != Long.MAX_VALUE && ingestMax > ninetyPercent) {
        throw new IllegalArgumentException("Ingest pool max (" + ingestMax + ") exceeds 90% of budget (" + budget + ")");
    }
    if (queryMax != Long.MAX_VALUE && queryMax > ninetyPercent) {
        throw new IllegalArgumentException("Query pool max (" + queryMax + ") exceeds 90% of budget (" + budget + ")");
    }

    long flightMin = FLIGHT_MIN_SETTING.get(settings);
    long ingestMin = INGEST_MIN_SETTING.get(settings);
    long queryMin = QUERY_MIN_SETTING.get(settings);
    long sumMins = flightMin + ingestMin + queryMin;
    long sixtyPercent = (long) (budget * 0.6);
    if (sumMins > sixtyPercent) {
        throw new IllegalArgumentException("Sum of pool mins (" + sumMins + ") exceeds 60% of budget (" + budget + ")");
    }
}

@github-actions
Copy link
Copy Markdown
Contributor

github-actions Bot commented May 30, 2026

PR Code Suggestions ✨

Latest suggestions up to 3995f96
Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
General
Add synchronization for rebalance interval updates

The updateRebalanceInterval method has a race condition: between canceling the
existing task and starting a new one, there's a window where rebalanceTask is null
but the rebalancer might still be running. Consider using synchronization or atomic
operations to ensure thread-safe updates, especially since this is called from
cluster settings updates which may occur concurrently.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowBasePlugin.java [310-319]

-private void updateRebalanceInterval(long newInterval) {
+private synchronized void updateRebalanceInterval(long newInterval) {
     ScheduledFuture<?> existing = rebalanceTask;
     if (existing != null) {
         org.opensearch.common.util.concurrent.FutureUtils.cancel(existing);
         rebalanceTask = null;
     }
     if (newInterval > 0 && rebalancerScheduler != null && rebalancer != null) {
         rebalanceTask = rebalancerScheduler.scheduleAtFixedRate(rebalancer, newInterval, newInterval, TimeUnit.SECONDS);
     }
 }
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential race condition in updateRebalanceInterval. Since this method is called from cluster settings updates which may occur concurrently, adding synchronization prevents issues where the rebalancer state becomes inconsistent. This is an important correctness improvement.

Medium
Extract pool minimum validation logic

The 1MB minimum pool floor is enforced only when the rebalancer is enabled, but the
same check is duplicated in registerVirtualPool. Consider extracting this validation
into a private helper method to avoid code duplication and ensure consistent
enforcement across both Arrow and virtual pool registration paths.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [122-135]

 private static final long MIN_POOL_FLOOR_BYTES = 1024 * 1024; // 1MB absolute minimum
 
-public PoolHandle getOrCreatePool(String poolName, long min, long max, org.opensearch.arrow.spi.PoolGroup group) {
+private void validatePoolMin(String poolName, long min) {
     if (rebalancerEnabled && min < MIN_POOL_FLOOR_BYTES) {
         throw new IllegalArgumentException(
             "Pool '" + poolName + "' min must be >= 1MB when rebalancer is enabled, got " + min + " bytes"
         );
     }
+}
 
+public PoolHandle getOrCreatePool(String poolName, long min, long max, org.opensearch.arrow.spi.PoolGroup group) {
+    validatePoolMin(poolName, min);
+
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies code duplication between getOrCreatePool and registerVirtualPool for the 1MB minimum validation. Extracting this into a helper method improves maintainability and ensures consistent enforcement. However, the impact is moderate since the duplication is limited to two locations.

Medium
Add logging for stats refresh failures

The refreshStats() method silently swallows all exceptions without logging. While
this is marked as "best-effort", completely silent failures make debugging
difficult. Consider adding debug-level logging when a refresher fails to help
operators diagnose issues with stats collection from native layers.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [449-457]

 public void refreshStats() {
     for (Runnable refresher : statsRefreshers) {
         try {
             refresher.run();
         } catch (Exception e) {
-            // best-effort
+            logger.debug("Stats refresher failed (best-effort, continuing)", e);
         }
     }
 }
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that silently swallowing exceptions makes debugging difficult. Adding debug-level logging would help operators diagnose issues with native stats collection without impacting performance. This is a good observability improvement.

Medium
Move constant validation to static block

The threshold validation in the constructor checks compile-time constants that never
change at runtime. This validation should be moved to a static initializer block or
removed entirely, as it adds unnecessary overhead to every rebalancer instantiation
for a condition that can be verified at compile time or during testing.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/NativeMemoryRebalancer.java [59-67]

-public NativeMemoryRebalancer(ArrowNativeAllocator allocator, Supplier<Long> budgetSupplier) {
+static {
     if (PRESSURE_THRESHOLD <= IDLE_THRESHOLD) {
         throw new IllegalStateException(
             "PRESSURE_THRESHOLD (" + PRESSURE_THRESHOLD + ") must be greater than IDLE_THRESHOLD (" + IDLE_THRESHOLD + ")"
         );
     }
+}
+
+public NativeMemoryRebalancer(ArrowNativeAllocator allocator, Supplier<Long> budgetSupplier) {
     this.allocator = allocator;
     this.budgetSupplier = budgetSupplier;
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies that validating compile-time constants in the constructor is inefficient. Moving to a static block would catch configuration errors earlier and avoid repeated checks. However, the performance impact is minimal since rebalancer instantiation is infrequent, and the current approach provides clearer error context.

Low
Strengthen memory ordering for concurrency

The memory ordering for try_grow uses Relaxed for both success and failure paths.
For correctness in concurrent scenarios, use Acquire ordering when reading the limit
and AcqRel for the fetch_update to ensure proper synchronization with set_limit
which uses Release.

sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs [116-128]

 pub fn try_grow(&self, bytes: usize) -> Result<(), PoolExhausted> {
     if bytes == 0 {
         return Ok(());
     }
-    let limit = self.limit.load(Ordering::Relaxed);
-    let result = self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
+    let limit = self.limit.load(Ordering::Acquire);
+    let result = self.used.fetch_update(Ordering::AcqRel, Ordering::Acquire, |used| {
         let new_used = used.checked_add(bytes)?;
         if limit > 0 && new_used > limit {
             None
         } else {
             Some(new_used)
         }
     });
Suggestion importance[1-10]: 6

__

Why: The suggestion raises a valid concern about memory ordering consistency with set_limit which uses Release. Using Acquire for reading limit and AcqRel for fetch_update would provide stronger synchronization guarantees. However, for simple counter operations, Relaxed ordering may be acceptable depending on the synchronization requirements, making this a moderate improvement rather than a critical fix.

Low
Add assertion for memory accounting

The shrink method silently clamps the bytes to the current size, which can mask bugs
where callers attempt to release more memory than they reserved. Consider adding a
debug assertion or warning when bytes > self.size to catch accounting errors during
development.

sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs [262-266]

 pub fn shrink(&mut self, bytes: usize) {
+    debug_assert!(bytes <= self.size, 
+        "Attempted to shrink {} bytes but only {} reserved", bytes, self.size);
     let actual = bytes.min(self.size);
     self.pool.shrink(actual);
     self.size -= actual;
 }
Suggestion importance[1-10]: 5

__

Why: Adding a debug_assert! to catch accounting errors is a reasonable suggestion for development-time bug detection. However, the current implementation already handles this case safely by clamping to self.size, so this is a minor improvement for debugging rather than a correctness issue.

Low
Possible issue
Handle mutex poisoning gracefully

The unwrap() calls on mutex lock and condvar wait can panic if the mutex is poisoned
or the wait fails. This creates a critical failure path in memory allocation.
Replace with proper error handling that returns a PoolTimeout error instead of
panicking.

sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs [169-170]

-let guard = self.notify_lock.lock().unwrap();
-let _ = self.notify.wait_timeout(guard, remaining.min(Duration::from_secs(1))).unwrap();
+let guard = self.notify_lock.lock().map_err(|_| PoolTimeout {
+    pool_name: self.name,
+    requested: bytes,
+    used: self.used.load(Ordering::Relaxed),
+    limit: self.limit.load(Ordering::Relaxed),
+    waited: elapsed,
+})?;
+let _ = self.notify.wait_timeout(guard, remaining.min(Duration::from_secs(1)))
+    .map_err(|_| PoolTimeout {
+        pool_name: self.name,
+        requested: bytes,
+        used: self.used.load(Ordering::Relaxed),
+        limit: self.limit.load(Ordering::Relaxed),
+        waited: elapsed,
+    })?;
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that unwrap() calls can panic on mutex poisoning, which is a legitimate concern in production code. However, the improved code changes the return type from Result<(), PoolTimeout> to require ? operator compatibility with a different error type, which would require broader API changes. The issue is valid but the solution needs refinement.

Medium

Previous suggestions

Suggestions up to commit c9c44b9
CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent integer underflow in shrink

The fetch_sub operation can underflow if bytes exceeds used, causing integer
wraparound. This could corrupt the pool's accounting. Add a check or use saturating
subtraction to prevent underflow.

sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs [188-194]

 pub fn shrink(&self, bytes: usize) {
     if bytes == 0 {
         return;
     }
-    self.used.fetch_sub(bytes, Ordering::Relaxed);
+    self.used.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
+        Some(used.saturating_sub(bytes))
+    }).ok();
     self.notify.notify_all();
 }
Suggestion importance[1-10]: 9

__

Why: This is a critical bug fix. The fetch_sub operation can indeed underflow if bytes exceeds used, causing integer wraparound and corrupting the pool's accounting. The suggested saturating_sub approach prevents this corruption, which could lead to serious memory tracking issues.

High
Handle mutex/condvar errors gracefully

The unwrap() calls will panic if the mutex is poisoned or the condvar fails. In
production code handling memory allocation, panics can crash the process. Consider
using proper error handling with ? or expect() with descriptive messages.

sandbox/libs/dataformat-native/rust/common/src/memory_pool.rs [169-170]

-let guard = self.notify_lock.lock().unwrap();
-let _ = self.notify.wait_timeout(guard, remaining.min(Duration::from_secs(1))).unwrap();
+let guard = self.notify_lock.lock().expect("notify_lock poisoned");
+let _ = self.notify.wait_timeout(guard, remaining.min(Duration::from_secs(1)))
+    .expect("condvar wait failed");
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that unwrap() calls can panic if the mutex is poisoned or condvar fails. Using expect() with descriptive messages improves debuggability, though the underlying panic behavior remains. This is a moderate improvement for production code handling memory allocation.

Medium
Enforce minimum pool size universally

The validation for MIN_POOL_FLOOR_BYTES should also apply when the rebalancer is
disabled to prevent pools from being created with zero or very small min values that
could cause issues. The 1MB floor is a safety constraint that should be enforced
universally, not just when the rebalancer is active.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [131-135]

-if (rebalancerEnabled && min < MIN_POOL_FLOOR_BYTES) {
+if (min < MIN_POOL_FLOOR_BYTES) {
     throw new IllegalArgumentException(
-        "Pool '" + poolName + "' min must be >= 1MB when rebalancer is enabled, got " + min + " bytes"
+        "Pool '" + poolName + "' min must be >= 1MB, got " + min + " bytes"
     );
 }
Suggestion importance[1-10]: 6

__

Why: The suggestion to enforce MIN_POOL_FLOOR_BYTES universally (not just when rebalancer is enabled) has merit for consistency, but the current design intentionally allows smaller mins when the rebalancer is off. The 1MB floor is specifically a rebalancer constraint to prevent thrashing. Changing this would alter the intended behavior for non-rebalanced deployments.

Low
General
Log stats refresher failures for visibility

Silently swallowing exceptions in refreshStats() can hide critical failures in stats
collection from virtual pools. Log the exception at warn level so operators can
diagnose issues with native memory tracking without breaking the stats collection
flow.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [449-457]

 public void refreshStats() {
     for (Runnable refresher : statsRefreshers) {
         try {
             refresher.run();
         } catch (Exception e) {
-            // best-effort
+            logger.warn("Stats refresher failed", e);
         }
     }
 }
Suggestion importance[1-10]: 8

__

Why: Silently swallowing exceptions in refreshStats() can hide critical failures in virtual pool stats collection. Logging at warn level provides visibility for operators without breaking the stats flow. This is a valuable improvement for diagnosability.

Medium
Prevent silent re-initialization with wrong limits

If init_pools is called multiple times with different limits, get_or_init will
ignore subsequent calls, keeping the first values. This could lead to incorrect pool
limits. Consider returning an error or using set_once pattern if re-initialization
should be prevented.

sandbox/plugins/parquet-data-format/src/main/rust/src/memory.rs [29-40]

 pub fn init_pools(write_limit: usize, merge_limit: usize) {
-    WRITE_POOL.get_or_init(|| PoolCounters {
+    WRITE_POOL.set(PoolCounters {
         limit: AtomicUsize::new(write_limit),
         used: AtomicUsize::new(0),
         peak: AtomicUsize::new(0),
-    });
-    MERGE_POOL.get_or_init(|| PoolCounters {
+    }).expect("WRITE_POOL already initialized");
+    MERGE_POOL.set(PoolCounters {
         limit: AtomicUsize::new(merge_limit),
         used: AtomicUsize::new(0),
         peak: AtomicUsize::new(0),
-    });
+    }).expect("MERGE_POOL already initialized");
 }
Suggestion importance[1-10]: 8

__

Why: The suggestion identifies a real issue where get_or_init silently ignores subsequent calls with different limits, potentially causing incorrect pool configuration. Using set() with expect() makes re-initialization attempts explicit and prevents silent failures, which is important for correctness.

Medium
Distribute truncation remainder to pressured pools

The proportional distribution can result in integer truncation that leaves
unallocated headroom when totalDesired is large. After the loop completes, any
remaining headroom should be distributed to pressured pools to maximize utilization.
Add a second pass to distribute the remainder.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/NativeMemoryRebalancer.java [177-197]

 long grant = (long) ((double) availableHeadroom * desired / totalDesired);
 grant = Math.min(grant, s.max - s.effectiveLimit);
 grant = Math.min(grant, availableHeadroom - totalGranted);
+if (grant > 0) {
+    try {
+        long newLimit = s.effectiveLimit + grant;
+        allocator.setPoolEffectiveLimit(name, newLimit);
+        totalGranted += grant;
+        logger.debug("Rebalancer: grew pool [{}] by {} bytes to {}", name, grant, newLimit);
+    } catch (Exception e) {
+        logger.warn(() -> new org.apache.logging.log4j.message.ParameterizedMessage("Rebalancer: failed to grow pool [{}]", name), e);
+    }
+}
+// Distribute any remaining headroom due to truncation
+long remaining = availableHeadroom - totalGranted;
+if (remaining > 0) {
+    for (var entry : desires.entrySet()) {
+        String name = entry.getKey();
+        PoolState s = states.get(name);
+        long additionalGrant = Math.min(remaining, s.max - s.effectiveLimit);
+        if (additionalGrant > 0) {
+            allocator.setPoolEffectiveLimit(name, s.effectiveLimit + additionalGrant);
+            remaining -= additionalGrant;
+            if (remaining == 0) break;
+        }
+    }
+}
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that integer truncation in proportional distribution can leave unallocated headroom. Adding a second pass to distribute the remainder would improve utilization. However, the impact is minor (typically a few bytes) and the added complexity may not justify the benefit in most scenarios.

Medium
Align virtual pool validation with Arrow pools

The validation for MIN_POOL_FLOOR_BYTES in registerVirtualPool is unconditional, but
the same validation in getOrCreatePool only applies when rebalancerEnabled is true.
This inconsistency can lead to different behavior for Arrow vs virtual pools. Apply
the same conditional logic or make both unconditional for consistency.

plugins/arrow-base/src/main/java/org/opensearch/arrow/allocator/ArrowNativeAllocator.java [230-232]

 public VirtualPoolHandle registerVirtualPool(
     String poolName,
     long min,
     long max,
     org.opensearch.arrow.spi.PoolGroup group,
     Consumer<Long> limitSetter
 ) {
     if (pools.containsKey(poolName) || virtualPools.containsKey(poolName)) {
         throw new IllegalStateException("Pool '" + poolName + "' already registered");
     }
-    if (min < MIN_POOL_FLOOR_BYTES) {
-        throw new IllegalArgumentException("Pool '" + poolName + "' min must be >= 1MB, got " + min + " bytes");
+    if (rebalancerEnabled && min < MIN_POOL_FLOOR_BYTES) {
+        throw new IllegalArgumentException("Pool '" + poolName + "' min must be >= 1MB when rebalancer is enabled, got " + min + " bytes");
     }
Suggestion importance[1-10]: 7

__

Why: The inconsistency between registerVirtualPool (unconditional 1MB floor) and getOrCreatePool (conditional on rebalancerEnabled) is a valid concern. However, the current design may be intentional: virtual pools are always expected to participate in rebalancing, while Arrow pools can operate standalone. Clarifying the intent or aligning the logic would improve consistency.

Medium

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for c9c44b9: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

@rayshrey rayshrey force-pushed the combined-memory-pool branch from c9c44b9 to 3485d8a Compare May 30, 2026 20:58
Signed-off-by: rayshrey <rayshrey@amazon.com>
@rayshrey rayshrey force-pushed the combined-memory-pool branch from 3485d8a to 3995f96 Compare May 31, 2026 08:58
@github-actions
Copy link
Copy Markdown
Contributor

Persistent review updated to latest commit 3995f96

@github-actions
Copy link
Copy Markdown
Contributor

❌ Gradle check result for 3995f96: FAILURE

Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant