diff --git a/integration_tests/ffi_bindings.ts b/integration_tests/ffi_bindings.ts index 6060b81..9c16f46 100644 --- a/integration_tests/ffi_bindings.ts +++ b/integration_tests/ffi_bindings.ts @@ -23,6 +23,14 @@ const symbols = { parameters: ["i64", "pointer", "i32", "i32", "i32", "pointer"] as const, result: "i32" as const, }, + serialAbortRead: { + parameters: ["i64", "pointer"] as const, + result: "i32" as const, + }, + serialAbortWrite: { + parameters: ["i64", "pointer"] as const, + result: "i32" as const, + }, }; /** diff --git a/integration_tests/integration_test.ts b/integration_tests/integration_test.ts index 806235f..d5c1a12 100644 --- a/integration_tests/integration_test.ts +++ b/integration_tests/integration_test.ts @@ -23,6 +23,8 @@ Deno.test({ assertExists(lib.serialClose); assertExists(lib.serialRead); assertExists(lib.serialWrite); + assertExists(lib.serialAbortRead); + assertExists(lib.serialAbortWrite); console.log("cpp-bindings-linux library loaded and symbols resolved"); }, diff --git a/src/detail/abort_registry.cpp b/src/detail/abort_registry.cpp new file mode 100644 index 0000000..2ff8b7b --- /dev/null +++ b/src/detail/abort_registry.cpp @@ -0,0 +1,125 @@ +#include "abort_registry.hpp" + +#include +#include +#include +#include +#include + +namespace cpp_bindings_linux::detail +{ +namespace +{ +auto makePipeNonBlockingCloexec(std::array &out_pipe) -> bool +{ + if (::pipe2(out_pipe.data(), O_NONBLOCK | O_CLOEXEC) == 0) + { + return true; + } + + if (::pipe(out_pipe.data()) != 0) + { + return false; + } + + for (int index = 0; index < 2; ++index) + { + const auto pipe_index = static_cast(index); + const int flags = ::fcntl(out_pipe[pipe_index], F_GETFL, 0); + if (flags >= 0) + { + (void)::fcntl(out_pipe[pipe_index], F_SETFL, flags | O_NONBLOCK); + } + const int fd_flags = ::fcntl(out_pipe[pipe_index], F_GETFD, 0); + if (fd_flags >= 0) + { + (void)::fcntl(out_pipe[pipe_index], F_SETFD, fd_flags | FD_CLOEXEC); + } + } + return true; +} + +auto closeIfValid(int fd) -> void +{ + if (fd >= 0) + { + (void)::close(fd); + } +} + +std::mutex g_abort_mutex; +std::unordered_map g_abort_by_fd; +} // namespace + +auto registerAbortPipesForFd(int fd) -> bool +{ + if (fd < 0) + { + return false; + } + + std::lock_guard lock(g_abort_mutex); + if (g_abort_by_fd.contains(fd)) + { + return true; + } + + std::array read_pipe = {-1, -1}; + std::array write_pipe = {-1, -1}; + if (!makePipeNonBlockingCloexec(read_pipe) || !makePipeNonBlockingCloexec(write_pipe)) + { + closeIfValid(read_pipe[0]); + closeIfValid(read_pipe[1]); + closeIfValid(write_pipe[0]); + closeIfValid(write_pipe[1]); + return false; + } + + AbortPipes pipes; + pipes.read_abort_r = read_pipe[0]; + pipes.read_abort_w = read_pipe[1]; + pipes.write_abort_r = write_pipe[0]; + pipes.write_abort_w = write_pipe[1]; + + g_abort_by_fd.emplace(fd, pipes); + return true; +} + +auto unregisterAbortPipesForFd(int fd) -> void +{ + if (fd < 0) + { + return; + } + + std::lock_guard lock(g_abort_mutex); + auto iter = g_abort_by_fd.find(fd); + if (iter == g_abort_by_fd.end()) + { + return; + } + + closeIfValid(iter->second.read_abort_r); + closeIfValid(iter->second.read_abort_w); + closeIfValid(iter->second.write_abort_r); + closeIfValid(iter->second.write_abort_w); + + g_abort_by_fd.erase(iter); +} + +auto getAbortPipesForFd(int fd) -> std::optional +{ + if (fd < 0) + { + return std::nullopt; + } + + std::lock_guard lock(g_abort_mutex); + auto iter = g_abort_by_fd.find(fd); + if (iter == g_abort_by_fd.end()) + { + return std::nullopt; + } + return iter->second; +} +} // namespace cpp_bindings_linux::detail diff --git a/src/detail/abort_registry.hpp b/src/detail/abort_registry.hpp new file mode 100644 index 0000000..688fef0 --- /dev/null +++ b/src/detail/abort_registry.hpp @@ -0,0 +1,24 @@ +#pragma once + +#include + +namespace cpp_bindings_linux::detail +{ +struct AbortPipes +{ + int read_abort_r = -1; + int read_abort_w = -1; + int write_abort_r = -1; + int write_abort_w = -1; +}; + +// Create (if needed) and register abort pipes for a serial FD. +// Returns true on success. +auto registerAbortPipesForFd(int fd) -> bool; + +// Unregister and close any abort pipes for a serial FD. Safe to call multiple times. +auto unregisterAbortPipesForFd(int fd) -> void; + +// Returns the abort pipes for a serial FD, or std::nullopt if none exist. +auto getAbortPipesForFd(int fd) -> std::optional; +} // namespace cpp_bindings_linux::detail diff --git a/src/detail/posix_helpers.hpp b/src/detail/posix_helpers.hpp index 8a78f6a..a0dc280 100644 --- a/src/detail/posix_helpers.hpp +++ b/src/detail/posix_helpers.hpp @@ -2,6 +2,7 @@ #include +#include #include #include #include @@ -95,6 +96,62 @@ inline auto failErrno(Callback error_callback, cpp_core::StatusCodes code) -> Re return static_cast(code); } +// Drain any pending bytes from a non-blocking FD (used for abort pipes). +inline auto drainNonBlockingFd(int file_descriptor) -> void +{ + if (file_descriptor < 0) + { + return; + } + std::array buf = {}; + for (;;) + { + const ssize_t num_read = ::read(file_descriptor, buf.data(), buf.size()); + if (num_read > 0) + { + continue; + } + if (num_read == 0) + { + return; + } + if (errno == EAGAIN || errno == EWOULDBLOCK) + { + return; + } + if (errno == EINTR) + { + continue; + } + return; + } +} + +// Returns true if the abort FD is signaled (has at least one byte to read). If true, drains it. +inline auto consumeAbortIfSet(int abort_fd) -> bool +{ + if (abort_fd < 0) + { + return false; + } + struct pollfd poll_fd = {}; + poll_fd.fd = abort_fd; + poll_fd.events = POLLIN; + poll_fd.revents = 0; + + const int poll_result = poll(&poll_fd, 1, 0); + if (poll_result <= 0) + { + return false; + } + if ((poll_fd.revents & POLLIN) == 0) + { + return false; + } + drainNonBlockingFd(abort_fd); + return true; +} + // Poll helper used by read/write to implement timeouts. // Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready. inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> int @@ -123,4 +180,48 @@ inline auto waitFdReady(int file_descriptor, int timeout_ms, bool for_read) -> i } return 0; } + +// Poll helper with an optional abort FD. +// Returns: -1 on poll error, 0 on timeout/not-ready, 1 on ready, 2 on abort. +inline auto waitFdReadyOrAbort(int file_descriptor, int abort_fd, int timeout_ms, bool for_read) -> int +{ + std::array fds = {}; + fds[0].fd = file_descriptor; + fds[0].events = for_read ? POLLIN : POLLOUT; + fds[0].revents = 0; + + nfds_t nfds = 1; + if (abort_fd >= 0) + { + fds[1].fd = abort_fd; + fds[1].events = POLLIN; + fds[1].revents = 0; + nfds = 2; + } + + const int poll_result = poll(fds.data(), nfds, timeout_ms); + if (poll_result < 0) + { + return -1; + } + if (poll_result == 0) + { + return 0; + } + + if (abort_fd >= 0 && ((fds[1].revents & POLLIN) != 0)) + { + return 2; + } + + if (for_read && ((fds[0].revents & POLLIN) != 0)) + { + return 1; + } + if (!for_read && ((fds[0].revents & POLLOUT) != 0)) + { + return 1; + } + return 0; +} } // namespace cpp_bindings_linux::detail diff --git a/src/serial_abort_read.cpp b/src/serial_abort_read.cpp new file mode 100644 index 0000000..08e2ca7 --- /dev/null +++ b/src/serial_abort_read.cpp @@ -0,0 +1,49 @@ +#include +#include + +#include "detail/abort_registry.hpp" +#include "detail/posix_helpers.hpp" + +#include +#include +#include + +extern "C" +{ + MODULE_API auto serialAbortRead(int64_t handle, ErrorCallbackT error_callback) -> int + { + if (handle <= 0 || handle > std::numeric_limits::max()) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const int fd = static_cast(handle); + const auto pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + if (!pipes) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const unsigned char token = 1; + for (;;) + { + const ssize_t num_written = ::write(pipes->read_abort_w, &token, 1); + if (num_written == 1) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + if (num_written < 0 && errno == EINTR) + { + continue; + } + // Pipe already full -> treat as success (abort already requested). + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kAbortReadError); + } + } +} // extern "C" diff --git a/src/serial_abort_write.cpp b/src/serial_abort_write.cpp new file mode 100644 index 0000000..5e5d279 --- /dev/null +++ b/src/serial_abort_write.cpp @@ -0,0 +1,49 @@ +#include +#include + +#include "detail/abort_registry.hpp" +#include "detail/posix_helpers.hpp" + +#include +#include +#include + +extern "C" +{ + MODULE_API auto serialAbortWrite(int64_t handle, ErrorCallbackT error_callback) -> int + { + if (handle <= 0 || handle > std::numeric_limits::max()) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const int fd = static_cast(handle); + const auto pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + if (!pipes) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kInvalidHandleError, + "Invalid handle"); + } + + const unsigned char token = 1; + for (;;) + { + const ssize_t num_written = ::write(pipes->write_abort_w, &token, 1); + if (num_written == 1) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + if (num_written < 0 && errno == EINTR) + { + continue; + } + // Pipe already full -> treat as success (abort already requested). + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return static_cast(cpp_core::StatusCodes::kSuccess); + } + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kAbortWriteError); + } + } +} // extern "C" diff --git a/src/serial_close.cpp b/src/serial_close.cpp index 6ef9cea..f21eab1 100644 --- a/src/serial_close.cpp +++ b/src/serial_close.cpp @@ -1,6 +1,7 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include @@ -22,6 +23,7 @@ extern "C" } const int fd = static_cast(handle); + cpp_bindings_linux::detail::unregisterAbortPipesForFd(fd); if (close(fd) != 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kCloseHandleError); diff --git a/src/serial_open.cpp b/src/serial_open.cpp index e2ddd13..91406b1 100644 --- a/src/serial_open.cpp +++ b/src/serial_open.cpp @@ -1,6 +1,7 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include @@ -154,6 +155,12 @@ extern "C" tcflush(handle.get(), TCIOFLUSH); + if (!cpp_bindings_linux::detail::registerAbortPipesForFd(handle.get())) + { + return cpp_bindings_linux::detail::failMsg(error_callback, cpp_core::StatusCodes::kSetStateError, + "Failed to initialize abort pipes"); + } + // Note: Some devices (e.g., Arduino) reset when the serial port is opened. // It is recommended to wait 1-2 seconds after opening before sending data // to allow the device to initialize. diff --git a/src/serial_read.cpp b/src/serial_read.cpp index a883b80..f9e904f 100644 --- a/src/serial_read.cpp +++ b/src/serial_read.cpp @@ -1,15 +1,84 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include #include +#include #include +namespace +{ +constexpr int kGraceRetryTimeoutMs = 10; + +auto tryReadOnceNonBlocking(int fd, unsigned char *dst, int size) -> ssize_t +{ + const ssize_t bytes = ::read(fd, dst, size); + if (bytes < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return 0; + } + return bytes; +} + +auto handleWaitResultForRead(int wait_result, int abort_fd, ErrorCallbackT error_callback) -> std::optional +{ + if (wait_result < 0) + { + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); + } + if (wait_result == 0) + { + return 0; + } + if (wait_result == 2) + { + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortReadError); + } + return std::nullopt; +} + +auto readUntilTimeoutOrFull(int fd, int abort_fd, unsigned char *buf, int buffer_size, int already_read, + int per_iteration_timeout_ms, ErrorCallbackT error_callback) -> int +{ + int total_read = already_read; + while (total_read < buffer_size) + { + const int wait_result = + cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, per_iteration_timeout_ms, true); + if (const auto immediate = handleWaitResultForRead(wait_result, abort_fd, error_callback); + immediate.has_value()) + { + // timeout -> return what we have; abort/error handled by helper + if (*immediate == 0) + { + return total_read; + } + return *immediate; + } + + const ssize_t more_bytes = tryReadOnceNonBlocking(fd, buf + total_read, buffer_size - total_read); + if (more_bytes < 0) + { + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); + } + if (more_bytes == 0) + { + // Driver reported readiness but returned nothing; treat like "no more data". + return total_read; + } + total_read += static_cast(more_bytes); + } + return total_read; +} +} // namespace + extern "C" { - MODULE_API auto serialRead(int64_t handle, void *buffer, int buffer_size, int timeout_ms, int /*multiplier*/, + MODULE_API auto serialRead(int64_t handle, void *buffer, int buffer_size, int timeout_ms, int multiplier, ErrorCallbackT error_callback) -> int { if (buffer == nullptr || buffer_size <= 0) @@ -27,26 +96,16 @@ extern "C" const int fd = static_cast(handle); auto *buf = static_cast(buffer); - const int ready = cpp_bindings_linux::detail::waitFdReady(fd, timeout_ms, true); - if (ready < 0) - { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); - } - if (ready == 0) + const auto abort_pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + const int abort_fd = abort_pipes ? abort_pipes->read_abort_r : -1; + + const int ready = cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, timeout_ms, true); + if (const auto immediate = handleWaitResultForRead(ready, abort_fd, error_callback); immediate.has_value()) { - return 0; + return *immediate; } - const auto try_read_once = [&](unsigned char *dst, int size) -> ssize_t { - const ssize_t bytes = ::read(fd, dst, size); - if (bytes < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) - { - return 0; - } - return bytes; - }; - - ssize_t bytes_read = try_read_once(buf, buffer_size); + ssize_t bytes_read = tryReadOnceNonBlocking(fd, buf, buffer_size); if (bytes_read < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); @@ -55,43 +114,36 @@ extern "C" // Some drivers can report readiness but still return 0; give it a tiny grace period and retry once. if (bytes_read == 0) { - const int retry_ready = cpp_bindings_linux::detail::waitFdReady(fd, 10, true); - if (retry_ready < 0) + const int retry_ready = + cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, kGraceRetryTimeoutMs, true); + if (const auto immediate = handleWaitResultForRead(retry_ready, abort_fd, error_callback); + immediate.has_value()) { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); + return *immediate; } - if (retry_ready == 0) - { - return 0; - } - bytes_read = try_read_once(buf, buffer_size); + + bytes_read = tryReadOnceNonBlocking(fd, buf, buffer_size); if (bytes_read <= 0) { return 0; } } - int total_read = static_cast(bytes_read); - while (total_read < buffer_size) + const int already_read = static_cast(bytes_read); + + if (multiplier == 0) { - const int loop_ready = cpp_bindings_linux::detail::waitFdReady(fd, 0, true); - if (loop_ready < 0) - { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kReadError); - } - if (loop_ready == 0) - { - break; - } - const ssize_t more_bytes = try_read_once(buf + total_read, buffer_size - total_read); - if (more_bytes <= 0) - { - break; - } - total_read += static_cast(more_bytes); + return already_read; + } + + const int per_byte_timeout_ms = timeout_ms * multiplier; + if (per_byte_timeout_ms <= 0) + { + return readUntilTimeoutOrFull(fd, abort_fd, buf, buffer_size, already_read, 0, error_callback); } - return total_read; + return readUntilTimeoutOrFull(fd, abort_fd, buf, buffer_size, already_read, per_byte_timeout_ms, + error_callback); } } // extern "C" diff --git a/src/serial_write.cpp b/src/serial_write.cpp index abd7b10..840bf01 100644 --- a/src/serial_write.cpp +++ b/src/serial_write.cpp @@ -1,18 +1,18 @@ #include #include +#include "detail/abort_registry.hpp" #include "detail/posix_helpers.hpp" #include -#include #include -#include #include extern "C" { - MODULE_API auto serialWrite(int64_t handle, const void *buffer, int buffer_size, int timeout_ms, int /*multiplier*/, + // NOLINTNEXTLINE(readability-function-cognitive-complexity) + MODULE_API auto serialWrite(int64_t handle, const void *buffer, int buffer_size, int timeout_ms, int multiplier, ErrorCallbackT error_callback) -> int { if (buffer == nullptr || buffer_size <= 0) @@ -28,37 +28,74 @@ extern "C" } const int fd = static_cast(handle); + const auto abort_pipes = cpp_bindings_linux::detail::getAbortPipesForFd(fd); + const int abort_fd = abort_pipes ? abort_pipes->write_abort_r : -1; - ssize_t bytes_written = ::write(fd, buffer, buffer_size); - if (bytes_written < 0) + const auto *src = static_cast(buffer); + int total_written = 0; + + // Mirror cpp_core semantics: timeout_ms applies to the first byte, + // then timeout_ms * multiplier applies to subsequent bytes. + int current_timeout_ms = timeout_ms; + + while (total_written < buffer_size) { - if (errno == EAGAIN || errno == EWOULDBLOCK) + // Abort should also cancel writers that never hit EAGAIN/poll. + if (cpp_bindings_linux::detail::consumeAbortIfSet(abort_fd)) + { + return static_cast(cpp_core::StatusCodes::kAbortWriteError); + } + + const ssize_t num_written = ::write(fd, src + total_written, buffer_size - total_written); + if (num_written > 0) + { + total_written += static_cast(num_written); + + // If multiplier is 0, return immediately after the first successful write. + if (multiplier == 0) + { + return total_written; + } + // Subsequent bytes use scaled timeout. + current_timeout_ms = timeout_ms * multiplier; + continue; + } + + if (num_written < 0 && errno == EINTR) + { + continue; + } + + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { - const int ready = cpp_bindings_linux::detail::waitFdReady(fd, timeout_ms, false); + const int ready = + cpp_bindings_linux::detail::waitFdReadyOrAbort(fd, abort_fd, current_timeout_ms, false); if (ready < 0) { return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kWriteError); } - if (ready > 0) + if (ready == 2) { - bytes_written = ::write(fd, buffer, buffer_size); + cpp_bindings_linux::detail::drainNonBlockingFd(abort_fd); + return static_cast(cpp_core::StatusCodes::kAbortWriteError); } - else + if (ready == 0) { - return 0; + return total_written; } + continue; } - if (bytes_written < 0) + if (num_written == 0) { - return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kWriteError); + return total_written; } - } - tcdrain(fd); + return cpp_bindings_linux::detail::failErrno(error_callback, cpp_core::StatusCodes::kWriteError); + } - return static_cast(bytes_written); + return total_written; } } // extern "C" diff --git a/tests/test_abort.cpp b/tests/test_abort.cpp new file mode 100644 index 0000000..1cfc541 --- /dev/null +++ b/tests/test_abort.cpp @@ -0,0 +1,149 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "../src/detail/abort_registry.hpp" + +#include +#include +#include + +#include +#include +#include + +namespace +{ +class UniqueFd +{ + public: + explicit UniqueFd(int fd) : fd_(fd) + { + } + UniqueFd(const UniqueFd &) = delete; + auto operator=(const UniqueFd &) -> UniqueFd & = delete; + UniqueFd(UniqueFd &&other) noexcept : fd_(other.fd_) + { + other.fd_ = -1; + } + auto operator=(UniqueFd &&other) noexcept -> UniqueFd & + { + if (this != &other) + { + reset(other.release()); + } + return *this; + } + ~UniqueFd() + { + reset(-1); + } + + [[nodiscard]] auto get() const -> int + { + return fd_; + } + [[nodiscard]] auto release() -> int + { + const int out = fd_; + fd_ = -1; + return out; + } + auto reset(int new_fd) -> void + { + if (fd_ >= 0) + { + (void)::close(fd_); + } + fd_ = new_fd; + } + + private: + int fd_ = -1; +}; + +auto fillNonBlockingFd(int fd) -> void +{ + const char buf[4096] = {}; + for (;;) + { + const ssize_t num_written = ::write(fd, buf, sizeof(buf)); + if (num_written > 0) + { + continue; + } + if (num_written < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + { + return; + } + if (num_written < 0 && errno == EINTR) + { + continue; + } + return; + } +} +} // namespace + +TEST(AbortReadTest, AbortsBlockingReadFromOtherThread) +{ + int master_fd = -1; + int slave_fd = -1; + char slave_name[128] = {}; + ASSERT_EQ(::openpty(&master_fd, &slave_fd, slave_name, nullptr, nullptr), 0); + UniqueFd master(master_fd); + UniqueFd slave(slave_fd); + + // Use the path with serialOpen so we also test that abort pipes are registered on open(). + intptr_t handle = serialOpen(static_cast(slave_name), 115200, 8, 0, 0, nullptr); + ASSERT_GT(handle, 0); + + std::atomic read_result{999}; + std::thread reader([&] { + unsigned char buffer[16] = {}; + // Long timeout to ensure we block in poll() until abort happens. + read_result.store(serialRead(handle, buffer, sizeof(buffer), 10'000, 1, nullptr)); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(serialAbortRead(handle, nullptr), 0); + + reader.join(); + EXPECT_EQ(read_result.load(), static_cast(cpp_core::StatusCodes::kAbortReadError)); + + EXPECT_EQ(serialClose(handle, nullptr), 0); +} + +TEST(AbortWriteTest, AbortsBlockingWriteFromOtherThread) +{ + int pipe_fds[2] = {-1, -1}; + ASSERT_EQ(::pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC), 0); + UniqueFd read_end(pipe_fds[0]); + UniqueFd write_end(pipe_fds[1]); + + // Register abort pipes since we bypass serialOpen() here. + ASSERT_TRUE(cpp_bindings_linux::detail::registerAbortPipesForFd(write_end.get())); + + // Fill the pipe buffer so future writes hit EAGAIN and serialWrite blocks in poll(). + fillNonBlockingFd(write_end.get()); + + const char payload[4096] = {}; + std::atomic write_result{999}; + std::thread writer([&] { + write_result.store( + serialWrite(write_end.get(), payload, static_cast(sizeof(payload)), 10'000, 1, nullptr)); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(serialAbortWrite(write_end.get(), nullptr), 0); + + writer.join(); + EXPECT_EQ(write_result.load(), static_cast(cpp_core::StatusCodes::kAbortWriteError)); + + cpp_bindings_linux::detail::unregisterAbortPipesForFd(write_end.get()); +} diff --git a/tests/test_serial_arduino.cpp b/tests/test_serial_arduino.cpp index 6772c4a..9401a61 100644 --- a/tests/test_serial_arduino.cpp +++ b/tests/test_serial_arduino.cpp @@ -1,48 +1,84 @@ // Test for serial communication with Arduino echo script on /dev/ttyUSB0 +#include +#include #include #include #include #include -#include #include + #include +#include +#include +#include +#include +#include #include +#include +#include +#include + +namespace +{ +auto drainInput(intptr_t handle) -> void +{ + std::array tmp = {}; + for (;;) + { + const int res = serialRead(handle, tmp.data(), static_cast(tmp.size()), 10, 0, nullptr); + if (res < 0) + { + FAIL() << "drainInput failed with error " << res; + } + if (res == 0) + { + return; + } + } +} +} // namespace class SerialArduinoTest : public ::testing::Test { protected: void SetUp() override { + // NOLINTNEXTLINE(concurrency-mt-unsafe) const char *env_port = std::getenv("SERIAL_TEST_PORT"); const char *port = env_port != nullptr ? env_port : "/dev/ttyUSB0"; - handle_ = serialOpen(const_cast(static_cast(port)), 115200, 8, 0, 0, nullptr); + handle = serialOpen(const_cast(static_cast(port)), 115200, 8, 0, 0, nullptr); - if (handle_ <= 0) + if (handle <= 0) { - GTEST_SKIP() << "Could not open serial port '" << port + GTEST_FAIL() << "Could not open serial port '" << port << "'. Set SERIAL_TEST_PORT or connect Arduino on /dev/ttyUSB0."; } - usleep(2000000); + // Heuristic: real Arduinos often reset on open; CI pseudo TTYs do not. + const bool looks_like_ci_pty = (std::strstr(port, "/tmp/ttyCI_") != nullptr); + if (!looks_like_ci_pty) + { + usleep(2000000); + } } void TearDown() override { - if (handle_ > 0) + if (handle > 0) { - serialClose(handle_, nullptr); - handle_ = 0; + serialClose(handle, nullptr); + handle = 0; } } - intptr_t handle_ = 0; + intptr_t handle = 0; }; TEST_F(SerialArduinoTest, OpenClose) { - EXPECT_GT(handle_, 0) << "serialOpen should return a positive handle"; + EXPECT_GT(handle, 0) << "serialOpen should return a positive handle"; } TEST_F(SerialArduinoTest, WriteReadEcho) @@ -50,52 +86,166 @@ TEST_F(SerialArduinoTest, WriteReadEcho) const char *test_message = "Hello Arduino!\n"; int message_len = static_cast(strlen(test_message)); - int written = serialWrite(handle_, test_message, message_len, 1000, 1, nullptr); + int written = serialWrite(handle, test_message, message_len, 1000, 1, nullptr); EXPECT_EQ(written, message_len) << "Should write all bytes. Written: " << written << ", Expected: " << message_len; usleep(500000); - char read_buffer[256] = {0}; - int read_bytes = serialRead(handle_, read_buffer, sizeof(read_buffer) - 1, 2000, 1, nullptr); + std::array read_buffer = {}; + int read_bytes = serialRead(handle, read_buffer.data(), static_cast(read_buffer.size()) - 1, 2000, 1, nullptr); EXPECT_GT(read_bytes, 0) << "Should read at least some bytes"; - EXPECT_LE(read_bytes, static_cast(sizeof(read_buffer) - 1)) << "Should not overflow buffer"; + EXPECT_LE(read_bytes, static_cast(read_buffer.size() - 1)) << "Should not overflow buffer"; - read_buffer[read_bytes] = '\0'; - EXPECT_STRNE(read_buffer, "") << "Should receive echo from Arduino"; + read_buffer[static_cast(read_bytes)] = '\0'; + EXPECT_STRNE(read_buffer.data(), "") << "Should receive echo from Arduino"; } TEST_F(SerialArduinoTest, MultipleEchoCycles) { - const char *messages[] = {"Test1\n", "Test2\n", "Test3\n"}; - const int num_messages = 3; + const std::array messages = {"Test1\n", "Test2\n", "Test3\n"}; - for (int i = 0; i < num_messages; ++i) + for (size_t index = 0; index < messages.size(); ++index) { - int msg_len = static_cast(strlen(messages[i])); + int msg_len = static_cast(strlen(messages[index])); - int written = serialWrite(handle_, messages[i], msg_len, 1000, 1, nullptr); - EXPECT_EQ(written, msg_len) << "Cycle " << i << ": write failed"; + int written = serialWrite(handle, messages[index], msg_len, 1000, 1, nullptr); + EXPECT_EQ(written, msg_len) << "Cycle " << index << ": write failed"; usleep(500000); - char read_buffer[256] = {0}; - int read_bytes = serialRead(handle_, read_buffer, sizeof(read_buffer) - 1, 2000, 1, nullptr); - EXPECT_GT(read_bytes, 0) << "Cycle " << i << ": read failed"; + std::array read_buffer = {}; + int read_bytes = + serialRead(handle, read_buffer.data(), static_cast(read_buffer.size()) - 1, 2000, 1, nullptr); + EXPECT_GT(read_bytes, 0) << "Cycle " << index << ": read failed"; } } TEST_F(SerialArduinoTest, ReadTimeout) { - char buffer[256]; - int read_bytes = serialRead(handle_, buffer, sizeof(buffer), 100, 1, nullptr); + std::array buffer = {}; + int read_bytes = serialRead(handle, buffer.data(), static_cast(buffer.size()), 100, 1, nullptr); EXPECT_GE(read_bytes, 0) << "Timeout should return 0, not error"; } +TEST_F(SerialArduinoTest, Read60BytesWithoutWritingTimesOut) +{ + drainInput(handle); + std::array buffer = {}; + const int read_bytes = serialRead(handle, buffer.data(), static_cast(buffer.size()), 200, 1, nullptr); + EXPECT_EQ(read_bytes, 0) << "Expected timeout (0 bytes) when no data is sent"; +} + +TEST_F(SerialArduinoTest, Write10BytesRead60Returns10ThenTimesOut) +{ + drainInput(handle); + const std::array payload = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9'}; + const int written = serialWrite(handle, payload.data(), static_cast(payload.size()), 2000, 1, nullptr); + ASSERT_EQ(written, static_cast(payload.size())); + + std::array buffer = {}; + const int read_bytes = serialRead(handle, buffer.data(), static_cast(buffer.size()), 200, 1, nullptr); + EXPECT_EQ(read_bytes, static_cast(payload.size())) + << "Expected to read the 10 echoed bytes, then timeout waiting for more"; +} + +TEST_F(SerialArduinoTest, AbortRead) +{ + std::atomic read_result{999}; + std::thread reader([&] { + std::array buffer = {}; + read_result.store(serialRead(handle, buffer.data(), static_cast(buffer.size()), 10000, 1, nullptr)); + }); + + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + EXPECT_EQ(serialAbortRead(handle, nullptr), 0); + + reader.join(); + EXPECT_EQ(read_result.load(), static_cast(cpp_core::StatusCodes::kAbortReadError)); +} + +TEST_F(SerialArduinoTest, AbortWriteDuringLargeTransfer) +{ + const int64_t default_total_bytes = 100LL * 1024LL * 1024LL; + const int64_t default_abort_after_bytes = 1LL * 1024LL * 1024LL; + const int64_t total_bytes = default_total_bytes; + const int64_t abort_after_bytes = default_abort_after_bytes; + + // We intentionally do NOT read the echo here; the goal is to saturate the OS TX queue so + // serialWrite() hits EAGAIN -> poll(), then verify serialAbortWrite() unblocks it. + constexpr int kChunkSize = 64 * 1024; + static_assert(kChunkSize > 0); + + std::string chunk(static_cast(kChunkSize), '\x55'); + std::atomic bytes_sent{0}; + std::atomic in_write_call{false}; + std::atomic write_result{999}; + + std::thread writer([&] { + while (bytes_sent.load() < total_bytes) + { + in_write_call.store(true); + const int res = serialWrite(handle, chunk.data(), static_cast(chunk.size()), 10000, 1, nullptr); + in_write_call.store(false); + + if (res == static_cast(cpp_core::StatusCodes::kAbortWriteError)) + { + write_result.store(res); + return; + } + if (res < 0) + { + write_result.store(res); + return; + } + bytes_sent.fetch_add(res); + } + write_result.store(static_cast(cpp_core::StatusCodes::kSuccess)); + }); + + // Wait briefly for either progress OR an early error, then start aborting. + const auto wait_deadline = std::chrono::steady_clock::now() + std::chrono::milliseconds(500); + while (std::chrono::steady_clock::now() < wait_deadline) + { + if (write_result.load() != 999) + { + break; + } + if (bytes_sent.load() >= abort_after_bytes) + { + break; + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + const auto abort_deadline = std::chrono::steady_clock::now() + std::chrono::seconds(3); + while (std::chrono::steady_clock::now() < abort_deadline) + { + if (write_result.load() != 999) + { + break; + } + EXPECT_EQ(serialAbortWrite(handle, nullptr), 0); + std::this_thread::sleep_for(std::chrono::milliseconds(25)); + } + + // Hard fail-safe: if abort did not take effect, close the handle to force the writer to exit. + if (write_result.load() != static_cast(cpp_core::StatusCodes::kAbortWriteError)) + { + (void)serialClose(handle, nullptr); + handle = 0; + } + + writer.join(); + + EXPECT_EQ(write_result.load(), static_cast(cpp_core::StatusCodes::kAbortWriteError)) + << "Expected abort during large transfer. bytes_sent=" << bytes_sent.load() << " total_bytes=" << total_bytes; +} + TEST(SerialInvalidHandleTest, InvalidHandleRead) { - char buffer[256]; - int result = serialRead(-1, buffer, sizeof(buffer), 1000, 1, nullptr); + std::array buffer = {}; + int result = serialRead(-1, buffer.data(), static_cast(buffer.size()), 1000, 1, nullptr); EXPECT_EQ(result, static_cast(cpp_core::StatusCodes::kInvalidHandleError)) << "Should return error for invalid handle"; }