Skip to content

Commit 5879726

Browse files
committed
The logic of task/lazy_task has been changed
1 parent f2fdb63 commit 5879726

15 files changed

Lines changed: 568 additions & 480 deletions

File tree

CMakeLists.txt

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,16 +36,14 @@ set(POT_HEADERS
3636

3737
include/${PROJECT_NAME}/utils/time_it.h
3838
include/${PROJECT_NAME}/utils/platform.h
39-
# include/${PROJECT_NAME}/utils/this_thread.h
39+
include/${PROJECT_NAME}/utils/this_thread.h
4040
include/${PROJECT_NAME}/utils/cache_line.h
4141
include/${PROJECT_NAME}/utils/unique_function.h
4242

4343
include/${PROJECT_NAME}/executors/executor.h
4444
include/${PROJECT_NAME}/executors/inline_executor.h
4545
include/${PROJECT_NAME}/executors/thread_executor.h
4646
include/${PROJECT_NAME}/executors/thread_pool_executor.h
47-
include/${PROJECT_NAME}/executors/thread_pool_executor_lfgq.h
48-
include/${PROJECT_NAME}/executors/test_executor.h
4947

5048
include/${PROJECT_NAME}/coroutines/task.h
5149
include/${PROJECT_NAME}/coroutines/async_condition_variable.h
@@ -63,10 +61,8 @@ set(POT_HEADERS
6361
)
6462

6563
set(POT_SOURCES
66-
# src/utils/this_thread.cpp
64+
src/utils/this_thread.cpp
6765

68-
# src/threads/thread.cpp
69-
7066
# src/executors/inline_executor.cpp
7167
# src/executors/thread_executor.cpp
7268
)

include/pot/algorithms/parfor.h

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include <cassert>
66
#include <memory>
77

8+
#include "pot/coroutines/resume_on.h"
89
#include "pot/executors/executor.h"
910

1011
namespace pot::algorithms
@@ -36,49 +37,46 @@ namespace pot::algorithms
3637
template <int64_t static_chunk_size = -1, typename IndexType, typename FuncType = void(IndexType)>
3738
requires std::invocable<FuncType &, IndexType>
3839
pot::coroutines::lazy_task<void>
39-
parfor(pot::executor &executor, IndexType from, IndexType to, FuncType&& func)
40+
parfor(pot::executor &executor, IndexType from, IndexType to, FuncType func)
4041
{
4142
assert(from < to);
4243

4344
const int64_t numIterations = static_cast<int64_t>(to - from);
4445

4546
int64_t chunk_size = static_chunk_size;
4647
if (chunk_size < 0)
47-
chunk_size = std::max<int64_t>(1, numIterations / static_cast<int64_t>(executor.thread_count()));
48+
chunk_size = std::max<int64_t>(1ull, numIterations / static_cast<int64_t>(executor.thread_count()));
4849

4950
const int64_t numChunks = (numIterations + chunk_size - 1) / chunk_size;
5051

5152
std::vector<pot::coroutines::task<void>> tasks;
5253
tasks.reserve(static_cast<size_t>(numChunks));
53-
54+
5455
for (int64_t chunkIndex = 0; chunkIndex < numChunks; ++chunkIndex)
5556
{
5657
const IndexType chunkStart = from + static_cast<IndexType>(chunkIndex * chunk_size);
5758
const IndexType chunkEnd = std::min<IndexType>(chunkStart + static_cast<IndexType>(chunk_size), to);
58-
59-
auto task = executor.run(
60-
[chunkStart, chunkEnd, func = std::forward<FuncType>(func)]() mutable -> pot::coroutines::task<void>
59+
60+
tasks.emplace_back(executor.run([=]() mutable -> pot::coroutines::task<void>
61+
{
62+
for (IndexType i = chunkStart; i < chunkEnd; ++i)
6163
{
62-
for (IndexType i = chunkStart; i < chunkEnd; ++i)
64+
using Ret = std::invoke_result_t<decltype(func) &, IndexType>;
65+
if constexpr (pot::traits::is_task_v<Ret> || pot::traits::is_lazy_task_v<Ret>)
6366
{
64-
using Ret = std::invoke_result_t<decltype(func) &, IndexType>;
65-
if constexpr (pot::traits::is_task_v<Ret> || pot::traits::is_lazy_task_v<Ret>)
66-
{
67-
co_await std::invoke(func, i);
68-
}
69-
else
70-
{
71-
std::invoke(func, i);
72-
}
67+
co_await std::invoke(func, i);
7368
}
74-
co_return;
75-
});
76-
77-
tasks.emplace_back(std::move(task));
69+
else
70+
{
71+
std::invoke(func, i);
72+
}
73+
}
74+
co_return;
75+
}));
7876
}
79-
80-
for (auto it = tasks.begin(); it != tasks.end(); ++it)
81-
std::move(*it).sync_wait();
77+
78+
co_await pot::coroutines::when_all(tasks);
79+
// for (auto && t : tasks) std::move(t).get(); // temp
8280
co_return;
8381
}
84-
}
82+
}

