Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
231 changes: 231 additions & 0 deletions c_src/py_event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,237 @@ ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
return ATOM_OK;
}

/* ============================================================================
* Direct FD Operations
*
* These functions provide direct FD read/write for proxy/bridge use cases.
* ============================================================================ */

/**
* fd_read(Fd, Size) -> {ok, Data} | {error, Reason}
*
* Read up to Size bytes from a file descriptor.
*/
ERL_NIF_TERM nif_fd_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

int fd;
if (!enif_get_int(env, argv[0], &fd)) {
return make_error(env, "invalid_fd");
}

unsigned long size;
if (!enif_get_ulong(env, argv[1], &size)) {
return make_error(env, "invalid_size");
}

if (size > 1024 * 1024) {
size = 1024 * 1024;
}

ErlNifBinary bin;
if (!enif_alloc_binary(size, &bin)) {
return make_error(env, "alloc_failed");
}

ssize_t n = read(fd, bin.data, bin.size);
if (n < 0) {
enif_release_binary(&bin);
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return make_error(env, "eagain");
}
return make_error(env, strerror(errno));
}

if ((size_t)n < bin.size) {
enif_realloc_binary(&bin, n);
}

ERL_NIF_TERM data = enif_make_binary(env, &bin);
return enif_make_tuple2(env, ATOM_OK, data);
}

/**
* fd_write(Fd, Data) -> {ok, Written} | {error, Reason}
*
* Write data to a file descriptor.
*/
ERL_NIF_TERM nif_fd_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

int fd;
if (!enif_get_int(env, argv[0], &fd)) {
return make_error(env, "invalid_fd");
}

ErlNifBinary bin;
if (!enif_inspect_binary(env, argv[1], &bin)) {
return make_error(env, "invalid_data");
}

ssize_t n = write(fd, bin.data, bin.size);
if (n < 0) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
return make_error(env, "eagain");
}
return make_error(env, strerror(errno));
}

return enif_make_tuple2(env, ATOM_OK, enif_make_long(env, n));
}

/**
* fd_select_read(Fd) -> {ok, FdRef} | {error, Reason}
*
* Register FD for read selection. Caller receives {select, FdRef, Ref, ready_input}.
* Returns a resource reference that must be kept alive while monitoring.
*/
ERL_NIF_TERM nif_fd_select_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

int fd;
if (!enif_get_int(env, argv[0], &fd)) {
return make_error(env, "invalid_fd");
}

ErlNifPid caller_pid;
if (!enif_self(env, &caller_pid)) {
return make_error(env, "no_caller_pid");
}

/* Allocate fd resource */
fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE,
sizeof(fd_resource_t));
if (fd_res == NULL) {
return make_error(env, "alloc_failed");
}

fd_res->fd = fd;
fd_res->read_callback_id = 0;
fd_res->write_callback_id = 0;
fd_res->owner_pid = caller_pid;
fd_res->reader_active = true;
fd_res->writer_active = false;
fd_res->loop = NULL;
atomic_store(&fd_res->closing_state, FD_STATE_OPEN);
fd_res->monitor_active = false;
fd_res->owns_fd = false; /* Caller owns the fd */

int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_READ,
fd_res, &caller_pid, enif_make_ref(env));
if (ret < 0) {
enif_release_resource(fd_res);
return make_error(env, "select_failed");
}

ERL_NIF_TERM fd_term = enif_make_resource(env, fd_res);
enif_release_resource(fd_res); /* Term now holds the reference */

return enif_make_tuple2(env, ATOM_OK, fd_term);
}

/**
* fd_select_write(Fd) -> {ok, FdRef} | {error, Reason}
*
* Register FD for write selection. Caller receives {select, FdRef, Ref, ready_output}.
* Returns a resource reference that must be kept alive while monitoring.
*/
ERL_NIF_TERM nif_fd_select_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

int fd;
if (!enif_get_int(env, argv[0], &fd)) {
return make_error(env, "invalid_fd");
}

