diff --git a/CHANGELOG.md b/CHANGELOG.md index 16051a5..a3dc1c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,12 @@ ### Added +- **Isolated Event Loops** - Create isolated event loops with `ErlangEventLoop(isolated=True)` + - Each isolated loop has its own pending queue + - Full asyncio support (timers, FD operations) via shared router + - Useful for multi-threaded Python applications + - See `docs/asyncio.md` for usage and architecture details + - **Python Logging Integration** - Forward Python's `logging` module to Erlang's `logger` - `py:configure_logging/0,1` - Setup Python logging to forward to Erlang - `erlang.ErlangHandler` - Python logging handler that sends to Erlang diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 17bf778..dc734af 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -89,6 +89,13 @@ static const char *EVENT_LOOP_ATTR_NAME = "_loop"; /* Forward declaration for fallback in get_interpreter_event_loop */ static erlang_event_loop_t *g_python_event_loop; +/* Global flag for isolation mode - set by Erlang via NIF */ +static volatile int g_isolation_mode = 0; /* 0 = global, 1 = per_loop */ + +/* Global shared router PID - set during init, used by all loops in per_loop mode */ +static ErlNifPid g_shared_router; +static volatile int g_shared_router_valid = 0; + /** * Get the py_event_loop module for the current interpreter. * MUST be called with GIL held. @@ -1420,6 +1427,88 @@ ERL_NIF_TERM nif_reselect_writer(ErlNifEnv *env, int argc, return ATOM_OK; } +/** + * reselect_reader_fd(FdRes) -> ok | {error, Reason} + * + * Re-register an fd for read monitoring using fd_res->loop. + * This variant doesn't require LoopRef since the fd resource + * already has a back-reference to its parent loop. + */ +ERL_NIF_TERM nif_reselect_reader_fd(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + fd_resource_t *fd_res; + if (!enif_get_resource(env, argv[0], FD_RESOURCE_TYPE, (void **)&fd_res)) { + return make_error(env, "invalid_fd_ref"); + } + + /* Don't reselect if reader was removed or FD is closing */ + if (!fd_res->reader_active) { + return ATOM_OK; + } + if (atomic_load(&fd_res->closing_state) != FD_STATE_OPEN) { + return ATOM_OK; + } + + /* Use the loop stored in the fd resource */ + erlang_event_loop_t *loop = fd_res->loop; + if (loop == NULL || !loop->has_router) { + return make_error(env, "no_loop"); + } + + /* Re-register with Erlang scheduler for read monitoring */ + int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_READ, + fd_res, &loop->router_pid, enif_make_ref(env)); + + if (ret < 0) { + return make_error(env, "reselect_failed"); + } + + return ATOM_OK; +} + +/** + * reselect_writer_fd(FdRes) -> ok | {error, Reason} + * + * Re-register an fd for write monitoring using fd_res->loop. + * This variant doesn't require LoopRef since the fd resource + * already has a back-reference to its parent loop. + */ +ERL_NIF_TERM nif_reselect_writer_fd(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + fd_resource_t *fd_res; + if (!enif_get_resource(env, argv[0], FD_RESOURCE_TYPE, (void **)&fd_res)) { + return make_error(env, "invalid_fd_ref"); + } + + /* Don't reselect if writer was removed or FD is closing */ + if (!fd_res->writer_active) { + return ATOM_OK; + } + if (atomic_load(&fd_res->closing_state) != FD_STATE_OPEN) { + return ATOM_OK; + } + + /* Use the loop stored in the fd resource */ + erlang_event_loop_t *loop = fd_res->loop; + if (loop == NULL || !loop->has_router) { + return make_error(env, "no_loop"); + } + + /* Re-register with Erlang scheduler for write monitoring */ + int ret = enif_select(env, (ErlNifEvent)fd_res->fd, ERL_NIF_SELECT_WRITE, + fd_res, &loop->router_pid, enif_make_ref(env)); + + if (ret < 0) { + return make_error(env, "reselect_failed"); + } + + return ATOM_OK; +} + /** * stop_reader(FdRef) -> ok | {error, Reason} * @@ -2220,6 +2309,45 @@ ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc, return ATOM_OK; } +/** + * set_isolation_mode(Mode) -> ok + * + * Set the event loop isolation mode. + * Called from Erlang: py_nif:set_isolation_mode(global | per_loop) + */ +ERL_NIF_TERM nif_set_isolation_mode(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + if (enif_is_atom(env, argv[0])) { + char atom_buf[32]; + if (enif_get_atom(env, argv[0], atom_buf, sizeof(atom_buf), ERL_NIF_LATIN1)) { + if (strcmp(atom_buf, "per_loop") == 0) { + g_isolation_mode = 1; + } else { + g_isolation_mode = 0; /* global or any other value */ + } + return ATOM_OK; + } + } + return make_error(env, "invalid_mode"); +} + +/** + * Set the shared router PID for per-loop created loops. + * This router will be used by all loops created via _loop_new(). + */ +ERL_NIF_TERM nif_set_shared_router(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + if (!enif_get_local_pid(env, argv[0], &g_shared_router)) { + return make_error(env, "invalid_pid"); + } + g_shared_router_valid = 1; + return ATOM_OK; +} + /* Python function: _poll_events(timeout_ms) -> num_events */ static PyObject *py_poll_events(PyObject *self, PyObject *args) { (void)self; @@ -2376,6 +2504,17 @@ static PyObject *py_is_initialized(PyObject *self, PyObject *args) { Py_RETURN_FALSE; } +/* Python function: _get_isolation_mode() -> str ("global" or "per_loop") */ +static PyObject *py_get_isolation_mode(PyObject *self, PyObject *args) { + (void)self; + (void)args; + + if (g_isolation_mode == 1) { + return PyUnicode_FromString("per_loop"); + } + return PyUnicode_FromString("global"); +} + /* Python function: _add_reader(fd, callback_id) -> fd_key */ static PyObject *py_add_reader(PyObject *self, PyObject *args) { (void)self; @@ -2724,20 +2863,621 @@ static PyObject *py_run_once(PyObject *self, PyObject *args) { return list; } +/* ============================================================================ + * Handle-Based Python API (_for methods) + * ============================================================================ + * + * These methods take an explicit loop handle (PyCapsule) as the first argument, + * allowing Python code to operate on multiple independent event loops. + */ + +/** @brief Capsule name for event loop handles */ +static const char *LOOP_CAPSULE_NAME = "erlang_python.event_loop"; + +/** + * Extract event loop pointer from a PyCapsule. + * + * @param capsule The PyCapsule containing an erlang_event_loop_t pointer + * @return Pointer to the event loop, or NULL if invalid + */ +static erlang_event_loop_t *loop_from_capsule(PyObject *capsule) { + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "Expected event loop capsule"); + return NULL; + } + void *ptr = PyCapsule_GetPointer(capsule, LOOP_CAPSULE_NAME); + if (ptr == NULL) { + /* PyCapsule_GetPointer sets appropriate error */ + return NULL; + } + return (erlang_event_loop_t *)ptr; +} + +/** + * Get the default event loop for backward compatibility. + * Used by legacy API methods. + */ +static erlang_event_loop_t *default_loop_for_compat(void) { + return get_interpreter_event_loop(); +} + +/** + * Destructor callback for loop capsules. + * Called when the capsule is garbage collected. + */ +static void loop_capsule_destructor(PyObject *capsule) { + erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer( + capsule, LOOP_CAPSULE_NAME); + if (loop != NULL) { + /* Signal shutdown */ + loop->shutdown = true; + + /* Wake up any waiting threads */ + pthread_mutex_lock(&loop->mutex); + pthread_cond_broadcast(&loop->event_cond); + pthread_mutex_unlock(&loop->mutex); + + /* Release the NIF resource reference */ + enif_release_resource(loop); + } +} + +/* Python function: _loop_new() -> capsule */ +static PyObject *py_loop_new(PyObject *self, PyObject *args) { + (void)self; + (void)args; + + /* Allocate event loop resource */ + erlang_event_loop_t *loop = enif_alloc_resource( + EVENT_LOOP_RESOURCE_TYPE, sizeof(erlang_event_loop_t)); + + if (loop == NULL) { + PyErr_SetString(PyExc_MemoryError, "Failed to allocate event loop"); + return NULL; + } + + /* Initialize fields */ + memset(loop, 0, sizeof(erlang_event_loop_t)); + + if (pthread_mutex_init(&loop->mutex, NULL) != 0) { + enif_release_resource(loop); + PyErr_SetString(PyExc_RuntimeError, "Failed to initialize mutex"); + return NULL; + } + + if (pthread_cond_init(&loop->event_cond, NULL) != 0) { + pthread_mutex_destroy(&loop->mutex); + enif_release_resource(loop); + PyErr_SetString(PyExc_RuntimeError, "Failed to initialize condition variable"); + return NULL; + } + + loop->msg_env = enif_alloc_env(); + if (loop->msg_env == NULL) { + pthread_cond_destroy(&loop->event_cond); + pthread_mutex_destroy(&loop->mutex); + enif_release_resource(loop); + PyErr_SetString(PyExc_MemoryError, "Failed to allocate message environment"); + return NULL; + } + + atomic_store(&loop->next_callback_id, 1); + atomic_store(&loop->pending_count, 0); + loop->pending_head = NULL; + loop->pending_tail = NULL; + loop->shutdown = false; + loop->has_router = false; + loop->has_self = false; + loop->event_freelist = NULL; + loop->freelist_count = 0; + + /* Use shared router if available (for per-loop mode) */ + if (g_shared_router_valid) { + loop->router_pid = g_shared_router; + loop->has_router = true; + } + + /* Create a capsule wrapping the loop pointer */ + PyObject *capsule = PyCapsule_New(loop, LOOP_CAPSULE_NAME, loop_capsule_destructor); + if (capsule == NULL) { + pthread_cond_destroy(&loop->event_cond); + pthread_mutex_destroy(&loop->mutex); + enif_free_env(loop->msg_env); + enif_release_resource(loop); + return NULL; + } + + /* Keep a reference to the NIF resource (capsule destructor will release) */ + enif_keep_resource(loop); + + return capsule; +} + +/* Python function: _loop_destroy(capsule) -> None */ +static PyObject *py_loop_destroy(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + /* Signal shutdown */ + loop->shutdown = true; + + /* Wake up any waiting threads */ + pthread_mutex_lock(&loop->mutex); + pthread_cond_broadcast(&loop->event_cond); + pthread_mutex_unlock(&loop->mutex); + + Py_RETURN_NONE; +} + +/* Python function: _run_once_native_for(capsule, timeout_ms) -> [(callback_id, event_type), ...] */ +static PyObject *py_run_once_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + int timeout_ms; + + if (!PyArg_ParseTuple(args, "Oi", &capsule, &timeout_ms)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + if (loop->shutdown) { + return PyList_New(0); + } + + /* Release GIL while waiting for events */ + Py_BEGIN_ALLOW_THREADS + poll_events_wait(loop, timeout_ms); + Py_END_ALLOW_THREADS + + /* Build pending list with GIL held */ + pthread_mutex_lock(&loop->mutex); + + int count = atomic_load(&loop->pending_count); + if (count == 0) { + pthread_mutex_unlock(&loop->mutex); + return PyList_New(0); + } + + PyObject *list = PyList_New(count); + if (list == NULL) { + pthread_mutex_unlock(&loop->mutex); + return NULL; + } + + pending_event_t *current = loop->pending_head; + int i = 0; + while (current != NULL && i < count) { + PyObject *tuple = make_event_tuple(current->callback_id, (int)current->type); + if (tuple == NULL) { + Py_DECREF(list); + while (current != NULL) { + pending_event_t *next = current->next; + return_pending_event(loop, current); + current = next; + } + loop->pending_head = NULL; + loop->pending_tail = NULL; + atomic_store(&loop->pending_count, 0); + pending_hash_clear(loop); + pthread_mutex_unlock(&loop->mutex); + return NULL; + } + PyList_SET_ITEM(list, i++, tuple); + + pending_event_t *next = current->next; + return_pending_event(loop, current); + current = next; + } + + while (current != NULL) { + pending_event_t *next = current->next; + return_pending_event(loop, current); + current = next; + } + + loop->pending_head = NULL; + loop->pending_tail = NULL; + atomic_store(&loop->pending_count, 0); + pending_hash_clear(loop); + + pthread_mutex_unlock(&loop->mutex); + + return list; +} + +/* Python function: _add_reader_for(capsule, fd, callback_id) -> fd_key */ +static PyObject *py_add_reader_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + int fd; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OiK", &capsule, &fd, &callback_id)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + if (!loop->has_router) { + PyErr_SetString(PyExc_RuntimeError, "Event loop has no router"); + return NULL; + } + + fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, sizeof(fd_resource_t)); + if (fd_res == NULL) { + PyErr_SetString(PyExc_MemoryError, "Failed to allocate fd resource"); + return NULL; + } + + fd_res->fd = fd; + fd_res->loop = loop; + fd_res->read_callback_id = callback_id; + fd_res->write_callback_id = 0; + fd_res->reader_active = true; + fd_res->writer_active = false; + fd_res->owner_pid = loop->router_pid; + atomic_store(&fd_res->closing_state, FD_STATE_OPEN); + fd_res->monitor_active = false; + fd_res->owns_fd = false; + + int ret = enif_select(loop->msg_env, (ErlNifEvent)fd, + ERL_NIF_SELECT_READ, fd_res, &loop->router_pid, ATOM_UNDEFINED); + + if (ret < 0) { + enif_release_resource(fd_res); + PyErr_SetString(PyExc_RuntimeError, "Failed to register fd for reading"); + return NULL; + } + + unsigned long long key = (unsigned long long)(uintptr_t)fd_res; + return PyLong_FromUnsignedLongLong(key); +} + +/* Python function: _remove_reader_for(capsule, fd_key) -> None */ +static PyObject *py_remove_reader_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + unsigned long long fd_key; + + if (!PyArg_ParseTuple(args, "OK", &capsule, &fd_key)) { + return NULL; + } + + /* Validate capsule (but we use fd_res->loop directly) */ + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "Expected event loop capsule"); + return NULL; + } + + fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; + if (fd_res != NULL && fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_res->reader_active = false; + enif_release_resource(fd_res); + } + + Py_RETURN_NONE; +} + +/* Python function: _add_writer_for(capsule, fd, callback_id) -> fd_key */ +static PyObject *py_add_writer_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + int fd; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OiK", &capsule, &fd, &callback_id)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + if (!loop->has_router) { + PyErr_SetString(PyExc_RuntimeError, "Event loop has no router"); + return NULL; + } + + fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, sizeof(fd_resource_t)); + if (fd_res == NULL) { + PyErr_SetString(PyExc_MemoryError, "Failed to allocate fd resource"); + return NULL; + } + + fd_res->fd = fd; + fd_res->loop = loop; + fd_res->read_callback_id = 0; + fd_res->write_callback_id = callback_id; + fd_res->reader_active = false; + fd_res->writer_active = true; + fd_res->owner_pid = loop->router_pid; + atomic_store(&fd_res->closing_state, FD_STATE_OPEN); + fd_res->monitor_active = false; + fd_res->owns_fd = false; + + int ret = enif_select(loop->msg_env, (ErlNifEvent)fd, + ERL_NIF_SELECT_WRITE, fd_res, &loop->router_pid, ATOM_UNDEFINED); + + if (ret < 0) { + enif_release_resource(fd_res); + PyErr_SetString(PyExc_RuntimeError, "Failed to register fd for writing"); + return NULL; + } + + unsigned long long key = (unsigned long long)(uintptr_t)fd_res; + return PyLong_FromUnsignedLongLong(key); +} + +/* Python function: _remove_writer_for(capsule, fd_key) -> None */ +static PyObject *py_remove_writer_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + unsigned long long fd_key; + + if (!PyArg_ParseTuple(args, "OK", &capsule, &fd_key)) { + return NULL; + } + + /* Validate capsule (but we use fd_res->loop directly) */ + if (!PyCapsule_CheckExact(capsule)) { + PyErr_SetString(PyExc_TypeError, "Expected event loop capsule"); + return NULL; + } + + fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; + if (fd_res != NULL && fd_res->loop != NULL) { + enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, + ERL_NIF_SELECT_STOP, fd_res, NULL, ATOM_UNDEFINED); + fd_res->writer_active = false; + enif_release_resource(fd_res); + } + + Py_RETURN_NONE; +} + +/* Python function: _schedule_timer_for(capsule, delay_ms, callback_id) -> timer_ref */ +static PyObject *py_schedule_timer_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + int delay_ms; + unsigned long long callback_id; + + if (!PyArg_ParseTuple(args, "OiK", &capsule, &delay_ms, &callback_id)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + return NULL; + } + + if (!loop->has_router) { + PyErr_SetString(PyExc_RuntimeError, "Event loop has no router"); + return NULL; + } + + if (delay_ms < 0) delay_ms = 0; + + uint64_t timer_ref_id = atomic_fetch_add(&loop->next_callback_id, 1); + + ErlNifEnv *msg_env = enif_alloc_env(); + if (msg_env == NULL) { + PyErr_SetString(PyExc_MemoryError, "Failed to allocate message env"); + return NULL; + } + + /* Include loop resource in message so router dispatches to correct loop */ + ERL_NIF_TERM loop_term = enif_make_resource(msg_env, loop); + + ERL_NIF_TERM msg = enif_make_tuple5( + msg_env, + ATOM_START_TIMER, + loop_term, + enif_make_int(msg_env, delay_ms), + enif_make_uint64(msg_env, callback_id), + enif_make_uint64(msg_env, timer_ref_id) + ); + + int send_result = enif_send(NULL, &loop->router_pid, msg_env, msg); + enif_free_env(msg_env); + + if (!send_result) { + PyErr_SetString(PyExc_RuntimeError, "Failed to send timer message"); + return NULL; + } + + return PyLong_FromUnsignedLongLong(timer_ref_id); +} + +/* Python function: _cancel_timer_for(capsule, timer_ref) -> None */ +static PyObject *py_cancel_timer_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + unsigned long long timer_ref_id; + + if (!PyArg_ParseTuple(args, "OK", &capsule, &timer_ref_id)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + PyErr_Clear(); /* Don't fail on cancel */ + Py_RETURN_NONE; + } + + if (!loop->has_router) { + Py_RETURN_NONE; + } + + ErlNifEnv *msg_env = enif_alloc_env(); + if (msg_env == NULL) { + Py_RETURN_NONE; + } + + ERL_NIF_TERM msg = enif_make_tuple2( + msg_env, + ATOM_CANCEL_TIMER, + enif_make_uint64(msg_env, timer_ref_id) + ); + + enif_send(NULL, &loop->router_pid, msg_env, msg); + enif_free_env(msg_env); + Py_RETURN_NONE; +} + +/* Python function: _wakeup_for(capsule) -> None */ +static PyObject *py_wakeup_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + PyErr_Clear(); + Py_RETURN_NONE; + } + + pthread_mutex_lock(&loop->mutex); + pthread_cond_broadcast(&loop->event_cond); + pthread_mutex_unlock(&loop->mutex); + + Py_RETURN_NONE; +} + +/* Python function: _is_initialized_for(capsule) -> bool */ +static PyObject *py_is_initialized_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + if (!PyCapsule_CheckExact(capsule)) { + Py_RETURN_FALSE; + } + + void *ptr = PyCapsule_GetPointer(capsule, LOOP_CAPSULE_NAME); + if (ptr == NULL) { + PyErr_Clear(); + Py_RETURN_FALSE; + } + + erlang_event_loop_t *loop = (erlang_event_loop_t *)ptr; + if (loop->shutdown) { + Py_RETURN_FALSE; + } + + Py_RETURN_TRUE; +} + +/* Python function: _get_pending_for(capsule) -> [(callback_id, event_type), ...] */ +static PyObject *py_get_pending_for(PyObject *self, PyObject *args) { + (void)self; + PyObject *capsule; + + if (!PyArg_ParseTuple(args, "O", &capsule)) { + return NULL; + } + + erlang_event_loop_t *loop = loop_from_capsule(capsule); + if (loop == NULL) { + PyErr_Clear(); + return PyList_New(0); + } + + pthread_mutex_lock(&loop->mutex); + + int count = 0; + pending_event_t *current = loop->pending_head; + while (current != NULL) { + count++; + current = current->next; + } + + PyObject *list = PyList_New(count); + if (list == NULL) { + pthread_mutex_unlock(&loop->mutex); + return NULL; + } + + current = loop->pending_head; + int i = 0; + while (current != NULL) { + PyObject *tuple = make_event_tuple(current->callback_id, (int)current->type); + if (tuple == NULL) { + Py_DECREF(list); + pthread_mutex_unlock(&loop->mutex); + return NULL; + } + PyList_SET_ITEM(list, i++, tuple); + + pending_event_t *next = current->next; + return_pending_event(loop, current); + current = next; + } + + loop->pending_head = NULL; + loop->pending_tail = NULL; + atomic_store(&loop->pending_count, 0); + pending_hash_clear(loop); + + pthread_mutex_unlock(&loop->mutex); + + return list; +} + /* Module method definitions */ static PyMethodDef PyEventLoopMethods[] = { + /* Legacy API (uses global event loop) */ {"_poll_events", py_poll_events, METH_VARARGS, "Wait for events with timeout"}, {"_get_pending", py_get_pending, METH_NOARGS, "Get and clear pending events"}, {"_run_once_native", py_run_once, METH_VARARGS, "Combined poll + get_pending with int event types"}, {"_wakeup", py_wakeup, METH_NOARGS, "Wake up the event loop"}, {"_add_pending", py_add_pending, METH_VARARGS, "Add a pending event"}, {"_is_initialized", py_is_initialized, METH_NOARGS, "Check if event loop is initialized"}, + {"_get_isolation_mode", py_get_isolation_mode, METH_NOARGS, "Get event loop isolation mode (global or per_loop)"}, {"_add_reader", py_add_reader, METH_VARARGS, "Register fd for read monitoring"}, {"_remove_reader", py_remove_reader, METH_VARARGS, "Stop monitoring fd for reads"}, {"_add_writer", py_add_writer, METH_VARARGS, "Register fd for write monitoring"}, {"_remove_writer", py_remove_writer, METH_VARARGS, "Stop monitoring fd for writes"}, {"_schedule_timer", py_schedule_timer, METH_VARARGS, "Schedule a timer with Erlang"}, {"_cancel_timer", py_cancel_timer, METH_VARARGS, "Cancel an Erlang timer"}, + /* Handle-based API (takes explicit loop capsule) */ + {"_loop_new", py_loop_new, METH_NOARGS, "Create a new event loop, returns capsule"}, + {"_loop_destroy", py_loop_destroy, METH_VARARGS, "Destroy an event loop"}, + {"_run_once_native_for", py_run_once_for, METH_VARARGS, "Combined poll + get_pending for specific loop"}, + {"_get_pending_for", py_get_pending_for, METH_VARARGS, "Get and clear pending events for specific loop"}, + {"_wakeup_for", py_wakeup_for, METH_VARARGS, "Wake up specific event loop"}, + {"_is_initialized_for", py_is_initialized_for, METH_VARARGS, "Check if specific loop is initialized"}, + {"_add_reader_for", py_add_reader_for, METH_VARARGS, "Register fd for read monitoring on specific loop"}, + {"_remove_reader_for", py_remove_reader_for, METH_VARARGS, "Stop monitoring fd for reads on specific loop"}, + {"_add_writer_for", py_add_writer_for, METH_VARARGS, "Register fd for write monitoring on specific loop"}, + {"_remove_writer_for", py_remove_writer_for, METH_VARARGS, "Stop monitoring fd for writes on specific loop"}, + {"_schedule_timer_for", py_schedule_timer_for, METH_VARARGS, "Schedule timer on specific loop"}, + {"_cancel_timer_for", py_cancel_timer_for, METH_VARARGS, "Cancel timer on specific loop"}, {NULL, NULL, 0, NULL} }; diff --git a/c_src/py_nif.c b/c_src/py_nif.c index b140dbb..96159ff 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -1896,6 +1896,8 @@ static ErlNifFunc nif_funcs[] = { {"get_fd_callback_id", 2, nif_get_fd_callback_id, 0}, {"reselect_reader", 2, nif_reselect_reader, 0}, {"reselect_writer", 2, nif_reselect_writer, 0}, + {"reselect_reader_fd", 1, nif_reselect_reader_fd, 0}, + {"reselect_writer_fd", 1, nif_reselect_writer_fd, 0}, /* FD lifecycle management (uvloop-like API) */ {"handle_fd_event", 2, nif_handle_fd_event, 0}, {"stop_reader", 1, nif_stop_reader, 0}, @@ -1921,6 +1923,8 @@ static ErlNifFunc nif_funcs[] = { {"set_udp_broadcast", 2, nif_set_udp_broadcast, 0}, /* Python event loop integration */ {"set_python_event_loop", 1, nif_set_python_event_loop, 0}, + {"set_isolation_mode", 1, nif_set_isolation_mode, 0}, + {"set_shared_router", 1, nif_set_shared_router, 0}, /* ASGI optimizations */ {"asgi_build_scope", 1, nif_asgi_build_scope, ERL_NIF_DIRTY_JOB_IO_BOUND}, diff --git a/docs/asyncio.md b/docs/asyncio.md index c3e25b2..95eba39 100644 --- a/docs/asyncio.md +++ b/docs/asyncio.md @@ -449,6 +449,99 @@ ok = py_nif:event_loop_set_router(LoopRef, RouterPid). Events are delivered as Erlang messages, enabling the event loop to participate in BEAM's supervision trees and distributed computing capabilities. +## Isolated Event Loops + +By default, all `ErlangEventLoop` instances share a single underlying native event loop managed by Erlang. For multi-threaded applications where each thread needs its own event loop, you can create isolated loops. + +### Creating an Isolated Loop + +Use the `isolated=True` parameter to create a loop with its own pending queue: + +```python +from erlang_loop import ErlangEventLoop + +# Default: uses shared global loop +shared_loop = ErlangEventLoop() + +# Isolated: creates its own native loop +isolated_loop = ErlangEventLoop(isolated=True) +``` + +### When to Use Isolated Loops + +| Use Case | Loop Type | +|----------|-----------| +| Single-threaded asyncio applications | Default (shared) | +| Web frameworks (ASGI/WSGI) | Default (shared) | +| Multi-threaded Python with separate event loops | `isolated=True` | +| Sub-interpreters | `isolated=True` | +| Free-threaded Python (3.13+) | `isolated=True` | +| Testing loop isolation | `isolated=True` | + +### Multi-threaded Example + +```python +from erlang_loop import ErlangEventLoop +import threading + +def run_isolated_tasks(loop_id): + """Each thread gets its own isolated event loop.""" + loop = ErlangEventLoop(isolated=True) + + results = [] + + def callback(): + results.append(loop_id) + + # Schedule callbacks - isolated to this loop + loop.call_soon(callback) + loop.call_later(0.01, callback) + + # Process events + import time + deadline = time.time() + 0.5 + while time.time() < deadline and len(results) < 2: + loop._run_once() + time.sleep(0.01) + + loop.close() + return results + +# Run in separate threads +t1 = threading.Thread(target=run_isolated_tasks, args=('loop_a',)) +t2 = threading.Thread(target=run_isolated_tasks, args=('loop_b',)) + +t1.start() +t2.start() +t1.join() +t2.join() +# Each thread only sees its own callbacks +``` + +### Architecture + +A shared router process handles timer and FD events for all loops: + +``` +┌─────────────────────────────────────────────────────────────────┐ +│ py_event_router (shared) │ +│ │ +│ Receives: │ +│ - Timer expirations from erlang:send_after │ +│ - FD ready events from enif_select │ +│ │ +│ Dispatches to correct loop via resource backref │ +└─────────────────────────────────────────────────────────────────┘ + ▲ ▲ ▲ + │ │ │ + ┌────┴────┐ ┌────┴────┐ ┌────┴────┐ + │ Loop A │ │ Loop B │ │ Loop C │ + │ pending │ │ pending │ │ pending │ + └─────────┘ └─────────┘ └─────────┘ +``` + +Each isolated loop has its own pending queue, ensuring callbacks are processed only by the loop that scheduled them. The shared router dispatches timer and FD events to the correct loop based on the resource backref. + ## See Also - [Threading](threading.md) - For `erlang.async_call()` in asyncio contexts diff --git a/priv/erlang_loop.py b/priv/erlang_loop.py index f5378e1..e0b588f 100644 --- a/priv/erlang_loop.py +++ b/priv/erlang_loop.py @@ -79,7 +79,8 @@ class ErlangEventLoop(asyncio.AbstractEventLoop): # Use __slots__ for faster attribute access and reduced memory __slots__ = ( - '_pel', '_readers', '_writers', '_readers_by_cid', '_writers_by_cid', + '_pel', '_loop_handle', # Native loop handle (capsule) for per-loop isolation + '_readers', '_writers', '_readers_by_cid', '_writers_by_cid', '_timers', '_timer_refs', '_timer_heap', '_handle_to_callback_id', '_ready', '_callback_id', '_handle_pool', '_handle_pool_max', '_running', '_stopping', '_closed', @@ -89,22 +90,36 @@ class ErlangEventLoop(asyncio.AbstractEventLoop): '_ready_append', '_ready_popleft', ) - def __init__(self): + def __init__(self, isolated=False): """Initialize the Erlang event loop. The event loop is backed by Erlang's scheduler via the py_event_loop C module. This module provides direct access to the event loop without going through Erlang callbacks. + + Args: + isolated: If True, create an isolated event loop with its own + pending queue. Useful for multi-threaded applications where + each thread needs its own event loop. If False (default), + use the shared global loop managed by Erlang. """ try: import py_event_loop as pel self._pel = pel - if not pel._is_initialized(): - raise RuntimeError("Erlang event loop not initialized. " - "Make sure erlang_python application is started.") + + if isolated: + # Create a new isolated loop handle + self._loop_handle = pel._loop_new() + else: + # Use shared global loop - check it's initialized + if not pel._is_initialized(): + raise RuntimeError("Erlang event loop not initialized. " + "Make sure erlang_python application is started.") + self._loop_handle = None except ImportError: # Fallback for testing without actual NIF self._pel = _MockNifModule() + self._loop_handle = None # Callback management self._readers = {} # fd -> (callback, args, callback_id, fd_key) @@ -245,7 +260,10 @@ def close(self): timer_ref = self._timer_refs.get(callback_id) if timer_ref is not None: try: - self._pel._cancel_timer(timer_ref) + if self._loop_handle is not None: + self._pel._cancel_timer_for(self._loop_handle, timer_ref) + else: + self._pel._cancel_timer(timer_ref) except (AttributeError, RuntimeError): pass self._timers.clear() @@ -259,6 +277,14 @@ def close(self): for fd in list(self._writers.keys()): self.remove_writer(fd) + # Destroy the native loop handle if we have one + if self._loop_handle is not None: + try: + self._pel._loop_destroy(self._loop_handle) + except (AttributeError, RuntimeError): + pass + self._loop_handle = None + # Shutdown default executor if self._default_executor is not None: self._default_executor.shutdown(wait=False) @@ -291,7 +317,10 @@ def call_soon_threadsafe(self, callback, *args, context=None): handle = self.call_soon(callback, *args, context=context) # Wake up the event loop try: - self._pel._wakeup() + if self._loop_handle is not None: + self._pel._wakeup_for(self._loop_handle) + else: + self._pel._wakeup() except Exception: pass return handle @@ -317,7 +346,10 @@ def call_at(self, when, callback, *args, context=None): # Schedule with Erlang's native timer system delay_ms = max(0, int((when - self.time()) * 1000)) try: - timer_ref = self._pel._schedule_timer(delay_ms, callback_id) + if self._loop_handle is not None: + timer_ref = self._pel._schedule_timer_for(self._loop_handle, delay_ms, callback_id) + else: + timer_ref = self._pel._schedule_timer(delay_ms, callback_id) self._timer_refs[callback_id] = timer_ref except AttributeError: pass # Fallback: mock module doesn't have _schedule_timer @@ -381,7 +413,10 @@ def add_reader(self, fd, callback, *args): callback_id = self._next_id() try: - fd_key = self._pel._add_reader(fd, callback_id) + if self._loop_handle is not None: + fd_key = self._pel._add_reader_for(self._loop_handle, fd, callback_id) + else: + fd_key = self._pel._add_reader(fd, callback_id) self._readers[fd] = (callback, args, callback_id, fd_key) self._readers_by_cid[callback_id] = fd # Reverse map for O(1) dispatch except Exception as e: @@ -397,7 +432,10 @@ def remove_reader(self, fd): self._readers_by_cid.pop(callback_id, None) # Clean up reverse map try: if fd_key is not None: - self._pel._remove_reader(fd_key) + if self._loop_handle is not None: + self._pel._remove_reader_for(self._loop_handle, fd_key) + else: + self._pel._remove_reader(fd_key) except Exception: pass return True @@ -411,7 +449,10 @@ def add_writer(self, fd, callback, *args): callback_id = self._next_id() try: - fd_key = self._pel._add_writer(fd, callback_id) + if self._loop_handle is not None: + fd_key = self._pel._add_writer_for(self._loop_handle, fd, callback_id) + else: + fd_key = self._pel._add_writer(fd, callback_id) self._writers[fd] = (callback, args, callback_id, fd_key) self._writers_by_cid[callback_id] = fd # Reverse map for O(1) dispatch except Exception as e: @@ -427,7 +468,10 @@ def remove_writer(self, fd): self._writers_by_cid.pop(callback_id, None) # Clean up reverse map try: if fd_key is not None: - self._pel._remove_writer(fd_key) + if self._loop_handle is not None: + self._pel._remove_writer_for(self._loop_handle, fd_key) + else: + self._pel._remove_writer(fd_key) except Exception: pass return True @@ -839,7 +883,11 @@ def _run_once(self): # Use combined poll + get_pending (single NIF call, integer event types) try: - pending = self._pel._run_once_native(timeout) + # Use handle-based API when loop_handle is set, legacy otherwise + if self._loop_handle is not None: + pending = self._pel._run_once_native_for(self._loop_handle, timeout) + else: + pending = self._pel._run_once_native(timeout) dispatch = self._dispatch for callback_id, event_type in pending: dispatch(callback_id, event_type) diff --git a/priv/test_multi_loop.py b/priv/test_multi_loop.py new file mode 100644 index 0000000..0e6cb7a --- /dev/null +++ b/priv/test_multi_loop.py @@ -0,0 +1,228 @@ +""" +Test helpers for multi-loop isolation testing. + +These helpers are used to verify that multiple ErlangEventLoop instances +can operate concurrently without cross-talk between their pending queues. + +Usage from Erlang: + py:call(test_multi_loop, run_concurrent_timers, [100]) +""" + +import asyncio +import time +from typing import List, Tuple, Dict, Set + +# Try to import the actual event loop module +try: + import py_event_loop as pel + from erlang_loop import ErlangEventLoop + HAS_EVENT_LOOP = True +except ImportError: + HAS_EVENT_LOOP = False + + +def check_available() -> bool: + """Check if multi-loop testing is available.""" + return HAS_EVENT_LOOP + + +def create_isolated_loops(count: int = 2) -> List: + """ + Create multiple isolated ErlangEventLoop instances. + + This tests that _loop_new() creates truly independent loops. + With proper isolation, each loop has its own: + - pending queue + - callback ID space (though we use Python-side IDs) + - condition variable for wakeup + + Returns: + List of ErlangEventLoop instances + """ + if not HAS_EVENT_LOOP: + raise RuntimeError("Event loop module not available") + + loops = [] + for _ in range(count): + loop = ErlangEventLoop() + loops.append(loop) + return loops + + +def run_concurrent_timers_on_loop(loop, timer_count: int, base_id: int) -> List[int]: + """ + Schedule timers on a loop and collect the callback IDs that fire. + + Args: + loop: ErlangEventLoop instance + timer_count: Number of timers to schedule + base_id: Starting callback ID (for identification) + + Returns: + List of callback IDs that were dispatched + """ + received_ids = [] + + def make_callback(cid): + def callback(): + received_ids.append(cid) + return callback + + # Schedule all timers with small delays + handles = [] + for i in range(timer_count): + callback_id = base_id + i + handle = loop.call_later(0.001, make_callback(callback_id)) # 1ms delay + handles.append(handle) + + # Run the loop until all timers fire or timeout + start = time.monotonic() + timeout = 1.0 # 1 second timeout + + while len(received_ids) < timer_count: + if time.monotonic() - start > timeout: + break + loop._run_once() + + return received_ids + + +def test_two_loops_concurrent_timers(timer_count: int = 100) -> Dict: + """ + Test that two loops can schedule concurrent timers without cross-talk. + + Returns: + Dict with test results: + - loop_a_count: Number of events received by loop A + - loop_b_count: Number of events received by loop B + - loop_a_ids: Set of callback IDs received by loop A + - loop_b_ids: Set of callback IDs received by loop B + - overlap: Any IDs that appear in both (should be empty) + - passed: True if test passed + """ + if not HAS_EVENT_LOOP: + return {"error": "Event loop not available", "passed": False} + + loop_a = ErlangEventLoop() + loop_b = ErlangEventLoop() + + loop_a_base = 1 # IDs 1-100 + loop_b_base = 1001 # IDs 1001-1100 + + # Run timers on both loops + # Note: In real implementation with _for methods, these would be isolated + ids_a = run_concurrent_timers_on_loop(loop_a, timer_count, loop_a_base) + ids_b = run_concurrent_timers_on_loop(loop_b, timer_count, loop_b_base) + + # Check for overlap + set_a = set(ids_a) + set_b = set(ids_b) + overlap = set_a & set_b + + # Expected IDs + expected_a = set(range(loop_a_base, loop_a_base + timer_count)) + expected_b = set(range(loop_b_base, loop_b_base + timer_count)) + + passed = ( + len(ids_a) == timer_count and + len(ids_b) == timer_count and + set_a == expected_a and + set_b == expected_b and + len(overlap) == 0 + ) + + loop_a.close() + loop_b.close() + + return { + "loop_a_count": len(ids_a), + "loop_b_count": len(ids_b), + "loop_a_ids": sorted(ids_a), + "loop_b_ids": sorted(ids_b), + "overlap": sorted(overlap), + "passed": passed + } + + +def test_cross_isolation() -> Dict: + """ + Test that events on loop A are not visible to loop B. + + Returns: + Dict with test results + """ + if not HAS_EVENT_LOOP: + return {"error": "Event loop not available", "passed": False} + + loop_a = ErlangEventLoop() + loop_b = ErlangEventLoop() + + a_received = [] + b_received = [] + + def callback_a(): + a_received.append("event_a") + + def callback_b(): + b_received.append("event_b") + + # Schedule only on loop A + loop_a.call_soon(callback_a) + + # Run loop A - should receive the event + loop_a._run_once() + + # Run loop B - should NOT receive loop A's event + loop_b._run_once() + + passed = ( + len(a_received) == 1 and + len(b_received) == 0 + ) + + loop_a.close() + loop_b.close() + + return { + "loop_a_events": len(a_received), + "loop_b_events": len(b_received), + "passed": passed + } + + +def test_cleanup_no_leak() -> Dict: + """ + Test that destroying one loop doesn't affect another. + + Returns: + Dict with test results + """ + if not HAS_EVENT_LOOP: + return {"error": "Event loop not available", "passed": False} + + loop_a = ErlangEventLoop() + loop_b = ErlangEventLoop() + + b_received = [] + + def callback_b(): + b_received.append("event_b") + + # Schedule timer on loop B for after loop A is destroyed + loop_b.call_later(0.05, callback_b) # 50ms delay + + # Close loop A + loop_a.close() + + # Wait and run loop B - should still work + time.sleep(0.1) # 100ms + loop_b._run_once() + + passed = len(b_received) == 1 + + loop_b.close() + + return { + "loop_b_events": len(b_received), + "passed": passed + } diff --git a/src/py_event_loop.erl b/src/py_event_loop.erl index 44790a7..53bc49d 100644 --- a/src/py_event_loop.erl +++ b/src/py_event_loop.erl @@ -94,6 +94,9 @@ init([]) -> {ok, LoopRef} -> {ok, RouterPid} = py_event_router:start_link(LoopRef), ok = py_nif:event_loop_set_router(LoopRef, RouterPid), + %% Set shared router for per-loop created loops + %% All loops created via _loop_new() in Python will use this router + ok = py_nif:set_shared_router(RouterPid), %% Make the event loop available to Python ok = py_nif:set_python_event_loop(LoopRef), %% Set ErlangEventLoop as the default asyncio policy diff --git a/src/py_event_router.erl b/src/py_event_router.erl index 8a1a282..76a08a8 100644 --- a/src/py_event_router.erl +++ b/src/py_event_router.erl @@ -42,8 +42,9 @@ -record(state, { loop_ref :: reference(), - %% Map of TimerRef -> {ErlangTimerRef, CallbackId} - timers = #{} :: #{reference() => {reference(), non_neg_integer()}} + %% Map of TimerRef -> {LoopRef, ErlangTimerRef, CallbackId} + %% LoopRef is included to dispatch to the correct loop for per-loop timers + timers = #{} :: #{non_neg_integer() => {reference(), reference(), non_neg_integer()}} }). %% ============================================================================ @@ -86,26 +87,35 @@ handle_cast(_Msg, State) -> %% Handle enif_select messages for read readiness handle_info({select, FdRes, _Ref, ready_input}, State) -> - #state{loop_ref = LoopRef} = State, py_nif:handle_fd_event(FdRes, read), %% Re-register for more events (enif_select is one-shot) - py_nif:reselect_reader(LoopRef, FdRes), + %% Uses fd_res->loop internally, no need to pass LoopRef + py_nif:reselect_reader_fd(FdRes), {noreply, State}; %% Handle enif_select messages for write readiness handle_info({select, FdRes, _Ref, ready_output}, State) -> - #state{loop_ref = LoopRef} = State, py_nif:handle_fd_event(FdRes, write), %% Re-register for more events (enif_select is one-shot) - py_nif:reselect_writer(LoopRef, FdRes), + %% Uses fd_res->loop internally, no need to pass LoopRef + py_nif:reselect_writer_fd(FdRes), {noreply, State}; -%% Handle timer start request from call_later NIF -handle_info({start_timer, DelayMs, CallbackId, TimerRef}, State) -> +%% Handle timer start request from call_later NIF (new format with LoopRef) +handle_info({start_timer, LoopRef, DelayMs, CallbackId, TimerRef}, State) -> #state{timers = Timers} = State, %% Create the actual Erlang timer - ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef, CallbackId}), - NewTimers = maps:put(TimerRef, {ErlTimerRef, CallbackId}, Timers), + ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef}), + %% Store LoopRef so we dispatch to the correct loop + NewTimers = maps:put(TimerRef, {LoopRef, ErlTimerRef, CallbackId}, Timers), + {noreply, State#state{timers = NewTimers}}; + +%% Handle timer start request (legacy format without LoopRef - uses state's loop_ref) +handle_info({start_timer, DelayMs, CallbackId, TimerRef}, State) -> + #state{loop_ref = LoopRef, timers = Timers} = State, + %% Create the actual Erlang timer + ErlTimerRef = erlang:send_after(DelayMs, self(), {timeout, TimerRef}), + NewTimers = maps:put(TimerRef, {LoopRef, ErlTimerRef, CallbackId}, Timers), {noreply, State#state{timers = NewTimers}}; %% Handle timer cancellation @@ -114,20 +124,26 @@ handle_info({cancel_timer, TimerRef}, State) -> case maps:get(TimerRef, Timers, undefined) of undefined -> {noreply, State}; - {ErlTimerRef, _CallbackId} -> + {_LoopRef, ErlTimerRef, _CallbackId} -> erlang:cancel_timer(ErlTimerRef), NewTimers = maps:remove(TimerRef, Timers), {noreply, State#state{timers = NewTimers}} end; %% Handle timer expiration -handle_info({timeout, TimerRef, CallbackId}, State) -> - #state{loop_ref = LoopRef, timers = Timers} = State, - %% Dispatch the timer callback - py_nif:dispatch_timer(LoopRef, CallbackId), - %% Remove from active timers - NewTimers = maps:remove(TimerRef, Timers), - {noreply, State#state{timers = NewTimers}}; +handle_info({timeout, TimerRef}, State) -> + #state{timers = Timers} = State, + case maps:get(TimerRef, Timers, undefined) of + undefined -> + %% Timer was cancelled + {noreply, State}; + {LoopRef, _ErlTimerRef, CallbackId} -> + %% Dispatch the timer callback to the correct loop + py_nif:dispatch_timer(LoopRef, CallbackId), + %% Remove from active timers + NewTimers = maps:remove(TimerRef, Timers), + {noreply, State#state{timers = NewTimers}} + end; %% Handle select stop notifications handle_info({select, _FdRes, _Ref, cancelled}, State) -> @@ -139,7 +155,7 @@ handle_info(_Info, State) -> terminate(_Reason, #state{timers = Timers}) -> %% Cancel all pending timers - maps:foreach(fun(_TimerRef, {ErlTimerRef, _CallbackId}) -> + maps:foreach(fun(_TimerRef, {_LoopRef, ErlTimerRef, _CallbackId}) -> erlang:cancel_timer(ErlTimerRef) end, Timers), ok. diff --git a/src/py_nif.erl b/src/py_nif.erl index cdb1186..08a8d68 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -92,6 +92,8 @@ get_fd_callback_id/2, reselect_reader/2, reselect_writer/2, + reselect_reader_fd/1, + reselect_writer_fd/1, %% FD lifecycle management (uvloop-like API) handle_fd_event/2, stop_reader/1, @@ -117,6 +119,8 @@ set_udp_broadcast/2, %% Python event loop integration set_python_event_loop/1, + set_isolation_mode/1, + set_shared_router/1, %% ASGI optimizations asgi_build_scope/1, asgi_run/5, @@ -606,6 +610,20 @@ reselect_reader(_LoopRef, _FdRes) -> reselect_writer(_LoopRef, _FdRes) -> ?NIF_STUB. +%% @doc Re-register an fd resource for read monitoring using fd_res->loop. +%% This variant doesn't require LoopRef since the fd resource already +%% has a back-reference to its parent loop. +-spec reselect_reader_fd(reference()) -> ok | {error, term()}. +reselect_reader_fd(_FdRes) -> + ?NIF_STUB. + +%% @doc Re-register an fd resource for write monitoring using fd_res->loop. +%% This variant doesn't require LoopRef since the fd resource already +%% has a back-reference to its parent loop. +-spec reselect_writer_fd(reference()) -> ok | {error, term()}. +reselect_writer_fd(_FdRes) -> + ?NIF_STUB. + %%% ============================================================================ %%% FD Lifecycle Management (uvloop-like API) %%% ============================================================================ @@ -747,6 +765,20 @@ set_udp_broadcast(_Fd, _Enable) -> set_python_event_loop(_LoopRef) -> ?NIF_STUB. +%% @doc Set the event loop isolation mode. +%% - global: all ErlangEventLoop instances share the global loop (default) +%% - per_loop: each ErlangEventLoop creates its own isolated native loop +-spec set_isolation_mode(global | per_loop) -> ok | {error, term()}. +set_isolation_mode(_Mode) -> + ?NIF_STUB. + +%% @doc Set the shared router PID for per-loop created loops. +%% All loops created via _loop_new() in Python will use this router +%% for FD monitoring and timer operations. +-spec set_shared_router(pid()) -> ok | {error, term()}. +set_shared_router(_RouterPid) -> + ?NIF_STUB. + %%% ============================================================================ %%% ASGI Optimizations %%% ============================================================================ diff --git a/test/py_multi_loop_SUITE.erl b/test/py_multi_loop_SUITE.erl new file mode 100644 index 0000000..c7ac648 --- /dev/null +++ b/test/py_multi_loop_SUITE.erl @@ -0,0 +1,290 @@ +%%% @doc Common Test suite for multi-loop isolation. +%%% +%%% Tests that multiple erlang_event_loop_t instances are fully isolated: +%%% - Each loop has its own pending queue +%%% - Events don't cross-dispatch between loops +%%% - Destroying one loop doesn't affect others +%%% +%%% These tests initially fail with global g_python_event_loop coupling +%%% and should pass after per-loop isolation is implemented. +-module(py_multi_loop_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + test_two_loops_concurrent_timers/1, + test_two_loops_cross_isolation/1, + test_loop_cleanup_no_leak/1 +]). + +all() -> + [ + test_two_loops_concurrent_timers, + test_two_loops_cross_isolation, + test_loop_cleanup_no_leak + ]. + +init_per_suite(Config) -> + case application:ensure_all_started(erlang_python) of + {ok, _} -> + case wait_for_event_loop(5000) of + ok -> + Config; + {error, Reason} -> + ct:fail({event_loop_not_ready, Reason}) + end; + {error, {App, Reason}} -> + ct:fail({failed_to_start, App, Reason}) + end. + +wait_for_event_loop(Timeout) when Timeout =< 0 -> + {error, timeout}; +wait_for_event_loop(Timeout) -> + case py_event_loop:get_loop() of + {ok, LoopRef} when is_reference(LoopRef) -> + case py_nif:event_loop_new() of + {ok, TestLoop} -> + py_nif:event_loop_destroy(TestLoop), + ok; + _ -> + timer:sleep(100), + wait_for_event_loop(Timeout - 100) + end; + _ -> + timer:sleep(100), + wait_for_event_loop(Timeout - 100) + end. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%% ============================================================================ +%% Test: Two loops with concurrent timers +%% ============================================================================ +%% +%% Creates two independent event loops (LoopA and LoopB). +%% Each schedules 100 timers with unique callback IDs. +%% Verifies that: +%% - Each loop receives exactly its own 100 timer events +%% - No timers are lost or duplicated +%% - No cross-dispatch between loops +%% +%% Expected behavior with per-loop isolation: +%% LoopA receives callback IDs 1-100 +%% LoopB receives callback IDs 1001-1100 +%% +%% Current behavior with global coupling: +%% Both loops share pending queue, events may be lost or misrouted + +test_two_loops_concurrent_timers(_Config) -> + %% Create two independent event loops + {ok, LoopA} = py_nif:event_loop_new(), + {ok, LoopB} = py_nif:event_loop_new(), + + %% Start routers for each loop + {ok, RouterA} = py_event_router:start_link(LoopA), + {ok, RouterB} = py_event_router:start_link(LoopB), + + ok = py_nif:event_loop_set_router(LoopA, RouterA), + ok = py_nif:event_loop_set_router(LoopB, RouterB), + + %% Schedule 100 timers on each loop with different callback ID ranges + NumTimers = 100, + LoopABase = 1, %% Callback IDs 1-100 + LoopBBase = 1001, %% Callback IDs 1001-1100 + + %% Schedule timers on LoopA (small delay for quick test) + _TimerRefsA = [begin + CallbackId = LoopABase + I - 1, + {ok, TimerRef} = py_nif:call_later(LoopA, 10, CallbackId), + TimerRef + end || I <- lists:seq(1, NumTimers)], + + %% Schedule timers on LoopB + _TimerRefsB = [begin + CallbackId = LoopBBase + I - 1, + {ok, TimerRef} = py_nif:call_later(LoopB, 10, CallbackId), + TimerRef + end || I <- lists:seq(1, NumTimers)], + + %% Wait for all timers to fire + timer:sleep(200), + + %% Collect pending events from each loop + EventsA = py_nif:get_pending(LoopA), + EventsB = py_nif:get_pending(LoopB), + + %% Extract callback IDs + CallbackIdsA = [CallbackId || {CallbackId, timer} <- EventsA], + CallbackIdsB = [CallbackId || {CallbackId, timer} <- EventsB], + + ct:pal("LoopA received ~p timer events: ~p", [length(CallbackIdsA), lists:sort(CallbackIdsA)]), + ct:pal("LoopB received ~p timer events: ~p", [length(CallbackIdsB), lists:sort(CallbackIdsB)]), + + %% Verify: LoopA should have exactly IDs 1-100 + ExpectedA = lists:seq(LoopABase, LoopABase + NumTimers - 1), + %% LoopA should receive 100 timers + ?assertEqual(NumTimers, length(CallbackIdsA)), + %% LoopA callback IDs should match expected + ?assertEqual(ExpectedA, lists:sort(CallbackIdsA)), + + %% Verify: LoopB should have exactly IDs 1001-1100 + ExpectedB = lists:seq(LoopBBase, LoopBBase + NumTimers - 1), + %% LoopB should receive 100 timers + ?assertEqual(NumTimers, length(CallbackIdsB)), + %% LoopB callback IDs should match expected + ?assertEqual(ExpectedB, lists:sort(CallbackIdsB)), + + %% Verify: No overlap between loops (no cross-dispatch) + Intersection = lists:filter(fun(Id) -> lists:member(Id, CallbackIdsB) end, CallbackIdsA), + ?assertEqual([], Intersection), + + %% Cleanup + py_event_router:stop(RouterA), + py_event_router:stop(RouterB), + py_nif:event_loop_destroy(LoopA), + py_nif:event_loop_destroy(LoopB), + ok. + +%% ============================================================================ +%% Test: Cross-isolation verification +%% ============================================================================ +%% +%% Verifies that events dispatched to LoopA are never seen by LoopB. +%% Uses fd callbacks which are more direct than timers. +%% +%% Expected behavior with per-loop isolation: +%% LoopA pending queue receives LoopA events only +%% LoopB pending queue receives nothing +%% +%% Current behavior with global coupling: +%% Both loops share the same global pending queue + +test_two_loops_cross_isolation(_Config) -> + {ok, LoopA} = py_nif:event_loop_new(), + {ok, LoopB} = py_nif:event_loop_new(), + + {ok, RouterA} = py_event_router:start_link(LoopA), + {ok, RouterB} = py_event_router:start_link(LoopB), + + ok = py_nif:event_loop_set_router(LoopA, RouterA), + ok = py_nif:event_loop_set_router(LoopB, RouterB), + + %% Create a pipe for LoopA only + {ok, {ReadFd, WriteFd}} = py_nif:create_test_pipe(), + + %% Register reader on LoopA with callback ID 42 + CallbackIdA = 42, + {ok, FdRefA} = py_nif:add_reader(LoopA, ReadFd, CallbackIdA), + + %% Write to trigger read event + ok = py_nif:write_test_fd(WriteFd, <<"test data">>), + + %% Wait for event to be dispatched + timer:sleep(100), + + %% Get pending from both loops + EventsA = py_nif:get_pending(LoopA), + EventsB = py_nif:get_pending(LoopB), + + ct:pal("LoopA events: ~p", [EventsA]), + ct:pal("LoopB events: ~p", [EventsB]), + + %% LoopA should have the read event + ReadEventsA = [E || {_Cid, read} = E <- EventsA], + %% LoopA should have 1 read event + ?assertEqual(1, length(ReadEventsA)), + + %% LoopB should have NO events - this is the isolation test + ?assertEqual([], EventsB), + + %% Cleanup + py_nif:remove_reader(LoopA, FdRefA), + py_nif:close_test_fd(ReadFd), + py_nif:close_test_fd(WriteFd), + py_event_router:stop(RouterA), + py_event_router:stop(RouterB), + py_nif:event_loop_destroy(LoopA), + py_nif:event_loop_destroy(LoopB), + ok. + +%% ============================================================================ +%% Test: Loop cleanup without leaks +%% ============================================================================ +%% +%% Destroys LoopA while LoopB continues operating. +%% Verifies that: +%% - LoopB continues to receive its events +%% - No memory corruption or resource leaks +%% - Events scheduled on destroyed loop don't crash system +%% +%% Expected behavior with per-loop isolation: +%% LoopB operates independently after LoopA destruction +%% +%% Current behavior with global coupling: +%% Destroying LoopA may clear g_python_event_loop affecting LoopB + +test_loop_cleanup_no_leak(_Config) -> + {ok, LoopA} = py_nif:event_loop_new(), + {ok, LoopB} = py_nif:event_loop_new(), + + {ok, RouterA} = py_event_router:start_link(LoopA), + {ok, RouterB} = py_event_router:start_link(LoopB), + + ok = py_nif:event_loop_set_router(LoopA, RouterA), + ok = py_nif:event_loop_set_router(LoopB, RouterB), + + %% Schedule a timer on LoopB that will fire after LoopA is destroyed + CallbackIdB = 999, + {ok, _TimerRefB} = py_nif:call_later(LoopB, 100, CallbackIdB), + + %% Schedule a timer on LoopA (won't be received since we destroy it) + {ok, _TimerRefA} = py_nif:call_later(LoopA, 50, 111), + + %% Destroy LoopA before its timer fires + timer:sleep(20), + py_event_router:stop(RouterA), + py_nif:event_loop_destroy(LoopA), + + %% Wait for LoopB timer to fire + timer:sleep(150), + + %% LoopB should still receive its event + EventsB = py_nif:get_pending(LoopB), + ct:pal("LoopB events after LoopA destruction: ~p", [EventsB]), + + %% Verify LoopB still works - should receive its timer after LoopA destroyed + TimerEventsB = [CallbackId || {CallbackId, timer} <- EventsB], + ?assertEqual([CallbackIdB], TimerEventsB), + + %% Schedule another timer on LoopB to verify it still works + CallbackIdB2 = 1000, + {ok, _} = py_nif:call_later(LoopB, 10, CallbackIdB2), + timer:sleep(50), + + EventsB2 = py_nif:get_pending(LoopB), + TimerEventsB2 = [CallbackId || {CallbackId, timer} <- EventsB2], + %% LoopB should still work after LoopA destroyed + ?assertEqual([CallbackIdB2], TimerEventsB2), + + %% Cleanup + py_event_router:stop(RouterB), + py_nif:event_loop_destroy(LoopB), + ok. + diff --git a/test/py_multi_loop_integration_SUITE.erl b/test/py_multi_loop_integration_SUITE.erl new file mode 100644 index 0000000..8ca267b --- /dev/null +++ b/test/py_multi_loop_integration_SUITE.erl @@ -0,0 +1,292 @@ +%%% @doc Integration test suite for isolated event loops with real asyncio workloads. +%%% +%%% Tests that multiple ErlangEventLoop instances created with `isolated=True` +%%% can run real asyncio operations concurrently without interference. +-module(py_multi_loop_integration_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + test_isolated_loop_creation/1, + test_two_loops_concurrent_callbacks/1, + test_two_loops_isolation/1, + test_multiple_loops_lifecycle/1, + test_isolated_loops_with_timers/1 +]). + +all() -> + [ + test_isolated_loop_creation, + test_two_loops_concurrent_callbacks, + test_two_loops_isolation, + test_multiple_loops_lifecycle, + test_isolated_loops_with_timers + ]. + +init_per_suite(Config) -> + %% Stop application if already running (from other test suites) + _ = application:stop(erlang_python), + timer:sleep(200), + + {ok, _} = application:ensure_all_started(erlang_python), + timer:sleep(200), + Config. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(_TestCase, Config) -> + py:unbind(), + Config. + +end_per_testcase(_TestCase, _Config) -> + py:unbind(), + ok. + +%% ============================================================================ +%% Test: Verify isolated loop can be created with isolated=True +%% ============================================================================ + +test_isolated_loop_creation(_Config) -> + ok = py:exec(<<" +from erlang_loop import ErlangEventLoop + +# Default loop uses shared global +default_loop = ErlangEventLoop() +assert default_loop._loop_handle is None, 'Default loop should use global (None handle)' +default_loop.close() + +# Isolated loop gets its own handle +isolated_loop = ErlangEventLoop(isolated=True) +assert isolated_loop._loop_handle is not None, 'Isolated loop should have its own handle' +isolated_loop.close() +">>), + ok. + +%% ============================================================================ +%% Test: Two isolated loops running concurrent call_soon callbacks +%% ============================================================================ + +test_two_loops_concurrent_callbacks(_Config) -> + ok = py:exec(<<" +import threading +from erlang_loop import ErlangEventLoop + +results = {} + +def run_loop_callbacks(loop_id, num_callbacks): + '''Run callbacks in a separate thread with its own isolated loop.''' + loop = ErlangEventLoop(isolated=True) + + callback_results = [] + + def make_callback(val): + def cb(): + callback_results.append(val) + return cb + + try: + # Schedule callbacks + for i in range(num_callbacks): + loop.call_soon(make_callback(i)) + + # Run loop to process callbacks + loop._run_once() + + results[loop_id] = { + 'count': len(callback_results), + 'values': callback_results[:5], + 'success': True + } + except Exception as e: + results[loop_id] = {'error': str(e), 'success': False} + finally: + loop.close() + +# Run two isolated loops concurrently +t1 = threading.Thread(target=run_loop_callbacks, args=('loop_a', 10)) +t2 = threading.Thread(target=run_loop_callbacks, args=('loop_b', 10)) + +t1.start() +t2.start() +t1.join() +t2.join() + +# Both should complete +assert results.get('loop_a', {}).get('success'), f'Loop A failed: {results.get(\"loop_a\")}' +assert results.get('loop_b', {}).get('success'), f'Loop B failed: {results.get(\"loop_b\")}' + +# Each should process 10 callbacks +assert results['loop_a']['count'] == 10, f'Loop A wrong count: {results[\"loop_a\"][\"count\"]}' +assert results['loop_b']['count'] == 10, f'Loop B wrong count: {results[\"loop_b\"][\"count\"]}' +">>), + ok. + +%% ============================================================================ +%% Test: Callbacks are isolated between loops +%% ============================================================================ + +test_two_loops_isolation(_Config) -> + ok = py:exec(<<" +import threading +from erlang_loop import ErlangEventLoop + +results = {} + +def run_isolated_loop(loop_id, marker_value): + '''Each loop schedules callbacks with unique markers.''' + loop = ErlangEventLoop(isolated=True) + + collected = [] + + def cb(): + collected.append(marker_value) + + try: + # Schedule 5 callbacks with this loop's marker + for _ in range(5): + loop.call_soon(cb) + + # Process + loop._run_once() + + # All collected should be our marker + all_match = all(v == marker_value for v in collected) + results[loop_id] = { + 'collected': collected, + 'all_match': all_match, + 'count': len(collected), + 'success': True + } + except Exception as e: + results[loop_id] = {'error': str(e), 'success': False} + finally: + loop.close() + +# Run with different markers +t1 = threading.Thread(target=run_isolated_loop, args=('loop_a', 'A')) +t2 = threading.Thread(target=run_isolated_loop, args=('loop_b', 'B')) + +t1.start() +t2.start() +t1.join() +t2.join() + +# Both should succeed +assert results.get('loop_a', {}).get('success'), f'Loop A failed: {results.get(\"loop_a\")}' +assert results.get('loop_b', {}).get('success'), f'Loop B failed: {results.get(\"loop_b\")}' + +# Each should only see its own marker (isolation test) +assert results['loop_a']['all_match'], f'Loop A saw wrong markers: {results[\"loop_a\"][\"collected\"]}' +assert results['loop_b']['all_match'], f'Loop B saw wrong markers: {results[\"loop_b\"][\"collected\"]}' + +# Each should have 5 callbacks +assert results['loop_a']['count'] == 5, f'Loop A wrong count' +assert results['loop_b']['count'] == 5, f'Loop B wrong count' +">>), + ok. + +%% ============================================================================ +%% Test: Multiple isolated loops lifecycle +%% ============================================================================ + +test_multiple_loops_lifecycle(_Config) -> + ok = py:exec(<<" +from erlang_loop import ErlangEventLoop + +# Create multiple isolated loops +loops = [] +for i in range(3): + loop = ErlangEventLoop(isolated=True) + loops.append(loop) + +# Each loop should be independent +collected = {i: [] for i in range(3)} + +for i, loop in enumerate(loops): + def make_cb(idx): + def cb(): + collected[idx].append(idx) + return cb + loop.call_soon(make_cb(i)) + +# Process each loop +for loop in loops: + loop._run_once() + +# Verify isolation +for i in range(3): + assert collected[i] == [i], f'Loop {i} wrong: {collected[i]}' + +# Close loops in reverse order +for loop in reversed(loops): + loop.close() + +# Verify all closed +for loop in loops: + assert loop.is_closed(), 'Loop not closed' +">>), + ok. + +%% ============================================================================ +%% Test: Isolated loops with timer callbacks (call_later) +%% ============================================================================ + +test_isolated_loops_with_timers(_Config) -> + ok = py:exec(<<" +import time +from erlang_loop import ErlangEventLoop + +# Test timer in an isolated loop +loop = ErlangEventLoop(isolated=True) +timer_fired = [] + +def timer_callback(): + timer_fired.append(time.time()) + +# Schedule a 50ms timer +handle = loop.call_later(0.05, timer_callback) + +# Run the loop to process the timer +start = time.time() +deadline = start + 0.5 # 500ms timeout + +while time.time() < deadline and not timer_fired: + loop._run_once() + time.sleep(0.01) + +assert len(timer_fired) > 0, f'Timer did not fire within timeout, elapsed={time.time()-start:.3f}s' + +loop.close() + +# Now test two isolated loops sequentially +for loop_id in ['loop_a', 'loop_b']: + loop = ErlangEventLoop(isolated=True) + timer_result = [] + + def make_cb(lid): + def cb(): + timer_result.append(lid) + return cb + + loop.call_later(0.03, make_cb(loop_id)) + + start = time.time() + while time.time() < start + 0.3 and not timer_result: + loop._run_once() + time.sleep(0.01) + + assert timer_result == [loop_id], f'Loop {loop_id} timer failed: {timer_result}' + loop.close() +">>), + ok.