
Commit 1419ddd (parent 7ffa051)

Add Channel API documentation and changelog entry

- Add docs/channel.md with API reference and examples
- Update CHANGELOG.md with Channel API features

2 files changed: +349 −1

CHANGELOG.md

Lines changed: 14 additions & 1 deletion
## Unreleased

### Added

- **Channel API** - Bidirectional message passing between Erlang and Python
  - `py_channel:new/0,1` - Create channels with optional backpressure (`max_size`)
  - `py_channel:send/2` - Send Erlang terms to Python (returns `busy` on backpressure)
  - `py_channel:close/1` - Close channel, signals `StopIteration` to Python
  - Python `Channel` class with sync and async interfaces:
    - `channel.receive()` - Blocking receive (suspends Python, yields to Erlang)
    - `channel.try_receive()` - Non-blocking receive
    - `await channel.async_receive()` - Asyncio-compatible receive
    - `for msg in channel:` - Sync iteration
    - `async for msg in channel:` - Async iteration
  - `erlang.channel.reply(pid, term)` - Send messages to Erlang processes
  - Zero-copy IOQueue buffering via `enif_ioq`
  - 8x faster than Reactor for small messages, 2x faster for 16KB messages
- **OWN_GIL Subinterpreter Thread Pool** - True parallelism with Python 3.12+ subinterpreters
  - Each subinterpreter runs in its own thread with its own GIL (`Py_GIL_OWN`)
  - Thread pool manages N subinterpreters for parallel Python execution
docs/channel.md

Lines changed: 335 additions & 0 deletions
# Channel API

The Channel API provides efficient bidirectional message passing between Erlang and Python. Channels use `enif_ioq` for zero-copy buffering and integrate with Python's asyncio for non-blocking operations.

## Overview

Channels are faster than the Reactor pattern for message-passing scenarios:

| Message Size | Channel    | Reactor    | Speedup |
|--------------|------------|------------|---------|
| 64 bytes     | 6.2M ops/s | 772K ops/s | **8x**  |
| 1KB          | 3.8M ops/s | 734K ops/s | **5x**  |
| 16KB         | 1.1M ops/s | 576K ops/s | **2x**  |

Use channels when you need:

- High-throughput message streaming
- Bidirectional Erlang-Python communication
- Asyncio integration
- Backpressure support

## Quick Start

### Erlang Side

```erlang
%% Create a channel
{ok, Ch} = py_channel:new(),

%% Send messages to Python
ok = py_channel:send(Ch, {request, <<"data">>}),
ok = py_channel:send(Ch, {request, <<"more">>}),

%% Close when done
py_channel:close(Ch).
```

### Python Side (Sync)

```python
from erlang.channel import Channel

def process_messages(channel_ref):
    ch = Channel(channel_ref)

    # Iterate over messages until the channel is closed
    for msg in ch:
        print(f"Received: {msg}")
```

### Python Side (Async)

```python
from erlang.channel import Channel

async def process_messages(channel_ref):
    ch = Channel(channel_ref)

    # Async iteration - yields to other coroutines while waiting
    async for msg in ch:
        print(f"Received: {msg}")
```
## Erlang API

### `py_channel:new/0,1`

Create a new channel.

```erlang
%% Unbounded channel
{ok, Ch} = py_channel:new().

%% Channel with backpressure (max 10KB queued)
{ok, Ch} = py_channel:new(#{max_size => 10000}).
```

**Options:**

- `max_size` - Maximum queue size in bytes. When exceeded, `send/2` returns `busy`.

### `py_channel:send/2`

Send an Erlang term to Python.

```erlang
ok = py_channel:send(Ch, Term).
```

**Returns:**

- `ok` - Message queued successfully
- `busy` - Queue full (backpressure)
- `{error, closed}` - Channel was closed

### `py_channel:close/1`

Close the channel. Python receivers will get `StopIteration`.

```erlang
ok = py_channel:close(Ch).
```

### `py_channel:info/1`

Get channel status.

```erlang
Info = py_channel:info(Ch).
%% #{size => 1024, max_size => 10000, closed => false}
```
## Python API

### `Channel` class

Wrapper for receiving messages from Erlang.

```python
from erlang.channel import Channel

ch = Channel(channel_ref)
```

#### `receive()`

Blocking receive. Suspends Python execution if the channel is empty, yielding to Erlang.

```python
msg = ch.receive()  # Blocks until a message is available
```

**Raises:** `ChannelClosed` when the channel is closed.

#### `try_receive()`

Non-blocking receive. Returns immediately.

```python
msg = ch.try_receive()  # Returns None if empty
```

**Returns:** Message, or `None` if the channel is empty.
**Raises:** `ChannelClosed` when the channel is closed.

#### `async_receive()`

Asyncio-compatible receive. Yields to other coroutines while waiting.

```python
msg = await ch.async_receive()
```

**Raises:** `ChannelClosed` when the channel is closed.

#### Iteration

```python
# Sync iteration
for msg in channel:
    process(msg)

# Async iteration
async for msg in channel:
    process(msg)
```

### `reply(pid, term)`

Send a message to an Erlang process.

```python
from erlang.channel import reply

# Reply to the sender
reply(sender_pid, {"status": "ok", "result": data})
```

### `ChannelClosed` exception

Raised when receiving from a closed channel.

```python
from erlang.channel import Channel, ChannelClosed

try:
    msg = ch.receive()
except ChannelClosed:
    print("Channel closed")
```
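To make the receive contract concrete — `receive()` blocks, `try_receive()` returns `None` when empty, both raise `ChannelClosed` once the channel is closed and drained, and iteration stops at close — here is a hypothetical pure-Python stand-in. `MockChannel` is not the NIF-backed implementation; it only mirrors the documented semantics:

```python
import queue

class ChannelClosed(Exception):
    """Raised when receiving from a closed channel (mirrors erlang.channel)."""

_CLOSE = object()  # sentinel marking end-of-stream

class MockChannel:
    """Pure-Python stand-in for the documented Channel semantics."""

    def __init__(self):
        self._q = queue.Queue()
        self._closed = False

    def send(self, msg):      # the Erlang side would call py_channel:send/2
        self._q.put(msg)

    def close(self):          # the Erlang side would call py_channel:close/1
        self._q.put(_CLOSE)

    def receive(self):
        if self._closed:
            raise ChannelClosed
        msg = self._q.get()   # blocks until a message arrives
        if msg is _CLOSE:
            self._closed = True
            raise ChannelClosed
        return msg

    def try_receive(self):
        if self._closed:
            raise ChannelClosed
        try:
            msg = self._q.get_nowait()
        except queue.Empty:
            return None       # non-blocking: empty queue -> None
        if msg is _CLOSE:
            self._closed = True
            raise ChannelClosed
        return msg

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return self.receive()
        except ChannelClosed:
            raise StopIteration

ch = MockChannel()
ch.send("a")
assert ch.try_receive() == "a"   # message available
assert ch.try_receive() is None  # empty -> None, no blocking
ch.send("b")
ch.close()
print(list(ch))                  # ['b'] -- iteration ends at close
```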
## Backpressure

Channels support backpressure to prevent unbounded memory growth.

### Erlang Side

```erlang
{ok, Ch} = py_channel:new(#{max_size => 10000}).

%% Retry until the queue has room
send_with_retry(Ch, Data) ->
    case py_channel:send(Ch, Data) of
        ok ->
            ok;
        busy ->
            %% Queue is full, wait before retrying
            timer:sleep(10),
            send_with_retry(Ch, Data)
    end.
```

### Monitoring Queue Size

```erlang
#{size := Size, max_size := MaxSize} = py_channel:info(Ch),
Utilization = Size / MaxSize.
```
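The retry loop above can be pictured with a small pure-Python bounded buffer. `BoundedBuffer` is a hypothetical stand-in for the NIF-side queue, shown only to illustrate the `ok`/`busy` contract against a byte budget:

```python
class BoundedBuffer:
    """Illustrates the documented backpressure contract: send() returns
    'ok' while under the byte budget and 'busy' once max_size would be
    exceeded. Stand-in only -- not the real enif_ioq queue."""

    def __init__(self, max_size):
        self.max_size = max_size
        self.size = 0      # bytes currently queued
        self.items = []

    def send(self, payload: bytes):
        if self.size + len(payload) > self.max_size:
            return "busy"  # caller should back off and retry
        self.items.append(payload)
        self.size += len(payload)
        return "ok"

    def info(self):
        # mirrors the shape of py_channel:info/1
        return {"size": self.size, "max_size": self.max_size}

buf = BoundedBuffer(max_size=10)
print(buf.send(b"12345678"))  # ok
print(buf.send(b"12345"))     # busy: 8 + 5 exceeds the 10-byte budget
print(buf.info())             # {'size': 8, 'max_size': 10}
```

The real channel measures the encoded term size, but the back-off logic on the Erlang side is the same either way.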
## Examples

### Request-Response Pattern

```erlang
%% Erlang: Send request, receive response
{ok, Ch} = py_channel:new(),
ok = py_channel:send(Ch, {request, self(), <<"compute">>}),
receive
    {response, Result} -> Result
end.
```

```python
from erlang.channel import Channel, reply

def handle_requests(channel_ref):
    ch = Channel(channel_ref)

    for msg in ch:
        if msg[0] == 'request':
            _, sender_pid, data = msg
            result = compute(data)
            reply(sender_pid, ('response', result))
```

### Streaming Data

```erlang
%% Erlang: Stream data to Python
{ok, Ch} = py_channel:new(),
lists:foreach(fun(Item) ->
    ok = py_channel:send(Ch, Item)
end, large_list()),
py_channel:close(Ch).
```

```python
from erlang.channel import Channel

async def process_stream(channel_ref):
    ch = Channel(channel_ref)
    results = []

    async for item in ch:
        results.append(process(item))

    return results
```

### Worker Pool Pattern

```erlang
%% Erlang: Distribute work across Python workers
{ok, Ch} = py_channel:new(#{max_size => 100000}),

%% Start multiple Python workers on the channel
[spawn_python_worker(Ch) || _ <- lists:seq(1, 4)],

%% Send work items
[py_channel:send(Ch, {work, Item}) || Item <- WorkItems],

%% Signal completion
py_channel:close(Ch).
```

```python
import asyncio
from erlang.channel import Channel

async def worker(channel_ref, worker_id):
    ch = Channel(channel_ref)

    async for msg in ch:
        if msg[0] == 'work':
            _, item = msg
            await process_item(item)
            print(f"Worker {worker_id} processed {item}")
```
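The worker-pool flow can be exercised end to end by simulating the channel with `asyncio.Queue`. `AsyncMockChannel` below is a hypothetical stand-in, not the real `Channel`: the real channel delivers `StopAsyncIteration` to every worker on close, which the mock approximates with one sentinel per worker:

```python
import asyncio

_CLOSE = object()  # sentinel marking end-of-stream

class AsyncMockChannel:
    """Asyncio stand-in for the Channel async-iteration interface."""

    def __init__(self):
        self._q = asyncio.Queue()

    async def send(self, msg):
        await self._q.put(msg)

    async def close(self, n_receivers=1):
        # One sentinel per worker, so every async-for loop terminates.
        for _ in range(n_receivers):
            await self._q.put(_CLOSE)

    def __aiter__(self):
        return self

    async def __anext__(self):
        msg = await self._q.get()
        if msg is _CLOSE:
            raise StopAsyncIteration
        return msg

async def worker(ch, worker_id, out):
    # Each worker pulls items until the channel is closed.
    async for item in ch:
        out.append((worker_id, item))

async def main():
    ch = AsyncMockChannel()
    out = []
    workers = [asyncio.create_task(worker(ch, i, out)) for i in range(4)]
    for item in range(8):
        await ch.send(item)
    await ch.close(n_receivers=4)
    await asyncio.gather(*workers)
    return out

results = asyncio.run(main())
print(sorted(item for _, item in results))  # [0, 1, 2, 3, 4, 5, 6, 7]
```

Each item is consumed by exactly one worker, which is the same distribution behavior the Erlang-side pool relies on.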
## Performance Tips

1. **Use async iteration** for high-throughput scenarios - it allows other coroutines to run while waiting.

2. **Set an appropriate `max_size`** to bound memory use while maintaining throughput.

3. **Batch messages** when possible - sending fewer, larger messages is more efficient than many small ones.

4. **Avoid polling with `try_receive()`** - busy-waiting wastes cycles; use blocking `receive()` or `await async_receive()` instead.
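Tip 3 can be sketched with a small batching helper. The `batched` function below is illustrative, not part of the library: the Erlang side would send each batch as a single term, and the Python side unpacks it after one `receive()` instead of many:

```python
def batched(items, batch_size):
    """Group work items so each py_channel:send/2 call carries a batch
    instead of a single item (fewer NIF calls, fewer term encodings)."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

batches = list(batched(list(range(10)), 4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```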
## Architecture

```
Erlang                                   Python
──────                                   ──────

py_channel:new() ──────────────────────▶ Channel created

py_channel:send(Ch, Term)
        │
        ▼
enif_term_to_binary()
        │
        ▼
enif_ioq_enq_binary() ─────────────────▶ channel.receive()
                                                 │
                                                 ▼
                                         enif_ioq_peek()
                                                 │
                                                 ▼
                                         enif_binary_to_term()
                                                 │
                                                 ▼
                                         Python term

py_channel:close() ────────────────────▶ StopIteration
```
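The flow in the diagram — encode the term, enqueue the bytes, decode on the receiving side — can be mimicked in plain Python. This sketch uses `pickle` purely as a stand-in for `enif_term_to_binary`/`enif_binary_to_term` (an assumption for illustration; the real channel uses the Erlang external term format and a zero-copy `enif_ioq`):

```python
import pickle
from collections import deque

ioq = deque()  # stand-in for the enif_ioq byte queue

def erlang_send(term):
    # stands in for enif_term_to_binary + enif_ioq_enq_binary
    ioq.append(pickle.dumps(term))

def python_receive():
    # stands in for enif_ioq_peek + dequeue + enif_binary_to_term
    blob = ioq.popleft()
    return pickle.loads(blob)

erlang_send(("request", b"data"))
print(python_receive())  # ('request', b'data')
```

The key property the diagram conveys survives the simplification: only serialized bytes cross the boundary, and each side works with its own native representation of the term.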
## See Also

- [Reactor](reactor.md) - FD-based protocol handling for sockets
- [Asyncio](asyncio.md) - Erlang-native asyncio event loop
- [Getting Started](getting-started.md) - Basic usage guide
