Skip to content

Commit a7f3754

Browse files
committed
async_lock_lf
1 parent 1af2fe8 commit a7f3754

File tree

3 files changed

+158
-52
lines changed

3 files changed

+158
-52
lines changed

include/pot/sync/async_lock.h

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@
22

33
#include <atomic>
44
#include <coroutine>
5-
#include <mutex>
6-
#include <queue>
5+
#include <optional>
6+
#include <thread>
7+
#include <utility>
8+
9+
#include "pot/executors/executor.h"
10+
#include "pot/algorithms/lfqueue.h"
711

812
namespace pot::sync
913
{
14+
class async_lock
15+
{
16+
public:
17+
using queue_type = std::pair<std::coroutine_handle<>, pot::executor*>;
1018

11-
class async_lock
12-
{
13-
public:
1419
class scoped_lock_guard
1520
{
1621
public:
@@ -48,62 +53,59 @@ class async_lock
4853

4954
// Awaiter produced by async_lock::lock(); co_awaiting it yields a
// scoped_lock_guard for the lock once the coroutine continues.
struct lock_awaiter
5055
{
51-
async_lock &m_lock;
52-
53-
bool await_ready() const noexcept
54-
{
55-
bool expected = false;
56-
return m_lock.m_is_locked.compare_exchange_strong(expected, true,
57-
std::memory_order_acquire);
58-
}
56+
// Lock this awaiter is trying to acquire.
async_lock& m_lock;
57+
// Executor on which unlock() schedules the resumption of this waiter;
// when null, unlock() calls handle.resume() directly on its own thread.
pot::executor* m_executor;
5958

60-
bool await_suspend(std::coroutine_handle<> h)
61-
{
62-
std::lock_guard guard(m_lock.m_queue_mutex);
59+
// Never ready: the acquisition attempt always happens in await_suspend().
bool await_ready() const noexcept { return false; }
6360

64-
bool expected = false;
65-
if (m_lock.m_is_locked.compare_exchange_strong(expected, true,
66-
std::memory_order_acquire))
67-
{
68-
return false;
69-
}
61+
// Attempts to take the lock.
// Returns false (coroutine continues without suspending) when m_state was
// 0 before the increment; otherwise enqueues {handle, m_executor} and
// returns true so the coroutine stays suspended until unlock() resumes it.
bool await_suspend(std::coroutine_handle<> handle)
62+
{
63+
// Count this attempt; the previous value tells whether the lock was
// already held (or being waited on) by someone else.
int prev_state = m_lock.m_state.fetch_add(1, std::memory_order_acq_rel);
64+
if (prev_state == 0)
65+
{
66+
return false;
67+
}
7068

71-
m_lock.m_waiters.push(h);
72-
return true;
73-
}
69+
// A failed push() is retried after yielding the thread.
// NOTE(review): when lfqueue::push reports failure is not visible here
// (presumably a full or contended queue) - confirm.
while (!m_lock.m_queue.push({handle, m_executor})) std::this_thread::yield();
70+
return true;
71+
}
7472

75-
scoped_lock_guard await_resume() noexcept { return scoped_lock_guard{m_lock}; }
73+
// Result of the co_await: a guard constructed from the lock.
// NOTE(review): the guard presumably calls unlock() on destruction; its
// definition is not shown here - confirm.
scoped_lock_guard await_resume() noexcept { return scoped_lock_guard(m_lock); }
7674
};
7775

78-
[[nodiscard]] lock_awaiter lock() { return lock_awaiter{*this}; }
76+
// Returns an awaiter bound to this lock with no executor (nullptr): a waiter
// queued through it is resumed directly by the thread that calls unlock().
[[nodiscard]] lock_awaiter lock()
77+
{
78+
return lock_awaiter{*this, nullptr};
79+
}
80+
81+
// Returns an awaiter bound to this lock that remembers `executor`; if the
// coroutine has to wait, unlock() resumes it through executor->run_detached().
// Passing nullptr behaves like the parameterless overload.
[[nodiscard]] lock_awaiter lock(pot::executor* executor)
82+
{
83+
return lock_awaiter{*this, executor};
84+
}
85+
86+
private:
87+
std::atomic<int> m_state{0};
88+
pot::algorithms::lfqueue<queue_type> m_queue;
7989

80-
private:
8190
// Releases the lock. If other coroutines have registered for it, ownership is
// handed to the next queued one, which is then resumed.
void unlock()
8291
{
83-
std::coroutine_handle<> next_handle = nullptr;
92+
// A previous value of 1 means only the releasing owner was counted, so
// there is nobody to wake.
int prev_state = m_state.fetch_sub(1, std::memory_order_acq_rel);
93+
if (prev_state == 1) return;
8494

95+
std::optional<queue_type> next_task;
96+
97+
// A waiter increments m_state before it pushes itself onto m_queue (see
// lock_awaiter::await_suspend), so the queue may be momentarily empty even
// though a waiter was counted; yield until its entry shows up.
while (!(next_task = m_queue.pop())) std::this_thread::yield();
98+
99+
auto [handle, executor] = *next_task;
100+
101+
if (executor)
85102
{
86-
std::lock_guard guard(m_queue_mutex);
87-
if (m_waiters.empty())
88-
{
89-
m_is_locked.store(false, std::memory_order_release);
90-
}
91-
else
92-
{
93-
next_handle = m_waiters.front();
94-
m_waiters.pop();
95-
}
103+
// The waiter asked for a specific executor: schedule the resumption there.
executor->run_detached([h = handle]() mutable { h.resume(); });
96104
}
97-
98-
if (next_handle)
105+
else
99106
{
100-
next_handle.resume();
107+
// No executor was supplied: resume the waiter inline on the calling thread.
handle.resume();
101108
}
102109
}
103-
104-
std::atomic<bool> m_is_locked{false};
105-
std::mutex m_queue_mutex;
106-
std::queue<std::coroutine_handle<>> m_waiters;
107-
};
108-
109-
} // namespace pot::sync
110+
};
111+
}

