Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 66 additions & 21 deletions src/emc/motion/emcmotutil.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* Author:
* License: GPL Version 2
* System: Linux
*
*
* Copyright (c) 2004 All rights reserved.
********************************************************************/

Expand All @@ -16,17 +16,33 @@
#include "dbuf.h"
#include "stashf.h"

/*
* Lockfree multi-producer single-consumer (MPSC) ring buffer.
*
* Producers (real-time threads invoked via the rtapi message handler) may
* race; the consumer (the userspace task) is single-threaded.
*
* Producer:
* 1. CAS-loop on write_reserve to claim seq S; bail if buffer full.
* 2. Write payload into slot S % EMCMOT_ERROR_NUM.
* 3. fetch_add(write_commit, 1) with release ordering.
* Consumer:
* 1. If write_reserve != write_commit, some producer is mid-write; skip
* (return -1) so the RT producer is never blocked. Next consumer call
* will retry.
* 2. Otherwise drain one slot at read_seq, increment read_seq.
*/

int emcmotErrorInit(emcmot_error_t * errlog)
{
if (errlog == 0) {
return -1;
}

errlog->head = 0;
errlog->start = 0;
errlog->end = 0;
errlog->num = 0;
errlog->tail = 0;
errlog->write_reserve = 0;
errlog->write_commit = 0;
errlog->read_seq = 0;
atomic_thread_fence(memory_order_release);
Comment thread
BsAtHome marked this conversation as resolved.

return 0;
}
Expand All @@ -35,22 +51,38 @@ int emcmotErrorPutfv(emcmot_error_t * errlog, const char *fmt, va_list ap)
{
struct dbuf errbuf;
struct dbuf_iter it;
unsigned long long my_seq;

if (errlog == 0 || errlog->num == EMCMOT_ERROR_NUM) {
/* full */
if (errlog == 0) {
return -1;
}

errlog->head++;
{
unsigned long long w = atomic_load_explicit(&errlog->write_reserve,
memory_order_relaxed);
for (;;) {
unsigned long long r = atomic_load_explicit(&errlog->read_seq,
memory_order_acquire);
if (w - r >= (unsigned long long)EMCMOT_ERROR_NUM) {
return -1;
}
if (atomic_compare_exchange_weak_explicit(
&errlog->write_reserve, &w, w + 1,
memory_order_acquire, memory_order_relaxed)) {
my_seq = w;
break;
}
}
}

dbuf_init(&errbuf, (unsigned char*)errlog->error[errlog->end], EMCMOT_ERROR_LEN);
dbuf_init(&errbuf,
(unsigned char*)errlog->error[my_seq % EMCMOT_ERROR_NUM],
EMCMOT_ERROR_LEN);
dbuf_iter_init(&it, &errbuf);
vstashf(&it, fmt, ap);

errlog->end = (errlog->end + 1) % EMCMOT_ERROR_NUM;
errlog->num++;

errlog->tail = errlog->head;
atomic_fetch_add_explicit(&errlog->write_commit, 1ULL,
memory_order_release);

return 0;
}
Expand All @@ -72,16 +104,29 @@ int emcmotErrorPut(emcmot_error_t *errlog, const char *error)

int emcmotErrorGet(emcmot_error_t * errlog, char *error)
{
if (errlog == 0 || errlog->num == 0) {
/* empty */
unsigned long long r;
unsigned long long w_res;
unsigned long long w_com;

if (errlog == 0) {
return -1;
}

w_com = atomic_load_explicit(&errlog->write_commit, memory_order_acquire);
w_res = atomic_load_explicit(&errlog->write_reserve,
memory_order_acquire);
if (w_res != w_com) {
/* a producer is mid-write; try again next cycle */
return -1;
}

r = atomic_load_explicit(&errlog->read_seq, memory_order_relaxed);
if (r >= w_com) {
return -1;
}

errlog->head++;
memcpy(error, errlog->error[errlog->start], EMCMOT_ERROR_LEN);
errlog->start = (errlog->start + 1) % EMCMOT_ERROR_NUM;
errlog->num--;
errlog->tail = errlog->head;
memcpy(error, errlog->error[r % EMCMOT_ERROR_NUM], EMCMOT_ERROR_LEN);
atomic_store_explicit(&errlog->read_seq, r + 1, memory_order_release);

return 0;
}
13 changes: 7 additions & 6 deletions src/emc/motion/motion.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ to another.
#define MOTION_INVALID_ID INT_MIN
#define MOTION_ID_VALID(x) ((x) != MOTION_INVALID_ID)

#include <rtapi.h> /* must precede rtapi_atomic.h in kernel mode */
#include <rtapi_atomic.h>

#ifdef __cplusplus
extern "C" {
Comment thread
BsAtHome marked this conversation as resolved.
#endif
Expand Down Expand Up @@ -739,14 +742,12 @@ Suggestion: Split this in to an Error and a Status flag register..
int inhibit_probe_home_error;
} emcmot_config_t;

/* error structure - A ring buffer used to pass formatted printf strings to usr space */
/* error structure - lockfree MPSC ring buffer. See emcmotutil.c. */
typedef struct emcmot_error_t {
unsigned char head; /* flag count for mutex detect */
char error[EMCMOT_ERROR_NUM][EMCMOT_ERROR_LEN];
int start; /* index of oldest error */
int end; /* index of newest error */
int num; /* number of items */
unsigned char tail; /* flag count for mutex detect */
rtapi_atomic_ullong write_reserve;
rtapi_atomic_ullong write_commit;
rtapi_atomic_ullong read_seq;
} emcmot_error_t;


Expand Down
32 changes: 32 additions & 0 deletions src/rtapi/rtapi_atomic.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,36 @@

#endif // defined(__cplusplus)

/* Prefixed aliases for the C11 atomic typedefs. C++ pre-C++23 does not
expose the unqualified <stdatomic.h> typedefs at global scope, so use
these names when declaring atomic fields in headers shared between C
and C++ translation units. */
#if defined(__cplusplus)
typedef std::atomic_bool rtapi_atomic_bool;
typedef std::atomic_char rtapi_atomic_char;
typedef std::atomic_schar rtapi_atomic_schar;
typedef std::atomic_uchar rtapi_atomic_uchar;
typedef std::atomic_short rtapi_atomic_short;
typedef std::atomic_ushort rtapi_atomic_ushort;
typedef std::atomic_int rtapi_atomic_int;
typedef std::atomic_uint rtapi_atomic_uint;
typedef std::atomic_long rtapi_atomic_long;
typedef std::atomic_ulong rtapi_atomic_ulong;
typedef std::atomic_llong rtapi_atomic_llong;
typedef std::atomic_ullong rtapi_atomic_ullong;
#else
typedef atomic_bool rtapi_atomic_bool;
typedef atomic_char rtapi_atomic_char;
typedef atomic_schar rtapi_atomic_schar;
typedef atomic_uchar rtapi_atomic_uchar;
typedef atomic_short rtapi_atomic_short;
typedef atomic_ushort rtapi_atomic_ushort;
typedef atomic_int rtapi_atomic_int;
typedef atomic_uint rtapi_atomic_uint;
typedef atomic_long rtapi_atomic_long;
typedef atomic_ulong rtapi_atomic_ulong;
typedef atomic_llong rtapi_atomic_llong;
typedef atomic_ullong rtapi_atomic_ullong;
#endif

#endif
1 change: 1 addition & 0 deletions tests/lowlevel/emcmot-error-mpsc/expected
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
test passed
112 changes: 112 additions & 0 deletions tests/lowlevel/emcmot-error-mpsc/test.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Concurrency / correctness test for the emcmotError MPSC ring buffer.
*
* Spawns NPROD producer threads pumping NMSG messages each, plus one
* consumer that drains. Verifies every message arrives exactly once
* and that messages from each individual producer arrive in producer-
* local sequence order. Across producers any interleaving is allowed.
*/

#include <pthread.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <emcmotcfg.h>
#include "motion.h"
#include "dbuf.h"
#include "stashf.h"
#include <rtapi_atomic.h>

#define NPROD 8
#define NMSG 100000

static emcmot_error_t errlog;
static rtapi_atomic_int producers_done = 0;

static void *producer_fn(void *arg) {
long id = (long)arg;
char buf[64];
for (int i = 0; i < NMSG; i++) {
snprintf(buf, sizeof(buf), "P%ld:%010d", id, i);
while (emcmotErrorPut(&errlog, buf) < 0) {
sched_yield();
}
}
atomic_fetch_add(&producers_done, 1);
return NULL;
}

static void *consumer_fn(void *arg) {
(void)arg;
long total = (long)NPROD * NMSG;
long received = 0;
int per_producer[NPROD] = {0};
char raw[EMCMOT_ERROR_LEN];
char msg[EMCMOT_ERROR_LEN];
int errors = 0;

while (received < total) {
struct dbuf d;
struct dbuf_iter di;
/* dbuf_init zero-fills the buffer, so call it BEFORE
emcmotErrorGet, which then writes the received bytes in. */
dbuf_init(&d, (unsigned char *)raw, EMCMOT_ERROR_LEN);
if (emcmotErrorGet(&errlog, raw) == 0) {
dbuf_iter_init(&di, &d);
if (snprintdbuf(msg, sizeof(msg), &di) < 0) {
fprintf(stderr, "snprintdbuf failed\n");
errors++;
received++;
continue;
}
long pid;
int seq;
if (sscanf(msg, "P%ld:%d", &pid, &seq) != 2
|| pid < 0 || pid >= NPROD) {
fprintf(stderr, "bad msg: '%s'\n", msg);
errors++;
} else if (seq != per_producer[pid]) {
fprintf(stderr, "out-of-order P%ld: got %d, want %d\n",
pid, seq, per_producer[pid]);
errors++;
} else {
per_producer[pid]++;
}
received++;
} else {
if (atomic_load(&producers_done) >= NPROD) {
struct timespec ts = {0, 1000000};
nanosleep(&ts, NULL);
} else {
sched_yield();
}
}
}

if (errors == 0) {
printf("test passed\n");
} else {
printf("test FAILED with %d errors\n", errors);
}
return errors == 0 ? (void *)0 : (void *)1;
}

int main(void) {
emcmotErrorInit(&errlog);

pthread_t prods[NPROD];
pthread_t cons;
pthread_create(&cons, NULL, consumer_fn, NULL);
for (long i = 0; i < NPROD; i++) {
pthread_create(&prods[i], NULL, producer_fn, (void *)i);
}
for (long i = 0; i < NPROD; i++) {
pthread_join(prods[i], NULL);
}
void *cret;
pthread_join(cons, &cret);
return cret ? 1 : 0;
}
27 changes: 27 additions & 0 deletions tests/lowlevel/emcmot-error-mpsc/test.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/sh
# Concurrency / correctness test for the emcmotError MPSC ring buffer.
# RIP-only: builds the test driver against the in-tree motion sources.

SCRIPTDIR="$(cd "$(dirname "$0")" && pwd)"
TOPDIR="$(cd "$SCRIPTDIR/../../.." && pwd)"
SRCDIR="${TOPDIR}/src/emc/motion"
INCDIR="${TOPDIR}/include"

if [ ! -f "${SRCDIR}/emcmotutil.c" ]; then
echo "skip: motion sources not found (RIP build required)"
exit 0
fi

gcc -O -pthread -DULAPI -Wall \
-I"${INCDIR}" \
-I"${SRCDIR}" \
"${SCRIPTDIR}/test.c" \
"${SRCDIR}/emcmotutil.c" \
"${SRCDIR}/dbuf.c" \
"${SRCDIR}/stashf.c" \
-o "${SCRIPTDIR}/test" || { echo "compile failed"; exit 1; }

"${SCRIPTDIR}/test"
exitval=$?
rm -f "${SCRIPTDIR}/test"
exit $exitval
Loading