From 51fdd3b38abc8dbcdf0380ec4e5b3399f258ebe2 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 8 Mar 2026 01:47:09 +0100 Subject: [PATCH 1/5] Add async_pending action and reactor NIF support - Add handoff/2,3 API functions for FD handoff - Add setup_code option to run Python code in context - Add async_pending action handling for task-based ASGI - Add {write_ready, Fd} message handling for async completion - Register reactor NIFs in nif_funcs table - Fix erlang.reactor module import path in NIFs - Add async_pending_test to py_reactor_SUITE --- c_src/py_event_loop.c | 8 +-- c_src/py_event_loop.h | 68 +++++++++++++++++++ c_src/py_nif.c | 13 +++- src/py_reactor_context.erl | 95 +++++++++++++++++++++++++- test/py_reactor_SUITE.erl | 135 ++++++++++++++++++++++++++++++++++++- 5 files changed, 310 insertions(+), 9 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 38d55b3..f17cdff 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -3088,7 +3088,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc, gil_guard_t guard = gil_acquire(); /* Import erlang_reactor module */ - PyObject *reactor_module = PyImport_ImportModule("erlang_reactor"); + PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module == NULL) { PyErr_Clear(); gil_release(guard); @@ -3153,7 +3153,7 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc, gil_guard_t guard = gil_acquire(); /* Import erlang_reactor module */ - PyObject *reactor_module = PyImport_ImportModule("erlang_reactor"); + PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module == NULL) { PyErr_Clear(); gil_release(guard); @@ -3231,7 +3231,7 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, } /* Import erlang_reactor module */ - PyObject *reactor_module = PyImport_ImportModule("erlang_reactor"); + PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module == NULL) { Py_DECREF(client_info); PyErr_Clear(); @@ -3286,7 +3286,7 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, if (fd >= 0) { gil_guard_t guard = gil_acquire(); - PyObject *reactor_module = PyImport_ImportModule("erlang_reactor"); + PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module != NULL) { PyObject *result = PyObject_CallMethod(reactor_module, "close_connection", "i", fd); diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 0cd9735..2d3336c 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -799,4 +799,72 @@ int create_default_event_loop(ErlNifEnv *env); */ int init_subinterpreter_event_loop(ErlNifEnv *env); +/* ============================================================================ + * Reactor NIF Functions (Erlang-as-Reactor architecture) + * ============================================================================ */ + +/** + * @brief Register a file descriptor for reactor monitoring + * + * NIF: reactor_register_fd(ContextRef, Fd, OwnerPid) -> {ok, FdRef} | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_register_fd(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Re-register for read events after a one-shot event + * + * NIF: reactor_reselect_read(FdRef) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_reselect_read(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Register for write events + * + * NIF: reactor_select_write(FdRef) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_select_write(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Get the FD integer from an FD resource + * + * NIF: get_fd_from_resource(FdRef) -> Fd | {error, Reason} + */ +ERL_NIF_TERM nif_get_fd_from_resource(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Call Python protocol on_read_ready + * + * NIF: reactor_on_read_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Call Python protocol on_write_ready + * + * NIF: reactor_on_write_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Initialize connection with Python protocol + * + * NIF: reactor_init_connection(ContextRef, Fd, ClientInfo) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Close FD and cleanup Python protocol + * + * NIF: reactor_close_fd(FdRef) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + #endif /* PY_EVENT_LOOP_H */ diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 323603f..1f4219f 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -39,6 +39,7 @@ #include "py_nif.h" #include "py_asgi.h" #include "py_wsgi.h" +#include "py_event_loop.h" /* ============================================================================ * Global state definitions @@ -3865,7 +3866,17 @@ static ErlNifFunc nif_funcs[] = { {"ref_interp_id", 1, nif_ref_interp_id, 0}, {"ref_to_term", 1, nif_ref_to_term, 0}, {"ref_getattr", 2, nif_ref_getattr, ERL_NIF_DIRTY_JOB_CPU_BOUND}, - {"ref_call_method", 3, nif_ref_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND} + {"ref_call_method", 3, nif_ref_call_method, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + + /* Reactor NIFs - Erlang-as-Reactor architecture */ + {"reactor_register_fd", 3, nif_reactor_register_fd, 0}, + {"reactor_reselect_read", 1, nif_reactor_reselect_read, 0}, + {"reactor_select_write", 1, nif_reactor_select_write, 0}, + {"get_fd_from_resource", 1, nif_get_fd_from_resource, 0}, + {"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND}, + {"reactor_close_fd", 1, nif_reactor_close_fd, 0} }; ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload) diff --git a/src/py_reactor_context.erl b/src/py_reactor_context.erl index 2860c42..1047478 100644 --- a/src/py_reactor_context.erl +++ b/src/py_reactor_context.erl @@ -40,7 +40,9 @@ start_link/2, start_link/3, stop/1, - stats/1 + stats/1, + handoff/2, + handoff/3 ]). %% Internal exports @@ -89,6 +91,8 @@ start_link(Id, Mode) -> %% - max_connections: Maximum connections per context (default: 100) %% - app_module: Python module containing ASGI/WSGI app %% - app_callable: Python callable name (e.g., "app", "application") +%% - setup_code: Binary Python code to execute after context creation +%% (useful for setting up protocol factory) %% %% @param Id Unique identifier for this context %% @param Mode Context mode (auto, subinterp, worker) @@ -141,6 +145,33 @@ stats(Ctx) when is_pid(Ctx) -> {error, timeout} end. +%% @doc Hand off a file descriptor to this reactor context. +%% +%% The context takes ownership of the FD and will handle I/O events. +%% This is the main entry point for handing off accepted connections. +%% +%% @param Fd The raw file descriptor (from inet:getfd/1) +%% @param ClientInfo Map with connection metadata (addr, port, type, etc.) +-spec handoff(integer(), map()) -> ok | {error, term()}. +handoff(Fd, ClientInfo) when is_integer(Fd), is_map(ClientInfo) -> + %% Get a reactor context from the pool or use default + case whereis(py_reactor_context_default) of + undefined -> + {error, no_reactor_context}; + Ctx -> + handoff(Ctx, Fd, ClientInfo) + end. + +%% @doc Hand off a file descriptor to a specific reactor context. +%% +%% @param Ctx The reactor context pid +%% @param Fd The raw file descriptor (from inet:getfd/1) +%% @param ClientInfo Map with connection metadata +-spec handoff(pid(), integer(), map()) -> ok | {error, term()}. +handoff(Ctx, Fd, ClientInfo) when is_pid(Ctx), is_integer(Fd), is_map(ClientInfo) -> + Ctx ! {fd_handoff, Fd, ClientInfo}, + ok. + %% ============================================================================ %% Process loop %% ============================================================================ @@ -169,6 +200,19 @@ init(Parent, Id, Mode, Opts) -> AppModule = maps:get(app_module, Opts, undefined), AppCallable = maps:get(app_callable, Opts, undefined), + %% Execute setup code if specified (e.g., set protocol factory) + SetupCode = maps:get(setup_code, Opts, undefined), + case SetupCode of + undefined -> ok; + _ when is_binary(SetupCode) -> + case py_nif:context_exec(Ref, SetupCode) of + ok -> ok; + {error, Reason} -> + error_logger:error_msg( + "py_reactor_context setup_code failed: ~p~n", [Reason]) + end + end, + %% Initialize app in Python context if specified case AppModule of undefined -> ok; @@ -214,6 +258,15 @@ loop(State) -> {select, FdRes, _Ref, ready_output} -> handle_write_ready(FdRes, State); + %% Async completion signal from Python + %% Sent when an async task (e.g., ASGI app) completes and response is ready + %% Accept both atom and binary forms since Python sends binaries + {write_ready, Fd} -> + handle_async_write_ready(Fd, State); + + {<<"write_ready">>, Fd} -> + handle_async_write_ready(Fd, State); + %% Control messages {stop, From, MRef} -> cleanup(State), @@ -265,8 +318,11 @@ handle_fd_handoff(Fd, ClientInfo, State) -> %% Register FD for monitoring case py_nif:reactor_register_fd(Ref, Fd, self()) of {ok, FdRef} -> + %% Inject reactor_pid into client_info for async signaling + ClientInfoWithPid = ClientInfo#{reactor_pid => self()}, + %% Initialize Python protocol handler - case py_nif:reactor_init_connection(Ref, Fd, ClientInfo) of + case py_nif:reactor_init_connection(Ref, Fd, ClientInfoWithPid) of ok -> %% Store connection info ConnInfo = #{ @@ -319,6 +375,16 @@ handle_read_ready(FdRes, State) -> }, loop(NewState); + {ok, <<"async_pending">>} -> + %% Async task submitted (e.g., ASGI app running as task) + %% Don't reselect - wait for {write_ready, Fd} signal from Python + %% Increment request count since task was accepted + error_logger:info_msg("Received async_pending for fd=~p~n", [Fd]), + NewState = State#state{ + total_requests = State#state.total_requests + 1 + }, + loop(NewState); + {ok, <<"close">>} -> %% Close connection close_connection(Fd, FdRes, State); @@ -370,6 +436,31 @@ handle_write_ready(FdRes, State) -> loop(State) end. +%% ============================================================================ +%% Async Write Ready Handler +%% ============================================================================ + +%% @private +%% Handle async completion signal from Python. +%% This is sent when an async task (like an ASGI app) has completed +%% and the response buffer is ready to be written. +handle_async_write_ready(Fd, State) -> + #state{connections = Conns} = State, + + error_logger:info_msg("handle_async_write_ready called for fd=~p~n", [Fd]), + + case maps:get(Fd, Conns, undefined) of + #{fd_ref := FdRef} -> + %% Response buffer is ready, trigger write selection + error_logger:info_msg("Triggering write selection for fd=~p~n", [Fd]), + py_nif:reactor_select_write(FdRef), + loop(State); + undefined -> + %% Connection not found (may have been closed), ignore + error_logger:warning_msg("Connection not found for fd=~p~n", [Fd]), + loop(State) + end. + %% ============================================================================ %% Connection Management %% ============================================================================ diff --git a/test/py_reactor_SUITE.erl b/test/py_reactor_SUITE.erl index 65e8b82..e1b541e 100644 --- a/test/py_reactor_SUITE.erl +++ b/test/py_reactor_SUITE.erl @@ -20,7 +20,8 @@ set_protocol_factory_test/1, echo_protocol_test/1, multiple_connections_test/1, - protocol_close_test/1 + protocol_close_test/1, + async_pending_test/1 ]). all() -> [ @@ -29,7 +30,8 @@ all() -> [ set_protocol_factory_test, echo_protocol_test, multiple_connections_test, - protocol_close_test + protocol_close_test, + async_pending_test ]. init_per_suite(Config) -> @@ -204,3 +206,132 @@ def run_close_test(): _close_test_result = run_close_test() ">>), {ok, true} = py:eval(Ctx, <<"_close_test_result">>). + +%% @doc Test async_pending action for task-based async operations. +%% This tests the pattern used by task-based ASGI where: +%% 1. Protocol returns "async_pending" to indicate a task was submitted +%% 2. Later, Python sends {write_ready, Fd} to signal completion +%% 3. Reactor then triggers write selection +async_pending_test(_Config) -> + %% Protocol factory code to run in reactor context + SetupCode = <<" +import erlang +import erlang.reactor as reactor + +class AsyncPendingProtocol(reactor.Protocol): + '''Protocol that returns async_pending and signals completion.''' + + def __init__(self): + super().__init__() + self.reactor_pid = None + self.pending_response = b'' + + def connection_made(self, fd, client_info): + super().connection_made(fd, client_info) + self.reactor_pid = client_info.get('reactor_pid') + + def data_received(self, data): + import sys + self.pending_response = b'ASYNC:' + data + # Immediately complete the task and signal reactor + self.write_buffer.extend(self.pending_response) + if self.reactor_pid: + print(f'Sending write_ready to {self.reactor_pid} for fd={self.fd}', file=sys.stderr) + try: + erlang.send(self.reactor_pid, ('write_ready', self.fd)) + print('write_ready sent successfully', file=sys.stderr) + except Exception as e: + print(f'erlang.send failed: {e}', file=sys.stderr) + else: + print('No reactor_pid available!', file=sys.stderr) + return 'async_pending' + + def write_ready(self): + if not self.write_buffer: + return 'close' + written = self.write(bytes(self.write_buffer)) + del self.write_buffer[:written] + return 'continue' if self.write_buffer else 'close' + +reactor.set_protocol_factory(AsyncPendingProtocol) +">>, + + %% Start reactor context with protocol factory setup + {ok, ReactorCtx} = py_reactor_context:start_link(1, auto, #{ + setup_code => SetupCode + }), + + %% Use py:context(1) for test helpers (socket management) + PyCtx = py:context(1), + ok = py:exec(PyCtx, <<" +import socket + +_async_test_state = {} + +def setup_socketpair(): + global _async_test_state + s1, s2 = socket.socketpair() + s1.setblocking(False) + s2.setblocking(False) + _async_test_state = {'s1': s1, 's2': s2, 'fd': s1.fileno()} + return s1.fileno() + +def send_test_data(): + s2 = _async_test_state['s2'] + s2.send(b'hello') + return True + +def read_response(): + s2 = _async_test_state['s2'] + s2.setblocking(True) + s2.settimeout(2.0) + try: + return s2.recv(1024) + except socket.timeout: + return b'TIMEOUT' + +def cleanup(): + s1 = _async_test_state.get('s1') + s2 = _async_test_state.get('s2') + try: + if s1: s1.close() + except: + pass + try: + if s2: s2.close() + except: + pass + _async_test_state.clear() + return True +">>), + + %% Step 1: Create socketpair + {ok, Fd} = py:eval(PyCtx, <<"setup_socketpair()">>), + ct:pal("Created socketpair with fd=~p", [Fd]), + + %% Check reactor context is alive + ct:pal("Reactor context ~p is alive: ~p", [ReactorCtx, is_process_alive(ReactorCtx)]), + + %% Step 2: Send fd_handoff to reactor context + ok = py_reactor_context:handoff(ReactorCtx, Fd, #{}), + timer:sleep(100), + + %% Check reactor stats after handoff + Stats = py_reactor_context:stats(ReactorCtx), + ct:pal("Reactor stats after handoff: ~p", [Stats]), + + %% Step 3: Send test data - triggers async_pending and immediate completion + {ok, true} = py:eval(PyCtx, <<"send_test_data()">>), + timer:sleep(200), + + %% Step 4: Read response + {ok, Response} = py:eval(PyCtx, <<"read_response()">>), + ct:pal("Response: ~p", [Response]), + + %% Verify response + <<"ASYNC:hello">> = Response, + + %% Cleanup + {ok, _} = py:eval(PyCtx, <<"cleanup()">>), + py_reactor_context:stop(ReactorCtx), + ok. From 5b0f4e3414b7aa25fde6f380a7387d864c449deb Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 8 Mar 2026 02:08:12 +0100 Subject: [PATCH 2/5] Fix reactor NIFs to use py_context_acquire for subinterpreter support The reactor NIFs were using gil_acquire() which doesn't switch to the context's subinterpreter state, causing segfaults when running Python code in the wrong interpreter. Changed nif_reactor_on_read_ready, nif_reactor_on_write_ready, and nif_reactor_init_connection to use py_context_acquire/py_context_release which properly handles thread state switching for subinterpreters. --- c_src/py_event_loop.c | 47 ++++++++++++++++++++++++++----------------- 1 file changed, 28 insertions(+), 19 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index f17cdff..45ce1a0 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -3084,14 +3084,17 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc, return make_error(env, "invalid_fd"); } - /* Acquire GIL and call Python */ - gil_guard_t guard = gil_acquire(); + /* Acquire context (handles both worker mode and subinterpreter mode) */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + return make_error(env, "acquire_failed"); + } - /* Import erlang_reactor module */ + /* Import erlang.reactor module */ PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module == NULL) { PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "import_erlang_reactor_failed"); } @@ -3102,7 +3105,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc, if (result == NULL) { PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "on_read_ready_failed"); } @@ -3122,7 +3125,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc, } Py_DECREF(result); - gil_release(guard); + py_context_release(&guard); return enif_make_tuple2(env, ATOM_OK, action); } @@ -3149,14 +3152,17 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc, return make_error(env, "invalid_fd"); } - /* Acquire GIL and call Python */ - gil_guard_t guard = gil_acquire(); + /* Acquire context (handles both worker mode and subinterpreter mode) */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + return make_error(env, "acquire_failed"); + } - /* Import erlang_reactor module */ + /* Import erlang.reactor module */ PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module == NULL) { PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "import_erlang_reactor_failed"); } @@ -3167,7 +3173,7 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc, if (result == NULL) { PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "on_write_ready_failed"); } @@ -3187,7 +3193,7 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc, } Py_DECREF(result); - gil_release(guard); + py_context_release(&guard); return enif_make_tuple2(env, ATOM_OK, action); } @@ -3219,23 +3225,26 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, return make_error(env, "invalid_client_info"); } - /* Acquire GIL and call Python */ - gil_guard_t guard = gil_acquire(); + /* Acquire context (handles both worker mode and subinterpreter mode) */ + py_context_guard_t guard = py_context_acquire(ctx); + if (!guard.acquired) { + return make_error(env, "acquire_failed"); + } /* Convert Erlang map to Python dict */ PyObject *client_info = term_to_py(env, argv[2]); if (client_info == NULL) { PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "client_info_conversion_failed"); } - /* Import erlang_reactor module */ + /* Import erlang.reactor module */ PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); if (reactor_module == NULL) { Py_DECREF(client_info); PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "import_erlang_reactor_failed"); } @@ -3247,12 +3256,12 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, if (result == NULL) { PyErr_Clear(); - gil_release(guard); + py_context_release(&guard); return make_error(env, "init_connection_failed"); } Py_DECREF(result); - gil_release(guard); + py_context_release(&guard); return ATOM_OK; } From 77a3994308dc5f71bf71bfcd01cfdc154eae82b6 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 8 Mar 2026 02:12:14 +0100 Subject: [PATCH 3/5] Add reactor.signal_write_ready(fd) API Provides a clean API for protocols to signal async completion instead of manually sending to the reactor PID. The reactor PID is stored per-fd in init_connection and cleaned up in close_connection. --- priv/_erlang_impl/_reactor.py | 29 ++++++++++++++++++++++++++++- test/py_reactor_SUITE.erl | 17 +---------------- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/priv/_erlang_impl/_reactor.py b/priv/_erlang_impl/_reactor.py index 2144df9..a0cf612 100644 --- a/priv/_erlang_impl/_reactor.py +++ b/priv/_erlang_impl/_reactor.py @@ -47,6 +47,7 @@ def write_ready(self): 'on_read_ready', 'on_write_ready', 'close_connection', + 'signal_write_ready', ] @@ -153,6 +154,7 @@ def write(self, data: bytes) -> int: # ============================================================================= _protocols: Dict[int, Protocol] = {} +_reactor_pids: Dict[int, object] = {} # fd -> reactor PID _protocol_factory: Optional[Callable[[], Protocol]] = None @@ -193,11 +195,15 @@ def init_connection(fd: int, client_info: dict): fd: File descriptor client_info: Connection metadata from Erlang """ - global _protocols, _protocol_factory + global _protocols, _protocol_factory, _reactor_pids if _protocol_factory is not None: proto = _protocol_factory() proto.connection_made(fd, client_info) _protocols[fd] = proto + # Store reactor PID for signal_write_ready + reactor_pid = client_info.get('reactor_pid') + if reactor_pid is not None: + _reactor_pids[fd] = reactor_pid def on_read_ready(fd: int) -> str: @@ -246,6 +252,27 @@ def close_connection(fd: int): fd: File descriptor """ proto = _protocols.pop(fd, None) + _reactor_pids.pop(fd, None) if proto is not None: proto.closed = True proto.connection_lost() + + +def signal_write_ready(fd: int) -> bool: + """Signal the reactor that a response is ready for the given fd. + + Call this after an async task completes and the response buffer is ready. + The reactor will then trigger write selection for the fd. + + Args: + fd: File descriptor with pending response + + Returns: + True if signal was sent, False if no reactor PID registered + """ + import erlang + reactor_pid = _reactor_pids.get(fd) + if reactor_pid is not None: + erlang.send(reactor_pid, ('write_ready', fd)) + return True + return False diff --git a/test/py_reactor_SUITE.erl b/test/py_reactor_SUITE.erl index e1b541e..9aec5f8 100644 --- a/test/py_reactor_SUITE.erl +++ b/test/py_reactor_SUITE.erl @@ -215,7 +215,6 @@ _close_test_result = run_close_test() async_pending_test(_Config) -> %% Protocol factory code to run in reactor context SetupCode = <<" -import erlang import erlang.reactor as reactor class AsyncPendingProtocol(reactor.Protocol): @@ -223,27 +222,13 @@ class AsyncPendingProtocol(reactor.Protocol): def __init__(self): super().__init__() - self.reactor_pid = None self.pending_response = b'' - def connection_made(self, fd, client_info): - super().connection_made(fd, client_info) - self.reactor_pid = client_info.get('reactor_pid') - def data_received(self, data): - import sys self.pending_response = b'ASYNC:' + data # Immediately complete the task and signal reactor self.write_buffer.extend(self.pending_response) - if self.reactor_pid: - print(f'Sending write_ready to {self.reactor_pid} for fd={self.fd}', file=sys.stderr) - try: - erlang.send(self.reactor_pid, ('write_ready', self.fd)) - print('write_ready sent successfully', file=sys.stderr) - except Exception as e: - print(f'erlang.send failed: {e}', file=sys.stderr) - else: - print('No reactor_pid available!', file=sys.stderr) + reactor.signal_write_ready(self.fd) return 'async_pending' def write_ready(self): From c19328781b702b0e5ca4bbb131b2b21729440200 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 8 Mar 2026 02:25:17 +0100 Subject: [PATCH 4/5] Fix reactor_close_fd to use context for subinterpreter support Added context parameter to reactor_close_fd NIF so it can properly switch to the subinterpreter's thread state before calling Python. --- c_src/py_event_loop.c | 36 +++++++++++++++++++++--------------- c_src/py_event_loop.h | 2 +- c_src/py_nif.c | 2 +- src/py_nif.erl | 7 ++++--- src/py_reactor_context.erl | 5 +++-- 5 files changed, 30 insertions(+), 22 deletions(-) diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index 45ce1a0..c4b4bb5 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -3267,7 +3267,7 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, } /** - * reactor_close_fd(FdRef) -> ok | {error, Reason} + * reactor_close_fd(ContextRef, FdRef) -> ok | {error, Reason} * * Close an FD and clean up the protocol handler. * Calls Python's erlang_reactor.close_connection(fd) if registered. @@ -3276,8 +3276,13 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) { (void)argc; + py_context_t *ctx; + if (!enif_get_resource(env, argv[0], PY_CONTEXT_RESOURCE_TYPE, (void **)&ctx)) { + return make_error(env, "invalid_context"); + } + fd_resource_t *fd_res; - if (!enif_get_resource(env, argv[0], FD_RESOURCE_TYPE, (void **)&fd_res)) { + if (!enif_get_resource(env, argv[1], FD_RESOURCE_TYPE, (void **)&fd_res)) { return make_error(env, "invalid_fd_ref"); } @@ -3293,20 +3298,21 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, /* Call Python to clean up protocol handler */ if (fd >= 0) { - gil_guard_t guard = gil_acquire(); - - PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); - if (reactor_module != NULL) { - PyObject *result = PyObject_CallMethod(reactor_module, - "close_connection", "i", fd); - Py_XDECREF(result); - Py_DECREF(reactor_module); - PyErr_Clear(); /* Ignore errors during cleanup */ - } else { - PyErr_Clear(); - } + py_context_guard_t guard = py_context_acquire(ctx); + if (guard.acquired) { + PyObject *reactor_module = PyImport_ImportModule("erlang.reactor"); + if (reactor_module != NULL) { + PyObject *result = PyObject_CallMethod(reactor_module, + "close_connection", "i", fd); + Py_XDECREF(result); + Py_DECREF(reactor_module); + PyErr_Clear(); /* Ignore errors during cleanup */ + } else { + PyErr_Clear(); + } - gil_release(guard); + py_context_release(&guard); + } } /* Take ownership for cleanup */ diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index 2d3336c..c13231b 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -862,7 +862,7 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, /** * @brief Close FD and cleanup Python protocol * - * NIF: reactor_close_fd(FdRef) -> ok | {error, Reason} + * NIF: reactor_close_fd(ContextRef, FdRef) -> ok | {error, Reason} */ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 1f4219f..1649a20 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -3876,7 +3876,7 @@ static ErlNifFunc nif_funcs[] = { {"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND}, - {"reactor_close_fd", 1, nif_reactor_close_fd, 0} + {"reactor_close_fd", 2, nif_reactor_close_fd, 0} }; ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload) diff --git a/src/py_nif.erl b/src/py_nif.erl index 82b2c79..d3facaf 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -188,7 +188,7 @@ reactor_on_read_ready/2, reactor_on_write_ready/2, reactor_init_connection/3, - reactor_close_fd/1 + reactor_close_fd/2 ]). -on_load(load_nif/0). @@ -1524,8 +1524,9 @@ reactor_init_connection(_ContextRef, _Fd, _ClientInfo) -> %% Calls Python's erlang_reactor.close_connection(fd) to clean up %% the protocol handler, then closes the FD. %% +%% @param ContextRef Context resource reference %% @param FdRef FD resource reference %% @returns ok | {error, Reason} --spec reactor_close_fd(reference()) -> ok | {error, term()}. -reactor_close_fd(_FdRef) -> +-spec reactor_close_fd(reference(), reference()) -> ok | {error, term()}. +reactor_close_fd(_ContextRef, _FdRef) -> ?NIF_STUB. diff --git a/src/py_reactor_context.erl b/src/py_reactor_context.erl index 1047478..fa4175b 100644 --- a/src/py_reactor_context.erl +++ b/src/py_reactor_context.erl @@ -468,12 +468,13 @@ handle_async_write_ready(Fd, State) -> %% @private close_connection(Fd, FdRes, State) -> #state{ + ref = Ref, connections = Conns, active_connections = Active } = State, %% Close via NIF (cleans up Python protocol handler) - py_nif:reactor_close_fd(FdRes), + py_nif:reactor_close_fd(Ref, FdRes), %% Remove from connections map NewConns = maps:remove(Fd, Conns), @@ -489,7 +490,7 @@ cleanup(State) -> %% Close all connections maps:foreach(fun(_Fd, #{fd_ref := FdRef}) -> - py_nif:reactor_close_fd(FdRef) + py_nif:reactor_close_fd(Ref, FdRef) end, Conns), %% Destroy Python context From 7e2ba21e188ed12ec4a61c3c48199d884edc8bc1 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sun, 8 Mar 2026 02:32:01 +0100 Subject: [PATCH 5/5] Fix missing context arg in reactor_close_fd call --- src/py_reactor_context.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/py_reactor_context.erl b/src/py_reactor_context.erl index fa4175b..60fb64d 100644 --- a/src/py_reactor_context.erl +++ b/src/py_reactor_context.erl @@ -339,7 +339,7 @@ handle_fd_handoff(Fd, ClientInfo, State) -> {error, _Reason} -> %% Failed to init connection, close - py_nif:reactor_close_fd(FdRef), + py_nif:reactor_close_fd(Ref, FdRef), loop(State) end;