Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.arrow.spi;

import org.opensearch.common.annotation.PublicApi;

import java.io.Closeable;

/**
Expand All @@ -23,6 +25,7 @@
*
* @opensearch.api
*/
@PublicApi(since = "3.0.0")
public interface NativeAllocator extends Closeable {

/**
Expand Down Expand Up @@ -57,6 +60,16 @@ public interface NativeAllocator extends Closeable {
*/
NativeAllocatorPoolStats stats();

/**
* Returns the virtual pool handle for a Rust-side memory pool, or null if not registered.
*
* @param poolName logical pool name (e.g., "write", "merge")
* @return the pool handle, or null
*/
default PoolHandle getVirtualPool(String poolName) {
return null;
}

/**
* Opaque handle to a memory pool. Plugins downcast to the concrete type
* (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ public class ArrowNativeAllocator implements NativeAllocator {

private final RootAllocator root;
private final ConcurrentMap<String, ArrowPoolHandle> pools = new ConcurrentHashMap<>();
private final ConcurrentMap<String, VirtualPoolHandle> virtualPools = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> poolMins = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> poolMaxes = new ConcurrentHashMap<>();
private final ScheduledExecutorService rebalancer;
private volatile ScheduledFuture<?> rebalanceTask;
private volatile Runnable virtualPoolStatsRefresher;
/**
* True iff the rebalancer is configured to run periodically. Used by
* {@link #getOrCreatePool} to decide each pool's initial child-allocator
Expand Down Expand Up @@ -178,6 +180,11 @@ public void setRootLimit(long limit) {

@Override
public NativeAllocatorPoolStats stats() {
// Refresh Rust-side stats before collecting
Runnable refresher = this.virtualPoolStatsRefresher;
if (refresher != null) {
refresher.run();
}
List<NativeAllocatorPoolStats.PoolStats> poolStats = new ArrayList<>();
for (var entry : pools.entrySet()) {
BufferAllocator alloc = entry.getValue().allocator;
Expand All @@ -191,6 +198,11 @@ public NativeAllocatorPoolStats stats() {
)
);
}
// Include Rust-side virtual pools
for (var entry : virtualPools.entrySet()) {
VirtualPoolHandle vp = entry.getValue();
poolStats.add(new NativeAllocatorPoolStats.PoolStats(entry.getKey(), vp.allocatedBytes(), vp.peakBytes(), vp.limit(), 0));
}
return new NativeAllocatorPoolStats(root.getAllocatedMemory(), root.getPeakMemoryAllocation(), root.getLimit(), poolStats);
}

Expand Down Expand Up @@ -352,4 +364,69 @@ public BufferAllocator getAllocator() {
return allocator;
}
}

/**
* A virtual pool handle for Rust-side memory pools that report stats back to Java
* without using Arrow's BufferAllocator.
*/
public static class VirtualPoolHandle implements PoolHandle {
private final String name;
private final long limit;
private volatile long allocatedBytes;
private volatile long peakBytes;

VirtualPoolHandle(String name, long limit) {
this.name = name;
this.limit = limit;
}

/** Called by the stats refresher to report current Rust-side usage. */
public void updateStats(long allocated, long peak) {
this.allocatedBytes = allocated;
this.peakBytes = peak;
}

@Override
public PoolHandle newChild(String childName, long childLimit) {
throw new UnsupportedOperationException("Virtual pool [" + name + "] does not support children");
}

@Override
public long allocatedBytes() {
return allocatedBytes;
}

@Override
public long peakBytes() {
return peakBytes;
}

@Override
public long limit() {
return limit;
}

@Override
public void close() {}
}

/**
* Registers a virtual pool for a Rust-side memory pool.
*/
public VirtualPoolHandle registerVirtualPool(String poolName, long limit) {
VirtualPoolHandle handle = new VirtualPoolHandle(poolName, limit);
virtualPools.put(poolName, handle);
return handle;
}

/** Returns the virtual pool handle for a given name, or null if not registered. */
@Override
public VirtualPoolHandle getVirtualPool(String poolName) {
return virtualPools.get(poolName);
}

/** Registers a callback that refreshes virtual pool stats from the native layer. */
public void setVirtualPoolStatsRefresher(Runnable refresher) {
this.virtualPoolStatsRefresher = refresher;
}
}
1 change: 1 addition & 0 deletions sandbox/libs/dataformat-native/rust/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
pub mod error;
pub mod logger;
pub mod allocator;
pub mod memory_pool;

// Re-export the proc macro so plugins use `#[native_bridge_common::ffm_safe]`
pub use native_bridge_macros::ffm_safe;
Loading
Loading