diff --git a/src/emc/motion/emcmotutil.c b/src/emc/motion/emcmotutil.c index 94c3b020f03..8adabd9bb61 100644 --- a/src/emc/motion/emcmotutil.c +++ b/src/emc/motion/emcmotutil.c @@ -7,7 +7,7 @@ * Author: * License: GPL Version 2 * System: Linux -* +* * Copyright (c) 2004 All rights reserved. ********************************************************************/ @@ -16,17 +16,33 @@ #include "dbuf.h" #include "stashf.h" +/* + * Lockfree multi-producer single-consumer (MPSC) ring buffer. + * + * Producers (real-time threads invoked via the rtapi message handler) may + * race; the consumer (the userspace task) is single-threaded. + * + * Producer: + * 1. CAS-loop on write_reserve to claim seq S; bail if buffer full. + * 2. Write payload into slot S % EMCMOT_ERROR_NUM. + * 3. fetch_add(write_commit, 1) with release ordering. + * Consumer: + * 1. If write_reserve != write_commit, some producer is mid-write; skip + * (return -1) so the RT producer is never blocked. Next consumer call + * will retry. + * 2. Otherwise drain one slot at read_seq, increment read_seq. + */ + int emcmotErrorInit(emcmot_error_t * errlog) { if (errlog == 0) { return -1; } - errlog->head = 0; - errlog->start = 0; - errlog->end = 0; - errlog->num = 0; - errlog->tail = 0; + errlog->write_reserve = 0; + errlog->write_commit = 0; + errlog->read_seq = 0; + atomic_thread_fence(memory_order_release); return 0; } @@ -35,22 +51,38 @@ int emcmotErrorPutfv(emcmot_error_t * errlog, const char *fmt, va_list ap) { struct dbuf errbuf; struct dbuf_iter it; + unsigned long long my_seq; - if (errlog == 0 || errlog->num == EMCMOT_ERROR_NUM) { - /* full */ + if (errlog == 0) { return -1; } - errlog->head++; + { + unsigned long long w = atomic_load_explicit(&errlog->write_reserve, + memory_order_relaxed); + for (;;) { + unsigned long long r = atomic_load_explicit(&errlog->read_seq, + memory_order_acquire); + if (w - r >= (unsigned long long)EMCMOT_ERROR_NUM) { + return -1; + } + if (atomic_compare_exchange_weak_explicit( + &errlog->write_reserve, &w, w + 1, + memory_order_acquire, memory_order_relaxed)) { + my_seq = w; + break; + } + } + } - dbuf_init(&errbuf, (unsigned char*)errlog->error[errlog->end], EMCMOT_ERROR_LEN); + dbuf_init(&errbuf, + (unsigned char*)errlog->error[my_seq % EMCMOT_ERROR_NUM], + EMCMOT_ERROR_LEN); dbuf_iter_init(&it, &errbuf); vstashf(&it, fmt, ap); - errlog->end = (errlog->end + 1) % EMCMOT_ERROR_NUM; - errlog->num++; - - errlog->tail = errlog->head; + atomic_fetch_add_explicit(&errlog->write_commit, 1ULL, + memory_order_release); return 0; } @@ -72,16 +104,29 @@ int emcmotErrorPut(emcmot_error_t *errlog, const char *error) int emcmotErrorGet(emcmot_error_t * errlog, char *error) { - if (errlog == 0 || errlog->num == 0) { - /* empty */ + unsigned long long r; + unsigned long long w_res; + unsigned long long w_com; + + if (errlog == 0) { + return -1; + } + + w_com = atomic_load_explicit(&errlog->write_commit, memory_order_acquire); + w_res = atomic_load_explicit(&errlog->write_reserve, + memory_order_acquire); + if (w_res != w_com) { + /* a producer is mid-write; try again next cycle */ + return -1; + } + + r = atomic_load_explicit(&errlog->read_seq, memory_order_relaxed); + if (r >= w_com) { return -1; } - errlog->head++; - memcpy(error, errlog->error[errlog->start], EMCMOT_ERROR_LEN); - errlog->start = (errlog->start + 1) % EMCMOT_ERROR_NUM; - errlog->num--; - errlog->tail = errlog->head; + memcpy(error, errlog->error[r % EMCMOT_ERROR_NUM], EMCMOT_ERROR_LEN); + atomic_store_explicit(&errlog->read_seq, r + 1, memory_order_release); return 0; } diff --git a/src/emc/motion/motion.h b/src/emc/motion/motion.h index 31183a18356..9c62af046cf 100644 --- a/src/emc/motion/motion.h +++ b/src/emc/motion/motion.h @@ -82,6 +82,9 @@ to another. #define MOTION_INVALID_ID INT_MIN #define MOTION_ID_VALID(x) ((x) != MOTION_INVALID_ID) +#include /* must precede rtapi_atomic.h in kernel mode */ +#include + #ifdef __cplusplus extern "C" { #endif @@ -739,14 +742,12 @@ Suggestion: Split this in to an Error and a Status flag register.. int inhibit_probe_home_error; } emcmot_config_t; -/* error structure - A ring buffer used to pass formatted printf strings to usr space */ +/* error structure - lockfree MPSC ring buffer. See emcmotutil.c. */ typedef struct emcmot_error_t { - unsigned char head; /* flag count for mutex detect */ char error[EMCMOT_ERROR_NUM][EMCMOT_ERROR_LEN]; - int start; /* index of oldest error */ - int end; /* index of newest error */ - int num; /* number of items */ - unsigned char tail; /* flag count for mutex detect */ + rtapi_atomic_ullong write_reserve; + rtapi_atomic_ullong write_commit; + rtapi_atomic_ullong read_seq; } emcmot_error_t; diff --git a/src/rtapi/rtapi_atomic.h b/src/rtapi/rtapi_atomic.h index 7af2b8d43ad..e0d2049b623 100644 --- a/src/rtapi/rtapi_atomic.h +++ b/src/rtapi/rtapi_atomic.h @@ -46,4 +46,36 @@ #endif // defined(__cplusplus) +/* Prefixed aliases for the C11 atomic typedefs. C++ pre-C++23 does not + expose the unqualified typedefs at global scope, so use + these names when declaring atomic fields in headers shared between C + and C++ translation units. */ +#if defined(__cplusplus) +typedef std::atomic_bool rtapi_atomic_bool; +typedef std::atomic_char rtapi_atomic_char; +typedef std::atomic_schar rtapi_atomic_schar; +typedef std::atomic_uchar rtapi_atomic_uchar; +typedef std::atomic_short rtapi_atomic_short; +typedef std::atomic_ushort rtapi_atomic_ushort; +typedef std::atomic_int rtapi_atomic_int; +typedef std::atomic_uint rtapi_atomic_uint; +typedef std::atomic_long rtapi_atomic_long; +typedef std::atomic_ulong rtapi_atomic_ulong; +typedef std::atomic_llong rtapi_atomic_llong; +typedef std::atomic_ullong rtapi_atomic_ullong; +#else +typedef atomic_bool rtapi_atomic_bool; +typedef atomic_char rtapi_atomic_char; +typedef atomic_schar rtapi_atomic_schar; +typedef atomic_uchar rtapi_atomic_uchar; +typedef atomic_short rtapi_atomic_short; +typedef atomic_ushort rtapi_atomic_ushort; +typedef atomic_int rtapi_atomic_int; +typedef atomic_uint rtapi_atomic_uint; +typedef atomic_long rtapi_atomic_long; +typedef atomic_ulong rtapi_atomic_ulong; +typedef atomic_llong rtapi_atomic_llong; +typedef atomic_ullong rtapi_atomic_ullong; +#endif + #endif diff --git a/tests/lowlevel/emcmot-error-mpsc/expected b/tests/lowlevel/emcmot-error-mpsc/expected new file mode 100644 index 00000000000..c9634c3f9ad --- /dev/null +++ b/tests/lowlevel/emcmot-error-mpsc/expected @@ -0,0 +1 @@ +test passed diff --git a/tests/lowlevel/emcmot-error-mpsc/test.c b/tests/lowlevel/emcmot-error-mpsc/test.c new file mode 100644 index 00000000000..a550f752d5c --- /dev/null +++ b/tests/lowlevel/emcmot-error-mpsc/test.c @@ -0,0 +1,112 @@ +/* + * Concurrency / correctness test for the emcmotError MPSC ring buffer. + * + * Spawns NPROD producer threads pumping NMSG messages each, plus one + * consumer that drains. Verifies every message arrives exactly once + * and that messages from each individual producer arrive in producer- + * local sequence order. Across producers any interleaving is allowed. + */ + +#include +#include +#include +#include +#include +#include + +#include +#include "motion.h" +#include "dbuf.h" +#include "stashf.h" +#include + +#define NPROD 8 +#define NMSG 100000 + +static emcmot_error_t errlog; +static rtapi_atomic_int producers_done = 0; + +static void *producer_fn(void *arg) { + long id = (long)arg; + char buf[64]; + for (int i = 0; i < NMSG; i++) { + snprintf(buf, sizeof(buf), "P%ld:%010d", id, i); + while (emcmotErrorPut(&errlog, buf) < 0) { + sched_yield(); + } + } + atomic_fetch_add(&producers_done, 1); + return NULL; +} + +static void *consumer_fn(void *arg) { + (void)arg; + long total = (long)NPROD * NMSG; + long received = 0; + int per_producer[NPROD] = {0}; + char raw[EMCMOT_ERROR_LEN]; + char msg[EMCMOT_ERROR_LEN]; + int errors = 0; + + while (received < total) { + struct dbuf d; + struct dbuf_iter di; + /* dbuf_init zero-fills the buffer, so call it BEFORE + emcmotErrorGet, which then writes the received bytes in. */ + dbuf_init(&d, (unsigned char *)raw, EMCMOT_ERROR_LEN); + if (emcmotErrorGet(&errlog, raw) == 0) { + dbuf_iter_init(&di, &d); + if (snprintdbuf(msg, sizeof(msg), &di) < 0) { + fprintf(stderr, "snprintdbuf failed\n"); + errors++; + received++; + continue; + } + long pid; + int seq; + if (sscanf(msg, "P%ld:%d", &pid, &seq) != 2 + || pid < 0 || pid >= NPROD) { + fprintf(stderr, "bad msg: '%s'\n", msg); + errors++; + } else if (seq != per_producer[pid]) { + fprintf(stderr, "out-of-order P%ld: got %d, want %d\n", + pid, seq, per_producer[pid]); + errors++; + } else { + per_producer[pid]++; + } + received++; + } else { + if (atomic_load(&producers_done) >= NPROD) { + struct timespec ts = {0, 1000000}; + nanosleep(&ts, NULL); + } else { + sched_yield(); + } + } + } + + if (errors == 0) { + printf("test passed\n"); + } else { + printf("test FAILED with %d errors\n", errors); + } + return errors == 0 ? (void *)0 : (void *)1; +} + +int main(void) { + emcmotErrorInit(&errlog); + + pthread_t prods[NPROD]; + pthread_t cons; + pthread_create(&cons, NULL, consumer_fn, NULL); + for (long i = 0; i < NPROD; i++) { + pthread_create(&prods[i], NULL, producer_fn, (void *)i); + } + for (long i = 0; i < NPROD; i++) { + pthread_join(prods[i], NULL); + } + void *cret; + pthread_join(cons, &cret); + return cret ? 1 : 0; +} diff --git a/tests/lowlevel/emcmot-error-mpsc/test.sh b/tests/lowlevel/emcmot-error-mpsc/test.sh new file mode 100755 index 00000000000..82985bcb627 --- /dev/null +++ b/tests/lowlevel/emcmot-error-mpsc/test.sh @@ -0,0 +1,27 @@ +#!/bin/sh +# Concurrency / correctness test for the emcmotError MPSC ring buffer. +# RIP-only: builds the test driver against the in-tree motion sources. + +SCRIPTDIR="$(cd "$(dirname "$0")" && pwd)" +TOPDIR="$(cd "$SCRIPTDIR/../../.." && pwd)" +SRCDIR="${TOPDIR}/src/emc/motion" +INCDIR="${TOPDIR}/include" + +if [ ! -f "${SRCDIR}/emcmotutil.c" ]; then + echo "skip: motion sources not found (RIP build required)" + exit 0 +fi + +gcc -O -pthread -DULAPI -Wall \ + -I"${INCDIR}" \ + -I"${SRCDIR}" \ + "${SCRIPTDIR}/test.c" \ + "${SRCDIR}/emcmotutil.c" \ + "${SRCDIR}/dbuf.c" \ + "${SRCDIR}/stashf.c" \ + -o "${SCRIPTDIR}/test" || { echo "compile failed"; exit 1; } + +"${SCRIPTDIR}/test" +exitval=$? +rm -f "${SCRIPTDIR}/test" +exit $exitval