From a7295cc21f2502759bdfb3fcee30d7300d8e83da Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 11 Jun 2026 09:46:01 +0200 Subject: [PATCH 1/6] test: add Y2038 heartbeat overflow regression tests Five regression tests for STORM #7897: - CWH time_secs i64 round-trip survives post-2038 epochs - HeartbeatCache accepts Long TIME_SECS beats without false timeout - documents int currentTimeSecs() overflow (negative guard) - legacy i32 LSWorkerHeartbeat blob fails required-field validation under the i64 schema (wire-compat semantics) - post-2038 executor launch window not misclassified as dead --- .../daemon/nimbus/Y2038HeartbeatTest.java | 211 ++++++++++++++++++ 1 file changed, 211 insertions(+) create mode 100644 storm-server/src/test/java/org/apache/storm/daemon/nimbus/Y2038HeartbeatTest.java diff --git a/storm-server/src/test/java/org/apache/storm/daemon/nimbus/Y2038HeartbeatTest.java b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/Y2038HeartbeatTest.java new file mode 100644 index 0000000000..e4cc6d573d --- /dev/null +++ b/storm-server/src/test/java/org/apache/storm/daemon/nimbus/Y2038HeartbeatTest.java @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.storm.daemon.nimbus; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.storm.generated.Assignment; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.LSWorkerHeartbeat; +import org.apache.storm.generated.NodeInfo; +import org.apache.storm.stats.ClientStatsUtil; +import org.apache.storm.thrift.TDeserializer; +import org.apache.storm.thrift.TException; +import org.apache.storm.thrift.TSerializer; +import org.apache.storm.thrift.protocol.TBinaryProtocol; +import org.apache.storm.thrift.protocol.TField; +import org.apache.storm.thrift.protocol.TList; +import org.apache.storm.thrift.protocol.TStruct; +import org.apache.storm.thrift.protocol.TType; +import org.apache.storm.thrift.transport.TMemoryBuffer; +import org.apache.storm.thrift.transport.TMemoryInputTransport; +import org.apache.storm.utils.Time; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Regression tests for the Y2038 heartbeat overflow (STORM issue #7897). + * + *

