-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor.c
More file actions
454 lines (383 loc) · 13.9 KB
/
executor.c
File metadata and controls
454 lines (383 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
/*
* executor.c — Worker Thread Pool for PTSS
*
* Workers are the execution engines. Each worker thread:
* 1. Sleeps on a condition variable until a task is assigned
* 2. Executes the task function in a controlled environment
* 3. Monitors for timeout / deadline violations
* 4. Reports completion or failure back to scheduler state
* 5. Returns to idle state
*
* Design: Workers never touch the ready queue. The scheduler thread assigns
* tasks to workers. This separation keeps the scheduling logic centralized
* and avoids lock contention between workers.
*
* Error handling: Task crashes (SIGSEGV, etc.) are caught via signal handlers
* set up per-thread. A crashed task is marked FAILED; the worker survives.
*/
#include "protocol.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <errno.h>
#include <setjmp.h>
#include <sys/resource.h>
/* ----------------------------- Thread-Local ------------------------------ */
/*
* Per-thread jump buffer for crash recovery.
* When a task triggers SIGSEGV/SIGFPE, we longjmp back to the worker loop
* instead of killing the entire process.
*/
static __thread sigjmp_buf worker_jmpbuf;
static __thread volatile sig_atomic_t worker_jmpbuf_set = 0;
/* ----------------------------- Signal Handling --------------------------- */
/*
* Task crash handler — installed per-worker-thread.
* Only catches faults that occur during task execution (jmpbuf_set == 1).
*/
static void worker_crash_handler(int sig)
{
if (worker_jmpbuf_set) {
worker_jmpbuf_set = 0;
siglongjmp(worker_jmpbuf, sig);
}
/* If not in a task, re-raise to get default behavior */
signal(sig, SIG_DFL);
raise(sig);
}
static void install_crash_handlers(void)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = worker_crash_handler;
sa.sa_flags = 0;
sigemptyset(&sa.sa_mask);
sigaction(SIGSEGV, &sa, NULL);
sigaction(SIGFPE, &sa, NULL);
sigaction(SIGBUS, &sa, NULL);
sigaction(SIGABRT, &sa, NULL);
}
/* ----------------------------- Resource Monitoring ----------------------- */
/*
* Sample current resource usage for the calling thread/process.
* Best-effort: not all platforms give per-thread CPU accurately.
*/
static void sample_resources(task_resources_t *res)
{
struct rusage ru;
if (getrusage(RUSAGE_THREAD, &ru) == 0) {
/* Rough CPU: user + sys time as a percentage of wall time */
res->cpu_percent = 0; /* Approximation — see note below */
res->memory_kb = (int)(ru.ru_maxrss); /* Linux: in KB */
} else {
memset(res, 0, sizeof(*res));
}
res->child_processes = 0;
}
/*
* Check whether a task's resource limits are exceeded.
* Returns 1 if within limits, 0 if violated.
*/
static int check_resource_limits(task_t *task, task_resources_t *res)
{
if (task->max_memory_kb > 0 && res->memory_kb > task->max_memory_kb)
return 0;
if (task->max_cpu_percent > 0 && res->cpu_percent > task->max_cpu_percent)
return 0;
return 1;
}
/* ----------------------------- Deadline Checking ------------------------- */
/*
* Evaluate how close a task is to its deadline.
* Uses the timeout_seconds as the total allowed window.
*/
static deadline_status_t check_deadline(task_t *t)
{
struct timespec now;
timespec_now(&now);
if (timespec_cmp(&now, &t->deadline) > 0)
return DEADLINE_EXCEEDED;
double remaining_ms = timespec_diff_ms(&t->deadline, &now);
double allowed_ms = (double)t->timeout_seconds * 1000.0;
if (allowed_ms <= 0)
return DEADLINE_OK; /* No timeout set */
double percent_remaining = (remaining_ms * 100.0) / allowed_ms;
if (percent_remaining < 5.0) return DEADLINE_CRITICAL;
if (percent_remaining < 30.0) return DEADLINE_SOON;
return DEADLINE_OK;
}
/* ----------------------------- Task Execution ---------------------------- */
/*
* Execute a single task with full monitoring.
* This is the core of the worker — everything happens here.
*/
static void execute_task(worker_t *w, task_t *task)
{
struct timespec start_ts, end_ts;
int exit_code = -1;
int crashed = 0;
const char *fail_reason = NULL;
/* Record start */
timespec_now(&start_ts);
pthread_mutex_lock(&task->lock);
task->status = TASK_STATUS_RUNNING;
task->started_at = start_ts;
task->assigned_worker = w->id;
pthread_mutex_unlock(&task->lock);
pthread_mutex_lock(&g_scheduler.running_tasks_lock);
g_scheduler.running_task_count++;
pthread_mutex_unlock(&g_scheduler.running_tasks_lock);
ptss_log(LOG_LEVEL_INFO, "Worker %d: executing task %u '%s' (priority=%d, timeout=%ds)",
w->id, task->task_id, task->task_name, task->priority, task->timeout_seconds);
/* Set up crash recovery */
int crash_sig = sigsetjmp(worker_jmpbuf, 1);
if (crash_sig != 0) {
/* We got here via longjmp from a signal handler */
crashed = 1;
fail_reason = (crash_sig == SIGSEGV) ? "SIGSEGV" :
(crash_sig == SIGFPE) ? "SIGFPE" :
(crash_sig == SIGBUS) ? "SIGBUS" :
(crash_sig == SIGABRT) ? "SIGABRT" : "SIGNAL";
ptss_log(LOG_LEVEL_ERROR, "Worker %d: task %u crashed with %s",
w->id, task->task_id, fail_reason);
goto task_done;
}
worker_jmpbuf_set = 1;
/* Execute the task function */
if (task->task_function) {
exit_code = task->task_function(task->task_argument);
} else {
exit_code = 0; /* No-op task */
}
worker_jmpbuf_set = 0;
task_done:
timespec_now(&end_ts);
double duration_ms = timespec_diff_ms(&end_ts, &start_ts);
/* Check deadline after execution */
deadline_status_t ds = check_deadline(task);
/* Update task state */
pthread_mutex_lock(&task->lock);
task->last_completion_time = end_ts;
task->execution_count++;
task->last_exit_code = exit_code;
task->assigned_worker = -1;
if (crashed) {
task->status = TASK_STATUS_FAILED;
task->failed_count++;
snprintf(task->error_message, PTSS_ERROR_MSG_LEN,
"Crashed: %s", fail_reason);
} else if (ds == DEADLINE_EXCEEDED) {
task->status = TASK_STATUS_FAILED;
task->failed_count++;
snprintf(task->error_message, PTSS_ERROR_MSG_LEN,
"Deadline exceeded");
} else if (exit_code != 0) {
task->status = TASK_STATUS_FAILED;
task->failed_count++;
snprintf(task->error_message, PTSS_ERROR_MSG_LEN,
"Exited with code %d", exit_code);
} else {
task->status = TASK_STATUS_COMPLETED;
}
pthread_mutex_unlock(&task->lock);
/* Update global counters */
pthread_mutex_lock(&g_scheduler.running_tasks_lock);
g_scheduler.running_task_count--;
pthread_mutex_unlock(&g_scheduler.running_tasks_lock);
pthread_mutex_lock(&g_scheduler.state_lock);
g_scheduler.total_executed++;
g_scheduler.total_execution_time_ms += duration_ms;
if (task->status == TASK_STATUS_FAILED)
g_scheduler.total_failed++;
pthread_mutex_unlock(&g_scheduler.state_lock);
/* Log execution */
ptss_log_execution(task->task_id, (int)duration_ms, exit_code,
task->status == TASK_STATUS_COMPLETED ? "COMPLETED" :
task->error_message);
ptss_log(LOG_LEVEL_INFO, "Worker %d: task %u '%s' %s in %.1f ms (exit=%d)",
w->id, task->task_id, task->task_name,
task->status == TASK_STATUS_COMPLETED ? "completed" : "failed",
duration_ms, exit_code);
/* Suppress unused warning */
(void)sample_resources;
(void)check_resource_limits;
}
/* ----------------------------- Worker Thread Entry ----------------------- */
/*
* Main loop for each worker thread.
* Workers sleep on their personal condition variable until a task is assigned.
* This avoids thundering-herd problems — only the assigned worker wakes up.
*/
static void *worker_entry(void *arg)
{
worker_t *w = (worker_t *)arg;
install_crash_handlers();
ptss_log(LOG_LEVEL_DEBUG, "Worker %d: started", w->id);
while (1) {
pthread_mutex_lock(&w->lock);
/* Wait for a task to be assigned */
while (w->state == WORKER_IDLE && w->current_task == NULL) {
/* Check if we should exit */
if (w->state == WORKER_EXITING) {
pthread_mutex_unlock(&w->lock);
goto worker_exit;
}
pthread_cond_wait(&w->wake, &w->lock);
}
/* Check exit condition */
if (w->state == WORKER_EXITING) {
pthread_mutex_unlock(&w->lock);
break;
}
task_t *task = w->current_task;
pthread_mutex_unlock(&w->lock);
if (task) {
execute_task(w, task);
/* Return to idle state */
pthread_mutex_lock(&w->lock);
w->current_task = NULL;
w->state = WORKER_IDLE;
pthread_mutex_unlock(&w->lock);
}
}
worker_exit:
ptss_log(LOG_LEVEL_DEBUG, "Worker %d: exiting", w->id);
return NULL;
}
/* ----------------------------- Public API -------------------------------- */
int executor_init(int num_workers)
{
if (num_workers <= 0 || num_workers > PTSS_MAX_WORKERS)
return PTSS_ERR_INVALID_ARG;
g_scheduler.workers = calloc((size_t)num_workers, sizeof(worker_t));
if (!g_scheduler.workers)
return PTSS_ERR_MEMORY;
g_scheduler.worker_count = num_workers;
if (pthread_mutex_init(&g_scheduler.running_tasks_lock, NULL) != 0) {
free(g_scheduler.workers);
return PTSS_ERR_THREAD;
}
for (int i = 0; i < num_workers; i++) {
worker_t *w = &g_scheduler.workers[i];
w->id = i;
w->state = WORKER_IDLE;
w->current_task = NULL;
if (pthread_mutex_init(&w->lock, NULL) != 0 ||
pthread_cond_init(&w->wake, NULL) != 0) {
/* Cleanup already-initialized workers */
for (int j = 0; j < i; j++) {
pthread_mutex_destroy(&g_scheduler.workers[j].lock);
pthread_cond_destroy(&g_scheduler.workers[j].wake);
}
free(g_scheduler.workers);
return PTSS_ERR_THREAD;
}
if (pthread_create(&w->thread, NULL, worker_entry, w) != 0) {
ptss_log(LOG_LEVEL_ERROR, "Failed to create worker thread %d", i);
/* Mark already-created threads for exit */
for (int j = 0; j < i; j++) {
pthread_mutex_lock(&g_scheduler.workers[j].lock);
g_scheduler.workers[j].state = WORKER_EXITING;
pthread_cond_signal(&g_scheduler.workers[j].wake);
pthread_mutex_unlock(&g_scheduler.workers[j].lock);
pthread_join(g_scheduler.workers[j].thread, NULL);
pthread_mutex_destroy(&g_scheduler.workers[j].lock);
pthread_cond_destroy(&g_scheduler.workers[j].wake);
}
free(g_scheduler.workers);
return PTSS_ERR_THREAD;
}
}
ptss_log(LOG_LEVEL_INFO, "Executor initialized with %d workers", num_workers);
return PTSS_OK;
}
void executor_destroy(void)
{
if (!g_scheduler.workers)
return;
/* Signal all workers to exit */
for (int i = 0; i < g_scheduler.worker_count; i++) {
worker_t *w = &g_scheduler.workers[i];
pthread_mutex_lock(&w->lock);
w->state = WORKER_EXITING;
pthread_cond_signal(&w->wake);
pthread_mutex_unlock(&w->lock);
}
/* Wait for all workers to finish */
for (int i = 0; i < g_scheduler.worker_count; i++) {
pthread_join(g_scheduler.workers[i].thread, NULL);
pthread_mutex_destroy(&g_scheduler.workers[i].lock);
pthread_cond_destroy(&g_scheduler.workers[i].wake);
}
pthread_mutex_destroy(&g_scheduler.running_tasks_lock);
free(g_scheduler.workers);
g_scheduler.workers = NULL;
g_scheduler.worker_count = 0;
}
/*
* Assign a task to the first idle worker.
* Returns PTSS_OK on success, PTSS_ERR_FULL if all workers are busy.
*/
int executor_assign_task(task_t *task)
{
if (!task || !g_scheduler.workers)
return PTSS_ERR_INVALID_ARG;
for (int i = 0; i < g_scheduler.worker_count; i++) {
worker_t *w = &g_scheduler.workers[i];
pthread_mutex_lock(&w->lock);
if (w->state == WORKER_IDLE && w->current_task == NULL) {
w->current_task = task;
w->state = WORKER_BUSY;
pthread_cond_signal(&w->wake);
pthread_mutex_unlock(&w->lock);
return PTSS_OK;
}
pthread_mutex_unlock(&w->lock);
}
return PTSS_ERR_FULL; /* All workers busy */
}
/*
* Find the first idle worker. Returns worker index, or -1 if none available.
*/
int executor_get_idle_worker(void)
{
if (!g_scheduler.workers)
return -1;
for (int i = 0; i < g_scheduler.worker_count; i++) {
worker_t *w = &g_scheduler.workers[i];
pthread_mutex_lock(&w->lock);
int idle = (w->state == WORKER_IDLE && w->current_task == NULL);
pthread_mutex_unlock(&w->lock);
if (idle)
return i;
}
return -1;
}
/*
* Request soft shutdown of all running tasks.
* Sets stop_requested flag on each task so it can exit gracefully.
*/
void executor_request_shutdown(void)
{
if (!g_scheduler.workers)
return;
for (int i = 0; i < g_scheduler.worker_count; i++) {
worker_t *w = &g_scheduler.workers[i];
pthread_mutex_lock(&w->lock);
if (w->current_task) {
w->current_task->stop_requested = 1;
}
pthread_mutex_unlock(&w->lock);
}
}
/*
* Count of currently running tasks.
*/
int executor_running_count(void)
{
pthread_mutex_lock(&g_scheduler.running_tasks_lock);
int count = g_scheduler.running_task_count;
pthread_mutex_unlock(&g_scheduler.running_tasks_lock);
return count;
}