Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
aae2554
init
GGraziadei May 3, 2026
2aad8a2
allocate suppliers once
GGraziadei May 3, 2026
5cb1936
improve jitter definition
GGraziadei May 5, 2026
5aa8264
add documentation
GGraziadei May 5, 2026
946715d
format
GGraziadei May 5, 2026
ac4e3b1
format
GGraziadei May 5, 2026
23247e3
fix checkstyle
GGraziadei May 5, 2026
ce8e2b1
fix RFC 1889 jitter definition, guarantee jitter decay if stable latency
GGraziadei May 6, 2026
f040770
minor changes
GGraziadei May 8, 2026
b29b8e3
Merge branch '8538-rfc-1889a-jitter-metric' into 8538-jitter-internal…
GGraziadei May 9, 2026
cb69a04
publish control signal based on ewma metrics
GGraziadei May 9, 2026
f5e28f4
fix typo on `ZeroOneOpenIntervalValidator`
GGraziadei May 9, 2026
451eafa
consume metrics and update internal status
GGraziadei May 9, 2026
e7d94f8
implement a cache for child tasks stats
GGraziadei May 9, 2026
35abc68
Merge branch 'master' into 8538-jitter-control-loop
GGraziadei May 20, 2026
215e3e0
feedback message serialization and path decoupling
GGraziadei May 21, 2026
150e210
Merge remote-tracking branch 'upstream/master' into 8538-jitter-contr…
GGraziadei May 24, 2026
99f53e4
feedback refactoring + JitterAwareStreamGrouping
GGraziadei May 24, 2026
7d34c90
control loop lazy update
GGraziadei May 30, 2026
a76ee8d
send feedback according to a periodic tick tuple
GGraziadei May 31, 2026
a95f5f1
Merge branch 'master' into 8538-jitter-control-loop
GGraziadei May 31, 2026
c938196
minor changes
GGraziadei Jun 6, 2026
d8e7cda
add jitter aware grouping topology bench
GGraziadei Jun 7, 2026
2b4d8ad
remove not necessary UpstreamFeedbackCompareTopo.java
GGraziadei Jun 7, 2026
babd3d9
remove unnecessary import, FT for registering FEEDBACK_TICK_STREAM
GGraziadei Jun 7, 2026
7edf2d5
`LoadAwareCustomStreamGrouping` contract backward compatibility
GGraziadei Jun 12, 2026
d3f4e08
unit test + fix FeedbackRecord
GGraziadei Jun 12, 2026
0e0e18c
refine jitter aware stream grouping algorithm
GGraziadei Jun 13, 2026
f983050
review fixes
GGraziadei Jun 13, 2026
2d3f5fd
security comment + FT scheduleUpstreamFeedbackTick in system components
GGraziadei Jun 13, 2026
2643296
remove feedback stream default value
GGraziadei Jun 13, 2026
935b324
Removes the lazy update logic to ensure fault tolerance in case a
GGraziadei Jun 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,8 @@ topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
topology.stats.ewma.enable: false
topology.stats.ewma.smoothing.factor: 0.0625
topology.upstream.feedback.freq.secs: 10
topology.upstream.feedback.enable: false
topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: false
topology.worker.childopts: null
Expand Down

Large diffs are not rendered by default.

