
Commit 42e99af

Add true streaming API with stream_start/3,4 and stream_cancel/1
New event-driven streaming functions that send {py_stream, Ref, Event} messages as values are yielded from Python generators:

- py:stream_start/3,4 - Start streaming; returns immediately with a ref
- py:stream_cancel/1 - Cancel an active stream

Events sent to the owner process:

- {py_stream, Ref, {data, Value}} - Each yielded value
- {py_stream, Ref, done} - Stream completed
- {py_stream, Ref, {error, Reason}} - Stream error

Unlike py:stream/3,4, which collects all values at once, stream_start sends events incrementally. Useful for LLM token streaming, real-time data feeds, and processing large sequences without memory accumulation.

Also fixes an inaccurate comment in py_channel.erl about the Python API.
1 parent 039e864 commit 42e99af

File tree

6 files changed, +500 -56 lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
@@ -2,6 +2,14 @@
 
 ## 2.2.0 (unreleased)
 
+### Added
+
+- **True streaming API** - New `py:stream_start/3,4` and `py:stream_cancel/1` functions
+  for event-driven streaming from Python generators. Unlike `py:stream/3,4` which
+  collects all values at once, `stream_start` sends `{py_stream, Ref, {data, Value}}`
+  messages as values are yielded. Supports both sync and async generators. Useful for
+  LLM token streaming, real-time data feeds, and processing large sequences incrementally.
+
 ### Fixed
 
 - **Channel notification for create_task** - Fixed async channel receive hanging when using

docs/streaming.md

Lines changed: 105 additions & 51 deletions
@@ -6,11 +6,83 @@ This guide covers working with Python generators from Erlang.
 
 Python generators allow processing large datasets or infinite sequences
 efficiently by yielding values one at a time. erlang_python supports
-streaming these values back to Erlang.
+two modes of streaming:
 
-## Generator Expressions
+1. **Batch streaming** (`py:stream/3,4`, `py:stream_eval/1,2`) - Collects all values into a list
+2. **True streaming** (`py:stream_start/3,4`) - Sends events as values are yielded
 
-The simplest way to stream is with generator expressions:
+## True Streaming (Event-driven)
+
+For real-time processing where you need values as they arrive (e.g., LLM tokens,
+live data feeds), use `py:stream_start/3,4`:
+
+```erlang
+%% Start streaming from a Python iterator
+{ok, Ref} = py:stream_start(builtins, iter, [[1,2,3,4,5]]),
+
+%% Receive events as values are yielded
+receive_loop(Ref).
+
+receive_loop(Ref) ->
+    receive
+        {py_stream, Ref, {data, Value}} ->
+            io:format("Got: ~p~n", [Value]),
+            receive_loop(Ref);
+        {py_stream, Ref, done} ->
+            io:format("Complete~n");
+        {py_stream, Ref, {error, Reason}} ->
+            io:format("Error: ~p~n", [Reason])
+    after 30000 ->
+        timeout
+    end.
+```
+
+### Events
+
+The stream sends these messages to the owner process:
+
+- `{py_stream, Ref, {data, Value}}` - Each yielded value
+- `{py_stream, Ref, done}` - Stream completed successfully
+- `{py_stream, Ref, {error, Reason}}` - Stream error
+
+### Options
+
+```erlang
+%% Send events to a different process
+{ok, Ref} = py:stream_start(Module, Func, Args, #{owner => OtherPid}).
+```
+
+### Cancellation
+
+Cancel an active stream:
+
+```erlang
+{ok, Ref} = py:stream_start(my_module, long_generator, []),
+%% ... receive some values ...
+ok = py:stream_cancel(Ref).
+%% Stream will stop on next iteration
+```
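Because events already in flight can still land in the owner's mailbox after cancellation, a consumer will usually want to drain them. A minimal sketch: the `cancel_and_drain/1` and `drain/1` helpers are hypothetical (not part of the library), and whether a terminal `done` or `{error, _}` event is still delivered after `py:stream_cancel/1` is an assumption here, hence the fallback timeout.

```erlang
%% Hypothetical helper: cancel a stream, then discard any
%% {py_stream, Ref, ...} events still queued in the mailbox.
cancel_and_drain(Ref) ->
    ok = py:stream_cancel(Ref),
    drain(Ref).

drain(Ref) ->
    receive
        {py_stream, Ref, {data, _}} ->
            drain(Ref);
        {py_stream, Ref, done} ->
            ok;
        {py_stream, Ref, {error, _}} ->
            ok
    after 1000 ->
        %% Assumption: a terminal event may never arrive after
        %% cancellation, so give up after a short wait.
        ok
    end.
```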
+
+### Async Generators
+
+`stream_start` supports both sync and async generators:
+
+```erlang
+%% Async generator (e.g., streaming from an async API)
+ok = py:exec(<<"
+async def async_gen():
+    for i in range(5):
+        await asyncio.sleep(0.1)
+        yield i
+">>),
+{ok, Ref} = py:stream_start('__main__', async_gen, []).
+```
+
+## Batch Streaming (Collecting All Values)
+
+For simpler use cases where you want all values at once:
+
+### Generator Expressions
 
 ```erlang
 %% Stream squares of numbers 0-9
@@ -26,7 +98,7 @@ The simplest way to stream is with generator expressions:
 %% Evens = [0,2,4,6,8,10,12,14,16,18]
 ```
 
-## Iterator Objects
+### Iterator Objects
 
 Any Python iterator can be streamed:
 
@@ -40,7 +112,7 @@ Any Python iterator can be streamed:
 %% Items = [{<<"a">>, 1}, {<<"b">>, 2}]
 ```
 
-## Generator Functions
+### Generator Functions
 
 Define generator functions with `yield`:

@@ -69,46 +141,44 @@ For reliable inline generators, use lambda with walrus operator (Python 3.8+):
 %% Fib = [0,1,1,2,3,5,8,13,21,34]
 ```
 
-## Streaming Protocol
+## When to Use Each Mode
 
-Internally, streaming uses these messages:
+| Use Case | Recommended API |
+|----------|-----------------|
+| LLM token streaming | `stream_start/3,4` |
+| Real-time data feeds | `stream_start/3,4` |
+| Live progress updates | `stream_start/3,4` |
+| Batch processing | `stream/3,4` or `stream_eval/1,2` |
+| Small datasets | `stream/3,4` or `stream_eval/1,2` |
+| One-time collection | `stream/3,4` or `stream_eval/1,2` |
 
-```erlang
-{py_chunk, Ref, Value}   %% Each yielded value
-{py_end, Ref}            %% Generator exhausted
-{py_error, Ref, Error}   %% Exception occurred
-```
+## Memory Considerations
 
-You can build custom streaming consumers:
+- `stream_start`: Low memory - values processed as they arrive
+- `stream/stream_eval`: Values collected into a list - memory grows with output size
+- Generators are garbage collected after exhaustion
+
+## Use Cases
+
+### LLM Token Streaming
 
 ```erlang
-start_stream(Code) ->
-    Ref = make_ref(),
-    py_pool:request({stream_eval, Ref, self(), Code, #{}}),
-    process_stream(Ref).
+%% Stream tokens from an LLM
+{ok, Ref} = py:stream_start(llm_client, generate_tokens, [Prompt]),
+stream_to_client(Ref, WebSocket).
 
-process_stream(Ref) ->
+stream_to_client(Ref, WS) ->
     receive
-        {py_chunk, Ref, Value} ->
-            io:format("Got: ~p~n", [Value]),
-            process_stream(Ref);
-        {py_end, Ref} ->
-            io:format("Done~n");
-        {py_error, Ref, Error} ->
-            io:format("Error: ~p~n", [Error])
-    after 30000 ->
-        io:format("Timeout~n")
+        {py_stream, Ref, {data, Token}} ->
+            websocket:send(WS, Token),
+            stream_to_client(Ref, WS);
+        {py_stream, Ref, done} ->
+            websocket:send(WS, <<"[DONE]">>);
+        {py_stream, Ref, {error, _}} ->
+            websocket:send(WS, <<"[ERROR]">>)
     end.
 ```
 
-## Memory Considerations
-
-- Values are collected into a list by `stream_eval/1,2`
-- For large datasets, consider processing chunks as they arrive
-- Generators are garbage collected after exhaustion
-
-## Use Cases
-
 ### Data Processing Pipelines
 
 ```erlang
@@ -119,22 +189,6 @@ process_stream(Ref) ->
 Results = [process_line(L) || L <- Lines].
 ```
 
-### Infinite Sequences
-
-```erlang
-%% Define infinite counter
-ok = py:exec(<<"
-def counter():
-    n = 0
-    while True:
-        yield n
-        n += 1
-">>).
-
-%% Take first 100 (use your own take function)
-%% Can't use stream/3 directly for infinite - need custom handling
-```
-
 ### Batch Processing
 
 ```erlang

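The removed "Infinite Sequences" section noted that `py:stream/3` cannot consume an infinite generator directly. The new event-driven API can close that gap by cancelling once enough values have arrived. A minimal sketch under the documented event contract: the `take_n/4` helper is hypothetical (not part of the library), and the exact timing of cancellation relative to in-flight events is an assumption.

```erlang
%% Hypothetical helper: collect the first N values from a (possibly
%% infinite) Python generator, then cancel the stream.
take_n(Module, Func, Args, N) ->
    {ok, Ref} = py:stream_start(Module, Func, Args),
    take_loop(Ref, N, []).

take_loop(Ref, 0, Acc) ->
    ok = py:stream_cancel(Ref),
    lists:reverse(Acc);
take_loop(Ref, N, Acc) ->
    receive
        {py_stream, Ref, {data, Value}} ->
            take_loop(Ref, N - 1, [Value | Acc]);
        {py_stream, Ref, done} ->
            %% Generator exhausted before N values were produced
            lists:reverse(Acc);
        {py_stream, Ref, {error, Reason}} ->
            {error, Reason}
    after 30000 ->
        {error, timeout}
    end.
```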