Skip to content

Commit af35f91

Browse files
committed
Add synchronous blocking channel receive
Implement blocking receive for channels that suspends Python while waiting for data and releases the dirty scheduler worker. - Add sync_waiter_pid and has_sync_waiter fields to py_channel_t - Add channel_register_sync_waiter NIF to register calling process - Modify channel_send to notify sync waiter via Erlang message - Modify channel_close to notify sync waiter of channel closure - Implement blocking handle_receive using Erlang receive to wait - Add tests for immediate, delayed, and closed channel cases
1 parent 2c02500 commit af35f91

File tree

6 files changed

+289
-6
lines changed

6 files changed

+289
-6
lines changed

c_src/py_channel.c

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ py_channel_t *channel_alloc(size_t max_size) {
8282
channel->waiter_loop = NULL;
8383
channel->waiter_callback_id = 0;
8484
channel->has_waiter = false;
85+
channel->has_sync_waiter = false;
86+
memset(&channel->sync_waiter_pid, 0, sizeof(ErlNifPid));
8587
channel->closed = false;
8688
channel->channel_id = atomic_fetch_add(&g_channel_id_counter, 1);
8789

@@ -140,6 +142,16 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
140142
/* Note: Keep the reference until after dispatch */
141143
}
142144

145+
/* Check if there's a sync waiter to notify */
146+
ErlNifPid sync_waiter;
147+
bool notify_sync = false;
148+
149+
if (channel->has_sync_waiter) {
150+
sync_waiter = channel->sync_waiter_pid;
151+
notify_sync = true;
152+
channel->has_sync_waiter = false;
153+
}
154+
143155
pthread_mutex_unlock(&channel->mutex);
144156

145157
/* Resume happens outside the lock to avoid deadlocks */
@@ -154,6 +166,14 @@ int channel_send(py_channel_t *channel, const unsigned char *data, size_t size)
154166
enif_release_resource(loop_to_wake);
155167
}
156168

169+
/* Notify sync waiter via Erlang message */
170+
if (notify_sync) {
171+
ErlNifEnv *msg_env = enif_alloc_env();
172+
enif_send(NULL, &sync_waiter, msg_env,
173+
enif_make_atom(msg_env, "channel_data_ready"));
174+
enif_free_env(msg_env);
175+
}
176+
157177
return 0;
158178
}
159179

@@ -198,6 +218,16 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
198218
channel->waiter_loop = NULL;
199219
}
200220

221+
/* Check if there's a sync waiter to notify */
222+
ErlNifPid sync_waiter;
223+
bool notify_sync = false;
224+
225+
if (channel->has_sync_waiter) {
226+
sync_waiter = channel->sync_waiter_pid;
227+
notify_sync = true;
228+
channel->has_sync_waiter = false;
229+
}
230+
201231
pthread_mutex_unlock(&channel->mutex);
202232

203233
if (should_resume) {
@@ -209,6 +239,14 @@ int channel_send_owned_binary(py_channel_t *channel, ErlNifBinary *bin) {
209239
enif_release_resource(loop_to_wake);
210240
}
211241

242+
/* Notify sync waiter via Erlang message */
243+
if (notify_sync) {
244+
ErlNifEnv *msg_env = enif_alloc_env();
245+
enif_send(NULL, &sync_waiter, msg_env,
246+
enif_make_atom(msg_env, "channel_data_ready"));
247+
enif_free_env(msg_env);
248+
}
249+
212250
return 0;
213251
}
214252

@@ -273,6 +311,16 @@ void channel_close(py_channel_t *channel) {
273311
channel->waiter_loop = NULL;
274312
}
275313

314+
/* Check if there's a sync waiter to notify */
315+
ErlNifPid sync_waiter;
316+
bool notify_sync = false;
317+
318+
if (channel->has_sync_waiter) {
319+
sync_waiter = channel->sync_waiter_pid;
320+
notify_sync = true;
321+
channel->has_sync_waiter = false;
322+
}
323+
276324
pthread_mutex_unlock(&channel->mutex);
277325

