From 05545b9c0061df5db4b1c082a09686ba6b287396 Mon Sep 17 00:00:00 2001 From: Katze719 Date: Tue, 16 Dec 2025 23:44:26 +0100 Subject: [PATCH 1/6] Add serial abort functionality and related infrastructure - Introduced `serialAbortRead` and `serialAbortWrite` functions to handle abort requests for serial operations. - Implemented `abort_registry` to manage abort pipes for serial file descriptors, including functions to register, unregister, and retrieve abort pipes. - Updated existing serial read and write functions to support abort functionality using the new abort pipes. - Enhanced `serial_close` and `serial_open` to manage abort pipes during the lifecycle of serial connections. - Added utility functions for non-blocking read operations from abort pipes to ensure responsiveness during serial communication. --- src/detail/abort_registry.cpp | 126 ++++++++++++++++++++++++++++++++++ src/detail/abort_registry.hpp | 24 +++++++ src/detail/posix_helpers.hpp | 75 ++++++++++++++++++++ src/serial_abort_read.cpp | 49 +++++++++++++ src/serial_abort_write.cpp | 49 +++++++++++++ src/serial_close.cpp | 2 + src/serial_open.cpp | 7 ++ src/serial_read.cpp | 25 ++++++- src/serial_write.cpp | 13 ++-- 9 files changed, 363 insertions(+), 7 deletions(-) create mode 100644 src/detail/abort_registry.cpp create mode 100644 src/detail/abort_registry.hpp create mode 100644 src/serial_abort_read.cpp create mode 100644 src/serial_abort_write.cpp diff --git a/src/detail/abort_registry.cpp b/src/detail/abort_registry.cpp new file mode 100644 index 0000000..370c671 --- /dev/null +++ b/src/detail/abort_registry.cpp @@ -0,0 +1,126 @@ +#include "abort_registry.hpp" + +#include +#include +#include +#include + +namespace cpp_bindings_linux::detail +{ +namespace +{ +auto makePipeNonBlockingCloexec(int out_pipe[2]) -> bool +{ +#ifdef __linux__ + // pipe2 is Linux-specific, which is fine for this bindings library. + if (::pipe2(out_pipe, O_NONBLOCK | O_CLOEXEC) == 0) + { + return true; + } +#endif + + if (::pipe(out_pipe) != 0) + { + return false; + } + + for (int i = 0; i < 2; ++i) + { + const int flags = ::fcntl(out_pipe[i], F_GETFL, 0); + if (flags >= 0) + { + (void)::fcntl(out_pipe[i], F_SETFL, flags | O_NONBLOCK); + } + const int fd_flags = ::fcntl(out_pipe[i], F_GETFD, 0); + if (fd_flags >= 0) + { + (void)::fcntl(out_pipe[i], F_SETFD, fd_flags | FD_CLOEXEC); + } + } + return true; +} + +auto closeIfValid(int fd) -> void +{ + if (fd >= 0) + { + (void)::close(fd); + } +} + +std::mutex g_abort_mu; +std::unordered_map g_abort_by_fd; +} // namespace + +auto registerAbortPipesForFd(int fd) -> bool +{ + if (fd < 0) + { + return false; + } + + std::lock_guard lock(g_abort_mu); + if (g_abort_by_fd.contains(fd)) + { + return true; + } + + int read_pipe[2] = {-1, -1}; + int write_pipe[2] = {-1, -1}; + if (!makePipeNonBlockingCloexec(read_pipe) || !makePipeNonBlockingCloexec(write_pipe)) + { + closeIfValid(read_pipe[0]); + closeIfValid(read_pipe[1]); + closeIfValid(write_pipe[0]); + closeIfValid(write_pipe[1]); + return false; + } + + AbortPipes pipes; + pipes.read_abort_r = read_pipe[0]; + pipes.read_abort_w = read_pipe[1]; + pipes.write_abort_r = write_pipe[0]; + pipes.write_abort_w = write_pipe[1]; + + g_abort_by_fd.emplace(fd, pipes); + return true; +} + +auto unregisterAbortPipesForFd(int fd) -> void +{ + if (fd < 0) + { + return; + } + + std::lock_guard lock(g_abort_mu); + auto it = g_abort_by_fd.find(fd); + if (it == g_abort_by_fd.end()) + { + return; + } + + closeIfValid(it->second.read_abort_r); + closeIfValid(it->second.read_abort_w); + closeIfValid(it->second.write_abort_r); + closeIfValid(it->second.write_abort_w); + + g_abort_by_fd.erase(it); +} + +auto getAbortPipesForFd(int fd) -> std::optional +{ + if (fd < 0) + { + return std::nullopt; + } + + std::lock_guard lock(g_abort_mu); + auto it = g_abort_by_fd.find(fd); + if (it == g_abort_by_fd.end()) + { + return std::nullopt; + } + return it->second; +} +} // namespace cpp_bindings_linux::detail diff --git a/src/detail/abort_registry.hpp b/src/detail/abort_registry.hpp new file mode 100644 index 0000000..688fef0 --- /dev/null +++ b/src/detail/abort_registry.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace cpp_bindings_linux::detail +{ +struct AbortPipes +{ + int read_abort_r = -1; + int read_abort_w = -1; + int write_abort_r = -1; + int write_abort_w = -1; +}; + +// Create (if needed) and register abort pipes for a serial FD. +// Returns true on success. +auto registerAbortPipesForFd(int fd) -> bool; + +// Unregister and close any abort pipes for a serial FD. Safe to call multiple times. +auto unregisterAbortPipesForFd(int fd) -> void; + +// Returns the abort pipes for a serial FD, or std::nullopt if none exist. +auto getAbortPipesForFd(int fd) -> std::optional; +} // namespace cpp_bindings_linux::detail diff --git a/src/detail/posix_helpers.hpp b/src/detail/posix_helpers.hpp index 8a78f6a..bdaa714 100644 --- a/src/detail/posix_helpers.hpp +++ b/src/detail/posix_helpers.hpp @@ -95,6 +95,37 @@ inline auto failErrno(Callback error_callback, cpp_core::StatusCodes code) -> Re return static_cast(code); } +// Drain any pending bytes from a non-blocking FD (used for abort pipes). +inline auto drainNonBlockingFd(int file_descriptor) -> void +{ + if (file_descriptor < 0) + { + return; + } + unsigned char buf[64]; + for (;;) + { + const ssize_t n = ::read(file_descriptor, buf, sizeof(buf)); + if (n > 0) + { + continue; + } + if (n == 0) + { + return; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + return; + } + if (errno == EINTR) + { + continue; + } + return; + } +} + // Poll helper used by read/write to implement timeouts. // Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready. inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> int @@ -123,4 +154,48 @@ inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> i } return 0; } + +// Poll helper with an optional abort FD. +// Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready, 2 on abort. +inline auto waitFdReadyOrAbort(int file_descriptor, int abort_fd, int timeout_ms, bool for_read) -> int +{ + struct pollfd fds[2] = {}; + fds[0].fd = file_descriptor; + fds[0].events = for_read ? POLLIN : POLLOUT; + fds[0].revents = 0; + + nfds_t nfds = 1; + if (abort_fd >= 0) + { + fds[1].fd = abort_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; + nfds = 2; + } + + const int poll_result = poll(fds, nfds, timeout_ms); + if (poll_result < 0) + { + return -1; + } + if (poll_result == 0) + { + return 0; + } + + if (abort_fd >= 0 && ((fds[1].revents & POLLIN) != 0)) + { + return 2; + } + + if (for_read && ((fds[0].revents & POLLIN) != 0)) + { + return 1; + } + if (!for_read && ((fds[0].revents & POLLOUT) != 0)) + { + return 1; + } + return 0; +} } // namespace cpp_bindings_linux::detail diff --git a/src/serial_abort_read.cpp b/src/serial_abort_read.cpp new file mode 100644 index 0000000..89d6879 --- /dev/null +++ b/src/serial_abort_read.cpp @@ -0,0 +1,49 @@ +#include +#include + +#include "detail/abort_registry.hpp" +#include "detail/posix_helpers.hpp" + +#include +#include +#include + +extern "C" +{ + MODULE_API auto serialAbortRead(int64_t handle, ErrorCallbackT error_callback) -> int + { + if (handle <= 0 || handle > std::numeric_limits::max()) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const int fd = static_cast(handle); + const auto pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + if (!pipes) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const unsigned char token = 1; + for (;;) + { + const ssize_t n = ::write(pipes->read_abort_w, &token, 1); + if (n == 1) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + if (n < 0 && errno == EINTR) + { + continue; + } + // Pipe already full -> treat as success (abort already requested). + if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kAbortReadError); + } + } +} // extern "C" diff --git a/src/serial_abort_write.cpp b/src/serial_abort_write.cpp new file mode 100644 index 0000000..e85a77d --- /dev/null +++ b/src/serial_abort_write.cpp @@ -0,0 +1,49 @@ +#include +#include + +#include "detail/abort_registry.hpp" +#include "detail/posix_helpers.hpp" + +#include +#include +#include + +extern "C" +{ + MODULE_API auto serialAbortWrite(int64_t handle, ErrorCallbackT error_callback) -> int + { + if (handle <= 0 || handle > std::numeric_limits::max()) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const int fd = static_cast(handle); + const auto pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + if (!pipes) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const unsigned char token = 1; + for (;;) + { + const ssize_t n = ::write(pipes->write_abort_w, &token, 1); + if (n == 1) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + if (n < 0 && errno == EINTR) + { + continue; + } + // Pipe already full -> treat as success (abort already requested). + if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kAbortWriteError); + } + } +} // extern "C" diff --git a/src/serial_close.cpp b/src/serial_close.cpp index 6ef9cea..f21eab1 100644 --- a/src/serial_close.cpp +++ b/src/serial_close.cpp @@ -1,6 +1,7 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include @@ -22,6 +23,7 @@ extern "C" } const int fd = static_cast(handle); + cpp_bindings_linux::detail::unregisterAbortPipesForFd(fd); if (close(fd) != 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kCloseHandleError); diff --git a/src/serial_open.cpp b/src/serial_open.cpp index e2ddd13..91406b1 100644 --- a/src/serial_open.cpp +++ b/src/serial_open.cpp @@ -1,6 +1,7 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include @@ -154,6 +155,12 @@ extern "C" tcflush(handle.get(), TCIOFLUSH); + if (!cpp_bindings_linux::detail::registerAbortPipesForFd(handle.get())) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kSetStateError, + "Failed to initialize abort pipes"); + } + // Note: Some devices (e.g., Arduino) reset when the serial port is opened. // It is recommended to wait 1-2 seconds after opening before sending data // to allow the device to initialize. diff --git a/src/serial_read.cpp b/src/serial_read.cpp index a883b80..3d31360 100644 --- a/src/serial_read.cpp +++ b/src/serial_read.cpp @@ -1,6 +1,7 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include @@ -27,11 +28,19 @@ extern "C" const int fd = static_cast(handle); auto *buf = static_cast(buffer); - const int ready = cpp_bindings_linux::detail::waitFdReady(fd, timeout_ms, true); + const auto abort_pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + const int abort_fd = abort_pipes ? abort_pipes->read_abort_r : -1; + + const int ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, timeout_ms, true); if (ready < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); } + if (ready == 2) + { + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortReadError); + } if (ready == 0) { return 0; @@ -55,11 +64,16 @@ extern "C" // Some drivers can report readiness but still return 0; give it a tiny grace period and retry once. if (bytes_read == 0) { - const int retry_ready = cpp_bindings_linux::detail::waitFdReady(fd, 10, true); + const int retry_ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, 10, true); if (retry_ready < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); } + if (retry_ready == 2) + { + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortReadError); + } if (retry_ready == 0) { return 0; @@ -74,11 +88,16 @@ extern "C" int total_read = static_cast(bytes_read); while (total_read < buffer_size) { - const int loop_ready = cpp_bindings_linux::detail::waitFdReady(fd, 0, true); + const int loop_ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, 0, true); if (loop_ready < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); } + if (loop_ready == 2) + { + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortReadError); + } if (loop_ready == 0) { break; diff --git a/src/serial_write.cpp b/src/serial_write.cpp index abd7b10..c368fb2 100644 --- a/src/serial_write.cpp +++ b/src/serial_write.cpp @@ -1,12 +1,12 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include #include #include -#include #include extern "C" @@ -28,18 +28,25 @@ extern "C" } const int fd = static_cast(handle); + const auto abort_pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + const int abort_fd = abort_pipes ? abort_pipes->write_abort_r : -1; ssize_t bytes_written = ::write(fd, buffer, buffer_size); if (bytes_written < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { - const int ready = cpp_bindings_linux::detail::waitFdReady(fd, timeout_ms, false); + const int ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, timeout_ms, false); if (ready < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kWriteError); } + if (ready == 2) + { + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortWriteError); + } if (ready > 0) { bytes_written = ::write(fd, buffer, buffer_size); @@ -56,8 +63,6 @@ extern "C" } } - tcdrain(fd); - return static_cast(bytes_written); } From 978e50d5793815e7221abe6deb1064e01648a6cc Mon Sep 17 00:00:00 2001 From: Katze719 Date: Tue, 16 Dec 2025 23:47:21 +0100 Subject: [PATCH 2/6] Refactor serial read and write functions for improved error handling and non-blocking operations - Updated `serial_abort_read.cpp` and `serial_abort_write.cpp` to use `num_written` for clarity in write operations. - Enhanced `serial_read.cpp` with new non-blocking read utility functions and improved error handling for read operations. - Introduced `readUntilNotReady` and `tryReadOnceNonBlocking` to streamline the reading process and handle abort scenarios more effectively. - Refactored `abort_registry.cpp` to utilize `std::array` for pipe management and improved mutex naming for better readability. --- src/detail/abort_registry.cpp | 50 +++++++------ src/serial_abort_read.cpp | 8 +- src/serial_abort_write.cpp | 8 +- src/serial_read.cpp | 137 +++++++++++++++++++--------------- 4 files changed, 111 insertions(+), 92 deletions(-) diff --git a/src/detail/abort_registry.cpp b/src/detail/abort_registry.cpp index 370c671..f192230 100644 --- a/src/detail/abort_registry.cpp +++ b/src/detail/abort_registry.cpp @@ -1,5 +1,6 @@ #include "abort_registry.hpp" +#include #include #include #include @@ -9,32 +10,33 @@ namespace cpp_bindings_linux::detail { namespace { -auto makePipeNonBlockingCloexec(int out_pipe[2]) -> bool +auto makePipeNonBlockingCloexec(std::array &out_pipe) -> bool { #ifdef __linux__ // pipe2 is Linux-specific, which is fine for this bindings library. - if (::pipe2(out_pipe, O_NONBLOCK | O_CLOEXEC) == 0) + if (::pipe2(out_pipe.data(), O_NONBLOCK | O_CLOEXEC) == 0) { return true; } #endif - if (::pipe(out_pipe) != 0) + if (::pipe(out_pipe.data()) != 0) { return false; } - for (int i = 0; i < 2; ++i) + for (int index = 0; index < 2; ++index) { - const int flags = ::fcntl(out_pipe[i], F_GETFL, 0); + const auto pipe_index = static_cast(index); + const int flags = ::fcntl(out_pipe[pipe_index], F_GETFL, 0); if (flags >= 0) { - (void)::fcntl(out_pipe[i], F_SETFL, flags | O_NONBLOCK); + (void)::fcntl(out_pipe[pipe_index], F_SETFL, flags | O_NONBLOCK); } - const int fd_flags = ::fcntl(out_pipe[i], F_GETFD, 0); + const int fd_flags = ::fcntl(out_pipe[pipe_index], F_GETFD, 0); if (fd_flags >= 0) { - (void)::fcntl(out_pipe[i], F_SETFD, fd_flags | FD_CLOEXEC); + (void)::fcntl(out_pipe[pipe_index], F_SETFD, fd_flags | FD_CLOEXEC); } } return true; @@ -48,7 +50,7 @@ auto closeIfValid(int fd) -> void } } -std::mutex g_abort_mu; +std::mutex g_abort_mutex; std::unordered_map g_abort_by_fd; } // namespace @@ -59,14 +61,14 @@ auto registerAbortPipesForFd(int fd) -> bool return false; } - std::lock_guard lock(g_abort_mu); + std::lock_guard lock(g_abort_mutex); if (g_abort_by_fd.contains(fd)) { return true; } - int read_pipe[2] = {-1, -1}; - int write_pipe[2] = {-1, -1}; + std::array read_pipe = {-1, -1}; + std::array write_pipe = {-1, -1}; if (!makePipeNonBlockingCloexec(read_pipe) || !makePipeNonBlockingCloexec(write_pipe)) { closeIfValid(read_pipe[0]); @@ -93,19 +95,19 @@ auto unregisterAbortPipesForFd(int fd) -> void return; } - std::lock_guard lock(g_abort_mu); - auto it = g_abort_by_fd.find(fd); - if (it == g_abort_by_fd.end()) + std::lock_guard lock(g_abort_mutex); + auto iter = g_abort_by_fd.find(fd); + if (iter == g_abort_by_fd.end()) { return; } - closeIfValid(it->second.read_abort_r); - closeIfValid(it->second.read_abort_w); - closeIfValid(it->second.write_abort_r); - closeIfValid(it->second.write_abort_w); + closeIfValid(iter->second.read_abort_r); + closeIfValid(iter->second.read_abort_w); + closeIfValid(iter->second.write_abort_r); + closeIfValid(iter->second.write_abort_w); - g_abort_by_fd.erase(it); + g_abort_by_fd.erase(iter); } auto getAbortPipesForFd(int fd) -> std::optional @@ -115,12 +117,12 @@ auto getAbortPipesForFd(int fd) -> std::optional return std::nullopt; } - std::lock_guard lock(g_abort_mu); - auto it = g_abort_by_fd.find(fd); - if (it == g_abort_by_fd.end()) + std::lock_guard lock(g_abort_mutex); + auto iter = g_abort_by_fd.find(fd); + if (iter == g_abort_by_fd.end()) { return std::nullopt; } - return it->second; + return iter->second; } } // namespace cpp_bindings_linux::detail diff --git a/src/serial_abort_read.cpp b/src/serial_abort_read.cpp index 89d6879..08e2ca7 100644 --- a/src/serial_abort_read.cpp +++ b/src/serial_abort_read.cpp @@ -29,17 +29,17 @@ extern "C" const unsigned char token = 1; for (;;) { - const ssize_t n = ::write(pipes->read_abort_w, &token, 1); - if (n == 1) + const ssize_t num_written = ::write(pipes->read_abort_w, &token, 1); + if (num_written == 1) { return static_cast(cpp_core::StatusCodes::kSuccess); } - if (n < 0 && errno == EINTR) + if (num_written < 0 && errno == EINTR) { continue; } // Pipe already full -> treat as success (abort already requested). - if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { return static_cast(cpp_core::StatusCodes::kSuccess); } diff --git a/src/serial_abort_write.cpp b/src/serial_abort_write.cpp index e85a77d..5e5d279 100644 --- a/src/serial_abort_write.cpp +++ b/src/serial_abort_write.cpp @@ -29,17 +29,17 @@ extern "C" const unsigned char token = 1; for (;;) { - const ssize_t n = ::write(pipes->write_abort_w, &token, 1); - if (n == 1) + const ssize_t num_written = ::write(pipes->write_abort_w, &token, 1); + if (num_written == 1) { return static_cast(cpp_core::StatusCodes::kSuccess); } - if (n < 0 && errno == EINTR) + if (num_written < 0 && errno == EINTR) { continue; } // Pipe already full -> treat as success (abort already requested). - if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { return static_cast(cpp_core::StatusCodes::kSuccess); } diff --git a/src/serial_read.cpp b/src/serial_read.cpp index 3d31360..dfab698 100644 --- a/src/serial_read.cpp +++ b/src/serial_read.cpp @@ -6,8 +6,74 @@ #include #include +#include #include +namespace +{ +constexpr int kGraceRetryTimeoutMs = 10; + +auto tryReadOnceNonBlocking(int fd, unsigned char *dst, int size) -> ssize_t +{ + const ssize_t bytes = ::read(fd, dst, size); + if (bytes < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return 0; + } + return bytes; +} + +auto handleWaitResultForRead(int wait_result, int abort_fd, ErrorCallbackT error_callback) -> std::optional +{ + if (wait_result < 0) + { + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); + } + if (wait_result == 0) + { + return 0; + } + if (wait_result == 2) + { + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortReadError); + } + return std::nullopt; +} + +auto readUntilNotReady(int fd, int abort_fd, unsigned char *buf, int buffer_size, int already_read, + ErrorCallbackT error_callback) -> int +{ + int total_read = already_read; + while (total_read < buffer_size) + { + const int wait_result = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, 0, true); + if (const auto immediate = handleWaitResultForRead(wait_result, abort_fd, error_callback); + immediate.has_value()) + { + // timeout -> return what we have; abort/error handled by helper + if (*immediate == 0) + { + return total_read; + } + return *immediate; + } + + const ssize_t more_bytes = tryReadOnceNonBlocking(fd, buf + total_read, buffer_size - total_read); + if (more_bytes < 0) + { + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); + } + if (more_bytes == 0) + { + return total_read; + } + total_read += static_cast(more_bytes); + } + return total_read; +} +} // namespace + extern "C" { MODULE_API auto serialRead(int64_t handle, void *buffer, int buffer_size, int timeout_ms, int /*multiplier*/, @@ -32,30 +98,12 @@ extern "C" const int abort_fd = abort_pipes ? abort_pipes->read_abort_r : -1; const int ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, timeout_ms, true); - if (ready < 0) - { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); - } - if (ready == 2) + if (const auto immediate = handleWaitResultForRead(ready, abort_fd, error_callback); immediate.has_value()) { - cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); - return static_cast(cpp_core::StatusCodes::kAbortReadError); + return *immediate; } - if (ready == 0) - { - return 0; - } - - const auto try_read_once = [&](unsigned char *dst, int size) -> ssize_t { - const ssize_t bytes = ::read(fd, dst, size); - if (bytes < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) - { - return 0; - } - return bytes; - }; - ssize_t bytes_read = try_read_once(buf, buffer_size); + ssize_t bytes_read = tryReadOnceNonBlocking(fd, buf, buffer_size); if (bytes_read < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); @@ -64,53 +112,22 @@ extern "C" // Some drivers can report readiness but still return 0; give it a tiny grace period and retry once. if (bytes_read == 0) { - const int retry_ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, 10, true); - if (retry_ready < 0) + const int retry_ready = + cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, kGraceRetryTimeoutMs, true); + if (const auto immediate = handleWaitResultForRead(retry_ready, abort_fd, error_callback); + immediate.has_value()) { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); + return *immediate; } - if (retry_ready == 2) - { - cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); - return static_cast(cpp_core::StatusCodes::kAbortReadError); - } - if (retry_ready == 0) - { - return 0; - } - bytes_read = try_read_once(buf, buffer_size); + + bytes_read = tryReadOnceNonBlocking(fd, buf, buffer_size); if (bytes_read <= 0) { return 0; } } - int total_read = static_cast(bytes_read); - while (total_read < buffer_size) - { - const int loop_ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, 0, true); - if (loop_ready < 0) - { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); - } - if (loop_ready == 2) - { - cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); - return static_cast(cpp_core::StatusCodes::kAbortReadError); - } - if (loop_ready == 0) - { - break; - } - const ssize_t more_bytes = try_read_once(buf + total_read, buffer_size - total_read); - if (more_bytes <= 0) - { - break; - } - total_read += static_cast(more_bytes); - } - - return total_read; + return readUntilNotReady(fd, abort_fd, buf, buffer_size, static_cast(bytes_read), error_callback); } } // extern "C" From d231f8f1dd766d9cc3cdf08f646bb8aa8613ff42 Mon Sep 17 00:00:00 2001 From: Katze719 Date: Tue, 16 Dec 2025 23:54:59 +0100 Subject: [PATCH 3/6] Add serial abort functionality and update test workflows - Introduced `serialAbortRead` and `serialAbortWrite` functions to handle abort requests for serial operations. - Updated the GitHub workflows for C++ and Deno tests to include the `SERIAL_TEST_SKIP_INIT_DELAY` environment variable. - Enhanced integration tests to assert the existence of new abort functions. - Added a new test file `test_abort.cpp` to validate the behavior of aborting blocking read and write operations in a multi-threaded context. - Modified existing tests to accommodate the new abort functionality and ensure proper error handling during serial operations. --- .github/workflows/cpp-tests.yml | 1 + .github/workflows/deno-tests.yml | 1 + integration_tests/ffi_bindings.ts | 8 ++ integration_tests/integration_test.ts | 2 + tests/test_abort.cpp | 150 ++++++++++++++++++++++++++ tests/test_serial_arduino.cpp | 27 ++++- 6 files changed, 188 insertions(+), 1 deletion(-) create mode 100644 tests/test_abort.cpp diff --git a/.github/workflows/cpp-tests.yml b/.github/workflows/cpp-tests.yml index cefe01e..67a10e9 100644 --- a/.github/workflows/cpp-tests.yml +++ b/.github/workflows/cpp-tests.yml @@ -15,6 +15,7 @@ jobs: env: SERIAL_TEST_PORT: /tmp/ttyCI_A + SERIAL_TEST_SKIP_INIT_DELAY: "1" steps: - name: Checkout repository diff --git a/.github/workflows/deno-tests.yml b/.github/workflows/deno-tests.yml index 983701b..40a356c 100644 --- a/.github/workflows/deno-tests.yml +++ b/.github/workflows/deno-tests.yml @@ -47,6 +47,7 @@ jobs: working-directory: integration_tests env: SERIAL_TEST_PORT: /tmp/ttyCI_A + SERIAL_TEST_SKIP_INIT_DELAY: "1" run: | deno task test diff --git a/integration_tests/ffi_bindings.ts b/integration_tests/ffi_bindings.ts index 6060b81..9c16f46 100644 --- a/integration_tests/ffi_bindings.ts +++ b/integration_tests/ffi_bindings.ts @@ -23,6 +23,14 @@ const symbols = { parameters: ["i64", "pointer", "i32", "i32", "i32", "pointer"] as const, result: "i32" as const, }, + serialAbortRead: { + parameters: ["i64", "pointer"] as const, + result: "i32" as const, + }, + serialAbortWrite: { + parameters: ["i64", "pointer"] as const, + result: "i32" as const, + }, }; /** diff --git a/integration_tests/integration_test.ts b/integration_tests/integration_test.ts index 806235f..d5c1a12 100644 --- a/integration_tests/integration_test.ts +++ b/integration_tests/integration_test.ts @@ -23,6 +23,8 @@ Deno.test({ assertExists(lib.serialClose); assertExists(lib.serialRead); assertExists(lib.serialWrite); + assertExists(lib.serialAbortRead); + assertExists(lib.serialAbortWrite); console.log("cpp-bindings-linux library loaded and symbols resolved"); }, diff --git a/tests/test_abort.cpp b/tests/test_abort.cpp new file mode 100644 index 0000000..8d6f324 --- /dev/null +++ b/tests/test_abort.cpp @@ -0,0 +1,150 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../src/detail/abort_registry.hpp" + +#include +#include +#include +#include + +#include +#include +#include + +namespace +{ +class UniqueFd +{ + public: + explicit UniqueFd(int fd) : fd_(fd) + { + } + UniqueFd(const UniqueFd &) = delete; + auto operator=(const UniqueFd &) -> UniqueFd & = delete; + UniqueFd(UniqueFd &&other) noexcept : fd_(other.fd_) + { + other.fd_ = -1; + } + auto operator=(UniqueFd &&other) noexcept -> UniqueFd & + { + if (this != &other) + { + reset(other.release()); + } + return *this; + } + ~UniqueFd() + { + reset(-1); + } + + [[nodiscard]] auto get() const -> int + { + return fd_; + } + [[nodiscard]] auto release() -> int + { + const int out = fd_; + fd_ = -1; + return out; + } + auto reset(int new_fd) -> void + { + if (fd_ >= 0) + { + (void)::close(fd_); + } + fd_ = new_fd; + } + + private: + int fd_ = -1; +}; + +auto fillNonBlockingFd(int fd) -> void +{ + const char buf[4096] = {}; + for (;;) + { + const ssize_t num_written = ::write(fd, buf, sizeof(buf)); + if (num_written > 0) + { + continue; + } + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return; + } + if (num_written < 0 && errno == EINTR) + { + continue; + } + return; + } +} +} // namespace + +TEST(AbortReadTest, AbortsBlockingReadFromOtherThread) +{ + int master_fd = -1; + int slave_fd = -1; + char slave_name[128] = {}; + ASSERT_EQ(::openpty(&master_fd, &slave_fd, slave_name, nullptr, nullptr), 0); + UniqueFd master(master_fd); + UniqueFd slave(slave_fd); + + // Use the path with serialOpen so we also test that abort pipes are registered on open(). + intptr_t handle = serialOpen(static_cast(slave_name), 115200, 8, 0, 0, nullptr); + ASSERT_GT(handle, 0); + + std::atomic read_result{999}; + std::thread reader([&] { + unsigned char buffer[16] = {}; + // Long timeout to ensure we block in poll() until abort happens. + read_result.store(serialRead(handle, buffer, sizeof(buffer), 10'000, 1, nullptr)); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(serialAbortRead(handle, nullptr), 0); + + reader.join(); + EXPECT_EQ(read_result.load(), static_cast(cpp_core::StatusCodes::kAbortReadError)); + + EXPECT_EQ(serialClose(handle, nullptr), 0); +} + +TEST(AbortWriteTest, AbortsBlockingWriteFromOtherThread) +{ + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(::pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC), 0); + UniqueFd read_end(pipe_fds[0]); + UniqueFd write_end(pipe_fds[1]); + + // Register abort pipes since we bypass serialOpen() here. + ASSERT_TRUE(cpp_bindings_linux::detail::registerAbortPipesForFd(write_end.get())); + + // Fill the pipe buffer so future writes hit EAGAIN and serialWrite blocks in poll(). + fillNonBlockingFd(write_end.get()); + + const char payload[4096] = {}; + std::atomic write_result{999}; + std::thread writer([&] { + write_result.store( + serialWrite(write_end.get(), payload, static_cast(sizeof(payload)), 10'000, 1, nullptr)); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(serialAbortWrite(write_end.get(), nullptr), 0); + + writer.join(); + EXPECT_EQ(write_result.load(), static_cast(cpp_core::StatusCodes::kAbortWriteError)); + + cpp_bindings_linux::detail::unregisterAbortPipesForFd(write_end.get()); +} diff --git a/tests/test_serial_arduino.cpp b/tests/test_serial_arduino.cpp index 6772c4a..fc74c0a 100644 --- a/tests/test_serial_arduino.cpp +++ b/tests/test_serial_arduino.cpp @@ -1,5 +1,6 @@ // Test for serial communication with Arduino echo script on /dev/ttyUSB0 +#include #include #include #include @@ -8,7 +9,12 @@ #include #include +#include +#include +#include #include +#include +#include class SerialArduinoTest : public ::testing::Test { @@ -25,7 +31,11 @@ class SerialArduinoTest : public ::testing::Test << "'. Set SERIAL_TEST_PORT or connect Arduino on /dev/ttyUSB0."; } - usleep(2000000); + const char *skip_delay = std::getenv("SERIAL_TEST_SKIP_INIT_DELAY"); + if (skip_delay == nullptr || std::strcmp(skip_delay, "1") != 0) + { + usleep(2000000); + } } void TearDown() override @@ -92,6 +102,21 @@ TEST_F(SerialArduinoTest, ReadTimeout) EXPECT_GE(read_bytes, 0) << "Timeout should return 0, not error"; } +TEST_F(SerialArduinoTest, AbortRead) +{ + std::atomic read_result{999}; + std::thread reader([&] { + unsigned char buffer[16] = {}; + read_result.store(serialRead(handle_, buffer, sizeof(buffer), 10000, 1, nullptr)); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(serialAbortRead(handle_, nullptr), 0); + + reader.join(); + EXPECT_EQ(read_result.load(), static_cast(cpp_core::StatusCodes::kAbortReadError)); +} + TEST(SerialInvalidHandleTest, InvalidHandleRead) { char buffer[256]; From 8ea696072a585a0209172197acea098ebda2f89e Mon Sep 17 00:00:00 2001 From: Katze719 Date: Tue, 16 Dec 2025 23:56:13 +0100 Subject: [PATCH 4/6] Refactor pipe creation in abort_registry for cross-platform compatibility - Removed Linux-specific code for pipe creation in `abort_registry.cpp`. - Simplified the implementation to use a standard pipe creation method, enhancing portability across different platforms. --- src/detail/abort_registry.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/detail/abort_registry.cpp b/src/detail/abort_registry.cpp index f192230..2ff8b7b 100644 --- a/src/detail/abort_registry.cpp +++ b/src/detail/abort_registry.cpp @@ -12,13 +12,10 @@ namespace { auto makePipeNonBlockingCloexec(std::array &out_pipe) -> bool { -#ifdef __linux__ - // pipe2 is Linux-specific, which is fine for this bindings library. if (::pipe2(out_pipe.data(), O_NONBLOCK | O_CLOEXEC) == 0) { return true; } -#endif if (::pipe(out_pipe.data()) != 0) { From 314c2e68a7da995b01f449f18a16fad116989f41 Mon Sep 17 00:00:00 2001 From: Katze719 Date: Wed, 17 Dec 2025 19:43:04 +0100 Subject: [PATCH 5/6] Remove SERIAL_TEST_SKIP_INIT_DELAY from CI workflows and refactor serialWrite function for improved timeout handling - Eliminated the `SERIAL_TEST_SKIP_INIT_DELAY` environment variable from both C++ and Deno test workflows. - Refactored the `serialWrite` function to enhance timeout management, allowing for a multiplier effect on subsequent byte writes. - Updated error handling and added support for abort scenarios during write operations. - Improved test cases to reflect changes in the serial communication logic and ensure robust functionality. --- .github/workflows/cpp-tests.yml | 1 - .github/workflows/deno-tests.yml | 1 - src/detail/posix_helpers.hpp | 38 ++++++-- src/serial_write.cpp | 62 +++++++++---- tests/test_abort.cpp | 1 - tests/test_serial_arduino.cpp | 150 ++++++++++++++++++++++++------- 6 files changed, 196 insertions(+), 57 deletions(-) diff --git a/.github/workflows/cpp-tests.yml b/.github/workflows/cpp-tests.yml index 67a10e9..cefe01e 100644 --- a/.github/workflows/cpp-tests.yml +++ b/.github/workflows/cpp-tests.yml @@ -15,7 +15,6 @@ jobs: env: SERIAL_TEST_PORT: /tmp/ttyCI_A - SERIAL_TEST_SKIP_INIT_DELAY: "1" steps: - name: Checkout repository diff --git a/.github/workflows/deno-tests.yml b/.github/workflows/deno-tests.yml index 40a356c..983701b 100644 --- a/.github/workflows/deno-tests.yml +++ b/.github/workflows/deno-tests.yml @@ -47,7 +47,6 @@ jobs: working-directory: integration_tests env: SERIAL_TEST_PORT: /tmp/ttyCI_A - SERIAL_TEST_SKIP_INIT_DELAY: "1" run: | deno task test diff --git a/src/detail/posix_helpers.hpp b/src/detail/posix_helpers.hpp index bdaa714..a0dc280 100644 --- a/src/detail/posix_helpers.hpp +++ b/src/detail/posix_helpers.hpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -102,15 +103,15 @@ inline auto drainNonBlockingFd(int file_descriptor) -> void { return; } - unsigned char buf[64]; + std::array buf = {}; for (;;) { - const ssize_t n = ::read(file_descriptor, buf, sizeof(buf)); - if (n > 0) + const ssize_t num_read = ::read(file_descriptor, buf.data(), buf.size()); + if (num_read > 0) { continue; } - if (n == 0) + if (num_read == 0) { return; } @@ -126,6 +127,31 @@ inline auto drainNonBlockingFd(int file_descriptor) -> void } } +// Returns true if the abort FD is signaled (has at least one byte to read). If true, drains it. +inline auto consumeAbortIfSet(int abort_fd) -> bool +{ + if (abort_fd < 0) + { + return false; + } + struct pollfd poll_fd = {}; + poll_fd.fd = abort_fd; + poll_fd.events = POLLIN; + poll_fd.revents = 0; + + const int poll_result = poll(&poll_fd, 1, 0); + if (poll_result <= 0) + { + return false; + } + if ((poll_fd.revents & POLLIN) == 0) + { + return false; + } + drainNonBlockingFd(abort_fd); + return true; +} + // Poll helper used by read/write to implement timeouts. // Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready. inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> int @@ -159,7 +185,7 @@ inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> i // Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready, 2 on abort. inline auto waitFdReadyOrAbort(int file_descriptor, int abort_fd, int timeout_ms, bool for_read) -> int { - struct pollfd fds[2] = {}; + std::array fds = {}; fds[0].fd = file_descriptor; fds[0].events = for_read ? POLLIN : POLLOUT; fds[0].revents = 0; @@ -173,7 +199,7 @@ inline auto waitFdReadyOrAbort(int file_descriptor, int abort_fd, int timeout_ms nfds = 2; } - const int poll_result = poll(fds, nfds, timeout_ms); + const int poll_result = poll(fds.data(), nfds, timeout_ms); if (poll_result < 0) { return -1; diff --git a/src/serial_write.cpp b/src/serial_write.cpp index c368fb2..840bf01 100644 --- a/src/serial_write.cpp +++ b/src/serial_write.cpp @@ -5,14 +5,14 @@ #include "detail/posix_helpers.hpp" #include -#include #include #include extern "C" { - MODULE_API auto serialWrite(int64_t handle, const void *buffer, int buffer_size, int timeout_ms, int /*multiplier*/, + // NOLINTNEXTLINE(readability-function-cognitive-complexity) + MODULE_API auto serialWrite(int64_t handle, const void *buffer, int buffer_size, int timeout_ms, int multiplier, ErrorCallbackT error_callback) -> int { if (buffer == nullptr || buffer_size <= 0) @@ -31,12 +31,45 @@ extern "C" const auto abort_pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); const int abort_fd = abort_pipes ? abort_pipes->write_abort_r : -1; - ssize_t bytes_written = ::write(fd, buffer, buffer_size); - if (bytes_written < 0) + const auto *src = static_cast(buffer); + int total_written = 0; + + // Mirror cpp_core semantics: timeout_ms applies to the first byte, + // then timeout_ms * multiplier applies to subsequent bytes. + int current_timeout_ms = timeout_ms; + + while (total_written < buffer_size) { - if (errno == EAGAIN || errno == EWOULDBLOCK) + // Abort should also cancel writers that never hit EAGAIN/poll. + if (cpp_bindings_linux::detail::consumeAbortIfSet(abort_fd)) + { + return static_cast(cpp_core::StatusCodes::kAbortWriteError); + } + + const ssize_t num_written = ::write(fd, src + total_written, buffer_size - total_written); + if (num_written > 0) { - const int ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, timeout_ms, false); + total_written += static_cast(num_written); + + // If multiplier is 0, return immediately after the first successful write. + if (multiplier == 0) + { + return total_written; + } + // Subsequent bytes use scaled timeout. + current_timeout_ms = timeout_ms * multiplier; + continue; + } + + if (num_written < 0 && errno == EINTR) + { + continue; + } + + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + const int ready = + cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, current_timeout_ms, false); if (ready < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, @@ -47,23 +80,22 @@ extern "C" cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); return static_cast(cpp_core::StatusCodes::kAbortWriteError); } - if (ready > 0) - { - bytes_written = ::write(fd, buffer, buffer_size); - } - else + if (ready == 0) { - return 0; + return total_written; } + continue; } - if (bytes_written < 0) + if (num_written == 0) { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kWriteError); + return total_written; } + + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kWriteError); } - return static_cast(bytes_written); + return total_written; } } // extern "C" diff --git a/tests/test_abort.cpp b/tests/test_abort.cpp index 8d6f324..1cfc541 100644 --- a/tests/test_abort.cpp +++ b/tests/test_abort.cpp @@ -11,7 +11,6 @@ #include #include -#include #include #include diff --git a/tests/test_serial_arduino.cpp b/tests/test_serial_arduino.cpp index fc74c0a..dc813b8 100644 --- a/tests/test_serial_arduino.cpp +++ b/tests/test_serial_arduino.cpp @@ -1,18 +1,22 @@ // Test for serial communication with Arduino echo script on /dev/ttyUSB0 #include +#include #include #include #include #include -#include #include + #include +#include #include #include +#include #include #include +#include #include #include @@ -21,18 +25,20 @@ class SerialArduinoTest : public ::testing::Test protected: void SetUp() override { + // NOLINTNEXTLINE(concurrency-mt-unsafe) const char *env_port = std::getenv("SERIAL_TEST_PORT"); const char *port = env_port != nullptr ? env_port : "/dev/ttyUSB0"; - handle_ = serialOpen(const_cast(static_cast(port)), 115200, 8, 0, 0, nullptr); + handle = serialOpen(const_cast(static_cast(port)), 115200, 8, 0, 0, nullptr); - if (handle_ <= 0) + if (handle <= 0) { - GTEST_SKIP() << "Could not open serial port '" << port + GTEST_FAIL() << "Could not open serial port '" << port << "'. Set SERIAL_TEST_PORT or connect Arduino on /dev/ttyUSB0."; } - const char *skip_delay = std::getenv("SERIAL_TEST_SKIP_INIT_DELAY"); - if (skip_delay == nullptr || std::strcmp(skip_delay, "1") != 0) + // Heuristic: real Arduinos often reset on open; CI pseudo TTYs do not. + const bool looks_like_ci_pty = (std::strstr(port, "/tmp/ttyCI_") != nullptr); + if (!looks_like_ci_pty) { usleep(2000000); } @@ -40,19 +46,19 @@ class SerialArduinoTest : public ::testing::Test void TearDown() override { - if (handle_ > 0) + if (handle > 0) { - serialClose(handle_, nullptr); - handle_ = 0; + serialClose(handle, nullptr); + handle = 0; } } - intptr_t handle_ = 0; + intptr_t handle = 0; }; TEST_F(SerialArduinoTest, OpenClose) { - EXPECT_GT(handle_, 0) << "serialOpen should return a positive handle"; + EXPECT_GT(handle, 0) << "serialOpen should return a positive handle"; } TEST_F(SerialArduinoTest, WriteReadEcho) @@ -60,45 +66,45 @@ TEST_F(SerialArduinoTest, WriteReadEcho) const char *test_message = "Hello Arduino!\n"; int message_len = static_cast(strlen(test_message)); - int written = serialWrite(handle_, test_message, message_len, 1000, 1, nullptr); + int written = serialWrite(handle, test_message, message_len, 1000, 1, nullptr); EXPECT_EQ(written, message_len) << "Should write all bytes. Written: " << written << ", Expected: " << message_len; usleep(500000); - char read_buffer[256] = {0}; - int read_bytes = serialRead(handle_, read_buffer, sizeof(read_buffer) - 1, 2000, 1, nullptr); + std::array read_buffer = {}; + int read_bytes = serialRead(handle, read_buffer.data(), static_cast(read_buffer.size()) - 1, 2000, 1, nullptr); EXPECT_GT(read_bytes, 0) << "Should read at least some bytes"; - EXPECT_LE(read_bytes, static_cast(sizeof(read_buffer) - 1)) << "Should not overflow buffer"; + EXPECT_LE(read_bytes, static_cast(read_buffer.size() - 1)) << "Should not overflow buffer"; - read_buffer[read_bytes] = '\0'; - EXPECT_STRNE(read_buffer, "") << "Should receive echo from Arduino"; + read_buffer[static_cast(read_bytes)] = '\0'; + EXPECT_STRNE(read_buffer.data(), "") << "Should receive echo from Arduino"; } TEST_F(SerialArduinoTest, MultipleEchoCycles) { - const char *messages[] = {"Test1\n", "Test2\n", "Test3\n"}; - const int num_messages = 3; + const std::array messages = {"Test1\n", "Test2\n", "Test3\n"}; - for (int i = 0; i < num_messages; ++i) + for (size_t index = 0; index < messages.size(); ++index) { - int msg_len = static_cast(strlen(messages[i])); + int msg_len = static_cast(strlen(messages[index])); - int written = serialWrite(handle_, messages[i], msg_len, 1000, 1, nullptr); - EXPECT_EQ(written, msg_len) << "Cycle " << i << ": write failed"; + int written = serialWrite(handle, messages[index], msg_len, 1000, 1, nullptr); + EXPECT_EQ(written, msg_len) << "Cycle " << index << ": write failed"; usleep(500000); - char read_buffer[256] = {0}; - int read_bytes = serialRead(handle_, read_buffer, sizeof(read_buffer) - 1, 2000, 1, nullptr); - EXPECT_GT(read_bytes, 0) << "Cycle " << i << ": read failed"; + std::array read_buffer = {}; + int read_bytes = + serialRead(handle, read_buffer.data(), static_cast(read_buffer.size()) - 1, 2000, 1, nullptr); + EXPECT_GT(read_bytes, 0) << "Cycle " << index << ": read failed"; } } TEST_F(SerialArduinoTest, ReadTimeout) { - char buffer[256]; - int read_bytes = serialRead(handle_, buffer, sizeof(buffer), 100, 1, nullptr); + std::array buffer = {}; + int read_bytes = serialRead(handle, buffer.data(), static_cast(buffer.size()), 100, 1, nullptr); EXPECT_GE(read_bytes, 0) << "Timeout should return 0, not error"; } @@ -106,21 +112,99 @@ TEST_F(SerialArduinoTest, AbortRead) { std::atomic read_result{999}; std::thread reader([&] { - unsigned char buffer[16] = {}; - read_result.store(serialRead(handle_, buffer, sizeof(buffer), 10000, 1, nullptr)); + std::array buffer = {}; + read_result.store(serialRead(handle, buffer.data(), static_cast(buffer.size()), 10000, 1, nullptr)); }); std::this_thread::sleep_for(std::chrono::milliseconds(50)); - EXPECT_EQ(serialAbortRead(handle_, nullptr), 0); + EXPECT_EQ(serialAbortRead(handle, nullptr), 0); reader.join(); EXPECT_EQ(read_result.load(), static_cast(cpp_core::StatusCodes::kAbortReadError)); } +TEST_F(SerialArduinoTest, AbortWriteDuringLargeTransfer) +{ + const int64_t default_total_bytes = 100LL * 1024LL * 1024LL; + const int64_t default_abort_after_bytes = 1LL * 1024LL * 1024LL; + const int64_t total_bytes = default_total_bytes; + const int64_t abort_after_bytes = default_abort_after_bytes; + + // We intentionally do NOT read the echo here; the goal is to saturate the OS TX queue so + // serialWrite() hits EAGAIN -> poll(), then verify serialAbortWrite() unblocks it. + constexpr int kChunkSize = 64 * 1024; + static_assert(kChunkSize > 0); + + std::string chunk(static_cast(kChunkSize), '\x55'); + std::atomic bytes_sent{0}; + std::atomic in_write_call{false}; + std::atomic write_result{999}; + + std::thread writer([&] { + while (bytes_sent.load() < total_bytes) + { + in_write_call.store(true); + const int res = serialWrite(handle, chunk.data(), static_cast(chunk.size()), 10000, 1, nullptr); + in_write_call.store(false); + + if (res == static_cast(cpp_core::StatusCodes::kAbortWriteError)) + { + write_result.store(res); + return; + } + if (res < 0) + { + write_result.store(res); + return; + } + bytes_sent.fetch_add(res); + } + write_result.store(static_cast(cpp_core::StatusCodes::kSuccess)); + }); + + // Wait briefly for either progress OR an early error, then start aborting. + const auto wait_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(500); + while (std::chrono::steady_clock::now() < wait_deadline) + { + if (write_result.load() != 999) + { + break; + } + if (bytes_sent.load() >= abort_after_bytes) + { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + const auto abort_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(3); + while (std::chrono::steady_clock::now() < abort_deadline) + { + if (write_result.load() != 999) + { + break; + } + EXPECT_EQ(serialAbortWrite(handle, nullptr), 0); + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + } + + // Hard fail-safe: if abort did not take effect, close the handle to force the writer to exit. + if (write_result.load() != static_cast(cpp_core::StatusCodes::kAbortWriteError)) + { + (void)serialClose(handle, nullptr); + handle = 0; + } + + writer.join(); + + EXPECT_EQ(write_result.load(), static_cast(cpp_core::StatusCodes::kAbortWriteError)) + << "Expected abort during large transfer. bytes_sent=" << bytes_sent.load() << " total_bytes=" << total_bytes; +} + TEST(SerialInvalidHandleTest, InvalidHandleRead) { - char buffer[256]; - int result = serialRead(-1, buffer, sizeof(buffer), 1000, 1, nullptr); + std::array buffer = {}; + int result = serialRead(-1, buffer.data(), static_cast(buffer.size()), 1000, 1, nullptr); EXPECT_EQ(result, static_cast(cpp_core::StatusCodes::kInvalidHandleError)) << "Should return error for invalid handle"; } From ab70d942d43f745c5e7cad3e6eecaf3cf272171f Mon Sep 17 00:00:00 2001 From: Katze719 Date: Sat, 20 Dec 2025 00:33:04 +0100 Subject: [PATCH 6/6] Refactor serial read functions to enhance timeout handling and improve test coverage - Renamed `readUntilNotReady` to `readUntilTimeoutOrFull` for clarity and updated its parameters to include a per-iteration timeout. - Modified the `serialRead` function to utilize the new read function, allowing for more flexible timeout management based on a multiplier. - Added a new utility function `drainInput` in tests to facilitate input handling during tests. - Introduced new test cases to validate timeout behavior when reading from the serial interface, ensuring robustness in scenarios with no data available. --- src/serial_read.cpp | 26 +++++++++++++++++----- tests/test_serial_arduino.cpp | 41 +++++++++++++++++++++++++++++++++++ 2 files changed, 62 insertions(+), 5 deletions(-) diff --git a/src/serial_read.cpp b/src/serial_read.cpp index dfab698..f9e904f 100644 --- a/src/serial_read.cpp +++ b/src/serial_read.cpp @@ -41,13 +41,14 @@ auto handleWaitResultForRead(int wait_result, int abort_fd, ErrorCallbackT error return std::nullopt; } -auto readUntilNotReady(int fd, int abort_fd, unsigned char *buf, int buffer_size, int already_read, - ErrorCallbackT error_callback) -> int +auto readUntilTimeoutOrFull(int fd, int abort_fd, unsigned char *buf, int buffer_size, int already_read, + int per_iteration_timeout_ms, ErrorCallbackT error_callback) -> int { int total_read = already_read; while (total_read < buffer_size) { - const int wait_result = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, 0, true); + const int wait_result = + cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, per_iteration_timeout_ms, true); if (const auto immediate = handleWaitResultForRead(wait_result, abort_fd, error_callback); immediate.has_value()) { @@ -66,6 +67,7 @@ auto readUntilNotReady(int fd, int abort_fd, unsigned char *buf, int buffer_size } if (more_bytes == 0) { + // Driver reported readiness but returned nothing; treat like "no more data". return total_read; } total_read += static_cast(more_bytes); @@ -76,7 +78,7 @@ auto readUntilNotReady(int fd, int abort_fd, unsigned char *buf, int buffer_size extern "C" { - MODULE_API auto serialRead(int64_t handle, void *buffer, int buffer_size, int timeout_ms, int /*multiplier*/, + MODULE_API auto serialRead(int64_t handle, void *buffer, int buffer_size, int timeout_ms, int multiplier, ErrorCallbackT error_callback) -> int { if (buffer == nullptr || buffer_size <= 0) @@ -127,7 +129,21 @@ extern "C" } } - return readUntilNotReady(fd, abort_fd, buf, buffer_size, static_cast(bytes_read), error_callback); + const int already_read = static_cast(bytes_read); + + if (multiplier == 0) + { + return already_read; + } + + const int per_byte_timeout_ms = timeout_ms * multiplier; + if (per_byte_timeout_ms <= 0) + { + return readUntilTimeoutOrFull(fd, abort_fd, buf, buffer_size, already_read, 0, error_callback); + } + + return readUntilTimeoutOrFull(fd, abort_fd, buf, buffer_size, already_read, per_byte_timeout_ms, + error_callback); } } // extern "C" diff --git a/tests/test_serial_arduino.cpp b/tests/test_serial_arduino.cpp index dc813b8..9401a61 100644 --- a/tests/test_serial_arduino.cpp +++ b/tests/test_serial_arduino.cpp @@ -20,6 +20,26 @@ #include #include +namespace +{ +auto drainInput(intptr_t handle) -> void +{ + std::array tmp = {}; + for (;;) + { + const int res = serialRead(handle, tmp.data(), static_cast(tmp.size()), 10, 0, nullptr); + if (res < 0) + { + FAIL() << "drainInput failed with error " << res; + } + if (res == 0) + { + return; + } + } +} +} // namespace + class SerialArduinoTest : public ::testing::Test { protected: @@ -108,6 +128,27 @@ TEST_F(SerialArduinoTest, ReadTimeout) EXPECT_GE(read_bytes, 0) << "Timeout should return 0, not error"; } +TEST_F(SerialArduinoTest, Read60BytesWithoutWritingTimesOut) +{ + drainInput(handle); + std::array buffer = {}; + const int read_bytes = serialRead(handle, buffer.data(), static_cast(buffer.size()), 200, 1, nullptr); + EXPECT_EQ(read_bytes, 0) << "Expected timeout (0 bytes) when no data is sent"; +} + +TEST_F(SerialArduinoTest, Write10BytesRead60Returns10ThenTimesOut) +{ + drainInput(handle); + const std::array payload = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}; + const int written = serialWrite(handle, payload.data(), static_cast(payload.size()), 2000, 1, nullptr); + ASSERT_EQ(written, static_cast(payload.size())); + + std::array buffer = {}; + const int read_bytes = serialRead(handle, buffer.data(), static_cast(buffer.size()), 200, 1, nullptr); + EXPECT_EQ(read_bytes, static_cast(payload.size())) + << "Expected to read the 10 echoed bytes, then timeout waiting for more"; +} + TEST_F(SerialArduinoTest, AbortRead) { std::atomic read_result{999};