Skip to content

Commit 5ce4df0

Browse files
authored
Add synchronous blocking channel receive (#23)
* Add synchronous blocking channel receive Implement blocking receive for channels that suspends Python while waiting for data and releases the dirty scheduler worker. - Add sync_waiter_pid and has_sync_waiter fields to py_channel_t - Add channel_register_sync_waiter NIF to register calling process - Modify channel_send to notify sync waiter via Erlang message - Modify channel_close to notify sync waiter of channel closure - Implement blocking handle_receive using Erlang receive to wait - Add tests for immediate, delayed, and closed channel cases * Add async receive e2e test to verify asyncio integration * Add subinterpreter mode sync receive test Test verifies blocking channel receive works with Python 3.12+ subinterpreter contexts. Skips gracefully on older Python versions. * Fix subinterp_sync_receive_wait_test to be proper e2e test - Use py_context:start_link/2 with unique integer ID - Use py_context:stop/1 instead of gen_server:stop - Test immediate receive, blocking receive with delayed send, try_receive on empty, and closed channel detection * Fix race condition and cleanup in sync waiter registration - Check if data is available when registering sync waiter to handle race between try_receive returning empty and register_sync_waiter being called - Return 'has_data' atom when data arrived in the window, caller retries - Notify sync waiter in channel destructor when channel is GC'd - Do not notify async waiter in destructor to avoid use-after-free when event loop is destroyed concurrently - Update test to consume data before re-registering waiter * Remove unused ChannelBuffer Python type The ChannelBuffer type was defined but never used. Removing dead code. * Fix channel waiter race conditions and lost wakeups - Reject duplicate/mixed waiters: both async and sync waiter registration now return {error, waiter_exists} if any waiter already exists - Fix lost wakeups: event_loop_add_pending now returns bool; waiter state is only cleared after successful dispatch - Add null checks for enif_alloc_env in sync waiter notifications - Add tests for mixed waiter rejection scenarios * Remove deprecated ASGI/WSGI NIF tests, keep deprecation checks The asgi_run and wsgi_run NIF functions are deprecated. Removed tests that call these functions, keeping only the deprecation attribute tests. * Fix use-after-free in tl_pending_args across subinterpreters Clear tl_pending_args to NULL whenever tl_pending_callback is set to false. Previously, the thread-local pointer was left dangling after callback completion. When a dirty scheduler thread later handled a different subinterpreter's code, Py_XDECREF on the stale pointer would attempt to free memory from the wrong allocator.
1 parent 2c02500 commit 5ce4df0

File tree

11 files changed

+613
-450
lines changed

11 files changed

+613
-450
lines changed

c_src/py_callback.c

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -563,6 +563,7 @@ static PyObject *build_pending_callback_exc_args(void) {
563563
PyObject *exc_args = PyTuple_New(3);
564564
if (exc_args == NULL) {
565565
tl_pending_callback = false;
566+
Py_CLEAR(tl_pending_args);
566567
return NULL;
567568
}
568569

@@ -575,6 +576,7 @@ static PyObject *build_pending_callback_exc_args(void) {
575576
Py_XDECREF(func_name_obj);
576577
Py_DECREF(exc_args);
577578
tl_pending_callback = false;
579+
Py_CLEAR(tl_pending_args);
578580
return NULL;
579581
}
580582

@@ -610,6 +612,7 @@ static ERL_NIF_TERM build_suspended_result(ErlNifEnv *env, suspended_state_t *su
610612
ERL_NIF_TERM args_term = py_to_term(env, tl_pending_args);
611613

612614
tl_pending_callback = false;
615+
Py_CLEAR(tl_pending_args);
613616

614617
return enif_make_tuple4(env,
615618
ATOM_SUSPENDED,
@@ -811,6 +814,7 @@ static ERL_NIF_TERM build_suspended_context_result(ErlNifEnv *env, suspended_con
811814
ERL_NIF_TERM args_term = py_to_term(env, tl_pending_args);
812815

813816
tl_pending_callback = false;
817+
Py_CLEAR(tl_pending_args);
814818

815819
return enif_make_tuple4(env,
816820
ATOM_SUSPENDED,
@@ -1290,6 +1294,19 @@ PyTypeObject ErlangPidType = {
12901294
static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
12911295
(void)self;
12921296

1297+
/*
1298+
* Invariant check: pending callback TLS must be clear when entering.
1299+
* If any state is still set, it's leaked from a prior context that didn't
1300+
* properly clean up - fail loudly rather than risk cross-interpreter corruption.
1301+
*/
1302+
if (tl_pending_callback || tl_pending_args != NULL ||
1303+
tl_pending_func_name != NULL || tl_pending_callback_id != 0) {
1304+
PyErr_SetString(PyExc_RuntimeError,
1305+
"erlang.call: stale pending callback TLS detected - "
1306+
"prior context did not clean up properly");
1307+
return NULL;
1308+
}
1309+
12931310
/*
12941311
* Check if we have a callback handler available.
12951312
* Priority:
@@ -1553,6 +1570,7 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
15531570
tl_pending_func_name = enif_alloc(func_name_len + 1);
15541571
if (tl_pending_func_name == NULL) {
15551572
tl_pending_callback = false;
1573+
Py_CLEAR(tl_pending_args);
15561574
Py_DECREF(call_args);
15571575
PyErr_SetString(PyExc_MemoryError, "Failed to allocate function name");
15581576
return NULL;
@@ -1561,9 +1579,12 @@ static PyObject *erlang_call_impl(PyObject *self, PyObject *args) {
15611579
tl_pending_func_name[func_name_len] = '\0';
15621580
tl_pending_func_name_len = func_name_len;
15631581

1564-
/* Store args (take ownership) */
1565-
Py_XDECREF(tl_pending_args);
1566-
tl_pending_args = call_args; /* Takes ownership, don't decref */
1582+
/* Store args (take ownership)
1583+
* Use Py_XSETREF for swap-first pattern: sets tl_pending_args to new value
1584+
* BEFORE decref'ing old value. This prevents re-entrancy issues if the old
1585+
* object's finalizer triggers another erlang.call() during decref.
1586+
*/
1587+
Py_XSETREF(tl_pending_args, call_args);
15671588

15681589
/* Raise exception to abort Python execution */
15691590
PyErr_SetString(SuspensionRequiredException, "callback pending");
@@ -2649,6 +2670,7 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER
26492670
Py_DECREF(exc_args);
26502671
if (new_suspended == NULL) {
26512672
tl_pending_callback = false;
2673+
Py_CLEAR(tl_pending_args);
26522674
result = make_error(env, "create_nested_suspended_state_failed");
26532675
} else {
26542676
result = build_suspended_result(env, new_suspended);
@@ -2717,6 +2739,7 @@ static ERL_NIF_TERM nif_resume_callback_dirty(ErlNifEnv *env, int argc, const ER
27172739
Py_DECREF(exc_args);
27182740
if (new_suspended == NULL) {
27192741
tl_pending_callback = false;
2742+
Py_CLEAR(tl_pending_args);
27202743
result = make_error(env, "create_nested_suspended_state_failed");
27212744
} else {
27222745
result = build_suspended_result(env, new_suspended);

0 commit comments

Comments
 (0)