Skip to content

Commit b145694

Browse files
committed
Add per-interpreter reactor cache for SHARED_GIL subinterpreters
Replace global reactor module cache with per-interpreter module state to support isolated protocol factories in SHARED_GIL subinterpreters. Changes: - Add reactor cache fields to py_event_loop_module_state_t - Replace ensure_reactor_cached() with ensure_reactor_cached_for_interp() - Export extend_erlang_module_in_context/1 from py_context - Call extend_erlang_module_in_context in py_reactor_context init - Handle both atom and binary action forms in reactor_context handlers - Add reactor_context_subinterp_isolation_test - Add bench_reactor_modes.erl benchmark comparing modes
1 parent a94d736 commit b145694

File tree

7 files changed

+719
-46
lines changed

7 files changed

+719
-46
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
- `reactor.run_fd(fd, protocol_factory)` - Handle a single FD with a protocol
3434
- Integrates with Erlang's `enif_select` for efficient I/O multiplexing
3535
- Zero-copy buffer management for high-throughput scenarios
36+
- Supports SHARED_GIL subinterpreters via `py_reactor_context`
37+
- Each reactor context has isolated protocol factory when using `mode=subinterp`
3638

3739
- **ETF encoding for PIDs and References** - Full Erlang term format support
3840
- Erlang PIDs encode/decode properly in ETF binary format
@@ -122,6 +124,11 @@
122124

123125
### Fixed
124126

127+
- **`py_reactor_context` now extends erlang module in subinterpreters** - Previously,
128+
`py_reactor_context` with `mode=subinterp` would fail to import `erlang.reactor`
129+
because the erlang module extension was not applied. Now calls
130+
`py_context:extend_erlang_module_in_context/1` after context creation.
131+
125132
- **FD stealing and UDP connected socket issues** - Fixed file descriptor handling
126133
for UDP sockets in connected mode
127134

c_src/py_event_loop.c

Lines changed: 82 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,20 @@ typedef struct {
106106

107107
/** @brief Isolation mode: 0=global, 1=per_loop */
108108
int isolation_mode;
109+
110+
/* ========== Per-Interpreter Reactor Cache ========== */
111+
112+
/** @brief Cached erlang.reactor module for this interpreter */
113+
PyObject *reactor_module;
114+
115+
/** @brief Cached on_read_ready callable */
116+
PyObject *reactor_on_read;
117+
118+
/** @brief Cached on_write_ready callable */
119+
PyObject *reactor_on_write;
120+
121+
/** @brief Whether reactor cache has been initialized */
122+
bool reactor_initialized;
109123
} py_event_loop_module_state_t;
110124

111125
/* ============================================================================
@@ -121,43 +135,40 @@ static bool g_global_shared_router_valid = false;
121135
static pthread_mutex_t g_global_router_mutex = PTHREAD_MUTEX_INITIALIZER;
122136

123137
/* ============================================================================
124-
* Cached Reactor Callables (Performance Optimization)
138+
* Per-Interpreter Reactor Cache
125139
* ============================================================================
126140
*
127-
* Cache erlang.reactor module and callbacks to avoid expensive PyImport
128-
* on every read/write callback in the hot path.
141+
* Reactor callables (erlang.reactor.on_read_ready, on_write_ready) are cached
142+
* per-interpreter in the module state. This ensures that subinterpreters use
143+
* their own reactor module instance rather than the main interpreter's.
144+
*
145+
* The cache is populated lazily on first reactor operation within each
146+
* interpreter.
129147
*/
130-
static PyObject *g_reactor_module = NULL;
131-
static PyObject *g_on_read_ready = NULL;
132-
static PyObject *g_on_write_ready = NULL;
133-
static bool g_reactor_cached = false;
134-
static pthread_mutex_t g_reactor_cache_mutex = PTHREAD_MUTEX_INITIALIZER;
135148

