@@ -501,8 +501,7 @@ async def receive_from_channel(ch_ref):
501501% %% ============================================================================
502502
503503% % @doc Test sync blocking receive works with subinterpreter mode contexts
504- % % This verifies that the blocking receive mechanism works correctly when
505- % % Python runs in a subinterpreter (Python 3.12+)
504+ % % This is a true e2e test: Python Channel.receive() blocks until Erlang sends data
506505subinterp_sync_receive_wait_test (_Config ) ->
507506 case py_nif :subinterp_supported () of
508507 false ->
@@ -516,35 +515,48 @@ do_subinterp_sync_receive_wait_test() ->
516515 Self = self (),
517516
518517 % % Create a context explicitly in subinterp mode
519- {ok , CtxPid } = py_context :start_link (#{mode => subinterp }),
518+ CtxId = erlang :unique_integer ([positive ]),
519+ {ok , CtxPid } = py_context :start_link (CtxId , subinterp ),
520520
521521 % % Import Channel class in the subinterp context
522522 ok = py_context :exec (CtxPid , <<" from erlang import Channel" >>),
523523
524- % % Spawn a process to do blocking receive via subinterp context
525- _Receiver = spawn_link (fun () ->
526- % % Use the subinterp context for receive - this calls ch.receive()
527- % % which internally uses erlang.call('_py_channel_receive', ch_ref)
528- % % and blocks using the sync waiter mechanism
529- Result = py_context :eval (CtxPid , <<" Channel(ch).receive()" >>,
530- #{<<" ch" >> => Ch }),
531- Self ! {subinterp_result , Result }
524+ % % Test 1: Immediate receive with data already available
525+ ok = py_channel :send (Ch , <<" immediate_data" >>),
526+ {ok , <<" immediate_data" >>} = py_context :eval (CtxPid ,
527+ <<" Channel(ch).receive()" >>, #{<<" ch" >> => Ch }),
528+ ct :pal (" Subinterp immediate receive OK" ),
529+
530+ % % Test 2: Blocking receive - spawn process to send data after delay
531+ spawn_link (fun () ->
532+ timer :sleep (100 ),
533+ ok = py_channel :send (Ch , <<" delayed_data" >>),
534+ Self ! data_sent
532535 end ),
533536
534- % % Give receiver time to register as waiter
535- timer :sleep (100 ),
537+ % % This should block until the spawned process sends data
538+ {ok , <<" delayed_data" >>} = py_context :eval (CtxPid ,
539+ <<" Channel(ch).receive()" >>, #{<<" ch" >> => Ch }),
540+ receive data_sent -> ok after 1000 -> ok end ,
541+ ct :pal (" Subinterp blocking receive OK" ),
536542
537- % % Send data - should wake up the receiver
538- ok = py_channel :send (Ch , <<" subinterp_delayed_data" >>),
539-
540- % % Wait for result
541- receive
542- {subinterp_result , {ok , <<" subinterp_delayed_data" >>}} ->
543- ct :pal (" Subinterp sync receive successfully waited for and received data" ),
544- ok
545- after 5000 ->
546- ct :fail (" Subinterp receiver did not get data within timeout" )
547- end ,
543+ % % Test 3: try_receive on empty channel returns None
544+ {ok , none } = py_context :eval (CtxPid ,
545+ <<" Channel(ch).try_receive()" >>, #{<<" ch" >> => Ch }),
546+ ct :pal (" Subinterp try_receive empty OK" ),
548547
548+ % % Test 4: Channel close detected by receive
549549 ok = py_channel :close (Ch ),
550- ok = py_context :stop (CtxPid ).
550+ ok = py_context :exec (CtxPid , <<"
551+ def test_closed(ch_ref):
552+ try:
553+ Channel(ch_ref).receive()
554+ return 'no_exception'
555+ except:
556+ return 'got_exception'
557+ " >>),
558+ {ok , <<" got_exception" >>} = py_context :eval (CtxPid ,
559+ <<" test_closed(ch)" >>, #{<<" ch" >> => Ch }),
560+ ct :pal (" Subinterp closed channel detected OK" ),
561+
562+ py_context :stop (CtxPid ).
0 commit comments