41 changes: 39 additions & 2 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -608,8 +608,46 @@ public class Config extends HashMap<String, Object> {
*
* @see <a href="https://www.rfc-editor.org/rfc/rfc1889#appendix-A.8">RFC 1889 §A.8</a>
*/
@CustomValidator(validatorClass = ConfigValidation.EwmaSmoothingFactorValidator.class)
@CustomValidator(validatorClass = ConfigValidation.ZeroOneOpenIntervalValidator.class)
public static final String TOPOLOGY_STATS_EWMA_SMOOTHING_FACTOR = "topology.stats.ewma.smoothing.factor";
/**
* Flag to enable or disable the feedback channel for upstream communication.
* When true, components can send unanchored tuples back to their source tasks.
*
* <p><b>Security:</b> feedback tuples carry a routing control signal (e.g. per-task EWMA jitter
* used by grouping decisions), so a peer that can inject messages on the worker transport could
* forge feedback and deterministically steer a topology's traffic to a chosen task. This stays
* within Storm's existing worker-transport trust model, but because the default
* {@code storm.messaging.netty.authentication} is {@code false}, enable Netty authentication
* (and TLS where available) when running this feature in an untrusted network.</p>
*/
@IsBoolean
public static final String TOPOLOGY_UPSTREAM_FEEDBACK_ENABLE = "topology.upstream.feedback.enable";
/**
* The period, in seconds, between upstream feedback messages within the topology.
*
* <p>A dedicated internal feedback tick fires on this interval; on each tick a task emits
* a feedback tuple (containing metrics such as EWMA jitter stats) back to its parent tasks.
* This mechanism allows parent tasks to receive performance signals from downstream
* components to facilitate adaptive flow control or load balancing. Unlike a probabilistic
* trigger, the period yields a deterministic, data-volume-independent feedback cadence.</p>
*
* <p><b>Validation:</b> Must be a positive integer (seconds).</p>
*
* <p><b>Impact:</b>
* <ul>
* <li>Lower values provide more precise, real-time performance data but increase
* network overhead and CPU usage on the control plane.</li>
* <li>Higher values minimize the "observer effect" on the topology's throughput
* while still providing periodic statistical snapshots of health.</li>
* </ul>
* </p>
*
* Defaults to 10 if not explicitly configured.
*/
@IsInteger
@IsPositiveNumber
public static final String TOPOLOGY_UPSTREAM_FEEDBACK_FREQ_SECS = "topology.upstream.feedback.freq.secs";
/**
* The time period that builtin metrics data in bucketed into.
*/
Expand Down Expand Up @@ -1901,7 +1939,6 @@ public class Config extends HashMap<String, Object> {
public static final String STORM_MESSAGING_NETTY_TLS_SSL_PROTOCOLS = "storm.messaging.netty.tls.ssl.protocols";

/**
* /**
* Netty based messaging: The number of milliseconds that a Netty client will retry flushing messages that are already
* buffered to be sent.
*/
Expand Down
2 changes: 2 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public class Constants {
public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics_";
public static final String METRICS_STREAM_ID = "__metrics";
public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
public static final String FEEDBACK_STREAM_ID = "__feedback";
public static final String FEEDBACK_TICK_STREAM_ID = "__feedback_tick";

public static final Object TOPOLOGY = "topology";
public static final String SYSTEM_TOPOLOGY = "system-topology";
Expand Down
24 changes: 24 additions & 0 deletions storm-client/src/jvm/org/apache/storm/daemon/StormCommon.java
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,20 @@ public static void addEventLogger(Map<String, Object> conf, StormTopology topolo
topology.put_to_bolts(EVENTLOGGER_COMPONENT_ID, eventLoggerBolt);
}

public static void addUpstreamFeedback(Map<String, Object> conf, StormTopology topology) {
// Only invoked when hasUpstreamFeedback(conf) is true, so declare the feedback stream on every
// component unconditionally. The schema must match the tuple emitted by
// Executor.buildUpstreamFeedbackTuple: [TaskInfo, EwmaFeedbackRecord].
for (Object component : allComponents(topology).values()) {
ComponentCommon common = getComponentCommon(component);
common.put_to_streams(Constants.FEEDBACK_STREAM_ID, Thrift.outputFields(upstreamFeedbackFields()));
}
}

public static List<String> upstreamFeedbackFields() {
return Arrays.asList("task-info", "feedback");
}

@SuppressWarnings("unchecked")
public static Map<String, Bolt> metricsConsumerBoltSpecs(Map<String, Object> conf, StormTopology topology) {
Map<String, Bolt> metricsConsumerBolts = new HashMap<>();
Expand Down Expand Up @@ -429,6 +443,9 @@ public static void addSystemComponents(Map<String, Object> conf, StormTopology t
outputStreams.put(Constants.SYSTEM_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("rate_secs")));
outputStreams.put(Constants.SYSTEM_FLUSH_STREAM_ID, Thrift.outputFields(Arrays.asList()));
outputStreams.put(Constants.METRICS_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
if (ConfigUtils.upstreamFeedbackEnable(conf)) {
outputStreams.put(Constants.FEEDBACK_TICK_STREAM_ID, Thrift.outputFields(Arrays.asList("interval")));
}

Map<String, Object> boltConf = new HashMap<>();
boltConf.put(Config.TOPOLOGY_TASKS, 0);
Expand Down Expand Up @@ -464,6 +481,10 @@ public static boolean hasEventLoggers(Map<String, Object> topoConf) {
return eventLoggerNum == null || ObjectReader.getInt(eventLoggerNum) > 0;
}

public static boolean hasUpstreamFeedback(Map<String, Object> topoConf) {
return ConfigUtils.upstreamFeedbackEnable(topoConf);
}

public static int numStartExecutors(Object component) throws InvalidTopologyException {
ComponentCommon common = getComponentCommon(component);
return Thrift.getParallelismHint(common);
Expand Down Expand Up @@ -538,6 +559,9 @@ protected StormTopology systemTopologyImpl(Map<String, Object> topoConf, StormTo
if (hasEventLoggers(topoConf)) {
addEventLogger(topoConf, ret);
}
if (hasUpstreamFeedback(topoConf)) {
addUpstreamFeedback(topoConf, ret);
}
addMetricComponents(topoConf, ret);
addSystemComponents(topoConf, ret);
addMetricStreams(ret);
Expand Down
21 changes: 21 additions & 0 deletions storm-client/src/jvm/org/apache/storm/daemon/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,27 @@ public void sendUnanchored(String stream, List<Object> values, ExecutorTransfer
}
}

/**
* Sends an unanchored feedback tuple directly to a specific task ID (typically upstream).
* <p>
* This method bypasses standard stream grouping logic and routes the tuple
* exclusively to the provided {@code targetTaskId}. It is a <b>non-blocking</b> call:
* if the destination buffer is full, the tuple is added to the {@code pendingEmits}
* queue for later retry, preventing executor stalls.
* </p>
*
* @param stream The ID of the stream to emit on (must be declared in the topology).
* @param values The data payload to be sent.
* @param targetTaskId The unique ID of the destination task (e.g., the sourceTaskId of an incoming tuple).
* @param transfer The {@link ExecutorTransfer} instance handling the physical data transfer.
* @param pendingEmits A queue used to store tuples that cannot be transferred immediately due to backpressure.
*/
public void sendUnanchoredFeedback(String stream, List<Object> values, int targetTaskId, ExecutorTransfer transfer, Queue<AddressedTuple> pendingEmits) {
Tuple tuple = getTuple(stream, values);
AddressedTuple addressedTuple = new AddressedTuple(targetTaskId, tuple);
transfer.tryTransfer(addressedTuple, pendingEmits);
}

/**
* Send sampled data to the eventlogger if the global or component level debug flag is set (via nimbus api).
*/
Expand Down
84 changes: 84 additions & 0 deletions storm-client/src/jvm/org/apache/storm/executor/ChildEwmaStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.executor;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.metrics2.TaskMetrics;

/**
* Thread-safe store of EWMA jitter statistics reported by downstream (child) tasks back to a parent task.
* The data is indexed by parent {@code taskId} so a lookup touches only that task's children:
* {@link #getStats} is an O(1) map lookup and {@link #update} is O(metrics), neither scanning the whole
* store. This keeps cost bound to a single task's child fan-out, independent of how many tasks the
* executor hosts.
*/
public class ChildEwmaStats {

private final boolean enabled;
private final Map<Integer, ConcurrentHashMap<Integer, Map<String, Double>>> byTask;

private static final String[] JITTER_PRIORITY = {
TaskMetrics.METRIC_NAME_EXECUTE_JITTER,
TaskMetrics.METRIC_NAME_PROCESS_JITTER,
TaskMetrics.METRIC_NAME_COMPLETE_JITTER,
};

public ChildEwmaStats(boolean enabled) {
this.enabled = enabled;
this.byTask = enabled ? new ConcurrentHashMap<>() : Collections.emptyMap();
}

/**
* Records the jitter metrics reported by {@code childTaskId} for the given parent {@code taskId}.
* Runs in O(metrics) by writing straight into the task's bucket; no rescanning of existing data.
*/
public void update(int taskId, int childTaskId, EwmaFeedbackRecord feedback) {
if (!enabled) {
return;
}
ConcurrentHashMap<Integer, Map<String, Double>> children =
byTask.computeIfAbsent(taskId, k -> new ConcurrentHashMap<>());
Map<String, Double> metrics = children.computeIfAbsent(childTaskId, k -> new ConcurrentHashMap<>());
feedback.forEachMetric(metrics::put);
}

/**
* Returns the latest reported value of each metric, per child task, for the given source
* {@code taskId} as {@code childTaskId -> (metricName -> value)}.
*/
public Map<Integer, Map<String, Double>> getStats(int taskId) {
if (!enabled) {
return Collections.emptyMap();
}
Map<Integer, Map<String, Double>> children = byTask.get(taskId);
return children == null ? Collections.emptyMap() : children;
}

/**
* Compares two stats maps following {@link #JITTER_PRIORITY}, ascending. A missing metric
* is treated as {@link Double#MAX_VALUE} so it loses to any measured value.
*/
public static int compareByJitter(Map<String, Double> a, Map<String, Double> b) {
for (String metric : JITTER_PRIORITY) {
int cmp = Double.compare(
a.getOrDefault(metric, Double.MAX_VALUE),
b.getOrDefault(metric, Double.MAX_VALUE));
if (cmp != 0) {
return cmp;
}
}
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
* 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
*/

package org.apache.storm.executor;

import com.codahale.metrics.Gauge;
import java.util.Map;
import java.util.function.ObjDoubleConsumer;
import org.apache.storm.daemon.worker.WorkerState;
import org.apache.storm.metrics2.PerReporterGauge;
import org.apache.storm.metrics2.TaskMetrics;

/**
* Immutable snapshot of a task's jitter metrics, used as the payload of an upstream feedback tuple.
*
* @param processJitter The {@code __process-jitter} gauge value, or {@link #VOID} if absent.
* @param completeJitter The {@code __complete-jitter} gauge value, or {@link #VOID} if absent.
* @param executeJitter The {@code __execute-jitter} gauge value, or {@link #VOID} if absent.
*/
public record EwmaFeedbackRecord(double processJitter, double completeJitter, double executeJitter) {

// Sentinel for an absent metric. Jitter values are always >= 0, so a negative value can never
// collide with a real measurement and unambiguously marks "gauge missing / not a Number".
private static final double VOID = -1;

private static double fromGauge(Gauge<?> gauge) {
if (gauge != null && !(gauge instanceof PerReporterGauge)) {
Object v = gauge.getValue();
if (v instanceof Number) {
return ((Number) v).doubleValue();
}
}
return VOID;
}

private static double aggregate(Map<String, Gauge> gauges, String metricName) {
String suffixedPrefix = metricName + "-";
double agg = VOID;
for (Map.Entry<String, Gauge> entry : gauges.entrySet()) {
String name = entry.getKey();
if (!name.equals(metricName) && !name.startsWith(suffixedPrefix)) {
continue;
}
double value = fromGauge(entry.getValue());
if (value != VOID && (agg == VOID || value > agg)) {
agg = value;
}
}
return agg;
}

public static EwmaFeedbackRecord fromWorkerState(WorkerState workerData, int taskId) {
Map<String, Gauge> allGauges = workerData.getMetricRegistry().getTaskGauges(taskId);
return new EwmaFeedbackRecord(aggregate(allGauges, TaskMetrics.METRIC_NAME_PROCESS_JITTER),
aggregate(allGauges, TaskMetrics.METRIC_NAME_COMPLETE_JITTER),
aggregate(allGauges, TaskMetrics.METRIC_NAME_EXECUTE_JITTER));
}

/**
* Invokes {@code consumer} once for each present jitter metric.
*/
public void forEachMetric(ObjDoubleConsumer<String> consumer) {
if (processJitter != VOID) {
consumer.accept(TaskMetrics.METRIC_NAME_PROCESS_JITTER, processJitter);
}
if (completeJitter != VOID) {
consumer.accept(TaskMetrics.METRIC_NAME_COMPLETE_JITTER, completeJitter);
}
if (executeJitter != VOID) {
consumer.accept(TaskMetrics.METRIC_NAME_EXECUTE_JITTER, executeJitter);
}
}
}
Loading
Loading