Skip to content

Commit 7efb253

Browse files
committed
Simplify async_receive with polling, remove erlang.call() overhead
1 parent b55c3db commit 7efb253

File tree

1 file changed

+9
-74
lines changed

1 file changed

+9
-74
lines changed

priv/_erlang_impl/_channel.py

Lines changed: 9 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,8 @@ async def async_receive(self):
160160
msg = await channel.async_receive()
161161
"""
162162
import asyncio
163-
import erlang
164163

165-
# Try non-blocking first
164+
# Try non-blocking first (direct NIF - fast)
166165
result = self.try_receive()
167166
if result is not None:
168167
return result
@@ -171,78 +170,14 @@ async def async_receive(self):
171170
if self._is_closed():
172171
raise ChannelClosed("Channel has been closed")
173172

174-
# Get the running event loop
175-
loop = asyncio.get_running_loop()
176-
future = loop.create_future()
177-
178-
# Get callback_id from the event loop
179-
callback_id = loop._next_id()
180-
181-
# Get the loop capsule for registration with Erlang
182-
loop_capsule = loop._loop_capsule
183-
184-
# Track this channel receive in the timers dict (reuses timer dispatch)
185-
# We store a tuple (channel, future) so we can resolve it
186-
self._pending_async = (callback_id, future)
187-
188-
# Create a handle that will resolve the future when dispatched
189-
from asyncio import events
190-
191-
def _resolve_callback():
192-
if not future.done():
193-
# Data should be available now, fetch it
194-
try:
195-
result = self.try_receive()
196-
if result is not None:
197-
future.set_result(result)
198-
elif self._is_closed():
199-
future.set_exception(ChannelClosed("Channel has been closed"))
200-
else:
201-
# Rare edge case: woken but no data. Re-register.
202-
future.set_exception(ChannelClosed("Channel state error"))
203-
except Exception as e:
204-
future.set_exception(e)
205-
206-
handle = events.Handle(_resolve_callback, (), loop)
207-
loop._timers[callback_id] = handle
208-
loop._handle_to_callback_id[id(handle)] = callback_id
209-
210-
# Tell Erlang to dispatch callback_id when data arrives
211-
try:
212-
wait_result = erlang.call('_py_channel_wait', self._ref, callback_id, loop_capsule)
213-
214-
# If data was immediately available, Erlang returns {ok, Data}
215-
if isinstance(wait_result, tuple) and len(wait_result) == 2:
216-
status, value = wait_result
217-
if status == 'ok':
218-
# Data returned immediately - cancel the waiter and return
219-
loop._timers.pop(callback_id, None)
220-
loop._handle_to_callback_id.pop(id(handle), None)
221-
return value
222-
elif status == 'error':
223-
loop._timers.pop(callback_id, None)
224-
loop._handle_to_callback_id.pop(id(handle), None)
225-
if value == 'closed':
226-
raise ChannelClosed("Channel has been closed")
227-
else:
228-
raise RuntimeError(f"Channel wait error: {value}")
229-
except Exception as e:
230-
# Cleanup on error
231-
loop._timers.pop(callback_id, None)
232-
loop._handle_to_callback_id.pop(id(handle), None)
233-
raise
234-
235-
try:
236-
return await future
237-
finally:
238-
# Cleanup on cancel or completion
239-
loop._timers.pop(callback_id, None)
240-
loop._handle_to_callback_id.pop(id(handle), None)
241-
# Cancel the waiter in Erlang if still registered
242-
try:
243-
erlang.call('_py_channel_cancel_wait', self._ref, callback_id)
244-
except Exception:
245-
pass
173+
# Poll with short sleeps, yielding to other coroutines
174+
while True:
175+
await asyncio.sleep(0.0001) # 100µs yield to event loop
176+
result = self.try_receive()
177+
if result is not None:
178+
return result
179+
if self._is_closed():
180+
raise ChannelClosed("Channel has been closed")
246181

247182
def __aiter__(self):
248183
"""Return async iterator for the channel.

0 commit comments

Comments
 (0)