test/CMakeLists.txt

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,24 @@ include(${CMAKE_SOURCE_DIR}/cmake/Catch2.cmake)
44
# Create test executable
55
set(TEST_SOURCES
66
# test_async_lock.cpp
7+
test_async_lock_lf.cpp
78
# test_task.cpp
89
# test_parfor.cpp
910
# test_executor.cpp
1011
# test_LU.cpp
11-
test_fill.cpp
12+
# test_fill.cpp
1213
# test_thread.cpp
1314
# test_resume_on.cpp
15+
# test_exp.cpp
16+
# test_mandelbrot.cpp
17+
# test_disb.cpp
1418
)
1519

16-
find_package(Eigen3 3.3 REQUIRED NO_MODULE)
20+
find_package(Eigen3 REQUIRED)
1721

1822
add_executable(tests ${TEST_SOURCES})
1923
target_link_libraries(tests PRIVATE pot Catch2::Catch2WithMain)
20-
target_link_libraries(tests PRIVATE pot Eigen3::Eigen)
24+
target_link_libraries(tests PRIVATE Eigen3::Eigen)
2125
target_link_libraries(tests PRIVATE pot concurrencpp::concurrencpp)
2226
target_link_libraries(tests PRIVATE pot fmt::fmt)
2327
target_link_libraries(tests PRIVATE pot OpenMP::OpenMP_CXX)