include/pot/coroutines/task.h

Lines changed: 103 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,12 @@ namespace pot::coroutines::detail
2323
{
2424
bool await_ready() const noexcept { return false; }
2525
template <typename PROMISE>
26-
std::coroutine_handle<> await_suspend(std::coroutine_handle<PROMISE> h) const noexcept
26+
std::coroutine_handle<> await_suspend(std::coroutine_handle<PROMISE> handle) const noexcept
2727
{
28-
return h.promise().m_continuation ? h.promise().m_continuation : std::noop_coroutine();
28+
handle.promise().m_ready.store(true, std::memory_order_release);
29+
if (handle.promise().m_continuation)
30+
return handle.promise().m_continuation;
31+
return std::noop_coroutine();
2932
}
3033
void await_resume() const noexcept {}
3134
};
@@ -40,23 +43,17 @@ namespace pot::coroutines::detail
4043
void set_value(U &&value)
4144
{
4245
m_data.template emplace<T>(std::forward<U>(value));
43-
if (m_ready.exchange(true, std::memory_order_release))
44-
throw std::runtime_error("Value already set in promise_type.");
4546
}
4647

4748
void set_value()
4849
requires(std::is_void_v<T>)
4950
{
5051
m_data.template emplace<std::monostate>();
51-
if (m_ready.exchange(true, std::memory_order_release))
52-
throw std::runtime_error("Void value already set in promise_type.");
5352
}
5453

5554
void set_exception(std::exception_ptr exception)
5655
{
5756
m_data.template emplace<std::exception_ptr>(exception);
58-
if (m_ready.exchange(true, std::memory_order_release))
59-
throw std::runtime_error("Exception already set in promise_type.");
6057
}
6158

6259
bool is_ready() const noexcept { return m_ready.load(std::memory_order_acquire); }
@@ -99,9 +96,9 @@ namespace pot::coroutines
9996
* @tparam T The result type (may be `void`).
10097
*
10198
* @return
102-
* - `co_await lazy_task<T>`: Suspends caller until the coroutine completes.
99+
* - `co_await lazy_task<T>`: Suspends caller until the coroutine completes.
103100
* Returns the stored value (or nothing for `void`).
104-
* - `get()`: Runs to completion synchronously and returns the result.
101+
* - `get()`: Runs to completion synchronously and returns the result.
105102
* - `sync_wait()`: Busy-waits until ready and then returns the result.
106103
*/
107104
template <typename T>
@@ -113,7 +110,7 @@ namespace pot::coroutines
113110
using value_type = T;
114111

115112
lazy_task() = default;
116-
explicit lazy_task(handle_type h) noexcept : m_handle(h) {}
113+
explicit lazy_task(handle_type handle) noexcept : m_handle(handle) {}
117114

118115
lazy_task(lazy_task const &) = delete;
119116
lazy_task &operator=(lazy_task const &) = delete;
@@ -131,55 +128,70 @@ namespace pot::coroutines
131128
return *this;
132129
}
133130

134-
explicit operator bool() const noexcept { return m_handle && m_handle.done(); }
131+
explicit operator bool() const noexcept { return m_handle != nullptr; }
135132

136-
bool await_ready() const noexcept
133+
auto operator co_await() && noexcept
137134
{
138-
return !m_handle || m_handle.promise().is_ready();
135+
struct awaiter
136+
{
137+
handle_type handle;
138+
139+
bool await_ready() const noexcept
140+
{
141+
return !handle || handle.promise().is_ready();
142+
}
143+
144+
void await_suspend(std::coroutine_handle<> awaiting) noexcept
145+
{
146+
handle.promise().m_continuation = awaiting;
147+
148+
if (!handle || handle.promise().is_ready())
149+
awaiting.resume();
150+
151+
if (handle && !handle.done())
152+
handle.resume();
153+
}
154+
155+
auto await_resume()
156+
{
157+
if constexpr (std::is_void_v<T>)
158+
{
159+
handle.promise().get();
160+
handle.destroy();
161+
return;
162+
}
163+
else
164+
{
165+
auto result = handle.promise().get();
166+
handle.destroy();
167+
return result;
168+
}
169+
}
170+
};
171+
172+
return awaiter{std::exchange(m_handle, {})};
139173
}
140174

