From d9d07eb734088937699df9c02af685fe7f81f870 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Sun, 22 Oct 2023 10:02:10 +0530 Subject: [PATCH 1/2] javadoc changes and more Signed-off-by: Bharathwaj G --- CHANGELOG.md | 4 +- .../admin/cluster/node/stats/NodeStats.java | 7 +- .../stats/TransportClusterStatsAction.java | 1 + .../TransportReplicationAction.java | 3 +- .../common/network/NetworkModule.java | 7 +- .../java/org/opensearch/node/NodeService.java | 2 +- .../AdmissionControlService.java | 51 +++++++------ .../controllers/AdmissionController.java | 24 ++++-- .../CPUBasedAdmissionController.java | 76 +++++++++++++------ .../CPUBasedAdmissionControllerSettings.java | 17 +---- .../stats/AdmissionControlStats.java | 18 ++--- ...ats.java => AdmissionControllerStats.java} | 32 ++++---- .../stats/BaseAdmissionControllerStats.java | 15 ---- .../AdmissionControlTransportHandler.java | 20 ++--- .../AdmissionControlTransportInterceptor.java | 10 ++- .../transport/TransportInterceptor.java | 11 +-- .../transport/TransportService.java | 11 +-- .../AdmissionControlServiceTests.java | 6 +- .../CPUBasedAdmissionControllerTests.java | 16 ++-- .../enums/TransportActionTypeTests.java | 2 +- ...CPUBasedAdmissionControlSettingsTests.java | 1 - 21 files changed, 182 insertions(+), 152 deletions(-) rename server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/{CPUBasedAdmissionControllerStats.java => AdmissionControllerStats.java} (71%) delete mode 100644 server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 76bf757083d15..fd3acf115dd23 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,8 +18,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Remote cluster state] Upload global metadata in cluster state to remote store([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) - [Remote cluster state] Download functionality of global metadata from remote store ([#10535](https://github.com/opensearch-project/OpenSearch/pull/10535)) - [Remote cluster state] Restore global metadata from remote store when local state is lost after quorum loss ([#10404](https://github.com/opensearch-project/OpenSearch/pull/10404)) -- [AdmissionControl] Added changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) -- [AdmissionControl] Added changes to integrade cpu AC to ResourceUsageCollector and Emit Stats +- [Admission Control] Add changes for AdmissionControl Interceptor and AdmissionControlService for RateLimiting ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) +- [Admission Control] Add changes to integrate CPU AC and ResourceUsageCollector with Stats ([#9286](https://github.com/opensearch-project/OpenSearch/pull/9286)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java index 1598fbaf3711a..330b57cf32c70 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java @@ -230,7 +230,8 @@ public NodeStats(StreamInput in) throws IOException { } else { repositoriesStats = null; } - if(in.getVersion().onOrAfter(Version.V_3_0_0)) { + // TODO: change to V_2_12_0 on main after backport to 2.x + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { admissionControlStats = in.readOptionalWriteable(AdmissionControlStats::new); } else { admissionControlStats = null; @@ -504,6 +505,10 @@ public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_2_12_0)) { out.writeOptionalWriteable(repositoriesStats); } + // TODO: change to V_2_12_0 on main after backport to 2.x + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeOptionalWriteable(admissionControlStats); + } } @Override diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java index 5efec8b876435..9c5dcc9e9de3f 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/stats/TransportClusterStatsAction.java @@ -171,6 +171,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq false, false, false, + false, false ); List shardsStats = new ArrayList<>(); diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 7dd34fff1b159..11046e44b61e0 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -221,7 +221,8 @@ protected TransportReplicationAction( transportService.registerRequestHandler(actionName, ThreadPool.Names.SAME, requestReader, this::handleOperationRequest); - if(transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)){ + // Register only TransportShardBulkAction for admission control ( primary indexing action ) + if (transportPrimaryAction.equals(TransportShardBulkAction.ACTION_NAME + PRIMARY_ACTION_SUFFIX)) { transportService.registerRequestHandler( transportPrimaryAction, executor, diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 7fa8ec771b488..5687b2f0a253a 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -300,16 +300,19 @@ public TransportRequestHandler interceptHandler( return actualHandler; } + /** + * Intercept the transport action and perform admission control if applicable + */ @Override public TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler, - AdmissionControlActionType transportActionType + AdmissionControlActionType admissionControlActionType ) { for (TransportInterceptor interceptor : this.transportInterceptors) { - actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, transportActionType); + actualHandler = interceptor.interceptHandler(action, executor, forceExecution, actualHandler, admissionControlActionType); } return actualHandler; } diff --git a/server/src/main/java/org/opensearch/node/NodeService.java b/server/src/main/java/org/opensearch/node/NodeService.java index 3c6dd15834f57..224061d09b2c6 100644 --- a/server/src/main/java/org/opensearch/node/NodeService.java +++ b/server/src/main/java/org/opensearch/node/NodeService.java @@ -269,7 +269,7 @@ public NodeStats stats( searchPipelineStats ? this.searchPipelineService.stats() : null, segmentReplicationTrackerStats ? this.segmentReplicationStatsTracker.getTotalRejectionStats() : null, repositoriesStats ? this.repositoriesService.getRepositoriesStats() : null, - admissionControl ? this.admissionControlService.stats(): null + admissionControl ? this.admissionControlService.stats() : null ); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index b71b062dc788d..1683f8e381c58 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -11,15 +11,13 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; -import org.opensearch.ratelimitting.admissioncontrol.stats.BaseAdmissionControllerStats; -import org.opensearch.ratelimitting.admissioncontrol.stats.CPUBasedAdmissionControllerStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.threadpool.ThreadPool; import java.util.ArrayList; @@ -47,8 +45,14 @@ public class AdmissionControlService { * @param settings Immutable settings instance * @param clusterService ClusterService Instance * @param threadPool ThreadPool Instance + * @param resourceUsageCollectorService Instance used to get node resource usage stats */ - public AdmissionControlService(Settings settings, ClusterService clusterService, ThreadPool threadPool, ResourceUsageCollectorService resourceUsageCollectorService) { + public AdmissionControlService( + Settings settings, + ClusterService clusterService, + ThreadPool threadPool, + ResourceUsageCollectorService resourceUsageCollectorService + ) { this.threadPool = threadPool; this.admissionControlSettings = new AdmissionControlSettings(clusterService.getClusterSettings(), settings); this.ADMISSION_CONTROLLERS = new ConcurrentHashMap<>(); @@ -68,11 +72,13 @@ private void initialise() { /** * - * @param action transport action that is being executed. we are using it for logging while request is rejected - * @param admissionControlActionType type of the admissionControllerActionType + * @param action Transport action name + * @param admissionControlActionType admissionControllerActionType value */ public void applyTransportAdmissionControl(String action, AdmissionControlActionType admissionControlActionType) { - this.ADMISSION_CONTROLLERS.forEach((name, admissionController) -> { admissionController.apply(action, admissionControlActionType); }); + this.ADMISSION_CONTROLLERS.forEach( + (name, admissionController) -> { admissionController.apply(action, admissionControlActionType); } + ); } /** @@ -90,7 +96,12 @@ public void registerAdmissionController(String admissionControllerName) { private AdmissionController controllerFactory(String admissionControllerName) { switch (admissionControllerName) { case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionController(admissionControllerName, this.settings, this.clusterService, this.resourceUsageCollectorService); + return new CPUBasedAdmissionController( + admissionControllerName, + this.resourceUsageCollectorService, + this.clusterService, + this.settings + ); default: throw new IllegalArgumentException("Not Supported AdmissionController : " + admissionControllerName); } @@ -113,26 +124,18 @@ public AdmissionController getAdmissionController(String controllerName) { return this.ADMISSION_CONTROLLERS.getOrDefault(controllerName, null); } - public AdmissionControlStats stats(){ - List statsList = new ArrayList<>(); - if(this.ADMISSION_CONTROLLERS.size() > 0){ + /** + * Return admission control stats + */ + public AdmissionControlStats stats() { + List statsList = new ArrayList<>(); + if (this.ADMISSION_CONTROLLERS.size() > 0) { this.ADMISSION_CONTROLLERS.forEach((controllerName, admissionController) -> { - BaseAdmissionControllerStats admissionControllerStats = controllerStatsFactory(admissionController); - if(admissionControllerStats != null) { - statsList.add(admissionControllerStats); - } + AdmissionControllerStats admissionControllerStats = new AdmissionControllerStats(admissionController, controllerName); + statsList.add(admissionControllerStats); }); return new AdmissionControlStats(statsList); } return null; } - - private BaseAdmissionControllerStats controllerStatsFactory(AdmissionController admissionController) { - switch (admissionController.getName()) { - case CPU_BASED_ADMISSION_CONTROLLER: - return new CPUBasedAdmissionControllerStats(admissionController); - default: - return null; - } - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 794a70f7a7483..040ddc4a6baaa 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -11,7 +11,6 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.node.ResourceUsageCollectorService; -import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; @@ -33,11 +32,17 @@ public abstract class AdmissionController { public final ClusterService clusterService; /** - * @param rejectionCount initialised rejectionCount value for AdmissionController - * @param admissionControllerName name of the admissionController + * @param admissionControllerName name of the admissionController + * @param resourceUsageCollectorService instance used to get resource usage stats of the node + * @param rejectionCount initialised rejectionCount value for AdmissionController * @param clusterService */ - public AdmissionController(AtomicLong rejectionCount, String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, ClusterService clusterService) { + public AdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + AtomicLong rejectionCount, + ClusterService clusterService + ) { this.rejectionCount = rejectionCount; this.admissionControllerName = admissionControllerName; this.resourceUsageCollectorService = resourceUsageCollectorService; @@ -62,8 +67,7 @@ public Boolean isAdmissionControllerEnforced(AdmissionControlMode admissionContr } /** - * Increment the tracking-objects and apply the admission control if threshold is breached. - * Mostly applicable while applying admission controller + * Apply admission control based on the resource usage for an action */ public abstract void apply(String action, AdmissionControlActionType admissionControlActionType); @@ -74,9 +78,12 @@ public String getName() { return this.admissionControllerName; } + /** + * Add rejection count to the rejection count metric tracked by the admission controller + */ public void addRejectionCount(String admissionControlActionType, long count) { AtomicLong updatedCount = new AtomicLong(0); - if(this.rejectionCountMap.containsKey(admissionControlActionType)){ + if (this.rejectionCountMap.containsKey(admissionControlActionType)) { updatedCount.addAndGet(this.rejectionCountMap.get(admissionControlActionType).get()); } updatedCount.addAndGet(count); @@ -91,6 +98,9 @@ public long getRejectionCount(String admissionControlActionType) { return rejectionCount.get(); } + /** + * Get rejection stats of the admission controller + */ public Map getRejectionStats() { Map rejectionStats = new HashMap<>(); rejectionCountMap.forEach((actionType, count) -> rejectionStats.put(actionType, count.get())); diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java index 2514b1e83fd04..dd9c56a46b892 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -16,11 +16,8 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; -import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; @@ -33,56 +30,89 @@ public class CPUBasedAdmissionController extends AdmissionController { public CPUBasedAdmissionControllerSettings settings; /** - * - * @param admissionControllerName State of the admission controller + * @param admissionControllerName Name of the admission controller + * @param resourceUsageCollectorService Instance used to get node resource usage stats + * @param clusterService ClusterService Instance + * @param settings Immutable settings instance */ - public CPUBasedAdmissionController(String admissionControllerName, Settings settings, ClusterService clusterService, ResourceUsageCollectorService resourceUsageCollectorService) { - super(new AtomicLong(0), admissionControllerName, resourceUsageCollectorService, clusterService); + public CPUBasedAdmissionController( + String admissionControllerName, + ResourceUsageCollectorService resourceUsageCollectorService, + ClusterService clusterService, + Settings settings + ) { + super(admissionControllerName, resourceUsageCollectorService, new AtomicLong(0), clusterService); this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); } /** - * This function will take of applying admission controller based on CPU usage + * Apply admission control based on process CPU usage * @param action is the transport action */ @Override public void apply(String action, AdmissionControlActionType admissionControlActionType) { - // TODO Will extend this logic further currently just incrementing rejectionCount if (this.isEnabledForTransportLayer(this.settings.getTransportLayerAdmissionControllerMode())) { this.applyForTransportLayer(action, admissionControlActionType); } } + /** + * Apply transport layer admission control if configured limit has been reached + */ private void applyForTransportLayer(String actionName, AdmissionControlActionType admissionControlActionType) { - if (isLimitsBreached(admissionControlActionType)) { + if (isLimitsBreached(actionName, admissionControlActionType)) { this.addRejectionCount(admissionControlActionType.getType(), 1); if (this.isAdmissionControllerEnforced(this.settings.getTransportLayerAdmissionControllerMode())) { - throw new OpenSearchRejectedExecutionException("Action ["+ actionName +"] was rejected due to CPU usage admission controller limit breached"); + throw new OpenSearchRejectedExecutionException( + String.format("CPU usage admission controller limit reached for action [%s]", admissionControlActionType.name()) + ); } } } - private boolean isLimitsBreached(AdmissionControlActionType transportActionType) { - long maxCpuLimit = this.getCpuRejectionThreshold(transportActionType); - Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics(this.clusterService.state().nodes().getLocalNodeId()); - if(nodePerformanceStatistics.isPresent()) { - double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); - if (cpuUsage >= maxCpuLimit){ - LOGGER.warn("CpuBasedAdmissionController rejected the request as the current CPU usage [" + - cpuUsage + "%] exceeds the allowed limit [" + maxCpuLimit + "%]"); - return true; + /** + * Check if the configured resource usage limits are breached for the action + */ + private boolean isLimitsBreached(String actionName, AdmissionControlActionType admissionControlActionType) { + // check if cluster state is ready + if (clusterService.state() != null && clusterService.state().nodes() != null) { + long maxCpuLimit = this.getCpuRejectionThreshold(admissionControlActionType); + Optional nodePerformanceStatistics = this.resourceUsageCollectorService.getNodeStatistics( + this.clusterService.state().nodes().getLocalNodeId() + ); + if (nodePerformanceStatistics.isPresent()) { + double cpuUsage = nodePerformanceStatistics.get().getCpuUtilizationPercent(); + if (cpuUsage >= maxCpuLimit) { + LOGGER.warn( + "CpuBasedAdmissionController rejected the request as the current CPU " + + "usage [{}] exceeds the allowed limit [{}] for transport action [{}]", + cpuUsage, + maxCpuLimit, + actionName + ); + return true; + } } } return false; } - private long getCpuRejectionThreshold(AdmissionControlActionType transportActionType) { - switch (transportActionType) { + + /** + * Get CPU rejection threshold based on action type + */ + private long getCpuRejectionThreshold(AdmissionControlActionType admissionControlActionType) { + switch (admissionControlActionType) { case SEARCH: return this.settings.getSearchCPULimit(); case INDEXING: return this.settings.getIndexingCPULimit(); default: - throw new IllegalArgumentException("Not Supported TransportAction Type: " + transportActionType.getType()); + throw new IllegalArgumentException( + String.format( + "Admission control not Supported for AdmissionControlActionType: %s", + admissionControlActionType.getType() + ) + ); } } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java index 141e9b68db145..397fd485c6da3 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java @@ -14,9 +14,6 @@ import org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; -import java.util.Arrays; -import java.util.List; - /** * Settings related to cpu based admission controller. * @opensearch.internal @@ -28,15 +25,12 @@ public class CPUBasedAdmissionControllerSettings { * Default parameters for the CPUBasedAdmissionControllerSettings */ public static class Defaults { - public static final long CPU_USAGE = 95; - public static List TRANSPORT_LAYER_DEFAULT_URI_TYPE = Arrays.asList("indexing", "search"); + public static final long CPU_USAGE_LIMIT = 95; } private AdmissionControlMode transportLayerMode; private Long searchCPULimit; private Long indexingCPULimit; - - private final List transportActionsList; /** * Feature level setting to operate in shadow-mode or in enforced-mode. If enforced field is set * rejection will be performed, otherwise only rejection metrics will be populated. @@ -54,7 +48,7 @@ public static class Defaults { */ public static final Setting SEARCH_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.search.cpu_usage.limit", - Defaults.CPU_USAGE, + Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -64,7 +58,7 @@ public static class Defaults { */ public static final Setting INDEXING_CPU_USAGE_LIMIT = Setting.longSetting( "admission_control.indexing.cpu_usage.limit", - Defaults.CPU_USAGE, + Defaults.CPU_USAGE_LIMIT, Setting.Property.Dynamic, Setting.Property.NodeScope ); @@ -75,7 +69,6 @@ public CPUBasedAdmissionControllerSettings(ClusterSettings clusterSettings, Sett clusterSettings.addSettingsUpdateConsumer(CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE, this::setTransportLayerMode); this.searchCPULimit = SEARCH_CPU_USAGE_LIMIT.get(settings); this.indexingCPULimit = INDEXING_CPU_USAGE_LIMIT.get(settings); - this.transportActionsList = Defaults.TRANSPORT_LAYER_DEFAULT_URI_TYPE; clusterSettings.addSettingsUpdateConsumer(INDEXING_CPU_USAGE_LIMIT, this::setIndexingCPULimit); clusterSettings.addSettingsUpdateConsumer(SEARCH_CPU_USAGE_LIMIT, this::setSearchCPULimit); } @@ -103,8 +96,4 @@ public void setIndexingCPULimit(Long indexingCPULimit) { public void setSearchCPULimit(Long searchCPULimit) { this.searchCPULimit = searchCPULimit; } - - public List getTransportActionsList() { - return transportActionsList; - } } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java index 188feb77318e4..eab86c1fb3f2c 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -8,7 +8,6 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; -import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -18,15 +17,18 @@ import java.io.IOException; import java.util.List; +/** + * Class for admission control stats used as part of node stats + */ public class AdmissionControlStats implements ToXContentFragment, Writeable { - List admissionControllerStatsList; + List admissionControllerStatsList; /** * * @param admissionControllerStatsList list of admissionControllerStats */ - public AdmissionControlStats(List admissionControllerStatsList){ + public AdmissionControlStats(List admissionControllerStatsList) { this.admissionControllerStatsList = admissionControllerStatsList; } @@ -36,11 +38,7 @@ public AdmissionControlStats(List admissionControl * @throws IOException if an I/O error occurs */ public AdmissionControlStats(StreamInput in) throws IOException { - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.admissionControllerStatsList = in.readNamedWriteableList(BaseAdmissionControllerStats.class); - } else { - this.admissionControllerStatsList = null; - } + this.admissionControllerStatsList = in.readNamedWriteableList(AdmissionControllerStats.class); } /** @@ -50,9 +48,7 @@ public AdmissionControlStats(StreamInput in) throws IOException { */ @Override public void writeTo(StreamOutput out) throws IOException { - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { - out.writeList(this.admissionControllerStatsList); - } + out.writeList(this.admissionControllerStatsList); } /** diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java similarity index 71% rename from server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java rename to server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java index 7b4e4a9695509..2763b366f4b75 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/CPUBasedAdmissionControllerStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -8,35 +8,38 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; +import org.opensearch.core.common.io.stream.NamedWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; -import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; import java.io.IOException; import java.util.Map; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; -public class CPUBasedAdmissionControllerStats extends BaseAdmissionControllerStats { - - /** - * Returns the name of the writeable object - */ - @Override - public String getWriteableName() { - return CPU_BASED_ADMISSION_CONTROLLER; - } - +/** + * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type + */ +public class AdmissionControllerStats implements NamedWriteable, ToXContentFragment { public Map rejectionCount; + public String admissionControllerName; - public CPUBasedAdmissionControllerStats(AdmissionController admissionController){ + public AdmissionControllerStats(AdmissionController admissionController, String admissionControllerName) { this.rejectionCount = admissionController.getRejectionStats(); + this.admissionControllerName = admissionControllerName; } - public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { + public AdmissionControllerStats(StreamInput in) throws IOException { this.rejectionCount = in.readMap(StreamInput::readString, StreamInput::readLong); + this.admissionControllerName = in.readString(); + } + + @Override + public String getWriteableName() { + return admissionControllerName; } + /** * Write this into the {@linkplain StreamOutput}. * @@ -45,6 +48,7 @@ public CPUBasedAdmissionControllerStats(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeMap(this.rejectionCount, StreamOutput::writeString, StreamOutput::writeLong); + out.writeString(this.admissionControllerName); } /** diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java deleted file mode 100644 index 0ee1807bf80da..0000000000000 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/BaseAdmissionControllerStats.java +++ /dev/null @@ -1,15 +0,0 @@ -/* - * SPDX-License-Identifier: Apache-2.0 - * - * The OpenSearch Contributors require contributions made to - * this file be licensed under the Apache-2.0 license or a - * compatible open source license. - */ - -package org.opensearch.ratelimitting.admissioncontrol.stats; - -import org.opensearch.core.common.io.stream.NamedWriteable; -import org.opensearch.core.xcontent.ToXContentFragment; - -public abstract class BaseAdmissionControllerStats implements NamedWriteable, ToXContentFragment { -} diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java index dfe286d9b9537..6561a670f0794 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportHandler.java @@ -54,15 +54,17 @@ public AdmissionControlTransportHandler( */ @Override public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { - // intercept all the transport requests here and apply admission control - try { - // TODO Need to evaluate if we need to apply admission control or not if force Execution is true will update in next PR. - this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); - } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { - log.warn(openSearchRejectedExecutionException.getMessage()); - channel.sendResponse(openSearchRejectedExecutionException); - } catch (final Exception e) { - throw e; + // skip admission control if force execution is true + if (!this.forceExecution) { + // intercept the transport requests here and apply admission control + try { + this.admissionControlService.applyTransportAdmissionControl(this.action, this.admissionControlActionType); + } catch (final OpenSearchRejectedExecutionException openSearchRejectedExecutionException) { + log.warn(openSearchRejectedExecutionException.getMessage()); + channel.sendResponse(openSearchRejectedExecutionException); + } catch (final Exception e) { + throw e; + } } actualHandler.messageReceived(request, channel, task); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java index c725af821ac8f..ae1520bca769d 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/transport/AdmissionControlTransportInterceptor.java @@ -15,7 +15,7 @@ import org.opensearch.transport.TransportRequestHandler; /** - * This class allows throttling to intercept requests on both the sender and the receiver side. + * This class allows throttling by intercepting requests on both the sender and the receiver side. */ public class AdmissionControlTransportInterceptor implements TransportInterceptor { @@ -37,6 +37,12 @@ public TransportRequestHandler interceptHandler( TransportRequestHandler actualHandler, AdmissionControlActionType admissionControlActionType ) { - return new AdmissionControlTransportHandler<>(action, actualHandler, this.admissionControlService, forceExecution, admissionControlActionType); + return new AdmissionControlTransportHandler<>( + action, + actualHandler, + this.admissionControlService, + forceExecution, + admissionControlActionType + ); } } diff --git a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java index 12b0990a5d692..e8efbeb7de3f9 100644 --- a/server/src/main/java/org/opensearch/transport/TransportInterceptor.java +++ b/server/src/main/java/org/opensearch/transport/TransportInterceptor.java @@ -59,21 +59,14 @@ default TransportRequestHandler interceptHandler } /** - * - * @param action - * @param executor - * @param forceExecution - * @param actualHandler - * @param transportActionType - * @return - * @param + * This is called for handlers that needs admission control support */ default TransportRequestHandler interceptHandler( String action, String executor, boolean forceExecution, TransportRequestHandler actualHandler, - AdmissionControlActionType transportActionType + AdmissionControlActionType admissionControlActionType ) { return interceptHandler(action, executor, forceExecution, actualHandler); } diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index a55a20478aa3d..211564c6bb4ac 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -1243,13 +1243,14 @@ public void registerRequestHandler( } /** - * Registers a new request handler + * Registers a new request handler with admission control support * * @param action The action the request handler is associated with - * @param requestReader The request class that will be used to construct new instances for streaming * @param executor The executor the request handling will be executed on * @param forceExecution Force execution on the executor queue and never reject it - * @param transportActionType Check the request size and raise an exception in case the limit is breached. + * @param canTripCircuitBreaker Check the request size and raise an exception in case the limit is breached. + * @param admissionControlActionType Admission control based on resource usage limits of provided action type + * @param requestReader The request class that will be used to construct new instances for streaming * @param handler The handler itself that implements the request handling */ public void registerRequestHandler( @@ -1257,12 +1258,12 @@ public void registerRequestHandler( String executor, boolean forceExecution, boolean canTripCircuitBreaker, - AdmissionControlActionType transportActionType, + AdmissionControlActionType admissionControlActionType, Writeable.Reader requestReader, TransportRequestHandler handler ) { validateActionName(action); - handler = interceptor.interceptHandler(action, executor, forceExecution, handler, transportActionType); + handler = interceptor.interceptHandler(action, executor, forceExecution, handler, admissionControlActionType); RequestHandlerRegistry reg = new RequestHandlerRegistry<>( action, requestReader, diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index abd38a3cbf1fb..95caa8b1a6a22 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -109,12 +109,14 @@ public void testApplyAdmissionControllerDisabled() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); - admissionControllerList.forEach(admissionController -> { assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); }); + admissionControllerList.forEach(admissionController -> { + assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); + }); } public void testApplyAdmissionControllerEnabled() { this.action = "indices:data/write/bulk[s][p]"; - admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool,null); + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java index 2473b242f71b5..9d8fc967c5a82 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java @@ -45,9 +45,9 @@ public void tearDown() throws Exception { public void testCheckDefaultParameters() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, + null, clusterService, - null + Settings.EMPTY ); assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); @@ -60,9 +60,9 @@ public void testCheckDefaultParameters() { public void testCheckUpdateSettings() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, + null, clusterService, - null + Settings.EMPTY ); Settings settings = Settings.builder() .put( @@ -81,9 +81,9 @@ public void testCheckUpdateSettings() { public void testApplyControllerWithDefaultSettings() { admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - Settings.EMPTY, + null, clusterService, - null + Settings.EMPTY ); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); @@ -101,9 +101,9 @@ public void testApplyControllerWhenSettingsEnabled() { .build(); admissionController = new CPUBasedAdmissionController( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - settings, + null, clusterService, - null + settings ); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java index 419e9ea8d4827..3923048376d69 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -public class TransportActionTypeTests extends OpenSearchTestCase { +public class admissionControlActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java index 43103926a69a2..04777f3f6173f 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControlSettingsTests.java @@ -64,7 +64,6 @@ public void testDefaultSettings() { assertEquals(cpuBasedAdmissionControllerSettings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertEquals(cpuBasedAdmissionControllerSettings.getIndexingCPULimit().longValue(), percent); assertEquals(cpuBasedAdmissionControllerSettings.getSearchCPULimit().longValue(), percent); - assertEquals(cpuBasedAdmissionControllerSettings.getTransportActionsList(), Arrays.asList("indexing", "search")); } public void testGetConfiguredSettings() { From a08ac91f9ec70433c7ea3e7ed8daa97967942303 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Sun, 22 Oct 2023 14:12:39 +0530 Subject: [PATCH 2/2] Fixes for tests Signed-off-by: Bharathwaj G --- .../AdmissionControlService.java | 2 +- .../controllers/AdmissionController.java | 4 +- .../CPUBasedAdmissionController.java | 4 +- .../CPUBasedAdmissionControllerSettings.java | 1 - .../stats/AdmissionControlStats.java | 8 +- .../stats/AdmissionControllerStats.java | 11 +- .../cluster/node/stats/NodeStatsTests.java | 64 +++++- .../cluster/stats/ClusterStatsNodesTests.java | 9 +- .../opensearch/cluster/DiskUsageTests.java | 6 + .../AdmissionControlServiceTests.java | 25 ++- .../AdmissionControlSingleNodeTests.java | 203 ++++++++++++++++++ .../CPUBasedAdmissionControllerTests.java | 30 +-- ...a => AdmissionControlActionTypeTests.java} | 2 +- .../opensearch/test/InternalTestCluster.java | 1 + 14 files changed, 335 insertions(+), 35 deletions(-) create mode 100644 server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java rename server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/{TransportActionTypeTests.java => AdmissionControlActionTypeTests.java} (94%) diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java index 1683f8e381c58..c4f6ae431a10b 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlService.java @@ -25,7 +25,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER; +import static org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER; /** * Admission control Service that bootstraps and manages all the Admission Controllers in OpenSearch. diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java index 040ddc4a6baaa..aba2885b614b4 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/AdmissionController.java @@ -34,16 +34,14 @@ public abstract class AdmissionController { /** * @param admissionControllerName name of the admissionController * @param resourceUsageCollectorService instance used to get resource usage stats of the node - * @param rejectionCount initialised rejectionCount value for AdmissionController * @param clusterService */ public AdmissionController( String admissionControllerName, ResourceUsageCollectorService resourceUsageCollectorService, - AtomicLong rejectionCount, ClusterService clusterService ) { - this.rejectionCount = rejectionCount; + this.rejectionCount = new AtomicLong(0); this.admissionControllerName = admissionControllerName; this.resourceUsageCollectorService = resourceUsageCollectorService; this.clusterService = clusterService; diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java index dd9c56a46b892..d7730dc62eb92 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionController.java @@ -19,13 +19,13 @@ import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; import java.util.Optional; -import java.util.concurrent.atomic.AtomicLong; /** * Class for CPU Based Admission Controller in OpenSearch, which aims to provide CPU utilisation admission control. * It provides methods to apply admission control if configured limit has been reached */ public class CPUBasedAdmissionController extends AdmissionController { + public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; private static final Logger LOGGER = LogManager.getLogger(CPUBasedAdmissionController.class); public CPUBasedAdmissionControllerSettings settings; @@ -41,7 +41,7 @@ public CPUBasedAdmissionController( ClusterService clusterService, Settings settings ) { - super(admissionControllerName, resourceUsageCollectorService, new AtomicLong(0), clusterService); + super(admissionControllerName, resourceUsageCollectorService, clusterService); this.settings = new CPUBasedAdmissionControllerSettings(clusterService.getClusterSettings(), settings); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java index 397fd485c6da3..ed4d4094a3b75 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/settings/CPUBasedAdmissionControllerSettings.java @@ -19,7 +19,6 @@ * @opensearch.internal */ public class CPUBasedAdmissionControllerSettings { - public static final String CPU_BASED_ADMISSION_CONTROLLER = "global_cpu_usage"; /** * Default parameters for the CPUBasedAdmissionControllerSettings diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java index eab86c1fb3f2c..593eeb66b663c 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControlStats.java @@ -38,7 +38,7 @@ public AdmissionControlStats(List admissionControllerS * @throws IOException if an I/O error occurs */ public AdmissionControlStats(StreamInput in) throws IOException { - this.admissionControllerStatsList = in.readNamedWriteableList(AdmissionControllerStats.class); + this.admissionControllerStatsList = in.readList(AdmissionControllerStats::new); } /** @@ -51,6 +51,10 @@ public void writeTo(StreamOutput out) throws IOException { out.writeList(this.admissionControllerStatsList); } + public List getAdmissionControllerStatsList() { + return admissionControllerStatsList; + } + /** * @param builder * @param params @@ -62,7 +66,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.startObject("admission_control"); this.admissionControllerStatsList.forEach(stats -> { try { - builder.field(stats.getWriteableName(), stats); + builder.field(stats.getAdmissionControllerName(), stats); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java index 2763b366f4b75..45ae5ab8a41cb 100644 --- a/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java +++ b/server/src/main/java/org/opensearch/ratelimitting/admissioncontrol/stats/AdmissionControllerStats.java @@ -8,9 +8,9 @@ package org.opensearch.ratelimitting.admissioncontrol.stats; -import org.opensearch.core.common.io.stream.NamedWriteable; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; @@ -21,7 +21,7 @@ /** * Class for admission controller ( such as CPU ) stats which includes rejection count for each action type */ -public class AdmissionControllerStats implements NamedWriteable, ToXContentFragment { +public class AdmissionControllerStats implements Writeable, ToXContentFragment { public Map rejectionCount; public String admissionControllerName; @@ -35,11 +35,14 @@ public AdmissionControllerStats(StreamInput in) throws IOException { this.admissionControllerName = in.readString(); } - @Override - public String getWriteableName() { + public String getAdmissionControllerName() { return admissionControllerName; } + public Map getRejectionCount() { + return rejectionCount; + } + /** * Write this into the {@linkplain StreamOutput}. * diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index ebdd012006fb2..58392130d66fa 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -63,6 +63,11 @@ import org.opensearch.node.NodeResourceUsageStats; import org.opensearch.node.NodesResourceUsageStats; import org.opensearch.node.ResponseCollectorService; +import org.opensearch.ratelimitting.admissioncontrol.controllers.AdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.controllers.CPUBasedAdmissionController; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControlStats; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; import org.opensearch.script.ScriptCacheStats; import org.opensearch.script.ScriptStats; import org.opensearch.test.OpenSearchTestCase; @@ -516,15 +521,44 @@ public void testSerialization() throws IOException { assertEquals(replicationStats.getTotalBytesBehind(), deserializedReplicationStats.getTotalBytesBehind()); assertEquals(replicationStats.getMaxReplicationLag(), deserializedReplicationStats.getMaxReplicationLag()); } + AdmissionControlStats admissionControlStats = nodeStats.getAdmissionControlStats(); + AdmissionControlStats deserializedAdmissionControlStats = deserializedNodeStats.getAdmissionControlStats(); + if (admissionControlStats == null) { + assertNull(deserializedAdmissionControlStats); + } else { + assertEquals( + admissionControlStats.getAdmissionControllerStatsList().size(), + deserializedAdmissionControlStats.getAdmissionControllerStatsList().size() + ); + AdmissionControllerStats admissionControllerStats = admissionControlStats.getAdmissionControllerStatsList().get(0); + AdmissionControllerStats deserializedAdmissionControllerStats = deserializedAdmissionControlStats + .getAdmissionControllerStatsList() + .get(0); + assertEquals( + admissionControllerStats.getAdmissionControllerName(), + deserializedAdmissionControllerStats.getAdmissionControllerName() + ); + assertEquals(1, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType()) + ); + + assertEquals(2, (long) admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + assertEquals( + admissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()), + deserializedAdmissionControllerStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType()) + ); + } } } } - public static NodeStats createNodeStats() { + public static NodeStats createNodeStats() throws IOException { return createNodeStats(false); } - public static NodeStats createNodeStats(boolean remoteStoreStats) { + public static NodeStats createNodeStats(boolean remoteStoreStats) throws IOException { DiscoveryNode node = new DiscoveryNode( "test_node", buildNewFakeTransportAddress(), @@ -834,6 +868,29 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { clusterManagerThrottlingStats = new ClusterManagerThrottlingStats(); clusterManagerThrottlingStats.onThrottle("test-task", randomInt()); } + + AdmissionControlStats admissionControlStats = null; + if (frequently()) { + AdmissionController admissionController = new AdmissionController( + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + null, + null + ) { + @Override + public void apply(String action, AdmissionControlActionType admissionControlActionType) { + return; + } + }; + admissionController.addRejectionCount(AdmissionControlActionType.SEARCH.getType(), 1); + admissionController.addRejectionCount(AdmissionControlActionType.INDEXING.getType(), 2); + AdmissionControllerStats stats = new AdmissionControllerStats( + admissionController, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER + ); + List statsList = new ArrayList(); + statsList.add(stats); + admissionControlStats = new AdmissionControlStats(statsList); + } ScriptCacheStats scriptCacheStats = scriptStats != null ? scriptStats.toScriptCacheStats() : null; WeightedRoutingStats weightedRoutingStats = null; @@ -871,7 +928,8 @@ public static NodeStats createNodeStats(boolean remoteStoreStats) { null, null, segmentReplicationRejectionStats, - null + null, + admissionControlStats ); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java index a0c45f95ef7c0..40a30342b86b9 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/stats/ClusterStatsNodesTests.java @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.test.OpenSearchTestCase; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.Iterator; @@ -87,7 +88,13 @@ public void testNetworkTypesToXContent() throws Exception { } public void testIngestStats() throws Exception { - NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, NodeStatsTests::createNodeStats); + NodeStats nodeStats = randomValueOtherThanMany(n -> n.getIngestStats() == null, () -> { + try { + return NodeStatsTests.createNodeStats(); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); SortedMap processorStats = new TreeMap<>(); nodeStats.getIngestStats().getProcessorStats().values().forEach(stats -> { diff --git a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java index f037b75dc16a3..ff47ec3015697 100644 --- a/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/opensearch/cluster/DiskUsageTests.java @@ -193,6 +193,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -222,6 +223,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ), new NodeStats( @@ -251,6 +253,7 @@ public void testFillDiskUsage() { null, null, null, + null, null ) ); @@ -311,6 +314,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -340,6 +344,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ), new NodeStats( @@ -369,6 +374,7 @@ public void testFillDiskUsageSomeInvalidValues() { null, null, null, + null, null ) ); diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java index 95caa8b1a6a22..4b718c2c9815b 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlServiceTests.java @@ -68,7 +68,7 @@ public void testAdmissionControllerSettings() { List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); CPUBasedAdmissionController cpuBasedAdmissionController = (CPUBasedAdmissionController) admissionControlService - .getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + .getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals( admissionControlSettings.isTransportLayerAdmissionControlEnabled(), cpuBasedAdmissionController.isEnabledForTransportLayer( @@ -119,7 +119,7 @@ public void testApplyAdmissionControllerEnabled() { admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( - admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) + admissionControlService.getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0 ); @@ -131,13 +131,28 @@ public void testApplyAdmissionControllerEnabled() { ) .build(); clusterService.getClusterSettings().applySettings(settings); - admissionControlService.applyTransportAdmissionControl(this.action, null); List admissionControllerList = admissionControlService.getAdmissionControllers(); assertEquals(admissionControllerList.size(), 1); + } + + public void testApplyAdmissionControllerEnforced() { + this.action = "indices:data/write/bulk[s][p]"; + admissionControlService = new AdmissionControlService(Settings.EMPTY, clusterService, threadPool, null); + admissionControlService.applyTransportAdmissionControl(this.action, null); assertEquals( - admissionControlService.getAdmissionController(CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER) + admissionControlService.getAdmissionController(CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER) .getRejectionCount(AdmissionControlActionType.INDEXING.getType()), - 1 + 0 ); + + Settings settings = Settings.builder() + .put( + CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), + AdmissionControlMode.MONITOR.getMode() + ) + .build(); + clusterService.getClusterSettings().applySettings(settings); + List admissionControllerList = admissionControlService.getAdmissionControllers(); + assertEquals(admissionControllerList.size(), 1); } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java new file mode 100644 index 0000000000000..ddba42b158b99 --- /dev/null +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/AdmissionControlSingleNodeTests.java @@ -0,0 +1,203 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.ratelimitting.admissioncontrol; + +import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; +import org.opensearch.action.admin.cluster.state.ClusterStateResponse; +import org.opensearch.action.bulk.BulkRequestBuilder; +import org.opensearch.action.bulk.BulkResponse; +import org.opensearch.action.search.SearchPhaseExecutionException; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.common.action.ActionFuture; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.node.ResourceUsageCollectorService; +import org.opensearch.node.resource.tracker.ResourceTrackerSettings; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; +import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; +import org.opensearch.ratelimitting.admissioncontrol.stats.AdmissionControllerStats; +import org.opensearch.test.OpenSearchSingleNodeTestCase; +import org.junit.After; + +import static org.opensearch.ratelimitting.admissioncontrol.AdmissionControlSettings.ADMISSION_CONTROL_TRANSPORT_LAYER_MODE; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.INDEXING_CPU_USAGE_LIMIT; +import static org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings.SEARCH_CPU_USAGE_LIMIT; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.hamcrest.Matchers.is; + +/** + * Single node integration tests for admission control + */ +public class AdmissionControlSingleNodeTests extends OpenSearchSingleNodeTestCase { + + @Override + protected boolean resetNodeAfterTest() { + return true; + } + + @After + public void cleanup() { + assertAcked( + client().admin() + .cluster() + .prepareUpdateSettings() + .setPersistentSettings(Settings.builder().putNull("*")) + .setTransientSettings(Settings.builder().putNull("*")) + ); + } + + @Override + protected Settings nodeSettings() { + return Settings.builder() + .put(super.nodeSettings()) + .put(ResourceTrackerSettings.GLOBAL_CPU_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ResourceTrackerSettings.GLOBAL_JVM_USAGE_AC_WINDOW_DURATION_SETTING.getKey(), TimeValue.timeValueMillis(500)) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 0) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 0) + .build(); + } + + public void testAdmissionControlRejectionEnforcedMode() throws Exception { + ensureGreen(); + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Thread.sleep(700); + client().admin().indices().prepareCreate("index").execute().actionGet(); + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + // verify bulk request hits 429 + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertEquals(429, res.getItems()[0].getFailure().getStatus().getStatus()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request hits 429 + SearchRequest searchRequest = new SearchRequest("index"); + try { + client().search(searchRequest).actionGet(); + } catch (Exception e) { + assertTrue(((SearchPhaseExecutionException) e).getDetailedMessage().contains("OpenSearchRejectedExecutionException")); + } + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionMonitorOnlyMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.MONITOR.getMode()) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success but admission control having rejections stats + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.INDEXING.getType())); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success but admission control having rejections stats + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(1, (long) acStats.getRejectionCount().get(AdmissionControlActionType.SEARCH.getType())); + } + + public void testAdmissionControlRejectionDisabledMode() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder().put(super.nodeSettings()).put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.DISABLED) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + + } + + public void testAdmissionControlWithinLimits() throws Exception { + assertBusy(() -> assertEquals(1, getInstanceFromNode(ResourceUsageCollectorService.class).getAllNodeStatistics().size())); + // Verify that cluster state is updated + ActionFuture future2 = client().admin().cluster().state(new ClusterStateRequest()); + assertThat(future2.isDone(), is(true)); + + ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest(); + updateSettingsRequest.transientSettings( + Settings.builder() + .put(super.nodeSettings()) + .put(ADMISSION_CONTROL_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED) + .put(SEARCH_CPU_USAGE_LIMIT.getKey(), 101) + .put(INDEXING_CPU_USAGE_LIMIT.getKey(), 101) + ); + assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet()); + + BulkRequestBuilder bulk = client().prepareBulk(); + for (int i = 0; i < 3; i++) { + bulk.add(client().prepareIndex("index").setSource("foo", "bar " + i)); + } + // verify bulk request success and no rejections + BulkResponse res = client().bulk(bulk.request()).actionGet(); + assertFalse(res.hasFailures()); + AdmissionControlService admissionControlService = getInstanceFromNode(AdmissionControlService.class); + AdmissionControllerStats acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + client().admin().indices().prepareRefresh("index").get(); + + // verify search request success and no rejections + SearchRequest searchRequest = new SearchRequest("index"); + SearchResponse searchResponse = client().search(searchRequest).actionGet(); + assertEquals(3, searchResponse.getHits().getHits().length); + acStats = admissionControlService.stats().getAdmissionControllerStatsList().get(0); + assertEquals(0, acStats.getRejectionCount().size()); + } +} diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java index 9d8fc967c5a82..76b7738ca1f18 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/controllers/CPUBasedAdmissionControllerTests.java @@ -11,6 +11,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.node.ResourceUsageCollectorService; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlActionType; import org.opensearch.ratelimitting.admissioncontrol.enums.AdmissionControlMode; import org.opensearch.ratelimitting.admissioncontrol.settings.CPUBasedAdmissionControllerSettings; @@ -18,6 +19,8 @@ import org.opensearch.threadpool.TestThreadPool; import org.opensearch.threadpool.ThreadPool; +import org.mockito.Mockito; + public class CPUBasedAdmissionControllerTests extends OpenSearchTestCase { private ClusterService clusterService; private ThreadPool threadPool; @@ -44,12 +47,12 @@ public void tearDown() throws Exception { public void testCheckDefaultParameters() { admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, null, clusterService, Settings.EMPTY ); - assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getName(), CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.DISABLED); assertFalse( @@ -59,7 +62,7 @@ public void testCheckDefaultParameters() { public void testCheckUpdateSettings() { admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, null, clusterService, Settings.EMPTY @@ -72,16 +75,17 @@ public void testCheckUpdateSettings() { .build(); clusterService.getClusterSettings().applySettings(settings); - assertEquals(admissionController.getName(), CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER); + assertEquals(admissionController.getName(), CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); assertEquals(admissionController.settings.getTransportLayerAdmissionControllerMode(), AdmissionControlMode.ENFORCED); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); } public void testApplyControllerWithDefaultSettings() { + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - null, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, clusterService, Settings.EMPTY ); @@ -92,23 +96,25 @@ public void testApplyControllerWithDefaultSettings() { assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); } - public void testApplyControllerWhenSettingsEnabled() { + public void testApplyControllerWhenSettingsEnabled() throws Exception { Settings settings = Settings.builder() .put( CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER_TRANSPORT_LAYER_MODE.getKey(), AdmissionControlMode.ENFORCED.getMode() ) .build(); + ResourceUsageCollectorService rs = Mockito.mock(ResourceUsageCollectorService.class); admissionController = new CPUBasedAdmissionController( - CPUBasedAdmissionControllerSettings.CPU_BASED_ADMISSION_CONTROLLER, - null, + CPUBasedAdmissionController.CPU_BASED_ADMISSION_CONTROLLER, + rs, clusterService, settings ); assertTrue(admissionController.isEnabledForTransportLayer(admissionController.settings.getTransportLayerAdmissionControllerMode())); + assertTrue( + admissionController.isAdmissionControllerEnforced(admissionController.settings.getTransportLayerAdmissionControllerMode()) + ); assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 0); - action = "indices:data/write/bulk[s][p]"; - admissionController.apply(action, AdmissionControlActionType.INDEXING); - assertEquals(admissionController.getRejectionCount(AdmissionControlActionType.INDEXING.getType()), 1); + // we can assert admission control and rejections as part of ITs } } diff --git a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java similarity index 94% rename from server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java rename to server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java index 3923048376d69..15a25e6cbca1c 100644 --- a/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/TransportActionTypeTests.java +++ b/server/src/test/java/org/opensearch/ratelimitting/admissioncontrol/enums/AdmissionControlActionTypeTests.java @@ -10,7 +10,7 @@ import org.opensearch.test.OpenSearchTestCase; -public class admissionControlActionTypeTests extends OpenSearchTestCase { +public class AdmissionControlActionTypeTests extends OpenSearchTestCase { public void testValidActionType() { assertEquals(AdmissionControlActionType.SEARCH.getType(), "search"); diff --git a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java index 63d8f069bebea..e0cdf23ddf153 100644 --- a/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/opensearch/test/InternalTestCluster.java @@ -2723,6 +2723,7 @@ public void ensureEstimatedStats() { false, false, false, + false, false ); assertThat(