Skip to content

Commit f31f751

Browse files
committed
Enhance lossy queue semantics and testing
- Update changelog with new features and fixes - Improve README documentation for lossy semantics and loss detection - Implement loss detection in SlickQueue with debug support - Add tests for lossy overwrite behavior and loss detection - Modify CMake to enable loss detection in tests
1 parent bc08eea commit f31f751

6 files changed

Lines changed: 200 additions & 12 deletions

File tree

CHANGELOG

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
11
# v1.2.3 - unreleased
2+
- Initialize reserved counters for local and shared-memory queues
3+
- Validate shared-memory size is power-of-two and element size matches when attaching
4+
- Add lossy semantics documentation and debug loss detection counter
5+
- Add lossy overwrite tests for in-process and shared-memory queues
6+
- Guard against zero-length reservations
27
- Add limits header
38

49
# v1.2.2 - 2026-01-10

README.md

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -207,19 +207,24 @@ SlickQueue(const char* shm_name); // Reader/Attacher
207207
208208
### Core Methods
209209
210-
- `uint64_t reserve(uint32_t n = 1)` - Reserve `n` slots for writing (blocks if queue is full)
210+
- `uint64_t reserve(uint32_t n = 1)` - Reserve `n` slots for writing (non-blocking; may overwrite old data if consumers lag)
211211
- `T* operator[](uint64_t slot)` - Access reserved slot
212212
- `void publish(uint64_t slot, uint32_t n = 1)` - Publish `n` written items to consumers
213213
- `std::pair<T*, uint32_t> read(uint64_t& cursor)` - Read next available item (independent cursor)
214214
- `std::pair<T*, uint32_t> read(std::atomic<uint64_t>& cursor)` - Read next available item (shared atomic cursor for work-stealing)
215215
- `T* read_last()` - Read the most recently published item without a cursor
216216
- `uint32_t size()` - Get queue capacity
217+
- `uint64_t loss_count() const` - Get count of items skipped due to overwrite (always returns 0 when loss detection is disabled)
217218
- `void reset()` - Reset the queue, invalidating all existing data
218219
219220
### Important Constraints
220221
221222
**Lock-Free Atomics Implementation**: SlickQueue uses a packed 64-bit atomic internally to guarantee lock-free operations on all platforms. This packs both the write index (48 bits) and the reservation size (16 bits) into a single atomic value.
222223
224+
**Lossy Semantics**: SlickQueue does not apply backpressure. If producers advance by at least the queue size before a consumer reads, older entries will be overwritten and the consumer will skip ahead to the latest value for a slot. Size the queue and read frequency to bound loss.
225+
226+
**Debug Loss Detection**: Define `SLICK_QUEUE_ENABLE_LOSS_DETECTION=1` to enable a per-instance skipped-item counter (enabled by default in Debug builds). Use `loss_count()` to inspect how many items were skipped.
227+
223228
**⚠️ Reserve Size Limitation**: When using `read_last()`, the number of slots in any `reserve(n)` call **must not exceed 65,535** (2^16 - 1). This is because the size is stored in 16 bits within the packed atomic.
224229
225230
- For typical use cases with `reserve()` or `reserve(1)`, this limit is not a concern

include/slick/queue.h

Lines changed: 92 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include <cassert>
1919
#include <thread>
2020
#include <chrono>
21+
#include <limits>
2122

2223
#if defined(_MSC_VER)
2324
#ifndef NOMINMAX
@@ -34,7 +35,15 @@
3435
#include <fcntl.h>
3536
#include <unistd.h>
3637
#include <cerrno>
37-
#include <limits>
38+
#endif
39+
40+
41+
#ifndef SLICK_QUEUE_ENABLE_LOSS_DETECTION
42+
#if !defined(NDEBUG)
43+
#define SLICK_QUEUE_ENABLE_LOSS_DETECTION 1
44+
#else
45+
#define SLICK_QUEUE_ENABLE_LOSS_DETECTION 0
46+
#endif
3847
#endif
3948