136149
/**
137-
* Initialize cached reactor callables.
150+
* Initialize cached reactor callables for the current interpreter.
138151
* MUST be called with GIL held.
139-
* Thread-safe: uses mutex for first initialization.
140152
*
153+
* Uses the module state to cache per-interpreter reactor references.
154+
* This is safe for subinterpreters since each has its own module state.
155+
*
156+
* @param state Module state for current interpreter
141157
* @return true if callables are cached and ready, false on error
142158
*/
143-
static bool ensure_reactor_cached(void) {
144-
/* Fast path: already cached */
145-
if (g_reactor_cached) {
146-
return true;
159+
static bool ensure_reactor_cached_for_interp(py_event_loop_module_state_t *state) {
160+
if (state == NULL) {
161+
return false;
147162
}
148163

149-
pthread_mutex_lock(&g_reactor_cache_mutex);
150-
151-
/* Double-check after acquiring lock */
152-
if (g_reactor_cached) {
153-
pthread_mutex_unlock(&g_reactor_cache_mutex);
164+
/* Fast path: already cached for this interpreter */
165+
if (state->reactor_initialized) {
154166
return true;
155167
}
156168

157-
/* Import erlang.reactor module */
169+
/* Import erlang.reactor module in THIS interpreter */
158170
PyObject *module = PyImport_ImportModule("erlang.reactor");
159171
if (module == NULL) {
160-
pthread_mutex_unlock(&g_reactor_cache_mutex);
161172
return false;
162173
}
163174

@@ -166,7 +177,6 @@ static bool ensure_reactor_cached(void) {
166177
if (on_read == NULL || !PyCallable_Check(on_read)) {
167178
Py_XDECREF(on_read);
168179
Py_DECREF(module);
169-
pthread_mutex_unlock(&g_reactor_cache_mutex);
170180
return false;
171181
}
172182

@@ -176,20 +186,36 @@ static bool ensure_reactor_cached(void) {
176186
Py_XDECREF(on_write);
177187
Py_DECREF(on_read);
178188
Py_DECREF(module);
179-
pthread_mutex_unlock(&g_reactor_cache_mutex);
180189
return false;
181190
}
182191

183-
/* Store cached references */
184-
g_reactor_module = module;
185-
g_on_read_ready = on_read;
186-
g_on_write_ready = on_write;
187-
g_reactor_cached = true;
192+
/* Store cached references in module state */
193+
state->reactor_module = module;
194+
state->reactor_on_read = on_read;
195+
state->reactor_on_write = on_write;
196+
state->reactor_initialized = true;
188197

189-
pthread_mutex_unlock(&g_reactor_cache_mutex);
190198
return true;
191199
}
192200

201+
/**
202+
* Clean up reactor cache in module state.
203+
* Called during module deallocation.
204+
*/
205+
static void cleanup_reactor_cache(py_event_loop_module_state_t *state) {
206+
if (state == NULL) {
207+
return;
208+
}
209+
210+
Py_XDECREF(state->reactor_module);
211+
Py_XDECREF(state->reactor_on_read);
212+
Py_XDECREF(state->reactor_on_write);
213+
state->reactor_module = NULL;
214+
state->reactor_on_read = NULL;
215+
state->reactor_on_write = NULL;
216+
state->reactor_initialized = false;
217+
}
218+
193219
/* Forward declaration for module state access */
194220
static py_event_loop_module_state_t *get_module_state(void);
195221
static py_event_loop_module_state_t *get_module_state_from_module(PyObject *module);
@@ -3246,15 +3272,16 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
32463272
return make_error(env, "buffer_creation_failed");
32473273
}
32483274

3249-
/* Ensure reactor callables are cached (fast path after first call) */
3250-
if (!ensure_reactor_cached()) {
3275+
/* Get module state for THIS interpreter's reactor cache */
3276+
py_event_loop_module_state_t *state = get_module_state();
3277+
if (!ensure_reactor_cached_for_interp(state)) {
32513278
PyErr_Clear();
32523279
Py_DECREF(py_buffer);
32533280
py_context_release(&guard);
32543281
return make_error(env, "reactor_cache_init_failed");
32553282
}
32563283

3257-
/* Call cached on_read_ready(fd, data) - avoids PyImport on every call */
3284+
/* Call cached on_read_ready(fd, data) - uses THIS interpreter's reactor */
32583285
PyObject *py_fd = PyLong_FromLong(fd);
32593286
if (py_fd == NULL) {
32603287
PyErr_Clear();
@@ -3263,7 +3290,7 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
32633290
return make_error(env, "fd_conversion_failed");
32643291
}
32653292

3266-
PyObject *result = PyObject_CallFunctionObjArgs(g_on_read_ready, py_fd, py_buffer, NULL);
3293+
PyObject *result = PyObject_CallFunctionObjArgs(state->reactor_on_read, py_fd, py_buffer, NULL);
32673294
Py_DECREF(py_fd);
32683295
Py_DECREF(py_buffer);
32693296

@@ -3322,22 +3349,23 @@ ERL_NIF_TERM nif_reactor_on_write_ready(ErlNifEnv *env, int argc,
33223349
return make_error(env, "acquire_failed");
33233350
}
33243351

3325-
/* Ensure reactor callables are cached (fast path after first call) */
3326-
if (!ensure_reactor_cached()) {
3352+
/* Get module state for THIS interpreter's reactor cache */
3353+
py_event_loop_module_state_t *state = get_module_state();
3354+
if (!ensure_reactor_cached_for_interp(state)) {
33273355
PyErr_Clear();
33283356
py_context_release(&guard);
33293357
return make_error(env, "reactor_cache_init_failed");
33303358
}
33313359

3332-
/* Call cached on_write_ready(fd) - avoids PyImport on every call */
3360+
/* Call cached on_write_ready(fd) - uses THIS interpreter's reactor */
33333361
PyObject *py_fd = PyLong_FromLong(fd);
33343362
if (py_fd == NULL) {
33353363
PyErr_Clear();
33363364
py_context_release(&guard);
33373365
return make_error(env, "fd_conversion_failed");
33383366
}
33393367

3340-
PyObject *result = PyObject_CallFunctionObjArgs(g_on_write_ready, py_fd, NULL);
3368+
PyObject *result = PyObject_CallFunctionObjArgs(state->reactor_on_write, py_fd, NULL);
33413369
Py_DECREF(py_fd);
33423370

33433371
if (result == NULL) {
@@ -5257,13 +5285,25 @@ static PyMethodDef PyEventLoopMethods[] = {
52575285
{NULL, NULL, 0, NULL}
52585286
};
52595287

5288+
/**
5289+
* Module free callback - cleans up per-interpreter state.
5290+
* Called when the module is being deallocated.
5291+
*/
5292+
static void py_event_loop_module_free(void *module) {
5293+
py_event_loop_module_state_t *state = PyModule_GetState((PyObject *)module);
5294+
if (state != NULL) {
5295+
cleanup_reactor_cache(state);
5296+
}
5297+
}
5298+
52605299
/* Module definition with module state for per-interpreter isolation */
52615300
static struct PyModuleDef PyEventLoopModuleDef = {
52625301
PyModuleDef_HEAD_INIT,
52635302
.m_name = "py_event_loop",
52645303
.m_doc = "Erlang-native asyncio event loop",
52655304
.m_size = sizeof(py_event_loop_module_state_t),
52665305
.m_methods = PyEventLoopMethods,
5306+
.m_free = py_event_loop_module_free,
52675307
};
52685308

52695309
/**
@@ -5290,6 +5330,11 @@ int create_py_event_loop_module(void) {
52905330
state->event_loop = NULL;
52915331
state->shared_router_valid = false;
52925332
state->isolation_mode = 0; /* global mode by default */
5333+
/* Initialize reactor cache (will be populated lazily) */
5334+
state->reactor_module = NULL;
5335+
state->reactor_on_read = NULL;
5336+
state->reactor_on_write = NULL;
5337+
state->reactor_initialized = false;
52935338
}
52945339

52955340
/* Add module to sys.modules (reuse sys_modules from idempotency check) */

docs/reactor.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,30 @@ Internal - called by NIF when fd is writable.
571571

572572
Internal - called by NIF to close connection.
573573

574+
## Subinterpreter Support
575+
576+
The reactor supports isolated subinterpreters via `py_reactor_context`. Each subinterpreter has its own reactor module cache, ensuring protocol factories are isolated between contexts.
577+
578+
```erlang
579+
%% Create context with subinterpreter mode
580+
{ok, Ctx1} = py_reactor_context:start_link(1, subinterp, #{
581+
setup_code => <<"
582+
import erlang.reactor as reactor
583+
reactor.set_protocol_factory(EchoProtocol)
584+
">>
585+
}),
586+
587+
%% Create another context with different protocol
588+
{ok, Ctx2} = py_reactor_context:start_link(2, subinterp, #{
589+
setup_code => <<"
590+
import erlang.reactor as reactor
591+
reactor.set_protocol_factory(HttpProtocol)
592+
">>
593+
}).
594+
```
595+
596+
Each context runs in its own subinterpreter with isolated protocol factory and connection state. This enables running multiple protocol handlers in the same BEAM VM without interference.
597+
574598
## See Also
575599

576600
- [Asyncio](asyncio.md) - Higher-level asyncio event loop for Python

0 commit comments

Comments
 (0)