From 8e9d43af7ddd909833476b755a63a3ea447508f5 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Sun, 12 Apr 2026 16:52:45 +0100 Subject: [PATCH] layer: RTT tier order for RS pollChunks; shared latency_tier PeerSessionStats gains rtt_ms (default unknown). RsStrategy.pollChunks sorts peers by LatencyTier then lower RTT before allocate. Extract tier thresholds and latencyTier into layer/latency_tier.zig; discovery/peering/score re-exports for existing API. Closes #39 --- src/ci_root_broadcast.zig | 1 + src/discovery/peering/score.zig | 30 +++------------- src/layer/broadcast_types.zig | 4 +++ src/layer/latency_tier.zig | 25 +++++++++++++ src/layer/rs_strategy.zig | 63 +++++++++++++++++++++++++++++++-- src/root.zig | 2 ++ 6 files changed, 98 insertions(+), 27 deletions(-) create mode 100644 src/layer/latency_tier.zig diff --git a/src/ci_root_broadcast.zig b/src/ci_root_broadcast.zig index 288eee2..2b628c9 100644 --- a/src/ci_root_broadcast.zig +++ b/src/ci_root_broadcast.zig @@ -9,6 +9,7 @@ test { _ = @import("layer/dedup_registry.zig"); _ = @import("layer/ec_scheme.zig"); _ = @import("layer/emit_planner.zig"); + _ = @import("layer/latency_tier.zig"); _ = @import("layer/rs_encode.zig"); _ = @import("layer/rs_init.zig"); _ = @import("layer/rs_strategy.zig"); diff --git a/src/discovery/peering/score.zig b/src/discovery/peering/score.zig index 866c41e..902fb8d 100644 --- a/src/discovery/peering/score.zig +++ b/src/discovery/peering/score.zig @@ -8,23 +8,12 @@ //! are derived directly from RTT and flow into chunk dispatch ordering. const std = @import("std"); +const latency_tier = @import("../../layer/latency_tier.zig"); -// --------------------------------------------------------------------------- -// Latency tiers (mirror of broadcast layer tiers, 002-ec-broadcast.md) -// --------------------------------------------------------------------------- - -/// Inner tier — chunks sent first. RTT < 60 ms. -pub const rtt_inner_ms: u32 = 60; -/// Mid tier — second priority. RTT 60–120 ms. -pub const rtt_mid_ms: u32 = 120; -/// Outer tier — everything above rtt_mid_ms. -pub const LatencyTier = enum { inner, mid, outer }; - -pub fn latencyTier(rtt_ms: u32) LatencyTier { - if (rtt_ms < rtt_inner_ms) return .inner; - if (rtt_ms < rtt_mid_ms) return .mid; - return .outer; -} +pub const rtt_inner_ms = latency_tier.rtt_inner_ms; +pub const rtt_mid_ms = latency_tier.rtt_mid_ms; +pub const LatencyTier = latency_tier.LatencyTier; +pub const latencyTier = latency_tier.latencyTier; // --------------------------------------------------------------------------- // Score decay @@ -138,15 +127,6 @@ pub const Score = struct { // Tests // --------------------------------------------------------------------------- -test "latencyTier boundaries" { - try std.testing.expectEqual(LatencyTier.inner, latencyTier(0)); - try std.testing.expectEqual(LatencyTier.inner, latencyTier(59)); - try std.testing.expectEqual(LatencyTier.mid, latencyTier(60)); - try std.testing.expectEqual(LatencyTier.mid, latencyTier(119)); - try std.testing.expectEqual(LatencyTier.outer, latencyTier(120)); - try std.testing.expectEqual(LatencyTier.outer, latencyTier(500)); -} - test "recordRtt lowers composite for high RTT" { var s = Score{}; s.recordRtt(300, 0); diff --git a/src/layer/broadcast_types.zig b/src/layer/broadcast_types.zig index a245665..dae2bb4 100644 --- a/src/layer/broadcast_types.zig +++ b/src/layer/broadcast_types.zig @@ -1,5 +1,7 @@ //! Broadcast-layer enums and aliases aligned with ethp2p `broadcast/types.go`. +const std = @import("std"); + pub const ChunkHandle = u64; pub const protocol_v1: u32 = 1; @@ -33,6 +35,8 @@ pub const DedupCancel = struct { /// Per-peer per-session stats; the session owns and mutates fields. Strategy holds a pointer only. pub const PeerSessionStats = struct { peer_id: []const u8 = &.{}, + /// Measured RTT in ms; `maxInt(u32)` means unknown (lowest dispatch preference). + rtt_ms: u32 = std.math.maxInt(u32), }; pub fn ChunkDispatch(comptime ChunkId: type) type { diff --git a/src/layer/latency_tier.zig b/src/layer/latency_tier.zig new file mode 100644 index 0000000..02bbf3f --- /dev/null +++ b/src/layer/latency_tier.zig @@ -0,0 +1,25 @@ +//! RTT → latency tier for RS chunk dispatch and peering (002-ec-broadcast). + +const std = @import("std"); + +/// Inner tier — chunks sent first. RTT < 60 ms. +pub const rtt_inner_ms: u32 = 60; +/// Mid tier — second priority. RTT 60–120 ms. +pub const rtt_mid_ms: u32 = 120; +/// Outer tier — everything above `rtt_mid_ms`. +pub const LatencyTier = enum { inner, mid, outer }; + +pub fn latencyTier(rtt_ms: u32) LatencyTier { + if (rtt_ms < rtt_inner_ms) return .inner; + if (rtt_ms < rtt_mid_ms) return .mid; + return .outer; +} + +test "latencyTier boundaries" { + try std.testing.expectEqual(LatencyTier.inner, latencyTier(0)); + try std.testing.expectEqual(LatencyTier.inner, latencyTier(59)); + try std.testing.expectEqual(LatencyTier.mid, latencyTier(60)); + try std.testing.expectEqual(LatencyTier.mid, latencyTier(119)); + try std.testing.expectEqual(LatencyTier.outer, latencyTier(120)); + try std.testing.expectEqual(LatencyTier.outer, latencyTier(500)); +} diff --git a/src/layer/rs_strategy.zig b/src/layer/rs_strategy.zig index 53cce69..9d92336 100644 --- a/src/layer/rs_strategy.zig +++ b/src/layer/rs_strategy.zig @@ -4,6 +4,7 @@ const std = @import("std"); const bitmap_mod = @import("bitmap.zig"); const broadcast_types = @import("broadcast_types.zig"); const emit_planner = @import("emit_planner.zig"); +const latency_tier = @import("latency_tier.zig"); const rs_encode = @import("rs_encode.zig"); const rs_init = @import("rs_init.zig"); @@ -405,16 +406,36 @@ pub const RsStrategy = struct { return out; } + const PollPeer = struct { + peer: []const u8, + ps: *PeerState, + }; + + fn lessPollPeer(_: void, a: PollPeer, b: PollPeer) bool { + const ta = latency_tier.latencyTier(a.ps.stats.rtt_ms); + const tb = latency_tier.latencyTier(b.ps.stats.rtt_ms); + if (ta != tb) return @intFromEnum(ta) < @intFromEnum(tb); + return a.ps.stats.rtt_ms < b.ps.stats.rtt_ms; + } + pub fn pollChunks(self: *RsStrategy) (Allocator.Error || emit_planner.PlannerError)![]broadcast_types.ChunkDispatch(ChunkIdent) { const allocator = self.allocator; var list: std.ArrayListUnmanaged(broadcast_types.ChunkDispatch(ChunkIdent)) = .{}; errdefer list.deinit(allocator); + var order: std.ArrayListUnmanaged(PollPeer) = .{}; + defer order.deinit(allocator); + var it = self.peers.iterator(); while (it.next()) |kv| { if (kv.value_ptr.completed) continue; - const peer = kv.key_ptr.*; - if (try self.allocate(peer, kv.value_ptr)) |disp| { + try order.append(allocator, .{ .peer = kv.key_ptr.*, .ps = kv.value_ptr }); + } + + std.sort.pdq(PollPeer, order.items, {}, lessPollPeer); + + for (order.items) |entry| { + if (try self.allocate(entry.peer, entry.ps)) |disp| { try list.append(allocator, disp); } } @@ -526,6 +547,44 @@ test "origin decode roundtrip" { try std.testing.expectEqualSlices(u8, &msg, out); } +test "pollChunks prefers inner-tier peer by RTT" { + const gpa = std.testing.allocator; + const msg = [_]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; + const cfg = RsConfig{ + .data_shards = 4, + .parity_shards = 2, + .chunk_len = 0, + .bitmap_threshold = 50, + .forward_multiplier = 4, + .disable_bitmap = false, + }; + + var origin = try RsStrategy.newOrigin(gpa, cfg, &msg); + defer origin.deinit(); + + var relay = try RsStrategy.newRelay(gpa, cfg, &origin.preamble); + defer relay.deinit(); + + const peer_outer = "outer"; + const peer_inner = "inner"; + var stats_outer: broadcast_types.PeerSessionStats = .{ .peer_id = peer_outer, .rtt_ms = 200 }; + var stats_inner: broadcast_types.PeerSessionStats = .{ .peer_id = peer_inner, .rtt_ms = 20 }; + try relay.attachPeer(peer_inner, &stats_inner); + try relay.attachPeer(peer_outer, &stats_outer); + + // Ingest from a sender not in `peers` so downstream peer bitmaps stay empty for forwarding. + const upstream = "upstream"; + for (0..4) |i| { + const r = try relay.takeChunk(upstream, .{ .index = @intCast(i) }, origin.chunks[i], null); + try std.testing.expectEqual(broadcast_types.Verdict.accepted, r.verdict); + } + + const outgoing = try relay.pollChunks(); + defer gpa.free(outgoing); + try std.testing.expect(outgoing.len >= 1); + try std.testing.expectEqualStrings(peer_inner, outgoing[0].peer); +} + test "relay takeChunk and decode" { const gpa = std.testing.allocator; const msg = [_]u8{ 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }; diff --git a/src/root.zig b/src/root.zig index 4dab2fb..52a92a3 100644 --- a/src/root.zig +++ b/src/root.zig @@ -31,6 +31,7 @@ pub const layer = struct { pub const rs_encode = @import("layer/rs_encode.zig"); pub const rs_init = @import("layer/rs_init.zig"); pub const rs_strategy = @import("layer/rs_strategy.zig"); + pub const latency_tier = @import("layer/latency_tier.zig"); pub const dedup = @import("layer/dedup.zig"); pub const dedup_registry = @import("layer/dedup_registry.zig"); pub const verify_queue = @import("layer/verify_queue.zig"); @@ -62,6 +63,7 @@ test { _ = layer.verify_queue; _ = layer.verify_workers; _ = layer.ec_scheme; + _ = layer.latency_tier; _ = sim.rs_mesh; _ = sim.gossipsub_transport; _ = sim.gossipsub_protocol;