4049
namespace slick {
@@ -44,6 +53,7 @@ namespace slick {
4453
*
4554
* This queue allows multiple producer threads to write data and multiple consumer threads to read data concurrently without locks.
4655
* It can optionally use shared memory for inter-process communication.
56+
* This queue is lossy: if producers outrun consumers, older data may be overwritten.
4757
*
4858
* @tparam T The type of elements stored in the queue.
4959
*/
@@ -61,7 +71,10 @@ class SlickQueue {
6171
T* data_ = nullptr;
6272
slot* control_ = nullptr;
6373
std::atomic<reserved_info>* reserved_ = nullptr;
64-
std::atomic<reserved_info> reserved_local_;
74+
std::atomic<reserved_info> reserved_local_{0};
75+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
76+
std::atomic<uint64_t> loss_count_{0};
77+
#endif
6578
bool own_ = false;
6679
bool use_shm_ = false;
6780
#if defined(_MSC_VER)
@@ -75,6 +88,10 @@ class SlickQueue {
7588

7689
static constexpr uint32_t HEADER_SIZE = 64;
7790

91+
static constexpr bool is_power_of_two(uint32_t value) noexcept {
92+
return value != 0 && ((value & (value - 1)) == 0);
93+
}
94+
7895
public:
7996
/**
8097
* @brief Construct a new SlickQueue object
@@ -87,16 +104,20 @@ class SlickQueue {
87104
*/
88105
SlickQueue(uint32_t size, const char* const shm_name = nullptr)
89106
: size_(size)
90-
, mask_(size - 1)
91-
, data_(shm_name ? nullptr : new T[size_])
92-
, control_(shm_name ? nullptr : new slot[size_])
93-
, reserved_(shm_name ? nullptr : &reserved_local_)
107+
, mask_(size ? size - 1 : 0)
94108
, own_(shm_name == nullptr)
95109
, use_shm_(shm_name != nullptr)
96110
{
97-
assert((size && !(size & (size - 1))) && "size must power of 2");
111+
if (!is_power_of_two(size_)) {
112+
throw std::invalid_argument("size must power of 2");
113+
}
98114
if (shm_name) {
99115
allocate_shm_data(shm_name, false);
116+
} else {
117+
reserved_ = &reserved_local_;
118+
reserved_->store(0, std::memory_order_relaxed);
119+
data_ = new T[size_];
120+
control_ = new slot[size_];
100121
}
101122
}
102123

@@ -108,7 +129,9 @@ class SlickQueue {
108129
* @throws std::runtime_error if shared memory allocation fails or the segment does not exist.
109130
*/
110131
SlickQueue(const char* const shm_name)
111-
: own_(false)
132+
: size_(0)
133+
, mask_(0)
134+
, own_(false)
112135
, use_shm_(true)
113136
{
114137
allocate_shm_data(shm_name, true);
@@ -167,6 +190,19 @@ class SlickQueue {
167190
*/
168191
constexpr uint32_t size() const noexcept { return size_; }
169192

193+
194+
/**
195+
* @brief Get the number of items skipped due to overwrite (always 0 when loss detection is disabled).
196+
* @return Count of skipped items observed by this queue instance.
197+
*/
198+
uint64_t loss_count() const noexcept {
199+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
200+
return loss_count_.load(std::memory_order_relaxed);
201+
#else
202+
return 0;
203+
#endif
204+
}
205+
170206
/**
171207
* @brief Get the initial reading index, which is 0 if the queue is newly created or the current writing index if opened existing
172208
* @return Initial reading index
@@ -181,6 +217,9 @@ class SlickQueue {
181217
* @return The starting index of the reserved space
182218
*/
183219
uint64_t reserve(uint32_t n = 1) {
220+
if (n == 0) [[unlikely]] {
221+
throw std::invalid_argument("required size must be > 0");
222+
}
184223
if (n > size_) [[unlikely]] {
185224
throw std::runtime_error("required size " + std::to_string(n) + " > queue size " + std::to_string(size_));
186225
}
@@ -257,7 +296,13 @@ class SlickQueue {
257296
// queue has been reset
258297
read_index = 0;
259298
}
260-
299+
300+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
301+
if (index != std::numeric_limits<uint64_t>::max() && index > read_index && ((index & mask_) == idx)) {
302+
loss_count_.fetch_add(index - read_index, std::memory_order_relaxed);
303+
}
304+
#endif
305+
261306
if (index == std::numeric_limits<uint64_t>::max() || index < read_index) {
262307
// data not ready yet
263308
return std::make_pair(nullptr, 0);
@@ -300,7 +345,15 @@ class SlickQueue {
300345
// data not ready yet
301346
return std::make_pair(nullptr, 0);
302347
}
303-
else if (index > current_index && ((index & mask_) != idx)) {
348+
349+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
350+
uint64_t overrun = 0;
351+
if (index > current_index && ((index & mask_) == idx)) {
352+
overrun = index - current_index;
353+
}
354+
#endif
355+
356+
if (index > current_index && ((index & mask_) != idx)) {
304357
// queue wrapped, skip the unused slots
305358
read_index.compare_exchange_weak(current_index, index, std::memory_order_release, std::memory_order_relaxed);
306359
continue;
@@ -309,6 +362,11 @@ class SlickQueue {
309362
// Try to atomically claim this item
310363
uint64_t next_index = index + current_slot->size;
311364
if (read_index.compare_exchange_weak(current_index, next_index, std::memory_order_release, std::memory_order_relaxed)) {
365+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
366+
if (overrun != 0) {
367+
loss_count_.fetch_add(overrun, std::memory_order_relaxed);
368+
}
369+
#endif
312370
// Successfully claimed the item
313371
return std::make_pair(&data_[current_index & mask_], current_slot->size);
314372
}
@@ -343,6 +401,9 @@ class SlickQueue {
343401
control_ = new slot[size_];
344402
}
345403
reserved_->store(0, std::memory_order_release);
404+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
405+
loss_count_.store(0, std::memory_order_relaxed);
406+
#endif
346407
}
347408

348409
private:
@@ -395,6 +456,9 @@ class SlickQueue {
395456
if (element_size != sizeof(T)) {
396457
throw std::runtime_error("Shared memory element size mismatch. Expected " + std::to_string(sizeof(T)) + " but got " + std::to_string(element_size));
397458
}
459+
if (!is_power_of_two(size_)) {
460+
throw std::runtime_error("Shared memory size must be power of 2. Got " + std::to_string(size_));
461+
}
398462
mask_ = size_ - 1;
399463
BF_SZ = HEADER_SIZE + sizeof(slot) * size_ + sizeof(T) * size_;
400464
UnmapViewOfFile(lpvMem);
@@ -462,6 +526,7 @@ class SlickQueue {
462526

463527
if (own_) {
464528
reserved_ = new (lpvMem_) std::atomic<reserved_info>();
529+
reserved_->store(0, std::memory_order_relaxed);
465530
// save queue size
466531
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = size_;
467532
// save element size
@@ -501,11 +566,16 @@ class SlickQueue {
501566
throw std::runtime_error("Failed to map shm for size read. err=" + std::to_string(errno));
502567
}
503568
uint32_t shm_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(temp_map) + sizeof(std::atomic<reserved_info>));
569+
uint32_t element_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(temp_map) + sizeof(std::atomic<reserved_info>) + sizeof(shm_size));
504570
munmap(temp_map, HEADER_SIZE);
505571

506572
if (shm_size != size_) {
507573
throw std::runtime_error("Shared memory size mismatch. Expected " + std::to_string(size_) + " but got " + std::to_string(shm_size));
508574
}
575+
576+
if (element_size != sizeof(T)) {
577+
throw std::runtime_error("Shared memory element size mismatch. Expected " + std::to_string(sizeof(T)) + " but got " + std::to_string(element_size));
578+
}
509579
} else {
510580
throw std::runtime_error("Failed to open/create shm. err=" + std::to_string(errno));
511581
}
@@ -520,8 +590,18 @@ class SlickQueue {
520590
throw std::runtime_error("Failed to map shm for size read. err=" + std::to_string(errno));
521591
}
522592
size_ = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(temp_map) + sizeof(std::atomic<reserved_info>));
523-
mask_ = size_ - 1;
593+
uint32_t element_size = *reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(temp_map) + sizeof(std::atomic<reserved_info>) + sizeof(size_));
524594
munmap(temp_map, HEADER_SIZE);
595+
596+
if (!is_power_of_two(size_)) {
597+
throw std::runtime_error("Shared memory size must be power of 2. Got " + std::to_string(size_));
598+
}
599+
600+
if (element_size != sizeof(T)) {
601+
throw std::runtime_error("Shared memory element size mismatch. Expected " + std::to_string(sizeof(T)) + " but got " + std::to_string(element_size));
602+
}
603+
604+
mask_ = size_ - 1;
525605
}
526606

527607
BF_SZ = HEADER_SIZE + sizeof(slot) * size_ + sizeof(T) * size_;
@@ -539,6 +619,7 @@ class SlickQueue {
539619

540620
if (own_) {
541621
reserved_ = new (lpvMem_) std::atomic<reserved_info>();
622+
reserved_->store(0, std::memory_order_relaxed);
542623
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>)) = size_;
543624
*reinterpret_cast<uint32_t*>(reinterpret_cast<uint8_t*>(lpvMem_) + sizeof(std::atomic<reserved_info>) + sizeof(size_)) = sizeof(T);
544625
control_ = new ((uint8_t*)lpvMem_ + HEADER_SIZE) slot[size_];

tests/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,8 @@ add_executable(slick-queue-tests tests.cpp shm_tests.cpp)
1616

1717
target_link_libraries(slick-queue-tests PRIVATE slick::queue GTest::gtest_main)
1818

19+
# Enable loss detection in tests
20+
target_compile_definitions(slick-queue-tests PRIVATE SLICK_QUEUE_ENABLE_LOSS_DETECTION=1)
21+
1922
include(GoogleTest)
2023
gtest_discover_tests(slick-queue-tests)

tests/shm_tests.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,43 @@ TEST(ShmTests, AtomicCursorWorkStealing) {
150150
EXPECT_EQ(shared_cursor.load(), 100);
151151
}
152152

153+
TEST(ShmTests, LossyOverwriteSkipsOldData) {
154+
SlickQueue<int> server(2, "sq_lossy_overwrite");
155+
SlickQueue<int> client("sq_lossy_overwrite");
156+
157+
auto s0 = server.reserve();
158+
*server[s0] = 10;
159+
server.publish(s0);
160+
161+
auto s1 = server.reserve();
162+
*server[s1] = 20;
163+
server.publish(s1);
164+
165+
auto s2 = server.reserve();
166+
*server[s2] = 30;
167+
server.publish(s2);
168+
169+
uint64_t read_cursor = 0;
170+
auto read = client.read(read_cursor);
171+
EXPECT_NE(read.first, nullptr);
172+
EXPECT_EQ(*read.first, 30);
173+
EXPECT_EQ(read_cursor, 3);
174+
175+
#if SLICK_QUEUE_ENABLE_LOSS_DETECTION
176+
EXPECT_EQ(client.loss_count(), 2u);
177+
#endif
178+
179+
read = client.read(read_cursor);
180+
EXPECT_EQ(read.first, nullptr);
181+
}
182+
183+
TEST(ShmTests, ElementSizeMismatch) {
184+
SlickQueue<int> server(4, "sq_element_mismatch");
185+
EXPECT_THROW({
186+
SlickQueue<double> client("sq_element_mismatch");
187+
}, std::runtime_error);
188+
}
189+
153190
TEST(ShmTests, SizeMismatch) {
154191
// Create a shared memory queue with size 4
155192
SlickQueue<int> server(4, "sq_size_mismatch");

0 commit comments

Comments
 (0)