From c4b7796c6017432ba0462c9055aeed0a12fd0321 Mon Sep 17 00:00:00 2001 From: ch4r10t33r Date: Sun, 12 Apr 2026 16:50:43 +0100 Subject: [PATCH] broadcast,transport: EngineQuicHost wires PeerConn SESS/CHUNK into Engine Add EngineQuicHost (engine_quic.zig): inbound SESS session_open decodes preamble and attachRelaySession; CHUNK frames call relayIngestChunkVerifiedEngine after finishBcastHandshakeRead sets remote_peer_id. PeerConn callbacks take user_data for embedder context. Engine.channelRs public lookup. Integration test in eth_ec_quic_enabled. Document in README/UPSTREAM; note outbound QUIC chunk path still future work. Closes #37 --- README.md | 7 +- UPSTREAM.md | 2 +- src/broadcast/engine.zig | 5 + src/broadcast/engine_quic.zig | 204 +++++++++++++++++++++++++ src/root.zig | 3 + src/transport/eth_ec_quic_enabled.zig | 212 ++++++++++++++++++++++++++ src/transport/eth_ec_quic_peer.zig | 21 +-- 7 files changed, 441 insertions(+), 13 deletions(-) create mode 100644 src/broadcast/engine_quic.zig diff --git a/README.md b/README.md index 218d5b8..21236b3 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Zig helpers for the wire formats of **[ethp2p](https://github.com/ethp2p/ethp2p) | `PartialMessagesExtension` (nested in `RPC.partial`) | libp2p `rpc.proto` field 10 body | `encodePartialMessagesExtension`, `decodePartialMessagesExtensionOwned` | | Unsigned-varint length prefix before `RPC` body | common libp2p framing | `encodeRpcLengthPrefixed`, `decodeRpcLengthPrefixedPrefix` | | In-process duplex for length-prefixed `RPC` (simnet-style, no TCP/QUIC) | pair of `Endpoint`s over bounded byte queues | `sim.gossipsub_rpc_host` (`Link`, `Endpoint.sendRpc` / `recvRpcOwned`) | -| QUIC transport + UNI stream alignment | [`sim/host.go`](https://github.com/ethp2p/ethp2p/blob/main/sim/host.go), `peer.go`, `peer_ctrl.go`, `peer_in.go` | `transport.eth_ec_quic`: IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching `OpenUniStream`/`AcceptUniStream`; `PeerConn` lifecycle sketch. See [QUIC transport](#quic-transport-lsquic-build). | +| QUIC transport + UNI stream alignment | [`sim/host.go`](https://github.com/ethp2p/ethp2p/blob/main/sim/host.go), `peer.go`, `peer_ctrl.go`, `peer_in.go` | `transport.eth_ec_quic`: IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching `OpenUniStream`/`AcceptUniStream`; `PeerConn` + `broadcast.engine_quic` (`EngineQuicHost`) wire inbound SESS/CHUNK into `Engine` / `ChannelRs`. See [QUIC transport](#quic-transport-lsquic-build). | | **Still open** | — | [Pending work](#pending-work) | ## Scope on `main` (at a glance) @@ -52,7 +52,7 @@ This is **what is already implemented** — not the backlog. Per-module mapping - **EC scheme id:** `layer.ec_scheme` (`EcSchemeKind`, `"reed-solomon"` wire name); only Reed–Solomon is wired end-to-end ([#14](https://github.com/ch4r10t33r/zig-ethp2p/issues/14) tracks RLNC and further schemes). - **Abstract RS mesh:** heap-backed graphs and `PeerSessionStats` (`sim.rs_mesh`): 2-node, 4-node ring, 6-node `TestNetwork`-style topology, **partition/heal** line test, chunk-len variant; with `ZIG_ETHP2P_STRESS=1`, larger six-node budget plus **8-** and **16-node** rings. - **Gossipsub (sim / wire helpers):** transport, protocol, broadcast, interop, `RPC` encode/decode (including **`partial`** / `PartialMessagesExtension`), full `ControlMessage`, varint length prefix, in-process **`gossipsub_rpc_host`** for tests (`sim.gossipsub_*`, `broadcast.gossip`). -- **QUIC:** `transport.eth_ec_quic` — IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching the ethp2p Go reference (`peer.go` `OpenUniStream`/`AcceptUniStream`). `PeerConn` poll-driven lifecycle sketch included. See [QUIC transport](#quic-transport-lsquic-build). +- **QUIC:** `transport.eth_ec_quic` — IETF QUIC, TLS 1.3, ALPN `eth-ec-broadcast`, **unidirectional** BCAST/SESS/CHUNK streams matching the ethp2p Go reference (`peer.go` `OpenUniStream`/`AcceptUniStream`). `PeerConn` is poll-driven; `broadcast.engine_quic.EngineQuicHost` connects inbound streams to the RS channel path. See [QUIC transport](#quic-transport-lsquic-build). - **CI:** aligned with [ethp2p's `ci.yml`](https://github.com/ethp2p/ethp2p/blob/main/.github/workflows/ci.yml): `zig build test-broadcast`, `test-sim-rs`, `test-sim-gossipsub` (Debug + TSan), `test-quic` (**`quic-transport`** job: vendored TLS, **45m** job timeout + **`timeout 40m`** on the command so a hung poll loop cannot exhaust the runner), `test-stress-ci` on **`main` only**, plus lint (`zig fmt --check`, `zig build`, `zig ast-check`). `build.zig.zon` **`minimum_zig_version`** must match workflow **`ZIG_VERSION`**; `just check-zig-ci-align` checks that locally. - **One-shot local verification:** `zig build test` runs the full suite. @@ -69,6 +69,7 @@ lsquic + BoringSSL are always compiled — no build flag is needed. **Windows** | `eth_ec_quic_common.zig` / `eth_ec_quic_enabled.zig` | Shared config and ALPN string; **enabled** path implements `listenImpl` / `dialImpl` and integration tests. | | `eth_ec_quic.zig` | Public `transport.eth_ec_quic`: `listen`, `dial`, `logInit` (programmatic lsquic logger), listener wrapper. | | `eth_ec_quic_peer.zig` | `PeerConn` poll-driven state machine: `idle → handshaking → active → closed`; symmetric BCAST UNI handshake + `runAcceptLoop` dispatch by protocol selector byte. | +| `broadcast/engine_quic.zig` | `EngineQuicHost`: SESS `session_open` → `ChannelRs.attachRelaySession`, CHUNK → `relayIngestChunkVerifiedEngine` (issue #37). | | `vendor/lsquic_zig/patch_uni.sh` | Build-time patch that removes `static` from `create_uni_stream_out` in lsquic and appends a public `lsquic_conn_make_uni_stream()` wrapper (lsquic 4.3 has no public API for outgoing UNI streams). | **Operation** @@ -90,7 +91,7 @@ Call **`quic.logInit("debug")`** (or any `lsquic_set_log_level` level) to enable ## Pending work -- **`PeerConn` wiring:** connect `eth_ec_quic_peer.zig` to `Engine`/channel tables (currently a lifecycle sketch only). +- **Outbound chunk path:** origin `ChannelRs` publish/drain over QUIC (sending CHUNK streams to peers) is not wired yet; inbound relay via `EngineQuicHost` is implemented (#37). - **Erasure coding:** **RLNC** (strategy, preamble, chunk layout) and any further `Scheme` types beyond Reed–Solomon ([#14](https://github.com/ch4r10t33r/zig-ethp2p/issues/14)). - **libp2p:** Noise handshake, multistream-select, Yamux, identify — handled by zeam's rust-libp2p layer; out of scope for this repo. diff --git a/UPSTREAM.md b/UPSTREAM.md index 3480e9f..9995d75 100644 --- a/UPSTREAM.md +++ b/UPSTREAM.md @@ -77,7 +77,7 @@ Zig alignment: - `lsquic_quic_shim.zig` detects stream type via `lsquic_stream_id() & 0x2` (bit 1 = unidirectional per RFC 9000 §2.1) - `incoming_uni_streams` queue holds peer-initiated UNI streams; `tryAcceptIncomingUniStream` pops them - `streamMakeUni` opens an outgoing UNI stream via `lsquic_conn_make_uni_stream` -- `eth_ec_quic_peer.zig` sketches the `PeerConn` poll-driven state machine (handshake + accept-loop) +- `eth_ec_quic_peer.zig` implements the `PeerConn` poll-driven state machine (handshake + accept-loop); `broadcast/engine_quic.zig` (`EngineQuicHost`) forwards inbound SESS/CHUNK into `Engine` / `ChannelRs` (#37) ### lsquic vendor patch diff --git a/src/broadcast/engine.zig b/src/broadcast/engine.zig index e83d18c..7655782 100644 --- a/src/broadcast/engine.zig +++ b/src/broadcast/engine.zig @@ -74,6 +74,11 @@ pub const Engine = struct { try self.channels.put(self.allocator, key, ch); return ch; } + + /// Look up an RS channel by id (e.g. QUIC `EngineQuicHost` inbound routing). + pub fn channelRs(self: *Engine, channel_id: []const u8) ?*ChannelRs { + return self.channels.get(channel_id); + } }; pub const Error = error{ChannelExists}; diff --git a/src/broadcast/engine_quic.zig b/src/broadcast/engine_quic.zig new file mode 100644 index 0000000..cf357fd --- /dev/null +++ b/src/broadcast/engine_quic.zig @@ -0,0 +1,204 @@ +//! Wire `transport/eth_ec_quic_peer.zig` `PeerConn` inbound SESS/CHUNK streams into +//! `broadcast/engine.zig` `Engine` / `ChannelRs` (issue #37). +//! +//! After TLS + BCAST handshake, call `wireEngine`, then `finishBcastHandshakeRead` +//! to capture the peer id. Drive `quic.poll` on both endpoints, then `PeerConn.drive` +//! (via `engineQuicDrive`) so inbound SESS opens relay sessions and CHUNK frames +//! call `relayIngestChunkVerifiedEngine`. + +const std = @import("std"); +const quic = @import("quic"); +const peer_mod = @import("../transport/eth_ec_quic_peer.zig"); +const Engine = @import("engine.zig").Engine; +const rs_strategy = @import("../layer/rs_strategy.zig"); +const chunk_stream = @import("../wire/chunk_stream.zig"); +const sess_stream = @import("../wire/sess_stream.zig"); +const bcast_stream = @import("../wire/bcast_stream.zig"); +const protocol = @import("../wire/protocol.zig"); +const wire_rs = @import("../wire/rs.zig"); + +const PeerConn = peer_mod.PeerConn; + +/// Bridges one QUIC `PeerConn` to an `Engine` for inbound RS relay traffic. +pub const EngineQuicHost = struct { + engine: *Engine, + allocator: std.mem.Allocator, + peer: PeerConn, + /// Set by `finishBcastHandshakeRead` from the peer's BCAST `peer_handshake`. + remote_peer_id: []u8 = &.{}, + /// Peer endpoint to co-poll while draining UNI streams (loopback tests). + peer_ep: ?*quic.QuicEndpoint = null, + + pub fn init(allocator: std.mem.Allocator, engine: *Engine, conn: *quic.QuicConnection, ep: *quic.QuicEndpoint) EngineQuicHost { + return .{ + .engine = engine, + .allocator = allocator, + .peer = PeerConn.init(allocator, conn, ep), + }; + } + + pub fn deinit(self: *EngineQuicHost) void { + self.peer.close(); + if (self.remote_peer_id.len != 0) { + self.allocator.free(self.remote_peer_id); + } + } + + /// Install SESS/CHUNK handlers that forward into `Engine` channels. + pub fn wireEngine(self: *EngineQuicHost) void { + self.peer.user_data = @ptrCast(self); + self.peer.on_sess_stream = engineQuicOnSessStream; + self.peer.on_chunk_stream = engineQuicOnChunkStream; + } + + pub fn setPeerEndpoint(self: *EngineQuicHost, peer_ep: ?*quic.QuicEndpoint) void { + self.peer_ep = peer_ep; + } + + /// After `PeerConn` reaches `.active`, read our inbound BCAST stream and + /// store the peer's `peer_id` (for chunk/session relay attribution). + pub fn finishBcastHandshakeRead(self: *EngineQuicHost) !void { + const st = self.peer.bcast_in orelse return error.MissingBcastIn; + const buf = try drainUniStream(self.allocator, st, self.peer.ep, self.peer_ep); + defer self.allocator.free(buf); + var fbs = std.io.fixedBufferStream(buf); + var owned = try bcast_stream.readBcastPeerHandshake(self.allocator, fbs.reader()); + defer owned.deinit(self.allocator); + switch (owned) { + .peer_handshake => |h| { + if (self.remote_peer_id.len != 0) self.allocator.free(self.remote_peer_id); + self.remote_peer_id = try self.allocator.dupe(u8, h.peer_id); + }, + else => return error.ExpectedPeerHandshake, + } + } + + pub fn drive(self: *EngineQuicHost) bool { + return self.peer.drive(); + } +}; + +fn engineQuicOnSessStream(user_data: ?*anyopaque, pc: *PeerConn, st: *quic.QuicStream) void { + _ = pc; + const host: *EngineQuicHost = @ptrCast(@alignCast(user_data orelse return)); + handleSessStream(host, st) catch {}; +} + +fn engineQuicOnChunkStream(user_data: ?*anyopaque, pc: *PeerConn, st: *quic.QuicStream) void { + _ = pc; + const host: *EngineQuicHost = @ptrCast(@alignCast(user_data orelse return)); + handleChunkStream(host, st) catch {}; +} + +fn preambleOwnedToRs(allocator: std.mem.Allocator, owned: wire_rs.PreambleOwned) (std.mem.Allocator.Error || error{InvalidPreambleHash})!rs_strategy.RsPreamble { + if (owned.hash.len != 32) { + var o = owned; + o.deinit(allocator); + return error.InvalidPreambleHash; + } + var msg_hash: [32]u8 = undefined; + @memcpy(&msg_hash, owned.hash); + + const n = owned.hashes.len; + const nd = owned.num_data; + const np = owned.num_parity; + const ml = owned.length; + + const hashes = try allocator.alloc([]u8, n); + errdefer { + for (hashes) |row| allocator.free(row); + allocator.free(hashes); + } + for (owned.hashes, 0..) |h, i| { + hashes[i] = try allocator.dupe(u8, h); + } + var o = owned; + o.deinit(allocator); + + return .{ + .data_chunks = nd, + .parity_chunks = np, + .message_length = ml, + .chunk_hashes = hashes, + .message_hash = msg_hash, + }; +} + +fn handleSessStream(host: *EngineQuicHost, st: *quic.QuicStream) !void { + const buf = try drainUniStream(host.allocator, st, host.peer.ep, host.peer_ep); + defer host.allocator.free(buf); + var fbs = std.io.fixedBufferStream(buf); + const r = fbs.reader(); + const sel = try protocol.readSelectorByte(r); + if (sel != .sess) return; + var open_msg = try sess_stream.readSessSessionOpenAfterSelector(host.allocator, r); + defer open_msg.deinit(host.allocator); + const open = switch (open_msg) { + .session_open => |o| o, + else => return, + }; + + const preamble_owned = try wire_rs.decodePreamble(host.allocator, open.preamble); + var rs_pre = try preambleOwnedToRs(host.allocator, preamble_owned); + errdefer rs_pre.deinit(host.allocator); + + const ch = host.engine.channelRs(open.channel) orelse return error.UnknownChannel; + try ch.attachRelaySession(open.message_id, &rs_pre); + rs_pre.deinit(host.allocator); +} + +fn handleChunkStream(host: *EngineQuicHost, st: *quic.QuicStream) !void { + if (host.remote_peer_id.len == 0) return error.MissingRemotePeerId; + + const buf = try drainUniStream(host.allocator, st, host.peer.ep, host.peer_ep); + defer host.allocator.free(buf); + var fbs = std.io.fixedBufferStream(buf); + var chunk_in = try chunk_stream.readChunkStream(host.allocator, fbs.reader()); + defer chunk_in.deinit(host.allocator); + + const ch = host.engine.channelRs(chunk_in.header.channel) orelse return error.UnknownChannel; + + const ident = try wire_rs.decodeChunkIdent(host.allocator, chunk_in.header.chunk_id); + + _ = try ch.relayIngestChunkVerifiedEngine( + chunk_in.header.message_id, + host.remote_peer_id, + .{ .index = ident.index }, + chunk_in.payload, + null, + ); +} + +/// Poll until a UNI stream's buffered length stabilizes, then copy out. +fn drainUniStream( + allocator: std.mem.Allocator, + st: *quic.QuicStream, + ep: *quic.QuicEndpoint, + peer: ?*quic.QuicEndpoint, +) ![]u8 { + var i: u32 = 0; + while (i < 30_000) : (i += 1) { + try quic.poll(ep, 0); + if (peer) |p| try quic.poll(p, 0); + const raw = quic.streamReadSlice(st); + if (raw.len == 0) continue; + + var last = raw.len; + var stable: u32 = 0; + var j: u32 = 0; + while (j < 2_000) : (j += 1) { + try quic.poll(ep, 0); + if (peer) |p| try quic.poll(p, 0); + const r2 = quic.streamReadSlice(st); + if (r2.len == last) { + stable += 1; + if (stable >= 2) return try allocator.dupe(u8, r2); + } else { + last = r2.len; + stable = 0; + } + } + return try allocator.dupe(u8, quic.streamReadSlice(st)); + } + return error.StreamDrainTimeout; +} diff --git a/src/root.zig b/src/root.zig index 4dab2fb..e3818ee 100644 --- a/src/root.zig +++ b/src/root.zig @@ -41,12 +41,14 @@ pub const layer = struct { /// QUIC listen/dial — see `transport/eth_ec_quic.zig` and README; mapping streams to `wire.*` is issue **#27**. pub const transport = struct { pub const eth_ec_quic = @import("transport/eth_ec_quic.zig"); + pub const eth_ec_quic_peer = @import("transport/eth_ec_quic_peer.zig"); pub const shared_udp_socket = @import("transport/shared_udp_socket.zig"); }; pub const broadcast = struct { pub const observer = @import("broadcast/observer.zig"); pub const engine = @import("broadcast/engine.zig"); + pub const engine_quic = @import("broadcast/engine_quic.zig"); pub const channel_rs = @import("broadcast/channel_rs.zig"); pub const session_rs = @import("broadcast/session_rs.zig"); pub const gossip = @import("broadcast/gossip.zig"); @@ -75,5 +77,6 @@ test { _ = broadcast.observer; _ = broadcast.gossip; _ = broadcast.relay_async_verify; + _ = broadcast.engine_quic; _ = discovery; } diff --git a/src/transport/eth_ec_quic_enabled.zig b/src/transport/eth_ec_quic_enabled.zig index 550a82b..b043b84 100644 --- a/src/transport/eth_ec_quic_enabled.zig +++ b/src/transport/eth_ec_quic_enabled.zig @@ -7,6 +7,13 @@ const test_certs = @import("eth_ec_quic_test_certs.zig"); const bcast_stream = @import("../wire/bcast_stream.zig"); const sess_stream = @import("../wire/sess_stream.zig"); const protocol = @import("../wire/protocol.zig"); +const chunk_stream = @import("../wire/chunk_stream.zig"); +const wire_rs = @import("../wire/rs.zig"); +const Engine = @import("../broadcast/engine.zig").Engine; +const engine_quic = @import("../broadcast/engine_quic.zig"); +const peer_mod = @import("eth_ec_quic_peer.zig"); +const rs_init = @import("../layer/rs_init.zig"); +const rs_strategy = @import("../layer/rs_strategy.zig"); fn acceptIncomingQuicStream( conn: *quic.QuicConnection, @@ -388,3 +395,208 @@ test "QUIC UNI streams: symmetric BCAST handshake + SESS session_open (wire fram quic.destroy(srv, sc); quic.destroy(client_ep, conn); } + +// EngineQuicHost: inbound SESS/CHUNK into Engine / ChannelRs (issue #37). +test "QUIC EngineQuicHost SESS session_open + CHUNK relay ingest" { + if (@import("builtin").os.tag == .windows) return error.SkipZigTest; + if (@import("builtin").os.tag == .wasi) return error.SkipZigTest; + + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + var alloc = gpa.allocator(); + + const srv_cfg = common.EthEcQuicConfig{ + .server_certificate_pem_path = "src/transport/testdata/zethp2p_cert.pem", + .server_private_key_pem_path = "src/transport/testdata/zethp2p_key.pem", + .tls_insecure_skip_verify = true, + }; + var alpn_srv = [_][]const u8{common.alpn_eth_ec_broadcast}; + var qc_srv = toQuicConfig(srv_cfg, &alpn_srv); + var server_ep: ?*quic.QuicEndpoint = null; + var sport: u16 = 0; + for (0..64) |i| { + const p: u16 = @intCast(45300 + i); + const bind_s = try std.fmt.allocPrint(alloc, "127.0.0.1:{d}", .{p}); + defer alloc.free(bind_s); + server_ep = quic.endpointInit(&alloc, bind_s, &qc_srv) catch |err| switch (err) { + error.AddressInUse, error.AddressNotAvailable => continue, + else => |e| return e, + }; + sport = p; + break; + } + const srv = server_ep orelse return error.NoBindPort; + defer quic.endpointDeinit(srv); + + var alpn_cli = [_][]const u8{common.alpn_eth_ec_broadcast}; + var qc_cli = quic.QuicConfig{ + .alpn = &alpn_cli, + .allow_insecure = true, + .max_idle_timeout_ms = maxIdleTimeoutMs(common.EthEcQuicConfig.default().max_idle_timeout_ns), + .max_udp_payload = 1350, + }; + const client_ep = try quic.endpointInit(&alloc, "127.0.0.1:0", &qc_cli); + defer quic.endpointDeinit(client_ep); + + const remote_s = try std.fmt.allocPrint(alloc, "127.0.0.1:{d}", .{sport}); + defer alloc.free(remote_s); + + const conn = try quic.connect(client_ep, remote_s, test_certs.tls_server_name); + errdefer quic.destroy(client_ep, conn); + + var server_conn: ?*quic.QuicConnection = null; + var rounds: u32 = 0; + while (rounds < 30_000) : (rounds += 1) { + try quic.poll(srv, 0); + try quic.poll(client_ep, 0); + if (server_conn == null) server_conn = quic.tryAccept(srv); + if (quic.handshakeComplete(conn)) { + if (server_conn) |c| { + if (quic.handshakeComplete(c)) break; + } + } + } + const sc = server_conn orelse return error.MissingServerConnection; + try std.testing.expect(quic.handshakeComplete(conn)); + try std.testing.expect(quic.handshakeComplete(sc)); + + const cfg = rs_init.RsConfig{ + .data_shards = 4, + .parity_shards = 2, + .chunk_len = 0, + .bitmap_threshold = 0, + .forward_multiplier = 4, + .disable_bitmap = false, + }; + + var engine = try Engine.init(alloc, "server-local", .{}); + defer engine.deinit(); + + const ch = try engine.attachChannelRs("ch1", cfg); + try ch.addMember("client-peer"); + + var host = engine_quic.EngineQuicHost.init(alloc, &engine, sc, srv); + defer host.deinit(); + host.wireEngine(); + host.setPeerEndpoint(client_ep); + + var cli_pc = peer_mod.PeerConn.init(alloc, conn, client_ep); + + try host.peer.beginHandshake(client_ep, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "server-peer", + }); + try cli_pc.beginHandshake(srv, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "client-peer", + }); + + var srv_bcast_payload = std.ArrayList(u8).empty; + defer srv_bcast_payload.deinit(alloc); + { + const w = srv_bcast_payload.writer(alloc); + try bcast_stream.writeBcastHandshakeOpen(w, alloc, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "server-peer", + }); + } + try quic.streamQueueWrite(host.peer.bcast_out.?, srv_bcast_payload.items); + try quic.streamDrainWrites(host.peer.bcast_out.?, client_ep, 10_000); + + var cli_bcast_payload = std.ArrayList(u8).empty; + defer cli_bcast_payload.deinit(alloc); + { + const w = cli_bcast_payload.writer(alloc); + try bcast_stream.writeBcastHandshakeOpen(w, alloc, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "client-peer", + }); + } + try quic.streamQueueWrite(cli_pc.bcast_out.?, cli_bcast_payload.items); + try quic.streamDrainWrites(cli_pc.bcast_out.?, srv, 10_000); + + rounds = 0; + while (rounds < 30_000) : (rounds += 1) { + try quic.poll(srv, 0); + try quic.poll(client_ep, 0); + _ = host.peer.drive(); + _ = cli_pc.drive(); + if (host.peer.state == .active and cli_pc.state == .active) break; + } + try std.testing.expectEqual(peer_mod.PeerConnState.active, host.peer.state); + try std.testing.expectEqual(peer_mod.PeerConnState.active, cli_pc.state); + + try host.finishBcastHandshakeRead(); + try std.testing.expectEqualStrings("client-peer", host.remote_peer_id); + + const payload = [_]u8{ 9, 8, 7, 6, 5 }; + var origin = try rs_strategy.RsStrategy.newOrigin(alloc, cfg, &payload); + defer origin.deinit(); + + const hash_slices = try alloc.alloc([]const u8, origin.preamble.chunk_hashes.len); + defer alloc.free(hash_slices); + for (origin.preamble.chunk_hashes, 0..) |row, i| { + hash_slices[i] = row; + } + const pre_bytes = try wire_rs.encodePreamble(alloc, .{ + .num_data = origin.preamble.data_chunks, + .num_parity = origin.preamble.parity_chunks, + .length = origin.preamble.message_length, + .hashes = hash_slices, + .hash = &origin.preamble.message_hash, + }); + defer alloc.free(pre_bytes); + + const cli_sess = try quic.streamMakeUni(conn, srv); + var sess_pl = std.ArrayList(u8).empty; + defer sess_pl.deinit(alloc); + { + const w = sess_pl.writer(alloc); + try sess_stream.writeSessSessionOpen(w, alloc, .{ + .channel = "ch1", + .message_id = "m1", + .preamble = pre_bytes, + .initial_update = &.{}, + }); + } + try quic.streamQueueWrite(cli_sess, sess_pl.items); + try quic.streamDrainWrites(cli_sess, srv, 10_000); + + rounds = 0; + while (rounds < 20_000) : (rounds += 1) { + try quic.poll(srv, 0); + try quic.poll(client_ep, 0); + _ = host.peer.drive(); + if (ch.sessionStrategy("m1") != null) break; + } + try std.testing.expect(ch.sessionStrategy("m1") != null); + + const cli_chunk = try quic.streamMakeUni(conn, srv); + var chunk_pl = std.ArrayList(u8).empty; + defer chunk_pl.deinit(alloc); + { + const w = chunk_pl.writer(alloc); + try chunk_stream.writeRsShardChunk(w, alloc, "ch1", "m1", 0, origin.chunks[0]); + } + try quic.streamQueueWrite(cli_chunk, chunk_pl.items); + try quic.streamDrainWrites(cli_chunk, srv, 10_000); + + rounds = 0; + while (rounds < 20_000) : (rounds += 1) { + try quic.poll(srv, 0); + try quic.poll(client_ep, 0); + _ = host.peer.drive(); + const st = ch.sessionStrategy("m1") orelse break; + if (st.progress().have > 0) break; + } + + const st_final = ch.sessionStrategy("m1") orelse return error.MissingSession; + try std.testing.expect(st_final.progress().have > 0); + + quic.destroy(srv, sc); + quic.destroy(client_ep, conn); +} diff --git a/src/transport/eth_ec_quic_peer.zig b/src/transport/eth_ec_quic_peer.zig index 9e4a6e1..ff341a7 100644 --- a/src/transport/eth_ec_quic_peer.zig +++ b/src/transport/eth_ec_quic_peer.zig @@ -4,9 +4,9 @@ //! in the Go reference without goroutines. Callers drive progress by calling //! `poll` on the underlying `QuicEndpoint` then calling `drive()` on this struct. //! -//! **Sketch only** — not wired to an Engine or channel table yet. Demonstrates -//! that the symmetric BCAST handshake and `runAcceptLoop` pattern are achievable -//! in a single-threaded, poll-driven Zig context. +//! SESS/CHUNK inbound streams can be forwarded into `broadcast.Engine` via +//! `broadcast/engine_quic.zig` (`EngineQuicHost`). Callbacks receive an optional +//! `user_data` pointer for embedder context. const std = @import("std"); const quic = @import("quic"); @@ -45,10 +45,13 @@ pub const PeerConn = struct { allocator: std.mem.Allocator, - /// Optional callback for inbound SESS streams. Set by the owning Engine. - on_sess_stream: ?*const fn (self: *PeerConn, st: *quic.QuicStream) void = null, - /// Optional callback for inbound CHUNK streams. Set by the owning Engine. - on_chunk_stream: ?*const fn (self: *PeerConn, st: *quic.QuicStream) void = null, + /// Opaque context passed to stream callbacks (e.g. `*EngineQuicHost`). + user_data: ?*anyopaque = null, + + /// Optional callback for inbound SESS streams. + on_sess_stream: ?*const fn (user_data: ?*anyopaque, pc: *PeerConn, st: *quic.QuicStream) void = null, + /// Optional callback for inbound CHUNK streams. + on_chunk_stream: ?*const fn (user_data: ?*anyopaque, pc: *PeerConn, st: *quic.QuicStream) void = null, pub fn init( allocator: std.mem.Allocator, @@ -149,10 +152,10 @@ pub const PeerConn = struct { if (self.bcast_in == null) self.bcast_in = st; }, .sess => { - if (self.on_sess_stream) |cb| cb(self, st) else quic.streamCancelRead(st); + if (self.on_sess_stream) |cb| cb(self.user_data, self, st) else quic.streamCancelRead(st); }, .chunk => { - if (self.on_chunk_stream) |cb| cb(self, st) else quic.streamCancelRead(st); + if (self.on_chunk_stream) |cb| cb(self.user_data, self, st) else quic.streamCancelRead(st); }, else => { // Unknown protocol — cancel to release flow-control credit.