refactor: replace timestamp-based node liveness with event-index staleness #1451

AlexCheema wants to merge 3 commits into `main`
Conversation
…eness

Remove all datetime timestamps from the event-sourced state. Instead of tracking `last_seen: Mapping[NodeId, datetime]` and checking wall-clock deltas, track `last_event_index_by_node: Mapping[NodeId, int]` — the global event index at which each node was last heard from.

The master's planning loop now compares snapshots of each node's last event index across consecutive cycles. If a node produces no new events for 3 consecutive plan cycles (~30s), it is disconnected.

This eliminates false-positive node removals caused by slow info-gathering tasks, since the timeout is now purely based on whether events are flowing — not on wall-clock timing of individual tasks.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
just throwing out that Claude's analysis is straight up wrong here, so please make sure this change is what you want before making it. "i haven't seen an event in 30s" is the outcome either way; this obfuscates that imo. Also, the master timestamp was an intentional inclusion for debugging that is being removed here.
The motivation for this change is that consumer device timestamps should not be trusted. Event-index staleness gives us a reliable, monotonic measure of liveness that doesn't depend on clock synchronization across nodes.
To clarify — we're fine keeping the master timestamp for our own debugging purposes, but it shouldn't be used for liveness detection logic. The core liveness check should rely on event-index staleness, not timestamps from consumer devices that can drift or be out of sync.
AlexCheema
left a comment
Code Review: PR #1451 — Replace Timestamp-Based Node Liveness with Event-Index Staleness
Overall Assessment
Clean, focused refactor that eliminates a real problem (clock skew between nodes) by replacing wall-clock timestamps with event-index-based staleness detection. The change is well-scoped and the before/after is easy to follow.
Strengths

- Eliminates clock-skew vulnerability: The old `last_seen: Mapping[NodeId, datetime]` depended on master-assigned timestamps. If the master's clock drifted or if timestamp comparison had timezone issues, nodes could be falsely timed out or kept alive. Event indices are monotonically increasing and master-local — no clock synchronization needed.
- `NodeDisconnected` is a better name than `NodeTimedOut`: The rename better describes the semantics — the master is disconnecting a node it considers stale, not necessarily "timing out" in a traditional sense.
- Clean removal of `when` field from `NodeGatheredInfo`: This field was always awkward — it was "a manually cast datetime overrode by the master" (per the old comment). Removing it simplifies the event model.
- Test updates are correct: `test_master.py` no longer needs to pass `when=str(datetime.now(...))` in `NodeGatheredInfo`, and `test_apply_node_timed_out.py` correctly uses the new `NodeDisconnected` event and `last_event_index_by_node`.
Issues

- `last_event_index_by_node` is only updated for `NodeGatheredInfo` events (`apply.py` ~line 107):

  ```python
  if isinstance(event.event, NodeGatheredInfo):
      update["last_event_index_by_node"] = {
          **new_state.last_event_index_by_node,
          event.event.node_id: event.idx,
      }
  ```

  Other node-originated events (e.g., `RunnerStatusUpdated`, `TaskStatusUpdated`, `TaskAcknowledged`) do NOT update this index. If a node stops sending `NodeGatheredInfo` but continues sending other events (e.g., runner status updates during inference), it would appear stale and get disconnected. Verify that `NodeGatheredInfo` is sent regularly by all active nodes regardless of what else they're doing. If it's sent by the info_gatherer on a fixed interval, this is fine.
- Stale detection state is not event-sourced: `_last_checked_indices` and `_stale_cycles` are instance variables on the master, NOT part of `State`. If the master process crashes and restarts, these reset to empty dicts. It then takes 3 full plan cycles (~30s at 10s/cycle) before any stale node is detected. This matches the old 30-second timeout behavior, so it's not a regression, but it is worth documenting.
- Edge case: node with index 0: When a brand-new node sends its first `NodeGatheredInfo`, its `last_event_index_by_node` value will be some positive index. `_last_checked_indices` won't have an entry yet, so `last_checked` defaults to `-1`. Since `current_indices[node_id] != last_checked`, the stale count won't increment. Correct behavior.
- Edge case: rapidly rejoining node: If a node disconnects (removed from `last_event_index_by_node` via `apply_node_disconnected`) and immediately rejoins with a new peer ID, the old stale data in `_stale_cycles` for the old peer ID is harmless (it just won't match any current `last_event_index_by_node` key and will be ignored). But `_stale_cycles` will accumulate dead entries over time. Consider periodically pruning keys that are no longer in `state.last_event_index_by_node`.
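The pruning suggested in the last point could look like the following minimal sketch, assuming the master's bookkeeping lives in plain dicts keyed by node id (`prune_dead_entries` and the parameter names are illustrative, not the actual source):

```python
from typing import Mapping

def prune_dead_entries(
    stale_cycles: dict[str, int],
    last_checked_indices: dict[str, int],
    last_event_index_by_node: Mapping[str, int],
) -> None:
    """Drop bookkeeping entries for nodes that no longer appear in
    state.last_event_index_by_node, so the trackers don't accumulate
    dead keys as nodes disconnect and rejoin under new peer IDs."""
    live = set(last_event_index_by_node)
    for tracker in (stale_cycles, last_checked_indices):
        for node_id in list(tracker):  # copy keys: we mutate while iterating
            if node_id not in live:
                del tracker[node_id]
```

Calling this once per plan cycle (after the staleness check) would bound both dicts to the set of currently-known nodes.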
Minor

- The `_plan` loop still does `await anyio.sleep(10)` — 10-second granularity for stale detection is reasonable, but worth documenting the detection latency: 3 stale cycles * 10s = minimum 30 seconds to detect a dead node.
Verdict

Approve. This is a clean improvement that removes a real problem (clock dependency) with a simpler, deterministic approach. The only concern worth addressing is the `NodeGatheredInfo` frequency question above — if it's sent on a fixed timer by the info_gatherer, this is a non-issue.
🤖 Generated with Claude Code
Code Review: PR #1451 — refactor: replace timestamp-based node liveness with event-index staleness

Author: AlexCheema

Overview

Replaces wall-clock timestamp-based node liveness (`last_seen: datetime`, 30s timeout) with event-index-based staleness detection. Files changed:

Motivation

SOUND. Two real problems with the old approach:

The new approach is deterministic (event indices are derived from the event stream itself).

Correctness — State Changes

PASS. `state.py`: replacing `last_seen: Mapping[NodeId, datetime]` with `last_event_index_by_node: Mapping[NodeId, int]`. `events.py`: removing the `when` field from `NodeGatheredInfo`.

Correctness — Apply Logic

PASS. In `apply()` (top-level), the index tracking is done at the `IndexedEvent` layer. `apply_node_disconnected()` properly cleans up `last_event_index_by_node`.

Correctness — Staleness Detection

PASS with notes. The new logic in `_plan()`:

This is correct:

Race Condition Analysis

NONE. `_plan()` runs in a single async task; `_last_checked_indices` and `_stale_cycles` are only touched there.

Master Re-election

SAFE. On re-election, a new Master instance is created with fresh empty dicts.

Edge Case — Only NodeGatheredInfo Updates the Index

NOTE. The `apply()` function only updates `last_event_index_by_node` for `NodeGatheredInfo` events. This means a node that is actively running inference (producing other event types) but whose InfoGatherer has stalled would still look stale.

In practice this is LOW RISK because:

However, if InfoGatherer crashes while the runner continues working, the node would be falsely disconnected.

Timing Comparison

Old: exact 30-second wall-clock check (but subject to clock skew). New: the effective timeout is 20-40 seconds depending on when the node stops producing events relative to the plan-cycle boundary.

Dashboard Compatibility

PASS. No references to the removed fields.

Backwards Compatibility

BREAKING for any external consumers that:

These are internal types, so the risk is low unless there are external consumers.

Test Coverage

ADEQUATE. The `test_master.py` fixture is updated to remove the `when` field.

Nits

None.

Verdict

LGTM. Well-motivated refactor that eliminates non-deterministic timestamps from the event-sourced state.
Code Review -- PR #1451: refactor: replace timestamp-based node liveness with event-index staleness

CI status: All 8 checks PASSED (typecheck, Build and check on aarch64-darwin, aarch64-linux, x86_64-linux)
Merge status: CONFLICTING -- this PR has merge conflicts with the base branch.

Overview

Replaces wall-clock timestamp-based liveness with event-index staleness detection. Changes span 6 files:

Critical Issues

1. Merge conflict with #1493 -- PR #1493 ("don't time out node identities", merged today) intentionally removed this cleanup:

```python
node_identities = {
    key: value
    for key, value in state.node_identities.items()
    if key != event.node_id
}
```

When rebasing, the conflict with #1493 must be resolved.

2. The PR removes this line from the event processor:

```python
event._master_time_stamp = datetime.now(tz=timezone.utc)  # pyright: ignore[reportPrivateUsage]
```

The master timestamp was an intentional inclusion for debugging. Recommended fix: restore the master timestamp for debugging while keeping it out of the liveness logic.

Significant Issues

3. Only `NodeGatheredInfo` updates `last_event_index_by_node`. In `apply()`:

```python
if isinstance(event.event, NodeGatheredInfo):
    update["last_event_index_by_node"] = {
        **new_state.last_event_index_by_node,
        event.event.node_id: event.idx,
    }
```

Other node-originated events (`RunnerStatusUpdated`, `TaskStatusUpdated`, `TaskAcknowledged`) do not. In practice this is low risk because the InfoGatherer produces events from multiple independent sources:

ALL of these would have to fail simultaneously to miss the heartbeat window. But this is a design decision worth making explicitly -- a comment noting that `NodeGatheredInfo` doubles as the liveness heartbeat would help.

Minor Issues

4. Detection latency is variable (20-40s, not "~30s"). The effective detection window depends on when the node stops producing events relative to the plan cycle boundary. The PR description and comments say "~30s" -- more precisely it's 20-40 seconds, comparable to the old behavior.

5. No unit test for the staleness detection logic.

Correctness Analysis

The staleness detection logic in `_plan()`:

What's Good

Verdict

Sound design that eliminates a real problem. The implementation is correct, but the PR needs work before merge:

After addressing those three items, this is ready. Review only -- not a merge approval.
Motivation
The node liveness system uses `last_seen: Mapping[NodeId, datetime]` in the event-sourced State and checks `now - last_seen > 30s` to time out nodes. This has two fundamental problems:

1. Timestamps don't belong in event-sourced state. `datetime.now()` is a side effect — replaying the same events produces different state depending on when they're replayed. The master was even overwriting the worker's timestamp with its own clock at indexing time (`event.when = str(datetime.now(...))`), adding another source of non-determinism.
2. The timeout is finicky. The 30-second wall-clock threshold fires whenever any info-gathering task is slow — not when the node is actually unreachable. A node can be perfectly connected via libp2p, actively participating in topology, but get removed because `system_profiler` took 31 seconds to respond. This causes false-positive node removals in production.

Changes
State (`state.py`): Replace `last_seen: Mapping[NodeId, datetime]` with `last_event_index_by_node: Mapping[NodeId, int]` — stores the global event index at which each node last produced an event. Fully deterministic: derived from the event stream itself.

Events (`events.py`): Remove the `when: str` field from `NodeGatheredInfo` — no longer needed. Rename `NodeTimedOut` → `NodeDisconnected` — reflects the actual semantics (the node stopped producing events, not "we checked the clock").

Apply (`apply.py`): `apply()` now records `last_event_index_by_node[node_id] = event.idx` whenever a `NodeGatheredInfo` is applied. This keeps index tracking a concern of the indexed event layer, not the individual event handler. `apply_node_gathered_info()` no longer touches liveness state. `apply_node_timed_out()` → `apply_node_disconnected()` with `last_event_index_by_node` cleanup.

Master (`master/main.py`): Remove the `datetime`/`timedelta`/`timezone` imports and the `event.when` overwrite in `_event_processor`. Replace the timeout check in `_plan()` with cycle-based staleness detection: each plan cycle (10s), the master snapshots `last_event_index_by_node` and compares it to the previous snapshot. If a node's index hasn't advanced for 3 consecutive cycles, it's disconnected. The tracking dicts (`_last_checked_indices`, `_stale_cycles`) are local master state — ephemeral, not event-sourced, reset on re-election.

Worker (`worker/main.py`): Remove the `when` field from `NodeGatheredInfo` construction. Remove the unused `datetime` import.
The old system asked: "Has this node's timestamp been updated within 30 wall-clock seconds?" — conflating "slow info gathering" with "node is dead."
The new system asks: "Has this node produced ANY event since the last time I checked?" — which is the actual question we care about. A node running macmon at 1s intervals, memory polling, network monitoring, etc. produces many events per second. If none of those events arrive for 3 consecutive 10-second plan cycles, the node is genuinely unreachable — not just slow at one particular task.
The event index is deterministic and monotonically increasing. It's derived from the event stream itself, making it a natural fit for event-sourced state. No wall clocks, no timezone issues, no race between worker timestamps and master timestamps.
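The determinism claim can be illustrated with a tiny fold over the event log — a sketch only, where `IndexedEvent` and `NodeGatheredInfo` are pared-down stand-ins for the real event types:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class NodeGatheredInfo:
    node_id: str


@dataclass(frozen=True)
class IndexedEvent:
    idx: int       # global, monotonically increasing event index
    event: object


def replay(events: list[IndexedEvent]) -> dict[str, int]:
    """Rebuild last_event_index_by_node purely from the event stream —
    no wall clock is consulted, so replays always agree."""
    last_event_index_by_node: dict[str, int] = {}
    for e in events:
        if isinstance(e.event, NodeGatheredInfo):
            last_event_index_by_node[e.event.node_id] = e.idx
    return last_event_index_by_node


log = [
    IndexedEvent(0, NodeGatheredInfo("a")),
    IndexedEvent(1, NodeGatheredInfo("b")),
    IndexedEvent(2, NodeGatheredInfo("a")),
]
# Two replays of the same log agree, no matter when they run.
assert replay(log) == replay(log) == {"a": 2, "b": 1}
```

Contrast with the old scheme: a fold that stamped `datetime.now()` into state would produce a different `State` on every replay.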
Test Plan
Manual Testing
Automated Testing
- `test_master` updated: removed `when` field from `NodeGatheredInfo` fixture — passes.
- `basedpyright`: 0 errors, 0 warnings.
- `ruff check`: all passed.
- `nix fmt`: 0 files changed.

🤖 Generated with Claude Code