From ea1de38e8a917783de31eb4cfa98972242bdd7ac Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 14 Mar 2026 21:10:45 +0100 Subject: [PATCH 1/6] Add PyBuffer tests, documentation, and examples - test/py_buffer_SUITE.erl: Common Test suite with 13 tests for: - Basic buffer creation (with/without content length) - Write/read cycle, readline, readlines - Seek/tell position tracking - Fast find with memchr/memmem - Zero-copy memoryview access - Line iteration, closed/empty buffer handling - Auto-conversion when passing buffer ref to Python - GC and reference counting verification - docs/buffer.md: Documentation covering: - Erlang API (new, write, close) - Python API (file-like methods, buffer protocol, find) - Architecture diagram and memory layout - Performance tips and examples - examples/py_buffer_example.erl: Working escript demonstrating: - Basic buffer usage - HTTP body streaming simulation - File-like interface methods - Zero-copy memoryview access - Line iteration for CSV-like data - CHANGELOG.md: Added PyBuffer API entry under 2.2.0 - docs/getting-started.md: Added Zero-Copy Buffers section with link to buffer.md --- CHANGELOG.md | 14 ++ docs/buffer.md | 385 +++++++++++++++++++++++++++++++++ docs/getting-started.md | 30 +++ examples/py_buffer_example.erl | 256 ++++++++++++++++++++++ test/py_buffer_SUITE.erl | 383 ++++++++++++++++++++++++++++++++ 5 files changed, 1068 insertions(+) create mode 100644 docs/buffer.md create mode 100755 examples/py_buffer_example.erl create mode 100644 test/py_buffer_SUITE.erl diff --git a/CHANGELOG.md b/CHANGELOG.md index a961744..3cc81c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,20 @@ ### Added +- **PyBuffer API** - Zero-copy WSGI input buffer for streaming HTTP bodies + - `py_buffer:new/0,1` - Create buffer (chunked or with content_length) + - `py_buffer:write/2` - Append data, signals waiting Python readers + - `py_buffer:close/1` - Signal EOF, wake all readers + - Python `PyBuffer` type with file-like interface: + - `read(size)`, `readline()`, `readlines()` - Blocking reads with GIL released + - `seek(offset, whence)`, `tell()` - Position tracking + - `find(sub)` - Fast substring search via memmem/memchr + - `memoryview(buf)` - Zero-copy buffer protocol + - `for line in buf:` - Line iteration + - Auto-conversion: Passing buffer ref to `py:call`/`py:eval` wraps as `PyBuffer` + - Suitable for `wsgi.input` in WSGI applications + - See [Buffer API docs](docs/buffer.md) + - **Inline Continuation API** - High-performance scheduling without Erlang messaging - `erlang.schedule_inline(module, func, args, kwargs)` - Chain Python calls via `enif_schedule_nif()` - ~3x faster than `schedule_py` for tight loops (bypasses gen_server messaging) diff --git a/docs/buffer.md b/docs/buffer.md new file mode 100644 index 0000000..2128f91 --- /dev/null +++ b/docs/buffer.md @@ -0,0 +1,385 @@ +# Buffer API + +The Buffer API provides a zero-copy WSGI input buffer for streaming HTTP request bodies from Erlang to Python. Buffers use shared memory with GIL-released blocking reads for efficient data transfer. + +## Overview + +Buffers are designed for WSGI/ASGI `wsgi.input` scenarios where Erlang receives HTTP body chunks and Python needs to consume them: + +- Zero-copy access via Python's buffer protocol (`memoryview`) +- File-like interface (`read`, `readline`, `readlines`) +- Blocking reads that release the GIL while waiting +- Fast substring search using `memchr`/`memmem` + +Use buffers when you need: +- WSGI input for HTTP request bodies +- Streaming data from Erlang to Python +- Zero-copy access to binary data + +## Quick Start + +### Erlang Side + +```erlang +%% Create a buffer (chunked - unknown size) +{ok, Buf} = py_buffer:new(), + +%% Or with known content length (pre-allocates memory) +{ok, Buf} = py_buffer:new(4096), + +%% Write HTTP body chunks +ok = py_buffer:write(Buf, <<"chunk1">>), +ok = py_buffer:write(Buf, <<"chunk2">>), + +%% Signal end of data +ok = py_buffer:close(Buf), + +%% Pass to WSGI app +py:call(Ctx, myapp, handle_request, [#{<<"wsgi.input">> => Buf}]). +``` + +### Python Side + +```python +def handle_request(environ): + wsgi_input = environ['wsgi.input'] + + # Read all data + body = wsgi_input.read() + + # Or read line by line + for line in wsgi_input: + process(line) + + # Or read specific amount + chunk = wsgi_input.read(1024) +``` + +## Erlang API + +### `py_buffer:new/0` + +Create a buffer for chunked/streaming data (unknown content length). + +```erlang +{ok, Buf} = py_buffer:new(). +``` + +The buffer starts with a default capacity (64KB) and grows as needed. + +### `py_buffer:new/1` + +Create a buffer with known content length. + +```erlang +{ok, Buf} = py_buffer:new(ContentLength). +``` + +**Arguments:** +- `ContentLength` - Expected total size in bytes, or `undefined` for chunked + +Pre-allocating avoids buffer growth overhead when content length is known. + +### `py_buffer:write/2` + +Write binary data to the buffer. + +```erlang +ok = py_buffer:write(Buf, Data). +``` + +**Arguments:** +- `Buf` - Buffer reference from `new/0,1` +- `Data` - Binary data to append + +**Returns:** +- `ok` - Data written successfully +- `{error, closed}` - Buffer was closed + +Writing signals any waiting Python readers via `pthread_cond_broadcast`. + +### `py_buffer:close/1` + +Signal end of data (EOF). + +```erlang +ok = py_buffer:close(Buf). +``` + +After closing: +- No more data can be written +- Python's `read()` returns remaining data then empty bytes +- Waiting Python threads are woken up + +## Python API + +### `PyBuffer` class + +The buffer appears in Python as `erlang.PyBuffer` when passed from Erlang. + +```python +from erlang import PyBuffer +``` + +#### `read(size=-1)` + +Read up to `size` bytes, blocking if needed. + +```python +data = buf.read() # Read all (blocks until EOF) +chunk = buf.read(1024) # Read up to 1024 bytes +``` + +**Behavior:** +- If `size=-1`, reads all data (waits for EOF if content length known) +- If data available, returns immediately +- If empty, blocks until data arrives (GIL released during wait) +- Returns empty bytes at EOF + +#### `readline(size=-1)` + +Read one line, blocking if needed. + +```python +line = buf.readline() # Read until newline or EOF +``` + +**Returns:** Bytes including the trailing newline, or empty at EOF. + +Uses `memchr` for fast newline scanning. + +#### `readlines(hint=-1)` + +Read all lines as a list. + +```python +lines = buf.readlines() # ['line1\n', 'line2\n', ...] +``` + +**Arguments:** +- `hint` - Optional size hint; stops after approximately this many bytes + +#### `seek(offset, whence=0)` + +Seek to position within already-written data. + +```python +buf.seek(0) # Seek to beginning (SEEK_SET) +buf.seek(10, 1) # Seek forward 10 bytes (SEEK_CUR) +buf.seek(-5, 2) # Seek 5 bytes before end (SEEK_END, requires EOF) +``` + +**Limitations:** +- Cannot seek past written data +- `SEEK_END` requires EOF flag set + +#### `tell()` + +Return current read position. + +```python +pos = buf.tell() # Current byte offset +``` + +#### `find(sub, start=0, end=None)` + +Fast substring search using `memchr`/`memmem`. + +```python +idx = buf.find(b'\n') # Find first newline +idx = buf.find(b'boundary') # Find multipart boundary +``` + +**Returns:** Lowest index where substring found, or -1 if not found. + +Single-byte search uses `memchr` (very fast). Multi-byte uses `memmem`. + +#### Buffer Protocol + +Buffers support Python's buffer protocol for zero-copy access: + +```python +# Create memoryview for zero-copy access +mv = memoryview(buf) + +# Access without copying +first_byte = mv[0] +slice_data = bytes(mv[10:20]) + +# Release when done +mv.release() +``` + +**Properties:** +- `readonly=True` - Buffer is read-only from Python +- `ndim=1` - One-dimensional byte array + +#### Iteration + +Line-by-line iteration: + +```python +for line in buf: + process(line) +``` + +Equivalent to calling `readline()` until EOF. + +#### Properties and Methods + +```python +buf.readable() # True - always readable +buf.writable() # False - not writable from Python +buf.seekable() # True - limited seeking supported +buf.closed # True if buffer is closed +len(buf) # Available bytes (write_pos - read_pos) +buf.close() # Mark buffer as closed +``` + +## Architecture + +``` +Erlang Python +------ ------ + +py_buffer:new() -----------------> Buffer created + (pthread mutex+cond initialized) + +py_buffer:write(Buf, Data) + | + v + memcpy to buffer + pthread_cond_broadcast() ------> read()/readline() wakes up + (GIL was released during wait) + | + v + Return data to Python + +py_buffer:close() ---------------> EOF flag set + Waiting readers return +``` + +**Memory Layout:** + +``` +py_buffer_resource_t ++------------------+ +| data* | --> [chunk1][chunk2][chunk3]... +| capacity | ^ ^ +| write_pos | ----+ | +| read_pos | ------------+ +| content_length | +| mutex | +| data_ready (cond)| +| eof | +| closed | +| view_count | ++------------------+ +``` + +## Performance Tips + +1. **Use known content length** when available - avoids buffer reallocation: + ```erlang + ContentLength = byte_size(Body), + {ok, Buf} = py_buffer:new(ContentLength). + ``` + +2. **Write in reasonable chunks** - very small writes have overhead: + ```erlang + %% Good: write accumulated chunks + ok = py_buffer:write(Buf, AccumulatedData). + + %% Less efficient: many tiny writes + %% [py_buffer:write(Buf, <>) || B <- binary_to_list(Data)]. + ``` + +3. **Use memoryview for zero-copy** when processing large bodies: + ```python + mv = memoryview(buf) + # Process without copying + boundary_pos = buf.find(b'--boundary') + part = bytes(mv[:boundary_pos]) + ``` + +4. **Use find() for parsing** - `memchr`/`memmem` are faster than Python string methods. + +## Examples + +### WSGI Input Simulation + +```erlang +%% Simulate receiving HTTP body +{ok, Buf} = py_buffer:new(byte_size(Body)), +ok = py_buffer:write(Buf, Body), +ok = py_buffer:close(Buf), + +%% Build WSGI environ +Environ = #{ + <<"REQUEST_METHOD">> => <<"POST">>, + <<"PATH_INFO">> => <<"/api/data">>, + <<"CONTENT_TYPE">> => <<"application/json">>, + <<"CONTENT_LENGTH">> => integer_to_binary(byte_size(Body)), + <<"wsgi.input">> => Buf +}, + +%% Call WSGI app +{ok, Response} = py:call(myapp, handle, [Environ]). +``` + +### Chunked Transfer + +```erlang +%% Create buffer for chunked encoding +{ok, Buf} = py_buffer:new(), + +%% Spawn writer process +spawn(fun() -> + %% Simulate receiving chunks + lists:foreach(fun(Chunk) -> + ok = py_buffer:write(Buf, Chunk), + timer:sleep(10) % Simulate network delay + end, get_chunks()), + ok = py_buffer:close(Buf) +end), + +%% Python can start reading immediately +%% read() will block until data available +py:call(myapp, stream_handler, [Buf]). +``` + +### Multipart Form Parsing + +```python +def parse_multipart(buf, boundary): + """Parse multipart form data from buffer.""" + parts = [] + + while True: + # Find next boundary using fast memmem + idx = buf.find(boundary.encode()) + if idx == -1: + break + + # Read headers until blank line + headers = {} + while True: + line = buf.readline() + if line == b'\r\n': + break + name, value = line.split(b':', 1) + headers[name.strip()] = value.strip() + + # Read content until next boundary + # ... process part + parts.append({'headers': headers, 'data': data}) + + return parts +``` + +## See Also + +- [Channel](channel.md) - Bidirectional message passing +- [Reactor](reactor.md) - FD-based protocol handling +- [Web Frameworks](web-frameworks.md) - ASGI/WSGI integration +- [Getting Started](getting-started.md) - Basic usage guide diff --git a/docs/getting-started.md b/docs/getting-started.md index 5d2e7ac..e950178 100644 --- a/docs/getting-started.md +++ b/docs/getting-started.md @@ -458,6 +458,35 @@ py:register_pool(io, {db, query}). %% Only db.query goes to io pool This prevents slow HTTP requests from blocking quick math operations. See [Dual Pool Support](pools.md) for configuration and advanced usage. +## Zero-Copy Buffers + +For WSGI/ASGI applications that need to stream HTTP request bodies, use `py_buffer`: + +```erlang +%% Create buffer for HTTP body +{ok, Buf} = py_buffer:new(ContentLength), + +%% Write body chunks as they arrive +ok = py_buffer:write(Buf, Chunk1), +ok = py_buffer:write(Buf, Chunk2), +ok = py_buffer:close(Buf), + +%% Pass to WSGI app as wsgi.input +py:call(Ctx, myapp, handle, [#{<<"wsgi.input">> => Buf}]). +``` + +Python sees it as a file-like object with blocking reads: + +```python +def handle(environ): + body = environ['wsgi.input'].read() # Blocks until data ready + # Or iterate lines + for line in environ['wsgi.input']: + process(line) +``` + +See [Buffer API](buffer.md) for zero-copy memoryview access and fast substring search. + ## Next Steps - See [Dual Pool Support](pools.md) for separating CPU and I/O operations @@ -469,6 +498,7 @@ This prevents slow HTTP requests from blocking quick math operations. See [Dual - See [Logging and Tracing](logging.md) for Python logging and distributed tracing - See [AI Integration](ai-integration.md) for ML/AI examples - See [Asyncio Event Loop](asyncio.md) for the Erlang-native asyncio implementation with TCP and UDP support +- See [Buffer API](buffer.md) for zero-copy WSGI input buffers - See [Reactor](reactor.md) for FD-based protocol handling - See [Security](security.md) for sandbox and blocked operations - See [Web Frameworks](web-frameworks.md) for ASGI/WSGI integration diff --git a/examples/py_buffer_example.erl b/examples/py_buffer_example.erl new file mode 100755 index 0000000..6a382b2 --- /dev/null +++ b/examples/py_buffer_example.erl @@ -0,0 +1,256 @@ +#!/usr/bin/env escript +%%% @doc PyBuffer example - demonstrates zero-copy WSGI input buffer. +%%% +%%% This example shows how to use py_buffer for streaming HTTP body +%%% data from Erlang to Python, suitable for WSGI/ASGI input. +%%% +%%% Prerequisites: rebar3 compile +%%% Run from project root: escript examples/py_buffer_example.erl + +-mode(compile). + +main(_) -> + %% Add the compiled beam files to the code path + ScriptDir = filename:dirname(escript:script_name()), + ProjectRoot = filename:dirname(ScriptDir), + EbinDir = filename:join([ProjectRoot, "_build", "default", "lib", "erlang_python", "ebin"]), + true = code:add_pathz(EbinDir), + + {ok, _} = application:ensure_all_started(erlang_python), + + io:format("~n=== PyBuffer Zero-Copy WSGI Input Demo ===~n~n"), + + %% Demo 1: Basic buffer usage + basic_buffer_demo(), + + %% Demo 2: Simulated HTTP body streaming + http_body_demo(), + + %% Demo 3: File-like interface + file_like_demo(), + + %% Demo 4: Zero-copy memoryview access + memoryview_demo(), + + %% Demo 5: Line iteration + line_iteration_demo(), + + io:format("=== Done ===~n~n"), + ok. + +basic_buffer_demo() -> + io:format("--- Basic Buffer Demo ---~n~n"), + + %% Create a buffer + io:format("Creating buffer...~n"), + {ok, Buf} = py_buffer:new(), + + %% Write some data + io:format("Writing data chunks...~n"), + ok = py_buffer:write(Buf, <<"Hello, ">>), + ok = py_buffer:write(Buf, <<"World!">>), + + %% Close to signal EOF + ok = py_buffer:close(Buf), + io:format("Buffer closed (EOF signaled)~n"), + + %% Pass to Python and read + io:format("Reading from Python...~n"), + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +def read_all(buf): + data = buf.read() + print(f' Read {len(data)} bytes: {data}') + return data +">>), + + {ok, Data} = py:eval(Ctx, <<"read_all(buf)">>, #{<<"buf">> => Buf}), + io:format("Erlang received: ~p~n~n", [Data]), + ok. + +http_body_demo() -> + io:format("--- HTTP Body Streaming Demo ---~n~n"), + + %% Simulate receiving a JSON POST body + Body = <<"{\"user\": \"alice\", \"action\": \"login\", \"timestamp\": 1234567890}">>, + ContentLength = byte_size(Body), + + io:format("Simulating HTTP POST with ~p byte body~n", [ContentLength]), + + %% Create buffer with known content length (pre-allocates) + {ok, Buf} = py_buffer:new(ContentLength), + + %% Write the body (could be in chunks) + ok = py_buffer:write(Buf, Body), + ok = py_buffer:close(Buf), + + %% Build WSGI-like environ + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import json + +def handle_request(environ): + '''Simulate WSGI request handler.''' + method = environ.get('REQUEST_METHOD', 'GET') + path = environ.get('PATH_INFO', '/') + content_type = environ.get('CONTENT_TYPE', '') + + print(f' {method} {path}') + print(f' Content-Type: {content_type}') + + # Read body from wsgi.input (PyBuffer) + wsgi_input = environ.get('wsgi.input') + if wsgi_input: + body = wsgi_input.read() + print(f' Body ({len(body)} bytes): {body[:50]}...' if len(body) > 50 else f' Body: {body}') + + if content_type == 'application/json': + data = json.loads(body) + return {'status': 'ok', 'user': data.get('user')} + + return {'status': 'ok'} +">>), + + Environ = #{ + <<"REQUEST_METHOD">> => <<"POST">>, + <<"PATH_INFO">> => <<"/api/login">>, + <<"CONTENT_TYPE">> => <<"application/json">>, + <<"CONTENT_LENGTH">> => integer_to_binary(ContentLength), + <<"wsgi.input">> => Buf + }, + + {ok, Result} = py:eval(Ctx, <<"handle_request(environ)">>, #{<<"environ">> => Environ}), + io:format("Response: ~p~n~n", [Result]), + ok. + +file_like_demo() -> + io:format("--- File-Like Interface Demo ---~n~n"), + + Ctx = py:context(1), + + %% Create buffer with multiple lines + {ok, Buf} = py_buffer:new(), + ok = py_buffer:write(Buf, <<"Name: Alice\n">>), + ok = py_buffer:write(Buf, <<"Email: alice@example.com\n">>), + ok = py_buffer:write(Buf, <<"Role: Admin\n">>), + ok = py_buffer:close(Buf), + + ok = py:exec(Ctx, <<" +def demonstrate_file_methods(buf): + '''Show file-like methods.''' + print(' File-like properties:') + print(f' readable(): {buf.readable()}') + print(f' writable(): {buf.writable()}') + print(f' seekable(): {buf.seekable()}') + print(f' len(buf): {len(buf)}') + print() + + # Read first line + line1 = buf.readline() + print(f' readline(): {line1}') + + # Current position + pos = buf.tell() + print(f' tell(): {pos}') + + # Seek back to start + buf.seek(0) + print(f' seek(0), tell(): {buf.tell()}') + + # Read all remaining + rest = buf.read() + print(f' read(): {rest[:30]}...') + + return 'done' +">>), + + {ok, _} = py:eval(Ctx, <<"demonstrate_file_methods(buf)">>, #{<<"buf">> => Buf}), + io:format("~n"), + ok. + +memoryview_demo() -> + io:format("--- Zero-Copy Memoryview Demo ---~n~n"), + + Ctx = py:context(1), + + %% Create buffer with binary data + Data = <<"HEADER:12345:PAYLOAD:abcdefghijklmnopqrstuvwxyz:END">>, + {ok, Buf} = py_buffer:new(byte_size(Data)), + ok = py_buffer:write(Buf, Data), + ok = py_buffer:close(Buf), + + ok = py:exec(Ctx, <<" +def zero_copy_parse(buf): + '''Demonstrate zero-copy access via memoryview.''' + + # Get memoryview - no data copying! + mv = memoryview(buf) + print(f' memoryview created, {len(mv)} bytes') + print(f' readonly: {mv.readonly}') + print(f' ndim: {mv.ndim}') + + # Find colon positions using find (uses memchr internally) + data_bytes = bytes(mv) # Only for find, still efficient + + # Parse header + first_colon = buf.find(b':') + header = bytes(mv[:first_colon]) + print(f' Header: {header}') + + # Find PAYLOAD section + payload_start = buf.find(b'PAYLOAD:') + 8 + payload_end = buf.find(b':END') + payload = bytes(mv[payload_start:payload_end]) + print(f' Payload: {payload}') + + # Release memoryview + mv.release() + print(' memoryview released') + + return payload +">>), + + {ok, Payload} = py:eval(Ctx, <<"zero_copy_parse(buf)">>, #{<<"buf">> => Buf}), + io:format("Extracted payload: ~p~n~n", [Payload]), + ok. + +line_iteration_demo() -> + io:format("--- Line Iteration Demo ---~n~n"), + + Ctx = py:context(1), + + %% Create buffer with CSV-like data + {ok, Buf} = py_buffer:new(), + ok = py_buffer:write(Buf, <<"id,name,score\n">>), + ok = py_buffer:write(Buf, <<"1,Alice,95\n">>), + ok = py_buffer:write(Buf, <<"2,Bob,87\n">>), + ok = py_buffer:write(Buf, <<"3,Charlie,92\n">>), + ok = py_buffer:close(Buf), + + ok = py:exec(Ctx, <<" +def process_csv(buf): + '''Iterate over lines like a file.''' + records = [] + header = None + + for line in buf: + line = line.strip() + if not line: + continue + + parts = line.decode().split(',') + + if header is None: + header = parts + print(f' Header: {header}') + else: + record = dict(zip(header, parts)) + records.append(record) + print(f' Record: {record}') + + return records +">>), + + {ok, Records} = py:eval(Ctx, <<"process_csv(buf)">>, #{<<"buf">> => Buf}), + io:format("Parsed ~p records~n~n", [length(Records)]), + ok. diff --git a/test/py_buffer_SUITE.erl b/test/py_buffer_SUITE.erl new file mode 100644 index 0000000..1e7ef01 --- /dev/null +++ b/test/py_buffer_SUITE.erl @@ -0,0 +1,383 @@ +%%% @doc Common Test suite for py_buffer API. +%%% +%%% Tests the zero-copy WSGI input buffer for streaming HTTP bodies. +-module(py_buffer_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1, + init_per_testcase/2, + end_per_testcase/2 +]). + +-export([ + create_buffer_test/1, + create_buffer_with_size_test/1, + write_read_test/1, + readline_test/1, + readlines_test/1, + seek_tell_test/1, + find_test/1, + memoryview_test/1, + iterator_test/1, + closed_buffer_test/1, + empty_buffer_test/1, + pass_to_python_test/1, + gc_refcount_test/1 +]). + +all() -> [ + create_buffer_test, + create_buffer_with_size_test, + write_read_test, + readline_test, + readlines_test, + seek_tell_test, + find_test, + memoryview_test, + iterator_test, + closed_buffer_test, + empty_buffer_test, + pass_to_python_test, + gc_refcount_test +]. + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + Config. + +end_per_suite(_Config) -> + ok = application:stop(erlang_python), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%% ============================================================================ +%%% Test Cases +%%% ============================================================================ + +%% @doc Test creating a buffer with default settings (chunked encoding) +create_buffer_test(_Config) -> + {ok, Buf} = py_buffer:new(), + true = is_reference(Buf), + ok = py_buffer:close(Buf). + +%% @doc Test creating a buffer with known content length +create_buffer_with_size_test(_Config) -> + {ok, Buf} = py_buffer:new(1024), + true = is_reference(Buf), + ok = py_buffer:close(Buf). + +%% @doc Test basic write and read cycle +write_read_test(_Config) -> + {ok, Buf} = py_buffer:new(), + + %% Write some data + ok = py_buffer:write(Buf, <<"Hello, ">>), + ok = py_buffer:write(Buf, <<"World!">>), + ok = py_buffer:close(Buf), + + %% Read from Python + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + {ok, <<"Hello, World!">>} = py:eval(Ctx, <<"PyBuffer._test_create(b'Hello, World!').read()">>), + + ok. + +%% @doc Test readline method +readline_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create buffer with multiple lines and close it + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'line1\\nline2\\nline3\\n') +buf.close() +">>), + + {ok, Line1} = py:eval(Ctx, <<"buf.readline()">>), + <<"line1\n">> = Line1, + + {ok, Line2} = py:eval(Ctx, <<"buf.readline()">>), + <<"line2\n">> = Line2, + + {ok, Line3} = py:eval(Ctx, <<"buf.readline()">>), + <<"line3\n">> = Line3, + + %% Empty at EOF + {ok, <<>>} = py:eval(Ctx, <<"buf.readline()">>), + + ok. + +%% @doc Test readlines method +readlines_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'a\\nb\\nc\\n') +buf.close() +">>), + {ok, Lines} = py:eval(Ctx, <<"buf.readlines()">>), + [<<"a\n">>, <<"b\n">>, <<"c\n">>] = Lines, + + ok. + +%% @doc Test seek and tell methods +seek_tell_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create buffer, close it, and check initial position + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'0123456789') +buf.close() +">>), + {ok, 0} = py:eval(Ctx, <<"buf.tell()">>), + + %% Read 5 bytes, position should advance + {ok, <<"01234">>} = py:eval(Ctx, <<"buf.read(5)">>), + {ok, 5} = py:eval(Ctx, <<"buf.tell()">>), + + %% Seek back to beginning (SEEK_SET) + {ok, 0} = py:eval(Ctx, <<"buf.seek(0)">>), + {ok, 0} = py:eval(Ctx, <<"buf.tell()">>), + + %% Seek relative (SEEK_CUR) + {ok, 3} = py:eval(Ctx, <<"buf.seek(3, 1)">>), + {ok, 3} = py:eval(Ctx, <<"buf.tell()">>), + + %% Seek to position 7 + {ok, 7} = py:eval(Ctx, <<"buf.seek(7)">>), + {ok, <<"789">>} = py:eval(Ctx, <<"buf.read()">>), + + ok. + +%% @doc Test find method with memchr/memmem +find_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create buffer with test data + ok = py:exec(Ctx, <<"buf = PyBuffer._test_create(b'hello world hello')">>), + + %% Find single byte (memchr path) + {ok, 0} = py:eval(Ctx, <<"buf.find(b'h')">>), + {ok, 4} = py:eval(Ctx, <<"buf.find(b'o')">>), + + %% Find substring (memmem path) + {ok, 0} = py:eval(Ctx, <<"buf.find(b'hello')">>), + {ok, 6} = py:eval(Ctx, <<"buf.find(b'world')">>), + + %% Find with start position + {ok, 12} = py:eval(Ctx, <<"buf.find(b'hello', 1)">>), + + %% Not found + {ok, -1} = py:eval(Ctx, <<"buf.find(b'xyz')">>), + + %% Empty substring returns start + {ok, 0} = py:eval(Ctx, <<"buf.find(b'')">>), + {ok, 5} = py:eval(Ctx, <<"buf.find(b'', 5)">>), + + ok. + +%% @doc Test buffer protocol (memoryview) +memoryview_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create buffer and get memoryview + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'test data') +mv = memoryview(buf) +">>), + + %% Check memoryview properties + {ok, 9} = py:eval(Ctx, <<"len(mv)">>), + {ok, true} = py:eval(Ctx, <<"mv.readonly">>), + {ok, 1} = py:eval(Ctx, <<"mv.ndim">>), + + %% Access bytes via memoryview (zero-copy) + {ok, <<"test data">>} = py:eval(Ctx, <<"bytes(mv)">>), + + %% Slice access + {ok, <<"test">>} = py:eval(Ctx, <<"bytes(mv[:4])">>), + {ok, <<"data">>} = py:eval(Ctx, <<"bytes(mv[5:])">>), + + %% Release memoryview + ok = py:exec(Ctx, <<"mv.release()">>), + + ok. + +%% @doc Test iterator protocol for line-by-line reading +iterator_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create buffer and iterate + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'line1\\nline2\\nline3\\n') +buf.close() +">>), + {ok, Lines} = py:eval(Ctx, <<"list(buf)">>), + [<<"line1\n">>, <<"line2\n">>, <<"line3\n">>] = Lines, + + ok. + +%% @doc Test operations on closed buffer +closed_buffer_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create buffer and close it + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'data') +buf.close() +">>), + + %% Check closed property + {ok, true} = py:eval(Ctx, <<"buf.closed">>), + + %% Reading should return remaining data then empty + {ok, <<"data">>} = py:eval(Ctx, <<"buf.read()">>), + {ok, <<>>} = py:eval(Ctx, <<"buf.read()">>), + + ok. + +%% @doc Test empty buffer behavior +empty_buffer_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Create empty buffer and close immediately + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'') +buf.close() +">>), + + %% Read should return empty + {ok, <<>>} = py:eval(Ctx, <<"buf.read()">>), + {ok, <<>>} = py:eval(Ctx, <<"buf.readline()">>), + {ok, []} = py:eval(Ctx, <<"buf.readlines()">>), + + %% Length should be 0 + {ok, 0} = py:eval(Ctx, <<"len(buf)">>), + + %% File-like properties + {ok, true} = py:eval(Ctx, <<"buf.readable()">>), + {ok, false} = py:eval(Ctx, <<"buf.writable()">>), + {ok, true} = py:eval(Ctx, <<"buf.seekable()">>), + + ok. + +%% @doc Test passing buffer ref to Python - auto-conversion via py_convert.c +pass_to_python_test(_Config) -> + {ok, Buf} = py_buffer:new(), + + %% Write data from Erlang + ok = py_buffer:write(Buf, <<"chunk1">>), + ok = py_buffer:write(Buf, <<"chunk2">>), + ok = py_buffer:close(Buf), + + %% Pass buffer to Python via py:eval - should auto-convert to PyBuffer + Ctx = py:context(1), + + %% Define a function that reads from a buffer + ok = py:exec(Ctx, <<" +def read_buffer(buf): + return buf.read() +">>), + + %% Call with buffer ref - py_convert.c should wrap it as PyBuffer + {ok, <<"chunk1chunk2">>} = py:eval(Ctx, <<"read_buffer(buf)">>, #{<<"buf">> => Buf}), + + ok. + +%% @doc Test that buffer resources are properly garbage collected +%% Verifies reference counting between Erlang and Python +gc_refcount_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import gc +from erlang import PyBuffer + +# Track if we can create many buffers without memory issues +def create_and_release_buffers(count): + '''Create many buffers and let them be GC'd.''' + for i in range(count): + buf = PyBuffer._test_create(b'x' * 1000) + buf.close() + # buf goes out of scope, should be released + gc.collect() + return True + +def test_memoryview_refcount(): + '''Test that memoryview keeps buffer alive.''' + buf = PyBuffer._test_create(b'test data') + mv = memoryview(buf) + + # Buffer should stay alive while memoryview exists + data = bytes(mv[:4]) + + # Release memoryview + mv.release() + + # Buffer should still be usable after memoryview release + buf.close() + remaining = buf.read() + + return data == b'test' and remaining == b'test data' + +def test_multiple_views(): + '''Test multiple memoryviews on same buffer.''' + buf = PyBuffer._test_create(b'hello world') + mv1 = memoryview(buf) + mv2 = memoryview(buf) + + result1 = bytes(mv1[:5]) + result2 = bytes(mv2[6:]) + + mv1.release() + mv2.release() + buf.close() + + return result1 == b'hello' and result2 == b'world' +">>), + + %% Test 1: Create and release many buffers + {ok, true} = py:eval(Ctx, <<"create_and_release_buffers(100)">>), + + %% Test 2: Memoryview reference counting + {ok, true} = py:eval(Ctx, <<"test_memoryview_refcount()">>), + + %% Test 3: Multiple memoryviews + {ok, true} = py:eval(Ctx, <<"test_multiple_views()">>), + + %% Test 4: Erlang-side reference counting + %% Create buffer, pass to Python, let Erlang ref go out of scope + {ok, Data} = begin + {ok, TempBuf} = py_buffer:new(), + ok = py_buffer:write(TempBuf, <<"erlang data">>), + ok = py_buffer:close(TempBuf), + %% Pass to Python - Python now holds a reference + py:eval(Ctx, <<"buf.read()">>, #{<<"buf">> => TempBuf}) + %% TempBuf goes out of scope here but Python read it first + end, + <<"erlang data">> = Data, + + %% Force Erlang GC + erlang:garbage_collect(), + + %% Test 5: Verify no crashes after GC + {ok, true} = py:eval(Ctx, <<"True">>), + + ok. From cad49b133ecb4f6da1534ae6b3aeabd0a1abc7fa Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 14 Mar 2026 21:15:20 +0100 Subject: [PATCH 2/6] Add non-blocking read methods for async I/O - read_nonblock(size=-1): Read available bytes immediately, never blocks - readable_amount(): Return bytes available without blocking - at_eof(): Check if at EOF with no more data These methods enable async I/O patterns where Python code needs to check for available data without blocking, suitable for asyncio integration. Tests demonstrate Erlang streaming data while Python reads asynchronously. --- CHANGELOG.md | 3 + c_src/py_buffer.c | 1012 ++++++++++++++++++++++++++++++++ docs/buffer.md | 94 +++ examples/py_buffer_example.erl | 70 +++ test/py_buffer_SUITE.erl | 129 +++- 5 files changed, 1306 insertions(+), 2 deletions(-) create mode 100644 c_src/py_buffer.c diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cc81c7..8771b28 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,9 @@ - `py_buffer:close/1` - Signal EOF, wake all readers - Python `PyBuffer` type with file-like interface: - `read(size)`, `readline()`, `readlines()` - Blocking reads with GIL released + - `read_nonblock(size)` - Non-blocking read for async I/O + - `readable_amount()` - Bytes available without blocking + - `at_eof()` - Check if at EOF with no more data - `seek(offset, whence)`, `tell()` - Position tracking - `find(sub)` - Fast substring search via memmem/memchr - `memoryview(buf)` - Zero-copy buffer protocol diff --git a/c_src/py_buffer.c b/c_src/py_buffer.c new file mode 100644 index 0000000..8a22174 --- /dev/null +++ b/c_src/py_buffer.c @@ -0,0 +1,1012 @@ +/* + * Copyright 2026 Benoit Chesneau + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file py_buffer.c + * @brief Zero-copy WSGI input buffer implementation + * + * This module provides a PyBuffer Python type for zero-copy access to HTTP + * request bodies in WSGI applications. Data is written by Erlang and read + * by Python with file-like semantics. + * + * Key optimization techniques: + * - Zero-copy via buffer protocol (memoryview access) + * - memmem/memchr for efficient line scanning + * - GIL release during blocking waits + * - No data copying for readline when possible + */ + +/* Enable memmem on Linux (it's a GNU extension) */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif + +#include "py_buffer.h" +#include +#include + +/* Portable memmem fallback for systems that don't have it. + * memmem is available on: Linux (glibc), macOS, FreeBSD >= 13 + * Not available on: older BSDs, some embedded systems */ +#if !defined(__GLIBC__) && !defined(__APPLE__) && !defined(__FreeBSD__) +static void *portable_memmem(const void *haystack, size_t haystacklen, + const void *needle, size_t needlelen) { + if (needlelen == 0) return (void *)haystack; + if (needlelen > haystacklen) return NULL; + + const unsigned char *h = haystack; + const unsigned char *n = needle; + const unsigned char *end = h + haystacklen - needlelen + 1; + + while (h < end) { + h = memchr(h, n[0], end - h); + if (h == NULL) return NULL; + if (memcmp(h, n, needlelen) == 0) return (void *)h; + h++; + } + return NULL; +} +#define memmem portable_memmem +#endif + +/* Resource type - initialized in py_nif.c */ +ErlNifResourceType *PY_BUFFER_RESOURCE_TYPE = NULL; + +/* ============================================================================ + * Resource Management + * ============================================================================ */ + +void py_buffer_resource_dtor(ErlNifEnv *env, void *obj) { + (void)env; + py_buffer_resource_t *buf = (py_buffer_resource_t *)obj; + + pthread_mutex_destroy(&buf->mutex); + pthread_cond_destroy(&buf->data_ready); + + if (buf->data != NULL) { + enif_free(buf->data); + buf->data = NULL; + } +} + +py_buffer_resource_t *py_buffer_alloc(ssize_t content_length) { + if (PY_BUFFER_RESOURCE_TYPE == NULL) { + return NULL; + } + + py_buffer_resource_t *resource = enif_alloc_resource( + PY_BUFFER_RESOURCE_TYPE, sizeof(py_buffer_resource_t)); + if (resource == NULL) { + return NULL; + } + + /* Determine initial capacity */ + size_t capacity; + if (content_length > 0) { + capacity = (size_t)content_length; + } else { + capacity = PY_BUFFER_DEFAULT_CAPACITY; + } + + resource->data = enif_alloc(capacity); + if (resource->data == NULL) { + enif_release_resource(resource); + return NULL; + } + + resource->capacity = capacity; + resource->write_pos = 0; + resource->read_pos = 0; + resource->content_length = content_length; + resource->eof = false; + resource->closed = false; + resource->view_count = 0; + + if (pthread_mutex_init(&resource->mutex, NULL) != 0) { + enif_free(resource->data); + enif_release_resource(resource); + return NULL; + } + + if (pthread_cond_init(&resource->data_ready, NULL) != 0) { + pthread_mutex_destroy(&resource->mutex); + enif_free(resource->data); + enif_release_resource(resource); + return NULL; + } + + return resource; +} + +int py_buffer_write(py_buffer_resource_t *buf, const unsigned char *data, size_t size) { + if (size == 0) return 0; + + pthread_mutex_lock(&buf->mutex); + + if (buf->closed) { + pthread_mutex_unlock(&buf->mutex); + return -1; + } + + /* Check if we need to grow the buffer */ + size_t required = buf->write_pos + size; + if (required > buf->capacity) { + /* Calculate new capacity */ + size_t new_capacity = buf->capacity; + while (new_capacity < required) { + new_capacity *= PY_BUFFER_GROW_FACTOR; + } + + unsigned char *new_data = enif_alloc(new_capacity); + if (new_data == NULL) { + pthread_mutex_unlock(&buf->mutex); + return -1; + } + + /* Copy existing data */ + if (buf->write_pos > 0) { + memcpy(new_data, buf->data, buf->write_pos); + } + + enif_free(buf->data); + buf->data = new_data; + buf->capacity = new_capacity; + } + + /* Append new data */ + memcpy(buf->data + buf->write_pos, data, size); + buf->write_pos += size; + + /* Signal waiting readers */ + pthread_cond_broadcast(&buf->data_ready); + + pthread_mutex_unlock(&buf->mutex); + return 0; +} + +void py_buffer_close(py_buffer_resource_t *buf) { + pthread_mutex_lock(&buf->mutex); + buf->eof = true; + /* Wake up all waiting readers */ + pthread_cond_broadcast(&buf->data_ready); + pthread_mutex_unlock(&buf->mutex); +} + +/* ============================================================================ + * Internal Helper Functions + * ============================================================================ */ + +/** + * @brief Find newline in buffer using memchr (fast single-byte search) + * + * @param data Buffer data + * @param size Buffer size + * @return Pointer to newline, or NULL if not found + */ +static inline const unsigned char *find_newline(const unsigned char *data, size_t size) { + return memchr(data, '\n', size); +} + + +/* ============================================================================ + * Python Buffer Protocol + * ============================================================================ */ + +static void PyBuffer_releasebuffer(PyObject *obj, Py_buffer *view) { + (void)view; + PyBufferObject *self = (PyBufferObject *)obj; + if (self->resource != NULL) { + pthread_mutex_lock(&self->resource->mutex); + self->resource->view_count--; + pthread_mutex_unlock(&self->resource->mutex); + } +} + +static int PyBuffer_getbuffer(PyObject *obj, Py_buffer *view, int flags) { + PyBufferObject *self = (PyBufferObject *)obj; + + if (self->resource == NULL || self->resource->data == NULL) { + PyErr_SetString(PyExc_BufferError, "Buffer has been released"); + return -1; + } + + py_buffer_resource_t *buf = self->resource; + + pthread_mutex_lock(&buf->mutex); + + /* Expose only written-but-not-read data */ + size_t available = buf->write_pos - buf->read_pos; + + view->obj = obj; + view->buf = buf->data + buf->read_pos; + view->len = available; + view->readonly = 1; + view->itemsize = 1; + view->format = (flags & PyBUF_FORMAT) ? "B" : NULL; + view->ndim = 1; + view->shape = (flags & PyBUF_ND) ? &view->len : NULL; + view->strides = (flags & PyBUF_STRIDES) ? &view->itemsize : NULL; + view->suboffsets = NULL; + view->internal = NULL; + + buf->view_count++; + + pthread_mutex_unlock(&buf->mutex); + + Py_INCREF(obj); + return 0; +} + +static PyBufferProcs PyBuffer_as_buffer = { + .bf_getbuffer = PyBuffer_getbuffer, + .bf_releasebuffer = PyBuffer_releasebuffer, +}; + +/* ============================================================================ + * Python Methods + * ============================================================================ */ + +static void PyBuffer_dealloc(PyBufferObject *self) { + if (self->resource_ref != NULL) { + enif_release_resource(self->resource_ref); + self->resource_ref = NULL; + self->resource = NULL; + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +/** + * @brief read(size=-1) - Read up to size bytes, blocking if needed + * + * If size is -1, read all available data (until EOF). + * Returns bytes object. Returns empty bytes at EOF. + */ +static PyObject *PyBuffer_read(PyBufferObject *self, PyObject *args) { + Py_ssize_t size = -1; + if (!PyArg_ParseTuple(args, "|n", &size)) return NULL; + + if (self->resource == NULL) { + PyErr_SetString(PyExc_ValueError, "I/O operation on closed buffer"); + return NULL; + } + + py_buffer_resource_t *buf = self->resource; + + /* Release GIL during blocking wait */ + Py_BEGIN_ALLOW_THREADS + pthread_mutex_lock(&buf->mutex); + + /* Wait for data if buffer is empty */ + while (buf->read_pos >= buf->write_pos && !buf->eof && !buf->closed) { + pthread_cond_wait(&buf->data_ready, &buf->mutex); + } + + Py_END_ALLOW_THREADS + + /* Check if buffer was closed during wait */ + if (buf->closed && buf->read_pos >= buf->write_pos) { + pthread_mutex_unlock(&buf->mutex); + return PyBytes_FromStringAndSize("", 0); + } + + /* Calculate how much to read */ + size_t available = buf->write_pos - buf->read_pos; + size_t to_read; + + if (size < 0) { + /* Read all available (or wait for EOF if not all data yet) */ + if (!buf->eof && buf->content_length > 0 && + buf->write_pos < (size_t)buf->content_length) { + /* Wait for all data */ + Py_BEGIN_ALLOW_THREADS + while (buf->write_pos < (size_t)buf->content_length && + !buf->eof && !buf->closed) { + pthread_cond_wait(&buf->data_ready, &buf->mutex); + } + Py_END_ALLOW_THREADS + available = buf->write_pos - buf->read_pos; + } + to_read = available; + } else { + to_read = (available < (size_t)size) ? available : (size_t)size; + } + + /* Zero-copy: create bytes directly from buffer data */ + PyObject *result = PyBytes_FromStringAndSize( + (char *)buf->data + buf->read_pos, to_read); + + buf->read_pos += to_read; + + pthread_mutex_unlock(&buf->mutex); + + return result; +} + +/** + * @brief read_nonblock(size=-1) - Read available bytes without blocking + * + * Returns immediately with whatever data is available. + * If no data available and not at EOF, returns empty bytes (not None). + * Use readable_amount() to check if data is available first. + */ +static PyObject *PyBuffer_read_nonblock(PyBufferObject *self, PyObject *args) { + Py_ssize_t size = -1; + if (!PyArg_ParseTuple(args, "|n", &size)) return NULL; + + if (self->resource == NULL) { + PyErr_SetString(PyExc_ValueError, "I/O operation on closed buffer"); + return NULL; + } + + py_buffer_resource_t *buf = self->resource; + + pthread_mutex_lock(&buf->mutex); + + /* Calculate available data */ + size_t available = buf->write_pos - buf->read_pos; + + /* Determine how much to read */ + size_t to_read; + if (size < 0) { + to_read = available; + } else { + to_read = (available < (size_t)size) ? available : (size_t)size; + } + + /* Create result bytes */ + PyObject *result = PyBytes_FromStringAndSize( + (char *)buf->data + buf->read_pos, to_read); + + buf->read_pos += to_read; + + pthread_mutex_unlock(&buf->mutex); + + return result; +} + +/** + * @brief readable_amount() - Return number of bytes available without blocking + * + * Returns the number of bytes that can be read immediately without blocking. + * Useful for async I/O to check before calling read_nonblock(). + */ +static PyObject *PyBuffer_readable_amount(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + if (self->resource == NULL) { + return PyLong_FromLong(0); + } + + pthread_mutex_lock(&self->resource->mutex); + size_t available = self->resource->write_pos - self->resource->read_pos; + pthread_mutex_unlock(&self->resource->mutex); + + return PyLong_FromSize_t(available); +} + +/** + * @brief at_eof() - Check if buffer is at EOF + * + * Returns True if EOF has been signaled and all data has been read. + * Useful for async I/O loops to know when to stop. + */ +static PyObject *PyBuffer_at_eof(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + if (self->resource == NULL) { + Py_RETURN_TRUE; + } + + pthread_mutex_lock(&self->resource->mutex); + bool at_eof = (self->resource->eof || self->resource->closed) && + (self->resource->read_pos >= self->resource->write_pos); + pthread_mutex_unlock(&self->resource->mutex); + + return PyBool_FromLong(at_eof); +} + +/** + * @brief readline(size=-1) - Read one line, blocking if needed + * + * Uses memchr for fast newline scanning (zero-copy search). + * Returns bytes including the newline, or empty bytes at EOF. + */ +static PyObject *PyBuffer_readline(PyBufferObject *self, PyObject *args) { + Py_ssize_t size = -1; + if (!PyArg_ParseTuple(args, "|n", &size)) return NULL; + + if (self->resource == NULL) { + PyErr_SetString(PyExc_ValueError, "I/O operation on closed buffer"); + return NULL; + } + + py_buffer_resource_t *buf = self->resource; + + Py_BEGIN_ALLOW_THREADS + pthread_mutex_lock(&buf->mutex); + Py_END_ALLOW_THREADS + + /* Search for newline in available data */ + while (true) { + size_t available = buf->write_pos - buf->read_pos; + + if (available > 0) { + const unsigned char *start = buf->data + buf->read_pos; + size_t search_len = available; + + /* Limit search if size specified */ + if (size > 0 && (size_t)size < search_len) { + search_len = (size_t)size; + } + + /* Fast newline search using memchr */ + const unsigned char *newline = find_newline(start, search_len); + + if (newline != NULL) { + /* Found newline - return line including newline */ + size_t line_len = (newline - start) + 1; + PyObject *result = PyBytes_FromStringAndSize((char *)start, line_len); + buf->read_pos += line_len; + pthread_mutex_unlock(&buf->mutex); + return result; + } + + /* No newline found - check if we should return what we have */ + if (buf->eof || (size > 0 && available >= (size_t)size)) { + size_t to_read = (size > 0 && (size_t)size < available) + ? (size_t)size : available; + PyObject *result = PyBytes_FromStringAndSize((char *)start, to_read); + buf->read_pos += to_read; + pthread_mutex_unlock(&buf->mutex); + return result; + } + } else if (buf->eof || buf->closed) { + /* No more data coming */ + pthread_mutex_unlock(&buf->mutex); + return PyBytes_FromStringAndSize("", 0); + } + + /* Wait for more data */ + Py_BEGIN_ALLOW_THREADS + pthread_cond_wait(&buf->data_ready, &buf->mutex); + Py_END_ALLOW_THREADS + } +} + +/** + * @brief readlines(hint=-1) - Read all lines + * + * Returns list of bytes objects, each including their newline. + */ +static PyObject *PyBuffer_readlines(PyBufferObject *self, PyObject *args) { + Py_ssize_t hint = -1; + if (!PyArg_ParseTuple(args, "|n", &hint)) return NULL; + + PyObject *lines = PyList_New(0); + if (lines == NULL) return NULL; + + Py_ssize_t total_size = 0; + + while (true) { + PyObject *line = PyBuffer_readline(self, Py_BuildValue("()")); + if (line == NULL) { + Py_DECREF(lines); + return NULL; + } + + Py_ssize_t line_len = PyBytes_Size(line); + if (line_len == 0) { + Py_DECREF(line); + break; /* EOF reached */ + } + + if (PyList_Append(lines, line) < 0) { + Py_DECREF(line); + Py_DECREF(lines); + return NULL; + } + Py_DECREF(line); + + total_size += line_len; + + /* Check hint */ + if (hint > 0 && total_size >= hint) { + break; + } + } + + return lines; +} + +/** + * @brief seek(offset, whence=0) - Seek to position + * + * Only supports seeking within already-read data (whence=0, 1). + * Cannot seek forward past write_pos. + */ +static PyObject *PyBuffer_seek(PyBufferObject *self, PyObject *args) { + Py_ssize_t offset; + int whence = 0; + if (!PyArg_ParseTuple(args, "n|i", &offset, &whence)) return NULL; + + if (self->resource == NULL) { + PyErr_SetString(PyExc_ValueError, "I/O operation on closed buffer"); + return NULL; + } + + py_buffer_resource_t *buf = self->resource; + + pthread_mutex_lock(&buf->mutex); + + size_t new_pos; + switch (whence) { + case 0: /* SEEK_SET */ + if (offset < 0) { + pthread_mutex_unlock(&buf->mutex); + PyErr_SetString(PyExc_ValueError, "Negative seek position"); + return NULL; + } + new_pos = (size_t)offset; + break; + + case 1: /* SEEK_CUR */ + if (offset < 0 && (size_t)(-offset) > buf->read_pos) { + pthread_mutex_unlock(&buf->mutex); + PyErr_SetString(PyExc_ValueError, "Seek would go before start"); + return NULL; + } + new_pos = buf->read_pos + offset; + break; + + case 2: /* SEEK_END */ + if (!buf->eof) { + pthread_mutex_unlock(&buf->mutex); + PyErr_SetString(PyExc_ValueError, + "Cannot seek from end before EOF"); + return NULL; + } + if (offset > 0) { + pthread_mutex_unlock(&buf->mutex); + PyErr_SetString(PyExc_ValueError, + "Cannot seek past end"); + return NULL; + } + new_pos = buf->write_pos + offset; + break; + + default: + pthread_mutex_unlock(&buf->mutex); + PyErr_SetString(PyExc_ValueError, "Invalid whence value"); + return NULL; + } + + /* Cannot seek past written data */ + if (new_pos > buf->write_pos) { + pthread_mutex_unlock(&buf->mutex); + PyErr_SetString(PyExc_ValueError, "Cannot seek past available data"); + return NULL; + } + + buf->read_pos = new_pos; + + pthread_mutex_unlock(&buf->mutex); + + return PyLong_FromSize_t(new_pos); +} + +/** + * @brief tell() - Return current read position + */ +static PyObject *PyBuffer_tell(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + if (self->resource == NULL) { + PyErr_SetString(PyExc_ValueError, "I/O operation on closed buffer"); + return NULL; + } + + pthread_mutex_lock(&self->resource->mutex); + size_t pos = self->resource->read_pos; + pthread_mutex_unlock(&self->resource->mutex); + + return PyLong_FromSize_t(pos); +} + +/** + * @brief readable() - Always returns True + */ +static PyObject *PyBuffer_readable(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + (void)self; + Py_RETURN_TRUE; +} + +/** + * @brief writable() - Always returns False + */ +static PyObject *PyBuffer_writable(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + (void)self; + Py_RETURN_FALSE; +} + +/** + * @brief seekable() - Returns True (limited seeking supported) + */ +static PyObject *PyBuffer_seekable(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + (void)self; + Py_RETURN_TRUE; +} + +/** + * @brief close() - Mark buffer as closed + */ +static PyObject *PyBuffer_close_method(PyBufferObject *self, PyObject *Py_UNUSED(ignored)) { + if (self->resource != NULL) { + pthread_mutex_lock(&self->resource->mutex); + self->resource->closed = true; + pthread_cond_broadcast(&self->resource->data_ready); + pthread_mutex_unlock(&self->resource->mutex); + } + Py_RETURN_NONE; +} + +/** + * @brief closed property getter + */ +static PyObject *PyBuffer_closed_get(PyBufferObject *self, void *closure) { + (void)closure; + if (self->resource == NULL) { + Py_RETURN_TRUE; + } + pthread_mutex_lock(&self->resource->mutex); + bool closed = self->resource->closed; + pthread_mutex_unlock(&self->resource->mutex); + return PyBool_FromLong(closed); +} + +/** + * @brief __repr__ + */ +static PyObject *PyBuffer_repr(PyBufferObject *self) { + if (self->resource == NULL) { + return PyUnicode_FromString(""); + } + + pthread_mutex_lock(&self->resource->mutex); + size_t available = self->resource->write_pos - self->resource->read_pos; + size_t total = self->resource->write_pos; + bool eof = self->resource->eof; + pthread_mutex_unlock(&self->resource->mutex); + + return PyUnicode_FromFormat( + "", + available, total, eof ? "True" : "False"); +} + +/** + * @brief __len__ - Return available bytes + */ +static Py_ssize_t PyBuffer_length(PyBufferObject *self) { + if (self->resource == NULL) { + return 0; + } + pthread_mutex_lock(&self->resource->mutex); + Py_ssize_t len = self->resource->write_pos - self->resource->read_pos; + pthread_mutex_unlock(&self->resource->mutex); + return len; +} + +/* Iterator support */ + +static PyObject *PyBuffer_iter(PyObject *self) { + Py_INCREF(self); + return self; +} + +static PyObject *PyBuffer_iternext(PyBufferObject *self) { + PyObject *line = PyBuffer_readline(self, Py_BuildValue("()")); + if (line == NULL) { + return NULL; + } + + if (PyBytes_Size(line) == 0) { + Py_DECREF(line); + PyErr_SetNone(PyExc_StopIteration); + return NULL; + } + + return line; +} + +/* Bytes-like method: find - uses memchr/memmem for speed */ +static PyObject *PyBuffer_find(PyBufferObject *self, PyObject *args) { + PyObject *sub; + Py_ssize_t start = 0; + Py_ssize_t end = PY_SSIZE_T_MAX; + + if (!PyArg_ParseTuple(args, "O|nn:find", &sub, &start, &end)) { + return NULL; + } + + if (self->resource == NULL) { + return PyLong_FromLong(-1); + } + + Py_buffer sub_buf; + if (PyObject_GetBuffer(sub, &sub_buf, PyBUF_SIMPLE) < 0) { + return NULL; + } + + pthread_mutex_lock(&self->resource->mutex); + + size_t available = self->resource->write_pos - self->resource->read_pos; + const unsigned char *data = self->resource->data + self->resource->read_pos; + + /* Adjust start/end */ + if (start < 0) start += available; + if (start < 0) start = 0; + if (end < 0) end += available; + if (end > (Py_ssize_t)available) end = available; + + Py_ssize_t result = -1; + + if (start <= end && sub_buf.len <= (end - start)) { + const unsigned char *haystack = data + start; + Py_ssize_t haystack_len = end - start; + const unsigned char *needle = sub_buf.buf; + Py_ssize_t needle_len = sub_buf.len; + + if (needle_len == 0) { + result = start; + } else if (needle_len == 1) { + /* Single byte: use memchr (very fast) */ + void *found = memchr(haystack, needle[0], haystack_len); + if (found != NULL) { + result = start + ((const unsigned char *)found - haystack); + } + } else { + /* Multi-byte: use memmem */ + void *found = memmem(haystack, haystack_len, needle, needle_len); + if (found != NULL) { + result = start + ((const unsigned char *)found - haystack); + } + } + } + + pthread_mutex_unlock(&self->resource->mutex); + PyBuffer_Release(&sub_buf); + + return PyLong_FromSsize_t(result); +} + +/* Test helper to create a buffer for testing */ +static PyObject *PyBuffer_test_create(PyTypeObject *type, PyObject *args) { + (void)type; + Py_buffer data_buf; + Py_ssize_t content_length = -1; + + if (!PyArg_ParseTuple(args, "|y*n:_test_create", &data_buf, &content_length)) { + return NULL; + } + + py_buffer_resource_t *resource = py_buffer_alloc(content_length); + if (resource == NULL) { + if (data_buf.buf != NULL) PyBuffer_Release(&data_buf); + PyErr_SetString(PyExc_MemoryError, "Failed to allocate buffer"); + return NULL; + } + + if (data_buf.buf != NULL && data_buf.len > 0) { + if (py_buffer_write(resource, data_buf.buf, data_buf.len) < 0) { + PyBuffer_Release(&data_buf); + enif_release_resource(resource); + PyErr_SetString(PyExc_MemoryError, "Failed to write initial data"); + return NULL; + } + PyBuffer_Release(&data_buf); + } + + PyObject *result = PyBuffer_from_resource(resource, resource); + /* from_resource does enif_keep_resource, so we release our reference */ + enif_release_resource(resource); + + return result; +} + +/* Method definitions */ +static PyMethodDef PyBuffer_methods[] = { + {"read", (PyCFunction)PyBuffer_read, METH_VARARGS, + "Read up to size bytes, blocking if needed"}, + {"read_nonblock", (PyCFunction)PyBuffer_read_nonblock, METH_VARARGS, + "Read available bytes without blocking"}, + {"readline", (PyCFunction)PyBuffer_readline, METH_VARARGS, + "Read one line, blocking if needed"}, + {"readlines", (PyCFunction)PyBuffer_readlines, METH_VARARGS, + "Read all lines"}, + {"seek", (PyCFunction)PyBuffer_seek, METH_VARARGS, + "Seek to position"}, + {"tell", (PyCFunction)PyBuffer_tell, METH_NOARGS, + "Return current read position"}, + {"readable", (PyCFunction)PyBuffer_readable, METH_NOARGS, + "Return True (always readable)"}, + {"readable_amount", (PyCFunction)PyBuffer_readable_amount, METH_NOARGS, + "Return number of bytes available without blocking"}, + {"at_eof", (PyCFunction)PyBuffer_at_eof, METH_NOARGS, + "Return True if at EOF with no more data"}, + {"writable", (PyCFunction)PyBuffer_writable, METH_NOARGS, + "Return False (not writable from Python)"}, + {"seekable", (PyCFunction)PyBuffer_seekable, METH_NOARGS, + "Return True (limited seeking supported)"}, + {"close", (PyCFunction)PyBuffer_close_method, METH_NOARGS, + "Close the buffer"}, + {"find", (PyCFunction)PyBuffer_find, METH_VARARGS, + "Return lowest index of substring, or -1 if not found"}, + {"_test_create", (PyCFunction)PyBuffer_test_create, + METH_VARARGS | METH_CLASS, + "Create a PyBuffer for testing (internal use)"}, + {NULL} +}; + +/* Getset definitions */ +static PyGetSetDef PyBuffer_getset[] = { + {"closed", (getter)PyBuffer_closed_get, NULL, "True if buffer is closed", NULL}, + {NULL} +}; + +/* Sequence methods for len() */ +static PySequenceMethods PyBuffer_as_sequence = { + .sq_length = (lenfunc)PyBuffer_length, +}; + +/* Type definition */ +PyTypeObject PyBufferType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "erlang.PyBuffer", + .tp_doc = "Zero-copy WSGI input buffer with file-like interface", + .tp_basicsize = sizeof(PyBufferObject), + .tp_itemsize = 0, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_dealloc = (destructor)PyBuffer_dealloc, + .tp_repr = (reprfunc)PyBuffer_repr, + .tp_as_buffer = &PyBuffer_as_buffer, + .tp_as_sequence = &PyBuffer_as_sequence, + .tp_iter = PyBuffer_iter, + .tp_iternext = (iternextfunc)PyBuffer_iternext, + .tp_methods = PyBuffer_methods, + .tp_getset = PyBuffer_getset, +}; + +/* ============================================================================ + * Initialization + * ============================================================================ */ + +PyObject *PyBuffer_from_resource(py_buffer_resource_t *resource, + void *resource_ref) { + PyBufferObject *obj = PyObject_New(PyBufferObject, &PyBufferType); + if (obj == NULL) { + return NULL; + } + + obj->resource = resource; + obj->resource_ref = resource_ref; + enif_keep_resource(resource_ref); + + return (PyObject *)obj; +} + +int PyBuffer_init_type(void) { + if (PyType_Ready(&PyBufferType) < 0) { + return -1; + } + return 0; +} + +int PyBuffer_register_with_module(void) { + /* Import erlang module (created in py_callback.c) */ + PyObject *erlang_module = PyImport_ImportModule("erlang"); + if (erlang_module == NULL) { + /* Module doesn't exist yet - this shouldn't happen */ + PyErr_Clear(); + return -1; + } + + /* Add PyBuffer type to the erlang module */ + Py_INCREF(&PyBufferType); + if (PyModule_AddObject(erlang_module, "PyBuffer", + (PyObject *)&PyBufferType) < 0) { + Py_DECREF(&PyBufferType); + Py_DECREF(erlang_module); + return -1; + } + + Py_DECREF(erlang_module); + return 0; +} + +/* ============================================================================ + * NIF Functions + * ============================================================================ */ + +/** + * @brief NIF: py_buffer_create(ContentLength | undefined) -> {ok, Ref} + * + * Create a new PyBuffer resource. + */ +static ERL_NIF_TERM nif_py_buffer_create(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + ssize_t content_length = -1; + + /* Check if content_length is provided */ + if (!enif_is_atom(env, argv[0])) { + /* It's a number */ + ErlNifSInt64 len; + if (!enif_get_int64(env, argv[0], &len)) { + return make_error(env, "invalid_content_length"); + } + content_length = (ssize_t)len; + } + /* If it's an atom (undefined), content_length stays -1 */ + + py_buffer_resource_t *resource = py_buffer_alloc(content_length); + if (resource == NULL) { + return make_error(env, "alloc_failed"); + } + + ERL_NIF_TERM ref = enif_make_resource(env, resource); + enif_release_resource(resource); + + return enif_make_tuple2(env, ATOM_OK, ref); +} + +/** + * @brief NIF: py_buffer_write(Ref, Data) -> ok | {error, Reason} + * + * Write binary data to the buffer. + */ +static ERL_NIF_TERM nif_py_buffer_write(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + py_buffer_resource_t *resource; + if (!enif_get_resource(env, argv[0], PY_BUFFER_RESOURCE_TYPE, (void **)&resource)) { + return make_error(env, "invalid_buffer"); + } + + ErlNifBinary data; + if (!enif_inspect_binary(env, argv[1], &data)) { + return make_error(env, "invalid_data"); + } + + if (py_buffer_write(resource, data.data, data.size) < 0) { + return make_error(env, "write_failed"); + } + + return ATOM_OK; +} + +/** + * @brief NIF: py_buffer_close(Ref) -> ok + * + * Close the buffer (signal EOF). + */ +static ERL_NIF_TERM nif_py_buffer_close(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + py_buffer_resource_t *resource; + if (!enif_get_resource(env, argv[0], PY_BUFFER_RESOURCE_TYPE, (void **)&resource)) { + return make_error(env, "invalid_buffer"); + } + + py_buffer_close(resource); + + return ATOM_OK; +} diff --git a/docs/buffer.md b/docs/buffer.md index 2128f91..cd2f630 100644 --- a/docs/buffer.md +++ b/docs/buffer.md @@ -136,6 +136,48 @@ chunk = buf.read(1024) # Read up to 1024 bytes - If empty, blocks until data arrives (GIL released during wait) - Returns empty bytes at EOF +#### `read_nonblock(size=-1)` + +Read available bytes without blocking. For async I/O. + +```python +chunk = buf.read_nonblock(1024) # Read up to 1024 available bytes +data = buf.read_nonblock() # Read all available bytes +``` + +**Behavior:** +- Returns immediately with whatever data is available +- Never blocks, even if no data available +- Returns empty bytes if nothing available (check `readable_amount()` first) +- Use with `readable_amount()` and `at_eof()` for async I/O loops + +#### `readable_amount()` + +Return number of bytes available without blocking. + +```python +available = buf.readable_amount() +if available > 0: + data = buf.read_nonblock(available) +``` + +**Returns:** Number of bytes that can be read immediately. + +#### `at_eof()` + +Check if buffer is at EOF with no more data. + +```python +while not buf.at_eof(): + if buf.readable_amount() > 0: + chunk = buf.read_nonblock(4096) + process(chunk) + else: + await asyncio.sleep(0.001) # Yield to event loop +``` + +**Returns:** `True` if EOF signaled AND all data has been read. + #### `readline(size=-1)` Read one line, blocking if needed. @@ -377,6 +419,58 @@ def parse_multipart(buf, boundary): return parts ``` +### Async I/O Integration + +For asyncio applications, use the non-blocking methods to avoid blocking the event loop: + +```python +import asyncio +from erlang import PyBuffer + +async def read_buffer_async(buf): + """Read from buffer without blocking the event loop.""" + chunks = [] + + while not buf.at_eof(): + available = buf.readable_amount() + if available > 0: + # Read available data + chunk = buf.read_nonblock(4096) + chunks.append(chunk) + else: + # Yield to event loop, check again soon + await asyncio.sleep(0.001) + + return b''.join(chunks) + +async def process_wsgi_body_async(environ): + """Process WSGI body in async context.""" + buf = environ['wsgi.input'] + + # Read body without blocking + body = await read_buffer_async(buf) + return json.loads(body) +``` + +For production use, consider integrating with Erlang's event notification: + +```python +async def read_with_notification(buf, notify_channel): + """Read using Erlang channel for data-ready notifications.""" + chunks = [] + + while not buf.at_eof(): + available = buf.readable_amount() + if available > 0: + chunk = buf.read_nonblock(available) + chunks.append(chunk) + else: + # Wait for Erlang to signal data is ready + await notify_channel.async_receive() + + return b''.join(chunks) +``` + ## See Also - [Channel](channel.md) - Bidirectional message passing diff --git a/examples/py_buffer_example.erl b/examples/py_buffer_example.erl index 6a382b2..8fcb135 100755 --- a/examples/py_buffer_example.erl +++ b/examples/py_buffer_example.erl @@ -35,6 +35,9 @@ main(_) -> %% Demo 5: Line iteration line_iteration_demo(), + %% Demo 6: Async I/O + asyncio_demo(), + io:format("=== Done ===~n~n"), ok. @@ -254,3 +257,70 @@ def process_csv(buf): {ok, Records} = py:eval(Ctx, <<"process_csv(buf)">>, #{<<"buf">> => Buf}), io:format("Parsed ~p records~n~n", [length(Records)]), ok. + +asyncio_demo() -> + io:format("--- Async I/O Demo (Erlang streaming to Python) ---~n~n"), + + %% Create buffer that Erlang will fill + {ok, Buf} = py_buffer:new(), + Self = self(), + + Ctx = py:context(1), + + ok = py:exec(Ctx, <<" +import asyncio + +async def async_buffer_reader(buf): + '''Read from buffer asynchronously as Erlang streams data.''' + chunks = [] + read_count = 0 + + while not buf.at_eof(): + available = buf.readable_amount() + if available > 0: + chunk = buf.read_nonblock(available) + chunks.append(chunk) + read_count += 1 + print(f' [Python] Read chunk {read_count}: {len(chunk)} bytes') + else: + # Yield to event loop while waiting for Erlang to write more + await asyncio.sleep(0.005) + + return b''.join(chunks) + +def run_async_reader(buf): + '''Run async reader.''' + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(async_buffer_reader(buf)) + finally: + loop.close() +">>), + + %% Spawn a process to stream data from Erlang + spawn_link(fun() -> + Chunks = [ + <<"Hello from Erlang!">>, + <<" Streaming chunk 2.">>, + <<" Final chunk 3.">> + ], + lists:foreach(fun(Chunk) -> + timer:sleep(30), %% Simulate network delay + io:format(" [Erlang] Writing: ~p~n", [Chunk]), + ok = py_buffer:write(Buf, Chunk) + end, Chunks), + timer:sleep(10), + io:format(" [Erlang] Closing buffer (EOF)~n"), + ok = py_buffer:close(Buf), + Self ! writer_done + end), + + %% Python reads asynchronously while Erlang writes + io:format(" Starting async read while Erlang streams...~n"), + {ok, Result} = py:eval(Ctx, <<"run_async_reader(buf)">>, #{<<"buf">> => Buf}), + + %% Wait for writer + receive writer_done -> ok after 2000 -> ok end, + + io:format(" Final result: ~p~n~n", [Result]), + ok. diff --git a/test/py_buffer_SUITE.erl b/test/py_buffer_SUITE.erl index 1e7ef01..6a4a490 100644 --- a/test/py_buffer_SUITE.erl +++ b/test/py_buffer_SUITE.erl @@ -26,7 +26,9 @@ closed_buffer_test/1, empty_buffer_test/1, pass_to_python_test/1, - gc_refcount_test/1 + gc_refcount_test/1, + nonblock_read_test/1, + asyncio_read_test/1 ]). all() -> [ @@ -42,7 +44,9 @@ all() -> [ closed_buffer_test, empty_buffer_test, pass_to_python_test, - gc_refcount_test + gc_refcount_test, + nonblock_read_test, + asyncio_read_test ]. init_per_suite(Config) -> @@ -381,3 +385,124 @@ def test_multiple_views(): {ok, true} = py:eval(Ctx, <<"True">>), ok. + +%% @doc Test non-blocking read methods for async I/O +nonblock_read_test(_Config) -> + Ctx = py:context(1), + ok = py:exec(Ctx, <<"from erlang import PyBuffer">>), + + %% Test read_nonblock returns available data immediately + ok = py:exec(Ctx, <<" +buf = PyBuffer._test_create(b'hello world') +">>), + + %% readable_amount should return 11 (length of 'hello world') + {ok, 11} = py:eval(Ctx, <<"buf.readable_amount()">>), + + %% at_eof should be False (buffer not closed yet) + {ok, false} = py:eval(Ctx, <<"buf.at_eof()">>), + + %% read_nonblock should return available data + {ok, <<"hello">>} = py:eval(Ctx, <<"buf.read_nonblock(5)">>), + + %% readable_amount should now be 6 + {ok, 6} = py:eval(Ctx, <<"buf.readable_amount()">>), + + %% read_nonblock with no size returns all remaining + {ok, <<" world">>} = py:eval(Ctx, <<"buf.read_nonblock()">>), + + %% readable_amount should be 0 now + {ok, 0} = py:eval(Ctx, <<"buf.readable_amount()">>), + + %% read_nonblock on empty buffer returns empty bytes (not blocking) + {ok, <<>>} = py:eval(Ctx, <<"buf.read_nonblock()">>), + + %% Close buffer and check at_eof + ok = py:exec(Ctx, <<"buf.close()">>), + {ok, true} = py:eval(Ctx, <<"buf.at_eof()">>), + + %% Test async I/O pattern simulation + ok = py:exec(Ctx, <<" +def async_read_simulation(): + '''Simulate async I/O read pattern.''' + buf = PyBuffer._test_create(b'chunk1chunk2chunk3') + buf.close() # EOF + + chunks = [] + while not buf.at_eof(): + available = buf.readable_amount() + if available > 0: + # Read in chunks of 6 + chunk = buf.read_nonblock(6) + chunks.append(chunk) + else: + # Would yield to event loop here in real async code + break + + return chunks +">>), + + {ok, [<<"chunk1">>, <<"chunk2">>, <<"chunk3">>]} = + py:eval(Ctx, <<"async_read_simulation()">>), + + ok. + +%% @doc Test PyBuffer with asyncio - Erlang fills buffer while Python reads +asyncio_read_test(_Config) -> + %% Create a buffer that Erlang will fill + {ok, Buf} = py_buffer:new(), + + Ctx = py:context(1), + Self = self(), + + %% Define async reader in Python + ok = py:exec(Ctx, <<" +import asyncio + +async def async_buffer_reader(buf): + '''Read from buffer asynchronously as Erlang fills it.''' + chunks = [] + + while not buf.at_eof(): + available = buf.readable_amount() + if available > 0: + chunk = buf.read_nonblock(available) + chunks.append(chunk) + else: + # Yield to event loop - Erlang is still writing + await asyncio.sleep(0.005) + + return b''.join(chunks) + +def run_async_reader(buf): + '''Run async reader.''' + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(async_buffer_reader(buf)) + finally: + loop.close() +">>), + + %% Spawn a process to write data with delays (simulating streaming) + spawn_link(fun() -> + timer:sleep(10), + ok = py_buffer:write(Buf, <<"chunk1:">>), + timer:sleep(20), + ok = py_buffer:write(Buf, <<"chunk2:">>), + timer:sleep(20), + ok = py_buffer:write(Buf, <<"chunk3">>), + timer:sleep(10), + ok = py_buffer:close(Buf), + Self ! writer_done + end), + + %% Read asynchronously while Erlang writes + {ok, Result} = py:eval(Ctx, <<"run_async_reader(buf)">>, #{<<"buf">> => Buf}), + + %% Wait for writer to finish + receive writer_done -> ok after 1000 -> ok end, + + %% Verify we got all the data + <<"chunk1:chunk2:chunk3">> = Result, + + ok. From c744ef4bd1eb4f27161e382c5e817630a3a0f6af Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 14 Mar 2026 21:26:15 +0100 Subject: [PATCH 3/6] Fix memory leaks, deadlock risk, and integer overflow in PyBuffer Memory leaks: - Py_BuildValue("()") in readlines and iternext was never DECREF'd - Now uses PyTuple_New(0) with proper cleanup Deadlock prevention: - read/readline/read_nonblock now copy data while holding mutex, then release mutex before calling Python APIs (PyBytes_FromStringAndSize) - This avoids holding mutex while reacquiring GIL, which could deadlock with other functions (find, readable_amount, etc.) that hold GIL while acquiring mutex Integer overflow: - Added SIZE_MAX check before computing required capacity in py_buffer_write --- c_src/py_buffer.c | 145 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 114 insertions(+), 31 deletions(-) diff --git a/c_src/py_buffer.c b/c_src/py_buffer.c index 8a22174..a037054 100644 --- a/c_src/py_buffer.c +++ b/c_src/py_buffer.c @@ -37,6 +37,8 @@ #include "py_buffer.h" #include #include +#include +#include /* Portable memmem fallback for systems that don't have it. * memmem is available on: Linux (glibc), macOS, FreeBSD >= 13 @@ -141,6 +143,12 @@ int py_buffer_write(py_buffer_resource_t *buf, const unsigned char *data, size_t return -1; } + /* Check for overflow */ + if (size > SIZE_MAX - buf->write_pos) { + pthread_mutex_unlock(&buf->mutex); + return -1; + } + /* Check if we need to grow the buffer */ size_t required = buf->write_pos + size; if (required > buf->capacity) { @@ -310,13 +318,17 @@ static PyObject *PyBuffer_read(PyBufferObject *self, PyObject *args) { /* Read all available (or wait for EOF if not all data yet) */ if (!buf->eof && buf->content_length > 0 && buf->write_pos < (size_t)buf->content_length) { - /* Wait for all data */ + /* Wait for all data - release mutex before reacquiring GIL to avoid deadlock */ + pthread_mutex_unlock(&buf->mutex); + Py_BEGIN_ALLOW_THREADS + pthread_mutex_lock(&buf->mutex); while (buf->write_pos < (size_t)buf->content_length && !buf->eof && !buf->closed) { pthread_cond_wait(&buf->data_ready, &buf->mutex); } Py_END_ALLOW_THREADS + available = buf->write_pos - buf->read_pos; } to_read = available; @@ -324,14 +336,30 @@ static PyObject *PyBuffer_read(PyBufferObject *self, PyObject *args) { to_read = (available < (size_t)size) ? available : (size_t)size; } - /* Zero-copy: create bytes directly from buffer data */ - PyObject *result = PyBytes_FromStringAndSize( - (char *)buf->data + buf->read_pos, to_read); - - buf->read_pos += to_read; + /* Copy data while holding mutex, then release before creating PyBytes */ + unsigned char *local_copy = NULL; + if (to_read > 0) { + local_copy = (unsigned char *)malloc(to_read); + if (local_copy == NULL) { + pthread_mutex_unlock(&buf->mutex); + PyErr_NoMemory(); + return NULL; + } + memcpy(local_copy, buf->data + buf->read_pos, to_read); + buf->read_pos += to_read; + } pthread_mutex_unlock(&buf->mutex); + /* Create PyBytes without holding mutex */ + PyObject *result; + if (to_read > 0) { + result = PyBytes_FromStringAndSize((char *)local_copy, to_read); + free(local_copy); + } else { + result = PyBytes_FromStringAndSize("", 0); + } + return result; } @@ -352,6 +380,8 @@ static PyObject *PyBuffer_read_nonblock(PyBufferObject *self, PyObject *args) { } py_buffer_resource_t *buf = self->resource; + unsigned char *local_copy = NULL; + size_t to_read = 0; pthread_mutex_lock(&buf->mutex); @@ -359,21 +389,37 @@ static PyObject *PyBuffer_read_nonblock(PyBufferObject *self, PyObject *args) { size_t available = buf->write_pos - buf->read_pos; /* Determine how much to read */ - size_t to_read; if (size < 0) { to_read = available; } else { to_read = (available < (size_t)size) ? available : (size_t)size; } - /* Create result bytes */ - PyObject *result = PyBytes_FromStringAndSize( - (char *)buf->data + buf->read_pos, to_read); - - buf->read_pos += to_read; + /* Copy data while holding mutex */ + if (to_read > 0) { + local_copy = (unsigned char *)malloc(to_read); + if (local_copy != NULL) { + memcpy(local_copy, buf->data + buf->read_pos, to_read); + buf->read_pos += to_read; + } + } pthread_mutex_unlock(&buf->mutex); + /* Create PyBytes without holding mutex */ + if (local_copy == NULL && to_read > 0) { + PyErr_NoMemory(); + return NULL; + } + + PyObject *result; + if (to_read > 0) { + result = PyBytes_FromStringAndSize((char *)local_copy, to_read); + free(local_copy); + } else { + result = PyBytes_FromStringAndSize("", 0); + } + return result; } @@ -430,10 +476,11 @@ static PyObject *PyBuffer_readline(PyBufferObject *self, PyObject *args) { } py_buffer_resource_t *buf = self->resource; + unsigned char *local_copy = NULL; + size_t copy_len = 0; Py_BEGIN_ALLOW_THREADS pthread_mutex_lock(&buf->mutex); - Py_END_ALLOW_THREADS /* Search for newline in available data */ while (true) { @@ -452,34 +499,54 @@ static PyObject *PyBuffer_readline(PyBufferObject *self, PyObject *args) { const unsigned char *newline = find_newline(start, search_len); if (newline != NULL) { - /* Found newline - return line including newline */ - size_t line_len = (newline - start) + 1; - PyObject *result = PyBytes_FromStringAndSize((char *)start, line_len); - buf->read_pos += line_len; - pthread_mutex_unlock(&buf->mutex); - return result; + /* Found newline - copy line including newline */ + copy_len = (newline - start) + 1; + local_copy = (unsigned char *)malloc(copy_len); + if (local_copy != NULL) { + memcpy(local_copy, start, copy_len); + buf->read_pos += copy_len; + } + break; } /* No newline found - check if we should return what we have */ if (buf->eof || (size > 0 && available >= (size_t)size)) { - size_t to_read = (size > 0 && (size_t)size < available) - ? (size_t)size : available; - PyObject *result = PyBytes_FromStringAndSize((char *)start, to_read); - buf->read_pos += to_read; - pthread_mutex_unlock(&buf->mutex); - return result; + copy_len = (size > 0 && (size_t)size < available) + ? (size_t)size : available; + local_copy = (unsigned char *)malloc(copy_len); + if (local_copy != NULL) { + memcpy(local_copy, start, copy_len); + buf->read_pos += copy_len; + } + break; } } else if (buf->eof || buf->closed) { /* No more data coming */ - pthread_mutex_unlock(&buf->mutex); - return PyBytes_FromStringAndSize("", 0); + break; } /* Wait for more data */ - Py_BEGIN_ALLOW_THREADS pthread_cond_wait(&buf->data_ready, &buf->mutex); - Py_END_ALLOW_THREADS } + + pthread_mutex_unlock(&buf->mutex); + Py_END_ALLOW_THREADS + + /* Create PyBytes without holding mutex */ + if (local_copy == NULL && copy_len > 0) { + PyErr_NoMemory(); + return NULL; + } + + PyObject *result; + if (copy_len > 0) { + result = PyBytes_FromStringAndSize((char *)local_copy, copy_len); + free(local_copy); + } else { + result = PyBytes_FromStringAndSize("", 0); + } + + return result; } /** @@ -496,9 +563,16 @@ static PyObject *PyBuffer_readlines(PyBufferObject *self, PyObject *args) { Py_ssize_t total_size = 0; + PyObject *empty_args = PyTuple_New(0); + if (empty_args == NULL) { + Py_DECREF(lines); + return NULL; + } + while (true) { - PyObject *line = PyBuffer_readline(self, Py_BuildValue("()")); + PyObject *line = PyBuffer_readline(self, empty_args); if (line == NULL) { + Py_DECREF(empty_args); Py_DECREF(lines); return NULL; } @@ -511,6 +585,7 @@ static PyObject *PyBuffer_readlines(PyBufferObject *self, PyObject *args) { if (PyList_Append(lines, line) < 0) { Py_DECREF(line); + Py_DECREF(empty_args); Py_DECREF(lines); return NULL; } @@ -524,6 +599,7 @@ static PyObject *PyBuffer_readlines(PyBufferObject *self, PyObject *args) { } } + Py_DECREF(empty_args); return lines; } @@ -710,7 +786,14 @@ static PyObject *PyBuffer_iter(PyObject *self) { } static PyObject *PyBuffer_iternext(PyBufferObject *self) { - PyObject *line = PyBuffer_readline(self, Py_BuildValue("()")); + PyObject *empty_args = PyTuple_New(0); + if (empty_args == NULL) { + return NULL; + } + + PyObject *line = PyBuffer_readline(self, empty_args); + Py_DECREF(empty_args); + if (line == NULL) { return NULL; } From f1d8b8131ec78f034b7e65c09d5c86a0aee5c9f5 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 14 Mar 2026 21:27:47 +0100 Subject: [PATCH 4/6] Add PyBuffer implementation files - c_src/py_buffer.h: Header with resource struct and function declarations - src/py_buffer.erl: Erlang API module (new/0,1, write/2, close/1) - c_src/py_convert.c: Auto-conversion of buffer refs to PyBuffer objects - c_src/py_nif.c: NIF registration and resource type initialization - src/py_nif.erl: NIF function exports --- c_src/py_buffer.h | 188 +++++++++++++++++++++++++++++++++++++++++++++ c_src/py_convert.c | 8 ++ c_src/py_nif.c | 33 +++++++- src/py_buffer.erl | 107 ++++++++++++++++++++++++++ src/py_nif.erl | 42 +++++++++- 5 files changed, 376 insertions(+), 2 deletions(-) create mode 100644 c_src/py_buffer.h create mode 100644 src/py_buffer.erl diff --git a/c_src/py_buffer.h b/c_src/py_buffer.h new file mode 100644 index 0000000..0dfbb2c --- /dev/null +++ b/c_src/py_buffer.h @@ -0,0 +1,188 @@ +/* + * Copyright 2026 Benoit Chesneau + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * @file py_buffer.h + * @brief Zero-copy WSGI input buffer support + * @author Benoit Chesneau + * + * This module provides a PyBuffer Python type that wraps a NIF-allocated + * buffer resource and exposes it via the buffer protocol. Erlang can write + * HTTP request body chunks to the buffer while Python reads them with + * file-like methods (read, readline, readlines) or direct buffer access. + * + * The buffer supports blocking reads that release the GIL while waiting + * for data from Erlang. + * + * Key features: + * - Buffer protocol (memoryview(buf) works for zero-copy access) + * - File-like interface (read, readline, readlines, seek, tell) + * - Blocking reads with GIL released (uses pthread_cond) + * - Iterator protocol for line-by-line reading + */ + +#ifndef PY_BUFFER_H +#define PY_BUFFER_H + +#include +#include +#include +#include + +/* ============================================================================ + * Configuration + * ============================================================================ */ + +/** + * @def PY_BUFFER_DEFAULT_CAPACITY + * @brief Default buffer capacity when content_length is unknown (chunked) + */ +#define PY_BUFFER_DEFAULT_CAPACITY 65536 + +/** + * @def PY_BUFFER_GROW_FACTOR + * @brief Growth factor when buffer needs to expand + */ +#define PY_BUFFER_GROW_FACTOR 2 + +/* ============================================================================ + * Buffer Resource Type + * ============================================================================ */ + +/** + * @brief Resource type for zero-copy input buffers + */ +extern ErlNifResourceType *PY_BUFFER_RESOURCE_TYPE; + +/** + * @struct py_buffer_resource_t + * @brief NIF resource that holds streaming input buffer data + * + * The buffer is written by Erlang (producer) and read by Python (consumer). + * Uses pthread mutex/cond for thread-safe blocking reads. + */ +typedef struct { + unsigned char *data; /**< Buffer data */ + size_t capacity; /**< Allocated capacity */ + size_t write_pos; /**< Current write position (producer) */ + size_t read_pos; /**< Current read position (consumer) */ + ssize_t content_length; /**< Expected total size, -1 for chunked */ + pthread_mutex_t mutex; /**< Mutex for thread-safe access */ + pthread_cond_t data_ready; /**< Condition for blocking reads */ + bool eof; /**< End of data flag (close called) */ + bool closed; /**< Buffer closed flag */ + int view_count; /**< Active Python buffer view count */ +} py_buffer_resource_t; + +/* ============================================================================ + * Python Type + * ============================================================================ */ + +/** + * @brief The PyBuffer Python type object + */ +extern PyTypeObject PyBufferType; + +/** + * @struct PyBufferObject + * @brief Python object wrapping a py_buffer resource + * + * Provides file-like interface and buffer protocol for zero-copy access. + */ +typedef struct { + PyObject_HEAD + py_buffer_resource_t *resource; /**< NIF resource (we hold a reference) */ + void *resource_ref; /**< For releasing the resource */ +} PyBufferObject; + +/* ============================================================================ + * Function Declarations - NIF Resource Management + * ============================================================================ */ + +/** + * @brief Allocate a new buffer resource + * + * @param content_length Expected size, or -1 for chunked encoding + * @return New resource, or NULL on error + */ +py_buffer_resource_t *py_buffer_alloc(ssize_t content_length); + +/** + * @brief Resource destructor + */ +void py_buffer_resource_dtor(ErlNifEnv *env, void *obj); + +/** + * @brief Write data to the buffer (Erlang producer side) + * + * Appends data to the buffer, expanding if necessary. + * Signals waiting readers when data is available. + * + * @param buf Buffer resource + * @param data Data to write + * @param size Size of data + * @return 0 on success, -1 on error (buffer closed or alloc failure) + */ +int py_buffer_write(py_buffer_resource_t *buf, const unsigned char *data, size_t size); + +/** + * @brief Close the buffer (Erlang producer side) + * + * Sets EOF flag and wakes up any waiting readers. + * + * @param buf Buffer resource + */ +void py_buffer_close(py_buffer_resource_t *buf); + +/* ============================================================================ + * Function Declarations - Python Type + * ============================================================================ */ + +/** + * @brief Initialize the PyBuffer type + * + * Must be called during Python initialization with the GIL held. + * + * @return 0 on success, -1 on error + */ +int PyBuffer_init_type(void); + +/** + * @brief Register PyBuffer with erlang module + * + * Makes PyBuffer accessible from Python. + * + * @return 0 on success, -1 on error + * + * @pre GIL must be held + * @pre PyBuffer_init_type() must have been called + * @pre erlang module must exist + */ +int PyBuffer_register_with_module(void); + +/** + * @brief Create a PyBuffer from a NIF resource + * + * @param resource The buffer resource + * @param resource_ref Resource reference (for enif_release_resource) + * @return New PyBuffer object, or NULL on error + * + * @pre GIL must be held + */ +PyObject *PyBuffer_from_resource(py_buffer_resource_t *resource, + void *resource_ref); + +#endif /* PY_BUFFER_H */ diff --git a/c_src/py_convert.c b/c_src/py_convert.c index 8f50b01..a0ccd48 100644 --- a/c_src/py_convert.c +++ b/c_src/py_convert.c @@ -595,6 +595,14 @@ static PyObject *term_to_py(ErlNifEnv *env, ERL_NIF_TERM term) { return capsule; } + /* Check for py_buffer resource - wrap in PyBufferObject for WSGI input */ + py_buffer_resource_t *pybuf; + if (enif_get_resource(env, term, PY_BUFFER_RESOURCE_TYPE, (void **)&pybuf)) { + /* Wrap the buffer resource in a PyBufferObject. + * PyBuffer_from_resource increments the resource refcount. */ + return PyBuffer_from_resource(pybuf, pybuf); + } + /* Fallback: return None for unknown types */ Py_RETURN_NONE; } diff --git a/c_src/py_nif.c b/c_src/py_nif.c index d59ce6e..1757657 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -41,6 +41,7 @@ #include "py_wsgi.h" #include "py_event_loop.h" #include "py_channel.h" +#include "py_buffer.h" /* ============================================================================ * Global state definitions @@ -301,6 +302,7 @@ static int is_inline_schedule_marker(PyObject *obj); #include "py_subinterp_thread.c" #include "py_reactor_buffer.c" #include "py_channel.c" +#include "py_buffer.c" /* ============================================================================ * Resource callbacks @@ -1170,6 +1172,20 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg return make_error(env, "reactor_buffer_register_failed"); } + /* Initialize PyBuffer Python type for zero-copy WSGI input */ + if (PyBuffer_init_type() < 0) { + Py_Finalize(); + atomic_store(&g_runtime_state, PY_STATE_STOPPED); + return make_error(env, "py_buffer_init_failed"); + } + + /* Register PyBuffer type with erlang module */ + if (PyBuffer_register_with_module() < 0) { + Py_Finalize(); + atomic_store(&g_runtime_state, PY_STATE_STOPPED); + return make_error(env, "py_buffer_register_failed"); + } + /* Create a default event loop so Python asyncio always has one available */ if (create_default_event_loop(env) < 0) { Py_Finalize(); @@ -4797,6 +4813,16 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) { return -1; } + /* PyBuffer resource type for zero-copy WSGI input */ + PY_BUFFER_RESOURCE_TYPE = enif_open_resource_type( + env, NULL, "py_buffer", + py_buffer_resource_dtor, + ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL); + + if (PY_BUFFER_RESOURCE_TYPE == NULL) { + return -1; + } + /* Initialize channel module atoms */ if (channel_init(env) < 0) { return -1; @@ -5059,7 +5085,12 @@ static ErlNifFunc nif_funcs[] = { {"channel_info", 1, nif_channel_info, 0}, {"channel_wait", 3, nif_channel_wait, 0}, {"channel_cancel_wait", 2, nif_channel_cancel_wait, 0}, - {"channel_register_sync_waiter", 1, nif_channel_register_sync_waiter, 0} + {"channel_register_sync_waiter", 1, nif_channel_register_sync_waiter, 0}, + + /* PyBuffer API - zero-copy WSGI input */ + {"py_buffer_create", 1, nif_py_buffer_create, 0}, + {"py_buffer_write", 2, nif_py_buffer_write, 0}, + {"py_buffer_close", 1, nif_py_buffer_close, 0} }; ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload) diff --git a/src/py_buffer.erl b/src/py_buffer.erl new file mode 100644 index 0000000..edab628 --- /dev/null +++ b/src/py_buffer.erl @@ -0,0 +1,107 @@ +%% Copyright 2026 Benoit Chesneau +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%%% @doc Zero-copy WSGI input buffer for streaming HTTP bodies. +%%% +%%% This module provides a buffer that can be written by Erlang and read +%%% by Python with zero-copy semantics. The buffer is suitable for use +%%% as wsgi.input in WSGI applications. +%%% +%%% == Usage == +%%% +%%% ``` +%%% %% Create a buffer (chunked encoding - unknown size) +%%% {ok, Buf} = py_buffer:new(), +%%% +%%% %% Or with known content length +%%% {ok, Buf} = py_buffer:new(1024), +%%% +%%% %% Write HTTP body chunks +%%% ok = py_buffer:write(Buf, <<"chunk1">>), +%%% ok = py_buffer:write(Buf, <<"chunk2">>), +%%% +%%% %% Signal end of data +%%% ok = py_buffer:close(Buf), +%%% +%%% %% Pass to Python WSGI - buffer is automatically converted +%%% py_context:call(Ctx, <<"myapp">>, <<"handle">>, +%%% [#{<<"wsgi.input">> => Buf}], #{}). +%%% ''' +%%% +%%% On the Python side, the buffer provides a file-like interface: +%%% +%%% ```python +%%% def handle(environ): +%%% body = environ['wsgi.input'].read() # Blocks until data ready +%%% # Or use readline(), readlines(), iteration +%%% for line in environ['wsgi.input']: +%%% process(line) +%%% ''' +%%% +%%% @end +-module(py_buffer). + +-export([ + new/0, + new/1, + write/2, + close/1 +]). + +%% @doc Create a new buffer for chunked/streaming data. +%% +%% Use this when the content length is unknown (chunked transfer encoding). +%% The buffer will grow as needed. +%% +%% @returns {ok, BufferRef} | {error, Reason} +-spec new() -> {ok, reference()} | {error, term()}. +new() -> + py_nif:py_buffer_create(undefined). + +%% @doc Create a new buffer with known content length. +%% +%% Pre-allocates the buffer to the specified size for better performance. +%% +%% @param ContentLength Expected total size in bytes, or `undefined' for chunked +%% @returns {ok, BufferRef} | {error, Reason} +-spec new(non_neg_integer() | undefined) -> {ok, reference()} | {error, term()}. +new(undefined) -> + py_nif:py_buffer_create(undefined); +new(ContentLength) when is_integer(ContentLength), ContentLength >= 0 -> + py_nif:py_buffer_create(ContentLength). + +%% @doc Write data to the buffer. +%% +%% Appends data to the buffer and signals any waiting Python readers. +%% This function is safe to call from multiple processes, but typically +%% only one process should write to a buffer. +%% +%% @param Ref Buffer reference from new/0 or new/1 +%% @param Data Binary data to append +%% @returns ok | {error, Reason} +-spec write(reference(), binary()) -> ok | {error, term()}. +write(Ref, Data) when is_binary(Data) -> + py_nif:py_buffer_write(Ref, Data). + +%% @doc Close the buffer (signal end of data). +%% +%% Sets the EOF flag and wakes up any Python threads waiting for data. +%% After calling close, no more data can be written, and Python's read() +%% will return any remaining buffered data followed by empty bytes. +%% +%% @param Ref Buffer reference +%% @returns ok +-spec close(reference()) -> ok. +close(Ref) -> + py_nif:py_buffer_close(Ref). diff --git a/src/py_nif.erl b/src/py_nif.erl index 7b17f9b..d4582f4 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -215,7 +215,11 @@ channel_info/1, channel_wait/3, channel_cancel_wait/2, - channel_register_sync_waiter/1 + channel_register_sync_waiter/1, + %% PyBuffer API - zero-copy WSGI input + py_buffer_create/1, + py_buffer_write/2, + py_buffer_close/1 ]). -on_load(load_nif/0). @@ -1826,3 +1830,39 @@ channel_cancel_wait(_ChannelRef, _CallbackId) -> -spec channel_register_sync_waiter(reference()) -> ok | has_data | {error, term()}. channel_register_sync_waiter(_ChannelRef) -> ?NIF_STUB. + +%%% ============================================================================ +%%% PyBuffer API - Zero-copy WSGI Input +%%% ============================================================================ + +%% @doc Create a new PyBuffer resource. +%% +%% Creates a buffer that can be written by Erlang and read by Python +%% with zero-copy semantics. The buffer is suitable for use as wsgi.input. +%% +%% @param ContentLength Expected size in bytes, or `undefined' for chunked +%% @returns {ok, BufferRef} | {error, Reason} +-spec py_buffer_create(non_neg_integer() | undefined) -> {ok, reference()} | {error, term()}. +py_buffer_create(_ContentLength) -> + ?NIF_STUB. + +%% @doc Write binary data to the buffer. +%% +%% Appends data to the buffer and signals any waiting Python readers. +%% +%% @param BufferRef Buffer reference from py_buffer_create/1 +%% @param Data Binary data to append +%% @returns ok | {error, Reason} +-spec py_buffer_write(reference(), binary()) -> ok | {error, term()}. +py_buffer_write(_BufferRef, _Data) -> + ?NIF_STUB. + +%% @doc Close the buffer (signal EOF). +%% +%% Sets the EOF flag and wakes up any Python threads waiting for data. +%% +%% @param BufferRef Buffer reference +%% @returns ok +-spec py_buffer_close(reference()) -> ok. +py_buffer_close(_BufferRef) -> + ?NIF_STUB. From 992072b2c5221c5f8a5cd527244b9179e456da6d Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 14 Mar 2026 21:33:03 +0100 Subject: [PATCH 5/6] Register PyBuffer in subinterpreters PyBuffer was only registered in the main interpreter, causing ImportError when using subinterpreters (Python 3.12+). Add PyBuffer_register_with_module() calls in: - py_subinterp_pool.c: for shared-GIL subinterpreter pool - py_subinterp_thread.c: for OWN_GIL subinterpreter threads --- c_src/py_subinterp_pool.c | 7 +++++++ c_src/py_subinterp_thread.c | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/c_src/py_subinterp_pool.c b/c_src/py_subinterp_pool.c index 1ad5632..75fdcaa 100644 --- a/c_src/py_subinterp_pool.c +++ b/c_src/py_subinterp_pool.c @@ -26,6 +26,7 @@ #include "py_subinterp_pool.h" #include "py_reactor_buffer.h" +#include "py_buffer.h" #include #ifdef HAVE_SUBINTERPRETERS @@ -170,6 +171,12 @@ int subinterp_pool_init(int size) { /* Non-fatal - ReactorBuffer just won't be available */ } + /* Register PyBuffer with erlang module in this subinterpreter */ + if (PyBuffer_register_with_module() < 0) { + PyErr_Clear(); + /* Non-fatal - PyBuffer just won't be available */ + } + /* Import erlang module into globals */ PyObject *erlang_module = PyImport_ImportModule("erlang"); if (erlang_module != NULL) { diff --git a/c_src/py_subinterp_thread.c b/c_src/py_subinterp_thread.c index 831cf6c..2c08719 100644 --- a/c_src/py_subinterp_thread.c +++ b/c_src/py_subinterp_thread.c @@ -25,6 +25,7 @@ #include "py_subinterp_thread.h" #include "py_nif.h" +#include "py_buffer.h" #include #include #include @@ -315,6 +316,12 @@ static void *worker_thread_main(void *arg) { fprintf(stderr, "worker %d: failed to create erlang module\n", w->worker_id); PyErr_Clear(); /* Continue without erlang module - callbacks won't work */ + } else { + /* Register PyBuffer with erlang module in this subinterpreter */ + if (PyBuffer_register_with_module() < 0) { + PyErr_Clear(); + /* Non-fatal - PyBuffer just won't be available */ + } } /* Release the subinterpreter's GIL (we'll acquire it per-request) */ From 04fa8982edd329dcb1f59d2e4499dd479e2a69e5 Mon Sep 17 00:00:00 2001 From: Benoit Chesneau Date: Sat, 14 Mar 2026 22:19:50 +0100 Subject: [PATCH 6/6] Add PyBuffer vs Channel benchmark --- examples/bench_py_buffer.erl | 265 +++++++++++++++++++++++++++++++++++ 1 file changed, 265 insertions(+) create mode 100644 examples/bench_py_buffer.erl diff --git a/examples/bench_py_buffer.erl b/examples/bench_py_buffer.erl new file mode 100644 index 0000000..6e3f7e5 --- /dev/null +++ b/examples/bench_py_buffer.erl @@ -0,0 +1,265 @@ +#!/usr/bin/env escript +%% -*- erlang -*- +%%! -pa _build/default/lib/erlang_python/ebin + +%%% @doc Benchmark script comparing PyBuffer with Channel API. +%%% +%%% Run with: +%%% rebar3 compile && escript examples/bench_py_buffer.erl + +-mode(compile). + +main(_Args) -> + io:format("~n========================================~n"), + io:format("PyBuffer vs Channel Benchmark~n"), + io:format("========================================~n~n"), + + %% Start the application + {ok, _} = application:ensure_all_started(erlang_python), + {ok, _} = py:start_contexts(), + ok = py_channel:register_callbacks(), + + %% Print system info + io:format("System Information:~n"), + io:format(" Erlang/OTP: ~s~n", [erlang:system_info(otp_release)]), + {ok, PyVer} = py:version(), + io:format(" Python: ~s~n", [PyVer]), + io:format("~n"), + + %% Run benchmarks + run_write_throughput_bench(), + run_read_throughput_bench(), + run_buffer_vs_channel_bench(), + run_streaming_bench(), + + io:format("~n========================================~n"), + io:format("Benchmark Complete~n"), + io:format("========================================~n"), + + halt(0). + +run_write_throughput_bench() -> + io:format("~n--- PyBuffer Write Throughput ---~n"), + io:format("Writes per batch: 1000~n~n"), + + Sizes = [64, 256, 1024, 4096, 16384], + + io:format("~8s | ~12s | ~12s~n", + ["Size", "Writes/sec", "MB/sec"]), + io:format("~s~n", [string:copies("-", 38)]), + + lists:foreach(fun(Size) -> + {ok, Buf} = py_buffer:new(), + Data = binary:copy(<<0>>, Size), + Iterations = 1000, + + Start = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Data) + end, lists:seq(1, Iterations)), + End = erlang:monotonic_time(microsecond), + + TotalTime = (End - Start) / 1000000, + WriteRate = Iterations / TotalTime, + MBPerSec = (Iterations * Size) / TotalTime / 1048576, + + io:format("~8B | ~12w | ~12.2f~n", + [Size, round(WriteRate), MBPerSec]), + + py_buffer:close(Buf) + end, Sizes), + ok. + +run_read_throughput_bench() -> + io:format("~n--- PyBuffer Read Throughput (Python) ---~n"), + io:format("Reads per batch: 1000~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +import time + +def bench_buffer_reads(buf, size, iterations): + '''Read from buffer and measure throughput.''' + # Read all data + start = time.perf_counter() + total_read = 0 + while True: + data = buf.read(size) + if not data: + break + total_read += len(data) + elapsed = time.perf_counter() - start + + if elapsed > 0: + reads_per_sec = (total_read / size) / elapsed + mb_per_sec = total_read / elapsed / 1048576 + else: + reads_per_sec = 0 + mb_per_sec = 0 + + return { + 'total_bytes': total_read, + 'reads_per_sec': reads_per_sec, + 'mb_per_sec': mb_per_sec, + } +">>), + + Sizes = [64, 256, 1024, 4096, 16384], + + io:format("~8s | ~12s | ~12s~n", + ["Size", "Reads/sec", "MB/sec"]), + io:format("~s~n", [string:copies("-", 38)]), + + lists:foreach(fun(Size) -> + Iterations = 1000, + {ok, Buf} = py_buffer:new(Size * Iterations), + Data = binary:copy(<<0>>, Size), + + %% Fill the buffer + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Data) + end, lists:seq(1, Iterations)), + ok = py_buffer:close(Buf), + + %% Measure Python read performance + {ok, Result} = py:eval(Ctx, <<"bench_buffer_reads(buf, size, iterations)">>, + #{<<"buf">> => Buf, <<"size">> => Size, <<"iterations">> => Iterations}), + + ReadsPerSec = maps:get(<<"reads_per_sec">>, Result), + MBPerSec = maps:get(<<"mb_per_sec">>, Result), + + io:format("~8B | ~12w | ~12.2f~n", + [Size, round(ReadsPerSec), MBPerSec]) + end, Sizes), + ok. + +run_buffer_vs_channel_bench() -> + io:format("~n--- PyBuffer vs Channel Comparison ---~n"), + io:format("Pattern: Erlang write/send -> Python read/receive~n"), + io:format("Iterations: 1000~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +from erlang.channel import Channel + +def read_buffer_all(buf): + '''Read entire buffer.''' + return buf.read() + +def recv_channel(ch_ref): + '''Receive from channel.''' + ch = Channel(ch_ref) + return ch.try_receive() +">>), + + Sizes = [64, 256, 1024, 4096, 16384], + + io:format("~8s | ~14s | ~14s | ~8s~n", + ["Size", "Buffer (ops/s)", "Channel (ops/s)", "Ratio"]), + io:format("~s~n", [string:copies("-", 52)]), + + lists:foreach(fun(Size) -> + Data = binary:copy(<<0>>, Size), + Iterations = 1000, + + %% Benchmark PyBuffer: Erlang write -> Python read + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + {ok, Buf} = py_buffer:new(Size), + ok = py_buffer:write(Buf, Data), + ok = py_buffer:close(Buf), + {ok, _} = py:eval(Ctx, <<"read_buffer_all(buf)">>, #{<<"buf">> => Buf}) + end, lists:seq(1, Iterations)), + BufEnd = erlang:monotonic_time(microsecond), + BufTime = (BufEnd - BufStart) / 1000000, + BufOpsPerSec = Iterations / BufTime, + + %% Benchmark Channel: Erlang send -> Python receive + {ok, Ch} = py_channel:new(), + ChanStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Data), + {ok, _} = py:eval(Ctx, <<"recv_channel(ch)">>, #{<<"ch">> => Ch}) + end, lists:seq(1, Iterations)), + ChanEnd = erlang:monotonic_time(microsecond), + ChanTime = (ChanEnd - ChanStart) / 1000000, + ChanOpsPerSec = Iterations / ChanTime, + py_channel:close(Ch), + + Ratio = BufOpsPerSec / ChanOpsPerSec, + io:format("~8B | ~14w | ~14w | ~.2fx~n", + [Size, round(BufOpsPerSec), round(ChanOpsPerSec), Ratio]) + end, Sizes), + ok. + +run_streaming_bench() -> + io:format("~n--- Streaming Comparison (chunked transfer) ---~n"), + io:format("Total data: 1MB, varying chunk sizes~n~n"), + + Ctx = py:context(1), + ok = py:exec(Ctx, <<" +from erlang.channel import Channel + +def stream_read_buffer(buf): + '''Stream read entire buffer.''' + total = 0 + while True: + chunk = buf.read(8192) # 8KB reads + if not chunk: + break + total += len(chunk) + return total + +def stream_recv_channel(ch_ref, num_chunks): + '''Stream receive from channel.''' + ch = Channel(ch_ref) + total = 0 + for _ in range(num_chunks): + msg = ch.try_receive() + if msg is None: + break + total += len(msg) + return total +">>), + + ChunkSizes = [256, 1024, 4096, 16384, 65536], + TotalBytes = 1048576, % 1MB + + io:format("~10s | ~14s | ~14s | ~8s~n", + ["Chunk", "Buffer (MB/s)", "Channel (MB/s)", "Ratio"]), + io:format("~s~n", [string:copies("-", 54)]), + + lists:foreach(fun(ChunkSize) -> + NumChunks = TotalBytes div ChunkSize, + Chunk = binary:copy(<<0>>, ChunkSize), + + %% Benchmark PyBuffer streaming (Erlang write -> Python read) + {ok, Buf} = py_buffer:new(TotalBytes), + BufStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_buffer:write(Buf, Chunk) + end, lists:seq(1, NumChunks)), + ok = py_buffer:close(Buf), + {ok, _} = py:eval(Ctx, <<"stream_read_buffer(buf)">>, #{<<"buf">> => Buf}), + BufEnd = erlang:monotonic_time(microsecond), + BufTime = (BufEnd - BufStart) / 1000000, + BufMBPerSec = (TotalBytes / 1048576) / BufTime, + + %% Benchmark Channel streaming (Erlang send -> Python receive) + {ok, Ch} = py_channel:new(), + ChanStart = erlang:monotonic_time(microsecond), + lists:foreach(fun(_) -> + ok = py_channel:send(Ch, Chunk) + end, lists:seq(1, NumChunks)), + {ok, _} = py:eval(Ctx, <<"stream_recv_channel(ch, num_chunks)">>, + #{<<"ch">> => Ch, <<"num_chunks">> => NumChunks}), + ChanEnd = erlang:monotonic_time(microsecond), + ChanTime = (ChanEnd - ChanStart) / 1000000, + ChanMBPerSec = (TotalBytes / 1048576) / ChanTime, + py_channel:close(Ch), + + Ratio = BufMBPerSec / ChanMBPerSec, + io:format("~10B | ~14.2f | ~14.2f | ~.2fx~n", + [ChunkSize, BufMBPerSec, ChanMBPerSec, Ratio]) + end, ChunkSizes), + ok.