Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 38 additions & 33 deletions include/lockfree_spsc_bounded/impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,17 @@ namespace tsfqueue::impl
template <typename T, size_t Capacity>
void lockfree_spsc_bounded<T, Capacity>::wait_and_push(T value)
{
size_t next_tail = (tail_cache + 1) % capacity;

size_t cur_tail = tail.load(std::memory_order_acquire);
size_t next_tail = (cur_tail + 1) % capacity;
// size_t curr_head = head.load(std::memory_order_acquire);
while (next_tail == head_cache)
{
head_cache = head.load(std::memory_order_acquire); // busy wait
}

arr[tail_cache] = std::move(value);
tail_cache = next_tail;
tail.store(tail_cache, std::memory_order_release);
arr[cur_tail] = std::move(value);
// tail_cache = next_tail;
tail.store(next_tail, std::memory_order_release);
}

template <typename T, size_t Capacity>
Expand All @@ -30,83 +31,87 @@ namespace tsfqueue::impl
template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::try_pop(T &value)
{
if (tail_cache == head_cache)
// cur_tail = tail.load(std::memory_order_acquire);
size_t cur_head = head.load(std::memory_order_acquire);
if (tail_cache == cur_head)
{
tail_cache = tail.load(std::memory_order_acquire);
if (tail_cache == head_cache)
{
if (tail_cache == cur_head)
return false;
}
}

value = std::move(arr[head_cache]);
head_cache = (head_cache + 1) % capacity;
head.store(head_cache, std::memory_order_release);
value = std::move(arr[cur_head]);
// head_cache = (cur_head + 1) % capacity;
head.store((cur_head + 1) % capacity, std::memory_order_release);
return true;
}

template <typename T, size_t Capacity>
void lockfree_spsc_bounded<T, Capacity>::wait_and_pop(T &value)
{
while (head_cache == tail_cache)
size_t cur_head = head.load(std::memory_order_acquire);
while (tail_cache == cur_head)
{
tail_cache = tail.load(std::memory_order_acquire); // busy wait
}

value = std::move(arr[head_cache]);
head_cache = (head_cache + 1) % capacity;
head.store(head_cache, std::memory_order_release);
value = std::move(arr[cur_head]);
// head_cache = (cur_head + 1) % capacity;
head.store((cur_head + 1) % capacity, std::memory_order_release);
}

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::peek(T &value)
{
if (head_cache == tail_cache)
size_t cur_head = head.load(std::memory_order_acquire);
if (cur_head == tail_cache)
{
tail_cache = tail.load(std::memory_order_acquire);
if(head_cache == tail_cache)
if (cur_head == tail_cache)
{
return false;
}
}
value = arr[head_cache];
value = arr[cur_head];
return true;
}

template <typename T, size_t Capacity>
template <typename... Args>
bool lockfree_spsc_bounded<T, Capacity>::emplace_back(Args &&...args)
{
if ((tail_cache + 1) % capacity == head_cache)
size_t cur_tail = tail.load(std::memory_order_acquire);
size_t next_tail = (cur_tail + 1) % capacity;
if (next_tail == head_cache)
{
head_cache = head.load(std::memory_order_acquire);
if ((tail_cache + 1) % capacity == head_cache)
if (next_tail == head_cache)
{
return false;
}
}

arr[tail_cache] = T(std::forward<Args>(args)...);
tail_cache = (tail_cache + 1) % capacity;
tail.store(tail_cache, std::memory_order_release);
arr[cur_tail] = T(std::forward<Args>(args)...);
// tail_cache = next_tail;
tail.store(next_tail, std::memory_order_release);
return true;
}

template <typename T, size_t Capacity>
bool lockfree_spsc_bounded<T, Capacity>::empty() const
{
return head.load(std::memory_order_relaxed) ==
return head.load(std::memory_order_relaxed) ==
tail.load(std::memory_order_relaxed);
// since queue is very frequently modified
// since queue is very frequently modified
}

template <typename T, size_t Capacity>
size_t lockfree_spsc_bounded<T, Capacity>::size() const
{
return (tail.load(std::memory_order_relaxed) -
head.load(std::memory_order_relaxed) +
capacity) % capacity;
// again, since size is very frequently changing.
return (tail.load(std::memory_order_relaxed) -
head.load(std::memory_order_relaxed) +
capacity) %
capacity;
// again, since size is very frequently changing.
}
} // namespace tsfqueue::impl

Expand Down
Loading