From 9f48a023da5fbf2dd8d5b74232851b6f4e089908 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Mon, 13 Mar 2023 09:51:28 +0530 Subject: [PATCH 01/11] fixing input validation in segments and delete pit request Signed-off-by: Bharathwaj G --- CHANGELOG.md | 2 +- .../action/admin/indices/segments/PitSegmentsRequest.java | 6 +++++- .../java/org/opensearch/action/search/DeletePitRequest.java | 6 +++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index aa68140715202..9f9ab155067bc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -68,7 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - +- Fixing input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645)) ### Security ## [Unreleased 2.x] diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index 35a4deba7bb43..a9728d139f4d1 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -19,7 +19,9 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.opensearch.action.ValidateActions.addValidationError; @@ -87,7 +89,7 @@ public ActionRequestValidationException validate() { } public void 
fromXContent(XContentParser parser) throws IOException { - pitIds.clear(); + Set pitIds = new HashSet<>(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new IllegalArgumentException("Malformed content, must start with an object"); } else { @@ -117,5 +119,7 @@ public void fromXContent(XContentParser parser) throws IOException { } } } + this.pitIds.clear(); + this.pitIds.addAll(pitIds); } } diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java index 90cd1ed474d61..a627aaf26ee44 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java @@ -21,7 +21,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import static org.opensearch.action.ValidateActions.addValidationError; @@ -91,7 +93,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par } public void fromXContent(XContentParser parser) throws IOException { - pitIds.clear(); + Set pitIds = new HashSet<>(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new IllegalArgumentException("Malformed content, must start with an object"); } else { @@ -121,6 +123,8 @@ public void fromXContent(XContentParser parser) throws IOException { } } } + this.pitIds.clear(); + this.pitIds.addAll(pitIds); } } From 84ee421c7b4f7e89a432c0b101ff5b8703686c3c Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Thu, 18 May 2023 21:23:51 +0530 Subject: [PATCH 02/11] Update CHANGELOG.md - addressing comments Signed-off-by: Bharathwaj G --- CHANGELOG.md | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f90c6ed049ba5..97cff1063d632 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -74,7 +74,6 @@ The format is based on [Keep a 
Changelog](https://keepachangelog.com/en/1.0.0/), - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Support OpenSSL Provider with default Netty allocator ([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) - Fixing input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645)) -- Avoid negative memory result in IndicesQueryCache stats calculation ([#6917](https://github.com/opensearch-project/OpenSearch/pull/6917)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) From 3072b8eb1e1d752571c463a3df9e8f5b6a2470ed Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 24 May 2023 09:53:03 +0530 Subject: [PATCH 03/11] adddressing comments Signed-off-by: Bharathwaj G --- CHANGELOG.md | 2 +- .../action/admin/indices/segments/PitSegmentsRequest.java | 6 +----- .../indices/segments/TransportPitSegmentsAction.java | 8 ++++++-- .../org/opensearch/action/search/DeletePitRequest.java | 6 +----- .../action/search/TransportDeletePitAction.java | 8 ++++++-- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 97cff1063d632..7c6b7fdd22575 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -73,7 +73,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Fix 'org.apache.hc.core5.http.ParseException: Invalid protocol version' under JDK 16+ ([#4827](https://github.com/opensearch-project/OpenSearch/pull/4827)) - Fix compression support for h2c protocol ([#4944](https://github.com/opensearch-project/OpenSearch/pull/4944)) - Support OpenSSL Provider with default Netty allocator 
([#5460](https://github.com/opensearch-project/OpenSearch/pull/5460)) -- Fixing input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) @@ -132,6 +131,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add more index blocks check for resize APIs ([#6774](https://github.com/opensearch-project/OpenSearch/pull/6774)) - Replaces ZipInputStream with ZipFile to fix Zip Slip vulnerability ([#7230](https://github.com/opensearch-project/OpenSearch/pull/7230)) - Add missing validation/parsing of SearchBackpressureMode of SearchBackpressureSettings ([#7541](https://github.com/opensearch-project/OpenSearch/pull/7541)) +- Fix input validation in segments and delete pit request ([#6645](https://github.com/opensearch-project/OpenSearch/pull/6645)) ### Security diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java index a9728d139f4d1..35a4deba7bb43 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/PitSegmentsRequest.java @@ -19,9 +19,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; -import java.util.HashSet; import java.util.List; -import java.util.Set; import static org.opensearch.action.ValidateActions.addValidationError; @@ -89,7 +87,7 @@ public ActionRequestValidationException validate() { } public void fromXContent(XContentParser parser) throws IOException { - Set pitIds = new 
HashSet<>(); + pitIds.clear(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new IllegalArgumentException("Malformed content, must start with an object"); } else { @@ -119,7 +117,5 @@ public void fromXContent(XContentParser parser) throws IOException { } } } - this.pitIds.clear(); - this.pitIds.addAll(pitIds); } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 348a99fe89a34..7e08a4c9c2e00 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -44,8 +44,10 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import static org.opensearch.action.search.SearchContextId.decode; @@ -94,8 +96,10 @@ public TransportPitSegmentsAction( */ @Override protected void doExecute(Task task, PitSegmentsRequest request, ActionListener listener) { - List pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + // remove pit id duplicates from the request + Set pitIdSet = new LinkedHashSet<>(request.getPitIds()); + request.clearAndSetPitIds(new ArrayList<>(pitIdSet)); + if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { pitService.getAllPits(ActionListener.wrap(response -> { request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); super.doExecute(task, request, listener); diff --git a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java index 
a627aaf26ee44..90cd1ed474d61 100644 --- a/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java +++ b/server/src/main/java/org/opensearch/action/search/DeletePitRequest.java @@ -21,9 +21,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; -import java.util.Set; import static org.opensearch.action.ValidateActions.addValidationError; @@ -93,7 +91,7 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par } public void fromXContent(XContentParser parser) throws IOException { - Set pitIds = new HashSet<>(); + pitIds.clear(); if (parser.nextToken() != XContentParser.Token.START_OBJECT) { throw new IllegalArgumentException("Malformed content, must start with an object"); } else { @@ -123,8 +121,6 @@ public void fromXContent(XContentParser parser) throws IOException { } } } - this.pitIds.clear(); - this.pitIds.addAll(pitIds); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index b85fe302a748f..77e650bb89ef3 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -18,8 +18,10 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; /** @@ -46,8 +48,10 @@ public TransportDeletePitAction( */ @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { - List pitIds = request.getPitIds(); - if (pitIds.size() == 1 && "_all".equals(pitIds.get(0))) { + // remove pit id duplicates from the request + Set pitIdSet = new LinkedHashSet<>(request.getPitIds()); + request.clearAndSetPitIds(new ArrayList<>(pitIdSet)); + if 
(request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { deleteAllPits(listener); } else { deletePits(listener, request); From 197cbec26a85ac7cc56ac65a09591bbc252d4698 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 24 May 2023 23:22:49 +0530 Subject: [PATCH 04/11] addressing comments Signed-off-by: Bharathwaj G --- .../admin/indices/segments/TransportPitSegmentsAction.java | 7 +++---- .../opensearch/action/search/TransportDeletePitAction.java | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java index 323d9e1e9d131..d843b3a9452a5 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/segments/TransportPitSegmentsAction.java @@ -96,9 +96,6 @@ public TransportPitSegmentsAction( */ @Override protected void doExecute(Task task, PitSegmentsRequest request, ActionListener listener) { - // remove pit id duplicates from the request - Set pitIdSet = new LinkedHashSet<>(request.getPitIds()); - request.clearAndSetPitIds(new ArrayList<>(pitIdSet)); if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { pitService.getAllPits(ActionListener.wrap(response -> { request.clearAndSetPitIds(response.getPitInfos().stream().map(ListPitInfo::getPitId).collect(Collectors.toList())); @@ -118,7 +115,9 @@ protected void doExecute(Task task, PitSegmentsRequest request, ActionListener iterators = new ArrayList<>(); - for (String pitId : request.getPitIds()) { + // remove duplicates from the request + Set uniquePitIds = new LinkedHashSet<>(request.getPitIds()); + for (String pitId : uniquePitIds) { SearchContextId searchContext = decode(namedWriteableRegistry, pitId); for (Map.Entry entry : 
searchContext.shards().entrySet()) { final SearchContextIdForNode perNode = entry.getValue(); diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index 77e650bb89ef3..217fcc1489df7 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -48,9 +48,6 @@ public TransportDeletePitAction( */ @Override protected void doExecute(Task task, DeletePitRequest request, ActionListener listener) { - // remove pit id duplicates from the request - Set pitIdSet = new LinkedHashSet<>(request.getPitIds()); - request.clearAndSetPitIds(new ArrayList<>(pitIdSet)); if (request.getPitIds().size() == 1 && "_all".equals(request.getPitIds().get(0))) { deleteAllPits(listener); } else { @@ -63,7 +60,9 @@ protected void doExecute(Task task, DeletePitRequest request, ActionListener listener, DeletePitRequest request) { Map> nodeToContextsMap = new HashMap<>(); - for (String pitId : request.getPitIds()) { + // remove duplicates from the request + Set uniquePitIds = new LinkedHashSet<>(request.getPitIds()); + for (String pitId : uniquePitIds) { SearchContextId contextId = SearchContextId.decode(namedWriteableRegistry, pitId); for (SearchContextIdForNode contextIdForNode : contextId.shards().values()) { PitSearchContextIdForNode pitSearchContext = new PitSearchContextIdForNode(pitId, contextIdForNode); From 27ec351b23caddc0120b21aacc0e35fc4566cea6 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 21 Jun 2023 13:18:54 +0530 Subject: [PATCH 05/11] AC poc changes by ajay Signed-off-by: Bharathwaj G --- .../AdmissionControllerPlugin.java | 34 +++++ .../AdmissionControllerRequestHandler.java | 69 ++++++++++ .../AdmissionControllerService.java | 129 ++++++++++++++++++ ...missionControllerTransportInterceptor.java | 35 +++++ 
.../common/network/NetworkModule.java | 11 +- .../org/opensearch/monitor/fs/FsInfo.java | 43 +++++- .../org/opensearch/monitor/fs/FsProbe.java | 2 + .../main/java/org/opensearch/node/Node.java | 19 ++- .../opensearch/search/query/QueryPhase.java | 4 + .../search/query/QuerySearchResult.java | 1 + .../cluster/node/stats/NodeStatsTests.java | 2 + .../monitor/fs/DeviceStatsTests.java | 5 +- 12 files changed, 348 insertions(+), 6 deletions(-) create mode 100644 server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java create mode 100644 server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java create mode 100644 server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java create mode 100644 server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java new file mode 100644 index 0000000000000..85c1eef70a325 --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java @@ -0,0 +1,34 @@ + +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.admissioncontroller; + +import org.opensearch.common.io.stream.NamedWriteableRegistry; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.plugins.NetworkPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.transport.TransportInterceptor; + +import java.util.ArrayList; +import java.util.List; + +public class AdmissionControllerPlugin extends Plugin implements NetworkPlugin { + + public AdmissionControllerService admissionControllerService; + public AdmissionControllerPlugin(AdmissionControllerService admissionControllerService) { + this.admissionControllerService = admissionControllerService; + } + @Override + public List getTransportInterceptors(NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) { + List interceptors = new ArrayList(1); + interceptors.add(new AdmissionControllerTransportInterceptor(null, this.admissionControllerService)); + return interceptors; + } +} diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java new file mode 100644 index 0000000000000..9b774d3bb1727 --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.util.concurrent.ThreadContext; +import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportChannel; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +public class AdmissionControllerRequestHandler implements TransportRequestHandler { + private final String action; + private final TransportRequestHandler actualHandler; + private final ThreadPool threadPool; + protected final Logger log = LogManager.getLogger(this.getClass()); + public AdmissionControllerService admissionControllerService; + public AdmissionControllerRequestHandler(String action, TransportRequestHandler actualHandler, + ThreadPool threadPool, AdmissionControllerService admissionControllerService) { + super(); + this.action = action; + this.actualHandler = actualHandler; + this.threadPool = threadPool; + this.admissionControllerService = admissionControllerService; + } + + protected ThreadContext getThreadContext() { + if(threadPool == null) { + return null; + } + return threadPool.getThreadContext(); + } + + public boolean isSearchRequest() { + return this.action.startsWith("indices:data/read/search"); + } + + public boolean isIndexRequest(){ + return this.action.startsWith("indices:data/write"); + } + + @Override + public void messageReceived(T request, TransportChannel channel, Task task) throws Exception { + // Evaluate the requests here. 
+ if (this.admissionControllerService.isIOInStress()) { + log.info("Admission controller service responded with IO is in stress state"); + if (this.isSearchRequest()){ + channel.sendResponse(new OpenSearchRejectedExecutionException("Execution Rejected due to high IO usage")); + return; + } + }else { + log.info("Admission controller service responded with IO is in healthy state"); + } + this.messageReceivedDecorate(request, actualHandler, channel, task); + } + + protected void messageReceivedDecorate(final T request, final TransportRequestHandler actualHandler, final TransportChannel transportChannel, Task task) throws Exception { + actualHandler.messageReceived(request, transportChannel, task); + } +} diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java new file mode 100644 index 0000000000000..688cb35d19da1 --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -0,0 +1,129 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +package org.opensearch.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.common.component.AbstractLifecycleComponent; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.monitor.fs.FsInfo; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.threadpool.Scheduler; +import org.opensearch.threadpool.ThreadPool; + +import java.io.IOException; + +import java.util.*; +import java.util.concurrent.atomic.AtomicInteger; + +public class AdmissionControllerService extends AbstractLifecycleComponent { + + private static final Logger logger = LogManager.getLogger(AdmissionControllerService.class); + private final ThreadPool threadPool; + private final TimeValue refreshInterval; + private volatile Scheduler.Cancellable scheduledFuture; + private Map previousIOTimeMap; + private final Map> deviceIOUsage; + + private static final double IO_MAX_USAGE = 30; + + private static final double IO_THRESHOLD_WINDOW = 4; + private final AtomicInteger ioLimitBreachedCount; + + private final FsService fsService; + + public static final Setting REFRESH_INTERVAL_SETTING = Setting.timeSetting( + "admissioncontroller.io.monitor.refresh_interval", + TimeValue.timeValueSeconds(10), + TimeValue.timeValueMillis(1), + Setting.Property.NodeScope + ); + + public AdmissionControllerService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, NodeEnvironment nodeEnv, FsService fsService){ + this.threadPool = threadPool; + this.fsService = fsService; + this.refreshInterval = REFRESH_INTERVAL_SETTING.get(settings); + this.previousIOTimeMap = new HashMap<>(); + this.deviceIOUsage = new HashMap<>(); + this.ioLimitBreachedCount = new AtomicInteger(0); + } + @Override + 
protected void doStart() { + this.scheduledFuture = threadPool.scheduleWithFixedDelay(new IOMonitor(this.fsService), refreshInterval, ThreadPool.Names.GENERIC); + } + + @Override + protected void doStop() { + this.scheduledFuture.cancel(); + } + + public boolean isIOInStress(){ + return this.ioLimitBreachedCount.get() >= IO_THRESHOLD_WINDOW; + } + + @Override + protected void doClose() throws IOException { + + } + + class IOMonitor implements Runnable { + + private final FsService fsService; + + IOMonitor(FsService fsService) { + this.fsService = fsService; + } + + @Override + public void run() { + try{ + monitorIOUtilisation(); + }catch (Exception e){ + logger.error("Exception on the getting IO utilisation"); + } + } + + private void monitorIOUtilisation() { + logger.info("IO stats is triggered"); + Map currentIOTimeMap = new HashMap<>(); + for (FsInfo.DeviceStats devicesStat : this.fsService.stats().getIoStats().getDevicesStats()) { + logger.info("Device Id: " + devicesStat.getDeviceName() + "; IO time: " + devicesStat.getCurrentIOTime()); + if (previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ + long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()); + double ioUsePercent = (double) (ioSpentTime * 100) / (10 * 1000); + Queue ioUsageQueue; + if (deviceIOUsage.containsKey(devicesStat.getDeviceName())) { + ioUsageQueue = deviceIOUsage.get(devicesStat.getDeviceName()); + if (ioUsageQueue.size() == 10){ + double oldIOUsePercent = ioUsageQueue.remove(); + if (oldIOUsePercent > IO_MAX_USAGE){ + ioLimitBreachedCount.decrementAndGet(); + } + } + }else { + ioUsageQueue = new LinkedList<>(); + } + ioUsageQueue.add(ioUsePercent); + logger.info("Queue Details: " + ioUsageQueue); + if (ioUsePercent > IO_MAX_USAGE){ + ioLimitBreachedCount.incrementAndGet(); + } + deviceIOUsage.put(devicesStat.getDeviceName(), ioUsageQueue); + } + currentIOTimeMap.put(devicesStat.getDeviceName(), devicesStat.getCurrentIOTime()); + } + 
previousIOTimeMap = currentIOTimeMap; + } + } +} + diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java new file mode 100644 index 0000000000000..5cc497d3cc641 --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java @@ -0,0 +1,35 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.admissioncontroller; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.monitor.fs.FsService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportInterceptor; +import org.opensearch.transport.TransportRequest; +import org.opensearch.transport.TransportRequestHandler; + +public class AdmissionControllerTransportInterceptor implements TransportInterceptor { + + protected final Logger log = LogManager.getLogger(this.getClass()); + protected final ThreadPool threadPool; + public AdmissionControllerService admissionControllerService; + + public AdmissionControllerTransportInterceptor(final ThreadPool threadPool, AdmissionControllerService admissionControllerService) { + this.threadPool = threadPool; + this.admissionControllerService = admissionControllerService; + } + + @Override + public TransportRequestHandler interceptHandler(String action, String executor, boolean forceExecution, + TransportRequestHandler actualHandler) { + return new AdmissionControllerRequestHandler(action, actualHandler, threadPool, admissionControllerService); + } +} diff --git a/server/src/main/java/org/opensearch/common/network/NetworkModule.java 
b/server/src/main/java/org/opensearch/common/network/NetworkModule.java index 985697d46d39e..89cf51b22dafb 100644 --- a/server/src/main/java/org/opensearch/common/network/NetworkModule.java +++ b/server/src/main/java/org/opensearch/common/network/NetworkModule.java @@ -33,6 +33,8 @@ package org.opensearch.common.network; import org.opensearch.action.support.replication.ReplicationTask; +import org.opensearch.admissioncontroller.AdmissionControllerPlugin; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.routing.allocation.command.AllocateEmptyPrimaryAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateReplicaAllocationCommand; import org.opensearch.cluster.routing.allocation.command.AllocateStalePrimaryAllocationCommand; @@ -131,6 +133,7 @@ public final class NetworkModule { private final Map> transportFactories = new HashMap<>(); private final Map> transportHttpFactories = new HashMap<>(); private final List transportIntercetors = new ArrayList<>(); + private final AdmissionControllerPlugin admissionControllerPlugin; /** * Creates a network module that custom networking classes can be plugged into. 
@@ -147,7 +150,8 @@ public NetworkModule( NamedXContentRegistry xContentRegistry, NetworkService networkService, HttpServerTransport.Dispatcher dispatcher, - ClusterSettings clusterSettings + ClusterSettings clusterSettings, + AdmissionControllerService admissionControllerService ) { this.settings = settings; for (NetworkPlugin plugin : plugins) { @@ -184,6 +188,11 @@ public NetworkModule( registerTransportInterceptor(interceptor); } } + admissionControllerPlugin = new AdmissionControllerPlugin(admissionControllerService); + List transportInterceptors = admissionControllerPlugin.getTransportInterceptors(namedWriteableRegistry, threadPool.getThreadContext()); + for (TransportInterceptor interceptor : transportInterceptors) { + registerTransportInterceptor(interceptor); + } } /** Adds a transport implementation that can be selected by setting {@link #TRANSPORT_TYPE_KEY}. */ diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 1aa7e00ed86c1..13e522387dad0 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -235,6 +235,8 @@ public static class DeviceStats implements Writeable, ToXContentFragment { final long previousWritesCompleted; final long currentSectorsWritten; final long previousSectorsWritten; + final long currentIOTime; + final long previousIOTime; public DeviceStats( final int majorDeviceNumber, @@ -244,6 +246,7 @@ public DeviceStats( final long currentSectorsRead, final long currentWritesCompleted, final long currentSectorsWritten, + final long currentIOTime, final DeviceStats previousDeviceStats ) { this( @@ -257,7 +260,9 @@ public DeviceStats( currentSectorsRead, previousDeviceStats != null ? previousDeviceStats.currentSectorsRead : -1, currentWritesCompleted, - previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1 + previousDeviceStats != null ? 
previousDeviceStats.currentWritesCompleted : -1, + currentIOTime, + previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1 ); } @@ -272,7 +277,9 @@ private DeviceStats( final long currentSectorsRead, final long previousSectorsRead, final long currentWritesCompleted, - final long previousWritesCompleted + final long previousWritesCompleted, + final long currentIOTime, + final long previousIOTime ) { this.majorDeviceNumber = majorDeviceNumber; this.minorDeviceNumber = minorDeviceNumber; @@ -285,6 +292,8 @@ private DeviceStats( this.previousSectorsRead = previousSectorsRead; this.currentSectorsWritten = currentSectorsWritten; this.previousSectorsWritten = previousSectorsWritten; + this.currentIOTime = currentIOTime; + this.previousIOTime = previousIOTime; } public DeviceStats(StreamInput in) throws IOException { @@ -299,6 +308,8 @@ public DeviceStats(StreamInput in) throws IOException { previousSectorsRead = in.readLong(); currentSectorsWritten = in.readLong(); previousSectorsWritten = in.readLong(); + currentIOTime = in.readLong(); + previousIOTime = in.readLong(); } @Override @@ -314,6 +325,8 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(previousSectorsRead); out.writeLong(currentSectorsWritten); out.writeLong(previousSectorsWritten); + out.writeLong(currentIOTime); + out.writeLong(previousIOTime); } public long operations() { @@ -346,6 +359,20 @@ public long writeKilobytes() { return (currentSectorsWritten - previousSectorsWritten) / 2; } + public long ioTimeInMillis() { + if (previousIOTime == -1) return -1; + + return (currentIOTime - previousIOTime); + } + + public long getCurrentIOTime() { + return this.currentIOTime; + } + + public String getDeviceName() { + return this.deviceName; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.field("device_name", deviceName); @@ -354,6 +381,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params 
params) throws builder.field(IoStats.WRITE_OPERATIONS, writeOperations()); builder.field(IoStats.READ_KILOBYTES, readKilobytes()); builder.field(IoStats.WRITE_KILOBYTES, writeKilobytes()); + builder.field(IoStats.IO_TIMEMS, ioTimeInMillis()); return builder; } @@ -371,6 +399,7 @@ public static class IoStats implements Writeable, ToXContentFragment { private static final String WRITE_OPERATIONS = "write_operations"; private static final String READ_KILOBYTES = "read_kilobytes"; private static final String WRITE_KILOBYTES = "write_kilobytes"; + private static final String IO_TIMEMS = "io_time_in_millis"; final DeviceStats[] devicesStats; final long totalOperations; @@ -378,6 +407,7 @@ public static class IoStats implements Writeable, ToXContentFragment { final long totalWriteOperations; final long totalReadKilobytes; final long totalWriteKilobytes; + final long totalIOTimeInMillis; public IoStats(final DeviceStats[] devicesStats) { this.devicesStats = devicesStats; @@ -386,18 +416,21 @@ public IoStats(final DeviceStats[] devicesStats) { long totalWriteOperations = 0; long totalReadKilobytes = 0; long totalWriteKilobytes = 0; + long totalIOTimeInMillis = 0; for (DeviceStats deviceStats : devicesStats) { totalOperations += deviceStats.operations() != -1 ? deviceStats.operations() : 0; totalReadOperations += deviceStats.readOperations() != -1 ? deviceStats.readOperations() : 0; totalWriteOperations += deviceStats.writeOperations() != -1 ? deviceStats.writeOperations() : 0; totalReadKilobytes += deviceStats.readKilobytes() != -1 ? deviceStats.readKilobytes() : 0; totalWriteKilobytes += deviceStats.writeKilobytes() != -1 ? deviceStats.writeKilobytes() : 0; + totalIOTimeInMillis += deviceStats.ioTimeInMillis() != -1 ? 
deviceStats.ioTimeInMillis() : 0; } this.totalOperations = totalOperations; this.totalReadOperations = totalReadOperations; this.totalWriteOperations = totalWriteOperations; this.totalReadKilobytes = totalReadKilobytes; this.totalWriteKilobytes = totalWriteKilobytes; + this.totalIOTimeInMillis = totalIOTimeInMillis; } public IoStats(StreamInput in) throws IOException { @@ -412,6 +445,7 @@ public IoStats(StreamInput in) throws IOException { this.totalWriteOperations = in.readLong(); this.totalReadKilobytes = in.readLong(); this.totalWriteKilobytes = in.readLong(); + this.totalIOTimeInMillis = in.readLong(); } @Override @@ -425,6 +459,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(totalWriteOperations); out.writeLong(totalReadKilobytes); out.writeLong(totalWriteKilobytes); + out.writeLong(totalIOTimeInMillis); } public DeviceStats[] getDevicesStats() { @@ -438,6 +473,9 @@ public long getTotalOperations() { public long getTotalReadOperations() { return totalReadOperations; } + public long getTotalIOTimeMillis() { + return totalIOTimeInMillis; + } public long getTotalWriteOperations() { return totalWriteOperations; @@ -468,6 +506,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(WRITE_OPERATIONS, totalWriteOperations); builder.field(READ_KILOBYTES, totalReadKilobytes); builder.field(WRITE_KILOBYTES, totalWriteKilobytes); + builder.field(IO_TIMEMS, totalIOTimeInMillis); builder.endObject(); } return builder; diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index 50d1d981f3c98..55472aa2d3e35 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -123,6 +123,7 @@ final FsInfo.IoStats ioStats(final Set> devicesNumbers, final long sectorsRead = Long.parseLong(fields[5]); final long writesCompleted = 
Long.parseLong(fields[7]); final long sectorsWritten = Long.parseLong(fields[9]); + final long ioTime = Long.parseLong(fields[12]); final FsInfo.DeviceStats deviceStats = new FsInfo.DeviceStats( majorDeviceNumber, minorDeviceNumber, @@ -131,6 +132,7 @@ final FsInfo.IoStats ioStats(final Set> devicesNumbers, sectorsRead, writesCompleted, sectorsWritten, + ioTime, deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber)) ); devicesStats.add(deviceStats); diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index c51eff57de68c..7fca47da5d98b 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.util.Constants; import org.opensearch.ExceptionsHelper; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.common.SetOnce; import org.opensearch.common.settings.SettingsException; import org.opensearch.common.unit.ByteSizeUnit; @@ -808,7 +809,13 @@ protected Node( modules.add(actionModule); final RestController restController = actionModule.getRestController(); - + final AdmissionControllerService admissionControllerService = new AdmissionControllerService( + settings, + clusterService.getClusterSettings(), + threadPool, + nodeEnvironment, + monitorService.fsService() + ); final NetworkModule networkModule = new NetworkModule( settings, pluginsService.filterPlugins(NetworkPlugin.class), @@ -820,7 +827,8 @@ protected Node( xContentRegistry, networkService, restController, - clusterService.getClusterSettings() + clusterService.getClusterSettings(), + admissionControllerService ); Collection>> indexTemplateMetadataUpgraders = pluginsService.filterPlugins( Plugin.class @@ -1089,6 +1097,7 @@ protected Node( b.bind(ClusterInfoService.class).toInstance(clusterInfoService); 
b.bind(SnapshotsInfoService.class).toInstance(snapshotsInfoService); b.bind(GatewayMetaState.class).toInstance(gatewayMetaState); + b.bind(AdmissionControllerService.class).toInstance(admissionControllerService); b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery()); { processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings); @@ -1236,6 +1245,7 @@ public Node start() throws NodeValidationException { nodeService.getMonitorService().start(); nodeService.getSearchBackpressureService().start(); nodeService.getTaskCancellationMonitoringService().start(); + injector.getInstance(AdmissionControllerService.class).start(); final ClusterService clusterService = injector.getInstance(ClusterService.class); @@ -1394,9 +1404,11 @@ private Node stop() { injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); + injector.getInstance(AdmissionControllerService.class).stop(); nodeService.getTaskCancellationMonitoringService().stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); + // we should stop this last since it waits for resources to get released // if we had scroll searchers etc or recovery going on we wait for to finish. 
injector.getInstance(IndicesService.class).stop(); @@ -1460,6 +1472,9 @@ public synchronized void close() throws IOException { toClose.add(injector.getInstance(TransportService.class)); toClose.add(nodeService.getTaskCancellationMonitoringService()); + toClose.add(() -> stopWatch.stop().start("admissioncontroller")); + toClose.add(injector.getInstance(AdmissionControllerService.class)); + for (LifecycleComponent plugin : pluginLifecycleComponents) { toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")")); toClose.add(plugin); diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 069b410280d63..1ec42e8a7d2a8 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -52,6 +52,7 @@ import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.common.util.concurrent.EWMATrackingThreadPoolExecutor; import org.opensearch.lucene.queries.SearchAfterSortedDocQuery; +import org.opensearch.monitor.process.ProcessStats; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchContextSourcePrinter; import org.opensearch.search.SearchService; @@ -291,6 +292,9 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); + + ProcessStats.Cpu cpu = new ProcessStats.Cpu(getProcessCpuPercent(), getProcessCpuTotalTime()); + ProcessStats.Mem mem = new ProcessStats.Mem(getTotalVirtualMemorySize()); } return shouldRescore; diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java 
index a0c2970625472..21ca5a42ae8f5 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -86,6 +86,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private long serviceTimeEWMA = -1; private int nodeQueueSize = -1; + private private final boolean isNull; public QuerySearchResult() { diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index d99b93b780140..23920ee4d44c5 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -584,6 +584,7 @@ public static NodeStats createNodeStats() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), null ); deviceStatsArray[i] = new FsInfo.DeviceStats( @@ -594,6 +595,7 @@ public static NodeStats createNodeStats() { randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), + randomNonNegativeLong(), previousDeviceStats ); } diff --git a/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java b/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java index 0fd039b84e887..8de3417a01c9c 100644 --- a/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java +++ b/server/src/test/java/org/opensearch/monitor/fs/DeviceStatsTests.java @@ -46,7 +46,7 @@ public void testDeviceStats() { final int sectorsRead = randomIntBetween(8 * readsCompleted, 16 * readsCompleted); final int writesCompleted = randomIntBetween(1, 1 << 16); final int sectorsWritten = randomIntBetween(8 * writesCompleted, 16 * writesCompleted); - + final int ioTime = randomIntBetween(1, 1 << 16); FsInfo.DeviceStats previous = new FsInfo.DeviceStats( majorDeviceNumber, 
minorDeviceNumber, @@ -55,6 +55,7 @@ public void testDeviceStats() { sectorsRead, writesCompleted, sectorsWritten, + ioTime, null ); FsInfo.DeviceStats current = new FsInfo.DeviceStats( @@ -65,6 +66,7 @@ public void testDeviceStats() { sectorsRead + 16384, writesCompleted + 2048, sectorsWritten + 32768, + ioTime + 128, previous ); assertThat(current.operations(), equalTo(1024L + 2048L)); @@ -72,6 +74,7 @@ public void testDeviceStats() { assertThat(current.writeOperations(), equalTo(2048L)); assertThat(current.readKilobytes(), equalTo(16384L / 2)); assertThat(current.writeKilobytes(), equalTo(32768L / 2)); + assertThat(current.ioTimeInMillis(), equalTo(128L)); } } From e4be79d1d0667061c07150c9cd89292479b0c7fd Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Wed, 21 Jun 2023 17:47:48 +0530 Subject: [PATCH 06/11] adding node perf stats--signoff --- .../search/SearchExecutionStatsCollector.java | 4 +- .../AdmissionControllerService.java | 27 +++++++++++++ .../admissioncontroller/NodePerfStats.java | 38 +++++++++++++++++++ .../node/ResponseCollectorService.java | 12 ++++-- .../opensearch/search/query/QueryPhase.java | 9 +++-- .../search/query/QuerySearchResult.java | 16 +++++++- 6 files changed, 97 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java diff --git a/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java b/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java index 7082e33dfd5c5..1dfe1121a7e33 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java +++ b/server/src/main/java/org/opensearch/action/search/SearchExecutionStatsCollector.java @@ -33,6 +33,7 @@ package org.opensearch.action.search; import org.opensearch.action.ActionListener; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.node.ResponseCollectorService; import 
org.opensearch.search.SearchPhaseResult; import org.opensearch.search.fetch.QueryFetchSearchResult; @@ -91,9 +92,10 @@ public void onResponse(SearchPhaseResult response) { final long serviceTimeEWMA = queryResult.serviceTimeEWMA(); final int queueSize = queryResult.nodeQueueSize(); final long responseDuration = System.nanoTime() - startNanos; + final NodePerfStats nodePerfStats = queryResult.getNodePerfStats(); // EWMA/queue size may be -1 if the query node doesn't support capturing it if (serviceTimeEWMA > 0 && queueSize >= 0) { - collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA); + collector.addNodeStatistics(nodeId, queueSize, responseDuration, serviceTimeEWMA, nodePerfStats); } } listener.onResponse(response); diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java index 688cb35d19da1..c284b7a8b0223 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -10,6 +10,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.common.ExponentiallyWeightedMovingAverage; import org.opensearch.common.component.AbstractLifecycleComponent; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; @@ -18,6 +19,8 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.monitor.fs.FsInfo; import org.opensearch.monitor.fs.FsService; +import org.opensearch.monitor.jvm.JvmStats; +import org.opensearch.monitor.process.ProcessProbe; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -41,6 +44,11 @@ public class AdmissionControllerService extends AbstractLifecycleComponent { private final AtomicInteger ioLimitBreachedCount; 
private final FsService fsService; + double EWMA_ALPHA = 0.3; + private final ExponentiallyWeightedMovingAverage cpuExecutionEWMA; + private final ExponentiallyWeightedMovingAverage memoryExecutionEWMA; + private final ExponentiallyWeightedMovingAverage ioExecutionEWMA; + public static final Setting REFRESH_INTERVAL_SETTING = Setting.timeSetting( "admissioncontroller.io.monitor.refresh_interval", @@ -56,6 +64,9 @@ public AdmissionControllerService(Settings settings, ClusterSettings clusterSett this.previousIOTimeMap = new HashMap<>(); this.deviceIOUsage = new HashMap<>(); this.ioLimitBreachedCount = new AtomicInteger(0); + this.cpuExecutionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.memoryExecutionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); + this.ioExecutionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0); } @Override protected void doStart() { @@ -71,6 +82,14 @@ public boolean isIOInStress(){ return this.ioLimitBreachedCount.get() >= IO_THRESHOLD_WINDOW; } + public double getCPUEWMA() { + return cpuExecutionEWMA.getAverage(); + } + + public double getMemoryEWMA() { + return memoryExecutionEWMA.getAverage(); + } + @Override protected void doClose() throws IOException { @@ -88,11 +107,19 @@ class IOMonitor implements Runnable { public void run() { try{ monitorIOUtilisation(); + monitorCpuUtilisation(); + monitorMemoryUtilisation(); }catch (Exception e){ logger.error("Exception on the getting IO utilisation"); } } + private void monitorCpuUtilisation() { + cpuExecutionEWMA.addValue( ProcessProbe.getInstance().getProcessCpuPercent() / 100.0); + } + private void monitorMemoryUtilisation() { + memoryExecutionEWMA.addValue(JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0); + } private void monitorIOUtilisation() { logger.info("IO stats is triggered"); Map currentIOTimeMap = new HashMap<>(); diff --git a/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java 
b/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java new file mode 100644 index 0000000000000..287f42cc7a210 --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.admissioncontroller; + +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +public class NodePerfStats { + public double cpuPercentAvg; + public double memoryPercentAvg; + public double ioPercentAvg; + + public NodePerfStats(StreamInput in) throws IOException { + this.cpuPercentAvg = in.readDouble(); + this.memoryPercentAvg = in.readDouble(); + this.ioPercentAvg = in.readDouble(); + } + + public NodePerfStats(double cpuPercentAvg, double memoryPercentAvg, double ioPercentAvg) { + this.cpuPercentAvg = cpuPercentAvg; + this.memoryPercentAvg = memoryPercentAvg; + this.ioPercentAvg = ioPercentAvg; + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeDouble(cpuPercentAvg); + out.writeDouble(memoryPercentAvg); + out.writeDouble(ioPercentAvg); + } +} diff --git a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java index fd246a4ccb25e..f671a07d1b5b6 100644 --- a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java @@ -32,6 +32,7 @@ package org.opensearch.node; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; import org.opensearch.cluster.node.DiscoveryNode; @@ -79,16 +80,18 @@ void 
removeNode(String nodeId) { nodeIdToStats.remove(nodeId); } - public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { + public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos, + NodePerfStats nodePerfStats) { nodeIdToStats.compute(nodeId, (id, ns) -> { if (ns == null) { ExponentiallyWeightedMovingAverage queueEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, queueSize); ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); - return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos); + return new NodeStatistics(nodeId, queueEWMA, responseEWMA, avgServiceTimeNanos, nodePerfStats); } else { ns.queueSize.addValue((double) queueSize); ns.responseTime.addValue((double) responseTimeNanos); ns.serviceTime = avgServiceTimeNanos; + ns.nodePerfStats = nodePerfStats; return ns; } }); @@ -229,17 +232,20 @@ private static class NodeStatistics { final ExponentiallyWeightedMovingAverage queueSize; final ExponentiallyWeightedMovingAverage responseTime; double serviceTime; + NodePerfStats nodePerfStats; NodeStatistics( String nodeId, ExponentiallyWeightedMovingAverage queueSizeEWMA, ExponentiallyWeightedMovingAverage responseTimeEWMA, - double serviceTimeEWMA + double serviceTimeEWMA, + NodePerfStats nodePerfStats ) { this.nodeId = nodeId; this.queueSize = queueSizeEWMA; this.responseTime = responseTimeEWMA; this.serviceTime = serviceTimeEWMA; + this.nodePerfStats = nodePerfStats; } } } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 1ec42e8a7d2a8..92674bcc66408 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -47,6 +47,7 @@ import org.apache.lucene.search.TopDocs; import 
org.apache.lucene.search.TotalHits; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.Booleans; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; @@ -292,9 +293,11 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); - - ProcessStats.Cpu cpu = new ProcessStats.Cpu(getProcessCpuPercent(), getProcessCpuTotalTime()); - ProcessStats.Mem mem = new ProcessStats.Mem(getTotalVirtualMemorySize()); + LOGGER.info("Adding node perf stats"); + NodePerfStats nodePerfStats = new NodePerfStats( + 95.0, 95.0, 95.0 + ); + queryResult.nodePerfStats(nodePerfStats); } return shouldRescore; diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index 21ca5a42ae8f5..cace17e9f1cd8 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -34,6 +34,7 @@ import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.TotalHits; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.io.stream.DelayableWriteable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -85,8 +86,7 @@ public final class QuerySearchResult extends SearchPhaseResult { private boolean hasProfileResults; private long serviceTimeEWMA = -1; private int nodeQueueSize = -1; - - private + private NodePerfStats nodePerfStats; private final boolean isNull; public QuerySearchResult() { @@ -312,11 +312,20 @@ public long 
serviceTimeEWMA() { return this.serviceTimeEWMA; } + public NodePerfStats getNodePerfStats() { + return this.nodePerfStats; + } + public QuerySearchResult serviceTimeEWMA(long serviceTimeEWMA) { this.serviceTimeEWMA = serviceTimeEWMA; return this; } + public QuerySearchResult nodePerfStats(NodePerfStats nodePerfStats) { + this.nodePerfStats = nodePerfStats; + return this; + } + public int nodeQueueSize() { return this.nodeQueueSize; } @@ -363,6 +372,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc hasProfileResults = profileShardResults != null; serviceTimeEWMA = in.readZLong(); nodeQueueSize = in.readInt(); + nodePerfStats = new NodePerfStats(in); setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); setRescoreDocIds(new RescoreDocIds(in)); } @@ -405,6 +415,8 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeOptionalWriteable(profileShardResults); out.writeZLong(serviceTimeEWMA); out.writeInt(nodeQueueSize); + nodePerfStats.writeTo(out); + out.writeOptionalWriteable(getShardSearchRequest()); getRescoreDocIds().writeTo(out); } From 2c98a0de19d98bec233e8e32a88ea07c6b19b387 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Tue, 27 Jun 2023 12:29:36 +0530 Subject: [PATCH 07/11] adding perf stats to ranking Signed-off-by: Bharathwaj G --- .../action/bulk/TransportBulkAction.java | 9 ++++ .../action/bulk/TransportShardBulkAction.java | 31 ++++++++++--- ...TransportFieldCapabilitiesIndexAction.java | 2 +- .../action/search/TransportSearchAction.java | 3 +- .../TransportReplicationAction.java | 5 +-- .../AdmissionControllerPlugin.java | 3 ++ .../AdmissionControllerRequestHandler.java | 7 ++- .../AdmissionControllerService.java | 5 +++ ...missionControllerTransportInterceptor.java | 3 ++ .../admissioncontroller/NodePerfStats.java | 6 ++- .../admissioncontroller/package-info.java | 12 ++++++ .../routing/IndexShardRoutingTable.java | 14 ++++-- .../cluster/routing/OperationRouting.java | 43 
++++++++++++++----- .../main/java/org/opensearch/node/Node.java | 14 +++--- .../node/ResponseCollectorService.java | 33 +++++++++++++- .../org/opensearch/search/SearchModule.java | 6 ++- .../org/opensearch/search/SearchService.java | 11 ++++- .../opensearch/search/query/QueryPhase.java | 38 +++++++++++----- .../search/query/QuerySearchResult.java | 4 +- .../tasks/TaskResourceTrackingService.java | 21 ++++++++- .../cluster/node/stats/NodeStatsTests.java | 4 +- .../node/tasks/TaskManagerTestCase.java | 2 +- .../bulk/TransportShardBulkActionTests.java | 9 ++-- .../routing/OperationRoutingTests.java | 19 ++++++-- .../common/network/NetworkModuleTests.java | 3 +- .../node/ResponseCollectorServiceTests.java | 1 + .../snapshots/SnapshotResiliencyTests.java | 3 +- .../TaskResourceTrackingServiceTests.java | 3 +- ...enSearchIndexLevelReplicationTestCase.java | 3 +- .../java/org/opensearch/node/MockNode.java | 10 +++-- .../opensearch/search/MockSearchService.java | 7 ++- 31 files changed, 270 insertions(+), 64 deletions(-) create mode 100644 server/src/main/java/org/opensearch/admissioncontroller/package-info.java diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index ed407550b8b2f..85878dc41f77d 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -627,7 +627,16 @@ protected void doRun() { shardBulkAction.execute(bulkShardRequest, ActionListener.runBefore(new ActionListener() { @Override public void onResponse(BulkShardResponse bulkShardResponse) { + threadPool.getThreadContext().getResponseHeaders(); + if(threadPool.getThreadContext().getTransient("PERF_STATS") != null) { +// Map nodePerfStats = (Map) threadPool.getThreadContext().getTransient("PERF_STATS"); +// for (NodePerfStats perfStats : nodePerfStats.values()) { +// logger.info("Response " + 
+// "CPU : {} , MEM : {} , IO : {}", perfStats.cpuPercentAvg, perfStats.memoryPercentAvg, perfStats.ioPercentAvg); +// } + } for (BulkItemResponse bulkItemResponse : bulkShardResponse.getResponses()) { + clusterService.state().getRoutingTable().shardRoutingTable(bulkShardResponse.getShardId()); // we may have no response if item failed if (bulkItemResponse.getResponse() != null) { bulkItemResponse.getResponse().setShardInfo(bulkShardResponse.getShardInfo()); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index cbb30714ee8e1..f6a54434d03c4 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -56,6 +56,7 @@ import org.opensearch.action.update.UpdateHelper; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.client.transport.NoNodeAvailableException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; @@ -77,6 +78,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRunnable; +import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentType; import org.opensearch.common.lease.Releasable; @@ -108,9 +110,7 @@ import org.opensearch.transport.TransportService; import java.io.IOException; -import java.util.Locale; -import java.util.Map; -import java.util.Objects; +import java.util.*; import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; @@ -428,7 +428,7 @@ public void onClusterServiceClose() { public void 
onTimeout(TimeValue timeout) { mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update")); } - }), listener, threadPool, executor(primary)); + }), listener, threadPool, executor(primary), clusterService.localNode().getId()); } @Override @@ -453,7 +453,8 @@ public static void performOnPrimary( Consumer> waitForMappingUpdate, ActionListener> listener, ThreadPool threadPool, - String executorName + String executorName, + String nodeId ) { new ActionRunnable>(listener) { @@ -516,6 +517,24 @@ public boolean isForceExecution() { } private void finishRequest() { + Map nodePerfStatsMap = new HashMap(); + NodePerfStats nodePerfStats = new NodePerfStats(0.95, 0.95,0.95); + nodePerfStatsMap.put(nodeId, nodePerfStats); + ThreadContext threadContext = threadPool.getThreadContext(); + threadContext.addResponseHeader("PERF_STATS", String.valueOf(nodePerfStats.cpuPercentAvg) + "-" + + String.valueOf(nodePerfStats.memoryPercentAvg) + "-" + String.valueOf(nodePerfStats.ioPercentAvg)); +// Map np = new HashMap<>(); +// if(threadContext.getTransient("PERF_STATS") != null ) { +// np = threadContext.getTransient("PERF_STATS"); +// } +// np.put(nodeId, nodePerfStats); + //ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS")); + //ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS")); + //threadContext.putTransient("PERF_STATS", nodePerfStats); + //threadContext.putHeader("PERF_STATS", nodePerfStatsMap); +// ThreadContext.StoredContext storedContext1 = threadContext.newStoredContext(true, Collections.singletonList("T_ID")); +// threadContext.putTransient("T_ID", "nodePerfStats"); + ActionListener.completeWith( listener, () -> new WritePrimaryResult<>( @@ -527,6 +546,8 @@ private void finishRequest() { logger ) ); + //storedContext.close(); +// storedContext1.close(); } }.run(); } diff --git 
a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java index 7fa9e43bee21d..d96efc1df3b83 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesIndexAction.java @@ -248,7 +248,7 @@ private AsyncShardsAction(FieldCapabilitiesIndexRequest request, ActionListener< } shardsIt = clusterService.operationRouting() - .searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null); + .searchShards(clusterService.state(), new String[] { request.index() }, null, null, null, null, null); } public void start() { diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 69f529fe1d00c..7aac82c62196a 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -960,7 +960,8 @@ private void executeSearch( routingMap, searchRequest.preference(), searchService.getResponseCollectorService(), - nodeSearchCounts + nodeSearchCounts, + searchService.getAdmissionControllerService() ); localShardIterators = StreamSupport.stream(localShardRoutings.spliterator(), false) .map(it -> new SearchShardIterator(searchRequest.getLocalClusterAlias(), it.shardId(), it.getShardRoutings(), localIndices)) diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index 3feb5b23e5540..a3ed6ce9f52d5 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ 
b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -411,9 +411,8 @@ protected void handlePrimaryRequest(final ConcreteShardRequest request, new ChannelActionListener<>(channel, transportPrimaryAction, request), releasable::close ); - - try { - new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run(); + // here + try {new AsyncPrimaryAction(request, listener, (ReplicationTask) task).run(); } catch (RuntimeException e) { listener.onFailure(e); } diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java index 85c1eef70a325..5d201ad71ef3d 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerPlugin.java @@ -19,6 +19,9 @@ import java.util.ArrayList; import java.util.List; +/** + * Plugin + */ public class AdmissionControllerPlugin extends Plugin implements NetworkPlugin { public AdmissionControllerService admissionControllerService; diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java index 9b774d3bb1727..c0cbf3ddc0a04 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java @@ -18,6 +18,10 @@ import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; +/** + * Handler + * @param + */ public class AdmissionControllerRequestHandler implements TransportRequestHandler { private final String action; private final TransportRequestHandler actualHandler; @@ -37,6 +41,7 @@ protected ThreadContext getThreadContext() { 
if(threadPool == null) { return null; } + threadPool.getThreadContext().getTransient("PERF_STATS"); return threadPool.getThreadContext(); } @@ -58,7 +63,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro return; } }else { - log.info("Admission controller service responded with IO is in healthy state"); + //log.info("Admission controller service responded with IO is in healthy state"); } this.messageReceivedDecorate(request, actualHandler, channel, task); } diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java index c284b7a8b0223..d63234b3b4676 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -29,6 +29,9 @@ import java.util.*; import java.util.concurrent.atomic.AtomicInteger; +/** + * Service + */ public class AdmissionControllerService extends AbstractLifecycleComponent { private static final Logger logger = LogManager.getLogger(AdmissionControllerService.class); @@ -90,6 +93,8 @@ public double getMemoryEWMA() { return memoryExecutionEWMA.getAverage(); } + public double getIoEWMA() { return ioExecutionEWMA.getAverage(); } + @Override protected void doClose() throws IOException { diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java index 5cc497d3cc641..7650fd779b7c5 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerTransportInterceptor.java @@ -16,6 +16,9 @@ import org.opensearch.transport.TransportRequest; import 
org.opensearch.transport.TransportRequestHandler; +/** + * Interceptor + */ public class AdmissionControllerTransportInterceptor implements TransportInterceptor { protected final Logger log = LogManager.getLogger(this.getClass()); diff --git a/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java b/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java index 287f42cc7a210..e886ba0c9f841 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/NodePerfStats.java @@ -10,10 +10,14 @@ import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; +import org.opensearch.common.io.stream.Writeable; import java.io.IOException; -public class NodePerfStats { +/** + * Node perf stats + */ +public class NodePerfStats implements Writeable { public double cpuPercentAvg; public double memoryPercentAvg; public double ioPercentAvg; diff --git a/server/src/main/java/org/opensearch/admissioncontroller/package-info.java b/server/src/main/java/org/opensearch/admissioncontroller/package-info.java new file mode 100644 index 0000000000000..22a1253f72370 --- /dev/null +++ b/server/src/main/java/org/opensearch/admissioncontroller/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ */ + +/** + * package + */ +package org.opensearch.admissioncontroller; diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 7befec56abaa6..282c9331217e4 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -34,6 +34,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.admissioncontroller.AdmissionControllerService; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; @@ -292,8 +294,9 @@ public ShardIterator activeInitializingShardsIt(int seed) { */ public ShardIterator activeInitializingShardsRankedIt( @Nullable ResponseCollectorService collector, - @Nullable Map nodeSearchCounts - ) { + @Nullable Map nodeSearchCounts, + @Nullable AdmissionControllerService admissionControllerService + ) { final int seed = shuffler.nextSeed(); if (allInitializingShards.isEmpty()) { return new PlainShardIterator( @@ -483,7 +486,12 @@ private static void adjustStats( final int updatedQueue = (minStats.queueSize + stats.queueSize) / 2; final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; - collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService); + // revisit this - basically reset stats based on time + final double cpuPercentAvg = stats.nodePerfStats.cpuPercentAvg * 0.99; + final double memPercentAvg = stats.nodePerfStats.memoryPercentAvg * 0.99; + final double ioPercentAvg = stats.nodePerfStats.ioPercentAvg * 0.99; + NodePerfStats nodePerfStats = new 
NodePerfStats(cpuPercentAvg, memPercentAvg, ioPercentAvg); + collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService, nodePerfStats); } } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java index ade2cda797334..6cb412dbe3c0f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/OperationRouting.java @@ -32,6 +32,7 @@ package org.opensearch.cluster.routing; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.WeightedRoutingMetadata; @@ -202,7 +203,8 @@ public ShardIterator getShards( preference, null, null, - clusterState.getMetadata().weightedRoutingMetadata() + clusterState.getMetadata().weightedRoutingMetadata(), + null ); } @@ -215,7 +217,8 @@ public ShardIterator getShards(ClusterState clusterState, String index, int shar preference, null, null, - clusterState.metadata().weightedRoutingMetadata() + clusterState.metadata().weightedRoutingMetadata(), + null ); } @@ -225,7 +228,7 @@ public GroupShardsIterator searchShards( @Nullable Map> routing, @Nullable String preference ) { - return searchShards(clusterState, concreteIndices, routing, preference, null, null); + return searchShards(clusterState, concreteIndices, routing, preference, null, null, null); } public GroupShardsIterator searchShards( @@ -236,6 +239,17 @@ public GroupShardsIterator searchShards( @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts ) { + return searchShards(clusterState, concreteIndices, routing, preference, collectorService, nodeCounts, null); + } + public GroupShardsIterator searchShards( + ClusterState clusterState, + String[] concreteIndices, + @Nullable Map> routing, + @Nullable 
String preference, + @Nullable ResponseCollectorService collectorService, + @Nullable Map nodeCounts, + @Nullable AdmissionControllerService admissionControllerService + ) { final Set shards = computeTargetedShards(clusterState, concreteIndices, routing); final Set set = new HashSet<>(shards.size()); for (IndexShardRoutingTable shard : shards) { @@ -253,7 +267,8 @@ public GroupShardsIterator searchShards( preference, collectorService, nodeCounts, - clusterState.metadata().weightedRoutingMetadata() + clusterState.metadata().weightedRoutingMetadata(), + admissionControllerService ); if (iterator != null) { set.add(iterator); @@ -276,8 +291,10 @@ private Set computeTargetedShards( ) { routing = routing == null ? EMPTY_ROUTING : routing; // just use an empty map final Set set = new HashSet<>(); + final Set nodeIds = new HashSet<>(); // we use set here and not list since we might get duplicates for (String index : concreteIndices) { + // this is where we calculate shard information which contains nodes associated with each shard final IndexRoutingTable indexRouting = indexRoutingTable(clusterState, index); final IndexMetadata indexMetadata = indexMetadata(clusterState, index); final Set effectiveRouting = routing.get(index); @@ -285,7 +302,10 @@ private Set computeTargetedShards( for (String r : effectiveRouting) { final int routingPartitionSize = indexMetadata.getRoutingPartitionSize(); for (int partitionOffset = 0; partitionOffset < routingPartitionSize; partitionOffset++) { - set.add(RoutingTable.shardRoutingTable(indexRouting, calculateScaledShardId(indexMetadata, r, partitionOffset))); + final IndexShardRoutingTable indexShardRoutingTable = RoutingTable.shardRoutingTable(indexRouting, + calculateScaledShardId(indexMetadata, r, partitionOffset)); + //nodeIds.add(indexShardRoutingTable.s) + set.add(indexShardRoutingTable); } } } else { @@ -304,10 +324,11 @@ private ShardIterator preferenceActiveShardIterator( @Nullable String preference, @Nullable 
ResponseCollectorService collectorService, @Nullable Map nodeCounts, - @Nullable WeightedRoutingMetadata weightedRoutingMetadata + @Nullable WeightedRoutingMetadata weightedRoutingMetadata, + @Nullable AdmissionControllerService admissionControllerService ) { if (preference == null || preference.isEmpty()) { - return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); + return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata, admissionControllerService); } if (preference.charAt(0) == '_') { @@ -335,12 +356,13 @@ private ShardIterator preferenceActiveShardIterator( } // no more preference if (index == -1 || index == preference.length() - 1) { - return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata); + return shardRoutings(indexShard, nodes, collectorService, nodeCounts, weightedRoutingMetadata, admissionControllerService); } else { // update the preference and continue preference = preference.substring(index + 1); } } + // there are preference based routings as well preferenceType = Preference.parse(preference); checkPreferenceBasedRoutingAllowed(preferenceType, weightedRoutingMetadata); switch (preferenceType) { @@ -400,7 +422,8 @@ private ShardIterator shardRoutings( DiscoveryNodes nodes, @Nullable ResponseCollectorService collectorService, @Nullable Map nodeCounts, - @Nullable WeightedRoutingMetadata weightedRoutingMetadata + @Nullable WeightedRoutingMetadata weightedRoutingMetadata, + @Nullable AdmissionControllerService admissionControllerService ) { if (WeightedRoutingUtils.shouldPerformWeightedRouting(ignoreWeightedRouting, weightedRoutingMetadata)) { return indexShard.activeInitializingShardsWeightedIt( @@ -412,7 +435,7 @@ private ShardIterator shardRoutings( ); } else if (ignoreAwarenessAttributes()) { if (useAdaptiveReplicaSelection) { - return indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts); + return 
indexShard.activeInitializingShardsRankedIt(collectorService, nodeCounts, admissionControllerService); } else { return indexShard.activeInitializingShardsRandomIt(); } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 7fca47da5d98b..a82e5484bef98 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -885,7 +885,8 @@ protected Node( final TaskResourceTrackingService taskResourceTrackingService = new TaskResourceTrackingService( settings, clusterService.getClusterSettings(), - threadPool + threadPool, + admissionControllerService ); final SearchBackpressureSettings searchBackpressureSettings = new SearchBackpressureSettings( @@ -1021,11 +1022,12 @@ protected Node( threadPool, scriptService, bigArrays, - searchModule.getQueryPhase(), + searchModule.getQueryPhase(admissionControllerService), searchModule.getFetchPhase(), responseCollectorService, circuitBreakerService, - searchModule.getIndexSearcherExecutor(threadPool) + searchModule.getIndexSearcherExecutor(threadPool), + admissionControllerService ); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) @@ -1632,7 +1634,8 @@ protected SearchService newSearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { return new SearchService( clusterService, @@ -1644,7 +1647,8 @@ protected SearchService newSearchService( fetchPhase, responseCollectorService, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } diff --git a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java index f671a07d1b5b6..413daf2b0692f 
100644 --- a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java @@ -32,6 +32,7 @@ package org.opensearch.node; +import org.apache.logging.log4j.LogManager; import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.ClusterChangedEvent; import org.opensearch.cluster.ClusterStateListener; @@ -42,6 +43,7 @@ import org.opensearch.common.io.stream.StreamOutput; import org.opensearch.common.io.stream.Writeable; import org.opensearch.common.util.concurrent.ConcurrentCollections; +import org.opensearch.common.util.concurrent.ThreadContext; import java.io.IOException; import java.util.HashMap; @@ -49,6 +51,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentMap; +import java.util.logging.Logger; /** * Collects statistics about queue size, response time, and service time of @@ -60,6 +63,8 @@ public final class ResponseCollectorService implements ClusterStateListener { private static final double ALPHA = 0.3; + org.apache.logging.log4j.Logger logger = LogManager.getLogger(ResponseCollectorService.class); + private final ConcurrentMap nodeIdToStats = ConcurrentCollections.newConcurrentMap(); @@ -80,9 +85,14 @@ void removeNode(String nodeId) { nodeIdToStats.remove(nodeId); } + public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos) { + addNodeStatistics(nodeId, queueSize, responseTimeNanos, avgServiceTimeNanos, null); + } public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNanos, long avgServiceTimeNanos, NodePerfStats nodePerfStats) { nodeIdToStats.compute(nodeId, (id, ns) -> { + /* nodePerfStats is null when reached through the 4-arg overload, so the log arguments must be null-safe */ logger.info("Add node statistics - node id : {} , cpu : {} , mem : {} , io : {}", nodeId, nodePerfStats == null ? null : nodePerfStats.cpuPercentAvg, + nodePerfStats == null ? null : nodePerfStats.memoryPercentAvg, nodePerfStats == null ? null : nodePerfStats.ioPercentAvg); if (ns == null) { ExponentiallyWeightedMovingAverage queueEWMA = new 
ExponentiallyWeightedMovingAverage(ALPHA, queueSize); ExponentiallyWeightedMovingAverage responseEWMA = new ExponentiallyWeightedMovingAverage(ALPHA, responseTimeNanos); @@ -137,13 +147,18 @@ public static class ComputedNodeStats implements Writeable { public final int queueSize; public final double responseTime; public final double serviceTime; + /* May be null: the 4-arg addNodeStatistics overload registers nodes without perf stats. */ public final NodePerfStats nodePerfStats; + private static final org.apache.logging.log4j.Logger logger = LogManager.getLogger(ResponseCollectorService.class); + - public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double responseTime, double serviceTime) { + public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double responseTime, double serviceTime, + NodePerfStats nodePerfStats) { this.nodeId = nodeId; this.clientNum = clientNum; this.queueSize = queueSize; this.responseTime = responseTime; this.serviceTime = serviceTime; + this.nodePerfStats = nodePerfStats; } ComputedNodeStats(int clientNum, NodeStatistics nodeStats) { @@ -152,7 +167,8 @@ public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double res clientNum, (int) nodeStats.queueSize.getAverage(), nodeStats.responseTime.getAverage(), - nodeStats.serviceTime + nodeStats.serviceTime, + nodeStats.nodePerfStats ); } @@ -162,6 +178,7 @@ public ComputedNodeStats(String nodeId, int clientNum, int queueSize, double res this.queueSize = in.readInt(); this.responseTime = in.readDouble(); this.serviceTime = in.readDouble(); + this.nodePerfStats = in.readOptionalWriteable(NodePerfStats::new); /* optional on the wire; must mirror writeTo */ } @Override @@ -171,6 +188,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(this.queueSize); out.writeDouble(this.responseTime); out.writeDouble(this.serviceTime); + out.writeOptionalWriteable(this.nodePerfStats); /* null-safe: writes a presence flag before the stats */ } /** @@ -198,13 +216,24 @@ private double innerRank(long outstandingRequests) { // The final formula double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); + + logger.info("queue size : {} , queue size with compensation
factor : {} , response time : {} ," + + " service time : {} , rank : {}", qBar, qHatS, rS, muBarS, rank); + final boolean hasPerfStats = nodePerfStats != null; /* perf stats are optional (the 4-arg addNodeStatistics overload passes null); without them the rank is left unpenalised */ + if (hasPerfStats) logger.info("CPU : {} , Mem : {} , IO : {} ", nodePerfStats.cpuPercentAvg, nodePerfStats.memoryPercentAvg, nodePerfStats.ioPercentAvg); + if (hasPerfStats && nodePerfStats.cpuPercentAvg > 50.0) rank = rank * 2; + if (hasPerfStats && nodePerfStats.memoryPercentAvg > 50.0) rank = rank * 2; + if (hasPerfStats && nodePerfStats.ioPercentAvg > 50.0) rank = rank * 2; return rank; } public double rank(long outstandingRequests) { if (cachedRank == 0) { + logger.info("Cached rank : {}" , cachedRank); cachedRank = innerRank(outstandingRequests); + logger.info("Cached rank post : {}" , cachedRank); } + logger.info("Cached rank post : {}" , cachedRank); return cachedRank; } diff --git a/server/src/main/java/org/opensearch/search/SearchModule.java b/server/src/main/java/org/opensearch/search/SearchModule.java index aeb1d8325b1b8..00fb25b079e58 100644 --- a/server/src/main/java/org/opensearch/search/SearchModule.java +++ b/server/src/main/java/org/opensearch/search/SearchModule.java @@ -33,6 +33,7 @@ package org.opensearch.search; import org.apache.lucene.search.BooleanQuery; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.common.NamedRegistry; import org.opensearch.common.Nullable; import org.opensearch.common.geo.GeoShapeType; @@ -1290,7 +1291,10 @@ public FetchPhase getFetchPhase() { } public QueryPhase getQueryPhase() { - return new QueryPhase(queryPhaseSearcher); + return new QueryPhase(queryPhaseSearcher, null); + } + public QueryPhase getQueryPhase(@Nullable AdmissionControllerService admissionControllerService) { + return new QueryPhase(queryPhaseSearcher, admissionControllerService); } public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) { diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index 7d67c6c3b45f4..93a70dab22415 100644 --- 
a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -49,6 +49,7 @@ import org.opensearch.action.search.UpdatePitContextRequest; import org.opensearch.action.search.UpdatePitContextResponse; import org.opensearch.action.support.TransportActions; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedSupplier; @@ -304,6 +305,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final String sessionId = UUIDs.randomBase64UUID(); private final Executor indexSearcherExecutor; + private final AdmissionControllerService admissionControllerService; + public SearchService( ClusterService clusterService, IndicesService indicesService, @@ -314,7 +317,8 @@ public SearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -322,6 +326,7 @@ public SearchService( this.indicesService = indicesService; this.scriptService = scriptService; this.responseCollectorService = responseCollectorService; + this.admissionControllerService = admissionControllerService; this.bigArrays = bigArrays; this.queryPhase = queryPhase; this.fetchPhase = fetchPhase; @@ -1470,6 +1475,10 @@ public ResponseCollectorService getResponseCollectorService() { return this.responseCollectorService; } + public AdmissionControllerService getAdmissionControllerService() { + return this.admissionControllerService; + } + class Reaper implements Runnable { @Override public void run() { diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java 
b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 92674bcc66408..07232d8cd44c0 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -47,13 +47,16 @@ import org.apache.lucene.search.TopDocs; import org.apache.lucene.search.TotalHits; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.Booleans; +import org.opensearch.common.Nullable; import org.opensearch.common.lucene.Lucene; import org.opensearch.common.lucene.search.TopDocsAndMaxScore; import org.opensearch.common.util.concurrent.EWMATrackingThreadPoolExecutor; import org.opensearch.lucene.queries.SearchAfterSortedDocQuery; import org.opensearch.monitor.process.ProcessStats; +import org.opensearch.node.Node; import org.opensearch.search.DocValueFormat; import org.opensearch.search.SearchContextSourcePrinter; import org.opensearch.search.SearchService; @@ -101,14 +104,17 @@ public class QueryPhase { private final SuggestProcessor suggestProcessor; private final RescoreProcessor rescoreProcessor; + private final AdmissionControllerService admissionControllerService; + public QueryPhase() { - this(DEFAULT_QUERY_PHASE_SEARCHER); + this(DEFAULT_QUERY_PHASE_SEARCHER, null); } - public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) { + public QueryPhase(QueryPhaseSearcher queryPhaseSearcher, @Nullable AdmissionControllerService admissionControllerService) { this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required"); this.suggestProcessor = new SuggestProcessor(); this.rescoreProcessor = new RescoreProcessor(); + this.admissionControllerService = admissionControllerService; } public void preProcess(SearchContext context) { @@ -152,7 +158,7 @@ public void execute(SearchContext searchContext) throws 
QueryPhaseExecutionExcep // request, preProcess is called on the DFS phase phase, this is why we pre-process them // here to make sure it happens during the QUERY phase aggregationProcessor.preProcess(searchContext); - boolean rescore = executeInternal(searchContext, queryPhaseSearcher); + boolean rescore = executeInternal(searchContext, queryPhaseSearcher, this.admissionControllerService); if (rescore) { // only if we do a regular search rescoreProcessor.process(searchContext); @@ -180,15 +186,19 @@ public QueryPhaseSearcher getQueryPhaseSearcher() { * @return whether the rescoring phase should be executed */ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException { - return executeInternal(searchContext, QueryPhase.DEFAULT_QUERY_PHASE_SEARCHER); + return executeInternal(searchContext, QueryPhase.DEFAULT_QUERY_PHASE_SEARCHER, null); } + static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher) throws QueryPhaseExecutionException { + return executeInternal(searchContext, queryPhaseSearcher, null); + } /** * In a package-private method so that it can be tested without having to * wire everything (mapperService, etc.) 
* @return whether the rescoring phase should be executed */ - static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher) throws QueryPhaseExecutionException { + static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher queryPhaseSearcher, + @Nullable AdmissionControllerService admissionControllerService) throws QueryPhaseExecutionException { final ContextIndexSearcher searcher = searchContext.searcher(); final IndexReader reader = searcher.getIndexReader(); QuerySearchResult queryResult = searchContext.queryResult(); @@ -289,15 +299,23 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q ); ExecutorService executor = searchContext.indexShard().getThreadPool().executor(ThreadPool.Names.SEARCH); + //Thread.sleep(5000); if (executor instanceof EWMATrackingThreadPoolExecutor) { final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); - LOGGER.info("Adding node perf stats"); - NodePerfStats nodePerfStats = new NodePerfStats( - 95.0, 95.0, 95.0 - ); - queryResult.nodePerfStats(nodePerfStats); + /* admissionControllerService is @Nullable (the one- and two-arg overloads pass null), so it is only read inside this guard */ + if(admissionControllerService != null) { + LOGGER.info("Adding node perf stats for CPU : {} , MEM : {} , IO : {}" ,admissionControllerService.getCPUEWMA() * 1000, + admissionControllerService.getMemoryEWMA() * 1000, admissionControllerService.getIoEWMA() * 1000); + final NodePerfStats nodePerfStats = new NodePerfStats( + admissionControllerService.getCPUEWMA(), + admissionControllerService.getMemoryEWMA(), + admissionControllerService.getIoEWMA() + ); + queryResult.nodePerfStats(nodePerfStats); + + } } return shouldRescore; diff --git a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java index 
cace17e9f1cd8..35832c1b58b20 100644 --- a/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java +++ b/server/src/main/java/org/opensearch/search/query/QuerySearchResult.java @@ -372,7 +372,7 @@ public void readFromWithId(ShardSearchContextId id, StreamInput in) throws IOExc hasProfileResults = profileShardResults != null; serviceTimeEWMA = in.readZLong(); nodeQueueSize = in.readInt(); - nodePerfStats = new NodePerfStats(in); + nodePerfStats = in.readOptionalWriteable(NodePerfStats::new); setShardSearchRequest(in.readOptionalWriteable(ShardSearchRequest::new)); setRescoreDocIds(new RescoreDocIds(in)); } @@ -415,7 +415,7 @@ public void writeToNoId(StreamOutput out) throws IOException { out.writeOptionalWriteable(profileShardResults); out.writeZLong(serviceTimeEWMA); out.writeInt(nodeQueueSize); - nodePerfStats.writeTo(out); + out.writeOptionalWriteable(nodePerfStats); out.writeOptionalWriteable(getShardSearchRequest()); getRescoreDocIds().writeTo(out); diff --git a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java index b4806b531429e..68e0783cb7a3a 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java +++ b/server/src/main/java/org/opensearch/tasks/TaskResourceTrackingService.java @@ -13,6 +13,8 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; +import org.opensearch.admissioncontroller.AdmissionControllerService; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.common.SuppressForbidden; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.ClusterSettings; @@ -55,11 +57,15 @@ public class TaskResourceTrackingService implements RunnableTaskExecutionListene private final ThreadPool threadPool; private volatile boolean taskResourceTrackingEnabled; + private final 
AdmissionControllerService admissionControllerService; + @Inject - public TaskResourceTrackingService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) { + public TaskResourceTrackingService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, + AdmissionControllerService admissionControllerService) { this.taskResourceTrackingEnabled = TASK_RESOURCE_TRACKING_ENABLED.get(settings); this.threadPool = threadPool; clusterSettings.addSettingsUpdateConsumer(TASK_RESOURCE_TRACKING_ENABLED, this::setTaskResourceTrackingEnabled); + this.admissionControllerService = admissionControllerService; } public void setTaskResourceTrackingEnabled(boolean taskResourceTrackingEnabled) { @@ -87,6 +93,7 @@ public ThreadContext.StoredContext startTracking(Task task) { if (task.supportsResourceTracking() == false || isTaskResourceTrackingEnabled() == false || isTaskResourceTrackingSupported() == false) { + //return addPerfStatsToThreadContext(task); return () -> {}; } @@ -193,6 +200,7 @@ public void taskExecutionStartedOnThread(long taskId, long threadId) { public void taskExecutionFinishedOnThread(long taskId, long threadId) { try { final Task task = resourceAwareTasks.get(taskId); + long id = (long)threadPool.getThreadContext().getTransient("TASK_ID"); if (task != null) { logger.debug("Task execution finished on thread. 
Task: {}, Thread: {}", taskId, threadId); task.stopThreadResourceTracking(threadId, WORKER_STATS, getResourceUsageMetricsForThread(threadId)); @@ -257,6 +265,17 @@ private ThreadContext.StoredContext addTaskIdToThreadContext(Task task) { return storedContext; } + private ThreadContext.StoredContext addPerfStatsToThreadContext(Task task) { + ThreadContext threadContext = threadPool.getThreadContext(); + + ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, + Collections.singletonList("PERF_STATS")); + NodePerfStats nodePerfStats = new NodePerfStats(admissionControllerService.getCPUEWMA(), admissionControllerService.getMemoryEWMA(), + admissionControllerService.getIoEWMA()); + threadContext.putTransient("PERF_STATS", nodePerfStats); + return storedContext; + } + /** * Listener that gets invoked when a task execution completes. */ diff --git a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java index 23920ee4d44c5..dc50ced3bc91c 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/stats/NodeStatsTests.java @@ -32,6 +32,7 @@ package org.opensearch.action.admin.cluster.node.stats; +import org.opensearch.admissioncontroller.NodePerfStats; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.WeightedRoutingStats; import org.opensearch.cluster.service.ClusterManagerThrottlingStats; @@ -710,7 +711,8 @@ public static NodeStats createNodeStats() { randomIntBetween(1, 10), randomIntBetween(0, 2000), randomDoubleBetween(1.0, 10000000.0, true), - randomDoubleBetween(1.0, 10000000.0, true) + randomDoubleBetween(1.0, 10000000.0, true), + new NodePerfStats(95.0, 95.0, 95.0) ); nodeStats.put(nodeId, stats); } diff --git 
a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java index 843fd85d81197..0284265273085 100644 --- a/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java +++ b/server/src/test/java/org/opensearch/action/admin/cluster/node/tasks/TaskManagerTestCase.java @@ -236,7 +236,7 @@ protected TaskManager createTaskManager( transportService.start(); clusterService = createClusterService(threadPool, discoveryNode.get()); clusterService.addStateApplier(transportService.getTaskManager()); - taskResourceTrackingService = new TaskResourceTrackingService(settings, clusterService.getClusterSettings(), threadPool); + taskResourceTrackingService = new TaskResourceTrackingService(settings, clusterService.getClusterSettings(), threadPool, null); transportService.getTaskManager().setTaskResourceTrackingService(taskResourceTrackingService); ActionFilters actionFilters = new ActionFilters(emptySet()); transportListTasksAction = new TransportListTasksAction( diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java index cc7b5cb8dc845..0d621b92d4149 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportShardBulkActionTests.java @@ -279,7 +279,8 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { } }), latch::countDown), threadPool, - Names.WRITE + Names.WRITE, + "NoedId" ); latch.await(); @@ -941,7 +942,8 @@ public void testRetries() throws Exception { assertThat(response.getSeqNo(), equalTo(13L)); }), latch), threadPool, - Names.WRITE + Names.WRITE, + "NodeId" ); latch.await(); } @@ -1024,7 +1026,8 @@ public void testForceExecutionOnRejectionAfterMappingUpdate() throws 
Exception { // Assert that we still need to fsync the location that was successfully written assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation1))), latch), rejectingThreadPool, - Names.WRITE + Names.WRITE, + "nodeId" ); latch.await(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java index 55885fb61ee0c..e2a6407a384bb 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/OperationRoutingTests.java @@ -33,6 +33,7 @@ import org.opensearch.Version; import org.opensearch.action.support.replication.ClusterStateCreationUtils; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; @@ -45,9 +46,13 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; import org.opensearch.index.Index; import org.opensearch.index.IndexModule; import org.opensearch.index.shard.ShardId; +import org.opensearch.index.store.remote.filecache.FileCache; +import org.opensearch.monitor.fs.FsService; import org.opensearch.node.ResponseCollectorService; import org.opensearch.test.ClusterServiceUtils; import org.opensearch.test.OpenSearchTestCase; @@ -595,6 +600,13 @@ public void testAdaptiveReplicaSelection() throws Exception { TestThreadPool threadPool = new TestThreadPool("testThatOnlyNodesSupportNodeIds"); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool); ResponseCollectorService collector = new ResponseCollectorService(clusterService); + AdmissionControllerService admissionControllerService = 
new AdmissionControllerService( + Settings.EMPTY, + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + threadPool, + new NodeEnvironment(Settings.EMPTY, new Environment(Settings.EMPTY, null)), + new FsService(Settings.EMPTY, new NodeEnvironment(Settings.EMPTY, new Environment(Settings.EMPTY, null)), new FileCache(null, null)) + ); Map outstandingRequests = new HashMap<>(); GroupShardsIterator groupIterator = opRouting.searchShards( state, @@ -602,7 +614,8 @@ public void testAdaptiveReplicaSelection() throws Exception { null, null, collector, - outstandingRequests + outstandingRequests, + admissionControllerService ); assertThat("One group per index shard", groupIterator.size(), equalTo(numIndices * numShards)); @@ -614,7 +627,7 @@ public void testAdaptiveReplicaSelection() throws Exception { searchedShards.add(firstChoice); selectedNodes.add(firstChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, admissionControllerService); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting secondChoice = groupIterator.get(0).nextOrNull(); @@ -622,7 +635,7 @@ public void testAdaptiveReplicaSelection() throws Exception { searchedShards.add(secondChoice); selectedNodes.add(secondChoice.currentNodeId()); - groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests); + groupIterator = opRouting.searchShards(state, indexNames, null, null, collector, outstandingRequests, admissionControllerService); assertThat(groupIterator.size(), equalTo(numIndices * numShards)); ShardRouting thirdChoice = groupIterator.get(0).nextOrNull(); diff --git a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java index 
3a22fed5bb2ec..347d8ff0eed45 100644 --- a/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java +++ b/server/src/test/java/org/opensearch/common/network/NetworkModuleTests.java @@ -313,7 +313,8 @@ private NetworkModule newNetworkModule(Settings settings, NetworkPlugin... plugi xContentRegistry(), null, new NullDispatcher(), - new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), + null ); } } diff --git a/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java b/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java index 2b13df3027cfa..e5c14c64485f6 100644 --- a/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java +++ b/server/src/test/java/org/opensearch/node/ResponseCollectorServiceTests.java @@ -78,6 +78,7 @@ public void tearDown() throws Exception { } public void testNodeStats() throws Exception { + // TODO collector.addNodeStatistics("node1", 1, 100, 10); Map nodeStats = collector.getAllNodeStatistics(); assertTrue(nodeStats.containsKey("node1")); diff --git a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java index 0bb2b604e8f1a..cecc1b1765778 100644 --- a/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/opensearch/snapshots/SnapshotResiliencyTests.java @@ -1762,7 +1762,7 @@ public void onFailure(final Exception e) { new ThreadContext(Settings.EMPTY) ); transportService.getTaskManager() - .setTaskResourceTrackingService(new TaskResourceTrackingService(settings, clusterSettings, threadPool)); + .setTaskResourceTrackingService(new TaskResourceTrackingService(settings, clusterSettings, threadPool, null)); repositoriesService = new RepositoriesService( settings, clusterService, @@ -2034,6 +2034,7 @@ public void 
onFailure(final Exception e) { new FetchPhase(Collections.emptyList()), responseCollectorService, new NoneCircuitBreakerService(), + null, null ); SearchPhaseController searchPhaseController = new SearchPhaseController( diff --git a/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java b/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java index 3dcd634c234a3..c40349251db0e 100644 --- a/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java +++ b/server/src/test/java/org/opensearch/tasks/TaskResourceTrackingServiceTests.java @@ -41,7 +41,8 @@ public void setup() { taskResourceTrackingService = new TaskResourceTrackingService( Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), - threadPool + threadPool, + null ); } diff --git a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java index b815a819756b7..fff060939d1c8 100644 --- a/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/replication/OpenSearchIndexLevelReplicationTestCase.java @@ -981,7 +981,8 @@ private void executeShardBulkOnPrimary( listener.onResponse((TransportWriteAction.WritePrimaryResult) result); }), threadPool, - Names.WRITE + Names.WRITE, + "NodeId" ); } catch (Exception e) { listener.onFailure(e); diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 1d8d2dd086418..e828c128f4433 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -32,6 +32,7 @@ package org.opensearch.node; +import 
org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.client.node.NodeClient; import org.opensearch.cluster.ClusterInfoService; import org.opensearch.cluster.MockInternalClusterInfoService; @@ -154,7 +155,8 @@ protected SearchService newSearchService( FetchPhase fetchPhase, ResponseCollectorService responseCollectorService, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { if (getPluginsService().filterPlugins(MockSearchService.TestPlugin.class).isEmpty()) { return super.newSearchService( @@ -167,7 +169,8 @@ protected SearchService newSearchService( fetchPhase, responseCollectorService, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } return new MockSearchService( @@ -179,7 +182,8 @@ protected SearchService newSearchService( queryPhase, fetchPhase, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } diff --git a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java index 808dc50512c58..fde09d23f2101 100644 --- a/test/framework/src/main/java/org/opensearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/opensearch/search/MockSearchService.java @@ -32,6 +32,7 @@ package org.opensearch.search; +import org.opensearch.admissioncontroller.AdmissionControllerService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.BigArrays; import org.opensearch.indices.IndicesService; @@ -96,7 +97,8 @@ public MockSearchService( QueryPhase queryPhase, FetchPhase fetchPhase, CircuitBreakerService circuitBreakerService, - Executor indexSearcherExecutor + Executor indexSearcherExecutor, + AdmissionControllerService admissionControllerService ) { super( clusterService, @@ -108,7 
+110,8 @@ public MockSearchService( fetchPhase, null, circuitBreakerService, - indexSearcherExecutor + indexSearcherExecutor, + admissionControllerService ); } From 9fb32539ba9b507378c07b7818caf21058b3ba6e Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Sun, 2 Jul 2023 09:58:26 +0530 Subject: [PATCH 08/11] more changes Signed-off-by: Bharathwaj G --- .../action/bulk/TransportShardBulkAction.java | 4 +- .../AdmissionControllerRequestHandler.java | 8 +- .../AdmissionControllerService.java | 6 ++ .../routing/IndexShardRoutingTable.java | 5 + .../org/opensearch/monitor/fs/FsInfo.java | 91 ++++++++++++++++++- .../org/opensearch/monitor/fs/FsProbe.java | 8 ++ .../node/AdaptiveSelectionStats.java | 3 + .../node/ResponseCollectorService.java | 22 +++-- .../opensearch/search/query/QueryPhase.java | 10 +- 9 files changed, 138 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index f6a54434d03c4..5098c5380681d 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -530,8 +530,8 @@ private void finishRequest() { // np.put(nodeId, nodePerfStats); //ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS")); //ThreadContext.StoredContext storedContext = threadContext.newStoredContext(true, Collections.singletonList("PERF_STATS")); - //threadContext.putTransient("PERF_STATS", nodePerfStats); - //threadContext.putHeader("PERF_STATS", nodePerfStatsMap); + // threadContext.putTransient("PERF_STATS", nodePerfStats); + // threadContext.putHeader("PERF_STATS", nodePerfStatsMap); // ThreadContext.StoredContext storedContext1 = threadContext.newStoredContext(true, Collections.singletonList("T_ID")); // threadContext.putTransient("T_ID", "nodePerfStats"); 
diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java index c0cbf3ddc0a04..0b4b7bba48f72 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerRequestHandler.java @@ -58,10 +58,10 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro // Evaluate the requests here. if (this.admissionControllerService.isIOInStress()) { log.info("Admission controller service responded with IO is in stress state"); - if (this.isSearchRequest()){ - channel.sendResponse(new OpenSearchRejectedExecutionException("Execution Rejected due to high IO usage")); - return; - } +// if (this.isSearchRequest()){ +// channel.sendResponse(new OpenSearchRejectedExecutionException("Execution Rejected due to high IO usage")); +// return; +// } }else { //log.info("Admission controller service responded with IO is in healthy state"); } diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java index d63234b3b4676..3b8309dd0a4b3 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -130,9 +130,15 @@ private void monitorIOUtilisation() { Map currentIOTimeMap = new HashMap<>(); for (FsInfo.DeviceStats devicesStat : this.fsService.stats().getIoStats().getDevicesStats()) { logger.info("Device Id: " + devicesStat.getDeviceName() + "; IO time: " + devicesStat.getCurrentIOTime()); + logger.info("Read Latency : " + devicesStat.getCurrentReadLatency() + " Write latency : " + devicesStat.getCurrentWriteLatency()); + logger.info("Write 
time : " + devicesStat.getCurrentWriteTime() + "Read time : " + devicesStat.getCurrentReadTime()); + logger.info("Read latency diff : " + devicesStat.getReadLatency() + "Write latency diff" + devicesStat.getWriteLatency()); + logger.info("Read time diff : " + devicesStat.getReadTime() + "Write time diff" + devicesStat.getWriteTime()); + if (previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()); double ioUsePercent = (double) (ioSpentTime * 100) / (10 * 1000); + ioExecutionEWMA.addValue(ioUsePercent / 100.0); Queue ioUsageQueue; if (deviceIOUsage.containsKey(devicesStat.getDeviceName())) { ioUsageQueue = deviceIOUsage.get(devicesStat.getDeviceName()); diff --git a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java index 282c9331217e4..8fd3ead06cbf0 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java +++ b/server/src/main/java/org/opensearch/cluster/routing/IndexShardRoutingTable.java @@ -487,9 +487,14 @@ private static void adjustStats( final long updatedResponse = (long) (minStats.responseTime + stats.responseTime) / 2; final long updatedService = (long) (minStats.serviceTime + stats.serviceTime) / 2; // revisit this - basically reset stats based on time + logger.info("Stats before adjust CPU : {}, MEM : {}, IO : {}", + stats.nodePerfStats.cpuPercentAvg, stats.nodePerfStats.memoryPercentAvg, stats.nodePerfStats.ioPercentAvg); final double cpuPercentAvg = stats.nodePerfStats.cpuPercentAvg * 0.99; final double memPercentAvg = stats.nodePerfStats.memoryPercentAvg * 0.99; final double ioPercentAvg = stats.nodePerfStats.ioPercentAvg * 0.99; + logger.info("Stats after adjust CPU : {}, MEM : {}, IO : {}", + cpuPercentAvg, memPercentAvg, ioPercentAvg); + NodePerfStats nodePerfStats = new NodePerfStats(cpuPercentAvg, 
memPercentAvg, ioPercentAvg); collector.addNodeStatistics(nodeId, updatedQueue, updatedResponse, updatedService, nodePerfStats); } diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 13e522387dad0..31ebd768cc7bf 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -237,6 +237,14 @@ public static class DeviceStats implements Writeable, ToXContentFragment { final long previousSectorsWritten; final long currentIOTime; final long previousIOTime; + final double currentReadTime; + final double previousReadTime; + final double currentWriteTime; + final double previousWriteTime; + final double currentReadLatency; + final double previousReadLatency; + final double currentWriteLatency; + final double previousWriteLatency; public DeviceStats( final int majorDeviceNumber, @@ -247,6 +255,10 @@ public DeviceStats( final long currentWritesCompleted, final long currentSectorsWritten, final long currentIOTime, + final double currentReadTime, + final double currentWriteTime, + final double currentReadLatency, + final double currentWriteLatency, final DeviceStats previousDeviceStats ) { this( @@ -262,7 +274,15 @@ public DeviceStats( currentWritesCompleted, previousDeviceStats != null ? previousDeviceStats.currentWritesCompleted : -1, currentIOTime, - previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1 + previousDeviceStats != null ? previousDeviceStats.currentIOTime : -1, + currentReadTime, + previousDeviceStats != null ? previousDeviceStats.previousReadTime : -1.0, + currentWriteTime, + previousDeviceStats != null ? previousDeviceStats.previousWriteTime : -1.0, + currentReadLatency, + previousDeviceStats != null ? previousDeviceStats.currentReadLatency : -1.0, + currentWriteLatency, + previousDeviceStats != null ? 
previousDeviceStats.currentWriteLatency : -1.0 ); } @@ -279,7 +299,15 @@ private DeviceStats( final long currentWritesCompleted, final long previousWritesCompleted, final long currentIOTime, - final long previousIOTime + final long previousIOTime, + final double currentReadTime, + final double previousReadTime, + final double currentWriteTime, + final double previousWriteTime, + final double currentReadLatency, + final double previousReadLatency, + final double currentWriteLatency, + final double previousWriteLatency ) { this.majorDeviceNumber = majorDeviceNumber; this.minorDeviceNumber = minorDeviceNumber; @@ -294,6 +322,14 @@ private DeviceStats( this.previousSectorsWritten = previousSectorsWritten; this.currentIOTime = currentIOTime; this.previousIOTime = previousIOTime; + this.currentReadTime = currentReadTime; + this.previousReadTime = previousReadTime; + this.currentWriteTime = currentWriteTime; + this.previousWriteTime = previousWriteTime; + this.currentReadLatency = currentReadLatency; + this.previousReadLatency = previousReadLatency; + this.currentWriteLatency = currentWriteLatency; + this.previousWriteLatency = previousWriteLatency; } public DeviceStats(StreamInput in) throws IOException { @@ -310,6 +346,14 @@ public DeviceStats(StreamInput in) throws IOException { previousSectorsWritten = in.readLong(); currentIOTime = in.readLong(); previousIOTime = in.readLong(); + currentReadTime = in.readDouble(); + previousReadTime = in.readDouble(); + currentWriteTime = in.readDouble(); + previousWriteTime = in.readDouble(); + currentReadLatency = in.readDouble(); + previousReadLatency = in.readDouble(); + currentWriteLatency = in.readDouble(); + previousWriteLatency = in.readDouble(); } @Override @@ -327,6 +371,13 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(previousSectorsWritten); out.writeLong(currentIOTime); out.writeLong(previousIOTime); + out.writeDouble(currentReadTime); + out.writeDouble(currentWriteTime); + 
out.writeDouble(previousWriteTime); + out.writeDouble(currentReadLatency); + out.writeDouble(previousReadLatency); + out.writeDouble(currentWriteLatency); + out.writeDouble(previousWriteLatency); } public long operations() { @@ -365,10 +416,46 @@ public long ioTimeInMillis() { return (currentIOTime - previousIOTime); } + public double getWriteLatency() { + if(previousWriteLatency == -1.0) return -1.0; + return currentWriteLatency - previousWriteLatency; + } + + public double getReadLatency() { + if(previousReadLatency == -1.0) return -1.0; + return currentReadLatency - previousReadLatency; + } + + public double getReadTime() { + if(previousReadTime == -1.0) return -1.0; + return currentReadTime - previousReadTime; + } + + public double getWriteTime() { + if(previousWriteTime == -1.0) return -1.0; + return currentWriteTime - previousWriteTime; + } + public long getCurrentIOTime() { return this.currentIOTime; } + public double getCurrentReadTime() { + return this.currentReadTime; + } + + public double getCurrentWriteTime() { + return this.currentWriteTime; + } + + public double getCurrentReadLatency() { + return this.currentReadLatency; + } + + public double getCurrentWriteLatency() { + return this.currentWriteLatency; + } + public String getDeviceName() { return this.deviceName; } diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java index 55472aa2d3e35..a366c2fd2e64a 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsProbe.java @@ -123,6 +123,10 @@ final FsInfo.IoStats ioStats(final Set> devicesNumbers, final long sectorsRead = Long.parseLong(fields[5]); final long writesCompleted = Long.parseLong(fields[7]); final long sectorsWritten = Long.parseLong(fields[9]); + final double readTime = Double.parseDouble(fields[6]); + final double writeTime = Double.parseDouble(fields[10]); + final double readLatency = 
readTime / readsCompleted; + final double writeLatency = writeTime / writesCompleted; final long ioTime = Long.parseLong(fields[12]); final FsInfo.DeviceStats deviceStats = new FsInfo.DeviceStats( majorDeviceNumber, @@ -133,6 +137,10 @@ final FsInfo.IoStats ioStats(final Set> devicesNumbers, writesCompleted, sectorsWritten, ioTime, + readTime, + writeTime, + readLatency, + writeLatency, deviceMap.get(Tuple.tuple(majorDeviceNumber, minorDeviceNumber)) ); devicesStats.add(deviceStats); diff --git a/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java b/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java index df5c7b93bdc2d..191fbd1b20bbf 100644 --- a/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java +++ b/server/src/main/java/org/opensearch/node/AdaptiveSelectionStats.java @@ -98,6 +98,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } builder.field("avg_response_time_ns", (long) stats.responseTime); builder.field("rank", String.format(Locale.ROOT, "%.1f", stats.rank(outgoingSearches))); + builder.field("cpu", (double) stats.nodePerfStats.cpuPercentAvg); + builder.field("memory", (double) stats.nodePerfStats.memoryPercentAvg); + builder.field("io", (double) stats.nodePerfStats.ioPercentAvg); } builder.endObject(); } diff --git a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java index 413daf2b0692f..42b503c2136a7 100644 --- a/server/src/main/java/org/opensearch/node/ResponseCollectorService.java +++ b/server/src/main/java/org/opensearch/node/ResponseCollectorService.java @@ -107,6 +107,10 @@ public void addNodeStatistics(String nodeId, int queueSize, long responseTimeNan }); } + /** + * This method + * @return + */ public Map getAllNodeStatistics() { final int clientNum = nodeIdToStats.size(); // Transform the mutable object internally used for accounting into the computed version @@ 
-196,6 +200,7 @@ public void writeTo(StreamOutput out) throws IOException { * https://www.usenix.org/system/files/conference/nsdi15/nsdi15-paper-suresh.pdf */ private double innerRank(long outstandingRequests) { + // // the concurrency compensation is defined as the number of // outstanding requests from the client to the node times the number // of clients in the system @@ -217,13 +222,18 @@ private double innerRank(long outstandingRequests) { // The final formula double rank = rS - (1.0 / muBarS) + (Math.pow(qHatS, queueAdjustmentFactor) / muBarS); - logger.info("queue size : {} , queue size with compensation factor : {} , response time : {} ," + - " service time : {} , rank : {}", qBar, qHatS, rS, muBarS, rank); - logger.info("CPU : {} , Mem : {} , IO : {} ", nodePerfStats.cpuPercentAvg, + logger.info("Node id : {} , queue size : {} , queue size with compensation factor : {} , response time : {} ," + + " service time : {} , rank : {}", nodeId, qBar, qHatS, rS, muBarS, rank); + logger.info("Node ID : {} CPU : {} , Mem : {} , IO : {} ", nodeId, nodePerfStats.cpuPercentAvg, nodePerfStats.memoryPercentAvg, nodePerfStats.ioPercentAvg); - if(nodePerfStats.cpuPercentAvg > 50.0 ) rank = rank * 2; - if(nodePerfStats.memoryPercentAvg > 50.0 ) rank = rank * 2; - if(nodePerfStats.ioPercentAvg > 50.0) rank = rank * 2; + logger.info("rank before adjustment : {}", rank); + // adjust rank if the node is overloaded + // how to adjust rank + if(nodePerfStats.cpuPercentAvg > 90.0 ) rank = rank * 2; + if(nodePerfStats.memoryPercentAvg > 90.0 ) rank = rank * 2; + if(nodePerfStats.ioPercentAvg > 90.0) rank = rank * 2; + // is there flaw in above logic + logger.info("rank after adjustment : {}", rank); return rank; } diff --git a/server/src/main/java/org/opensearch/search/query/QueryPhase.java b/server/src/main/java/org/opensearch/search/query/QueryPhase.java index 07232d8cd44c0..ae24cdba7d2ea 100644 --- a/server/src/main/java/org/opensearch/search/query/QueryPhase.java +++ 
b/server/src/main/java/org/opensearch/search/query/QueryPhase.java @@ -304,14 +304,14 @@ static boolean executeInternal(SearchContext searchContext, QueryPhaseSearcher q final EWMATrackingThreadPoolExecutor rExecutor = (EWMATrackingThreadPoolExecutor) executor; queryResult.nodeQueueSize(rExecutor.getCurrentQueueSize()); queryResult.serviceTimeEWMA((long) rExecutor.getTaskExecutionEWMA()); - LOGGER.info("Adding node perf stats for CPU : {} , MEM : {} , IO : {}" ,admissionControllerService.getCPUEWMA() * 1000, - admissionControllerService.getMemoryEWMA() * 1000, admissionControllerService.getIoEWMA() * 1000); + LOGGER.info("Adding node perf stats for CPU : {} , MEM : {} , IO : {}" ,admissionControllerService.getCPUEWMA() * 100, + admissionControllerService.getMemoryEWMA() * 100, admissionControllerService.getIoEWMA() * 100); NodePerfStats nodePerfStats = null; if(admissionControllerService != null) { nodePerfStats = new NodePerfStats( - admissionControllerService.getCPUEWMA(), - admissionControllerService.getMemoryEWMA(), - admissionControllerService.getIoEWMA() + admissionControllerService.getCPUEWMA() * 100, + admissionControllerService.getMemoryEWMA() * 100, + admissionControllerService.getIoEWMA() * 100 ); queryResult.nodePerfStats(nodePerfStats); From a046f9dfb6013e312b3875e353993aa5e1f43871 Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 21 Jul 2023 09:35:32 +0530 Subject: [PATCH 09/11] latency Signed-off-by: Bharathwaj G --- .../AdmissionControllerService.java | 1 + .../main/java/org/opensearch/monitor/fs/FsInfo.java | 12 ++++++++++++ 2 files changed, 13 insertions(+) diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java index 3b8309dd0a4b3..0a7e734048f1d 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java +++ 
b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -135,6 +135,7 @@ private void monitorIOUtilisation() { logger.info("Read latency diff : " + devicesStat.getReadLatency() + "Write latency diff" + devicesStat.getWriteLatency()); logger.info("Read time diff : " + devicesStat.getReadTime() + "Write time diff" + devicesStat.getWriteTime()); + logger.info("Read latency : " + devicesStat.getNewReadLatency() + " Write latency : " + devicesStat.getNewWriteLatency()); if (previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()); double ioUsePercent = (double) (ioSpentTime * 100) / (10 * 1000); diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 31ebd768cc7bf..3d286ec816308 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -421,6 +421,18 @@ public double getWriteLatency() { return currentWriteLatency - previousWriteLatency; } + public double getNewWriteLatency() { + //double readLatency = getReadTime() / readOperations(); + double writeLatency = getWriteTime() / writeOperations(); + return writeLatency; + } + + public double getNewReadLatency() { + //double readLatency = getReadTime() / readOperations(); + double readLatency = getReadTime() / writeOperations(); + return readLatency; + } + public double getReadLatency() { if(previousReadLatency == -1.0) return -1.0; return currentReadLatency - previousReadLatency; From 0ddb0e278de44f92915f477f639684e076f5cb6f Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 21 Jul 2023 10:16:53 +0530 Subject: [PATCH 10/11] latency Signed-off-by: Bharathwaj G --- .../AdmissionControllerService.java | 52 +++++++++++++++---- .../org/opensearch/monitor/fs/FsInfo.java | 10 +++- 2 files changed, 52 insertions(+), 10 
deletions(-) diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java index 0a7e734048f1d..133379cebc000 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -38,7 +38,7 @@ public class AdmissionControllerService extends AbstractLifecycleComponent { private final ThreadPool threadPool; private final TimeValue refreshInterval; private volatile Scheduler.Cancellable scheduledFuture; - private Map previousIOTimeMap; + private Map previousIOTimeMap; private final Map> deviceIOUsage; private static final double IO_MAX_USAGE = 30; @@ -100,6 +100,21 @@ protected void doClose() throws IOException { } + class DevicePreiousStats { + public long ioTime; + public double readTime; + public double writeTime; + public double readOps; + public double writeOps; + public DevicePreiousStats(long ioTime, double readTime, double writeTime, double readOps, double writeOps) { + this.ioTime = ioTime; + this.readTime = readTime; + this.writeTime = writeTime; + this.readOps = readOps; + this.writeOps = writeOps; + } + } + class IOMonitor implements Runnable { private final FsService fsService; @@ -127,19 +142,34 @@ private void monitorMemoryUtilisation() { } private void monitorIOUtilisation() { logger.info("IO stats is triggered"); - Map currentIOTimeMap = new HashMap<>(); + Map currentIOTimeMap = new HashMap<>(); for (FsInfo.DeviceStats devicesStat : this.fsService.stats().getIoStats().getDevicesStats()) { logger.info("Device Id: " + devicesStat.getDeviceName() + "; IO time: " + devicesStat.getCurrentIOTime()); - logger.info("Read Latency : " + devicesStat.getCurrentReadLatency() + " Write latency : " + devicesStat.getCurrentWriteLatency()); - logger.info("Write time : " + devicesStat.getCurrentWriteTime() + "Read 
time : " + devicesStat.getCurrentReadTime()); - logger.info("Read latency diff : " + devicesStat.getReadLatency() + "Write latency diff" + devicesStat.getWriteLatency()); - logger.info("Read time diff : " + devicesStat.getReadTime() + "Write time diff" + devicesStat.getWriteTime()); +// logger.info("Read Latency : " + devicesStat.getCurrentReadLatency() + " Write latency : " + devicesStat.getCurrentWriteLatency()); +// logger.info("Write time : " + devicesStat.getCurrentWriteTime() + "Read time : " + devicesStat.getCurrentReadTime()); +// logger.info("Read latency diff : " + devicesStat.getReadLatency() + "Write latency diff" + devicesStat.getWriteLatency()); +// logger.info("Read time diff : " + devicesStat.getReadTime() + "Write time diff" + devicesStat.getWriteTime()); - logger.info("Read latency : " + devicesStat.getNewReadLatency() + " Write latency : " + devicesStat.getNewWriteLatency()); + //logger.info("Read latency : " + devicesStat.getNewReadLatency() + " Write latency : " + devicesStat.getNewWriteLatency()); + + logger.info(""); if (previousIOTimeMap.containsKey(devicesStat.getDeviceName())){ - long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()); + long ioSpentTime = devicesStat.getCurrentIOTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).ioTime; double ioUsePercent = (double) (ioSpentTime * 100) / (10 * 1000); ioExecutionEWMA.addValue(ioUsePercent / 100.0); + + double readOps = devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).readOps; + double writeOps = devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeOps; + + double readTime = devicesStat.getCurrentReadTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).readTime; + double writeTime = devicesStat.getWriteTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeTime; + + double readLatency = readOps / readTime; + double wrieLatency = writeOps 
/ writeTime; + + logger.info("read ops : {} , writeops : {} , readtime: {} , writetime: {}", readOps, writeOps, readTime, writeTime); + logger.info("Read latency final : " + readLatency + "write latency final : " + wrieLatency); + Queue ioUsageQueue; if (deviceIOUsage.containsKey(devicesStat.getDeviceName())) { ioUsageQueue = deviceIOUsage.get(devicesStat.getDeviceName()); @@ -159,7 +189,11 @@ private void monitorIOUtilisation() { } deviceIOUsage.put(devicesStat.getDeviceName(), ioUsageQueue); } - currentIOTimeMap.put(devicesStat.getDeviceName(), devicesStat.getCurrentIOTime()); + + DevicePreiousStats ps = new DevicePreiousStats(devicesStat.getCurrentIOTime(), devicesStat.getCurrentReadTime(), + devicesStat.getCurrentWriteTime(), devicesStat.currentReadOperations(), devicesStat.currentWriteOpetations()); + + currentIOTimeMap.put(devicesStat.getDeviceName(), ps); } previousIOTimeMap = currentIOTimeMap; } diff --git a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java index 3d286ec816308..3436b7b0b19f7 100644 --- a/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java +++ b/server/src/main/java/org/opensearch/monitor/fs/FsInfo.java @@ -398,6 +398,14 @@ public long writeOperations() { return (currentWritesCompleted - previousWritesCompleted); } + public long currentReadOperations() { + return currentReadsCompleted; + } + + public long currentWriteOpetations() { + return currentWritesCompleted; + } + public long readKilobytes() { if (previousSectorsRead == -1) return -1; @@ -429,7 +437,7 @@ public double getNewWriteLatency() { public double getNewReadLatency() { //double readLatency = getReadTime() / readOperations(); - double readLatency = getReadTime() / writeOperations(); + double readLatency = getReadTime() / readOperations(); return readLatency; } From cb0adac7810a8b1eae8033d3c6f2242aef9026ec Mon Sep 17 00:00:00 2001 From: Bharathwaj G Date: Fri, 21 Jul 2023 10:21:41 +0530 Subject: [PATCH 
11/11] latency Signed-off-by: Bharathwaj G --- .../admissioncontroller/AdmissionControllerService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java index 133379cebc000..ab4ed550d9f2c 100644 --- a/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java +++ b/server/src/main/java/org/opensearch/admissioncontroller/AdmissionControllerService.java @@ -159,7 +159,7 @@ private void monitorIOUtilisation() { ioExecutionEWMA.addValue(ioUsePercent / 100.0); double readOps = devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).readOps; - double writeOps = devicesStat.currentReadOperations() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeOps; + double writeOps = devicesStat.currentWriteOpetations() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeOps; double readTime = devicesStat.getCurrentReadTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).readTime; double writeTime = devicesStat.getWriteTime() - previousIOTimeMap.get(devicesStat.getDeviceName()).writeTime;