Skip to content

Commit 557e024

Browse files
committed
Added blocking get. Added bench for CPU cores.
1 parent 7c0d0f5 commit 557e024

File tree

6 files changed

+932
-90
lines changed

6 files changed

+932
-90
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ set(POT_HEADERS
5656

5757
include/${PROJECT_NAME}/algorithms/parfor.h
5858
include/${PROJECT_NAME}/algorithms/lfqueue.h
59+
include/${PROJECT_NAME}/algorithms/lfdequeue.h
5960
include/${PROJECT_NAME}/algorithms/reduce.h
6061
include/${PROJECT_NAME}/algorithms/dot.h
6162
include/${PROJECT_NAME}/algorithms/parsections.h

include/pot/algorithms/lfdequeue.h

Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
#pragma once
2+
3+
// temp
4+
5+
#include <atomic>
6+
#include <optional>
7+
8+
namespace pot::algorithms
9+
{
10+
template <typename T>
11+
class lfdequeue
12+
{
13+
public:
14+
struct stealer_token
15+
{
16+
std::atomic<long> id_last_used;
17+
std::atomic<bool> was_idle;
18+
stealer_token *next;
19+
};
20+
21+
lfdequeue();
22+
~lfdequeue();
23+
24+
void push(const T &item);
25+
[[nodiscard]] std::optional<T> pop();
26+
27+
[[nodiscard]] stealer_token *register_stealer();
28+
[[nodiscard]] std::optional<T> pop_back(stealer_token *token);
29+
30+
private:
31+
struct ring_buffer
32+
{
33+
long id;
34+
int log_size;
35+
T *segment;
36+
ring_buffer *next;
37+
38+
ring_buffer(int ls, long i);
39+
~ring_buffer();
40+
41+
long size() const;
42+
T get(long i) const;
43+
void put(long i, T item);
44+
ring_buffer *resize(long b, long t, int delta);
45+
};
46+
47+
static constexpr int log_initial_size = 4;
48+
49+
alignas(64) std::atomic<long> top{0};
50+
alignas(64) std::atomic<long> bottom{0};
51+
52+
ring_buffer *unlinked{nullptr};
53+
std::atomic<stealer_token *> id_list{nullptr};
54+
std::atomic<ring_buffer *> active_buffer;
55+
56+
void reclaim_buffers(ring_buffer *new_buffer);
57+
};
58+
}
59+
60+
template <typename T>
61+
pot::algorithms::lfdequeue<T>::ring_buffer::ring_buffer(int ls, long i)
62+
: id(i), log_size(ls), next(nullptr)
63+
{
64+
segment = new T[1 << log_size];
65+
}
66+
67+
template <typename T>
68+
pot::algorithms::lfdequeue<T>::ring_buffer::~ring_buffer()
69+
{
70+
delete[] segment;
71+
}
72+
73+
template <typename T>
74+
long pot::algorithms::lfdequeue<T>::ring_buffer::size() const
75+
{
76+
return static_cast<long>(1 << log_size);
77+
}
78+
79+
template <typename T>
80+
T pot::algorithms::lfdequeue<T>::ring_buffer::get(long i) const
81+
{
82+
return segment[i % size()];
83+
}
84+
85+
template <typename T>
86+
void pot::algorithms::lfdequeue<T>::ring_buffer::put(long i, T item)
87+
{
88+
segment[i % size()] = item;
89+
}
90+
91+
template <typename T>
92+
typename pot::algorithms::lfdequeue<T>::ring_buffer *pot::algorithms::lfdequeue<T>::ring_buffer::resize(long b, long t, int delta)
93+
{
94+
auto buffer = new ring_buffer(log_size + delta, id + 1);
95+
for (auto i = t; i < b; ++i)
96+
{
97+
buffer->put(i, get(i));
98+
}
99+
next = buffer;
100+
return buffer;
101+
}
102+
103+
template <typename T>
104+
pot::algorithms::lfdequeue<T>::lfdequeue()
105+
: active_buffer(new ring_buffer(log_initial_size, 0))
106+
{
107+
}
108+
109+
template <typename T>
110+
pot::algorithms::lfdequeue<T>::~lfdequeue()
111+
{
112+
auto b = active_buffer.load(std::memory_order_relaxed);
113+
114+
while (unlinked && unlinked != b)
115+
{
116+
auto reclaimed = unlinked;
117+
unlinked = unlinked->next;
118+
delete reclaimed;
119+
}
120+
121+
delete b;
122+
123+
auto head = id_list.load(std::memory_order_relaxed);
124+
while (head)
125+
{
126+
auto reclaimed = head;
127+
head = head->next;
128+
delete reclaimed;
129+
}
130+
}
131+
132+
template <typename T>
133+
void pot::algorithms::lfdequeue<T>::push(const T &item)
134+
{
135+
auto b = bottom.load(std::memory_order_relaxed);
136+
auto t = top.load(std::memory_order_acquire);
137+
auto a = active_buffer.load(std::memory_order_relaxed);
138+
139+
auto current_size = b - t;
140+
if (current_size >= a->size() - 1)
141+
{
142+
unlinked = unlinked ? unlinked : a;
143+
a = a->resize(b, t, 1);
144+
active_buffer.store(a, std::memory_order_release);
145+
}
146+
147+
if (unlinked)
148+
{
149+
reclaim_buffers(a);
150+
}
151+
152+
a->put(b, item);
153+
std::atomic_thread_fence(std::memory_order_release);
154+
bottom.store(b + 1, std::memory_order_relaxed);
155+
}
156+
157+
template <typename T>
158+
std::optional<T> pot::algorithms::lfdequeue<T>::pop()
159+
{
160+
auto b = bottom.load(std::memory_order_relaxed);
161+
auto a = active_buffer.load(std::memory_order_acquire);
162+
163+
bottom.store(b - 1, std::memory_order_relaxed);
164+
std::atomic_thread_fence(std::memory_order_seq_cst);
165+
auto t = top.load(std::memory_order_relaxed);
166+
167+
auto current_size = b - t;
168+
std::optional<T> popped = std::nullopt;
169+
170+
if (current_size <= 0)
171+
{
172+
bottom.store(b, std::memory_order_relaxed);
173+
}
174+
else if (current_size == 1)
175+
{
176+
if (top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
177+
{
178+
popped = a->get(t);
179+
}
180+
bottom.store(b, std::memory_order_relaxed);
181+
}
182+
else
183+
{
184+
popped = a->get(b - 1);
185+
186+
if (current_size <= a->size() / 3 && current_size > (1 << log_initial_size))
187+
{
188+
unlinked = unlinked ? unlinked : a;
189+
a = a->resize(b, t, -1);
190+
active_buffer.store(a, std::memory_order_release);
191+
}
192+
193+
if (unlinked)
194+
{
195+
reclaim_buffers(a);
196+
}
197+
}
198+
199+
return popped;
200+
}
201+
202+
template <typename T>
203+
typename pot::algorithms::lfdequeue<T>::stealer_token *pot::algorithms::lfdequeue<T>::register_stealer()
204+
{
205+
auto token = new stealer_token{{0}, {true}, nullptr};
206+
token->next = id_list.load(std::memory_order_relaxed);
207+
208+
while (!id_list.compare_exchange_weak(token->next, token))
209+
{
210+
token->next = id_list.load(std::memory_order_relaxed);
211+
}
212+
213+
return token;
214+
}
215+
216+
template <typename T>
217+
std::optional<T> pot::algorithms::lfdequeue<T>::pop_back(stealer_token *token)
218+
{
219+
token->was_idle.store(false, std::memory_order_release);
220+
221+
auto t = top.load(std::memory_order_acquire);
222+
std::atomic_thread_fence(std::memory_order_seq_cst);
223+
auto b = bottom.load(std::memory_order_acquire);
224+
225+
long current_size = b - t;
226+
std::optional<T> stolen = std::nullopt;
227+
228+
if (current_size > 0)
229+
{
230+
auto a = active_buffer.load(std::memory_order_consume);
231+
if (top.compare_exchange_strong(t, t + 1, std::memory_order_seq_cst, std::memory_order_relaxed))
232+
{
233+
stolen = a->get(t);
234+
}
235+
}
236+
237+
token->was_idle.store(true, std::memory_order_release);
238+
239+
auto a = active_buffer.load(std::memory_order_consume);
240+
token->id_last_used.store(a->id, std::memory_order_relaxed);
241+
242+
return stolen;
243+
}
244+
245+
template <typename T>
246+
void pot::algorithms::lfdequeue<T>::reclaim_buffers(ring_buffer *new_buffer)
247+
{
248+
auto min_id = new_buffer->id;
249+
auto head = id_list.load(std::memory_order_relaxed);
250+
251+
while (head)
252+
{
253+
auto idle = head->was_idle.load(std::memory_order_acquire);
254+
if (!idle)
255+
{
256+
auto last_used_id = head->id_last_used.load(std::memory_order_relaxed);
257+
min_id = std::min(min_id, last_used_id);
258+
}
259+
head = head->next;
260+
}
261+
262+
while (unlinked && unlinked->id < min_id)
263+
{
264+
auto reclaimed = unlinked;
265+
unlinked = unlinked->next;
266+
delete reclaimed;
267+
}
268+
}

0 commit comments

Comments
 (0)