ErlNifPid caller_pid;
if (!enif_self(env, &caller_pid)) {
return make_error(env, "no_caller_pid");
}

/* Allocate fd resource */
fd_resource_t *fd_res = enif_alloc_resource(FD_RESOURCE_TYPE,
sizeof(fd_resource_t));
if (fd_res == NULL) {
return make_error(env, "alloc_failed");
}

fd_res->fd = fd;
fd_res->read_callback_id = 0;
fd_res->write_callback_id = 0;
fd_res->owner_pid = caller_pid;
fd_res->reader_active = false;
fd_res->writer_active = true;
fd_res->loop = NULL;
atomic_store(&fd_res->closing_state, FD_STATE_OPEN);
fd_res->monitor_active = false;
fd_res->owns_fd = false; /* Caller owns the fd */

int ret = enif_select(env, (ErlNifEvent)fd, ERL_NIF_SELECT_WRITE,
fd_res, &caller_pid, enif_make_ref(env));
if (ret < 0) {
enif_release_resource(fd_res);
return make_error(env, "select_failed");
}

ERL_NIF_TERM fd_term = enif_make_resource(env, fd_res);
enif_release_resource(fd_res); /* Term now holds the reference */

return enif_make_tuple2(env, ATOM_OK, fd_term);
}

/**
* socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason}
*
* Create a Unix socketpair for bidirectional communication.
*/
ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;
(void)argv;

int fds[2];
if (socketpair(AF_UNIX, SOCK_STREAM, 0, fds) < 0) {
return make_error(env, strerror(errno));
}

int flags1 = fcntl(fds[0], F_GETFL, 0);
int flags2 = fcntl(fds[1], F_GETFL, 0);
fcntl(fds[0], F_SETFL, flags1 | O_NONBLOCK);
fcntl(fds[1], F_SETFL, flags2 | O_NONBLOCK);

ERL_NIF_TERM fd_tuple = enif_make_tuple2(env,
enif_make_int(env, fds[0]),
enif_make_int(env, fds[1]));

return enif_make_tuple2(env, ATOM_OK, fd_tuple);
}

/**
* fd_close(Fd) -> ok | {error, Reason}
*
* Close a raw file descriptor (integer).
*/
ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]) {
(void)argc;

int fd;
if (!enif_get_int(env, argv[0], &fd)) {
return make_error(env, "invalid_fd");
}

if (close(fd) < 0) {
return make_error(env, strerror(errno));
}

return ATOM_OK;
}

/* ============================================================================
* Python Module: py_event_loop
*
Expand Down
52 changes: 52 additions & 0 deletions c_src/py_event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -867,4 +867,56 @@ ERL_NIF_TERM nif_reactor_init_connection(ErlNifEnv *env, int argc,
ERL_NIF_TERM nif_reactor_close_fd(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/* ============================================================================
* Direct FD Operations
* ============================================================================ */

