Skip to content

Commit 7bb28a9

Browse files
committed
Add _erlang_sleep for ASGI fast path and erlang_asyncio module
- Add sync_sleep fields to erlang_event_loop_t (id, complete flag, cond var) - Add py_erlang_sleep() Python function that blocks on Erlang timer - Add nif_dispatch_sleep_complete() to signal sleep completion - Handle {sleep_wait, DelayMs, SleepId} in py_event_worker.erl - Add erlang_asyncio module with full asyncio-compatible API: * sleep(), run(), gather(), wait(), wait_for() * create_task(), ensure_future(), shield() * get_event_loop(), new_event_loop(), set_event_loop() * TimeoutError, CancelledError exceptions * ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION constants - Add py_erlang_sleep_SUITE with 8 tests Usage: import erlang_asyncio async def main(): await erlang_asyncio.sleep(0.001) results = await erlang_asyncio.gather(task1(), task2()) erlang_asyncio.run(main())
1 parent e5906f1 commit 7bb28a9

File tree

7 files changed

+720
-3
lines changed

7 files changed

+720
-3
lines changed

c_src/py_event_loop.c

Lines changed: 146 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,9 +193,12 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
193193
/* Signal shutdown */
194194
loop->shutdown = true;
195195

196-
/* Wake up any waiting threads */
196+
/* Wake up any waiting threads (including sync sleep waiters) */
197197
pthread_mutex_lock(&loop->mutex);
198198
pthread_cond_broadcast(&loop->event_cond);
199+
if (loop->sync_sleep_cond_initialized) {
200+
pthread_cond_broadcast(&loop->sync_sleep_cond);
201+
}
199202
pthread_mutex_unlock(&loop->mutex);
200203

201204
/* Clear pending events (returns them to freelist) */
@@ -220,6 +223,9 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
220223
/* Destroy synchronization primitives */
221224
pthread_mutex_destroy(&loop->mutex);
222225
pthread_cond_destroy(&loop->event_cond);
226+
if (loop->sync_sleep_cond_initialized) {
227+
pthread_cond_destroy(&loop->sync_sleep_cond);
228+
}
223229
}
224230

