diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 4368099725c..d92da7b4658 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -278,6 +278,7 @@ topology.max.task.parallelism: null topology.max.spout.pending: null # ideally should be larger than topology.producer.batch.size. (esp. if topology.batch.flush.interval.millis=0) topology.state.synchronization.timeout.secs: 60 topology.stats.sample.rate: 0.05 +topology.stats.ewma.enable: false topology.builtin.metrics.bucket.size.secs: 60 topology.fall.back.on.java.serialization: false topology.worker.childopts: null diff --git a/docs/Metrics.md b/docs/Metrics.md index 4a2db2728b7..6c65c0419dc 100644 --- a/docs/Metrics.md +++ b/docs/Metrics.md @@ -180,15 +180,46 @@ Similar to the tuple counting metrics storm also collects average latency metric ##### `__complete-latency` -The complete latency is just for spouts. It is the average amount of time it took for `ack` or `fail` to be called for a tuple after it was emitted. If acking is disabled this metric is likely to be blank or 0 for all values, and should be ignored. +The complete latency is just for spouts. It is the average amount of time it took for `ack` or `fail` to be called for a +tuple after it was emitted. If acking is disabled this metric is likely to be blank or 0 for all values, and should be +ignored. + +##### `__complete-rfc1889a-jitter` + +This metric is specific to spouts. It measures the variation (jitter) in the total completion time (end-to-end latency) +of tuples, calculated using the exponentially weighted moving average (EWMA) algorithm as defined in RFC 1889 appendix +A. +While `__complete-latency` indicates the average amount of time it took for a tuple to be fully processed by the +topology (from emission to the final ack), the jitter metric quantifies the consistency of that process. If acking is +disabled, this metric is likely to be blank or 0 and should be ignored. ##### `__execute-latency` -This is just for bolts. It is the average amount of time that the bolt spent in the call to the `execute` method. The higher this gets, the lower the throughput of tuples per bolt instance. +This is just for bolts. It is the average amount of time that the bolt spent in the call to the `execute` method. The +higher this gets, the lower the throughput of tuples per bolt instance. + +##### `__execute-rfc1889a-jitter` + +This metric is specific to bolts. It measures the variation (jitter) in the time spent within the execute method, +calculated using the exponentially weighted moving average (EWMA) algorithm as defined in RFC 1889 appendix A. +While `__execute-latency` provides the average time spent in the execute call, the jitter metric quantifies the +predictability of that execution time. It is a critical indicator of computational "smoothness". ##### `__process-latency` -This is also just for bolts. It is the average amount of time between when `execute` was called to start processing a tuple, to when it was acked or failed by the bolt. If your bolt is a very simple bolt and the processing is synchronous then `__process-latency` and `__execute-latency` should be very close to one another, with process latency being slightly smaller. If you are doing a join or have asynchronous processing then it may take a while for a tuple to be acked so the process latency would be higher than the execute latency. +This is also just for bolts. It is the average amount of time between when `execute` was called to start processing a +tuple, to when it was acked or failed by the bolt. If your bolt is a very simple bolt and the processing is synchronous +then `__process-latency` and `__execute-latency` should be very close to one another, with process latency being +slightly smaller. If you are doing a join or have asynchronous processing then it may take a while for a tuple to be +acked so the process latency would be higher than the execute latency. + +##### `__process-rfc1889a-jitter` + +This metric is specific to bolts. It measures the variation (jitter) in the process latency, calculated using the +exponentially weighted moving average (EWMA) algorithm as defined in RFC 1889 appendix A. +While `__process-latency` provides the average time a tuple spends being processed, the jitter metric quantifies the +stability of that processing time. It helps identify "noisy" execution environments where processing times fluctuate +significantly, even if the average remains within acceptable limits. ##### `__skipped-max-spout-ms` diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java index e332b726b28..9e28aaec2dc 100644 --- a/storm-client/src/jvm/org/apache/storm/Config.java +++ b/storm-client/src/jvm/org/apache/storm/Config.java @@ -596,6 +596,20 @@ public class Config extends HashMap { */ @IsPositiveNumber public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate"; + /** + * Enabling jitter streaming calculation (RFC 1889a). + * + * @see RFC 1889 Appendix A.8 + */ + @IsBoolean + public static final String TOPOLOGY_STATS_EWMA_ENABLE = "topology.stats.ewma.enable"; + /** + * The smoothing factor (alpha) used for exponential jitter calculation (RFC 1889a). + * + * @see RFC 1889 Appendix A.8 + */ + @CustomValidator(validatorClass = ConfigValidation.EwmaSmoothingFactorValidator.class) + public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing_factor"; /** * The time period that builtin metrics data in bucketed into. */ diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java b/storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java new file mode 100644 index 00000000000..baff872f374 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.metrics2; + +import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA; + +import com.codahale.metrics.Gauge; + +import java.util.concurrent.atomic.AtomicLong; + +/** + * Lock-free jitter estimator following RFC 1889 Section 6.3.1. + * The jitter accumulator is stored as raw IEEE 754 bits in an AtomicLong + * so that CAS can be used without locks. + * Thread safety: addValue is lock-free; getValue is wait-free. + */ +public class EwmaGauge implements Gauge { + + private static final long UNSEEDED = Long.MIN_VALUE; + private static final long ZERO_BITS = Double.doubleToLongBits(0.0); + + private final AtomicLong lastTransit = new AtomicLong(UNSEEDED); + private final AtomicLong jitterBits = new AtomicLong(ZERO_BITS); + private final double alpha; + + EwmaGauge(double alpha) { + if (alpha <= 0.0 || alpha >= 1.0 || Double.isNaN(alpha)) { + throw new IllegalArgumentException( + "alpha must be in (0, 1), got: " + alpha); + } + this.alpha = alpha; + } + + EwmaGauge() { + this(RFC1889_ALPHA); // 1.0 / 16.0 + } + + /** + * Update the jitter estimate. + * + * @param transitMs transit time for this tuple: {@code arrival - timestamp} + * Negative values are silently ignored. + */ + public void addValue(long transitMs) { + if (transitMs < 0) { + return; + } + + // Seed on the very first packet: store transit, nothing to diff against yet. + if (lastTransit.compareAndSet(UNSEEDED, transitMs)) { + return; + } + + long prev = lastTransit.getAndSet(transitMs); + if (prev == UNSEEDED) { + // Lost a race during seeding; prev is not a real transit value. + return; + } + + double d = Math.abs(transitMs - prev); + + long currentBits; + long updatedBits; + do { + currentBits = jitterBits.get(); + double currentJitter = Double.longBitsToDouble(currentBits); + double updatedJitter = currentJitter + alpha * (d - currentJitter); + updatedBits = Double.doubleToLongBits(updatedJitter); + } while (!jitterBits.compareAndSet(currentBits, updatedBits)); + } + + /** + * Returns the current jitter estimate in timestamp units. + */ + @Override + public Double getValue() { + return Double.longBitsToDouble(jitterBits.get()); + } +} \ No newline at end of file diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java index 0ac3a5e9492..6bea1d3b822 100644 --- a/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java +++ b/storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java @@ -12,10 +12,12 @@ package org.apache.storm.metrics2; -import com.codahale.metrics.Counter; +import com.codahale.metrics.Gauge; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.function.Supplier; + import org.apache.storm.task.WorkerTopologyContext; import org.apache.storm.utils.ConfigUtils; import org.apache.storm.utils.Utils; @@ -27,12 +29,18 @@ public class TaskMetrics { private static final String METRIC_NAME_TRANSFERRED = "__transfer-count"; private static final String METRIC_NAME_EXECUTED = "__execute-count"; private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency"; + private static final String METRIC_NAME_PROCESS_RFC_1889a_JITTER = "__process-rfc1889a-jitter"; private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency"; + private static final String METRIC_NAME_COMPLETE_RFC_1889a_JITTER = "__complete-rfc1889a-jitter"; private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency"; + private static final String METRIC_NAME_EXECUTE_RFC_1889a_JITTER = "__execute-rfc1889a-jitter"; private static final String METRIC_NAME_CAPACITY = "__capacity"; private final ConcurrentMap rateCounters = new ConcurrentHashMap<>(); - private final ConcurrentMap gauges = new ConcurrentHashMap<>(); + private final ConcurrentMap gauges = new ConcurrentHashMap<>(); + // Gauge supplier singleton factories + private final Supplier ewmaGaugeFactory; + private final Supplier rollingAverageGaugeFactory; private final String topologyId; private final String componentId; @@ -40,6 +48,7 @@ public class TaskMetrics { private final Integer workerPort; private final StormMetricRegistry metricRegistry; private final int samplingRate; + private final boolean ewmaEnable; public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid, @@ -50,6 +59,10 @@ public TaskMetrics(WorkerTopologyContext context, String componentId, Integer ta this.taskId = taskid; this.workerPort = context.getThisWorkerPort(); this.samplingRate = ConfigUtils.samplingRate(topoConf); + double ewmaSmoothingFactor = ConfigUtils.ewmaSmoothingFactor(topoConf); + this.ewmaEnable = ConfigUtils.ewmaEnable(topoConf); + this.rollingAverageGaugeFactory = RollingAverageGauge::new; + this.ewmaGaugeFactory = () -> new EwmaGauge(ewmaSmoothingFactor); } public void setCapacity(double capacity) { @@ -67,6 +80,12 @@ public void spoutAckedTuple(String streamId, long latencyMs) { metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId; RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, streamId); gauge.addValue(latencyMs); + + if (this.ewmaEnable) { + metricName = METRIC_NAME_COMPLETE_RFC_1889a_JITTER + "-" + streamId; + EwmaGauge ewmaGauge = this.getExponentialWeightedMobileAverageGauge(metricName, streamId); + ewmaGauge.addValue(latencyMs); + } } public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) { @@ -78,6 +97,12 @@ public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key; RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId); gauge.addValue(latencyMs); + + if (this.ewmaEnable) { + metricName = METRIC_NAME_PROCESS_RFC_1889a_JITTER + "-" + key; + EwmaGauge ewmaGauge = this.getExponentialWeightedMobileAverageGauge(metricName, sourceStreamId); + ewmaGauge.addValue(latencyMs); + } } public void spoutFailedTuple(String streamId) { @@ -117,6 +142,12 @@ public void boltExecuteTuple(String sourceComponentId, String sourceStreamId, lo metricName = METRIC_NAME_EXECUTE_LATENCY + "-" + key; RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId); gauge.addValue(latencyMs); + + if (this.ewmaEnable) { + metricName = METRIC_NAME_EXECUTE_RFC_1889a_JITTER + "-" + key; + EwmaGauge ewmaGauge = this.getExponentialWeightedMobileAverageGauge(metricName, sourceStreamId); + ewmaGauge.addValue(latencyMs); + } } private RateCounter getRateCounter(String metricName, String streamId) { @@ -135,18 +166,54 @@ private RateCounter getRateCounter(String metricName, String streamId) { } private RollingAverageGauge getRollingAverageGauge(String metricName, String streamId) { - RollingAverageGauge gauge = this.gauges.get(metricName); - if (gauge == null) { + return getOrCreateGauge(metricName, streamId, RollingAverageGauge.class, this.rollingAverageGaugeFactory); + } + + private EwmaGauge getExponentialWeightedMobileAverageGauge(String metricName, String streamId) { + return getOrCreateGauge(metricName, streamId, EwmaGauge.class, this.ewmaGaugeFactory); + } + + private > G getOrCreateGauge( + String metricName, + String streamId, + Class gaugeClass, + Supplier factory) { + + Object existing = this.gauges.get(metricName); + if (existing == null) { synchronized (this) { - gauge = this.gauges.get(metricName); - if (gauge == null) { - gauge = new RollingAverageGauge(); - metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId, - streamId, this.taskId, this.workerPort); - this.gauges.put(metricName, gauge); + existing = this.gauges.get(metricName); + if (existing == null) { + G created = factory.get(); + registerGauge(metricName, streamId, created); + this.gauges.put(metricName, created); + return created; } } } - return gauge; + + if (!gaugeClass.isInstance(existing)) { + throw new IllegalStateException( + "Metric '" + metricName + "' is registered as " + + existing.getClass().getName() + + " but expected " + gaugeClass.getName()); + } + + return gaugeClass.cast(existing); + } + + /* + * Safe cast: G is bounded by Gauge in the signature of getOrCreateGauge, + * so every instance of G is by definition a Gauge. + * The cast to raw Gauge is required because metricRegistry.gauge() does not + * accept Gauge the wildcard is not compatible with the type parameter T + * expected by the external API. Type-safety is guaranteed by the bound + * > declared at the call site. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private void registerGauge(String metricName, String streamId, Gauge gauge) { + metricRegistry.gauge(metricName, (Gauge) gauge, this.topologyId, + this.componentId, streamId, this.taskId, this.workerPort); } + } diff --git a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java index 9357cc562cf..94599ec8935 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java @@ -39,6 +39,7 @@ public class ConfigUtils { public static final String FILE_SEPARATOR = File.separator; public static final String STORM_HOME = "storm.home"; public static final String RESOURCES_SUBDIR = "resources"; + public static final double RFC1889_ALPHA = 1.0 / 16.0; private static final Set passwordConfigKeys = new HashSet<>(); @@ -175,6 +176,28 @@ public static int samplingRate(Map conf) { throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate); } + public static double ewmaSmoothingFactor(Map conf) { + Object value = conf.get(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR); + if (value == null) { + return RFC1889_ALPHA; + } + double alpha = ObjectReader.getDouble(value); + if (alpha > 0.0 && alpha < 1.0) { + return alpha; + } + throw new IllegalArgumentException( + "Illegal " + Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR + + " in conf: " + alpha + " must be in (0, 1)"); + } + + public static boolean ewmaEnable(Map conf) { + Object value = conf.get(Config.TOPOLOGY_STATS_EWMA_ENABLE); + if (value == null) { + return false; + } + return ObjectReader.getBoolean(value, false); + } + public static BooleanSupplier mkStatsSampler(Map conf) { return evenSampler(samplingRate(conf)); } diff --git a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java index 4175ee9f4a7..228ca8bfb25 100644 --- a/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java +++ b/storm-client/src/jvm/org/apache/storm/validation/ConfigValidation.java @@ -849,6 +849,23 @@ public void validateField(String name, Object o) { } } + public static class EwmaSmoothingFactorValidator extends Validator { + @Override + public void validateField(String name, Object o) { + if (o == null) { + return; + } + if (o instanceof Number) { + double alpha = ((Number) o).doubleValue(); + if (alpha > 0.0 && alpha < 1.0) { + return; + } + } + throw new IllegalArgumentException( + "Field " + name + " must be a number in the open interval (0, 1), got: " + o); + } + } + public static class CustomIsExactlyOneOfValidators extends Validator { private Class[] subValidators; private List validatorClassNames; diff --git a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java index 3c335308da5..58668db1db6 100644 --- a/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java +++ b/storm-client/test/jvm/org/apache/storm/TestConfigValidate.java @@ -206,6 +206,32 @@ public void testTopologyStatsSampleRateIsFloat() { ConfigValidation.validateFields(conf); } + @Test + public void testTopologyStatsEwmaEnableIsBoolean() { + Map conf = new HashMap<>(); + // optional configuration + ConfigValidation.validateFields(conf); + conf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, true); + ConfigValidation.validateFields(conf); + conf.put(Config.TOPOLOGY_STATS_EWMA_ENABLE, false); + ConfigValidation.validateFields(conf); + } + + @Test + public void testTopologyStatsEwmaSmoothingFactorCustomValidator() { + Map conf = new HashMap<>(); + // optional configuration + ConfigValidation.validateFields(conf); + conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, 0.1); + ConfigValidation.validateFields(conf); + conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, 0.9); + ConfigValidation.validateFields(conf); + for (double notAllowedValue : new double[]{0.0, -0.1, 1.9}) { + conf.put(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR, notAllowedValue); + assertThrows(IllegalArgumentException.class, () -> ConfigValidation.validateFields(conf)); + } + } + @Test public void testWorkerChildoptsIsStringOrStringList() { Map conf = new HashMap<>(); diff --git a/storm-client/test/jvm/org/apache/storm/metrics2/EwmaGaugeTest.java b/storm-client/test/jvm/org/apache/storm/metrics2/EwmaGaugeTest.java new file mode 100644 index 00000000000..48d11bc1262 --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/metrics2/EwmaGaugeTest.java @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.metrics2; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class EwmaGaugeTest { + + private static final double DELTA = 1e-9; + + @Nested + @DisplayName("Construction") + class ConstructionTest { + + @Test + @DisplayName("Default constructor uses RFC 1889 alpha (1/16)") + void defaultAlpha() { + EwmaGauge gauge = new EwmaGauge(); + gauge.addValue(0L); + gauge.addValue(16L); // D = 16 ; J = 0 + (16 - 0) * (1/16) = 1.0 + assertEquals(1.0, gauge.getValue(), DELTA); + } + + @Test + @DisplayName("Invalid alpha values throw IllegalArgumentException") + void invalidAlphaThrows() { + double[] invalidAlphas = { + 0.0, 1.0, -0.1, 1.1, + Double.NaN, Double.POSITIVE_INFINITY, Double.NEGATIVE_INFINITY + }; + for (double alpha : invalidAlphas) { + assertThrows(IllegalArgumentException.class, + () -> new EwmaGauge(alpha), + "Expected IllegalArgumentException for alpha=" + alpha); + } + } + + @Test + @DisplayName("Valid alpha boundary values are accepted") + void validAlphaAccepted() { + double[] validAlphas = {0.001, 0.0625, 0.5, 0.999}; + for (double alpha : validAlphas) { + assertNotNull(new EwmaGauge(alpha), + "Expected no exception for alpha=" + alpha); + } + } + } + + + @Nested + @DisplayName("Cold-start semantics") + class ColdStartTest { + + private EwmaGauge gauge; + + @BeforeEach + void setUp() { + gauge = new EwmaGauge(); + } + + @Test + @DisplayName("getValue() returns 0.0 before any sample") + void noSamples() { + assertEquals(0.0, gauge.getValue(), DELTA); + } + + @Test + @DisplayName("getValue() returns 0.0 after exactly one sample (seed only)") + void oneSample() { + gauge.addValue(100L); + assertEquals(0.0, gauge.getValue(), DELTA); + } + + } + + @Nested + @DisplayName("EWMA formula RFC 1889 §A.8") + class FormulaTest { + + @Test + @DisplayName("Single update: J = 0 + (D - 0) * alpha") + void singleDeviation() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(0L); + gauge.addValue(10L); + assertEquals(5.0, gauge.getValue(), DELTA); + } + + @Test + @DisplayName("Manual step-by-step verification against reference values") + void manualSteps() { + EwmaGauge gauge = new EwmaGauge(0.5); + + gauge.addValue(0L); // seed + + // Step 1: transit=10, prev=0, D=10, J = 0 + (10-0) * 0.5 = 5.0 + gauge.addValue(10L); + assertEquals(5.0, gauge.getValue(), DELTA, "Step 1"); + + // Step 2: transit=0, prev=10, D=10, J = 5.0 + (10-5.0) * 0.5 = 7.5 + gauge.addValue(0L); + assertEquals(7.5, gauge.getValue(), DELTA, "Step 2"); + + // Step 3: transit=10, prev=0, D=10, J = 7.5 + (10-7.5) * 0.5 = 8.75 + gauge.addValue(10L); + assertEquals(8.75, gauge.getValue(), DELTA, "Step 3"); + } + + @Test + @DisplayName("Zero deviation decays jitter toward zero") + void zeroDeviationDecays() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(0L); // 0 + gauge.addValue(10L); // 0 + 5*alpha = 2.5 + double afterFirst = gauge.getValue(); + assertEquals(afterFirst, gauge.getValue(), DELTA); + + gauge.addValue(10L); // 2.5 - 2.5*alpha = 2.5 - 1.25 = 1.25 + assertEquals(afterFirst * 0.5, gauge.getValue(), DELTA); + } + + } + + + @Nested + @DisplayName("Negative value guard") + class NegativeValueTest { + + @Test + @DisplayName("Negative transit values are silently ignored before seed") + void negativeIgnoredBeforeSeed() { + EwmaGauge gauge = new EwmaGauge(); + gauge.addValue(-1L); + gauge.addValue(-100L); + assertEquals(0.0, gauge.getValue(), DELTA); + } + + @Test + @DisplayName("Negative value after seed does not corrupt lastTransit") + void negativeAfterSeedIgnored() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(10L); + gauge.addValue(-5L); + gauge.addValue(20L); + assertEquals(5.0, gauge.getValue(), DELTA); + } + } + + + @Nested + @DisplayName("getValue() preserves EWMA across calls") + class GetValueIdempotentTest { + + @Test + @DisplayName("Repeated getValue() without new samples returns same estimate") + void repeatedGetValueStable() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(0L); + gauge.addValue(10L); + double first = gauge.getValue(); + + assertEquals(first, gauge.getValue(), DELTA, "Second call"); + assertEquals(first, gauge.getValue(), DELTA, "Third call"); + } + + @Test + @DisplayName("EWMA accumulates correctly across multiple reporting windows") + void acrossReportingWindows() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(0L); + + gauge.addValue(10L); + assertEquals(5.0, gauge.getValue(), DELTA, "Window 1"); + + gauge.addValue(0L); + assertEquals(7.5, gauge.getValue(), DELTA, "Window 2"); + + gauge.addValue(10L); + assertEquals(8.75, gauge.getValue(), DELTA, "Window 3"); + } + } + + @Nested + @DisplayName("thread safe") + class ConcurrencyTest { + + @Test + @DisplayName("Concurrent addValue() calls do not corrupt state") + void concurrentAddValue() throws InterruptedException { + EwmaGauge gauge = new EwmaGauge(); + int threads = 8; + int samplesPerThread = 10_000; + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch start = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(threads); + + for (int t = 0; t < threads; t++) { + final long base = t * 10L; + pool.submit(() -> { + ready.countDown(); + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + for (int i = 0; i < samplesPerThread; i++) { + gauge.addValue(base + (i % 10)); + } + }); + } + + ready.await(); + start.countDown(); + pool.shutdown(); + assertTrue(pool.awaitTermination(10, TimeUnit.SECONDS), + "Executor did not terminate — possible deadlock"); + + double value = gauge.getValue(); + assertTrue(value >= 0.0, "Jitter must be non-negative, got: " + value); + assertTrue(Double.isFinite(value), "Jitter must be finite, got: " + value); + } + + @Test + @DisplayName("Concurrent getValue() and addValue() do not deadlock") + void concurrentGetAndAdd() throws Exception { + EwmaGauge gauge = new EwmaGauge(); + ExecutorService pool = Executors.newFixedThreadPool(2); + CountDownLatch done = new CountDownLatch(2); + + Future writer = pool.submit(() -> { + for (int i = 0; i < 50_000; i++) { + gauge.addValue(i % 100); + } + done.countDown(); + }); + + Future reader = pool.submit(() -> { + for (int i = 0; i < 1_000; i++) { + double v = gauge.getValue(); + assertTrue(v >= 0.0 && Double.isFinite(v), + "getValue() returned invalid result: " + v); + } + done.countDown(); + }); + + assertTrue(done.await(10, TimeUnit.SECONDS), + "Test did not complete within timeout possible deadlock"); + + writer.get(); + reader.get(); + pool.shutdown(); + } + + @Test + @DisplayName("Only one thread seeds lastTransit all same value gives zero jitter") + void seedRace() throws InterruptedException { + EwmaGauge gauge = new EwmaGauge(); + int threads = 16; + CountDownLatch ready = new CountDownLatch(threads); + CountDownLatch start = new CountDownLatch(1); + ExecutorService pool = Executors.newFixedThreadPool(threads); + + for (int t = 0; t < threads; t++) { + pool.submit(() -> { + ready.countDown(); + try { + start.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + gauge.addValue(42L); + }); + } + + ready.await(); + start.countDown(); + pool.shutdown(); + assertTrue(pool.awaitTermination(5, TimeUnit.SECONDS), + "Executor did not terminate possible deadlock"); + + assertEquals(0.0, gauge.getValue(), DELTA); + } + } + + @Nested + @DisplayName("Edge cases") + class EdgeCaseTest { + + @Test + @DisplayName("Long.MAX_VALUE transit does not overflow deviation") + void maxLongTransit() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(0L); + gauge.addValue(Long.MAX_VALUE); + double value = gauge.getValue(); + assertTrue(value > 0.0, "Jitter should be positive"); + assertTrue(Double.isFinite(value), "Jitter should be finite"); + } + + @Test + @DisplayName("Zero transit time is valid and produces zero deviation") + void zeroTransit() { + EwmaGauge gauge = new EwmaGauge(0.5); + gauge.addValue(0L); + gauge.addValue(0L); + assertEquals(0.0, gauge.getValue(), DELTA); + } + + @Test + @DisplayName("Large number of samples does not overflow LongAdder") + void manySamples() { + EwmaGauge gauge = new EwmaGauge(); + gauge.addValue(0L); + for (int i = 1; i <= 100_000; i++) { + gauge.addValue(i % 2 == 0 ? 0L : 10L); + } + double value = gauge.getValue(); + assertTrue(value > 0.0, "Jitter should be positive after many samples"); + assertTrue(value <= 10.0, "Jitter cannot exceed max deviation of 10"); + assertTrue(Double.isFinite(value), "Jitter must be finite"); + } + } +} \ No newline at end of file diff --git a/storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java b/storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java new file mode 100644 index 00000000000..6b6c7af786b --- /dev/null +++ b/storm-client/test/jvm/org/apache/storm/metrics2/TaskMetricsTest.java @@ -0,0 +1,487 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version + * 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions + * and limitations under the License. + */ + +package org.apache.storm.metrics2; + +import com.codahale.metrics.Gauge; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.utils.ConfigUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.*; +import static org.mockito.Mockito.*; + +@ExtendWith(MockitoExtension.class) +class TaskMetricsTest { + + private static final String TOPOLOGY_ID = "test-topology-1"; + private static final String COMPONENT_ID = "test-bolt"; + private static final Integer TASK_ID = 42; + private static final Integer WORKER_PORT = 6700; + private static final String STREAM_ID = "default"; + private static final String SOURCE_COMP = "source-spout"; + private static final int SAMPLING_RATE = 1; + private static final double EWMA_FACTOR = 0.3; + + @Mock private WorkerTopologyContext context; + @Mock private StormMetricRegistry metricRegistry; + @Mock private RateCounter rateCounter; + @Mock private RollingAverageGauge rollingAverageGauge; + @Mock private EwmaGauge ewmaGauge; + + private Map topoConf; + + private TaskMetrics buildTaskMetrics(boolean ewmaEnabled) { + try (MockedStatic cfgUtils = mockStatic(ConfigUtils.class)) { + cfgUtils.when(() -> ConfigUtils.samplingRate(topoConf)).thenReturn(SAMPLING_RATE); + cfgUtils.when(() -> ConfigUtils.ewmaSmoothingFactor(topoConf)).thenReturn(EWMA_FACTOR); + cfgUtils.when(() -> ConfigUtils.ewmaEnable(topoConf)).thenReturn(ewmaEnabled); + + return new TaskMetrics(context, COMPONENT_ID, TASK_ID, metricRegistry, topoConf); + } + } + + @BeforeEach + void setUp() { + when(context.getStormId()).thenReturn(TOPOLOGY_ID); + when(context.getThisWorkerPort()).thenReturn(WORKER_PORT); + + topoConf = new HashMap<>(); + + when(metricRegistry.rateCounter(anyString(), anyString(), anyString(), + anyInt(), anyInt(), anyString())).thenReturn(rateCounter); + } + + @Test + void spoutAckedTuple_incrementsAckCounter() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutAckedTuple(STREAM_ID, 100L); + + verify(rateCounter).inc(SAMPLING_RATE); + } + + @Test + void spoutAckedTuple_registersCompleteLatencyGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutAckedTuple(STREAM_ID, 200L); + + verify(metricRegistry, atLeastOnce()).gauge( + contains("__complete-latency"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void spoutAckedTuple_withEwmaEnabled_registersJitterGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(true); + + tm.spoutAckedTuple(STREAM_ID, 150L); + + verify(metricRegistry, atLeastOnce()).gauge( + contains("__complete-rfc1889a-jitter"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void spoutAckedTuple_withEwmaDisabled_doesNotRegisterJitterGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutAckedTuple(STREAM_ID, 150L); + + verify(metricRegistry, never()).gauge( + contains("__complete-rfc1889a-jitter"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void boltAckedTuple_incrementsAckCounter() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 50L); + + verify(rateCounter).inc(SAMPLING_RATE); + } + + @Test + void boltAckedTuple_registersProcessLatencyGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 50L); + + verify(metricRegistry, atLeastOnce()).gauge( + contains("__process-latency"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void boltAckedTuple_withEwmaEnabled_registersJitterGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(true); + + tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 75L); + + verify(metricRegistry, atLeastOnce()).gauge( + contains("__process-rfc1889a-jitter"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void boltAckedTuple_metricKeyIncludesSourceComponentAndStream() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 50L); + + verify(metricRegistry).rateCounter( + contains(SOURCE_COMP + ":" + STREAM_ID), + anyString(), anyString(), anyInt(), anyInt(), anyString()); + } + + @Test + void spoutFailedTuple_incrementsFailCounter() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutFailedTuple(STREAM_ID); + + verify(rateCounter).inc(SAMPLING_RATE); + } + + @Test + void spoutFailedTuple_usesCorrectMetricName() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutFailedTuple(STREAM_ID); + + verify(metricRegistry).rateCounter( + eq("__fail-count-" + STREAM_ID), + eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID)); + } + + @Test + void boltFailedTuple_incrementsFailCounter() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltFailedTuple(SOURCE_COMP, STREAM_ID); + + verify(rateCounter).inc(SAMPLING_RATE); + } + + @Test + void boltFailedTuple_metricKeyIncludesSourceComponentAndStream() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltFailedTuple(SOURCE_COMP, STREAM_ID); + + verify(metricRegistry).rateCounter( + eq("__fail-count-" + SOURCE_COMP + ":" + STREAM_ID), + anyString(), anyString(), anyInt(), anyInt(), anyString()); + } + + @Test + void emittedTuple_incrementsEmitCounter() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.emittedTuple(STREAM_ID); + + verify(rateCounter).inc(SAMPLING_RATE); + } + + @Test + void emittedTuple_usesCorrectMetricName() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.emittedTuple(STREAM_ID); + + verify(metricRegistry).rateCounter( + eq("__emit-count-" + STREAM_ID), + eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID)); + } + + @Test + void transferredTuples_incrementsByAmountTimesSamplingRate() { + TaskMetrics tm = buildTaskMetrics(false); + int amount = 5; + + tm.transferredTuples(STREAM_ID, amount); + + verify(rateCounter).inc(amount * SAMPLING_RATE); + } + + @Test + void transferredTuples_usesCorrectMetricName() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.transferredTuples(STREAM_ID, 3); + + verify(metricRegistry).rateCounter( + eq("__transfer-count-" + STREAM_ID), + anyString(), anyString(), anyInt(), anyInt(), anyString()); + } + + @Test + void boltExecuteTuple_incrementsExecuteCounter() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L); + + verify(rateCounter).inc(SAMPLING_RATE); + } + + @Test + void boltExecuteTuple_registersExecuteLatencyGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L); + + verify(metricRegistry, atLeastOnce()).gauge( + contains("__execute-latency"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void boltExecuteTuple_withEwmaEnabled_registersJitterGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(true); + + tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L); + + verify(metricRegistry, atLeastOnce()).gauge( + contains("__execute-rfc1889a-jitter"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void boltExecuteTuple_withEwmaDisabled_doesNotRegisterJitterGauge() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 30L); + + verify(metricRegistry, never()).gauge( + contains("__execute-rfc1889a-jitter"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void differentStreams_produceSeparateRateCounters() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.emittedTuple("stream-A"); + tm.emittedTuple("stream-B"); + + verify(metricRegistry).rateCounter( + eq("__emit-count-stream-A"), + anyString(), anyString(), anyInt(), anyInt(), eq("stream-A")); + verify(metricRegistry).rateCounter( + eq("__emit-count-stream-B"), + anyString(), anyString(), anyInt(), anyInt(), eq("stream-B")); + } + + @Test + void rateCounter_registeredOnlyOnceForSameMetricName() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.emittedTuple(STREAM_ID); + tm.emittedTuple(STREAM_ID); + tm.emittedTuple(STREAM_ID); + + verify(metricRegistry, times(1)).rateCounter( + eq("__emit-count-" + STREAM_ID), + anyString(), anyString(), anyInt(), anyInt(), anyString()); + } + + @Test + void gauge_registeredOnlyOnceForSameMetricName() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutAckedTuple(STREAM_ID, 10L); + tm.spoutAckedTuple(STREAM_ID, 20L); + tm.spoutAckedTuple(STREAM_ID, 30L); + + verify(metricRegistry, times(1)).gauge( + contains("__complete-latency"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void concurrentEmittedTuple_registersRateCounterExactlyOnce() throws InterruptedException { + TaskMetrics tm = buildTaskMetrics(false); + int threadCount = 20; + CountDownLatch ready = new CountDownLatch(threadCount); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threadCount); + + ExecutorService pool = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + pool.submit(() -> { + ready.countDown(); + try { start.await(); } catch (InterruptedException ignored) {} + tm.emittedTuple(STREAM_ID); + done.countDown(); + }); + } + + ready.await(); + start.countDown(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + pool.shutdown(); + + verify(metricRegistry, times(1)).rateCounter( + eq("__emit-count-" + STREAM_ID), + anyString(), anyString(), anyInt(), anyInt(), anyString()); + } + + @Test + void concurrentSpoutAckedTuple_registersGaugeExactlyOnce() throws InterruptedException { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + int threadCount = 20; + CountDownLatch ready = new CountDownLatch(threadCount); + CountDownLatch start = new CountDownLatch(1); + CountDownLatch done = new CountDownLatch(threadCount); + + ExecutorService pool = Executors.newFixedThreadPool(threadCount); + for (int i = 0; i < threadCount; i++) { + pool.submit(() -> { + ready.countDown(); + try { start.await(); } catch (InterruptedException ignored) {} + tm.spoutAckedTuple(STREAM_ID, 100L); + done.countDown(); + }); + } + + ready.await(); + start.countDown(); + assertTrue(done.await(5, TimeUnit.SECONDS)); + pool.shutdown(); + + verify(metricRegistry, times(1)).gauge( + contains("__complete-latency"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void getOrCreateGauge_sameTypeReusedWithoutThrowing() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + + tm.spoutAckedTuple(STREAM_ID, 10L); + tm.spoutAckedTuple(STREAM_ID, 20L); + + verify(metricRegistry, times(1)).gauge( + contains("__complete-latency"), any(Gauge.class), + anyString(), anyString(), anyString(), anyInt(), anyInt()); + } + + @Test + void boltAckedTuple_metricNameFormat() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + String expectedKey = SOURCE_COMP + ":" + STREAM_ID; + + tm.boltAckedTuple(SOURCE_COMP, STREAM_ID, 10L); + + verify(metricRegistry).rateCounter( + eq("__ack-count-" + expectedKey), + eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID)); + } + + @Test + void boltExecuteTuple_metricNameFormat() { + when(metricRegistry.gauge(anyString(), any(Gauge.class), anyString(), + anyString(), anyString(), anyInt(), anyInt())).thenReturn(null); + TaskMetrics tm = buildTaskMetrics(false); + String expectedKey = SOURCE_COMP + ":" + STREAM_ID; + + tm.boltExecuteTuple(SOURCE_COMP, STREAM_ID, 10L); + + verify(metricRegistry).rateCounter( + eq("__execute-count-" + expectedKey), + eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID)); + } + + @Test + void boltFailedTuple_metricNameFormat() { + TaskMetrics tm = buildTaskMetrics(false); + String expectedKey = SOURCE_COMP + ":" + STREAM_ID; + + tm.boltFailedTuple(SOURCE_COMP, STREAM_ID); + + verify(metricRegistry).rateCounter( + eq("__fail-count-" + expectedKey), + eq(TOPOLOGY_ID), eq(COMPONENT_ID), eq(TASK_ID), eq(WORKER_PORT), eq(STREAM_ID)); + } + + @Test + void contextFields_propagatedCorrectlyToRegistry() { + TaskMetrics tm = buildTaskMetrics(false); + + tm.emittedTuple(STREAM_ID); + + ArgumentCaptor topoCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor compCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor taskCaptor = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor portCaptor = ArgumentCaptor.forClass(Integer.class); + + verify(metricRegistry).rateCounter( + anyString(), + topoCaptor.capture(), compCaptor.capture(), + taskCaptor.capture(), portCaptor.capture(), + anyString()); + + assertEquals(TOPOLOGY_ID, topoCaptor.getValue()); + assertEquals(COMPONENT_ID, compCaptor.getValue()); + assertEquals(TASK_ID, taskCaptor.getValue()); + assertEquals(WORKER_PORT, portCaptor.getValue()); + } +} \ No newline at end of file