278326
if (should_resume) {
@@ -284,6 +332,14 @@ void channel_close(py_channel_t *channel) {
284332
event_loop_add_pending(loop_to_wake, EVENT_TYPE_TIMER, callback_id, -1);
285333
enif_release_resource(loop_to_wake);
286334
}
335+
336+
/* Notify sync waiter that channel is closed */
337+
if (notify_sync) {
338+
ErlNifEnv *msg_env = enif_alloc_env();
339+
enif_send(NULL, &sync_waiter, msg_env,
340+
enif_make_atom(msg_env, "channel_closed"));
341+
enif_free_env(msg_env);
342+
}
287343
}
288344

289345
void channel_resume_waiting(py_channel_t *channel) {
@@ -897,3 +953,46 @@ ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc, const ERL_NIF_TER
897953
pthread_mutex_unlock(&channel->mutex);
898954
return ATOM_OK;
899955
}
956+
957+
/**
958+
* @brief Register a sync waiter for blocking receive
959+
*
960+
* nif_channel_register_sync_waiter(ChannelRef) -> ok | {error, Reason}
961+
*
962+
* Registers the calling process as a sync waiter. When data arrives,
963+
* the waiter receives a 'channel_data_ready' message. When the channel
964+
* closes, receives 'channel_closed'.
965+
*/
966+
ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
967+
(void)argc;
968+
py_channel_t *channel;
969+
970+
if (!enif_get_resource(env, argv[0], CHANNEL_RESOURCE_TYPE, (void **)&channel)) {
971+
return make_error(env, "invalid_channel");
972+
}
973+
974+
pthread_mutex_lock(&channel->mutex);
975+
976+
/* Check if channel is closed */
977+
if (channel->closed) {
978+
pthread_mutex_unlock(&channel->mutex);
979+
return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "closed"));
980+
}
981+
982+
/* Check if another sync waiter is already registered */
983+
if (channel->has_sync_waiter) {
984+
pthread_mutex_unlock(&channel->mutex);
985+
return enif_make_tuple2(env, ATOM_ERROR, enif_make_atom(env, "waiter_exists"));
986+
}
987+
988+
/* Get calling process PID */
989+
if (!enif_self(env, &channel->sync_waiter_pid)) {
990+
pthread_mutex_unlock(&channel->mutex);
991+
return make_error(env, "no_calling_process");
992+
}
993+
994+
channel->has_sync_waiter = true;
995+
996+
pthread_mutex_unlock(&channel->mutex);
997+
return ATOM_OK;
998+
}

