Skip to content

Commit cfaa452

Browse files
authored
Merge pull request #50 from benoitc/fix/c-audit-findings
Fix C audit findings
2 parents ba5633c + 6dd6015 commit cfaa452

File tree

7 files changed

+243
-149
lines changed

7 files changed

+243
-149
lines changed

c_src/py_callback.c

Lines changed: 14 additions & 82 deletions
Original file line numberDiff line numberDiff line change
@@ -2682,8 +2682,8 @@ static PyObject *erlang_channel_try_receive_impl(PyObject *self, PyObject *args)
26822682
* Returns: Python object when data available
26832683
* Raises: RuntimeError if channel closed, TimeoutError if timeout
26842684
*
2685-
* This function releases the GIL while waiting, allowing other Python
2686-
* threads to run. Uses polling with short sleeps.
2685+
* This function releases the GIL while waiting on a condition variable,
2686+
* allowing other Python threads to run efficiently without polling.
26872687
*/
26882688
static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) {
26892689
(void)self;
@@ -2709,52 +2709,19 @@ static PyObject *erlang_channel_receive_impl(PyObject *self, PyObject *args) {
27092709
size_t size = 0;
27102710
int result;
27112711

2712-
/* First try without blocking */
2713-
result = channel_try_receive(channel, &data, &size);
2714-
if (result == 0) {
2715-
goto decode_and_return;
2716-
} else if (result == -1) {
2717-
PyErr_SetString(PyExc_RuntimeError, "channel closed");
2718-
return NULL;
2719-
}
2720-
2721-
/* Need to wait - release GIL and poll */
2722-
{
2723-
long elapsed_us = 0;
2724-
const long poll_interval_us = 100; /* 100 microseconds */
2725-
const long timeout_us = timeout_ms >= 0 ? timeout_ms * 1000 : -1;
2726-
2727-
Py_BEGIN_ALLOW_THREADS
2728-
2729-
while (1) {
2730-
result = channel_try_receive(channel, &data, &size);
2731-
if (result != 1) {
2732-
break; /* Got data or closed */
2733-
}
2734-
2735-
/* Check timeout */
2736-
if (timeout_us >= 0 && elapsed_us >= timeout_us) {
2737-
result = 2; /* Timeout */
2738-
break;
2739-
}
2740-
2741-
/* Sleep briefly */
2742-
usleep(poll_interval_us);
2743-
elapsed_us += poll_interval_us;
2744-
}
2712+
/* Block on condition variable until data available, closed, or timeout */
2713+
Py_BEGIN_ALLOW_THREADS
2714+
result = channel_receive_blocking(channel, &data, &size, timeout_ms);
2715+
Py_END_ALLOW_THREADS
27452716

2746-
Py_END_ALLOW_THREADS
2747-
}
2748-
2749-
if (result == 2) {
2717+
if (result == 1) {
27502718
PyErr_SetString(PyExc_TimeoutError, "channel receive timeout");
27512719
return NULL;
27522720
} else if (result == -1) {
27532721
PyErr_SetString(PyExc_RuntimeError, "channel closed");
27542722
return NULL;
27552723
}
27562724

2757-
decode_and_return:
27582725
/* Decode from Erlang external term format to Python */
27592726
{
27602727
ErlNifEnv *tmp_env = enif_alloc_env();
@@ -2945,8 +2912,8 @@ static PyObject *erlang_byte_channel_try_receive_bytes_impl(PyObject *self, PyOb
29452912
* Returns: bytes when data available
29462913
* Raises: RuntimeError if channel closed, TimeoutError if timeout
29472914
*
2948-
* This function releases the GIL while waiting, allowing other Python
2949-
* threads to run. Uses polling with short sleeps.
2915+
* This function releases the GIL while waiting on a condition variable,
2916+
* allowing other Python threads to run efficiently without polling.
29502917
*/
29512918
static PyObject *erlang_byte_channel_receive_bytes_impl(PyObject *self, PyObject *args) {
29522919
(void)self;
@@ -2972,47 +2939,12 @@ static PyObject *erlang_byte_channel_receive_bytes_impl(PyObject *self, PyObject
29722939
size_t size = 0;
29732940
int result;
29742941

2975-
/* First try without blocking */
2976-
result = channel_try_receive(channel, &data, &size);
2977-
if (result == 0) {
2978-
/* Got data - return raw bytes */
2979-
PyObject *bytes = PyBytes_FromStringAndSize((char *)data, size);
2980-
enif_free(data);
2981-
return bytes;
2982-
} else if (result == -1) {
2983-
PyErr_SetString(PyExc_RuntimeError, "channel closed");
2984-
return NULL;
2985-
}
2986-
2987-
/* Need to wait - release GIL and poll */
2988-
{
2989-
long elapsed_us = 0;
2990-
const long poll_interval_us = 100; /* 100 microseconds */
2991-
const long timeout_us = timeout_ms >= 0 ? timeout_ms * 1000 : -1;
2992-
2993-
Py_BEGIN_ALLOW_THREADS
2994-
2995-
while (1) {
2996-
result = channel_try_receive(channel, &data, &size);
2997-
if (result != 1) {
2998-
break; /* Got data or closed */
2999-
}
3000-
3001-
/* Check timeout */
3002-
if (timeout_us >= 0 && elapsed_us >= timeout_us) {
3003-
result = 2; /* Timeout */
3004-
break;
3005-
}
3006-
3007-
/* Sleep briefly */
3008-
usleep(poll_interval_us);
3009-
elapsed_us += poll_interval_us;
3010-
}
3011-
3012-
Py_END_ALLOW_THREADS
3013-
}
2942+
/* Block on condition variable until data available, closed, or timeout */
2943+
Py_BEGIN_ALLOW_THREADS
2944+
result = channel_receive_blocking(channel, &data, &size, timeout_ms);
2945+
Py_END_ALLOW_THREADS
30142946

3015-
if (result == 2) {
2947+
if (result == 1) {
30162948
PyErr_SetString(PyExc_TimeoutError, "channel receive timeout");
30172949
return NULL;
30182950
} else if (result == -1) {

c_src/py_channel.c

Lines changed: 90 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ void channel_resource_dtor(ErlNifEnv *env, void *obj) {
6969
channel->queue = NULL;
7070
}
7171

72+
/* Wake any threads waiting on the condition before destroying */
73+
pthread_cond_broadcast(&channel->data_cond);
74+
pthread_cond_destroy(&channel->data_cond);
7275
pthread_mutex_destroy(&channel->mutex);
7376
}
7477

@@ -91,8 +94,6 @@ py_channel_t *channel_alloc(size_t max_size) {
9194

9295
channel->max_size = max_size;
9396
channel->current_size = 0;
94-
channel->waiting = NULL;
95-
channel->waiting_callback_id = 0;
9697
channel->waiter_loop = NULL;
9798
channel->waiter_callback_id = 0;
9899
channel->has_waiter = false;
@@ -107,6 +108,13 @@ py_channel_t *channel_alloc(size_t max_size) {
107108
return NULL;
108109
}
109110

111+
if (pthread_cond_init(&channel->data_cond, NULL) != 0) {
112+
pthread_mutex_destroy(&channel->mutex);
113+
enif_ioq_destroy(channel->queue);
114+
enif_release_resource(channel);
115+
return NULL;
116+
}
117+
110118
return channel;
111119
}
112120

@@ -141,9 +149,6 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
141149

142150
channel->current_size += size;
143151

144-
/* Check if there's a waiting context to resume */
145-
bool should_resume = (channel->waiting != NULL);
146-
147152
/* Check if there's an async waiter to dispatch.
148153
* IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition.
149154
* With task_ready notification, the callback can fire before we re-acquire the mutex.
@@ -171,12 +176,10 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
171176
channel->has_sync_waiter = false;
172177
}
173178

174-
pthread_mutex_unlock(&channel->mutex);
179+
/* Signal any threads waiting on the condition variable */
180+
pthread_cond_signal(&channel->data_cond);
175181

176-
/* Resume happens outside the lock to avoid deadlocks */
177-
if (should_resume) {
178-
channel_resume_waiting(channel);
179-
}
182+
pthread_mutex_unlock(&channel->mutex);
180183

181184
/* Dispatch async waiter via timer dispatch (same path as timers) */
182185
if (loop_to_wake != NULL) {
@@ -225,9 +228,6 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
225228

226229
channel->current_size += msg_size;
227230

228-
/* Check if there's a waiting context to resume */
229-
bool should_resume = (channel->waiting != NULL);
230-
231231
/* Check if there's an async waiter to dispatch.
232232
* IMPORTANT: Clear waiter state BEFORE releasing mutex to avoid race condition.
233233
* With task_ready notification, the callback can fire before we re-acquire the mutex.
@@ -255,11 +255,10 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
255255
channel->has_sync_waiter = false;
256256
}
257257

258-
pthread_mutex_unlock(&channel->mutex);
258+
/* Signal any threads waiting on the condition variable */
259+
pthread_cond_signal(&channel->data_cond);
259260

260-
if (should_resume) {
261-
channel_resume_waiting(channel);
262-
}
261+
pthread_mutex_unlock(&channel->mutex);
263262

264263
/* Dispatch async waiter via timer dispatch (same path as timers) */
265264
if (loop_to_wake != NULL) {
@@ -326,10 +325,80 @@ int channel_try_receive(py_channel_t *channel, unsigned char **out_data, size_t
326325
return 0;
327326
}
328327

328+
int channel_receive_blocking(py_channel_t *channel, unsigned char **out_data,
329+
size_t *out_size, long timeout_ms) {
330+
pthread_mutex_lock(&channel->mutex);
331+
332+
/* Calculate deadline for timed wait */
333+
struct timespec deadline;
334+
if (timeout_ms > 0) {
335+
clock_gettime(CLOCK_REALTIME, &deadline);
336+
deadline.tv_sec += timeout_ms / 1000;
337+
deadline.tv_nsec += (timeout_ms % 1000) * 1000000;
338+
if (deadline.tv_nsec >= 1000000000) {
339+
deadline.tv_sec++;
340+
deadline.tv_nsec -= 1000000000;
341+
}
342+
}
343+
344+
/* Wait for data or close */
345+
while (enif_ioq_size(channel->queue) == 0 && !channel->closed) {
346+
if (timeout_ms == 0) {
347+
/* Non-blocking: return immediately if no data */
348+
pthread_mutex_unlock(&channel->mutex);
349+
return 1; /* Empty/timeout */
350+
} else if (timeout_ms < 0) {
351+
/* Infinite wait */
352+
pthread_cond_wait(&channel->data_cond, &channel->mutex);
353+
} else {
354+
/* Timed wait */
355+
int rc = pthread_cond_timedwait(&channel->data_cond, &channel->mutex, &deadline);
356+
if (rc == ETIMEDOUT) {
357+
pthread_mutex_unlock(&channel->mutex);
358+
return 1; /* Timeout */
359+
}
360+
}
361+
}
362+
363+
/* Check if closed with no data */
364+
if (channel->closed && enif_ioq_size(channel->queue) == 0) {
365+
pthread_mutex_unlock(&channel->mutex);
366+
return -1; /* Closed */
367+
}
368+
369+
/* We have data - dequeue it */
370+
SysIOVec *iov;
371+
int iovcnt;
372+
iov = enif_ioq_peek(channel->queue, &iovcnt);
373+
374+
if (iovcnt == 0 || iov == NULL || iov[0].iov_len == 0) {
375+
pthread_mutex_unlock(&channel->mutex);
376+
return 1; /* Spurious wakeup, no data */
377+
}
378+
379+
size_t msg_size = iov[0].iov_len;
380+
381+
/* Allocate output buffer */
382+
*out_data = enif_alloc(msg_size);
383+
if (*out_data == NULL) {
384+
pthread_mutex_unlock(&channel->mutex);
385+
return -1; /* Allocation error */
386+
}
387+
388+
/* Copy data and dequeue */
389+
memcpy(*out_data, iov[0].iov_base, msg_size);
390+
*out_size = msg_size;
391+
392+
enif_ioq_deq(channel->queue, msg_size, NULL);
393+
channel->current_size -= msg_size;
394+
395+
pthread_mutex_unlock(&channel->mutex);
396+
return 0;
397+
}
398+
329399
void channel_close(py_channel_t *channel) {
330400
pthread_mutex_lock(&channel->mutex);
331401
channel->closed = true;
332-
bool should_resume = (channel->waiting != NULL);
333402

334403
/* Check if there's an async waiter to dispatch.
335404
* For close, we unconditionally clear the waiter since the channel
@@ -354,11 +423,10 @@ void channel_close(py_channel_t *channel) {
354423
channel->has_sync_waiter = false;
355424
}
356425

357-
pthread_mutex_unlock(&channel->mutex);
426+
/* Wake all threads waiting on the condition variable */
427+
pthread_cond_broadcast(&channel->data_cond);
358428

359-
if (should_resume) {
360-
channel_resume_waiting(channel);
361-
}
429+
pthread_mutex_unlock(&channel->mutex);
362430

363431
/* Dispatch async waiter to signal closure */
364432
if (loop_to_wake != NULL) {
@@ -377,13 +445,6 @@ void channel_close(py_channel_t *channel) {
377445
}
378446
}
379447

380-
void channel_resume_waiting(py_channel_t *channel) {
381-
/* This function would trigger resume of the suspended context.
382-
* For now, the actual resume logic is handled in the NIF receive function
383-
* by checking if data is available before suspending. */
384-
(void)channel;
385-
}
386-
387448
int channel_init(ErlNifEnv *env) {
388449
/* Initialize channel-specific atoms */
389450
ATOM_BUSY = enif_make_atom(env, "busy");

c_src/py_channel.h

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,11 +86,8 @@ typedef struct {
8686
/** @brief Mutex for thread-safe queue access */
8787
pthread_mutex_t mutex;
8888

89-
/** @brief Suspended Python context waiting on this channel */
90-
struct suspended_context_state *waiting;
91-
92-
/** @brief Callback ID for the waiting context */
93-
uint64_t waiting_callback_id;
89+
/** @brief Condition variable for blocking receives */
90+
pthread_cond_t data_cond;
9491

9592
/** @brief Event loop for async waiter (ref-counted via enif_keep_resource) */
9693
struct erlang_event_loop *waiter_loop;
@@ -175,22 +172,28 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin);
175172
int channel_try_receive(py_channel_t *channel, unsigned char **out_data, size_t *out_size);
176173

177174
/**
178-
* @brief Close a channel
175+
* @brief Receive a message from a channel (blocking with timeout)
179176
*
180-
* Closes the channel and wakes any waiting receivers with StopIteration.
177+
* Blocks on a condition variable until data is available, channel is closed,
178+
* or timeout expires. This is more efficient than polling.
181179
*
182-
* @param channel Channel to close
180+
* @param channel Channel to receive from
181+
* @param out_data Output: message data (caller must free with enif_free)
182+
* @param out_size Output: message size
183+
* @param timeout_ms Timeout in milliseconds (-1 for infinite, 0 for non-blocking)
184+
* @return 0 on success, 1 if timeout, -1 if closed
183185
*/
184-
void channel_close(py_channel_t *channel);
186+
int channel_receive_blocking(py_channel_t *channel, unsigned char **out_data,
187+
size_t *out_size, long timeout_ms);
185188

186189
/**
187-
* @brief Resume a waiting Python context with channel data
190+
* @brief Close a channel
188191
*
189-
* Called when data becomes available for a suspended receive.
192+
* Closes the channel and wakes any waiting receivers with StopIteration.
190193
*
191-
* @param channel Channel with waiting context
194+
* @param channel Channel to close
192195
*/
193-
void channel_resume_waiting(py_channel_t *channel);
196+
void channel_close(py_channel_t *channel);
194197

195198
/* ============================================================================
196199
* Channel NIF Declarations

0 commit comments

Comments
 (0)