Worker heartbeat timestamps were carried as i32 seconds, which overflows on + * 2038-01-19T03:14:07Z. These tests pin the post-2038 behavior: heartbeat + * {@code time_secs} must survive serialization as a 64-bit value, and the Nimbus + * {@link HeartbeatCache} must not flag fresh post-2038 heartbeats as timed out. + */ +class Y2038HeartbeatTest { + private static final String TOPO_ID = "y2038-topology-1"; + private static final int TIMEOUT_SECS = 30; + /** 2040-01-01T00:00:00Z — comfortably past the 2038 i32 rollover. */ + private static final long POST_2038_EPOCH_SECS = 2_208_988_800L; + /** 2023-11-14T22:13:20Z — a plausible pre-2038 heartbeat timestamp. */ + private static final int LEGACY_EPOCH_SECS = 1_700_000_000; + + /** + * A post-2038 epoch must round-trip through ClusterWorkerHeartbeat serialization + * without narrowing. Uses the dynamic field API so this test compiles against both + * the i32 and i64 schema; with the i32 schema the Long value is rejected. + */ + @Test + void testCwhTimeSecsSurvivesPost2038RoundTrip() throws Exception { + ClusterWorkerHeartbeat hb = new ClusterWorkerHeartbeat(); + hb.set_storm_id(TOPO_ID); + hb.set_executor_stats(new HashMap<>()); + hb.set_uptime_secs(120); + hb.setFieldValue(ClusterWorkerHeartbeat._Fields.TIME_SECS, POST_2038_EPOCH_SECS); + + TSerializer ser = new TSerializer(new TBinaryProtocol.Factory()); + byte[] bytes = ser.serialize(hb); + ClusterWorkerHeartbeat read = new ClusterWorkerHeartbeat(); + TDeserializer des = new TDeserializer(new TBinaryProtocol.Factory()); + des.deserialize(read, bytes); + + long timeSecs = ((Number) read.getFieldValue(ClusterWorkerHeartbeat._Fields.TIME_SECS)).longValue(); + assertTrue(timeSecs > Integer.MAX_VALUE, "time_secs must not be narrowed to i32"); + assertEquals(POST_2038_EPOCH_SECS, timeSecs, "time_secs must round-trip unchanged"); + } + + /** + * A fresh heartbeat received after 2038 must not be flagged as timed out. 
+ * The beat map carries TIME_SECS as a Long, matching what the post-fix + * heartbeat writer produces. + */ + @Test + void testHeartbeatCacheNoFalseTimeoutPost2038() { + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTime(POST_2038_EPOCH_SECS * 1000L); + + HeartbeatCache cache = new HeartbeatCache(); + Set> allExecutors = Collections.singleton(Arrays.asList(1, 1)); + Assignment assignment = mkAssignment(POST_2038_EPOCH_SECS, 1, 1); + + cache.updateFromZkHeartbeat(TOPO_ID, mkZkExecutorBeats(1, 1, POST_2038_EPOCH_SECS), allExecutors, + TIMEOUT_SECS); + Time.advanceTimeSecs(1); + cache.timeoutOldHeartbeats(TOPO_ID, TIMEOUT_SECS); + + Set> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS); + assertFalse(alive.isEmpty(), "A fresh post-2038 heartbeat must not be flagged as timed out"); + } + } + + /** + * Documents why the long-based clock path is required: the int-based + * {@code Time.currentTimeSecs()} overflows past 2038. + */ + @Test + void testCurrentTimeSecsIntOverflowsPost2038() { + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTime(POST_2038_EPOCH_SECS * 1000L); + assertTrue(Time.currentTimeSecs() < 0, + "currentTimeSecs() (int) is expected to overflow past 2038 — long path required"); + } + } + + /** + * Wire compatibility: an LSWorkerHeartbeat blob written by the legacy i32 schema + * must fail loudly (required-field validation) under the i64 schema, not be + * silently misread. Supervisors treat the entry as stale and rebuild local state. 
+ */ + @Test + void testLegacyI32LswhBlobFailsValidationUnderI64Schema() throws Exception { + byte[] legacyBlob = writeLegacyI32Lswh(LEGACY_EPOCH_SECS, TOPO_ID, 6700); + + LSWorkerHeartbeat read = new LSWorkerHeartbeat(); + assertThrows(TException.class, + () -> read.read(new TBinaryProtocol(new TMemoryInputTransport(legacyBlob))), + "A legacy i32 time_secs blob must fail required-field validation under the i64 schema"); + } + + /** + * Startup guard: an executor whose assignment start time is past 2038 must not be + * misclassified as dead by getAliveExecutors while inside the launch window. + */ + @Test + void testExecutorStartedPost2038NotMisclassifiedDead() { + try (Time.SimulatedTime ignored = new Time.SimulatedTime()) { + Time.advanceTime(POST_2038_EPOCH_SECS * 1000L); + + HeartbeatCache cache = new HeartbeatCache(); + Set> allExecutors = Collections.singleton(Arrays.asList(1, 1)); + Assignment assignment = mkAssignment(POST_2038_EPOCH_SECS, 1, 1); + + // No heartbeat reported yet; the executor is within the task launch window. + Time.advanceTimeSecs(1); + Set> alive = cache.getAliveExecutors(TOPO_ID, allExecutors, assignment, TIMEOUT_SECS); + assertFalse(alive.isEmpty(), + "An executor launched post-2038 must stay alive during its launch window"); + } + } + + /** + * Emulates the wire bytes an old (pre-fix) writer produced for LSWorkerHeartbeat: + * field 1 time_secs as i32, field 2 topology_id, field 3 empty executors list, + * field 4 port. Field ids and types match the legacy schema. 
+ */ + private byte[] writeLegacyI32Lswh(int timeSecs, String topologyId, int port) throws Exception { + TMemoryBuffer buffer = new TMemoryBuffer(128); + TBinaryProtocol prot = new TBinaryProtocol(buffer); + prot.writeStructBegin(new TStruct("LSWorkerHeartbeat")); + prot.writeFieldBegin(new TField("time_secs", TType.I32, (short) 1)); + prot.writeI32(timeSecs); + prot.writeFieldEnd(); + prot.writeFieldBegin(new TField("topology_id", TType.STRING, (short) 2)); + prot.writeString(topologyId); + prot.writeFieldEnd(); + prot.writeFieldBegin(new TField("executors", TType.LIST, (short) 3)); + prot.writeListBegin(new TList(TType.STRUCT, 0)); + prot.writeListEnd(); + prot.writeFieldEnd(); + prot.writeFieldBegin(new TField("port", TType.I32, (short) 4)); + prot.writeI32(port); + prot.writeFieldEnd(); + prot.writeFieldStop(); + prot.writeStructEnd(); + return Arrays.copyOf(buffer.getArray(), buffer.length()); + } + + private Map, Map> mkZkExecutorBeats(int taskStart, int taskEnd, long timeSecs) { + Map beat = new HashMap<>(); + beat.put(ClientStatsUtil.TIME_SECS, timeSecs); + return Collections.singletonMap(Arrays.asList(taskStart, taskEnd), beat); + } + + private Assignment mkAssignment(long startTimeSecs, int... 
executors) { + Assignment assignment = new Assignment(); + Map, Long> execToStartTime = new HashMap<>(); + Map, NodeInfo> execToNodePort = new HashMap<>(); + NodeInfo nodeInfo = new NodeInfo("node1", Collections.singleton(6700L)); + for (int i = 0; i < executors.length - 1; i += 2) { + List exec = Arrays.asList((long) executors[i], (long) executors[i + 1]); + execToStartTime.put(exec, startTimeSecs); + execToNodePort.put(exec, nodeInfo); + } + assignment.set_executor_start_time_secs(execToStartTime); + assignment.set_executor_node_port(execToNodePort); + return assignment; + } +} From 9b979c588d9fa1d71ec01cb5deb870434eaa0dee Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 11 Jun 2026 09:49:45 +0200 Subject: [PATCH 2/6] fix: add long-based clock methods to Time, deprecate int epoch-seconds Time.currentTimeSecs() narrows epoch seconds to int, which overflows on 2038-01-19T03:14:07Z and is the root cause of STORM #7897 (workers falsely timed out post-2038). Add currentTimeSecsLong() and deltaSecsLong(long) for absolute timestamps. The int variants stay, deprecated, for short relative durations (uptime/UI). --- .../src/jvm/org/apache/storm/utils/Time.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/storm-client/src/jvm/org/apache/storm/utils/Time.java b/storm-client/src/jvm/org/apache/storm/utils/Time.java index 0c7bfab80a..9946b46e8c 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Time.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Time.java @@ -150,14 +150,45 @@ public static long secsToMillisLong(double secs) { return (long) (1000 * secs); } + /** + * Current time in seconds since the epoch, as an int. + * + * @deprecated the int representation overflows on 2038-01-19T03:14:07Z. Use + * {@link #currentTimeSecsLong()} for absolute timestamps; this method remains + * valid only for short relative durations such as uptime. 
+ */ + @Deprecated public static int currentTimeSecs() { return (int) (currentTimeMillis() / 1000); } + /** + * Current time in seconds since the epoch, as a long. Safe past the 2038 + * int-seconds overflow; use this for all absolute timestamps. + */ + public static long currentTimeSecsLong() { + return currentTimeMillis() / 1000; + } + + /** + * Seconds elapsed since {@code timeInSeconds}, as an int. + * + * @deprecated int epoch-seconds overflow on 2038-01-19T03:14:07Z. Use + * {@link #deltaSecsLong(long)} when the reference point is an absolute timestamp. + */ + @Deprecated public static int deltaSecs(int timeInSeconds) { return Time.currentTimeSecs() - timeInSeconds; } + /** + * Seconds elapsed since {@code timeInSeconds} (an absolute epoch timestamp), as a long. + * Safe past the 2038 int-seconds overflow. + */ + public static long deltaSecsLong(long timeInSeconds) { + return currentTimeSecsLong() - timeInSeconds; + } + public static long deltaMs(long timeInMilliseconds) { return Time.currentTimeMillis() - timeInMilliseconds; } From 7a98b446126819108db34fdfe21555943b004ee6 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 11 Jun 2026 14:52:50 +0200 Subject: [PATCH 3/6] fix: promote heartbeat time_secs to i64 in thrift schema Promote time_secs from i32 to i64 in ClusterWorkerHeartbeat, SupervisorWorkerHeartbeat and LSWorkerHeartbeat; uptime_secs stays i32 (relative duration). Regenerate the Java and Python thrift bindings for the three structs. Widen the callers that the type change forces at compile time: ExecutorBeat.timeSecs, ClusterUtils.convertExecutorBeats and the latest-heartbeat comparison in PaceMakerStateStorage.get_worker_hb (an absolute comparison that broke across the 2038 rollover). Wire compat: thrift tags i32/i64 differently, so blobs written by the old schema fail required-field validation under the new one. Heartbeats self-heal on re-report; a full-cluster bounce upgrade is required. 
--- .../apache/storm/cluster/ClusterUtils.java | 2 +- .../apache/storm/cluster/ExecutorBeat.java | 6 ++-- .../storm/cluster/PaceMakerStateStorage.java | 2 +- .../generated/ClusterWorkerHeartbeat.java | 26 ++++++++-------- .../storm/generated/LSWorkerHeartbeat.java | 26 ++++++++-------- .../generated/SupervisorWorkerHeartbeat.java | 26 ++++++++-------- storm-client/src/py/storm/ttypes.py | 30 +++++++++---------- storm-client/src/storm.thrift | 6 ++-- 8 files changed, 62 insertions(+), 62 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java index c0bacc4618..0c322d9ace 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/ClusterUtils.java @@ -262,7 +262,7 @@ public static Map convertExecutorBeats(List executorStatsMap = workerHeartbeat.get_executor_stats(); for (ExecutorInfo executor : executors) { if (executorStatsMap.containsKey(executor)) { - int time = workerHeartbeat.get_time_secs(); + long time = workerHeartbeat.get_time_secs(); int uptime = workerHeartbeat.get_uptime_secs(); ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor); ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats); diff --git a/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java b/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java index 37ecc32ba8..91e344bf34 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/ExecutorBeat.java @@ -15,17 +15,17 @@ import org.apache.storm.generated.ExecutorStats; public class ExecutorBeat { - private final int timeSecs; + private final long timeSecs; private final int uptime; private final ExecutorStats stats; - public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) { + public ExecutorBeat(long timeSecs, int uptime, 
ExecutorStats stats) { this.timeSecs = timeSecs; this.uptime = uptime; this.stats = stats; } - public int getTimeSecs() { + public long getTimeSecs() { return timeSecs; } diff --git a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java index 401771a2e6..e746e5ec64 100644 --- a/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java +++ b/storm-client/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java @@ -149,7 +149,7 @@ public byte[] get_worker_hb(String path, boolean watch) { while (true) { try { byte[] ret = null; - int latestTimeSecs = 0; + long latestTimeSecs = 0; boolean gotResponse = false; HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); diff --git a/storm-client/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java b/storm-client/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java index ac81e4fc0b..90c73722f0 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java +++ b/storm-client/src/jvm/org/apache/storm/generated/ClusterWorkerHeartbeat.java @@ -30,7 +30,7 @@ public class ClusterWorkerHeartbeat implements org.apache.storm.thrift.TBase executor_stats; // required - private int time_secs; // required + private long time_secs; // required private int uptime_secs; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ @@ -126,7 +126,7 @@ public java.lang.String getFieldName() { new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorInfo.class), new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorStats.class)))); tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, - new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32))); + new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64))); tmpMap.put(_Fields.UPTIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("uptime_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); @@ -139,7 +139,7 @@ public ClusterWorkerHeartbeat() { public ClusterWorkerHeartbeat( java.lang.String storm_id, java.util.Map executor_stats, - int time_secs, + long time_secs, int uptime_secs) { this(); @@ -252,11 +252,11 @@ public void set_executor_stats_isSet(boolean value) { } } - public int get_time_secs() { + public long get_time_secs() { return this.time_secs; } - public void set_time_secs(int time_secs) { + public void set_time_secs(long time_secs) { this.time_secs = time_secs; set_time_secs_isSet(true); } @@ -319,7 +319,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul if (value == null) { unset_time_secs(); } else { - set_time_secs((java.lang.Integer)value); + set_time_secs((java.lang.Long)value); } break; @@ -438,7 +438,7 @@ public int hashCode() { if (is_set_executor_stats()) hashCode = hashCode * 8191 + executor_stats.hashCode(); - hashCode = hashCode * 8191 + time_secs; + hashCode = hashCode * 8191 + 
org.apache.storm.thrift.TBaseHelper.hashCode(time_secs); hashCode = hashCode * 8191 + uptime_secs; @@ -634,8 +634,8 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, ClusterWorker } break; case 3: // TIME_SECS - if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) { - struct.time_secs = iprot.readI32(); + if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) { + struct.time_secs = iprot.readI64(); struct.set_time_secs_isSet(true); } else { org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); @@ -682,7 +682,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, ClusterWorke oprot.writeFieldEnd(); } oprot.writeFieldBegin(TIME_SECS_FIELD_DESC); - oprot.writeI32(struct.time_secs); + oprot.writeI64(struct.time_secs); oprot.writeFieldEnd(); oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC); oprot.writeI32(struct.uptime_secs); @@ -714,7 +714,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, ClusterWorker _iter827.getValue().write(oprot); } } - oprot.writeI32(struct.time_secs); + oprot.writeI64(struct.time_secs); oprot.writeI32(struct.uptime_secs); } @@ -738,7 +738,7 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, ClusterWorkerH } } struct.set_executor_stats_isSet(true); - struct.time_secs = iprot.readI32(); + struct.time_secs = iprot.readI64(); struct.set_time_secs_isSet(true); struct.uptime_secs = iprot.readI32(); struct.set_uptime_secs_isSet(true); diff --git a/storm-client/src/jvm/org/apache/storm/generated/LSWorkerHeartbeat.java b/storm-client/src/jvm/org/apache/storm/generated/LSWorkerHeartbeat.java index d5e6e903ed..be10e28bd1 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/LSWorkerHeartbeat.java +++ b/storm-client/src/jvm/org/apache/storm/generated/LSWorkerHeartbeat.java @@ -28,7 +28,7 @@ public class LSWorkerHeartbeat implements org.apache.storm.thrift.TBase, java.io.Serializable, Cloneable, Comparable { private static 
final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("LSWorkerHeartbeat"); - private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I32, (short)1); + private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I64, (short)1); private static final org.apache.storm.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("topology_id", org.apache.storm.thrift.protocol.TType.STRING, (short)2); private static final org.apache.storm.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("executors", org.apache.storm.thrift.protocol.TType.LIST, (short)3); private static final org.apache.storm.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("port", org.apache.storm.thrift.protocol.TType.I32, (short)4); @@ -36,7 +36,7 @@ public class LSWorkerHeartbeat implements org.apache.storm.thrift.TBase executors; // required private int port; // required @@ -120,7 +120,7 @@ public java.lang.String getFieldName() { static { java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class); tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, - new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32))); + new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64))); tmpMap.put(_Fields.TOPOLOGY_ID, new org.apache.storm.thrift.meta_data.FieldMetaData("topology_id", 
org.apache.storm.thrift.TFieldRequirementType.REQUIRED, new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.EXECUTORS, new org.apache.storm.thrift.meta_data.FieldMetaData("executors", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, @@ -136,7 +136,7 @@ public LSWorkerHeartbeat() { } public LSWorkerHeartbeat( - int time_secs, + long time_secs, java.lang.String topology_id, java.util.List executors, int port) @@ -184,11 +184,11 @@ public void clear() { this.port = 0; } - public int get_time_secs() { + public long get_time_secs() { return this.time_secs; } - public void set_time_secs(int time_secs) { + public void set_time_secs(long time_secs) { this.time_secs = time_secs; set_time_secs_isSet(true); } @@ -299,7 +299,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul if (value == null) { unset_time_secs(); } else { - set_time_secs((java.lang.Integer)value); + set_time_secs((java.lang.Long)value); } break; @@ -426,7 +426,7 @@ public boolean equals(LSWorkerHeartbeat that) { public int hashCode() { int hashCode = 1; - hashCode = hashCode * 8191 + time_secs; + hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(time_secs); hashCode = hashCode * 8191 + ((is_set_topology_id()) ? 
131071 : 524287); if (is_set_topology_id()) @@ -600,8 +600,8 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, LSWorkerHeart } switch (schemeField.id) { case 1: // TIME_SECS - if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) { - struct.time_secs = iprot.readI32(); + if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) { + struct.time_secs = iprot.readI64(); struct.set_time_secs_isSet(true); } else { org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); @@ -657,7 +657,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, LSWorkerHear oprot.writeStructBegin(STRUCT_DESC); oprot.writeFieldBegin(TIME_SECS_FIELD_DESC); - oprot.writeI32(struct.time_secs); + oprot.writeI64(struct.time_secs); oprot.writeFieldEnd(); if (struct.topology_id != null) { oprot.writeFieldBegin(TOPOLOGY_ID_FIELD_DESC); @@ -697,7 +697,7 @@ private static class LSWorkerHeartbeatTupleScheme extends org.apache.storm.thrif @Override public void write(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.storm.thrift.TException { org.apache.storm.thrift.protocol.TTupleProtocol oprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot; - oprot.writeI32(struct.time_secs); + oprot.writeI64(struct.time_secs); oprot.writeString(struct.topology_id); { oprot.writeI32(struct.executors.size()); @@ -712,7 +712,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeart @Override public void read(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.storm.thrift.TException { org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot; - struct.time_secs = iprot.readI32(); + struct.time_secs = iprot.readI64(); struct.set_time_secs_isSet(true); struct.topology_id = iprot.readString(); struct.set_topology_id_isSet(true); diff --git 
a/storm-client/src/jvm/org/apache/storm/generated/SupervisorWorkerHeartbeat.java b/storm-client/src/jvm/org/apache/storm/generated/SupervisorWorkerHeartbeat.java index 215c78636c..ca31a8f286 100644 --- a/storm-client/src/jvm/org/apache/storm/generated/SupervisorWorkerHeartbeat.java +++ b/storm-client/src/jvm/org/apache/storm/generated/SupervisorWorkerHeartbeat.java @@ -30,14 +30,14 @@ public class SupervisorWorkerHeartbeat implements org.apache.storm.thrift.TBase< private static final org.apache.storm.thrift.protocol.TField STORM_ID_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("storm_id", org.apache.storm.thrift.protocol.TType.STRING, (short)1); private static final org.apache.storm.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("executors", org.apache.storm.thrift.protocol.TType.LIST, (short)2); - private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I32, (short)3); + private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I64, (short)3); private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new SupervisorWorkerHeartbeatStandardSchemeFactory(); private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new SupervisorWorkerHeartbeatTupleSchemeFactory(); private @org.apache.storm.thrift.annotation.Nullable java.lang.String storm_id; // required private @org.apache.storm.thrift.annotation.Nullable java.util.List executors; // required - private int time_secs; // required + private long time_secs; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. 
*/ public enum _Fields implements org.apache.storm.thrift.TFieldIdEnum { @@ -119,7 +119,7 @@ public java.lang.String getFieldName() { new org.apache.storm.thrift.meta_data.ListMetaData(org.apache.storm.thrift.protocol.TType.LIST, new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorInfo.class)))); tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED, - new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32))); + new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.storm.thrift.meta_data.FieldMetaData.addStructMetaDataMap(SupervisorWorkerHeartbeat.class, metaDataMap); } @@ -130,7 +130,7 @@ public SupervisorWorkerHeartbeat() { public SupervisorWorkerHeartbeat( java.lang.String storm_id, java.util.List executors, - int time_secs) + long time_secs) { this(); this.storm_id = storm_id; @@ -234,11 +234,11 @@ public void set_executors_isSet(boolean value) { } } - public int get_time_secs() { + public long get_time_secs() { return this.time_secs; } - public void set_time_secs(int time_secs) { + public void set_time_secs(long time_secs) { this.time_secs = time_secs; set_time_secs_isSet(true); } @@ -279,7 +279,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul if (value == null) { unset_time_secs(); } else { - set_time_secs((java.lang.Integer)value); + set_time_secs((java.lang.Long)value); } break; @@ -376,7 +376,7 @@ public int hashCode() { if (is_set_executors()) hashCode = hashCode * 8191 + executors.hashCode(); - hashCode = hashCode * 8191 + time_secs; + hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(time_secs); return hashCode; } @@ -549,8 +549,8 @@ public void 
read(org.apache.storm.thrift.protocol.TProtocol iprot, SupervisorWor } break; case 3: // TIME_SECS - if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) { - struct.time_secs = iprot.readI32(); + if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) { + struct.time_secs = iprot.readI64(); struct.set_time_secs_isSet(true); } else { org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); @@ -588,7 +588,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, SupervisorWo oprot.writeFieldEnd(); } oprot.writeFieldBegin(TIME_SECS_FIELD_DESC); - oprot.writeI32(struct.time_secs); + oprot.writeI64(struct.time_secs); oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); @@ -616,7 +616,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, SupervisorWor _iter924.write(oprot); } } - oprot.writeI32(struct.time_secs); + oprot.writeI64(struct.time_secs); } @Override @@ -636,7 +636,7 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, SupervisorWork } } struct.set_executors_isSet(true); - struct.time_secs = iprot.readI32(); + struct.time_secs = iprot.readI64(); struct.set_time_secs_isSet(true); } } diff --git a/storm-client/src/py/storm/ttypes.py b/storm-client/src/py/storm/ttypes.py index 03e892620f..4c47b00e42 100644 --- a/storm-client/src/py/storm/ttypes.py +++ b/storm-client/src/py/storm/ttypes.py @@ -7678,8 +7678,8 @@ def read(self, iprot): else: iprot.skip(ftype) elif fid == 3: - if ftype == TType.I32: - self.time_secs = iprot.readI32() + if ftype == TType.I64: + self.time_secs = iprot.readI64() else: iprot.skip(ftype) elif fid == 4: @@ -7711,8 +7711,8 @@ def write(self, oprot): oprot.writeMapEnd() oprot.writeFieldEnd() if self.time_secs is not None: - oprot.writeFieldBegin('time_secs', TType.I32, 3) - oprot.writeI32(self.time_secs) + oprot.writeFieldBegin('time_secs', TType.I64, 3) + oprot.writeI64(self.time_secs) oprot.writeFieldEnd() if self.uptime_secs 
is not None: oprot.writeFieldBegin('uptime_secs', TType.I32, 4) @@ -8239,8 +8239,8 @@ def read(self, iprot): if ftype == TType.STOP: break if fid == 1: - if ftype == TType.I32: - self.time_secs = iprot.readI32() + if ftype == TType.I64: + self.time_secs = iprot.readI64() else: iprot.skip(ftype) elif fid == 2: @@ -8276,8 +8276,8 @@ def write(self, oprot): return oprot.writeStructBegin('LSWorkerHeartbeat') if self.time_secs is not None: - oprot.writeFieldBegin('time_secs', TType.I32, 1) - oprot.writeI32(self.time_secs) + oprot.writeFieldBegin('time_secs', TType.I64, 1) + oprot.writeI64(self.time_secs) oprot.writeFieldEnd() if self.topology_id is not None: oprot.writeFieldBegin('topology_id', TType.STRING, 2) @@ -9183,8 +9183,8 @@ def read(self, iprot): else: iprot.skip(ftype) elif fid == 3: - if ftype == TType.I32: - self.time_secs = iprot.readI32() + if ftype == TType.I64: + self.time_secs = iprot.readI64() else: iprot.skip(ftype) else: @@ -9210,8 +9210,8 @@ def write(self, oprot): oprot.writeListEnd() oprot.writeFieldEnd() if self.time_secs is not None: - oprot.writeFieldBegin('time_secs', TType.I32, 3) - oprot.writeI32(self.time_secs) + oprot.writeFieldBegin('time_secs', TType.I64, 3) + oprot.writeI64(self.time_secs) oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -13264,7 +13264,7 @@ def __ne__(self, other): None, # 0 (1, TType.STRING, 'storm_id', 'UTF8', None, ), # 1 (2, TType.MAP, 'executor_stats', (TType.STRUCT, [ExecutorInfo, None], TType.STRUCT, [ExecutorStats, None], False), None, ), # 2 - (3, TType.I32, 'time_secs', None, None, ), # 3 + (3, TType.I64, 'time_secs', None, None, ), # 3 (4, TType.I32, 'uptime_secs', None, None, ), # 4 ) all_structs.append(ThriftSerializedObject) @@ -13305,7 +13305,7 @@ def __ne__(self, other): all_structs.append(LSWorkerHeartbeat) LSWorkerHeartbeat.thrift_spec = ( None, # 0 - (1, TType.I32, 'time_secs', None, None, ), # 1 + (1, TType.I64, 'time_secs', None, None, ), # 1 (2, TType.STRING, 'topology_id', 
'UTF8', None, ), # 2 (3, TType.LIST, 'executors', (TType.STRUCT, [ExecutorInfo, None], False), None, ), # 3 (4, TType.I32, 'port', None, None, ), # 4 @@ -13382,7 +13382,7 @@ def __ne__(self, other): None, # 0 (1, TType.STRING, 'storm_id', 'UTF8', None, ), # 1 (2, TType.LIST, 'executors', (TType.STRUCT, [ExecutorInfo, None], False), None, ), # 2 - (3, TType.I32, 'time_secs', None, None, ), # 3 + (3, TType.I64, 'time_secs', None, None, ), # 3 ) all_structs.append(SupervisorWorkerHeartbeats) SupervisorWorkerHeartbeats.thrift_spec = ( diff --git a/storm-client/src/storm.thrift b/storm-client/src/storm.thrift index 274c435177..ae39af1744 100644 --- a/storm-client/src/storm.thrift +++ b/storm-client/src/storm.thrift @@ -551,7 +551,7 @@ struct StormBase { struct ClusterWorkerHeartbeat { 1: required string storm_id; 2: required map executor_stats; - 3: required i32 time_secs; + 3: required i64 time_secs; 4: required i32 uptime_secs; } @@ -586,7 +586,7 @@ struct LSSupervisorAssignments { } struct LSWorkerHeartbeat { - 1: required i32 time_secs; + 1: required i64 time_secs; 2: required string topology_id; 3: required list executors 4: required i32 port; @@ -688,7 +688,7 @@ struct OwnerResourceSummary { struct SupervisorWorkerHeartbeat { 1: required string storm_id; 2: required list executors - 3: required i32 time_secs; + 3: required i64 time_secs; } struct SupervisorWorkerHeartbeats { From 707e5200a2bcb5e79337fd603ce1f34c7f5d5590 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 11 Jun 2026 14:56:53 +0200 Subject: [PATCH 4/6] fix: write heartbeat timestamps from the long-based clock Switch every heartbeat writer to Time.currentTimeSecsLong(): - Worker.doHeartBeat (LSWorkerHeartbeat, on-disk local state) - ClientStatsUtil.mkZkWorkerHb (ZK beat map, TIME_SECS now a Long) - ClientStatsUtil.thriftifyZkWorkerHb (keep full long, no intValue narrowing) - StatsUtil.thriftifyRpcWorkerHb (SupervisorWorkerHeartbeat) - SupervisorHeartbeat (SupervisorInfo.time_secs, already i64 on 
the wire but previously fed a wrapped int) --- .../src/jvm/org/apache/storm/daemon/worker/Worker.java | 2 +- .../src/jvm/org/apache/storm/stats/ClientStatsUtil.java | 4 ++-- .../storm/daemon/supervisor/timer/SupervisorHeartbeat.java | 4 ++-- .../src/main/java/org/apache/storm/stats/StatsUtil.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java index e78b01804b..aab416491e 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java @@ -387,7 +387,7 @@ public void doRefreshLoad() { public void doHeartBeat() throws IOException { LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId); - LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId, + LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecsLong(), workerState.topologyId, workerState.localExecutors.stream() .map(executor -> new ExecutorInfo( executor.get(0).intValue(), diff --git a/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java index 62269a4089..463b47480f 100644 --- a/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java +++ b/storm-client/src/jvm/org/apache/storm/stats/ClientStatsUtil.java @@ -85,7 +85,7 @@ public static Map mkZkWorkerHb(String topoId, Map, ret.put("storm-id", topoId); ret.put(EXECUTOR_STATS, executorStats); ret.put(UPTIME, uptime); - ret.put(TIME_SECS, Time.currentTimeSecs()); + ret.put(TIME_SECS, Time.currentTimeSecsLong()); return ret; } @@ -119,7 +119,7 @@ public static ClusterWorkerHeartbeat thriftifyZkWorkerHb(Map hea ClusterWorkerHeartbeat ret = new ClusterWorkerHeartbeat(); ret.set_uptime_secs(getByKeyOr0(heartbeat, UPTIME).intValue()); 
ret.set_storm_id((String) heartbeat.get("storm-id")); - ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).intValue()); + ret.set_time_secs(getByKeyOr0(heartbeat, TIME_SECS).longValue()); Map convertedStats = new HashMap<>(); diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java index b4fec0bb63..b613f36ee9 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/timer/SupervisorHeartbeat.java @@ -74,7 +74,7 @@ private Map buildSupervisorInfo(Map conf if (validatedNumaMap != null) { for (Map.Entry numaMapEntry : validatedNumaMap.entrySet()) { SupervisorInfo supervisorInfo = new SupervisorInfo(); - supervisorInfo.set_time_secs(Time.currentTimeSecs()); + supervisorInfo.set_time_secs(Time.currentTimeSecsLong()); supervisorInfo.set_hostname(supervisor.getHostName()); supervisorInfo.set_assignment_id( supervisor.getAssignmentId() + ServerConstants.NUMA_ID_SEPARATOR + numaMapEntry.getKey() @@ -106,7 +106,7 @@ private Map buildSupervisorInfo(Map conf if (totalSupervisorNormalizedResources.getTotalCpu() > 0 && totalSupervisorNormalizedResources.getTotalMemoryMb() > 0 && !allPortList.isEmpty()) { SupervisorInfo supervisorInfo = new SupervisorInfo(); - supervisorInfo.set_time_secs(Time.currentTimeSecs()); + supervisorInfo.set_time_secs(Time.currentTimeSecsLong()); supervisorInfo.set_hostname(supervisor.getHostName()); supervisorInfo.set_assignment_id(supervisor.getAssignmentId()); supervisorInfo.set_server_port(supervisor.getThriftServerPort()); diff --git a/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java b/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java index ea0ec58911..f449e2553e 100644 --- a/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java +++ 
b/storm-server/src/main/java/org/apache/storm/stats/StatsUtil.java @@ -1885,7 +1885,7 @@ public static SupervisorWorkerHeartbeat thriftifyRpcWorkerHb(String stormId, Lis supervisorWorkerHeartbeat.set_storm_id(stormId); supervisorWorkerHeartbeat .set_executors(Collections.singletonList(new ExecutorInfo(executorId.get(0).intValue(), executorId.get(1).intValue()))); - supervisorWorkerHeartbeat.set_time_secs(Time.currentTimeSecs()); + supervisorWorkerHeartbeat.set_time_secs(Time.currentTimeSecsLong()); return supervisorWorkerHeartbeat; } From c94daf01d360fa089eca79b4d4b5dca50e083922 Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 11 Jun 2026 15:02:52 +0200 Subject: [PATCH 5/6] fix: widen heartbeat timeout state and consumers to long HeartbeatCache (the Nimbus worker-liveness site) tracks receipt time and reported time as Long and computes timeouts with deltaSecsLong; beat map values are read through Number so Integer beats from legacy producers still work. The executor launch-window check no longer truncates assignment start times through intValue(). Supervisor and logviewer consumers move off the int clock as well: Slot heartbeat-age checks use deltaSecsLong, and the logviewer alive-worker scan (WorkerLogs/LogCleaner) carries epoch seconds as long end to end. 
--- .../storm/daemon/nimbus/HeartbeatCache.java | 22 +++++++++---------- .../apache/storm/daemon/supervisor/Slot.java | 4 ++-- .../daemon/logviewer/utils/LogCleaner.java | 4 ++-- .../daemon/logviewer/utils/WorkerLogs.java | 6 ++--- .../logviewer/utils/LogCleanerTest.java | 2 +- 5 files changed, 19 insertions(+), 19 deletions(-) diff --git a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java index c69aa2c250..128b06da57 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/nimbus/HeartbeatCache.java @@ -47,17 +47,17 @@ public class HeartbeatCache { private static class ExecutorCache { private Boolean isTimedOut; - private Integer nimbusTimeSecs; - private Integer executorReportedTimeSecs; + private Long nimbusTimeSecs; + private Long executorReportedTimeSecs; ExecutorCache(Map newBeat) { if (newBeat != null) { - executorReportedTimeSecs = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); + executorReportedTimeSecs = ((Number) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0L)).longValue(); } else { - executorReportedTimeSecs = 0; + executorReportedTimeSecs = 0L; } - nimbusTimeSecs = Time.currentTimeSecs(); + nimbusTimeSecs = Time.currentTimeSecsLong(); isTimedOut = false; } @@ -65,18 +65,18 @@ public synchronized Boolean isTimedOut() { return isTimedOut; } - public synchronized Integer getNimbusTimeSecs() { + public synchronized Long getNimbusTimeSecs() { return nimbusTimeSecs; } public synchronized void updateTimeout(Integer timeout) { - isTimedOut = Time.deltaSecs(getNimbusTimeSecs()) >= timeout; + isTimedOut = Time.deltaSecsLong(getNimbusTimeSecs()) >= timeout; } // Used for RPC heartbeats: nimbusTimeSecs is refreshed on every heartbeat so that // idle-but-alive executors (whose stats TIME_SECS may not advance) are not falsely timed out. 
public synchronized void updateFromRpcHb(Integer timeout) { - nimbusTimeSecs = Time.currentTimeSecs(); + nimbusTimeSecs = Time.currentTimeSecsLong(); updateTimeout(timeout); } @@ -84,9 +84,9 @@ public synchronized void updateFromRpcHb(Integer timeout) { // TIME_SECS advances, preserving zombie detection for legacy topologies. public synchronized void updateFromZkHb(Integer timeout, Map newBeat) { if (newBeat != null) { - Integer newReportedTime = (Integer) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0); + Long newReportedTime = ((Number) newBeat.getOrDefault(ClientStatsUtil.TIME_SECS, 0L)).longValue(); if (!newReportedTime.equals(executorReportedTimeSecs)) { - nimbusTimeSecs = Time.currentTimeSecs(); + nimbusTimeSecs = Time.currentTimeSecsLong(); } executorReportedTimeSecs = newReportedTime; } @@ -224,7 +224,7 @@ public Set> getAliveExecutors(String topoId, Set> al ExecutorCache executorCache = topoCache.get(exec); //null isTimedOut means worker never reported any heartbeat boolean isTimedOut = executorCache == null ? true : executorCache.isTimedOut(); - Integer delta = startTime == null ? null : Time.deltaSecs(startTime.intValue()); + Long delta = startTime == null ? 
null : Time.deltaSecsLong(startTime); if (startTime != null && ((delta < taskLaunchSecs) || !isTimedOut)) { ret.add(exec); } else { diff --git a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java index ae0fcbe809..27b8d7cc61 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/supervisor/Slot.java @@ -724,7 +724,7 @@ private static DynamicState handleWaitingForWorkerStart(DynamicState dynamicStat LSWorkerHeartbeat hb = dynamicState.container.readHeartbeat(); if (hb != null) { - long hbAgeMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000; + long hbAgeMs = Time.deltaSecsLong(hb.get_time_secs()) * 1000; long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState); if (hbAgeMs <= hbTimeoutMs) { return dynamicState.withState(MachineState.RUNNING); @@ -820,7 +820,7 @@ private static DynamicState handleRunning(DynamicState dynamicState, StaticState return killContainerFor(KillReason.HB_NULL, dynamicState, staticState); } - long timeDiffMs = (Time.currentTimeSecs() - hb.get_time_secs()) * 1000; + long timeDiffMs = Time.deltaSecsLong(hb.get_time_secs()) * 1000; long hbTimeoutMs = getHbTimeoutMs(staticState, dynamicState); if (timeDiffMs > hbTimeoutMs) { LOG.warn("SLOT {}: HB is too old {} > {} for topology: {}", diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java index 077dff4177..f5740d8241 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/LogCleaner.java @@ -167,7 +167,7 @@ public void run() { Set oldLogDirs = selectDirsForCleanup(nowMills); final long nowSecs = TimeUnit.MILLISECONDS.toSeconds(nowMills); - SortedSet deadWorkerDirs = 
getDeadWorkerDirs((int) nowSecs, oldLogDirs); + SortedSet deadWorkerDirs = getDeadWorkerDirs(nowSecs, oldLogDirs); LOG.debug("log cleanup: now={} old log dirs {} dead worker dirs {}", nowSecs, oldLogDirs.stream().map(p -> p.getFileName().toString()).collect(joining(",")), @@ -247,7 +247,7 @@ void cleanupEmptyTopoDirectory(Path dir) throws IOException { * Return a sorted set of paths that were written by workers that are now dead. */ @VisibleForTesting - SortedSet getDeadWorkerDirs(int nowSecs, Set logDirs) throws Exception { + SortedSet getDeadWorkerDirs(long nowSecs, Set logDirs) throws Exception { if (logDirs.isEmpty()) { return new TreeSet<>(); } else { diff --git a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java index 43319ce613..7a9af36478 100644 --- a/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java +++ b/storm-webapp/src/main/java/org/apache/storm/daemon/logviewer/utils/WorkerLogs.java @@ -155,7 +155,7 @@ public Set getAllWorkerDirs() { * Return a sorted set of paths that were written by workers that are now active. 
*/ public SortedSet getAliveWorkerDirs() throws IOException { - Set aliveIds = getAliveIds(Time.currentTimeSecs()); + Set aliveIds = getAliveIds(Time.currentTimeSecsLong()); Set logDirs = getAllWorkerDirs(); return getLogDirs(logDirs, (wid) -> aliveIds.contains(wid)); } @@ -199,14 +199,14 @@ public String getTopologyOwnerFromMetadataFile(Path metaFile) { * * @param nowSecs current time in seconds */ - public Set getAliveIds(int nowSecs) throws IOException { + public Set getAliveIds(long nowSecs) throws IOException { return SupervisorUtils.readWorkerHeartbeats(stormConf).entrySet().stream() .filter(entry -> Objects.nonNull(entry.getValue()) && !isTimedOut(nowSecs, entry)) .map(Map.Entry::getKey) .collect(toCollection(TreeSet::new)); } - private boolean isTimedOut(int nowSecs, Map.Entry entry) { + private boolean isTimedOut(long nowSecs, Map.Entry entry) { LSWorkerHeartbeat hb = entry.getValue(); int workerLogTimeout = getTopologyTimeout(hb); return (nowSecs - hb.get_time_secs()) >= workerLogTimeout; diff --git a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java index ab4733b3b5..004b99a164 100644 --- a/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java +++ b/storm-webapp/src/test/java/org/apache/storm/daemon/logviewer/utils/LogCleanerTest.java @@ -281,7 +281,7 @@ Set selectDirsForCleanup(long nowMillis) { } @Override - SortedSet getDeadWorkerDirs(int nowSecs, Set logDirs) { + SortedSet getDeadWorkerDirs(long nowSecs, Set logDirs) { SortedSet dirs = new TreeSet<>(); dirs.add(dir1.getFile().toPath()); dirs.add(dir2.getFile().toPath()); From b562a741b0073d7e3142704b68c64361659a596a Mon Sep 17 00:00:00 2001 From: Davide Polato Date: Thu, 11 Jun 2026 15:05:12 +0200 Subject: [PATCH 6/6] docs: document i64 heartbeat timestamps and upgrade implications Explain the STORM-7897 time_secs i32->i64 promotion, why 
uptime_secs stays i32, the deprecation of the int-based Time methods, and the full-cluster bounce upgrade requirement (legacy heartbeat blobs fail required-field validation and self-heal on re-report). --- docs/Cluster-State-Serialization.md | 31 +++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/docs/Cluster-State-Serialization.md b/docs/Cluster-State-Serialization.md index 8581448f40..b59a0b2f40 100644 --- a/docs/Cluster-State-Serialization.md +++ b/docs/Cluster-State-Serialization.md @@ -122,3 +122,34 @@ non-Thrift Java-serialization variant). The zstd codec is provided by Apache Commons Compress (`org.apache.commons:commons-compress`) backed by the `com.github.luben:zstd-jni` native binding. + +## Heartbeat timestamps and the year 2038 + +Since STORM-7897, the `time_secs` field of `ClusterWorkerHeartbeat`, +`SupervisorWorkerHeartbeat` and `LSWorkerHeartbeat` is a 64-bit integer +(`i64`), and all heartbeat writers and timeout checks use the long-based +clock (`Time.currentTimeSecsLong()` / `Time.deltaSecsLong(...)`). Earlier +releases carried these timestamps as `i32` seconds, which overflows on +2038-01-19T03:14:07Z and would have caused Nimbus to treat live workers +as dead. + +`uptime_secs` fields remain `i32`: they are relative durations, not +absolute timestamps. `Time.currentTimeSecs()` and `Time.deltaSecs(int)` +are deprecated but retained for such relative-duration callers. + +### Upgrade implications + +Thrift tags `i32` and `i64` values differently on the wire, so heartbeat +blobs written by a pre-upgrade daemon do **not** deserialize under the +new schema: the reader skips the mistyped field and the blob then fails +required-field validation with a `TProtocolException`. In practice: + +1. **A full-cluster upgrade is required.** Do not run a mixed-version + cluster across Nimbus, Supervisors and workers: heartbeats do not + round-trip between the old and new schema in either direction. +2. 
**In-flight heartbeats are dropped once, then self-heal.** Nimbus + times workers out by *receipt* time, so a dropped heartbeat is + replaced on the next report interval; workers rewrite their + on-disk `LSWorkerHeartbeat` local state the same way. Expect at most + one report cycle of staleness around the restart, which is within the + normal tolerance of a full-cluster bounce.