c_src/py_channel.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,12 @@ typedef struct {
105105
/** @brief Flag: async waiter is registered */
106106
bool has_waiter;
107107

108+
/** @brief Sync waiter Erlang PID (for blocking receive) */
109+
ErlNifPid sync_waiter_pid;
110+
111+
/** @brief Flag: sync waiter is registered */
112+
bool has_sync_waiter;
113+
108114
/** @brief Flag: channel is closed */
109115
bool closed;
110116

@@ -265,4 +271,16 @@ ERL_NIF_TERM nif_channel_wait(ErlNifEnv *env, int argc,
265271
ERL_NIF_TERM nif_channel_cancel_wait(ErlNifEnv *env, int argc,
266272
const ERL_NIF_TERM argv[]);
267273

274+
/**
275+
* @brief Register a sync waiter for blocking receive
276+
*
277+
* NIF: channel_register_sync_waiter(ChannelRef) -> ok | {error, Reason}
278+
*
279+
* Registers the calling process as a sync waiter. When data arrives via
280+
* channel_send, the waiter receives a 'channel_data_ready' message.
281+
* When the channel closes, receives 'channel_closed'.
282+
*/
283+
ERL_NIF_TERM nif_channel_register_sync_waiter(ErlNifEnv *env, int argc,
284+
const ERL_NIF_TERM argv[]);
285+
268286
#endif /* PY_CHANNEL_H */

c_src/py_nif.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3928,7 +3928,8 @@ static ErlNifFunc nif_funcs[] = {
39283928
{"channel_close", 1, nif_channel_close, 0},
39293929
{"channel_info", 1, nif_channel_info, 0},
39303930
{"channel_wait", 3, nif_channel_wait, 0},
3931-
{"channel_cancel_wait", 2, nif_channel_cancel_wait, 0}
3931+
{"channel_cancel_wait", 2, nif_channel_cancel_wait, 0},
3932+
{"channel_register_sync_waiter", 1, nif_channel_register_sync_waiter, 0}
39323933
};
39333934

39343935
ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)

src/py_channel.erl

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -155,10 +155,52 @@ register_callbacks() ->
155155

156156
%% @private
157157
%% Handle blocking receive from Python.
158+
%% This blocks until data is available by registering as a sync waiter
159+
%% and blocking on Erlang receive. This releases the Python worker while waiting.
158160
%% Args: [ChannelRef]
159161
-spec handle_receive([term()]) -> term().
160162
handle_receive([ChannelRef]) ->
161-
py_nif:channel_try_receive(ChannelRef).
163+
case py_nif:channel_try_receive(ChannelRef) of
164+
{ok, Data} ->
165+
{ok, Data};
166+
{error, closed} ->
167+
{error, closed};
168+
{error, empty} ->
169+
%% Channel is empty, register as sync waiter and block
170+
case py_nif:channel_register_sync_waiter(ChannelRef) of
171+
ok ->
172+
wait_for_channel_data(ChannelRef);
173+
{error, Reason} ->
174+
{error, Reason}
175+
end
176+
end.
177+
178+
%% @private
179+
%% Wait for channel data to arrive via Erlang message passing.
180+
%% This function blocks using native Erlang receive, which releases
181+
%% the Python dirty scheduler worker while waiting.
182+
-spec wait_for_channel_data(reference()) -> {ok, term()} | {error, term()}.
183+
wait_for_channel_data(ChannelRef) ->
184+
receive
185+
channel_data_ready ->
186+
case py_nif:channel_try_receive(ChannelRef) of
187+
{ok, Data} ->
188+
{ok, Data};
189+
{error, empty} ->
190+
%% Race condition: data was consumed by another waiter.
191+
%% Re-register and wait again.
192+
case py_nif:channel_register_sync_waiter(ChannelRef) of
193+
ok ->
194+
wait_for_channel_data(ChannelRef);
195+
{error, Reason} ->
196+
{error, Reason}
197+
end;
198+
{error, closed} ->
199+
{error, closed}
200+
end;
201+
channel_closed ->
202+
{error, closed}
203+
end.
162204

163205
%% @private
164206
%% Handle non-blocking receive from Python.

src/py_nif.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,8 @@
206206
channel_close/1,
207207
channel_info/1,
208208
channel_wait/3,
209-
channel_cancel_wait/2
209+
channel_cancel_wait/2,
210+
channel_register_sync_waiter/1
210211
]).
211212

212213
-on_load(load_nif/0).
@@ -1712,3 +1713,15 @@ channel_wait(_ChannelRef, _CallbackId, _LoopRef) ->
17121713
-spec channel_cancel_wait(reference(), non_neg_integer()) -> ok.
17131714
channel_cancel_wait(_ChannelRef, _CallbackId) ->
17141715
?NIF_STUB.
1716+
1717+
%% @doc Register a sync waiter for blocking receive.
1718+
%%
1719+
%% Registers the calling process as a sync waiter. When data arrives,
1720+
%% the waiter receives a 'channel_data_ready' message. When the channel
1721+
%% closes, receives 'channel_closed'.
1722+
%%
1723+
%% @param ChannelRef Channel reference
1724+
%% @returns ok | {error, closed} | {error, waiter_exists}
1725+
-spec channel_register_sync_waiter(reference()) -> ok | {error, term()}.
1726+
channel_register_sync_waiter(_ChannelRef) ->
1727+
?NIF_STUB.

0 commit comments

Comments
 (0)