diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml new file mode 100644 index 0000000..2214848 --- /dev/null +++ b/.github/workflows/tests.yaml @@ -0,0 +1,100 @@ +name: Queue Tests + +on: + push: + pull_request: + +jobs: + test: + runs-on: ubuntu-latest + + steps: + - name: Checkout repo + uses: actions/checkout@v3 + with: + fetch-depth: 0 + + # 🔹 Detect changed files + - name: Detect changes + id: changes + run: | + if [ "${{ github.event_name }}" = "pull_request" ]; then + BASE="origin/main" + git fetch origin main + else + BASE="${{ github.event.before }}" + fi + + echo "Base: $BASE" + echo "Head: ${{ github.sha }}" + + CHANGED_FILES=$(git diff --name-only $BASE ${{ github.sha }}) + + echo "Changed files:" + echo "$CHANGED_FILES" + + # SPSC (bounded + unbounded) + if echo "$CHANGED_FILES" | grep -E "lockfree_spsc"; then + echo "run_spsc=true" >> $GITHUB_OUTPUT + fi + + # MPSC + if echo "$CHANGED_FILES" | grep -E "lockfree_mpsc"; then + echo "run_mpsc=true" >> $GITHUB_OUTPUT + fi + + # MPMC + if echo "$CHANGED_FILES" | grep -E "lockfree_mpmc|blocking_mpmc"; then + echo "run_mpmc=true" >> $GITHUB_OUTPUT + fi + + # If tests themselves change → run corresponding + if echo "$CHANGED_FILES" | grep -E "test_spsc.cpp"; then + echo "run_spsc=true" >> $GITHUB_OUTPUT + fi + + if echo "$CHANGED_FILES" | grep -E "test_mpsc.cpp"; then + echo "run_mpsc=true" >> $GITHUB_OUTPUT + fi + + if echo "$CHANGED_FILES" | grep -E "test_mpmc.cpp"; then + echo "run_mpmc=true" >> $GITHUB_OUTPUT + fi + + # 🔹 Install deps + - name: Install dependencies + run: | + sudo apt-get update + sudo apt-get install -y cmake g++ libgtest-dev + + # 🔹 Build GTest + - name: Build GTest + run: | + cd /usr/src/gtest + sudo cmake . + sudo make + sudo cp lib/*.a /usr/lib + + # 🔹 Build project + - name: Build project + run: | + rm -rf build + cmake -B build + cmake --build build + + # 🔹 Selective tests + - name: Run SPSC tests + if: steps.changes.outputs.run_spsc == 'true' + run: cd build && ./test_spsc + + - name: Run MPSC tests + if: steps.changes.outputs.run_mpsc == 'true' + run: cd build && ./test_mpsc + + - name: Run MPMC tests + if: steps.changes.outputs.run_mpmc == 'true' + run: cd build && ./test_mpmc + + # 🔹 Safety net (always runs) + # - name: Run full test suite (safety) + # run: cd build && ctest --output-on-failure \ No newline at end of file diff --git a/CMakeLists.txt b/CMakeLists.txt new file mode 100644 index 0000000..958fb15 --- /dev/null +++ b/CMakeLists.txt @@ -0,0 +1,25 @@ +cmake_minimum_required(VERSION 3.14) +project(ThreadsafeQueueLib) + +set(CMAKE_CXX_STANDARD 17) +enable_testing() + +find_package(GTest REQUIRED) +include_directories(include) + +# SPSC tests +add_executable(test_spsc tests/test_spsc.cpp) +target_link_libraries(test_spsc GTest::GTest GTest::Main pthread) + +# MPSC tests +add_executable(test_mpsc tests/test_mpsc.cpp) +target_link_libraries(test_mpsc GTest::GTest GTest::Main pthread) + +# MPMC tests +add_executable(test_mpmc tests/test_mpmc.cpp) +target_link_libraries(test_mpmc GTest::GTest GTest::Main pthread) + +# Register tests +add_test(NAME SPSC COMMAND test_spsc) +add_test(NAME MPSC COMMAND test_mpsc) +add_test(NAME MPMC COMMAND test_mpmc) \ No newline at end of file diff --git a/include/FAST_lockfree_spsc_unbounded/block.hpp b/include/FAST_lockfree_spsc_unbounded/block.hpp index e69de29..6c14a69 100644 --- a/include/FAST_lockfree_spsc_unbounded/block.hpp +++ b/include/FAST_lockfree_spsc_unbounded/block.hpp @@ -0,0 +1,61 @@ +#ifndef FAST_LOCKFREE_SPSC_UNBOUNDED_BLOCK +#define FAST_LOCKFREE_SPSC_UNBOUNDED_BLOCK + +#include "../utils.hpp" +#include "slots.hpp" +#include + +namespace tsfqueue::impl{ + template + class Block_FAST{ + private: + using Slot_FAST = tsfqueue::FAST::Slot_FAST; + std::size_t capacity; + std::size_t mask; + char* raw; + char* data; + std::unique_ptr slots; + template + void inner_enqueue(U&& data){ + + } + public: + Block_FAST(std::size_t cap){ + // Initialize by bringing it to the closest power of 2 + // And then make mask + slots(static_cast(cap)); + } + ~Block_FAST(){ + // Make destructor + } + Block_FAST(const Block_FAST& other) = delete; + Block_FAST& operator=(const Block_FAST& other) = delete; + Block_FAST(Block_FAST&& other){ + // Move constructor + } + template + bool try_enqueue(U&& data){ + // Try to check if can push + if(slots->try_get()){ + inner_enqueue(std::forward(data)); + return true; + } + return false; + } + template + void wait_enqueue(U&& data){ + slots->wait_and_get(); + inner_enqueue(std::forward(data)); + } + template + bool wait_enqueue_timed(U&& data, std::int64_t time_usecs){ + if(slots->timed_get(time_usecs)){ + inner_enqueue(std::forward(data)); + return true; + } + return false; + } + }; +} + +#endif \ No newline at end of file diff --git a/include/FAST_lockfree_spsc_unbounded/slots.hpp b/include/FAST_lockfree_spsc_unbounded/slots.hpp new file mode 100644 index 0000000..fb7ed72 --- /dev/null +++ b/include/FAST_lockfree_spsc_unbounded/slots.hpp @@ -0,0 +1,106 @@ +#ifndef FAST_LOCKFREE_SPSC_UNBOUNDED_SLOT +#define FAST_LOCKFREE_SPSC_UNBOUNDED_SLOT + +#include "../utils.hpp" +#include + +namespace tsfqueue::FAST{ +#if defined(__MACH__) +#include + class Semaphore_FAST{ + private: + semaphore_t sema; + Semaphore_FAST(const Semaphore& other) = delete; + Semaphore_FAST(Semaphore&& other) = delete; + public: + Semaphore_FAST(int count = 0){ + assert(count >= 0); + kern_return_t ret = semaphore_create(mach_task_self(), &sema, SYNC_POLICY_FIFO, count); + assert(ret == KERN_SUCCESS); + } + ~Semaphore_FAST(){ + semaphore_destroy(mach_task_self(), sema); + } + bool try_get(){ + return timed_get(0); + } + bool timed_get(std::uint64_t time_usecs){ + mach_timespec_t time; + time.tv_sec = static_cast(time_usecs / 1000000); + time.tv_nsec = static_cast((time_usecs % 1000000) * 1000); + kern_return_t ret = semaphore_timedwait(sema, time); + return ret == KERN_SUCCESS; + } + void wait_and_get(){ + semaphore_wait(sema); + } + void signal(int times = 1){ + while(times--) + while(semaphore_signal(sema) != KERN_SUCCESS); + } + }; +#endif + + + class Slot_FAST{ + private: + std::atomic counter; + Semaphore_FAST sema; + static constexpr int SPIN_COUNT = 1024; // Change this for benchmakrs & set to best possible + inline bool hot_path(){ + for (int i = 0; i < SPIN_COUNT; i++){ + if(counter.load(std::memory_order_acquire) > 0){ + counter.fetch_sub(1); + return true; + } + } + return false; + } + bool get_with_sleep(std::int64_t time_usecs = -1){ + if(hot_path()) return true; + counter.fetch_sub(1); + if(time_usecs < 0){ + sema.wait_and_get() + return true; + } + if(sema.timed_get(static_cast(time_usecs))){ + return true; + } + + // Restore the semaphore + // Signal happened just after timeout expired + // Thus we add a while loop which keeps checking until all boundary conditions + // are gone. Super clever. + while(true){ + int old = counter.fetch_add(1); + if(old < 0) return false; // Restored successfully + old = counter.fetch_sub(1); + if(old > 0 && sema.try_get()){ + return true; + } + } + } + public: + Slot_FAST(int count) : counter(count), sema(0) {} + bool try_get(){ + // Ok since SPSC so counter cannot be decremented between load and fetch_sub + if(counter.load(std::memory_order_acquire) > 0){ + counter.fetch_sub(1); + return true; + } + return false; + } + bool timed_get(std::int64_t time_usecs){ + return get_with_sleep(time_usecs); + } + void wait_and_get(){ + return get_with_sleep(); + } + void signal(int times = 1){ + int old = counter.fetch_add(1); + if(old < 0) sema.signal(); + } + } +}; + +#endif \ No newline at end of file diff --git a/include/blocking_mpmc_unbounded/defs.hpp b/include/blocking_mpmc_unbounded/defs.hpp index cc6ec05..9eafb11 100644 --- a/include/blocking_mpmc_unbounded/defs.hpp +++ b/include/blocking_mpmc_unbounded/defs.hpp @@ -2,13 +2,13 @@ #define BLOCKING_MPMC_UNBOUNDED_DEFS #include "../utils.hpp" +#include #include #include #include -#include #include -namespace tsfqueue::__impl { +namespace tsfqueue::impl { template class blocking_mpmc_unbounded { // For the implementation, we start with a stub node and both head and tail // are initialized to it. When we push, we make a new stub node, move the data @@ -19,7 +19,7 @@ template class blocking_mpmc_unbounded { // by returning the data stored in head node and replacing head to its next // node. We handle the empty queue gracefully as per the pop type. private: - using node = tsfqueue::__utils::Node; + using node = tsfqueue::utils::Node; // Add private members : std::mutex head_mutex; @@ -27,7 +27,8 @@ template class blocking_mpmc_unbounded { std::mutex tail_mutex; node *tail; size_t size_q; // newly added -> to maintain size of the queue. - std::mutex size_mutex; // newly added -> lock for accessing or modifying size_q. + std::mutex + size_mutex; // newly added -> lock for accessing or modifying size_q. std::condition_variable cond; // Description of private members : @@ -57,17 +58,19 @@ template class blocking_mpmc_unbounded { std::unique_ptr wait_and_get(); std::unique_ptr try_get(); - std::unique_ptr wait_for_and_get(std::chrono::milliseconds); // Added New + std::unique_ptr + wait_for_and_get(std::chrono::milliseconds); // Added New - // node *get_tail() : Helper function to get normal pointer to tail at a - removed as not used - // particular instant std::unique_ptr wait_and_get() : Helper function to - // blocking wait on unique_ptr of head after popping std::unique_ptr try_get() - // : Helper function to try to get unique_ptr of head after popping + // node *get_tail() : Helper function to get normal pointer to tail at a - + // removed as not used particular instant std::unique_ptr wait_and_get() : + // Helper function to blocking wait on unique_ptr of head after popping + // std::unique_ptr try_get() : Helper function to try to get unique_ptr of + // head after popping public: // Public member functions : // Add relevant constructors and destructors -> Add these here only - blocking_mpmc_unbounded(){ + blocking_mpmc_unbounded() { static_assert(!std::is_reference_v, "Queue cannot store reference types."); @@ -84,26 +87,27 @@ template class blocking_mpmc_unbounded { std::is_destructible_v, "Unable to destroy the queue, as the given type is not destructable."); - while(head) - { + while (head) { head = std::move(head->next); } } - - // Removed Copy constrcutor, because we can't copy this queue, as it has pointers to memory locations. - blocking_mpmc_unbounded(const blocking_mpmc_unbounded& other) = delete; - blocking_mpmc_unbounded& operator=(const blocking_mpmc_unbounded& other) = delete; - + + // Removed Copy constrcutor, because we can't copy this queue, as it has + // pointers to memory locations. + blocking_mpmc_unbounded(const blocking_mpmc_unbounded &other) = delete; + blocking_mpmc_unbounded & + operator=(const blocking_mpmc_unbounded &other) = delete; + // Removed Move constrcutor, because mutexes are not movable. - blocking_mpmc_unbounded(blocking_mpmc_unbounded&& other) = delete; - blocking_mpmc_unbounded& operator=(blocking_mpmc_unbounded&& other) = delete; + blocking_mpmc_unbounded(blocking_mpmc_unbounded &&other) = delete; + blocking_mpmc_unbounded &operator=(blocking_mpmc_unbounded &&other) = delete; // 1. void push(value) : Pushes the value inside the queue, copies the value void push(T); // 2. void wait_and_pop(value ref) : Blocking wait on queue, returns value in // the reference passed as parameter - void wait_and_pop(T&); + void wait_and_pop(T &); // 3. std::shared_ptr wait_and_pop(void) : Blocking wait on queue, returns // value as a shared ptr allocated inside the call @@ -111,7 +115,7 @@ template class blocking_mpmc_unbounded { // 4. bool try_pop(value ref) : Returns true and gives the value in reference // passed, false otherwise - bool try_pop(T&); + bool try_pop(T &); // 5. std::shared_ptr try_pop() : Returns a shared ptr with data, returns // nullptr if failed @@ -124,8 +128,7 @@ template class blocking_mpmc_unbounded { // 8. Add emplace_back using perfect forwarding and variadic templates (you // can use this in push then) - template - void emplace_back(Args&&... args); + template void emplace_back(Args &&...args); // 9. Add size() function size_t size(); @@ -140,14 +143,13 @@ template class blocking_mpmc_unbounded { std::shared_ptr unsafe_peek(); // wait_for_get() added to private section. - bool wait_for_and_pop(T&, std::chrono::milliseconds); + bool wait_for_and_pop(T &, std::chrono::milliseconds); std::shared_ptr wait_for_and_pop(std::chrono::milliseconds); // wait_for_and_get() added to private section. void clear(); // clears all the elements in the queue. - }; -} // namespace tsfqueue::__impl +} // namespace tsfqueue::impl #endif diff --git a/include/blocking_mpmc_unbounded/impl.hpp b/include/blocking_mpmc_unbounded/impl.hpp index 55d8d3f..69156ff 100644 --- a/include/blocking_mpmc_unbounded/impl.hpp +++ b/include/blocking_mpmc_unbounded/impl.hpp @@ -3,15 +3,10 @@ #include "defs.hpp" -template -using queue = tsfqueue::__impl::blocking_mpmc_unbounded; - -template -using node = tsfqueue::__utils::Node; - -template void queue::push(T value) { +namespace tsfqueue::impl { +template void blocking_mpmc_unbounded::push(T value) { - static_assert(std::is_copy_constructible_ || + static_assert(std::is_copy_constructible_v || std::is_move_constructible_v, "T must be copyable or movable to be pushed into the queue."); @@ -38,63 +33,60 @@ template void queue::push(T value) { std::lock_guard guard_size_mutex(size_mutex); size_q++; } // added this scope because if we notify a thread before unlocking - // size_mutex, in - // the wait_and_pop() function we are checking empty() which requires size_mutex. + // size_mutex, in + // the wait_and_pop() function we are checking empty() which requires + // size_mutex. - // Notify any thread (if any) waiting in "wait_and_pop" to wake up and pop. - cond.notify_one(); + // Notify any thread (if any) waiting in "wait_and_pop" to wake up and pop. + cond.notify_one(); - return; + return; } - template -std::unique_ptr::node> queue::wait_and_get() { - //Locking the head mutex - std::unique_lock head_lock(head_mutex); - - //Waiting for the queue to not be empty - cond.wait(head_lock, [this]{ - return !empty(); - }); - - //Extracting the head node and updating it - std::unique_ptr old_head = std::move(head); - head = std::move(old_head->next); - - //Updating the queue size - { - std::lock_guard size_lock(size_mutex); - size_q--; - } - - return std::move(old_head); +std::unique_ptr::node> blocking_mpmc_unbounded::wait_and_get() { + // Locking the head mutex + std::unique_lock head_lock(head_mutex); + + // Waiting for the queue to not be empty + cond.wait(head_lock, [this] { return !empty(); }); + + // Extracting the head node and updating it + std::unique_ptr old_head = std::move(head); + head = std::move(old_head->next); + + // Updating the queue size + { + std::lock_guard size_lock(size_mutex); + size_q--; + } + + return std::move(old_head); } -template -std::unique_ptr::node> queue::try_get() { - std::lock_guard guard_head_mutex(head_mutex); - if (size() > 0){ - std::unique_ptr removing_node = std::move(head); - head = std::move(removing_node->next); +template +std::unique_ptr::node> blocking_mpmc_unbounded::try_get() { + std::lock_guard guard_head_mutex(head_mutex); + if (size() > 0) { + std::unique_ptr removing_node = std::move(head); + head = std::move(removing_node->next); - std::lock_guard guard_size_mutex(size_mutex); - size_q--; + std::lock_guard guard_size_mutex(size_mutex); + size_q--; - return std::move(removing_node); - } - - return nullptr; + return std::move(removing_node); + } + + return nullptr; } -template -size_t queue::size(){ - std::lock_guard lock_size(size_mutex); +template size_t blocking_mpmc_unbounded::size() { + std::lock_guard lock_size(size_mutex); - return size_q; + return size_q; } -template void queue::wait_and_pop(T &value) { +template void blocking_mpmc_unbounded::wait_and_pop(T &value) { static_assert(std::is_copy_assignable_v || std::is_move_assignable_v, "T must be copy-assignable or move-assignable to be popped " @@ -108,17 +100,17 @@ template void queue::wait_and_pop(T &value) { // pointer issue with peek() } -template std::shared_ptr queue::wait_and_pop() { +template +std::shared_ptr blocking_mpmc_unbounded::wait_and_pop() { - //Obtain the popped_node with the help of wait_and_get() - std::unique_ptr popped_node = wait_and_get(); + // Obtain the popped_node with the help of wait_and_get() + std::unique_ptr popped_node = wait_and_get(); - //Return the shared pointer of the data - return popped_node->data; + // Return the shared pointer of the data + return popped_node->data; } -template -bool queue::try_pop(T &value) { +template bool blocking_mpmc_unbounded::try_pop(T &value) { static_assert(std::is_copy_assignable_v || std::is_move_assignable_v, "T must be copy-assignable or move-assignable to be popped " @@ -134,48 +126,50 @@ bool queue::try_pop(T &value) { } } -template -std::shared_ptr queue::try_pop() { - std::unique_ptr removed_node = try_get(); - if (removed_node == nullptr){ - return nullptr; - }else{ - return removed_node->data; - } +template std::shared_ptr blocking_mpmc_unbounded::try_pop() { + std::unique_ptr removed_node = try_get(); + if (removed_node == nullptr) { + return nullptr; + } else { + return removed_node->data; + } } -template bool queue::empty() { - std::lock_guard lock_size(size_mutex); - - return (size_q == 0); +template bool blocking_mpmc_unbounded::empty() { + std::lock_guard lock_size(size_mutex); + + return (size_q == 0); } template template -void queue::emplace_back(Args&&... args){ - // Create a new tail node. - std::unique_ptr new_tail_unique_ptr = std::make_unique(); +void blocking_mpmc_unbounded::emplace_back(Args &&...args) { + // Create a new tail node. + std::unique_ptr new_tail_unique_ptr = std::make_unique(); - // Emplace the data directly at the memory address of shared_ptr. (Perfect forwarding) - std::shared_ptr shared_ptr_for_value = std::make_shared(std::forward(args)...); + // Emplace the data directly at the memory address of shared_ptr. (Perfect + // forwarding) + std::shared_ptr shared_ptr_for_value = + std::make_shared(std::forward(args)...); - // Get exclusive axcess [Do it aftere non-critical tasks] - std::lock_guard guard_tail_mutex(tail_mutex); + // Get exclusive axcess [Do it aftere non-critical tasks] + std::lock_guard guard_tail_mutex(tail_mutex); - tail->data = std::move(shared_ptr_for_value); - tail->next = std::move(new_tail_unique_ptr); + tail->data = std::move(shared_ptr_for_value); + tail->next = std::move(new_tail_unique_ptr); - // change the tail. - tail = tail->next.get(); + // change the tail. + tail = tail->next.get(); - // Increment size [Doing this at the end so that consumer thread does not interfer with this operation.] - std::lock_guard guard_size_mutex(size_mutex); - size_q++; + // Increment size [Doing this at the end so that consumer thread does not + // interfer with this operation.] + std::lock_guard guard_size_mutex(size_mutex); + size_q++; - return; + return; } -template bool queue::unsafe_peek(T &value) { +template bool blocking_mpmc_unbounded::unsafe_peek(T &value) { static_assert(std::is_copy_assignable_v || std::is_move_assignable_v, "T must be copy-assignable or move-assignable to be peeked " @@ -200,7 +194,8 @@ template bool queue::unsafe_peek(T &value) { return 0; } -template std::shared_ptr queue::unsafe_peek() { +template +std::shared_ptr blocking_mpmc_unbounded::unsafe_peek() { // Get exclusive axcess of head. std::lock_guard guard_head_mutex(head_mutex); @@ -221,78 +216,76 @@ template std::shared_ptr queue::unsafe_peek() { } template -std::unique_ptr::node> queue::wait_for_and_get(std::chrono::milliseconds timeout) -{ - // Using unique_lock to lock and unlock on our will. - std::unique_lock lock_head(head_mutex); - - // Waiting at max until timeout ms of time. - if(!cond.wait_for(lock_head,timeout,[this](){ - bool flag=empty(); +std::unique_ptr::node> blocking_mpmc_unbounded::wait_for_and_get( + std::chrono::milliseconds timeout) { + // Using unique_lock to lock and unlock on our will. + std::unique_lock lock_head(head_mutex); + + // Waiting at max until timeout ms of time. + if (!cond.wait_for(lock_head, timeout, [this]() { + bool flag = empty(); return !flag; - })) - { - return nullptr; - } + })) { + return nullptr; + } - std::unique_ptr new_head = std::move(head->next); - std::unique_ptr return_node = std::move(head); + std::unique_ptr new_head = std::move(head->next); + std::unique_ptr return_node = std::move(head); - head = std::move(new_head); + head = std::move(new_head); - { - std::lock_guard lock_size(size_mutex); - size_q--; - } + { + std::lock_guard lock_size(size_mutex); + size_q--; + } - return std::move(return_node); + return std::move(return_node); } template -std::shared_ptr queue::wait_for_and_pop(std::chrono::milliseconds timeout) -{ - std::unique_ptr::node> return_node = std::move(wait_for_and_get(timeout)); - if (return_node == nullptr) - { - return nullptr; - } - - return return_node->data; +std::shared_ptr blocking_mpmc_unbounded::wait_for_and_pop( + std::chrono::milliseconds timeout) { + std::unique_ptr::node> return_node = + std::move(wait_for_and_get(timeout)); + if (return_node == nullptr) { + return nullptr; + } + + return return_node->data; } template -bool queue::wait_for_and_pop(T &value,std::chrono::milliseconds timeout) -{ +bool blocking_mpmc_unbounded::wait_for_and_pop( + T &value, std::chrono::milliseconds timeout) { static_assert(std::is_copy_assignable_v || std::is_move_assignable_v, "T must be copy-assignable or move-assignable to be popped " "into a reference."); - std::unique_ptr::node> return_node = + std::unique_ptr::node> return_node = std::move(wait_for_and_get(timeout)); if (return_node == nullptr) { return false; } - value = *(return_node->data); + value = *(return_node->data); - return true; + return true; } -template -void queue::clear() -{ - std::lock_guard lock_head(head_mutex); - std::lock_guard lock_tail(tail_mutex); +template void blocking_mpmc_unbounded::clear() { + std::lock_guard lock_head(head_mutex); + std::lock_guard lock_tail(tail_mutex); - head = std::make_unique(); - tail = head.get(); + head = std::make_unique(); + tail = head.get(); - std::lock_guard lock_size(size_mutex); - size_q = 0; + std::lock_guard lock_size(size_mutex); + size_q = 0; - return; + return; } +} // namespace tsfqueue::impl #endif diff --git a/include/lockfree_spsc_bounded/defs.hpp b/include/lockfree_spsc_bounded/defs.hpp index 0ca12e5..e864a4a 100644 --- a/include/lockfree_spsc_bounded/defs.hpp +++ b/include/lockfree_spsc_bounded/defs.hpp @@ -1,12 +1,10 @@ #ifndef LOCKFREE_SPSC_BOUNDED_DEFS #define LOCKFREE_SPSC_BOUNDED_DEFS - -#include "utils.hpp" +#include "../utils.hpp" #include #include #include - -namespace tsfqueue::__impl { +namespace tsfqueue::impl { template class lockfree_spsc_bounded { // For the implementation, we first take the size of the bounded queue from // user inside the templates so that we can do compile time memory allocation. @@ -21,13 +19,13 @@ template class lockfree_spsc_bounded { // among the preferred endpoints as per use case. private: // Add the private members : - // std::atomic head; - // std::atomic tail; - // size_t head_cache; - // size_t tail_cache; - // T arr[]; - // static constexpr size_t capacity; - + alignas(cache_line_size) std::atomic head; + alignas(cache_line_size) size_t tail_cache; + alignas(cache_line_size) std::atomic tail; + alignas(cache_line_size) size_t head_cache; + static constexpr size_t capacity = Capacity + 1; + alignas(cache_line_size) T arr[capacity]; + // aligned the start of the array too // Description of private members : // 1. std::atomic head is the atomic head pointer // 2. std::atomic tail is the atomic tail pointer @@ -38,25 +36,53 @@ template class lockfree_spsc_bounded { // 6. static constexpr size_t capcity to store the capcity for operations in // functions Why static ?? Why constexpr ?? [Reason this] + static_assert(Capacity > 0, "queue capacity must be greater than zero"); + + static_assert(Capacity < std::numeric_limits::max() - 1, + "prevent overflow"); + + static_assert(std::is_default_constructible::value, + "type T must be default constructible for array allocation"); + + static_assert(std::is_move_assignable::value || + std::is_copy_assignable::value, + "type T must be either move-assignable or copy-assignable"); + + static_assert(std::is_destructible::value, "type T must be destructible"); + + static_assert(std::atomic::is_always_lock_free, "must be lock-free"); + + static_assert(alignof(std::atomic) <= cache_line_size, + "cache line size is inefficient"); + public: // Public Member functions : // Add appropriate constructors and destructors -> Add here only + lockfree_spsc_bounded() : head(0), tail(0), head_cache(0), tail_cache(0) {} + ~lockfree_spsc_bounded() = default; // 1. void wait_and_push(value) : Busy wait until element is pushed + void wait_and_push(T); // 2. bool try_push(value) : Try to push if not full else leave (returns false // if could not push else true) + bool try_push(T); // 3. void wait_and_pop(value ref) : Busy wait until we have atmost 1 elt and + void wait_and_pop(T &); // then pop it and store in reference // 4. bool try_pop(value ref) : Try to pop and return false if failed bool + bool try_pop(T &); // 5. empty(void) : Checks if the queue is empty and return bool + bool empty() const; // 6. bool peek(value ref) : Peek the top of the queue. + bool peek(T &); + template bool emplace_back(Args &&...args); + size_t size() const; // Will work only in SPSC/MPSC why ?? [Reason this] - // 7. Add static asserts // 8. Add emplace_back using perfect forwarding and variadic templates (you // can use this in push then) // 9. Add size() function // 10. Any more suggestions ?? + // can make the functions peek , empty and size const // 11. Why no shared_ptr ?? [Reason this] }; -} // namespace tsfqueue::__impl - -#endif \ No newline at end of file +} // namespace tsfqueue::impl +#endif diff --git a/include/lockfree_spsc_bounded/impl.hpp b/include/lockfree_spsc_bounded/impl.hpp index 9a0b36f..aa72edc 100644 --- a/include/lockfree_spsc_bounded/impl.hpp +++ b/include/lockfree_spsc_bounded/impl.hpp @@ -2,31 +2,117 @@ #define LOCKFREE_SPSC_BOUNDED_IMPL_CT #include "defs.hpp" +#include -template -using queue = tsfqueue::__impl::lockfree_spsc_bounded; +namespace tsfqueue::impl +{ + template + void lockfree_spsc_bounded::wait_and_push(T value) + { + size_t cur_tail = tail.load(std::memory_order_acquire); + size_t next_tail = (cur_tail + 1) % capacity; + // size_t curr_head = head.load(std::memory_order_acquire); + while (next_tail == head_cache) + { + head_cache = head.load(std::memory_order_acquire); // busy wait + } -template -void queue::wait_and_push(T value) {} + arr[cur_tail] = std::move(value); + // tail_cache = next_tail; + tail.store(next_tail, std::memory_order_release); + } -template -bool queue::try_push(T value) {} + template + bool lockfree_spsc_bounded::try_push(T value) + { + return emplace_back(std::move(value)); + } -template -bool queue::try_pop(T &value) {} + template + bool lockfree_spsc_bounded::try_pop(T &value) + { + // cur_tail = tail.load(std::memory_order_acquire); + size_t cur_head = head.load(std::memory_order_acquire); + if (tail_cache == cur_head) + { + tail_cache = tail.load(std::memory_order_acquire); + if (tail_cache == cur_head) + return false; + } -template -void queue::wait_and_pop(T &value) {} + value = std::move(arr[cur_head]); + // head_cache = (cur_head + 1) % capacity; + head.store((cur_head + 1) % capacity, std::memory_order_release); + return true; + } -template -bool queue::peek(T &value) {} + template + void lockfree_spsc_bounded::wait_and_pop(T &value) + { + size_t cur_head = head.load(std::memory_order_acquire); + while (tail_cache == cur_head) + { + tail_cache = tail.load(std::memory_order_acquire); // busy wait + } -template bool queue::empty() {} + value = std::move(arr[cur_head]); + // head_cache = (cur_head + 1) % capacity; + head.store((cur_head + 1) % capacity, std::memory_order_release); + } -#endif + template + bool lockfree_spsc_bounded::peek(T &value) + { + size_t cur_head = head.load(std::memory_order_acquire); + if (cur_head == tail_cache) + { + tail_cache = tail.load(std::memory_order_acquire); + if (cur_head == tail_cache) + { + return false; + } + } + value = arr[cur_head]; + return true; + } -// 1. Add static asserts -// 2. Add emplace_back using perfect forwarding and variadic templates (you -// can use this in push then) -// 3. Add size() function -// 4. Any more suggestions ?? \ No newline at end of file + template + template + bool lockfree_spsc_bounded::emplace_back(Args &&...args) + { + size_t cur_tail = tail.load(std::memory_order_acquire); + size_t next_tail = (cur_tail + 1) % capacity; + if (next_tail == head_cache) + { + head_cache = head.load(std::memory_order_acquire); + if (next_tail == head_cache) + { + return false; + } + } + arr[cur_tail] = T(std::forward(args)...); + // tail_cache = next_tail; + tail.store(next_tail, std::memory_order_release); + return true; + } + + template + bool lockfree_spsc_bounded::empty() const + { + return head.load(std::memory_order_relaxed) == + tail.load(std::memory_order_relaxed); + // since queue is very frequently modified + } + + template + size_t lockfree_spsc_bounded::size() const + { + return (tail.load(std::memory_order_relaxed) - + head.load(std::memory_order_relaxed) + + capacity) % + capacity; + // again, since size is very frequently changing. + } +} // namespace tsfqueue::impl + +#endif \ No newline at end of file diff --git a/include/lockfree_spsc_unbounded/defs.hpp b/include/lockfree_spsc_unbounded/defs.hpp index 5968a33..047fb87 100644 --- a/include/lockfree_spsc_unbounded/defs.hpp +++ b/include/lockfree_spsc_unbounded/defs.hpp @@ -3,57 +3,140 @@ #include "utils.hpp" #include +#include #include +#include #include -namespace tsfqueue::__impl { +namespace tsfqueue::impl { template class lockfree_spsc_unbounded { // Works exactly same as the blocking_mpmc_unbounded queue (see this once) // with tail pointer pointing to stub node and your head pointer updates as // per the pushes. See the Lockless_Node in utils to understand the working. + + //-------------------------------------------------------------------------- // Note that the next pointers are atomic there. Why ?? [Reason this] + // + // if head and tail point to same stub node ,producer is trying to modify next + // ptr,consumer is trying to read + // next ptr(to check if queue is empty)->race condition + //-------------------------------------------------------------------------- + + //------------------------------------------------------------------------------ // Also the head and tail members are cache-aligned. Why ?? [Reason this] (ask // me for details) + // + // cpu update on common cache line,if producer makes some changes,it + // invalidates other cores(consumers) cache,so this has to keep on updating + // which is time consuming + // cache aligning make the pointers sit on different cache lines,so now we can + // spam modifications + //-------------------------------------------------------------------------------- // [Copy of blocking_mpmc_unbounded] // For the implementation, we start with a stub node and both head and tail - // are initialized to it. When we push, we make a new stub node, move the data - // into the current tail and then change the tail to the new stub. We have two - // methods : wait_and_pop() which waits on the queue and returns element & - // try_pop() which returns an element if queue is not empty otherwise returns - // some neutral element OR a false boolean whichever is applicable. Pop works - // by returning the data stored in head node and replacing head to its next - // node. We handle the empty queue gracefully as per the pop type. + // are initialized to it. + + // When we push, we make a new stub node, move the data + // into the current tail and then change the tail to the new stub. + + // We have two methods : wait_and_pop() which waits on the queue and returns + // element & + // try_pop() which returns an element if queue is not empty otherwise returns + // some neutral element OR a false boolean whichever is applicable. Pop works + // by returning the data stored in head node and replacing head to its next + // node. We handle the empty queue gracefully as per the pop type. + private: - using node = tsfqueue::__utils::Lockless_Node; + using node = tsfqueue::utils::Lockless_Node; // Add the private members : // 1. node* head; // 2. node* tail; - // Description of priavte members : + alignas(cache_line_size) node *head; + alignas(cache_line_size) node *tail; + + // Description of private members : // 1. node* head -> Pointer to the head node // 2. node* tail -> Pointer to tail node // 3. Cache align 1-2 + // capacity + alignas(cache_line_size) std::atomic capacity{0}; + public: // Public member functions : + + // static asserts + static_assert(std::is_default_constructible_v, + "T must be default constructible"); + static_assert(std::is_nothrow_destructible_v, + "T must be nothrow destructible"); + // Add relevant constructors and destructors -> Add these here only + + lockfree_spsc_unbounded() { + + head = new node(); + head->next.store(nullptr, std::memory_order_relaxed); + + tail = head; + } + + ~lockfree_spsc_unbounded() { + + while (head != nullptr) { + node *current = head; + head = head->next.load(std::memory_order_relaxed); + delete current; + } + } + + // Delete Copy and Move constructor + // Copy constructor not required as its spsc,could also cause double free + // crash move constructor can break the queue if thread is running and we move + // the queue somewhere else + + lockfree_spsc_unbounded(const lockfree_spsc_unbounded &) = delete; + lockfree_spsc_unbounded &operator=(const lockfree_spsc_unbounded &) = delete; + + lockfree_spsc_unbounded(lockfree_spsc_unbounded &&) = delete; + lockfree_spsc_unbounded &operator=(lockfree_spsc_unbounded &&) = delete; + // 1. void push(value) : Pushes the value inside the queue, copies the value + void push(T); + // 2. void wait_and_pop(value ref) : Blocking wait on queue, returns value in // the reference passed as parameter + void wait_and_pop(T &); + // 3. bool try_pop(value ref) : Returns true and // gives the value in reference passed, false otherwise + bool try_pop(T &); + // 4. bool empty() : Returns // whether the queue is empty or not at that instant - // 5. bool peek(value ref) : Returns the front/top element of queue in ref (false if empty queue) + bool empty(); + + // 5. bool peek(value ref) : Returns the front/top element of queue in ref + // (false if empty queue) + bool unsafe_peek(T &); + // 6. Add static asserts + // 7. Add emplace_back using perfect forwarding and variadic templates (you // can use this in push then) + template void emplace_back(Args &&...args); + // 8. Add size() function + size_t size(); + // 9. Any more suggestions ?? + // 10. Why no shared_ptr ?? [Reason this] + // }; -} // namespace tsfqueue::__impl +} // namespace tsfqueue::impl #endif \ No newline at end of file diff --git a/include/lockfree_spsc_unbounded/impl.hpp b/include/lockfree_spsc_unbounded/impl.hpp index 8d737a9..f69b144 100644 --- a/include/lockfree_spsc_unbounded/impl.hpp +++ b/include/lockfree_spsc_unbounded/impl.hpp @@ -3,23 +3,105 @@ #include "defs.hpp" -template -using queue = tsfqueue::__impl::lockfree_spsc_unbounded; +namespace tsfqueue::impl { +template void lockfree_spsc_unbounded::push(T value) { + + static_assert(std::is_move_constructible_v, + "T must be move constructible"); + + emplace_back(std::move(value)); +} + +template bool lockfree_spsc_unbounded::try_pop(T &value) { + + static_assert(std::is_nothrow_destructible_v, + "T must be nothrow destructible"); + static_assert(std::is_move_assignable_v, "T must be move-assignable"); + + node *new_head = head->next.load(std::memory_order_acquire); + + if (new_head == nullptr) { + return false; + } + + node *old_head = head; + value = std::move(head->data); + head = new_head; + + delete old_head; + + capacity.fetch_sub(1, std::memory_order_relaxed); + + return true; +} + +template void lockfree_spsc_unbounded::wait_and_pop(T &value) { + + static_assert(std::is_nothrow_destructible_v, + "T must be nothrow destructible"); + static_assert(std::is_move_assignable_v, "T must be move-assignable"); -template void queue::push(T value) {} + node *new_head; + while ((new_head = head->next.load(std::memory_order_acquire)) == nullptr) { + std::this_thread::yield(); // low latency-high cpu usage + } -template bool queue::try_pop(T &value) {} + node *old_head = head; + value = std::move(head->data); + head = new_head; -template void queue::wait_and_pop(T &value) {} + delete old_head; -template bool queue::peek(T &value) {} + capacity.fetch_sub(1, std::memory_order_relaxed); +} -template bool queue::empty(void) {} +template bool lockfree_spsc_unbounded::unsafe_peek(T &value) { + + static_assert(std::is_copy_assignable_v, "T must be copy-assignable"); + + if (empty()) { + return false; + } + + value = head->data; + return true; +} + +template bool lockfree_spsc_unbounded::empty(void) { + return (head->next.load(std::memory_order_acquire) == nullptr); +} + +template +template +void lockfree_spsc_unbounded::emplace_back(Args &&...args) { + static_assert(std::is_constructible_v, + "T must be constructible with Args&&..."); + static_assert(std::is_default_constructible_v, + "T must be default constructible"); + static_assert(std::is_move_assignable_v, "T must be move-assignable"); + + node *stub = new node(); + stub->next.store(nullptr, std::memory_order_relaxed); + + tail->data = T(std::forward(args)...); + capacity.fetch_add(1, std::memory_order_relaxed); + tail->next.store(stub, std::memory_order_release); + + tail = stub; +} + +template size_t lockfree_spsc_unbounded::size() { + return capacity.load(std::memory_order_relaxed); +} +} // namespace tsfqueue::impl #endif // 1. Add static asserts + // 2. Add emplace_back using perfect forwarding and variadic templates (you // can use this in push then) + // 3. Add size() function + // 4. Any more suggestions ?? \ No newline at end of file diff --git a/include/tsfqueue.hpp b/include/tsfqueue.hpp index f90f2da..8f1f172 100644 --- a/include/tsfqueue.hpp +++ b/include/tsfqueue.hpp @@ -12,4 +12,14 @@ #include #endif +namespace tsfqueue +{ +template +using BlockingMPMCUnbounded = impl::blocking_mpmc_unbounded; +template +using SPSCBounded = impl::lockfree_spsc_bounded; +template +using SPSCUnbounded = impl::lockfree_spsc_unbounded; +} // namespace tsfqueue + #endif diff --git a/include/utils.hpp b/include/utils.hpp index fddfdc6..c016ba2 100644 --- a/include/utils.hpp +++ b/include/utils.hpp @@ -1,7 +1,19 @@ +#ifndef UTILS +#define UTILS + #include #include -namespace tsfqueue::__utils { +namespace tsfqueue::impl { +#ifdef __cpp_lib_hardware_interference_size +inline constexpr size_t cache_line_size = + std::hardware_destructive_interference_size; +#else +inline constexpr size_t cache_line_size = 64; // fallback +#endif +} // namespace tsfqueue::impl + +namespace tsfqueue::utils { template struct Node { std::shared_ptr data; std::unique_ptr> next; @@ -10,9 +22,6 @@ template struct Lockless_Node { T data; std::atomic next; }; -} // namespace tsfqueue::__utils +} // namespace tsfqueue::utils -namespace tsfq::__impl { -static constexpr size_t cache_line_size = - std::hardware_destructive_interference_size; -} \ No newline at end of file +#endif \ No newline at end of file diff --git a/tests/test_mpmc.cpp b/tests/test_mpmc.cpp new file mode 100644 index 0000000..06e3422 --- /dev/null +++ b/tests/test_mpmc.cpp @@ -0,0 +1,216 @@ +#include +#include +#include +#include +#include +#include +#include "tsfqueue.hpp" +#include + + +// Basic sanity +TEST(MPMCQueue, BasicPushPop_Unbounded) { + tsfqueue::BlockingMPMCUnbounded q; + + EXPECT_TRUE(q.empty()); + + q.push(1); + q.push(2); + + int x; + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 1); + + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 2); + + EXPECT_TRUE(q.empty()); +} + +TEST(MPMCQueue, WaitFor_isTimeExact) { + tsfqueue::BlockingMPMCUnbounded q; + + EXPECT_TRUE(q.empty()); + + q.push(1); + q.push(2); + + int x; + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 1); + + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 2); + + EXPECT_TRUE(q.empty()); + + int wtime = 200; + int error = 10; + // auto func = [&](){ + // std::this_thread::sleep_for(std::chrono::seconds(10)); + // q.push(12); + // q.push(31); + // }; + // std::thread t(func); + auto start = std::chrono::high_resolution_clock::now(); + int res = q.wait_for_and_pop(x, std::chrono::milliseconds(wtime)); + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + EXPECT_NEAR((int)duration, (int)wtime, error); +} + +TEST(MPMCQueue, WaitForTester) { + tsfqueue::BlockingMPMCUnbounded q; + + EXPECT_TRUE(q.empty()); + + q.push(1); + q.push(2); + + int x; + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 1); + + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 2); + + EXPECT_TRUE(q.empty()); + + int wtime = 4000; + int pushTime = 2000; + int error = 50; + auto func = [&](){ + std::this_thread::sleep_for(std::chrono::milliseconds(pushTime)); + q.push(12); + }; + std::thread t(func); + auto start = std::chrono::high_resolution_clock::now(); + EXPECT_TRUE(q.wait_for_and_pop(x, std::chrono::milliseconds(wtime))); + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(end - start).count(); + EXPECT_NEAR((int)duration, (int)pushTime, error); + + EXPECT_EQ(x, 12); + + EXPECT_TRUE(q.empty()); + t.join(); +} + +TEST(MPMCQueue, DataRaceStressTest) { + tsfqueue::BlockingMPMCUnbounded q; + const int num_threads = 8; + const int ops_per_thread = 1000; + std::atomic total_popped{0}; + std::vector producers; + std::vector consumers; + + // Launch Producers + for (int i = 0; i < num_threads; ++i) { + producers.emplace_back([&q, ops_per_thread]() { + for (int j = 0; j < ops_per_thread; ++j) { + q.push(j); + } + }); + } + + // Launch Consumers + for (int i = 0; i < num_threads; ++i) { + consumers.emplace_back([&q, ops_per_thread, &total_popped]() { + for (int j = 0; j < ops_per_thread; ++j) { + int val; + // Using wait_for to ensure we don't block forever if a push is missed + if (q.wait_for_and_pop(val, std::chrono::milliseconds(100))) { + total_popped++; + } + } + }); + } + + for (auto& t : producers) t.join(); + for (auto& t : consumers) t.join(); + + // Verify all items were accounted for + EXPECT_EQ(total_popped.load(), num_threads * ops_per_thread); + EXPECT_TRUE(q.empty()); +} + +// Testing Emplace Back working and Static Assert working, i.e it does not create a copy of the object. +struct MockObject { + static int copies; + static int moves; + int x; + MockObject(int val) : x(val) {} + MockObject(const MockObject& other) { copies++; } + MockObject(MockObject&& other) noexcept { moves++; } + MockObject& operator=(const MockObject& other) { + x = other.x; + copies++; + return *this; + } + + MockObject& operator=(MockObject&& other) noexcept { + x = other.x; + moves++; + return *this; + } +}; +int MockObject::copies = 0; +int MockObject::moves = 0; + +TEST(MPMCQueue, EmplaceBackEfficiency_and_StaticAsserts) { + tsfqueue::BlockingMPMCUnbounded q; + MockObject::copies = 0; + MockObject::moves = 0; + // Emplace should construct the object directly in the internal storage + q.emplace_back(42); + + EXPECT_EQ(MockObject::copies, 0); + // Depending on implementation, moves should ideally be 0 or 1 + EXPECT_LE(MockObject::moves, 1); + + MockObject out(0); + + EXPECT_TRUE(q.try_pop(out)); + EXPECT_EQ(out.x, 42); +} + +// --------------------------------------------------------- +// Helper: A type that is NEITHER copyable NOR movable +// --------------------------------------------------------- +struct LockedType { + LockedType() = default; + LockedType(const LockedType&) = delete; + LockedType(LockedType&&) = delete; + LockedType& operator=(const LockedType&) = delete; + LockedType& operator=(LockedType&&) = delete; +}; + +// --------------------------------------------------------- +// Helper: A type that is ONLY movable +// --------------------------------------------------------- +struct MoveOnly { + MoveOnly() = default; + MoveOnly(const MoveOnly&) = delete; + MoveOnly(MoveOnly&&) = default; + MoveOnly& operator=(const MoveOnly&) = delete; + MoveOnly& operator=(MoveOnly&&) = default; +}; + +// --------------------------------------------------------- +// The Test Case +// --------------------------------------------------------- +TEST(MPMCQueue, StaticRequirementsValidation) { + // 1. Test Copy/Move Constructibility (Required for Push/Emplace) + bool is_pushable = std::is_copy_constructible_v || std::is_move_constructible_v; + EXPECT_TRUE(is_pushable) << "Queue must support copy or move for push operations."; + + // 2. Test Assignability (Required for try_pop(T& value)) + // This mirrors your failing static_assert: + // static_assert(std::is_copy_assignable_v || std::is_move_assignable_v) + + bool move_only_assignable = std::is_copy_assignable_v || std::is_move_assignable_v; + EXPECT_TRUE(move_only_assignable) << "Move-only types should be allowed if they are move-assignable."; + + bool locked_type_assignable = std::is_copy_assignable_v || std::is_move_assignable_v; + EXPECT_FALSE(locked_type_assignable) << "LockedType should fail the assignability requirement."; +} diff --git a/tests/test_mpsc.cpp b/tests/test_mpsc.cpp new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_spsc.cpp b/tests/test_spsc.cpp new file mode 100644 index 0000000..c46cd4c --- /dev/null +++ b/tests/test_spsc.cpp @@ -0,0 +1,44 @@ +#include +#include +#include +#include + +#include "tsfqueue.hpp" + +// Basic sanity +TEST(SPSCQueue, BasicPushPop_Bounded) { + tsfqueue::SPSCBounded q; + + EXPECT_TRUE(q.empty()); + + EXPECT_TRUE(q.try_push(1)); + EXPECT_TRUE(q.try_push(2)); + + int x; + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 1); + + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 2); + + EXPECT_TRUE(q.empty()); +} + +// Basic sanity +TEST(SPSCQueue, BasicPushPop_Unbounded) { + tsfqueue::SPSCUnbounded q; + + EXPECT_TRUE(q.empty()); + + q.push(1); + q.push(2); + + int x; + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 1); + + EXPECT_TRUE(q.try_pop(x)); + EXPECT_EQ(x, 2); + + EXPECT_TRUE(q.empty()); +} \ No newline at end of file