Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/Cluster-State-Serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,34 @@ non-Thrift Java-serialization variant).
The zstd codec is provided by Apache Commons Compress
(`org.apache.commons:commons-compress`) backed by the `com.github.luben:zstd-jni`
native binding.

## Heartbeat timestamps and the year 2038

Since STORM-7897, the `time_secs` field of `ClusterWorkerHeartbeat`,
`SupervisorWorkerHeartbeat` and `LSWorkerHeartbeat` is a 64-bit integer
(`i64`), and all heartbeat writers and timeout checks use the long-based
clock (`Time.currentTimeSecsLong()` / `Time.deltaSecsLong(...)`). Earlier
releases carried these timestamps as `i32` seconds, which overflow one
second after 2038-01-19T03:14:07Z (the largest value an `i32` can hold);
past that point the wrapped values would have caused Nimbus to treat live
workers as dead.

`uptime_secs` fields remain `i32`: they are relative durations, not
absolute timestamps, so they stay far below the `i32` limit (about 68
years of seconds). `Time.currentTimeSecs()` and `Time.deltaSecs(int)`
are deprecated but retained for such relative-duration callers.

### Upgrade implications

Thrift tags `i32` and `i64` values differently on the wire, so heartbeat
blobs written by a pre-upgrade daemon do **not** deserialize under the
new schema: the reader skips the mistyped field and the blob then fails
required-field validation with a `TProtocolException`. In practice:

