diff --git a/src/config.zig b/src/config.zig index e454fb7..ab14a3d 100644 --- a/src/config.zig +++ b/src/config.zig @@ -7,6 +7,9 @@ pub const Config = struct { lean_api_path: []const u8, poll_interval_ms: u64, request_timeout_ms: u64, + /// Max concurrent upstream HTTP polls per tick (multi-upstream mode only). + /// Avoids spawning one OS thread per node and limits parallel 16MB SSZ downloads. + poll_max_concurrency: u32, stale_after_ms: u64, static_dir: ?[]const u8, upstreams_config: ?[]const u8, @@ -28,6 +31,7 @@ pub fn load(allocator: std.mem.Allocator) !Config { var lean_api_path = try allocator.dupe(u8, defaults.lean_api_path); var poll_interval_ms: u64 = defaults.poll_interval_ms; var request_timeout_ms: u64 = defaults.request_timeout_ms; + var poll_max_concurrency: u32 = defaults.poll_max_concurrency; var stale_after_ms: u64 = defaults.stale_after_ms; var static_dir: ?[]const u8 = null; var upstreams_config: ?[]const u8 = null; @@ -56,6 +60,10 @@ pub fn load(allocator: std.mem.Allocator) !Config { request_timeout_ms = try parseU64(val); allocator.free(val); } + if (try getEnvOwned(allocator, "LEANPOINT_POLL_CONCURRENCY")) |val| { + poll_max_concurrency = try parseU32(val); + allocator.free(val); + } if (try getEnvOwned(allocator, "LEANPOINT_STALE_MS")) |val| { stale_after_ms = try parseU64(val); allocator.free(val); @@ -94,6 +102,9 @@ pub fn load(allocator: std.mem.Allocator) !Config { } else if (std.mem.eql(u8, arg, "--timeout-ms")) { const value = try needArg(&args, "--timeout-ms"); request_timeout_ms = try parseU64(value); + } else if (std.mem.eql(u8, arg, "--poll-concurrency")) { + const value = try needArg(&args, "--poll-concurrency"); + poll_max_concurrency = try parseU32(value); } else if (std.mem.eql(u8, arg, "--stale-ms")) { const value = try needArg(&args, "--stale-ms"); stale_after_ms = try parseU64(value); @@ -118,6 +129,8 @@ pub fn load(allocator: std.mem.Allocator) !Config { lean_api_path = prefixed; } + if (poll_max_concurrency == 0) poll_max_concurrency = defaults.poll_max_concurrency; + return Config{ .bind_address = bind_address, .bind_port = bind_port, @@ -125,6 +138,7 @@ pub fn load(allocator: std.mem.Allocator) !Config { .lean_api_path = lean_api_path, .poll_interval_ms = poll_interval_ms, .request_timeout_ms = request_timeout_ms, + .poll_max_concurrency = poll_max_concurrency, .stale_after_ms = stale_after_ms, .static_dir = static_dir, .upstreams_config = upstreams_config, @@ -138,6 +152,7 @@ const Defaults = struct { lean_api_path: []const u8 = "/status", poll_interval_ms: u64 = 10_000, request_timeout_ms: u64 = 5_000, + poll_max_concurrency: u32 = 64, stale_after_ms: u64 = 30_000, }; @@ -156,6 +171,10 @@ fn parseU16(value: []const u8) !u16 { return std.fmt.parseInt(u16, value, 10); } +fn parseU32(value: []const u8) !u32 { + return std.fmt.parseInt(u32, value, 10); +} + fn needArg(args: *std.process.ArgIterator, flag: []const u8) ![]const u8 { if (args.next()) |val| return val; std.debug.print("Missing value for {s}\n", .{flag}); @@ -176,7 +195,8 @@ fn printUsage() void { \\ --lean-path LeanEthereum path (default /status) \\ --upstreams-config JSON config file with multiple upstreams \\ --poll-ms Poll interval in milliseconds - \\ --timeout-ms Request timeout in milliseconds + \\ --timeout-ms Request timeout per upstream HTTP call + \\ --poll-concurrency Max parallel upstream polls (multi-upstream; default 64) \\ --stale-ms Stale threshold in milliseconds \\ --static-dir Optional static frontend directory \\ --help Show this help @@ -184,7 +204,8 @@ fn printUsage() void { \\Env vars: \\ LEANPOINT_BIND_ADDR, LEANPOINT_BIND_PORT, LEANPOINT_LEAN_URL, \\ LEANPOINT_LEAN_PATH, LEANPOINT_POLL_MS, LEANPOINT_TIMEOUT_MS, - \\ LEANPOINT_STALE_MS, LEANPOINT_STATIC_DIR, LEANPOINT_UPSTREAMS_CONFIG + \\ LEANPOINT_POLL_CONCURRENCY, LEANPOINT_STALE_MS, LEANPOINT_STATIC_DIR, + \\ LEANPOINT_UPSTREAMS_CONFIG \\ \\Multi-upstream mode: \\ When --upstreams-config is specified, leanpoint polls multiple lean diff --git a/src/poller.zig b/src/poller.zig index ca20882..301e111 100644 --- a/src/poller.zig +++ b/src/poller.zig @@ -103,13 +103,17 @@ pub const Poller = struct { } } - /// Poll multiple upstreams with consensus + /// Poll multiple upstreams with consensus (bounded worker pool in upstreams.zig). + /// `self.client` is only passed for API compatibility; workers use their own clients. fn pollMulti(self: *Poller, manager: *upstreams_mod.UpstreamManager, now_ms: i64) !void { - // Poll all upstreams concurrently and get consensus. - // self.client is passed for API compatibility but each upstream spawns its own - // client internally to avoid shared-state hangs. var state_ssz: ?[]u8 = null; - const consensus_slots = manager.pollUpstreams(&self.client, now_ms, self.config.request_timeout_ms, &state_ssz); + const consensus_slots = manager.pollUpstreams( + &self.client, + now_ms, + self.config.request_timeout_ms, + self.config.poll_max_concurrency, + &state_ssz, + ); if (consensus_slots) |slots| { const latency_ms: u64 = 0; // Latency not tracked in multi-upstream mode @@ -140,6 +144,7 @@ test "poller initialization" { .lean_api_path = try std.testing.allocator.dupe(u8, "/status"), .poll_interval_ms = 10_000, .request_timeout_ms = 5_000, + .poll_max_concurrency = 16, .stale_after_ms = 30_000, .static_dir = null, .upstreams_config = null, diff --git a/src/upstreams.zig b/src/upstreams.zig index 5977366..2f7a038 100644 --- a/src/upstreams.zig +++ b/src/upstreams.zig @@ -102,82 +102,80 @@ pub const UpstreamManager = struct { path: []const u8, }; - /// Result of polling a single upstream - const PollResult = struct { - index: usize, - slots: ?lean_api.Slots, - error_msg: ?[]const u8, - state_ssz: ?[]u8, - is_aggregator: ?bool, - head_slot: ?u64, - }; - // --------------------------------------------------------------------------- - // Concurrent per-upstream polling context. + // Bounded-concurrency poll dispatcher with deadline-based abandonment. // - // Zig 0.14's std.http.Client does not expose connect_timeout / read_timeout, - // so a single hung TCP connection would stall the entire sequential poll loop - // forever. The fix: spawn one thread per upstream and wait for all of them - // behind a hard deadline (request_timeout_ms). If an upstream doesn't answer - // in time its thread is detached; it will free its own memory when it - // eventually returns (reference-counted via an atomic counter). + // Why this shape: + // * std.http.Client.open does a synchronous connect(); SO_RCVTIMEO/SO_SNDTIMEO + // in lean_api only bound the read/write phase, not connect itself. So a + // blackholed peer can hang a worker thread indefinitely. + // * Therefore the dispatcher must NOT join workers (one stuck connect would + // wedge the entire poll loop). It detaches them and waits on a deadline. + // * Refcounted PollCtx lets a slow worker safely outlive the dispatcher + // (it frees its own state when the spawner has already moved on). + // * A condvar-guarded in-flight counter caps OS thread / SSZ-download + // parallelism even when N upstreams >> the configured concurrency. // --------------------------------------------------------------------------- - /// Heap-allocated context shared between the spawner and the worker thread. - /// Reference-counted (starts at 2: one for the spawner, one for the thread). - /// Freed automatically when the last holder calls release(). + /// Slot accounting: caps concurrent in-flight workers and signals the + /// dispatcher when one finishes (so the next can be spawned, and the final + /// drain wait can wake up early when all workers have completed). + const SlotState = struct { + mutex: std.Thread.Mutex = .{}, + cond: std.Thread.Condition = .{}, + in_flight: usize = 0, + cap: usize, + }; + + /// Heap context shared between dispatcher and worker (refcount starts at 2). + /// Worker writes outputs then `done.store(true, .release)`; dispatcher only + /// reads outputs after it has observed done==true (via `.acquire` load). const PollCtx = struct { allocator: std.mem.Allocator, - index: usize, - // owned copies of the target strings so the thread stays safe even after - // a timeout when the spawner has already moved on. + slot: *SlotState, + target_index: usize, + // Owned copies of target strings — safe past the dispatcher's lifetime. + name: []u8, base_url: []u8, path: []u8, - name: []u8, - // Socket-level timeout applied to the HTTP request inside the worker. - // Bounds the worker's lifetime so detached threads clean up their - // sockets within a predictable window (rather than hanging for the - // kernel's TCP retransmit window on unresponsive peers). timeout_ms: u64, - // written by the thread, read by the spawner after deadline done: std.atomic.Value(bool), - slots: ?lean_api.Slots, - error_msg: ?[]u8, - state_ssz: ?[]u8, - is_aggregator: ?bool, - head_slot: ?u64, - ref_count: std.atomic.Value(u32), + slots: ?lean_api.Slots = null, + error_msg: ?[]u8 = null, + state_ssz: ?[]u8 = null, + is_aggregator: ?bool = null, + head_slot: ?u64 = null, + refs: std.atomic.Value(u32), fn create( allocator: std.mem.Allocator, - index: usize, - name: []const u8, - base_url: []const u8, - path: []const u8, + slot: *SlotState, + target: PollTarget, timeout_ms: u64, ) !*PollCtx { const ctx = try allocator.create(PollCtx); + errdefer allocator.destroy(ctx); + const name_copy = try allocator.dupe(u8, target.name); + errdefer allocator.free(name_copy); + const url_copy = try allocator.dupe(u8, target.base_url); + errdefer allocator.free(url_copy); + const path_copy = try allocator.dupe(u8, target.path); ctx.* = .{ .allocator = allocator, - .index = index, - .name = try allocator.dupe(u8, name), - .base_url = try allocator.dupe(u8, base_url), - .path = try allocator.dupe(u8, path), + .slot = slot, + .target_index = target.index, + .name = name_copy, + .base_url = url_copy, + .path = path_copy, .timeout_ms = timeout_ms, .done = std.atomic.Value(bool).init(false), - .slots = null, - .error_msg = null, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - .ref_count = std.atomic.Value(u32).init(2), + .refs = std.atomic.Value(u32).init(2), }; return ctx; } - /// Drop one reference; destroy when both holders have released. fn release(self: *PollCtx) void { - if (self.ref_count.fetchSub(1, .acq_rel) == 1) { + if (self.refs.fetchSub(1, .acq_rel) == 1) { self.allocator.free(self.name); self.allocator.free(self.base_url); self.allocator.free(self.path); @@ -188,51 +186,79 @@ pub const UpstreamManager = struct { } }; - /// Worker thread: creates its own HTTP client, fetches slots, writes result, - /// then releases its reference to the context. - fn pollUpstreamThread(ctx: *PollCtx) void { + /// Decrement the in-flight count and wake any dispatcher waiting on a slot + /// or on the final drain. Always called once per worker (defer in worker). + fn slotRelease(slot: *SlotState) void { + slot.mutex.lock(); + defer slot.mutex.unlock(); + if (slot.in_flight > 0) slot.in_flight -= 1; + slot.cond.broadcast(); + } + + /// Worker thread: own HTTP client, write outputs, signal completion. If the + /// peer hangs on connect/read this thread may live well past the dispatcher's + /// deadline — its PollCtx ref keeps everything alive until cleanup. + fn workerThread(ctx: *PollCtx) void { defer ctx.release(); + defer slotRelease(ctx.slot); var client = std.http.Client{ .allocator = ctx.allocator }; defer client.deinit(); var state_ssz: ?[]u8 = null; - const slots = lean_api.fetchSlots( + if (lean_api.fetchSlots( ctx.allocator, &client, ctx.base_url, ctx.path, &state_ssz, ctx.timeout_ms, - ) catch |err| { + )) |slots| { + // Optional metadata calls share a small slice of the per-poll budget + // (was timeout_ms/2 each; that doubled worker slot occupancy on healthy + // peers and starved the dispatcher with many upstreams + bounded cap). + const sub_to = @max(@as(u64, 1_000), ctx.timeout_ms / 4); + ctx.is_aggregator = lean_api.fetchAggregatorOptional(ctx.allocator, &client, ctx.base_url, sub_to); + ctx.head_slot = lean_api.fetchHeadSlotOptional(ctx.allocator, &client, ctx.base_url, sub_to); + ctx.slots = slots; + ctx.state_ssz = state_ssz; + log.debug("Upstream {s}: justified={d}, finalized={d}", .{ + ctx.name, slots.justified_slot, slots.finalized_slot, + }); + } else |err| { + if (state_ssz) |s| ctx.allocator.free(s); ctx.error_msg = std.fmt.allocPrint(ctx.allocator, "{s}", .{@errorName(err)}) catch null; - ctx.done.store(true, .release); - return; - }; - - // Extra lean HTTP calls share part of the poll budget (avoid 3× full timeout). - const sub_to = @max(2_000, ctx.timeout_ms / 2); - ctx.is_aggregator = lean_api.fetchAggregatorOptional(ctx.allocator, &client, ctx.base_url, sub_to); - ctx.head_slot = lean_api.fetchHeadSlotOptional(ctx.allocator, &client, ctx.base_url, sub_to); + log.warn("Upstream {s} ({s}) failed: {s}", .{ ctx.name, ctx.base_url, @errorName(err) }); + } - ctx.slots = slots; - ctx.state_ssz = state_ssz; + // Publish results to the dispatcher (must come last; release ordering + // pairs with the dispatcher's acquire load before reading any field). ctx.done.store(true, .release); } - /// Poll all upstreams concurrently and return consensus slots if 50%+ agree. - /// - /// Each upstream is polled in its own OS thread. After spawning all threads - /// the function waits up to timeout_ms for them all to finish. Any thread - /// still running at the deadline is abandoned (detached); its PollCtx is - /// reference-counted so it cleans up itself when it eventually completes. + /// Wait until cond is signaled or the deadline elapses. Returns true if + /// the deadline has not yet passed (caller can re-check its predicate). + fn waitUntilDeadline(slot: *SlotState, deadline_ms: i64) bool { + const now = std.time.milliTimestamp(); + if (now >= deadline_ms) return false; + const remaining_ms: u64 = @intCast(deadline_ms - now); + // Cap each individual wait so we re-check the predicate periodically + // even if a signal is missed for some reason. + const wait_ms: u64 = @min(remaining_ms, @as(u64, 100)); + const wait_ns: u64 = wait_ms * std.time.ns_per_ms; + slot.cond.timedWait(&slot.mutex, wait_ns) catch {}; + return std.time.milliTimestamp() < deadline_ms; + } + + /// Poll upstreams with bounded concurrency + a hard per-tick deadline. + /// Returns consensus slots if 50%+ of responding upstreams agree. pub fn pollUpstreams( self: *UpstreamManager, - // client parameter kept for API compatibility but is no longer used here; - // each thread creates its own client to avoid shared-state data races. + // Kept for API compatibility; each worker creates its own client. _: *std.http.Client, now_ms: i64, timeout_ms: u64, + max_concurrency: u32, out_state_ssz: *?[]u8, ) ?lean_api.Slots { // Step 1: snapshot upstreams (brief lock) @@ -257,141 +283,115 @@ pub const UpstreamManager = struct { if (targets.items.len == 0) return null; - // Step 2: spawn one thread per upstream + const n = targets.items.len; + const max_w = @min(max_concurrency, 256); + const cap: usize = @max(1, @min(@as(usize, @intCast(max_w)), n)); + + // Per-tick deadline. A single worker can occupy its slot for up to + // ~timeout_ms (fetchSlots) + 2 × (timeout_ms / 4) (aux calls) ≈ + // 1.5 × timeout_ms; in the worst case (slow path on each call) closer + // to 2 × timeout_ms. We size the deadline assuming worst case so + // dispatch never starves with a bounded cap and slow peers, plus one + // extra timeout_ms for the final drain. + const batches: u64 = (@as(u64, n) + @as(u64, cap) - 1) / @as(u64, cap); + const per_worker_max_ms: u64 = timeout_ms * 2; + const round_ms: u64 = batches * per_worker_max_ms + timeout_ms; + const deadline_ms: i64 = now_ms + @as(i64, @intCast(round_ms)); + + if (cap < n) { + log.debug( + "Polling {d} upstreams with concurrency {d} (≤{d} OS threads, deadline ≤{d}ms)", + .{ n, cap, cap, round_ms }, + ); + } + + var slot = SlotState{ .cap = cap }; + var ctxs = std.ArrayList(*PollCtx).init(self.allocator); defer { - for (ctxs.items) |ctx| ctx.release(); // spawner releases its ref + for (ctxs.items) |ctx| ctx.release(); ctxs.deinit(); } + // Step 2: dispatch — wait for a free slot (or the deadline) before each spawn for (targets.items) |target| { - const ctx = PollCtx.create( - self.allocator, - target.index, - target.name, - target.base_url, - target.path, - timeout_ms, - ) catch |err| { + slot.mutex.lock(); + var slot_acquired = false; + while (slot.in_flight >= slot.cap) { + if (!waitUntilDeadline(&slot, deadline_ms)) break; + } + if (slot.in_flight < slot.cap) { + slot.in_flight += 1; + slot_acquired = true; + } + slot.mutex.unlock(); + + if (!slot_acquired) { + log.warn( + "Dispatch deadline reached before {s} could start; remaining upstreams skipped this tick", + .{target.name}, + ); + break; + } + + const ctx = PollCtx.create(self.allocator, &slot, target, timeout_ms) catch |err| { log.warn("Failed to allocate poll context for {s}: {s}", .{ target.name, @errorName(err) }); + slotRelease(&slot); continue; }; - const thread = std.Thread.spawn(.{}, pollUpstreamThread, .{ctx}) catch |err| { + const t = std.Thread.spawn(.{}, workerThread, .{ctx}) catch |err| { log.warn("Failed to spawn poll thread for {s}: {s}", .{ target.name, @errorName(err) }); - ctx.release(); // thread never ran — release thread's ref too - ctx.release(); // release spawner's ref + slotRelease(&slot); + ctx.release(); // worker ref (worker never started) + ctx.release(); // dispatcher ref continue; }; - thread.detach(); + t.detach(); ctxs.append(ctx) catch { - // append failed; ctx's thread is already running and holds its own ref, - // so just release the spawner's ref. + // Worker is already running and holds its own ref — just drop ours. ctx.release(); continue; }; } - if (ctxs.items.len == 0) return null; - - // Step 3: wait for all threads behind a hard deadline - const deadline_ms = now_ms + @as(i64, @intCast(timeout_ms)); - while (std.time.milliTimestamp() < deadline_ms) { - var all_done = true; - for (ctxs.items) |ctx| { - if (!ctx.done.load(.acquire)) { - all_done = false; - break; - } - } - if (all_done) break; - std.time.sleep(5 * std.time.ns_per_ms); + // Step 3: drain — wait for all in-flight workers to finish, or the deadline + slot.mutex.lock(); + while (slot.in_flight > 0) { + if (!waitUntilDeadline(&slot, deadline_ms)) break; } + slot.mutex.unlock(); - // Log any upstreams that timed out - for (ctxs.items) |ctx| { - if (!ctx.done.load(.acquire)) { - log.warn("Upstream {s} ({s}) timed out after {d}ms — detaching thread", .{ - ctx.name, ctx.base_url, timeout_ms, - }); - } - } - - // Step 4: collect results from completed threads and update upstream state - var results = std.ArrayList(PollResult).init(self.allocator); - defer { - for (results.items) |result| { - if (result.error_msg) |msg| self.allocator.free(msg); - if (result.state_ssz) |blob| self.allocator.free(blob); - } - results.deinit(); - } - - for (ctxs.items) |ctx| { - if (!ctx.done.load(.acquire)) { - // Timed out — record as an error so the upstream shows as failing - const error_msg = self.allocator.dupe(u8, "timeout") catch null; - results.append(PollResult{ - .index = ctx.index, - .slots = null, - .error_msg = error_msg, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - }) catch continue; - continue; - } - - if (ctx.slots) |slots| { - log.debug("Upstream {s}: justified={d}, finalized={d}", .{ - ctx.name, slots.justified_slot, slots.finalized_slot, - }); - // Transfer ownership of SSZ blob out of the ctx before ctx.release() - // is called by the defer above. We null it in ctx to avoid double-free. - const ssz = ctx.state_ssz; - ctx.state_ssz = null; - results.append(PollResult{ - .index = ctx.index, - .slots = slots, - .error_msg = null, - .state_ssz = ssz, - .is_aggregator = ctx.is_aggregator, - .head_slot = ctx.head_slot, - }) catch continue; - } else { - const err_copy = if (ctx.error_msg) |m| self.allocator.dupe(u8, m) catch null else null; - log.warn("Upstream {s} ({s}) failed: {s}", .{ - ctx.name, - ctx.base_url, - ctx.error_msg orelse "unknown", - }); - results.append(PollResult{ - .index = ctx.index, - .slots = null, - .error_msg = err_copy, - .state_ssz = null, - .is_aggregator = null, - .head_slot = null, - }) catch continue; - } - } - - // Step 5: Update upstream states (brief lock) + // Step 4: collect results and update per-upstream state under our lock. var slot_counts = std.AutoHashMap(u128, u32).init(self.allocator); defer slot_counts.deinit(); var successful_polls: u32 = 0; + var timed_out: u32 = 0; { self.mutex.lock(); defer self.mutex.unlock(); - for (results.items, 0..) |*result, i| { - if (result.index >= self.upstreams.items.len) continue; - var upstream = &self.upstreams.items[result.index]; + for (ctxs.items) |ctx| { + if (ctx.target_index >= self.upstreams.items.len) continue; + var upstream = &self.upstreams.items[ctx.target_index]; + + // Acquire ordering: pairs with the worker's release store on `done`. + const finished = ctx.done.load(.acquire); + + if (!finished) { + timed_out += 1; + upstream.error_count += 1; + if (upstream.last_error) |old_err| self.allocator.free(old_err); + upstream.last_error = self.allocator.dupe(u8, "timeout (worker abandoned)") catch null; + upstream.is_aggregator = null; + upstream.head_slot = null; + continue; + } - if (result.slots) |slots| { + if (ctx.slots) |slots| { if (upstream.last_error) |old_err| { self.allocator.free(old_err); upstream.last_error = null; @@ -399,8 +399,8 @@ pub const UpstreamManager = struct { upstream.error_count = 0; upstream.last_slots = slots; upstream.last_success_ms = now_ms; - upstream.is_aggregator = result.is_aggregator; - upstream.head_slot = result.head_slot; + upstream.is_aggregator = ctx.is_aggregator; + upstream.head_slot = ctx.head_slot; const slot_key: u128 = (@as(u128, slots.justified_slot) << 64) | @as(u128, slots.finalized_slot); const count = slot_counts.get(slot_key) orelse 0; @@ -410,15 +410,22 @@ pub const UpstreamManager = struct { } else { upstream.error_count += 1; if (upstream.last_error) |old_err| self.allocator.free(old_err); - upstream.last_error = result.error_msg; + if (ctx.error_msg) |msg| { + upstream.last_error = self.allocator.dupe(u8, msg) catch null; + } else { + upstream.last_error = self.allocator.dupe(u8, "unknown error") catch null; + } upstream.is_aggregator = null; upstream.head_slot = null; - results.items[i].error_msg = null; // ownership transferred } } } - // Step 6: consensus (no lock needed) + if (timed_out > 0) { + log.warn("{d}/{d} upstream polls did not finish within deadline (continuing)", .{ timed_out, ctxs.items.len }); + } + + // Step 5: consensus (no lock needed) if (successful_polls == 0) { log.warn("No upstreams responded successfully", .{}); return null; @@ -433,12 +440,16 @@ pub const UpstreamManager = struct { const justified_slot: u64 = @truncate(slot_key >> 64); const finalized_slot: u64 = @truncate(slot_key & 0xFFFFFFFFFFFFFFFF); - for (results.items, 0..) |*res, i| { - if (res.slots) |s| { + // Hand out one matching SSZ blob. Move ownership out of the + // ctx so PollCtx.release() doesn't free it; remaining ctxs' + // ssz blobs are freed normally on release. + for (ctxs.items) |ctx| { + if (!ctx.done.load(.acquire)) continue; + if (ctx.slots) |s| { if (s.justified_slot == justified_slot and s.finalized_slot == finalized_slot) { - if (res.state_ssz) |blob| { + if (ctx.state_ssz) |blob| { out_state_ssz.* = blob; - results.items[i].state_ssz = null; + ctx.state_ssz = null; break; } }