test/test_async_lock_lf.cpp

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
#define CATCH_CONFIG_MAIN
2+
#include <catch2/catch_test_macros.hpp>
3+
4+
#include <Eigen/Sparse>
5+
#include <fmt/core.h>
6+
#include <memory>
7+
#include <vector>
8+
#include <chrono>
9+
10+
11+
#include "pot/algorithms/parfor.h"
12+
#include "pot/executors/exp/thread_pool_executor_exp.h"
13+
#include "pot/sync/async_lock.h"
14+
#include "pot/sync/async_lock_m.h"
15+
#include "pot/utils/time_it.h"
16+
17+
18+
/// Synthetic per-row workload that produces the three band coefficients
/// (left, centre, right) for matrix row `row_idx`.
///
/// None of the terms depend on the pass counter; they are re-evaluated and
/// accumulated on each of the 100 passes (presumably to simulate a fixed
/// compute cost per row - confirm before "optimising" it away).
///
/// @param row_idx Row index; multiplied by 10 before entering the kernel.
/// @return Tuple {left, centre, right} holding the accumulated sums.
std::tuple<double, double, double> calculation(const int64_t row_idx)
{
    constexpr int kPasses = 100;
    const auto kernel = [](const double x) { return std::exp(std::sin(std::cos(x))); };

    double left = 0.0;
    double centre = 0.0;
    double right = 0.0;

    int pass = 0;
    while (pass < kPasses)
    {
        const double scaled = static_cast<double>(row_idx) * 10.0;
        left += -kernel(scaled) - std::sin(10);
        centre += kernel(scaled);
        right += -kernel(scaled) + std::sin(10);
        ++pass;
    }

    return std::make_tuple(left, centre, right);
}
30+
31+
32+
template <typename Executor, typename LockType>
33+
void buildA_pot(const int64_t n, std::shared_ptr<Executor> executor)
34+
{
35+
std::pmr::synchronized_pool_resource pool(std::pmr::new_delete_resource());
36+
pot::memory::set_memory_resource(&pool);
37+
38+
Eigen::SparseMatrix<double> A(n, n);
39+
A.resize(n, n);
40+
41+
LockType locker;
42+
43+
pot::algorithms::parfor(*executor, (int64_t)0, n,
44+
[&](int64_t i) -> pot::coroutines::task<void>
45+
{
46+
auto [vl, vc, vr] = calculation(i);
47+
auto _ = co_await locker.lock(&*executor);
48+
49+
if (i > 0)
50+
A.coeffRef(i, i - 1) = vl;
51+
A.coeffRef(i, i) = vc;
52+
if (i < n - 1)
53+
A.coeffRef(i, i + 1) = vr;
54+
})
55+
.get();
56+
}
57+
58+
// Benchmark driver: for every (thread count, matrix size) pair, times
// buildA_pot with pot::sync::async_lock_opt and with pot::sync::async_lock_lf
// on a GQ thread-pool executor and prints one table row with both timings.
// It makes no assertions; the output is meant to be read by a human.
TEST_CASE("Async Lock Performance: Std Mutex vs Lock-Free", "[benchmark]")
{
    using gq_executor = pot::executors::thread_pool_executor_gq;
    using seconds_f64 = std::chrono::duration<double>;

    const std::vector<int64_t> thread_counts = {8};
    const std::vector<int64_t> sizes = {10'000, 20'000, 30'000, 40'000, 50'000, 60'000};
    const size_t test_count = 3;

    SECTION("Compare locks on GQ Executor")
    {
        // Table header.
        fmt::print("{:>7} {:>7} | {:>12} {:>12}\n",
                   "Threads", "Size", "Std Mutex", "Lock-Free");
        fmt::print("{:-<45}\n", "");

        for (auto threads : thread_counts)
        {
            // One executor per thread count, shared by both lock variants.
            auto gq_pool = std::make_shared<gq_executor>("GQ", threads);

            for (auto dim : sizes)
            {
                auto elapsed_std = pot::utils::time_it<seconds_f64>(
                    test_count,
                    []() {},
                    buildA_pot<gq_executor, pot::sync::async_lock_opt>,
                    dim, gq_pool);

                auto elapsed_lf = pot::utils::time_it<seconds_f64>(
                    test_count,
                    []() {},
                    buildA_pot<gq_executor, pot::sync::async_lock_lf>,
                    dim, gq_pool);

                fmt::print("{:7} {:7} | {:12.5f} {:12.5f}\n",
                           threads, dim,
                           elapsed_std.count(), elapsed_lf.count());
            }
        }
    }
}

0 commit comments

Comments
 (0)