Commit f9835c8

Add true streaming API with stream_start/3,4 and stream_cancel/1
New event-driven streaming functions that send {py_stream, Ref, Event}
messages as values are yielded from Python generators:

- py:stream_start/3,4 - Start streaming, returns immediately with ref
- py:stream_cancel/1 - Cancel an active stream

Events sent to owner process:

- {py_stream, Ref, {data, Value}} - Each yielded value
- {py_stream, Ref, done} - Stream completed
- {py_stream, Ref, {error, Reason}} - Stream error

Unlike py:stream/3,4 which collects all values at once, stream_start
sends events incrementally. Useful for LLM token streaming, real-time
data feeds, and processing large sequences without memory accumulation.

Also fixes inaccurate comment in py_channel.erl about Python API.
1 parent 039e864 commit f9835c8

File tree

6 files changed (+482, −53)


CHANGELOG.md

Lines changed: 8 additions & 0 deletions
```diff
@@ -2,6 +2,14 @@
 
 ## 2.2.0 (unreleased)
 
+### Added
+
+- **True streaming API** - New `py:stream_start/3,4` and `py:stream_cancel/1` functions
+  for event-driven streaming from Python generators. Unlike `py:stream/3,4` which
+  collects all values at once, `stream_start` sends `{py_stream, Ref, {data, Value}}`
+  messages as values are yielded. Supports both sync and async generators. Useful for
+  LLM token streaming, real-time data feeds, and processing large sequences incrementally.
+
 ### Fixed
 
 - **Channel notification for create_task** - Fixed async channel receive hanging when using
```

docs/streaming.md

Lines changed: 105 additions & 51 deletions
````diff
@@ -6,11 +6,83 @@ This guide covers working with Python generators from Erlang.
 
 Python generators allow processing large datasets or infinite sequences
 efficiently by yielding values one at a time. erlang_python supports
-streaming these values back to Erlang.
+two modes of streaming:
 
-## Generator Expressions
+1. **Batch streaming** (`py:stream/3,4`, `py:stream_eval/1,2`) - Collects all values into a list
+2. **True streaming** (`py:stream_start/3,4`) - Sends events as values are yielded
 
-The simplest way to stream is with generator expressions:
+## True Streaming (Event-driven)
+
+For real-time processing where you need values as they arrive (e.g., LLM tokens,
+live data feeds), use `py:stream_start/3,4`:
+
+```erlang
+%% Start streaming from a Python iterator
+{ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]),
+
+%% Receive events as values are yielded
+receive_loop(Ref).
+
+receive_loop(Ref) ->
+    receive
+        {py_stream, Ref, {data, Value}} ->
+            io:format("Got: ~p~n", [Value]),
+            receive_loop(Ref);
+        {py_stream, Ref, done} ->
+            io:format("Complete~n");
+        {py_stream, Ref, {error, Reason}} ->
+            io:format("Error: ~p~n", [Reason])
+    after 30000 ->
+        timeout
+    end.
+```
+
+### Events
+
+The stream sends these messages to the owner process:
+
+- `{py_stream, Ref, {data, Value}}` - Each yielded value
+- `{py_stream, Ref, done}` - Stream completed successfully
+- `{py_stream, Ref, {error, Reason}}` - Stream error
+
+### Options
+
+```erlang
+%% Send events to a different process
+{ok, Ref} = py:stream_start(Module, Func, Args, #{owner => OtherPid}).
+```
+
+### Cancellation
+
+Cancel an active stream:
+
+```erlang
+{ok, Ref} = py:stream_start(my_module, long_generator, []),
+%% ... receive some values ...
+ok = py:stream_cancel(Ref).
+%% Stream will stop on next iteration
+```
+
+### Async Generators
+
+`stream_start` supports both sync and async generators:
+
+```erlang
+%% Async generator (e.g., streaming from an async API)
+ok = py:exec(<<"
+async def async_gen():
+    for i in range(5):
+        await asyncio.sleep(0.1)
+        yield i
+">>),
+{ok, Ref} = py:stream_start('__main__', async_gen, []).
+```
+
+## Batch Streaming (Collecting All Values)
+
+For simpler use cases where you want all values at once:
+
+### Generator Expressions
 
 ```erlang
 %% Stream squares of numbers 0-9
@@ -26,7 +98,7 @@ The simplest way to stream is with generator expressions:
 %% Evens = [0,2,4,6,8,10,12,14,16,18]
 ```
 
-## Iterator Objects
+### Iterator Objects
 
 Any Python iterator can be streamed:
 
@@ -40,7 +112,7 @@ Any Python iterator can be streamed:
 %% Items = [{<<"a">>, 1}, {<<"b">>, 2}]
 ```
 
-## Generator Functions
+### Generator Functions
 
 Define generator functions with `yield`:
 
@@ -69,46 +141,44 @@ For reliable inline generators, use lambda with walrus operator (Python 3.8+):
 %% Fib = [0,1,1,2,3,5,8,13,21,34]
 ```
 
-## Streaming Protocol
+## When to Use Each Mode
 
-Internally, streaming uses these messages:
+| Use Case | Recommended API |
+|----------|-----------------|
+| LLM token streaming | `stream_start/3,4` |
+| Real-time data feeds | `stream_start/3,4` |
+| Live progress updates | `stream_start/3,4` |
+| Batch processing | `stream/3,4` or `stream_eval/1,2` |
+| Small datasets | `stream/3,4` or `stream_eval/1,2` |
+| One-time collection | `stream/3,4` or `stream_eval/1,2` |
 
-```erlang
-{py_chunk, Ref, Value}   %% Each yielded value
-{py_end, Ref}            %% Generator exhausted
-{py_error, Ref, Error}   %% Exception occurred
-```
+## Memory Considerations
 
-You can build custom streaming consumers:
+- `stream_start`: Low memory - values processed as they arrive
+- `stream/stream_eval`: Values collected into a list - memory grows with output size
+- Generators are garbage collected after exhaustion
+
+## Use Cases
+
+### LLM Token Streaming
 
 ```erlang
-start_stream(Code) ->
-    Ref = make_ref(),
-    py_pool:request({stream_eval, Ref, self(), Code, #{}}),
-    process_stream(Ref).
+%% Stream tokens from an LLM
+{ok, Ref} = py:stream_start(llm_client, generate_tokens, [Prompt]),
+stream_to_client(Ref, WebSocket).
 
-process_stream(Ref) ->
+stream_to_client(Ref, WS) ->
     receive
-        {py_chunk, Ref, Value} ->
-            io:format("Got: ~p~n", [Value]),
-            process_stream(Ref);
-        {py_end, Ref} ->
-            io:format("Done~n");
-        {py_error, Ref, Error} ->
-            io:format("Error: ~p~n", [Error])
-    after 30000 ->
-        io:format("Timeout~n")
+        {py_stream, Ref, {data, Token}} ->
+            websocket:send(WS, Token),
+            stream_to_client(Ref, WS);
+        {py_stream, Ref, done} ->
+            websocket:send(WS, <<"[DONE]">>);
+        {py_stream, Ref, {error, _}} ->
+            websocket:send(WS, <<"[ERROR]">>)
     end.
 ```
 
-## Memory Considerations
-
-- Values are collected into a list by `stream_eval/1,2`
-- For large datasets, consider processing chunks as they arrive
-- Generators are garbage collected after exhaustion
-
-## Use Cases
-
 ### Data Processing Pipelines
 
 ```erlang
@@ -119,22 +189,6 @@ process_stream(Ref) ->
 Results = [process_line(L) || L <- Lines].
 ```
 
-### Infinite Sequences
-
-```erlang
-%% Define infinite counter
-ok = py:exec(<<"
-def counter():
-    n = 0
-    while True:
-        yield n
-        n += 1
-">>).
-
-%% Take first 100 (use your own take function)
-%% Can't use stream/3 directly for infinite - need custom handling
-```
-
 ### Batch Processing
 
 ```erlang
````
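For orientation, the two generator shapes that `py:stream_start/3,4` consumes are easiest to see in plain Python. A minimal sketch, assuming illustrative names (`token_gen` is not part of the library; `async_gen` mirrors the docs example above):

```python
import asyncio

def token_gen(prompt):
    """Sync generator: yields one 'token' per word (illustrative)."""
    for word in prompt.split():
        yield word

async def async_gen():
    """Async generator, as in the docs example."""
    for i in range(5):
        await asyncio.sleep(0.01)
        yield i

# Batch streaming (py:stream/3,4 / stream_eval) is equivalent to
# exhausting the generator into a list on the Python side:
tokens = list(token_gen("hello streaming world"))

# True streaming delivers each value as it is yielded; here we just
# collect an async generator's values to show it terminates.
async def collect(agen):
    return [v async for v in agen]

values = asyncio.run(collect(async_gen()))
```

Either shape works: `stream_start` only requires that the named callable returns an iterable or an async generator.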

src/py.erl

Lines changed: 131 additions & 0 deletions
```diff
@@ -56,6 +56,9 @@
     stream/4,
     stream_eval/1,
     stream_eval/2,
+    stream_start/3,
+    stream_start/4,
+    stream_cancel/1,
     version/0,
     memory_stats/0,
     gc/0,
@@ -468,6 +471,134 @@ stream_eval(Code, Locals) ->
     WrappedCode = <<"list(", CodeBin/binary, ")">>,
     py_context:eval(Ctx, WrappedCode, Locals).
 
+%%% ============================================================================
+%%% True Streaming API (Event-driven)
+%%% ============================================================================
+
+%% @doc Start a true streaming iteration from a Python generator.
+%%
+%% Unlike stream/3,4 which collects all values at once, this function
+%% returns immediately with a reference and sends values as events
+%% to the calling process as they are yielded.
+%%
+%% Events sent to the owner process:
+%%   - `{py_stream, Ref, {data, Value}}' - Each yielded value
+%%   - `{py_stream, Ref, done}' - Stream completed
+%%   - `{py_stream, Ref, {error, Reason}}' - Stream error
+%%
+%% Supports both sync generators and async generators (coroutines).
+%%
+%% Example:
+%% ```
+%% {ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]),
+%% receive_loop(Ref).
+%%
+%% receive_loop(Ref) ->
+%%     receive
+%%         {py_stream, Ref, {data, Value}} ->
+%%             io:format("Got: ~p~n", [Value]),
+%%             receive_loop(Ref);
+%%         {py_stream, Ref, done} ->
+%%             io:format("Complete~n");
+%%         {py_stream, Ref, {error, Reason}} ->
+%%             io:format("Error: ~p~n", [Reason])
+%%     after 30000 ->
+%%         timeout
+%%     end.
+%% '''
+-spec stream_start(py_module(), py_func(), py_args()) -> {ok, reference()}.
+stream_start(Module, Func, Args) ->
+    stream_start(Module, Func, Args, #{}).
+
+%% @doc Start a true streaming iteration with options.
+%%
+%% Options:
+%%   - `owner => pid()' - Process to receive events (default: self())
+%%
+%% @param Module Python module name
+%% @param Func Python function name
+%% @param Args Function arguments
+%% @param Opts Options map
+%% @returns {ok, Ref} where Ref is used to identify stream events
+-spec stream_start(py_module(), py_func(), py_args(), map()) -> {ok, reference()}.
+stream_start(Module, Func, Args, Opts) ->
+    Owner = maps:get(owner, Opts, self()),
+    Ref = make_ref(),
+    ModuleBin = ensure_binary(Module),
+    FuncBin = ensure_binary(Func),
+    RefHash = erlang:phash2(Ref),
+    %% Store owner and ref for Python to retrieve
+    %% Use binary keys because Python strings become binaries
+    py_state:store({<<"stream_owner">>, RefHash}, Owner),
+    py_state:store({<<"stream_ref">>, RefHash}, Ref),
+    py_state:store({<<"stream_args">>, RefHash}, Args),
+    %% Spawn an Erlang process to run the streaming iteration
+    spawn(fun() ->
+        stream_run_python(ModuleBin, FuncBin, RefHash)
+    end),
+    {ok, Ref}.
+
+%% @private Run the streaming via Python code
+stream_run_python(ModuleBin, FuncBin, RefHash) ->
+    RefHashBin = integer_to_binary(RefHash),
+    %% Build Python code that streams values using callbacks
+    Code = iolist_to_binary([
+        <<"import erlang\n">>,
+        <<"_rh = ">>, RefHashBin, <<"\n">>,
+        <<"_args = erlang.call('state_get', ('stream_args', _rh))\n">>,
+        <<"if _args is None:\n">>,
+        <<"    _args = []\n">>,
+        <<"try:\n">>,
+        <<"    _mod = __import__('">>, ModuleBin, <<"')\n">>,
+        <<"    _fn = getattr(_mod, '">>, FuncBin, <<"')\n">>,
+        <<"    _gen = _fn(*_args) if _args else _fn()\n">>,
+        <<"    for _val in _gen:\n">>,
+        <<"        if erlang.call('_py_stream_cancelled', _rh):\n">>,
+        <<"            erlang.call('_py_stream_send', _rh, 'error', 'cancelled')\n">>,
+        <<"            break\n">>,
+        <<"        erlang.call('_py_stream_send', _rh, 'data', _val)\n">>,
+        <<"    else:\n">>,
+        <<"        erlang.call('_py_stream_send', _rh, 'done', None)\n">>,
+        <<"except Exception as _e:\n">>,
+        <<"    erlang.call('_py_stream_send', _rh, 'error', str(_e))\n">>,
+        <<"finally:\n">>,
+        <<"    erlang.call('_py_stream_cleanup', _rh)\n">>
+    ]),
+    %% Execute the streaming code
+    case exec(Code) of
+        ok -> ok;
+        {error, Reason} ->
+            %% Try to notify owner of error
+            case py_state:fetch({<<"stream_owner">>, RefHash}) of
+                {ok, Owner} ->
+                    case py_state:fetch({<<"stream_ref">>, RefHash}) of
+                        {ok, Ref} ->
+                            Owner ! {py_stream, Ref, {error, Reason}},
+                            py_state:remove({<<"stream_owner">>, RefHash}),
+                            py_state:remove({<<"stream_ref">>, RefHash}),
+                            py_state:remove({<<"stream_args">>, RefHash});
+                        _ -> ok
+                    end;
+                _ -> ok
+            end
+    end.
+
+%% @doc Cancel an active stream.
+%%
+%% Sends a cancellation signal to stop the stream iteration.
+%% Any pending values may still be delivered before the stream stops.
+%%
+%% @param Ref The stream reference from stream_start/3,4
+%% @returns ok
+-spec stream_cancel(reference()) -> ok.
+stream_cancel(Ref) when is_reference(Ref) ->
+    %% Store cancellation flag that the streaming task checks
+    %% Use hash because we can't pass Erlang refs to Python callbacks easily
+    %% Use binary key because Python strings become binaries
+    RefHash = erlang:phash2(Ref),
+    py_state:store({<<"stream_cancelled_hash">>, RefHash}, true),
+    ok.
+
 %%% ============================================================================
 %%% Info
 %%% ============================================================================
```
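The Python code that `stream_run_python/3` generates is easiest to follow in isolation. Below is a hedged, self-contained sketch of that control flow with `erlang.call` replaced by a local stub (`state` and `events` stand in for `py_state` and the owner's mailbox) — an illustration of the for/else plus cancellation-flag pattern, not the library code itself:

```python
state = {}    # stands in for py_state's key/value store
events = []   # stands in for messages sent to the owner process

def call(name, *args):
    # Stub for the embedded erlang.call dispatch used in the diff.
    if name == '_py_stream_cancelled':
        return state.get(('stream_cancelled_hash', args[0]), False)
    if name == '_py_stream_send':
        events.append((args[1], args[2]))

def run_stream(gen, rh):
    try:
        for val in gen:
            if call('_py_stream_cancelled', rh):
                call('_py_stream_send', rh, 'error', 'cancelled')
                break
            call('_py_stream_send', rh, 'data', val)
        else:
            # for/else: runs only when the loop finishes without break
            call('_py_stream_send', rh, 'done', None)
    except Exception as e:
        call('_py_stream_send', rh, 'error', str(e))

# Normal completion: data events followed by a single 'done'.
run_stream(iter([1, 2, 3]), 42)
done_events = list(events)

# Cancellation: the flag is set mid-stream; the next loop iteration
# notices it, emits an error event, and breaks (skipping the else).
events.clear()

def cancelling_gen(rh):
    yield 'a'
    state[('stream_cancelled_hash', rh)] = True
    yield 'b'

run_stream(cancelling_gen(7), 7)
cancel_events = list(events)
```

Note that one value ('b') is pulled from the generator but never delivered — consistent with the doc comment that pending values may or may not be delivered before the stream stops.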

src/py_channel.erl

Lines changed: 2 additions & 2 deletions
```diff
@@ -26,8 +26,8 @@
 %%% %% Send messages to Python
 %%% ok = py_channel:send(Ch, {request, self(), <<"data">>}),
 %%%
-%%% %% Python receives via channel.receive()
-%%% %% Python sends back via erlang.channel_reply(pid, term)
+%%% %% Python: ch = Channel(ref); msg = ch.receive()
+%%% %% Python: reply(pid, term)
 %%%
 %%% %% Close when done
 %%% py_channel:close(Ch).
```
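The corrected comment names the Python-side API (`Channel(ref)` / `receive()` and `reply(pid, term)`). A minimal sketch of that request/reply shape, with the runtime classes stubbed locally since the real ones live inside the erlang_python embedding:

```python
import queue

class Channel:
    """Local stand-in for the runtime's Channel class (illustrative)."""
    _queues = {}

    def __init__(self, ref):
        self.q = Channel._queues.setdefault(ref, queue.Queue())

    def receive(self, timeout=None):
        return self.q.get(timeout=timeout)

outbox = []

def reply(pid, term):
    # In the real runtime this sends `term` back to the Erlang pid;
    # here we just record it.
    outbox.append((pid, term))

# Erlang side: py_channel:send(Ch, {request, self(), <<"data">>})
Channel._queues.setdefault('ref1', queue.Queue()).put(
    ('request', 'pid1', b'data'))

# Python side, as in the corrected comment:
ch = Channel('ref1')
tag, pid, payload = ch.receive()
reply(pid, payload.upper())
```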
