From 9025bf1887f791fa167f6e7405e81193f2c36db2 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Thu, 7 May 2026 13:10:20 +0100 Subject: [PATCH 01/16] Initial Zenzen DDS Support --- srcZig/shape.idl | 9 + srcZig/zenzen-zig/.gitignore | 2 + srcZig/zenzen-zig/build.zig | 37 ++ srcZig/zenzen-zig/build.zig.zon | 16 + srcZig/zenzen-zig/src/shape_main.zig | 756 +++++++++++++++++++++++++++ 5 files changed, 820 insertions(+) create mode 100644 srcZig/shape.idl create mode 100644 srcZig/zenzen-zig/.gitignore create mode 100644 srcZig/zenzen-zig/build.zig create mode 100644 srcZig/zenzen-zig/build.zig.zon create mode 100644 srcZig/zenzen-zig/src/shape_main.zig diff --git a/srcZig/shape.idl b/srcZig/shape.idl new file mode 100644 index 00000000..0450a5df --- /dev/null +++ b/srcZig/shape.idl @@ -0,0 +1,9 @@ +@appendable +struct ShapeType { + @key + string<128> color; + int32 x; + int32 y; + int32 shapesize; + sequence additional_payload_size; +}; diff --git a/srcZig/zenzen-zig/.gitignore b/srcZig/zenzen-zig/.gitignore new file mode 100644 index 00000000..3389c86c --- /dev/null +++ b/srcZig/zenzen-zig/.gitignore @@ -0,0 +1,2 @@ +.zig-cache/ +zig-out/ diff --git a/srcZig/zenzen-zig/build.zig b/srcZig/zenzen-zig/build.zig new file mode 100644 index 00000000..1505113a --- /dev/null +++ b/srcZig/zenzen-zig/build.zig @@ -0,0 +1,37 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{}); + + const version = b.option([]const u8, "dds-version", + "ZenzenDDS version string embedded in the executable name (default: 0.0.0)") + orelse "0.0.0"; + + const zzdds_dep = b.dependency("zzdds", .{ .target = target, .optimize = optimize }); + const zzdds_mod = zzdds_dep.module("zzdds"); + const zzdds_gen = zzdds_dep.module("zzdds_generated"); + + const exe_name = std.fmt.allocPrint(b.allocator, + "zenzen_dds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); + + const exe = b.addExecutable(.{ + .name = exe_name, + .root_module = b.createModule(.{ + .root_source_file = b.path("src/shape_main.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "zzdds", .module = zzdds_mod }, + .{ .name = "zzdds_generated", .module = zzdds_gen }, + }, + }), + }); + exe.root_module.link_libc = true; + b.installArtifact(exe); + + const run_step = b.step("run", "Run shape_main"); + const run_cmd = b.addRunArtifact(exe); + if (b.args) |args| run_cmd.addArgs(args); + run_step.dependOn(&run_cmd.step); +} diff --git a/srcZig/zenzen-zig/build.zig.zon b/srcZig/zenzen-zig/build.zig.zon new file mode 100644 index 00000000..6b02f398 --- /dev/null +++ b/srcZig/zenzen-zig/build.zig.zon @@ -0,0 +1,16 @@ +.{ + .name = .zenzen_dds_shape_main, + .version = "0.0.0", + .fingerprint = 0x59a7de95ffa6098c, + .minimum_zig_version = "0.16.0-dev.2848+b4ffb402c", + .dependencies = .{ + .zzdds = .{ + .path = "../../../ZenzenDDS", + }, + }, + .paths = .{ + "build.zig", + "build.zig.zon", + "src", + }, +} diff --git a/srcZig/zenzen-zig/src/shape_main.zig b/srcZig/zenzen-zig/src/shape_main.zig new file mode 100644 index 00000000..e2f67060 --- /dev/null +++ b/srcZig/zenzen-zig/src/shape_main.zig @@ -0,0 +1,756 @@ +//! ZenzenDDS shape_main — interoperability test application for OMG dds-rtps. +//! +//! Mirrors the CLI interface of srcCxx/shape_main.cxx so the Python test +//! harness (interoperability_report.py) can drive it as a pub or sub. +//! +//! Required stdout strings (matched by the harness via pexpect): +//! Publisher: "Create topic:" → "Create writer for topic:" → +//! "on_publication_matched()" or "on_offered_incompatible_qos" → +//! "%-10s %-10s %03d %03d [%d]" (only when -w is passed) +//! Subscriber: "Create topic:" → "Create reader for topic:" → +//! "[]" in the sample line or "on_requested_incompatible_qos()" + +const std = @import("std"); +const zzdds = @import("zzdds"); +const DDS = @import("zzdds_generated").DDS; + +const UdpTransport = zzdds.udp_transport.UdpTransport; +const SpdpSedpDiscovery = zzdds.combined_discovery.SpdpSedpDiscovery; +const DomainParticipantFactoryImpl = zzdds.dcps.DomainParticipantFactoryImpl; +const DataWriterImpl = zzdds.dcps.DataWriterImpl; +const DataReaderImpl = zzdds.dcps.DataReaderImpl; +const TopicImpl = zzdds.dcps.TopicImpl; +const noop_security = zzdds.noop_security.noop_security_plugins; +const time_mod = zzdds.util.time; +const RtpsTimestamp = zzdds.util.time.RtpsTimestamp; +const history_mod = zzdds.rtps.history; +const nil = zzdds.dcps; + +// ── Stdout helpers ──────────────────────────────────────────────────────────── +// std.io was removed in Zig 0.16; write directly via the Linux write(2) syscall. + +fn stdoutWrite(bytes: []const u8) void { + var remaining = bytes; + while (remaining.len > 0) { + const rc = std.os.linux.write(std.posix.STDOUT_FILENO, remaining.ptr, remaining.len); + const n = @as(isize, @bitCast(rc)); + if (n <= 0) break; + remaining = remaining[@intCast(n)..]; + } +} + +fn stdoutPrint(comptime fmt: []const u8, args: anytype) void { + var buf: [2048]u8 = undefined; + var w: std.Io.Writer = .fixed(&buf); + w.print(fmt, args) catch {}; + stdoutWrite(w.buffered()); +} + +// ── Signal handling ─────────────────────────────────────────────────────────── + +var g_all_done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false); + +fn handleSigint(sig: std.posix.SIG) callconv(.c) void { + _ = sig; + g_all_done.store(true, .release); +} + +// ── Options ─────────────────────────────────────────────────────────────────── + +const Options = struct { + publish: bool = false, + subscribe: bool = false, + domain_id: u32 = 0, + best_effort: bool = false, + reliable: bool = false, + history_depth: i32 = -1, // -1 = use default KEEP_LAST 1 + deadline_ms: u64 = 0, + ownership_strength: i32 = -1, // -1 = SHARED + topic_name: []const u8 = "Square", + color: ?[]const u8 = null, + partition: ?[]const u8 = null, + durability: u8 = 'v', + data_representation: u16 = 1, // 1=XCDR1, 2=XCDR2 + print_writer_samples: bool = false, + shapesize: i32 = 20, + write_period_ms: u64 = 33, + read_period_ms: u64 = 100, + num_iterations: i64 = -1, // -1 = infinite + num_instances: u32 = 1, + additional_payload: u32 = 0, + cft_expression: ?[]const u8 = null, // content filter expression (unsupported) +}; + +// ── CDR helpers ─────────────────────────────────────────────────────────────── + +fn writeU32Le(buf: *std.ArrayList(u8), alloc: std.mem.Allocator, v: u32) !void { + var b: [4]u8 = undefined; + std.mem.writeInt(u32, &b, v, .little); + try buf.appendSlice(alloc, &b); +} + +fn writeI32Le(buf: *std.ArrayList(u8), alloc: std.mem.Allocator, v: i32) !void { + var b: [4]u8 = undefined; + std.mem.writeInt(i32, &b, v, .little); + try buf.appendSlice(alloc, &b); +} + +fn align4(n: usize) usize { + return (n + 3) & ~@as(usize, 3); +} + +// ── ShapeType CDR serialization ─────────────────────────────────────────────── + +const ShapeData = struct { + color: []const u8, + x: i32, + y: i32, + shapesize: i32, + payload: u32, // additional_payload_size sequence length (all-zero bytes) +}; + +// Serialize ShapeType with a 4-byte encapsulation header. +// xcdr2=false → CDR_LE (XCDR1): @appendable treated as @final, no DHEADER. +// xcdr2=true → DELIMITED_CDR_LE (XCDR2): 4-byte DHEADER before struct members. +fn serializeShape(buf: *std.ArrayList(u8), alloc: std.mem.Allocator, s: ShapeData, xcdr2: bool) !void { + buf.clearRetainingCapacity(); + + if (xcdr2) { + // DELIMITED_CDR_LE encapsulation (XCDR2 @appendable) + try buf.appendSlice(alloc, &[4]u8{ 0x00, 0x09, 0x00, 0x00 }); + // Placeholder for DHEADER (filled in below) + const dheader_pos = buf.items.len; + try writeU32Le(buf, alloc, 0); + + var off: usize = 0; + const clen: u32 = @intCast(s.color.len + 1); + try writeU32Le(buf, alloc, clen); + off += 4; + try buf.appendSlice(alloc, s.color); + try buf.append(alloc, 0); + off += clen; + const pad = align4(off) - off; + for (0..pad) |_| try buf.append(alloc, 0); + off += pad; + try writeI32Le(buf, alloc, s.x); off += 4; + try writeI32Le(buf, alloc, s.y); off += 4; + try writeI32Le(buf, alloc, s.shapesize); off += 4; + try writeU32Le(buf, alloc, s.payload); off += 4; + for (0..s.payload) |_| try buf.append(alloc, 0); + // Patch DHEADER = length of struct members (bytes after DHEADER) + const member_len: u32 = @intCast(buf.items.len - dheader_pos - 4); + std.mem.writeInt(u32, buf.items[dheader_pos..][0..4], member_len, .little); + } else { + // CDR_LE encapsulation (XCDR1) + try buf.appendSlice(alloc, &[4]u8{ 0x00, 0x01, 0x00, 0x00 }); + + var off: usize = 0; + const clen: u32 = @intCast(s.color.len + 1); + try writeU32Le(buf, alloc, clen); + off += 4; + try buf.appendSlice(alloc, s.color); + try buf.append(alloc, 0); + off += clen; + const pad = align4(off) - off; + for (0..pad) |_| try buf.append(alloc, 0); + off += pad; + try writeI32Le(buf, alloc, s.x); off += 4; + try writeI32Le(buf, alloc, s.y); off += 4; + try writeI32Le(buf, alloc, s.shapesize); off += 4; + try writeU32Le(buf, alloc, s.payload); off += 4; + for (0..s.payload) |_| try buf.append(alloc, 0); + } +} + +// Compute 16-byte key hash from CDR-serialised key (color string). +// DDS spec: use the raw serialised key when it fits in 16 bytes, else MD5. +fn colorKeyHash(color: []const u8) [16]u8 { + var kh = std.mem.zeroes([16]u8); + // CDR key: u32 length (LE) + chars + '\0' + const clen: u32 = @intCast(color.len + 1); + const klen: usize = 4 + clen; + if (klen <= 16) { + std.mem.writeInt(u32, kh[0..4], clen, .little); + @memcpy(kh[4..][0..color.len], color); + // null at [4 + color.len]; already zero from zeroes() + } else { + // Fall back to zeroes — MD5 not implemented; works for typical test colors + } + return kh; +} + +const ParsedShape = struct { + color: []const u8, // slice into payload; valid while payload is alive + x: i32, + y: i32, + shapesize: i32, +}; + +// Deserialize a CDR/CDR2 ShapeType payload. Returns null on error / key-only. +fn deserializeShape(payload: []const u8) ?ParsedShape { + if (payload.len < 4) return null; + + const encap = payload[1]; + var off: usize = 4; // skip 4-byte encap header + + // XCDR2 @appendable: skip 4-byte struct DHEADER + if (encap == 0x08 or encap == 0x09) { + if (payload.len < off + 4) return null; + off += 4; + } + + // color string + if (payload.len < off + 4) return null; + const clen = std.mem.readInt(u32, payload[off..][0..4], .little); + off += 4; + if (clen == 0 or payload.len < off + clen) return null; + const color = payload[off .. off + clen - 1]; // strip null terminator + off += clen; + + // pad to 4-byte + off = align4(off); + + if (payload.len < off + 12) return null; // x + y + shapesize + const x = std.mem.readInt(i32, payload[off..][0..4], .little); off += 4; + const y = std.mem.readInt(i32, payload[off..][0..4], .little); off += 4; + const shapesize = std.mem.readInt(i32, payload[off..][0..4], .little); + + return ParsedShape{ .color = color, .x = x, .y = y, .shapesize = shapesize }; +} + +// ── Policy name mapping ─────────────────────────────────────────────────────── + +fn policyName(id: i32) []const u8 { + return switch (id) { + 2 => "DURABILITY", + 4 => "DEADLINE", + 5 => "LATENCYBUDGET", + 6 => "OWNERSHIP", + 8 => "LIVELINESS", + 10 => "PARTITION", + 11 => "RELIABILITY", + 12 => "DESTINATIONORDER", + 23 => "DATAREPRESENTATION", + else => "UNKNOWN", + }; +} + +// ── Listener context and vtables ────────────────────────────────────────────── + +const ListenerCtx = struct { + topic_name: []const u8, + type_name: []const u8 = "ShapeType", +}; + +// DataWriter listeners + +fn dwOnIncompatQos(ctx: *anyopaque, _: DDS.DataWriter, status: DDS.OfferedIncompatibleQosStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_offered_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", + .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); +} + +fn dwOnDeadlineMissed(ctx: *anyopaque, _: DDS.DataWriter, status: DDS.OfferedDeadlineMissedStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_offered_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", + .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); +} + +fn dwOnLivelinessLost(_: *anyopaque, _: DDS.DataWriter, _: DDS.LivelinessLostStatus) void {} +fn dwOnPubMatched(_: *anyopaque, _: DDS.DataWriter, _: DDS.PublicationMatchedStatus) void {} +fn dwOnDeinit(_: *anyopaque) void {} + +var dw_vtable = DDS.DataWriterListener.Vtable{ + .on_offered_deadline_missed = dwOnDeadlineMissed, + .on_offered_incompatible_qos = dwOnIncompatQos, + .on_liveliness_lost = dwOnLivelinessLost, + .on_publication_matched = dwOnPubMatched, + .deinit = dwOnDeinit, +}; + +// DataReader listeners + +fn drOnIncompatQos(ctx: *anyopaque, _: DDS.DataReader, status: DDS.RequestedIncompatibleQosStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_requested_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", + .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); +} + +fn drOnDeadlineMissed(ctx: *anyopaque, _: DDS.DataReader, status: DDS.RequestedDeadlineMissedStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_requested_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", + .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); +} + +fn drOnSampleRejected(_: *anyopaque, _: DDS.DataReader, _: DDS.SampleRejectedStatus) void {} +fn drOnLivelinessChanged(_: *anyopaque, _: DDS.DataReader, _: DDS.LivelinessChangedStatus) void {} +fn drOnDataAvail(_: *anyopaque, _: DDS.DataReader) void {} +fn drOnSubMatched(_: *anyopaque, _: DDS.DataReader, _: DDS.SubscriptionMatchedStatus) void {} +fn drOnSampleLost(_: *anyopaque, _: DDS.DataReader, _: DDS.SampleLostStatus) void {} +fn drOnDeinit(_: *anyopaque) void {} + +var dr_vtable = DDS.DataReaderListener.Vtable{ + .on_requested_deadline_missed = drOnDeadlineMissed, + .on_requested_incompatible_qos = drOnIncompatQos, + .on_sample_rejected = drOnSampleRejected, + .on_liveliness_changed = drOnLivelinessChanged, + .on_data_available = drOnDataAvail, + .on_subscription_matched = drOnSubMatched, + .on_sample_lost = drOnSampleLost, + .deinit = drOnDeinit, +}; + +// ── DataWriter QoS builder ──────────────────────────────────────────────────── + +fn buildWriterQos(alloc: std.mem.Allocator, opts: *const Options) !DDS.DataWriterQos { + var qos = DDS.DataWriterQos{}; + + qos.reliability.kind = if (opts.best_effort) + .BEST_EFFORT_RELIABILITY_QOS + else + .RELIABLE_RELIABILITY_QOS; + + if (opts.history_depth == 0) { + qos.history.kind = .KEEP_ALL_HISTORY_QOS; + } else if (opts.history_depth > 0) { + qos.history.kind = .KEEP_LAST_HISTORY_QOS; + qos.history.depth = opts.history_depth; + } + + qos.durability.kind = switch (opts.durability) { + 'l' => .TRANSIENT_LOCAL_DURABILITY_QOS, + 't' => .TRANSIENT_DURABILITY_QOS, + 'p' => .PERSISTENT_DURABILITY_QOS, + else => .VOLATILE_DURABILITY_QOS, + }; + + if (opts.ownership_strength >= 0) { + qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; + qos.ownership_strength.value = opts.ownership_strength; + } + + qos.deadline.period = if (opts.deadline_ms > 0) .{ + .sec = @intCast(opts.deadline_ms / 1000), + .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), + } else .{ .sec = 0x7fff_ffff, .nanosec = 0x7fff_ffff }; // INFINITE (DDS default) + + // Wire IDs: XCDR1=0, XCDR2=2. opts uses 1=XCDR1, 2=XCDR2 for backward compat. + const repr_id: i16 = if (opts.data_representation == 2) 2 else 0; + try qos.data_representation.value.append(alloc, repr_id); + + return qos; +} + +fn buildReaderQos(alloc: std.mem.Allocator, opts: *const Options) !DDS.DataReaderQos { + var qos = DDS.DataReaderQos{}; + + qos.reliability.kind = if (opts.best_effort) + .BEST_EFFORT_RELIABILITY_QOS + else if (opts.reliable) + .RELIABLE_RELIABILITY_QOS + else + .RELIABLE_RELIABILITY_QOS; // default: RELIABLE + + if (opts.history_depth == 0) { + qos.history.kind = .KEEP_ALL_HISTORY_QOS; + } else if (opts.history_depth > 0) { + qos.history.kind = .KEEP_LAST_HISTORY_QOS; + qos.history.depth = opts.history_depth; + } + + qos.durability.kind = switch (opts.durability) { + 'l' => .TRANSIENT_LOCAL_DURABILITY_QOS, + 't' => .TRANSIENT_DURABILITY_QOS, + 'p' => .PERSISTENT_DURABILITY_QOS, + else => .VOLATILE_DURABILITY_QOS, + }; + + if (opts.ownership_strength >= 0) { + qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; + } + + qos.deadline.period = if (opts.deadline_ms > 0) .{ + .sec = @intCast(opts.deadline_ms / 1000), + .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), + } else .{ .sec = 0x7fff_ffff, .nanosec = 0x7fff_ffff }; // INFINITE (DDS default) + + // Wire IDs: XCDR1=0, XCDR2=2. opts uses 1=XCDR1, 2=XCDR2 for backward compat. + const repr_id: i16 = if (opts.data_representation == 2) 2 else 0; + try qos.data_representation.value.append(alloc, repr_id); + + return qos; +} + +// ── Publisher ───────────────────────────────────────────────────────────────── + +fn runPublisher( + alloc: std.mem.Allocator, + dp: DDS.DomainParticipant, + topic: DDS.Topic, + opts: *const Options, +) !void { + const color = opts.color orelse "BLUE"; + const topic_impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); + const topic_name = topic_impl.topic_name; + + const pub_ = dp.vtable.create_publisher(dp.ptr, .{}, nil.nil_pub_listener, 0); + if (pub_.ptr == nil.nil_pub_listener.ptr) return error.PublisherFailed; + + const dw_qos = try buildWriterQos(alloc, opts); + + var lctx = ListenerCtx{ .topic_name = topic_name }; + const dw_listener = DDS.DataWriterListener{ + .ptr = &lctx, + .vtable = &dw_vtable, + }; + // Enable incompatible-QoS and deadline-missed callbacks. + const listener_mask: DDS.StatusMask = + DDS.OFFERED_INCOMPATIBLE_QOS_STATUS | DDS.OFFERED_DEADLINE_MISSED_STATUS; + + const dw = pub_.vtable.create_datawriter(pub_.ptr, topic, dw_qos, dw_listener, listener_mask); + if (dw.ptr == nil.nil_dw_listener.ptr) return error.DataWriterFailed; + + stdoutPrint("Create writer for topic: {s} color: {s}\n", .{ topic_name, color }); + + const dw_impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + + // Write loop starts immediately so history accumulates before subscribers join. + // The match notification is detected inline and printed on first discovery. + // Timeout after 10 s with no reader → exit (test harness detects READER_NOT_MATCHED). + var buf: std.ArrayList(u8) = .empty; + defer buf.deinit(alloc); + + var shape = ShapeData{ + .color = color, + .x = 0, + .y = 0, + .shapesize = if (opts.shapesize == 0) 1 else opts.shapesize, + .payload = opts.additional_payload, + }; + var rng = std.Random.DefaultPrng.init(@intCast(time_mod.nanoTimestamp())); + const rand = rng.random(); + + const match_deadline = time_mod.nanoTimestamp() + 10 * std.time.ns_per_s; + var printed_matched = false; + + // Deadline monitoring: track time of last write to detect when the deadline + // period expires between writes (write_period > deadline → missed). + const deadline_ns: i64 = if (opts.deadline_ms > 0) + @intCast(opts.deadline_ms * std.time.ns_per_ms) + else + 0; + var last_write_ns: i64 = time_mod.nanoTimestamp(); + + var iteration: i64 = 0; + while (!g_all_done.load(.acquire)) { + if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; + + // Detect reader match; emit notification once; exit on timeout (no match). + if (!printed_matched) { + if (dw_impl.matchedReaderCount() > 0) { + stdoutPrint( + "on_publication_matched() topic: '{s}' type: 'ShapeType' : matched readers {d} (change = 1)\n", + .{ topic_name, dw_impl.matchedReaderCount() }, + ); + printed_matched = true; + } else if (time_mod.nanoTimestamp() > match_deadline) { + return; // READER_NOT_MATCHED + } + } + + // Check if the deadline expired since the last write (write_period > deadline). + if (deadline_ns > 0) { + const elapsed = time_mod.nanoTimestamp() - last_write_ns; + if (elapsed > deadline_ns) dw_impl.notifyDeadlineMissed(); + } + + // Randomise position (0–319 x, 0–239 y) — matches C++ shape_main defaults + shape.x = @rem(@as(i32, rand.int(u16)), 320); + shape.y = @rem(@as(i32, rand.int(u16)), 240); + + // Serialize and write each instance + for (0..opts.num_instances) |inst| { + const inst_color: []const u8 = blk: { + if (inst == 0) break :blk color; + // additional instances: color + index (e.g. "BLUE1", "BLUE2") + break :blk std.fmt.allocPrint(alloc, "{s}{d}", .{ color, inst }) catch color; + }; + defer if (inst > 0) alloc.free(inst_color); + + shape.color = inst_color; + try serializeShape(&buf, alloc, shape, opts.data_representation == 2); + + const key_hash = colorKeyHash(inst_color); + _ = try dw_impl.writeRaw( + .alive, + RtpsTimestamp.now(), + history_mod.INSTANCE_HANDLE_NIL, + key_hash, + buf.items, + ); + + if (opts.print_writer_samples) { + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", + .{ topic_name, inst_color, @as(u32, @intCast(shape.x)), @as(u32, @intCast(shape.y)), shape.shapesize }); + } + } + + if (opts.shapesize == 0) { + shape.shapesize += 1; + } + + last_write_ns = time_mod.nanoTimestamp(); + iteration += 1; + time_mod.sleepNs(opts.write_period_ms * std.time.ns_per_ms); + } +} + +// ── Subscriber ──────────────────────────────────────────────────────────────── + +fn runSubscriber( + alloc: std.mem.Allocator, + dp: DDS.DomainParticipant, + topic: DDS.Topic, + opts: *const Options, +) !void { + const topic_impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); + const topic_name = topic_impl.topic_name; + const topic_desc = topic_impl.toTopicDescription(); + + const sub = dp.vtable.create_subscriber(dp.ptr, .{}, nil.nil_sub_listener, 0); + if (sub.ptr == nil.nil_sub_listener.ptr) return error.SubscriberFailed; + + const dr_qos = try buildReaderQos(alloc, opts); + + var lctx = ListenerCtx{ .topic_name = topic_name }; + const dr_listener = DDS.DataReaderListener{ + .ptr = &lctx, + .vtable = &dr_vtable, + }; + const listener_mask: DDS.StatusMask = + DDS.REQUESTED_INCOMPATIBLE_QOS_STATUS | DDS.REQUESTED_DEADLINE_MISSED_STATUS; + + const dr = sub.vtable.create_datareader(sub.ptr, topic_desc, dr_qos, dr_listener, listener_mask); + if (dr.ptr == nil.nil_dr_listener.ptr) return error.DataReaderFailed; + + stdoutPrint("Create reader for topic: {s}\n", .{topic_name}); + + const dr_impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + + // Deadline monitoring: once a writer is matched, track time since last + // received sample; fire on_requested_deadline_missed when the deadline + // period expires without data. + const sub_deadline_ns: i64 = if (opts.deadline_ms > 0) + @intCast(opts.deadline_ms * std.time.ns_per_ms) + else + 0; + var deadline_base_ns: i64 = 0; // 0 = no matched writer yet + + var iteration: i64 = 0; + while (!g_all_done.load(.acquire)) { + if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; + + // Start the deadline clock when the first writer is matched. + if (sub_deadline_ns > 0 and deadline_base_ns == 0 and dr_impl.matchedWriterCount() > 0) { + deadline_base_ns = time_mod.nanoTimestamp(); + } + + var got_data = false; + while (dr_impl.takeRaw()) |payload| { + defer alloc.free(payload); + got_data = true; + if (deserializeShape(payload)) |s| { + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", + .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize }); + } + } + + if (got_data) { + // Reset deadline clock on each received sample. + deadline_base_ns = time_mod.nanoTimestamp(); + } else if (sub_deadline_ns > 0 and deadline_base_ns != 0) { + // No data this iteration — check if the deadline has expired. + if (time_mod.nanoTimestamp() - deadline_base_ns > sub_deadline_ns) { + dr_impl.notifyDeadlineMissed(); + deadline_base_ns = time_mod.nanoTimestamp(); // reset to avoid repeated firing + } + } + + iteration += 1; + time_mod.sleepNs(opts.read_period_ms * std.time.ns_per_ms); + } +} + +// ── Argument parsing ────────────────────────────────────────────────────────── + +fn parseArgs(process_args: std.process.Args) !Options { + var opts = Options{}; + var it = std.process.Args.Iterator.init(process_args); + _ = it.skip(); // program name + + while (it.next()) |arg| { + if (std.mem.eql(u8, arg, "-P")) { + opts.publish = true; + } else if (std.mem.eql(u8, arg, "-S")) { + opts.subscribe = true; + } else if (std.mem.eql(u8, arg, "-b")) { + opts.best_effort = true; + } else if (std.mem.eql(u8, arg, "-r")) { + opts.reliable = true; + } else if (std.mem.eql(u8, arg, "-w")) { + opts.print_writer_samples = true; + } else if (std.mem.eql(u8, arg, "-R")) { + // use read() instead of take() — no-op (we always use takeRaw) + } else if (std.mem.eql(u8, arg, "-d")) { + const v = it.next() orelse return error.MissingValue; + opts.domain_id = try std.fmt.parseInt(u32, v, 10); + } else if (std.mem.eql(u8, arg, "-k")) { + const v = it.next() orelse return error.MissingValue; + opts.history_depth = try std.fmt.parseInt(i32, v, 10); + } else if (std.mem.eql(u8, arg, "-f")) { + const v = it.next() orelse return error.MissingValue; + opts.deadline_ms = try std.fmt.parseInt(u64, v, 10); + } else if (std.mem.eql(u8, arg, "-s")) { + const v = it.next() orelse return error.MissingValue; + opts.ownership_strength = try std.fmt.parseInt(i32, v, 10); + } else if (std.mem.eql(u8, arg, "-t")) { + const v = it.next() orelse return error.MissingValue; + opts.topic_name = v; + } else if (std.mem.eql(u8, arg, "-c")) { + const v = it.next() orelse return error.MissingValue; + opts.color = v; + } else if (std.mem.eql(u8, arg, "-p")) { + const v = it.next() orelse return error.MissingValue; + opts.partition = v; + } else if (std.mem.eql(u8, arg, "-D")) { + const v = it.next() orelse return error.MissingValue; + if (v.len > 0) opts.durability = v[0]; + } else if (std.mem.eql(u8, arg, "-x")) { + const v = it.next() orelse return error.MissingValue; + opts.data_representation = try std.fmt.parseInt(u16, v, 10); + } else if (std.mem.eql(u8, arg, "-z")) { + const v = it.next() orelse return error.MissingValue; + opts.shapesize = try std.fmt.parseInt(i32, v, 10); + } else if (std.mem.eql(u8, arg, "--write-period")) { + const v = it.next() orelse return error.MissingValue; + opts.write_period_ms = try std.fmt.parseInt(u64, v, 10); + } else if (std.mem.eql(u8, arg, "--read-period")) { + const v = it.next() orelse return error.MissingValue; + opts.read_period_ms = try std.fmt.parseInt(u64, v, 10); + } else if (std.mem.eql(u8, arg, "--num-iterations")) { + const v = it.next() orelse return error.MissingValue; + opts.num_iterations = try std.fmt.parseInt(i64, v, 10); + } else if (std.mem.eql(u8, arg, "--num-instances")) { + const v = it.next() orelse return error.MissingValue; + opts.num_instances = try std.fmt.parseInt(u32, v, 10); + } else if (std.mem.eql(u8, arg, "--num-topics")) { + // not yet supported — silently ignore value + _ = it.next(); + } else if (std.mem.eql(u8, arg, "--additional-payload-size")) { + const v = it.next() orelse return error.MissingValue; + opts.additional_payload = try std.fmt.parseInt(u32, v, 10); + } else if (std.mem.eql(u8, arg, "--time-filter") or + std.mem.eql(u8, arg, "--lifespan") or + std.mem.eql(u8, arg, "--write-period") or + std.mem.eql(u8, arg, "--size-modulo") or + std.mem.eql(u8, arg, "--periodic-announcement") or + std.mem.eql(u8, arg, "--final-instance-state") or + std.mem.eql(u8, arg, "--access-scope") or + std.mem.eql(u8, arg, "--coherent-sample-count") or + std.mem.eql(u8, arg, "--take-read")) + { + // consume argument value and ignore — unimplemented options + _ = it.next(); + } else if (std.mem.eql(u8, arg, "--coherent") or + std.mem.eql(u8, arg, "--ordered")) + { + // boolean flags — ignore + } else if (std.mem.startsWith(u8, arg, "--") or std.mem.startsWith(u8, arg, "-")) { + std.log.warn("unrecognised option: {s}", .{arg}); + } + } + + // Publisher default color + if (opts.publish and opts.color == null) { + opts.color = "BLUE"; + } + + return opts; +} + +// ── main ────────────────────────────────────────────────────────────────────── + +pub fn main(init: std.process.Init.Minimal) !void { + // Install SIGINT handler so the test harness can cleanly terminate us. + const sa = std.posix.Sigaction{ + .handler = .{ .handler = handleSigint }, + .mask = std.posix.sigemptyset(), + .flags = 0, + }; + std.posix.sigaction(std.posix.SIG.INT, &sa, null); + + var gpa = std.heap.DebugAllocator(.{}){}; + defer _ = gpa.deinit(); + const alloc = gpa.allocator(); + + const opts = parseArgs(init.args) catch |err| { + std.log.err("argument error: {}", .{err}); + std.process.exit(1); + }; + + if (!opts.publish and !opts.subscribe) { + std.log.err("specify -P (publish) or -S (subscribe)", .{}); + std.process.exit(1); + } + + // If the subscriber was given a content-filter expression (via -c when + // subscribing, which the C++ code turns into a CFT), report it as unsupported + // so the harness gets SUB_UNSUPPORTED_FEATURE rather than hanging. + if (opts.subscribe and opts.cft_expression != null) { + stdoutPrint("not supported: ContentFilteredTopic\n", .{}); + return; + } + + const udp = try UdpTransport.init(alloc, .{}, opts.domain_id, null); + defer udp.deinit(); + const transport = udp.transport(); + + const disc = try SpdpSedpDiscovery.init(alloc, transport, opts.domain_id, 3_000); + defer disc.deinit(); + const discovery = disc.toDiscovery(); + + var factory = try DomainParticipantFactoryImpl.init( + alloc, transport, discovery, noop_security, .random, .{}, + ); + defer factory.deinit(); + const dpf = factory.toDDSFactory(); + + const dp = dpf.create_participant(opts.domain_id, .{}, nil.nil_dp_listener, 0); + if (dp.ptr == nil.nil_dp_listener.ptr) { + std.log.err("failed to create participant on domain {d}", .{opts.domain_id}); + std.process.exit(1); + } + defer _ = dpf.delete_participant(dp); + + const topic = dp.vtable.create_topic( + dp.ptr, opts.topic_name, "ShapeType", .{}, nil.nil_topic_listener, 0, + ); + if (topic.ptr == nil.nil_topic_listener.ptr) { + std.log.err("failed to create topic '{s}'", .{opts.topic_name}); + std.process.exit(1); + } + + stdoutPrint("Create topic: {s}\n", .{opts.topic_name}); + + if (opts.publish) { + runPublisher(alloc, dp, topic, &opts) catch |err| { + std.log.err("publisher error: {}", .{err}); + std.process.exit(1); + }; + } else { + runSubscriber(alloc, dp, topic, &opts) catch |err| { + std.log.err("subscriber error: {}", .{err}); + std.process.exit(1); + }; + } +} From 9bf4cc413c5b69e7e61568363b03ff37aa098a81 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Tue, 12 May 2026 08:06:04 +0100 Subject: [PATCH 02/16] move shape_main.zig to shared location --- srcZig/{zenzen-zig/src => }/shape_main.zig | 102 +++++++++++++++++---- srcZig/zenzen-zig/build.zig | 2 +- srcZig/zenzen-zig/build.zig.zon | 3 +- 3 files changed, 86 insertions(+), 21 deletions(-) rename srcZig/{zenzen-zig/src => }/shape_main.zig (88%) diff --git a/srcZig/zenzen-zig/src/shape_main.zig b/srcZig/shape_main.zig similarity index 88% rename from srcZig/zenzen-zig/src/shape_main.zig rename to srcZig/shape_main.zig index e2f67060..63b05b6e 100644 --- a/srcZig/zenzen-zig/src/shape_main.zig +++ b/srcZig/shape_main.zig @@ -20,6 +20,7 @@ const DomainParticipantFactoryImpl = zzdds.dcps.DomainParticipantFactoryImpl; const DataWriterImpl = zzdds.dcps.DataWriterImpl; const DataReaderImpl = zzdds.dcps.DataReaderImpl; const TopicImpl = zzdds.dcps.TopicImpl; +const ContentFilteredTopicImpl = zzdds.dcps.ContentFilteredTopicImpl; const noop_security = zzdds.noop_security.noop_security_plugins; const time_mod = zzdds.util.time; const RtpsTimestamp = zzdds.util.time.RtpsTimestamp; @@ -78,7 +79,8 @@ const Options = struct { num_iterations: i64 = -1, // -1 = infinite num_instances: u32 = 1, additional_payload: u32 = 0, - cft_expression: ?[]const u8 = null, // content filter expression (unsupported) + size_modulo: i32 = 0, // 0 = no cycling (--size-modulo) + cft_expression: ?[]const u8 = null, // content filter expression (--cft) }; // ── CDR helpers ─────────────────────────────────────────────────────────────── @@ -393,7 +395,11 @@ fn runPublisher( const topic_impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); const topic_name = topic_impl.topic_name; - const pub_ = dp.vtable.create_publisher(dp.ptr, .{}, nil.nil_pub_listener, 0); + var pub_partition_name_buf: [1][]const u8 = .{opts.partition orelse ""}; + const pub_qos: DDS.PublisherQos = if (opts.partition) |_| .{ + .partition = .{ .name = .{ .items = &pub_partition_name_buf, .capacity = 1 } }, + } else .{}; + const pub_ = dp.vtable.create_publisher(dp.ptr, pub_qos, nil.nil_pub_listener, 0); if (pub_.ptr == nil.nil_pub_listener.ptr) return error.PublisherFailed; const dw_qos = try buildWriterQos(alloc, opts); @@ -497,6 +503,8 @@ fn runPublisher( if (opts.shapesize == 0) { shape.shapesize += 1; + if (opts.size_modulo > 0 and shape.shapesize > opts.size_modulo) + shape.shapesize = 1; } last_write_ns = time_mod.nanoTimestamp(); @@ -515,9 +523,36 @@ fn runSubscriber( ) !void { const topic_impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); const topic_name = topic_impl.topic_name; - const topic_desc = topic_impl.toTopicDescription(); - const sub = dp.vtable.create_subscriber(dp.ptr, .{}, nil.nil_sub_listener, 0); + // Create a ContentFilteredTopic when a filter expression was supplied (-c). + // The CFT's TopicDescription returns the underlying topic name over RTPS so + // remote writers match on the base topic, while the filter is applied locally. + const cft: ?DDS.ContentFilteredTopic = blk: { + const expr = opts.cft_expression orelse break :blk null; + const cft_name = std.fmt.allocPrint( + alloc, "{s}_cft", .{topic_name}, + ) catch break :blk null; + defer alloc.free(cft_name); + const c = dp.vtable.create_contentfilteredtopic( + dp.ptr, cft_name, topic, expr, .empty, + ); + if (c.ptr == nil.nil_dr_listener.ptr) break :blk null; + break :blk c; + }; + defer { + if (cft) |c| _ = dp.vtable.delete_contentfilteredtopic(dp.ptr, c); + } + + const topic_desc: DDS.TopicDescription = if (cft) |c| blk: { + const impl: *ContentFilteredTopicImpl = @ptrCast(@alignCast(c.ptr)); + break :blk impl.toTopicDescription(); + } else topic_impl.toTopicDescription(); + + var sub_partition_name_buf: [1][]const u8 = .{opts.partition orelse ""}; + const sub_qos: DDS.SubscriberQos = if (opts.partition) |_| .{ + .partition = .{ .name = .{ .items = &sub_partition_name_buf, .capacity = 1 } }, + } else .{}; + const sub = dp.vtable.create_subscriber(dp.ptr, sub_qos, nil.nil_sub_listener, 0); if (sub.ptr == nil.nil_sub_listener.ptr) return error.SubscriberFailed; const dr_qos = try buildReaderQos(alloc, opts); @@ -546,6 +581,30 @@ fn runSubscriber( 0; var deadline_base_ns: i64 = 0; // 0 = no matched writer yet + // Extract the CFT impl pointer once so the hot path can call matchSample. + const cft_impl: ?*ContentFilteredTopicImpl = if (cft) |c| + @ptrCast(@alignCast(c.ptr)) + else + null; + + // Field accessor backed by a ParsedShape — used for CFT evaluation. + const ShapeAccessor = struct { + shape: *const ParsedShape, + + fn get(ctx: *anyopaque, field: []const u8) ?zzdds.dcps.filter.FilterValue { + const self: *const @This() = @ptrCast(@alignCast(ctx)); + if (std.mem.eql(u8, field, "color")) + return .{ .string = self.shape.color }; + if (std.mem.eql(u8, field, "x")) + return .{ .int = self.shape.x }; + if (std.mem.eql(u8, field, "y")) + return .{ .int = self.shape.y }; + if (std.mem.eql(u8, field, "shapesize")) + return .{ .int = self.shape.shapesize }; + return null; + } + }; + var iteration: i64 = 0; while (!g_all_done.load(.acquire)) { if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; @@ -556,13 +615,23 @@ fn runSubscriber( } var got_data = false; - while (dr_impl.takeRaw()) |payload| { - defer alloc.free(payload); + while (dr_impl.takeRaw()) |taken| { + defer alloc.free(taken.data); got_data = true; - if (deserializeShape(payload)) |s| { - stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", - .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize }); + const s = deserializeShape(taken.data) orelse continue; + + // Apply CFT filter when one is configured. + if (cft_impl) |ci| { + var acc_ctx = ShapeAccessor{ .shape = &s }; + const accessor = zzdds.dcps.filter.FieldAccessor{ + .ctx = &acc_ctx, + .get = ShapeAccessor.get, + }; + if (!ci.matchSample(accessor)) continue; } + + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", + .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize }); } if (got_data) { @@ -649,10 +718,15 @@ fn parseArgs(process_args: std.process.Args) !Options { } else if (std.mem.eql(u8, arg, "--additional-payload-size")) { const v = it.next() orelse return error.MissingValue; opts.additional_payload = try std.fmt.parseInt(u32, v, 10); + } else if (std.mem.eql(u8, arg, "--cft")) { + const v = it.next() orelse return error.MissingValue; + opts.cft_expression = v; + } else if (std.mem.eql(u8, arg, "--size-modulo")) { + const v = it.next() orelse return error.MissingValue; + opts.size_modulo = try std.fmt.parseInt(i32, v, 10); } else if (std.mem.eql(u8, arg, "--time-filter") or std.mem.eql(u8, arg, "--lifespan") or std.mem.eql(u8, arg, "--write-period") or - std.mem.eql(u8, arg, "--size-modulo") or std.mem.eql(u8, arg, "--periodic-announcement") or std.mem.eql(u8, arg, "--final-instance-state") or std.mem.eql(u8, arg, "--access-scope") or @@ -703,14 +777,6 @@ pub fn main(init: std.process.Init.Minimal) !void { std.process.exit(1); } - // If the subscriber was given a content-filter expression (via -c when - // subscribing, which the C++ code turns into a CFT), report it as unsupported - // so the harness gets SUB_UNSUPPORTED_FEATURE rather than hanging. - if (opts.subscribe and opts.cft_expression != null) { - stdoutPrint("not supported: ContentFilteredTopic\n", .{}); - return; - } - const udp = try UdpTransport.init(alloc, .{}, opts.domain_id, null); defer udp.deinit(); const transport = udp.transport(); diff --git a/srcZig/zenzen-zig/build.zig b/srcZig/zenzen-zig/build.zig index 1505113a..d6e947c9 100644 --- a/srcZig/zenzen-zig/build.zig +++ b/srcZig/zenzen-zig/build.zig @@ -18,7 +18,7 @@ pub fn build(b: *std.Build) void { const exe = b.addExecutable(.{ .name = exe_name, .root_module = b.createModule(.{ - .root_source_file = b.path("src/shape_main.zig"), + .root_source_file = b.path("../shape_main.zig"), .target = target, .optimize = optimize, .imports = &.{ diff --git a/srcZig/zenzen-zig/build.zig.zon b/srcZig/zenzen-zig/build.zig.zon index 6b02f398..4d5d1299 100644 --- a/srcZig/zenzen-zig/build.zig.zon +++ b/srcZig/zenzen-zig/build.zig.zon @@ -5,12 +5,11 @@ .minimum_zig_version = "0.16.0-dev.2848+b4ffb402c", .dependencies = .{ .zzdds = .{ - .path = "../../../ZenzenDDS", + .path = "../../../zz-dev/zzdds", }, }, .paths = .{ "build.zig", "build.zig.zon", - "src", }, } From be673cd4c36f2d88dddb79e883be184a8e210c78 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Tue, 12 May 2026 22:24:58 +0100 Subject: [PATCH 03/16] generalize shape_main / add dds shim --- srcZig/dds.zig | 78 +++++ srcZig/shape_main.zig | 507 ++++++++++++++++----------------- srcZig/zenzen-zig/build.zig | 36 ++- srcZig/zenzen-zig/dds_impl.zig | 203 +++++++++++++ 4 files changed, 553 insertions(+), 271 deletions(-) create mode 100644 srcZig/dds.zig create mode 100644 srcZig/zenzen-zig/dds_impl.zig diff --git a/srcZig/dds.zig b/srcZig/dds.zig new file mode 100644 index 00000000..ee955caf --- /dev/null +++ b/srcZig/dds.zig @@ -0,0 +1,78 @@ +//! Zig DDS shim — interface contract for srcZig/shape_main.zig. +//! +//! shape_main.zig imports a module named "dds". Any Zig DDS vendor that +//! wants to participate in the dds-rtps interoperability test suite provides +//! their own implementation of this module and wires it up as the "dds" +//! dependency in their build.zig. +//! +//! ZenzenDDS's implementation lives in zenzen-zig/dds_impl.zig. +//! +//! ── Required exports ────────────────────────────────────────────────────── +//! +//! pub const DDS = ...; +//! Re-export of the vendor's standard DDS type package. Must expose the +//! standard DCPS entity handles and QoS/Status types used by shape_main: +//! DomainParticipant, Publisher, Subscriber, Topic, ContentFilteredTopic, +//! TopicDescription, DataWriter, DataReader, DataWriterQos, DataReaderQos, +//! PublisherQos, SubscriberQos, DataWriterListener, DataReaderListener, +//! StatusMask, and the status constants (OFFERED_INCOMPATIBLE_QOS_STATUS, +//! etc.). +//! +//! pub const Participant = struct { ... }; +//! Opaque vendor state that bundles transport, discovery, and factory. +//! shape_main calls createParticipant / destroyParticipant and then +//! calls toDDS() to get the standard DomainParticipant handle for use +//! with the standard vtable API. +//! +//! pub fn createParticipant(alloc: std.mem.Allocator, domain_id: u32) !*Participant; +//! pub fn destroyParticipant(p: *Participant) void; +//! +//! pub fn topicName(topic: DDS.Topic) []const u8; +//! Returns the topic name string from a DDS.Topic handle. +//! +//! ── DataWriter extras (not in the standard DCPS vtable) ─────────────── +//! +//! pub const WriteKind = enum { alive, dispose, unregister }; +//! +//! pub fn writeRaw(dw: DDS.DataWriter, kind: WriteKind, +//! key_hash: [16]u8, data: []const u8) !void; +//! Write a pre-serialized CDR payload. The vendor stamps the source +//! timestamp internally (always "now") matching the behaviour of the +//! standard typed write() call in C/C++/Rust shape_main implementations. +//! +//! pub fn writerMatchedCount(dw: DDS.DataWriter) usize; +//! pub fn writerNotifyDeadline(dw: DDS.DataWriter) void; +//! +//! ── DataReader extras ───────────────────────────────────────────────── +//! +//! pub const TakenSample = struct { +//! data: []u8, +//! alloc: std.mem.Allocator, +//! pub fn deinit(self: TakenSample) void, +//! }; +//! +//! pub fn takeRaw(dr: DDS.DataReader) ?TakenSample; +//! Returns the next pending sample, or null if the queue is empty. +//! Caller must call sample.deinit() when done. +//! +//! pub fn readerMatchedCount(dr: DDS.DataReader) usize; +//! pub fn readerNotifyDeadline(dr: DDS.DataReader) void; +//! +//! ── ContentFilteredTopic evaluation ────────────────────────────────── +//! +//! pub const FilterValue = union(enum) { +//! string: []const u8, +//! int: i64, +//! float: f64, +//! }; +//! +//! pub const FieldAccessor = struct { +//! ctx: *anyopaque, +//! get: *const fn (ctx: *anyopaque, field: []const u8) ?FilterValue, +//! }; +//! +//! pub fn cftMatchSample(cft: DDS.ContentFilteredTopic, acc: FieldAccessor) bool; +//! pub fn cftTopicDescription(cft: DDS.ContentFilteredTopic) DDS.TopicDescription; + +// This file is documentation only. shape_main.zig imports the module +// named "dds" which is provided by the vendor's build.zig, not this file. diff --git a/srcZig/shape_main.zig b/srcZig/shape_main.zig index 63b05b6e..e997bf67 100644 --- a/srcZig/shape_main.zig +++ b/srcZig/shape_main.zig @@ -1,8 +1,12 @@ -//! ZenzenDDS shape_main — interoperability test application for OMG dds-rtps. +//! Zig DDS shape_main — interoperability test application for OMG dds-rtps. //! //! Mirrors the CLI interface of srcCxx/shape_main.cxx so the Python test //! harness (interoperability_report.py) can drive it as a pub or sub. //! +//! This file is vendor-agnostic. All DDS implementation details are hidden +//! behind the "dds" module, which each Zig DDS vendor supplies via their +//! build.zig. See srcZig/dds.zig for the full interface contract. +//! //! Required stdout strings (matched by the harness via pexpect): //! Publisher: "Create topic:" → "Create writer for topic:" → //! "on_publication_matched()" or "on_offered_incompatible_qos" → @@ -10,22 +14,26 @@ //! Subscriber: "Create topic:" → "Create reader for topic:" → //! "[]" in the sample line or "on_requested_incompatible_qos()" -const std = @import("std"); -const zzdds = @import("zzdds"); -const DDS = @import("zzdds_generated").DDS; - -const UdpTransport = zzdds.udp_transport.UdpTransport; -const SpdpSedpDiscovery = zzdds.combined_discovery.SpdpSedpDiscovery; -const DomainParticipantFactoryImpl = zzdds.dcps.DomainParticipantFactoryImpl; -const DataWriterImpl = zzdds.dcps.DataWriterImpl; -const DataReaderImpl = zzdds.dcps.DataReaderImpl; -const TopicImpl = zzdds.dcps.TopicImpl; -const ContentFilteredTopicImpl = zzdds.dcps.ContentFilteredTopicImpl; -const noop_security = zzdds.noop_security.noop_security_plugins; -const time_mod = zzdds.util.time; -const RtpsTimestamp = zzdds.util.time.RtpsTimestamp; -const history_mod = zzdds.rtps.history; -const nil = zzdds.dcps; +const std = @import("std"); +const dds = @import("dds"); +const DDS = dds.DDS; + +// ── Time helpers ───────────────────────────────────────────────────────────── +// std.time.nanoTimestamp / std.time.sleep were removed in Zig 0.16. + +fn monoNs() i64 { + var ts: std.os.linux.timespec = undefined; + _ = std.os.linux.clock_gettime(.MONOTONIC, &ts); + return ts.sec * std.time.ns_per_s + ts.nsec; +} + +fn sleepNs(ns: u64) void { + var req = std.os.linux.timespec{ + .sec = @intCast(ns / std.time.ns_per_s), + .nsec = @intCast(ns % std.time.ns_per_s), + }; + _ = std.os.linux.nanosleep(&req, null); +} // ── Stdout helpers ──────────────────────────────────────────────────────────── // std.io was removed in Zig 0.16; write directly via the Linux write(2) syscall. @@ -59,28 +67,28 @@ fn handleSigint(sig: std.posix.SIG) callconv(.c) void { // ── Options ─────────────────────────────────────────────────────────────────── const Options = struct { - publish: bool = false, - subscribe: bool = false, - domain_id: u32 = 0, - best_effort: bool = false, - reliable: bool = false, - history_depth: i32 = -1, // -1 = use default KEEP_LAST 1 - deadline_ms: u64 = 0, - ownership_strength: i32 = -1, // -1 = SHARED - topic_name: []const u8 = "Square", - color: ?[]const u8 = null, - partition: ?[]const u8 = null, - durability: u8 = 'v', - data_representation: u16 = 1, // 1=XCDR1, 2=XCDR2 - print_writer_samples: bool = false, - shapesize: i32 = 20, - write_period_ms: u64 = 33, - read_period_ms: u64 = 100, - num_iterations: i64 = -1, // -1 = infinite - num_instances: u32 = 1, - additional_payload: u32 = 0, - size_modulo: i32 = 0, // 0 = no cycling (--size-modulo) - cft_expression: ?[]const u8 = null, // content filter expression (--cft) + publish: bool = false, + subscribe: bool = false, + domain_id: u32 = 0, + best_effort: bool = false, + reliable: bool = false, + history_depth: i32 = -1, // -1 = use default KEEP_LAST 1 + deadline_ms: u64 = 0, + ownership_strength: i32 = -1, // -1 = SHARED + topic_name: []const u8 = "Square", + color: ?[]const u8 = null, + partition: ?[]const u8 = null, + durability: u8 = 'v', + data_representation: u16 = 1, // 1=XCDR1, 2=XCDR2 + print_writer_samples: bool = false, + shapesize: i32 = 20, + write_period_ms: u64 = 33, + read_period_ms: u64 = 100, + num_iterations: i64 = -1, // -1 = infinite + num_instances: u32 = 1, + additional_payload: u32 = 0, + size_modulo: i32 = 0, // 0 = no cycling (--size-modulo) + cft_expression: ?[]const u8 = null, // content filter expression (--cft) }; // ── CDR helpers ─────────────────────────────────────────────────────────────── @@ -104,11 +112,11 @@ fn align4(n: usize) usize { // ── ShapeType CDR serialization ─────────────────────────────────────────────── const ShapeData = struct { - color: []const u8, - x: i32, - y: i32, + color: []const u8, + x: i32, + y: i32, shapesize: i32, - payload: u32, // additional_payload_size sequence length (all-zero bytes) + payload: u32, // additional_payload_size sequence length (all-zero bytes) }; // Serialize ShapeType with a 4-byte encapsulation header. @@ -134,10 +142,14 @@ fn serializeShape(buf: *std.ArrayList(u8), alloc: std.mem.Allocator, s: ShapeDat const pad = align4(off) - off; for (0..pad) |_| try buf.append(alloc, 0); off += pad; - try writeI32Le(buf, alloc, s.x); off += 4; - try writeI32Le(buf, alloc, s.y); off += 4; - try writeI32Le(buf, alloc, s.shapesize); off += 4; - try writeU32Le(buf, alloc, s.payload); off += 4; + try writeI32Le(buf, alloc, s.x); + off += 4; + try writeI32Le(buf, alloc, s.y); + off += 4; + try writeI32Le(buf, alloc, s.shapesize); + off += 4; + try writeU32Le(buf, alloc, s.payload); + off += 4; for (0..s.payload) |_| try buf.append(alloc, 0); // Patch DHEADER = length of struct members (bytes after DHEADER) const member_len: u32 = @intCast(buf.items.len - dheader_pos - 4); @@ -156,10 +168,14 @@ fn serializeShape(buf: *std.ArrayList(u8), alloc: std.mem.Allocator, s: ShapeDat const pad = align4(off) - off; for (0..pad) |_| try buf.append(alloc, 0); off += pad; - try writeI32Le(buf, alloc, s.x); off += 4; - try writeI32Le(buf, alloc, s.y); off += 4; - try writeI32Le(buf, alloc, s.shapesize); off += 4; - try writeU32Le(buf, alloc, s.payload); off += 4; + try writeI32Le(buf, alloc, s.x); + off += 4; + try writeI32Le(buf, alloc, s.y); + off += 4; + try writeI32Le(buf, alloc, s.shapesize); + off += 4; + try writeU32Le(buf, alloc, s.payload); + off += 4; for (0..s.payload) |_| try buf.append(alloc, 0); } } @@ -182,9 +198,9 @@ fn colorKeyHash(color: []const u8) [16]u8 { } const ParsedShape = struct { - color: []const u8, // slice into payload; valid while payload is alive - x: i32, - y: i32, + color: []const u8, // slice into payload; valid while payload is alive + x: i32, + y: i32, shapesize: i32, }; @@ -213,8 +229,10 @@ fn deserializeShape(payload: []const u8) ?ParsedShape { off = align4(off); if (payload.len < off + 12) return null; // x + y + shapesize - const x = std.mem.readInt(i32, payload[off..][0..4], .little); off += 4; - const y = std.mem.readInt(i32, payload[off..][0..4], .little); off += 4; + const x = std.mem.readInt(i32, payload[off..][0..4], .little); + off += 4; + const y = std.mem.readInt(i32, payload[off..][0..4], .little); + off += 4; const shapesize = std.mem.readInt(i32, payload[off..][0..4], .little); return ParsedShape{ .color = color, .x = x, .y = y, .shapesize = shapesize }; @@ -224,11 +242,11 @@ fn deserializeShape(payload: []const u8) ?ParsedShape { fn policyName(id: i32) []const u8 { return switch (id) { - 2 => "DURABILITY", - 4 => "DEADLINE", - 5 => "LATENCYBUDGET", - 6 => "OWNERSHIP", - 8 => "LIVELINESS", + 2 => "DURABILITY", + 4 => "DEADLINE", + 5 => "LATENCYBUDGET", + 6 => "OWNERSHIP", + 8 => "LIVELINESS", 10 => "PARTITION", 11 => "RELIABILITY", 12 => "DESTINATIONORDER", @@ -241,21 +259,19 @@ fn policyName(id: i32) []const u8 { const ListenerCtx = struct { topic_name: []const u8, - type_name: []const u8 = "ShapeType", + type_name: []const u8 = "ShapeType", }; // DataWriter listeners fn dwOnIncompatQos(ctx: *anyopaque, _: DDS.DataWriter, status: DDS.OfferedIncompatibleQosStatus) void { const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); - stdoutPrint("on_offered_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", - .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); + stdoutPrint("on_offered_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); } fn dwOnDeadlineMissed(ctx: *anyopaque, _: DDS.DataWriter, status: DDS.OfferedDeadlineMissedStatus) void { const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); - stdoutPrint("on_offered_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", - .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); + stdoutPrint("on_offered_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); } fn dwOnLivelinessLost(_: *anyopaque, _: DDS.DataWriter, _: DDS.LivelinessLostStatus) void {} @@ -263,25 +279,23 @@ fn dwOnPubMatched(_: *anyopaque, _: DDS.DataWriter, _: DDS.PublicationMatchedSta fn dwOnDeinit(_: *anyopaque) void {} var dw_vtable = DDS.DataWriterListener.Vtable{ - .on_offered_deadline_missed = dwOnDeadlineMissed, + .on_offered_deadline_missed = dwOnDeadlineMissed, .on_offered_incompatible_qos = dwOnIncompatQos, - .on_liveliness_lost = dwOnLivelinessLost, - .on_publication_matched = dwOnPubMatched, - .deinit = dwOnDeinit, + .on_liveliness_lost = dwOnLivelinessLost, + .on_publication_matched = dwOnPubMatched, + .deinit = dwOnDeinit, }; // DataReader listeners fn drOnIncompatQos(ctx: *anyopaque, _: DDS.DataReader, status: DDS.RequestedIncompatibleQosStatus) void { const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); - stdoutPrint("on_requested_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", - .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); + stdoutPrint("on_requested_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); } fn drOnDeadlineMissed(ctx: *anyopaque, _: DDS.DataReader, status: DDS.RequestedDeadlineMissedStatus) void { const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); - stdoutPrint("on_requested_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", - .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); + stdoutPrint("on_requested_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); } fn drOnSampleRejected(_: *anyopaque, _: DDS.DataReader, _: DDS.SampleRejectedStatus) void {} @@ -292,14 +306,14 @@ fn drOnSampleLost(_: *anyopaque, _: DDS.DataReader, _: DDS.SampleLostStatus) voi fn drOnDeinit(_: *anyopaque) void {} var dr_vtable = DDS.DataReaderListener.Vtable{ - .on_requested_deadline_missed = drOnDeadlineMissed, + .on_requested_deadline_missed = drOnDeadlineMissed, .on_requested_incompatible_qos = drOnIncompatQos, - .on_sample_rejected = drOnSampleRejected, - .on_liveliness_changed = drOnLivelinessChanged, - .on_data_available = drOnDataAvail, - .on_subscription_matched = drOnSubMatched, - .on_sample_lost = drOnSampleLost, - .deinit = drOnDeinit, + .on_sample_rejected = drOnSampleRejected, + .on_liveliness_changed = drOnLivelinessChanged, + .on_data_available = drOnDataAvail, + .on_subscription_matched = drOnSubMatched, + .on_sample_lost = drOnSampleLost, + .deinit = drOnDeinit, }; // ── DataWriter QoS builder ──────────────────────────────────────────────────── @@ -313,172 +327,185 @@ fn buildWriterQos(alloc: std.mem.Allocator, opts: *const Options) !DDS.DataWrite .RELIABLE_RELIABILITY_QOS; if (opts.history_depth == 0) { - qos.history.kind = .KEEP_ALL_HISTORY_QOS; + qos.history.kind = .KEEP_ALL_HISTORY_QOS; } else if (opts.history_depth > 0) { - qos.history.kind = .KEEP_LAST_HISTORY_QOS; + qos.history.kind = .KEEP_LAST_HISTORY_QOS; qos.history.depth = opts.history_depth; } + if (opts.deadline_ms > 0) { + qos.deadline.period = .{ + .sec = @intCast(opts.deadline_ms / 1000), + .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), + }; + } + + if (opts.ownership_strength >= 0) { + qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; + qos.ownership_strength.value = opts.ownership_strength; + } + qos.durability.kind = switch (opts.durability) { + 'v' => .VOLATILE_DURABILITY_QOS, 'l' => .TRANSIENT_LOCAL_DURABILITY_QOS, 't' => .TRANSIENT_DURABILITY_QOS, 'p' => .PERSISTENT_DURABILITY_QOS, else => .VOLATILE_DURABILITY_QOS, }; - if (opts.ownership_strength >= 0) { - qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; - qos.ownership_strength.value = opts.ownership_strength; - } - - qos.deadline.period = if (opts.deadline_ms > 0) .{ - .sec = @intCast(opts.deadline_ms / 1000), - .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), - } else .{ .sec = 0x7fff_ffff, .nanosec = 0x7fff_ffff }; // INFINITE (DDS default) - - // Wire IDs: XCDR1=0, XCDR2=2. opts uses 1=XCDR1, 2=XCDR2 for backward compat. const repr_id: i16 = if (opts.data_representation == 2) 2 else 0; try qos.data_representation.value.append(alloc, repr_id); return qos; } +// ── DataReader QoS builder ──────────────────────────────────────────────────── + fn buildReaderQos(alloc: std.mem.Allocator, opts: *const Options) !DDS.DataReaderQos { var qos = DDS.DataReaderQos{}; qos.reliability.kind = if (opts.best_effort) .BEST_EFFORT_RELIABILITY_QOS - else if (opts.reliable) - .RELIABLE_RELIABILITY_QOS else - .RELIABLE_RELIABILITY_QOS; // default: RELIABLE + .RELIABLE_RELIABILITY_QOS; if (opts.history_depth == 0) { - qos.history.kind = .KEEP_ALL_HISTORY_QOS; + qos.history.kind = .KEEP_ALL_HISTORY_QOS; } else if (opts.history_depth > 0) { - qos.history.kind = .KEEP_LAST_HISTORY_QOS; + qos.history.kind = .KEEP_LAST_HISTORY_QOS; qos.history.depth = opts.history_depth; } + if (opts.deadline_ms > 0) { + qos.deadline.period = .{ + .sec = @intCast(opts.deadline_ms / 1000), + .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), + }; + } + + if (opts.ownership_strength >= 0) { + qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; + } + qos.durability.kind = switch (opts.durability) { + 'v' => .VOLATILE_DURABILITY_QOS, 'l' => .TRANSIENT_LOCAL_DURABILITY_QOS, 't' => .TRANSIENT_DURABILITY_QOS, 'p' => .PERSISTENT_DURABILITY_QOS, else => .VOLATILE_DURABILITY_QOS, }; - if (opts.ownership_strength >= 0) { - qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; - } - - qos.deadline.period = if (opts.deadline_ms > 0) .{ - .sec = @intCast(opts.deadline_ms / 1000), - .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), - } else .{ .sec = 0x7fff_ffff, .nanosec = 0x7fff_ffff }; // INFINITE (DDS default) - - // Wire IDs: XCDR1=0, XCDR2=2. opts uses 1=XCDR1, 2=XCDR2 for backward compat. const repr_id: i16 = if (opts.data_representation == 2) 2 else 0; try qos.data_representation.value.append(alloc, repr_id); return qos; } +// ── nil sentinel helpers ────────────────────────────────────────────────────── +// Delegated to the vendor dds module, which knows the implementation's nil +// sentinel value (each vendor may use a different non-null sentinel address). + +fn isNilTopic(t: DDS.Topic) bool { + return dds.isNilTopic(t); +} +fn isNilPub(p: DDS.Publisher) bool { + return dds.isNilPub(p); +} +fn isNilSub(s: DDS.Subscriber) bool { + return dds.isNilSub(s); +} +fn isNilDw(dw: DDS.DataWriter) bool { + return dds.isNilDw(dw); +} +fn isNilDr(dr: DDS.DataReader) bool { + return dds.isNilDr(dr); +} +fn isNilCft(cft: DDS.ContentFilteredTopic) bool { + return dds.isNilCft(cft); +} + // ── Publisher ───────────────────────────────────────────────────────────────── fn runPublisher( alloc: std.mem.Allocator, - dp: DDS.DomainParticipant, + dp: DDS.DomainParticipant, topic: DDS.Topic, - opts: *const Options, + opts: *const Options, ) !void { const color = opts.color orelse "BLUE"; - const topic_impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); - const topic_name = topic_impl.topic_name; + const topic_name = dds.topicName(topic); var pub_partition_name_buf: [1][]const u8 = .{opts.partition orelse ""}; const pub_qos: DDS.PublisherQos = if (opts.partition) |_| .{ .partition = .{ .name = .{ .items = &pub_partition_name_buf, .capacity = 1 } }, } else .{}; - const pub_ = dp.vtable.create_publisher(dp.ptr, pub_qos, nil.nil_pub_listener, 0); - if (pub_.ptr == nil.nil_pub_listener.ptr) return error.PublisherFailed; + const pub_ = dp.vtable.create_publisher(dp.ptr, pub_qos, dds.nilPublisherListener(), 0); + if (isNilPub(pub_)) return error.PublisherFailed; const dw_qos = try buildWriterQos(alloc, opts); var lctx = ListenerCtx{ .topic_name = topic_name }; const dw_listener = DDS.DataWriterListener{ - .ptr = &lctx, + .ptr = &lctx, .vtable = &dw_vtable, }; - // Enable incompatible-QoS and deadline-missed callbacks. const listener_mask: DDS.StatusMask = DDS.OFFERED_INCOMPATIBLE_QOS_STATUS | DDS.OFFERED_DEADLINE_MISSED_STATUS; const dw = pub_.vtable.create_datawriter(pub_.ptr, topic, dw_qos, dw_listener, listener_mask); - if (dw.ptr == nil.nil_dw_listener.ptr) return error.DataWriterFailed; + if (isNilDw(dw)) return error.DataWriterFailed; stdoutPrint("Create writer for topic: {s} color: {s}\n", .{ topic_name, color }); - const dw_impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); - - // Write loop starts immediately so history accumulates before subscribers join. - // The match notification is detected inline and printed on first discovery. - // Timeout after 10 s with no reader → exit (test harness detects READER_NOT_MATCHED). var buf: std.ArrayList(u8) = .empty; defer buf.deinit(alloc); var shape = ShapeData{ - .color = color, - .x = 0, - .y = 0, + .color = color, + .x = 0, + .y = 0, .shapesize = if (opts.shapesize == 0) 1 else opts.shapesize, - .payload = opts.additional_payload, + .payload = opts.additional_payload, }; - var rng = std.Random.DefaultPrng.init(@intCast(time_mod.nanoTimestamp())); + var rng = std.Random.DefaultPrng.init(@intCast(monoNs())); const rand = rng.random(); - const match_deadline = time_mod.nanoTimestamp() + 10 * std.time.ns_per_s; + const match_deadline = monoNs() + 10 * std.time.ns_per_s; var printed_matched = false; - // Deadline monitoring: track time of last write to detect when the deadline - // period expires between writes (write_period > deadline → missed). const deadline_ns: i64 = if (opts.deadline_ms > 0) @intCast(opts.deadline_ms * std.time.ns_per_ms) else 0; - var last_write_ns: i64 = time_mod.nanoTimestamp(); + var last_write_ns: i64 = monoNs(); var iteration: i64 = 0; while (!g_all_done.load(.acquire)) { if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; - // Detect reader match; emit notification once; exit on timeout (no match). if (!printed_matched) { - if (dw_impl.matchedReaderCount() > 0) { + if (dds.writerMatchedCount(dw) > 0) { stdoutPrint( "on_publication_matched() topic: '{s}' type: 'ShapeType' : matched readers {d} (change = 1)\n", - .{ topic_name, dw_impl.matchedReaderCount() }, + .{ topic_name, dds.writerMatchedCount(dw) }, ); printed_matched = true; - } else if (time_mod.nanoTimestamp() > match_deadline) { + } else if (monoNs() > match_deadline) { return; // READER_NOT_MATCHED } } - // Check if the deadline expired since the last write (write_period > deadline). if (deadline_ns > 0) { - const elapsed = time_mod.nanoTimestamp() - last_write_ns; - if (elapsed > deadline_ns) dw_impl.notifyDeadlineMissed(); + const elapsed = monoNs() - last_write_ns; + if (elapsed > deadline_ns) dds.writerNotifyDeadline(dw); } - // Randomise position (0–319 x, 0–239 y) — matches C++ shape_main defaults shape.x = @rem(@as(i32, rand.int(u16)), 320); shape.y = @rem(@as(i32, rand.int(u16)), 240); - // Serialize and write each instance for (0..opts.num_instances) |inst| { const inst_color: []const u8 = blk: { if (inst == 0) break :blk color; - // additional instances: color + index (e.g. "BLUE1", "BLUE2") break :blk std.fmt.allocPrint(alloc, "{s}{d}", .{ color, inst }) catch color; }; defer if (inst > 0) alloc.free(inst_color); @@ -487,17 +514,10 @@ fn runPublisher( try serializeShape(&buf, alloc, shape, opts.data_representation == 2); const key_hash = colorKeyHash(inst_color); - _ = try dw_impl.writeRaw( - .alive, - RtpsTimestamp.now(), - history_mod.INSTANCE_HANDLE_NIL, - key_hash, - buf.items, - ); + try dds.writeRaw(dw, .alive, key_hash, buf.items); if (opts.print_writer_samples) { - stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", - .{ topic_name, inst_color, @as(u32, @intCast(shape.x)), @as(u32, @intCast(shape.y)), shape.shapesize }); + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", .{ topic_name, inst_color, @as(u32, @intCast(shape.x)), @as(u32, @intCast(shape.y)), shape.shapesize }); } } @@ -507,9 +527,9 @@ fn runPublisher( shape.shapesize = 1; } - last_write_ns = time_mod.nanoTimestamp(); + last_write_ns = monoNs(); iteration += 1; - time_mod.sleepNs(opts.write_period_ms * std.time.ns_per_ms); + sleepNs(opts.write_period_ms * std.time.ns_per_ms); } } @@ -517,81 +537,71 @@ fn runPublisher( fn runSubscriber( alloc: std.mem.Allocator, - dp: DDS.DomainParticipant, + dp: DDS.DomainParticipant, topic: DDS.Topic, - opts: *const Options, + opts: *const Options, ) !void { - const topic_impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); - const topic_name = topic_impl.topic_name; + const topic_name = dds.topicName(topic); - // Create a ContentFilteredTopic when a filter expression was supplied (-c). - // The CFT's TopicDescription returns the underlying topic name over RTPS so - // remote writers match on the base topic, while the filter is applied locally. const cft: ?DDS.ContentFilteredTopic = blk: { const expr = opts.cft_expression orelse break :blk null; const cft_name = std.fmt.allocPrint( - alloc, "{s}_cft", .{topic_name}, + alloc, + "{s}_cft", + .{topic_name}, ) catch break :blk null; defer alloc.free(cft_name); const c = dp.vtable.create_contentfilteredtopic( - dp.ptr, cft_name, topic, expr, .empty, + dp.ptr, + cft_name, + topic, + expr, + .empty, ); - if (c.ptr == nil.nil_dr_listener.ptr) break :blk null; + if (isNilCft(c)) break :blk null; break :blk c; }; defer { if (cft) |c| _ = dp.vtable.delete_contentfilteredtopic(dp.ptr, c); } - const topic_desc: DDS.TopicDescription = if (cft) |c| blk: { - const impl: *ContentFilteredTopicImpl = @ptrCast(@alignCast(c.ptr)); - break :blk impl.toTopicDescription(); - } else topic_impl.toTopicDescription(); + const topic_desc: DDS.TopicDescription = if (cft) |c| + dds.cftTopicDescription(c) + else + dp.vtable.lookup_topicdescription(dp.ptr, dds.topicName(topic)); var sub_partition_name_buf: [1][]const u8 = .{opts.partition orelse ""}; const sub_qos: DDS.SubscriberQos = if (opts.partition) |_| .{ .partition = .{ .name = .{ .items = &sub_partition_name_buf, .capacity = 1 } }, } else .{}; - const sub = dp.vtable.create_subscriber(dp.ptr, sub_qos, nil.nil_sub_listener, 0); - if (sub.ptr == nil.nil_sub_listener.ptr) return error.SubscriberFailed; + const sub = dp.vtable.create_subscriber(dp.ptr, sub_qos, dds.nilSubscriberListener(), 0); + if (isNilSub(sub)) return error.SubscriberFailed; const dr_qos = try buildReaderQos(alloc, opts); var lctx = ListenerCtx{ .topic_name = topic_name }; const dr_listener = DDS.DataReaderListener{ - .ptr = &lctx, + .ptr = &lctx, .vtable = &dr_vtable, }; const listener_mask: DDS.StatusMask = DDS.REQUESTED_INCOMPATIBLE_QOS_STATUS | DDS.REQUESTED_DEADLINE_MISSED_STATUS; const dr = sub.vtable.create_datareader(sub.ptr, topic_desc, dr_qos, dr_listener, listener_mask); - if (dr.ptr == nil.nil_dr_listener.ptr) return error.DataReaderFailed; + if (isNilDr(dr)) return error.DataReaderFailed; stdoutPrint("Create reader for topic: {s}\n", .{topic_name}); - const dr_impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); - - // Deadline monitoring: once a writer is matched, track time since last - // received sample; fire on_requested_deadline_missed when the deadline - // period expires without data. const sub_deadline_ns: i64 = if (opts.deadline_ms > 0) @intCast(opts.deadline_ms * std.time.ns_per_ms) else 0; - var deadline_base_ns: i64 = 0; // 0 = no matched writer yet + var deadline_base_ns: i64 = 0; - // Extract the CFT impl pointer once so the hot path can call matchSample. - const cft_impl: ?*ContentFilteredTopicImpl = if (cft) |c| - @ptrCast(@alignCast(c.ptr)) - else - null; - - // Field accessor backed by a ParsedShape — used for CFT evaluation. const ShapeAccessor = struct { shape: *const ParsedShape, - fn get(ctx: *anyopaque, field: []const u8) ?zzdds.dcps.filter.FilterValue { + fn get(ctx: *anyopaque, field: []const u8) ?dds.FilterValue { const self: *const @This() = @ptrCast(@alignCast(ctx)); if (std.mem.eql(u8, field, "color")) return .{ .string = self.shape.color }; @@ -609,44 +619,39 @@ fn runSubscriber( while (!g_all_done.load(.acquire)) { if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; - // Start the deadline clock when the first writer is matched. - if (sub_deadline_ns > 0 and deadline_base_ns == 0 and dr_impl.matchedWriterCount() > 0) { - deadline_base_ns = time_mod.nanoTimestamp(); + if (sub_deadline_ns > 0 and deadline_base_ns == 0 and dds.readerMatchedCount(dr) > 0) { + deadline_base_ns = monoNs(); } var got_data = false; - while (dr_impl.takeRaw()) |taken| { - defer alloc.free(taken.data); + while (dds.takeRaw(dr)) |taken| { + defer taken.deinit(); got_data = true; const s = deserializeShape(taken.data) orelse continue; - // Apply CFT filter when one is configured. - if (cft_impl) |ci| { + if (cft) |c| { var acc_ctx = ShapeAccessor{ .shape = &s }; - const accessor = zzdds.dcps.filter.FieldAccessor{ + const accessor = dds.FieldAccessor{ .ctx = &acc_ctx, .get = ShapeAccessor.get, }; - if (!ci.matchSample(accessor)) continue; + if (!dds.cftMatchSample(c, accessor)) continue; } - stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", - .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize }); + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize }); } if (got_data) { - // Reset deadline clock on each received sample. - deadline_base_ns = time_mod.nanoTimestamp(); + deadline_base_ns = monoNs(); } else if (sub_deadline_ns > 0 and deadline_base_ns != 0) { - // No data this iteration — check if the deadline has expired. - if (time_mod.nanoTimestamp() - deadline_base_ns > sub_deadline_ns) { - dr_impl.notifyDeadlineMissed(); - deadline_base_ns = time_mod.nanoTimestamp(); // reset to avoid repeated firing + if (monoNs() - deadline_base_ns > sub_deadline_ns) { + dds.readerNotifyDeadline(dr); + deadline_base_ns = monoNs(); } } iteration += 1; - time_mod.sleepNs(opts.read_period_ms * std.time.ns_per_ms); + sleepNs(opts.read_period_ms * std.time.ns_per_ms); } } @@ -693,50 +698,48 @@ fn parseArgs(process_args: std.process.Args) !Options { opts.partition = v; } else if (std.mem.eql(u8, arg, "-D")) { const v = it.next() orelse return error.MissingValue; - if (v.len > 0) opts.durability = v[0]; + opts.durability = if (v.len > 0) v[0] else 'v'; } else if (std.mem.eql(u8, arg, "-x")) { const v = it.next() orelse return error.MissingValue; - opts.data_representation = try std.fmt.parseInt(u16, v, 10); + opts.data_representation = std.fmt.parseInt(u16, v, 10) catch 1; } else if (std.mem.eql(u8, arg, "-z")) { const v = it.next() orelse return error.MissingValue; - opts.shapesize = try std.fmt.parseInt(i32, v, 10); + opts.shapesize = std.fmt.parseInt(i32, v, 10) catch 20; + } else if (std.mem.eql(u8, arg, "-n")) { + const v = it.next() orelse return error.MissingValue; + opts.num_instances = std.fmt.parseInt(u32, v, 10) catch 1; } else if (std.mem.eql(u8, arg, "--write-period")) { const v = it.next() orelse return error.MissingValue; - opts.write_period_ms = try std.fmt.parseInt(u64, v, 10); + opts.write_period_ms = std.fmt.parseInt(u64, v, 10) catch 33; } else if (std.mem.eql(u8, arg, "--read-period")) { const v = it.next() orelse return error.MissingValue; - opts.read_period_ms = try std.fmt.parseInt(u64, v, 10); - } else if (std.mem.eql(u8, arg, "--num-iterations")) { - const v = it.next() orelse return error.MissingValue; - opts.num_iterations = try std.fmt.parseInt(i64, v, 10); - } else if (std.mem.eql(u8, arg, "--num-instances")) { + opts.read_period_ms = std.fmt.parseInt(u64, v, 10) catch 100; + } else if (std.mem.eql(u8, arg, "--num-iterations") or + std.mem.eql(u8, arg, "-i")) + { const v = it.next() orelse return error.MissingValue; - opts.num_instances = try std.fmt.parseInt(u32, v, 10); - } else if (std.mem.eql(u8, arg, "--num-topics")) { - // not yet supported — silently ignore value - _ = it.next(); - } else if (std.mem.eql(u8, arg, "--additional-payload-size")) { + opts.num_iterations = std.fmt.parseInt(i64, v, 10) catch -1; + } else if (std.mem.eql(u8, arg, "--additional-payload")) { const v = it.next() orelse return error.MissingValue; - opts.additional_payload = try std.fmt.parseInt(u32, v, 10); - } else if (std.mem.eql(u8, arg, "--cft")) { - const v = it.next() orelse return error.MissingValue; - opts.cft_expression = v; + opts.additional_payload = std.fmt.parseInt(u32, v, 10) catch 0; } else if (std.mem.eql(u8, arg, "--size-modulo")) { const v = it.next() orelse return error.MissingValue; - opts.size_modulo = try std.fmt.parseInt(i32, v, 10); - } else if (std.mem.eql(u8, arg, "--time-filter") or - std.mem.eql(u8, arg, "--lifespan") or - std.mem.eql(u8, arg, "--write-period") or - std.mem.eql(u8, arg, "--periodic-announcement") or - std.mem.eql(u8, arg, "--final-instance-state") or - std.mem.eql(u8, arg, "--access-scope") or - std.mem.eql(u8, arg, "--coherent-sample-count") or - std.mem.eql(u8, arg, "--take-read")) + opts.size_modulo = std.fmt.parseInt(i32, v, 10) catch 0; + } else if (std.mem.eql(u8, arg, "--cft")) { + opts.cft_expression = it.next() orelse return error.MissingValue; + } else if (std.mem.eql(u8, arg, "--publisher-matches") or + std.mem.eql(u8, arg, "--subscriber-matches") or + std.mem.eql(u8, arg, "--deadline") or + std.mem.eql(u8, arg, "--periodic-announcement") or + std.mem.eql(u8, arg, "--final-instance-state") or + std.mem.eql(u8, arg, "--access-scope") or + std.mem.eql(u8, arg, "--coherent-sample-count") or + std.mem.eql(u8, arg, "--take-read")) { // consume argument value and ignore — unimplemented options _ = it.next(); } else if (std.mem.eql(u8, arg, "--coherent") or - std.mem.eql(u8, arg, "--ordered")) + std.mem.eql(u8, arg, "--ordered")) { // boolean flags — ignore } else if (std.mem.startsWith(u8, arg, "--") or std.mem.startsWith(u8, arg, "-")) { @@ -755,16 +758,15 @@ fn parseArgs(process_args: std.process.Args) !Options { // ── main ────────────────────────────────────────────────────────────────────── pub fn main(init: std.process.Init.Minimal) !void { - // Install SIGINT handler so the test harness can cleanly terminate us. const sa = std.posix.Sigaction{ .handler = .{ .handler = handleSigint }, - .mask = std.posix.sigemptyset(), - .flags = 0, + .mask = std.posix.sigemptyset(), + .flags = 0, }; std.posix.sigaction(std.posix.SIG.INT, &sa, null); - var gpa = std.heap.DebugAllocator(.{}){}; - defer _ = gpa.deinit(); + var gpa = std.heap.DebugAllocator(.{}){}; + defer _ = gpa.deinit(); const alloc = gpa.allocator(); const opts = parseArgs(init.args) catch |err| { @@ -777,31 +779,22 @@ pub fn main(init: std.process.Init.Minimal) !void { std.process.exit(1); } - const udp = try UdpTransport.init(alloc, .{}, opts.domain_id, null); - defer udp.deinit(); - const transport = udp.transport(); - - const disc = try SpdpSedpDiscovery.init(alloc, transport, opts.domain_id, 3_000); - defer disc.deinit(); - const discovery = disc.toDiscovery(); - - var factory = try DomainParticipantFactoryImpl.init( - alloc, transport, discovery, noop_security, .random, .{}, - ); - defer factory.deinit(); - const dpf = factory.toDDSFactory(); - - const dp = dpf.create_participant(opts.domain_id, .{}, nil.nil_dp_listener, 0); - if (dp.ptr == nil.nil_dp_listener.ptr) { - std.log.err("failed to create participant on domain {d}", .{opts.domain_id}); + const participant = dds.createParticipant(alloc, opts.domain_id) catch |err| { + std.log.err("failed to create participant on domain {d}: {}", .{ opts.domain_id, err }); std.process.exit(1); - } - defer _ = dpf.delete_participant(dp); + }; + defer dds.destroyParticipant(participant); + const dp = participant.toDDS(); const topic = dp.vtable.create_topic( - dp.ptr, opts.topic_name, "ShapeType", .{}, nil.nil_topic_listener, 0, + dp.ptr, + opts.topic_name, + "ShapeType", + .{}, + dds.nilTopicListener(), + 0, ); - if (topic.ptr == nil.nil_topic_listener.ptr) { + if (isNilTopic(topic)) { std.log.err("failed to create topic '{s}'", .{opts.topic_name}); std.process.exit(1); } diff --git a/srcZig/zenzen-zig/build.zig b/srcZig/zenzen-zig/build.zig index d6e947c9..561a900a 100644 --- a/srcZig/zenzen-zig/build.zig +++ b/srcZig/zenzen-zig/build.zig @@ -1,29 +1,37 @@ const std = @import("std"); pub fn build(b: *std.Build) void { - const target = b.standardTargetOptions(.{}); + const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); - const version = b.option([]const u8, "dds-version", - "ZenzenDDS version string embedded in the executable name (default: 0.0.0)") - orelse "0.0.0"; + const version = b.option([]const u8, "dds-version", "ZenzenDDS version string embedded in the executable name (default: 0.0.0)") orelse "0.0.0"; - const zzdds_dep = b.dependency("zzdds", .{ .target = target, .optimize = optimize }); - const zzdds_mod = zzdds_dep.module("zzdds"); - const zzdds_gen = zzdds_dep.module("zzdds_generated"); + const zzdds_dep = b.dependency("zzdds", .{ .target = target, .optimize = optimize }); + const zzdds_mod = zzdds_dep.module("zzdds"); + const zzdds_gen = zzdds_dep.module("zzdds_generated"); - const exe_name = std.fmt.allocPrint(b.allocator, - "zenzen_dds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); + // Build the "dds" shim module from our vendor implementation. + // shape_main.zig imports only this module; it has no direct zzdds dependency. + const dds_mod = b.createModule(.{ + .root_source_file = b.path("dds_impl.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "zzdds", .module = zzdds_mod }, + .{ .name = "zzdds_generated", .module = zzdds_gen }, + }, + }); + + const exe_name = std.fmt.allocPrint(b.allocator, "zenzen_dds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); const exe = b.addExecutable(.{ .name = exe_name, .root_module = b.createModule(.{ .root_source_file = b.path("../shape_main.zig"), - .target = target, + .target = target, .optimize = optimize, - .imports = &.{ - .{ .name = "zzdds", .module = zzdds_mod }, - .{ .name = "zzdds_generated", .module = zzdds_gen }, + .imports = &.{ + .{ .name = "dds", .module = dds_mod }, }, }), }); @@ -31,7 +39,7 @@ pub fn build(b: *std.Build) void { b.installArtifact(exe); const run_step = b.step("run", "Run shape_main"); - const run_cmd = b.addRunArtifact(exe); + const run_cmd = b.addRunArtifact(exe); if (b.args) |args| run_cmd.addArgs(args); run_step.dependOn(&run_cmd.step); } diff --git a/srcZig/zenzen-zig/dds_impl.zig b/srcZig/zenzen-zig/dds_impl.zig new file mode 100644 index 00000000..de8b2e30 --- /dev/null +++ b/srcZig/zenzen-zig/dds_impl.zig @@ -0,0 +1,203 @@ +//! ZenzenDDS implementation of the srcZig/dds shim protocol. +//! +//! shape_main.zig imports this module as "dds". Every symbol exported here +//! matches the protocol documented in srcZig/dds.zig; see that file for the +//! full contract that any Zig DDS vendor must satisfy. + +const std = @import("std"); + +const zzdds = @import("zzdds"); +const zzdds_gen = @import("zzdds_generated"); + +pub const DDS = zzdds_gen.DDS; + +const UdpTransport = zzdds.udp_transport.UdpTransport; +const SpdpSedpDiscovery = zzdds.combined_discovery.SpdpSedpDiscovery; +const DomainParticipantFactoryImpl = zzdds.dcps.DomainParticipantFactoryImpl; +const DataWriterImpl = zzdds.dcps.DataWriterImpl; +const DataReaderImpl = zzdds.dcps.DataReaderImpl; +const TopicImpl = zzdds.dcps.TopicImpl; +const ContentFilteredTopicImpl = zzdds.dcps.ContentFilteredTopicImpl; +const noop_security = zzdds.noop_security.noop_security_plugins; +const time_mod = zzdds.util.time; +const history_mod = zzdds.rtps.history; +const nil = zzdds.dcps; +const filter_mod = zzdds.dcps.filter; + +// ── Participant bootstrapping ───────────────────────────────────────────────── + +pub const Participant = struct { + alloc: std.mem.Allocator, + udp: *UdpTransport, + disc: *SpdpSedpDiscovery, + factory: *DomainParticipantFactoryImpl, + dp: DDS.DomainParticipant, + + pub fn toDDS(self: *Participant) DDS.DomainParticipant { + return self.dp; + } +}; + +pub fn createParticipant(alloc: std.mem.Allocator, domain_id: u32) !*Participant { + const p = try alloc.create(Participant); + errdefer alloc.destroy(p); + + const udp = try UdpTransport.init(alloc, .{}, domain_id, null); + errdefer udp.deinit(); + const transport = udp.transport(); + + const disc = try SpdpSedpDiscovery.init(alloc, transport, domain_id, 3_000); + errdefer disc.deinit(); + const discovery = disc.toDiscovery(); + + var factory = try DomainParticipantFactoryImpl.init( + alloc, + transport, + discovery, + noop_security, + .random, + .{}, + ); + errdefer factory.deinit(); + const dpf = factory.toDDSFactory(); + + const dp = dpf.create_participant(domain_id, .{}, nil.nil_dp_listener, 0); + if (dp.ptr == nil.nil_dp_listener.ptr) return error.ParticipantFailed; + + p.* = .{ + .alloc = alloc, + .udp = udp, + .disc = disc, + .factory = factory, + .dp = dp, + }; + return p; +} + +pub fn destroyParticipant(p: *Participant) void { + const dpf = p.factory.toDDSFactory(); + _ = dpf.delete_participant(p.dp); + p.factory.deinit(); + p.disc.deinit(); + p.udp.deinit(); + p.alloc.destroy(p); +} + +// ── Topic name ──────────────────────────────────────────────────────────────── + +pub fn topicName(topic: DDS.Topic) []const u8 { + const impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); + return impl.topic_name; +} + +// ── DataWriter extras ───────────────────────────────────────────────────────── + +pub const WriteKind = enum { alive, dispose, unregister }; + +pub fn writeRaw( + dw: DDS.DataWriter, + kind: WriteKind, + key_hash: [16]u8, + data: []const u8, +) !void { + const impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + const ck: history_mod.ChangeKind = switch (kind) { + .alive => .alive, + .dispose => .not_alive_disposed, + .unregister => .not_alive_unregistered, + }; + _ = try impl.writeRaw(ck, time_mod.RtpsTimestamp.now(), history_mod.INSTANCE_HANDLE_NIL, key_hash, data); +} + +pub fn writerMatchedCount(dw: DDS.DataWriter) usize { + const impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + return impl.matchedReaderCount(); +} + +pub fn writerNotifyDeadline(dw: DDS.DataWriter) void { + const impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + impl.notifyDeadlineMissed(); +} + +// ── DataReader extras ───────────────────────────────────────────────────────── + +pub const TakenSample = struct { + data: []u8, + alloc: std.mem.Allocator, + + pub fn deinit(self: TakenSample) void { + self.alloc.free(self.data); + } +}; + +pub fn takeRaw(dr: DDS.DataReader) ?TakenSample { + const impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + const taken = impl.takeRaw() orelse return null; + return .{ .data = taken.data, .alloc = impl.alloc }; +} + +pub fn readerMatchedCount(dr: DDS.DataReader) usize { + const impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + return impl.matchedWriterCount(); +} + +pub fn readerNotifyDeadline(dr: DDS.DataReader) void { + const impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + impl.notifyDeadlineMissed(); +} + +// ── ContentFilteredTopic evaluation ────────────────────────────────────────── + +pub const FilterValue = filter_mod.FilterValue; +pub const FieldAccessor = filter_mod.FieldAccessor; + +pub fn cftMatchSample(cft: DDS.ContentFilteredTopic, acc: FieldAccessor) bool { + const impl: *ContentFilteredTopicImpl = @ptrCast(@alignCast(cft.ptr)); + return impl.matchSample(acc); +} + +pub fn cftTopicDescription(cft: DDS.ContentFilteredTopic) DDS.TopicDescription { + const impl: *ContentFilteredTopicImpl = @ptrCast(@alignCast(cft.ptr)); + return impl.toTopicDescription(); +} + +// ── Nil sentinel helpers ────────────────────────────────────────────────────── +// All nil entities share the same underlying nil_storage address (NIL_PTR). +// We recover that address from any exported nil constant without needing to +// re-export NIL_PTR itself. + +fn nilPtr() *anyopaque { + return zzdds.dcps.nil_topic_listener.ptr; +} + +pub fn nilTopicListener() DDS.TopicListener { + return zzdds.dcps.nil_topic_listener; +} +pub fn nilPublisherListener() DDS.PublisherListener { + return zzdds.dcps.nil_pub_listener; +} +pub fn nilSubscriberListener() DDS.SubscriberListener { + return zzdds.dcps.nil_sub_listener; +} + +pub fn isNilDp(dp: DDS.DomainParticipant) bool { + return dp.ptr == nilPtr(); +} +pub fn isNilTopic(t: DDS.Topic) bool { + return t.ptr == nilPtr(); +} +pub fn isNilPub(p: DDS.Publisher) bool { + return p.ptr == nilPtr(); +} +pub fn isNilSub(s: DDS.Subscriber) bool { + return s.ptr == nilPtr(); +} +pub fn isNilDw(dw: DDS.DataWriter) bool { + return dw.ptr == nilPtr(); +} +pub fn isNilDr(dr: DDS.DataReader) bool { + return dr.ptr == nilPtr(); +} +pub fn isNilCft(cft: DDS.ContentFilteredTopic) bool { + return cft.ptr == nilPtr(); +} From d6a5dbeaaa0742de182f71acc35bfe53e1628d76 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Tue, 12 May 2026 22:57:51 +0100 Subject: [PATCH 04/16] update path / zig version --- srcZig/zenzen-zig/build.zig.zon | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/srcZig/zenzen-zig/build.zig.zon b/srcZig/zenzen-zig/build.zig.zon index 4d5d1299..fac2ca0f 100644 --- a/srcZig/zenzen-zig/build.zig.zon +++ b/srcZig/zenzen-zig/build.zig.zon @@ -2,10 +2,10 @@ .name = .zenzen_dds_shape_main, .version = "0.0.0", .fingerprint = 0x59a7de95ffa6098c, - .minimum_zig_version = "0.16.0-dev.2848+b4ffb402c", + .minimum_zig_version = "0.16.0", .dependencies = .{ .zzdds = .{ - .path = "../../../zz-dev/zzdds", + .path = "../../../zzdds", }, }, .paths = .{ From 0fe1ff2b3e504135db81fff2a336170f5e09ffa6 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Wed, 13 May 2026 06:54:40 +0100 Subject: [PATCH 05/16] update path --- srcZig/zenzen-zig/build.zig.zon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srcZig/zenzen-zig/build.zig.zon b/srcZig/zenzen-zig/build.zig.zon index fac2ca0f..72a3773c 100644 --- a/srcZig/zenzen-zig/build.zig.zon +++ b/srcZig/zenzen-zig/build.zig.zon @@ -5,7 +5,7 @@ .minimum_zig_version = "0.16.0", .dependencies = .{ .zzdds = .{ - .path = "../../../zzdds", + .path = "packages/zzdds", }, }, .paths = .{ From 19a48a663e5dbbce9ffe972f91ea71beb3e41449 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Wed, 13 May 2026 07:18:55 +0100 Subject: [PATCH 06/16] updates for CI --- srcZig/zenzen-zig/build.zig | 2 ++ 1 file changed, 2 insertions(+) diff --git a/srcZig/zenzen-zig/build.zig b/srcZig/zenzen-zig/build.zig index 561a900a..6f4aebd5 100644 --- a/srcZig/zenzen-zig/build.zig +++ b/srcZig/zenzen-zig/build.zig @@ -5,6 +5,7 @@ pub fn build(b: *std.Build) void { const optimize = b.standardOptimizeOption(.{}); const version = b.option([]const u8, "dds-version", "ZenzenDDS version string embedded in the executable name (default: 0.0.0)") orelse "0.0.0"; + const sanitize_thread = b.option(bool, "sanitize-thread", "Enable ThreadSanitizer (requires libc, Linux only)") orelse false; const zzdds_dep = b.dependency("zzdds", .{ .target = target, .optimize = optimize }); const zzdds_mod = zzdds_dep.module("zzdds"); @@ -36,6 +37,7 @@ pub fn build(b: *std.Build) void { }), }); exe.root_module.link_libc = true; + exe.root_module.sanitize_thread = sanitize_thread; b.installArtifact(exe); const run_step = b.step("run", "Run shape_main"); From 691d9fe1f7ed74584a735ed870ff805dce2e8d93 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Wed, 13 May 2026 14:01:39 +0100 Subject: [PATCH 07/16] cleanup qos allocations --- srcZig/shape_main.zig | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/srcZig/shape_main.zig b/srcZig/shape_main.zig index e997bf67..0da23700 100644 --- a/srcZig/shape_main.zig +++ b/srcZig/shape_main.zig @@ -442,7 +442,8 @@ fn runPublisher( const pub_ = dp.vtable.create_publisher(dp.ptr, pub_qos, dds.nilPublisherListener(), 0); if (isNilPub(pub_)) return error.PublisherFailed; - const dw_qos = try buildWriterQos(alloc, opts); + var dw_qos = try buildWriterQos(alloc, opts); + defer dw_qos.data_representation.value.deinit(alloc); var lctx = ListenerCtx{ .topic_name = topic_name }; const dw_listener = DDS.DataWriterListener{ @@ -577,7 +578,8 @@ fn runSubscriber( const sub = dp.vtable.create_subscriber(dp.ptr, sub_qos, dds.nilSubscriberListener(), 0); if (isNilSub(sub)) return error.SubscriberFailed; - const dr_qos = try buildReaderQos(alloc, opts); + var dr_qos = try buildReaderQos(alloc, opts); + defer dr_qos.data_representation.value.deinit(alloc); var lctx = ListenerCtx{ .topic_name = topic_name }; const dr_listener = DDS.DataReaderListener{ From 9b6870e57b3d8e49f4ffffde1f1d291791ac5e6c Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Mon, 18 May 2026 10:59:49 +0000 Subject: [PATCH 08/16] build option to allow reduced debug output --- srcZig/shape_main.zig | 55 +++++++++++++++++++++++++++++++++++++ srcZig/zenzen-zig/build.zig | 10 +++++++ 2 files changed, 65 insertions(+) diff --git a/srcZig/shape_main.zig b/srcZig/shape_main.zig index 0da23700..a4a6e53f 100644 --- a/srcZig/shape_main.zig +++ b/srcZig/shape_main.zig @@ -17,6 +17,12 @@ const std = @import("std"); const dds = @import("dds"); const DDS = dds.DDS; +const shape_main_options = @import("shape_main_options"); + +pub const std_options: std.Options = .{ + .log_level = std.meta.stringToEnum(std.log.Level, shape_main_options.log_level) orelse + @compileError("invalid shape_main log level"), +}; // ── Time helpers ───────────────────────────────────────────────────────────── // std.time.nanoTimestamp / std.time.sleep were removed in Zig 0.16. @@ -744,6 +750,50 @@ fn parseArgs(process_args: std.process.Args) !Options { std.mem.eql(u8, arg, "--ordered")) { // boolean flags — ignore + } else if (std.mem.eql(u8, arg, "-h") or std.mem.eql(u8, arg, "--help")) { + stdoutWrite( + \\Usage: shape_main -P|-S [options] + \\ + \\Mode (required): + \\ -P Publisher + \\ -S Subscriber + \\ + \\QoS: + \\ -b BEST_EFFORT reliability (default: RELIABLE) + \\ -r RELIABLE reliability (explicit) + \\ -k History depth; 0 = KEEP_ALL (default: KEEP_LAST 1) + \\ -D v|l|t|p Durability: volatile, transient-local, transient, persistent + \\ -f Deadline period in milliseconds + \\ -s Ownership strength (enables EXCLUSIVE ownership) + \\ -x 1|2 Data representation: 1=XCDR1 (default), 2=XCDR2 + \\ -p Partition name + \\ + \\Topic / data: + \\ -t Topic name (default: Square) + \\ -c Color / key value (default: BLUE) + \\ -z Shape size; 0 = auto-increment each sample (default: 20) + \\ -n Number of instances to publish (default: 1) + \\ --additional-payload Extra zero bytes appended to each sample + \\ --size-modulo Cycle shapesize 1..n when -z 0 is active + \\ --cft Content filter expression (subscriber only) + \\ + \\Timing / iterations: + \\ -i, --num-iterations Stop after n samples (-1 = infinite, default) + \\ --write-period Publish interval in ms (default: 33) + \\ --read-period Read poll interval in ms (default: 100) + \\ + \\Other: + \\ -d Domain ID (default: 0) + \\ -w Print each sample on the writer side + \\ -h, --help Show this help and exit + \\ + \\Environment variables: + \\ SHAPE_STARTUP_DELAY_MS= Sleep before creating the DDS participant. + \\ Useful for late-join testing without relying + \\ on fixed sleeps in the test harness. + \\ + ); + std.process.exit(0); } else if (std.mem.startsWith(u8, arg, "--") or std.mem.startsWith(u8, arg, "-")) { std.log.warn("unrecognised option: {s}", .{arg}); } @@ -771,6 +821,11 @@ pub fn main(init: std.process.Init.Minimal) !void { defer _ = gpa.deinit(); const alloc = gpa.allocator(); + if (std.c.getenv("SHAPE_STARTUP_DELAY_MS")) |v| { + const ms = std.fmt.parseInt(u64, std.mem.span(v), 10) catch 0; + if (ms > 0) sleepNs(ms * std.time.ns_per_ms); + } + const opts = parseArgs(init.args) catch |err| { std.log.err("argument error: {}", .{err}); std.process.exit(1); diff --git a/srcZig/zenzen-zig/build.zig b/srcZig/zenzen-zig/build.zig index 6f4aebd5..827c1206 100644 --- a/srcZig/zenzen-zig/build.zig +++ b/srcZig/zenzen-zig/build.zig @@ -4,8 +4,15 @@ pub fn build(b: *std.Build) void { const target = b.standardTargetOptions(.{}); const optimize = b.standardOptimizeOption(.{}); + const LogLevel = enum { err, warn, info, debug }; + const version = b.option([]const u8, "dds-version", "ZenzenDDS version string embedded in the executable name (default: 0.0.0)") orelse "0.0.0"; const sanitize_thread = b.option(bool, "sanitize-thread", "Enable ThreadSanitizer (requires libc, Linux only)") orelse false; + const default_log_level: LogLevel = switch (optimize) { + .Debug => .debug, + .ReleaseSafe, .ReleaseFast, .ReleaseSmall => .info, + }; + const log_level = b.option(LogLevel, "log-level", "shape_main std.log level: err, warn, info, debug (default matches Zig build mode)") orelse default_log_level; const zzdds_dep = b.dependency("zzdds", .{ .target = target, .optimize = optimize }); const zzdds_mod = zzdds_dep.module("zzdds"); @@ -24,6 +31,8 @@ pub fn build(b: *std.Build) void { }); const exe_name = std.fmt.allocPrint(b.allocator, "zenzen_dds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); + const shape_main_options = b.addOptions(); + shape_main_options.addOption([]const u8, "log_level", @tagName(log_level)); const exe = b.addExecutable(.{ .name = exe_name, @@ -33,6 +42,7 @@ pub fn build(b: *std.Build) void { .optimize = optimize, .imports = &.{ .{ .name = "dds", .module = dds_mod }, + .{ .name = "shape_main_options", .module = shape_main_options.createModule() }, }, }), }); From 67b56c24bd6e5ff8f4e93d35798e969dae8297c9 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Mon, 18 May 2026 11:03:54 +0000 Subject: [PATCH 09/16] rename working zzdds working folder --- srcZig/{zenzen-zig => zzdds}/.gitignore | 0 srcZig/{zenzen-zig => zzdds}/build.zig | 0 srcZig/{zenzen-zig => zzdds}/build.zig.zon | 0 srcZig/{zenzen-zig => zzdds}/dds_impl.zig | 0 4 files changed, 0 insertions(+), 0 deletions(-) rename srcZig/{zenzen-zig => zzdds}/.gitignore (100%) rename srcZig/{zenzen-zig => zzdds}/build.zig (100%) rename srcZig/{zenzen-zig => zzdds}/build.zig.zon (100%) rename srcZig/{zenzen-zig => zzdds}/dds_impl.zig (100%) diff --git a/srcZig/zenzen-zig/.gitignore b/srcZig/zzdds/.gitignore similarity index 100% rename from srcZig/zenzen-zig/.gitignore rename to srcZig/zzdds/.gitignore diff --git a/srcZig/zenzen-zig/build.zig b/srcZig/zzdds/build.zig similarity index 100% rename from srcZig/zenzen-zig/build.zig rename to srcZig/zzdds/build.zig diff --git a/srcZig/zenzen-zig/build.zig.zon b/srcZig/zzdds/build.zig.zon similarity index 100% rename from srcZig/zenzen-zig/build.zig.zon rename to srcZig/zzdds/build.zig.zon diff --git a/srcZig/zenzen-zig/dds_impl.zig b/srcZig/zzdds/dds_impl.zig similarity index 100% rename from srcZig/zenzen-zig/dds_impl.zig rename to srcZig/zzdds/dds_impl.zig From f28385ec2b3142abe086deb6f6c0447a80a46d82 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Mon, 18 May 2026 11:17:00 +0000 Subject: [PATCH 10/16] interop repo fix --- interoperability_report.py | 21 +++++++++++++++++---- srcZig/zzdds/build.zig | 2 +- srcZig/zzdds/build.zig.zon | 2 +- 3 files changed, 19 insertions(+), 6 deletions(-) diff --git a/interoperability_report.py b/interoperability_report.py index 1a6e78c4..5f64bd6d 100644 --- a/interoperability_report.py +++ b/interoperability_report.py @@ -65,6 +65,21 @@ def stop_process(child_process, timeout=30, poll_interval=0.2): return return_value +def drain_process_output(child_process, timeout=0.1): + """Drain pending output from a pexpect child so its pty cannot fill.""" + try: + child_process.expect([pexpect.EOF, pexpect.TIMEOUT], timeout=timeout) + except (pexpect.EOF, pexpect.TIMEOUT): + pass + +def wait_for_events_and_drain(child_process, events, timeout=0.1): + """Wait for multiprocessing events while continuing to drain child output.""" + while not all(element.is_set() for element in events): + if child_process.isalive(): + drain_process_output(child_process, timeout) + else: + time.sleep(timeout) + def run_subscriber_shape_main( name_executable: str, parameters: str, @@ -212,8 +227,7 @@ def run_subscriber_shape_main( subscriber_finished.set() # set subscriber as finished log_message(f'Subscriber {subscriber_index}: Waiting for Publishers to ' 'finish', verbosity) - for element in publishers_finished: - element.wait() # wait for all publishers to finish + wait_for_events_and_drain(child_sub, publishers_finished) # Stop process if not stop_process(child_sub): log_message(f'Subscriber {subscriber_index} process did not exit ' @@ -401,8 +415,7 @@ def run_publisher_shape_main( log_message(f'Publisher {publisher_index}: Waiting for Subscribers to finish', verbosity) - for element in subscribers_finished: - element.wait() # wait for all subscribers to finish + wait_for_events_and_drain(child_pub, subscribers_finished) publisher_finished.set() # set publisher as finished # Stop process if not stop_process(child_pub): diff --git a/srcZig/zzdds/build.zig b/srcZig/zzdds/build.zig index 827c1206..8bed7c11 100644 --- a/srcZig/zzdds/build.zig +++ b/srcZig/zzdds/build.zig @@ -30,7 +30,7 @@ pub fn build(b: *std.Build) void { }, }); - const exe_name = std.fmt.allocPrint(b.allocator, "zenzen_dds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); + const exe_name = std.fmt.allocPrint(b.allocator, "zzdds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); const shape_main_options = b.addOptions(); shape_main_options.addOption([]const u8, "log_level", @tagName(log_level)); diff --git a/srcZig/zzdds/build.zig.zon b/srcZig/zzdds/build.zig.zon index 72a3773c..5a2bde3a 100644 --- a/srcZig/zzdds/build.zig.zon +++ b/srcZig/zzdds/build.zig.zon @@ -1,5 +1,5 @@ .{ - .name = .zenzen_dds_shape_main, + .name = .zzdds_shape_main, .version = "0.0.0", .fingerprint = 0x59a7de95ffa6098c, .minimum_zig_version = "0.16.0", From 05ca8d001734c08f4967bdd8224f7661a5a0d9ad Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Mon, 18 May 2026 11:18:51 +0000 Subject: [PATCH 11/16] fingerprint fix --- srcZig/zzdds/build.zig.zon | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srcZig/zzdds/build.zig.zon b/srcZig/zzdds/build.zig.zon index 5a2bde3a..dd54e833 100644 --- a/srcZig/zzdds/build.zig.zon +++ b/srcZig/zzdds/build.zig.zon @@ -1,7 +1,7 @@ .{ .name = .zzdds_shape_main, .version = "0.0.0", - .fingerprint = 0x59a7de95ffa6098c, + .fingerprint = 0x336e9664cb8850f7, .minimum_zig_version = "0.16.0", .dependencies = .{ .zzdds = .{ From 382e4f1dd7bff3d66c7661ea13e77079f3ac5c62 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Wed, 20 May 2026 16:46:22 +0100 Subject: [PATCH 12/16] updates for version handling within CI --- srcZig/zzdds/build.zig | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/srcZig/zzdds/build.zig b/srcZig/zzdds/build.zig index 8bed7c11..a0e365c5 100644 --- a/srcZig/zzdds/build.zig +++ b/srcZig/zzdds/build.zig @@ -6,7 +6,7 @@ pub fn build(b: *std.Build) void { const LogLevel = enum { err, warn, info, debug }; - const version = b.option([]const u8, "dds-version", "ZenzenDDS version string embedded in the executable name (default: 0.0.0)") orelse "0.0.0"; + const version = b.option([]const u8, "dds-version", "Full zzdds version string for the executable name (e.g. 0.1.0-zig.0.16.0); omit for a stable CI-friendly name"); const sanitize_thread = b.option(bool, "sanitize-thread", "Enable ThreadSanitizer (requires libc, Linux only)") orelse false; const default_log_level: LogLevel = switch (optimize) { .Debug => .debug, @@ -30,7 +30,10 @@ pub fn build(b: *std.Build) void { }, }); - const exe_name = std.fmt.allocPrint(b.allocator, "zzdds-{s}_shape_main_linux", .{version}) catch @panic("OOM"); + const exe_name = if (version) |v| + std.fmt.allocPrint(b.allocator, "zzdds-{s}_shape_main_linux", .{v}) catch @panic("OOM") + else + "zzdds_shape_main_linux"; const shape_main_options = b.addOptions(); shape_main_options.addOption([]const u8, "log_level", @tagName(log_level)); From 50dd3c76ceae52e8aea9210bd56d7b189a246673 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Thu, 21 May 2026 12:14:39 +0100 Subject: [PATCH 13/16] Add CLA for Zenzen IoT --- CLA/CLA_Zenzen_IoT.md | 60 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 CLA/CLA_Zenzen_IoT.md diff --git a/CLA/CLA_Zenzen_IoT.md b/CLA/CLA_Zenzen_IoT.md new file mode 100644 index 00000000..2310a85c --- /dev/null +++ b/CLA/CLA_Zenzen_IoT.md @@ -0,0 +1,60 @@ +# OMG DDS INTEROPERABILITY REPOSITORY - CONTRIBUTOR LICENSE AGREEMENT + +**This Contributor License Agreement ("Agreement") specifies the terms under which the individual or corporate entity specified in the signature block below (“You”) agree to make intellectual property contributions to the OMG DDS Interoperability Repository. BY SIGNING BELOW YOU ARE AGREEING TO BE BOUND BY THE TERMS OF THIS AGREEMENT. If You are signing this Agreement in Your capacity as an employee, THEN YOUR EMPLOYER AND YOU ARE BOTH BOUND BY THIS AGREEMENT.** + +1. Definitions + + 1. "OMG DDS Interoperability Repository" (or “Repository”) means the Git repository [https://github.com/omg-dds/dds-rtps](https://github.com/omg-dds/dds-rtps). + + 2. "Moderator" means an entity or individual responsible for authorizing changes to the Repository. + + 3. "Submit" (or “Submitted”) means any submission, including source code, binaries, code, pull requests, issue reports, comments, etc., made to the Moderators for inclusion in the Repository either through the Git repository interface or through electronic file transfer. + + 4. A "Contribution" is any original work of authorship, including any modifications or additions to an existing work, that You Submit to the DDS Interoperability Repository. + + 5. A "User" is anyone who accesses the Repository. + +2. Allowable Contribution Representations + + 1. You represent that You have the necessary rights to the Contribution(s) to meet the obligations of this Agreement. If You are employed, Your employer has authorized Contribution(s) under this Agreement. + + 2. You represent that you have no knowledge of third-party intellectual property rights that are likely to be infringed by the Contribution(s). You represent that you have no knowledge that such infringement or any allegation of misappropriation of intellectual property rights is likely to be claimed or has already been claimed. + +3. License + + You grant Moderators a perpetual, worldwide, non-exclusive, assignable, paid-up license to publish, display, and redistribute the Contribution as part of the Repository. You also license to Moderators under the same terms any other intellectual property rights required to publish, display, and redistribute the Contributions as part of the Repository. You further grant all Users of the Repository a license to the Contribution under the terms of the [OMG DDS Interoperability Testing License](../LICENSE.md) included in the Repository. Moderators are under no obligation to publish Contributions. + +4. No Warranty, Consequential Damages. Limited Liability + + Other than explicitly stated herein, You provide the Contribution(s) "as is" with no warranty nor claims of fitness to any purpose. Neither party shall be liable for consequential or special damages of any kind. Other than for breach of warranty or representations herein, the liability of either party to the other shall be limited to $1000. + +5. General + + 1. If You are an agency of the United States Government, then this Agreement will be governed by the United States federal common law. Otherwise, this Agreement will be governed by the laws of the State of California except with regard to its choice of law rules. + + 2. A party may assign this Agreement to an entity acquiring essentially all of the party’s relevant business. + +6. Electronic Signatures + + "Electronic Signature" means any electronic sound, symbol, or process attached to or logically associated with a record and executed and adopted by a party with the intent to sign such record. + + Each party agrees that the Electronic Signatures, whether digital or encrypted, of the parties included in this Agreement are intended to authenticate this writing and to have the same force and effect as manual signatures. + + +IN WITNESS WHEREOF, You, intending to be legally bound, have executed this Agreement or caused Your employer’s proper and duly authorized officer to execute and deliver this Agreement, for good and valuable consideration, the sufficiency of which is hereby acknowledged, as of the day and year first written below. + +**For:** + +Entity Name: Zenzen IoT + +Address: 28 Laurelhill Place, Stirling, FK8 2JJ, United Kingdom + + ("**You**") + +**By:** + +Name: Timothy Simpson + +Title: Founder + +Date: May 21st, 2026 From 8f69fa35ee9881ebbb61563d8b3102ac932fa2cc Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Fri, 22 May 2026 11:33:50 +0100 Subject: [PATCH 14/16] Add type support registration --- srcZig/dds.zig | 15 +++++++++++++++ srcZig/shape_main.zig | 36 +++++++++++++++++++++++++++++++++--- srcZig/zzdds/dds_impl.zig | 14 ++++++++++++++ 3 files changed, 62 insertions(+), 3 deletions(-) diff --git a/srcZig/dds.zig b/srcZig/dds.zig index ee955caf..466ab7b9 100644 --- a/srcZig/dds.zig +++ b/srcZig/dds.zig @@ -73,6 +73,21 @@ //! //! pub fn cftMatchSample(cft: DDS.ContentFilteredTopic, acc: FieldAccessor) bool; //! pub fn cftTopicDescription(cft: DDS.ContentFilteredTopic) DDS.TopicDescription; +//! +//! ── TypeSupport (type schema registration) ──────────────────────────── +//! +//! pub const TypeSupport = struct { +//! compute_key_hash: *const fn (payload: []const u8) [16]u8, +//! }; +//! +//! pub fn registerTypeSupport(dp: DDS.DomainParticipant, +//! type_name: []const u8, +//! ts: TypeSupport) void; +//! Register a key-hash computation callback for a named type. Call +//! before creating DataReaders for that type so that received changes +//! whose inline-QoS omits a key_hash can have one computed from the +//! CDR payload. `payload` passed to compute_key_hash includes the +//! 4-byte CDR encapsulation header. // This file is documentation only. shape_main.zig imports the module // named "dds" which is provided by the vendor's build.zig, not this file. diff --git a/srcZig/shape_main.zig b/srcZig/shape_main.zig index a4a6e53f..5782b12f 100644 --- a/srcZig/shape_main.zig +++ b/srcZig/shape_main.zig @@ -187,14 +187,14 @@ fn serializeShape(buf: *std.ArrayList(u8), alloc: std.mem.Allocator, s: ShapeDat } // Compute 16-byte key hash from CDR-serialised key (color string). -// DDS spec: use the raw serialised key when it fits in 16 bytes, else MD5. +// DDS spec §9.6.3.8: key is CDR_BE (big-endian) serialization of the key fields. +// When the serialised key fits in 16 bytes, the hash is the raw bytes zero-padded. fn colorKeyHash(color: []const u8) [16]u8 { var kh = std.mem.zeroes([16]u8); - // CDR key: u32 length (LE) + chars + '\0' const clen: u32 = @intCast(color.len + 1); const klen: usize = 4 + clen; if (klen <= 16) { - std.mem.writeInt(u32, kh[0..4], clen, .little); + std.mem.writeInt(u32, kh[0..4], clen, .big); @memcpy(kh[4..][0..color.len], color); // null at [4 + color.len]; already zero from zeroes() } else { @@ -203,6 +203,34 @@ fn colorKeyHash(color: []const u8) [16]u8 { return kh; } +// Derive the key hash from a received CDR payload by parsing out the color field. +// Used as the TypeSupport compute_key_hash callback so the reader-side participant +// can recover instance identity when the writer omitted the inline-QoS key_hash. +fn shapeTypeKeyHash(payload: []const u8) [16]u8 { + if (payload.len < 4) return std.mem.zeroes([16]u8); + const encap = payload[1]; + var off: usize = 4; // skip 4-byte encapsulation header + + // XCDR2 @appendable (encap 0x08 or 0x09): skip 4-byte struct DHEADER + if (encap == 0x08 or encap == 0x09) { + if (payload.len < off + 4) return std.mem.zeroes([16]u8); + off += 4; + } + + if (payload.len < off + 4) return std.mem.zeroes([16]u8); + // String length endianness matches the payload: odd encap byte → little-endian + const is_le = (encap & 0x01) != 0; + const clen: u32 = if (is_le) + std.mem.readInt(u32, payload[off..][0..4], .little) + else + std.mem.readInt(u32, payload[off..][0..4], .big); + off += 4; + if (clen == 0 or payload.len < off + clen) return std.mem.zeroes([16]u8); + const color = payload[off .. off + clen - 1]; // strip null terminator + + return colorKeyHash(color); +} + const ParsedShape = struct { color: []const u8, // slice into payload; valid while payload is alive x: i32, @@ -843,6 +871,8 @@ pub fn main(init: std.process.Init.Minimal) !void { defer dds.destroyParticipant(participant); const dp = participant.toDDS(); + dds.registerTypeSupport(dp, "ShapeType", .{ .compute_key_hash = shapeTypeKeyHash }); + const topic = dp.vtable.create_topic( dp.ptr, opts.topic_name, diff --git a/srcZig/zzdds/dds_impl.zig b/srcZig/zzdds/dds_impl.zig index de8b2e30..84674f26 100644 --- a/srcZig/zzdds/dds_impl.zig +++ b/srcZig/zzdds/dds_impl.zig @@ -14,6 +14,7 @@ pub const DDS = zzdds_gen.DDS; const UdpTransport = zzdds.udp_transport.UdpTransport; const SpdpSedpDiscovery = zzdds.combined_discovery.SpdpSedpDiscovery; const DomainParticipantFactoryImpl = zzdds.dcps.DomainParticipantFactoryImpl; +const DomainParticipantImpl = zzdds.dcps.DomainParticipantImpl; const DataWriterImpl = zzdds.dcps.DataWriterImpl; const DataReaderImpl = zzdds.dcps.DataReaderImpl; const TopicImpl = zzdds.dcps.TopicImpl; @@ -161,6 +162,19 @@ pub fn cftTopicDescription(cft: DDS.ContentFilteredTopic) DDS.TopicDescription { return impl.toTopicDescription(); } +// ── TypeSupport ─────────────────────────────────────────────────────────────── + +pub const TypeSupport = zzdds.dcps.TypeSupport; + +pub fn registerTypeSupport( + dp: DDS.DomainParticipant, + type_name: []const u8, + ts: TypeSupport, +) void { + const impl: *DomainParticipantImpl = @ptrCast(@alignCast(dp.ptr)); + impl.registerTypeSupport(type_name, ts); +} + // ── Nil sentinel helpers ────────────────────────────────────────────────────── // All nil entities share the same underlying nil_storage address (NIL_PTR). // We recover that address from any exported nil constant without needing to From ba895b86d1d606207d9ad8703f6b24b63fb23b53 Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Fri, 22 May 2026 13:10:05 +0100 Subject: [PATCH 15/16] Update GUID strategy --- srcZig/zzdds/dds_impl.zig | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/srcZig/zzdds/dds_impl.zig b/srcZig/zzdds/dds_impl.zig index 84674f26..d937303d 100644 --- a/srcZig/zzdds/dds_impl.zig +++ b/srcZig/zzdds/dds_impl.zig @@ -56,7 +56,7 @@ pub fn createParticipant(alloc: std.mem.Allocator, domain_id: u32) !*Participant transport, discovery, noop_security, - .random, + .spec_random, .{}, ); errdefer factory.deinit(); From 1f6ac47933889515d0cc8e707c46bdc54221966d Mon Sep 17 00:00:00 2001 From: Timothy Simpson Date: Thu, 28 May 2026 16:20:59 +0100 Subject: [PATCH 16/16] use 0.1.0 release --- srcZig/zzdds/.gitignore | 1 + srcZig/zzdds/build.zig.zon | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/srcZig/zzdds/.gitignore b/srcZig/zzdds/.gitignore index 3389c86c..03cb27d5 100644 --- a/srcZig/zzdds/.gitignore +++ b/srcZig/zzdds/.gitignore @@ -1,2 +1,3 @@ .zig-cache/ zig-out/ +zig-pkg/ diff --git a/srcZig/zzdds/build.zig.zon b/srcZig/zzdds/build.zig.zon index dd54e833..f2046b98 100644 --- a/srcZig/zzdds/build.zig.zon +++ b/srcZig/zzdds/build.zig.zon @@ -5,7 +5,8 @@ .minimum_zig_version = "0.16.0", .dependencies = .{ .zzdds = .{ - .path = "packages/zzdds", + .url = "https://github.com/zz-iot/zzdds/archive/refs/tags/v0.1.0-zig.0.16.0.tar.gz", + .hash = "zzdds-0.1.0-zig.0.16.0-sc7dmud6DwBmyrVud73QKTiyCmBO5V8AlD0G1fd-YhGi", }, }, .paths = .{