44#include " defs.hpp"
55#include < utility>
66
7- namespace tsfqueue ::impl {
8- template <typename T, size_t Capacity>
9- void lockfree_spsc_bounded<T, Capacity>::wait_and_push(T value) {
10- size_t next_tail = tail_cache + 1 ;
11- if (next_tail == capacity) {
12- next_tail = 0 ;
13- }
14-
15- static thread_local int spin_threshold = 100 ;
16- const int min_spin = 10 , max_spin = 1000 ;
17- int spin = 0 ;
18- bool spun_success = false ;
19- bool done = false ;
20- while (true ) {
21- if (next_tail == head_cache) {
22- done = true ;
23- head_cache = head.load (std::memory_order_acquire);
24- if (next_tail == head_cache) {
25- if (spin < spin_threshold) {
26- // Busy-wait
27- } else if (spin < spin_threshold + 100 ) {
28- std::this_thread::yield ();
29- } else {
30- head.wait (head_cache, std::memory_order_acquire);
31- }
32- spin++;
33- continue ;
34- }
7+ namespace tsfqueue ::impl
8+ {
9+ template <typename T, size_t Capacity>
10+ void lockfree_spsc_bounded<T, Capacity>::wait_and_push(T value)
11+ {
12+ size_t next_tail = (tail_cache + 1 ) % capacity;
13+
14+ while (next_tail == head_cache)
15+ {
16+ head_cache = head.load (std::memory_order_acquire); // busy wait
3517 }
3618
37- // Refresh head_cache before was_empty calculation to ensure correctness
38- if (!done) {
39- head_cache = head.load (std::memory_order_acquire);
40- }
41- spun_success = (spin < spin_threshold);
42- bool was_empty = (head_cache == tail_cache);
4319 arr[tail_cache] = std::move (value);
4420 tail_cache = next_tail;
4521 tail.store (tail_cache, std::memory_order_release);
46-
47- if (was_empty) {
48- tail.notify_one ();
49- }
50- break ;
5122 }
5223
53- // Adapt spin threshold for next time
54- int delta = std::max (1 , spin / 10 );
55- if (spun_success) {
56- spin_threshold = std::min (spin_threshold + delta, max_spin);
57- } else {
58- spin_threshold = std::max (spin_threshold - delta, min_spin);
24+ template <typename T, size_t Capacity>
25+ bool lockfree_spsc_bounded<T, Capacity>::try_push(T value)
26+ {
27+ return emplace_back (std::move (value));
5928 }
60- }
6129
62- template <typename T, size_t Capacity>
63- bool lockfree_spsc_bounded<T, Capacity>::try_push(T value) {
64- return emplace_back (std::move (value));
65- }
66-
67- template <typename T, size_t Capacity>
68- bool lockfree_spsc_bounded<T, Capacity>::try_pop(T &value) {
69- if (tail_cache == head_cache) {
70- tail_cache = tail.load (std::memory_order_acquire); // refresh cache
71- if (tail_cache == head_cache) // empty
30+ template <typename T, size_t Capacity>
31+ bool lockfree_spsc_bounded<T, Capacity>::try_pop(T &value)
32+ {
33+ if (tail_cache == head_cache)
7234 {
73- return false ;
74- }
75- }
76- bool was_full = (tail_cache + 1 ) % capacity == head_cache;
77- value = arr[head_cache];
78- head_cache = (head_cache + 1 ) % capacity;
79- head.store (head_cache, std::memory_order_release);
80- if (was_full) {
81- head.notify_one ();
82- }
83- return true ;
84- }
85-
86- template <typename T, size_t Capacity>
87- void lockfree_spsc_bounded<T, Capacity>::wait_and_pop(T &value) {
88- static thread_local int spin_threshold = 100 ;
89- const int min_spin = 10 , max_spin = 1000 ;
90- int spin = 0 ;
91- bool spun_success = false ;
92- bool done = false ;
93- while (true ) {
94- if (head_cache == tail_cache) {
95- done = true ;
9635 tail_cache = tail.load (std::memory_order_acquire);
97- if (head_cache == tail_cache) {
98- if (spin < spin_threshold) {
99- // Busy-wait
100- } else if (spin < spin_threshold + 100 ) {
101- std::this_thread::yield ();
102- } else {
103- tail.wait (tail_cache, std::memory_order_acquire);
104- }
105- ++spin;
106- continue ;
36+ if (tail_cache == head_cache)
37+ {
38+ return false ;
10739 }
10840 }
10941
110- // Refresh tail_cache before was_full calculation to ensure correctness
111- if (!done) {
112- tail_cache = tail.load (std::memory_order_acquire);
113- }
114- spun_success = (spin < spin_threshold);
115- size_t next_tail = tail_cache + 1 ;
116- if (next_tail == capacity) {
117- next_tail = 0 ;
118- }
119- bool was_full = (next_tail == head_cache);
120- value = arr[head_cache];
121- head_cache = head_cache + 1 ;
122- if (head_cache == capacity) {
123- head_cache = 0 ;
124- }
42+ value = std::move (arr[head_cache]);
43+ head_cache = (head_cache + 1 ) % capacity;
12544 head.store (head_cache, std::memory_order_release);
126- if (was_full) {
127- head.notify_one ();
128- }
129- break ;
45+ return true ;
13046 }
13147
132- // Adapt spin threshold for next time
133- int delta = std::max (1 , spin / 10 );
134- if (spun_success) {
135- spin_threshold = std::min (spin_threshold + delta, max_spin);
136- } else {
137- spin_threshold = std::max (spin_threshold - delta, min_spin);
138- }
139- }
48+ template <typename T, size_t Capacity>
49+ void lockfree_spsc_bounded<T, Capacity>::wait_and_pop(T &value)
50+ {
51+ while (head_cache == tail_cache)
52+ {
53+ tail_cache = tail.load (std::memory_order_acquire); // busy wait
54+ }
14055
141- template <typename T, size_t Capacity>
142- bool lockfree_spsc_bounded<T, Capacity>::peek(T &value) {
143- if (head_cache == tail_cache) {
144- tail_cache = tail.load (std::memory_order_acquire);
145- if (tail_cache == head_cache) // empty
146- return false ;
56+ value = std::move (arr[head_cache]);
57+ head_cache = (head_cache + 1 ) % capacity;
58+ head.store (head_cache, std::memory_order_release);
14759 }
148- value = arr[head_cache];
149- return true ;
150- }
15160
152- template <typename T, size_t Capacity>
153- bool lockfree_spsc_bounded<T, Capacity>::empty() {
154- return head.load (std::memory_order_acquire) ==
155- tail.load (std::memory_order_acquire);
156- }
61+ template <typename T, size_t Capacity>
62+ bool lockfree_spsc_bounded<T, Capacity>::peek(T &value)
63+ {
64+ if (head_cache == tail_cache)
65+ {
66+ tail_cache = tail.load (std::memory_order_acquire);
67+ if (head_cache == tail_cache)
68+ {
69+ return false ;
70+ }
71+ }
72+ value = arr[head_cache];
73+ return true ;
74+ }
15775
158- template <typename T, size_t Capacity>
159- template <typename ... Args>
160- bool lockfree_spsc_bounded<T, Capacity>::emplace_back(Args &&...args) {
161- if ((tail_cache + 1 ) % capacity == head_cache) {
162- head_cache = head.load (std::memory_order_acquire); // refresh cache
163- if ((tail_cache + 1 ) % capacity == head_cache) // full
76+ template <typename T, size_t Capacity>
77+ template <typename ... Args>
78+ bool lockfree_spsc_bounded<T, Capacity>::emplace_back(Args &&...args)
79+ {
80+ if ((tail_cache + 1 ) % capacity == head_cache)
16481 {
165- return false ;
82+ head_cache = head.load (std::memory_order_acquire);
83+ if ((tail_cache + 1 ) % capacity == head_cache)
84+ {
85+ return false ;
86+ }
16687 }
88+
89+ arr[tail_cache] = T (std::forward<Args>(args)...);
90+ tail_cache = (tail_cache + 1 ) % capacity;
91+ tail.store (tail_cache, std::memory_order_release);
92+ return true ;
16793 }
168- bool was_empty = (head_cache == tail_cache);
169- arr[tail_cache] = T (std::forward<Args>(args)...);
170- tail_cache = (tail_cache + 1 ) % capacity;
171- tail.store (tail_cache, std::memory_order_release);
172- if (was_empty) {
173- tail.notify_one ();
94+
95+ template <typename T, size_t Capacity>
96+ bool lockfree_spsc_bounded<T, Capacity>::empty() const
97+ {
98+ return head.load (std::memory_order_relaxed) ==
99+ tail.load (std::memory_order_relaxed);
100+ // since queue is very frequently modified
101+ }
102+
103+ template <typename T, size_t Capacity>
104+ size_t lockfree_spsc_bounded<T, Capacity>::size() const
105+ {
106+ return (tail.load (std::memory_order_relaxed) -
107+ head.load (std::memory_order_relaxed) +
108+ capacity) % capacity;
109+ // again, since size is very frequently changing.
174110 }
175- return true ;
176- }
177-
178- template <typename T, size_t Capacity>
179- size_t lockfree_spsc_bounded<T, Capacity>::size() {
180- size_t t = tail.load (std::memory_order_acquire);
181- size_t h = head.load (std::memory_order_acquire);
182- return (t - h + capacity) % capacity;
183- }
184111} // namespace tsfqueue::impl
185112
186- #endif
187-
188- // 1. Add static asserts
189- // 2. Add emplace_back using perfect forwarding and variadic templates (you
190- // can use this in push then)
191- // 3. Add size() function
192- // 4. Any more suggestions ??
113+ #endif
0 commit comments