225231
/**
@@ -441,8 +447,19 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc,
441447
return make_error(env, "cond_init_failed");
442448
}
443449

450+
if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) {
451+
pthread_cond_destroy(&loop->event_cond);
452+
pthread_mutex_destroy(&loop->mutex);
453+
enif_release_resource(loop);
454+
return make_error(env, "sleep_cond_init_failed");
455+
}
456+
loop->sync_sleep_cond_initialized = true;
457+
atomic_store(&loop->sync_sleep_id, 0);
458+
atomic_store(&loop->sync_sleep_complete, false);
459+
444460
loop->msg_env = enif_alloc_env();
445461
if (loop->msg_env == NULL) {
462+
pthread_cond_destroy(&loop->sync_sleep_cond);
446463
pthread_cond_destroy(&loop->event_cond);
447464
pthread_mutex_destroy(&loop->mutex);
448465
enif_release_resource(loop);
@@ -1094,6 +1111,38 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc,
10941111
return ATOM_OK;
10951112
}
10961113

1114+
/**
1115+
* dispatch_sleep_complete(LoopRef, SleepId) -> ok
1116+
*
1117+
* Called from Erlang when a synchronous sleep timer expires.
1118+
* Signals the waiting Python thread to wake up.
1119+
*/
1120+
ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
1121+
const ERL_NIF_TERM argv[]) {
1122+
(void)argc;
1123+
1124+
erlang_event_loop_t *loop;
1125+
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
1126+
(void **)&loop)) {
1127+
return make_error(env, "invalid_loop");
1128+
}
1129+
1130+
ErlNifUInt64 sleep_id;
1131+
if (!enif_get_uint64(env, argv[1], &sleep_id)) {
1132+
return make_error(env, "invalid_sleep_id");
1133+
}
1134+
1135+
/* Only signal if this is the sleep we're waiting for */
1136+
pthread_mutex_lock(&loop->mutex);
1137+
if (atomic_load(&loop->sync_sleep_id) == sleep_id) {
1138+
atomic_store(&loop->sync_sleep_complete, true);
1139+
pthread_cond_broadcast(&loop->sync_sleep_cond);
1140+
}
1141+
pthread_mutex_unlock(&loop->mutex);
1142+
1143+
return ATOM_OK;
1144+
}
1145+
10971146
/**
10981147
* handle_fd_event(FdRes, Type) -> ok | {error, Reason}
10991148
*
@@ -3611,6 +3660,89 @@ static PyObject *py_get_pending_for(PyObject *self, PyObject *args) {
36113660
return list;
36123661
}
36133662

3663+
/**
3664+
* Python function: _erlang_sleep(delay_ms) -> None
3665+
*
3666+
* Synchronous sleep that uses Erlang's timer system instead of asyncio.
3667+
* Sends {sleep_wait, DelayMs, SleepId} to the worker, then blocks waiting
3668+
* for the sleep completion signal.
3669+
*
3670+
* This is called from the ASGI fast path when asyncio.sleep() is detected,
3671+
* avoiding the need to create a full event loop.
3672+
*/
3673+
static PyObject *py_erlang_sleep(PyObject *self, PyObject *args) {
3674+
(void)self;
3675+
int delay_ms;
3676+
3677+
if (!PyArg_ParseTuple(args, "i", &delay_ms)) {
3678+
return NULL;
3679+
}
3680+
3681+
/* For zero or negative delay, return immediately */
3682+
if (delay_ms <= 0) {
3683+
Py_RETURN_NONE;
3684+
}
3685+
3686+
erlang_event_loop_t *loop = get_interpreter_event_loop();
3687+
if (loop == NULL || loop->shutdown) {
3688+
PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized");
3689+
return NULL;
3690+
}
3691+
3692+
/* Check if we have a worker to send to */
3693+
if (!loop->has_worker && !loop->has_router) {
3694+
PyErr_SetString(PyExc_RuntimeError, "No worker or router configured");
3695+
return NULL;
3696+
}
3697+
3698+
/* Generate a unique sleep ID */
3699+
uint64_t sleep_id = atomic_fetch_add(&loop->next_callback_id, 1);
3700+
3701+
/* Send {sleep_wait, DelayMs, SleepId} to worker */
3702+
ErlNifEnv *msg_env = enif_alloc_env();
3703+
if (msg_env == NULL) {
3704+
PyErr_SetString(PyExc_MemoryError, "Failed to allocate message environment");
3705+
return NULL;
3706+
}
3707+
3708+
ERL_NIF_TERM msg = enif_make_tuple3(
3709+
msg_env,
3710+
enif_make_atom(msg_env, "sleep_wait"),
3711+
enif_make_int(msg_env, delay_ms),
3712+
enif_make_uint64(msg_env, sleep_id)
3713+
);
3714+
3715+
/* Use worker_pid when available, otherwise fall back to router_pid */
3716+
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
3717+
if (!enif_send(NULL, target_pid, msg_env, msg)) {
3718+
enif_free_env(msg_env);
3719+
PyErr_SetString(PyExc_RuntimeError, "Failed to send sleep message");
3720+
return NULL;
3721+
}
3722+
enif_free_env(msg_env);
3723+
3724+
/* Set up for waiting on this sleep */
3725+
pthread_mutex_lock(&loop->mutex);
3726+
atomic_store(&loop->sync_sleep_id, sleep_id);
3727+
atomic_store(&loop->sync_sleep_complete, false);
3728+
3729+
/* Release GIL and wait for completion */
3730+
Py_BEGIN_ALLOW_THREADS
3731+
while (!atomic_load(&loop->sync_sleep_complete) && !loop->shutdown) {
3732+
pthread_cond_wait(&loop->sync_sleep_cond, &loop->mutex);
3733+
}
3734+
Py_END_ALLOW_THREADS
3735+
3736+
pthread_mutex_unlock(&loop->mutex);
3737+
3738+
if (loop->shutdown) {
3739+
PyErr_SetString(PyExc_RuntimeError, "Event loop shutdown during sleep");
3740+
return NULL;
3741+
}
3742+
3743+
Py_RETURN_NONE;
3744+
}
3745+
36143746
/* Module method definitions */
36153747
static PyMethodDef PyEventLoopMethods[] = {
36163748
/* Legacy API (uses global event loop) */
@@ -3640,6 +3772,8 @@ static PyMethodDef PyEventLoopMethods[] = {
36403772
{"_remove_writer_for", py_remove_writer_for, METH_VARARGS, "Stop monitoring fd for writes on specific loop"},
36413773
{"_schedule_timer_for", py_schedule_timer_for, METH_VARARGS, "Schedule timer on specific loop"},
36423774
{"_cancel_timer_for", py_cancel_timer_for, METH_VARARGS, "Cancel timer on specific loop"},
3775+
/* Synchronous sleep (for ASGI fast path) */
3776+
{"_erlang_sleep", py_erlang_sleep, METH_VARARGS, "Synchronous sleep using Erlang timer"},
36433777
{NULL, NULL, 0, NULL}
36443778
};
36453779

@@ -3714,8 +3848,19 @@ int create_default_event_loop(ErlNifEnv *env) {
37143848
return -1;
37153849
}
37163850

3851+
if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) {
3852+
pthread_cond_destroy(&loop->event_cond);
3853+
pthread_mutex_destroy(&loop->mutex);
3854+
enif_release_resource(loop);
3855+
return -1;
3856+
}
3857+
loop->sync_sleep_cond_initialized = true;
3858+
atomic_store(&loop->sync_sleep_id, 0);
3859+
atomic_store(&loop->sync_sleep_complete, false);
3860+
37173861
loop->msg_env = enif_alloc_env();
37183862
if (loop->msg_env == NULL) {
3863+
pthread_cond_destroy(&loop->sync_sleep_cond);
37193864
pthread_cond_destroy(&loop->event_cond);
37203865
pthread_mutex_destroy(&loop->mutex);
37213866
enif_release_resource(loop);

c_src/py_event_loop.h

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,20 @@ typedef struct erlang_event_loop {
239239

240240
/** @brief Count of occupied slots in hash set */
241241
int pending_hash_count;
242+
243+
/* ========== Synchronous Sleep Support ========== */
244+
245+
/** @brief Current synchronous sleep ID being waited on */
246+
_Atomic uint64_t sync_sleep_id;
247+
248+
/** @brief Flag indicating sleep has completed */
249+
_Atomic bool sync_sleep_complete;
250+
251+
/** @brief Condition variable for sleep completion notification */
252+
pthread_cond_t sync_sleep_cond;
253+
254+
/** @brief Whether sync_sleep_cond has been initialized */
255+
bool sync_sleep_cond_initialized;
242256
} erlang_event_loop_t;
243257

244258
/* ============================================================================
@@ -441,6 +455,16 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc,
441455
ERL_NIF_TERM nif_event_loop_wakeup(ErlNifEnv *env, int argc,
442456
const ERL_NIF_TERM argv[]);
443457

458+
/**
459+
* @brief Signal that a synchronous sleep has completed
460+
*
461+
* Called from Erlang when a sleep timer expires.
462+
*
463+
* NIF: dispatch_sleep_complete(LoopRef, SleepId) -> ok
464+
*/
465+
ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
466+
const ERL_NIF_TERM argv[]);
467+
444468
/* ============================================================================
445469
* Internal Helper Functions
446470
* ============================================================================ */

c_src/py_nif.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1895,6 +1895,7 @@ static ErlNifFunc nif_funcs[] = {
18951895
{"get_pending", 1, nif_get_pending, 0},
18961896
{"dispatch_callback", 3, nif_dispatch_callback, 0},
18971897
{"dispatch_timer", 2, nif_dispatch_timer, 0},
1898+
{"dispatch_sleep_complete", 2, nif_dispatch_sleep_complete, 0},
18981899
{"get_fd_callback_id", 2, nif_get_fd_callback_id, 0},
18991900
{"reselect_reader", 2, nif_reselect_reader, 0},
19001901
{"reselect_writer", 2, nif_reselect_writer, 0},

0 commit comments

Comments
 (0)