1. **A full-cluster upgrade is required.** Do not run a mixed-version
cluster across Nimbus, Supervisors and workers: heartbeats do not
round-trip between the old and new schema in either direction.
2. **In-flight heartbeats are dropped once, then self-heal.** Nimbus
times workers out by *receipt* time, so a dropped heartbeat is
replaced on the next report interval; each worker likewise rewrites its
on-disk `LSWorkerHeartbeat` local state on its next heartbeat. Expect at most
one report cycle of staleness around the restart, which is within the
normal tolerance of a full-cluster bounce.
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public static Map<ExecutorInfo, ExecutorBeat> convertExecutorBeats(List<Executor
Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats();
for (ExecutorInfo executor : executors) {
if (executorStatsMap.containsKey(executor)) {
int time = workerHeartbeat.get_time_secs();
long time = workerHeartbeat.get_time_secs();
int uptime = workerHeartbeat.get_uptime_secs();
ExecutorStats executorStats = workerHeartbeat.get_executor_stats().get(executor);
ExecutorBeat executorBeat = new ExecutorBeat(time, uptime, executorStats);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
import org.apache.storm.generated.ExecutorStats;

public class ExecutorBeat {
private final int timeSecs;
private final long timeSecs;
private final int uptime;
private final ExecutorStats stats;

public ExecutorBeat(int timeSecs, int uptime, ExecutorStats stats) {
public ExecutorBeat(long timeSecs, int uptime, ExecutorStats stats) {
this.timeSecs = timeSecs;
this.uptime = uptime;
this.stats = stats;
}

public int getTimeSecs() {
public long getTimeSecs() {
return timeSecs;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public byte[] get_worker_hb(String path, boolean watch) {
while (true) {
try {
byte[] ret = null;
int latestTimeSecs = 0;
long latestTimeSecs = 0;
boolean gotResponse = false;

HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ public void doRefreshLoad() {

public void doHeartBeat() throws IOException {
LocalState state = ConfigUtils.workerState(workerState.conf, workerState.workerId);
LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecs(), workerState.topologyId,
LSWorkerHeartbeat lsWorkerHeartbeat = new LSWorkerHeartbeat(Time.currentTimeSecsLong(), workerState.topologyId,
workerState.localExecutors.stream()
.map(executor -> new ExecutorInfo(
executor.get(0).intValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,15 @@ public class ClusterWorkerHeartbeat implements org.apache.storm.thrift.TBase<Clu

private static final org.apache.storm.thrift.protocol.TField STORM_ID_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("storm_id", org.apache.storm.thrift.protocol.TType.STRING, (short)1);
private static final org.apache.storm.thrift.protocol.TField EXECUTOR_STATS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("executor_stats", org.apache.storm.thrift.protocol.TType.MAP, (short)2);
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I32, (short)3);
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I64, (short)3);
private static final org.apache.storm.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("uptime_secs", org.apache.storm.thrift.protocol.TType.I32, (short)4);

private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new ClusterWorkerHeartbeatStandardSchemeFactory();
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new ClusterWorkerHeartbeatTupleSchemeFactory();

private @org.apache.storm.thrift.annotation.Nullable java.lang.String storm_id; // required
private @org.apache.storm.thrift.annotation.Nullable java.util.Map<ExecutorInfo,ExecutorStats> executor_stats; // required
private int time_secs; // required
private long time_secs; // required
private int uptime_secs; // required

/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
Expand Down Expand Up @@ -126,7 +126,7 @@ public java.lang.String getFieldName() {
new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorInfo.class),
new org.apache.storm.thrift.meta_data.StructMetaData(org.apache.storm.thrift.protocol.TType.STRUCT, ExecutorStats.class))));
tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32)));
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64)));
tmpMap.put(_Fields.UPTIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("uptime_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
Expand All @@ -139,7 +139,7 @@ public ClusterWorkerHeartbeat() {
public ClusterWorkerHeartbeat(
java.lang.String storm_id,
java.util.Map<ExecutorInfo,ExecutorStats> executor_stats,
int time_secs,
long time_secs,
int uptime_secs)
{
this();
Expand Down Expand Up @@ -252,11 +252,11 @@ public void set_executor_stats_isSet(boolean value) {
}
}

public int get_time_secs() {
public long get_time_secs() {
return this.time_secs;
}

public void set_time_secs(int time_secs) {
public void set_time_secs(long time_secs) {
this.time_secs = time_secs;
set_time_secs_isSet(true);
}
Expand Down Expand Up @@ -319,7 +319,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul
if (value == null) {
unset_time_secs();
} else {
set_time_secs((java.lang.Integer)value);
set_time_secs((java.lang.Long)value);
}
break;

Expand Down Expand Up @@ -438,7 +438,7 @@ public int hashCode() {
if (is_set_executor_stats())
hashCode = hashCode * 8191 + executor_stats.hashCode();

hashCode = hashCode * 8191 + time_secs;
hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(time_secs);

hashCode = hashCode * 8191 + uptime_secs;

Expand Down Expand Up @@ -634,8 +634,8 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, ClusterWorker
}
break;
case 3: // TIME_SECS
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) {
struct.time_secs = iprot.readI32();
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) {
struct.time_secs = iprot.readI64();
struct.set_time_secs_isSet(true);
} else {
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
Expand Down Expand Up @@ -682,7 +682,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, ClusterWorke
oprot.writeFieldEnd();
}
oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
oprot.writeI32(struct.time_secs);
oprot.writeI64(struct.time_secs);
oprot.writeFieldEnd();
oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC);
oprot.writeI32(struct.uptime_secs);
Expand Down Expand Up @@ -714,7 +714,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, ClusterWorker
_iter827.getValue().write(oprot);
}
}
oprot.writeI32(struct.time_secs);
oprot.writeI64(struct.time_secs);
oprot.writeI32(struct.uptime_secs);
}

Expand All @@ -738,7 +738,7 @@ public void read(org.apache.storm.thrift.protocol.TProtocol prot, ClusterWorkerH
}
}
struct.set_executor_stats_isSet(true);
struct.time_secs = iprot.readI32();
struct.time_secs = iprot.readI64();
struct.set_time_secs_isSet(true);
struct.uptime_secs = iprot.readI32();
struct.set_uptime_secs_isSet(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,15 @@
public class LSWorkerHeartbeat implements org.apache.storm.thrift.TBase<LSWorkerHeartbeat, LSWorkerHeartbeat._Fields>, java.io.Serializable, Cloneable, Comparable<LSWorkerHeartbeat> {
private static final org.apache.storm.thrift.protocol.TStruct STRUCT_DESC = new org.apache.storm.thrift.protocol.TStruct("LSWorkerHeartbeat");

private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I32, (short)1);
private static final org.apache.storm.thrift.protocol.TField TIME_SECS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("time_secs", org.apache.storm.thrift.protocol.TType.I64, (short)1);
private static final org.apache.storm.thrift.protocol.TField TOPOLOGY_ID_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("topology_id", org.apache.storm.thrift.protocol.TType.STRING, (short)2);
private static final org.apache.storm.thrift.protocol.TField EXECUTORS_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("executors", org.apache.storm.thrift.protocol.TType.LIST, (short)3);
private static final org.apache.storm.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.storm.thrift.protocol.TField("port", org.apache.storm.thrift.protocol.TType.I32, (short)4);

private static final org.apache.storm.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new LSWorkerHeartbeatStandardSchemeFactory();
private static final org.apache.storm.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new LSWorkerHeartbeatTupleSchemeFactory();

private int time_secs; // required
private long time_secs; // required
private @org.apache.storm.thrift.annotation.Nullable java.lang.String topology_id; // required
private @org.apache.storm.thrift.annotation.Nullable java.util.List<ExecutorInfo> executors; // required
private int port; // required
Expand Down Expand Up @@ -120,7 +120,7 @@ public java.lang.String getFieldName() {
static {
java.util.Map<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.storm.thrift.meta_data.FieldMetaData>(_Fields.class);
tmpMap.put(_Fields.TIME_SECS, new org.apache.storm.thrift.meta_data.FieldMetaData("time_secs", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I32)));
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.I64)));
tmpMap.put(_Fields.TOPOLOGY_ID, new org.apache.storm.thrift.meta_data.FieldMetaData("topology_id", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
new org.apache.storm.thrift.meta_data.FieldValueMetaData(org.apache.storm.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.EXECUTORS, new org.apache.storm.thrift.meta_data.FieldMetaData("executors", org.apache.storm.thrift.TFieldRequirementType.REQUIRED,
Expand All @@ -136,7 +136,7 @@ public LSWorkerHeartbeat() {
}

public LSWorkerHeartbeat(
int time_secs,
long time_secs,
java.lang.String topology_id,
java.util.List<ExecutorInfo> executors,
int port)
Expand Down Expand Up @@ -184,11 +184,11 @@ public void clear() {
this.port = 0;
}

public int get_time_secs() {
public long get_time_secs() {
return this.time_secs;
}

public void set_time_secs(int time_secs) {
public void set_time_secs(long time_secs) {
this.time_secs = time_secs;
set_time_secs_isSet(true);
}
Expand Down Expand Up @@ -299,7 +299,7 @@ public void setFieldValue(_Fields field, @org.apache.storm.thrift.annotation.Nul
if (value == null) {
unset_time_secs();
} else {
set_time_secs((java.lang.Integer)value);
set_time_secs((java.lang.Long)value);
}
break;

Expand Down Expand Up @@ -426,7 +426,7 @@ public boolean equals(LSWorkerHeartbeat that) {
public int hashCode() {
int hashCode = 1;

hashCode = hashCode * 8191 + time_secs;
hashCode = hashCode * 8191 + org.apache.storm.thrift.TBaseHelper.hashCode(time_secs);

hashCode = hashCode * 8191 + ((is_set_topology_id()) ? 131071 : 524287);
if (is_set_topology_id())
Expand Down Expand Up @@ -600,8 +600,8 @@ public void read(org.apache.storm.thrift.protocol.TProtocol iprot, LSWorkerHeart
}
switch (schemeField.id) {
case 1: // TIME_SECS
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I32) {
struct.time_secs = iprot.readI32();
if (schemeField.type == org.apache.storm.thrift.protocol.TType.I64) {
struct.time_secs = iprot.readI64();
struct.set_time_secs_isSet(true);
} else {
org.apache.storm.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
Expand Down Expand Up @@ -657,7 +657,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol oprot, LSWorkerHear

oprot.writeStructBegin(STRUCT_DESC);
oprot.writeFieldBegin(TIME_SECS_FIELD_DESC);
oprot.writeI32(struct.time_secs);
oprot.writeI64(struct.time_secs);
oprot.writeFieldEnd();
if (struct.topology_id != null) {
oprot.writeFieldBegin(TOPOLOGY_ID_FIELD_DESC);
Expand Down Expand Up @@ -697,7 +697,7 @@ private static class LSWorkerHeartbeatTupleScheme extends org.apache.storm.thrif
@Override
public void write(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.storm.thrift.TException {
org.apache.storm.thrift.protocol.TTupleProtocol oprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
oprot.writeI32(struct.time_secs);
oprot.writeI64(struct.time_secs);
oprot.writeString(struct.topology_id);
{
oprot.writeI32(struct.executors.size());
Expand All @@ -712,7 +712,7 @@ public void write(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeart
@Override
public void read(org.apache.storm.thrift.protocol.TProtocol prot, LSWorkerHeartbeat struct) throws org.apache.storm.thrift.TException {
org.apache.storm.thrift.protocol.TTupleProtocol iprot = (org.apache.storm.thrift.protocol.TTupleProtocol) prot;
struct.time_secs = iprot.readI32();
struct.time_secs = iprot.readI64();
struct.set_time_secs_isSet(true);
struct.topology_id = iprot.readString();
struct.set_topology_id_isSet(true);
Expand Down
Loading
Loading