diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index ed407550b8b2f..85878dc41f77d 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -627,7 +627,16 @@ protected void doRun() { shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener() { @Override public void onResponse(BulkShardResponse bulkShardResponse) { + threadPool.getThreadContext().getResponseHeaders(); + if(threadPool.getThreadContext().getTransient("PERF_STATS") != null) { +// Map nodePerfStats = (Map) threadPool.getThreadContext().getTransient("PERF_STATS"); +// for (NodePerfStats perfStats : nodePerfStats.values()) { +// logger.info("Response " + +// "CPU : {} , MEM : {} , IO : {}", perfStats.cpuPercentAvg, perfStats.memoryPercentAvg, perfStats.ioPercentAvg); +// } + } for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { + clusterService.state().getRoutingTable().shardRoutingTable(bulkShardResponse.getShardId()); // we may have no response if item failed if (bulkItemResponse.getResponse() != null) { bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index cbb30714ee8e1..5098c5380681d 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -56,6 +56,7 @@ import org.opensearch.action.update.UpdateHelper; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; +import org.opensearch.admissioncontroller.NodePerfStats; import 
org.opensearch.client.transport.NoNodeAvailableException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; @@ -77,6 +78,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.lease.Releasable; @@ -108,9 +110,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; @@ -428,7 +428,7 @@ public void onClusterServiceClose() { public void onTimeout(TimeValue timeout) { mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update")); } - }), listener, threadPool, executor(primary)); + }), listener, threadPool, executor(primary), clusterService.localNode().getId()); } @Override @@ -453,7 +453,8 @@ public static void performOnPrimary( Consumer> waitForMappingUpdate, ActionListener> listener, ThreadPool threadPool, - String executorName + String executorName, + String nodeId ) { new ActionRunnable>(listener) { @@ -516,6 +517,24 @@ public boolean isForceExecution() { } private void finishRequest() { + Map nodePerfStatsMap = new HashMap(); + NodePerfStats nodePerfStats = new NodePerfStats(0.95, 0.95,0.95); + nodePerfStatsMap.put(nodeId, nodePerfStats); + ThreadContext threadContext = threadPool.getThreadContext(); + threadContext.addResponseHeader("PERF_STATS", String.valueOf(nodePerfStats.cpuPercentAvg) + "-" + + String.valueOf(nodePerfStats.memoryPercentAvg) + "-" + String.valueOf(nodePerfStats.ioPercentAvg)); +// Map np = new HashMap<>(); +// 
if(threadContext.getTransient("PERF_STATS") != null ) { +// np = threadContext.getTransient("PERF_STATS"); +// } +// np.put(nodeId, nodePerfStats); + //ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS")); + //ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS")); + // threadContext.putTransient("PERF_STATS", nodePerfStats); + // threadContext.putHeader("PERF_STATS", nodePerfStatsMap); +// ThreadContext.StoredContext storedContext1 = threadContext.newStoredContext(true, Collections.singletonList("T_ID")); +// threadContext.putTransient("T_ID", "nodePerfStats"); + ActionListener.completeWith( listener, () -> new WritePrimaryResult<>( @@ -527,6 +546,8 @@ private void finishRequest() { logger ) ); + //storedContext.close(); +// storedContext1.close(); } }.run(); } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 7fa9e43bee21d..d96efc1df3b83 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -248,7 +248,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener< } shardsIt = clusterService.operationRouting() - .searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null); + .searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null, null); } public void start() { diff --git a/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java b/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java index 7082e33dfd5c5..1dfe1121a7e33 100644 --- 
a/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java +++ b/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java @@ -33,6 +33,7 @@ package org.opensearch.action.search; import org.opensearch.action.ActionListener; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.node.ResponseCollectorService; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.fetch.QueryFetchSearchResult; @@ -91,9 +92,10 @@ public void onResponse(SearchPhaseResult response) { final long serviceTimeEWMA = queryResult.serviceTimeEWMA(); final int queueSize = queryResult.nodeQueueSize(); final long responseDuration = System.nanoTime() - startNanos; + final NodePerfStats nodePerfStats = queryResult.getNodePerfStats(); // EWMA/queue size may be -1 if the query node doesn't support capturing it if (serviceTimeEWMA > 0 && queueSize >= 0) { - collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); + collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA, nodePerfStats); } } listener.onResponse(response); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 69f529fe1d00c..7aac82c62196a 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -960,7 +960,8 @@ private void executeSearch( routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), - nodeSearchCounts + nodeSearchCounts, + searchService.getAdmissionControllerService() ); localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false) .map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) diff --git 
a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 3feb5b23e5540..a3ed6ce9f52d5 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -411,9 +411,8 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close ); - - try { - new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run(); + // here + try {new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run(); } catch (RuntimeException e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java new file mode 100644 index 0000000000000..5d201ad71ef3d --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java @@ -0,0 +1,37 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */
+
+package org.opensearch.admissioncontroller;
+
+import org.opensearch.common.io.stream.NamedWriteableRegistry;
+import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.monitor.fs.FsService;
+import org.opensearch.plugins.NetworkPlugin;
+import org.opensearch.plugins.Plugin;
+import org.opensearch.transport.TransportInterceptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Network plugin that registers {@link AdmissionControllerTransportInterceptor}; no ThreadPool is handed to it.
+ */
+public class AdmissionControllerPlugin extends Plugin implements NetworkPlugin {
+
+    public AdmissionControllerService admissionControllerService;
+    public AdmissionControllerPlugin(AdmissionControllerService admissionControllerService) {
+        this.admissionControllerService = admissionControllerService;
+    }
+    @Override
+    public List<TransportInterceptor> getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
+        final List<TransportInterceptor> interceptors = new ArrayList<>(1);
+        interceptors.add(new AdmissionControllerTransportInterceptor(null, this.admissionControllerService));
+        return interceptors;
+    }
+}
diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java
new file mode 100644
index 0000000000000..0b4b7bba48f72
--- /dev/null
+++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java
@@ -0,0 +1,74 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.admissioncontroller;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.util.concurrent.ThreadContext;
+import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
+import org.opensearch.tasks.Task;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportChannel;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportRequestHandler;
+
+/**
+ * Transport request handler decorator that consults {@link AdmissionControllerService}
+ * before delegating to the wrapped handler.
+ *
+ * @param <T> type of the transport request being handled
+ */
+public class AdmissionControllerRequestHandler<T extends TransportRequest> implements TransportRequestHandler<T> {
+    private final String action;
+    private final TransportRequestHandler<T> actualHandler;
+    private final ThreadPool threadPool;
+    protected final Logger log = LogManager.getLogger(this.getClass());
+    public AdmissionControllerService admissionControllerService;
+
+    public AdmissionControllerRequestHandler(String action, TransportRequestHandler<T> actualHandler,
+        ThreadPool threadPool, AdmissionControllerService admissionControllerService) {
+        super();
+        this.action = action;
+        this.actualHandler = actualHandler;
+        this.threadPool = threadPool;
+        this.admissionControllerService = admissionControllerService;
+    }
+
+    /** Returns the thread context of the configured thread pool, or {@code null} when no pool was supplied. */
+    protected ThreadContext getThreadContext() {
+        return threadPool == null ? null : threadPool.getThreadContext();
+    }
+
+    /** Returns true when the action name starts with {@code indices:data/read/search}. */
+    public boolean isSearchRequest() {
+        return this.action.startsWith("indices:data/read/search");
+    }
+
+    public boolean isIndexRequest() {
+        return this.action.startsWith("indices:data/write");
+    }
+
+    /**
+     * Checks the IO stress signal and then always forwards the request to the wrapped
+     * handler; no rejection is performed, a stressed node only produces a log entry.
+     */
+    @Override
+    public void messageReceived(T request, TransportChannel channel, Task task) throws Exception {
+        if (this.admissionControllerService.isIOInStress()) {
+            // Logged at debug: this runs for every intercepted request while IO is stressed.
+            log.debug("Admission controller service responded with IO is in stress state");
+        }
+        this.messageReceivedDecorate(request, actualHandler, channel, task);
+    }
+
+    /** Forwards the request to {@code actualHandler} unchanged. */
+    protected void messageReceivedDecorate(final T request, final TransportRequestHandler<T> actualHandler, final TransportChannel transportChannel, Task task) throws Exception {
+        actualHandler.messageReceived(request, transportChannel, task);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java
new file mode 100644
index 0000000000000..ab4ed550d9f2c
--- /dev/null
+++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java
@@ -0,0 +1,202 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.admissioncontroller;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.common.ExponentiallyWeightedMovingAverage;
+import org.opensearch.common.component.AbstractLifecycleComponent;
+import org.opensearch.common.settings.ClusterSettings;
+import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.unit.TimeValue;
+import org.opensearch.env.NodeEnvironment;
+import org.opensearch.monitor.fs.FsInfo;
+import org.opensearch.monitor.fs.FsService;
+import org.opensearch.monitor.jvm.JvmStats;
+import org.opensearch.monitor.process.ProcessProbe;
+import org.opensearch.threadpool.Scheduler;
+import org.opensearch.threadpool.ThreadPool;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Periodically samples process CPU, JVM heap and per-device disk IO usage, keeping an exponentially
+ * weighted moving average of each plus a count of recent IO samples that exceeded the usage limit.
+ */
+public class AdmissionControllerService extends AbstractLifecycleComponent {
+
+    private static final Logger logger = LogManager.getLogger(AdmissionControllerService.class);
+    /** Number of recent IO usage samples retained per device. */
+    private static final int IO_SAMPLE_WINDOW = 10;
+    /** IO usage percentage above which a sample counts as a breach. */
+    private static final double IO_MAX_USAGE = 30;
+    /** Breach count at which {@link #isIOInStress()} starts returning true. */
+    private static final double IO_THRESHOLD_WINDOW = 4;
+
+    public static final Setting<TimeValue> REFRESH_INTERVAL_SETTING = Setting.timeSetting(
+        "admissioncontroller.io.monitor.refresh_interval",
+        TimeValue.timeValueSeconds(10),
+        TimeValue.timeValueMillis(1),
+        Setting.Property.NodeScope
+    );
+
+    private final ThreadPool threadPool;
+    private final TimeValue refreshInterval;
+    private volatile Scheduler.Cancellable scheduledFuture;
+    private final FsService fsService;
+    /** Counters seen at the previous sample, keyed by device name. */
+    private Map<String, DevicePreiousStats> previousIOTimeMap;
+    /** Most recent IO usage percentages, keyed by device name. */
+    private final Map<String, Queue<Double>> deviceIOUsage;
+    private final AtomicInteger ioLimitBreachedCount;
+    double EWMA_ALPHA = 0.3;
+    private final ExponentiallyWeightedMovingAverage cpuExecutionEWMA;
+    private final ExponentiallyWeightedMovingAverage memoryExecutionEWMA;
+    private final ExponentiallyWeightedMovingAverage ioExecutionEWMA;
+
+    public AdmissionControllerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv, FsService fsService) {
+        this.threadPool = threadPool;
+        this.fsService = fsService;
+        this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings);
+        this.previousIOTimeMap = new HashMap<>();
+        this.deviceIOUsage = new HashMap<>();
+        this.ioLimitBreachedCount = new AtomicInteger(0);
+        this.cpuExecutionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
+        this.memoryExecutionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
+        this.ioExecutionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
+    }
+
+    @Override
+    protected void doStart() {
+        this.scheduledFuture = threadPool.scheduleWithFixedDelay(new IOMonitor(this.fsService), refreshInterval, ThreadPool.Names.GENERIC);
+    }
+
+    @Override
+    protected void doStop() {
+        // Nothing is scheduled until doStart() has run, so tolerate a missing handle.
+        final Scheduler.Cancellable scheduled = this.scheduledFuture;
+        if (scheduled != null) {
+            scheduled.cancel();
+        }
+    }
+
+    @Override
+    protected void doClose() throws IOException {}
+
+    /** Returns whether the breach counter has reached {@code IO_THRESHOLD_WINDOW}. */
+    public boolean isIOInStress() {
+        return this.ioLimitBreachedCount.get() >= IO_THRESHOLD_WINDOW;
+    }
+
+    public double getCPUEWMA() {
+        return cpuExecutionEWMA.getAverage();
+    }
+
+    public double getMemoryEWMA() {
+        return memoryExecutionEWMA.getAverage();
+    }
+
+    public double getIoEWMA() {
+        return ioExecutionEWMA.getAverage();
+    }
+
+    /** Counters captured for one device at the previous sample. */
+    class DevicePreiousStats {
+        public long ioTime;
+        public double readTime;
+        public double writeTime;
+        public double readOps;
+        public double writeOps;
+        public DevicePreiousStats(long ioTime, double readTime, double writeTime, double readOps, double writeOps) {
+            this.ioTime = ioTime;
+            this.readTime = readTime;
+            this.writeTime = writeTime;
+            this.readOps = readOps;
+            this.writeOps = writeOps;
+        }
+    }
+
+    /** Scheduled task that refreshes the IO, CPU and memory measurements. */
+    class IOMonitor implements Runnable {
+        private final FsService fsService;
+        IOMonitor(FsService fsService) {
+            this.fsService = fsService;
+        }
+
+        @Override
+        public void run() {
+            try {
+                monitorIOUtilisation();
+                monitorCpuUtilisation();
+                monitorMemoryUtilisation();
+            } catch (Exception e) {
+                // The failure stays contained in this run, but its cause is kept in the log.
+                logger.error("Exception on the getting IO utilisation", e);
+            }
+        }
+
+        private void monitorCpuUtilisation() {
+            cpuExecutionEWMA.addValue(ProcessProbe.getInstance().getProcessCpuPercent() / 100.0);
+        }
+
+        private void monitorMemoryUtilisation() {
+            memoryExecutionEWMA.addValue(JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0);
+        }
+
+        private void monitorIOUtilisation() {
+            final FsInfo.IoStats ioStats = this.fsService.stats().getIoStats();
+            if (ioStats == null) {
+                return; // no device statistics to sample; CPU and memory are still refreshed by run()
+            }
+            final Map<String, DevicePreiousStats> currentIOTimeMap = new HashMap<>();
+            for (FsInfo.DeviceStats devicesStat : ioStats.getDevicesStats()) {
+                final String device = devicesStat.getDeviceName();
+                final DevicePreiousStats previous = previousIOTimeMap.get(device);
+                if (previous != null) {
+                    recordIOUsage(device, devicesStat.getCurrentIOTime() - previous.ioTime);
+                    final double readOps = devicesStat.currentReadOperations() - previous.readOps;
+                    final double writeOps = devicesStat.currentWriteOpetations() - previous.writeOps;
+                    final double readTime = devicesStat.getCurrentReadTime() - previous.readTime;
+                    // Subtract like from like: the stored value comes from getCurrentWriteTime().
+                    final double writeTime = devicesStat.getCurrentWriteTime() - previous.writeTime;
+                    logger.debug("read ops : {} , writeops : {} , readtime: {} , writetime: {}", readOps, writeOps, readTime, writeTime);
+                    // Latency is time per operation; an interval without operations reports 0.
+                    final double readLatency = readOps > 0 ? readTime / readOps : 0.0;
+                    final double writeLatency = writeOps > 0 ? writeTime / writeOps : 0.0;
+                    logger.debug("Read latency final : {} write latency final : {}", readLatency, writeLatency);
+                }
+                currentIOTimeMap.put(device, new DevicePreiousStats(devicesStat.getCurrentIOTime(), devicesStat.getCurrentReadTime(),
+                    devicesStat.getCurrentWriteTime(), devicesStat.currentReadOperations(), devicesStat.currentWriteOpetations()));
+            }
+            previousIOTimeMap = currentIOTimeMap;
+        }
+
+        /** Folds one interval's IO time for {@code device} into the EWMA, the sample window and the breach counter. */
+        private void recordIOUsage(String device, long ioSpentTime) {
+            // Percentage of the configured interval; assumes IO time is in milliseconds, as the former 10 * 1000 divisor did.
+            final double ioUsePercent = (ioSpentTime * 100.0) / refreshInterval.millis();
+            ioExecutionEWMA.addValue(ioUsePercent / 100.0);
+            final Queue<Double> ioUsageQueue = deviceIOUsage.computeIfAbsent(device, key -> new LinkedList<>());
+            if (ioUsageQueue.size() == IO_SAMPLE_WINDOW && ioUsageQueue.remove() > IO_MAX_USAGE) {
+                ioLimitBreachedCount.decrementAndGet();
+            }
+            ioUsageQueue.add(ioUsePercent);
+            if (ioUsePercent > IO_MAX_USAGE) {
+                ioLimitBreachedCount.incrementAndGet();
+            }
+        }
+    }
+}
+
diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java
new file mode 100644
index 0000000000000..7650fd779b7c5
--- /dev/null
+++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java
@@ -0,0 +1,38 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the
Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.admissioncontroller;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.opensearch.monitor.fs.FsService;
+import org.opensearch.threadpool.ThreadPool;
+import org.opensearch.transport.TransportInterceptor;
+import org.opensearch.transport.TransportRequest;
+import org.opensearch.transport.TransportRequestHandler;
+
+/**
+ * Transport interceptor that wraps each intercepted handler in an {@link AdmissionControllerRequestHandler}.
+ */
+public class AdmissionControllerTransportInterceptor implements TransportInterceptor {
+
+    protected final Logger log = LogManager.getLogger(this.getClass());
+    protected final ThreadPool threadPool;
+    public AdmissionControllerService admissionControllerService;
+
+    public AdmissionControllerTransportInterceptor(final ThreadPool threadPool, AdmissionControllerService admissionControllerService) {
+        this.threadPool = threadPool;
+        this.admissionControllerService = admissionControllerService;
+    }
+
+    @Override
+    public <T extends TransportRequest> TransportRequestHandler<T> interceptHandler(String action, String executor, boolean forceExecution,
+        TransportRequestHandler<T> actualHandler) {
+        return new AdmissionControllerRequestHandler<>(action, actualHandler, threadPool, admissionControllerService);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java b/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java
new file mode 100644
index 0000000000000..e886ba0c9f841
--- /dev/null
+++ b/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java
@@ -0,0 +1,42 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.admissioncontroller;
+
+import org.opensearch.common.io.stream.StreamInput;
+import org.opensearch.common.io.stream.StreamOutput;
+import org.opensearch.common.io.stream.Writeable;
+
+import java.io.IOException;
+
+/** CPU, memory and IO usage averages for a node, serialized as three consecutive doubles. */
+public class NodePerfStats implements Writeable {
+    public double cpuPercentAvg;
+    public double memoryPercentAvg;
+    public double ioPercentAvg;
+
+    /** Reads the three values in the order {@link #writeTo(StreamOutput)} writes them: CPU, memory, IO. */
+    public NodePerfStats(StreamInput in) throws IOException {
+        this.cpuPercentAvg = in.readDouble();
+        this.memoryPercentAvg = in.readDouble();
+        this.ioPercentAvg = in.readDouble();
+    }
+
+    public NodePerfStats(double cpuPercentAvg, double memoryPercentAvg, double ioPercentAvg) {
+        this.cpuPercentAvg = cpuPercentAvg;
+        this.memoryPercentAvg = memoryPercentAvg;
+        this.ioPercentAvg = ioPercentAvg;
+    }
+
+    @Override
+    public void writeTo(StreamOutput out) throws IOException {
+        out.writeDouble(cpuPercentAvg);
+        out.writeDouble(memoryPercentAvg);
+        out.writeDouble(ioPercentAvg);
+    }
+}
diff --git a/server/src/main/java/org/opensearch/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/admissioncontroller/package-info.java
new file mode 100644
index 0000000000000..22a1253f72370
--- /dev/null
+++ b/server/src/main/java/org/opensearch/admissioncontroller/package-info.java
@@ -0,0 +1,12 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */ + +/** + * package + */ +package org.opensearch.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 7befec56abaa6..8fd3ead06cbf0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -34,6 +34,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.admissioncontroller.AdmissionControllerService; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -292,8 +294,9 @@ public ShardIterator activeInitializingShardsIt(int seed) { */ public ShardIterator activeInitializingShardsRankedIt( @Nullable ResponseCollectorService collector, - @Nullable Map nodeSearchCounts - ) { + @Nullable Map nodeSearchCounts, + @Nullable AdmissionControllerService admissionControllerService + ) { final int seed = shuffler.nextSeed(); if (allInitializingShards.isEmpty()) { return new PlainShardIterator( @@ -483,7 +486,17 @@ private static void adjustStats( final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; - collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); + // revisit this - basically reset stats based on time + logger.info("Stats before adjust CPU : {}, MEM : {}, IO : {}", + stats.nodePerfStats.cpuPercentAvg, stats.nodePerfStats.memoryPercentAvg, stats.nodePerfStats.ioPercentAvg); + final double cpuPercentAvg = stats.nodePerfStats.cpuPercentAvg * 0.99; + final double 
memPercentAvg = stats.nodePerfStats.memoryPercentAvg * 0.99; + final double ioPercentAvg = stats.nodePerfStats.ioPercentAvg * 0.99; + logger.info("Stats after adjust CPU : {}, MEM : {}, IO : {}", + cpuPercentAvg, memPercentAvg, ioPercentAvg); + + NodePerfStats nodePerfStats = new NodePerfStats(cpuPercentAvg, memPercentAvg, ioPercentAvg); + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService, nodePerfStats); } } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index ade2cda797334..6cb412dbe3c0f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.routing; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; @@ -202,7 +203,8 @@ public ShardIterator getShards( preference, null, null, - clusterState.getMetadata().weightedRoutingMetadata() + clusterState.getMetadata().weightedRoutingMetadata(), + null ); } @@ -215,7 +217,8 @@ public ShardIterator getShards(ClusterState clusterState, String index, int shar preference, null, null, - clusterState.metadata().weightedRoutingMetadata() + clusterState.metadata().weightedRoutingMetadata(), + null ); } @@ -225,7 +228,7 @@ public GroupShardsIterator searchShards( @Nullable Map> routing, @Nullable String preference ) { - return searchShards(clusterState, concreteIndices, routing, preference, null, null); + return searchShards(clusterState, concreteIndices, routing, preference, null, null, null); } public GroupShardsIterator searchShards( @@ -236,6 +239,17 @@ public GroupShardsIterator searchShards( @Nullable ResponseCollectorService collectorService, 
@Nullable Map nodeCounts ) { + return searchShards(clusterState, concreteIndices, routing, preference, collectorService, nodeCounts, null); + } + public GroupShardsIterator searchShards( + ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts, + @Nullable AdmissionControllerService admissionControllerService + ) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { @@ -253,7 +267,8 @@ public GroupShardsIterator searchShards( preference, collectorService, nodeCounts, - clusterState.metadata().weightedRoutingMetadata() + clusterState.metadata().weightedRoutingMetadata(), + admissionControllerService ); if (iterator != null) { set.add(iterator); @@ -276,8 +291,10 @@ private Set computeTargetedShards( ) { routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map final Set set = new HashSet<>(); + final Set nodeIds = new HashSet<>(); // we use set here and not list since we might get duplicates for (String index : concreteIndices) { + // this is where we calculate shard information which contains nodes associated with each shard final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index); final IndexMetadata indexMetadata = indexMetadata(clusterState, index); final Set effectiveRouting = routing.get(index); @@ -285,7 +302,10 @@ private Set computeTargetedShards( for (String r : effectiveRouting) { final int routingPartitionSize = indexMetadata.getRoutingPartitionSize(); for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) { - set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset))); + final IndexShardRoutingTable indexShardRoutingTable = RoutingTable.shardRoutingTable(indexRouting, + 
calculateScaledShardId(indexMetadata, r, partitionOffset)); + //nodeIds.add(indexShardRoutingTable.s) + set.add(indexShardRoutingTable); } } } else { @@ -304,10 +324,11 @@ private ShardIterator preferenceActiveShardIterator( @Nullable String preference, @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts, - @Nullable WeightedRoutingMetadata weightedRoutingMetadata + @Nullable WeightedRoutingMetadata weightedRoutingMetadata, + @Nullable AdmissionControllerService admissionControllerService ) { if (preference == null || preference.isEmpty()) { - return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); + return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata, admissionControllerService); } if (preference.charAt(0) == '_') { @@ -335,12 +356,13 @@ private ShardIterator preferenceActiveShardIterator( } // no more preference if (index == -1 || index == preference.length() - 1) { - return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); + return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata, admissionControllerService); } else { // update the preference and continue preference = preference.substring(index + 1); } } + // there are preference based routings as well preferenceType = Preference.parse(preference); checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata); switch (preferenceType) { @@ -400,7 +422,8 @@ private ShardIterator shardRoutings( DiscoveryNodes nodes, @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts, - @Nullable WeightedRoutingMetadata weightedRoutingMetadata + @Nullable WeightedRoutingMetadata weightedRoutingMetadata, + @Nullable AdmissionControllerService admissionControllerService ) { if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) { return 
indexShard.activeInitializingShardsWeightedIt( @@ -412,7 +435,7 @@ private ShardIterator shardRoutings( ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { - return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts, admissionControllerService); } else { return indexShard.activeInitializingShardsRandomIt(); } diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 985697d46d39e..89cf51b22dafb 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -33,6 +33,8 @@ package org.opensearch.common.network; import org.opensearch.action.support.replication.ReplicationTask; +import org.opensearch.admissioncontroller.AdmissionControllerPlugin; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; @@ -131,6 +133,7 @@ public final class NetworkModule { private final Map> transportFactories = new HashMap<>(); private final Map> transportHttpFactories = new HashMap<>(); private final List transportIntercetors = new ArrayList<>(); + private final AdmissionControllerPlugin admissionControllerPlugin; /** * Creates a network module that custom networking classes can be plugged into. 
@@ -147,7 +150,8 @@ public NetworkModule( NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher, - ClusterSettings clusterSettings + ClusterSettings clusterSettings, + AdmissionControllerService admissionControllerService ) { this.settings = settings; for (NetworkPlugin plugin : plugins) { @@ -184,6 +188,11 @@ public NetworkModule( registerTransportInterceptor(interceptor); } } + admissionControllerPlugin = new AdmissionControllerPlugin(admissionControllerService); + List transportInterceptors = admissionControllerPlugin.getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext()); + for (TransportInterceptor interceptor : transportInterceptors) { + registerTransportInterceptor(interceptor); + } } /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */ diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 1aa7e00ed86c1..3436b7b0b19f7 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -235,6 +235,16 @@ public static class DeviceStats implements Writeable, ToXContentFragment { final long previousWritesCompleted; final long currentSectorsWritten; final long previousSectorsWritten; + final long currentIOTime; + final long previousIOTime; + final double currentReadTime; + final double previousReadTime; + final double currentWriteTime; + final double previousWriteTime; + final double currentReadLatency; + final double previousReadLatency; + final double currentWriteLatency; + final double previousWriteLatency; public DeviceStats( final int majorDeviceNumber, @@ -244,6 +254,11 @@ public DeviceStats( final long currentSectorsRead, final long currentWritesCompleted, final long currentSectorsWritten, + final long currentIOTime, + final double currentReadTime, + final double 
currentWriteTime, + final double currentReadLatency, + final double currentWriteLatency, final DeviceStats previousDeviceStats ) { this( @@ -257,7 +272,17 @@ public DeviceStats( currentSectorsRead, previousDeviceStats != null ? previousDeviceStats.currentSectorsRead : -1, currentWritesCompleted, - previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1 + previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1, + currentIOTime, + previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1, + currentReadTime, + previousDeviceStats != null ? previousDeviceStats.previousReadTime : -1.0, + currentWriteTime, + previousDeviceStats != null ? previousDeviceStats.previousWriteTime : -1.0, + currentReadLatency, + previousDeviceStats != null ? previousDeviceStats.currentReadLatency : -1.0, + currentWriteLatency, + previousDeviceStats != null ? previousDeviceStats.currentWriteLatency : -1.0 ); } @@ -272,7 +297,17 @@ private DeviceStats( final long currentSectorsRead, final long previousSectorsRead, final long currentWritesCompleted, - final long previousWritesCompleted + final long previousWritesCompleted, + final long currentIOTime, + final long previousIOTime, + final double currentReadTime, + final double previousReadTime, + final double currentWriteTime, + final double previousWriteTime, + final double currentReadLatency, + final double previousReadLatency, + final double currentWriteLatency, + final double previousWriteLatency ) { this.majorDeviceNumber = majorDeviceNumber; this.minorDeviceNumber = minorDeviceNumber; @@ -285,6 +320,16 @@ private DeviceStats( this.previousSectorsRead = previousSectorsRead; this.currentSectorsWritten = currentSectorsWritten; this.previousSectorsWritten = previousSectorsWritten; + this.currentIOTime = currentIOTime; + this.previousIOTime = previousIOTime; + this.currentReadTime = currentReadTime; + this.previousReadTime = previousReadTime; + this.currentWriteTime = currentWriteTime; + 
this.previousWriteTime = previousWriteTime; + this.currentReadLatency = currentReadLatency; + this.previousReadLatency = previousReadLatency; + this.currentWriteLatency = currentWriteLatency; + this.previousWriteLatency = previousWriteLatency; } public DeviceStats(StreamInput in) throws IOException { @@ -299,6 +344,16 @@ public DeviceStats(StreamInput in) throws IOException { previousSectorsRead = in.readLong(); currentSectorsWritten = in.readLong(); previousSectorsWritten = in.readLong(); + currentIOTime = in.readLong(); + previousIOTime = in.readLong(); + currentReadTime = in.readDouble(); + previousReadTime = in.readDouble(); + currentWriteTime = in.readDouble(); + previousWriteTime = in.readDouble(); + currentReadLatency = in.readDouble(); + previousReadLatency = in.readDouble(); + currentWriteLatency = in.readDouble(); + previousWriteLatency = in.readDouble(); } @Override @@ -314,6 +369,15 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(previousSectorsRead); out.writeLong(currentSectorsWritten); out.writeLong(previousSectorsWritten); + out.writeLong(currentIOTime); + out.writeLong(previousIOTime); + out.writeDouble(currentReadTime); + out.writeDouble(currentWriteTime); + out.writeDouble(previousWriteTime); + out.writeDouble(currentReadLatency); + out.writeDouble(previousReadLatency); + out.writeDouble(currentWriteLatency); + out.writeDouble(previousWriteLatency); } public long operations() { @@ -334,6 +398,14 @@ public long writeOperations() { return (currentWritesCompleted - previousWritesCompleted); } + public long currentReadOperations() { + return currentReadsCompleted; + } + + public long currentWriteOpetations() { + return currentWritesCompleted; + } + public long readKilobytes() { if (previousSectorsRead == -1) return -1; @@ -346,6 +418,68 @@ public long writeKilobytes() { return (currentSectorsWritten - previousSectorsWritten) / 2; } + public long ioTimeInMillis() { + if (previousIOTime == -1) return -1; + + return 
(currentIOTime - previousIOTime); + } + + public double getWriteLatency() { + if(previousWriteLatency == -1.0) return -1.0; + return currentWriteLatency - previousWriteLatency; + } + + public double getNewWriteLatency() { + //double readLatency = getReadTime() / readOperations(); + double writeLatency = getWriteTime() / writeOperations(); + return writeLatency; + } + + public double getNewReadLatency() { + //double readLatency = getReadTime() / readOperations(); + double readLatency = getReadTime() / readOperations(); + return readLatency; + } + + public double getReadLatency() { + if(previousReadLatency == -1.0) return -1.0; + return currentReadLatency - previousReadLatency; + } + + public double getReadTime() { + if(previousReadTime == -1.0) return -1.0; + return currentReadTime - previousReadTime; + } + + public double getWriteTime() { + if(previousWriteTime == -1.0) return -1.0; + return currentWriteTime - previousWriteTime; + } + + public long getCurrentIOTime() { + return this.currentIOTime; + } + + public double getCurrentReadTime() { + return this.currentReadTime; + } + + public double getCurrentWriteTime() { + return this.currentWriteTime; + } + + public double getCurrentReadLatency() { + return this.currentReadLatency; + } + + public double getCurrentWriteLatency() { + return this.currentWriteLatency; + } + + public String getDeviceName() { + return this.deviceName; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("device_name", deviceName); @@ -354,6 +488,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(IoStats.WRITE_OPERATIONS, writeOperations()); builder.field(IoStats.READ_KILOBYTES, readKilobytes()); builder.field(IoStats.WRITE_KILOBYTES, writeKilobytes()); + builder.field(IoStats.IO_TIMEMS, ioTimeInMillis()); return builder; } @@ -371,6 +506,7 @@ public static class IoStats implements Writeable, ToXContentFragment { private 
static final String WRITE_OPERATIONS = "write_operations"; private static final String READ_KILOBYTES = "read_kilobytes"; private static final String WRITE_KILOBYTES = "write_kilobytes"; + private static final String IO_TIMEMS = "io_time_in_millis"; final DeviceStats[] devicesStats; final long totalOperations; @@ -378,6 +514,7 @@ public static class IoStats implements Writeable, ToXContentFragment { final long totalWriteOperations; final long totalReadKilobytes; final long totalWriteKilobytes; + final long totalIOTimeInMillis; public IoStats(final DeviceStats[] devicesStats) { this.devicesStats = devicesStats; @@ -386,18 +523,21 @@ public IoStats(final DeviceStats[] devicesStats) { long totalWriteOperations = 0; long totalReadKilobytes = 0; long totalWriteKilobytes = 0; + long totalIOTimeInMillis = 0; for (DeviceStats deviceStats : devicesStats) { totalOperations += deviceStats.operations() != -1 ? deviceStats.operations() : 0; totalReadOperations += deviceStats.readOperations() != -1 ? deviceStats.readOperations() : 0; totalWriteOperations += deviceStats.writeOperations() != -1 ? deviceStats.writeOperations() : 0; totalReadKilobytes += deviceStats.readKilobytes() != -1 ? deviceStats.readKilobytes() : 0; totalWriteKilobytes += deviceStats.writeKilobytes() != -1 ? deviceStats.writeKilobytes() : 0; + totalIOTimeInMillis += deviceStats.ioTimeInMillis() != -1 ? 
deviceStats.ioTimeInMillis() : 0; } this.totalOperations = totalOperations; this.totalReadOperations = totalReadOperations; this.totalWriteOperations = totalWriteOperations; this.totalReadKilobytes = totalReadKilobytes; this.totalWriteKilobytes = totalWriteKilobytes; + this.totalIOTimeInMillis = totalIOTimeInMillis; } public IoStats(StreamInput in) throws IOException { @@ -412,6 +552,7 @@ public IoStats(StreamInput in) throws IOException { this.totalWriteOperations = in.readLong(); this.totalReadKilobytes = in.readLong(); this.totalWriteKilobytes = in.readLong(); + this.totalIOTimeInMillis = in.readLong(); } @Override @@ -425,6 +566,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalWriteOperations); out.writeLong(totalReadKilobytes); out.writeLong(totalWriteKilobytes); + out.writeLong(totalIOTimeInMillis); } public DeviceStats[] getDevicesStats() { @@ -438,6 +580,9 @@ public long getTotalOperations() { public long getTotalReadOperations() { return totalReadOperations; } + public long getTotalIOTimeMillis() { + return totalIOTimeInMillis; + } public long getTotalWriteOperations() { return totalWriteOperations; @@ -468,6 +613,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(WRITE_OPERATIONS, totalWriteOperations); builder.field(READ_KILOBYTES, totalReadKilobytes); builder.field(WRITE_KILOBYTES, totalWriteKilobytes); + builder.field(IO_TIMEMS, totalIOTimeInMillis); builder.endObject(); } return builder; diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index 50d1d981f3c98..a366c2fd2e64a 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -123,6 +123,11 @@ final FsInfo.IoStats ioStats(final Set> devicesNumbers, final long sectorsRead = Long.parseLong(fields[5]); final long writesCompleted = 
Long.parseLong(fields[7]); final long sectorsWritten = Long.parseLong(fields[9]); + final double readTime = Double.parseDouble(fields[6]); + final double writeTime = Double.parseDouble(fields[10]); + final double readLatency = readTime / readsCompleted; + final double writeLatency = writeTime / writesCompleted; + final long ioTime = Long.parseLong(fields[12]); final FsInfo.DeviceStats deviceStats = new FsInfo.DeviceStats( majorDeviceNumber, minorDeviceNumber, @@ -131,6 +136,11 @@ final FsInfo.IoStats ioStats(final Set> devicesNumbers, sectorsRead, writesCompleted, sectorsWritten, + ioTime, + readTime, + writeTime, + readLatency, + writeLatency, deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber)) ); devicesStats.add(deviceStats); diff --git a/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java b/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java index df5c7b93bdc2d..191fbd1b20bbf 100644 --- a/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java +++ b/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java @@ -98,6 +98,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field("avg_response_time_ns", (long) stats.responseTime); builder.field("rank", String.format(Locale.ROOT, "%.1f", stats.rank(outgoingSearches))); + builder.field("cpu", (double) stats.nodePerfStats.cpuPercentAvg); + builder.field("memory", (double) stats.nodePerfStats.memoryPercentAvg); + builder.field("io", (double) stats.nodePerfStats.ioPercentAvg); } builder.endObject(); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c51eff57de68c..a82e5484bef98 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.opensearch.ExceptionsHelper; 
+import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.common.SetOnce; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.ByteSizeUnit; @@ -808,7 +809,13 @@ protected Node( modules.add(actionModule); final RestController restController = actionModule.getRestController(); - + final AdmissionControllerService admissionControllerService = new AdmissionControllerService( + settings, + clusterService.getClusterSettings(), + threadPool, + nodeEnvironment, + monitorService.fsService() + ); final NetworkModule networkModule = new NetworkModule( settings, pluginsService.filterPlugins(NetworkPlugin.class), @@ -820,7 +827,8 @@ protected Node( xContentRegistry, networkService, restController, - clusterService.getClusterSettings() + clusterService.getClusterSettings(), + admissionControllerService ); Collection>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins( Plugin.class @@ -877,7 +885,8 @@ protected Node( final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService( settings, clusterService.getClusterSettings(), - threadPool + threadPool, + admissionControllerService ); final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( @@ -1013,11 +1022,12 @@ protected Node( threadPool, scriptService, bigArrays, - searchModule.getQueryPhase(), + searchModule.getQueryPhase(admissionControllerService), searchModule.getFetchPhase(), responseCollectorService, circuitBreakerService, - searchModule.getIndexSearcherExecutor(threadPool) + searchModule.getIndexSearcherExecutor(threadPool), + admissionControllerService ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -1089,6 +1099,7 @@ protected Node( b.bind(ClusterInfoService.class).toInstance(clusterInfoService); b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService); 
b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); + b.bind(AdmissionControllerService.class).toInstance(admissionControllerService); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); { processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); @@ -1236,6 +1247,7 @@ public Node start() throws NodeValidationException { nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); + injector.getInstance(AdmissionControllerService.class).start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); @@ -1394,9 +1406,11 @@ private Node stop() { injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); + injector.getInstance(AdmissionControllerService.class).stop(); nodeService.getTaskCancellationMonitoringService().stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); + // we should stop this last since it waits for resources to get released // if we had scroll searchers etc or recovery going on we wait for to finish. 
injector.getInstance(IndicesService.class).stop(); @@ -1460,6 +1474,9 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(TransportService.class)); toClose.add(nodeService.getTaskCancellationMonitoringService()); + toClose.add(() -> stopWatch.stop().start("admissioncontroller")); + toClose.add(injector.getInstance(AdmissionControllerService.class)); + for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); toClose.add(plugin); @@ -1617,7 +1634,8 @@ protected SearchService newSearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { return new SearchService( clusterService, @@ -1629,7 +1647,8 @@ protected SearchService newSearchService( fetchPhase, responseCollectorService, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } diff --git a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java index fd246a4ccb25e..42b503c2136a7 100644 --- a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java @@ -32,6 +32,8 @@ package org.opensearch.node; +import org.apache.logging.log4j.LogManager; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.node.DiscoveryNode; @@ -41,6 +43,7 @@ import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import 
org.opensearch.common.util.concurrent.ThreadContext; import java.io.IOException; import java.util.HashMap; @@ -48,6 +51,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentMap; +import java.util.logging.Logger; /** * Collects statistics about queue size, response time, and service time of @@ -59,6 +63,8 @@ public final class ResponseCollectorService implements ClusterStateListener { private static final double ALPHA = 0.3; + org.apache.logging.log4j.Logger logger = LogManager.getLogger(ResponseCollectorService.class); + private final ConcurrentMap nodeIdToStats = ConcurrentCollections.newConcurrentMap(); @@ -80,20 +86,31 @@ void removeNode(String nodeId) { } public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { + addNodeStatistics(nodeId, queueSize, responseTimeNanos, avgServiceTimeNanos, null); + } + public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos, + NodePerfStats nodePerfStats) { nodeIdToStats.compute(nodeId, (id, ns) -> { + logger.info("Add node statistics - node id : {} , cpu : {} , mem : {} , io : {}", nodeId, nodePerfStats.cpuPercentAvg, + nodePerfStats.memoryPercentAvg, nodePerfStats.ioPercentAvg); if (ns == null) { ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); - return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); + return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos, nodePerfStats); } else { ns.queueSize.addValue((double) queueSize); ns.responseTime.addValue((double) responseTimeNanos); ns.serviceTime = avgServiceTimeNanos; + ns.nodePerfStats = nodePerfStats; return ns; } }); } + /** + * This method + * @return + */ public Map getAllNodeStatistics() { final int clientNum = 
nodeIdToStats.size(); // Transform the mutable object internally used for accounting into the computed version @@ -134,13 +151,18 @@ public static class ComputedNodeStats implements Writeable { public final int queueSize; public final double responseTime; public final double serviceTime; + public final NodePerfStats nodePerfStats; + org.apache.logging.log4j.Logger logger = LogManager.getLogger(ResponseCollectorService.class); + - public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double responseTime, double serviceTime) { + public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double responseTime, double serviceTime, + NodePerfStats nodePerfStats) { this.nodeId = nodeId; this.clientNum = clientNum; this.queueSize = queueSize; this.responseTime = responseTime; this.serviceTime = serviceTime; + this.nodePerfStats = nodePerfStats; } ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { @@ -149,7 +171,8 @@ public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double res clientNum, (int) nodeStats.queueSize.getAverage(), nodeStats.responseTime.getAverage(), - nodeStats.serviceTime + nodeStats.serviceTime, + nodeStats.nodePerfStats ); } @@ -159,6 +182,7 @@ public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double res this.queueSize = in.readInt(); this.responseTime = in.readDouble(); this.serviceTime = in.readDouble(); + this.nodePerfStats = new NodePerfStats(in); } @Override @@ -168,6 +192,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(this.queueSize); out.writeDouble(this.responseTime); out.writeDouble(this.serviceTime); + this.nodePerfStats.writeTo(out); } /** @@ -175,6 +200,7 @@ public void writeTo(StreamOutput out) throws IOException { * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf */ private double innerRank(long outstandingRequests) { + // // the concurrency compensation is defined as the number of // outstanding requests from 
the client to the node times the number // of clients in the system @@ -195,13 +221,29 @@ private double innerRank(long outstandingRequests) { // The final formula double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); + + logger.info("Node id : {} , queue size : {} , queue size with compensation factor : {} , response time : {} ," + + " service time : {} , rank : {}", nodeId, qBar, qHatS, rS, muBarS, rank); + logger.info("Node ID : {} CPU : {} , Mem : {} , IO : {} ", nodeId, nodePerfStats.cpuPercentAvg, + nodePerfStats.memoryPercentAvg, nodePerfStats.ioPercentAvg); + logger.info("rank before adjustment : {}", rank); + // adjust rank if the node is overloaded + // how to adjust rank + if(nodePerfStats.cpuPercentAvg > 90.0 ) rank = rank * 2; + if(nodePerfStats.memoryPercentAvg > 90.0 ) rank = rank * 2; + if(nodePerfStats.ioPercentAvg > 90.0) rank = rank * 2; + // is there flaw in above logic + logger.info("rank after adjustment : {}", rank); return rank; } public double rank(long outstandingRequests) { if (cachedRank == 0) { + logger.info("Cached rank : {}" , cachedRank); cachedRank = innerRank(outstandingRequests); + logger.info("Cached rank post : {}" , cachedRank); } + logger.info("Cached rank post : {}" , cachedRank); return cachedRank; } @@ -229,17 +271,20 @@ private static class NodeStatistics { final ExponentiallyWeightedMovingAverage queueSize; final ExponentiallyWeightedMovingAverage responseTime; double serviceTime; + NodePerfStats nodePerfStats; NodeStatistics( String nodeId, ExponentiallyWeightedMovingAverage queueSizeEWMA, ExponentiallyWeightedMovingAverage responseTimeEWMA, - double serviceTimeEWMA + double serviceTimeEWMA, + NodePerfStats nodePerfStats ) { this.nodeId = nodeId; this.queueSize = queueSizeEWMA; this.responseTime = responseTimeEWMA; this.serviceTime = serviceTimeEWMA; + this.nodePerfStats = nodePerfStats; } } } diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java 
b/server/src/main/java/org/opensearch/search/SearchModule.java index aeb1d8325b1b8..00fb25b079e58 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -33,6 +33,7 @@ package org.opensearch.search; import org.apache.lucene.search.BooleanQuery; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.common.NamedRegistry; import org.opensearch.common.Nullable; import org.opensearch.common.geo.GeoShapeType; @@ -1290,7 +1291,10 @@ public FetchPhase getFetchPhase() { } public QueryPhase getQueryPhase() { - return new QueryPhase(queryPhaseSearcher); + return new QueryPhase(queryPhaseSearcher, null); + } + public QueryPhase getQueryPhase(@Nullable AdmissionControllerService admissionControllerService) { + return new QueryPhase(queryPhaseSearcher, admissionControllerService); } public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 7d67c6c3b45f4..93a70dab22415 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -49,6 +49,7 @@ import org.opensearch.action.search.UpdatePitContextRequest; import org.opensearch.action.search.UpdatePitContextResponse; import org.opensearch.action.support.TransportActions; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedSupplier; @@ -304,6 +305,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final String sessionId = UUIDs.randomBase64UUID(); private final Executor indexSearcherExecutor; + private final AdmissionControllerService admissionControllerService; + public 
SearchService( ClusterService clusterService, IndicesService indicesService, @@ -314,7 +317,8 @@ public SearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -322,6 +326,7 @@ public SearchService( this.indicesService = indicesService; this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; + this.admissionControllerService = admissionControllerService; this.bigArrays = bigArrays; this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; @@ -1470,6 +1475,10 @@ public ResponseCollectorService getResponseCollectorService() { return this.responseCollectorService; } + public AdmissionControllerService getAdmissionControllerService() { + return this.admissionControllerService; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 069b410280d63..ae24cdba7d2ea 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -47,11 +47,16 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.admissioncontroller.AdmissionControllerService; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.Booleans; +import org.opensearch.common.Nullable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.common.util.concurrent.EWMATrackingThreadPoolExecutor; import 
org.opensearch.lucene.queries.SearchAfterSortedDocQuery; +import org.opensearch.monitor.process.ProcessStats; +import org.opensearch.node.Node; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchContextSourcePrinter; import org.opensearch.search.SearchService; @@ -99,14 +104,17 @@ public class QueryPhase { private final SuggestProcessor suggestProcessor; private final RescoreProcessor rescoreProcessor; + private final AdmissionControllerService admissionControllerService; + public QueryPhase() { - this(DEFAULT_QUERY_PHASE_SEARCHER); + this(DEFAULT_QUERY_PHASE_SEARCHER, null); } - public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) { + public QueryPhase(QueryPhaseSearcher queryPhaseSearcher, @Nullable AdmissionControllerService admissionControllerService) { this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required"); this.suggestProcessor = new SuggestProcessor(); this.rescoreProcessor = new RescoreProcessor(); + this.admissionControllerService = admissionControllerService; } public void preProcess(SearchContext context) { @@ -150,7 +158,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep // request, preProcess is called on the DFS phase phase, this is why we pre-process them // here to make sure it happens during the QUERY phase aggregationProcessor.preProcess(searchContext); - boolean rescore = executeInternal(searchContext, queryPhaseSearcher); + boolean rescore = executeInternal(searchContext, queryPhaseSearcher, this.admissionControllerService); if (rescore) { // only if we do a regular search rescoreProcessor.process(searchContext); @@ -178,15 +186,19 @@ public QueryPhaseSearcher getQueryPhaseSearcher() { * @return whether the rescoring phase should be executed */ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException { - return executeInternal(searchContext, QueryPhase.DEFAULT_QUERY_PHASE_SEARCHER); + return 
executeInternal(searchContext, QueryPhase.DEFAULT_QUERY_PHASE_SEARCHER, null); } + static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher) throws QueryPhaseExecutionException { + return executeInternal(searchContext, queryPhaseSearcher, null); + } /** * In a package-private method so that it can be tested without having to * wire everything (mapperService, etc.) * @return whether the rescoring phase should be executed */ - static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher) throws QueryPhaseExecutionException { + static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher, + @Nullable AdmissionControllerService admissionControllerService) throws QueryPhaseExecutionException { final ContextIndexSearcher searcher = searchContext.searcher(); final IndexReader reader = searcher.getIndexReader(); QuerySearchResult queryResult = searchContext.queryResult(); @@ -287,10 +299,23 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q ); ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); + //Thread.sleep(5000); if (executor instanceof EWMATrackingThreadPoolExecutor) { final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + LOGGER.info("Adding node perf stats for CPU : {} , MEM : {} , IO : {}" ,admissionControllerService.getCPUEWMA() * 100, + admissionControllerService.getMemoryEWMA() * 100, admissionControllerService.getIoEWMA() * 100); + NodePerfStats nodePerfStats = null; + if(admissionControllerService != null) { + nodePerfStats = new NodePerfStats( + admissionControllerService.getCPUEWMA() * 100, + admissionControllerService.getMemoryEWMA() * 100, + admissionControllerService.getIoEWMA() * 100 + ); + 
queryResult.nodePerfStats(nodePerfStats); + + } } return shouldRescore; diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index a0c2970625472..35832c1b58b20 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.io.stream.DelayableWriteable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -85,7 +86,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private boolean hasProfileResults; private long serviceTimeEWMA = -1; private int nodeQueueSize = -1; - + private NodePerfStats nodePerfStats; private final boolean isNull; public QuerySearchResult() { @@ -311,11 +312,20 @@ public long serviceTimeEWMA() { return this.serviceTimeEWMA; } + public NodePerfStats getNodePerfStats() { + return this.nodePerfStats; + } + public QuerySearchResult serviceTimeEWMA(long serviceTimeEWMA) { this.serviceTimeEWMA = serviceTimeEWMA; return this; } + public QuerySearchResult nodePerfStats(NodePerfStats nodePerfStats) { + this.nodePerfStats = nodePerfStats; + return this; + } + public int nodeQueueSize() { return this.nodeQueueSize; } @@ -362,6 +372,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc hasProfileResults = profileShardResults != null; serviceTimeEWMA = in.readZLong(); nodeQueueSize = in.readInt(); + nodePerfStats = in.readOptionalWriteable(NodePerfStats::new); setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); setRescoreDocIds(new RescoreDocIds(in)); } @@ -404,6 +415,8 @@ public void writeToNoId(StreamOutput out) throws IOException { 
out.writeOptionalWriteable(profileShardResults); out.writeZLong(serviceTimeEWMA); out.writeInt(nodeQueueSize); + out.writeOptionalWriteable(nodePerfStats); + out.writeOptionalWriteable(getShardSearchRequest()); getRescoreDocIds().writeTo(out); } diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index b4806b531429e..68e0783cb7a3a 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -13,6 +13,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; +import org.opensearch.admissioncontroller.AdmissionControllerService; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; @@ -55,11 +57,15 @@ public class TaskResourceTrackingService implements RunnableTaskExecutionListene private final ThreadPool threadPool; private volatile boolean taskResourceTrackingEnabled; + private final AdmissionControllerService admissionControllerService; + @Inject - public TaskResourceTrackingService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public TaskResourceTrackingService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, + AdmissionControllerService admissionControllerService) { this.taskResourceTrackingEnabled = TASK_RESOURCE_TRACKING_ENABLED.get(settings); this.threadPool = threadPool; clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_TRACKING_ENABLED, this::setTaskResourceTrackingEnabled); + this.admissionControllerService = admissionControllerService; } public void setTaskResourceTrackingEnabled(boolean taskResourceTrackingEnabled) { @@ -87,6 +93,7 @@ 
public ThreadContext.StoredContext startTracking(Task task) { if (task.supportsResourceTracking() == false || isTaskResourceTrackingEnabled() == false || isTaskResourceTrackingSupported() == false) { + //return addPerfStatsToThreadContext(task); return () -> {}; } @@ -193,6 +200,7 @@ public void taskExecutionStartedOnThread(long taskId, long threadId) { public void taskExecutionFinishedOnThread(long taskId, long threadId) { try { final Task task = resourceAwareTasks.get(taskId); + long id = (long)threadPool.getThreadContext().getTransient("TASK_ID"); if (task != null) { logger.debug("Task execution finished on thread. Task: {}, Thread: {}", taskId, threadId); task.stopThreadResourceTracking(threadId, WORKER_STATS, getResourceUsageMetricsForThread(threadId)); @@ -257,6 +265,17 @@ private ThreadContext.StoredContext addTaskIdToThreadContext(Task task) { return storedContext; } + private ThreadContext.StoredContext addPerfStatsToThreadContext(Task task) { + ThreadContext threadContext = threadPool.getThreadContext(); + + ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, + Collections.singletonList("PERF_STATS")); + NodePerfStats nodePerfStats = new NodePerfStats(admissionControllerService.getCPUEWMA(), admissionControllerService.getMemoryEWMA(), + admissionControllerService.getIoEWMA()); + threadContext.putTransient("PERF_STATS", nodePerfStats); + return storedContext; + } + /** * Listener that gets invoked when a task execution completes. 
*/ diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index d99b93b780140..dc50ced3bc91c 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.node.stats; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; @@ -584,6 +585,7 @@ public static NodeStats createNodeStats() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), null ); deviceStatsArray[i] = new FsInfo.DeviceStats( @@ -594,6 +596,7 @@ public static NodeStats createNodeStats() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), previousDeviceStats ); } @@ -708,7 +711,8 @@ public static NodeStats createNodeStats() { randomIntBetween(1, 10), randomIntBetween(0, 2000), randomDoubleBetween(1.0, 10000000.0, true), - randomDoubleBetween(1.0, 10000000.0, true) + randomDoubleBetween(1.0, 10000000.0, true), + new NodePerfStats(95.0, 95.0, 95.0) ); nodeStats.put(nodeId, stats); } diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 843fd85d81197..0284265273085 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -236,7 +236,7 @@ protected TaskManager createTaskManager( transportService.start(); 
clusterService = createClusterService(threadPool, discoveryNode.get()); clusterService.addStateApplier(transportService.getTaskManager()); - taskResourceTrackingService = new TaskResourceTrackingService(settings, clusterService.getClusterSettings(), threadPool); + taskResourceTrackingService = new TaskResourceTrackingService(settings, clusterService.getClusterSettings(), threadPool, null); transportService.getTaskManager().setTaskResourceTrackingService(taskResourceTrackingService); ActionFilters actionFilters = new ActionFilters(emptySet()); transportListTasksAction = new TransportListTasksAction( diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index cc7b5cb8dc845..0d621b92d4149 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -279,7 +279,8 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { } }), latch::countDown), threadPool, - Names.WRITE + Names.WRITE, + "NoedId" ); latch.await(); @@ -941,7 +942,8 @@ public void testRetries() throws Exception { assertThat(response.getSeqNo(), equalTo(13L)); }), latch), threadPool, - Names.WRITE + Names.WRITE, + "NodeId" ); latch.await(); } @@ -1024,7 +1026,8 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws Exception { // Assert that we still need to fsync the location that was successfully written assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1))), latch), rejectingThreadPool, - Names.WRITE + Names.WRITE, + "nodeId" ); latch.await(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 55885fb61ee0c..e2a6407a384bb 100644 --- 
a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -33,6 +33,7 @@ import org.opensearch.Version; import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; @@ -45,9 +46,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.monitor.fs.FsService; import org.opensearch.node.ResponseCollectorService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -595,6 +600,13 @@ public void testAdaptiveReplicaSelection() throws Exception { TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); ResponseCollectorService collector = new ResponseCollectorService(clusterService); + AdmissionControllerService admissionControllerService = new AdmissionControllerService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool, + new NodeEnvironment(Settings.EMPTY, new Environment(Settings.EMPTY, null)), + new FsService(Settings.EMPTY, new NodeEnvironment(Settings.EMPTY, new Environment(Settings.EMPTY, null)), new FileCache(null, null)) + ); Map outstandingRequests = new HashMap<>(); GroupShardsIterator groupIterator = opRouting.searchShards( state, @@ 
-602,7 +614,8 @@ public void testAdaptiveReplicaSelection() throws Exception { null, null, collector, - outstandingRequests + outstandingRequests, + admissionControllerService ); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); @@ -614,7 +627,7 @@ public void testAdaptiveReplicaSelection() throws Exception { searchedShards.add(firstChoice); selectedNodes.add(firstChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, admissionControllerService); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); @@ -622,7 +635,7 @@ public void testAdaptiveReplicaSelection() throws Exception { searchedShards.add(secondChoice); selectedNodes.add(secondChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, admissionControllerService); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index 3a22fed5bb2ec..347d8ff0eed45 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -313,7 +313,8 @@ private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... 
plugi xContentRegistry(), null, new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null ); } } diff --git a/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java b/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java index 0fd039b84e887..8de3417a01c9c 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java @@ -46,7 +46,7 @@ public void testDeviceStats() { final int sectorsRead = randomIntBetween(8 * readsCompleted, 16 * readsCompleted); final int writesCompleted = randomIntBetween(1, 1 << 16); final int sectorsWritten = randomIntBetween(8 * writesCompleted, 16 * writesCompleted); - + final int ioTime = randomIntBetween(1, 1 << 16); FsInfo.DeviceStats previous = new FsInfo.DeviceStats( majorDeviceNumber, minorDeviceNumber, @@ -55,6 +55,7 @@ public void testDeviceStats() { sectorsRead, writesCompleted, sectorsWritten, + ioTime, null ); FsInfo.DeviceStats current = new FsInfo.DeviceStats( @@ -65,6 +66,7 @@ public void testDeviceStats() { sectorsRead + 16384, writesCompleted + 2048, sectorsWritten + 32768, + ioTime + 128, previous ); assertThat(current.operations(), equalTo(1024L + 2048L)); @@ -72,6 +74,7 @@ public void testDeviceStats() { assertThat(current.writeOperations(), equalTo(2048L)); assertThat(current.readKilobytes(), equalTo(16384L / 2)); assertThat(current.writeKilobytes(), equalTo(32768L / 2)); + assertThat(current.ioTimeInMillis(), equalTo(128L)); } } diff --git a/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java index 2b13df3027cfa..e5c14c64485f6 100644 --- a/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java +++ 
b/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java @@ -78,6 +78,7 @@ public void tearDown() throws Exception { } public void testNodeStats() throws Exception { + // TODO collector.addNodeStatistics("node1", 1, 100, 10); Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0bb2b604e8f1a..cecc1b1765778 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1762,7 +1762,7 @@ public void onFailure(final Exception e) { new ThreadContext(Settings.EMPTY) ); transportService.getTaskManager() - .setTaskResourceTrackingService(new TaskResourceTrackingService(settings, clusterSettings, threadPool)); + .setTaskResourceTrackingService(new TaskResourceTrackingService(settings, clusterSettings, threadPool, null)); repositoriesService = new RepositoriesService( settings, clusterService, @@ -2034,6 +2034,7 @@ public void onFailure(final Exception e) { new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService(), + null, null ); SearchPhaseController searchPhaseController = new SearchPhaseController( diff --git a/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java b/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java index 3dcd634c234a3..c40349251db0e 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java @@ -41,7 +41,8 @@ public void setup() { taskResourceTrackingService = new TaskResourceTrackingService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + 
threadPool, + null ); } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index b815a819756b7..fff060939d1c8 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -981,7 +981,8 @@ private void executeShardBulkOnPrimary( listener.onResponse((TransportWriteAction.WritePrimaryResult) result); }), threadPool, - Names.WRITE + Names.WRITE, + "NodeId" ); } catch (Exception e) { listener.onFailure(e); diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 1d8d2dd086418..e828c128f4433 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -32,6 +32,7 @@ package org.opensearch.node; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.MockInternalClusterInfoService; @@ -154,7 +155,8 @@ protected SearchService newSearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -167,7 +169,8 @@ protected SearchService newSearchService( fetchPhase, responseCollectorService, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } return new 
MockSearchService( @@ -179,7 +182,8 @@ protected SearchService newSearchService( queryPhase, fetchPhase, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } diff --git a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java index 808dc50512c58..fde09d23f2101 100644 --- a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java @@ -32,6 +32,7 @@ package org.opensearch.search; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.BigArrays; import org.opensearch.indices.IndicesService; @@ -96,7 +97,8 @@ public MockSearchService( QueryPhase queryPhase, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { super( clusterService, @@ -108,7 +110,8 @@ public MockSearchService( fetchPhase, null, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); }