From a208c1ffe0fa6069c004eb5bc8ff6d1b1c7e1eef Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 13 May 2026 19:45:43 +0100 Subject: [PATCH 1/4] feat: bounded worker pool for multi-upstream polling Replace one detached OS thread per upstream with a fixed-size worker queue and Thread.join(), capping parallel SSZ downloads and thread churn when many nodes are configured. Add poll_max_concurrency (default 16): --poll-concurrency and LEANPOINT_POLL_CONCURRENCY; values above 256 are clamped in the poller. --- src/config.zig | 25 +++- src/poller.zig | 15 ++- src/upstreams.zig | 307 ++++++++++++++-------------------------------- 3 files changed, 126 insertions(+), 221 deletions(-) diff --git a/src/config.zig b/src/config.zig index e454fb7..4f59d0e 100644 --- a/src/config.zig +++ b/src/config.zig @@ -7,6 +7,9 @@ pub const Config = struct { lean_api_path: []const u8, poll_interval_ms: u64, request_timeout_ms: u64, + /// Max concurrent upstream HTTP polls per tick (multi-upstream mode only). + /// Avoids spawning one OS thread per node and limits parallel 16MB SSZ downloads. + poll_max_concurrency: u32, stale_after_ms: u64, static_dir: ?[]const u8, upstreams_config: ?[]const u8, @@ -28,6 +31,7 @@ pub fn load(allocator: std.mem.Allocator) !Config { var lean_api_path = try allocator.dupe(u8, defaults.lean_api_path); var poll_interval_ms: u64 = defaults.poll_interval_ms; var request_timeout_ms: u64 = defaults.request_timeout_ms; + var poll_max_concurrency: u32 = defaults.poll_max_concurrency; var stale_after_ms: u64 = defaults.stale_after_ms; var static_dir: ?[]const u8 = null; var upstreams_config: ?[]const u8 = null; @@ -56,6 +60,10 @@ pub fn load(allocator: std.mem.Allocator) !Config { request_timeout_ms = try parseU64(val); allocator.free(val); } + if (try getEnvOwned(allocator, "LEANPOINT_POLL_CONCURRENCY")) |val| { + poll_max_concurrency = try parseU32(val); + allocator.free(val); + } if (try getEnvOwned(allocator, "LEANPOINT_STALE_MS")) |val| { stale_after_ms = try parseU64(val); allocator.free(val); @@ -94,6 +102,9 @@ pub fn load(allocator: std.mem.Allocator) !Config { } else if (std.mem.eql(u8, arg, "--timeout-ms")) { const value = try needArg(&args, "--timeout-ms"); request_timeout_ms = try parseU64(value); + } else if (std.mem.eql(u8, arg, "--poll-concurrency")) { + const value = try needArg(&args, "--poll-concurrency"); + poll_max_concurrency = try parseU32(value); } else if (std.mem.eql(u8, arg, "--stale-ms")) { const value = try needArg(&args, "--stale-ms"); stale_after_ms = try parseU64(value); @@ -118,6 +129,8 @@ pub fn load(allocator: std.mem.Allocator) !Config { lean_api_path = prefixed; } + if (poll_max_concurrency == 0) poll_max_concurrency = defaults.poll_max_concurrency; + return Config{ .bind_address = bind_address, .bind_port = bind_port, @@ -125,6 +138,7 @@ pub fn load(allocator: std.mem.Allocator) !Config { .lean_api_path = lean_api_path, .poll_interval_ms = poll_interval_ms, .request_timeout_ms = request_timeout_ms, + .poll_max_concurrency = poll_max_concurrency, .stale_after_ms = stale_after_ms, .static_dir = static_dir, .upstreams_config = upstreams_config, @@ -138,6 +152,7 @@ const Defaults = struct { lean_api_path: []const u8 = "/status", poll_interval_ms: u64 = 10_000, request_timeout_ms: u64 = 5_000, + poll_max_concurrency: u32 = 16, stale_after_ms: u64 = 30_000, }; @@ -156,6 +171,10 @@ fn parseU16(value: []const u8) !u16 { return std.fmt.parseInt(u16, value, 10); } +fn parseU32(value: []const u8) !u32 { + return std.fmt.parseInt(u32, value, 10); +} + fn needArg(args: *std.process.ArgIterator, flag: []const u8) ![]const u8 { if (args.next()) |val| return val; std.debug.print("Missing value for {s}\n", .{flag}); @@ -176,7 +195,8 @@ fn printUsage() void { \\ --lean-path LeanEthereum path (default /status) \\ --upstreams-config JSON config file with multiple upstreams \\ --poll-ms Poll interval in milliseconds - \\ --timeout-ms Request timeout in milliseconds + \\ --timeout-ms Request timeout per upstream HTTP call + \\ --poll-concurrency Max parallel upstream polls (multi-upstream; default 16) \\ --stale-ms Stale threshold in milliseconds \\ --static-dir Optional static frontend directory \\ --help Show this help @@ -184,7 +204,8 @@ fn printUsage() void { \\Env vars: \\ LEANPOINT_BIND_ADDR, LEANPOINT_BIND_PORT, LEANPOINT_LEAN_URL, \\ LEANPOINT_LEAN_PATH, LEANPOINT_POLL_MS, LEANPOINT_TIMEOUT_MS, - \\ LEANPOINT_STALE_MS, LEANPOINT_STATIC_DIR, LEANPOINT_UPSTREAMS_CONFIG + \\ LEANPOINT_POLL_CONCURRENCY, LEANPOINT_STALE_MS, LEANPOINT_STATIC_DIR, + \\ LEANPOINT_UPSTREAMS_CONFIG \\ \\Multi-upstream mode: \\ When --upstreams-config is specified, leanpoint polls multiple lean diff --git a/src/poller.zig b/src/poller.zig index ca20882..301e111 100644 --- a/src/poller.zig +++ b/src/poller.zig @@ -103,13 +103,17 @@ pub const Poller = struct { } } - /// Poll multiple upstreams with consensus + /// Poll multiple upstreams with consensus (bounded worker pool in upstreams.zig). + /// `self.client` is only passed for API compatibility; workers use their own clients. fn pollMulti(self: *Poller, manager: *upstreams_mod.UpstreamManager, now_ms: i64) !void { - // Poll all upstreams concurrently and get consensus. - // self.client is passed for API compatibility but each upstream spawns its own - // client internally to avoid shared-state hangs. var state_ssz: ?[]u8 = null; - const consensus_slots = manager.pollUpstreams(&self.client, now_ms, self.config.request_timeout_ms, &state_ssz); + const consensus_slots = manager.pollUpstreams( + &self.client, + now_ms, + self.config.request_timeout_ms, + self.config.poll_max_concurrency, + &state_ssz, + ); if (consensus_slots) |slots| { const latency_ms: u64 = 0; // Latency not tracked in multi-upstream mode @@ -140,6 +144,7 @@ test "poller initialization" { .lean_api_path = try std.testing.allocator.dupe(u8, "/status"), .poll_interval_ms = 10_000, .request_timeout_ms = 5_000, + .poll_max_concurrency = 16, .stale_after_ms = 30_000, .static_dir = null, .upstreams_config = null, diff --git a/src/upstreams.zig b/src/upstreams.zig index 5977366..65705f4 100644 --- a/src/upstreams.zig +++ b/src/upstreams.zig @@ -106,133 +106,86 @@ pub const UpstreamManager = struct { const PollResult = struct { index: usize, slots: ?lean_api.Slots, - error_msg: ?[]const u8, + error_msg: ?[]u8, state_ssz: ?[]u8, is_aggregator: ?bool, head_slot: ?u64, }; - // --------------------------------------------------------------------------- - // Concurrent per-upstream polling context. - // - // Zig 0.14's std.http.Client does not expose connect_timeout / read_timeout, - // so a single hung TCP connection would stall the entire sequential poll loop - // forever. The fix: spawn one thread per upstream and wait for all of them - // behind a hard deadline (request_timeout_ms). If an upstream doesn't answer - // in time its thread is detached; it will free its own memory when it - // eventually returns (reference-counted via an atomic counter). - // --------------------------------------------------------------------------- - - /// Heap-allocated context shared between the spawner and the worker thread. - /// Reference-counted (starts at 2: one for the spawner, one for the thread). - /// Freed automatically when the last holder calls release(). - const PollCtx = struct { + /// Shared work queue for bounded-concurrency poll workers. + const PollWork = struct { allocator: std.mem.Allocator, - index: usize, - // owned copies of the target strings so the thread stays safe even after - // a timeout when the spawner has already moved on. - base_url: []u8, - path: []u8, - name: []u8, - // Socket-level timeout applied to the HTTP request inside the worker. - // Bounds the worker's lifetime so detached threads clean up their - // sockets within a predictable window (rather than hanging for the - // kernel's TCP retransmit window on unresponsive peers). + targets: []const PollTarget, + results: []PollResult, + next: std.atomic.Value(usize), timeout_ms: u64, - // written by the thread, read by the spawner after deadline - done: std.atomic.Value(bool), - slots: ?lean_api.Slots, - error_msg: ?[]u8, - state_ssz: ?[]u8, - is_aggregator: ?bool, - head_slot: ?u64, - ref_count: std.atomic.Value(u32), - - fn create( - allocator: std.mem.Allocator, - index: usize, - name: []const u8, - base_url: []const u8, - path: []const u8, - timeout_ms: u64, - ) !*PollCtx { - const ctx = try allocator.create(PollCtx); - ctx.* = .{ - .allocator = allocator, - .index = index, - .name = try allocator.dupe(u8, name), - .base_url = try allocator.dupe(u8, base_url), - .path = try allocator.dupe(u8, path), - .timeout_ms = timeout_ms, - .done = std.atomic.Value(bool).init(false), - .slots = null, - .error_msg = null, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - .ref_count = std.atomic.Value(u32).init(2), - }; - return ctx; - } - - /// Drop one reference; destroy when both holders have released. - fn release(self: *PollCtx) void { - if (self.ref_count.fetchSub(1, .acq_rel) == 1) { - self.allocator.free(self.name); - self.allocator.free(self.base_url); - self.allocator.free(self.path); - if (self.error_msg) |m| self.allocator.free(m); - if (self.state_ssz) |s| self.allocator.free(s); - self.allocator.destroy(self); - } - } }; - /// Worker thread: creates its own HTTP client, fetches slots, writes result, - /// then releases its reference to the context. - fn pollUpstreamThread(ctx: *PollCtx) void { - defer ctx.release(); + fn pollUpstreamWorker(work: *PollWork) void { + while (true) { + const wi = work.next.fetchAdd(1, .monotonic); + if (wi >= work.targets.len) break; + work.results[wi] = pollOneUpstream(work.allocator, work.targets[wi], work.timeout_ms); + } + } - var client = std.http.Client{ .allocator = ctx.allocator }; + /// One upstream: own HTTP client (no cross-thread sharing), socket timeouts in lean_api. + fn pollOneUpstream(allocator: std.mem.Allocator, target: PollTarget, timeout_ms: u64) PollResult { + var client = std.http.Client{ .allocator = allocator }; defer client.deinit(); var state_ssz: ?[]u8 = null; const slots = lean_api.fetchSlots( - ctx.allocator, + allocator, &client, - ctx.base_url, - ctx.path, + target.base_url, + target.path, &state_ssz, - ctx.timeout_ms, + timeout_ms, ) catch |err| { - ctx.error_msg = std.fmt.allocPrint(ctx.allocator, "{s}", .{@errorName(err)}) catch null; - ctx.done.store(true, .release); - return; + const msg = std.fmt.allocPrint(allocator, "{s}", .{@errorName(err)}) catch null; + if (state_ssz) |s| allocator.free(s); + log.warn("Upstream {s} ({s}) failed: {s}", .{ target.name, target.base_url, @errorName(err) }); + return .{ + .index = target.index, + .slots = null, + .error_msg = msg, + .state_ssz = null, + .is_aggregator = null, + .head_slot = null, + }; }; - // Extra lean HTTP calls share part of the poll budget (avoid 3× full timeout). - const sub_to = @max(2_000, ctx.timeout_ms / 2); - ctx.is_aggregator = lean_api.fetchAggregatorOptional(ctx.allocator, &client, ctx.base_url, sub_to); - ctx.head_slot = lean_api.fetchHeadSlotOptional(ctx.allocator, &client, ctx.base_url, sub_to); - - ctx.slots = slots; - ctx.state_ssz = state_ssz; - ctx.done.store(true, .release); + const sub_to = @max(2_000, timeout_ms / 2); + const is_aggregator = lean_api.fetchAggregatorOptional(allocator, &client, target.base_url, sub_to); + const head_slot = lean_api.fetchHeadSlotOptional(allocator, &client, target.base_url, sub_to); + + log.debug("Upstream {s}: justified={d}, finalized={d}", .{ + target.name, slots.justified_slot, slots.finalized_slot, + }); + + return .{ + .index = target.index, + .slots = slots, + .error_msg = null, + .state_ssz = state_ssz, + .is_aggregator = is_aggregator, + .head_slot = head_slot, + }; } - /// Poll all upstreams concurrently and return consensus slots if 50%+ agree. + /// Poll upstreams with a bounded worker pool, then require 50%+ agreement on slots. /// - /// Each upstream is polled in its own OS thread. After spawning all threads - /// the function waits up to timeout_ms for them all to finish. Any thread - /// still running at the deadline is abandoned (detached); its PollCtx is - /// reference-counted so it cleans up itself when it eventually completes. + /// Each worker pulls the next upstream from a queue (own `std.http.Client` per poll). + /// This caps OS threads and parallel 16MB SSZ downloads when many nodes are configured, + /// while `lean_api` socket timeouts still bound each hung TCP connection. pub fn pollUpstreams( self: *UpstreamManager, - // client parameter kept for API compatibility but is no longer used here; - // each thread creates its own client to avoid shared-state data races. + // Kept for API compatibility; each worker creates its own client. _: *std.http.Client, now_ms: i64, timeout_ms: u64, + max_concurrency: u32, out_state_ssz: *?[]u8, ) ?lean_api.Slots { // Step 1: snapshot upstreams (brief lock) @@ -257,127 +210,53 @@ pub const UpstreamManager = struct { if (targets.items.len == 0) return null; - // Step 2: spawn one thread per upstream - var ctxs = std.ArrayList(*PollCtx).init(self.allocator); - defer { - for (ctxs.items) |ctx| ctx.release(); // spawner releases its ref - ctxs.deinit(); - } - - for (targets.items) |target| { - const ctx = PollCtx.create( - self.allocator, - target.index, - target.name, - target.base_url, - target.path, - timeout_ms, - ) catch |err| { - log.warn("Failed to allocate poll context for {s}: {s}", .{ target.name, @errorName(err) }); - continue; - }; - - const thread = std.Thread.spawn(.{}, pollUpstreamThread, .{ctx}) catch |err| { - log.warn("Failed to spawn poll thread for {s}: {s}", .{ target.name, @errorName(err) }); - ctx.release(); // thread never ran — release thread's ref too - ctx.release(); // release spawner's ref - continue; - }; - thread.detach(); - - ctxs.append(ctx) catch { - // append failed; ctx's thread is already running and holds its own ref, - // so just release the spawner's ref. - ctx.release(); - continue; - }; + const n = targets.items.len; + const max_w = @min(max_concurrency, 256); + const cap: usize = @max(1, @min(@as(usize, @intCast(max_w)), n)); + if (cap < n) { + log.debug("Polling {d} upstreams with concurrency {d} (≤{d} OS threads this tick)", .{ n, cap, cap }); } - if (ctxs.items.len == 0) return null; - - // Step 3: wait for all threads behind a hard deadline - const deadline_ms = now_ms + @as(i64, @intCast(timeout_ms)); - while (std.time.milliTimestamp() < deadline_ms) { - var all_done = true; - for (ctxs.items) |ctx| { - if (!ctx.done.load(.acquire)) { - all_done = false; - break; - } - } - if (all_done) break; - std.time.sleep(5 * std.time.ns_per_ms); - } - - // Log any upstreams that timed out - for (ctxs.items) |ctx| { - if (!ctx.done.load(.acquire)) { - log.warn("Upstream {s} ({s}) timed out after {d}ms — detaching thread", .{ - ctx.name, ctx.base_url, timeout_ms, - }); - } - } - - // Step 4: collect results from completed threads and update upstream state - var results = std.ArrayList(PollResult).init(self.allocator); + const results = self.allocator.alloc(PollResult, n) catch { + log.warn("Failed to allocate poll result buffer", .{}); + return null; + }; defer { - for (results.items) |result| { + for (results) |result| { if (result.error_msg) |msg| self.allocator.free(msg); if (result.state_ssz) |blob| self.allocator.free(blob); } - results.deinit(); + self.allocator.free(results); } - for (ctxs.items) |ctx| { - if (!ctx.done.load(.acquire)) { - // Timed out — record as an error so the upstream shows as failing - const error_msg = self.allocator.dupe(u8, "timeout") catch null; - results.append(PollResult{ - .index = ctx.index, - .slots = null, - .error_msg = error_msg, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - }) catch continue; - continue; - } + var work = PollWork{ + .allocator = self.allocator, + .targets = targets.items, + .results = results, + .next = std.atomic.Value(usize).init(0), + .timeout_ms = timeout_ms, + }; - if (ctx.slots) |slots| { - log.debug("Upstream {s}: justified={d}, finalized={d}", .{ - ctx.name, slots.justified_slot, slots.finalized_slot, - }); - // Transfer ownership of SSZ blob out of the ctx before ctx.release() - // is called by the defer above. We null it in ctx to avoid double-free. - const ssz = ctx.state_ssz; - ctx.state_ssz = null; - results.append(PollResult{ - .index = ctx.index, - .slots = slots, - .error_msg = null, - .state_ssz = ssz, - .is_aggregator = ctx.is_aggregator, - .head_slot = ctx.head_slot, - }) catch continue; - } else { - const err_copy = if (ctx.error_msg) |m| self.allocator.dupe(u8, m) catch null else null; - log.warn("Upstream {s} ({s}) failed: {s}", .{ - ctx.name, - ctx.base_url, - ctx.error_msg orelse "unknown", - }); - results.append(PollResult{ - .index = ctx.index, - .slots = null, - .error_msg = err_copy, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - }) catch continue; - } + const threads = self.allocator.alloc(std.Thread, cap) catch { + log.warn("Failed to allocate thread handles for upstream poll", .{}); + return null; + }; + defer self.allocator.free(threads); + + var spawned: usize = 0; + + for (0..cap) |_| { + threads[spawned] = std.Thread.spawn(.{}, pollUpstreamWorker, .{&work}) catch |err| { + log.warn("Failed to spawn poll worker ({d}/{d}): {s}", .{ spawned, cap, @errorName(err) }); + for (0..spawned) |j| threads[j].join(); + return null; + }; + spawned += 1; } - // Step 5: Update upstream states (brief lock) + for (0..spawned) |j| threads[j].join(); + + // Step 2: Update upstream states (brief lock) var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator); defer slot_counts.deinit(); @@ -387,7 +266,7 @@ pub const UpstreamManager = struct { self.mutex.lock(); defer self.mutex.unlock(); - for (results.items, 0..) |*result, i| { + for (results, 0..) |*result, i| { if (result.index >= self.upstreams.items.len) continue; var upstream = &self.upstreams.items[result.index]; @@ -413,7 +292,7 @@ pub const UpstreamManager = struct { upstream.last_error = result.error_msg; upstream.is_aggregator = null; upstream.head_slot = null; - results.items[i].error_msg = null; // ownership transferred + results[i].error_msg = null; // ownership transferred } } } @@ -433,12 +312,12 @@ pub const UpstreamManager = struct { const justified_slot: u64 = @truncate(slot_key >> 64); const finalized_slot: u64 = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF); - for (results.items, 0..) |*res, i| { + for (results, 0..) |*res, i| { if (res.slots) |s| { if (s.justified_slot == justified_slot and s.finalized_slot == finalized_slot) { if (res.state_ssz) |blob| { out_state_ssz.* = blob; - results.items[i].state_ssz = null; + results[i].state_ssz = null; break; } } From 650bed5c90041f47b30124cbaafe50df3a5bb2c5 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 13 May 2026 20:26:57 +0100 Subject: [PATCH 2/4] fix(upstreams): apply poll results after partial thread spawn failure If spawning the Nth worker failed, we joined already-running workers but returned null before Step 2, discarding completed HTTP poll results and leaving all upstreams at initial error_count/last_error. Break out of the spawn loop instead, join all started workers once, then run Step 2 when at least one worker started. Only return null when no worker could be spawned (results buffer uninitialized). --- src/upstreams.zig | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/upstreams.zig b/src/upstreams.zig index 65705f4..eec63a2 100644 --- a/src/upstreams.zig +++ b/src/upstreams.zig @@ -248,14 +248,20 @@ pub const UpstreamManager = struct { for (0..cap) |_| { threads[spawned] = std.Thread.spawn(.{}, pollUpstreamWorker, .{&work}) catch |err| { log.warn("Failed to spawn poll worker ({d}/{d}): {s}", .{ spawned, cap, @errorName(err) }); - for (0..spawned) |j| threads[j].join(); - return null; + // Do not return here: workers already running drain the full work queue; after + // join their `results` are valid. Proceeding to Step 2 applies per-upstream status. + break; }; spawned += 1; } for (0..spawned) |j| threads[j].join(); + if (spawned == 0) { + log.warn("No poll workers could be started (spawn failed for every slot)", .{}); + return null; + } + // Step 2: Update upstream states (brief lock) var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator); defer slot_counts.deinit(); From 3f083074a82941c64d73c66729ea0cc936290d3e Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 13 May 2026 20:40:39 +0100 Subject: [PATCH 3/4] fix: deadline-bounded dispatcher so a hung connect cannot wedge polling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous worker-pool change used Thread.join() to wait for workers, but std.http.Client.open does a synchronous connect() and lean_api's SO_RCVTIMEO/SO_SNDTIMEO only bound read/write — not connect. A single blackholed peer hangs a worker on connect forever, so join() blocks forever and the poll loop never advances (live dashboard stuck on healthy=false, error_count=0, last_error=null for every node). Restore the original detach-on-deadline shape and layer bounded concurrency on top: - Refcounted PollCtx (spawner + worker hold a ref each); slow workers safely outlive the dispatcher. - SlotState (mutex + condvar + in_flight counter) caps parallel polls and signals the dispatcher when a slot frees up or all workers drain. - Per-tick deadline = ceil(N/cap) * timeout_ms + headroom; workers not done by then are abandoned (still detached, ctx survives). - Worker uses release store on done; dispatcher acquires before reading any output field. SSZ ownership transfer for the consensus winner unchanged. --- src/upstreams.zig | 338 +++++++++++++++++++++++++++++++--------------- 1 file changed, 228 insertions(+), 110 deletions(-) diff --git a/src/upstreams.zig b/src/upstreams.zig index eec63a2..6e9edd6 100644 --- a/src/upstreams.zig +++ b/src/upstreams.zig @@ -102,83 +102,153 @@ pub const UpstreamManager = struct { path: []const u8, }; - /// Result of polling a single upstream - const PollResult = struct { - index: usize, - slots: ?lean_api.Slots, - error_msg: ?[]u8, - state_ssz: ?[]u8, - is_aggregator: ?bool, - head_slot: ?u64, + // --------------------------------------------------------------------------- + // Bounded-concurrency poll dispatcher with deadline-based abandonment. + // + // Why this shape: + // * std.http.Client.open does a synchronous connect(); SO_RCVTIMEO/SO_SNDTIMEO + // in lean_api only bound the read/write phase, not connect itself. So a + // blackholed peer can hang a worker thread indefinitely. + // * Therefore the dispatcher must NOT join workers (one stuck connect would + // wedge the entire poll loop). It detaches them and waits on a deadline. + // * Refcounted PollCtx lets a slow worker safely outlive the dispatcher + // (it frees its own state when the spawner has already moved on). + // * A condvar-guarded in-flight counter caps OS thread / SSZ-download + // parallelism even when N upstreams >> the configured concurrency. + // --------------------------------------------------------------------------- + + /// Slot accounting: caps concurrent in-flight workers and signals the + /// dispatcher when one finishes (so the next can be spawned, and the final + /// drain wait can wake up early when all workers have completed). + const SlotState = struct { + mutex: std.Thread.Mutex = .{}, + cond: std.Thread.Condition = .{}, + in_flight: usize = 0, + cap: usize, }; - /// Shared work queue for bounded-concurrency poll workers. - const PollWork = struct { + /// Heap context shared between dispatcher and worker (refcount starts at 2). + /// Worker writes outputs then `done.store(true, .release)`; dispatcher only + /// reads outputs after it has observed done==true (via `.acquire` load). + const PollCtx = struct { allocator: std.mem.Allocator, - targets: []const PollTarget, - results: []PollResult, - next: std.atomic.Value(usize), + slot: *SlotState, + target_index: usize, + // Owned copies of target strings — safe past the dispatcher's lifetime. + name: []u8, + base_url: []u8, + path: []u8, timeout_ms: u64, - }; + done: std.atomic.Value(bool), + slots: ?lean_api.Slots = null, + error_msg: ?[]u8 = null, + state_ssz: ?[]u8 = null, + is_aggregator: ?bool = null, + head_slot: ?u64 = null, + refs: std.atomic.Value(u32), + + fn create( + allocator: std.mem.Allocator, + slot: *SlotState, + target: PollTarget, + timeout_ms: u64, + ) !*PollCtx { + const ctx = try allocator.create(PollCtx); + errdefer allocator.destroy(ctx); + const name_copy = try allocator.dupe(u8, target.name); + errdefer allocator.free(name_copy); + const url_copy = try allocator.dupe(u8, target.base_url); + errdefer allocator.free(url_copy); + const path_copy = try allocator.dupe(u8, target.path); + ctx.* = .{ + .allocator = allocator, + .slot = slot, + .target_index = target.index, + .name = name_copy, + .base_url = url_copy, + .path = path_copy, + .timeout_ms = timeout_ms, + .done = std.atomic.Value(bool).init(false), + .refs = std.atomic.Value(u32).init(2), + }; + return ctx; + } - fn pollUpstreamWorker(work: *PollWork) void { - while (true) { - const wi = work.next.fetchAdd(1, .monotonic); - if (wi >= work.targets.len) break; - work.results[wi] = pollOneUpstream(work.allocator, work.targets[wi], work.timeout_ms); + fn release(self: *PollCtx) void { + if (self.refs.fetchSub(1, .acq_rel) == 1) { + self.allocator.free(self.name); + self.allocator.free(self.base_url); + self.allocator.free(self.path); + if (self.error_msg) |m| self.allocator.free(m); + if (self.state_ssz) |s| self.allocator.free(s); + self.allocator.destroy(self); + } } + }; + + /// Decrement the in-flight count and wake any dispatcher waiting on a slot + /// or on the final drain. Always called once per worker (defer in worker). + fn slotRelease(slot: *SlotState) void { + slot.mutex.lock(); + defer slot.mutex.unlock(); + if (slot.in_flight > 0) slot.in_flight -= 1; + slot.cond.broadcast(); } - /// One upstream: own HTTP client (no cross-thread sharing), socket timeouts in lean_api. - fn pollOneUpstream(allocator: std.mem.Allocator, target: PollTarget, timeout_ms: u64) PollResult { - var client = std.http.Client{ .allocator = allocator }; + /// Worker thread: own HTTP client, write outputs, signal completion. If the + /// peer hangs on connect/read this thread may live well past the dispatcher's + /// deadline — its PollCtx ref keeps everything alive until cleanup. + fn workerThread(ctx: *PollCtx) void { + defer ctx.release(); + defer slotRelease(ctx.slot); + + var client = std.http.Client{ .allocator = ctx.allocator }; defer client.deinit(); var state_ssz: ?[]u8 = null; - const slots = lean_api.fetchSlots( - allocator, + if (lean_api.fetchSlots( + ctx.allocator, &client, - target.base_url, - target.path, + ctx.base_url, + ctx.path, &state_ssz, - timeout_ms, - ) catch |err| { - const msg = std.fmt.allocPrint(allocator, "{s}", .{@errorName(err)}) catch null; - if (state_ssz) |s| allocator.free(s); - log.warn("Upstream {s} ({s}) failed: {s}", .{ target.name, target.base_url, @errorName(err) }); - return .{ - .index = target.index, - .slots = null, - .error_msg = msg, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - }; - }; + ctx.timeout_ms, + )) |slots| { + const sub_to = @max(2_000, ctx.timeout_ms / 2); + ctx.is_aggregator = lean_api.fetchAggregatorOptional(ctx.allocator, &client, ctx.base_url, sub_to); + ctx.head_slot = lean_api.fetchHeadSlotOptional(ctx.allocator, &client, ctx.base_url, sub_to); + ctx.slots = slots; + ctx.state_ssz = state_ssz; + log.debug("Upstream {s}: justified={d}, finalized={d}", .{ + ctx.name, slots.justified_slot, slots.finalized_slot, + }); + } else |err| { + if (state_ssz) |s| ctx.allocator.free(s); + ctx.error_msg = std.fmt.allocPrint(ctx.allocator, "{s}", .{@errorName(err)}) catch null; + log.warn("Upstream {s} ({s}) failed: {s}", .{ ctx.name, ctx.base_url, @errorName(err) }); + } - const sub_to = @max(2_000, timeout_ms / 2); - const is_aggregator = lean_api.fetchAggregatorOptional(allocator, &client, target.base_url, sub_to); - const head_slot = lean_api.fetchHeadSlotOptional(allocator, &client, target.base_url, sub_to); - - log.debug("Upstream {s}: justified={d}, finalized={d}", .{ - target.name, slots.justified_slot, slots.finalized_slot, - }); - - return .{ - .index = target.index, - .slots = slots, - .error_msg = null, - .state_ssz = state_ssz, - .is_aggregator = is_aggregator, - .head_slot = head_slot, - }; + // Publish results to the dispatcher (must come last; release ordering + // pairs with the dispatcher's acquire load before reading any field). + ctx.done.store(true, .release); + } + + /// Wait until cond is signaled or the deadline elapses. Returns true if + /// the deadline has not yet passed (caller can re-check its predicate). + fn waitUntilDeadline(slot: *SlotState, deadline_ms: i64) bool { + const now = std.time.milliTimestamp(); + if (now >= deadline_ms) return false; + const remaining_ms: u64 = @intCast(deadline_ms - now); + // Cap each individual wait so we re-check the predicate periodically + // even if a signal is missed for some reason. + const wait_ms: u64 = @min(remaining_ms, @as(u64, 100)); + const wait_ns: u64 = wait_ms * std.time.ns_per_ms; + slot.cond.timedWait(&slot.mutex, wait_ns) catch {}; + return std.time.milliTimestamp() < deadline_ms; } - /// Poll upstreams with a bounded worker pool, then require 50%+ agreement on slots. - /// - /// Each worker pulls the next upstream from a queue (own `std.http.Client` per poll). - /// This caps OS threads and parallel 16MB SSZ downloads when many nodes are configured, - /// while `lean_api` socket timeouts still bound each hung TCP connection. + /// Poll upstreams with bounded concurrency + a hard per-tick deadline. + /// Returns consensus slots if 50%+ of responding upstreams agree. pub fn pollUpstreams( self: *UpstreamManager, // Kept for API compatibility; each worker creates its own client. @@ -213,70 +283,107 @@ pub const UpstreamManager = struct { const n = targets.items.len; const max_w = @min(max_concurrency, 256); const cap: usize = @max(1, @min(@as(usize, @intCast(max_w)), n)); + + // Per-tick deadline: enough time to drain all batches, plus a small + // headroom so a worker that finishes right at the limit still counts. + const batches: u64 = (@as(u64, n) + @as(u64, cap) - 1) / @as(u64, cap); + const round_ms: u64 = batches * timeout_ms + timeout_ms / 2; + const deadline_ms: i64 = now_ms + @as(i64, @intCast(round_ms)); + if (cap < n) { - log.debug("Polling {d} upstreams with concurrency {d} (≤{d} OS threads this tick)", .{ n, cap, cap }); + log.debug( + "Polling {d} upstreams with concurrency {d} (≤{d} OS threads, deadline ≤{d}ms)", + .{ n, cap, cap, round_ms }, + ); } - const results = self.allocator.alloc(PollResult, n) catch { - log.warn("Failed to allocate poll result buffer", .{}); - return null; - }; + var slot = SlotState{ .cap = cap }; + + var ctxs = std.ArrayList(*PollCtx).init(self.allocator); defer { - for (results) |result| { - if (result.error_msg) |msg| self.allocator.free(msg); - if (result.state_ssz) |blob| self.allocator.free(blob); - } - self.allocator.free(results); + for (ctxs.items) |ctx| ctx.release(); + ctxs.deinit(); } - var work = PollWork{ - .allocator = self.allocator, - .targets = targets.items, - .results = results, - .next = std.atomic.Value(usize).init(0), - .timeout_ms = timeout_ms, - }; + // Step 2: dispatch — wait for a free slot (or the deadline) before each spawn + for (targets.items) |target| { + slot.mutex.lock(); + var slot_acquired = false; + while (slot.in_flight >= slot.cap) { + if (!waitUntilDeadline(&slot, deadline_ms)) break; + } + if (slot.in_flight < slot.cap) { + slot.in_flight += 1; + slot_acquired = true; + } + slot.mutex.unlock(); - const threads = self.allocator.alloc(std.Thread, cap) catch { - log.warn("Failed to allocate thread handles for upstream poll", .{}); - return null; - }; - defer self.allocator.free(threads); + if (!slot_acquired) { + log.warn( + "Dispatch deadline reached before {s} could start; remaining upstreams skipped this tick", + .{target.name}, + ); + break; + } - var spawned: usize = 0; + const ctx = PollCtx.create(self.allocator, &slot, target, timeout_ms) catch |err| { + log.warn("Failed to allocate poll context for {s}: {s}", .{ target.name, @errorName(err) }); + slotRelease(&slot); + continue; + }; - for (0..cap) |_| { - threads[spawned] = std.Thread.spawn(.{}, pollUpstreamWorker, .{&work}) catch |err| { - log.warn("Failed to spawn poll worker ({d}/{d}): {s}", .{ spawned, cap, @errorName(err) }); - // Do not return here: workers already running drain the full work queue; after - // join their `results` are valid. Proceeding to Step 2 applies per-upstream status. - break; + const t = std.Thread.spawn(.{}, workerThread, .{ctx}) catch |err| { + log.warn("Failed to spawn poll thread for {s}: {s}", .{ target.name, @errorName(err) }); + slotRelease(&slot); + ctx.release(); // worker ref (worker never started) + ctx.release(); // dispatcher ref + continue; }; - spawned += 1; - } + t.detach(); - for (0..spawned) |j| threads[j].join(); + ctxs.append(ctx) catch { + // Worker is already running and holds its own ref — just drop ours. + ctx.release(); + continue; + }; + } - if (spawned == 0) { - log.warn("No poll workers could be started (spawn failed for every slot)", .{}); - return null; + // Step 3: drain — wait for all in-flight workers to finish, or the deadline + slot.mutex.lock(); + while (slot.in_flight > 0) { + if (!waitUntilDeadline(&slot, deadline_ms)) break; } + slot.mutex.unlock(); - // Step 2: Update upstream states (brief lock) + // Step 4: collect results and update per-upstream state under our lock. var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator); defer slot_counts.deinit(); var successful_polls: u32 = 0; + var timed_out: u32 = 0; { self.mutex.lock(); defer self.mutex.unlock(); - for (results, 0..) |*result, i| { - if (result.index >= self.upstreams.items.len) continue; - var upstream = &self.upstreams.items[result.index]; + for (ctxs.items) |ctx| { + if (ctx.target_index >= self.upstreams.items.len) continue; + var upstream = &self.upstreams.items[ctx.target_index]; + + // Acquire ordering: pairs with the worker's release store on `done`. + const finished = ctx.done.load(.acquire); - if (result.slots) |slots| { + if (!finished) { + timed_out += 1; + upstream.error_count += 1; + if (upstream.last_error) |old_err| self.allocator.free(old_err); + upstream.last_error = self.allocator.dupe(u8, "timeout (worker abandoned)") catch null; + upstream.is_aggregator = null; + upstream.head_slot = null; + continue; + } + + if (ctx.slots) |slots| { if (upstream.last_error) |old_err| { self.allocator.free(old_err); upstream.last_error = null; @@ -284,8 +391,8 @@ pub const UpstreamManager = struct { upstream.error_count = 0; upstream.last_slots = slots; upstream.last_success_ms = now_ms; - upstream.is_aggregator = result.is_aggregator; - upstream.head_slot = result.head_slot; + upstream.is_aggregator = ctx.is_aggregator; + upstream.head_slot = ctx.head_slot; const slot_key: u128 = (@as(u128, slots.justified_slot) << 64) | @as(u128, slots.finalized_slot); const count = slot_counts.get(slot_key) orelse 0; @@ -295,15 +402,22 @@ pub const UpstreamManager = struct { } else { upstream.error_count += 1; if (upstream.last_error) |old_err| self.allocator.free(old_err); - upstream.last_error = result.error_msg; + if (ctx.error_msg) |msg| { + upstream.last_error = self.allocator.dupe(u8, msg) catch null; + } else { + upstream.last_error = self.allocator.dupe(u8, "unknown error") catch null; + } upstream.is_aggregator = null; upstream.head_slot = null; - results[i].error_msg = null; // ownership transferred } } } - // Step 6: consensus (no lock needed) + if (timed_out > 0) { + log.warn("{d}/{d} upstream polls did not finish within deadline (continuing)", .{ timed_out, ctxs.items.len }); + } + + // Step 5: consensus (no lock needed) if (successful_polls == 0) { log.warn("No upstreams responded successfully", .{}); return null; @@ -318,12 +432,16 @@ pub const UpstreamManager = struct { const justified_slot: u64 = @truncate(slot_key >> 64); const finalized_slot: u64 = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF); - for (results, 0..) |*res, i| { - if (res.slots) |s| { + // Hand out one matching SSZ blob. Move ownership out of the + // ctx so PollCtx.release() doesn't free it; remaining ctxs' + // ssz blobs are freed normally on release. + for (ctxs.items) |ctx| { + if (!ctx.done.load(.acquire)) continue; + if (ctx.slots) |s| { if (s.justified_slot == justified_slot and s.finalized_slot == finalized_slot) { - if (res.state_ssz) |blob| { + if (ctx.state_ssz) |blob| { out_state_ssz.* = blob; - results[i].state_ssz = null; + ctx.state_ssz = null; break; } } From 06eb868cde8b6dbfee88cedf76be23bdc23a9dbd Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Wed, 13 May 2026 20:46:31 +0100 Subject: [PATCH 4/4] tune: cap default 64, deadline budgets aux calls + slow workers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After deploying to a 64-node cluster the dispatcher was hitting its deadline at upstream 36/64. Two causes: 1. Worker slot occupancy was up to ~2× timeout_ms (fetchSlots up to timeout_ms, then aggregator + head_slot at timeout_ms/2 each), but the deadline assumed timeout_ms per worker. 2. Default cap of 16 forced 4 batches for the 64-node prod cluster, compounding the under-budgeting. Changes: - Default poll_max_concurrency 16 → 64. With one batch in prod, polls finish within stale_after_ms; SSZ memory peak is no worse than the pre-bounding code. - Aux call timeout: timeout_ms/2 → timeout_ms/4 (min 1s). Healthy workers free their slot in ~1.5× timeout_ms. - Deadline formula: per_worker_max_ms = 2 × timeout_ms; round_ms = batches × per_worker_max_ms + timeout_ms (drain). Dispatch will not starve when a batch is full of slow peers. --- src/config.zig | 4 ++-- src/upstreams.zig | 16 ++++++++++++---- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/src/config.zig b/src/config.zig index 4f59d0e..ab14a3d 100644 --- a/src/config.zig +++ b/src/config.zig @@ -152,7 +152,7 @@ const Defaults = struct { lean_api_path: []const u8 = "/status", poll_interval_ms: u64 = 10_000, request_timeout_ms: u64 = 5_000, - poll_max_concurrency: u32 = 16, + poll_max_concurrency: u32 = 64, stale_after_ms: u64 = 30_000, }; @@ -196,7 +196,7 @@ fn printUsage() void { \\ --upstreams-config JSON config file with multiple upstreams \\ --poll-ms Poll interval in milliseconds \\ --timeout-ms Request timeout per upstream HTTP call - \\ --poll-concurrency Max parallel upstream polls (multi-upstream; default 16) + \\ --poll-concurrency Max parallel upstream polls (multi-upstream; default 64) \\ --stale-ms Stale threshold in milliseconds \\ --static-dir Optional static frontend directory \\ --help Show this help diff --git a/src/upstreams.zig b/src/upstreams.zig index 6e9edd6..2f7a038 100644 --- a/src/upstreams.zig +++ b/src/upstreams.zig @@ -214,7 +214,10 @@ pub const UpstreamManager = struct { &state_ssz, ctx.timeout_ms, )) |slots| { - const sub_to = @max(2_000, ctx.timeout_ms / 2); + // Optional metadata calls share a small slice of the per-poll budget + // (was timeout_ms/2 each; that doubled worker slot occupancy on healthy + // peers and starved the dispatcher with many upstreams + bounded cap). + const sub_to = @max(@as(u64, 1_000), ctx.timeout_ms / 4); ctx.is_aggregator = lean_api.fetchAggregatorOptional(ctx.allocator, &client, ctx.base_url, sub_to); ctx.head_slot = lean_api.fetchHeadSlotOptional(ctx.allocator, &client, ctx.base_url, sub_to); ctx.slots = slots; @@ -284,10 +287,15 @@ pub const UpstreamManager = struct { const max_w = @min(max_concurrency, 256); const cap: usize = @max(1, @min(@as(usize, @intCast(max_w)), n)); - // Per-tick deadline: enough time to drain all batches, plus a small - // headroom so a worker that finishes right at the limit still counts. + // Per-tick deadline. A single worker can occupy its slot for up to + // ~timeout_ms (fetchSlots) + 2 × (timeout_ms / 4) (aux calls) ≈ + // 1.5 × timeout_ms; in the worst case (slow path on each call) closer + // to 2 × timeout_ms. We size the deadline assuming worst case so + // dispatch never starves with a bounded cap and slow peers, plus one + // extra timeout_ms for the final drain. const batches: u64 = (@as(u64, n) + @as(u64, cap) - 1) / @as(u64, cap); - const round_ms: u64 = batches * timeout_ms + timeout_ms / 2; + const per_worker_max_ms: u64 = timeout_ms * 2; + const round_ms: u64 = batches * per_worker_max_ms + timeout_ms; const deadline_ms: i64 = now_ms + @as(i64, @intCast(round_ms)); if (cap < n) {