From 32202b7c8ed3f1575f16f8c0e04c3aaaf6bf8db8 Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sun, 12 Apr 2026 23:06:28 +0900 Subject: [PATCH 1/5] add system functions --- foreign/cpp/build.rs | 1 + foreign/cpp/include/iggy.hpp | 36 ++ foreign/cpp/src/client.rs | 186 ++++++ foreign/cpp/src/consumer_group.rs | 14 +- foreign/cpp/src/lib.rs | 74 +++ foreign/cpp/tests/client/low_level_e2e.cpp | 672 +++++++++++++++++++++ foreign/cpp/tests/unit_tests.cpp | 19 + 7 files changed, 1001 insertions(+), 1 deletion(-) diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs index d2cbf0094c..88034dabeb 100644 --- a/foreign/cpp/build.rs +++ b/foreign/cpp/build.rs @@ -26,5 +26,6 @@ fn main() { println!("cargo:rerun-if-changed=src/lib.rs"); println!("cargo:rerun-if-changed=src/messages.rs"); println!("cargo:rerun-if-changed=src/stream.rs"); + println!("cargo:rerun-if-changed=src/system.rs"); println!("cargo:rerun-if-changed=src/topic.rs"); } diff --git a/foreign/cpp/include/iggy.hpp b/foreign/cpp/include/iggy.hpp index 56641cb13a..2b565a2e7c 100644 --- a/foreign/cpp/include/iggy.hpp +++ b/foreign/cpp/include/iggy.hpp @@ -45,6 +45,42 @@ class CompressionAlgorithm final { std::string algorithm_; }; +class SnapshotCompression final { + public: + static SnapshotCompression stored() { return SnapshotCompression("stored"); } + static SnapshotCompression deflated() { return SnapshotCompression("deflated"); } + static SnapshotCompression bzip2() { return SnapshotCompression("bzip2"); } + static SnapshotCompression zstd() { return SnapshotCompression("zstd"); } + static SnapshotCompression lzma() { return SnapshotCompression("lzma"); } + static SnapshotCompression xz() { return SnapshotCompression("xz"); } + + std::string_view snapshot_compression_value() const { return snapshot_compression_; } + + private: + explicit SnapshotCompression(std::string snapshot_compression) + : snapshot_compression_(std::move(snapshot_compression)) {} + + std::string snapshot_compression_; +}; + +class SnapshotType final { + public: + static SnapshotType filesystem_overview() { return SnapshotType("filesystem_overview"); } + static SnapshotType process_list() { return SnapshotType("process_list"); } + static SnapshotType resource_usage() { return SnapshotType("resource_usage"); } + static SnapshotType test() { return SnapshotType("test"); } + static SnapshotType server_logs() { return SnapshotType("server_logs"); } + static SnapshotType server_config() { return SnapshotType("server_config"); } + static SnapshotType all() { return SnapshotType("all"); } + + std::string_view snapshot_type_value() const { return snapshot_type_; } + + private: + explicit SnapshotType(std::string snapshot_type) : snapshot_type_(std::move(snapshot_type)) {} + + std::string snapshot_type_; +}; + class Expiry final { public: static Expiry server_default() { return Expiry("server_default", 0); } diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index d4500937fc..28a6cea20a 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -22,7 +22,15 @@ use iggy::prelude::{ IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient, Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient, + MaxTopicSize as RustMaxTopicSize, PartitionClient, + SnapshotCompression as RustSnapshotCompression, StreamClient, SystemClient as RustSystemClient, + SystemSnapshotType as RustSystemSnapshotType, TopicClient, UserClient, }; +use iggy_common::{ + CacheMetrics as RustCacheMetrics, CacheMetricsKey as RustCacheMetricsKey, + ClientInfo as RustClientInfo, ClientInfoDetails as RustClientInfoDetails, Stats as RustStats, +}; +use std::convert::TryFrom; use std::str::FromStr; use std::sync::Arc; @@ -30,6 +38,99 @@ use std::sync::Arc; /// partition based on the consumer/strategy. Cxx FFI does not support `Option`, so we /// reserve `u32::MAX` as the sentinel for `partition_id`. const ANY_PARTITION_ID: u32 = u32::MAX; +impl From for ffi::ClientInfo { + fn from(client: RustClientInfo) -> Self { + ffi::ClientInfo { + client_id: client.client_id, + // TODO(slbotbm): In high-level client, this should be converted to None. + user_id: client.user_id.unwrap_or(u32::MAX), + address: client.address, + transport: client.transport, + consumer_groups_count: client.consumer_groups_count, + } + } +} + +impl From for ffi::ClientInfoDetails { + fn from(client: RustClientInfoDetails) -> Self { + ffi::ClientInfoDetails { + client_id: client.client_id, + // TODO(slbotbm): In high-level client, this should be converted to None. + user_id: client.user_id.unwrap_or(u32::MAX), + address: client.address, + transport: client.transport, + consumer_groups_count: client.consumer_groups_count, + consumer_groups: client + .consumer_groups + .into_iter() + .map(ffi::ConsumerGroupInfo::from) + .collect(), + } + } +} + +impl TryFrom> for ffi::ClientInfoDetails { + type Error = String; + + fn try_from(client: Option) -> Result { + match client { + Some(client) => Ok(ffi::ClientInfoDetails::from(client)), + None => Err("client not found".to_string()), + } + } +} + +impl From<(RustCacheMetricsKey, RustCacheMetrics)> for ffi::CacheMetricEntry { + fn from((key, metrics): (RustCacheMetricsKey, RustCacheMetrics)) -> Self { + ffi::CacheMetricEntry { + stream_id: key.stream_id, + topic_id: key.topic_id, + partition_id: key.partition_id, + hits: metrics.hits, + misses: metrics.misses, + hit_ratio: metrics.hit_ratio, + } + } +} + +impl From for ffi::Stats { + fn from(stats: RustStats) -> Self { + ffi::Stats { + process_id: stats.process_id, + cpu_usage: stats.cpu_usage, + total_cpu_usage: stats.total_cpu_usage, + memory_usage: stats.memory_usage.as_bytes_u64(), + total_memory: stats.total_memory.as_bytes_u64(), + available_memory: stats.available_memory.as_bytes_u64(), + run_time: stats.run_time.as_micros(), + start_time: stats.start_time.as_micros(), + read_bytes: stats.read_bytes.as_bytes_u64(), + written_bytes: stats.written_bytes.as_bytes_u64(), + messages_size_bytes: stats.messages_size_bytes.as_bytes_u64(), + streams_count: stats.streams_count, + topics_count: stats.topics_count, + partitions_count: stats.partitions_count, + segments_count: stats.segments_count, + messages_count: stats.messages_count, + clients_count: stats.clients_count, + consumer_groups_count: stats.consumer_groups_count, + hostname: stats.hostname, + os_name: stats.os_name, + os_version: stats.os_version, + kernel_version: stats.kernel_version, + iggy_server_version: stats.iggy_server_version, + iggy_server_semver: stats.iggy_server_semver.unwrap_or(u32::MAX), + cache_metrics: stats + .cache_metrics + .into_iter() + .map(ffi::CacheMetricEntry::from) + .collect(), + threads_count: stats.threads_count, + free_disk_space: stats.free_disk_space.as_bytes_u64(), + total_disk_space: stats.total_disk_space.as_bytes_u64(), + } + } +} pub struct Client { pub inner: Arc, @@ -571,6 +672,57 @@ impl Client { rust_group_id, rust_topic_id, rust_stream_id ) })?; + pub fn get_stats(&self) -> Result { + RUNTIME.block_on(async { + let stats = self + .inner + .get_stats() + .await + .map_err(|error| format!("Could not get stats: {error}"))?; + Ok(ffi::Stats::from(stats)) + }) + } + + pub fn get_me(&self) -> Result { + RUNTIME.block_on(async { + let client = self + .inner + .get_me() + .await + .map_err(|error| format!("Could not get current client info: {error}"))?; + Ok(ffi::ClientInfoDetails::from(client)) + }) + } + + pub fn get_client(&self, client_id: u32) -> Result { + RUNTIME.block_on(async { + let client = self + .inner + .get_client(client_id) + .await + .map_err(|error| format!("Could not get client '{client_id}': {error}"))?; + ffi::ClientInfoDetails::try_from(client) + .map_err(|error| format!("Could not get client '{client_id}': {error}")) + }) + } + + pub fn get_clients(&self) -> Result, String> { + RUNTIME.block_on(async { + let clients = self + .inner + .get_clients() + .await + .map_err(|error| format!("Could not get clients: {error}"))?; + Ok(clients.into_iter().map(ffi::ClientInfo::from).collect()) + }) + } + + pub fn ping(&self) -> Result<(), String> { + RUNTIME.block_on(async { + self.inner + .ping() + .await + .map_err(|error| format!("Could not ping server: {error}"))?; Ok(()) }) } @@ -602,6 +754,40 @@ impl Client { ) })?; Ok(()) + pub fn heartbeat_interval(&self) -> u64 { + RUNTIME.block_on(async { self.inner.heartbeat_interval().await.as_micros() }) + } + + pub fn snapshot( + &self, + snapshot_compression: String, + snapshot_types: Vec, + ) -> Result, String> { + let rust_compression = match snapshot_compression.trim() { + "" => RustSnapshotCompression::default(), + value => RustSnapshotCompression::from_str(value).map_err(|error| { + format!("Could not capture snapshot: invalid compression '{value}': {error}") + })?, + }; + let rust_snapshot_types = snapshot_types + .into_iter() + .map(|snapshot_type| { + RustSystemSnapshotType::from_str(&snapshot_type).map_err(|error| { + format!( + "Could not capture snapshot: invalid snapshot type '{}': {}", + snapshot_type, error + ) + }) + }) + .collect::, _>>()?; + + RUNTIME.block_on(async { + let snapshot = self + .inner + .snapshot(rust_compression, rust_snapshot_types) + .await + .map_err(|error| format!("Could not capture snapshot: {error}"))?; + Ok(snapshot.0) }) } } diff --git a/foreign/cpp/src/consumer_group.rs b/foreign/cpp/src/consumer_group.rs index 28a2d92e2c..19a584a807 100644 --- a/foreign/cpp/src/consumer_group.rs +++ b/foreign/cpp/src/consumer_group.rs @@ -17,7 +17,19 @@ use crate::ffi; use iggy::prelude::ConsumerGroupDetails as RustConsumerGroupDetails; -use iggy_common::ConsumerGroupMember as RustConsumerGroupMember; +use iggy_common::{ + ConsumerGroupInfo as RustConsumerGroupInfo, ConsumerGroupMember as RustConsumerGroupMember, +}; + +impl From for ffi::ConsumerGroupInfo { + fn from(group: RustConsumerGroupInfo) -> Self { + ffi::ConsumerGroupInfo { + stream_id: group.stream_id, + topic_id: group.topic_id, + group_id: group.group_id, + } + } +} impl From for ffi::ConsumerGroupMember { fn from(member: RustConsumerGroupMember) -> Self { diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index 7f87d57652..c4e4fae659 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -114,6 +114,69 @@ mod ffi { members: Vec, } + struct ConsumerGroupInfo { + stream_id: u32, + topic_id: u32, + group_id: u32, + } + + struct ClientInfo { + client_id: u32, + user_id: u32, + address: String, + transport: String, + consumer_groups_count: u32, + } + + struct ClientInfoDetails { + client_id: u32, + user_id: u32, + address: String, + transport: String, + consumer_groups_count: u32, + consumer_groups: Vec, + } + + struct CacheMetricEntry { + stream_id: u32, + topic_id: u32, + partition_id: u32, + hits: u64, + misses: u64, + hit_ratio: f32, + } + + struct Stats { + process_id: u32, + cpu_usage: f32, + total_cpu_usage: f32, + memory_usage: u64, + total_memory: u64, + available_memory: u64, + run_time: u64, + start_time: u64, + read_bytes: u64, + written_bytes: u64, + messages_size_bytes: u64, + streams_count: u32, + topics_count: u32, + partitions_count: u32, + segments_count: u32, + messages_count: u64, + clients_count: u32, + consumer_groups_count: u32, + hostname: String, + os_name: String, + os_version: String, + kernel_version: String, + iggy_server_version: String, + iggy_server_semver: u32, + cache_metrics: Vec, + threads_count: u32, + free_disk_space: u64, + total_disk_space: u64, + } + extern "Rust" { type Client; @@ -207,6 +270,17 @@ mod ffi { partitioning_value: Vec, messages: Vec, ) -> Result<()>; + fn get_stats(self: &Client) -> Result; + fn get_me(self: &Client) -> Result; + fn get_client(self: &Client, client_id: u32) -> Result; + fn get_clients(self: &Client) -> Result>; + fn ping(self: &Client) -> Result<()>; + fn heartbeat_interval(self: &Client) -> u64; + fn snapshot( + self: &Client, + snapshot_compression: String, + snapshot_types: Vec, + ) -> Result>; unsafe fn delete_connection(client: *mut Client) -> Result<()>; diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp index 7306f0c953..f9c98fcc70 100644 --- a/foreign/cpp/tests/client/low_level_e2e.cpp +++ b/foreign/cpp/tests/client/low_level_e2e.cpp @@ -18,7 +18,11 @@ // TODO(slbotbm): create fixture for setup/teardown. // TODO(slbotbm): Add tests for join_consumer_group() and leave_consumer_group() +#include +#include +#include #include +#include #include @@ -150,3 +154,671 @@ TEST(LowLevelE2E_Client, DeleteNullConnectionIsNoop) { iggy::ffi::Client *client = nullptr; ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); } + +TEST(LowLevelE2E_Client, GetStatsBeforeLoginThrows) { + RecordProperty("description", "Rejects get_stats before connect, and after connect but before login."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->get_stats(), std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->get_stats(), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +// TODO(slbotbm-this PR): add a test to create some streams, topics, partitions, and segments, send messages, and create +// consumer groups and verify it. +TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { + RecordProperty("description", + "Returns empty resource counts first, then reflects aggregated streams, topics, partitions, " + "consumer groups, and clients."); + const std::string first_stream_name = "cpp-get-stats-stream-1"; + const std::string second_stream_name = "cpp-get-stats-stream-2"; + const std::string first_topic_name = "cpp-get-stats-topic-1"; + const std::string second_topic_name = "cpp-get-stats-topic-2"; + const std::string third_topic_name = "cpp-get-stats-topic-3"; + const std::string first_group_name = "cpp-get-stats-group-1"; + const std::string second_group_name = "cpp-get-stats-group-2"; + const std::string third_group_name = "cpp-get-stats-group-3"; + constexpr std::uint32_t additional_partitions_count = 2; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::Client *second_client = nullptr; + iggy::ffi::Client *third_client = nullptr; + + iggy::ffi::Stats empty_stats{}; + ASSERT_NO_THROW({ + empty_stats = client->get_stats(); + EXPECT_NE(empty_stats.process_id, 0u); + EXPECT_GT(empty_stats.threads_count, 0u); + EXPECT_GT(empty_stats.total_memory, 0u); + EXPECT_GE(empty_stats.available_memory, 0u); + EXPECT_GE(empty_stats.total_disk_space, empty_stats.free_disk_space); + EXPECT_FALSE(static_cast(empty_stats.hostname).empty()); + EXPECT_FALSE(static_cast(empty_stats.os_name).empty()); + EXPECT_FALSE(static_cast(empty_stats.os_version).empty()); + EXPECT_FALSE(static_cast(empty_stats.kernel_version).empty()); + EXPECT_FALSE(static_cast(empty_stats.iggy_server_version).empty()); + EXPECT_TRUE(empty_stats.iggy_server_semver == std::numeric_limits::max() || + empty_stats.iggy_server_semver > 0u); + EXPECT_GE(empty_stats.cache_metrics.size(), 0u); + EXPECT_EQ(empty_stats.streams_count, 0u); + EXPECT_EQ(empty_stats.topics_count, 0u); + EXPECT_EQ(empty_stats.partitions_count, 0u); + EXPECT_EQ(empty_stats.consumer_groups_count, 0u); + }); + + ASSERT_NO_THROW(client->create_stream(first_stream_name)); + ASSERT_NO_THROW(client->create_stream(second_stream_name)); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(first_stream_name), first_topic_name, 1, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(first_stream_name), second_topic_name, 2, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_topic(make_string_identifier(second_stream_name), third_topic_name, 3, "none", 0, + "server_default", 0, "server_default")); + ASSERT_NO_THROW(client->create_partitions(make_string_identifier(first_stream_name), + make_string_identifier(first_topic_name), additional_partitions_count)); + const auto first_group = client->create_consumer_group(make_string_identifier(first_stream_name), + make_string_identifier(first_topic_name), first_group_name); + const auto second_group = client->create_consumer_group( + make_string_identifier(first_stream_name), make_string_identifier(second_topic_name), second_group_name); + const auto third_group = client->create_consumer_group(make_string_identifier(second_stream_name), + make_string_identifier(third_topic_name), third_group_name); + + ASSERT_NO_THROW({ second_client = login_to_server(); }); + ASSERT_NE(second_client, nullptr); + ASSERT_NO_THROW({ third_client = login_to_server(); }); + ASSERT_NE(third_client, nullptr); + + const auto first_stream_details = client->get_stream(make_string_identifier(first_stream_name)); + const auto second_stream_details = client->get_stream(make_string_identifier(second_stream_name)); + const std::uint32_t expected_topics_count = first_stream_details.topics_count + second_stream_details.topics_count; + std::uint32_t first_topic_partitions = 0; + std::uint32_t second_topic_partitions = 0; + std::uint32_t third_topic_partitions = 0; + for (const auto &topic : first_stream_details.topics) { + if (topic.name == first_topic_name) { + first_topic_partitions = topic.partitions_count; + } + if (topic.name == second_topic_name) { + second_topic_partitions = topic.partitions_count; + } + } + for (const auto &topic : second_stream_details.topics) { + if (topic.name == third_topic_name) { + third_topic_partitions = topic.partitions_count; + } + } + const std::uint32_t expected_partitions_count = + first_topic_partitions + second_topic_partitions + third_topic_partitions; + + ASSERT_NO_THROW({ + const auto stats = client->get_stats(); + EXPECT_EQ(stats.streams_count, 2u); + EXPECT_EQ(stats.topics_count, expected_topics_count); + EXPECT_EQ(stats.partitions_count, expected_partitions_count); + EXPECT_EQ(stats.segments_count, expected_partitions_count); + EXPECT_EQ(stats.consumer_groups_count, 3u); + EXPECT_EQ(stats.clients_count, empty_stats.clients_count + 2u); + EXPECT_EQ(first_group.partitions_count, first_topic_partitions); + EXPECT_EQ(second_group.partitions_count, second_topic_partitions); + EXPECT_EQ(third_group.partitions_count, third_topic_partitions); + }); + + ASSERT_NO_THROW(client->delete_stream(make_string_identifier(second_stream_name))); + ASSERT_NO_THROW(client->delete_stream(make_string_identifier(first_stream_name))); + ASSERT_NO_THROW(iggy::ffi::delete_connection(third_client)); + third_client = nullptr; + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; + + ASSERT_NO_THROW({ + const auto stats = client->get_stats(); + EXPECT_EQ(stats.streams_count, 0u); + EXPECT_EQ(stats.topics_count, 0u); + EXPECT_EQ(stats.partitions_count, 0u); + EXPECT_EQ(stats.segments_count, 0u); + EXPECT_EQ(stats.consumer_groups_count, 0u); + }); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetStatsIsStableAcrossBackToBackCalls) { + RecordProperty( + "description", + "Returns sane invariant fields across back-to-back get_stats calls on an idle authenticated client."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::Stats first_stats{}; + iggy::ffi::Stats second_stats{}; + ASSERT_NO_THROW({ + first_stats = client->get_stats(); + second_stats = client->get_stats(); + }); + + EXPECT_NE(first_stats.process_id, 0u); + EXPECT_NE(second_stats.process_id, 0u); + EXPECT_EQ(second_stats.process_id, first_stats.process_id); + EXPECT_GT(first_stats.threads_count, 0u); + EXPECT_GT(second_stats.threads_count, 0u); + EXPECT_GT(first_stats.total_memory, 0u); + EXPECT_GT(second_stats.total_memory, 0u); + EXPECT_FALSE(static_cast(first_stats.hostname).empty()); + EXPECT_FALSE(static_cast(second_stats.hostname).empty()); + EXPECT_FALSE(static_cast(first_stats.os_name).empty()); + EXPECT_FALSE(static_cast(second_stats.os_name).empty()); + EXPECT_FALSE(static_cast(first_stats.os_version).empty()); + EXPECT_FALSE(static_cast(second_stats.os_version).empty()); + EXPECT_FALSE(static_cast(first_stats.kernel_version).empty()); + EXPECT_FALSE(static_cast(second_stats.kernel_version).empty()); + EXPECT_FALSE(static_cast(first_stats.iggy_server_version).empty()); + EXPECT_FALSE(static_cast(second_stats.iggy_server_version).empty()); + EXPECT_EQ(static_cast(second_stats.hostname), static_cast(first_stats.hostname)); + EXPECT_EQ(static_cast(second_stats.os_name), static_cast(first_stats.os_name)); + EXPECT_EQ(static_cast(second_stats.os_version), static_cast(first_stats.os_version)); + EXPECT_EQ(static_cast(second_stats.kernel_version), + static_cast(first_stats.kernel_version)); + EXPECT_EQ(static_cast(second_stats.iggy_server_version), + static_cast(first_stats.iggy_server_version)); + EXPECT_EQ(second_stats.iggy_server_semver, first_stats.iggy_server_semver); + EXPECT_EQ(second_stats.clients_count, first_stats.clients_count); + EXPECT_EQ(second_stats.streams_count, first_stats.streams_count); + EXPECT_EQ(second_stats.topics_count, first_stats.topics_count); + EXPECT_EQ(second_stats.partitions_count, first_stats.partitions_count); + EXPECT_EQ(second_stats.consumer_groups_count, first_stats.consumer_groups_count); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetMeBeforeLoginThrows) { + RecordProperty("description", "Rejects get_me before connect, and after connect but before login."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->get_me(), std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->get_me(), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +// TODO(slbotbm-this PR): add additional validation for get_me after merging join_consumer_group PR. +TEST(LowLevelE2E_Client, GetMeReturnsCurrentClientDetails) { + RecordProperty("description", "Returns the current authenticated client details."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + ASSERT_NO_THROW({ + const auto me = client->get_me(); + EXPECT_NE(me.client_id, 0u); + EXPECT_NE(me.user_id, std::numeric_limits::max()); + EXPECT_FALSE(static_cast(me.address).empty()); + EXPECT_EQ(static_cast(me.transport), "TCP"); + EXPECT_EQ(me.consumer_groups_count, 0u); + EXPECT_TRUE(me.consumer_groups.empty()); + }); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetMeIsStableAcrossBackToBackCalls) { + RecordProperty("description", "Returns stable current-client details across back-to-back get_me calls."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::ClientInfoDetails first_me{}; + iggy::ffi::ClientInfoDetails second_me{}; + ASSERT_NO_THROW({ + first_me = client->get_me(); + second_me = client->get_me(); + }); + + EXPECT_NE(first_me.client_id, 0u); + EXPECT_EQ(second_me.client_id, first_me.client_id); + EXPECT_EQ(second_me.user_id, first_me.user_id); + EXPECT_EQ(static_cast(second_me.address), static_cast(first_me.address)); + EXPECT_EQ(static_cast(first_me.transport), "TCP"); + EXPECT_EQ(static_cast(second_me.transport), "TCP"); + EXPECT_EQ(static_cast(second_me.transport), static_cast(first_me.transport)); + EXPECT_EQ(second_me.consumer_groups_count, first_me.consumer_groups_count); + EXPECT_EQ(second_me.consumer_groups.size(), first_me.consumer_groups.size()); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetMeReturnsDistinctClientIdsForDifferentSessions) { + RecordProperty( + "description", + "Returns different client ids for separate authenticated sessions while keeping the same user identity."); + iggy::ffi::Client *first_client = login_to_server(); + iggy::ffi::Client *second_client = login_to_server(); + ASSERT_NE(first_client, nullptr); + ASSERT_NE(second_client, nullptr); + + iggy::ffi::ClientInfoDetails first_me{}; + iggy::ffi::ClientInfoDetails second_me{}; + ASSERT_NO_THROW({ + first_me = first_client->get_me(); + second_me = second_client->get_me(); + }); + + EXPECT_NE(first_me.client_id, 0u); + EXPECT_NE(second_me.client_id, 0u); + EXPECT_NE(second_me.client_id, first_me.client_id); + EXPECT_EQ(second_me.user_id, first_me.user_id); + EXPECT_EQ(static_cast(first_me.transport), "TCP"); + EXPECT_EQ(static_cast(second_me.transport), "TCP"); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; + ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); + first_client = nullptr; +} + +TEST(LowLevelE2E_Client, GetMeReturnsValidDetailsAfterReconnect) { + RecordProperty("description", + "Returns valid current-client details after reconnecting with a fresh authenticated session."); + iggy::ffi::Client *first_client = login_to_server(); + ASSERT_NE(first_client, nullptr); + + iggy::ffi::ClientInfoDetails first_me{}; + ASSERT_NO_THROW({ first_me = first_client->get_me(); }); + EXPECT_NE(first_me.client_id, 0u); + EXPECT_NE(first_me.user_id, std::numeric_limits::max()); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); + first_client = nullptr; + + iggy::ffi::Client *second_client = login_to_server(); + ASSERT_NE(second_client, nullptr); + + iggy::ffi::ClientInfoDetails second_me{}; + ASSERT_NO_THROW({ second_me = second_client->get_me(); }); + EXPECT_NE(second_me.client_id, 0u); + EXPECT_NE(second_me.user_id, std::numeric_limits::max()); + EXPECT_EQ(second_me.user_id, first_me.user_id); + EXPECT_EQ(static_cast(first_me.transport), "TCP"); + EXPECT_EQ(static_cast(second_me.transport), "TCP"); + EXPECT_EQ(static_cast(second_me.transport), static_cast(first_me.transport)); + EXPECT_FALSE(static_cast(second_me.address).empty()); + EXPECT_EQ(second_me.consumer_groups_count, 0u); + EXPECT_TRUE(second_me.consumer_groups.empty()); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientBeforeLoginThrows) { + RecordProperty("description", "Rejects get_client before connect, and after connect but before login."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->get_client(1), std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->get_client(1), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientWithWrongClientIdThrows) { + RecordProperty("description", "Rejects querying invalid or non-existent client ids."); + const std::uint32_t wrong_client_ids[] = {0u, std::numeric_limits::max()}; + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + for (const std::uint32_t wrong_client_id : wrong_client_ids) { + SCOPED_TRACE(wrong_client_id); + ASSERT_THROW(client->get_client(wrong_client_id), std::exception); + } + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientReturnsDetailsForMatchingClientId) { + RecordProperty("description", "Returns current client details when querying with the authenticated client id."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::ClientInfoDetails current_client{}; + iggy::ffi::ClientInfoDetails looked_up_client{}; + ASSERT_NO_THROW({ + current_client = client->get_me(); + looked_up_client = client->get_client(current_client.client_id); + }); + + EXPECT_NE(current_client.client_id, 0u); + EXPECT_EQ(looked_up_client.client_id, current_client.client_id); + EXPECT_EQ(looked_up_client.user_id, current_client.user_id); + EXPECT_EQ(static_cast(looked_up_client.address), static_cast(current_client.address)); + EXPECT_EQ(static_cast(looked_up_client.transport), "TCP"); + EXPECT_EQ(static_cast(looked_up_client.transport), static_cast(current_client.transport)); + EXPECT_EQ(looked_up_client.consumer_groups_count, current_client.consumer_groups_count); + EXPECT_EQ(looked_up_client.consumer_groups.size(), current_client.consumer_groups.size()); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientIsStableAcrossBackToBackCalls) { + RecordProperty("description", "Returns stable client details across back-to-back get_client calls."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + iggy::ffi::ClientInfoDetails current_client{}; + iggy::ffi::ClientInfoDetails first_lookup{}; + iggy::ffi::ClientInfoDetails second_lookup{}; + ASSERT_NO_THROW({ + current_client = client->get_me(); + first_lookup = client->get_client(current_client.client_id); + second_lookup = client->get_client(current_client.client_id); + }); + + EXPECT_NE(current_client.client_id, 0u); + EXPECT_EQ(first_lookup.client_id, current_client.client_id); + EXPECT_EQ(second_lookup.client_id, first_lookup.client_id); + EXPECT_EQ(second_lookup.user_id, first_lookup.user_id); + EXPECT_EQ(static_cast(second_lookup.address), static_cast(first_lookup.address)); + EXPECT_EQ(static_cast(first_lookup.transport), "TCP"); + EXPECT_EQ(static_cast(second_lookup.transport), "TCP"); + EXPECT_EQ(static_cast(second_lookup.transport), static_cast(first_lookup.transport)); + EXPECT_EQ(second_lookup.consumer_groups_count, first_lookup.consumer_groups_count); + EXPECT_EQ(second_lookup.consumer_groups.size(), first_lookup.consumer_groups.size()); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientsBeforeLoginThrows) { + RecordProperty("description", "Rejects get_clients before connect, and after connect but before login."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_THROW(client->get_clients(), std::exception); + ASSERT_NO_THROW(client->connect()); + ASSERT_THROW(client->get_clients(), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientsReturnsActiveClientSessions) { + RecordProperty("description", "Returns the currently active authenticated client sessions."); + iggy::ffi::Client *first_client = login_to_server(); + iggy::ffi::Client *second_client = login_to_server(); + ASSERT_NE(first_client, nullptr); + ASSERT_NE(second_client, nullptr); + + iggy::ffi::ClientInfoDetails first_me{}; + iggy::ffi::ClientInfoDetails second_me{}; + rust::Vec clients; + ASSERT_NO_THROW({ + first_me = first_client->get_me(); + second_me = second_client->get_me(); + clients = first_client->get_clients(); + }); + + ASSERT_GE(clients.size(), 2u); + + bool found_first = false; + bool found_second = false; + for (const auto &client : clients) { + EXPECT_NE(client.client_id, 0u); + EXPECT_EQ(static_cast(client.transport), "TCP"); + + if (client.client_id == first_me.client_id) { + found_first = true; + EXPECT_EQ(client.user_id, first_me.user_id); + EXPECT_EQ(static_cast(client.address), static_cast(first_me.address)); + EXPECT_EQ(client.consumer_groups_count, first_me.consumer_groups_count); + } + + if (client.client_id == second_me.client_id) { + found_second = true; + EXPECT_EQ(client.user_id, second_me.user_id); + EXPECT_EQ(static_cast(client.address), static_cast(second_me.address)); + EXPECT_EQ(client.consumer_groups_count, second_me.consumer_groups_count); + } + } + + EXPECT_TRUE(found_first); + EXPECT_TRUE(found_second); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; + ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); + first_client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientsIsStableAcrossBackToBackCalls) { + RecordProperty("description", "Returns stable client lists across back-to-back get_clients calls."); + iggy::ffi::Client *first_client = login_to_server(); + iggy::ffi::Client *second_client = login_to_server(); + ASSERT_NE(first_client, nullptr); + ASSERT_NE(second_client, nullptr); + + rust::Vec first_clients; + rust::Vec second_clients; + ASSERT_NO_THROW({ + first_clients = first_client->get_clients(); + second_clients = first_client->get_clients(); + }); + + ASSERT_EQ(second_clients.size(), first_clients.size()); + for (const auto &first_entry : first_clients) { + bool found_match = false; + for (const auto &second_entry : second_clients) { + if (second_entry.client_id != first_entry.client_id) { + continue; + } + + found_match = true; + EXPECT_EQ(second_entry.user_id, first_entry.user_id); + EXPECT_EQ(static_cast(second_entry.address), static_cast(first_entry.address)); + EXPECT_EQ(static_cast(second_entry.transport), + static_cast(first_entry.transport)); + EXPECT_EQ(second_entry.consumer_groups_count, first_entry.consumer_groups_count); + break; + } + EXPECT_TRUE(found_match); + } + + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; + ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); + first_client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientsMatchesGetClientForReturnedIds) { + RecordProperty("description", "Returns list entries that agree with get_client for each returned client id."); + iggy::ffi::Client *first_client = login_to_server(); + iggy::ffi::Client *second_client = login_to_server(); + ASSERT_NE(first_client, nullptr); + ASSERT_NE(second_client, nullptr); + + rust::Vec clients; + ASSERT_NO_THROW({ clients = first_client->get_clients(); }); + ASSERT_GE(clients.size(), 2u); + + for (const auto &client : clients) { + SCOPED_TRACE(client.client_id); + iggy::ffi::ClientInfoDetails details{}; + ASSERT_NO_THROW({ details = first_client->get_client(client.client_id); }); + + EXPECT_EQ(details.client_id, client.client_id); + EXPECT_EQ(details.user_id, client.user_id); + EXPECT_EQ(static_cast(details.address), static_cast(client.address)); + EXPECT_EQ(static_cast(details.transport), static_cast(client.transport)); + EXPECT_EQ(details.consumer_groups_count, client.consumer_groups_count); + } + + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; + ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); + first_client = nullptr; +} + +TEST(LowLevelE2E_Client, GetClientsReflectsAdditionalSession) { + RecordProperty("description", "Reflects a newly added authenticated session in subsequent get_clients results."); + iggy::ffi::Client *first_client = login_to_server(); + ASSERT_NE(first_client, nullptr); + + rust::Vec clients_before; + ASSERT_NO_THROW({ clients_before = first_client->get_clients(); }); + + iggy::ffi::Client *second_client = login_to_server(); + ASSERT_NE(second_client, nullptr); + + iggy::ffi::ClientInfoDetails second_me{}; + rust::Vec clients_after; + ASSERT_NO_THROW({ + second_me = second_client->get_me(); + clients_after = first_client->get_clients(); + }); + + bool found_before = false; + for (const auto &client : clients_before) { + if (client.client_id == second_me.client_id) { + found_before = true; + break; + } + } + EXPECT_FALSE(found_before); + + bool found_after = false; + for (const auto &client : clients_after) { + if (client.client_id != second_me.client_id) { + continue; + } + + found_after = true; + EXPECT_EQ(client.user_id, second_me.user_id); + EXPECT_EQ(static_cast(client.address), static_cast(second_me.address)); + EXPECT_EQ(static_cast(client.transport), "TCP"); + EXPECT_EQ(client.consumer_groups_count, second_me.consumer_groups_count); + break; + } + EXPECT_TRUE(found_after); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(second_client)); + second_client = nullptr; + ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); + first_client = nullptr; +} + +TEST(LowLevelE2E_Client, PingSucceedsForNewConnection) { + RecordProperty("description", "Successfully pings the server from a fresh unauthenticated client session."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + ASSERT_NO_THROW(client->ping()); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, HeartbeatIntervalReturnsDefaultValueForNewConnection) { + RecordProperty("description", + "Returns the default heartbeat interval in microseconds for a fresh unauthenticated client."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + const auto heartbeat_interval = client->heartbeat_interval(); + EXPECT_EQ(heartbeat_interval, 5'000'000u); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, HeartbeatIntervalReturnsConfiguredValueFromConnectionString) { + RecordProperty("description", + "Returns the configured heartbeat interval in microseconds from the connection string."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection("iggy://iggy:iggy@127.0.0.1:8090?heartbeat_interval=10s"); }); + ASSERT_NE(client, nullptr); + + const auto heartbeat_interval = client->heartbeat_interval(); + EXPECT_EQ(heartbeat_interval, 10'000'000u); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, SnapshotBeforeLoginThrows) { + RecordProperty("description", "Rejects snapshot before connect, and after connect but before login."); + iggy::ffi::Client *client = nullptr; + ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); + ASSERT_NE(client, nullptr); + + rust::Vec snapshot_types_before_connect; + snapshot_types_before_connect.push_back("test"); + ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types_before_connect)), std::exception); + + ASSERT_NO_THROW(client->connect()); + rust::Vec snapshot_types_before_login; + snapshot_types_before_login.push_back("test"); + ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types_before_login)), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, SnapshotAllCombinedWithOtherTypeThrows) { + RecordProperty("description", "Rejects combining the all snapshot type with any other snapshot type."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec snapshot_types; + snapshot_types.push_back("all"); + snapshot_types.push_back("test"); + + ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types)), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, SnapshotWithInvalidCompressionThrows) { + RecordProperty("description", "Rejects invalid snapshot compression values in the wrapper before sending."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec snapshot_types; + snapshot_types.push_back("test"); + + ASSERT_THROW(client->snapshot("invalid-compression", std::move(snapshot_types)), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, SnapshotWithInvalidSnapshotTypeThrows) { + RecordProperty("description", "Rejects invalid snapshot type values in the wrapper before sending."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec snapshot_types; + snapshot_types.push_back("not-a-real-type"); + + ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types)), std::exception); + + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} diff --git a/foreign/cpp/tests/unit_tests.cpp b/foreign/cpp/tests/unit_tests.cpp index e61b814978..f12cd89158 100644 --- a/foreign/cpp/tests/unit_tests.cpp +++ b/foreign/cpp/tests/unit_tests.cpp @@ -26,6 +26,25 @@ TEST(CompressionAlgorithmTest, ReturnsExpectedValues) { EXPECT_EQ(iggy::CompressionAlgorithm::gzip().compression_algorithm_value(), "gzip"); } +TEST(SnapshotCompressionTest, ReturnsExpectedValues) { + EXPECT_EQ(iggy::SnapshotCompression::stored().snapshot_compression_value(), "stored"); + EXPECT_EQ(iggy::SnapshotCompression::deflated().snapshot_compression_value(), "deflated"); + EXPECT_EQ(iggy::SnapshotCompression::bzip2().snapshot_compression_value(), "bzip2"); + EXPECT_EQ(iggy::SnapshotCompression::zstd().snapshot_compression_value(), "zstd"); + EXPECT_EQ(iggy::SnapshotCompression::lzma().snapshot_compression_value(), "lzma"); + EXPECT_EQ(iggy::SnapshotCompression::xz().snapshot_compression_value(), "xz"); +} + +TEST(SnapshotTypeTest, ReturnsExpectedValues) { + EXPECT_EQ(iggy::SnapshotType::filesystem_overview().snapshot_type_value(), "filesystem_overview"); + EXPECT_EQ(iggy::SnapshotType::process_list().snapshot_type_value(), "process_list"); + EXPECT_EQ(iggy::SnapshotType::resource_usage().snapshot_type_value(), "resource_usage"); + EXPECT_EQ(iggy::SnapshotType::test().snapshot_type_value(), "test"); + EXPECT_EQ(iggy::SnapshotType::server_logs().snapshot_type_value(), "server_logs"); + EXPECT_EQ(iggy::SnapshotType::server_config().snapshot_type_value(), "server_config"); + EXPECT_EQ(iggy::SnapshotType::all().snapshot_type_value(), "all"); +} + TEST(IdKindTest, ReturnsExpectedValues) { EXPECT_EQ(iggy::IdKind::numeric().id_kind_value(), "numeric"); EXPECT_EQ(iggy::IdKind::string().id_kind_value(), "string"); From 6ead030ce8a2ca9cc1cf8cbcccf4b1f7c6cdf0cb Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Wed, 29 Apr 2026 23:33:29 +0900 Subject: [PATCH 2/5] partially address review --- foreign/cpp/build.rs | 1 - foreign/cpp/src/client.rs | 89 ++++++++++----------- foreign/cpp/src/identifier.rs | 4 +- foreign/cpp/src/lib.rs | 7 +- foreign/cpp/tests/client/low_level_e2e.cpp | 74 ++++++++++------- foreign/cpp/tests/common/test_helpers.hpp | 13 ++- foreign/cpp/tests/identifier/unit_tests.cpp | 22 ++--- 7 files changed, 116 insertions(+), 94 deletions(-) diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs index 88034dabeb..d2cbf0094c 100644 --- a/foreign/cpp/build.rs +++ b/foreign/cpp/build.rs @@ -26,6 +26,5 @@ fn main() { println!("cargo:rerun-if-changed=src/lib.rs"); println!("cargo:rerun-if-changed=src/messages.rs"); println!("cargo:rerun-if-changed=src/stream.rs"); - println!("cargo:rerun-if-changed=src/system.rs"); println!("cargo:rerun-if-changed=src/topic.rs"); } diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index 28a6cea20a..f590e03d00 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -40,10 +40,11 @@ use std::sync::Arc; const ANY_PARTITION_ID: u32 = u32::MAX; impl From for ffi::ClientInfo { fn from(client: RustClientInfo) -> Self { + let has_user_id = client.user_id.is_some(); ffi::ClientInfo { client_id: client.client_id, - // TODO(slbotbm): In high-level client, this should be converted to None. - user_id: client.user_id.unwrap_or(u32::MAX), + has_user_id, + user_id: client.user_id.unwrap_or(0), address: client.address, transport: client.transport, consumer_groups_count: client.consumer_groups_count, @@ -53,10 +54,11 @@ impl From for ffi::ClientInfo { impl From for ffi::ClientInfoDetails { fn from(client: RustClientInfoDetails) -> Self { + let has_user_id = client.user_id.is_some(); ffi::ClientInfoDetails { client_id: client.client_id, - // TODO(slbotbm): In high-level client, this should be converted to None. - user_id: client.user_id.unwrap_or(u32::MAX), + has_user_id, + user_id: client.user_id.unwrap_or(0), address: client.address, transport: client.transport, consumer_groups_count: client.consumer_groups_count, @@ -95,6 +97,7 @@ impl From<(RustCacheMetricsKey, RustCacheMetrics)> for ffi::CacheMetricEntry { impl From for ffi::Stats { fn from(stats: RustStats) -> Self { + let has_server_semver = stats.iggy_server_semver.is_some(); ffi::Stats { process_id: stats.process_id, cpu_usage: stats.cpu_usage, @@ -119,7 +122,8 @@ impl From for ffi::Stats { os_version: stats.os_version, kernel_version: stats.kernel_version, iggy_server_version: stats.iggy_server_version, - iggy_server_semver: stats.iggy_server_semver.unwrap_or(u32::MAX), + has_server_semver, + iggy_server_semver: stats.iggy_server_semver.unwrap_or(0), cache_metrics: stats .cache_metrics .into_iter() @@ -161,13 +165,13 @@ pub fn new_connection(connection_string: String) -> Result<*mut Client, String> .map_err(|error| format!("Could not build default connection: {error}"))?, s if s.starts_with("iggy://") || s.starts_with("iggy+") => { RustIggyClient::from_connection_string(s) - .map_err(|error| format!("Could not parse connection string '{}': {error}", s))? + .map_err(|error| format!("Could not parse connection string '{s}': {error}"))? } s => RustIggyClientBuilder::new() .with_tcp() .with_server_address(connection_string.clone()) .build() - .map_err(|error| format!("Could not build connection for address '{}': {error}", s))?, + .map_err(|error| format!("Could not build connection for address '{s}': {error}"))?, }; Ok(Box::into_raw(Box::new(Client { @@ -181,7 +185,7 @@ impl Client { self.inner .login_user(&username, &password) .await - .map_err(|error| format!("Could not login user '{}': {error}", username))?; + .map_err(|error| format!("Could not login user '{username}': {error}"))?; Ok(()) }) } @@ -212,7 +216,7 @@ impl Client { self.inner .create_stream(&stream_name) .await - .map_err(|error| format!("Could not create stream '{}': {error}", stream_name))?; + .map_err(|error| format!("Could not create stream '{stream_name}': {error}"))?; Ok(()) }) } @@ -226,9 +230,9 @@ impl Client { .inner .get_stream(&rust_stream_id) .await - .map_err(|error| format!("Could not get stream '{}': {error}", rust_stream_id))?; - let stream_details = stream_details - .ok_or_else(|| format!("Stream '{}' was not found", rust_stream_id))?; + .map_err(|error| format!("Could not get stream '{rust_stream_id}': {error}"))?; + let stream_details = + stream_details.ok_or_else(|| format!("Stream '{rust_stream_id}' was not found"))?; Ok(ffi::StreamDetails::from(stream_details)) }) } @@ -241,9 +245,7 @@ impl Client { self.inner .delete_stream(&rust_stream_id) .await - .map_err(|error| { - format!("Could not delete stream '{}': {error}", rust_stream_id) - })?; + .map_err(|error| format!("Could not delete stream '{rust_stream_id}': {error}"))?; Ok(()) }) } @@ -407,13 +409,12 @@ impl Client { max_topic_size: String, ) -> Result<(), String> { let rust_stream_id = RustIdentifier::try_from(stream_id) - .map_err(|error| format!("Could not create topic '{}': {error}", topic_name))?; + .map_err(|error| format!("Could not create topic '{topic_name}': {error}"))?; let rust_compression_algorithm = match compression_algorithm.to_lowercase().as_str() { "" | "none" => RustCompressionAlgorithm::None, _ => RustCompressionAlgorithm::from_str(&compression_algorithm).map_err(|error| { format!( - "Could not create topic '{}': invalid compression algorithm '{}': {error}", - topic_name, compression_algorithm + "Could not create topic '{topic_name}': invalid compression algorithm '{compression_algorithm}': {error}" ) })?, }; @@ -429,8 +430,7 @@ impl Client { )), _ => { return Err(format!( - "Could not create topic '{}': invalid message expiry kind '{}'", - topic_name, message_expiry_kind + "Could not create topic '{topic_name}': invalid message expiry kind '{message_expiry_kind}'" )); } }; @@ -438,8 +438,7 @@ impl Client { "" | "server_default" | "0" => RustMaxTopicSize::ServerDefault, _ => RustMaxTopicSize::from_str(&max_topic_size).map_err(|error| { format!( - "Could not create topic '{}': invalid max topic size '{}': {error}", - topic_name, max_topic_size + "Could not create topic '{topic_name}': invalid max topic size '{max_topic_size}': {error}" ) })?, }; @@ -458,8 +457,7 @@ impl Client { .await .map_err(|error| { format!( - "Could not create topic '{}' on stream '{}': {error}", - topic_name, rust_stream_id + "Could not create topic '{topic_name}' on stream '{rust_stream_id}': {error}" ) })?; Ok(()) @@ -510,8 +508,7 @@ impl Client { .await .map_err(|error| { format!( - "Could not create {partitions_count} partitions for topic '{}' on stream '{}': {error}", - rust_topic_id, rust_stream_id + "Could not create {partitions_count} partitions for topic '{rust_topic_id}' on stream '{rust_stream_id}': {error}" ) })?; Ok(()) @@ -537,8 +534,7 @@ impl Client { .await .map_err(|error| { format!( - "Could not delete {partitions_count} partitions for topic '{}' on stream '{}': {error}", - rust_topic_id, rust_stream_id + "Could not delete {partitions_count} partitions for topic '{rust_topic_id}' on stream '{rust_stream_id}': {error}" ) })?; Ok(()) @@ -552,16 +548,10 @@ impl Client { name: String, ) -> Result { let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| { - format!( - "Could not create consumer group '{}': invalid stream identifier: {error}", - name - ) + format!("Could not create consumer group '{name}': invalid stream identifier: {error}") })?; let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| { - format!( - "Could not create consumer group '{}': invalid topic identifier: {error}", - name - ) + format!("Could not create consumer group '{name}': invalid topic identifier: {error}") })?; RUNTIME.block_on(async { @@ -571,8 +561,7 @@ impl Client { .await .map_err(|error| { format!( - "Could not create consumer group '{}' for topic '{}' on stream '{}': {error}", - name, rust_topic_id, rust_stream_id + "Could not create consumer group '{name}' for topic '{rust_topic_id}' on stream '{rust_stream_id}': {error}" ) })?; Ok(ffi::ConsumerGroupDetails::from(group)) @@ -602,14 +591,12 @@ impl Client { .await .map_err(|error| { format!( - "Could not get consumer group '{}' for topic '{}' on stream '{}': {error}", - rust_group_id, rust_topic_id, rust_stream_id + "Could not get consumer group '{rust_group_id}' for topic '{rust_topic_id}' on stream '{rust_stream_id}': {error}" ) })?; let group = group.ok_or_else(|| { format!( - "Consumer group '{}' was not found for topic '{}' on stream '{}'", - rust_group_id, rust_topic_id, rust_stream_id + "Consumer group '{rust_group_id}' was not found for topic '{rust_topic_id}' on stream '{rust_stream_id}'" ) })?; Ok(ffi::ConsumerGroupDetails::from(group)) @@ -638,8 +625,7 @@ impl Client { .await .map_err(|error| { format!( - "Could not delete consumer group '{}' for topic '{}' on stream '{}': {error}", - rust_group_id, rust_topic_id, rust_stream_id + "Could not delete consumer group '{rust_group_id}' for topic '{rust_topic_id}' on stream '{rust_stream_id}': {error}" ) })?; Ok(()) @@ -755,6 +741,8 @@ impl Client { })?; Ok(()) pub fn heartbeat_interval(&self) -> u64 { + // The upstream client exposes this config-derived value via an async API, + // so the synchronous C++ wrapper reads it by blocking on the runtime. RUNTIME.block_on(async { self.inner.heartbeat_interval().await.as_micros() }) } @@ -764,7 +752,12 @@ impl Client { snapshot_types: Vec, ) -> Result, String> { let rust_compression = match snapshot_compression.trim() { - "" => RustSnapshotCompression::default(), + "" => { + return Err( + "Could not capture snapshot: snapshot_compression must not be empty" + .to_string(), + ); + } value => RustSnapshotCompression::from_str(value).map_err(|error| { format!("Could not capture snapshot: invalid compression '{value}': {error}") })?, @@ -774,8 +767,7 @@ impl Client { .map(|snapshot_type| { RustSystemSnapshotType::from_str(&snapshot_type).map_err(|error| { format!( - "Could not capture snapshot: invalid snapshot type '{}': {}", - snapshot_type, error + "Could not capture snapshot: invalid snapshot type '{snapshot_type}': {error}" ) }) }) @@ -787,7 +779,8 @@ impl Client { .snapshot(rust_compression, rust_snapshot_types) .await .map_err(|error| format!("Could not capture snapshot: {error}"))?; - Ok(snapshot.0) + let iggy_common::Snapshot(bytes) = snapshot; + Ok(bytes) }) } } diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs index 93877458dd..e68816609c 100644 --- a/foreign/cpp/src/identifier.rs +++ b/foreign/cpp/src/identifier.rs @@ -67,14 +67,14 @@ impl TryFrom for RustIdentifier { // preserves the C++ ABI used by every test and downstream binding. #[allow(clippy::wrong_self_convention)] impl ffi::Identifier { - pub fn from_string(&mut self, id: String) -> Result<(), String> { + pub fn set_string(&mut self, id: String) -> Result<(), String> { *self = RustIdentifier::named(&id) .map(ffi::Identifier::from) .map_err(|error| format!("Could not create string identifier: {error}"))?; Ok(()) } - pub fn from_numeric(&mut self, id: u32) -> Result<(), String> { + pub fn set_numeric(&mut self, id: u32) -> Result<(), String> { *self = RustIdentifier::numeric(id) .map(ffi::Identifier::from) .map_err(|error| format!("Could not create numeric identifier: {error}"))?; diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs index c4e4fae659..15fc28f5e9 100644 --- a/foreign/cpp/src/lib.rs +++ b/foreign/cpp/src/lib.rs @@ -122,6 +122,7 @@ mod ffi { struct ClientInfo { client_id: u32, + has_user_id: bool, user_id: u32, address: String, transport: String, @@ -130,6 +131,7 @@ mod ffi { struct ClientInfoDetails { client_id: u32, + has_user_id: bool, user_id: u32, address: String, transport: String, @@ -170,6 +172,7 @@ mod ffi { os_version: String, kernel_version: String, iggy_server_version: String, + has_server_semver: bool, iggy_server_semver: u32, cache_metrics: Vec, threads_count: u32, @@ -285,7 +288,7 @@ mod ffi { unsafe fn delete_connection(client: *mut Client) -> Result<()>; // Identifier functions - fn from_string(self: &mut Identifier, id: String) -> Result<()>; - fn from_numeric(self: &mut Identifier, id: u32) -> Result<()>; + fn set_string(self: &mut Identifier, id: String) -> Result<()>; + fn set_numeric(self: &mut Identifier, id: u32) -> Result<()>; } } diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp index f9c98fcc70..25941874d8 100644 --- a/foreign/cpp/tests/client/low_level_e2e.cpp +++ b/foreign/cpp/tests/client/low_level_e2e.cpp @@ -169,7 +169,7 @@ TEST(LowLevelE2E_Client, GetStatsBeforeLoginThrows) { client = nullptr; } -// TODO(slbotbm-this PR): add a test to create some streams, topics, partitions, and segments, send messages, and create +// TODO(slbotbmR): add a test to create some streams, topics, partitions, and segments, send messages, and create // consumer groups and verify it. TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { RecordProperty("description", @@ -203,9 +203,7 @@ TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { EXPECT_FALSE(static_cast(empty_stats.os_version).empty()); EXPECT_FALSE(static_cast(empty_stats.kernel_version).empty()); EXPECT_FALSE(static_cast(empty_stats.iggy_server_version).empty()); - EXPECT_TRUE(empty_stats.iggy_server_semver == std::numeric_limits::max() || - empty_stats.iggy_server_semver > 0u); - EXPECT_GE(empty_stats.cache_metrics.size(), 0u); + EXPECT_TRUE(!empty_stats.has_server_semver || empty_stats.iggy_server_semver > 0u); EXPECT_EQ(empty_stats.streams_count, 0u); EXPECT_EQ(empty_stats.topics_count, 0u); EXPECT_EQ(empty_stats.partitions_count, 0u); @@ -327,6 +325,7 @@ TEST(LowLevelE2E_Client, GetStatsIsStableAcrossBackToBackCalls) { static_cast(first_stats.kernel_version)); EXPECT_EQ(static_cast(second_stats.iggy_server_version), static_cast(first_stats.iggy_server_version)); + EXPECT_EQ(second_stats.has_server_semver, first_stats.has_server_semver); EXPECT_EQ(second_stats.iggy_server_semver, first_stats.iggy_server_semver); EXPECT_EQ(second_stats.clients_count, first_stats.clients_count); EXPECT_EQ(second_stats.streams_count, first_stats.streams_count); @@ -352,7 +351,7 @@ TEST(LowLevelE2E_Client, GetMeBeforeLoginThrows) { client = nullptr; } -// TODO(slbotbm-this PR): add additional validation for get_me after merging join_consumer_group PR. +// TODO(slbotbm): add additional validation for get_me after merging join_consumer_group PR. TEST(LowLevelE2E_Client, GetMeReturnsCurrentClientDetails) { RecordProperty("description", "Returns the current authenticated client details."); iggy::ffi::Client *client = login_to_server(); @@ -361,7 +360,7 @@ TEST(LowLevelE2E_Client, GetMeReturnsCurrentClientDetails) { ASSERT_NO_THROW({ const auto me = client->get_me(); EXPECT_NE(me.client_id, 0u); - EXPECT_NE(me.user_id, std::numeric_limits::max()); + EXPECT_TRUE(me.has_user_id); EXPECT_FALSE(static_cast(me.address).empty()); EXPECT_EQ(static_cast(me.transport), "TCP"); EXPECT_EQ(me.consumer_groups_count, 0u); @@ -385,7 +384,10 @@ TEST(LowLevelE2E_Client, GetMeIsStableAcrossBackToBackCalls) { }); EXPECT_NE(first_me.client_id, 0u); + EXPECT_TRUE(first_me.has_user_id); + EXPECT_TRUE(second_me.has_user_id); EXPECT_EQ(second_me.client_id, first_me.client_id); + EXPECT_EQ(second_me.has_user_id, first_me.has_user_id); EXPECT_EQ(second_me.user_id, first_me.user_id); EXPECT_EQ(static_cast(second_me.address), static_cast(first_me.address)); EXPECT_EQ(static_cast(first_me.transport), "TCP"); @@ -416,7 +418,10 @@ TEST(LowLevelE2E_Client, GetMeReturnsDistinctClientIdsForDifferentSessions) { EXPECT_NE(first_me.client_id, 0u); EXPECT_NE(second_me.client_id, 0u); + EXPECT_TRUE(first_me.has_user_id); + EXPECT_TRUE(second_me.has_user_id); EXPECT_NE(second_me.client_id, first_me.client_id); + EXPECT_EQ(second_me.has_user_id, first_me.has_user_id); EXPECT_EQ(second_me.user_id, first_me.user_id); EXPECT_EQ(static_cast(first_me.transport), "TCP"); EXPECT_EQ(static_cast(second_me.transport), "TCP"); @@ -436,7 +441,7 @@ TEST(LowLevelE2E_Client, GetMeReturnsValidDetailsAfterReconnect) { iggy::ffi::ClientInfoDetails first_me{}; ASSERT_NO_THROW({ first_me = first_client->get_me(); }); EXPECT_NE(first_me.client_id, 0u); - EXPECT_NE(first_me.user_id, std::numeric_limits::max()); + EXPECT_TRUE(first_me.has_user_id); ASSERT_NO_THROW(iggy::ffi::delete_connection(first_client)); first_client = nullptr; @@ -447,7 +452,8 @@ TEST(LowLevelE2E_Client, GetMeReturnsValidDetailsAfterReconnect) { iggy::ffi::ClientInfoDetails second_me{}; ASSERT_NO_THROW({ second_me = second_client->get_me(); }); EXPECT_NE(second_me.client_id, 0u); - EXPECT_NE(second_me.user_id, std::numeric_limits::max()); + EXPECT_TRUE(second_me.has_user_id); + EXPECT_EQ(second_me.has_user_id, first_me.has_user_id); EXPECT_EQ(second_me.user_id, first_me.user_id); EXPECT_EQ(static_cast(first_me.transport), "TCP"); EXPECT_EQ(static_cast(second_me.transport), "TCP"); @@ -502,7 +508,10 @@ TEST(LowLevelE2E_Client, GetClientReturnsDetailsForMatchingClientId) { }); EXPECT_NE(current_client.client_id, 0u); + EXPECT_TRUE(current_client.has_user_id); + EXPECT_TRUE(looked_up_client.has_user_id); EXPECT_EQ(looked_up_client.client_id, current_client.client_id); + EXPECT_EQ(looked_up_client.has_user_id, current_client.has_user_id); EXPECT_EQ(looked_up_client.user_id, current_client.user_id); EXPECT_EQ(static_cast(looked_up_client.address), static_cast(current_client.address)); EXPECT_EQ(static_cast(looked_up_client.transport), "TCP"); @@ -529,8 +538,13 @@ TEST(LowLevelE2E_Client, GetClientIsStableAcrossBackToBackCalls) { }); EXPECT_NE(current_client.client_id, 0u); + EXPECT_TRUE(current_client.has_user_id); + EXPECT_TRUE(first_lookup.has_user_id); + EXPECT_TRUE(second_lookup.has_user_id); EXPECT_EQ(first_lookup.client_id, current_client.client_id); EXPECT_EQ(second_lookup.client_id, first_lookup.client_id); + EXPECT_EQ(first_lookup.has_user_id, current_client.has_user_id); + EXPECT_EQ(second_lookup.has_user_id, first_lookup.has_user_id); EXPECT_EQ(second_lookup.user_id, first_lookup.user_id); EXPECT_EQ(static_cast(second_lookup.address), static_cast(first_lookup.address)); EXPECT_EQ(static_cast(first_lookup.transport), "TCP"); @@ -583,6 +597,7 @@ TEST(LowLevelE2E_Client, GetClientsReturnsActiveClientSessions) { if (client.client_id == first_me.client_id) { found_first = true; + EXPECT_EQ(client.has_user_id, first_me.has_user_id); EXPECT_EQ(client.user_id, first_me.user_id); EXPECT_EQ(static_cast(client.address), static_cast(first_me.address)); EXPECT_EQ(client.consumer_groups_count, first_me.consumer_groups_count); @@ -590,6 +605,7 @@ TEST(LowLevelE2E_Client, GetClientsReturnsActiveClientSessions) { if (client.client_id == second_me.client_id) { found_second = true; + EXPECT_EQ(client.has_user_id, second_me.has_user_id); EXPECT_EQ(client.user_id, second_me.user_id); EXPECT_EQ(static_cast(client.address), static_cast(second_me.address)); EXPECT_EQ(client.consumer_groups_count, second_me.consumer_groups_count); @@ -628,6 +644,7 @@ TEST(LowLevelE2E_Client, GetClientsIsStableAcrossBackToBackCalls) { } found_match = true; + EXPECT_EQ(second_entry.has_user_id, first_entry.has_user_id); EXPECT_EQ(second_entry.user_id, first_entry.user_id); EXPECT_EQ(static_cast(second_entry.address), static_cast(first_entry.address)); EXPECT_EQ(static_cast(second_entry.transport), @@ -661,6 +678,7 @@ TEST(LowLevelE2E_Client, GetClientsMatchesGetClientForReturnedIds) { ASSERT_NO_THROW({ details = first_client->get_client(client.client_id); }); EXPECT_EQ(details.client_id, client.client_id); + EXPECT_EQ(details.has_user_id, client.has_user_id); EXPECT_EQ(details.user_id, client.user_id); EXPECT_EQ(static_cast(details.address), static_cast(client.address)); EXPECT_EQ(static_cast(details.transport), static_cast(client.transport)); @@ -707,6 +725,7 @@ TEST(LowLevelE2E_Client, GetClientsReflectsAdditionalSession) { } found_after = true; + EXPECT_EQ(client.has_user_id, second_me.has_user_id); EXPECT_EQ(client.user_id, second_me.user_id); EXPECT_EQ(static_cast(client.address), static_cast(second_me.address)); EXPECT_EQ(static_cast(client.transport), "TCP"); @@ -767,14 +786,10 @@ TEST(LowLevelE2E_Client, SnapshotBeforeLoginThrows) { ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); ASSERT_NE(client, nullptr); - rust::Vec snapshot_types_before_connect; - snapshot_types_before_connect.push_back("test"); - ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types_before_connect)), std::exception); + ASSERT_THROW(client->snapshot("deflated", make_snapshot_types({"test"})), std::exception); ASSERT_NO_THROW(client->connect()); - rust::Vec snapshot_types_before_login; - snapshot_types_before_login.push_back("test"); - ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types_before_login)), std::exception); + ASSERT_THROW(client->snapshot("deflated", make_snapshot_types({"test"})), std::exception); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; @@ -785,25 +800,33 @@ TEST(LowLevelE2E_Client, SnapshotAllCombinedWithOtherTypeThrows) { iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); - rust::Vec snapshot_types; - snapshot_types.push_back("all"); - snapshot_types.push_back("test"); + ASSERT_THROW(client->snapshot("deflated", make_snapshot_types({"all", "test"})), std::exception); - ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types)), std::exception); + ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); + client = nullptr; +} + +TEST(LowLevelE2E_Client, SnapshotReturnsNonEmptyBytes) { + RecordProperty("description", "Returns a non-empty snapshot for a valid compression and snapshot type."); + iggy::ffi::Client *client = login_to_server(); + ASSERT_NE(client, nullptr); + + rust::Vec snapshot_bytes; + ASSERT_NO_THROW({ snapshot_bytes = client->snapshot("deflated", make_snapshot_types({"test"})); }); + EXPECT_FALSE(snapshot_bytes.empty()); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; } TEST(LowLevelE2E_Client, SnapshotWithInvalidCompressionThrows) { - RecordProperty("description", "Rejects invalid snapshot compression values in the wrapper before sending."); + RecordProperty("description", + "Rejects empty or invalid snapshot compression values in the wrapper before sending."); iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); - rust::Vec snapshot_types; - snapshot_types.push_back("test"); - - ASSERT_THROW(client->snapshot("invalid-compression", std::move(snapshot_types)), std::exception); + ASSERT_THROW(client->snapshot("", make_snapshot_types({"test"})), std::exception); + ASSERT_THROW(client->snapshot("invalid-compression", make_snapshot_types({"test"})), std::exception); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; @@ -814,10 +837,7 @@ TEST(LowLevelE2E_Client, SnapshotWithInvalidSnapshotTypeThrows) { iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); - rust::Vec snapshot_types; - snapshot_types.push_back("not-a-real-type"); - - ASSERT_THROW(client->snapshot("deflated", std::move(snapshot_types)), std::exception); + ASSERT_THROW(client->snapshot("deflated", make_snapshot_types({"not-a-real-type"})), std::exception); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp index 15851f08cc..7cb5160370 100644 --- a/foreign/cpp/tests/common/test_helpers.hpp +++ b/foreign/cpp/tests/common/test_helpers.hpp @@ -20,19 +20,20 @@ #pragma once #include +#include #include #include "lib.rs.h" inline iggy::ffi::Identifier make_string_identifier(const std::string &value) { iggy::ffi::Identifier identifier; - identifier.from_string(value); + identifier.set_string(value); return identifier; } inline iggy::ffi::Identifier make_numeric_identifier(const std::uint32_t value) { iggy::ffi::Identifier identifier; - identifier.from_numeric(value); + identifier.set_numeric(value); return identifier; } @@ -58,4 +59,10 @@ inline rust::Vec partition_id_bytes(std::uint32_t id) { v.push_back(static_cast((id >> 16) & 0xFF)); v.push_back(static_cast((id >> 24) & 0xFF)); return v; -} + inline rust::Vec make_snapshot_types(std::initializer_list values) { + rust::Vec snapshot_types; + for (const auto value : values) { + snapshot_types.push_back(value); + } + return snapshot_types; + } diff --git a/foreign/cpp/tests/identifier/unit_tests.cpp b/foreign/cpp/tests/identifier/unit_tests.cpp index bed56c273f..b5ee7f6276 100644 --- a/foreign/cpp/tests/identifier/unit_tests.cpp +++ b/foreign/cpp/tests/identifier/unit_tests.cpp @@ -29,7 +29,7 @@ TEST(LowLevelE2E_Identifier, FromStringCreatesStringIdentifier) { const std::string value = "stream-identifier"; iggy::ffi::Identifier identifier; - ASSERT_NO_THROW(identifier.from_string(value)); + ASSERT_NO_THROW(identifier.set_string(value)); ASSERT_EQ(identifier.kind, "string"); ASSERT_EQ(identifier.length, value.size()); @@ -50,7 +50,7 @@ TEST(LowLevelE2E_Identifier, FromStringAcceptsExact255ByteUtf8Value) { ASSERT_EQ(value.size(), 255u); iggy::ffi::Identifier identifier; - ASSERT_NO_THROW(identifier.from_string(value)); + ASSERT_NO_THROW(identifier.set_string(value)); ASSERT_EQ(identifier.kind, "string"); ASSERT_EQ(identifier.length, value.size()); @@ -64,7 +64,7 @@ TEST(LowLevelE2E_Identifier, FromStringRejectsEmptyValue) { RecordProperty("description", "Rejects creating a string identifier from an empty string."); iggy::ffi::Identifier identifier; - ASSERT_THROW(identifier.from_string(""), std::exception); + ASSERT_THROW(identifier.set_string(""), std::exception); } TEST(LowLevelE2E_Identifier, FromStringRejectsUtf8ValueLongerThan255Bytes) { @@ -77,7 +77,7 @@ TEST(LowLevelE2E_Identifier, FromStringRejectsUtf8ValueLongerThan255Bytes) { ASSERT_EQ(too_long_value.size(), 256u); - ASSERT_THROW(identifier.from_string(too_long_value), std::exception); + ASSERT_THROW(identifier.set_string(too_long_value), std::exception); } TEST(LowLevelE2E_Identifier, FromStringRejectsAsciiValueLongerThan255Bytes) { @@ -85,7 +85,7 @@ TEST(LowLevelE2E_Identifier, FromStringRejectsAsciiValueLongerThan255Bytes) { iggy::ffi::Identifier identifier; const std::string too_long_value(256, 'a'); - ASSERT_THROW(identifier.from_string(too_long_value), std::exception); + ASSERT_THROW(identifier.set_string(too_long_value), std::exception); } TEST(LowLevelE2E_Identifier, FromNumericCreatesNumericIdentifier) { @@ -94,7 +94,7 @@ TEST(LowLevelE2E_Identifier, FromNumericCreatesNumericIdentifier) { constexpr std::uint32_t value = 0x12345678; constexpr std::array expected_bytes = {0x78, 0x56, 0x34, 0x12}; - ASSERT_NO_THROW(identifier.from_numeric(value)); + ASSERT_NO_THROW(identifier.set_numeric(value)); ASSERT_EQ(identifier.kind, "numeric"); ASSERT_EQ(identifier.length, 4u); @@ -109,7 +109,7 @@ TEST(LowLevelE2E_Identifier, FromNumericCreatesUint32MaxIdentifier) { iggy::ffi::Identifier identifier; constexpr std::array expected_bytes = {0xFF, 0xFF, 0xFF, 0xFF}; - ASSERT_NO_THROW(identifier.from_numeric(std::numeric_limits::max())); + ASSERT_NO_THROW(identifier.set_numeric(std::numeric_limits::max())); ASSERT_EQ(identifier.kind, "numeric"); ASSERT_EQ(identifier.length, 4u); @@ -123,8 +123,8 @@ TEST(LowLevelE2E_Identifier, FromNumericOverwritesExistingStringIdentifier) { RecordProperty("description", "Replaces a previously created string identifier with numeric identifier data."); iggy::ffi::Identifier identifier; - ASSERT_NO_THROW(identifier.from_string("temporary-name")); - ASSERT_NO_THROW(identifier.from_numeric(7)); + ASSERT_NO_THROW(identifier.set_string("temporary-name")); + ASSERT_NO_THROW(identifier.set_numeric(7)); ASSERT_EQ(identifier.kind, "numeric"); ASSERT_EQ(identifier.length, 4u); @@ -140,8 +140,8 @@ TEST(LowLevelE2E_Identifier, FromStringOverwritesExistingNumericIdentifier) { const std::string value = "replacement-name"; iggy::ffi::Identifier identifier; - ASSERT_NO_THROW(identifier.from_numeric(42)); - ASSERT_NO_THROW(identifier.from_string(value)); + ASSERT_NO_THROW(identifier.set_numeric(42)); + ASSERT_NO_THROW(identifier.set_string(value)); ASSERT_EQ(identifier.kind, "string"); ASSERT_EQ(identifier.length, value.size()); From 4e2082ae05d8680f8349c64b2903201b594be07d Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Mon, 11 May 2026 22:18:33 +0900 Subject: [PATCH 3/5] rebase to master --- foreign/cpp/.bazelignore | 1 + foreign/cpp/src/client.rs | 14 ++++++++++---- foreign/cpp/tests/common/test_helpers.hpp | 14 ++++++++------ 3 files changed, 19 insertions(+), 10 deletions(-) create mode 100644 foreign/cpp/.bazelignore diff --git a/foreign/cpp/.bazelignore b/foreign/cpp/.bazelignore new file mode 100644 index 0000000000..eb5a316cbd --- /dev/null +++ b/foreign/cpp/.bazelignore @@ -0,0 +1 @@ +target diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs index f590e03d00..f20f96a02d 100644 --- a/foreign/cpp/src/client.rs +++ b/foreign/cpp/src/client.rs @@ -21,10 +21,9 @@ use iggy::prelude::{ ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient, IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry, IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize, MessageClient, PartitionClient, - Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient, - MaxTopicSize as RustMaxTopicSize, PartitionClient, - SnapshotCompression as RustSnapshotCompression, StreamClient, SystemClient as RustSystemClient, - SystemSnapshotType as RustSystemSnapshotType, TopicClient, UserClient, + Partitioning, PollingStrategy, SnapshotCompression as RustSnapshotCompression, StreamClient, + SystemClient as RustSystemClient, SystemSnapshotType as RustSystemSnapshotType, TopicClient, + UserClient, }; use iggy_common::{ CacheMetrics as RustCacheMetrics, CacheMetricsKey as RustCacheMetricsKey, @@ -658,6 +657,10 @@ impl Client { rust_group_id, rust_topic_id, rust_stream_id ) })?; + Ok(()) + }) + } + pub fn get_stats(&self) -> Result { RUNTIME.block_on(async { let stats = self @@ -740,6 +743,9 @@ impl Client { ) })?; Ok(()) + }) + } + pub fn heartbeat_interval(&self) -> u64 { // The upstream client exposes this config-derived value via an async API, // so the synchronous C++ wrapper reads it by blocking on the runtime. diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp index 7cb5160370..110ee9d578 100644 --- a/foreign/cpp/tests/common/test_helpers.hpp +++ b/foreign/cpp/tests/common/test_helpers.hpp @@ -59,10 +59,12 @@ inline rust::Vec partition_id_bytes(std::uint32_t id) { v.push_back(static_cast((id >> 16) & 0xFF)); v.push_back(static_cast((id >> 24) & 0xFF)); return v; - inline rust::Vec make_snapshot_types(std::initializer_list values) { - rust::Vec snapshot_types; - for (const auto value : values) { - snapshot_types.push_back(value); - } - return snapshot_types; +} + +inline rust::Vec make_snapshot_types(std::initializer_list values) { + rust::Vec snapshot_types; + for (const auto value : values) { + snapshot_types.push_back(value); } + return snapshot_types; +} From 77338cc433db0df9df8a6afb1f8af65b30b7efd9 Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 16 May 2026 16:25:23 +0900 Subject: [PATCH 4/5] add docs + warning to iggy.hpp --- foreign/cpp/include/iggy.hpp | 196 ++++++++++++++++++++++++++--------- 1 file changed, 146 insertions(+), 50 deletions(-) diff --git a/foreign/cpp/include/iggy.hpp b/foreign/cpp/include/iggy.hpp index 2b565a2e7c..28fa017085 100644 --- a/foreign/cpp/include/iggy.hpp +++ b/foreign/cpp/include/iggy.hpp @@ -26,82 +26,128 @@ namespace iggy { +/// Exception raised by the C++ client when an operation fails. class IggyException : public std::runtime_error { public: explicit IggyException(const char *message) : std::runtime_error(message) {} explicit IggyException(const std::string &message) : std::runtime_error(message) {} }; -class CompressionAlgorithm final { +namespace detail { + +/// Internal helper used by string-backed option types in this header. +template +class StringTag { + protected: + explicit StringTag(std::string value) : value_(std::move(value)) {} + + std::string_view value() const { return value_; } + + private: + std::string value_; +}; + +} // namespace detail + +/// Compression setting for `Client::create_topic(...)`. +/// +/// Use this type to choose whether messages in a topic are stored as-is or compressed with gzip. +/// +/// Internally, this value is passed across the Rust FFI as a string. +/// Values outside the supported set are rejected by the Rust client. +/// Some Rust-side aliases may also be accepted for compatibility. +class CompressionAlgorithm final : private detail::StringTag { public: + /// Store messages without compression. static CompressionAlgorithm none() { return CompressionAlgorithm("none"); } + /// Compress messages with gzip. static CompressionAlgorithm gzip() { return CompressionAlgorithm("gzip"); } - std::string_view compression_algorithm_value() const { return algorithm_; } + std::string_view compression_algorithm_value() const { return value(); } private: - explicit CompressionAlgorithm(std::string algorithm) : algorithm_(std::move(algorithm)) {} - - std::string algorithm_; + explicit CompressionAlgorithm(std::string algorithm) + : detail::StringTag(std::move(algorithm)) {} }; -class SnapshotCompression final { +/// Compression setting for `Client::snapshot(...)`. +/// +/// Use this type to choose how snapshot data is compressed in the generated archive. +/// +/// Internally, this value is passed across the Rust FFI as a string. +/// Values outside the supported set are rejected by the Rust client. +/// Some Rust-side aliases may also be accepted for compatibility. +class SnapshotCompression final : private detail::StringTag { public: + /// Store snapshot files without compression. static SnapshotCompression stored() { return SnapshotCompression("stored"); } + /// Use standard deflate compression. static SnapshotCompression deflated() { return SnapshotCompression("deflated"); } + /// Use bzip2 for a higher compression ratio at the cost of slower processing. static SnapshotCompression bzip2() { return SnapshotCompression("bzip2"); } + /// Use Zstandard for fast compression and decompression. static SnapshotCompression zstd() { return SnapshotCompression("zstd"); } + /// Use LZMA for high compression, especially for larger files. static SnapshotCompression lzma() { return SnapshotCompression("lzma"); } + /// Use XZ, which is similar to LZMA and often faster to decompress. static SnapshotCompression xz() { return SnapshotCompression("xz"); } - std::string_view snapshot_compression_value() const { return snapshot_compression_; } + std::string_view snapshot_compression_value() const { return value(); } private: explicit SnapshotCompression(std::string snapshot_compression) - : snapshot_compression_(std::move(snapshot_compression)) {} - - std::string snapshot_compression_; + : detail::StringTag(std::move(snapshot_compression)) {} }; -class SnapshotType final { +/// Snapshot selector for `Client::snapshot(...)`. +/// +/// Use this type to choose which snapshot data the server should include. +/// +/// Internally, each selected value is passed across the Rust FFI as a string. +/// Values outside the supported set are rejected by the Rust client. +/// Some Rust-side aliases may also be accepted for compatibility. +class SnapshotType final : private detail::StringTag { public: + /// Include an overview of the filesystem structure. static SnapshotType filesystem_overview() { return SnapshotType("filesystem_overview"); } + /// Include the list of currently running processes. static SnapshotType process_list() { return SnapshotType("process_list"); } + /// Include CPU, memory, and other system resource usage statistics. static SnapshotType resource_usage() { return SnapshotType("resource_usage"); } + /// Include the test snapshot used for development and testing. static SnapshotType test() { return SnapshotType("test"); } + /// Include server logs from the configured logging directory. static SnapshotType server_logs() { return SnapshotType("server_logs"); } + /// Include the server configuration. static SnapshotType server_config() { return SnapshotType("server_config"); } + /// Include all available snapshot types. static SnapshotType all() { return SnapshotType("all"); } - std::string_view snapshot_type_value() const { return snapshot_type_; } + std::string_view snapshot_type_value() const { return value(); } private: - explicit SnapshotType(std::string snapshot_type) : snapshot_type_(std::move(snapshot_type)) {} - - std::string snapshot_type_; + explicit SnapshotType(std::string snapshot_type) : detail::StringTag(std::move(snapshot_type)) {} }; -class Expiry final { - public: - static Expiry server_default() { return Expiry("server_default", 0); } - static Expiry never_expire() { return Expiry("never_expire", std::numeric_limits::max()); } - static Expiry duration(std::uint64_t micros) { return Expiry("duration", micros); } - - std::string_view expiry_kind() const { return expiry_kind_; } - std::uint64_t expiry_value() const { return expiry_value_; } - - private: - explicit Expiry(std::string expiry_kind, std::uint64_t expiry_value) - : expiry_kind_(std::move(expiry_kind)), expiry_value_(expiry_value) {} - - std::string expiry_kind_; - std::uint64_t expiry_value_; -}; - -class MaxTopicSize final { +/// Maximum retained size for a topic. +/// +/// Use this type to choose whether a topic uses the server default limit, no limit, or an +/// explicit byte limit. +/// +/// Internally, this value is passed across the Rust FFI as a string. +/// Values outside the supported set are rejected by the Rust client. +/// Some Rust-side aliases may also be accepted for compatibility. +class MaxTopicSize final : private detail::StringTag { public: + /// Use the server's default maximum topic size. static MaxTopicSize server_default() { return MaxTopicSize("server_default"); } + /// Disable the maximum topic size limit. static MaxTopicSize unlimited() { return MaxTopicSize("unlimited"); } + /// Set an explicit maximum topic size in bytes. + /// + /// A value of `0` is treated as `server_default()`. A value of + /// `std::numeric_limits::max()` is treated as `unlimited()`. + /// The configured limit cannot be lower than the server's segment size. static MaxTopicSize from_bytes(std::uint64_t bytes) { if (bytes == 0) { return server_default(); @@ -112,20 +158,84 @@ class MaxTopicSize final { return MaxTopicSize(std::to_string(bytes)); } - std::string_view max_topic_size() const { return max_topic_size_; } + std::string_view max_topic_size() const { return value(); } + + private: + explicit MaxTopicSize(std::string max_topic_size) : detail::StringTag(std::move(max_topic_size)) {} +}; + +// TODO(slbotbm): Add rust bindings for Identifier that will use IdKind +/// Describes whether an identifier is numeric or string-based. +/// +/// Use this type to describe how an identifier is encoded. +/// +/// This type is reserved for future identifier bindings and is not currently passed through the +/// Rust FFI. +class IdKind final : private detail::StringTag { + public: + /// A numeric identifier represented as a 32-bit integer. + static IdKind numeric() { return IdKind("numeric"); } + /// A string identifier represented by its text value. + static IdKind string() { return IdKind("string"); } + + std::string_view id_kind_value() const { return value(); } private: - explicit MaxTopicSize(std::string max_topic_size) : max_topic_size_(std::move(max_topic_size)) {} + explicit IdKind(std::string id_kind) : detail::StringTag(std::move(id_kind)) {} +}; + +/// Message expiry policy for `Client::create_topic(...)`. +/// +/// Use this type to choose how long messages in a topic are retained. +/// +/// `expiry_kind()` returns the selected mode. `expiry_value()` returns the associated payload: +/// the duration for `duration(micros)`, `0` for `server_default()`, or +/// `std::numeric_limits::max()` for `never_expire()`. +/// +/// Internally, this value is passed across the Rust FFI as a kind/value pair. +/// Unsupported kinds are rejected by the Rust client. +class Expiry final { + public: + /// Use the server's default message expiry policy. + static Expiry server_default() { return Expiry("server_default", 0); } + /// Keep messages until they are removed for some other reason, such as topic deletion. + static Expiry never_expire() { return Expiry("never_expire", std::numeric_limits::max()); } + /// Expire messages after the given number of microseconds. + static Expiry duration(std::uint64_t micros) { return Expiry("duration", micros); } - std::string max_topic_size_; + std::string_view expiry_kind() const { return expiry_kind_; } + std::uint64_t expiry_value() const { return expiry_value_; } + + private: + explicit Expiry(std::string expiry_kind, std::uint64_t expiry_value) + : expiry_kind_(std::move(expiry_kind)), expiry_value_(expiry_value) {} + + std::string expiry_kind_; + std::uint64_t expiry_value_; }; +/// Starting position for `Client::poll_messages(...)`. +/// +/// Use this type to choose where the server should begin reading messages. +/// +/// `polling_strategy_kind()` returns the selected mode. `polling_strategy_value()` returns the +/// associated offset or timestamp for the parameterized modes and `0` for the others. +/// +/// Internally, this value is passed across the Rust FFI as a kind/value pair. +/// Unsupported kinds are rejected by the Rust client. class PollingStrategy final { public: + /// Start polling from a specific message offset. static PollingStrategy offset(std::uint64_t value) { return PollingStrategy("offset", value); } + /// Start polling from a specific timestamp. static PollingStrategy timestamp(std::uint64_t value) { return PollingStrategy("timestamp", value); } + /// Start polling from the first message in the partition. static PollingStrategy first() { return PollingStrategy("first", 0); } + /// Start polling from the last message currently available in the partition. static PollingStrategy last() { return PollingStrategy("last", 0); } + /// Start polling from the next message after the stored consumer offset. + /// + /// This is typically used with automatic offset commits enabled. static PollingStrategy next() { return PollingStrategy("next", 0); } std::string_view polling_strategy_kind() const { return polling_strategy_kind_; } @@ -139,18 +249,4 @@ class PollingStrategy final { std::uint64_t polling_strategy_value_; }; -// TODO(slbotbm): Add rust bindings for Identifier that will use IdKind -class IdKind final { - public: - static IdKind numeric() { return IdKind("numeric"); } - static IdKind string() { return IdKind("string"); } - - std::string_view id_kind_value() const { return id_kind_; } - - private: - explicit IdKind(std::string id_kind) : id_kind_(std::move(id_kind)) {} - - std::string id_kind_; -}; - } // namespace iggy From 87123cd17341d396770b9cb41a24974ff876fc35 Mon Sep 17 00:00:00 2001 From: Rimuksh Kansal Date: Sat, 16 May 2026 16:58:24 +0900 Subject: [PATCH 5/5] address testing feedback --- foreign/cpp/tests/client/low_level_e2e.cpp | 55 +++++++++++++--------- 1 file changed, 33 insertions(+), 22 deletions(-) diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp index 25941874d8..17fb57c532 100644 --- a/foreign/cpp/tests/client/low_level_e2e.cpp +++ b/foreign/cpp/tests/client/low_level_e2e.cpp @@ -19,9 +19,9 @@ // TODO(slbotbm): Add tests for join_consumer_group() and leave_consumer_group() #include -#include #include #include +#include #include #include @@ -196,7 +196,7 @@ TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { EXPECT_NE(empty_stats.process_id, 0u); EXPECT_GT(empty_stats.threads_count, 0u); EXPECT_GT(empty_stats.total_memory, 0u); - EXPECT_GE(empty_stats.available_memory, 0u); + EXPECT_LE(empty_stats.available_memory, empty_stats.total_memory); EXPECT_GE(empty_stats.total_disk_space, empty_stats.free_disk_space); EXPECT_FALSE(static_cast(empty_stats.hostname).empty()); EXPECT_FALSE(static_cast(empty_stats.os_name).empty()); @@ -204,10 +204,6 @@ TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { EXPECT_FALSE(static_cast(empty_stats.kernel_version).empty()); EXPECT_FALSE(static_cast(empty_stats.iggy_server_version).empty()); EXPECT_TRUE(!empty_stats.has_server_semver || empty_stats.iggy_server_semver > 0u); - EXPECT_EQ(empty_stats.streams_count, 0u); - EXPECT_EQ(empty_stats.topics_count, 0u); - EXPECT_EQ(empty_stats.partitions_count, 0u); - EXPECT_EQ(empty_stats.consumer_groups_count, 0u); }); ASSERT_NO_THROW(client->create_stream(first_stream_name)); @@ -256,11 +252,11 @@ TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { ASSERT_NO_THROW({ const auto stats = client->get_stats(); - EXPECT_EQ(stats.streams_count, 2u); - EXPECT_EQ(stats.topics_count, expected_topics_count); - EXPECT_EQ(stats.partitions_count, expected_partitions_count); - EXPECT_EQ(stats.segments_count, expected_partitions_count); - EXPECT_EQ(stats.consumer_groups_count, 3u); + EXPECT_EQ(stats.streams_count, empty_stats.streams_count + 2u); + EXPECT_EQ(stats.topics_count, empty_stats.topics_count + expected_topics_count); + EXPECT_EQ(stats.partitions_count, empty_stats.partitions_count + expected_partitions_count); + EXPECT_EQ(stats.segments_count, empty_stats.segments_count + expected_partitions_count); + EXPECT_EQ(stats.consumer_groups_count, empty_stats.consumer_groups_count + 3u); EXPECT_EQ(stats.clients_count, empty_stats.clients_count + 2u); EXPECT_EQ(first_group.partitions_count, first_topic_partitions); EXPECT_EQ(second_group.partitions_count, second_topic_partitions); @@ -276,11 +272,11 @@ TEST(LowLevelE2E_Client, GetStatsReturnsServerStats) { ASSERT_NO_THROW({ const auto stats = client->get_stats(); - EXPECT_EQ(stats.streams_count, 0u); - EXPECT_EQ(stats.topics_count, 0u); - EXPECT_EQ(stats.partitions_count, 0u); - EXPECT_EQ(stats.segments_count, 0u); - EXPECT_EQ(stats.consumer_groups_count, 0u); + EXPECT_EQ(stats.streams_count, empty_stats.streams_count); + EXPECT_EQ(stats.topics_count, empty_stats.topics_count); + EXPECT_EQ(stats.partitions_count, empty_stats.partitions_count); + EXPECT_EQ(stats.segments_count, empty_stats.segments_count); + EXPECT_EQ(stats.consumer_groups_count, empty_stats.consumer_groups_count); }); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); @@ -482,10 +478,23 @@ TEST(LowLevelE2E_Client, GetClientBeforeLoginThrows) { TEST(LowLevelE2E_Client, GetClientWithWrongClientIdThrows) { RecordProperty("description", "Rejects querying invalid or non-existent client ids."); - const std::uint32_t wrong_client_ids[] = {0u, std::numeric_limits::max()}; - iggy::ffi::Client *client = login_to_server(); + iggy::ffi::Client *client = login_to_server(); ASSERT_NE(client, nullptr); + std::uint32_t non_existent_client_id = 1u; + ASSERT_NO_THROW({ + const auto clients = client->get_clients(); + std::unordered_set client_ids; + for (const auto &entry : clients) { + client_ids.insert(entry.client_id); + } + + while (client_ids.find(non_existent_client_id) != client_ids.end()) { + ++non_existent_client_id; + } + }); + + const std::uint32_t wrong_client_ids[] = {0u, non_existent_client_id}; for (const std::uint32_t wrong_client_id : wrong_client_ids) { SCOPED_TRACE(wrong_client_id); ASSERT_THROW(client->get_client(wrong_client_id), std::exception); @@ -755,12 +764,13 @@ TEST(LowLevelE2E_Client, PingSucceedsForNewConnection) { TEST(LowLevelE2E_Client, HeartbeatIntervalReturnsDefaultValueForNewConnection) { RecordProperty("description", "Returns the default heartbeat interval in microseconds for a fresh unauthenticated client."); - iggy::ffi::Client *client = nullptr; + constexpr std::uint64_t default_heartbeat_micros = 5'000'000ull; + iggy::ffi::Client *client = nullptr; ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); }); ASSERT_NE(client, nullptr); const auto heartbeat_interval = client->heartbeat_interval(); - EXPECT_EQ(heartbeat_interval, 5'000'000u); + EXPECT_EQ(heartbeat_interval, default_heartbeat_micros); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr; @@ -769,12 +779,13 @@ TEST(LowLevelE2E_Client, HeartbeatIntervalReturnsDefaultValueForNewConnection) { TEST(LowLevelE2E_Client, HeartbeatIntervalReturnsConfiguredValueFromConnectionString) { RecordProperty("description", "Returns the configured heartbeat interval in microseconds from the connection string."); - iggy::ffi::Client *client = nullptr; + constexpr std::uint64_t configured_heartbeat_micros = 10'000'000ull; + iggy::ffi::Client *client = nullptr; ASSERT_NO_THROW({ client = iggy::ffi::new_connection("iggy://iggy:iggy@127.0.0.1:8090?heartbeat_interval=10s"); }); ASSERT_NE(client, nullptr); const auto heartbeat_interval = client->heartbeat_interval(); - EXPECT_EQ(heartbeat_interval, 10'000'000u); + EXPECT_EQ(heartbeat_interval, configured_heartbeat_micros); ASSERT_NO_THROW(iggy::ffi::delete_connection(client)); client = nullptr;