Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535))
- [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404))
- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286))
- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import org.opensearch.monitor.process.ProcessStats;
import org.opensearch.node.AdaptiveSelectionStats;
import org.opensearch.node.NodesResourceUsageStats;
import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats;
import org.opensearch.repositories.RepositoriesStats;
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
Expand Down Expand Up @@ -154,6 +155,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private RepositoriesStats repositoriesStats;

@Nullable
private AdmissionControlStats admissionControlStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -226,6 +230,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
repositoriesStats = null;
}
if(in.getVersion().onOrAfter(Version.V_3_0_0)) {
admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new);
} else {
admissionControlStats = null;
}
}

public NodeStats(
Expand Down Expand Up @@ -255,7 +264,8 @@ public NodeStats(
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats,
@Nullable SegmentReplicationRejectionStats segmentReplicationRejectionStats,
@Nullable RepositoriesStats repositoriesStats
@Nullable RepositoriesStats repositoriesStats,
@Nullable AdmissionControlStats admissionControlStats
) {
super(node);
this.timestamp = timestamp;
Expand Down Expand Up @@ -284,6 +294,7 @@ public NodeStats(
this.searchPipelineStats = searchPipelineStats;
this.segmentReplicationRejectionStats = segmentReplicationRejectionStats;
this.repositoriesStats = repositoriesStats;
this.admissionControlStats = admissionControlStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -435,6 +446,11 @@ public RepositoriesStats getRepositoriesStats() {
return repositoriesStats;
}

@Nullable
public AdmissionControlStats getAdmissionControlStats() {
return admissionControlStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -588,6 +604,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getRepositoriesStats() != null) {
getRepositoriesStats().toXContent(builder, params);
}
if (getAdmissionControlStats() != null) {
getAdmissionControlStats().toXContent(builder, params);
}
return builder;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public enum Metric {
SEARCH_PIPELINE("search_pipeline"),
RESOURCE_USAGE_STATS("resource_usage_stats"),
SEGMENT_REPLICATION_BACKPRESSURE("segment_replication_backpressure"),
REPOSITORIES("repositories");
REPOSITORIES("repositories"),
ADMISSION_CONTROL("admission_control");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics),
NodesStatsRequest.Metric.RESOURCE_USAGE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.SEGMENT_REPLICATION_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics)
NodesStatsRequest.Metric.REPOSITORIES.containedIn(metrics),
NodesStatsRequest.Metric.ADMISSION_CONTROL.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchService;
import org.opensearch.search.dfs.DfsSearchResult;
Expand Down Expand Up @@ -542,6 +543,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
DFS_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
ShardSearchRequest::new,
(request, channel, task) -> searchService.executeDfsPhase(
request,
Expand All @@ -556,6 +560,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
ShardSearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(
Expand All @@ -575,6 +582,9 @@ public static void registerRequestHandler(TransportService transportService, Sea
transportService.registerRequestHandler(
QUERY_ID_ACTION_NAME,
ThreadPool.Names.SAME,
false,
true,
AdmissionControlActionType.SEARCH,
QuerySearchRequest::new,
(request, channel, task) -> {
searchService.executeQueryPhase(
Expand Down Expand Up @@ -633,6 +643,7 @@ public static void registerRequestHandler(TransportService transportService, Sea
ThreadPool.Names.SAME,
true,
true,
AdmissionControlActionType.SEARCH,
ShardFetchSearchRequest::new,
(request, channel, task) -> {
searchService.executeFetchPhase(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListenerResponseHandler;
import org.opensearch.action.UnavailableShardsException;
import org.opensearch.action.bulk.TransportShardBulkAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.ChannelActionListener;
Expand Down Expand Up @@ -82,6 +83,7 @@
import org.opensearch.indices.IndexClosedException;
import org.opensearch.indices.IndicesService;
import org.opensearch.node.NodeClosedException;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.ConnectTransportException;
Expand Down Expand Up @@ -219,14 +221,26 @@ protected TransportReplicationAction(

transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest);

transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
AdmissionControlActionType.INDEXING,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
} else {
transportService.registerRequestHandler(
transportPrimaryAction,
executor,
forceExecutionOnPrimary,
true,
in -> new ConcreteShardRequest<>(requestReader, in),
this::handlePrimaryRequest
);
}

// we must never reject on because of thread pool capacity on replicas
transportService.registerRequestHandler(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.http.HttpServerTransport;
import org.opensearch.index.shard.PrimaryReplicaSyncer.ResyncTask;
import org.opensearch.plugins.NetworkPlugin;
import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.tracing.Tracer;
Expand Down Expand Up @@ -299,6 +300,20 @@ public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
return actualHandler;
}

@Override
public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(
String action,
String executor,
boolean forceExecution,
TransportRequestHandler<T> actualHandler,
AdmissionControlActionType transportActionType
) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType);
}
return actualHandler;
}

@Override
public AsyncSender interceptSender(AsyncSender sender) {
for (TransportInterceptor interceptor : this.transportInterceptors) {
Expand Down
29 changes: 16 additions & 13 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -894,12 +894,24 @@ protected Node(

final RestController restController = actionModule.getRestController();

final AdmissionControlService admissionControlService = new AdmissionControlService(
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings(),
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);

final AdmissionControlService admissionControlService = new AdmissionControlService(
settings,
clusterService,
threadPool,
resourceUsageCollectorService
);

AdmissionControlTransportInterceptor admissionControlTransportInterceptor = new AdmissionControlTransportInterceptor(
admissionControlService
);
Expand Down Expand Up @@ -1101,16 +1113,6 @@ protected Node(
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
final NodeResourceUsageTracker nodeResourceUsageTracker = new NodeResourceUsageTracker(
threadPool,
settings,
clusterService.getClusterSettings()
);
final ResourceUsageCollectorService resourceUsageCollectorService = new ResourceUsageCollectorService(
nodeResourceUsageTracker,
clusterService,
threadPool
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -1135,7 +1137,8 @@ protected Node(
taskCancellationMonitoringService,
resourceUsageCollectorService,
segmentReplicationStatsTracker,
repositoryService
repositoryService,
admissionControlService
);

final SearchService searchService = newSearchService(
Expand Down
12 changes: 9 additions & 3 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.ingest.IngestService;
import org.opensearch.monitor.MonitorService;
import org.opensearch.plugins.PluginsService;
import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.script.ScriptService;
import org.opensearch.search.aggregations.support.AggregationUsageService;
Expand Down Expand Up @@ -96,6 +97,7 @@ public class NodeService implements Closeable {
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;
private final RepositoriesService repositoriesService;
AdmissionControlService admissionControlService;

private final SegmentReplicationStatsTracker segmentReplicationStatsTracker;

Expand Down Expand Up @@ -123,7 +125,8 @@ public class NodeService implements Closeable {
TaskCancellationMonitoringService taskCancellationMonitoringService,
ResourceUsageCollectorService resourceUsageCollectorService,
SegmentReplicationStatsTracker segmentReplicationStatsTracker,
RepositoriesService repositoriesService
RepositoriesService repositoriesService,
AdmissionControlService admissionControlService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -148,6 +151,7 @@ public class NodeService implements Closeable {
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
this.resourceUsageCollectorService = resourceUsageCollectorService;
this.repositoriesService = repositoriesService;
this.admissionControlService = admissionControlService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
this.segmentReplicationStatsTracker = segmentReplicationStatsTracker;
Expand Down Expand Up @@ -232,7 +236,8 @@ public NodeStats stats(
boolean searchPipelineStats,
boolean resourceUsageStats,
boolean segmentReplicationTrackerStats,
boolean repositoriesStats
boolean repositoriesStats,
boolean admissionControl
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand Down Expand Up @@ -263,7 +268,8 @@ public NodeStats stats(
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null,
segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null,
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null
repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null,
admissionControl ? this.admissionControlService.stats(): null
);
}

Expand Down
Loading