141-
template <typename PROMISE>
142-
void await_suspend(std::coroutine_handle<PROMISE> awaiting)
175+
auto get()
143176
{
144-
m_handle.promise().m_continuation = awaiting;
145-
if (!m_handle.done())
146-
m_handle.resume();
147-
}
177+
auto handle = std::exchange(m_handle, {});
178+
if (handle && !handle.promise().is_ready())
179+
handle.resume();
148180

149-
auto await_resume()
150-
{
151181
if constexpr (std::is_void_v<T>)
152182
{
153-
m_handle.promise().get();
183+
handle.promise().get();
184+
handle.destroy();
154185
return;
155186
}
156187
else
157188
{
158-
return m_handle.promise().get();
189+
auto result = handle.promise().get();
190+
handle.destroy();
191+
return result;
159192
}
160193
}
161194

162-
auto get()
163-
{
164-
if (m_handle && !m_handle.promise().is_ready())
165-
m_handle.resume();
166-
167-
if constexpr (std::is_void_v<T>)
168-
m_handle.promise().get();
169-
else
170-
return m_handle.promise().get();
171-
}
172-
173-
auto sync_wait()
174-
{
175-
m_handle.promise().wait();
176-
177-
if constexpr (std::is_void_v<T>)
178-
m_handle.promise().get();
179-
else
180-
return m_handle.promise().get();
181-
}
182-
183195
~lazy_task()
184196
{
185197
if (m_handle)
@@ -233,51 +245,65 @@ namespace pot::coroutines
233245
return *this;
234246
}
235247

236-
explicit operator bool() const noexcept { return m_handle && m_handle.done(); }
248+
explicit operator bool() const noexcept { return m_handle != nullptr; }
237249

238-
bool await_ready() const noexcept { return !m_handle || m_handle.promise().is_ready(); }
239-
240-
template <typename PROMISE>
241-
void await_suspend(std::coroutine_handle<PROMISE> awaiting)
250+
auto operator co_await() && noexcept
242251
{
243-
m_handle.promise().m_continuation = awaiting;
244-
if (!m_handle.done())
245-
m_handle.resume();
252+
struct awaiter
253+
{
254+
handle_type handle;
255+
256+
bool await_ready() const noexcept
257+
{
258+
return !handle || handle.promise().is_ready();
259+
}
260+
261+
void await_suspend(std::coroutine_handle<> awaiting) noexcept
262+
{
263+
awaiting.resume();
264+
}
265+
266+
auto await_resume()
267+
{
268+
if constexpr (std::is_void_v<T>)
269+
{
270+
handle.promise().get();
271+
handle.destroy();
272+
return;
273+
}
274+
else
275+
{
276+
auto res = handle.promise().get();
277+
handle.destroy();
278+
return res;
279+
}
280+
}
281+
};
282+
283+
return awaiter{std::exchange(m_handle, {})};
246284
}
247285

248-
auto await_resume()
286+
auto get()
249287
{
288+
auto handle = std::exchange(m_handle, {});
289+
if (!handle)
290+
throw std::runtime_error("get() on empty lazy_task");
291+
250292
if constexpr (std::is_void_v<T>)
251293
{
252-
m_handle.promise().get();
294+
handle.promise().get();
295+
handle.destroy();
253296
return;
254297
}
255298
else
256299
{
257-
return m_handle.promise().get();
300+
auto result = handle.promise().get();
301+
handle.destroy();
302+
return result;
258303
}
259304
}
260305

261-
auto get()
262-
{
263-
if (m_handle && !m_handle.promise().is_ready())
264-
m_handle.resume();
265-
266-
if constexpr (std::is_void_v<T>)
267-
m_handle.promise().get();
268-
else
269-
return m_handle.promise().get();
270-
}
271-
272-
auto sync_wait()
273-
{
274-
m_handle.promise().wait();
275-
276-
if constexpr (std::is_void_v<T>)
277-
m_handle.promise().get();
278-
else
279-
return m_handle.promise().get();
280-
}
306+
void sync_wait() {}
281307

282308
~task()
283309
{
@@ -410,4 +436,4 @@ namespace pot::traits
410436
};
411437
template <typename T>
412438
using awaitable_value_t = typename awaitable_value<T>::type;
413-
} // namespace pot
439+
} // namespace pot

0 commit comments

Comments
 (0)