Skip to content

Commit 655fe9f

Browse files
authored
Merge pull request #38 from benoitc/fix/channel-wait-capsule-name
Fix loop capsule name mismatch in channel wait functions
2 parents 6d12c7f + c686fdb commit 655fe9f

File tree

2 files changed

+65
-4
lines changed

2 files changed

+65
-4
lines changed

c_src/py_callback.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3062,7 +3062,7 @@ static PyObject *erlang_channel_wait_impl(PyObject *self, PyObject *args) {
30623062
return NULL;
30633063
}
30643064

3065-
erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop");
3065+
erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang_python.event_loop");
30663066
if (loop == NULL) {
30673067
PyErr_SetString(PyExc_ValueError, "invalid loop reference");
30683068
return NULL;
@@ -3220,7 +3220,7 @@ static PyObject *erlang_byte_channel_wait_impl(PyObject *self, PyObject *args) {
32203220
return NULL;
32213221
}
32223222

3223-
erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang.event_loop");
3223+
erlang_event_loop_t *loop = (erlang_event_loop_t *)PyCapsule_GetPointer(loop_capsule, "erlang_python.event_loop");
32243224
if (loop == NULL) {
32253225
PyErr_SetString(PyExc_ValueError, "invalid loop reference");
32263226
return NULL;

test/py_byte_channel_SUITE.erl

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
%% Large payload test
3636
large_payload_bytes_test/1,
3737
%% Async event loop dispatch test
38-
async_receive_bytes_e2e_test/1
38+
async_receive_bytes_e2e_test/1,
39+
%% create_task + async receive test
40+
create_task_async_receive_test/1
3941
]).
4042

4143
all() -> [
@@ -60,7 +62,9 @@ all() -> [
6062
%% Large payload test
6163
large_payload_bytes_test,
6264
%% Async event loop dispatch test
63-
async_receive_bytes_e2e_test
65+
async_receive_bytes_e2e_test,
66+
%% create_task + async receive test
67+
create_task_async_receive_test
6468
].
6569

6670
init_per_suite(Config) ->
@@ -367,3 +371,60 @@ async_receive_bytes_e2e_test(_Config) ->
367371
ct:pal("Async receive via erlang.run() OK"),
368372

369373
ok = py_byte_channel:close(Ch).
374+
375+
%%% ============================================================================
376+
%%% create_task + Async ByteChannel Test
377+
%%% ============================================================================
378+
379+
%% @doc Test async_receive_bytes works correctly with py_event_loop:create_task
380+
%% This verifies the loop capsule name is correct (erlang_python.event_loop)
381+
create_task_async_receive_test(_Config) ->
382+
{ok, Ch} = py_byte_channel:new(),
383+
384+
%% First send data so it's available when task starts
385+
ok = py_byte_channel:send(Ch, <<"create_task_bytes">>),
386+
387+
%% Define and run the task - env reuse should make __main__ functions visible
388+
Ctx = py:context(1),
389+
ok = py:exec(Ctx, <<"
390+
import erlang
391+
from erlang import ByteChannel
392+
393+
async def task_receive_bytes(ch_ref, reply_pid):
394+
'''Task that receives bytes and sends result back to Erlang.'''
395+
try:
396+
ch = ByteChannel(ch_ref)
397+
data = await ch.async_receive_bytes()
398+
erlang.send(reply_pid, ('result', data))
399+
except Exception as e:
400+
erlang.send(reply_pid, ('error', str(e)))
401+
">>),
402+
403+
%% Create a task that will await on channel receive
404+
%% create_task/3 uses the global event loop internally
405+
TaskRef = py_event_loop:create_task(
406+
"__main__", "task_receive_bytes", [Ch, self()]),
407+
408+
%% Wait for result from the task
409+
%% Note: Python tuple ('result', data) becomes {<<"result">>, data} in Erlang
410+
receive
411+
{<<"result">>, <<"create_task_bytes">>} ->
412+
ct:pal("create_task + async_receive_bytes OK");
413+
{<<"error">>, ErrMsg} ->
414+
ct:pal("Task error: ~p", [ErrMsg]),
415+
ct:fail({task_error, ErrMsg});
416+
Other ->
417+
ct:pal("Unexpected message: ~p", [Other]),
418+
ct:fail({unexpected_message, Other})
419+
after 5000 ->
420+
ct:fail("Timeout waiting for task result")
421+
end,
422+
423+
%% Wait for task to complete
424+
case py_event_loop:await(TaskRef, 5000) of
425+
{ok, _} -> ok;
426+
{error, AwaitErr} ->
427+
ct:pal("Await error: ~p", [AwaitErr])
428+
end,
429+
430+
ok = py_byte_channel:close(Ch).

0 commit comments

Comments
 (0)