Skip to content

Commit 1e358ad

Browse files
committed
Integrate extended events into Python asyncio event loop
Python ErlangEventLoop now handles all extended event types: - EVENT_TYPE_CALL_RESULT (10), EVENT_TYPE_CALL_ERROR (11) - EVENT_TYPE_ASYNC_RESULT (12), EVENT_TYPE_ASYNC_ERROR (13) - EVENT_TYPE_SUBPROCESS_EXIT (20), STDOUT (21), STDERR (22) - EVENT_TYPE_SOCKET_DATA (30), CLOSED (31), ERROR (32) Added registration methods for protocols/futures: - _register_call_future, _register_async_future - _register_subprocess, _register_socket - _next_callback_id for generating unique IDs Extended events are 3-tuples (callback_id, type, data) vs 2-tuples.
1 parent 8384988 commit 1e358ad

File tree

1 file changed

+131
-3
lines changed

1 file changed

+131
-3
lines changed

priv/erlang_loop.py

Lines changed: 131 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,18 @@ async def main():
4949
EVENT_TYPE_WRITE = 2
5050
EVENT_TYPE_TIMER = 3
5151

52+
# Extended event types for unified event processing
53+
EVENT_TYPE_CALL_RESULT = 10
54+
EVENT_TYPE_CALL_ERROR = 11
55+
EVENT_TYPE_ASYNC_RESULT = 12
56+
EVENT_TYPE_ASYNC_ERROR = 13
57+
EVENT_TYPE_SUBPROCESS_EXIT = 20
58+
EVENT_TYPE_SUBPROCESS_STDOUT = 21
59+
EVENT_TYPE_SUBPROCESS_STDERR = 22
60+
EVENT_TYPE_SOCKET_DATA = 30
61+
EVENT_TYPE_SOCKET_CLOSED = 31
62+
EVENT_TYPE_SOCKET_ERROR = 32
63+
5264
# Try to import selector_events for transport classes
5365
try:
5466
from asyncio import selector_events
@@ -88,6 +100,11 @@ class ErlangEventLoop(asyncio.AbstractEventLoop):
88100
'_debug', '_task_factory', '_default_executor',
89101
# Cached method references for hot paths
90102
'_ready_append', '_ready_popleft',
103+
# Extended event callbacks
104+
'_call_futures', # callback_id -> Future for call results
105+
'_async_futures', # callback_id -> Future for async results
106+
'_subprocess_cbs', # callback_id -> (protocol, transport)
107+
'_socket_cbs', # callback_id -> (protocol, transport)
91108
)
92109

93110
def __init__(self, isolated=False):
@@ -133,6 +150,12 @@ def __init__(self, isolated=False):
133150
self._ready = deque() # Callbacks ready to run
134151
self._callback_id = 0
135152

153+
# Extended event callbacks
154+
self._call_futures = {} # callback_id -> Future for call results
155+
self._async_futures = {} # callback_id -> Future for async results
156+
self._subprocess_cbs = {} # callback_id -> (protocol, transport)
157+
self._socket_cbs = {} # callback_id -> (protocol, transport)
158+
136159
# Cache deque methods for hot path (avoids attribute lookup)
137160
self._ready_append = self._ready.append
138161
self._ready_popleft = self._ready.popleft
@@ -889,8 +912,13 @@ def _run_once(self):
889912
else:
890913
pending = self._pel._run_once_native(timeout)
891914
dispatch = self._dispatch
892-
for callback_id, event_type in pending:
893-
dispatch(callback_id, event_type)
915+
for event in pending:
916+
# Extended events are 3-tuples (callback_id, event_type, data)
917+
# Simple events are 2-tuples (callback_id, event_type)
918+
if len(event) == 3:
919+
dispatch(event[0], event[1], event[2])
920+
else:
921+
dispatch(event[0], event[1])
894922
except AttributeError:
895923
# Fallback for old NIF without _run_once_native
896924
try:
@@ -909,12 +937,14 @@ def _run_once(self):
909937
# Fail fast on initialization errors - don't silently hang
910938
raise RuntimeError(f"Event loop poll failed: {e}") from e
911939

