Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion libs/arrow-spi/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ apply plugin: 'opensearch.publish'
dependencies {
api project(':libs:opensearch-core')
api project(':libs:opensearch-common')
testImplementation project(':test:framework')
testImplementation(project(':test:framework')) {
exclude group: 'org.opensearch', module: 'opensearch-arrow-spi'
}
}

tasks.named('forbiddenApisMain').configure {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,52 +9,105 @@
package org.opensearch.arrow.spi;

import java.io.Closeable;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
* Arrow-agnostic interface for a hierarchical native memory allocator.
* Unified native memory allocator interface.
*
* <p>The implementation (backed by Arrow's {@code RootAllocator}) is provided by
* a plugin. The SPI allows other subsystems to interact with the allocator
* without depending on Arrow classes.
*
* <p>Plugins that need Arrow allocators obtain the implementation via
* service lookup or plugin extension and call {@link #getOrCreatePool} to
* register their pool.
* <p>Manages memory pools under a shared budget. Each pool has a minimum
* guaranteed allocation and a maximum burst limit. Implementations may
* redistribute unused capacity across pools.
*
* @opensearch.api
*/
public interface NativeAllocator extends Closeable {

/**
* Returns the named pool, creating it on first access with the given limit.
* Subsequent calls with the same name return the same pool (first-call limit wins).
* Returns the named pool, creating it on first access.
* Subsequent calls with the same name return the existing pool (first-call config wins).
*
* @param poolName logical pool name (e.g., "query", "flight")
* @param limit maximum bytes this pool can allocate in aggregate
* @param poolName logical pool name
* @param min minimum guaranteed bytes
* @param max maximum bytes this pool can allocate
* @param group the group this pool belongs to for aggregated stats, or null
* @return an opaque pool handle
*/
PoolHandle getOrCreatePool(String poolName, long limit);
PoolHandle getOrCreatePool(String poolName, long min, long max, PoolGroup group);

/**
* Updates the limit of an existing pool. Children of the pool allocator
* inherit the change automatically via Arrow's parent-cap check at
* allocation time — no notification SPI is needed.
* Updates the effective limit of an existing pool.
*
* @param poolName logical pool name
* @param newLimit new maximum bytes for the pool
*/
void setPoolLimit(String poolName, long newLimit);

/**
* Sets the root-level memory limit for the entire allocator.
* Registers a virtual pool with initial min/max and a callback
* invoked when the pool's limit changes.
*
* @param poolName logical pool name
* @param min minimum guaranteed bytes
* @param max initial maximum bytes (the pool's starting limit)
* @param group the group this pool belongs to for aggregated stats
* @param limitSetter callback invoked when the pool limit changes
* @return a handle to update stats from the native layer
*/
VirtualPoolHandle registerVirtualPool(String poolName, long min, long max, PoolGroup group, Consumer<Long> limitSetter);

/**
* Updates the minimum guaranteed bytes for a pool.
*
* @param poolName logical pool name
* @param newMin new minimum bytes
*/
void setPoolMin(String poolName, long newMin);

/**
* Returns all registered pool names.
*/
Set<String> getAllPoolNames();

/**
* Adds a callback invoked before stats collection to refresh pool usage data.
*
* @param limit new maximum bytes for the root allocator
* @param refresher runnable that updates pool stats
*/
void addStatsRefresher(Runnable refresher);

/**
* Sets the supplier for process-wide native memory stats.
*
* @param supplier returns [allocatedBytes, residentBytes]
*/
void setNativeMemoryStatsSupplier(Supplier<long[]> supplier);

/**
* Handle for a virtual pool. Plugins update stats via this handle.
*/
void setRootLimit(long limit);
interface VirtualPoolHandle {
/**
* Update the current usage stats.
*
* @param allocatedBytes current allocated bytes
* @param peakBytes peak allocated bytes
*/
void updateStats(long allocatedBytes, long peakBytes);

/** Returns current allocated bytes. */
long allocatedBytes();

/** Returns peak allocated bytes. */
long peakBytes();

/** Returns current limit. */
long limit();
}

/**
* Opaque handle to a memory pool. Plugins downcast to the concrete type
* (e.g., Arrow's {@code BufferAllocator}) in the implementation layer.
* Opaque handle to a memory pool.
*/
interface PoolHandle {

Expand All @@ -63,28 +116,20 @@ interface PoolHandle {
*
* @param childName name for debugging
* @param childLimit maximum bytes for the child
* @return an opaque child handle (downcast to BufferAllocator in Arrow impl)
* @return a child handle
*/
PoolHandle newChild(String childName, long childLimit);

/**
* Returns the current allocated bytes for this pool/child.
*/
/** Returns the current allocated bytes. */
long allocatedBytes();

/**
* Returns the peak memory allocation.
*/
/** Returns the peak memory allocation. */
long peakBytes();

/**
* Returns the configured limit.
*/
/** Returns the configured limit. */
long limit();

/**
* Releases this allocation handle.
*/
/** Releases this allocation handle. */
void close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ public final class NativeAllocatorPoolConfig {
/** Pool name for query-execution memory (analytics-engine fragments and per-query allocators). */
public static final String POOL_QUERY = "query";

/** Setting key for the root allocator limit. */
public static final String SETTING_ROOT_LIMIT = "native.allocator.root.limit";

/** Setting key for the Flight pool minimum. */
public static final String SETTING_FLIGHT_MIN = "native.allocator.pool.flight.min";
/** Setting key for the Flight pool maximum. */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.arrow.spi;

/**
* Groups that memory pools belong to for aggregated customer-facing stats.
* Each pool is assigned to exactly one group at registration time.
*
* @opensearch.api
*/
public enum PoolGroup {
/** Arrow Flight transport pool group. */
TRANSPORT("transport"),
/** Query and analytics execution pool group. */
SEARCH("search"),
/** Ingest and write path pool group. */
INDEXING("indexing"),
/** Background merge operations pool group. */
MERGE("merge");

private final String name;

PoolGroup(String name) {
this.name = name;
}

/** Returns the group name used in stats output. */
public String getName() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,4 @@ public void testSettingKeys() {
assertEquals("native.allocator.pool.query.min", NativeAllocatorPoolConfig.SETTING_QUERY_MIN);
assertEquals("native.allocator.pool.query.max", NativeAllocatorPoolConfig.SETTING_QUERY_MAX);
}

public void testRootSettingKey() {
assertEquals("native.allocator.root.limit", NativeAllocatorPoolConfig.SETTING_ROOT_LIMIT);
}
}
Loading
Loading