From 29da30a1fa97f031c26b4a66c9a8d0a0fb6531bd Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Mon, 24 Jul 2023 22:44:06 -0700 Subject: [PATCH 1/4] Opentelemetry integration for trace events --- plugins/telemetry-otel/build.gradle | 50 ++------ .../telemetry/OTelTelemetryPlugin.java | 31 +++-- .../telemetry/OTelTelemetrySettings.java | 10 -- .../diagnostics/jmx/JMXMetricType.java | 44 +------ .../jmx/JMXMetricsObserverThread.java | 23 +--- .../diagnostics/jmx/JMXOTelMetricEmitter.java | 48 +------- .../jmx/JMXThreadResourceRecorder.java | 26 +--- .../metrics/OTelMetricExporterFactory.java | 116 +----------------- .../metrics/OTelMetricsTelemetry.java | 1 + .../tracing/OTelResourceProvider.java | 27 ++-- .../plugin-metadata/plugin-security.policy | 6 + 11 files changed, 70 insertions(+), 312 deletions(-) diff --git a/plugins/telemetry-otel/build.gradle b/plugins/telemetry-otel/build.gradle index f60fd170c218c..389e13ae8d999 100644 --- a/plugins/telemetry-otel/build.gradle +++ b/plugins/telemetry-otel/build.gradle @@ -30,14 +30,15 @@ dependencies { api "io.opentelemetry:opentelemetry-sdk-logs:${versions.opentelemetry}-alpha" api "io.opentelemetry:opentelemetry-api-logs:${versions.opentelemetry}-alpha" + // TODO add license files // exporters api "io.opentelemetry:opentelemetry-exporter-common:${versions.opentelemetry}" api "io.opentelemetry:opentelemetry-exporter-otlp:${versions.opentelemetry}" api "io.opentelemetry:opentelemetry-exporter-otlp-common:${versions.opentelemetry}" api "io.opentelemetry:opentelemetry-extension-incubator:${versions.opentelemetry}-alpha" - api "com.squareup.okhttp3:okhttp:4.10.0" - api "org.jetbrains.kotlin:kotlin-stdlib:${versions.kotlin}" + api("com.squareup.okhttp3:okhttp:${versions.okhttp}") + api 'org.jetbrains.kotlin:kotlin-stdlib:1.6.20' api 'com.squareup.okio:okio-jvm:3.0.0' } @@ -52,52 +53,17 @@ thirdPartyAudit { ) ignoreMissingClasses( - 'android.net.http.X509TrustManagerExtensions', - 'android.net.ssl.SSLSockets', - 'android.os.Build$VERSION', - 'android.security.NetworkSecurityPolicy', - 'android.util.Log', - 'com.google.common.io.ByteStreams', - 'com.google.common.util.concurrent.FutureCallback', - 'com.google.common.util.concurrent.Futures', - 'com.google.common.util.concurrent.ListenableFuture', - 'com.google.common.util.concurrent.MoreExecutors', - 'io.grpc.CallOptions', - 'io.grpc.Channel', - 'io.grpc.ClientInterceptor', - 'io.grpc.ClientInterceptors', - 'io.grpc.Codec', - 'io.grpc.Codec$Identity', - 'io.grpc.Drainable', - 'io.grpc.KnownLength', - 'io.grpc.ManagedChannel', - 'io.grpc.Metadata', - 'io.grpc.Metadata$Key', - 'io.grpc.MethodDescriptor', - 'io.grpc.MethodDescriptor$Builder', - 'io.grpc.MethodDescriptor$Marshaller', - 'io.grpc.MethodDescriptor$MethodType', - 'io.grpc.Status', - 'io.grpc.Status$Code', - 'io.grpc.stub.AbstractFutureStub', - 'io.grpc.stub.AbstractStub', - 'io.grpc.stub.ClientCalls', - 'io.grpc.stub.MetadataUtils', 'io.opentelemetry.api.events.EventEmitter', 'io.opentelemetry.api.events.EventEmitterBuilder', 'io.opentelemetry.api.events.EventEmitterProvider', + 'io.opentelemetry.extension.incubator.metrics.ExtendedDoubleHistogramBuilder', + 'io.opentelemetry.extension.incubator.metrics.ExtendedLongHistogramBuilder', + 'io.opentelemetry.extension.incubator.metrics.HistogramAdviceConfigurer', 'io.opentelemetry.sdk.autoconfigure.spi.ConfigProperties', 'io.opentelemetry.sdk.autoconfigure.spi.logs.ConfigurableLogRecordExporterProvider', 'io.opentelemetry.sdk.autoconfigure.spi.metrics.ConfigurableMetricExporterProvider', - 'io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider', - 'org.bouncycastle.jsse.BCSSLParameters', - 'org.bouncycastle.jsse.BCSSLSocket', - 'org.conscrypt.Conscrypt', - 'org.conscrypt.Conscrypt$Version', - 'org.conscrypt.ConscryptHostnameVerifier', - 'org.openjsse.javax.net.ssl.SSLParameters', - 'org.openjsse.javax.net.ssl.SSLSocket' - ) + 'io.opentelemetry.sdk.autoconfigure.spi.traces.ConfigurableSpanExporterProvider' + ) } tasks.named("bundlePlugin").configure { diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java index 1556765d333f6..c7152b527f4e8 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetryPlugin.java @@ -13,13 +13,20 @@ import org.opensearch.common.settings.Settings; import org.opensearch.plugins.Plugin; import org.opensearch.plugins.TelemetryPlugin; +import org.opensearch.telemetry.diagnostics.jmx.JMXMetricsObserverThread; +import org.opensearch.telemetry.diagnostics.jmx.JMXOTelMetricEmitter; +import org.opensearch.telemetry.diagnostics.jmx.JMXThreadResourceRecorder; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; import org.opensearch.telemetry.metrics.OTelMetricsTelemetry; import org.opensearch.telemetry.tracing.OTelResourceProvider; import org.opensearch.telemetry.tracing.OTelTelemetry; import org.opensearch.telemetry.tracing.OTelTracingTelemetry; +import org.opensearch.telemetry.diagnostics.DiagnosticsEventListener; import java.util.Arrays; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; /** @@ -46,8 +53,7 @@ public List> getSettings() { OTelTelemetrySettings.TRACER_EXPORTER_DELAY_SETTING, OTelTelemetrySettings.TRACER_EXPORTER_MAX_QUEUE_SIZE_SETTING, OTelTelemetrySettings.OTEL_TRACER_SPAN_EXPORTER_CLASS_SETTING, - OTelTelemetrySettings.OTEL_TRACER_METRIC_EXPORTER_NAME_SETTING, - OTelTelemetrySettings.OTEL_TRACER_METRIC_READER_INTERVAL_SETTING + OTelTelemetrySettings.OTEL_TRACER_METRIC_EXPORTER_NAME_SETTING ); } @@ -56,6 +62,18 @@ public Optional getTelemetry(TelemetrySettings settings) { return Optional.of(telemetry()); } + @Override + public Map getTraceEventListeners(Telemetry telemetry) { + if (!ensureOpenTelemetry(telemetry)) { + return Collections.emptyMap(); + } + OpenTelemetry openTelemetry = ((OTelMetricsTelemetry)telemetry.getMetricsTelemetry()).getTelemetry(); + return Map.of("ThreadDiagnosticsTraceEventListener", new DiagnosticsEventListener( + new JMXThreadResourceRecorder(new JMXMetricsObserverThread()), + JMXOTelMetricEmitter.getInstance(openTelemetry) + )); + } + @Override public String getName() { return OTEL_TRACER_NAME; @@ -63,14 +81,13 @@ public String getName() { private Telemetry telemetry() { OpenTelemetry openTelemetry = OTelResourceProvider.get(settings); - return new OTelTelemetry(new OTelTracingTelemetry(openTelemetry), new OTelMetricsTelemetry(openTelemetry)); + return new OTelTelemetry(new OTelTracingTelemetry(openTelemetry), + new OTelMetricsTelemetry(openTelemetry)); } private boolean ensureOpenTelemetry(Telemetry telemetry) { - return (telemetry != null - && telemetry.getMetricsTelemetry() != null - && telemetry instanceof OTelTelemetry - && telemetry.getMetricsTelemetry() instanceof OTelMetricsTelemetry); + return (telemetry != null && telemetry.getMetricsTelemetry() != null + && telemetry instanceof OTelTelemetry && telemetry.getMetricsTelemetry() instanceof OTelMetricsTelemetry); } } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java index 92cee70424c18..2c928922c1e70 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/OTelTelemetrySettings.java @@ -94,14 +94,4 @@ private OTelTelemetrySettings() {} Setting.Property.NodeScope, Setting.Property.Final ); - - /** - * Metric reader interval - */ - public static final Setting OTEL_TRACER_METRIC_READER_INTERVAL_SETTING = Setting.timeSetting( - "telemetry.otel.tracer.metric.reader.interval", - TimeValue.timeValueSeconds(60), - Setting.Property.NodeScope, - Setting.Property.Final - ); } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricType.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricType.java index b7bae0ed1bfc6..f7e654528245f 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricType.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricType.java @@ -9,46 +9,18 @@ package org.opensearch.telemetry.diagnostics.jmx; import com.sun.management.ThreadMXBean; -import org.opensearch.common.SuppressForbidden; - import java.util.function.BiFunction; /** - * Enum representing different types of JMX metrics and their corresponding value functions. - * Each metric type has a name and a value function that calculates the metric's value - * using the provided {@link ThreadMXBean} and {@link Thread}. + * Enum for all JMX metrics */ -@SuppressForbidden(reason = "java.lang.management.ThreadMXBean#getThreadAllocatedBytes() not supported") public enum JMXMetricType { - /** - * CPU time used by a thread. - */ CPU_TIME("cpu_time", (threadMXBean, t) -> threadMXBean.getThreadCpuTime(t.getId())), - - /** - * Number of bytes allocated by a thread. - */ ALLOCATED_BYTES("allocated_bytes", (threadMXBean, t) -> threadMXBean.getThreadAllocatedBytes(t.getId())), - - /** - * Number of times a thread has been blocked. - */ BLOCKED_COUNT("blocked_count", (threadMXBean, t) -> threadMXBean.getThreadInfo(t.getId()).getBlockedCount()), - - /** - * Amount of time a thread has spent blocked. - */ BLOCKED_TIME("blocked_time", (threadMXBean, t) -> threadMXBean.getThreadInfo(t.getId()).getBlockedTime()), - - /** - * Number of times a thread has waited. - */ WAITED_COUNT("waited_count", (threadMXBean, t) -> threadMXBean.getThreadInfo(t.getId()).getWaitedCount()), - - /** - * Amount of time a thread has spent waiting. - */ WAITED_TIME("waited_time", (threadMXBean, t) -> threadMXBean.getThreadInfo(t.getId()).getWaitedTime()); private final String name; @@ -59,23 +31,13 @@ public enum JMXMetricType { this.valueFunction = valueFunction; } - /** - * Get the name of the JMX metric type. - * - * @return the name of the metric type - */ public String getName() { return name; } - /** - * Get the value of the JMX metric for the given thread using the provided {@link ThreadMXBean}. - * - * @param threadMXBean the ThreadMXBean used to calculate the metric value - * @param t the Thread for which the metric value is calculated - * @return the calculated value of the metric - */ public long getValue(ThreadMXBean threadMXBean, Thread t) { return valueFunction.apply(threadMXBean, t); } } + + diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricsObserverThread.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricsObserverThread.java index 3201947191cd9..0209ea2d49a05 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricsObserverThread.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXMetricsObserverThread.java @@ -9,7 +9,6 @@ package org.opensearch.telemetry.diagnostics.jmx; import com.sun.management.ThreadMXBean; -import org.opensearch.common.SuppressForbidden; import org.opensearch.telemetry.diagnostics.ThreadResourceObserver; import org.opensearch.telemetry.metrics.Measurement; import org.opensearch.telemetry.metrics.MetricPoint; @@ -18,34 +17,16 @@ import java.util.HashMap; import java.util.Map; -/** - * Implementation of {@link ThreadResourceObserver} that observes various JMX metrics for a given thread. - * It collects metrics such as CPU time, allocated bytes, blocked count, blocked time, waited count, and waited time - * for the specified thread using {@link ThreadMXBean}. - */ -@SuppressForbidden(reason = "java.lang.management.ThreadMXBean#getThreadAllocatedBytes() not supported") public class JMXMetricsObserverThread implements ThreadResourceObserver { private static final ThreadMXBean threadMXBean = (ThreadMXBean) ManagementFactory.getThreadMXBean(); - /** - * Default constructor - */ - public JMXMetricsObserverThread() { - - } - - /** - * Observes JMX metrics for the given thread and creates a {@link MetricPoint} containing the measured values. - * - * @param t The thread for which to observe metrics. - * @return A {@link MetricPoint} containing the observed JMX metrics for the specified thread. - */ @Override public MetricPoint observe(Thread t) { long measurementTime = System.currentTimeMillis(); Map> measurements = new HashMap<>(); for (JMXMetricType measurement : JMXMetricType.values()) { - measurements.put(measurement.getName(), new Measurement<>(measurement.getName(), measurement.getValue(threadMXBean, t))); + measurements.put(measurement.getName(), + new Measurement<>(measurement.getName(), measurement.getValue(threadMXBean, t))); } return new MetricPoint(measurements, null, measurementTime); } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXOTelMetricEmitter.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXOTelMetricEmitter.java index e43fe81e26021..03bb06e57d042 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXOTelMetricEmitter.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXOTelMetricEmitter.java @@ -24,44 +24,20 @@ import static org.opensearch.telemetry.diagnostics.DiagnosticsEventListener.ELAPSED_TIME; -/** - * JMXOTelMetricEmitter is a MetricEmitter implementation that emits metrics using OpenTelemetry. - * It creates histograms for each {@link JMXMetricType} and records the metrics using the provided OpenTelemetry. - * The emitted metrics include the measurements received from the MetricPoint along with their attributes. - */ public class JMXOTelMetricEmitter implements MetricEmitter { - private static JMXOTelMetricEmitter INSTANCE; - static final Map histograms = new HashMap<>(); - + public static Map histograms = new HashMap<>(); private static Meter meter; - - /** - * Private constructor for JMXOTelMetricEmitter. - * It initializes the histograms for each JMX metric type using the provided OpenTelemetry. - * - * @param telemetry the OpenTelemetry instance to use for creating histograms - */ private JMXOTelMetricEmitter(OpenTelemetry telemetry) { JMXOTelMetricEmitter.meter = telemetry.getMeter(JMXOTelMetricEmitter.class.getName()); - for (JMXMetricType metricType : JMXMetricType.values()) { - LongHistogram histogram = AccessController.doPrivileged( - (PrivilegedAction) () -> meter.histogramBuilder(metricType.getName()).ofLongs().build() - ); + LongHistogram histogram = AccessController.doPrivileged((PrivilegedAction) () -> + meter.histogramBuilder(metricType.getName()).ofLongs().build()); histograms.put(metricType.getName(), histogram); } - histograms.put(ELAPSED_TIME, meter.histogramBuilder(ELAPSED_TIME).ofLongs().build()); } - /** - * Get the singleton instance of JMXOTelMetricEmitter. - * If the instance does not exist, it creates a new one using the provided OpenTelemetry. - * - * @param telemetry the OpenTelemetry instance to use for creating the singleton instance - * @return the singleton instance of JMXOTelMetricEmitter - */ synchronized public static JMXOTelMetricEmitter getInstance(OpenTelemetry telemetry) { if (INSTANCE == null) { INSTANCE = new JMXOTelMetricEmitter(telemetry); @@ -69,37 +45,23 @@ synchronized public static JMXOTelMetricEmitter getInstance(OpenTelemetry teleme return INSTANCE; } - /** - * Emit the given MetricPoint using OpenTelemetry. - * The method creates attributes for the metrics from the MetricPoint and records them to the corresponding histograms. - * - * @param metric the MetricPoint to emit - */ @Override public void emitMetric(MetricPoint metric) { AttributesBuilder attributesBuilder = Attributes.builder(); if (metric.getAttributes() != null) { - metric.getAttributes().forEach((k, v) -> attributesBuilder.put(k, String.valueOf(v))); + metric.getAttributes().forEach((k,v) -> attributesBuilder.put(k,String.valueOf(v))); } Attributes oTelAttributes = attributesBuilder.build(); - for (String measurementName : metric.getMeasurements().keySet()) { recordToHistogram(metric.getMeasurement(measurementName), oTelAttributes); } } - /** - * Records the given measurement to the corresponding histogram using OpenTelemetry. - * - * @param measurement the Measurement to record - * @param oTelAttributes the OpenTelemetry Attributes to associate with the recorded value - */ private void recordToHistogram(Measurement measurement, Attributes oTelAttributes) { LongHistogram histogram = JMXOTelMetricEmitter.histograms.get(measurement.getName()); - // assuming all measurements are of long type long value = measurement.getValue().longValue(); histogram.record(value, oTelAttributes); - // recording 0 value right after as it's the delta which we are emitting and not a gauge + // recording 0 value right after as it's the delta which we are emitting and not gauge histogram.record(0, oTelAttributes); } } diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXThreadResourceRecorder.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXThreadResourceRecorder.java index 7e0d095d0a45d..3c24a8cfe01b1 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXThreadResourceRecorder.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/diagnostics/jmx/JMXThreadResourceRecorder.java @@ -15,39 +15,21 @@ import java.util.HashMap; import java.util.Map; -/** - * A subclass of ThreadResourceRecorder that records thread resource metrics using a JMXMetricsObserverThread. - * It computes the difference between two MetricPoints and returns a new MetricPoint containing the differences - * in the measurements. - * - */ public class JMXThreadResourceRecorder extends ThreadResourceRecorder { - /** - * Constructs a JMXThreadResourceRecorder with the provided JMXMetricsObserverThread instance. - * - * @param observer The JMXMetricsObserverThread used for observing thread resource metrics. - */ public JMXThreadResourceRecorder(JMXMetricsObserverThread observer) { super(observer); } - /** - * Computes the difference between two MetricPoints and returns a new MetricPoint containing - * the differences in the measurements. - * - * @param startMetric The MetricPoint gauge observed at start - * @param endMetric The MetricPoint gauge observed at the end - * @return A MetricPoint containing the differences in the measurements between startMetric and endMetric. - */ @Override protected MetricPoint computeDiff(MetricPoint startMetric, MetricPoint endMetric) { Map> measurements = new HashMap<>(); for (String measurementName : endMetric.getMeasurements().keySet()) { - long startValue = startMetric.getMeasurement(measurementName).getValue().longValue(); - long endValue = endMetric.getMeasurement(measurementName).getValue().longValue(); - measurements.put(measurementName, new Measurement<>(measurementName, endValue - startValue)); + measurements.put(measurementName, new Measurement<>(measurementName, + endMetric.getMeasurement(measurementName).getValue().longValue() - + startMetric.getMeasurement(measurementName).getValue().longValue())); } return new MetricPoint(measurements, null, endMetric.getObservationTime()); } } + diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricExporterFactory.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricExporterFactory.java index bbeb9d0524f7c..ae3a1a03fa59e 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricExporterFactory.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricExporterFactory.java @@ -12,48 +12,21 @@ import io.opentelemetry.exporter.otlp.http.metrics.OtlpHttpMetricExporter; import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter; import io.opentelemetry.sdk.metrics.export.MetricExporter; -import io.opentelemetry.sdk.metrics.export.MetricReader; -import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import org.opensearch.common.settings.Settings; -import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.telemetry.OTelTelemetrySettings; import java.security.AccessController; -import java.security.PrivilegedAction; import java.security.PrivilegedActionException; import java.security.PrivilegedExceptionAction; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; -/** - * A factory class for creating different types of OpenTelemetry Metric Exporters. - */ public class OTelMetricExporterFactory { - - /** - * Default constructor - */ - private OTelMetricExporterFactory() { - - } - - private static final String METRIC_READER_THREAD_NAME = "otlp_metric_reader"; - - /** - * An enum representing different types of Metric Exporters. - */ public enum MetricExporterType { - /** - * OpenTelemetry gRPC Metric Exporter. - */ OTLP_GRPC { @Override public MetricExporter createExporter(Settings settings) throws PrivilegedActionException { String endpoint = "http://localhost:4317"; - return AccessController.doPrivileged( - (PrivilegedExceptionAction) () -> OtlpGrpcMetricExporter.builder().setEndpoint(endpoint).build() + return AccessController.doPrivileged((PrivilegedExceptionAction) () -> + OtlpGrpcMetricExporter.builder().setEndpoint(endpoint).build() ); } @@ -62,15 +35,12 @@ public String getName() { return "otlp_grpc"; } }, - /** - * OpenTelemetry HTTP Metric Exporter. - */ OTLP_HTTP { @Override public MetricExporter createExporter(Settings settings) throws PrivilegedActionException { String endpoint = "http://localhost:4318/v1/metrics"; - return AccessController.doPrivileged( - (PrivilegedExceptionAction) () -> OtlpHttpMetricExporter.builder().setEndpoint(endpoint).build() + return AccessController.doPrivileged((PrivilegedExceptionAction) () -> + OtlpHttpMetricExporter.builder().setEndpoint(endpoint).build() ); } @@ -79,9 +49,6 @@ public String getName() { return "otlp_http"; } }, - /** - * Logging Metric Exporter. - */ LOGGING { @Override public MetricExporter createExporter(Settings settings) { @@ -93,13 +60,6 @@ public String getName() { return "logging"; } }; - - /** - * Checks if the enum contains the given metric exporter type key. - * - * @param key The key to check. - * @return true if the key is found, false otherwise. - */ public static boolean containsKey(String key) { for (MetricExporterType value : MetricExporterType.values()) { if (value.getName().equals(key)) { @@ -109,14 +69,6 @@ public static boolean containsKey(String key) { return false; } - /** - * Creates a MetricExporter instance based on the given key and settings. - * - * @param key The key representing the type of MetricExporter. - * @param settings The settings to use for MetricExporter creation. - * @return A MetricExporter instance. - * @throws PrivilegedActionException if the creation of the MetricExporter fails. - */ public static MetricExporter createMetricExporter(String key, Settings settings) throws PrivilegedActionException { for (MetricExporterType value : MetricExporterType.values()) { if (value.getName().equals(key)) { @@ -125,31 +77,11 @@ public static MetricExporter createMetricExporter(String key, Settings settings) } return LOGGING.createExporter(settings); } - - /** - * Abstract method to create a MetricExporter instance. - * - * @param settings The settings to use for MetricExporter creation. - * @return A MetricExporter instance. - * @throws PrivilegedActionException if the creation of the MetricExporter fails. - */ public abstract MetricExporter createExporter(Settings settings) throws PrivilegedActionException; - - /** - * Abstract method to get the name of the MetricExporter type. - * - * @return The name of the MetricExporter type. - */ public abstract String getName(); } - /** - * Creates a MetricExporter instance based on the provided settings. - * - * @param settings The settings to use for MetricExporter creation. - * @return A MetricExporter instance. - */ - private static MetricExporter createMetricExporter(Settings settings) { + public static MetricExporter create(Settings settings) { String metricExporterName = OTelTelemetrySettings.OTEL_TRACER_METRIC_EXPORTER_NAME_SETTING.get(settings); try { return MetricExporterType.createMetricExporter(metricExporterName, settings); @@ -157,41 +89,5 @@ private static MetricExporter createMetricExporter(Settings settings) { throw new IllegalStateException("MetricExporter creation failed", ex.getCause()); } } - - /** - * Creates a PeriodicMetricReader using the provided settings. - * - * @param settings The settings to use for PeriodicMetricReader creation. - * @return A PeriodicMetricReader instance. - */ - public static MetricReader createPeriodicMetricReader(Settings settings) { - long interval = OTelTelemetrySettings.OTEL_TRACER_METRIC_READER_INTERVAL_SETTING.get(settings).getSeconds(); - ScheduledExecutorService pool = Executors.newScheduledThreadPool( - 1, - new PrivilegedThreadFactory(OpenSearchExecutors.daemonThreadFactory(METRIC_READER_THREAD_NAME)) - ); - return PeriodicMetricReader.builder(createMetricExporter(settings)) - .setExecutor(pool) - .setInterval(interval, TimeUnit.SECONDS) - .build(); - } - - /** - * A custom ThreadFactory that executes the thread with a privileged action. - */ - private static class PrivilegedThreadFactory implements ThreadFactory { - private final ThreadFactory delegate; - - public PrivilegedThreadFactory(ThreadFactory delegate) { - this.delegate = delegate; - } - - @Override - public Thread newThread(Runnable r) { - return delegate.newThread(() -> AccessController.doPrivileged((PrivilegedAction) () -> { - r.run(); - return null; - })); - } - } } + diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java index 92aa322e7c0fe..2fa9500d212f7 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/metrics/OTelMetricsTelemetry.java @@ -35,3 +35,4 @@ public OpenTelemetry getTelemetry() { return openTelemetry; } } + diff --git a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java index 25a4f8399b796..2469f297cdb45 100644 --- a/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java +++ b/plugins/telemetry-otel/src/main/java/org/opensearch/telemetry/tracing/OTelResourceProvider.java @@ -14,7 +14,8 @@ import io.opentelemetry.context.propagation.ContextPropagators; import io.opentelemetry.sdk.OpenTelemetrySdk; import io.opentelemetry.sdk.metrics.SdkMeterProvider; -import io.opentelemetry.sdk.metrics.export.MetricReader; +import io.opentelemetry.sdk.metrics.export.MetricExporter; +import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader; import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; @@ -48,7 +49,7 @@ public static OpenTelemetry get(Settings settings) { return get( settings, OTelSpanExporterFactory.create(settings), - OTelMetricExporterFactory.createPeriodicMetricReader(settings), + OTelMetricExporterFactory.create(settings), ContextPropagators.create(W3CTraceContextPropagator.getInstance()), Sampler.alwaysOn() ); @@ -58,30 +59,24 @@ public static OpenTelemetry get(Settings settings) { * Creates OpenTelemetry instance with provided configuration * @param settings cluster settings * @param spanExporter span exporter instance - * @param metricReader MetricReader to be used in MeterProvider * @param contextPropagators context propagator instance * @param sampler sampler instance * @return Opentelemetry instance */ - public static OpenTelemetry get( - Settings settings, - SpanExporter spanExporter, - MetricReader metricReader, - ContextPropagators contextPropagators, - Sampler sampler - ) { + public static OpenTelemetry get(Settings settings, SpanExporter spanExporter, MetricExporter metricExporter, + ContextPropagators contextPropagators, Sampler sampler) { Resource resource = Resource.create(Attributes.of(ResourceAttributes.SERVICE_NAME, "OpenSearch")); SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder() .addSpanProcessor(spanProcessor(settings, spanExporter)) .setResource(resource) .setSampler(sampler) .build(); - SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).setResource(resource).build(); - return OpenTelemetrySdk.builder() - .setTracerProvider(sdkTracerProvider) - .setPropagators(contextPropagators) - .setMeterProvider(sdkMeterProvider) - .buildAndRegisterGlobal(); + SdkMeterProvider sdkMeterProvider = SdkMeterProvider.builder() + .registerMetricReader(PeriodicMetricReader.builder(metricExporter).build()) + .setResource(resource) + .build(); + return OpenTelemetrySdk.builder().setTracerProvider(sdkTracerProvider) + .setPropagators(contextPropagators).setMeterProvider(sdkMeterProvider).buildAndRegisterGlobal(); } private static BatchSpanProcessor spanProcessor(Settings settings, SpanExporter spanExporter) { diff --git a/plugins/telemetry-otel/src/main/plugin-metadata/plugin-security.policy b/plugins/telemetry-otel/src/main/plugin-metadata/plugin-security.policy index ec6497603f37f..ca63617c1a825 100644 --- a/plugins/telemetry-otel/src/main/plugin-metadata/plugin-security.policy +++ b/plugins/telemetry-otel/src/main/plugin-metadata/plugin-security.policy @@ -6,6 +6,12 @@ * compatible open source license. */ +// TODO - revisit this logic as it doesn't work as expected +grant codeBase "${codebase.okhttp}" { + permission java.net.SocketPermission "127.0.0.1:4318", "connect,resolve"; + permission java.net.SocketPermission "127.0.0.1:4317", "connect,resolve"; +}; + grant { permission java.lang.RuntimePermission "getClassLoader"; permission java.lang.RuntimePermission "accessDeclaredMembers"; From dc38d5ca4a579032a59e5ebfcf5316df3b97c0f2 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 27 Jul 2023 12:47:37 -0700 Subject: [PATCH 2/4] trace events service integration with server --- .../common/settings/ClusterSettings.java | 3 +- .../util/concurrent/OpenSearchExecutors.java | 16 +++- .../OpenSearchThreadPoolExecutor.java | 32 +++++++- .../main/java/org/opensearch/node/Node.java | 43 +++++++++-- .../opensearch/plugins/TelemetryPlugin.java | 6 +- .../telemetry/TelemetrySettings.java | 26 ++++++- .../TraceEventTransportResponseHandler.java | 76 +++++++++++++++++++ .../telemetry/tracing/NoopTracerFactory.java | 3 +- .../telemetry/tracing/TracerFactory.java | 9 ++- .../threadpool/ScalingExecutorBuilder.java | 28 ++++++- .../org/opensearch/threadpool/ThreadPool.java | 12 ++- .../transport/TransportService.java | 20 ++++- .../telemetry/tracing/TracerFactoryTests.java | 15 ++-- .../java/org/opensearch/node/MockNode.java | 8 +- .../disruption/DisruptableMockTransport.java | 3 +- .../test/telemetry/MockTelemetryPlugin.java | 10 +-- .../test/transport/MockTransport.java | 4 +- .../test/transport/MockTransportService.java | 18 +++-- 18 files changed, 283 insertions(+), 49 deletions(-) create mode 100644 server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 802eb7bd01254..7588aefbea5e6 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -657,7 +657,8 @@ public void apply(Settings value, Settings current, Settings previous) { // Related to monitoring of task cancellation TaskCancellationMonitoringSettings.IS_ENABLED_SETTING, - TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING + TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING, + TelemetrySettings.DIAGNOSIS_ENABLED_SETTING ) ) ); diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java index ec1024bbe5f30..6c1f348d55f7c 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchExecutors.java @@ -40,6 +40,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.node.Node; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.threadpool.TaskAwareRunnable; @@ -137,6 +138,18 @@ public static OpenSearchThreadPoolExecutor newScaling( TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder + ) { + return newScaling(name, min, max, keepAliveTime, unit, threadFactory, contextHolder, null); + } + public static OpenSearchThreadPoolExecutor newScaling( + String name, + int min, + int max, + long keepAliveTime, + TimeUnit unit, + ThreadFactory threadFactory, + ThreadContext contextHolder, + TraceEventsService traceEventsService ) { ExecutorScalingQueue queue = new ExecutorScalingQueue<>(); OpenSearchThreadPoolExecutor executor = new OpenSearchThreadPoolExecutor( @@ -148,7 +161,8 @@ public static OpenSearchThreadPoolExecutor newScaling( queue, threadFactory, new ForceQueuePolicy(), - contextHolder + contextHolder, + traceEventsService ); queue.executor = executor; return executor; diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java index d967b7423ca80..cc393d7c2564f 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/OpenSearchThreadPoolExecutor.java @@ -34,6 +34,7 @@ import org.opensearch.common.SuppressForbidden; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ThreadFactory; @@ -52,6 +53,9 @@ public class OpenSearchThreadPoolExecutor extends ThreadPoolExecutor { private volatile ShutdownListener listener; private final Object monitor = new Object(); + + private final TraceEventsService traceEventsService; + /** * Name used in error reporting. */ @@ -95,10 +99,26 @@ final String getName() { ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder + ) { + this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler, contextHolder, null); + } + + OpenSearchThreadPoolExecutor( + String name, + int corePoolSize, + int maximumPoolSize, + long keepAliveTime, + TimeUnit unit, + BlockingQueue workQueue, + ThreadFactory threadFactory, + XRejectedExecutionHandler handler, + ThreadContext contextHolder, + TraceEventsService traceEventsService ) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; + this.traceEventsService = traceEventsService; } @Override @@ -199,10 +219,18 @@ protected void appendThreadPoolExecutorDetails(final StringBuilder sb) { } protected Runnable wrapRunnable(Runnable command) { - return contextHolder.preserveContext(command); + if (traceEventsService != null) { + return traceEventsService.wrapRunnable(contextHolder.preserveContext(command)); + } else { + return contextHolder.preserveContext(command); + } } protected Runnable unwrap(Runnable runnable) { - return contextHolder.unwrap(runnable); + if (traceEventsService != null) { + return traceEventsService.unwrapRunnable(contextHolder.unwrap(runnable)); + } else { + return contextHolder.unwrap(runnable); + } } } diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 60c95a04a042f..66fc9fc7a6b1d 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -56,6 +56,8 @@ import org.opensearch.monitor.fs.FsProbe; import org.opensearch.plugins.ExtensionAwarePlugin; import org.opensearch.plugins.SearchPipelinePlugin; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.telemetry.tracing.NoopTracerFactory; import org.opensearch.telemetry.tracing.Tracer; import org.opensearch.telemetry.tracing.TracerFactory; @@ -66,6 +68,7 @@ import org.opensearch.tasks.TaskCancellationMonitoringSettings; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.tasks.consumer.TopNSearchTasksLogger; +import org.opensearch.telemetry.tracing.TracerUtil; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.telemetry.TelemetryModule; @@ -380,6 +383,7 @@ public static class DiscoverySettings { private final LocalNodeFactory localNodeFactory; private final NodeService nodeService; private final Tracer tracer; + private final TraceEventsService traceEventsService; final NamedWriteableRegistry namedWriteableRegistry; private final AtomicReference runnableTaskListener; private FileCache fileCache; @@ -519,7 +523,8 @@ protected Node( final List> executorBuilders = pluginsService.getExecutorBuilders(settings); runnableTaskListener = new AtomicReference<>(); - final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, executorBuilders.toArray(new ExecutorBuilder[0])); + traceEventsService = new TraceEventsService(); + final ThreadPool threadPool = new ThreadPool(settings, runnableTaskListener, traceEventsService, executorBuilders.toArray(new ExecutorBuilder[0])); resourcesToClose.add(() -> ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS)); final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool); resourcesToClose.add(resourceWatcherService); @@ -852,6 +857,7 @@ protected Node( pluginsService.filterPlugins(ActionPlugin.class).stream().flatMap(p -> p.getTaskHeaders().stream()), Stream.of(Task.X_OPAQUE_ID) ).collect(Collectors.toSet()); + final TransportService transportService = newTransportService( settings, transport, @@ -859,7 +865,8 @@ protected Node( networkModule.getTransportInterceptor(), localNodeFactory, settingsModule.getClusterSettings(), - taskHeaders + taskHeaders, + traceEventsService ); TopNSearchTasksLogger taskConsumer = new TopNSearchTasksLogger(settings, settingsModule.getClusterSettings()); transportService.getTaskManager().registerTaskResourceConsumer(taskConsumer); @@ -1031,15 +1038,19 @@ protected Node( TracerFactory tracerFactory; if (FeatureFlags.isEnabled(TELEMETRY)) { - final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings()); + final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, + clusterService.getClusterSettings(), traceEventsService); List telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class); TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings); - tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext()); + tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext(), traceEventsService); + initializeTraceEventService(traceEventsService, telemetryModule, telemetryPlugins); + TracerUtil.setTraceEventService(traceEventsService); } else { tracerFactory = new NoopTracerFactory(); + TracerUtil.setTraceEventService(traceEventsService); } tracer = tracerFactory.getTracer(); - resourcesToClose.add(tracer::close); + resourcesToClose.add(tracer); final List> tasksExecutors = pluginsService.filterPlugins(PersistentTaskPlugin.class) .stream() @@ -1103,6 +1114,7 @@ protected Node( b.bind(SearchPhaseController.class) .toInstance(new SearchPhaseController(namedWriteableRegistry, searchService::aggReduceContextBuilder)); b.bind(Transport.class).toInstance(transport); + b.bind(TraceEventsService.class).toInstance(traceEventsService); b.bind(TransportService.class).toInstance(transportService); b.bind(NetworkService.class).toInstance(networkService); b.bind(UpdateHelper.class).toInstance(new UpdateHelper(scriptService)); @@ -1199,9 +1211,11 @@ protected TransportService newTransportService( TransportInterceptor interceptor, Function localNodeFactory, ClusterSettings clusterSettings, - Set taskHeaders + Set taskHeaders, + TraceEventsService traceEventsService ) { - return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return new TransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, + traceEventsService); } protected void processRecoverySettings(ClusterSettings clusterSettings, RecoverySettings recoverySettings) { @@ -1612,6 +1626,21 @@ public static CircuitBreakerService createCircuitBreakerService( } } + private static void initializeTraceEventService(TraceEventsService traceEventsService, + TelemetryModule telemetryModule, List telemetryPlugins) { + final Map traceEventListeners; + if (telemetryModule.getTelemetry().isPresent()) { + traceEventListeners = telemetryPlugins.stream() + .flatMap(plugin -> plugin.getTraceEventListeners(telemetryModule.getTelemetry().get()).entrySet().stream()) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } else { + traceEventListeners = Collections.emptyMap(); + } + for (Map.Entry entry : traceEventListeners.entrySet()) { + traceEventsService.registerTraceEventListener(entry.getKey(), entry.getValue()); + } + } + /** * Creates a new {@link BigArrays} instance used for this node. * This method can be overwritten by subclasses to change their {@link BigArrays} implementation for instance for testing diff --git a/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java b/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java index 047671f69098a..79529edd660ce 100644 --- a/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java +++ b/server/src/main/java/org/opensearch/plugins/TelemetryPlugin.java @@ -10,10 +10,8 @@ import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; -import org.opensearch.telemetry.diagnostics.DiagnosticsEventListener; import org.opensearch.telemetry.tracing.listeners.TraceEventListener; -import java.util.Collections; import java.util.Map; import java.util.Optional; @@ -24,8 +22,8 @@ public interface TelemetryPlugin { Optional getTelemetry(TelemetrySettings settings); - String getName(); - Map getTraceEventListeners(Telemetry telemetry); + String getName(); + } diff --git a/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java b/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java index 7c9e0d5ac8097..68d3a586fc1a8 100644 --- a/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java +++ b/server/src/main/java/org/opensearch/telemetry/TelemetrySettings.java @@ -11,6 +11,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; /** * Wrapper class to encapsulate tracing related settings @@ -23,20 +24,41 @@ public class TelemetrySettings { Setting.Property.Dynamic ); + public static final Setting DIAGNOSIS_ENABLED_SETTING = Setting.boolSetting( + "telemetry.diagnosis.enabled", + false, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile boolean tracingEnabled; + private volatile boolean diagnosisEnabled; - public TelemetrySettings(Settings settings, ClusterSettings clusterSettings) { - this.tracingEnabled = TRACER_ENABLED_SETTING.get(settings); + private final TraceEventsService traceEventsService; + public TelemetrySettings(Settings settings, ClusterSettings clusterSettings, TraceEventsService traceEventsService) { + this.traceEventsService = traceEventsService; + this.setTracingEnabled(TRACER_ENABLED_SETTING.get(settings)); + this.setDiagnosisEnabled(DIAGNOSIS_ENABLED_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(TRACER_ENABLED_SETTING, this::setTracingEnabled); + clusterSettings.addSettingsUpdateConsumer(DIAGNOSIS_ENABLED_SETTING, this::setDiagnosisEnabled); } public void setTracingEnabled(boolean tracingEnabled) { this.tracingEnabled = tracingEnabled; + traceEventsService.setTracingEnabled(tracingEnabled); + } + + public void setDiagnosisEnabled(boolean diagnosisEnabled) { + this.diagnosisEnabled = diagnosisEnabled; + traceEventsService.setDiagnosisEnabled(diagnosisEnabled); } public boolean isTracingEnabled() { return tracingEnabled; } + public boolean isDiagnosisEnabled() { + return diagnosisEnabled; + } } diff --git a/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java new file mode 100644 index 0000000000000..07bb1a1a2fe9a --- /dev/null +++ b/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java @@ -0,0 +1,76 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.telemetry; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.telemetry.tracing.listeners.TraceEventListener; +import org.opensearch.telemetry.tracing.listeners.TraceEventsRunnable; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; +import org.opensearch.transport.TransportException; +import org.opensearch.transport.TransportResponse; +import org.opensearch.transport.TransportResponseHandler; + +import java.io.IOException; + +/** + * This class acts as a wrapper around a given TransportResponseHandler to handle missed runnable start trace events + * {@link TraceEventListener#onRunnableStart} when a response is received and handled in the TransportService. + * A thread will only have thread context once TransportService.ContextRestoreResponseHandler is invoked and thus + * {@link TraceEventsRunnable#run()} will miss the onRunnableStart event as Span information will not be present. + * Eventually, in TraceEventsRunnable#run()'s delegate.run(), when thread context is restored by {@link org.opensearch.transport.TransportService.ContextRestoreResponseHandler} + * then this class will invoke missed trace event listener's the onRunnableStart() event. Whereas, onRunnableComplete will not + * be missed and doesn't need any special handling here. + */ +public final class TraceEventTransportResponseHandler implements TransportResponseHandler { + private static final Logger logger = LogManager.getLogger(TraceEventTransportResponseHandler.class); + + private final TransportResponseHandler delegate; + + private final TraceEventsService traceEventsService; + + public TraceEventTransportResponseHandler(TransportResponseHandler delegate, + TraceEventsService traceEventsService) { + this.delegate = delegate; + this.traceEventsService = traceEventsService; + } + + @Override + public T read(StreamInput in) throws IOException { + return delegate.read(in); + } + + @Override + public void handleResponse(T response) { + try { + TraceEventsRunnable.invokeOnRunnableStart(traceEventsService); + } catch (Exception e) { + logger.debug("Error in invoking onRunnableStart while TraceEventTransportResponseHandler handleResponse", e); + } finally { + delegate.handleResponse(response); + } + } + + @Override + public void handleException(TransportException exp) { + try { + TraceEventsRunnable.invokeOnRunnableStart(traceEventsService); + } catch (Exception e) { + logger.debug("Error in invoking onRunnableStart while TraceEventTransportResponseHandler handleResponse", e); + } finally { + delegate.handleException(exp); + } + } + + @Override + public String executor() { + return delegate.executor(); + } +} diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/NoopTracerFactory.java b/server/src/main/java/org/opensearch/telemetry/tracing/NoopTracerFactory.java index f82a390dc1754..701ee9baa9cd7 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/NoopTracerFactory.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/NoopTracerFactory.java @@ -8,6 +8,7 @@ package org.opensearch.telemetry.tracing; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.util.Optional; @@ -19,7 +20,7 @@ */ public class NoopTracerFactory extends TracerFactory { public NoopTracerFactory() { - super(null, Optional.empty(), null); + super(null, Optional.empty(), null, new TraceEventsService()); } @Override diff --git a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java index d8fe812c82f53..4b2fa331650e1 100644 --- a/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java +++ b/server/src/main/java/org/opensearch/telemetry/tracing/TracerFactory.java @@ -13,12 +13,14 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.telemetry.tracing.noop.NoopTracer; import java.io.Closeable; import java.io.IOException; import java.util.Optional; + /** * TracerManager represents a single global class that is used to access tracers. *

@@ -31,10 +33,13 @@ public class TracerFactory implements Closeable { private final TelemetrySettings telemetrySettings; private final Tracer tracer; + private final TraceEventsService traceEventsService; - public TracerFactory(TelemetrySettings telemetrySettings, Optional telemetry, ThreadContext threadContext) { + public TracerFactory(TelemetrySettings telemetrySettings, Optional telemetry, ThreadContext threadContext, + TraceEventsService traceEventsService) { this.telemetrySettings = telemetrySettings; - this.tracer = tracer(telemetry, threadContext); + this.traceEventsService = traceEventsService; + this.tracer = traceEventsService.wrapAndSetTracer(tracer(telemetry, threadContext)); } /** diff --git a/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java b/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java index 2690a3cc30238..b56eca0526493 100644 --- a/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java +++ b/server/src/main/java/org/opensearch/threadpool/ScalingExecutorBuilder.java @@ -38,6 +38,7 @@ import org.opensearch.common.util.concurrent.OpenSearchExecutors; import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.node.Node; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import java.util.Arrays; import java.util.List; @@ -57,6 +58,8 @@ public final class ScalingExecutorBuilder extends ExecutorBuilder maxSetting; private final Setting keepAliveSetting; + private final TraceEventsService traceEventsService; + /** * Construct a scaling executor builder; the settings will have the * key prefix "thread_pool." followed by the executor name. @@ -71,6 +74,22 @@ public ScalingExecutorBuilder(final String name, final int core, final int max, this(name, core, max, keepAlive, "thread_pool." + name); } + /** + * Construct a scaling executor builder; the settings will have the + * key prefix "thread_pool." followed by the executor name. + * + * @param name the name of the executor + * @param core the minimum number of threads in the pool + * @param max the maximum number of threads in the pool + * @param keepAlive the time that spare threads above {@code core} + * threads will be kept alive + * @param traceEventsService trace event service to wrap the Runnable to listen to Runnable events + * and notify registered event listeners with the service. + */ + public ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive, + TraceEventsService traceEventsService) { + this(name, core, max, keepAlive, "thread_pool." + name, traceEventsService); + } /** * Construct a scaling executor builder; the settings will have the * specified key prefix. @@ -83,10 +102,16 @@ public ScalingExecutorBuilder(final String name, final int core, final int max, * @param prefix the prefix for the settings keys */ public ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive, final String prefix) { + this(name, core, max, keepAlive, prefix, null); + } + + private ScalingExecutorBuilder(final String name, final int core, final int max, final TimeValue keepAlive, final String prefix, + TraceEventsService traceEventsService) { super(name); this.coreSetting = Setting.intSetting(settingsKey(prefix, "core"), core, Setting.Property.NodeScope); this.maxSetting = Setting.intSetting(settingsKey(prefix, "max"), max, Setting.Property.NodeScope); this.keepAliveSetting = Setting.timeSetting(settingsKey(prefix, "keep_alive"), keepAlive, Setting.Property.NodeScope); + this.traceEventsService = traceEventsService; } @Override @@ -118,7 +143,8 @@ ThreadPool.ExecutorHolder build(final ScalingExecutorSettings settings, final Th keepAlive.millis(), TimeUnit.MILLISECONDS, threadFactory, - threadContext + threadContext, + traceEventsService ); return new ThreadPool.ExecutorHolder(executor, info); } diff --git a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java index 0851677bcb13a..0608b69c035f3 100644 --- a/server/src/main/java/org/opensearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/opensearch/threadpool/ThreadPool.java @@ -54,6 +54,7 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.node.Node; import org.opensearch.node.ReportingService; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import java.io.IOException; import java.util.ArrayList; @@ -221,6 +222,15 @@ public ThreadPool( final Settings settings, final AtomicReference runnableTaskListener, final ExecutorBuilder... customBuilders + ) { + this(settings, runnableTaskListener, null, customBuilders); + } + + public ThreadPool( + final Settings settings, + final AtomicReference runnableTaskListener, + TraceEventsService traceEventsService, + final ExecutorBuilder... customBuilders ) { assert Node.NODE_NAME_SETTING.exists(settings); @@ -229,7 +239,7 @@ public ThreadPool( final int halfProcMaxAt5 = halfAllocatedProcessorsMaxFive(allocatedProcessors); final int halfProcMaxAt10 = halfAllocatedProcessorsMaxTen(allocatedProcessors); final int genericThreadPoolMax = boundedBy(4 * allocatedProcessors, 128, 512); - builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); + builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30), traceEventsService)); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, allocatedProcessors, 10000)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, allocatedProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); diff --git a/server/src/main/java/org/opensearch/transport/TransportService.java b/server/src/main/java/org/opensearch/transport/TransportService.java index 61900fa1f0014..a49894e62a5e6 100644 --- a/server/src/main/java/org/opensearch/transport/TransportService.java +++ b/server/src/main/java/org/opensearch/transport/TransportService.java @@ -66,6 +66,8 @@ import org.opensearch.node.ReportingService; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.TraceEventTransportResponseHandler; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; @@ -115,6 +117,8 @@ public class TransportService extends AbstractLifecycleComponent private final Transport.ResponseHandlers responseHandlers; private final TransportInterceptor interceptor; + private final TraceEventsService traceEventsService; + // An LRU (don't really care about concurrency here) that holds the latest timed out requests so if they // do show up, we can print more descriptive information about them final Map timeoutInfoHandlers = Collections.synchronizedMap( @@ -190,7 +194,8 @@ public TransportService( TransportInterceptor transportInterceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, - Set taskHeaders + Set taskHeaders, + TraceEventsService traceEventsService ) { this( settings, @@ -200,7 +205,8 @@ public TransportService( localNodeFactory, clusterSettings, taskHeaders, - new ClusterConnectionManager(settings, transport) + new ClusterConnectionManager(settings, transport), + traceEventsService ); } @@ -212,7 +218,8 @@ public TransportService( Function localNodeFactory, @Nullable ClusterSettings clusterSettings, Set taskHeaders, - ConnectionManager connectionManager + ConnectionManager connectionManager, + TraceEventsService traceEventsService ) { this.transport = transport; transport.setSlowLogThreshold(TransportSettings.SLOW_OPERATION_THRESHOLD_SETTING.get(settings)); @@ -229,6 +236,7 @@ public TransportService( this.remoteClusterClient = DiscoveryNode.isRemoteClusterClient(settings); remoteClusterService = new RemoteClusterService(settings, this); responseHandlers = transport.getResponseHandlers(); + this.traceEventsService = traceEventsService; if (clusterSettings != null) { clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_INCLUDE_SETTING, this::setTracerLogInclude); clusterSettings.addSettingsUpdateConsumer(TransportSettings.TRACE_LOG_EXCLUDE_SETTING, this::setTracerLogExclude); @@ -972,7 +980,11 @@ private void sendRequestInternal( DiscoveryNode node = connection.getNode(); Supplier storedContextSupplier = threadPool.getThreadContext().newRestorableContext(true); - ContextRestoreResponseHandler responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, handler); + TransportResponseHandler traceEventTransportResponseHandler = handler; + if (traceEventsService != null && traceEventsService.isTracingEnabled()) { + traceEventTransportResponseHandler = new TraceEventTransportResponseHandler<>(handler, traceEventsService); + } + ContextRestoreResponseHandler responseHandler = new ContextRestoreResponseHandler<>(storedContextSupplier, traceEventTransportResponseHandler); // TODO we can probably fold this entire request ID dance into connection.sendReqeust but it will be a bigger refactoring final long requestId = responseHandlers.add(new Transport.ResponseContext<>(responseHandler, connection, action)); final TimeoutHandler timeoutHandler; diff --git a/server/src/test/java/org/opensearch/telemetry/tracing/TracerFactoryTests.java b/server/src/test/java/org/opensearch/telemetry/tracing/TracerFactoryTests.java index 0ffccee505d43..76280ddaf5e48 100644 --- a/server/src/test/java/org/opensearch/telemetry/tracing/TracerFactoryTests.java +++ b/server/src/test/java/org/opensearch/telemetry/tracing/TracerFactoryTests.java @@ -16,6 +16,7 @@ import org.opensearch.common.util.concurrent.ThreadContext; import org.opensearch.telemetry.Telemetry; import org.opensearch.telemetry.TelemetrySettings; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.telemetry.tracing.noop.NoopTracer; @@ -38,24 +39,24 @@ public void close() { public void testGetTracerWithUnavailableTracingTelemetryReturnsNoopTracer() { Settings settings = Settings.builder().put(TelemetrySettings.TRACER_ENABLED_SETTING.getKey(), false).build(); - TelemetrySettings telemetrySettings = new TelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings())); + TraceEventsService traceEventsService = new TraceEventsService(); + TelemetrySettings telemetrySettings = new TelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings()), traceEventsService); Telemetry mockTelemetry = mock(Telemetry.class); when(mockTelemetry.getTracingTelemetry()).thenReturn(mock(TracingTelemetry.class)); - tracerFactory = new TracerFactory(telemetrySettings, Optional.empty(), new ThreadContext(Settings.EMPTY)); - + tracerFactory = new TracerFactory(telemetrySettings, Optional.empty(), new ThreadContext(Settings.EMPTY), traceEventsService); Tracer tracer = tracerFactory.getTracer(); - assertTrue(tracer instanceof NoopTracer); assertTrue(tracer.startSpan("foo") == SpanScope.NO_OP); } public void testGetTracerWithAvailableTracingTelemetryReturnsWrappedTracer() { Settings settings = Settings.builder().put(TelemetrySettings.TRACER_ENABLED_SETTING.getKey(), true).build(); - TelemetrySettings telemetrySettings = new TelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings())); + TraceEventsService traceEventsService = new TraceEventsService(); + + TelemetrySettings telemetrySettings = new TelemetrySettings(settings, new ClusterSettings(settings, getClusterSettings()), traceEventsService); Telemetry mockTelemetry = mock(Telemetry.class); when(mockTelemetry.getTracingTelemetry()).thenReturn(mock(TracingTelemetry.class)); - tracerFactory = new TracerFactory(telemetrySettings, Optional.of(mockTelemetry), new ThreadContext(Settings.EMPTY)); - + tracerFactory = new TracerFactory(telemetrySettings, Optional.of(mockTelemetry), new ThreadContext(Settings.EMPTY), traceEventsService); Tracer tracer = tracerFactory.getTracer(); assertTrue(tracer instanceof WrappedTracer); diff --git a/test/framework/src/main/java/org/opensearch/node/MockNode.java b/test/framework/src/main/java/org/opensearch/node/MockNode.java index 59c78d32c4c3c..181c931655171 100644 --- a/test/framework/src/main/java/org/opensearch/node/MockNode.java +++ b/test/framework/src/main/java/org/opensearch/node/MockNode.java @@ -60,6 +60,7 @@ import org.opensearch.search.SearchService; import org.opensearch.search.fetch.FetchPhase; import org.opensearch.search.query.QueryPhase; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.test.MockHttpTransport; import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.ThreadPool; @@ -199,16 +200,17 @@ protected TransportService newTransportService( TransportInterceptor interceptor, Function localNodeFactory, ClusterSettings clusterSettings, - Set taskHeaders + Set taskHeaders, + TraceEventsService traceEventsService ) { // we use the MockTransportService.TestPlugin class as a marker to create a network // module with this MockNetworkService. NetworkService is such an integral part of the systme // we don't allow to plug it in from plugins or anything. this is a test-only override and // can't be done in a production env. if (getPluginsService().filterPlugins(MockTransportService.TestPlugin.class).isEmpty()) { - return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return super.newTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, traceEventsService); } else { - return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return new MockTransportService(settings, transport, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, traceEventsService); } } diff --git a/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java b/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java index 1cd60690ca9d5..bb608c7882f87 100644 --- a/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java +++ b/test/framework/src/main/java/org/opensearch/test/disruption/DisruptableMockTransport.java @@ -41,6 +41,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.core.common.transport.BoundTransportAddress; import org.opensearch.core.common.transport.TransportAddress; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.test.transport.MockTransport; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.CloseableConnection; @@ -93,7 +94,7 @@ public TransportService createTransportService( @Nullable ClusterSettings clusterSettings, Set taskHeaders ) { - return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + return new TransportService(settings, this, threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, new TraceEventsService()); } @Override diff --git a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java index 17225f3a930fa..3a7b9dc8c538d 100644 --- a/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java +++ b/test/framework/src/main/java/org/opensearch/test/telemetry/MockTelemetryPlugin.java @@ -8,7 +8,6 @@ package org.opensearch.test.telemetry; -import java.util.Collections; import java.util.Map; import java.util.Optional; import org.opensearch.plugins.Plugin; @@ -36,12 +35,13 @@ public Optional getTelemetry(TelemetrySettings settings) { } @Override - public String getName() { - return MOCK_TRACER_NAME; + public Map getTraceEventListeners(Telemetry telemetry) { + // TODO revisit + return null; } @Override - public Map getTraceEventListeners(Telemetry telemetry) { - return Collections.emptyMap(); + public String getName() { + return MOCK_TRACER_NAME; } } diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java index 0974a5f1f5671..824db19cb5338 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransport.java @@ -44,6 +44,7 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.transport.BoundTransportAddress; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.CloseableConnection; import org.opensearch.transport.ClusterConnectionManager; @@ -93,7 +94,8 @@ public TransportService createTransportService( localNodeFactory, clusterSettings, taskHeaders, - connectionManager + connectionManager, + new TraceEventsService() ); } diff --git a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java index 5cf451980ec98..e18f74945585f 100644 --- a/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java +++ b/test/framework/src/main/java/org/opensearch/test/transport/MockTransportService.java @@ -57,6 +57,7 @@ import org.opensearch.node.Node; import org.opensearch.plugins.Plugin; import org.opensearch.tasks.TaskManager; +import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.tasks.MockTaskManager; import org.opensearch.threadpool.ThreadPool; @@ -176,7 +177,8 @@ public static MockTransportService createNewService( version ), clusterSettings, - taskHeaders + taskHeaders, + new TraceEventsService() ); } @@ -207,7 +209,8 @@ public MockTransportService( settings.get(Node.NODE_NAME_SETTING.getKey(), UUIDs.randomBase64UUID()) ), clusterSettings, - Collections.emptySet() + Collections.emptySet(), + new TraceEventsService() ); } @@ -225,9 +228,10 @@ public MockTransportService( TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, - Set taskHeaders + Set taskHeaders, + TraceEventsService traceEventsService ) { - this(settings, new StubbableTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders); + this(settings, new StubbableTransport(transport), threadPool, interceptor, localNodeFactory, clusterSettings, taskHeaders, traceEventsService); } private MockTransportService( @@ -237,7 +241,8 @@ private MockTransportService( TransportInterceptor interceptor, Function localNodeFactory, @Nullable ClusterSettings clusterSettings, - Set taskHeaders + Set taskHeaders, + TraceEventsService traceEventsService ) { super( settings, @@ -247,7 +252,8 @@ private MockTransportService( localNodeFactory, clusterSettings, taskHeaders, - new StubbableConnectionManager(new ClusterConnectionManager(settings, transport)) + new StubbableConnectionManager(new ClusterConnectionManager(settings, transport)), + traceEventsService ); this.original = transport.getDelegate(); } From ddd6a7cf4e2fe29813c873441bbe639194f1f8b1 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 3 Aug 2023 12:58:31 -0700 Subject: [PATCH 3/4] fix TransportResponse refactor --- .../telemetry/TraceEventTransportResponseHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java b/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java index 07bb1a1a2fe9a..43735d6ec6956 100644 --- a/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java +++ b/server/src/main/java/org/opensearch/telemetry/TraceEventTransportResponseHandler.java @@ -15,7 +15,7 @@ import org.opensearch.telemetry.tracing.listeners.TraceEventsRunnable; import org.opensearch.telemetry.tracing.listeners.TraceEventsService; import org.opensearch.transport.TransportException; -import org.opensearch.transport.TransportResponse; +import org.opensearch.core.transport.TransportResponse; import org.opensearch.transport.TransportResponseHandler; import java.io.IOException; From a2eec2fbc8609df237b00d51db4506ade04c23f8 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 3 Aug 2023 12:59:40 -0700 Subject: [PATCH 4/4] remove TracerUtil --- server/src/main/java/org/opensearch/node/Node.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/node/Node.java b/server/src/main/java/org/opensearch/node/Node.java index 66fc9fc7a6b1d..1449e5efe6ae1 100644 --- a/server/src/main/java/org/opensearch/node/Node.java +++ b/server/src/main/java/org/opensearch/node/Node.java @@ -68,7 +68,6 @@ import org.opensearch.tasks.TaskCancellationMonitoringSettings; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.tasks.consumer.TopNSearchTasksLogger; -import org.opensearch.telemetry.tracing.TracerUtil; import org.opensearch.threadpool.RunnableTaskExecutionListener; import org.opensearch.index.store.RemoteSegmentStoreDirectoryFactory; import org.opensearch.telemetry.TelemetryModule; @@ -1044,10 +1043,8 @@ protected Node( TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings); tracerFactory = new TracerFactory(telemetrySettings, telemetryModule.getTelemetry(), threadPool.getThreadContext(), traceEventsService); initializeTraceEventService(traceEventsService, telemetryModule, telemetryPlugins); - TracerUtil.setTraceEventService(traceEventsService); } else { tracerFactory = new NoopTracerFactory(); - TracerUtil.setTraceEventService(traceEventsService); } tracer = tracerFactory.getTracer(); resourcesToClose.add(tracer);