-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path__init__.py
More file actions
292 lines (242 loc) · 8.31 KB
/
__init__.py
File metadata and controls
292 lines (242 loc) · 8.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
# Copyright 2026 Benoit Chesneau
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Erlang-backed asyncio event loop - uvloop-compatible API.

This module provides a drop-in replacement for uvloop, using Erlang's
BEAM VM scheduler for I/O multiplexing via enif_select.

Usage patterns (matching uvloop exactly):

    # Pattern 1: Recommended (Python 3.11+)
    import erlang
    erlang.run(main())

    # Pattern 2: With asyncio.Runner (Python 3.11+)
    import asyncio
    import erlang
    with asyncio.Runner(loop_factory=erlang.new_event_loop) as runner:
        runner.run(main())

    # Pattern 3: Legacy (deprecated in 3.12+)
    import asyncio
    import erlang
    erlang.install()
    asyncio.run(main())

    # Pattern 4: Manual
    import asyncio
    import erlang
    loop = erlang.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(main())
"""
import sys
import asyncio
import warnings
# Install the sandbox when running inside the Erlang VM.
# This must happen before any other imports to block subprocess/fork.
# NOTE(review): the `except ImportError` below also covers the `._sandbox`
# import and the install_sandbox() call, so an ImportError raised by either
# of them is treated the same as "not inside the Erlang VM" and the sandbox
# is silently skipped -- confirm this is intended.
try:
    # The import is used purely as a probe; the module name is not referenced
    # again in this block.
    import py_event_loop  # Only available when running in Erlang NIF
    from ._sandbox import install_sandbox
    install_sandbox()
except ImportError:
    pass  # Not running inside Erlang VM
from ._loop import ErlangEventLoop
from ._policy import ErlangEventLoopPolicy
from ._mode import detect_mode, ExecutionMode
from . import _reactor as reactor
from . import _channel as channel
from ._channel import Channel, reply, ChannelClosed
# Names exported by a star-import of this package.
__all__ = [
    # Entry points and loop/policy factories defined in this file.
    'run',
    'spawn_task',
    'new_event_loop',
    'get_event_loop_policy',
    'install',
    # Policy and loop classes (EventLoopPolicy is the alias defined below).
    'EventLoopPolicy',
    'ErlangEventLoopPolicy',
    'ErlangEventLoop',
    # Re-exported from ._mode.
    'detect_mode',
    'ExecutionMode',
    # Submodules ._reactor and ._channel under their public names.
    'reactor',
    'channel',
    # Re-exported from ._channel.
    'Channel',
    'reply',
    'ChannelClosed',
]
# Re-export for uvloop API compatibility: the policy class is also reachable
# under the name `EventLoopPolicy`.
EventLoopPolicy = ErlangEventLoopPolicy
def get_event_loop_policy() -> ErlangEventLoopPolicy:
    """Build and return a fresh Erlang event loop policy.

    A new ErlangEventLoopPolicy is constructed on every call; nothing is
    cached by this function. The policy is the one that uses
    ErlangEventLoop for its event loops, and Erlang code calls this to
    obtain the default asyncio policy.

    Returns:
        ErlangEventLoopPolicy: A newly constructed policy instance.
    """
    policy = ErlangEventLoopPolicy()
    return policy
def new_event_loop() -> ErlangEventLoop:
    """Construct a brand-new event loop backed by Erlang.

    The function takes no arguments, so it can be handed directly to
    APIs expecting a ``loop_factory`` callable.

    Returns:
        ErlangEventLoop: A new event loop instance backed by Erlang's
            scheduler via enif_select. Each loop has its own isolated
            capsule for proper timer and FD event routing.
    """
    loop = ErlangEventLoop()
    return loop
def _run_on_manual_loop(main, debug):
    """Run *main* on a hand-managed Erlang loop (Python 3.10 and earlier path)."""
    loop = new_event_loop()
    if debug is not None:
        loop.set_debug(debug)
    try:
        asyncio.set_event_loop(loop)
        return loop.run_until_complete(main)
    finally:
        # Shutdown sequence: cancel leftover tasks, finalize async
        # generators, stop the default executor when the loop offers that
        # hook, then always unregister and close the loop.
        try:
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
            if hasattr(loop, 'shutdown_default_executor'):
                loop.run_until_complete(loop.shutdown_default_executor())
        finally:
            asyncio.set_event_loop(None)
            loop.close()


def run(main, *, debug=None, **run_kwargs):
    """Execute a coroutine to completion on an Erlang-backed event loop.

    This is the preferred entry point for async code on the Erlang
    backend and is the counterpart of ``uvloop.run()``. The strategy
    depends on the running interpreter:

    * Python 3.12+: ``asyncio.run()`` with ``loop_factory``.
    * Python 3.11: ``asyncio.Runner`` with ``loop_factory``.
    * Python 3.10 and earlier: a loop is created, installed, run and
      torn down manually.

    Args:
        main: The coroutine to run.
        debug: Enable debug mode if True. ``None`` leaves the loop's
            debug setting untouched on the manual path.
        **run_kwargs: Extra keyword arguments. They are forwarded to
            ``asyncio.run()`` on Python 3.12+ only; the 3.11 and
            manual paths do not use them.

    Returns:
        The return value of the coroutine.

    Example:
        import erlang

        async def main():
            await asyncio.sleep(1)
            return "done"

        result = erlang.run(main())
    """
    py_version = sys.version_info

    if py_version >= (3, 12):
        # asyncio.run() accepts loop_factory directly from 3.12 on.
        return asyncio.run(
            main,
            debug=debug,
            loop_factory=new_event_loop,
            **run_kwargs
        )

    if py_version >= (3, 11):
        # 3.11 only offers loop_factory through asyncio.Runner.
        with asyncio.Runner(debug=debug, loop_factory=new_event_loop) as runner:
            return runner.run(main)

    return _run_on_manual_loop(main, debug)
def _close_coro_quietly(coro):
    """Close *coro* if it has a close() method, so it never warns about being unawaited."""
    close = getattr(coro, 'close', None)
    if close is not None:
        close()


def spawn_task(coro, *, name=None):
    """Spawn an async task, working in both async and sync contexts.

    The task is created on the running loop when there is one, otherwise
    on the loop returned by ``asyncio.get_event_loop()``. In the sync
    case the loop is additionally woken up (best effort) so the new task
    does not wait for the next event/timeout, which matters for
    Erlang-driven loops that may not be actively polling.

    Args:
        coro: The coroutine to run as a task.
        name: Optional name for the task (Python 3.8+).

    Returns:
        asyncio.Task: The created task. Can be ignored (fire-and-forget)
            or awaited/cancelled if needed.

    Raises:
        RuntimeError: If no event loop is available or the loop is closed.
        Exception: Whatever ``loop.create_task()`` raises is propagated
            unchanged, after the coroutine has been closed.

    Example:
        # From sync code called by Erlang
        def handle_request(data):
            erlang.spawn_task(process_async(data))
            return 'ok'

        # From async code
        async def handler():
            erlang.spawn_task(background_work())
            await other_work()
    """
    # Guard ONLY the loop lookup. A RuntimeError raised later by
    # create_task() must not be mistaken for "no running loop", otherwise
    # task creation would be silently retried through the sync path.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        loop = None
    in_async_context = loop is not None

    if not in_async_context:
        # Sync context: fall back to the current event loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            # Prevent the "coroutine was never awaited" warning.
            _close_coro_quietly(coro)
            raise RuntimeError(
                "No event loop available. Ensure erlang is initialized or "
                "call from within an async context."
            )
        if loop.is_closed():
            _close_coro_quietly(coro)
            raise RuntimeError("Event loop is closed")

    # Single creation path for both contexts; `name` is only passed when
    # the caller supplied one.
    task_kwargs = {} if name is None else {'name': name}
    try:
        task = loop.create_task(coro, **task_kwargs)
    except Exception:
        _close_coro_quietly(coro)
        raise

    if in_async_context:
        # Async context: no explicit wakeup is issued.
        return task

    # Wake up the event loop to process the task.
    # This is critical for sync context - without wakeup, the task
    # waits until the next event/timeout. Wakeup failures are ignored on
    # purpose: the task already exists and will still run later.
    if hasattr(loop, '_pel') and hasattr(loop, '_loop_capsule'):
        # ErlangEventLoop - use native wakeup
        try:
            loop._pel._wakeup_for(loop._loop_capsule)
        except Exception:
            pass
    elif hasattr(loop, '_write_to_self'):
        # Standard asyncio loop - use self-pipe trick
        try:
            loop._write_to_self()
        except Exception:
            pass
    return task
def install():
    """Make ErlangEventLoopPolicy the process-wide asyncio policy.

    On Python 3.12+ this emits a DeprecationWarning (attributed to the
    caller) pointing at run(); the policy is installed on every version
    regardless.

    Example (legacy pattern):
        import asyncio
        import erlang

        erlang.install()
        asyncio.run(main())  # Uses Erlang event loop
    """
    deprecated_here = sys.version_info >= (3, 12)
    if deprecated_here:
        message = (
            "erlang.install() is deprecated in Python 3.12+. "
            "Use erlang.run(main()) instead."
        )
        # stacklevel=2 reports the warning at the install() call site.
        warnings.warn(message, DeprecationWarning, stacklevel=2)

    policy = ErlangEventLoopPolicy()
    asyncio.set_event_loop_policy(policy)
def _cancel_all_tasks(loop):
    """Cancel every task still pending on *loop* and wait for them to finish.

    Helper for run(). After cancellation, any task that ended with an
    exception other than cancellation is reported through the loop's
    exception handler.

    Args:
        loop: The event loop whose tasks should be cancelled. It must
            not be running, because this function drives it with
            run_until_complete().
    """
    pending = asyncio.all_tasks(loop)
    if pending:
        for pending_task in pending:
            pending_task.cancel()

        # return_exceptions=True keeps one failing task from aborting the
        # wait for the others.
        waiter = asyncio.gather(*pending, return_exceptions=True)
        loop.run_until_complete(waiter)

        failed = [
            finished for finished in pending
            if not finished.cancelled() and finished.exception() is not None
        ]
        for finished in failed:
            loop.call_exception_handler({
                'message': 'unhandled exception during erlang.run() shutdown',
                'exception': finished.exception(),
                'task': finished,
            })