Skip to content

Commit 2576501

Browse files
committed
Add close+drain tests for Channel and ByteChannel
Verify that closing a channel allows draining remaining data before ChannelClosed is raised. Tests cover Erlang-side, Python sync iteration, and Python async iteration for both Channel and ByteChannel.
1 parent 655fe9f commit 2576501

File tree

2 files changed

+208
-4
lines changed

2 files changed

+208
-4
lines changed

test/py_byte_channel_SUITE.erl

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@
3737
%% Async event loop dispatch test
3838
async_receive_bytes_e2e_test/1,
3939
%% create_task + async receive test
40-
create_task_async_receive_test/1
40+
create_task_async_receive_test/1,
41+
%% Close and drain tests
42+
close_drain_bytes_erlang_test/1,
43+
close_drain_bytes_python_sync_test/1,
44+
close_drain_bytes_python_async_test/1
4145
]).
4246

4347
all() -> [
@@ -64,7 +68,11 @@ all() -> [
6468
%% Async event loop dispatch test
6569
async_receive_bytes_e2e_test,
6670
%% create_task + async receive test
67-
create_task_async_receive_test
71+
create_task_async_receive_test,
72+
%% Close and drain tests
73+
close_drain_bytes_erlang_test,
74+
close_drain_bytes_python_sync_test,
75+
close_drain_bytes_python_async_test
6876
].
6977

7078
init_per_suite(Config) ->
@@ -428,3 +436,97 @@ async def task_receive_bytes(ch_ref, reply_pid):
428436
end,
429437

430438
ok = py_byte_channel:close(Ch).
439+
440+
%%% ============================================================================
441+
%%% Close and Drain Tests
442+
%%% ============================================================================
443+
444+
%% @doc Test that data can be drained from byte channel after close (Erlang side)
445+
%% Verifies that closing a channel doesn't prevent reading existing data
446+
close_drain_bytes_erlang_test(_Config) ->
447+
{ok, Ch} = py_byte_channel:new(),
448+
449+
%% Send multiple messages
450+
ok = py_byte_channel:send(Ch, <<"chunk1">>),
451+
ok = py_byte_channel:send(Ch, <<"chunk2">>),
452+
ok = py_byte_channel:send(Ch, <<"chunk3">>),
453+
454+
%% Close the channel while data is still in queue
455+
ok = py_byte_channel:close(Ch),
456+
457+
%% Verify channel is marked as closed
458+
Info = py_byte_channel:info(Ch),
459+
true = maps:get(closed, Info),
460+
461+
%% Should still be able to drain all data
462+
{ok, <<"chunk1">>} = py_byte_channel:try_receive(Ch),
463+
{ok, <<"chunk2">>} = py_byte_channel:try_receive(Ch),
464+
{ok, <<"chunk3">>} = py_byte_channel:try_receive(Ch),
465+
466+
%% Only after draining should we get closed error
467+
{error, closed} = py_byte_channel:try_receive(Ch),
468+
469+
ct:pal("Erlang byte channel close+drain test passed").
470+
471+
%% @doc Test that Python can drain data from closed byte channel (sync iteration)
472+
close_drain_bytes_python_sync_test(_Config) ->
473+
{ok, Ch} = py_byte_channel:new(),
474+
475+
%% Send multiple messages
476+
ok = py_byte_channel:send(Ch, <<"data1">>),
477+
ok = py_byte_channel:send(Ch, <<"data2">>),
478+
ok = py_byte_channel:send(Ch, <<"data3">>),
479+
480+
%% Close the channel
481+
ok = py_byte_channel:close(Ch),
482+
483+
%% Python should be able to drain all messages via iteration
484+
Ctx = py:context(1),
485+
ok = py:exec(Ctx, <<"
486+
from erlang import ByteChannel, ByteChannelClosed
487+
488+
def drain_byte_channel(ch_ref):
489+
'''Drain all chunks from byte channel, return as list.'''
490+
ch = ByteChannel(ch_ref)
491+
chunks = []
492+
for chunk in ch:
493+
chunks.append(chunk)
494+
return chunks
495+
">>),
496+
497+
{ok, [<<"data1">>, <<"data2">>, <<"data3">>]} =
498+
py:eval(Ctx, <<"drain_byte_channel(ch)">>, #{<<"ch">> => Ch}),
499+
500+
ct:pal("Python sync byte channel close+drain test passed").
501+
502+
%% @doc Test that Python can drain data from closed byte channel (async iteration)
503+
close_drain_bytes_python_async_test(_Config) ->
504+
{ok, Ch} = py_byte_channel:new(),
505+
506+
%% Send multiple messages
507+
ok = py_byte_channel:send(Ch, <<"part1">>),
508+
ok = py_byte_channel:send(Ch, <<"part2">>),
509+
ok = py_byte_channel:send(Ch, <<"part3">>),
510+
511+
%% Close the channel
512+
ok = py_byte_channel:close(Ch),
513+
514+
%% Python should be able to drain all messages via async iteration
515+
Ctx = py:context(1),
516+
ok = py:exec(Ctx, <<"
517+
import erlang
518+
from erlang import ByteChannel, ByteChannelClosed
519+
520+
async def async_drain_byte_channel(ch_ref):
521+
'''Async drain all chunks from byte channel, return as list.'''
522+
ch = ByteChannel(ch_ref)
523+
chunks = []
524+
async for chunk in ch:
525+
chunks.append(chunk)
526+
return chunks
527+
">>),
528+
529+
{ok, [<<"part1">>, <<"part2">>, <<"part3">>]} =
530+
py:eval(Ctx, <<"erlang.run(async_drain_byte_channel(ch))">>, #{<<"ch">> => Ch}),
531+
532+
ct:pal("Python async byte channel close+drain test passed").

test/py_channel_SUITE.erl

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,11 @@
4343
%% Async receive with actual waiting
4444
async_receive_wait_e2e_test/1,
4545
%% Subinterpreter mode tests
46-
subinterp_sync_receive_wait_test/1
46+
subinterp_sync_receive_wait_test/1,
47+
%% Close and drain tests
48+
close_drain_erlang_test/1,
49+
close_drain_python_sync_test/1,
50+
close_drain_python_async_test/1
4751
]).
4852

4953
all() -> [
@@ -76,7 +80,11 @@ all() -> [
7680
%% Async receive with actual waiting
7781
async_receive_wait_e2e_test,
7882
%% Subinterpreter mode tests
79-
subinterp_sync_receive_wait_test
83+
subinterp_sync_receive_wait_test,
84+
%% Close and drain tests
85+
close_drain_erlang_test,
86+
close_drain_python_sync_test,
87+
close_drain_python_async_test
8088
].
8189

8290
init_per_suite(Config) ->
@@ -605,3 +613,97 @@ def test_closed(ch_ref):
605613
ct:pal("Subinterp closed channel detected OK"),
606614

607615
py_context:stop(CtxPid).
616+
617+
%%% ============================================================================
618+
%%% Close and Drain Tests
619+
%%% ============================================================================
620+
621+
%% @doc Test that data can be drained from channel after close (Erlang side)
622+
%% Verifies that closing a channel doesn't prevent reading existing data
623+
close_drain_erlang_test(_Config) ->
624+
{ok, Ch} = py_channel:new(),
625+
626+
%% Send multiple messages
627+
ok = py_channel:send(Ch, <<"msg1">>),
628+
ok = py_channel:send(Ch, <<"msg2">>),
629+
ok = py_channel:send(Ch, <<"msg3">>),
630+
631+
%% Close the channel while data is still in queue
632+
ok = py_channel:close(Ch),
633+
634+
%% Verify channel is marked as closed
635+
Info = py_channel:info(Ch),
636+
true = maps:get(closed, Info),
637+
638+
%% Should still be able to drain all data
639+
{ok, <<"msg1">>} = py_nif:channel_try_receive(Ch),
640+
{ok, <<"msg2">>} = py_nif:channel_try_receive(Ch),
641+
{ok, <<"msg3">>} = py_nif:channel_try_receive(Ch),
642+
643+
%% Only after draining should we get closed error
644+
{error, closed} = py_nif:channel_try_receive(Ch),
645+
646+
ct:pal("Erlang close+drain test passed").
647+
648+
%% @doc Test that Python can drain data from closed channel (sync iteration)
649+
close_drain_python_sync_test(_Config) ->
650+
{ok, Ch} = py_channel:new(),
651+
652+
%% Send multiple messages
653+
ok = py_channel:send(Ch, <<"first">>),
654+
ok = py_channel:send(Ch, <<"second">>),
655+
ok = py_channel:send(Ch, <<"third">>),
656+
657+
%% Close the channel
658+
ok = py_channel:close(Ch),
659+
660+
%% Python should be able to drain all messages via iteration
661+
Ctx = py:context(1),
662+
ok = py:exec(Ctx, <<"
663+
from erlang import Channel, ChannelClosed
664+
665+
def drain_channel(ch_ref):
666+
'''Drain all messages from channel, return as list.'''
667+
ch = Channel(ch_ref)
668+
messages = []
669+
for msg in ch:
670+
messages.append(msg)
671+
return messages
672+
">>),
673+
674+
{ok, [<<"first">>, <<"second">>, <<"third">>]} =
675+
py:eval(Ctx, <<"drain_channel(ch)">>, #{<<"ch">> => Ch}),
676+
677+
ct:pal("Python sync close+drain test passed").
678+
679+
%% @doc Test that Python can drain data from closed channel (async iteration)
680+
close_drain_python_async_test(_Config) ->
681+
{ok, Ch} = py_channel:new(),
682+
683+
%% Send multiple messages
684+
ok = py_channel:send(Ch, <<"alpha">>),
685+
ok = py_channel:send(Ch, <<"beta">>),
686+
ok = py_channel:send(Ch, <<"gamma">>),
687+
688+
%% Close the channel
689+
ok = py_channel:close(Ch),
690+
691+
%% Python should be able to drain all messages via async iteration
692+
Ctx = py:context(1),
693+
ok = py:exec(Ctx, <<"
694+
import erlang
695+
from erlang import Channel, ChannelClosed
696+
697+
async def async_drain_channel(ch_ref):
698+
'''Async drain all messages from channel, return as list.'''
699+
ch = Channel(ch_ref)
700+
messages = []
701+
async for msg in ch:
702+
messages.append(msg)
703+
return messages
704+
">>),
705+
706+
{ok, [<<"alpha">>, <<"beta">>, <<"gamma">>]} =
707+
py:eval(Ctx, <<"erlang.run(async_drain_channel(ch))">>, #{<<"ch">> => Ch}),
708+
709+
ct:pal("Python async close+drain test passed").

0 commit comments

Comments
 (0)