-
Notifications
You must be signed in to change notification settings - Fork 251
Expand file tree
/
Copy pathbasic_tasks.cpp
More file actions
411 lines (335 loc) · 10.5 KB
/
basic_tasks.cpp
File metadata and controls
411 lines (335 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the Apache 2.0 License.
#include "tasks/basic_task.h"
#include "tasks/task_system.h"
#include "tasks/worker.h"
#include <doctest/doctest.h>
#include <iostream>
#include <numeric>
#include <thread>
#define FMT_HEADER_ONLY
#include <fmt/chrono.h>
#include <fmt/format.h>
#include <fmt/ranges.h>
TEST_CASE("JobBoard" * doctest::test_suite("basic_tasks"))
{
  ccf::tasks::JobBoard board;
  ccf::tasks::JobBoard::Summary idle_summary{};
  constexpr std::chrono::milliseconds brief_timeout{10};

  // A fresh board reports an empty summary, and both get_task() and
  // wait_for_task() hand back null
  REQUIRE(board.get_summary() == idle_summary);
  REQUIRE(board.get_task() == nullptr);
  REQUIRE(board.wait_for_task(brief_timeout) == nullptr);

  // Work is wrapped up as a Task. One way to build a Task is to give a lambda
  // to make_basic_task...
  std::atomic<bool> flag_a{false};
  ccf::tasks::Task set_flag_a =
    ccf::tasks::make_basic_task([&flag_a]() { flag_a.store(true); });

  // ...the other is to derive from BaseTask
  struct SetAtomic : public ccf::tasks::BaseTask
  {
    std::atomic<bool>& target;

    SetAtomic(std::atomic<bool>& flag) : target(flag) {}

    void do_task_implementation() override
    {
      target.store(true);
    }

    const std::string& get_name() const override
    {
      static const std::string task_name("SetAtomic");
      return task_name;
    }
  };

  std::atomic<bool> flag_b{false};
  ccf::tasks::Task set_flag_b = std::make_shared<SetAtomic>(flag_b);

  // Nothing has been handed to the board yet, so nothing can have run
  REQUIRE(board.get_summary() == idle_summary);
  REQUIRE_FALSE(flag_a.load());
  REQUIRE_FALSE(flag_b.load());

  // Post both tasks on the board, where a worker could pick them up
  board.add_task(set_flag_a);
  board.add_task(set_flag_b);

  // The board now reports two pending tasks...
  const auto pending_after_add = board.get_summary().pending_tasks;
  REQUIRE(pending_after_add == 2);

  // ...but queueing a task does not execute it
  REQUIRE_FALSE(flag_a.load());
  REQUIRE_FALSE(flag_b.load());

  // At some point a consumer (e.g. a dedicated worker thread) asks for a task
  auto claimed_first = board.get_task();

  // Only one task is removed per request, so one remains pending
  const auto pending_after_first = board.get_summary().pending_tasks;
  REQUIRE(pending_after_first == 1);

  // FIFO ordering is not a critical guarantee, but it is what the board does
  // today, so in this small example the returned task is known
  REQUIRE(claimed_first == set_flag_a);

  // Whoever took the task now owns it, and is the one who must run it
  REQUIRE_FALSE(flag_a.load());
  claimed_first->do_task();
  REQUIRE(flag_a.load());

  // A later request (possibly from the same worker) yields the second task and
  // leaves the board empty again
  auto claimed_second = board.get_task();
  REQUIRE(claimed_second == set_flag_b);
  REQUIRE(board.get_summary() == idle_summary);

  REQUIRE_FALSE(flag_b.load());
  claimed_second->do_task();
  REQUIRE(flag_b.load());
}
TEST_CASE("Cancellation" * doctest::test_suite("basic_tasks"))
{
  ccf::tasks::JobBoard board;

  // Holding on to a task handle is what makes cancellation possible...
  std::atomic<bool> flag{false};
  ccf::tasks::Task set_flag =
    ccf::tasks::make_basic_task([&flag]() { flag.store(true); });

  // ... and scheduling the task does not take that ability away
  board.add_task(set_flag);

  // ... nor does a worker claiming it, as long as do_task has not been called
  auto claimed = board.get_task();
  REQUIRE(claimed != nullptr);
  REQUIRE_FALSE(flag.load());

  set_flag->cancel_task();

  // Running the cancelled task leaves the flag untouched
  claimed->do_task();
  REQUIRE_FALSE(flag.load());
}
TEST_CASE("Scheduling" * doctest::test_suite("basic_tasks"))
{
  ccf::tasks::JobBoard job_board;

  // Tasks can be scheduled from anywhere, including during execution of
  // other tasks

  // One-shot gate: wait() spins (yielding the thread) until notify() has been
  // called by anyone
  struct WaitPoint
  {
    std::atomic<bool> passed{false};

    void wait()
    {
      while (!passed.load())
      {
        std::this_thread::yield();
      }
    }

    void notify()
    {
      passed.store(true);
    }
  };

  // Gates which force the values 0..6 to be appended in order, although they
  // are appended from different threads and tasks. a_started is notified once
  // 0 has been appended; task_N_started is notified once N has been appended.
  WaitPoint a_started;
  WaitPoint task_1_started;
  WaitPoint task_2_started;
  WaitPoint task_3_started;
  WaitPoint task_4_started;
  WaitPoint task_5_started;

  // Set by the final task (the one appending 6); tells the workers and the
  // watchdog to exit
  std::atomic<bool> stop_signal = false;

  // Every append below happens only after the gate for the previous value has
  // been notified
  std::vector<size_t> count_with_me;

  // Appends 0 directly, then schedules a chain of nested tasks which append
  // the even values 2, 4 and 6
  std::thread thread_a([&]() {
    count_with_me.push_back(0);
    a_started.notify();

    job_board.add_task(ccf::tasks::make_basic_task([&]() {
      task_1_started.wait();
      count_with_me.push_back(2);
      task_2_started.notify();

      job_board.add_task(ccf::tasks::make_basic_task([&]() {
        task_3_started.wait();
        count_with_me.push_back(4);
        task_4_started.notify();

        job_board.add_task(ccf::tasks::make_basic_task([&]() {
          task_5_started.wait();
          count_with_me.push_back(6);
          stop_signal.store(true);
        }));
      }));
    }));
  });

  // Once 0 has been appended, schedules a task which appends 1 and then
  // schedules two further tasks. Note that the task appending 5 is queued
  // before the task appending 3.
  std::thread thread_b([&]() {
    a_started.wait();

    job_board.add_task(ccf::tasks::make_basic_task([&]() {
      count_with_me.push_back(1);
      task_1_started.notify();

      job_board.add_task(ccf::tasks::make_basic_task([&]() {
        task_4_started.wait();
        count_with_me.push_back(5);
        task_5_started.notify();
      }));

      job_board.add_task(ccf::tasks::make_basic_task([&]() {
        task_2_started.wait();
        count_with_me.push_back(3);
        task_3_started.notify();
      }));
    }));
  });

  // Worker body: keep asking the board for tasks (with a 100ms timeout per
  // request) and execute whatever is returned, until told to stop
  auto worker_fn = [&]() {
    while (!stop_signal.load())
    {
      auto task = job_board.wait_for_task(std::chrono::milliseconds(100));
      if (task != nullptr)
      {
        task->do_task();
      }
    }
  };

  std::vector<std::thread> workers;
  // Potentially 3 parallel jobs => need at least 3 workers
  for (size_t i = 0; i < 3; ++i)
  {
    workers.emplace_back(worker_fn);
  }

  // Asserts that the whole exchange completes within one second.
  // NOTE(review): this REQUIRE runs on a thread other than the one running
  // the test case; presumably a failure here takes down the process rather
  // than being reported as an ordinary test failure - confirm this is the
  // intended way of breaking a hang.
  std::thread watchdog([&]() {
    using Clock = std::chrono::steady_clock;
    auto start = Clock::now();
    while (!stop_signal.load())
    {
      auto now = Clock::now();
      auto elapsed = now - start;
      REQUIRE(elapsed < std::chrono::seconds(1));
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }
  });

  for (auto& worker : workers)
  {
    worker.join();
  }

  thread_a.join();
  thread_b.join();
  watchdog.join();

  // All threads are finished; the collected values must be exactly 0..6
  decltype(count_with_me) target(7);
  std::iota(target.begin(), target.end(), 0);
  REQUIRE(count_with_me == target);
}
// Two three-level call chains whose function names are looked for in the
// stack traces logged for failing tasks. Each link is marked noinline so that
// the compiler does not fold the chain together by inlining in optimised
// builds.
namespace exception_handling_test
{
  // --- Chain ending in a std::runtime_error ---

  [[gnu::noinline]] void level_3_throws_runtime_error()
  {
    throw std::runtime_error("Test exception");
  }

  [[gnu::noinline]] void level_2_calls_level_3()
  {
    level_3_throws_runtime_error();
  }

  [[gnu::noinline]] void level_1_calls_level_2()
  {
    level_2_calls_level_3();
  }

  // --- Chain ending in a throw of a plain int ---

  [[gnu::noinline]] void level_3_throws_int()
  {
    throw 42;
  }

  [[gnu::noinline]] void level_2_calls_level_3_int()
  {
    level_3_throws_int();
  }

  [[gnu::noinline]] void level_1_calls_level_2_int()
  {
    level_2_calls_level_3_int();
  }

  // Task whose execution enters the std::runtime_error chain
  struct ThrowsException : public ccf::tasks::BaseTask
  {
    void do_task_implementation() override
    {
      level_1_calls_level_2();
    }

    const std::string& get_name() const override
    {
      static const std::string task_name("ThrowsException");
      return task_name;
    }
  };

  // Task whose execution enters the int-throwing chain
  struct ThrowsUnknown : public ccf::tasks::BaseTask
  {
    void do_task_implementation() override
    {
      level_1_calls_level_2_int();
    }

    const std::string& get_name() const override
    {
      static const std::string task_name("ThrowsUnknown");
      return task_name;
    }
  };
}
TEST_CASE("Exception handling" * doctest::test_suite("basic_tasks"))
{
  // Custom logger that captures log messages for assertion. All access to the
  // stored messages goes through the mutex, since write() is reached from the
  // worker thread while the test thread calls contains().
  struct CapturingLogger : public ccf::logger::AbstractLogger
  {
    std::mutex mutex;
    std::vector<std::string> messages;

    // Records the message text of every log line passed in
    void write(const ccf::logger::LogLine& ll) override
    {
      std::lock_guard<std::mutex> lock(mutex);
      messages.push_back(ll.msg);
    }

    // True if any recorded message contains the given substring
    bool contains(const std::string& substring)
    {
      std::lock_guard<std::mutex> lock(mutex);
      for (const auto& m : messages)
      {
        if (m.find(substring) != std::string::npos)
        {
          return true;
        }
      }
      return false;
    }

    // Forgets everything recorded so far
    void clear()
    {
      std::lock_guard<std::mutex> lock(mutex);
      messages.clear();
    }
  };

  // Removes one specific logger from the global logger list when it goes out
  // of scope. Doing this in a destructor means the capturing logger is
  // unregistered however the test body is left, including when a failing
  // REQUIRE ends the test case early, so it cannot linger into later tests.
  struct LoggerRemover
  {
    const ccf::logger::AbstractLogger* target;

    explicit LoggerRemover(const ccf::logger::AbstractLogger* t) : target(t) {}
    LoggerRemover(const LoggerRemover&) = delete;
    LoggerRemover& operator=(const LoggerRemover&) = delete;

    ~LoggerRemover()
    {
      auto& loggers = ccf::logger::config::loggers();
      const auto* victim = target;
      loggers.erase(
        std::remove_if(
          loggers.begin(),
          loggers.end(),
          [victim](const auto& l) { return l.get() == victim; }),
        loggers.end());
    }
  };

  // Register the capturing logger, keeping a raw pointer for later queries
  // (ownership moves into the global logger list)
  auto capturing_logger = std::make_unique<CapturingLogger>();
  auto* logger_ptr = capturing_logger.get();
  ccf::logger::config::loggers().push_back(std::move(capturing_logger));
  LoggerRemover remove_logger_on_exit(logger_ptr);

  // Task that runs successfully after exceptions
  std::atomic<bool> success_task_ran = false;
  ccf::tasks::Task success_task = ccf::tasks::make_basic_task(
    [&success_task_ran]() { success_task_ran.store(true); }, "SuccessTask");

  ccf::tasks::JobBoard job_board;
  std::atomic<bool> stop_signal = false;

  // Queue tasks: two that throw, then one that should still run
  job_board.add_task(
    std::make_shared<exception_handling_test::ThrowsException>());
  job_board.add_task(
    std::make_shared<exception_handling_test::ThrowsUnknown>());
  job_board.add_task(success_task);

  std::thread worker([&]() {
    ccf::tasks::task_worker_loop(
      job_board, stop_signal, /*abort_on_throw=*/false);
  });

  // Wait for the success task to run, polling every 10ms and giving up after
  // roughly 5 seconds' worth of sleeps
  const auto wait_step = std::chrono::milliseconds(10);
  const auto max_wait = std::chrono::seconds(5);
  auto waited = std::chrono::milliseconds(0);
  while (!success_task_ran.load() && waited < max_wait)
  {
    std::this_thread::sleep_for(wait_step);
    waited += wait_step;
  }

  // Stop and join the worker before any REQUIRE, so that no assertion failure
  // can leave the thread unjoined
  stop_signal.store(true);
  worker.join();

  // The worker loop was started with abort_on_throw=false, so it is expected
  // to continue after the two throwing tasks and still run the success task
  REQUIRE(success_task_ran.load());

  // Verify that fatal messages were logged for both exception types
  REQUIRE(logger_ptr->contains(
    "ThrowsException task failed with exception: Test exception"));
  REQUIRE(
    logger_ptr->contains("ThrowsUnknown task failed with unknown exception"));

  // Verify demangled function names appear in the stack traces.
  // NOTE: these are substring matches, so "level_2_calls_level_3" and
  // "level_1_calls_level_2" are also satisfied by the "_int" variants; only
  // "level_3_throws_runtime_error" is unique to the first chain.

  // ThrowsException call chain
  REQUIRE(logger_ptr->contains("level_3_throws_runtime_error"));
  REQUIRE(logger_ptr->contains("level_2_calls_level_3"));
  REQUIRE(logger_ptr->contains("level_1_calls_level_2"));

  // ThrowsUnknown call chain
  REQUIRE(logger_ptr->contains("level_2_calls_level_3_int"));
  REQUIRE(logger_ptr->contains("level_1_calls_level_2_int"));

  // Clean up: the capturing logger is removed by remove_logger_on_exit when
  // this scope ends
}