912-
def _dispatch(self, callback_id, event_type):
940+
def _dispatch(self, callback_id, event_type, data=None):
913941
"""Dispatch a callback based on event type.
914942
915943
Uses O(1) reverse map lookup for fd events instead of O(n) iteration.
916944
Event types are integers for fast comparison (no string allocation).
917945
Inlined lookup: dict.get(None) returns None, so single expression is safe.
946+
947+
Extended events (10+) carry data and resolve futures or call protocols.
918948
"""
919949
# Integer comparison is faster than string - NIF returns integers
920950
if event_type == 1: # EVENT_TYPE_READ
@@ -933,6 +963,63 @@ def _dispatch(self, callback_id, event_type):
933963
self._handle_to_callback_id.pop(id(handle), None) # Clean up reverse map
934964
if not handle._cancelled:
935965
self._ready_append(handle)
966+
# Extended events - call results
967+
elif event_type == 10: # EVENT_TYPE_CALL_RESULT
968+
fut = self._call_futures.pop(callback_id, None)
969+
if fut is not None and not fut.done():
970+
fut.set_result(data)
971+
elif event_type == 11: # EVENT_TYPE_CALL_ERROR
972+
fut = self._call_futures.pop(callback_id, None)
973+
if fut is not None and not fut.done():
974+
fut.set_exception(Exception(data) if isinstance(data, str) else data)
975+
# Extended events - async results
976+
elif event_type == 12: # EVENT_TYPE_ASYNC_RESULT
977+
fut = self._async_futures.pop(callback_id, None)
978+
if fut is not None and not fut.done():
979+
fut.set_result(data)
980+
elif event_type == 13: # EVENT_TYPE_ASYNC_ERROR
981+
fut = self._async_futures.pop(callback_id, None)
982+
if fut is not None and not fut.done():
983+
fut.set_exception(Exception(data) if isinstance(data, str) else data)
984+
# Extended events - subprocess
985+
elif event_type == 20: # EVENT_TYPE_SUBPROCESS_EXIT
986+
entry = self._subprocess_cbs.pop(callback_id, None)
987+
if entry is not None:
988+
protocol, transport = entry
989+
self._ready_append(self._get_handle(
990+
protocol.process_exited, ()))
991+
elif event_type == 21: # EVENT_TYPE_SUBPROCESS_STDOUT
992+
entry = self._subprocess_cbs.get(callback_id)
993+
if entry is not None:
994+
protocol, transport = entry
995+
self._ready_append(self._get_handle(
996+
protocol.pipe_data_received, (1, data))) # stdout = fd 1
997+
elif event_type == 22: # EVENT_TYPE_SUBPROCESS_STDERR
998+
entry = self._subprocess_cbs.get(callback_id)
999+
if entry is not None:
1000+
protocol, transport = entry
1001+
self._ready_append(self._get_handle(
1002+
protocol.pipe_data_received, (2, data))) # stderr = fd 2
1003+
# Extended events - socket/network
1004+
elif event_type == 30: # EVENT_TYPE_SOCKET_DATA
1005+
entry = self._socket_cbs.get(callback_id)
1006+
if entry is not None:
1007+
protocol, transport = entry
1008+
self._ready_append(self._get_handle(
1009+
protocol.data_received, (data,)))
1010+
elif event_type == 31: # EVENT_TYPE_SOCKET_CLOSED
1011+
entry = self._socket_cbs.pop(callback_id, None)
1012+
if entry is not None:
1013+
protocol, transport = entry
1014+
self._ready_append(self._get_handle(
1015+
protocol.connection_lost, (None,)))
1016+
elif event_type == 32: # EVENT_TYPE_SOCKET_ERROR
1017+
entry = self._socket_cbs.pop(callback_id, None)
1018+
if entry is not None:
1019+
protocol, transport = entry
1020+
exc = Exception(data) if isinstance(data, str) else data
1021+
self._ready_append(self._get_handle(
1022+
protocol.connection_lost, (exc,)))
9361023

9371024
def _check_closed(self):
9381025
"""Raise an error if the loop is closed."""
@@ -960,6 +1047,47 @@ def _timer_handle_cancelled(self, handle):
9601047
except (AttributeError, RuntimeError):
9611048
pass
9621049

1050+
# ========================================================================
1051+
# Extended event registration (call, async, subprocess, socket)
1052+
# ========================================================================
1053+
1054+
def _register_call_future(self, callback_id, future):
1055+
"""Register a Future for receiving call_result/call_error events."""
1056+
self._call_futures[callback_id] = future
1057+
1058+
def _unregister_call_future(self, callback_id):
1059+
"""Unregister a call future."""
1060+
self._call_futures.pop(callback_id, None)
1061+
1062+
def _register_async_future(self, callback_id, future):
1063+
"""Register a Future for receiving async_result/async_error events."""
1064+
self._async_futures[callback_id] = future
1065+
1066+
def _unregister_async_future(self, callback_id):
1067+
"""Unregister an async future."""
1068+
self._async_futures.pop(callback_id, None)
1069+
1070+
def _register_subprocess(self, callback_id, protocol, transport):
1071+
"""Register a subprocess protocol for receiving subprocess events."""
1072+
self._subprocess_cbs[callback_id] = (protocol, transport)
1073+
1074+
def _unregister_subprocess(self, callback_id):
1075+
"""Unregister a subprocess."""
1076+
self._subprocess_cbs.pop(callback_id, None)
1077+
1078+
def _register_socket(self, callback_id, protocol, transport):
1079+
"""Register a socket protocol for receiving socket events."""
1080+
self._socket_cbs[callback_id] = (protocol, transport)
1081+
1082+
def _unregister_socket(self, callback_id):
1083+
"""Unregister a socket."""
1084+
self._socket_cbs.pop(callback_id, None)
1085+
1086+
def _next_callback_id(self):
1087+
"""Generate the next unique callback ID."""
1088+
self._callback_id += 1
1089+
return self._callback_id
1090+
9631091
def _set_coroutine_origin_tracking(self, enabled):
9641092
"""Enable/disable coroutine origin tracking."""
9651093
if enabled:

0 commit comments

Comments
 (0)