diff --git a/c_src/py_callback.c b/c_src/py_callback.c index 573849c..aeaed55 100644 --- a/c_src/py_callback.c +++ b/c_src/py_callback.c @@ -1654,8 +1654,21 @@ static void *async_event_loop_thread(void *arg) { return NULL; } - /* Create new event loop */ - PyObject *loop = PyObject_CallMethod(asyncio, "new_event_loop", NULL); + /* Create a default selector event loop directly, bypassing the policy. + * Worker threads should NOT use ErlangEventLoop since it requires the + * main thread's event router. Using SelectorEventLoop ensures these + * background threads have their own independent event loops. */ + PyObject *selector_loop_class = PyObject_GetAttrString(asyncio, "SelectorEventLoop"); + PyObject *loop = NULL; + if (selector_loop_class != NULL) { + loop = PyObject_CallObject(selector_loop_class, NULL); + Py_DECREF(selector_loop_class); + } + if (loop == NULL) { + /* Fallback to new_event_loop if SelectorEventLoop not available */ + PyErr_Clear(); + loop = PyObject_CallMethod(asyncio, "new_event_loop", NULL); + } if (loop == NULL) { PyErr_Print(); Py_DECREF(asyncio); @@ -1664,7 +1677,7 @@ static void *async_event_loop_thread(void *arg) { return NULL; } - /* Set as current loop */ + /* Set as current loop for this thread */ PyObject *set_result = PyObject_CallMethod(asyncio, "set_event_loop", "O", loop); Py_XDECREF(set_result); diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 8381e03..17bf778 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -63,12 +63,104 @@ ERL_NIF_TERM ATOM_EVENT_LOOP; ERL_NIF_TERM ATOM_DISPATCH; /* ============================================================================ - * Resource Callbacks - * ============================================================================ */ + * Per-Interpreter Event Loop Storage + * ============================================================================ + * + * Event loop references are stored as module attributes in py_event_loop, + * using PyCapsule for safe C 
pointer storage. This approach:
+ *
+ * - Works uniformly for main interpreter and sub-interpreters
+ * - Each interpreter has its own py_event_loop module with its own attribute
+ * - Thread-safe for free-threading (Python 3.13+)
+ * - Callers must hold the GIL (NIF entry points use PyGILState_Ensure/Release)
+ *
+ * Flow:
+ *   NIF set_python_event_loop() -> stores capsule in py_event_loop._loop
+ *   C module functions          -> call get_interpreter_event_loop(), which
+ *                                  for now returns g_python_event_loop
+ */
 
-/* Forward declaration of global Python event loop pointer (defined later in file) */
+/** @brief Name for the PyCapsule storing event loop pointer */
+static const char *EVENT_LOOP_CAPSULE_NAME = "erlang_python.event_loop";
+
+/** @brief Module attribute name for storing the event loop */
+static const char *EVENT_LOOP_ATTR_NAME = "_loop";
+
+/* Forward declaration for fallback in get_interpreter_event_loop */
 static erlang_event_loop_t *g_python_event_loop;
 
+/**
+ * Get the py_event_loop module for the current interpreter.
+ * MUST be called with GIL held.
+ * Returns a borrowed reference, or NULL if it is not in sys.modules.
+ */
+static PyObject *get_event_loop_module(void) {
+    PyObject *modules = PyImport_GetModuleDict();
+    if (modules == NULL) {
+        return NULL;
+    }
+    return PyDict_GetItemString(modules, "py_event_loop");
+}
+
+/**
+ * Get the event loop for the current Python interpreter.
+ * MUST be called with GIL held.
+ *
+ * For now, we use the global g_python_event_loop directly. Per-interpreter
+ * storage via module attributes was causing issues on some Python versions.
+ * The global approach works correctly since all Python code in the main
+ * interpreter shares the same event loop.
+ *
+ * TODO: Implement proper per-interpreter storage for sub-interpreter support.
+ *
+ * @return Event loop pointer or NULL if not set
+ */
+static erlang_event_loop_t *get_interpreter_event_loop(void) {
+    return g_python_event_loop;
+}
+
+/**
+ * Set the event loop for the current interpreter.
+ * MUST be called with GIL held.
+ * Stores a non-owning PyCapsule as the py_event_loop._loop module attribute.
+ *
+ * @param loop Event loop to set, or NULL to reset the attribute to None
+ * @return 0 on success, -1 on error (no Python exception is left pending)
+ */
+static int set_interpreter_event_loop(erlang_event_loop_t *loop) {
+    PyObject *module = get_event_loop_module();
+    if (module == NULL) {
+        return -1;
+    }
+
+    if (loop == NULL) {
+        /* Clearing is best effort: a failed reset is not reported */
+        if (PyObject_SetAttrString(module, EVENT_LOOP_ATTR_NAME, Py_None) < 0) {
+            PyErr_Clear();
+        }
+        return 0;
+    }
+
+    PyObject *capsule = PyCapsule_New(loop, EVENT_LOOP_CAPSULE_NAME, NULL);
+    if (capsule == NULL) {
+        /* Same contract as the SetAttr failure below: report -1 only */
+        PyErr_Clear();
+        return -1;
+    }
+
+    int result = PyObject_SetAttrString(module, EVENT_LOOP_ATTR_NAME, capsule);
+    Py_DECREF(capsule);
+    if (result < 0) {
+        PyErr_Clear();
+        return -1;
+    }
+    return 0;
+}
+
+/* ============================================================================
+ * Resource Callbacks
+ * ============================================================================ */
+
 /* Forward declaration */
 int create_default_event_loop(ErlNifEnv *env);
 
@@ -78,12 +170,17 @@ int create_default_event_loop(ErlNifEnv *env);
 void event_loop_destructor(ErlNifEnv *env, void *obj) {
     erlang_event_loop_t *loop = (erlang_event_loop_t *)obj;
 
-    /* If this is the active Python event loop, create a new default one
-     * so g_python_event_loop is never NULL */
+    /* If this is the active Python event loop, clear references */
     if (g_python_event_loop == loop) {
         g_python_event_loop = NULL;
-        /* Create a new default loop to ensure Python asyncio always works */
-        create_default_event_loop(env);
+        /* Clear per-interpreter storage if we can acquire GIL.
+         * Don't create new loop in destructor - let next Python call handle it.
+         */
+        PyGILState_STATE gstate = PyGILState_Ensure();
+        /* g_python_event_loop was reset to NULL just above, so a lookup via
+         * get_interpreter_event_loop() can never match `loop`. This branch
+         * already proved `loop` was the active one: drop the stale capsule. */
+        set_interpreter_event_loop(NULL);
+        PyGILState_Release(gstate);
     }
 
     /* Signal shutdown */
@@ -2088,7 +2185,7 @@ ERL_NIF_TERM nif_set_udp_broadcast(ErlNifEnv *env, int argc,
 
 /**
  * Initialize the global Python event loop.
- * Called from Erlang when setting up the event loop for Python use.
+ * Note: This function is currently unused (dead code).
  */
 int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop) {
     (void)env;
@@ -2099,19 +2196,27 @@ int py_event_loop_init_python(ErlNifEnv *env, erlang_event_loop_t *loop) {
 /**
  * NIF to set the global Python event loop.
  * Called from Erlang: py_nif:set_python_event_loop(LoopRef)
+ *
+ * Updates both the global C variable (for NIF calls) and the per-interpreter
+ * storage (for Python code). Acquires GIL to set per-interpreter storage.
  */
 ERL_NIF_TERM nif_set_python_event_loop(ErlNifEnv *env, int argc,
                                        const ERL_NIF_TERM argv[]) {
     (void)argc;
-    (void)env;
 
     erlang_event_loop_t *loop;
     if (!enif_get_resource(env, argv[0], EVENT_LOOP_RESOURCE_TYPE, (void **)&loop)) {
         return make_error(env, "invalid_event_loop");
     }
 
+    /* Set global C variable for fast access from C code */
     g_python_event_loop = loop;
 
+    /* Also set per-interpreter storage so Python code uses the correct loop */
+    PyGILState_STATE gstate = PyGILState_Ensure();
+    set_interpreter_event_loop(loop);
+    PyGILState_Release(gstate);
+
     return ATOM_OK;
 }
 
@@ -2124,12 +2229,14 @@ static PyObject *py_poll_events(PyObject *self, PyObject *args) {
         return NULL;
     }
 
-    if (g_python_event_loop == NULL) {
+    /* Use per-interpreter event loop lookup */
+    erlang_event_loop_t *loop = get_interpreter_event_loop();
+    if (loop == NULL) {
         PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized");
         return NULL;
     }
 
-    if (g_python_event_loop->shutdown) {
+    if (loop->shutdown) {
         return PyLong_FromLong(0);
     }
 
@@ -2137,7 +2244,7 @@
static PyObject *py_poll_events(PyObject *self, PyObject *args) { /* Release GIL while waiting */ Py_BEGIN_ALLOW_THREADS - num_events = poll_events_wait(g_python_event_loop, timeout_ms); + num_events = poll_events_wait(loop, timeout_ms); Py_END_ALLOW_THREADS return PyLong_FromLong(num_events); @@ -2148,12 +2255,12 @@ static PyObject *py_get_pending(PyObject *self, PyObject *args) { (void)self; (void)args; - if (g_python_event_loop == NULL) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { return PyList_New(0); } - erlang_event_loop_t *loop = g_python_event_loop; - pthread_mutex_lock(&loop->mutex); /* Count pending events */ @@ -2213,13 +2320,15 @@ static PyObject *py_wakeup(PyObject *self, PyObject *args) { (void)self; (void)args; - if (g_python_event_loop == NULL) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { Py_RETURN_NONE; } - pthread_mutex_lock(&g_python_event_loop->mutex); - pthread_cond_broadcast(&g_python_event_loop->event_cond); - pthread_mutex_unlock(&g_python_event_loop->mutex); + pthread_mutex_lock(&loop->mutex); + pthread_cond_broadcast(&loop->event_cond); + pthread_mutex_unlock(&loop->mutex); Py_RETURN_NONE; } @@ -2234,7 +2343,9 @@ static PyObject *py_add_pending(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { Py_RETURN_NONE; } @@ -2247,7 +2358,7 @@ static PyObject *py_add_pending(PyObject *self, PyObject *args) { type = EVENT_TYPE_TIMER; } - event_loop_add_pending(g_python_event_loop, type, callback_id, -1); + event_loop_add_pending(loop, type, callback_id, -1); Py_RETURN_NONE; } @@ -2257,7 +2368,9 @@ static PyObject *py_is_initialized(PyObject *self, PyObject *args) { (void)self; (void)args; - if (g_python_event_loop != NULL) 
{ + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop != NULL) { Py_RETURN_TRUE; } Py_RETURN_FALSE; @@ -2273,13 +2386,13 @@ static PyObject *py_add_reader(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized"); return NULL; } - erlang_event_loop_t *loop = g_python_event_loop; - /* Create fd resource */ fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, sizeof(fd_resource_t)); if (fd_res == NULL) { @@ -2324,10 +2437,7 @@ static PyObject *py_remove_reader(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL) { - Py_RETURN_NONE; - } - + /* Use per-interpreter event loop lookup - but still allow cleanup even if loop is gone */ fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; if (fd_res != NULL && fd_res->loop != NULL) { enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, @@ -2349,13 +2459,13 @@ static PyObject *py_add_writer(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized"); return NULL; } - erlang_event_loop_t *loop = g_python_event_loop; - /* Create fd resource */ fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, sizeof(fd_resource_t)); if (fd_res == NULL) { @@ -2400,10 +2510,7 @@ static PyObject *py_remove_writer(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL) { - Py_RETURN_NONE; - } - + /* Use fd_res->loop directly - allows cleanup even if interpreter's loop is gone */ fd_resource_t *fd_res = (fd_resource_t *)(uintptr_t)fd_key; if (fd_res != NULL && 
fd_res->loop != NULL) { enif_select(fd_res->loop->msg_env, (ErlNifEvent)fd_res->fd, @@ -2425,18 +2532,22 @@ static PyObject *py_schedule_timer(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL || !g_python_event_loop->has_router) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL || !loop->has_router) { PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized"); return NULL; } - - erlang_event_loop_t *loop = g_python_event_loop; if (delay_ms < 0) delay_ms = 0; uint64_t timer_ref_id = atomic_fetch_add(&loop->next_callback_id, 1); - ErlNifEnv *msg_env = loop->msg_env; - enif_clear_env(msg_env); + /* Use per-call env for thread safety in free-threaded Python */ + ErlNifEnv *msg_env = enif_alloc_env(); + if (msg_env == NULL) { + PyErr_SetString(PyExc_MemoryError, "Failed to allocate message env"); + return NULL; + } ERL_NIF_TERM msg = enif_make_tuple4( msg_env, @@ -2446,7 +2557,10 @@ static PyObject *py_schedule_timer(PyObject *self, PyObject *args) { enif_make_uint64(msg_env, timer_ref_id) ); - if (!enif_send(NULL, &loop->router_pid, msg_env, msg)) { + int send_result = enif_send(NULL, &loop->router_pid, msg_env, msg); + enif_free_env(msg_env); + + if (!send_result) { PyErr_SetString(PyExc_RuntimeError, "Failed to send timer message"); return NULL; } @@ -2463,13 +2577,17 @@ static PyObject *py_cancel_timer(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL || !g_python_event_loop->has_router) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL || !loop->has_router) { Py_RETURN_NONE; } - erlang_event_loop_t *loop = g_python_event_loop; - ErlNifEnv *msg_env = loop->msg_env; - enif_clear_env(msg_env); + /* Use per-call env for thread safety in free-threaded Python */ + ErlNifEnv *msg_env = enif_alloc_env(); + if (msg_env == NULL) { + Py_RETURN_NONE; /* 
Best effort - don't fail on cancel */ + } ERL_NIF_TERM msg = enif_make_tuple2( msg_env, @@ -2478,6 +2596,7 @@ static PyObject *py_cancel_timer(PyObject *self, PyObject *args) { ); enif_send(NULL, &loop->router_pid, msg_env, msg); + enif_free_env(msg_env); Py_RETURN_NONE; } @@ -2526,13 +2645,13 @@ static PyObject *py_run_once(PyObject *self, PyObject *args) { return NULL; } - if (g_python_event_loop == NULL) { + /* Use per-interpreter event loop lookup */ + erlang_event_loop_t *loop = get_interpreter_event_loop(); + if (loop == NULL) { PyErr_SetString(PyExc_RuntimeError, "Event loop not initialized"); return NULL; } - erlang_event_loop_t *loop = g_python_event_loop; - if (loop->shutdown) { return PyList_New(0); } @@ -2655,11 +2774,20 @@ int create_py_event_loop_module(void) { /** * Create a default event loop and set it as g_python_event_loop. * This ensures the event loop is always available for Python asyncio. - * Called after NIF is fully loaded. + * Called after NIF is fully loaded (with GIL held). 
*/ int create_default_event_loop(ErlNifEnv *env) { + /* Check per-interpreter storage first for sub-interpreter support */ + erlang_event_loop_t *existing = get_interpreter_event_loop(); + if (existing != NULL) { + return 0; /* Already have an event loop for this interpreter */ + } + + /* Also check global for backward compatibility */ if (g_python_event_loop != NULL) { - return 0; /* Already have an event loop */ + /* Global exists but not set for this interpreter - set it now */ + set_interpreter_event_loop(g_python_event_loop); + return 0; } /* Allocate event loop resource */ @@ -2700,9 +2828,12 @@ int create_default_event_loop(ErlNifEnv *env) { loop->has_router = false; loop->has_self = false; - /* Set as global Python event loop */ + /* Set as global Python event loop (backward compatibility for NIF calls) */ g_python_event_loop = loop; + /* Store in per-interpreter storage for Python code to access */ + set_interpreter_event_loop(loop); + /* Keep a reference to prevent garbage collection */ /* Note: This loop will be replaced when py_event_loop:init runs */ diff --git a/c_src/py_nif.h b/c_src/py_nif.h index df7880b..321f000 100644 --- a/c_src/py_nif.h +++ b/c_src/py_nif.h @@ -1303,4 +1303,74 @@ static PyObject *thread_worker_call(const char *func_name, size_t func_name_len, /** @} */ +/* ============================================================================ + * Safe GIL/Thread State Acquisition + * ============================================================================ + * + * These helpers provide safe GIL acquisition that works across: + * - Python 3.9-3.11 (standard GIL) + * - Python 3.12-3.13 (stricter thread state checks) + * - Python 3.13t free-threaded (no GIL, but thread attachment required) + * + * The pattern follows PyO3's Python::attach() approach: + * 1. Check if already attached via PyGILState_Check() + * 2. If not, use PyGILState_Ensure() to attach + * 3. 
Track whether we acquired so we release correctly
+ *
+ * Per Python docs, PyGILState_Ensure/Release work in free-threaded builds
+ * to manage thread attachment even without a GIL.
+ */
+
+/**
+ * @defgroup gil_helpers Safe GIL/Thread State Helpers
+ * @brief Thread-safe GIL acquisition for Python 3.12+ compatibility
+ * @{
+ */
+
+/**
+ * @brief Bookkeeping returned by gil_acquire() and consumed by gil_release()
+ */
+typedef struct {
+    PyGILState_STATE gstate;  /**< Value returned by PyGILState_Ensure */
+    int acquired;             /**< 1 when gil_acquire() attached, else 0 */
+} gil_guard_t;
+
+/**
+ * @brief Attach the calling thread to the Python runtime if it is not already.
+ *
+ * Safe to call when the thread is already attached: PyGILState_Check() is
+ * consulted first, and in that case nothing is acquired and the returned
+ * guard makes gil_release() a no-op.
+ *
+ * @return Guard structure that must be passed to gil_release()
+ */
+static inline gil_guard_t gil_acquire(void) {
+    gil_guard_t token = {.gstate = PyGILState_UNLOCKED, .acquired = 0};
+
+    if (!PyGILState_Check()) {
+        /* Not attached yet: PyGILState_Ensure() attaches this thread
+         * (taking the GIL on GIL-enabled builds); record that this call
+         * owns the matching PyGILState_Release(). */
+        token.gstate = PyGILState_Ensure();
+        token.acquired = 1;
+    }
+
+    return token;
+}
+
+/**
+ * @brief Release the GIL/detach from Python runtime.
+ *
+ * Only releases if we actually acquired in gil_acquire().
+ * + * @param guard The guard structure returned by gil_acquire() + */ +static inline void gil_release(gil_guard_t guard) { + if (guard.acquired) { + PyGILState_Release(guard.gstate); + } +} + +/** @} */ + #endif /* PY_NIF_H */ diff --git a/priv/erlang_loop.py b/priv/erlang_loop.py index 08f9a91..f5378e1 100644 --- a/priv/erlang_loop.py +++ b/priv/erlang_loop.py @@ -319,8 +319,11 @@ def call_at(self, when, callback, *args, context=None): try: timer_ref = self._pel._schedule_timer(delay_ms, callback_id) self._timer_refs[callback_id] = timer_ref - except (AttributeError, RuntimeError): - pass # Fallback: mock module or no router + except AttributeError: + pass # Fallback: mock module doesn't have _schedule_timer + except RuntimeError as e: + # Fail fast on initialization errors - don't silently hang + raise RuntimeError(f"Timer scheduling failed: {e}") from e return handle @@ -849,10 +852,14 @@ def _run_once(self): dispatch = self._dispatch for callback_id, event_type in pending: dispatch(callback_id, event_type) - except Exception: - pass - except Exception: - pass + except AttributeError: + pass # Mock module without these methods + except RuntimeError as e: + # Fail fast on initialization errors - don't silently hang + raise RuntimeError(f"Event loop poll failed: {e}") from e + except RuntimeError as e: + # Fail fast on initialization errors - don't silently hang + raise RuntimeError(f"Event loop poll failed: {e}") from e def _dispatch(self, callback_id, event_type): """Dispatch a callback based on event type. @@ -1458,7 +1465,14 @@ def _cancel_timer(self, timer_ref): def get_event_loop_policy(): - """Get an event loop policy that uses ErlangEventLoop.""" + """Get an event loop policy that uses ErlangEventLoop for the main thread. + + Non-main threads get the default SelectorEventLoop to avoid conflicts + with the Erlang-native event loop which is designed for the main thread. 
+ """ + # Capture main thread ID at policy creation time + main_thread_id = threading.main_thread().ident + class ErlangEventLoopPolicy(asyncio.AbstractEventLoopPolicy): def __init__(self): self._local = threading.local() @@ -1472,6 +1486,12 @@ def set_event_loop(self, loop): self._local.loop = loop def new_event_loop(self): - return ErlangEventLoop() + # Only use ErlangEventLoop for the main thread + # Worker threads should use the default selector-based loop + if threading.current_thread().ident == main_thread_id: + return ErlangEventLoop() + else: + # Return default selector event loop for non-main threads + return asyncio.SelectorEventLoop() return ErlangEventLoopPolicy()