/**
* @brief Read from file descriptor
*
* NIF: fd_read(Fd, Size) -> {ok, Data} | {error, Reason}
*/
ERL_NIF_TERM nif_fd_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Write to file descriptor
*
* NIF: fd_write(Fd, Data) -> {ok, Written} | {error, Reason}
*/
ERL_NIF_TERM nif_fd_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Register FD for read selection
*
* NIF: fd_select_read(Fd) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_fd_select_read(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Register FD for write selection
*
* NIF: fd_select_write(Fd) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_fd_select_write(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Create Unix socketpair
*
* NIF: socketpair() -> {ok, {Fd1, Fd2}} | {error, Reason}
*/
ERL_NIF_TERM nif_socketpair(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

/**
* @brief Close raw file descriptor (integer)
*
* NIF: fd_close(Fd) -> ok | {error, Reason}
*/
ERL_NIF_TERM nif_fd_close(ErlNifEnv *env, int argc,
const ERL_NIF_TERM argv[]);

#endif /* PY_EVENT_LOOP_H */
10 changes: 9 additions & 1 deletion c_src/py_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -3876,7 +3876,15 @@ static ErlNifFunc nif_funcs[] = {
{"reactor_on_read_ready", 2, nif_reactor_on_read_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"reactor_on_write_ready", 2, nif_reactor_on_write_ready, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"reactor_init_connection", 3, nif_reactor_init_connection, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"reactor_close_fd", 2, nif_reactor_close_fd, 0}
{"reactor_close_fd", 2, nif_reactor_close_fd, 0},

/* Direct FD operations */
{"fd_read", 2, nif_fd_read, 0},
{"fd_write", 2, nif_fd_write, 0},
{"fd_select_read", 1, nif_fd_select_read, 0},
{"fd_select_write", 1, nif_fd_select_write, 0},
{"fd_close", 1, nif_fd_close, 0},
{"socketpair", 0, nif_socketpair, 0}
};

ERL_NIF_INIT(py_nif, nif_funcs, load, NULL, upgrade, unload)
70 changes: 69 additions & 1 deletion src/py_nif.erl
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,14 @@
reactor_on_read_ready/2,
reactor_on_write_ready/2,
reactor_init_connection/3,
reactor_close_fd/2
reactor_close_fd/2,
%% Direct FD operations
fd_read/2,
fd_write/2,
fd_select_read/1,
fd_select_write/1,
fd_close/1,
socketpair/0
]).

-on_load(load_nif/0).
Expand Down Expand Up @@ -1530,3 +1537,64 @@ reactor_init_connection(_ContextRef, _Fd, _ClientInfo) ->
-spec reactor_close_fd(reference(), reference()) -> ok | {error, term()}.
reactor_close_fd(_ContextRef, _FdRef) ->
?NIF_STUB.

%%% ============================================================================
%%% Direct FD Operations
%%% ============================================================================

%% @doc Read up to Size bytes from a file descriptor.
%%
%% @param Fd File descriptor
%% @param Size Maximum bytes to read
%% @returns {ok, Data} | {error, Reason}
-spec fd_read(integer(), non_neg_integer()) -> {ok, binary()} | {error, term()}.
fd_read(_Fd, _Size) ->
?NIF_STUB.

%% @doc Write data to a file descriptor.
%%
%% @param Fd File descriptor
%% @param Data Binary data to write
%% @returns {ok, Written} | {error, Reason}
-spec fd_write(integer(), binary()) -> {ok, non_neg_integer()} | {error, term()}.
fd_write(_Fd, _Data) ->
?NIF_STUB.

%% @doc Register FD for read selection.
%%
%% Caller will receive {select, FdRef, Ref, ready_input} when readable.
%% The returned FdRef must be kept alive while monitoring.
%%
%% @param Fd File descriptor
%% @returns {ok, FdRef} | {error, Reason}
-spec fd_select_read(integer()) -> {ok, reference()} | {error, term()}.
fd_select_read(_Fd) ->
?NIF_STUB.

%% @doc Register FD for write selection.
%%
%% Caller will receive {select, FdRef, Ref, ready_output} when writable.
%% The returned FdRef must be kept alive while monitoring.
%%
%% @param Fd File descriptor
%% @returns {ok, FdRef} | {error, Reason}
-spec fd_select_write(integer()) -> {ok, reference()} | {error, term()}.
fd_select_write(_Fd) ->
?NIF_STUB.

%% @doc Close a raw file descriptor.
%%
%% @param Fd File descriptor to close
%% @returns ok | {error, Reason}
-spec fd_close(integer()) -> ok | {error, term()}.
fd_close(_Fd) ->
?NIF_STUB.

%% @doc Create a Unix socketpair for bidirectional communication.
%%
%% Both FDs are set to non-blocking mode.
%%
%% @returns {ok, {Fd1, Fd2}} | {error, Reason}
-spec socketpair() -> {ok, {integer(), integer()}} | {error, term()}.
socketpair() ->
?NIF_STUB.
Loading