diff --git a/README.md b/README.md index 9ccdb53..06be021 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,7 @@ All ethp2p application protocols use UNI streams — both peers independently op ## Pending work -- **Outbound chunk path:** origin `ChannelRs` publish/drain over QUIC (sending CHUNK streams to peers) is not wired yet; inbound relay via `EngineQuicHost` is implemented (#37). +- **Outbound chunk path:** use `broadcast.engine_quic.peerSendRsChunk` with `ChannelRs.sessionDrainOutboundOverQuic` / `SessionRs.drainOutboundOverQuic` (integration test in `eth_ec_quic_enabled.zig`). Inbound relay via `EngineQuicHost` is implemented (#37). - **Erasure coding:** **RLNC** (strategy, preamble, chunk layout) and any further `Scheme` types beyond Reed–Solomon ([#14](https://github.com/ch4r10t33r/zig-ethp2p/issues/14)). - **libp2p:** Noise handshake, multistream-select, Yamux, identify — handled by zeam's rust-libp2p layer; out of scope for this repo. diff --git a/src/broadcast/channel_rs.zig b/src/broadcast/channel_rs.zig index 012ae1b..97ba765 100644 --- a/src/broadcast/channel_rs.zig +++ b/src/broadcast/channel_rs.zig @@ -8,9 +8,12 @@ const std = @import("std"); const broadcast_types = @import("../layer/broadcast_types.zig"); const dedup_mod = @import("../layer/dedup.zig"); const dedup_registry_mod = @import("../layer/dedup_registry.zig"); +const emit_planner = @import("../layer/emit_planner.zig"); const rs_init = @import("../layer/rs_init.zig"); const rs_strategy = @import("../layer/rs_strategy.zig"); const Engine = @import("engine.zig").Engine; +const errors = @import("errors.zig"); +const SendRsChunkFn = @import("session_rs.zig").SendRsChunkFn; const SessionRs = @import("session_rs.zig").SessionRs; const Allocator = std.mem.Allocator; @@ -71,7 +74,7 @@ pub const ChannelRs = struct { /// Origin session: encodes `payload` and attaches current members. pub fn publish(self: *ChannelRs, message_id: []const u8, payload: []const u8) !void { - if (self.sessions.get(message_id) != null) return error.DuplicateMessage; + if (self.sessions.get(message_id) != null) return error.InvalidMessage; const mid = try self.allocator.dupe(u8, message_id); errdefer self.allocator.free(mid); @@ -113,7 +116,7 @@ pub const ChannelRs = struct { /// Relay-side session for an existing preamble (same members as `publish`). pub fn attachRelaySession(self: *ChannelRs, message_id: []const u8, preamble: *const rs_strategy.RsPreamble) !void { - if (self.sessions.get(message_id) != null) return error.DuplicateMessage; + if (self.sessions.get(message_id) != null) return error.InvalidMessage; const mid = try self.allocator.dupe(u8, message_id); errdefer self.allocator.free(mid); @@ -153,12 +156,23 @@ pub const ChannelRs = struct { } pub fn sessionDrainOutbound(self: *ChannelRs, message_id: []const u8) !usize { - const slot = self.sessions.getPtr(message_id) orelse return error.UnknownMessage; + const slot = self.sessions.getPtr(message_id) orelse return error.InvalidMessage; return slot.*.drainOutbound(); } + /// Same as [`SessionRs.drainOutboundOverQuic`](`SessionRs.drainOutboundOverQuic`) for `message_id` on this channel. + pub fn sessionDrainOutboundOverQuic( + self: *ChannelRs, + message_id: []const u8, + ctx: *anyopaque, + send_chunk: SendRsChunkFn, + ) (Allocator.Error || emit_planner.PlannerError || anyerror)!usize { + const slot = self.sessions.getPtr(message_id) orelse return error.InvalidMessage; + return slot.*.drainOutboundOverQuic(self.id, ctx, send_chunk); + } + pub fn sessionDecode(self: *ChannelRs, message_id: []const u8) ![]u8 { - const slot = self.sessions.getPtr(message_id) orelse return error.UnknownMessage; + const slot = self.sessions.getPtr(message_id) orelse return error.InvalidMessage; return slot.*.strategy.decode(); } @@ -186,8 +200,8 @@ pub const ChannelRs = struct { chunk_id: rs_strategy.ChunkIdent, data: []const u8, dedup: ?*broadcast_types.DedupCancel, - ) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult { - const strat = self.sessionStrategy(message_id) orelse return error.UnknownMessage; + ) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult { + const strat = self.sessionStrategy(message_id) orelse return error.InvalidMessage; if (registry) |reg| { const first = try reg.claim(self.allocator, self.id, message_id, chunk_id.index); if (!first) { @@ -205,7 +219,7 @@ pub const ChannelRs = struct { chunk_id: rs_strategy.ChunkIdent, data: []const u8, dedup: ?*broadcast_types.DedupCancel, - ) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult { + ) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult { return self.relayIngestChunk(self.engine.dedupRegistryPtr(), message_id, peer, chunk_id, data, dedup); } @@ -218,8 +232,8 @@ pub const ChannelRs = struct { chunk_id: rs_strategy.ChunkIdent, data: []const u8, dedup: ?*broadcast_types.DedupCancel, - ) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult { - const strat = self.sessionStrategy(message_id) orelse return error.UnknownMessage; + ) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult { + const strat = self.sessionStrategy(message_id) orelse return error.InvalidMessage; const v = strat.verifyChunk(chunk_id, data); if (v != .accepted) { return .{ .verdict = v, .complete = false }; @@ -235,7 +249,7 @@ pub const ChannelRs = struct { chunk_id: rs_strategy.ChunkIdent, data: []const u8, dedup: ?*broadcast_types.DedupCancel, - ) (Allocator.Error || error{UnknownMessage})!broadcast_types.ChunkIngestResult { + ) (Allocator.Error || errors.Error)!broadcast_types.ChunkIngestResult { return self.relayIngestChunkVerified(self.engine.dedupRegistryPtr(), message_id, peer, chunk_id, data, dedup); } }; @@ -323,7 +337,7 @@ test "relayIngestChunk unknown message" { const ch = try eng.attachChannelRs("topic", cfg); try std.testing.expectError( - error.UnknownMessage, + error.InvalidMessage, ch.relayIngestChunk(eng.dedupRegistryPtr(), "missing", "peer", .{ .index = 0 }, &.{}, null), ); } diff --git a/src/broadcast/engine.zig b/src/broadcast/engine.zig index 7655782..138f54b 100644 --- a/src/broadcast/engine.zig +++ b/src/broadcast/engine.zig @@ -3,10 +3,13 @@ const std = @import("std"); const ChannelRs = @import("channel_rs.zig").ChannelRs; const dedup_registry_mod = @import("../layer/dedup_registry.zig"); +const errors = @import("errors.zig"); const observer_mod = @import("observer.zig"); const Allocator = std.mem.Allocator; +pub const Error = errors.Error; + pub const EngineConfig = struct { observer: observer_mod.Observer = .{}, /// When set, `Engine` owns a `DedupRegistry` for `relayIngestChunk`-style helpers. @@ -64,7 +67,7 @@ pub const Engine = struct { self: *Engine, channel_id: []const u8, cfg: @import("../layer/rs_init.zig").RsConfig, - ) !*ChannelRs { + ) (Allocator.Error || Error)!*ChannelRs { if (self.channels.get(channel_id) != null) return error.ChannelExists; const key = try self.allocator.dupe(u8, channel_id); errdefer self.allocator.free(key); @@ -80,5 +83,3 @@ pub const Engine = struct { return self.channels.get(channel_id); } }; - -pub const Error = error{ChannelExists}; diff --git a/src/broadcast/engine_quic.zig b/src/broadcast/engine_quic.zig index cf357fd..1f61129 100644 --- a/src/broadcast/engine_quic.zig +++ b/src/broadcast/engine_quic.zig @@ -1,13 +1,15 @@ -//! Wire `transport/eth_ec_quic_peer.zig` `PeerConn` inbound SESS/CHUNK streams into -//! `broadcast/engine.zig` `Engine` / `ChannelRs` (issue #37). +//! Wire `transport/eth_ec_quic_peer.zig` `PeerConn` to `Engine` / `ChannelRs`: +//! - **Inbound:** SESS/CHUNK into relay ingest (#37). +//! - **Outbound:** `peerSendRsChunk` sends origin RS shards on new CHUNK UNI streams; pair with +//! `SessionRs.drainOutboundOverQuic` / `ChannelRs.sessionDrainOutboundOverQuic`. //! //! After TLS + BCAST handshake, call `wireEngine`, then `finishBcastHandshakeRead` //! to capture the peer id. Drive `quic.poll` on both endpoints, then `PeerConn.drive` -//! (via `engineQuicDrive`) so inbound SESS opens relay sessions and CHUNK frames -//! call `relayIngestChunkVerifiedEngine`. +//! so inbound SESS opens relay sessions and CHUNK frames call `relayIngestChunkVerifiedEngine`. const std = @import("std"); const quic = @import("quic"); +const errors = @import("errors.zig"); const peer_mod = @import("../transport/eth_ec_quic_peer.zig"); const Engine = @import("engine.zig").Engine; const rs_strategy = @import("../layer/rs_strategy.zig"); @@ -142,7 +144,7 @@ fn handleSessStream(host: *EngineQuicHost, st: *quic.QuicStream) !void { var rs_pre = try preambleOwnedToRs(host.allocator, preamble_owned); errdefer rs_pre.deinit(host.allocator); - const ch = host.engine.channelRs(open.channel) orelse return error.UnknownChannel; + const ch = host.engine.channelRs(open.channel) orelse return error.ChannelNotFound; try ch.attachRelaySession(open.message_id, &rs_pre); rs_pre.deinit(host.allocator); } @@ -156,7 +158,7 @@ fn handleChunkStream(host: *EngineQuicHost, st: *quic.QuicStream) !void { var chunk_in = try chunk_stream.readChunkStream(host.allocator, fbs.reader()); defer chunk_in.deinit(host.allocator); - const ch = host.engine.channelRs(chunk_in.header.channel) orelse return error.UnknownChannel; + const ch = host.engine.channelRs(chunk_in.header.channel) orelse return error.ChannelNotFound; const ident = try wire_rs.decodeChunkIdent(host.allocator, chunk_in.header.chunk_id); @@ -202,3 +204,25 @@ fn drainUniStream( } return error.StreamDrainTimeout; } + +/// Open a new outbound UNI stream and write one RS shard CHUNK (`wire/chunk_stream.writeRsShardChunk`). +/// `poll_peer` is the remote QUIC endpoint to co-poll (same as other `eth_ec_quic_enabled` tests). +pub fn peerSendRsChunk( + pc: *PeerConn, + poll_peer: ?*quic.QuicEndpoint, + channel_id: []const u8, + message_id: []const u8, + shard_index: i32, + payload: []const u8, +) (std.mem.Allocator.Error || errors.Error)!void { + const st = quic.streamMakeUni(pc.conn, poll_peer) catch return error.ChunkWriteFail; + var buf = std.ArrayList(u8).empty; + defer buf.deinit(pc.allocator); + { + const w = buf.writer(pc.allocator); + chunk_stream.writeRsShardChunk(w, pc.allocator, channel_id, message_id, shard_index, payload) catch return error.ChunkMarshal; + } + try quic.streamQueueWrite(st, buf.items); + const peer_ep = poll_peer orelse pc.ep; + quic.streamDrainWrites(st, peer_ep, 10_000) catch return error.ChunkWriteFail; +} diff --git a/src/broadcast/errors.zig b/src/broadcast/errors.zig new file mode 100644 index 0000000..d8762c2 --- /dev/null +++ b/src/broadcast/errors.zig @@ -0,0 +1,30 @@ +//! Stable error names aligned with ethp2p [`broadcast/errors.go`](https://github.com/ethp2p/ethp2p/blob/main/broadcast/errors.go). + +pub const Error = error{ + EngineClosed, + ChannelExists, + ChannelNotFound, + PeerExists, + PeerNotFound, + InvalidMessage, + ProtocolMismatch, + UnexpectedMsgType, + AlreadySubscribed, + UnbufferedSubscription, + ChunkMarshal, + ChunkPeerGone, + ChunkSlotFull, + ChunkWriteFail, + ChunkCancelled, + SessionClosing, +}; + +/// Mirrors `ChunkProcessError` in the reference (peer + channel + message + wrapped cause). +pub fn ChunkProcessError(comptime PeerId: type, comptime ChannelId: type, comptime MessageId: type) type { + return struct { + peer: PeerId, + channel_id: ChannelId, + message_id: MessageId, + err: anyerror, + }; +} diff --git a/src/broadcast/relay_async_verify.zig b/src/broadcast/relay_async_verify.zig index 7d5ee53..fae1c68 100644 --- a/src/broadcast/relay_async_verify.zig +++ b/src/broadcast/relay_async_verify.zig @@ -12,6 +12,7 @@ const rs_strategy = @import("../layer/rs_strategy.zig"); const verify_queue_mod = @import("../layer/verify_queue.zig"); const verify_workers_mod = @import("../layer/verify_workers.zig"); const ChannelRs = @import("channel_rs.zig").ChannelRs; +const errors = @import("errors.zig"); const Allocator = std.mem.Allocator; @@ -33,8 +34,7 @@ pub const RelayAsyncVerifier = struct { dedup: ?*broadcast_types.DedupCancel, }; - pub const Error = Allocator.Error || error{ - UnknownMessage, + pub const Error = Allocator.Error || errors.Error || error{ InvalidChunkIndex, OrphanVerifyRecord, SystemResources, @@ -106,7 +106,7 @@ pub const RelayAsyncVerifier = struct { data: []const u8, dedup: ?*broadcast_types.DedupCancel, ) Error!void { - const strat = self.channel.sessionStrategy(message_id) orelse return error.UnknownMessage; + const strat = self.channel.sessionStrategy(message_id) orelse return error.InvalidMessage; const idx_i = chunk_id.index; if (idx_i < 0) return error.InvalidChunkIndex; const idx: usize = @intCast(idx_i); @@ -159,7 +159,7 @@ pub const RelayAsyncVerifier = struct { data: []const u8, dedup: ?*broadcast_types.DedupCancel, ) Error!void { - const strat = self.channel.sessionStrategy(message_id) orelse return error.UnknownMessage; + const strat = self.channel.sessionStrategy(message_id) orelse return error.InvalidMessage; const idx_i = chunk_id.index; if (idx_i < 0) return error.InvalidChunkIndex; const idx: usize = @intCast(idx_i); diff --git a/src/broadcast/session_rs.zig b/src/broadcast/session_rs.zig index 4344237..38914c9 100644 --- a/src/broadcast/session_rs.zig +++ b/src/broadcast/session_rs.zig @@ -2,11 +2,21 @@ const std = @import("std"); const broadcast_types = @import("../layer/broadcast_types.zig"); +const emit_planner = @import("../layer/emit_planner.zig"); const rs_strategy = @import("../layer/rs_strategy.zig"); const Allocator = std.mem.Allocator; const RsStrategy = rs_strategy.RsStrategy; +/// Context + `peerSendRsChunk` (or custom transport) for [`drainOutboundOverQuic`](SessionRs.drainOutboundOverQuic). +pub const SendRsChunkFn = *const fn ( + ctx: *anyopaque, + channel_id: []const u8, + message_id: []const u8, + shard_index: i32, + payload: []const u8, +) anyerror!void; + pub const SessionRs = struct { allocator: Allocator, /// Owned by the parent `ChannelRs` map key; not freed here. @@ -42,4 +52,29 @@ pub const SessionRs = struct { } return total; } + + /// Drain the RS emit planner by sending each scheduled chunk via `send_chunk` (e.g. QUIC UNI per + /// [`engine_quic.peerSendRsChunk`](`@import("engine_quic.zig").peerSendRsChunk`)), then `chunkSent` with success. + pub fn drainOutboundOverQuic( + self: *SessionRs, + channel_id: []const u8, + ctx: *anyopaque, + send_chunk: SendRsChunkFn, + ) (Allocator.Error || emit_planner.PlannerError || anyerror)!usize { + var total: usize = 0; + while (true) { + const out = try self.strategy.pollChunks(); + defer self.allocator.free(out); + if (out.len == 0) break; + for (out) |disp| { + send_chunk(ctx, channel_id, self.message_id, disp.chunk_id.index, disp.data) catch |err| { + self.strategy.chunkSent(disp.peer, disp.chunk_id.handle(), false); + return err; + }; + self.strategy.chunkSent(disp.peer, disp.chunk_id.handle(), true); + total += 1; + } + } + return total; + } }; diff --git a/src/ci_root_broadcast.zig b/src/ci_root_broadcast.zig index 2b628c9..d84efc7 100644 --- a/src/ci_root_broadcast.zig +++ b/src/ci_root_broadcast.zig @@ -15,6 +15,7 @@ test { _ = @import("layer/rs_strategy.zig"); _ = @import("layer/verify_queue.zig"); _ = @import("layer/verify_workers.zig"); + _ = @import("broadcast/errors.zig"); _ = @import("broadcast/observer.zig"); _ = @import("broadcast/engine.zig"); _ = @import("broadcast/channel_rs.zig"); diff --git a/src/root.zig b/src/root.zig index 66ea73e..3e985dd 100644 --- a/src/root.zig +++ b/src/root.zig @@ -47,6 +47,7 @@ pub const transport = struct { }; pub const broadcast = struct { + pub const errors = @import("broadcast/errors.zig"); pub const observer = @import("broadcast/observer.zig"); pub const engine = @import("broadcast/engine.zig"); pub const engine_quic = @import("broadcast/engine_quic.zig"); @@ -73,6 +74,7 @@ test { _ = sim.gossipsub_interop; _ = sim.gossipsub_rpc_pb; _ = sim.gossipsub_rpc_host; + _ = broadcast.errors; _ = broadcast.engine; _ = broadcast.channel_rs; _ = broadcast.session_rs; diff --git a/src/transport/eth_ec_quic_enabled.zig b/src/transport/eth_ec_quic_enabled.zig index b043b84..0033ad9 100644 --- a/src/transport/eth_ec_quic_enabled.zig +++ b/src/transport/eth_ec_quic_enabled.zig @@ -11,6 +11,7 @@ const chunk_stream = @import("../wire/chunk_stream.zig"); const wire_rs = @import("../wire/rs.zig"); const Engine = @import("../broadcast/engine.zig").Engine; const engine_quic = @import("../broadcast/engine_quic.zig"); +const SendRsChunkFn = @import("../broadcast/session_rs.zig").SendRsChunkFn; const peer_mod = @import("eth_ec_quic_peer.zig"); const rs_init = @import("../layer/rs_init.zig"); const rs_strategy = @import("../layer/rs_strategy.zig"); @@ -600,3 +601,200 @@ test "QUIC EngineQuicHost SESS session_open + CHUNK relay ingest" { quic.destroy(srv, sc); quic.destroy(client_ep, conn); } + +// Origin outbound RS chunks over QUIC (#48): peerSendRsChunk + sessionDrainOutboundOverQuic. +test "QUIC origin RS outbound CHUNK (peerSendRsChunk + sessionDrainOutboundOverQuic)" { + if (@import("builtin").os.tag == .windows) return error.SkipZigTest; + if (@import("builtin").os.tag == .wasi) return error.SkipZigTest; + + var gpa = std.heap.GeneralPurposeAllocator(.{}){}; + defer _ = gpa.deinit(); + var alloc = gpa.allocator(); + + const srv_cfg = common.EthEcQuicConfig{ + .server_certificate_pem_path = "src/transport/testdata/zethp2p_cert.pem", + .server_private_key_pem_path = "src/transport/testdata/zethp2p_key.pem", + .tls_insecure_skip_verify = true, + }; + var alpn_srv = [_][]const u8{common.alpn_eth_ec_broadcast}; + var qc_srv = toQuicConfig(srv_cfg, &alpn_srv); + var server_ep: ?*quic.QuicEndpoint = null; + var sport: u16 = 0; + for (0..64) |i| { + const p: u16 = @intCast(45400 + i); + const bind_s = try std.fmt.allocPrint(alloc, "127.0.0.1:{d}", .{p}); + defer alloc.free(bind_s); + server_ep = quic.endpointInit(&alloc, bind_s, &qc_srv) catch |err| switch (err) { + error.AddressInUse, error.AddressNotAvailable => continue, + else => |e| return e, + }; + sport = p; + break; + } + const srv = server_ep orelse return error.NoBindPort; + defer quic.endpointDeinit(srv); + + var alpn_cli = [_][]const u8{common.alpn_eth_ec_broadcast}; + var qc_cli = quic.QuicConfig{ + .alpn = &alpn_cli, + .allow_insecure = true, + .max_idle_timeout_ms = maxIdleTimeoutMs(common.EthEcQuicConfig.default().max_idle_timeout_ns), + .max_udp_payload = 1350, + }; + const client_ep = try quic.endpointInit(&alloc, "127.0.0.1:0", &qc_cli); + defer quic.endpointDeinit(client_ep); + + const remote_s = try std.fmt.allocPrint(alloc, "127.0.0.1:{d}", .{sport}); + defer alloc.free(remote_s); + + const conn = try quic.connect(client_ep, remote_s, test_certs.tls_server_name); + errdefer quic.destroy(client_ep, conn); + + var server_conn: ?*quic.QuicConnection = null; + var rounds: u32 = 0; + while (rounds < 30_000) : (rounds += 1) { + try quic.poll(srv, 0); + try quic.poll(client_ep, 0); + if (server_conn == null) server_conn = quic.tryAccept(srv); + if (quic.handshakeComplete(conn)) { + if (server_conn) |c| { + if (quic.handshakeComplete(c)) break; + } + } + } + const sc = server_conn orelse return error.MissingServerConnection; + try std.testing.expect(quic.handshakeComplete(conn)); + try std.testing.expect(quic.handshakeComplete(sc)); + + const cfg = rs_init.RsConfig{ + .data_shards = 4, + .parity_shards = 2, + .chunk_len = 0, + .bitmap_threshold = 0, + .forward_multiplier = 4, + .disable_bitmap = false, + }; + + var engine = try Engine.init(alloc, "server-local", .{}); + defer engine.deinit(); + + const ch = try engine.attachChannelRs("ch1", cfg); + try ch.addMember("client-peer"); + + var host = engine_quic.EngineQuicHost.init(alloc, &engine, sc, srv); + defer host.deinit(); + host.wireEngine(); + host.setPeerEndpoint(client_ep); + + var cli_pc = peer_mod.PeerConn.init(alloc, conn, client_ep); + + try host.peer.beginHandshake(client_ep, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "server-peer", + }); + try cli_pc.beginHandshake(srv, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "client-peer", + }); + + var srv_bcast_payload = std.ArrayList(u8).empty; + defer srv_bcast_payload.deinit(alloc); + { + const w = srv_bcast_payload.writer(alloc); + try bcast_stream.writeBcastHandshakeOpen(w, alloc, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "server-peer", + }); + } + try quic.streamQueueWrite(host.peer.bcast_out.?, srv_bcast_payload.items); + try quic.streamDrainWrites(host.peer.bcast_out.?, client_ep, 10_000); + + var cli_bcast_payload = std.ArrayList(u8).empty; + defer cli_bcast_payload.deinit(alloc); + { + const w = cli_bcast_payload.writer(alloc); + try bcast_stream.writeBcastHandshakeOpen(w, alloc, .{ + .version = 1, + .channels = &.{"ch1"}, + .peer_id = "client-peer", + }); + } + try quic.streamQueueWrite(cli_pc.bcast_out.?, cli_bcast_payload.items); + try quic.streamDrainWrites(cli_pc.bcast_out.?, srv, 10_000); + + rounds = 0; + while (rounds < 30_000) : (rounds += 1) { + try quic.poll(srv, 0); + try quic.poll(client_ep, 0); + _ = host.peer.drive(); + _ = cli_pc.drive(); + if (host.peer.state == .active and cli_pc.state == .active) break; + } + try std.testing.expectEqual(peer_mod.PeerConnState.active, host.peer.state); + try std.testing.expectEqual(peer_mod.PeerConnState.active, cli_pc.state); + + try host.finishBcastHandshakeRead(); + try std.testing.expectEqualStrings("client-peer", host.remote_peer_id); + + const payload = [_]u8{ 9, 8, 7, 6, 5 }; + try ch.publish("m1", &payload); + + const SendCtx = struct { + pc: *peer_mod.PeerConn, + poll_peer: *quic.QuicEndpoint, + }; + var send_ctx = SendCtx{ .pc = &host.peer, .poll_peer = client_ep }; + + const send_chunk: SendRsChunkFn = struct { + fn call( + ctx: *anyopaque, + channel_id: []const u8, + message_id: []const u8, + index: i32, + data: []const u8, + ) !void { + const c: *SendCtx = @ptrCast(@alignCast(ctx)); + try engine_quic.peerSendRsChunk(c.pc, c.poll_peer, channel_id, message_id, index, data); + } + }.call; + + const n = try ch.sessionDrainOutboundOverQuic("m1", &send_ctx, send_chunk); + try std.testing.expect(n > 0); + + const strat = ch.sessionStrategy("m1") orelse return error.MissingSession; + + const ust = try acceptIncomingQuicUniStream(conn, client_ep, srv); + var last_len: usize = 0; + var stable: u32 = 0; + var pb: u32 = 0; + while (pb < 10_000) : (pb += 1) { + try quic.poll(client_ep, 0); + try quic.poll(srv, 0); + const rlen = quic.streamReadSlice(ust).len; + if (rlen == last_len and rlen > 0) { + stable += 1; + if (stable >= 2) break; + } else { + stable = 0; + last_len = rlen; + } + } + const raw_in = quic.streamReadSlice(ust); + try std.testing.expect(raw_in.len > 0); + + var fbs = std.io.fixedBufferStream(raw_in); + var cin = try chunk_stream.readChunkStream(alloc, fbs.reader()); + defer cin.deinit(alloc); + try std.testing.expectEqualStrings("ch1", cin.header.channel); + try std.testing.expectEqualStrings("m1", cin.header.message_id); + const ident = try wire_rs.decodeChunkIdent(alloc, cin.header.chunk_id); + const idx: usize = @intCast(ident.index); + try std.testing.expect(idx < strat.chunks.len); + try std.testing.expectEqualSlices(u8, strat.chunks[idx], cin.payload); + + quic.destroy(srv, sc); + quic.destroy(client_ep, conn); +}