Skip to content

Commit 1b87ea2

Browse files
committed
Fix event loop isolation, atom safety, and Python 3.14 compat
- Add _has_loop_ref() to prevent concurrent loops while allowing sequential replacement (checks is_running() not just exists) - Add _clear_loop_ref() called on loop close for proper cleanup - Add global_loop_capsule_destructor to fix resource leak - Rename atom() to _atom() in C, add Python wrapper with cache and configurable limit (ERLANG_PYTHON_MAX_ATOMS, default 10000) - Use enif_make_existing_atom() first to avoid duplicate atoms - Fix venv .pth file processing for Python 3.14 subinterpreters by embedding site-packages path directly in exec code
1 parent 9e063fa commit 1b87ea2

File tree

6 files changed

+212
-15
lines changed

6 files changed

+212
-15
lines changed

c_src/py_callback.c

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2862,10 +2862,11 @@ static PyMethodDef ErlangModuleMethods[] = {
28622862
"Call a registered Erlang function.\n\n"
28632863
"Usage: erlang.call('func_name', arg1, arg2, ...)\n"
28642864
"Returns: The result from the Erlang function."},
2865-
{"atom", erlang_atom_impl, METH_VARARGS,
2866-
"Create an Erlang atom.\n\n"
2867-
"Usage: erlang.atom('name')\n"
2868-
"Returns: An ErlangAtom object that converts to an Erlang atom."},
2865+
{"_atom", erlang_atom_impl, METH_VARARGS,
2866+
"Internal: Create an Erlang atom.\n\n"
2867+
"Usage: erlang._atom('name')\n"
2868+
"Returns: An ErlangAtom object that converts to an Erlang atom.\n"
2869+
"NOTE: Use erlang.atom() wrapper instead for safety limits."},
28692870
{"send", erlang_send_impl, METH_VARARGS,
28702871
"Send a message to an Erlang process (fire-and-forget).\n\n"
28712872
"Usage: erlang.send(pid, term)\n"

c_src/py_convert.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,12 @@ ERL_NIF_TERM py_to_term(ErlNifEnv *env, PyObject *obj) {
353353
/* Handle ErlangAtom → Erlang atom */
354354
if (Py_IS_TYPE(obj, &ErlangAtomType)) {
355355
ErlangAtomObject *atom_obj = (ErlangAtomObject *)obj;
356+
ERL_NIF_TERM atom_term;
357+
/* Try existing atom first (no new allocation) */
358+
if (enif_make_existing_atom(env, atom_obj->name, &atom_term, ERL_NIF_LATIN1)) {
359+
return atom_term;
360+
}
361+
/* Atom doesn't exist yet, create it */
356362
return enif_make_atom(env, atom_obj->name);
357363
}
358364

c_src/py_event_loop.c

Lines changed: 107 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6176,6 +6176,20 @@ static void loop_capsule_destructor(PyObject *capsule) {
61766176
}
61776177
}
61786178

6179+
/**
6180+
* Destructor for global loop capsules.
6181+
* Only releases reference - does NOT signal shutdown since the global
6182+
* loop is shared and managed by Erlang, not Python.
6183+
*/
6184+
static void global_loop_capsule_destructor(PyObject *capsule) {
6185+
erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(
6186+
capsule, LOOP_CAPSULE_NAME);
6187+
if (loop != NULL) {
6188+
/* Only release the reference, don't shutdown */
6189+
enif_release_resource(loop);
6190+
}
6191+
}
6192+
61796193
/* Python function: _loop_new() -> capsule */
61806194
static PyObject *py_loop_new(PyObject *self, PyObject *args) {
61816195
(void)self;
@@ -6384,6 +6398,36 @@ static PyObject *py_set_global_loop_ref(PyObject *self, PyObject *args) {
63846398
Py_RETURN_NONE;
63856399
}
63866400

6401+
/**
6402+
* Python function: _clear_loop_ref(capsule)
6403+
*
6404+
* Clear the Python loop reference from an event loop capsule.
6405+
* Should be called when the Python loop is closed to allow
6406+
* creating a new loop later.
6407+
*/
6408+
static PyObject *py_clear_loop_ref(PyObject *self, PyObject *args) {
6409+
(void)self;
6410+
PyObject *capsule;
6411+
6412+
if (!PyArg_ParseTuple(args, "O", &capsule)) {
6413+
return NULL;
6414+
}
6415+
6416+
erlang_event_loop_t *loop = loop_from_capsule(capsule);
6417+
if (loop == NULL) {
6418+
return NULL;
6419+
}
6420+
6421+
/* Clear the Python loop reference */
6422+
if (loop->py_loop != NULL) {
6423+
Py_DECREF(loop->py_loop);
6424+
loop->py_loop = NULL;
6425+
}
6426+
loop->py_loop_valid = false;
6427+
6428+
Py_RETURN_NONE;
6429+
}
6430+
63876431
/* Python function: _get_global_loop_capsule() -> capsule
63886432
*
63896433
* Returns a capsule for the global interpreter event loop.
@@ -6405,7 +6449,67 @@ static PyObject *py_get_global_loop_capsule(PyObject *self, PyObject *args) {
64056449
/* Keep the resource alive while capsule exists */
64066450
enif_keep_resource(loop);
64076451

6408-
return PyCapsule_New(loop, LOOP_CAPSULE_NAME, NULL);
6452+
return PyCapsule_New(loop, LOOP_CAPSULE_NAME, global_loop_capsule_destructor);
6453+
}
6454+
6455+
/**
6456+
* Python function: _has_loop_ref(capsule) -> bool
6457+
*
6458+
* Check if a loop capsule has an ACTIVE Python loop reference.
6459+
* Returns True only if there's a valid loop that is currently RUNNING.
6460+
* This prevents multiple concurrent loops while allowing sequential
6461+
* loop replacement (e.g., between test cases).
6462+
*
6463+
* The key insight is that the event confusion bug occurs when multiple
6464+
* loops are running simultaneously. A non-running loop (even if not
6465+
* explicitly closed) can be safely replaced.
6466+
*/
6467+
static PyObject *py_has_loop_ref(PyObject *self, PyObject *args) {
6468+
(void)self;
6469+
PyObject *capsule;
6470+
6471+
if (!PyArg_ParseTuple(args, "O", &capsule)) {
6472+
return NULL;
6473+
}
6474+
6475+
erlang_event_loop_t *loop = loop_from_capsule(capsule);
6476+
if (loop == NULL) {
6477+
return NULL;
6478+
}
6479+
6480+
if (loop->py_loop_valid && loop->py_loop != NULL) {
6481+
/* Check if the existing loop is running - only block if running */
6482+
PyObject *is_running = PyObject_CallMethod(loop->py_loop, "is_running", NULL);
6483+
if (is_running != NULL) {
6484+
int running = PyObject_IsTrue(is_running);
6485+
Py_DECREF(is_running);
6486+
if (running) {
6487+
/* Loop is still running - prevent concurrent loop creation */
6488+
Py_RETURN_TRUE;
6489+
}
6490+
} else {
6491+
/* Error calling is_running - clear error and check is_closed as fallback */
6492+
PyErr_Clear();
6493+
}
6494+
6495+
/* Loop exists but is not running - check if closed for cleanup */
6496+
PyObject *is_closed = PyObject_CallMethod(loop->py_loop, "is_closed", NULL);
6497+
if (is_closed != NULL) {
6498+
int closed = PyObject_IsTrue(is_closed);
6499+
Py_DECREF(is_closed);
6500+
if (closed) {
6501+
/* Loop is closed, clean up reference */
6502+
Py_DECREF(loop->py_loop);
6503+
loop->py_loop = NULL;
6504+
loop->py_loop_valid = false;
6505+
}
6506+
} else {
6507+
PyErr_Clear();
6508+
}
6509+
/* Not running, allow replacement */
6510+
Py_RETURN_FALSE;
6511+
}
6512+
Py_RETURN_FALSE;
64096513
}
64106514

64116515
/* Python function: _run_once_native_for(capsule, timeout_ms) -> [(callback_id, event_type), ...] */
@@ -7031,6 +7135,8 @@ static PyMethodDef PyEventLoopMethods[] = {
70317135
/* Handle-based API (takes explicit loop capsule) */
70327136
{"_loop_new", py_loop_new, METH_NOARGS, "Create a new event loop, returns capsule"},
70337137
{"_get_global_loop_capsule", py_get_global_loop_capsule, METH_NOARGS, "Get capsule for global event loop"},
7138+
{"_has_loop_ref", py_has_loop_ref, METH_VARARGS, "Check if loop capsule has Python loop reference"},
7139+
{"_clear_loop_ref", py_clear_loop_ref, METH_VARARGS, "Clear Python loop reference from C struct"},
70347140
{"_loop_destroy", py_loop_destroy, METH_VARARGS, "Destroy an event loop"},
70357141
{"_set_loop_ref", py_set_loop_ref, METH_VARARGS, "Store Python loop reference in C struct"},
70367142
{"_set_global_loop_ref", py_set_global_loop_ref, METH_VARARGS, "Store Python loop reference in global loop"},

priv/_erlang_impl/__init__.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
loop.run_until_complete(main())
4545
"""
4646

47+
import os
4748
import sys
4849
import asyncio
4950
import time
@@ -82,12 +83,60 @@
8283
'Channel',
8384
'reply',
8485
'ChannelClosed',
86+
'atom',
8587
]
8688

89+
# Atom caching with configurable limit to prevent BEAM atom table exhaustion.
90+
# The BEAM VM has a hard limit (~1M atoms) and crashes when exceeded.
91+
# This provides a Python-level safety valve well under that limit.
92+
_MAX_USER_ATOMS = int(os.environ.get('ERLANG_PYTHON_MAX_ATOMS', '10000'))
93+
_atom_cache = {}
94+
8795
# Re-export for uvloop API compatibility
8896
EventLoopPolicy = ErlangEventLoopPolicy
8997

9098

99+
def atom(name):
100+
"""Create an Erlang atom with safety limit.
101+
102+
Atoms in Erlang are permanent and the BEAM VM has a hard limit
103+
(~1M atoms). This function provides a Python-level cache with
104+
a configurable limit to prevent atom table exhaustion from
105+
untrusted Python code.
106+
107+
Args:
108+
name: The atom name as a string.
109+
110+
Returns:
111+
An ErlangAtom object that converts to an Erlang atom.
112+
113+
Raises:
114+
RuntimeError: If the atom limit is reached.
115+
116+
The limit can be configured via the ERLANG_PYTHON_MAX_ATOMS
117+
environment variable (default: 10000).
118+
119+
Example:
120+
>>> import erlang
121+
>>> ok = erlang.atom('ok')
122+
>>> error = erlang.atom('error')
123+
"""
124+
if name in _atom_cache:
125+
return _atom_cache[name]
126+
127+
if len(_atom_cache) >= _MAX_USER_ATOMS:
128+
raise RuntimeError(
129+
f"Atom limit ({_MAX_USER_ATOMS}) reached. "
130+
"Set ERLANG_PYTHON_MAX_ATOMS env var to increase."
131+
)
132+
133+
# Import erlang module to access internal _atom function
134+
import erlang as _erlang
135+
result = _erlang._atom(name)
136+
_atom_cache[name] = result
137+
return result
138+
139+
91140
def get_event_loop_policy() -> ErlangEventLoopPolicy:
92141
"""Get an Erlang event loop policy instance.
93142

priv/_erlang_impl/_loop.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,20 @@ def __init__(self):
125125
try:
126126
self._loop_capsule = self._pel._get_global_loop_capsule()
127127
self._uses_global_capsule = True
128-
except RuntimeError:
128+
# Check if another loop already owns this capsule.
129+
# Only one ErlangEventLoop per interpreter is supported.
130+
if hasattr(self._pel, '_has_loop_ref') and self._pel._has_loop_ref(self._loop_capsule):
131+
raise RuntimeError(
132+
"An ErlangEventLoop already exists for this interpreter. "
133+
"Only one loop per interpreter is supported."
134+
)
135+
except RuntimeError as e:
136+
# Re-raise our "already exists" error
137+
if "already exists" in str(e):
138+
raise
129139
# Fall back to creating a new loop if global not available
130140
self._loop_capsule = self._pel._loop_new()
141+
self._uses_global_capsule = False
131142
else:
132143
self._loop_capsule = self._pel._loop_new()
133144

@@ -318,6 +329,15 @@ def close(self):
318329
self._default_executor.shutdown(wait=True)
319330
self._default_executor = None
320331

332+
# Clear loop ref to allow creating a new loop later.
333+
# This is important for the global capsule case where the capsule
334+
# persists but a new Python loop may be created.
335+
if self._loop_capsule is not None and hasattr(self._pel, '_clear_loop_ref'):
336+
try:
337+
self._pel._clear_loop_ref(self._loop_capsule)
338+
except Exception:
339+
pass
340+
321341
# Destroy loop capsule (but not if using shared global capsule)
322342
if not self._uses_global_capsule:
323343
try:
@@ -1306,15 +1326,16 @@ async def _run_and_send(coro, caller_pid, ref):
13061326
(async_result, ref, (ok, result)) - on success
13071327
(async_result, ref, (error, error_str)) - on failure
13081328
1309-
Note: Uses erlang.atom() to create atoms for message keys, since Python
1329+
Note: Uses cached atom() to create atoms for message keys, since Python
13101330
strings become Erlang binaries but the await function expects atoms.
13111331
"""
13121332
import erlang
1333+
from . import atom # Use cached version from _erlang_impl
13131334

13141335
# Create atoms for message keys (strings become binaries, await expects atoms)
1315-
async_result = erlang.atom('async_result')
1316-
ok = erlang.atom('ok')
1317-
error = erlang.atom('error')
1336+
async_result = atom('async_result')
1337+
ok = atom('ok')
1338+
error = atom('error')
13181339

13191340
try:
13201341
result = await coro

src/py.erl

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1040,18 +1040,32 @@ activate_venv_with_site_packages(VenvBin, SitePackages) ->
10401040
{ok, _} = eval(<<"setattr(__import__('sys'), '_active_venv', vp)">>, #{vp => VenvBin}),
10411041
{ok, _} = eval(<<"setattr(__import__('sys'), '_venv_site_packages', sp)">>, #{sp => SitePackages}),
10421042
%% Add site-packages and process .pth files (editable installs)
1043-
ok = exec(<<"import site as _site, sys as _sys\n"
1044-
"_b = frozenset(_sys.path)\n"
1045-
"_site.addsitedir(_sys._venv_site_packages)\n"
1046-
"_sys.path[:] = [p for p in _sys.path if p not in _b] + [p for p in _sys.path if p in _b]\n"
1047-
"del _site, _sys, _b\n">>),
1043+
%% Note: We embed the site-packages path directly since exec doesn't support
1044+
%% variables and sys attributes may not persist across calls in subinterpreters
1045+
SitePackagesStr = binary_to_list(SitePackages),
1046+
ExecCode = iolist_to_binary([
1047+
<<"import site as _site, sys as _sys\n">>,
1048+
<<"_sp = '">>, escape_python_string(SitePackagesStr), <<"'\n">>,
1049+
<<"_b = frozenset(_sys.path)\n">>,
1050+
<<"_site.addsitedir(_sp)\n">>,
1051+
<<"_sys.path[:] = [p for p in _sys.path if p not in _b] + [p for p in _sys.path if p in _b]\n">>,
1052+
<<"del _site, _sys, _b, _sp\n">>
1053+
]),
1054+
ok = exec(ExecCode),
10481055
ok;
10491056
{ok, false} ->
10501057
{error, {invalid_venv, SitePackages}};
10511058
Error ->
10521059
Error
10531060
end.
10541061

1062+
%% @private Escape a string for embedding in Python code
1063+
escape_python_string(Str) ->
1064+
lists:flatmap(fun($') -> "\\'";
1065+
($\\) -> "\\\\";
1066+
(C) -> [C]
1067+
end, Str).
1068+
10551069
%% @doc Deactivate the current virtual environment.
10561070
%% Restores sys.path to its original state.
10571071
-spec deactivate_venv() -> ok | {error, term()}.

0 commit comments

Comments
 (0)