Skip to content

Commit a494b14

Browse files
authored
Merge pull request #17 from benoitc/feature/zero-copy-reactor-buffer
Add zero-copy ReactorBuffer for reactor read path
2 parents 79e30b0 + d8e1a5a commit a494b14

File tree

12 files changed

+1983
-14
lines changed

12 files changed

+1983
-14
lines changed

c_src/py_event_loop.c

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
#include "py_nif.h"
3838
#include "py_event_loop.h"
39+
#include "py_reactor_buffer.h"
3940

4041
/* ============================================================================
4142
* Global State
@@ -3065,7 +3066,8 @@ ERL_NIF_TERM nif_get_fd_from_resource(ErlNifEnv *env, int argc,
30653066
/**
30663067
* reactor_on_read_ready(ContextRef, Fd) -> {ok, Action} | {error, Reason}
30673068
*
3068-
* Call Python's erlang_reactor.on_read_ready(fd) and return the action.
3069+
* Read data from fd and call Python's erlang_reactor.on_read_ready(fd, data).
3070+
* Uses zero-copy ReactorBuffer for efficient data transfer.
30693071
* Action is one of: <<"continue">>, <<"write_pending">>, <<"close">>
30703072
*
30713073
* This is a dirty NIF since it acquires the GIL and calls Python.
@@ -3084,24 +3086,54 @@ ERL_NIF_TERM nif_reactor_on_read_ready(ErlNifEnv *env, int argc,
30843086
return make_error(env, "invalid_fd");
30853087
}
30863088

3089+
/* Read data from fd into buffer resource (before acquiring GIL) */
3090+
reactor_buffer_resource_t *buffer = NULL;
3091+
size_t bytes_read = 0;
3092+
int read_result = reactor_buffer_read_fd(fd, REACTOR_MAX_READ_SIZE,
3093+
&buffer, &bytes_read);
3094+
3095+
if (read_result < 0) {
3096+
return make_error(env, "read_failed");
3097+
}
3098+
3099+
if (read_result == 1 || (read_result == 0 && buffer == NULL)) {
3100+
/* EOF or would block with no data */
3101+
return enif_make_tuple2(env, ATOM_OK,
3102+
enif_make_atom(env, read_result == 1 ? "close" : "continue"));
3103+
}
3104+
30873105
/* Acquire context (handles both worker mode and subinterpreter mode) */
30883106
py_context_guard_t guard = py_context_acquire(ctx);
30893107
if (!guard.acquired) {
3108+
enif_release_resource(buffer);
30903109
return make_error(env, "acquire_failed");
30913110
}
30923111

3112+
/* Create ReactorBuffer Python object wrapping the resource */
3113+
PyObject *py_buffer = ReactorBuffer_from_resource(buffer, buffer);
3114+
/* Release our reference - Python now owns the only reference */
3115+
enif_release_resource(buffer);
3116+
3117+
if (py_buffer == NULL) {
3118+
PyErr_Clear();
3119+
py_context_release(&guard);
3120+
return make_error(env, "buffer_creation_failed");
3121+
}
3122+
30933123
/* Import erlang.reactor module */
30943124
PyObject *reactor_module = PyImport_ImportModule("erlang.reactor");
30953125
if (reactor_module == NULL) {
30963126
PyErr_Clear();
3127+
Py_DECREF(py_buffer);
30973128
py_context_release(&guard);
30983129
return make_error(env, "import_erlang_reactor_failed");
30993130
}
31003131

3101-
/* Call on_read_ready(fd) */
3132+
/* Call on_read_ready(fd, data) with the buffer */
31023133
PyObject *result = PyObject_CallMethod(reactor_module, "on_read_ready",
3103-
"i", fd);
3134+
"iO", fd, py_buffer);
31043135
Py_DECREF(reactor_module);
3136+
Py_DECREF(py_buffer);
31053137

31063138
if (result == NULL) {
31073139
PyErr_Clear();

c_src/py_nif.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ static ERL_NIF_TERM build_suspended_result(ErlNifEnv *env, suspended_state_t *su
169169
#include "py_worker_pool.c"
170170
#include "py_subinterp_pool.c"
171171
#include "py_subinterp_thread.c"
172+
#include "py_reactor_buffer.c"
172173

173174
/* ============================================================================
174175
* Resource callbacks
@@ -634,6 +635,20 @@ static ERL_NIF_TERM nif_py_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM arg
634635
return make_error(env, "wsgi_scope_init_failed");
635636
}
636637

638+
/* Initialize ReactorBuffer Python type for zero-copy read handling */
639+
if (ReactorBuffer_init_type() < 0) {
640+
Py_Finalize();
641+
atomic_store(&g_runtime_state, PY_STATE_STOPPED);
642+
return make_error(env, "reactor_buffer_init_failed");
643+
}
644+
645+
/* Register ReactorBuffer type with erlang module for testing access */
646+
if (ReactorBuffer_register_with_reactor() < 0) {
647+
Py_Finalize();
648+
atomic_store(&g_runtime_state, PY_STATE_STOPPED);
649+
return make_error(env, "reactor_buffer_register_failed");
650+
}
651+
637652
/* Create a default event loop so Python asyncio always has one available */
638653
if (create_default_event_loop(env) < 0) {
639654
Py_Finalize();
@@ -3648,6 +3663,12 @@ static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
36483663
lazy_headers_resource_dtor,
36493664
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
36503665

3666+
/* Reactor buffer resource type for zero-copy read handling */
3667+
REACTOR_BUFFER_RESOURCE_TYPE = enif_open_resource_type(
3668+
env, NULL, "reactor_buffer",
3669+
reactor_buffer_resource_dtor,
3670+
ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);
3671+
36513672
/* Initialize event loop module */
36523673
if (event_loop_init(env) < 0) {
36533674
return -1;

0 commit comments

Comments
 (0)