1 change: 1 addition & 0 deletions conf/defaults.yaml
@@ -278,6 +278,7 @@ topology.max.task.parallelism: null
topology.max.spout.pending: null # ideally should be larger than topology.producer.batch.size. (esp. if topology.batch.flush.interval.millis=0)
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.stats.ewma.enable: false
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: false
topology.worker.childopts: null
37 changes: 34 additions & 3 deletions docs/Metrics.md
@@ -180,15 +180,46 @@ Similar to the tuple counting metrics storm also collects average latency metric

##### `__complete-latency`

The complete latency is just for spouts. It is the average amount of time it took for `ack` or `fail` to be called for a tuple after it was emitted. If acking is disabled this metric is likely to be blank or 0 for all values, and should be ignored.
The complete latency is just for spouts. It is the average amount of time it took for `ack` or `fail` to be called for a
tuple after it was emitted. If acking is disabled this metric is likely to be blank or 0 for all values, and should be
ignored.

##### `__complete-rfc1889a-jitter`

This metric is specific to spouts and is only reported when `topology.stats.ewma.enable` is `true`. It measures the
variation (jitter) in the complete latency, computed as an exponentially weighted moving average (EWMA) of the absolute
difference between consecutive latency samples, following RFC 1889 Appendix A.8.
While `__complete-latency` indicates the average amount of time it took for a tuple to be fully processed by the
topology (from emission to the final ack), the jitter metric quantifies how consistent that time is. If acking is
disabled, this metric is likely to be blank or 0 and should be ignored.
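
As a sketch of the update rule behind this metric (the symbols are introduced here for illustration and are not names from the Storm code base): let $T_i$ be the $i$-th latency sample, $J_i$ the jitter estimate after that sample, and $\alpha$ the smoothing factor (`topology.stats.ewma.smoothing_factor`, which defaults to $1/16$ as in RFC 1889).

$$
D_i = T_i - T_{i-1}, \qquad J_i = J_{i-1} + \alpha \left( |D_i| - J_{i-1} \right), \qquad J_1 = 0
$$

The first sample only seeds $T_1$, so the estimate stays at 0 until a second sample arrives.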

##### `__execute-latency`

This is just for bolts. It is the average amount of time that the bolt spent in the call to the `execute` method. The higher this gets, the lower the throughput of tuples per bolt instance.
This is just for bolts. It is the average amount of time that the bolt spent in the call to the `execute` method. The
higher this gets, the lower the throughput of tuples per bolt instance.

##### `__execute-rfc1889a-jitter`

This metric is specific to bolts and is only reported when `topology.stats.ewma.enable` is `true`. It measures the
variation (jitter) in the time spent within the `execute` method, computed as an exponentially weighted moving average
(EWMA) of the absolute difference between consecutive samples, following RFC 1889 Appendix A.8.
While `__execute-latency` provides the average time spent in the `execute` call, the jitter metric quantifies how
predictable that time is: a high value means execution times swing from one tuple to the next even when the average
looks stable.

##### `__process-latency`

This is also just for bolts. It is the average amount of time between when `execute` was called to start processing a tuple, to when it was acked or failed by the bolt. If your bolt is a very simple bolt and the processing is synchronous then `__process-latency` and `__execute-latency` should be very close to one another, with process latency being slightly smaller. If you are doing a join or have asynchronous processing then it may take a while for a tuple to be acked so the process latency would be higher than the execute latency.
This is also just for bolts. It is the average amount of time between when `execute` was called to start processing a
tuple, to when it was acked or failed by the bolt. If your bolt is a very simple bolt and the processing is synchronous
then `__process-latency` and `__execute-latency` should be very close to one another, with process latency being
slightly smaller. If you are doing a join or have asynchronous processing then it may take a while for a tuple to be
acked so the process latency would be higher than the execute latency.

##### `__process-rfc1889a-jitter`

This metric is specific to bolts and is only reported when `topology.stats.ewma.enable` is `true`. It measures the
variation (jitter) in the process latency, computed as an exponentially weighted moving average (EWMA) of the absolute
difference between consecutive samples, following RFC 1889 Appendix A.8.
While `__process-latency` provides the average time a tuple spends being processed, the jitter metric quantifies the
stability of that processing time. It helps identify "noisy" execution environments where processing times fluctuate
significantly, even if the average remains within acceptable limits.
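
To illustrate how an average can hide this kind of fluctuation, here is a minimal, self-contained sketch. The class `JitterDemo` and its method names are hypothetical and not part of Storm; the code only applies the RFC 1889 style update rule to a plain array of latency samples.

```java
import java.util.Arrays;

// Hypothetical illustration class; not part of Apache Storm.
final class JitterDemo {

    /**
     * Applies J = J + alpha * (|t_i - t_(i-1)| - J) over the series.
     * The first sample only seeds the "previous" value.
     */
    static double jitter(long[] latenciesMs, double alpha) {
        double j = 0.0;
        for (int i = 1; i < latenciesMs.length; i++) {
            double d = Math.abs(latenciesMs[i] - latenciesMs[i - 1]);
            j += alpha * (d - j);
        }
        return j;
    }

    /** Plain arithmetic mean of the samples (0.0 for an empty series). */
    static double mean(long[] latenciesMs) {
        return Arrays.stream(latenciesMs).average().orElse(0.0);
    }
}
```

With `alpha = 1.0 / 16.0`, the series `{50, 50, 50, 50}` and `{20, 80, 20, 80}` both have a mean of 50 ms, but the first yields a jitter of 0 while the second climbs to roughly 10.56 after four samples.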

##### `__skipped-max-spout-ms`

14 changes: 14 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
@@ -596,6 +596,20 @@ public class Config extends HashMap<String, Object> {
*/
@IsPositiveNumber
public static final String TOPOLOGY_STATS_SAMPLE_RATE = "topology.stats.sample.rate";
/**
* Enables the streaming jitter calculation (RFC 1889, Appendix A.8) for the latency metrics. Disabled by default.
*
* @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 Appendix A.8</a>
*/
@IsBoolean
public static final String TOPOLOGY_STATS_EWMA_ENABLE = "topology.stats.ewma.enable";
/**
* The smoothing factor (alpha) used for the EWMA jitter calculation (RFC 1889, Appendix A.8).
* Must be in the open interval (0, 1); defaults to 1/16 when unset.
*
* @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 Appendix A.8</a>
*/
@CustomValidator(validatorClass = ConfigValidation.EwmaSmoothingFactorValidator.class)
public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing_factor";
/**
* The time period that builtin metrics data is bucketed into.
*/
89 changes: 89 additions & 0 deletions storm-client/src/jvm/org/apache/storm/metrics2/EwmaGauge.java
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.metrics2;

import static org.apache.storm.utils.ConfigUtils.RFC1889_ALPHA;

import com.codahale.metrics.Gauge;

import java.util.concurrent.atomic.AtomicLong;

/**
* Lock-free jitter estimator following RFC 1889 Section 6.3.1.
* The jitter accumulator is stored as raw IEEE 754 bits in an AtomicLong
* so that CAS can be used without locks.
* Thread safety: addValue is lock-free; getValue is wait-free.
*/
public class EwmaGauge implements Gauge<Double> {

private static final long UNSEEDED = Long.MIN_VALUE;
private static final long ZERO_BITS = Double.doubleToLongBits(0.0);

private final AtomicLong lastTransit = new AtomicLong(UNSEEDED);
private final AtomicLong jitterBits = new AtomicLong(ZERO_BITS);
private final double alpha;

EwmaGauge(double alpha) {
if (alpha <= 0.0 || alpha >= 1.0 || Double.isNaN(alpha)) {
throw new IllegalArgumentException(
"alpha must be in (0, 1), got: " + alpha);
}
this.alpha = alpha;
}

EwmaGauge() {
this(RFC1889_ALPHA); // 1.0 / 16.0
}

/**
* Update the jitter estimate.
*
* @param transitMs latency sample in milliseconds (the "transit time" in RFC 1889 terms).
* Negative values are silently ignored.
*/
public void addValue(long transitMs) {
if (transitMs < 0) {
return;
}

// Seed on the very first sample: store it, there is nothing to diff against yet.
if (lastTransit.compareAndSet(UNSEEDED, transitMs)) {
return;
}

long prev = lastTransit.getAndSet(transitMs);
if (prev == UNSEEDED) {
// Defensive check only: lastTransit is never reset to UNSEEDED, so once the
// compareAndSet above has failed this branch cannot be taken.
return;
}

double d = Math.abs(transitMs - prev);

long currentBits;
long updatedBits;
do {
currentBits = jitterBits.get();
double currentJitter = Double.longBitsToDouble(currentBits);
double updatedJitter = currentJitter + alpha * (d - currentJitter);
updatedBits = Double.doubleToLongBits(updatedJitter);
} while (!jitterBits.compareAndSet(currentBits, updatedBits));
}

/**
* Returns the current jitter estimate, in the same units as the samples passed to addValue (milliseconds).
*/
@Override
public Double getValue() {
return Double.longBitsToDouble(jitterBits.get());
}
}
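
The hand-written CAS loop in `addValue` is one way to update a `double` atomically without locks. As a hedged alternative sketch, the same accumulator update can be expressed with `AtomicLong.updateAndGet` (available since Java 8), which performs the retry internally. The class name `AtomicEwma` is hypothetical, and the sketch deliberately leaves out the `lastTransit` seeding logic; it only shows the jitter accumulator.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch, not the PR's implementation: only the accumulator update is shown.
final class AtomicEwma {
    // The double estimate is stored as raw IEEE 754 bits so it can be swapped atomically.
    private final AtomicLong bits = new AtomicLong(Double.doubleToLongBits(0.0));
    private final double alpha;

    AtomicEwma(double alpha) {
        // Written as a negated range check so that NaN is rejected as well.
        if (!(alpha > 0.0 && alpha < 1.0)) {
            throw new IllegalArgumentException("alpha must be in (0, 1), got: " + alpha);
        }
        this.alpha = alpha;
    }

    /** Folds one absolute difference d into the estimate: J = J + alpha * (d - J). */
    void update(double d) {
        // updateAndGet may re-apply the function if another thread changed the value
        // in between, so the function must be free of side effects.
        bits.updateAndGet(cur -> {
            double j = Double.longBitsToDouble(cur);
            return Double.doubleToLongBits(j + alpha * (d - j));
        });
    }

    double get() {
        return Double.longBitsToDouble(bits.get());
    }
}
```

Starting from 0 with `alpha = 1.0 / 16.0`, `update(10.0)` gives 0.625 and a following `update(20.0)` gives 1.8359375. Whether this form is preferable to the explicit loop is a style choice; both rely on the same compare-and-set retry.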
89 changes: 78 additions & 11 deletions storm-client/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -12,10 +12,12 @@

package org.apache.storm.metrics2;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

import org.apache.storm.task.WorkerTopologyContext;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.Utils;
@@ -27,19 +29,26 @@ public class TaskMetrics {
private static final String METRIC_NAME_TRANSFERRED = "__transfer-count";
private static final String METRIC_NAME_EXECUTED = "__execute-count";
private static final String METRIC_NAME_PROCESS_LATENCY = "__process-latency";
private static final String METRIC_NAME_PROCESS_RFC_1889a_JITTER = "__process-rfc1889a-jitter";
private static final String METRIC_NAME_COMPLETE_LATENCY = "__complete-latency";
private static final String METRIC_NAME_COMPLETE_RFC_1889a_JITTER = "__complete-rfc1889a-jitter";
private static final String METRIC_NAME_EXECUTE_LATENCY = "__execute-latency";
private static final String METRIC_NAME_EXECUTE_RFC_1889a_JITTER = "__execute-rfc1889a-jitter";
private static final String METRIC_NAME_CAPACITY = "__capacity";

private final ConcurrentMap<String, RateCounter> rateCounters = new ConcurrentHashMap<>();
private final ConcurrentMap<String, RollingAverageGauge> gauges = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Gauge> gauges = new ConcurrentHashMap<>();
// Factories used to create a gauge the first time a metric name is seen
private final Supplier<EwmaGauge> ewmaGaugeFactory;
private final Supplier<RollingAverageGauge> rollingAverageGaugeFactory;

private final String topologyId;
private final String componentId;
private final Integer taskId;
private final Integer workerPort;
private final StormMetricRegistry metricRegistry;
private final int samplingRate;
private final boolean ewmaEnable;


public TaskMetrics(WorkerTopologyContext context, String componentId, Integer taskid,
@@ -50,6 +59,10 @@ public TaskMetrics(WorkerTopologyContext context, String componentId, Integer ta
this.taskId = taskid;
this.workerPort = context.getThisWorkerPort();
this.samplingRate = ConfigUtils.samplingRate(topoConf);
double ewmaSmoothingFactor = ConfigUtils.ewmaSmoothingFactor(topoConf);
this.ewmaEnable = ConfigUtils.ewmaEnable(topoConf);
this.rollingAverageGaugeFactory = RollingAverageGauge::new;
this.ewmaGaugeFactory = () -> new EwmaGauge(ewmaSmoothingFactor);
}

public void setCapacity(double capacity) {
@@ -67,6 +80,12 @@ public void spoutAckedTuple(String streamId, long latencyMs) {
metricName = METRIC_NAME_COMPLETE_LATENCY + "-" + streamId;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, streamId);
gauge.addValue(latencyMs);

if (this.ewmaEnable) {
metricName = METRIC_NAME_COMPLETE_RFC_1889a_JITTER + "-" + streamId;
EwmaGauge ewmaGauge = this.getExponentialWeightedMobileAverageGauge(metricName, streamId);
ewmaGauge.addValue(latencyMs);
}
}

public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long latencyMs) {
@@ -78,6 +97,12 @@ public void boltAckedTuple(String sourceComponentId, String sourceStreamId, long
metricName = METRIC_NAME_PROCESS_LATENCY + "-" + key;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
gauge.addValue(latencyMs);

if (this.ewmaEnable) {
metricName = METRIC_NAME_PROCESS_RFC_1889a_JITTER + "-" + key;
EwmaGauge ewmaGauge = this.getExponentialWeightedMobileAverageGauge(metricName, sourceStreamId);
ewmaGauge.addValue(latencyMs);
}
}

public void spoutFailedTuple(String streamId) {
@@ -117,6 +142,12 @@ public void boltExecuteTuple(String sourceComponentId, String sourceStreamId, lo
metricName = METRIC_NAME_EXECUTE_LATENCY + "-" + key;
RollingAverageGauge gauge = this.getRollingAverageGauge(metricName, sourceStreamId);
gauge.addValue(latencyMs);

if (this.ewmaEnable) {
metricName = METRIC_NAME_EXECUTE_RFC_1889a_JITTER + "-" + key;
EwmaGauge ewmaGauge = this.getExponentialWeightedMobileAverageGauge(metricName, sourceStreamId);
ewmaGauge.addValue(latencyMs);
}
}

private RateCounter getRateCounter(String metricName, String streamId) {
@@ -135,18 +166,54 @@ private RateCounter getRateCounter(String metricName, String streamId) {
}

private RollingAverageGauge getRollingAverageGauge(String metricName, String streamId) {
RollingAverageGauge gauge = this.gauges.get(metricName);
if (gauge == null) {
return getOrCreateGauge(metricName, streamId, RollingAverageGauge.class, this.rollingAverageGaugeFactory);
}

private EwmaGauge getExponentialWeightedMobileAverageGauge(String metricName, String streamId) {
return getOrCreateGauge(metricName, streamId, EwmaGauge.class, this.ewmaGaugeFactory);
}

private <G extends Gauge<?>> G getOrCreateGauge(
String metricName,
String streamId,
Class<G> gaugeClass,
Supplier<G> factory) {

Object existing = this.gauges.get(metricName);
if (existing == null) {
synchronized (this) {
gauge = this.gauges.get(metricName);
if (gauge == null) {
gauge = new RollingAverageGauge();
metricRegistry.gauge(metricName, gauge, this.topologyId, this.componentId,
streamId, this.taskId, this.workerPort);
this.gauges.put(metricName, gauge);
existing = this.gauges.get(metricName);
if (existing == null) {
G created = factory.get();
registerGauge(metricName, streamId, created);
this.gauges.put(metricName, created);
return created;
}
}
}
return gauge;

if (!gaugeClass.isInstance(existing)) {
throw new IllegalStateException(
"Metric '" + metricName + "' is registered as "
+ existing.getClass().getName()
+ " but expected " + gaugeClass.getName());
}

return gaugeClass.cast(existing);
}

/*
* The cast to raw Gauge is required because metricRegistry.gauge() does not
* accept Gauge<?>: the wildcard is not compatible with the type parameter T
* expected by that API.
* The cast is safe: this method is only called from getOrCreateGauge, whose
* type parameter is bounded by <G extends Gauge<?>>, so every instance passed
* here is a Gauge.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
private void registerGauge(String metricName, String streamId, Gauge<?> gauge) {
metricRegistry.gauge(metricName, (Gauge) gauge, this.topologyId,
this.componentId, streamId, this.taskId, this.workerPort);
}

}
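
The get-or-create logic in `getOrCreateGauge` combines double-checked locking with a runtime type check. As a hedged sketch of the same idea in isolation, the lookup can also be written with `ConcurrentHashMap.computeIfAbsent`. The class `TypedCache` is hypothetical and leaves out the metric registry entirely; whether the registration call could safely move into the mapping function is not something this sketch settles.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical sketch, not the PR's code: a type-checked get-or-create cache.
final class TypedCache {
    private final ConcurrentMap<String, Object> entries = new ConcurrentHashMap<>();

    /** Returns the entry for name, creating it on first use; the factory must not return null. */
    <T> T getOrCreate(String name, Class<T> type, Supplier<T> factory) {
        // For ConcurrentHashMap, the mapping function runs at most once per absent key.
        Object existing = entries.computeIfAbsent(name, k -> factory.get());
        if (!type.isInstance(existing)) {
            // Same guard as in getOrCreateGauge: one name must not map to two types.
            throw new IllegalStateException("Entry '" + name + "' is registered as "
                + existing.getClass().getName() + " but expected " + type.getName());
        }
        return type.cast(existing);
    }
}
```

Requesting the same name twice with the same type returns the same instance, while requesting it with a different type raises `IllegalStateException`, mirroring the guard in the PR.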
23 changes: 23 additions & 0 deletions storm-client/src/jvm/org/apache/storm/utils/ConfigUtils.java
@@ -39,6 +39,7 @@ public class ConfigUtils {
public static final String FILE_SEPARATOR = File.separator;
public static final String STORM_HOME = "storm.home";
public static final String RESOURCES_SUBDIR = "resources";
public static final double RFC1889_ALPHA = 1.0 / 16.0;

private static final Set<String> passwordConfigKeys = new HashSet<>();

@@ -175,6 +176,28 @@ public static int samplingRate(Map<String, Object> conf) {
throw new IllegalArgumentException("Illegal topology.stats.sample.rate in conf: " + rate);
}

public static double ewmaSmoothingFactor(Map<String, Object> conf) {
Object value = conf.get(Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR);
if (value == null) {
return RFC1889_ALPHA;
}
double alpha = ObjectReader.getDouble(value);
if (alpha > 0.0 && alpha < 1.0) {
return alpha;
}
throw new IllegalArgumentException(
"Illegal " + Config.TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR
+ " in conf: " + alpha + "; must be in (0, 1)");
}

public static boolean ewmaEnable(Map<String, Object> conf) {
Object value = conf.get(Config.TOPOLOGY_STATS_EWMA_ENABLE);
if (value == null) {
return false;
}
return ObjectReader.getBoolean(value, false);
}

public static BooleanSupplier mkStatsSampler(Map<String, Object> conf) {
return evenSampler(samplingRate(conf));
}
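
For readers who want to see the two new settings side by side, the following is a minimal sketch of how they might be set and read back. It uses a plain `Map` rather than Storm's `Config` and `ObjectReader` classes, and the class `EwmaConfSketch` with its methods is hypothetical; only the two key names, the default of 1/16, and the (0, 1) range come from the PR.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch using a plain Map; Storm's own helper classes are not used here.
final class EwmaConfSketch {
    static final String ENABLE_KEY = "topology.stats.ewma.enable";
    static final String ALPHA_KEY = "topology.stats.ewma.smoothing_factor";
    static final double DEFAULT_ALPHA = 1.0 / 16.0;

    /** Returns the configured alpha, the default when unset, or throws if outside (0, 1). */
    static double smoothingFactor(Map<String, Object> conf) {
        Object value = conf.get(ALPHA_KEY);
        if (value == null) {
            return DEFAULT_ALPHA;
        }
        if (value instanceof Number) {
            double alpha = ((Number) value).doubleValue();
            if (alpha > 0.0 && alpha < 1.0) {
                return alpha;
            }
        }
        throw new IllegalArgumentException(
            "Illegal " + ALPHA_KEY + " in conf: " + value + "; must be in (0, 1)");
    }

    /** Jitter metrics are opt-in: anything other than Boolean.TRUE counts as disabled here. */
    static boolean enabled(Map<String, Object> conf) {
        return Boolean.TRUE.equals(conf.get(ENABLE_KEY));
    }

    /** Example topology configuration that turns the jitter metrics on with alpha = 1/8. */
    static Map<String, Object> exampleConf() {
        Map<String, Object> conf = new HashMap<>();
        conf.put(ENABLE_KEY, true);
        conf.put(ALPHA_KEY, 0.125);
        return conf;
    }
}
```

A larger alpha makes the estimate react faster to recent differences, while a smaller one smooths more aggressively; 1/16 is the value RFC 1889 uses.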
@@ -849,6 +849,23 @@ public void validateField(String name, Object o) {
}
}

public static class EwmaSmoothingFactorValidator extends Validator {
@Override
public void validateField(String name, Object o) {
if (o == null) {
return;
}
if (o instanceof Number) {
double alpha = ((Number) o).doubleValue();
if (alpha > 0.0 && alpha < 1.0) {
return;
}
}
throw new IllegalArgumentException(
"Field " + name + " must be a number in the open interval (0, 1), got: " + o);
}
}

public static class CustomIsExactlyOneOfValidators extends Validator {
private Class<?>[] subValidators;
private List<String> validatorClassNames;