Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ArrowNativeAllocator implements NativeAllocator {

private final RootAllocator root;
private final ConcurrentMap<String, ArrowPoolHandle> pools = new ConcurrentHashMap<>();
private final ConcurrentMap<String, VirtualPoolHandle> virtualPools = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> poolMins = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Long> poolMaxes = new ConcurrentHashMap<>();
private final ScheduledExecutorService rebalancer;
Expand Down Expand Up @@ -178,6 +179,11 @@ public void setRootLimit(long limit) {

@Override
public NativeAllocatorPoolStats stats() {
// Refresh Rust-side stats before collecting
Runnable refresher = this.virtualPoolStatsRefresher;
if (refresher != null) {
refresher.run();
}
List<NativeAllocatorPoolStats.PoolStats> poolStats = new ArrayList<>();
for (var entry : pools.entrySet()) {
BufferAllocator alloc = entry.getValue().allocator;
Expand All @@ -191,6 +197,11 @@ public NativeAllocatorPoolStats stats() {
)
);
}
// Include Rust-side virtual pools in the unified stats view
for (var entry : virtualPools.entrySet()) {
VirtualPoolHandle vp = entry.getValue();
poolStats.add(new NativeAllocatorPoolStats.PoolStats(entry.getKey(), vp.allocatedBytes(), vp.peakBytes(), vp.limit(), 0));
}
return new NativeAllocatorPoolStats(root.getAllocatedMemory(), root.getPeakMemoryAllocation(), root.getLimit(), poolStats);
}

Expand Down Expand Up @@ -352,4 +363,86 @@ public BufferAllocator getAllocator() {
return allocator;
}
}

/**
* A virtual pool handle for Rust-side memory pools that report stats back to Java
* without using Arrow's BufferAllocator. The Rust side enforces its own limit locally;
* this handle provides a unified view in {@link #stats()}.
*/
public static class VirtualPoolHandle implements PoolHandle {

private final String name;
private final long limit;
private volatile long allocatedBytes;
private volatile long peakBytes;

VirtualPoolHandle(String name, long limit) {
this.name = name;
this.limit = limit;
}

/** Called by Rust (via FFM) to report current usage. */
public void updateStats(long allocated, long peak) {
this.allocatedBytes = allocated;
this.peakBytes = peak;
}

@Override
public PoolHandle newChild(String childName, long childLimit) {
throw new UnsupportedOperationException("Virtual pool [" + name + "] does not support children");
}

@Override
public long allocatedBytes() {
return allocatedBytes;
}

@Override
public long peakBytes() {
return peakBytes;
}

@Override
public long limit() {
return limit;
}

@Override
public void close() {
// No-op — Rust owns the memory lifecycle
}
}

/**
* Registers a virtual pool for a Rust-side memory pool. The Rust side tracks
* allocations locally and periodically reports usage via {@link VirtualPoolHandle#updateStats}.
* Stats appear in {@link #stats()} alongside real Arrow pools.
*
* @param poolName logical pool name (e.g., "write")
* @param limit the limit enforced by Rust
* @return the virtual handle (caller passes to Rust for stats reporting)
*/
public VirtualPoolHandle registerVirtualPool(String poolName, long limit) {
VirtualPoolHandle handle = new VirtualPoolHandle(poolName, limit);
virtualPools.put(poolName, handle);
return handle;
}

/**
* Returns the virtual pool handle for a given name, or null if not registered.
*/
public VirtualPoolHandle getVirtualPool(String poolName) {
return virtualPools.get(poolName);
}

private volatile Runnable virtualPoolStatsRefresher;

/**
* Registers a callback that refreshes virtual pool stats from the native layer.
* Called by the parquet plugin during initialization. The callback is invoked
* before stats() to ensure returned values reflect current Rust-side usage.
*/
public void setVirtualPoolStatsRefresher(Runnable refresher) {
this.virtualPoolStatsRefresher = refresher;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,57 @@ public void testSetPoolMinDoesNotShrinkLiveLimit() {
allocator.setPoolMin("p", 1L);
assertEquals("dropping min must not shrink live limit", startLimit, pool.getLimit());
}

public void testVirtualPoolRegistration() {
var handle = allocator.registerVirtualPool("write", 512 * 1024 * 1024);
assertNotNull(handle);
assertEquals(512 * 1024 * 1024, handle.limit());
assertEquals(0, handle.allocatedBytes());
assertEquals(0, handle.peakBytes());
assertSame(handle, allocator.getVirtualPool("write"));
}

public void testVirtualPoolUpdateStats() {
var handle = allocator.registerVirtualPool("merge", 0);
handle.updateStats(100_000, 200_000);
assertEquals(100_000, handle.allocatedBytes());
assertEquals(200_000, handle.peakBytes());
}

public void testVirtualPoolAppearsInStats() {
allocator.registerVirtualPool("write", 0);
allocator.getVirtualPool("write").updateStats(42, 99);

NativeAllocatorPoolStats stats = allocator.stats();
boolean found = false;
for (NativeAllocatorPoolStats.PoolStats ps : stats.getPools()) {
if ("write".equals(ps.getName())) {
assertEquals(42, ps.getAllocatedBytes());
assertEquals(99, ps.getPeakBytes());
found = true;
}
}
assertTrue("Virtual pool 'write' should appear in stats", found);
}

public void testStatsRefresherCalledBeforeStats() {
var handle = allocator.registerVirtualPool("write", 0);
allocator.setVirtualPoolStatsRefresher(() -> handle.updateStats(777, 888));

NativeAllocatorPoolStats stats = allocator.stats();
boolean found = false;
for (NativeAllocatorPoolStats.PoolStats ps : stats.getPools()) {
if ("write".equals(ps.getName())) {
assertEquals(777, ps.getAllocatedBytes());
assertEquals(888, ps.getPeakBytes());
found = true;
}
}
assertTrue("Refresher should have been called before collecting stats", found);
}

public void testVirtualPoolDoesNotSupportChildren() {
var handle = allocator.registerVirtualPool("vp", 0);
expectThrows(UnsupportedOperationException.class, () -> handle.newChild("child", 100));
}
}
1 change: 1 addition & 0 deletions sandbox/libs/dataformat-native/rust/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
pub mod error;
pub mod logger;
pub mod allocator;
pub mod memory_pool;

// Re-export the proc macro so plugins use `#[native_bridge_common::ffm_safe]`
pub use native_bridge_macros::ffm_safe;
Loading
Loading