Skip to content

Commit 50c97f2

Browse files
authored
Add erlang.sleep() with dirty scheduler release (#24)
* Add erlang.sleep() with callback-based sync suspension Add erlang.sleep() function that works in both async and sync contexts: - Async: returns asyncio.sleep() which uses Erlang timer system - Sync: uses erlang.call('_py_sleep') callback with receive/after, truly releasing the dirty scheduler for cooperative yielding Remove unused _erlang_sleep NIF which only released the GIL but blocked the pthread. The callback approach properly suspends the Erlang process. Changes: - Add sleep() to _erlang_impl and export to erlang module - Add _py_sleep callback in py_event_loop.erl - Remove py_erlang_sleep NIF and dispatch_sleep_complete - Remove sync_sleep fields from event loop struct - Remove sleep handlers from py_event_worker - Update tests to use erlang.sleep() * Document that erlang.sleep() releases dirty scheduler Update docstring and asyncio.md to clarify: - Both sync and async modes release the dirty NIF scheduler - Async: yields to event loop via asyncio.sleep()/call_later() - Sync: suspends Erlang process via receive/after callback Also fix outdated architecture diagram that referenced removed sleep_wait/dispatch_sleep_complete NIF.
1 parent 5ce4df0 commit 50c97f2

File tree

11 files changed

+292
-243
lines changed

11 files changed

+292
-243
lines changed

c_src/py_callback.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2409,6 +2409,7 @@ static int create_erlang_module(void) {
24092409
" import erlang\n"
24102410
" # Primary exports (uvloop-compatible)\n"
24112411
" erlang.run = _erlang_impl.run\n"
2412+
" erlang.sleep = _erlang_impl.sleep\n"
24122413
" erlang.spawn_task = _erlang_impl.spawn_task\n"
24132414
" erlang.new_event_loop = _erlang_impl.new_event_loop\n"
24142415
" erlang.ErlangEventLoop = _erlang_impl.ErlangEventLoop\n"

c_src/py_event_loop.c

Lines changed: 1 addition & 159 deletions
Original file line numberDiff line numberDiff line change
@@ -365,12 +365,9 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
365365
/* Signal shutdown */
366366
loop->shutdown = true;
367367

368-
/* Wake up any waiting threads (including sync sleep waiters) */
368+
/* Wake up any waiting threads */
369369
pthread_mutex_lock(&loop->mutex);
370370
pthread_cond_broadcast(&loop->event_cond);
371-
if (loop->sync_sleep_cond_initialized) {
372-
pthread_cond_broadcast(&loop->sync_sleep_cond);
373-
}
374371
pthread_mutex_unlock(&loop->mutex);
375372

376373
/* Clear pending events (returns them to freelist) */
@@ -395,9 +392,6 @@ void event_loop_destructor(ErlNifEnv *env, void *obj) {
395392
/* Destroy synchronization primitives */
396393
pthread_mutex_destroy(&loop->mutex);
397394
pthread_cond_destroy(&loop->event_cond);
398-
if (loop->sync_sleep_cond_initialized) {
399-
pthread_cond_destroy(&loop->sync_sleep_cond);
400-
}
401395
}
402396

403397
/**
@@ -619,19 +613,8 @@ ERL_NIF_TERM nif_event_loop_new(ErlNifEnv *env, int argc,
619613
return make_error(env, "cond_init_failed");
620614
}
621615

622-
if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) {
623-
pthread_cond_destroy(&loop->event_cond);
624-
pthread_mutex_destroy(&loop->mutex);
625-
enif_release_resource(loop);
626-
return make_error(env, "sleep_cond_init_failed");
627-
}
628-
loop->sync_sleep_cond_initialized = true;
629-
atomic_store(&loop->sync_sleep_id, 0);
630-
atomic_store(&loop->sync_sleep_complete, false);
631-
632616
loop->msg_env = enif_alloc_env();
633617
if (loop->msg_env == NULL) {
634-
pthread_cond_destroy(&loop->sync_sleep_cond);
635618
pthread_cond_destroy(&loop->event_cond);
636619
pthread_mutex_destroy(&loop->mutex);
637620
enif_release_resource(loop);
@@ -1325,38 +1308,6 @@ ERL_NIF_TERM nif_dispatch_timer(ErlNifEnv *env, int argc,
13251308
return ATOM_OK;
13261309
}
13271310

1328-
/**
1329-
* dispatch_sleep_complete(LoopRef, SleepId) -> ok
1330-
*
1331-
* Called from Erlang when a synchronous sleep timer expires.
1332-
* Signals the waiting Python thread to wake up.
1333-
*/
1334-
ERL_NIF_TERM nif_dispatch_sleep_complete(ErlNifEnv *env, int argc,
1335-
const ERL_NIF_TERM argv[]) {
1336-
(void)argc;
1337-
1338-
erlang_event_loop_t *loop;
1339-
if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE,
1340-
(void **)&loop)) {
1341-
return make_error(env, "invalid_loop");
1342-
}
1343-
1344-
ErlNifUInt64 sleep_id;
1345-
if (!enif_get_uint64(env, argv[1], &sleep_id)) {
1346-
return make_error(env, "invalid_sleep_id");
1347-
}
1348-
1349-
/* Only signal if this is the sleep we're waiting for */
1350-
pthread_mutex_lock(&loop->mutex);
1351-
if (atomic_load(&loop->sync_sleep_id) == sleep_id) {
1352-
atomic_store(&loop->sync_sleep_complete, true);
1353-
pthread_cond_broadcast(&loop->sync_sleep_cond);
1354-
}
1355-
pthread_mutex_unlock(&loop->mutex);
1356-
1357-
return ATOM_OK;
1358-
}
1359-
13601311
/**
13611312
* handle_fd_event(FdRes, Type) -> ok | {error, Reason}
13621313
*
@@ -5151,102 +5102,6 @@ static PyObject *py_get_pending_for(PyObject *self, PyObject *args) {
51515102
return list;
51525103
}
51535104

5154-
/**
5155-
* Python function: _erlang_sleep(delay_ms) -> None
5156-
*
5157-
* Synchronous sleep that uses Erlang's timer system instead of asyncio.
5158-
* Sends {sleep_wait, DelayMs, SleepId} to the worker, then blocks waiting
5159-
* for the sleep completion signal.
5160-
*
5161-
* This is called from the ASGI fast path when asyncio.sleep() is detected,
5162-
* avoiding the need to create a full event loop.
5163-
*/
5164-
static PyObject *py_erlang_sleep(PyObject *self, PyObject *args) {
5165-
(void)self;
5166-
int delay_ms;
5167-
5168-
if (!PyArg_ParseTuple(args, "i", &delay_ms)) {
5169-
return NULL;
5170-
}
5171-
5172-
/* For zero or negative delay, return immediately */
5173-
if (delay_ms <= 0) {
5174-
Py_RETURN_NONE;
5175-
}
5176-
5177-
erlang_event_loop_t *loop = get_interpreter_event_loop();
5178-
if (loop == NULL || loop->shutdown) {
5179-
PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized");
5180-
return NULL;
5181-
}
5182-
5183-
/* Check if we have a worker to send to */
5184-
if (!event_loop_ensure_router(loop)) {
5185-
PyErr_SetString(PyExc_RuntimeError, "No worker or router configured");
5186-
return NULL;
5187-
}
5188-
5189-
/* Generate a unique sleep ID */
5190-
uint64_t sleep_id = atomic_fetch_add(&loop->next_callback_id, 1);
5191-
5192-
/* FIX: Store sleep_id BEFORE sending to prevent race condition.
5193-
* If completion arrives before storage, it would be dropped and waiter deadlocks. */
5194-
pthread_mutex_lock(&loop->mutex);
5195-
atomic_store(&loop->sync_sleep_id, sleep_id);
5196-
atomic_store(&loop->sync_sleep_complete, false);
5197-
pthread_mutex_unlock(&loop->mutex);
5198-
5199-
/* Send {sleep_wait, DelayMs, SleepId} to worker */
5200-
ErlNifEnv *msg_env = enif_alloc_env();
5201-
if (msg_env == NULL) {
5202-
/* On failure, reset sleep_id */
5203-
pthread_mutex_lock(&loop->mutex);
5204-
atomic_store(&loop->sync_sleep_id, 0);
5205-
pthread_mutex_unlock(&loop->mutex);
5206-
PyErr_SetString(PyExc_MemoryError, "Failed to allocate message environment");
5207-
return NULL;
5208-
}
5209-
5210-
ERL_NIF_TERM msg = enif_make_tuple3(
5211-
msg_env,
5212-
enif_make_atom(msg_env, "sleep_wait"),
5213-
enif_make_int(msg_env, delay_ms),
5214-
enif_make_uint64(msg_env, sleep_id)
5215-
);
5216-
5217-
/* Use worker_pid when available, otherwise fall back to router_pid */
5218-
ErlNifPid *target_pid = loop->has_worker ? &loop->worker_pid : &loop->router_pid;
5219-
if (!enif_send(NULL, target_pid, msg_env, msg)) {
5220-
/* On failure, reset sleep_id */
5221-
pthread_mutex_lock(&loop->mutex);
5222-
atomic_store(&loop->sync_sleep_id, 0);
5223-
pthread_mutex_unlock(&loop->mutex);
5224-
enif_free_env(msg_env);
5225-
PyErr_SetString(PyExc_RuntimeError, "Failed to send sleep message");
5226-
return NULL;
5227-
}
5228-
enif_free_env(msg_env);
5229-
5230-
/* Wait for completion - sleep_id already set above */
5231-
pthread_mutex_lock(&loop->mutex);
5232-
5233-
/* Release GIL and wait for completion */
5234-
Py_BEGIN_ALLOW_THREADS
5235-
while (!atomic_load(&loop->sync_sleep_complete) && !loop->shutdown) {
5236-
pthread_cond_wait(&loop->sync_sleep_cond, &loop->mutex);
5237-
}
5238-
Py_END_ALLOW_THREADS
5239-
5240-
pthread_mutex_unlock(&loop->mutex);
5241-
5242-
if (loop->shutdown) {
5243-
PyErr_SetString(PyExc_RuntimeError, "Event loop shutdown during sleep");
5244-
return NULL;
5245-
}
5246-
5247-
Py_RETURN_NONE;
5248-
}
5249-
52505105
/* Module method definitions */
52515106
static PyMethodDef PyEventLoopMethods[] = {
52525107
/* Legacy API (uses global event loop) */
@@ -5282,8 +5137,6 @@ static PyMethodDef PyEventLoopMethods[] = {
52825137
{"_release_fd_resource", py_release_fd_resource, METH_VARARGS, "Release fd resource"},
52835138
{"_schedule_timer_for", py_schedule_timer_for, METH_VARARGS, "Schedule timer on specific loop"},
52845139
{"_cancel_timer_for", py_cancel_timer_for, METH_VARARGS, "Cancel timer on specific loop"},
5285-
/* Synchronous sleep (for ASGI fast path) */
5286-
{"_erlang_sleep", py_erlang_sleep, METH_VARARGS, "Synchronous sleep using Erlang timer"},
52875140
{NULL, NULL, 0, NULL}
52885141
};
52895142

@@ -5382,19 +5235,8 @@ int create_default_event_loop(ErlNifEnv *env) {
53825235
return -1;
53835236
}
53845237

5385-
if (pthread_cond_init(&loop->sync_sleep_cond, NULL) != 0) {
5386-
pthread_cond_destroy(&loop->event_cond);
5387-
pthread_mutex_destroy(&loop->mutex);
5388-
enif_release_resource(loop);
5389-
return -1;
5390-
}
5391-
loop->sync_sleep_cond_initialized = true;
5392-
atomic_store(&loop->sync_sleep_id, 0);
5393-
atomic_store(&loop->sync_sleep_complete, false);
5394-
53955238
loop->msg_env = enif_alloc_env();
53965239
if (loop->msg_env == NULL) {
5397-
pthread_cond_destroy(&loop->sync_sleep_cond);
53985240
pthread_cond_destroy(&loop->event_cond);
53995241
pthread_mutex_destroy(&loop->mutex);
54005242
enif_release_resource(loop);

c_src/py_event_loop.h

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -246,20 +246,6 @@ typedef struct erlang_event_loop {
246246
/** @brief Flag indicating a wakeup is pending (uvloop-style coalescing) */
247247
_Atomic bool wake_pending;
248248

249-
/* ========== Synchronous Sleep Support ========== */
250-
251-
/** @brief Current synchronous sleep ID being waited on */
252-
_Atomic uint64_t sync_sleep_id;
253-
254-
/** @brief Flag indicating sleep has completed */
255-
_Atomic bool sync_sleep_complete;
256-
257-
/** @brief Condition variable for sleep completion notification */
258-
pthread_cond_t sync_sleep_cond;
259-
260-
/** @brief Whether sync_sleep_cond has been initialized */
261-
bool sync_sleep_cond_initialized;
262-
263249
/** @brief Interpreter ID: 0 = main interpreter, >0 = subinterpreter */
264250
uint32_t interp_id;
265251
} erlang_event_loop_t;

c_src/py_nif.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3856,7 +3856,6 @@ static ErlNifFunc nif_funcs[] = {
38563856
{"get_pending", 1, nif_get_pending, 0},
38573857
{"dispatch_callback", 3, nif_dispatch_callback, 0},
38583858
{"dispatch_timer", 2, nif_dispatch_timer, 0},
3859-
{"dispatch_sleep_complete", 2, nif_dispatch_sleep_complete, 0},
38603859
{"get_fd_callback_id", 2, nif_get_fd_callback_id, 0},
38613860
{"reselect_reader", 2, nif_reselect_reader, 0},
38623861
{"reselect_writer", 2, nif_reselect_writer, 0},

docs/asyncio.md

Lines changed: 48 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@ erlang.run(main())
5454
│ ┌──────────────────┐ └────────────────────────────────────┘ │
5555
│ │ asyncio (via │ │
5656
│ │ erlang.run()) │ ┌────────────────────────────────────┐ │
57-
│ │ sleep() ──┼─{sleep_wait}──▶erlang:send_after() + cond_wait │ │
58-
│ │ gather() │ │ │
59-
│ │ wait_for() │◀──{complete}───pthread_cond_broadcast() │ │
57+
│ │ sleep() │ asyncio.sleep() uses call_later() │ │
58+
│ │ gather() │─call_later()──▶which triggers erlang:send_after │ │
59+
│ │ wait_for() │ │ │
6060
│ │ create_task() │ └────────────────────────────────────┘ │
6161
│ └──────────────────┘ │
6262
│ │
@@ -632,41 +632,42 @@ Unlike Python's standard polling-based event loop, the Erlang event loop uses `e
632632

633633
```
634634
┌─────────────────────────────────────────────────────────────────────────┐
635-
asyncio.sleep() via ErlangEventLoop │
635+
│ asyncio.sleep() via ErlangEventLoop
636636
│ │
637637
│ Python Erlang │
638638
│ ────── ────── │
639639
│ │
640640
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
641641
│ │ asyncio.sleep │ │ py_event_worker │ │
642642
│ │ (0.1) │ │ │ │
643-
│ └────────┬────────┘ │ handle_info({sleep_wait,...}) │ │
644-
│ │ │ │ │ │
645-
│ ▼ │ ▼ │ │
646-
│ ┌─────────────────┐ │ erlang:send_after(100ms) │ │
647-
│ │ ErlangEventLoop │──{sleep_wait,│ │ │ │
648-
│ │ call_later() │ 100, Id}──▶│ ▼ │ │
649-
│ └────────┬────────┘ │ handle_info({sleep_complete}) │ │
650-
│ │ │ │ │ │
651-
│ ┌────────▼────────┐ │ ▼ │ │
652-
│ │ Release GIL │ │ py_nif:dispatch_sleep_complete │ │
653-
│ │ pthread_cond_ │◀─────────────│ │ │ │
654-
│ │ wait() │ signal └─────────┼───────────────────────┘ │
643+
│ └────────┬────────┘ │ │ │
644+
│ │ │ │ │
645+
│ ▼ │ │ │
646+
│ ┌─────────────────┐ │ │ │
647+
│ │ ErlangEventLoop │──{timer,100, │ erlang:send_after(100ms) │ │
648+
│ │ call_later() │ Id}─────▶│ │ │ │
649+
│ └────────┬────────┘ │ ▼ │ │
650+
│ │ │ handle_info({timeout, ...}) │ │
651+
│ ┌────────▼────────┐ │ │ │ │
652+
│ │ Yield to event │ │ ▼ │ │
653+
│ │ loop (dirty │ │ py_nif:dispatch_timer() │ │
654+
│ │ scheduler │◀─────────────│ │ │ │
655+
│ │ released) │ callback └─────────┼───────────────────────┘ │
655656
│ └────────┬────────┘ │ │
656657
│ │ │ │
657658
│ ▼ ▼ │
658659
│ ┌─────────────────┐ ┌─────────────────────────────────┐ │
659-
│ │ Reacquire GIL │ │ pthread_cond_broadcast() │ │
660-
│ │ Return result │ │ (wakes Python thread) │ │
660+
│ │ Resume after │ │ Timer callback dispatched to │ │
661+
│ │ timer fires │ │ Python pending queue │ │
661662
│ └─────────────────┘ └─────────────────────────────────┘ │
662663
│ │
663664
└─────────────────────────────────────────────────────────────────────────┘
664665
```
665666

666667
**Key features:**
667-
- **GIL released during sleep** - Python thread doesn't hold the GIL while waiting
668+
- **Dirty scheduler released during sleep** - Python yields to event loop, freeing the dirty NIF thread
668669
- **BEAM scheduler integration** - Uses Erlang's native timer system
669-
- **Zero CPU usage** - Condition variable wait, no polling
670+
- **Zero CPU usage** - No polling, event-driven callback
670671
- **Sub-millisecond precision** - Timers managed by BEAM scheduler
671672

672673
### Basic Usage
@@ -688,6 +689,33 @@ result = erlang.run(my_handler())
688689

689690
When using `erlang.run()` or the Erlang event loop, all standard asyncio functions work seamlessly with Erlang's backend.
690691

692+
#### erlang.sleep(seconds)
693+
694+
Sleep for the specified duration. Works in both async and sync contexts, and **always releases the dirty NIF scheduler**.
695+
696+
```python
697+
import erlang
698+
699+
# Async context - releases dirty scheduler via event loop yield
700+
async def async_handler():
701+
await erlang.sleep(0.1) # Uses asyncio.sleep() internally
702+
return "done"
703+
704+
# Sync context - releases dirty scheduler via Erlang process suspension
705+
def sync_handler():
706+
erlang.sleep(0.1) # Uses receive/after, true cooperative yield
707+
return "done"
708+
```
709+
710+
**Dirty Scheduler Release:**
711+
712+
| Context | Mechanism | Dirty Scheduler |
713+
|---------|-----------|-----------------|
714+
| Async (`await erlang.sleep()`) | `asyncio.sleep()` via `call_later()` | Released (yields to event loop) |
715+
| Sync (`erlang.sleep()`) | `erlang.call('_py_sleep')` with `receive/after` | Released (Erlang process suspends) |
716+
717+
Both modes allow other Erlang processes and Python contexts to run during the sleep.
718+
691719
#### asyncio.sleep(delay)
692720

693721
Sleep for the specified delay. Uses Erlang's `erlang:send_after/3` internally.

0 commit comments

Comments
 (0)