diff --git a/c_src/py_event_loop.c b/c_src/py_event_loop.c index c4b4bb5..5b7480b 100644 --- a/c_src/py_event_loop.c +++ b/c_src/py_event_loop.c @@ -3337,6 +3337,237 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, return ATOM_OK; } +/* ============================================================================ + * Direct FD Operations + * + * These functions provide direct FD read/write for proxy/bridge use cases. + * ============================================================================ */ + +/** + * fd_read(Fd, Size) -> {ok, Data} | {error, Reason} + * + * Read up to Size bytes from a file descriptor. + */ +ERL_NIF_TERM nif_fd_read(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + int fd; + if (!enif_get_int(env, argv[0], &fd)) { + return make_error(env, "invalid_fd"); + } + + unsigned long size; + if (!enif_get_ulong(env, argv[1], &size)) { + return make_error(env, "invalid_size"); + } + + if (size > 1024 * 1024) { + size = 1024 * 1024; + } + + ErlNifBinary bin; + if (!enif_alloc_binary(size, &bin)) { + return make_error(env, "alloc_failed"); + } + + ssize_t n = read(fd, bin.data, bin.size); + if (n < 0) { + enif_release_binary(&bin); + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return make_error(env, "eagain"); + } + return make_error(env, strerror(errno)); + } + + if ((size_t)n < bin.size) { + enif_realloc_binary(&bin, n); + } + + ERL_NIF_TERM data = enif_make_binary(env, &bin); + return enif_make_tuple2(env, ATOM_OK, data); +} + +/** + * fd_write(Fd, Data) -> {ok, Written} | {error, Reason} + * + * Write data to a file descriptor. + */ +ERL_NIF_TERM nif_fd_write(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + int fd; + if (!enif_get_int(env, argv[0], &fd)) { + return make_error(env, "invalid_fd"); + } + + ErlNifBinary bin; + if (!enif_inspect_binary(env, argv[1], &bin)) { + return make_error(env, "invalid_data"); + } + + ssize_t n = write(fd, bin.data, bin.size); + if (n < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return make_error(env, "eagain"); + } + return make_error(env, strerror(errno)); + } + + return enif_make_tuple2(env, ATOM_OK, enif_make_long(env, n)); +} + +/** + * fd_select_read(Fd) -> {ok, FdRef} | {error, Reason} + * + * Register FD for read selection. Caller receives {select, FdRef, Ref, ready_input}. + * Returns a resource reference that must be kept alive while monitoring. + */ +ERL_NIF_TERM nif_fd_select_read(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + int fd; + if (!enif_get_int(env, argv[0], &fd)) { + return make_error(env, "invalid_fd"); + } + + ErlNifPid caller_pid; + if (!enif_self(env, &caller_pid)) { + return make_error(env, "no_caller_pid"); + } + + /* Allocate fd resource */ + fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, + sizeof(fd_resource_t)); + if (fd_res == NULL) { + return make_error(env, "alloc_failed"); + } + + fd_res->fd = fd; + fd_res->read_callback_id = 0; + fd_res->write_callback_id = 0; + fd_res->owner_pid = caller_pid; + fd_res->reader_active = true; + fd_res->writer_active = false; + fd_res->loop = NULL; + atomic_store(&fd_res->closing_state, FD_STATE_OPEN); + fd_res->monitor_active = false; + fd_res->owns_fd = false; /* Caller owns the fd */ + + int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_READ, + fd_res, &caller_pid, enif_make_ref(env)); + if (ret < 0) { + enif_release_resource(fd_res); + return make_error(env, "select_failed"); + } + + ERL_NIF_TERM fd_term = enif_make_resource(env, fd_res); + enif_release_resource(fd_res); /* Term now holds the reference */ + + return enif_make_tuple2(env, ATOM_OK, fd_term); +} + +/** + * fd_select_write(Fd) -> {ok, FdRef} | {error, Reason} + * + * Register FD for write selection. Caller receives {select, FdRef, Ref, ready_output}. + * Returns a resource reference that must be kept alive while monitoring. + */ +ERL_NIF_TERM nif_fd_select_write(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + int fd; + if (!enif_get_int(env, argv[0], &fd)) { + return make_error(env, "invalid_fd"); + } + + ErlNifPid caller_pid; + if (!enif_self(env, &caller_pid)) { + return make_error(env, "no_caller_pid"); + } + + /* Allocate fd resource */ + fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE, + sizeof(fd_resource_t)); + if (fd_res == NULL) { + return make_error(env, "alloc_failed"); + } + + fd_res->fd = fd; + fd_res->read_callback_id = 0; + fd_res->write_callback_id = 0; + fd_res->owner_pid = caller_pid; + fd_res->reader_active = false; + fd_res->writer_active = true; + fd_res->loop = NULL; + atomic_store(&fd_res->closing_state, FD_STATE_OPEN); + fd_res->monitor_active = false; + fd_res->owns_fd = false; /* Caller owns the fd */ + + int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE, + fd_res, &caller_pid, enif_make_ref(env)); + if (ret < 0) { + enif_release_resource(fd_res); + return make_error(env, "select_failed"); + } + + ERL_NIF_TERM fd_term = enif_make_resource(env, fd_res); + enif_release_resource(fd_res); /* Term now holds the reference */ + + return enif_make_tuple2(env, ATOM_OK, fd_term); +} + +/** + * socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason} + * + * Create a Unix socketpair for bidirectional communication. + */ +ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + (void)argv; + + int fds[2]; + if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) { + return make_error(env, strerror(errno)); + } + + int flags1 = fcntl(fds[0], F_GETFL, 0); + int flags2 = fcntl(fds[1], F_GETFL, 0); + fcntl(fds[0], F_SETFL, flags1 | O_NONBLOCK); + fcntl(fds[1], F_SETFL, flags2 | O_NONBLOCK); + + ERL_NIF_TERM fd_tuple = enif_make_tuple2(env, + enif_make_int(env, fds[0]), + enif_make_int(env, fds[1])); + + return enif_make_tuple2(env, ATOM_OK, fd_tuple); +} + +/** + * fd_close(Fd) -> ok | {error, Reason} + * + * Close a raw file descriptor (integer). + */ +ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]) { + (void)argc; + + int fd; + if (!enif_get_int(env, argv[0], &fd)) { + return make_error(env, "invalid_fd"); + } + + if (close(fd) < 0) { + return make_error(env, strerror(errno)); + } + + return ATOM_OK; +} + /* ============================================================================ * Python Module: py_event_loop * diff --git a/c_src/py_event_loop.h b/c_src/py_event_loop.h index c13231b..083031d 100644 --- a/c_src/py_event_loop.h +++ b/c_src/py_event_loop.h @@ -867,4 +867,56 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc, ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]); +/* ============================================================================ + * Direct FD Operations + * ============================================================================ */ + +/** + * @brief Read from file descriptor + * + * NIF: fd_read(Fd, Size) -> {ok, Data} | {error, Reason} + */ +ERL_NIF_TERM nif_fd_read(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Write to file descriptor + * + * NIF: fd_write(Fd, Data) -> {ok, Written} | {error, Reason} + */ +ERL_NIF_TERM nif_fd_write(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Register FD for read selection + * + * NIF: fd_select_read(Fd) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_fd_select_read(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Register FD for write selection + * + * NIF: fd_select_write(Fd) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_fd_select_write(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Create Unix socketpair + * + * NIF: socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason} + */ +ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + +/** + * @brief Close raw file descriptor (integer) + * + * NIF: fd_close(Fd) -> ok | {error, Reason} + */ +ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc, + const ERL_NIF_TERM argv[]); + #endif /* PY_EVENT_LOOP_H */ diff --git a/c_src/py_nif.c b/c_src/py_nif.c index 1649a20..e5c7ae8 100644 --- a/c_src/py_nif.c +++ b/c_src/py_nif.c @@ -3876,7 +3876,15 @@ static ErlNifFunc nif_funcs[] = { {"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND}, {"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND}, - {"reactor_close_fd", 2, nif_reactor_close_fd, 0} + {"reactor_close_fd", 2, nif_reactor_close_fd, 0}, + + /* Direct FD operations */ + {"fd_read", 2, nif_fd_read, 0}, + {"fd_write", 2, nif_fd_write, 0}, + {"fd_select_read", 1, nif_fd_select_read, 0}, + {"fd_select_write", 1, nif_fd_select_write, 0}, + {"fd_close", 1, nif_fd_close, 0}, + {"socketpair", 0, nif_socketpair, 0} }; ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload) diff --git a/src/py_nif.erl b/src/py_nif.erl index d3facaf..56d6be6 100644 --- a/src/py_nif.erl +++ b/src/py_nif.erl @@ -188,7 +188,14 @@ reactor_on_read_ready/2, reactor_on_write_ready/2, reactor_init_connection/3, - reactor_close_fd/2 + reactor_close_fd/2, + %% Direct FD operations + fd_read/2, + fd_write/2, + fd_select_read/1, + fd_select_write/1, + fd_close/1, + socketpair/0 ]). -on_load(load_nif/0). @@ -1530,3 +1537,64 @@ reactor_init_connection(_ContextRef, _Fd, _ClientInfo) -> -spec reactor_close_fd(reference(), reference()) -> ok | {error, term()}. reactor_close_fd(_ContextRef, _FdRef) -> ?NIF_STUB. + +%%% ============================================================================ +%%% Direct FD Operations +%%% ============================================================================ + +%% @doc Read up to Size bytes from a file descriptor. +%% +%% @param Fd File descriptor +%% @param Size Maximum bytes to read +%% @returns {ok, Data} | {error, Reason} +-spec fd_read(integer(), non_neg_integer()) -> {ok, binary()} | {error, term()}. +fd_read(_Fd, _Size) -> + ?NIF_STUB. + +%% @doc Write data to a file descriptor. +%% +%% @param Fd File descriptor +%% @param Data Binary data to write +%% @returns {ok, Written} | {error, Reason} +-spec fd_write(integer(), binary()) -> {ok, non_neg_integer()} | {error, term()}. +fd_write(_Fd, _Data) -> + ?NIF_STUB. + +%% @doc Register FD for read selection. +%% +%% Caller will receive {select, FdRef, Ref, ready_input} when readable. +%% The returned FdRef must be kept alive while monitoring. +%% +%% @param Fd File descriptor +%% @returns {ok, FdRef} | {error, Reason} +-spec fd_select_read(integer()) -> {ok, reference()} | {error, term()}. +fd_select_read(_Fd) -> + ?NIF_STUB. + +%% @doc Register FD for write selection. +%% +%% Caller will receive {select, FdRef, Ref, ready_output} when writable. +%% The returned FdRef must be kept alive while monitoring. +%% +%% @param Fd File descriptor +%% @returns {ok, FdRef} | {error, Reason} +-spec fd_select_write(integer()) -> {ok, reference()} | {error, term()}. +fd_select_write(_Fd) -> + ?NIF_STUB. + +%% @doc Close a raw file descriptor. +%% +%% @param Fd File descriptor to close +%% @returns ok | {error, Reason} +-spec fd_close(integer()) -> ok | {error, term()}. +fd_close(_Fd) -> + ?NIF_STUB. + +%% @doc Create a Unix socketpair for bidirectional communication. +%% +%% Both FDs are set to non-blocking mode. +%% +%% @returns {ok, {Fd1, Fd2}} | {error, Reason} +-spec socketpair() -> {ok, {integer(), integer()}} | {error, term()}. +socketpair() -> + ?NIF_STUB. diff --git a/test/py_fd_ops_SUITE.erl b/test/py_fd_ops_SUITE.erl new file mode 100644 index 0000000..c4b1873 --- /dev/null +++ b/test/py_fd_ops_SUITE.erl @@ -0,0 +1,144 @@ +%% Copyright 2026 Benoit Chesneau +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. + +%%% @doc Test suite for direct FD operations (fd_read, fd_write, fd_close, socketpair). +-module(py_fd_ops_SUITE). + +-include_lib("common_test/include/ct.hrl"). + +-export([ + all/0, + init_per_suite/1, + end_per_suite/1 +]). + +-export([ + socketpair_test/1, + fd_read_write_test/1, + fd_close_test/1, + fd_select_test/1 +]). + +all() -> + [ + socketpair_test, + fd_read_write_test, + fd_close_test, + fd_select_test + ]. + +init_per_suite(Config) -> + application:ensure_all_started(erlang_python), + Config. + +end_per_suite(_Config) -> + ok. + +%%% ============================================================================ +%%% Test Cases +%%% ============================================================================ + +%% @doc Test socketpair creation +socketpair_test(_Config) -> + {ok, {Fd1, Fd2}} = py_nif:socketpair(), + true = is_integer(Fd1), + true = is_integer(Fd2), + true = Fd1 >= 0, + true = Fd2 >= 0, + true = Fd1 =/= Fd2, + %% Clean up + ok = py_nif:fd_close(Fd1), + ok = py_nif:fd_close(Fd2), + ok. + +%% @doc Test fd_read and fd_write +fd_read_write_test(_Config) -> + {ok, {Fd1, Fd2}} = py_nif:socketpair(), + + %% Write to Fd1, read from Fd2 + TestData = <<"hello world">>, + {ok, Written} = py_nif:fd_write(Fd1, TestData), + Written = byte_size(TestData), + + %% Small delay to ensure data is available + timer:sleep(10), + + {ok, ReadData} = py_nif:fd_read(Fd2, 1024), + TestData = ReadData, + + %% Write to Fd2, read from Fd1 + TestData2 = <<"response data">>, + {ok, Written2} = py_nif:fd_write(Fd2, TestData2), + Written2 = byte_size(TestData2), + + timer:sleep(10), + + {ok, ReadData2} = py_nif:fd_read(Fd1, 1024), + TestData2 = ReadData2, + + %% Clean up + ok = py_nif:fd_close(Fd1), + ok = py_nif:fd_close(Fd2), + ok. + +%% @doc Test fd_close +fd_close_test(_Config) -> + {ok, {Fd1, Fd2}} = py_nif:socketpair(), + + %% Close should succeed + ok = py_nif:fd_close(Fd1), + ok = py_nif:fd_close(Fd2), + + %% Reading from closed fd should fail + {error, _} = py_nif:fd_read(Fd1, 1024), + ok. + +%% @doc Test fd_select_read and fd_select_write +fd_select_test(_Config) -> + {ok, {Fd1, Fd2}} = py_nif:socketpair(), + + %% Register for read on Fd2 - returns {ok, FdRef} + {ok, ReadRef} = py_nif:fd_select_read(Fd2), + true = is_reference(ReadRef), + + %% Write data to Fd1 (should trigger read ready on Fd2) + TestData = <<"select test">>, + {ok, _} = py_nif:fd_write(Fd1, TestData), + + %% Should receive select message with the FdRef + receive + {select, ReadRef, _Ref, ready_input} -> + ok + after 1000 -> + ct:fail(select_timeout) + end, + + %% Read the data + {ok, ReadData} = py_nif:fd_read(Fd2, 1024), + TestData = ReadData, + + %% Test write select - sockets are usually immediately writable + {ok, WriteRef} = py_nif:fd_select_write(Fd1), + true = is_reference(WriteRef), + receive + {select, WriteRef, _Ref2, ready_output} -> + ok + after 1000 -> + ct:fail(write_select_timeout) + end, + + %% Clean up + ok = py_nif:fd_close(Fd1), + ok = py_nif:fd_close(Fd2), + ok.