-
Notifications
You must be signed in to change notification settings - Fork 0
Implement aborting read/write #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
05545b9
978e50d
d231f8f
8ea6960
314c2e6
ab70d94
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,125 @@ | ||
| #include "abort_registry.hpp" | ||
|
|
||
| #include <array> | ||
| #include <fcntl.h> | ||
| #include <mutex> | ||
| #include <unistd.h> | ||
| #include <unordered_map> | ||
|
|
||
| namespace cpp_bindings_linux::detail | ||
| { | ||
| namespace | ||
| { | ||
| auto makePipeNonBlockingCloexec(std::array<int, 2> &out_pipe) -> bool | ||
| { | ||
| if (::pipe2(out_pipe.data(), O_NONBLOCK | O_CLOEXEC) == 0) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| if (::pipe(out_pipe.data()) != 0) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| for (int index = 0; index < 2; ++index) | ||
| { | ||
| const auto pipe_index = static_cast<size_t>(index); | ||
| const int flags = ::fcntl(out_pipe[pipe_index], F_GETFL, 0); | ||
| if (flags >= 0) | ||
| { | ||
| (void)::fcntl(out_pipe[pipe_index], F_SETFL, flags | O_NONBLOCK); | ||
| } | ||
| const int fd_flags = ::fcntl(out_pipe[pipe_index], F_GETFD, 0); | ||
| if (fd_flags >= 0) | ||
| { | ||
| (void)::fcntl(out_pipe[pipe_index], F_SETFD, fd_flags | FD_CLOEXEC); | ||
| } | ||
| } | ||
| return true; | ||
| } | ||
|
|
||
| auto closeIfValid(int fd) -> void | ||
| { | ||
| if (fd >= 0) | ||
| { | ||
| (void)::close(fd); | ||
| } | ||
| } | ||
|
|
||
| std::mutex g_abort_mutex; | ||
| std::unordered_map<int, AbortPipes> g_abort_by_fd; | ||
| } // namespace | ||
|
|
||
| auto registerAbortPipesForFd(int fd) -> bool | ||
| { | ||
| if (fd < 0) | ||
| { | ||
| return false; | ||
| } | ||
|
|
||
| std::lock_guard<std::mutex> lock(g_abort_mutex); | ||
| if (g_abort_by_fd.contains(fd)) | ||
| { | ||
| return true; | ||
| } | ||
|
|
||
| std::array<int, 2> read_pipe = {-1, -1}; | ||
| std::array<int, 2> write_pipe = {-1, -1}; | ||
| if (!makePipeNonBlockingCloexec(read_pipe) || !makePipeNonBlockingCloexec(write_pipe)) | ||
| { | ||
| closeIfValid(read_pipe[0]); | ||
| closeIfValid(read_pipe[1]); | ||
| closeIfValid(write_pipe[0]); | ||
| closeIfValid(write_pipe[1]); | ||
| return false; | ||
| } | ||
|
|
||
| AbortPipes pipes; | ||
| pipes.read_abort_r = read_pipe[0]; | ||
| pipes.read_abort_w = read_pipe[1]; | ||
| pipes.write_abort_r = write_pipe[0]; | ||
| pipes.write_abort_w = write_pipe[1]; | ||
|
|
||
| g_abort_by_fd.emplace(fd, pipes); | ||
| return true; | ||
| } | ||
|
|
||
| auto unregisterAbortPipesForFd(int fd) -> void | ||
| { | ||
| if (fd < 0) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| std::lock_guard<std::mutex> lock(g_abort_mutex); | ||
| auto iter = g_abort_by_fd.find(fd); | ||
| if (iter == g_abort_by_fd.end()) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| closeIfValid(iter->second.read_abort_r); | ||
| closeIfValid(iter->second.read_abort_w); | ||
| closeIfValid(iter->second.write_abort_r); | ||
| closeIfValid(iter->second.write_abort_w); | ||
|
|
||
| g_abort_by_fd.erase(iter); | ||
| } | ||
|
|
||
| auto getAbortPipesForFd(int fd) -> std::optional<AbortPipes> | ||
| { | ||
| if (fd < 0) | ||
| { | ||
| return std::nullopt; | ||
| } | ||
|
|
||
| std::lock_guard<std::mutex> lock(g_abort_mutex); | ||
| auto iter = g_abort_by_fd.find(fd); | ||
| if (iter == g_abort_by_fd.end()) | ||
| { | ||
| return std::nullopt; | ||
| } | ||
| return iter->second; | ||
| } | ||
| } // namespace cpp_bindings_linux::detail |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,24 @@ | ||
| #pragma once | ||
|
|
||
| #include <optional> | ||
|
|
||
| namespace cpp_bindings_linux::detail | ||
| { | ||
| struct AbortPipes | ||
| { | ||
| int read_abort_r = -1; | ||
| int read_abort_w = -1; | ||
| int write_abort_r = -1; | ||
| int write_abort_w = -1; | ||
| }; | ||
|
|
||
| // Create (if needed) and register abort pipes for a serial FD. | ||
| // Returns true on success. | ||
| auto registerAbortPipesForFd(int fd) -> bool; | ||
|
|
||
| // Unregister and close any abort pipes for a serial FD. Safe to call multiple times. | ||
| auto unregisterAbortPipesForFd(int fd) -> void; | ||
|
|
||
| // Returns the abort pipes for a serial FD, or std::nullopt if none exist. | ||
| auto getAbortPipesForFd(int fd) -> std::optional<AbortPipes>; | ||
| } // namespace cpp_bindings_linux::detail |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,6 +2,7 @@ | |
|
|
||
| #include <cpp_core/status_codes.h> | ||
|
|
||
| #include <array> | ||
| #include <cerrno> | ||
| #include <poll.h> | ||
| #include <string> | ||
|
|
@@ -95,6 +96,62 @@ inline auto failErrno(Callback error_callback, cpp_core::StatusCodes code) -> Re | |
| return static_cast<Ret>(code); | ||
| } | ||
|
|
||
| // Drain any pending bytes from a non-blocking FD (used for abort pipes). | ||
| inline auto drainNonBlockingFd(int file_descriptor) -> void | ||
| { | ||
| if (file_descriptor < 0) | ||
| { | ||
| return; | ||
| } | ||
| std::array<unsigned char, 64> buf = {}; | ||
| for (;;) | ||
| { | ||
| const ssize_t num_read = ::read(file_descriptor, buf.data(), buf.size()); | ||
| if (num_read > 0) | ||
| { | ||
| continue; | ||
| } | ||
| if (num_read == 0) | ||
| { | ||
| return; | ||
| } | ||
| if (errno == EAGAIN || errno == EWOULDBLOCK) | ||
| { | ||
| return; | ||
| } | ||
| if (errno == EINTR) | ||
| { | ||
| continue; | ||
| } | ||
| return; | ||
| } | ||
| } | ||
|
|
||
| // Returns true if the abort FD is signaled (has at least one byte to read). If true, drains it. | ||
| inline auto consumeAbortIfSet(int abort_fd) -> bool | ||
| { | ||
| if (abort_fd < 0) | ||
| { | ||
| return false; | ||
| } | ||
| struct pollfd poll_fd = {}; | ||
| poll_fd.fd = abort_fd; | ||
| poll_fd.events = POLLIN; | ||
| poll_fd.revents = 0; | ||
|
|
||
| const int poll_result = poll(&poll_fd, 1, 0); | ||
| if (poll_result <= 0) | ||
| { | ||
| return false; | ||
| } | ||
| if ((poll_fd.revents & POLLIN) == 0) | ||
| { | ||
| return false; | ||
| } | ||
| drainNonBlockingFd(abort_fd); | ||
| return true; | ||
| } | ||
|
|
||
| // Poll helper used by read/write to implement timeouts. | ||
| // Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready. | ||
| inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> int | ||
|
|
@@ -123,4 +180,48 @@ inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> i | |
| } | ||
| return 0; | ||
| } | ||
|
|
||
| // Poll helper with an optional abort FD. | ||
| // Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready, 2 on abort. | ||
| inline auto waitFdReadyOrAbort(int file_descriptor, int abort_fd, int timeout_ms, bool for_read) -> int | ||
| { | ||
| std::array<struct pollfd, 2> fds = {}; | ||
| fds[0].fd = file_descriptor; | ||
| fds[0].events = for_read ? POLLIN : POLLOUT; | ||
| fds[0].revents = 0; | ||
|
|
||
| nfds_t nfds = 1; | ||
| if (abort_fd >= 0) | ||
| { | ||
| fds[1].fd = abort_fd; | ||
| fds[1].events = POLLIN; | ||
| fds[1].revents = 0; | ||
| nfds = 2; | ||
| } | ||
|
|
||
| const int poll_result = poll(fds.data(), nfds, timeout_ms); | ||
| if (poll_result < 0) | ||
| { | ||
| return -1; | ||
| } | ||
| if (poll_result == 0) | ||
| { | ||
| return 0; | ||
| } | ||
|
|
||
| if (abort_fd >= 0 && ((fds[1].revents & POLLIN) != 0)) | ||
| { | ||
| return 2; | ||
| } | ||
|
|
||
| if (for_read && ((fds[0].revents & POLLIN) != 0)) | ||
| { | ||
| return 1; | ||
| } | ||
| if (!for_read && ((fds[0].revents & POLLOUT) != 0)) | ||
| { | ||
| return 1; | ||
| } | ||
| return 0; | ||
|
Comment on lines
+212
to
+225
|
||
| } | ||
| } // namespace cpp_bindings_linux::detail | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| #include <cpp_core/interface/serial_abort_read.h> | ||
| #include <cpp_core/status_codes.h> | ||
|
|
||
| #include "detail/abort_registry.hpp" | ||
| #include "detail/posix_helpers.hpp" | ||
|
|
||
| #include <cerrno> | ||
| #include <limits> | ||
| #include <unistd.h> | ||
|
|
||
| extern "C" | ||
| { | ||
| MODULE_API auto serialAbortRead(int64_t handle, ErrorCallbackT error_callback) -> int | ||
| { | ||
| if (handle <= 0 || handle > std::numeric_limits<int>::max()) | ||
| { | ||
| return cpp_bindings_linux::detail::failMsg<int>(error_callback, cpp_core::StatusCodes::kInvalidHandleError, | ||
| "Invalid handle"); | ||
| } | ||
|
|
||
| const int fd = static_cast<int>(handle); | ||
| const auto pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); | ||
| if (!pipes) | ||
| { | ||
| return cpp_bindings_linux::detail::failMsg<int>(error_callback, cpp_core::StatusCodes::kInvalidHandleError, | ||
| "Invalid handle"); | ||
| } | ||
|
|
||
| const unsigned char token = 1; | ||
| for (;;) | ||
| { | ||
| const ssize_t num_written = ::write(pipes->read_abort_w, &token, 1); | ||
| if (num_written == 1) | ||
| { | ||
| return static_cast<int>(cpp_core::StatusCodes::kSuccess); | ||
| } | ||
| if (num_written < 0 && errno == EINTR) | ||
| { | ||
| continue; | ||
| } | ||
| // Pipe already full -> treat as success (abort already requested). | ||
| if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) | ||
| { | ||
| return static_cast<int>(cpp_core::StatusCodes::kSuccess); | ||
| } | ||
| return cpp_bindings_linux::detail::failErrno<int>(error_callback, cpp_core::StatusCodes::kAbortReadError); | ||
| } | ||
| } | ||
| } // extern "C" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,49 @@ | ||
| #include <cpp_core/interface/serial_abort_write.h> | ||
| #include <cpp_core/status_codes.h> | ||
|
|
||
| #include "detail/abort_registry.hpp" | ||
| #include "detail/posix_helpers.hpp" | ||
|
|
||
| #include <cerrno> | ||
| #include <limits> | ||
| #include <unistd.h> | ||
|
|
||
| extern "C" | ||
| { | ||
| MODULE_API auto serialAbortWrite(int64_t handle, ErrorCallbackT error_callback) -> int | ||
| { | ||
| if (handle <= 0 || handle > std::numeric_limits<int>::max()) | ||
| { | ||
| return cpp_bindings_linux::detail::failMsg<int>(error_callback, cpp_core::StatusCodes::kInvalidHandleError, | ||
| "Invalid handle"); | ||
| } | ||
|
|
||
| const int fd = static_cast<int>(handle); | ||
| const auto pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); | ||
| if (!pipes) | ||
| { | ||
| return cpp_bindings_linux::detail::failMsg<int>(error_callback, cpp_core::StatusCodes::kInvalidHandleError, | ||
| "Invalid handle"); | ||
| } | ||
|
|
||
| const unsigned char token = 1; | ||
| for (;;) | ||
| { | ||
| const ssize_t num_written = ::write(pipes->write_abort_w, &token, 1); | ||
| if (num_written == 1) | ||
| { | ||
| return static_cast<int>(cpp_core::StatusCodes::kSuccess); | ||
| } | ||
| if (num_written < 0 && errno == EINTR) | ||
| { | ||
| continue; | ||
| } | ||
| // Pipe already full -> treat as success (abort already requested). | ||
| if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) | ||
| { | ||
| return static_cast<int>(cpp_core::StatusCodes::kSuccess); | ||
| } | ||
| return cpp_bindings_linux::detail::failErrno<int>(error_callback, cpp_core::StatusCodes::kAbortWriteError); | ||
| } | ||
| } | ||
| } // extern "C" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The function comment should clarify that this function is specifically designed for draining abort pipes and does not invoke error callbacks. It also silently ignores errors other than EAGAIN/EWOULDBLOCK/EINTR, which is intentional for cleanup operations but should be documented to prevent confusion.