diff --git a/.github/workflows/ci_zzdds.yml b/.github/workflows/ci_zzdds.yml new file mode 100644 index 0000000..d51174a --- /dev/null +++ b/.github/workflows/ci_zzdds.yml @@ -0,0 +1,32 @@ +name: CI zzdds + +on: + pull_request: + paths: + - 'srcZig/**' + push: + paths: + - 'srcZig/**' + workflow_dispatch: + +concurrency: + group: ${{ github.workflow }}-${{ github.ref }} + cancel-in-progress: true + +jobs: + build: + name: Build shape_main + runs-on: ubuntu-24.04 + + steps: + - name: Checkout dds-rtps + uses: actions/checkout@v4 + + - name: Install Zig + uses: mlugg/setup-zig@v2 + with: + version: "0.16.0" + + - name: Build shape_main + working-directory: srcZig/zzdds + run: zig build -Dtarget=x86_64-linux-gnu -Doptimize=ReleaseSafe diff --git a/CLA/CLA_Zenzen_IoT.md b/CLA/CLA_Zenzen_IoT.md new file mode 100644 index 0000000..2310a85 --- /dev/null +++ b/CLA/CLA_Zenzen_IoT.md @@ -0,0 +1,60 @@ +# OMG DDS INTEROPERABILITY REPOSITORY - CONTRIBUTOR LICENSE AGREEMENT + +**This Contributor License Agreement ("Agreement") specifies the terms under which the individual or corporate entity specified in the signature block below (“You”) agree to make intellectual property contributions to the OMG DDS Interoperability Repository. BY SIGNING BELOW YOU ARE AGREEING TO BE BOUND BY THE TERMS OF THIS AGREEMENT. If You are signing this Agreement in Your capacity as an employee, THEN YOUR EMPLOYER AND YOU ARE BOTH BOUND BY THIS AGREEMENT.** + +1. Definitions + + 1. "OMG DDS Interoperability Repository" (or “Repository”) means the Git repository [https://github.com/omg-dds/dds-rtps](https://github.com/omg-dds/dds-rtps). + + 2. "Moderator" means an entity or individual responsible for authorizing changes to the Repository. + + 3. "Submit" (or “Submitted”) means any submission, including source code, binaries, code, pull requests, issue reports, comments, etc., made to the Moderators for inclusion in the Repository either through the Git repository interface or through electronic file transfer. + + 4. A "Contribution" is any original work of authorship, including any modifications or additions to an existing work, that You Submit to the DDS Interoperability Repository. + + 5. A "User" is anyone who accesses the Repository. + +2. Allowable Contribution Representations + + 1. You represent that You have the necessary rights to the Contribution(s) to meet the obligations of this Agreement. If You are employed, Your employer has authorized Contribution(s) under this Agreement. + + 2. You represent that you have no knowledge of third-party intellectual property rights that are likely to be infringed by the Contribution(s). You represent that you have no knowledge that such infringement or any allegation of misappropriation of intellectual property rights is likely to be claimed or has already been claimed. + +3. License + + You grant Moderators a perpetual, worldwide, non-exclusive, assignable, paid-up license to publish, display, and redistribute the Contribution as part of the Repository. You also license to Moderators under the same terms any other intellectual property rights required to publish, display, and redistribute the Contributions as part of the Repository. You further grant all Users of the Repository a license to the Contribution under the terms of the [OMG DDS Interoperability Testing License](../LICENSE.md) included in the Repository. Moderators are under no obligation to publish Contributions. + +4. No Warranty, Consequential Damages. Limited Liability + + Other than explicitly stated herein, You provide the Contribution(s) "as is" with no warranty nor claims of fitness to any purpose. Neither party shall be liable for consequential or special damages of any kind. Other than for breach of warranty or representations herein, the liability of either party to the other shall be limited to $1000. + +5. General + + 1. If You are an agency of the United States Government, then this Agreement will be governed by the United States federal common law. Otherwise, this Agreement will be governed by the laws of the State of California except with regard to its choice of law rules. + + 2. A party may assign this Agreement to an entity acquiring essentially all of the party’s relevant business. + +6. Electronic Signatures + + "Electronic Signature" means any electronic sound, symbol, or process attached to or logically associated with a record and executed and adopted by a party with the intent to sign such record. + + Each party agrees that the Electronic Signatures, whether digital or encrypted, of the parties included in this Agreement are intended to authenticate this writing and to have the same force and effect as manual signatures. + + +IN WITNESS WHEREOF, You, intending to be legally bound, have executed this Agreement or caused Your employer’s proper and duly authorized officer to execute and deliver this Agreement, for good and valuable consideration, the sufficiency of which is hereby acknowledged, as of the day and year first written below. + +**For:** + +Entity Name: Zenzen IoT + +Address: 28 Laurelhill Place, Stirling, FK8 2JJ, United Kingdom + + ("**You**") + +**By:** + +Name: Timothy Simpson + +Title: Founder + +Date: May 21st, 2026 diff --git a/srcZig/dds.zig b/srcZig/dds.zig new file mode 100644 index 0000000..69067dc --- /dev/null +++ b/srcZig/dds.zig @@ -0,0 +1,134 @@ +//! Zig DDS shim — interface contract for srcZig/shape_main.zig. +//! +//! shape_main.zig imports a module named "dds". Any Zig DDS vendor that +//! wants to participate in the dds-rtps interoperability test suite provides +//! their own implementation of this module and wires it up as the "dds" +//! dependency in their build.zig. +//! +//! ZenzenDDS's implementation lives in zenzen-zig/dds_impl.zig. +//! +//! ── Required exports ────────────────────────────────────────────────────── +//! +//! pub const DDS = ...; +//! Re-export of the vendor's standard DDS type package. Must expose the +//! standard DCPS entity handles and QoS/Status types used by shape_main: +//! DomainParticipant, Publisher, Subscriber, Topic, ContentFilteredTopic, +//! TopicDescription, DataWriter, DataReader, DataWriterQos, DataReaderQos, +//! PublisherQos, SubscriberQos, DataWriterListener, DataReaderListener, +//! StatusMask, and the status constants (OFFERED_INCOMPATIBLE_QOS_STATUS, +//! etc.). +//! +//! pub const Participant = struct { ... }; +//! Opaque vendor state that bundles transport, discovery, and factory. +//! shape_main calls createParticipant / destroyParticipant and then +//! calls toDDS() to get the standard DomainParticipant handle for use +//! with the standard vtable API. +//! +//! pub fn createParticipant(alloc: std.mem.Allocator, domain_id: u32) !*Participant; +//! pub fn destroyParticipant(p: *Participant) void; +//! +//! pub fn topicName(topic: DDS.Topic) []const u8; +//! Returns the topic name string from a DDS.Topic handle. +//! +//! ── DataWriter extras (not in the standard DCPS vtable) ─────────────── +//! +//! pub const WriteKind = enum { alive, dispose, unregister }; +//! +//! pub fn writeRaw(dw: DDS.DataWriter, kind: WriteKind, +//! key_hash: [16]u8, data: []const u8) !void; +//! Write a pre-serialized CDR payload. The vendor stamps the source +//! timestamp internally (always "now") matching the behaviour of the +//! standard typed write() call in C/C++/Rust shape_main implementations. +//! +//! pub fn writerMatchedCount(dw: DDS.DataWriter) usize; +//! pub fn writerNotifyDeadline(dw: DDS.DataWriter) void; +//! +//! ── DataReader extras ───────────────────────────────────────────────── +//! +//! pub const TakenSample = struct { +//! data: []u8, +//! alloc: std.mem.Allocator, +//! pub fn deinit(self: TakenSample) void, +//! }; +//! +//! pub fn takeRaw(dr: DDS.DataReader) ?TakenSample; +//! Returns the next pending sample, or null if the queue is empty. +//! Caller must call sample.deinit() when done. +//! +//! pub fn readerMatchedCount(dr: DDS.DataReader) usize; +//! pub fn readerNotifyDeadline(dr: DDS.DataReader) void; +//! +//! ── ContentFilteredTopic evaluation ────────────────────────────────── +//! +//! pub const FilterValue = union(enum) { +//! string: []const u8, +//! int: i64, +//! float: f64, +//! }; +//! +//! pub const FieldAccessor = struct { +//! ctx: *anyopaque, +//! get: *const fn (ctx: *anyopaque, field: []const u8) ?FilterValue, +//! }; +//! +//! pub fn cftMatchSample(cft: DDS.ContentFilteredTopic, acc: FieldAccessor) bool; +//! pub fn cftTopicDescription(cft: DDS.ContentFilteredTopic) DDS.TopicDescription; +//! +//! ── TypeSupport (type schema registration) ──────────────────────────── +//! +//! pub const TypeSupport = struct { +//! compute_key_hash: *const fn (payload: []const u8) [16]u8, +//! }; +//! +//! pub fn registerTypeSupport(dp: DDS.DomainParticipant, +//! type_name: []const u8, +//! ts: TypeSupport) void; +//! Register a key-hash computation callback for a named type. Call +//! before creating DataReaders for that type so that received changes +//! whose inline-QoS omits a key_hash can have one computed from the +//! CDR payload. `payload` passed to compute_key_hash includes the +//! 4-byte CDR encapsulation header. + +// ── ShapeType serialization (Option B) ──────────────────────────────────────── +// +// shape_main.zig delegates all CDR serialization and key-hash computation to +// the vendor module so each vendor can handle type support however they like — +// generated code from their IDL compiler, hand-coded CDR, etc. For zzdds the +// implementations live in dds_impl.zig and will eventually be produced by zidl +// from srcZig/shape.idl. +// +// pub const ShapeType = struct { +// color: []const u8, +// x: i32, +// y: i32, +// shapesize: i32, +// additional_payload: u32, // extra CDR bytes appended by publisher (test harness) +// last_payload_byte: ?u8, // last byte of received payload (null if absent) +// }; +// +// pub fn serializeShape( +// buf: *std.ArrayList(u8), alloc: std.mem.Allocator, +// s: ShapeType, xcdr2: bool) !void; +// Serialize `s` into `buf` (cleared first). xcdr2 selects XCDR2 DELIMITED_CDR. +// +// pub fn serializeShapeKeyOnly( +// buf: *std.ArrayList(u8), alloc: std.mem.Allocator, +// color: []const u8) !void; +// Serialize a key-only CDR_LE payload containing just `color`. +// +// pub fn deserializeShape(payload: []const u8) ?ShapeType; +// Parse CDR/CDR2 bytes into ShapeType. Returns null on error or key-only payload. +// Returned `color` slice borrows from `payload`; valid while `payload` is alive. +// +// pub fn deserializeShapeKey(payload: []const u8) []const u8; +// Extract the color key string from any ShapeType CDR payload (full or key-only). +// +// pub fn shapeKeyHash(color: []const u8) [16]u8; +// Compute the RTPS 16-byte key hash for a given color string. +// +// pub fn shapeKeyHashFromCdr(payload: []const u8) [16]u8; +// Compute the RTPS key hash from a received CDR payload. +// Passed as TypeSupport.compute_key_hash to registerTypeSupport. + +// This file is documentation only. shape_main.zig imports the module +// named "dds" which is provided by the vendor's build.zig, not this file. diff --git a/srcZig/shape.idl b/srcZig/shape.idl new file mode 100644 index 0000000..0450a5d --- /dev/null +++ b/srcZig/shape.idl @@ -0,0 +1,9 @@ +@appendable +struct ShapeType { + @key + string<128> color; + int32 x; + int32 y; + int32 shapesize; + sequence additional_payload_size; +}; diff --git a/srcZig/shape_main.zig b/srcZig/shape_main.zig new file mode 100644 index 0000000..a1c3748 --- /dev/null +++ b/srcZig/shape_main.zig @@ -0,0 +1,842 @@ +//! Zig DDS shape_main — interoperability test application for OMG dds-rtps. +//! +//! Mirrors the CLI interface of srcCxx/shape_main.cxx so the Python test +//! harness (interoperability_report.py) can drive it as a pub or sub. +//! +//! This file is vendor-agnostic. All DDS implementation details are hidden +//! behind the "dds" module, which each Zig DDS vendor supplies via their +//! build.zig. See srcZig/dds.zig for the full interface contract. +//! +//! Required stdout strings (matched by the harness via pexpect): +//! Publisher: "Create topic:" → "Create writer for topic:" → +//! "on_publication_matched()" or "on_offered_incompatible_qos" → +//! "%-10s %-10s %03d %03d [%d]" (only when -w is passed) +//! Subscriber: "Create topic:" → "Create reader for topic:" → +//! "[]" in the sample line or "on_requested_incompatible_qos()" + +const std = @import("std"); +const dds = @import("dds"); +const DDS = dds.DDS; +const shape_main_options = @import("shape_main_options"); + +pub const std_options: std.Options = .{ + .log_level = std.meta.stringToEnum(std.log.Level, shape_main_options.log_level) orelse + @compileError("invalid shape_main log level"), +}; + +// ── Time helpers ───────────────────────────────────────────────────────────── +// std.time.nanoTimestamp / std.time.sleep were removed in Zig 0.16. + +fn monoNs() i64 { + var ts: std.os.linux.timespec = undefined; + _ = std.os.linux.clock_gettime(.MONOTONIC, &ts); + return ts.sec * std.time.ns_per_s + ts.nsec; +} + +fn sleepNs(ns: u64) void { + var req = std.os.linux.timespec{ + .sec = @intCast(ns / std.time.ns_per_s), + .nsec = @intCast(ns % std.time.ns_per_s), + }; + _ = std.os.linux.nanosleep(&req, null); +} + +// ── Stdout helpers ──────────────────────────────────────────────────────────── +// std.io was removed in Zig 0.16; write directly via the Linux write(2) syscall. + +fn stdoutWrite(bytes: []const u8) void { + var remaining = bytes; + while (remaining.len > 0) { + const rc = std.os.linux.write(std.posix.STDOUT_FILENO, remaining.ptr, remaining.len); + const n = @as(isize, @bitCast(rc)); + if (n <= 0) break; + remaining = remaining[@intCast(n)..]; + } +} + +fn stdoutPrint(comptime fmt: []const u8, args: anytype) void { + var buf: [2048]u8 = undefined; + var w: std.Io.Writer = .fixed(&buf); + w.print(fmt, args) catch {}; + stdoutWrite(w.buffered()); +} + +// ── Signal handling ─────────────────────────────────────────────────────────── + +var g_all_done: std.atomic.Value(bool) = std.atomic.Value(bool).init(false); + +fn handleSigint(sig: std.posix.SIG) callconv(.c) void { + _ = sig; + g_all_done.store(true, .release); +} + +// ── Options ─────────────────────────────────────────────────────────────────── + +const Options = struct { + publish: bool = false, + subscribe: bool = false, + domain_id: u32 = 0, + best_effort: bool = false, + reliable: bool = false, + history_depth: i32 = -1, // -1 = use default KEEP_LAST 1 + deadline_ms: u64 = 0, + ownership_strength: i32 = -1, // -1 = SHARED + topic_name: []const u8 = "Square", + color: ?[]const u8 = null, + partition: ?[]const u8 = null, + durability: u8 = 'v', + data_representation: u16 = 1, // 1=XCDR1, 2=XCDR2 + print_writer_samples: bool = false, + shapesize: i32 = 20, + write_period_ms: u64 = 33, + read_period_ms: u64 = 100, + num_iterations: i64 = -1, // -1 = infinite + num_instances: u32 = 1, + additional_payload: u32 = 0, + size_modulo: i32 = 0, // 0 = no cycling (--size-modulo) + cft_expression: ?[]const u8 = null, // content filter expression (--cft) + time_filter_ms: u64 = 0, // TIME_BASED_FILTER minimum_separation in ms (--time-filter) + final_instance_state: u8 = 0, // 0=none, 'u'=unregister, 'd'=dispose (--final-instance-state) + access_scope: u8 = 'i', // 'i'=instance (default), 't'=topic, 'g'=group (--access-scope) + ordered_access: bool = false, // --ordered + coherent_access: bool = false, // --coherent + num_topics: u32 = 1, // --num-topics (>1 not supported) + take_read: bool = false, // --take-read (not supported) +}; + +// ── ShapeType ───────────────────────────────────────────────────────────────── +// CDR serialization/deserialization and key-hash computation live in the vendor +// module (dds_impl.zig for zzdds) so each vendor can handle type support in +// whatever way suits them — generated code, hand-coded, or otherwise. +// shape_main.zig only knows about dds.ShapeType and calls dds.serializeShape / +// dds.deserializeShape / dds.serializeShapeKeyOnly / dds.deserializeShapeKey. + +// ── Policy name mapping ─────────────────────────────────────────────────────── + +fn policyName(id: i32) []const u8 { + return switch (id) { + 2 => "DURABILITY", + 4 => "DEADLINE", + 5 => "LATENCYBUDGET", + 6 => "OWNERSHIP", + 8 => "LIVELINESS", + 10 => "PARTITION", + 11 => "RELIABILITY", + 12 => "DESTINATIONORDER", + 23 => "DATAREPRESENTATION", + else => "UNKNOWN", + }; +} + +// ── Listener context and vtables ────────────────────────────────────────────── + +const ListenerCtx = struct { + topic_name: []const u8, + type_name: []const u8 = "ShapeType", +}; + +// DataWriter listeners + +fn dwOnIncompatQos(ctx: *anyopaque, _: DDS.DataWriter, status: DDS.OfferedIncompatibleQosStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_offered_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); +} + +fn dwOnDeadlineMissed(ctx: *anyopaque, _: DDS.DataWriter, status: DDS.OfferedDeadlineMissedStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_offered_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); +} + +fn dwOnLivelinessLost(_: *anyopaque, _: DDS.DataWriter, _: DDS.LivelinessLostStatus) void {} +fn dwOnPubMatched(_: *anyopaque, _: DDS.DataWriter, _: DDS.PublicationMatchedStatus) void {} +fn dwOnDeinit(_: *anyopaque) void {} + +var dw_vtable = DDS.DataWriterListener.Vtable{ + .on_offered_deadline_missed = dwOnDeadlineMissed, + .on_offered_incompatible_qos = dwOnIncompatQos, + .on_liveliness_lost = dwOnLivelinessLost, + .on_publication_matched = dwOnPubMatched, + .deinit = dwOnDeinit, +}; + +// DataReader listeners + +fn drOnIncompatQos(ctx: *anyopaque, _: DDS.DataReader, status: DDS.RequestedIncompatibleQosStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_requested_incompatible_qos() topic: '{s}' type: '{s}' : {d} ({s})\n", .{ lc.topic_name, lc.type_name, status.last_policy_id, policyName(status.last_policy_id) }); +} + +fn drOnDeadlineMissed(ctx: *anyopaque, _: DDS.DataReader, status: DDS.RequestedDeadlineMissedStatus) void { + const lc: *ListenerCtx = @ptrCast(@alignCast(ctx)); + stdoutPrint("on_requested_deadline_missed() topic: '{s}' type: '{s}' : (total = {d}, change = {d})\n", .{ lc.topic_name, lc.type_name, status.total_count, status.total_count_change }); +} + +fn drOnSampleRejected(_: *anyopaque, _: DDS.DataReader, _: DDS.SampleRejectedStatus) void {} +fn drOnLivelinessChanged(_: *anyopaque, _: DDS.DataReader, _: DDS.LivelinessChangedStatus) void {} +fn drOnDataAvail(_: *anyopaque, _: DDS.DataReader) void {} +fn drOnSubMatched(_: *anyopaque, _: DDS.DataReader, _: DDS.SubscriptionMatchedStatus) void {} +fn drOnSampleLost(_: *anyopaque, _: DDS.DataReader, _: DDS.SampleLostStatus) void {} +fn drOnDeinit(_: *anyopaque) void {} + +var dr_vtable = DDS.DataReaderListener.Vtable{ + .on_requested_deadline_missed = drOnDeadlineMissed, + .on_requested_incompatible_qos = drOnIncompatQos, + .on_sample_rejected = drOnSampleRejected, + .on_liveliness_changed = drOnLivelinessChanged, + .on_data_available = drOnDataAvail, + .on_subscription_matched = drOnSubMatched, + .on_sample_lost = drOnSampleLost, + .deinit = drOnDeinit, +}; + +// ── DataWriter QoS builder ──────────────────────────────────────────────────── + +fn buildWriterQos(alloc: std.mem.Allocator, opts: *const Options) !DDS.DataWriterQos { + var qos = DDS.DataWriterQos{}; + + qos.reliability.kind = if (opts.best_effort) + .BEST_EFFORT_RELIABILITY_QOS + else + .RELIABLE_RELIABILITY_QOS; + + if (opts.history_depth == 0) { + qos.history.kind = .KEEP_ALL_HISTORY_QOS; + } else if (opts.history_depth > 0) { + qos.history.kind = .KEEP_LAST_HISTORY_QOS; + qos.history.depth = opts.history_depth; + } + + if (opts.deadline_ms > 0) { + qos.deadline.period = .{ + .sec = @intCast(opts.deadline_ms / 1000), + .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), + }; + } + + if (opts.ownership_strength >= 0) { + qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; + qos.ownership_strength.value = opts.ownership_strength; + } + + qos.durability.kind = switch (opts.durability) { + 'v' => .VOLATILE_DURABILITY_QOS, + 'l' => .TRANSIENT_LOCAL_DURABILITY_QOS, + 't' => .TRANSIENT_DURABILITY_QOS, + 'p' => .PERSISTENT_DURABILITY_QOS, + else => .VOLATILE_DURABILITY_QOS, + }; + + const repr_id: i16 = if (opts.data_representation == 2) 2 else 0; + try qos.data_representation.value.append(alloc, repr_id); + + return qos; +} + +// ── DataReader QoS builder ──────────────────────────────────────────────────── + +fn buildReaderQos(alloc: std.mem.Allocator, opts: *const Options) !DDS.DataReaderQos { + var qos = DDS.DataReaderQos{}; + + qos.reliability.kind = if (opts.best_effort) + .BEST_EFFORT_RELIABILITY_QOS + else + .RELIABLE_RELIABILITY_QOS; + + if (opts.history_depth == 0) { + qos.history.kind = .KEEP_ALL_HISTORY_QOS; + } else if (opts.history_depth > 0) { + qos.history.kind = .KEEP_LAST_HISTORY_QOS; + qos.history.depth = opts.history_depth; + } + + if (opts.deadline_ms > 0) { + qos.deadline.period = .{ + .sec = @intCast(opts.deadline_ms / 1000), + .nanosec = @intCast((opts.deadline_ms % 1000) * std.time.ns_per_ms), + }; + } + + if (opts.ownership_strength >= 0) { + qos.ownership.kind = .EXCLUSIVE_OWNERSHIP_QOS; + } + + qos.durability.kind = switch (opts.durability) { + 'v' => .VOLATILE_DURABILITY_QOS, + 'l' => .TRANSIENT_LOCAL_DURABILITY_QOS, + 't' => .TRANSIENT_DURABILITY_QOS, + 'p' => .PERSISTENT_DURABILITY_QOS, + else => .VOLATILE_DURABILITY_QOS, + }; + + if (opts.time_filter_ms > 0) { + qos.time_based_filter.minimum_separation = .{ + .sec = @intCast(opts.time_filter_ms / 1000), + .nanosec = @intCast((opts.time_filter_ms % 1000) * std.time.ns_per_ms), + }; + } + + const repr_id: i16 = if (opts.data_representation == 2) 2 else 0; + try qos.data_representation.value.append(alloc, repr_id); + + return qos; +} + +// ── nil sentinel helpers ────────────────────────────────────────────────────── +// Delegated to the vendor dds module, which knows the implementation's nil +// sentinel value (each vendor may use a different non-null sentinel address). + +fn isNilTopic(t: DDS.Topic) bool { + return dds.isNilTopic(t); +} +fn isNilPub(p: DDS.Publisher) bool { + return dds.isNilPub(p); +} +fn isNilSub(s: DDS.Subscriber) bool { + return dds.isNilSub(s); +} +fn isNilDw(dw: DDS.DataWriter) bool { + return dds.isNilDw(dw); +} +fn isNilDr(dr: DDS.DataReader) bool { + return dds.isNilDr(dr); +} +fn isNilCft(cft: DDS.ContentFilteredTopic) bool { + return dds.isNilCft(cft); +} + +// ── Publisher ───────────────────────────────────────────────────────────────── + +fn runPublisher( + alloc: std.mem.Allocator, + dp: DDS.DomainParticipant, + topic: DDS.Topic, + opts: *const Options, +) !void { + const color = opts.color orelse "BLUE"; + const topic_name = dds.topicName(topic); + + const pub_presentation = DDS.PresentationQosPolicy{ + .access_scope = switch (opts.access_scope) { + 't' => .TOPIC_PRESENTATION_QOS, + 'g' => .GROUP_PRESENTATION_QOS, + else => .INSTANCE_PRESENTATION_QOS, + }, + .coherent_access = opts.coherent_access, + .ordered_access = opts.ordered_access, + }; + var pub_partition_name_buf: [1][]const u8 = .{opts.partition orelse ""}; + const pub_qos: DDS.PublisherQos = if (opts.partition) |_| .{ + .presentation = pub_presentation, + .partition = .{ .name = .{ .items = &pub_partition_name_buf, .capacity = 1 } }, + } else .{ .presentation = pub_presentation }; + const pub_ = dp.vtable.create_publisher(dp.ptr, pub_qos, dds.nilPublisherListener(), 0); + if (isNilPub(pub_)) return error.PublisherFailed; + + var dw_qos = try buildWriterQos(alloc, opts); + defer dw_qos.data_representation.value.deinit(alloc); + + var lctx = ListenerCtx{ .topic_name = topic_name }; + const dw_listener = DDS.DataWriterListener{ + .ptr = &lctx, + .vtable = &dw_vtable, + }; + const listener_mask: DDS.StatusMask = + DDS.OFFERED_INCOMPATIBLE_QOS_STATUS | DDS.OFFERED_DEADLINE_MISSED_STATUS; + + const dw = pub_.vtable.create_datawriter(pub_.ptr, topic, dw_qos, dw_listener, listener_mask); + if (isNilDw(dw)) return error.DataWriterFailed; + + stdoutPrint("Create writer for topic: {s} color: {s}\n", .{ topic_name, color }); + + var buf: std.ArrayList(u8) = .empty; + defer buf.deinit(alloc); + + var shape = dds.ShapeType{ + .color = color, + .x = 0, + .y = 0, + .shapesize = if (opts.shapesize == 0) 1 else opts.shapesize, + .additional_payload = opts.additional_payload, + }; + var rng = std.Random.DefaultPrng.init(@intCast(monoNs())); + const rand = rng.random(); + + const match_deadline = monoNs() + 10 * std.time.ns_per_s; + var printed_matched = false; + + const deadline_ns: i64 = if (opts.deadline_ms > 0) + @intCast(opts.deadline_ms * std.time.ns_per_ms) + else + 0; + var last_write_ns: i64 = monoNs(); + + var iteration: i64 = 0; + while (!g_all_done.load(.acquire)) { + if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; + + if (!printed_matched) { + if (dds.writerMatchedCount(dw) > 0) { + stdoutPrint( + "on_publication_matched() topic: '{s}' type: 'ShapeType' : matched readers {d} (change = 1)\n", + .{ topic_name, dds.writerMatchedCount(dw) }, + ); + printed_matched = true; + } else if (monoNs() > match_deadline) { + return; // READER_NOT_MATCHED + } + } + + if (deadline_ns > 0) { + const elapsed = monoNs() - last_write_ns; + if (elapsed > deadline_ns) dds.writerNotifyDeadline(dw); + } + + shape.x = @rem(@as(i32, rand.int(u16)), 320); + shape.y = @rem(@as(i32, rand.int(u16)), 240); + + for (0..opts.num_instances) |inst| { + const inst_color: []const u8 = blk: { + if (inst == 0) break :blk color; + break :blk std.fmt.allocPrint(alloc, "{s}{d}", .{ color, inst }) catch color; + }; + defer if (inst > 0) alloc.free(inst_color); + + shape.color = inst_color; + try dds.serializeShape(&buf, alloc, shape, opts.data_representation == 2); + + const key_hash = dds.shapeKeyHash(inst_color); + try dds.writeRaw(dw, .alive, key_hash, buf.items); + + if (opts.print_writer_samples) { + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", .{ topic_name, inst_color, @as(u32, @intCast(shape.x)), @as(u32, @intCast(shape.y)), shape.shapesize }); + } + } + + if (opts.shapesize == 0) { + shape.shapesize += 1; + if (opts.size_modulo > 0 and shape.shapesize > opts.size_modulo) + shape.shapesize = 1; + } + + last_write_ns = monoNs(); + iteration += 1; + sleepNs(opts.write_period_ms * std.time.ns_per_ms); + } + + // Unregister/dispose all instances when the publisher exits with a finite num_iterations. + // For explicit --final-instance-state, use the requested kind; otherwise default to .unregister + // so that DataReader instances transition to NOT_ALIVE_NO_WRITERS (DDS spec §2.2.2.4.1.13). + if (opts.num_iterations >= 0) { + const kind: dds.WriteKind = switch (opts.final_instance_state) { + 'd' => .dispose, + else => .unregister, + }; + for (0..opts.num_instances) |inst| { + const inst_color: []const u8 = blk: { + if (inst == 0) break :blk color; + break :blk std.fmt.allocPrint(alloc, "{s}{d}", .{ color, inst }) catch color; + }; + defer if (inst > 0) alloc.free(inst_color); + const key_hash = dds.shapeKeyHash(inst_color); + try dds.serializeShapeKeyOnly(&buf, alloc, inst_color); + dds.writeRaw(dw, kind, key_hash, buf.items) catch {}; + } + // Brief drain to let RELIABLE transport deliver the NOT_ALIVE changes before + // the participant is torn down. + sleepNs(300 * std.time.ns_per_ms); + } +} + +// ── Subscriber ──────────────────────────────────────────────────────────────── + +fn runSubscriber( + alloc: std.mem.Allocator, + dp: DDS.DomainParticipant, + topic: DDS.Topic, + opts: *const Options, +) !void { + const topic_name = dds.topicName(topic); + + // When -c COLOR is passed to a subscriber without --cft, synthesize a color filter. + var synth_cft_buf: [64]u8 = undefined; + const effective_cft_expr: ?[]const u8 = if (opts.cft_expression) |e| + e + else if (opts.color) |c| + std.fmt.bufPrint(&synth_cft_buf, "color = '{s}'", .{c}) catch null + else + null; + + const cft: ?DDS.ContentFilteredTopic = blk: { + const expr = effective_cft_expr orelse break :blk null; + const cft_name = std.fmt.allocPrint( + alloc, + "{s}_cft", + .{topic_name}, + ) catch break :blk null; + defer alloc.free(cft_name); + const c = dp.vtable.create_contentfilteredtopic( + dp.ptr, + cft_name, + topic, + expr, + .empty, + ); + if (isNilCft(c)) break :blk null; + break :blk c; + }; + defer { + if (cft) |c| _ = dp.vtable.delete_contentfilteredtopic(dp.ptr, c); + } + + const topic_desc: DDS.TopicDescription = if (cft) |c| + dds.cftTopicDescription(c) + else + dp.vtable.lookup_topicdescription(dp.ptr, dds.topicName(topic)); + + const sub_presentation = DDS.PresentationQosPolicy{ + .access_scope = switch (opts.access_scope) { + 't' => .TOPIC_PRESENTATION_QOS, + 'g' => .GROUP_PRESENTATION_QOS, + else => .INSTANCE_PRESENTATION_QOS, + }, + .coherent_access = opts.coherent_access, + .ordered_access = opts.ordered_access, + }; + var sub_partition_name_buf: [1][]const u8 = .{opts.partition orelse ""}; + const sub_qos: DDS.SubscriberQos = if (opts.partition) |_| .{ + .presentation = sub_presentation, + .partition = .{ .name = .{ .items = &sub_partition_name_buf, .capacity = 1 } }, + } else .{ .presentation = sub_presentation }; + const sub = dp.vtable.create_subscriber(dp.ptr, sub_qos, dds.nilSubscriberListener(), 0); + if (isNilSub(sub)) return error.SubscriberFailed; + + var dr_qos = try buildReaderQos(alloc, opts); + defer dr_qos.data_representation.value.deinit(alloc); + + var lctx = ListenerCtx{ .topic_name = topic_name }; + const dr_listener = DDS.DataReaderListener{ + .ptr = &lctx, + .vtable = &dr_vtable, + }; + const listener_mask: DDS.StatusMask = + DDS.REQUESTED_INCOMPATIBLE_QOS_STATUS | DDS.REQUESTED_DEADLINE_MISSED_STATUS; + + const dr = sub.vtable.create_datareader(sub.ptr, topic_desc, dr_qos, dr_listener, listener_mask); + if (isNilDr(dr)) return error.DataReaderFailed; + + stdoutPrint("Create reader for topic: {s}\n", .{topic_name}); + + const sub_deadline_ns: i64 = if (opts.deadline_ms > 0) + @intCast(opts.deadline_ms * std.time.ns_per_ms) + else + 0; + var deadline_base_ns: i64 = 0; + + const ShapeAccessor = struct { + shape: *const dds.ShapeType, + + fn get(ctx: *anyopaque, field: []const u8) ?dds.FilterValue { + const self: *const @This() = @ptrCast(@alignCast(ctx)); + if (std.mem.eql(u8, field, "color")) + return .{ .string = self.shape.color }; + if (std.mem.eql(u8, field, "x")) + return .{ .int = self.shape.x }; + if (std.mem.eql(u8, field, "y")) + return .{ .int = self.shape.y }; + if (std.mem.eql(u8, field, "shapesize")) + return .{ .int = self.shape.shapesize }; + return null; + } + }; + + var iteration: i64 = 0; + while (!g_all_done.load(.acquire)) { + if (opts.num_iterations >= 0 and iteration >= opts.num_iterations) break; + + if (sub_deadline_ns > 0 and deadline_base_ns == 0 and dds.readerMatchedCount(dr) > 0) { + deadline_base_ns = monoNs(); + } + + var got_data = false; + while (dds.takeRaw(dr)) |taken| { + defer taken.deinit(); + got_data = true; + + if (taken.instance_state == DDS.NOT_ALIVE_NO_WRITERS_INSTANCE_STATE or + taken.instance_state == DDS.NOT_ALIVE_DISPOSED_INSTANCE_STATE) + { + const key = dds.deserializeShapeKey(taken.data); + const state_str = if (taken.instance_state == DDS.NOT_ALIVE_DISPOSED_INSTANCE_STATE) + "NOT_ALIVE_DISPOSED_INSTANCE_STATE" + else + "NOT_ALIVE_NO_WRITERS_INSTANCE_STATE"; + stdoutPrint("{s:<10} {s:<10} {s}\n", .{ topic_name, key, state_str }); + continue; + } + + const s = dds.deserializeShape(taken.data) orelse continue; + + if (cft) |c| { + var acc_ctx = ShapeAccessor{ .shape = &s }; + const accessor = dds.FieldAccessor{ + .ctx = &acc_ctx, + .get = ShapeAccessor.get, + }; + if (!dds.cftMatchSample(c, accessor)) continue; + } + + if (s.last_payload_byte) |lb| { + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}] {{{d}}}\n", .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize, lb }); + } else { + stdoutPrint("{s:<10} {s:<10} {d:0>3} {d:0>3} [{d}]\n", .{ topic_name, s.color, @as(u32, @intCast(s.x)), @as(u32, @intCast(s.y)), s.shapesize }); + } + } + + if (got_data) { + deadline_base_ns = monoNs(); + } else if (sub_deadline_ns > 0 and deadline_base_ns != 0) { + if (monoNs() - deadline_base_ns > sub_deadline_ns) { + dds.readerNotifyDeadline(dr); + deadline_base_ns = monoNs(); + } + } + + iteration += 1; + sleepNs(opts.read_period_ms * std.time.ns_per_ms); + } +} + +// ── Argument parsing ────────────────────────────────────────────────────────── + +fn parseArgs(process_args: std.process.Args) !Options { + var opts = Options{}; + var it = std.process.Args.Iterator.init(process_args); + _ = it.skip(); // program name + + while (it.next()) |arg| { + if (std.mem.eql(u8, arg, "-P")) { + opts.publish = true; + } else if (std.mem.eql(u8, arg, "-S")) { + opts.subscribe = true; + } else if (std.mem.eql(u8, arg, "-b")) { + opts.best_effort = true; + } else if (std.mem.eql(u8, arg, "-r")) { + opts.reliable = true; + } else if (std.mem.eql(u8, arg, "-w")) { + opts.print_writer_samples = true; + } else if (std.mem.eql(u8, arg, "-R")) { + // use read() instead of take() — no-op (we always use takeRaw) + } else if (std.mem.eql(u8, arg, "-d")) { + const v = it.next() orelse return error.MissingValue; + opts.domain_id = try std.fmt.parseInt(u32, v, 10); + } else if (std.mem.eql(u8, arg, "-k")) { + const v = it.next() orelse return error.MissingValue; + opts.history_depth = try std.fmt.parseInt(i32, v, 10); + } else if (std.mem.eql(u8, arg, "-f")) { + const v = it.next() orelse return error.MissingValue; + opts.deadline_ms = try std.fmt.parseInt(u64, v, 10); + } else if (std.mem.eql(u8, arg, "-s")) { + const v = it.next() orelse return error.MissingValue; + opts.ownership_strength = try std.fmt.parseInt(i32, v, 10); + } else if (std.mem.eql(u8, arg, "-t")) { + const v = it.next() orelse return error.MissingValue; + opts.topic_name = v; + } else if (std.mem.eql(u8, arg, "-c")) { + const v = it.next() orelse return error.MissingValue; + opts.color = v; + } else if (std.mem.eql(u8, arg, "-p")) { + const v = it.next() orelse return error.MissingValue; + opts.partition = v; + } else if (std.mem.eql(u8, arg, "-D")) { + const v = it.next() orelse return error.MissingValue; + opts.durability = if (v.len > 0) v[0] else 'v'; + } else if (std.mem.eql(u8, arg, "-x")) { + const v = it.next() orelse return error.MissingValue; + opts.data_representation = std.fmt.parseInt(u16, v, 10) catch 1; + } else if (std.mem.eql(u8, arg, "-z")) { + const v = it.next() orelse return error.MissingValue; + opts.shapesize = std.fmt.parseInt(i32, v, 10) catch 20; + } else if (std.mem.eql(u8, arg, "-n") or + std.mem.eql(u8, arg, "--num-instances")) + { + const v = it.next() orelse return error.MissingValue; + opts.num_instances = std.fmt.parseInt(u32, v, 10) catch 1; + } else if (std.mem.eql(u8, arg, "--write-period")) { + const v = it.next() orelse return error.MissingValue; + opts.write_period_ms = std.fmt.parseInt(u64, v, 10) catch 33; + } else if (std.mem.eql(u8, arg, "--read-period")) { + const v = it.next() orelse return error.MissingValue; + opts.read_period_ms = std.fmt.parseInt(u64, v, 10) catch 100; + } else if (std.mem.eql(u8, arg, "--num-iterations") or + std.mem.eql(u8, arg, "-i")) + { + const v = it.next() orelse return error.MissingValue; + opts.num_iterations = std.fmt.parseInt(i64, v, 10) catch -1; + } else if (std.mem.eql(u8, arg, "--additional-payload") or + std.mem.eql(u8, arg, "--additional-payload-size")) + { + const v = it.next() orelse return error.MissingValue; + opts.additional_payload = std.fmt.parseInt(u32, v, 10) catch 0; + } else if (std.mem.eql(u8, arg, "--size-modulo")) { + const v = it.next() orelse return error.MissingValue; + opts.size_modulo = std.fmt.parseInt(i32, v, 10) catch 0; + } else if (std.mem.eql(u8, arg, "--cft")) { + opts.cft_expression = it.next() orelse return error.MissingValue; + } else if (std.mem.eql(u8, arg, "--time-filter")) { + const v = it.next() orelse return error.MissingValue; + opts.time_filter_ms = std.fmt.parseInt(u64, v, 10) catch 0; + } else if (std.mem.eql(u8, arg, "--final-instance-state")) { + const v = it.next() orelse return error.MissingValue; + opts.final_instance_state = if (v.len > 0) v[0] else 0; + } else if (std.mem.eql(u8, arg, "--access-scope")) { + const v = it.next() orelse return error.MissingValue; + opts.access_scope = if (v.len > 0) v[0] else 'i'; + } else if (std.mem.eql(u8, arg, "--ordered")) { + opts.ordered_access = true; + } else if (std.mem.eql(u8, arg, "--coherent")) { + opts.coherent_access = true; + } else if (std.mem.eql(u8, arg, "--num-topics")) { + const v = it.next() orelse return error.MissingValue; + opts.num_topics = std.fmt.parseInt(u32, v, 10) catch 1; + } else if (std.mem.eql(u8, arg, "--take-read")) { + opts.take_read = true; + } else if (std.mem.eql(u8, arg, "--publisher-matches") or + std.mem.eql(u8, arg, "--subscriber-matches") or + std.mem.eql(u8, arg, "--deadline") or + std.mem.eql(u8, arg, "--periodic-announcement") or + std.mem.eql(u8, arg, "--coherent-sample-count")) + { + // consume argument value and ignore — unimplemented options + _ = it.next(); + } else if (std.mem.eql(u8, arg, "-h") or std.mem.eql(u8, arg, "--help")) { + stdoutWrite( + \\Usage: shape_main -P|-S [options] + \\ + \\Mode (required): + \\ -P Publisher + \\ -S Subscriber + \\ + \\QoS: + \\ -b BEST_EFFORT reliability (default: RELIABLE) + \\ -r RELIABLE reliability (explicit) + \\ -k History depth; 0 = KEEP_ALL (default: KEEP_LAST 1) + \\ -D v|l|t|p Durability: volatile, transient-local, transient, persistent + \\ -f Deadline period in milliseconds + \\ -s Ownership strength (enables EXCLUSIVE ownership) + \\ -x 1|2 Data representation: 1=XCDR1 (default), 2=XCDR2 + \\ -p Partition name + \\ + \\Topic / data: + \\ -t Topic name (default: Square) + \\ -c Color / key value (default: BLUE) + \\ -z Shape size; 0 = auto-increment each sample (default: 20) + \\ -n Number of instances to publish (default: 1) + \\ --additional-payload Extra zero bytes appended to each sample + \\ --size-modulo Cycle shapesize 1..n when -z 0 is active + \\ --cft Content filter expression (subscriber only) + \\ + \\Timing / iterations: + \\ -i, --num-iterations Stop after n samples (-1 = infinite, default) + \\ --write-period Publish interval in ms (default: 33) + \\ --read-period Read poll interval in ms (default: 100) + \\ + \\Other: + \\ -d Domain ID (default: 0) + \\ -w Print each sample on the writer side + \\ -h, --help Show this help and exit + \\ + \\Environment variables: + \\ SHAPE_STARTUP_DELAY_MS= Sleep before creating the DDS participant. + \\ Useful for late-join testing without relying + \\ on fixed sleeps in the test harness. + \\ + ); + std.process.exit(0); + } else if (std.mem.startsWith(u8, arg, "--") or std.mem.startsWith(u8, arg, "-")) { + std.log.warn("unrecognised option: {s}", .{arg}); + } + } + + // Publisher default color + if (opts.publish and opts.color == null) { + opts.color = "BLUE"; + } + + return opts; +} + +// ── main ────────────────────────────────────────────────────────────────────── + +pub fn main(init: std.process.Init.Minimal) !void { + const sa = std.posix.Sigaction{ + .handler = .{ .handler = handleSigint }, + .mask = std.posix.sigemptyset(), + .flags = 0, + }; + std.posix.sigaction(std.posix.SIG.INT, &sa, null); + + var gpa = std.heap.DebugAllocator(.{}){}; + defer _ = gpa.deinit(); + const alloc = gpa.allocator(); + + if (std.c.getenv("SHAPE_STARTUP_DELAY_MS")) |v| { + const ms = std.fmt.parseInt(u64, std.mem.span(v), 10) catch 0; + if (ms > 0) sleepNs(ms * std.time.ns_per_ms); + } + + const opts = parseArgs(init.args) catch |err| { + std.log.err("argument error: {}", .{err}); + std.process.exit(1); + }; + + if (!opts.publish and !opts.subscribe) { + std.log.err("specify -P (publish) or -S (subscribe)", .{}); + std.process.exit(1); + } + + // Signal unsupported features to the test harness so it marks them as vendor-skipped. + if (opts.num_topics > 1) { + stdoutPrint("not supported: --num-topics > 1\n", .{}); + return; + } + if (opts.take_read) { + stdoutPrint("not supported: --take-read\n", .{}); + return; + } + + const participant = dds.createParticipant(alloc, opts.domain_id) catch |err| { + std.log.err("failed to create participant on domain {d}: {}", .{ opts.domain_id, err }); + std.process.exit(1); + }; + defer dds.destroyParticipant(participant); + const dp = participant.toDDS(); + + dds.registerTypeSupport(dp, "ShapeType", .{ .compute_key_hash = dds.shapeKeyHashFromCdr }); + + const topic = dp.vtable.create_topic( + dp.ptr, + opts.topic_name, + "ShapeType", + .{}, + dds.nilTopicListener(), + 0, + ); + if (isNilTopic(topic)) { + std.log.err("failed to create topic '{s}'", .{opts.topic_name}); + std.process.exit(1); + } + + stdoutPrint("Create topic: {s}\n", .{opts.topic_name}); + + if (opts.publish) { + runPublisher(alloc, dp, topic, &opts) catch |err| { + std.log.err("publisher error: {}", .{err}); + std.process.exit(1); + }; + } else { + runSubscriber(alloc, dp, topic, &opts) catch |err| { + std.log.err("subscriber error: {}", .{err}); + std.process.exit(1); + }; + } +} diff --git a/srcZig/zzdds/.gitignore b/srcZig/zzdds/.gitignore new file mode 100644 index 0000000..03cb27d --- /dev/null +++ b/srcZig/zzdds/.gitignore @@ -0,0 +1,3 @@ +.zig-cache/ +zig-out/ +zig-pkg/ diff --git a/srcZig/zzdds/build.zig b/srcZig/zzdds/build.zig new file mode 100644 index 0000000..1b79275 --- /dev/null +++ b/srcZig/zzdds/build.zig @@ -0,0 +1,83 @@ +const std = @import("std"); + +pub fn build(b: *std.Build) void { + const target = b.standardTargetOptions(.{}); + const optimize = b.standardOptimizeOption(.{}); + + const LogLevel = enum { err, warn, info, debug }; + + const version = b.option([]const u8, "dds-version", "Full zzdds version string for the executable name (e.g. 0.1.0-zig.0.16.0); omit for a stable CI-friendly name"); + const sanitize_thread = b.option(bool, "sanitize-thread", "Enable ThreadSanitizer (requires libc, Linux only)") orelse false; + const default_log_level: LogLevel = switch (optimize) { + .Debug => .debug, + .ReleaseSafe, .ReleaseFast, .ReleaseSmall => .info, + }; + const log_level = b.option(LogLevel, "log-level", "shape_main std.log level: err, warn, info, debug (default matches Zig build mode)") orelse default_log_level; + + const zzdds_dep = b.dependency("zzdds", .{ .target = target, .optimize = optimize }); + const zzdds_mod = zzdds_dep.module("zzdds"); + const zzdds_gen = zzdds_dep.module("zzdds_generated"); + + // Acquire zidl executable and zidl_rt module from the zidl dependency. + const zidl_dep = b.dependency("zidl", .{ .target = target, .optimize = optimize }); + const zidl_exe = zidl_dep.artifact("zidl"); + const zidl_rt_mod = zidl_dep.module("zidl_rt"); + + // Generate ShapeType Zig bindings from srcZig/shape.idl. + // Output lands in the build cache (not checked in). + const gen_shape = b.addRunArtifact(zidl_exe); + gen_shape.addArgs(&.{ "-b", "zig", "--split-files", "-o" }); + const shape_gen_dir = gen_shape.addOutputDirectoryArg("shape-generated"); + gen_shape.addFileArg(b.path("../shape.idl")); + + const shape_gen_mod = b.createModule(.{ + .root_source_file = shape_gen_dir.path(b, "shape.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "zidl_rt", .module = zidl_rt_mod }, + }, + }); + + // Build the "dds" shim module from our vendor implementation. + // shape_main.zig imports only this module; it has no direct zzdds dependency. + const dds_mod = b.createModule(.{ + .root_source_file = b.path("dds_impl.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "zzdds", .module = zzdds_mod }, + .{ .name = "zzdds_generated", .module = zzdds_gen }, + .{ .name = "shape_gen", .module = shape_gen_mod }, + .{ .name = "zidl_rt", .module = zidl_rt_mod }, + }, + }); + + const exe_name = if (version) |v| + std.fmt.allocPrint(b.allocator, "zzdds-{s}_shape_main_linux", .{v}) catch @panic("OOM") + else + "zzdds_shape_main_linux"; + const shape_main_options = b.addOptions(); + shape_main_options.addOption([]const u8, "log_level", @tagName(log_level)); + + const exe = b.addExecutable(.{ + .name = exe_name, + .root_module = b.createModule(.{ + .root_source_file = b.path("../shape_main.zig"), + .target = target, + .optimize = optimize, + .imports = &.{ + .{ .name = "dds", .module = dds_mod }, + .{ .name = "shape_main_options", .module = shape_main_options.createModule() }, + }, + }), + }); + exe.root_module.link_libc = true; + exe.root_module.sanitize_thread = sanitize_thread; + b.installArtifact(exe); + + const run_step = b.step("run", "Run shape_main"); + const run_cmd = b.addRunArtifact(exe); + if (b.args) |args| run_cmd.addArgs(args); + run_step.dependOn(&run_cmd.step); +} diff --git a/srcZig/zzdds/build.zig.zon b/srcZig/zzdds/build.zig.zon new file mode 100644 index 0000000..1db598c --- /dev/null +++ b/srcZig/zzdds/build.zig.zon @@ -0,0 +1,20 @@ +.{ + .name = .zzdds_shape_main, + .version = "0.0.0", + .fingerprint = 0x336e9664cb8850f7, + .minimum_zig_version = "0.16.0", + .dependencies = .{ + .zzdds = .{ + .url = "https://github.com/zz-iot/zzdds/archive/refs/tags/v0.1.1-zig.0.16.0.tar.gz", + .hash = "zzdds-0.1.1-zig.0.16.0-sc7dmmd6EQDV3ZWDOlSq3haCRrGWOVURHbeRw_MtuVCS", + }, + .zidl = .{ + .url = "https://github.com/zz-iot/zidl/archive/refs/tags/v0.1.0-zig.0.16.0.tar.gz", + .hash = "zidl-0.1.0-zig.0.16.0-H6NwLstUGABxQfEtYyyc0UyX1ownreZMlXQIjHPb0vWO", + }, + }, + .paths = .{ + "build.zig", + "build.zig.zon", + }, +} diff --git a/srcZig/zzdds/dds_impl.zig b/srcZig/zzdds/dds_impl.zig new file mode 100644 index 0000000..730e4f6 --- /dev/null +++ b/srcZig/zzdds/dds_impl.zig @@ -0,0 +1,343 @@ +//! ZenzenDDS implementation of the srcZig/dds shim protocol. +//! +//! shape_main.zig imports this module as "dds". Every symbol exported here +//! matches the protocol documented in srcZig/dds.zig; see that file for the +//! full contract that any Zig DDS vendor must satisfy. + +const std = @import("std"); + +const zzdds = @import("zzdds"); +const zzdds_gen = @import("zzdds_generated"); + +pub const DDS = zzdds_gen.DDS; + +const UdpTransport = zzdds.udp_transport.UdpTransport; +const SpdpSedpDiscovery = zzdds.combined_discovery.SpdpSedpDiscovery; +const DomainParticipantFactoryImpl = zzdds.dcps.DomainParticipantFactoryImpl; +const DomainParticipantImpl = zzdds.dcps.DomainParticipantImpl; +const DataWriterImpl = zzdds.dcps.DataWriterImpl; +const DataReaderImpl = zzdds.dcps.DataReaderImpl; +const TopicImpl = zzdds.dcps.TopicImpl; +const ContentFilteredTopicImpl = zzdds.dcps.ContentFilteredTopicImpl; +const noop_security = zzdds.noop_security.noop_security_plugins; +const time_mod = zzdds.util.time; +const history_mod = zzdds.rtps.history; +const nil = zzdds.dcps; +const filter_mod = zzdds.dcps.filter; + +// ── Participant bootstrapping ───────────────────────────────────────────────── + +pub const Participant = struct { + alloc: std.mem.Allocator, + udp: *UdpTransport, + disc: *SpdpSedpDiscovery, + factory: *DomainParticipantFactoryImpl, + dp: DDS.DomainParticipant, + + pub fn toDDS(self: *Participant) DDS.DomainParticipant { + return self.dp; + } +}; + +pub fn createParticipant(alloc: std.mem.Allocator, domain_id: u32) !*Participant { + const p = try alloc.create(Participant); + errdefer alloc.destroy(p); + + const udp = try UdpTransport.init(alloc, .{}, domain_id, null); + errdefer udp.deinit(); + const transport = udp.transport(); + + const disc = try SpdpSedpDiscovery.init(alloc, transport, domain_id, 3_000); + errdefer disc.deinit(); + const discovery = disc.toDiscovery(); + + var factory = try DomainParticipantFactoryImpl.init( + alloc, + transport, + discovery, + noop_security, + .spec_random, + .{}, + ); + errdefer factory.deinit(); + const dpf = factory.toDDSFactory(); + + const dp = dpf.create_participant(domain_id, .{}, nil.nil_dp_listener, 0); + if (dp.ptr == nil.nil_dp_listener.ptr) return error.ParticipantFailed; + + p.* = .{ + .alloc = alloc, + .udp = udp, + .disc = disc, + .factory = factory, + .dp = dp, + }; + return p; +} + +pub fn destroyParticipant(p: *Participant) void { + const dpf = p.factory.toDDSFactory(); + _ = dpf.delete_participant(p.dp); + p.factory.deinit(); + p.disc.deinit(); + p.udp.deinit(); + p.alloc.destroy(p); +} + +// ── Topic name ──────────────────────────────────────────────────────────────── + +pub fn topicName(topic: DDS.Topic) []const u8 { + const impl: *TopicImpl = @ptrCast(@alignCast(topic.ptr)); + return impl.topic_name; +} + +// ── DataWriter extras ───────────────────────────────────────────────────────── + +pub const WriteKind = enum { alive, dispose, unregister }; + +pub fn writeRaw( + dw: DDS.DataWriter, + kind: WriteKind, + key_hash: [16]u8, + data: []const u8, +) !void { + const impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + const ck: history_mod.ChangeKind = switch (kind) { + .alive => .alive, + .dispose => .not_alive_disposed, + .unregister => .not_alive_unregistered, + }; + _ = try impl.writeRaw(ck, time_mod.RtpsTimestamp.now(), history_mod.INSTANCE_HANDLE_NIL, key_hash, data); +} + +pub fn writerMatchedCount(dw: DDS.DataWriter) usize { + const impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + return impl.matchedReaderCount(); +} + +pub fn writerNotifyDeadline(dw: DDS.DataWriter) void { + const impl: *DataWriterImpl = @ptrCast(@alignCast(dw.ptr)); + impl.notifyDeadlineMissed(); +} + +// ── DataReader extras ───────────────────────────────────────────────────────── + +pub const TakenSample = struct { + data: []u8, + alloc: std.mem.Allocator, + instance_state: DDS.InstanceStateKind, + + pub fn deinit(self: TakenSample) void { + self.alloc.free(self.data); + } +}; + +pub fn takeRaw(dr: DDS.DataReader) ?TakenSample { + const impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + const taken = impl.takeRaw() orelse return null; + return .{ + .data = taken.data, + .alloc = impl.alloc, + .instance_state = taken.info.instance_state, + }; +} + +pub fn readerMatchedCount(dr: DDS.DataReader) usize { + const impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + return impl.matchedWriterCount(); +} + +pub fn readerNotifyDeadline(dr: DDS.DataReader) void { + const impl: *DataReaderImpl = @ptrCast(@alignCast(dr.ptr)); + impl.notifyDeadlineMissed(); +} + +// ── ContentFilteredTopic evaluation ────────────────────────────────────────── + +pub const FilterValue = filter_mod.FilterValue; +pub const FieldAccessor = filter_mod.FieldAccessor; + +pub fn cftMatchSample(cft: DDS.ContentFilteredTopic, acc: FieldAccessor) bool { + const impl: *ContentFilteredTopicImpl = @ptrCast(@alignCast(cft.ptr)); + return impl.matchSample(acc); +} + +pub fn cftTopicDescription(cft: DDS.ContentFilteredTopic) DDS.TopicDescription { + const impl: *ContentFilteredTopicImpl = @ptrCast(@alignCast(cft.ptr)); + return impl.toTopicDescription(); +} + +// ── TypeSupport ─────────────────────────────────────────────────────────────── + +pub const TypeSupport = zzdds.dcps.TypeSupport; + +pub fn registerTypeSupport( + dp: DDS.DomainParticipant, + type_name: []const u8, + ts: TypeSupport, +) void { + const impl: *DomainParticipantImpl = @ptrCast(@alignCast(dp.ptr)); + impl.registerTypeSupport(type_name, ts); +} + +// ── ShapeType CDR ───────────────────────────────────────────────────────────── +// +// ShapeType CDR serialization uses zidl-generated code from srcZig/shape.idl. +// The generated module (shape_gen) handles all CDR encoding details. + +const shape_gen = @import("shape_gen"); +const zidl_rt = @import("zidl_rt"); + +/// Wrapper type for shape_main.zig. Keeps a borrowed `color` slice (valid +/// while the source payload is alive) and separates the publisher-side +/// `additional_payload` size from the subscriber-side `last_payload_byte`. +pub const ShapeType = struct { + color: []const u8, + x: i32 = 0, + y: i32 = 0, + shapesize: i32 = 20, + additional_payload: u32 = 0, + last_payload_byte: ?u8 = null, +}; + +/// Serialize shape into `buf` (cleared first) using zidl-generated CDR. +/// xcdr2=false → CDR_LE (0x0001); xcdr2=true → CDR2_LE (0x0007) with DHEADER. +pub fn serializeShape( + buf: *std.ArrayList(u8), + alloc: std.mem.Allocator, + s: ShapeType, + xcdr2: bool, +) !void { + // Build a temporary generated ShapeType for the serializer. + var shape: shape_gen.ShapeType = .{ + .color = zidl_rt.BoundedArray(u8, 128).fromSlice(s.color) catch return error.ColorTooLong, + .x = s.x, + .y = s.y, + .shapesize = s.shapesize, + }; + if (s.additional_payload > 0) { + try shape.additional_payload_size.ensureTotalCapacity(alloc, s.additional_payload); + for (0..s.additional_payload - 1) |_| + shape.additional_payload_size.appendAssumeCapacity(0); + shape.additional_payload_size.appendAssumeCapacity(255); + } + defer shape.additional_payload_size.deinit(alloc); + + buf.clearRetainingCapacity(); + if (xcdr2) { + var w = zidl_rt.CdrWriter(.xcdr2).init(buf, alloc); + try w.writeEncapHeader(); + try shape_gen.ShapeType.serialize(&w, shape); + } else { + var w = zidl_rt.CdrWriter(.xcdr1).init(buf, alloc); + try w.writeEncapHeader(); + try shape_gen.ShapeType.serialize(&w, shape); + } +} + +/// Serialize a key-only CDR payload (XCDR1) for dispose/unregister writes. +pub fn serializeShapeKeyOnly( + buf: *std.ArrayList(u8), + alloc: std.mem.Allocator, + color: []const u8, +) !void { + const shape = shape_gen.ShapeType{ + .color = zidl_rt.BoundedArray(u8, 128).fromSlice(color) catch return error.ColorTooLong, + }; + buf.clearRetainingCapacity(); + var w = zidl_rt.CdrWriter(.xcdr1).init(buf, alloc); + try w.writeEncapHeader(); + try shape_gen.ShapeType.serializeKey(&w, shape); +} + +/// Deserialize a CDR/CDR2 ShapeType payload. Returns null on parse error or +/// if the payload is key-only (missing x/y/shapesize). The returned color +/// slice is zero-copied from `payload` and is valid only while `payload` lives. +pub fn deserializeShape(payload: []const u8) ?ShapeType { + var reader = zidl_rt.CdrReader.init(payload) catch return null; + reader.skipDheaderIfXcdr2() catch return null; + const color = reader.readStringZeroCopy() catch return null; + if (color.len > 128) return null; + const x = reader.readI32() catch return null; + const y = reader.readI32() catch return null; + const shapesize = reader.readI32() catch return null; + const extra_len = reader.readU32() catch 0; + var last_byte: ?u8 = null; + if (extra_len > 0) { + reader.skip(extra_len - 1) catch {}; + last_byte = reader.readU8() catch null; + } + return .{ + .color = color, + .x = x, + .y = y, + .shapesize = shapesize, + .additional_payload = extra_len, + .last_payload_byte = last_byte, + }; +} + +/// Extract the color key string from a CDR payload (full or key-only). +/// Zero-copied from `payload`; valid only while `payload` is alive. +pub fn deserializeShapeKey(payload: []const u8) []const u8 { + var reader = zidl_rt.CdrReader.init(payload) catch return ""; + reader.skipDheaderIfXcdr2() catch return ""; + return reader.readStringZeroCopy() catch ""; +} + +/// Compute the RTPS key hash for a ShapeType instance from its color string. +/// Delegates to the generated computeKeyHash which uses zidl_rt.KeyHashWriter +/// (CDR_BE, MD5 fallback for keys > 16 bytes). +pub fn shapeKeyHash(color: []const u8) [16]u8 { + const shape = shape_gen.ShapeType{ + .color = zidl_rt.BoundedArray(u8, 128).fromSlice(color) catch return std.mem.zeroes([16]u8), + }; + return shape_gen.ShapeType.computeKeyHash(shape); +} + +/// Compute the RTPS key hash from a received CDR payload. +/// Suitable for use as TypeSupport.compute_key_hash. +pub fn shapeKeyHashFromCdr(payload: []const u8) [16]u8 { + return shapeKeyHash(deserializeShapeKey(payload)); +} + +// ── Nil sentinel helpers ────────────────────────────────────────────────────── +// All nil entities share the same underlying nil_storage address (NIL_PTR). +// We recover that address from any exported nil constant without needing to +// re-export NIL_PTR itself. + +fn nilPtr() *anyopaque { + return zzdds.dcps.nil_topic_listener.ptr; +} + +pub fn nilTopicListener() DDS.TopicListener { + return zzdds.dcps.nil_topic_listener; +} +pub fn nilPublisherListener() DDS.PublisherListener { + return zzdds.dcps.nil_pub_listener; +} +pub fn nilSubscriberListener() DDS.SubscriberListener { + return zzdds.dcps.nil_sub_listener; +} + +pub fn isNilDp(dp: DDS.DomainParticipant) bool { + return dp.ptr == nilPtr(); +} +pub fn isNilTopic(t: DDS.Topic) bool { + return t.ptr == nilPtr(); +} +pub fn isNilPub(p: DDS.Publisher) bool { + return p.ptr == nilPtr(); +} +pub fn isNilSub(s: DDS.Subscriber) bool { + return s.ptr == nilPtr(); +} +pub fn isNilDw(dw: DDS.DataWriter) bool { + return dw.ptr == nilPtr(); +} +pub fn isNilDr(dr: DDS.DataReader) bool { + return dr.ptr == nilPtr(); +} +pub fn isNilCft(cft: DDS.ContentFilteredTopic) bool { + return cft.ptr == nilPtr(); +}