diff --git a/.bdignore b/.bdignore new file mode 100644 index 0000000..5e06ba1 --- /dev/null +++ b/.bdignore @@ -0,0 +1,3 @@ +# BlackDuck scan exclusions +# Vendored third-party source — attributed in NOTICE and LICENSE files +src/vendor/ diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 21b6861..2cdf40c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -199,3 +199,39 @@ jobs: - name: Add coverage summary to job summary run: cat code-coverage-results.md >> $GITHUB_STEP_SUMMARY + + network_isolation_tests: + permissions: + contents: read + packages: read + needs: [build_docker, build_project] + runs-on: ubuntu-latest + steps: + - name: Log in to the Container registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Checkout Code + uses: actions/checkout@v4 + + - name: Download build directory + uses: actions/download-artifact@v4 + with: + name: build-dir + path: ${{ github.workspace }}/build + + - name: Run network-isolation tests (--network none) + run: | + chmod +x ${{ github.workspace }}/build/test/utApp + docker run --rm --network none \ + --user "$(id -u):$(id -g)" \ + -v ${{ github.workspace }}:/workspace \ + -w /workspace \ + ${{ needs.build_docker.outputs.image_tag }} \ + bash -c " \ + build/test/utApp \ + --gtest_filter='TransportNumericIPUTest.*:TransportIPv6UTest.ConnectViaIPv6LoopbackIP:TransportNumericIPResolverTest.ConnectFailureViaNumericIP' \ + " diff --git a/.version b/.version new file mode 100644 index 0000000..18efdb9 --- /dev/null +++ b/.version @@ -0,0 +1 @@ +1.1.8 diff --git a/LICENSE b/LICENSE index d645695..8393cf6 100644 --- a/LICENSE +++ b/LICENSE @@ -200,3 +200,31 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. 
+ +=========================================================================== + +BSD-3 License + +Copyright + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + * Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in + the documentation and/or other materials provided with the distribution. + * Neither the name of the copyright holder nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
diff --git a/build.sh b/build.sh index fe04cd1..1ea0868 100755 --- a/build.sh +++ b/build.sh @@ -18,6 +18,39 @@ set -e +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +IMAGE="firebolt-cpp-transport-ci:local" + +# Pre-scan for --docker before arg parsing so forwarded args are preserved +use_docker=false +_forward_args=() +for _arg in "$@"; do + [[ "$_arg" == "--docker" ]] && { use_docker=true; continue; } + _forward_args+=("$_arg") +done + +if $use_docker; then + for _bdir in build build-dev; do + _cache="$SCRIPT_DIR/$_bdir/CMakeCache.txt" + if [[ -f "$_cache" ]]; then + _cached=$(grep '^CMAKE_HOME_DIRECTORY' "$_cache" 2>/dev/null | cut -d= -f2 || true) + # In Docker, the workspace is mounted at /workspace. Only wipe if the + # cache was configured for a different environment (e.g. a native host build). + if [[ -n "$_cached" && "$_cached" != "/workspace" ]]; then + echo "Wiping stale $_bdir (configured at $_cached, expected /workspace)..." + rm -rf "$SCRIPT_DIR/$_bdir" + fi + fi + done + if ! docker image inspect "$IMAGE" &>/dev/null; then + echo "Building CI Docker image (one-time)..." + docker build -t "$IMAGE" -f "$SCRIPT_DIR/.github/Dockerfile" "$SCRIPT_DIR" + fi + exec docker run --rm --user "$(id -u):$(id -g)" \ + -v "$SCRIPT_DIR:/workspace" -w /workspace \ + "$IMAGE" ./build.sh "${_forward_args[@]}" +fi + bdir="build" do_install=false params= @@ -50,12 +83,21 @@ while [[ ! -z $1 ]]; do esac; shift done -[[ ! -z $SYSROOT_PATH ]] || { echo "SYSROOT_PATH not set" >/dev/stderr; exit 1; } -[[ -e $SYSROOT_PATH ]] || { echo "SYSROOT_PATH not exist ($SYSROOT_PATH)" >/dev/stderr; exit 1; } +SYSROOT_PATH="${SYSROOT_PATH:-/}" +[[ -e $SYSROOT_PATH ]] || { echo "SYSROOT_PATH does not exist ($SYSROOT_PATH)" >/dev/stderr; exit 1; } $cleanFirst && rm -rf $bdir -if [[ ! 
-e "$bdir" || -n "$@" ]]; then +_cache="$SCRIPT_DIR/$bdir/CMakeCache.txt" +if [[ -f "$_cache" ]]; then + _cached=$(grep '^CMAKE_HOME_DIRECTORY' "$_cache" 2>/dev/null | cut -d= -f2 || true) + if [[ -n "$_cached" && "$_cached" != "$SCRIPT_DIR" ]]; then + echo "Wiping stale $bdir (configured at $_cached, now at $SCRIPT_DIR)..." + rm -rf "$SCRIPT_DIR/$bdir" + fi +fi + +if [[ ! -f "$bdir/CMakeCache.txt" || $# -gt 0 ]]; then params+=" -DCMAKE_EXPORT_COMPILE_COMMANDS=ON" command -v ccache >/dev/null 2>&1 && params+=" -DCMAKE_C_COMPILER_LAUNCHER=ccache -DCMAKE_CXX_COMPILER_LAUNCHER=ccache" cmake -B $bdir \ diff --git a/cmake/version.cmake b/cmake/version.cmake index 29c68aa..7d2417c 100644 --- a/cmake/version.cmake +++ b/cmake/version.cmake @@ -21,26 +21,26 @@ endif () if (NOT PROJECT_VERSION) set(VERSION_STRING "0.1.0-unknown") - find_package(Git QUIET) - - if (GIT_FOUND) - execute_process( - COMMAND ${GIT_EXECUTABLE} describe --tags --abbrev=0 --match "v[0-9]*.[0-9]*.[0-9]*" - WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} - OUTPUT_VARIABLE GIT_VERSION - OUTPUT_STRIP_TRAILING_WHITESPACE - ERROR_QUIET - ) - endif () - - if (GIT_VERSION) - string(REGEX REPLACE "^v" "" VERSION_STRING "${GIT_VERSION}") - endif () - - if(VERSION_STRING STREQUAL "0.1.0-unknown" AND EXISTS "${CMAKE_SOURCE_DIR}/.version") + if (EXISTS "${CMAKE_SOURCE_DIR}/.version") file(READ "${CMAKE_SOURCE_DIR}/.version" VERSION_STRING) string(STRIP "${VERSION_STRING}" VERSION_STRING) - endif() + else () + find_package(Git QUIET) + + if (GIT_FOUND) + execute_process( + COMMAND ${GIT_EXECUTABLE} describe --tags --abbrev=0 --match "v[0-9]*.[0-9]*.[0-9]*" + WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} + OUTPUT_VARIABLE GIT_VERSION + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET + ) + endif () + + if (GIT_VERSION) + string(REGEX REPLACE "^v" "" VERSION_STRING "${GIT_VERSION}") + endif () + endif () set(PROJECT_VERSION "${VERSION_STRING}" CACHE STRING "Project version string") set(PROJECT_VERSION "${VERSION_STRING}") diff --git 
a/include/firebolt/config.h.in b/include/firebolt/config.h.in index ec9cdfd..b5f5305 100644 --- a/include/firebolt/config.h.in +++ b/include/firebolt/config.h.in @@ -73,6 +73,45 @@ struct Config /** Watchdog polling cycle in milliseconds. Default: 500. */ unsigned watchdogCycle_ms = 500; + + /** + * @brief Retry policy for the initial gateway connection. + * + * On embedded devices the Firebolt client may start before the gateway + * daemon is ready (poor systemd ordering) or before the loopback interface + * has an IPv4 address assigned. Setting reconnect_max_attempts > 0 causes + * connect() to retry up to that many times, waiting reconnect_delay_ms + * between each attempt, before returning an error to the caller. + * + * Each attempt is bounded by connect_attempt_timeout_ms: if the underlying + * transport (DNS + TCP + WebSocket handshake) does not complete within that + * window, the attempt is aborted and counted as a failure. connect() + * therefore blocks for at most (excluding teardown of failed attempts): + * (reconnect_max_attempts + 1) * connect_attempt_timeout_ms + * + reconnect_max_attempts * reconnect_delay_ms + * + * The retry count is capped internally at 100 regardless of this value. + * + * Default: 0 (single attempt, no retry). On failure connect() returns the + * last transport error (e.g. Error::NotConnected, Error::Timedout or + * Error::General); a per-attempt timeout is reported as Error::NotConnected. + */ + unsigned reconnect_max_attempts = 0; + + /** Milliseconds to wait between reconnect attempts. Default: 1000. */ + unsigned reconnect_delay_ms = 1000; + + /** + * @brief Per-attempt timeout for the initial connection handshake (ms). + * + * If DNS resolution + TCP connect + WebSocket handshake do not complete + * within this window, the in-flight attempt is aborted and connect() + * either retries (if reconnect_max_attempts > 0) or returns + * Error::NotConnected to the caller. + * + * Default: 10000 (10 seconds). 
+ */ + unsigned connect_attempt_timeout_ms = 10000; }; } // namespace Firebolt diff --git a/include/firebolt/gateway.h b/include/firebolt/gateway.h index d3a49d7..5b693e0 100644 --- a/include/firebolt/gateway.h +++ b/include/firebolt/gateway.h @@ -36,6 +36,9 @@ class IGateway public: virtual ~IGateway(); + // NOTE: onConnectionChange is invoked once with the final result on the + // connect() calling thread. All subsequent callbacks (disconnect, watchdog + // reconnect) fire on the websocketpp IO thread. Callers must be thread-safe. virtual Firebolt::Error connect(const Firebolt::Config& config, ConnectionChangeCallback onConnectionChange) = 0; virtual Firebolt::Error disconnect() = 0; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f0045ea..230e8cc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -82,6 +82,9 @@ target_link_libraries(${TARGET} target_include_directories(${TARGET} PRIVATE + # vendor/ must come before the system websocketpp headers so that + # our patched endpoint.hpp (numeric-IP resolver fix) takes priority. + $ $ $ PUBLIC diff --git a/src/gateway.cpp b/src/gateway.cpp index 8f163e1..11a3d8a 100644 --- a/src/gateway.cpp +++ b/src/gateway.cpp @@ -22,6 +22,7 @@ #include "firebolt/types.h" #include "transport.h" #include "utils.h" +#include #include #include #include @@ -382,17 +383,37 @@ class GatewayImpl : public IGateway, private IClientTransport Server server; std::thread watchdogThread; std::atomic watchdogRunning; + std::mutex watchdogMtx; + std::condition_variable watchdogCv; bool legacyRPCv1; std::map rpcv1_eventMap; std::mutex rpcv1_eventMap_mtx; + // Synchronisation for the initial connection attempt (and retries). + // connectResultMtx / connectResultCv are only used inside connect(). 
+ std::mutex connectResultMtx; + std::condition_variable connectResultCv; + bool connectResultReady{false}; + bool connectResultOk{false}; + Firebolt::Error connectResultError{Firebolt::Error::None}; + + // Guards connectionChangeListener against concurrent reads (IO thread) and + // writes (main thread). Always acquire connectionListenerMtx BEFORE + // connectResultMtx to avoid inversion. + std::mutex connectionListenerMtx; + + // Set to true when disconnect() is called so a retry loop in connect() + // aborts early instead of sleeping out the full reconnect_delay_ms. + std::atomic disconnectRequested_{false}; + public: GatewayImpl() : client(*this), server(), watchdogRunning(false), - legacyRPCv1(false) + legacyRPCv1(false), + disconnectRequested_(false) { } @@ -401,6 +422,7 @@ class GatewayImpl : public IGateway, private IClientTransport if (watchdogRunning) { watchdogRunning = false; + watchdogCv.notify_one(); if (watchdogThread.joinable()) { watchdogThread.join(); @@ -416,10 +438,37 @@ class GatewayImpl : public IGateway, private IClientTransport Firebolt::Logger::setFormat(cfg.log.format.ts, cfg.log.format.location, cfg.log.format.function, cfg.log.format.thread); - connectionChangeListener = onConnectionChange; + // Signal the retry-loop condvar when each async open/fail arrives. + // Do NOT forward to onConnectionChange here — connect() fires it once + // with the final result after the loop, so callers never see partial + // failure callbacks from intermediate retry attempts. + ConnectionChangeCallback previousListener; + // NOTE: the wrapper intentionally does NOT forward events to the + // previous listener. Forwarding only during an AlreadyConnected window + // requires a flag set after transport.connect() returns, but the IO + // thread can fire onOpen/onFail between client_->connect() being queued + // and that flag being set — a race that triggers a spurious callback on + // the old session. 
The AlreadyConnected window is a few microseconds + // (synchronous return) and the listener is restored before connect() + // returns, so the theoretical one-event loss is preferable to the race. + { + std::lock_guard listenerLock(connectionListenerMtx); + previousListener = connectionChangeListener; + connectionChangeListener = [this](bool connected, Firebolt::Error error) + { + { + std::lock_guard lk(connectResultMtx); + connectResultReady = true; + connectResultOk = connected; + connectResultError = error; + } + connectResultCv.notify_one(); + }; + } runtime_waitTime_ms = cfg.waitTime_ms; legacyRPCv1 = cfg.legacyRPCv1; + disconnectRequested_ = false; std::string url = buildGatewayUrl(cfg.wsUrl, legacyRPCv1); @@ -432,17 +481,131 @@ class GatewayImpl : public IGateway, private IClientTransport FIREBOLT_LOG_NOTICE("Transport", "Legacy RPCv1"); } - FIREBOLT_LOG_NOTICE("Gateway", "Connecting to url = %s", url.c_str()); - Firebolt::Error status = transport.connect( - url, [this](const nlohmann::json& message) { this->onMessage(message); }, - [this](const bool connected, Firebolt::Error error) { this->onConnectionChange(connected, error); }, - transportLoggingInclude, transportLoggingExclude); + // Cap to avoid unsigned overflow when reconnect_max_attempts is very large. + constexpr unsigned kMaxRetries = 100u; + const unsigned maxAttempts = 1 + std::min(cfg.reconnect_max_attempts, kMaxRetries); + Firebolt::Error status = Firebolt::Error::NotConnected; + + for (unsigned attempt = 1; attempt <= maxAttempts; ++attempt) + { + if (disconnectRequested_) + { + break; + } + + if (attempt > 1) + { + FIREBOLT_LOG_NOTICE("Gateway", "Reconnect attempt %u/%u in %u ms ...", attempt, maxAttempts, + cfg.reconnect_delay_ms); + // Sleep in kSliceMs slices (reconnect_delay_ms in total) so disconnect() + // can abort the wait early by setting disconnectRequested_. 
+ constexpr unsigned kSliceMs = 50; + for (unsigned elapsed = 0; elapsed < cfg.reconnect_delay_ms && !disconnectRequested_; elapsed += kSliceMs) + { + std::this_thread::sleep_for( + std::chrono::milliseconds(std::min(kSliceMs, cfg.reconnect_delay_ms - elapsed))); + } + if (disconnectRequested_) + break; + } + + { + std::lock_guard lk(connectResultMtx); + connectResultReady = false; + connectResultOk = false; + connectResultError = Firebolt::Error::None; + } + + FIREBOLT_LOG_NOTICE("Gateway", "Connecting to url = %s (attempt %u/%u)", url.c_str(), attempt, maxAttempts); + + status = transport.connect( + url, [this](const nlohmann::json& message) { this->onMessage(message); }, + [this](const bool connected, Firebolt::Error error) { this->onConnectionChange(connected, error); }, + transportLoggingInclude, transportLoggingExclude); + + if (status != Firebolt::Error::None) + { + // Synchronous error from transport (e.g. bad URL) — no point retrying. + break; + } + + // Wait for the async open/fail callback (with a generous ceiling). + bool attemptReady = false, attemptOk = false; + Firebolt::Error attemptError = Firebolt::Error::NotConnected; + { + const auto kConnectTimeout = std::chrono::milliseconds(cfg.connect_attempt_timeout_ms); + // Snapshot result fields while the mutex is held: the IO-thread + // callback can write to these fields after wait_for releases the + // lock, so reading them outside the scope is a data race. + std::unique_lock lk(connectResultMtx); + connectResultCv.wait_for(lk, kConnectTimeout, + [this] { return connectResultReady || disconnectRequested_.load(); }); + attemptReady = connectResultReady; + attemptOk = connectResultOk; + attemptError = connectResultError; + } + + if (disconnectRequested_) + { + // Abort the in-flight transport connection so the socket is + // cleaned up, and force a failure status so the post-loop code + // does not falsely report success, install the user listener, or + // start the watchdog. 
+ transport.disconnect(); + status = Firebolt::Error::NotConnected; + break; + } + + if (attemptOk) + { + status = Firebolt::Error::None; + break; + } + + // Connection failed or timed out. Close the in-flight attempt before + // the next retry iteration so there are no overlapping websocket handles. + if (!attemptReady) + { + FIREBOLT_LOG_WARNING("Gateway", "Connect attempt %u/%u timed out; aborting", attempt, maxAttempts); + attemptError = Firebolt::Error::NotConnected; + } + transport.disconnect(); + status = (attemptError != Firebolt::Error::None) ? attemptError : Firebolt::Error::NotConnected; + } if (status != Firebolt::Error::None) { + if (status == Firebolt::Error::AlreadyConnected) + { + // Transport is already connected — restore the pre-existing listener + // so the live connection is not disrupted, and do NOT emit a false + // "disconnected" event to the caller. + std::lock_guard lk(connectionListenerMtx); + connectionChangeListener = previousListener; + return status; + } + // Restore the plain user callback so subsequent events (if any) are + // forwarded directly without the condvar logic. + { + std::lock_guard lk(connectionListenerMtx); + connectionChangeListener = onConnectionChange; + } + // NOTE: this final result callback fires on the connect() calling + // thread. Post-connect state-change callbacks (disconnect, watchdog + // reconnect) fire on the websocketpp IO thread. Callers must be + // prepared to receive callbacks on either thread. + onConnectionChange(false, status); return status; } + // Swap the wrapper out for the plain user callback now that we're connected. + { + std::lock_guard lk(connectionListenerMtx); + connectionChangeListener = onConnectionChange; + } + // See NOTE above about callback thread. 
+ onConnectionChange(true, Firebolt::Error::None); + if (!watchdogRunning.exchange(true)) { watchdogThread = std::thread( @@ -450,8 +613,13 @@ class GatewayImpl : public IGateway, private IClientTransport { while (watchdogRunning) { - std::this_thread::sleep_for(std::chrono::milliseconds(watchdog_interval_ms)); - client.checkPromises(); + std::unique_lock lk(watchdogMtx); + bool timedOut = !watchdogCv.wait_for(lk, std::chrono::milliseconds(watchdog_interval_ms), + [this] { return !watchdogRunning.load(); }); + if (!watchdogRunning) + break; + if (timedOut) + client.checkPromises(); } }); } @@ -461,36 +629,28 @@ class GatewayImpl : public IGateway, private IClientTransport virtual Firebolt::Error disconnect() override { - FIREBOLT_LOG_DEBUG("Gateway", "[disconnect] transport.disconnect() start"); - auto t0_disc = std::chrono::steady_clock::now(); + disconnectRequested_ = true; + connectResultCv.notify_all(); // wake any in-progress retry wait + FIREBOLT_LOG_INFO("Gateway", "[shutdown] transport.disconnect() start"); Firebolt::Error status = transport.disconnect(); - FIREBOLT_LOG_INFO("Gateway", "[disconnect] transport.disconnect() done in %ld ms, status=%d", - std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_disc) - .count(), - static_cast(status)); + FIREBOLT_LOG_INFO("Gateway", "[shutdown] transport.disconnect() done, status=%d", static_cast(status)); if (status != Firebolt::Error::None) { return status; } if (watchdogRunning.exchange(false)) { - FIREBOLT_LOG_DEBUG("Gateway", "[disconnect] waiting for watchdog thread join..."); - auto t0_wdog = std::chrono::steady_clock::now(); + watchdogCv.notify_one(); + FIREBOLT_LOG_INFO("Gateway", "[shutdown] waiting for watchdog join..."); if (watchdogThread.joinable()) { watchdogThread.join(); } - FIREBOLT_LOG_DEBUG("Gateway", "[disconnect] watchdog joined in %ld ms", - std::chrono::duration_cast(std::chrono::steady_clock::now() - - t0_wdog) - .count()); + FIREBOLT_LOG_INFO("Gateway", "[shutdown] watchdog 
joined"); } - FIREBOLT_LOG_DEBUG("Gateway", "[disconnect] stopping notification worker..."); - auto t0_nw = std::chrono::steady_clock::now(); + FIREBOLT_LOG_INFO("Gateway", "[shutdown] stopping notification worker..."); server.stopNotificationWorker(); - FIREBOLT_LOG_DEBUG("Gateway", "[disconnect] notification worker stopped in %ld ms", - std::chrono::duration_cast(std::chrono::steady_clock::now() - t0_nw) - .count()); + FIREBOLT_LOG_INFO("Gateway", "[shutdown] notification worker stopped"); return Error::None; } @@ -529,25 +689,15 @@ class GatewayImpl : public IGateway, private IClientTransport nlohmann::json params; params["listen"] = true; - FIREBOLT_LOG_DEBUG("Gateway", "[subscribe] waiting for subscribe ACK for '%s'...", event.c_str()); - auto t0_sub = std::chrono::steady_clock::now(); - auto future_sub = request(event, params, id); + auto future = request(event, params, id); constexpr auto kSubscribeAckTimeout = std::chrono::milliseconds(50); - if (future_sub.wait_for(kSubscribeAckTimeout) != std::future_status::ready) + if (future.wait_for(kSubscribeAckTimeout) != std::future_status::ready) { - FIREBOLT_LOG_DEBUG("Gateway", "[subscribe] ACK for '%s' timed out after %lld ms", event.c_str(), - static_cast(std::chrono::duration_cast( - std::chrono::steady_clock::now() - t0_sub) - .count())); status = Firebolt::Error::Timedout; } else { - FIREBOLT_LOG_DEBUG("Gateway", "[subscribe] ACK for '%s' received in %lld ms", event.c_str(), - static_cast(std::chrono::duration_cast( - std::chrono::steady_clock::now() - t0_sub) - .count())); - auto result = future_sub.get(); + auto result = future.get(); if (!result) { status = result.error(); @@ -570,7 +720,6 @@ class GatewayImpl : public IGateway, private IClientTransport { FIREBOLT_LOG_DEBUG("Gateway", "Unsubscribe called for event '%s'", event.c_str()); Firebolt::Error status = server.unsubscribe(event, usercb); - if (status != Firebolt::Error::None) { FIREBOLT_LOG_DEBUG("Gateway", "Unsubscribe failed for event '%s'", 
event.c_str()); @@ -685,7 +834,15 @@ class GatewayImpl : public IGateway, private IClientTransport FIREBOLT_LOG_ERROR("Gateway", "Invalid payload received: %s", message.dump().c_str()); } - void onConnectionChange(const bool connected, Firebolt::Error error) { connectionChangeListener(connected, error); } + void onConnectionChange(const bool connected, Firebolt::Error error) + { + ConnectionChangeCallback cb; + { + std::lock_guard lk(connectionListenerMtx); + cb = connectionChangeListener; + } + cb(connected, error); + } MessageID getNextMessageID() override { return transport.getNextMessageID(); } diff --git a/src/transport.cpp b/src/transport.cpp index d303874..4d585a8 100644 --- a/src/transport.cpp +++ b/src/transport.cpp @@ -32,6 +32,7 @@ using message_ptr = websocketpp::config::asio_client::message_type::ptr; enum class Transport::TransportState { NotStarted, + Connecting, // client_->connect() queued; onOpen/onFail not yet fired Connected, Disconnected, }; @@ -79,7 +80,7 @@ void Transport::start() connectionThread_.reset(new websocketpp::lib::thread(&client::run, client_.get())); startMessageWorker(); - connectionStatus_ = TransportState::Disconnected; + connectionStatus_.store(TransportState::Disconnected); } void Transport::startMessageWorker() @@ -144,13 +145,13 @@ Firebolt::Error Transport::connect(std::string url, MessageCallback onMessage, C std::optional transportLoggingInclude, std::optional transportLoggingExclude) { - if (connectionStatus_ == TransportState::Connected) + if (connectionStatus_.load() == TransportState::Connected) { FIREBOLT_LOG_WARNING("Transport", "Connect called when already connected. 
Ignoring"); return Firebolt::Error::AlreadyConnected; } - if (connectionStatus_ == TransportState::NotStarted) + if (connectionStatus_.load() == TransportState::NotStarted) { debugEnabled_ = Logger::isLogLevelEnabled(Firebolt::LogLevel::Debug); start(); @@ -204,6 +205,7 @@ Firebolt::Error Transport::connect(std::string url, MessageCallback onMessage, C con->set_message_handler(websocketpp::lib::bind(&Transport::onMessage, this, websocketpp::lib::placeholders::_1, websocketpp::lib::placeholders::_2)); + connectionStatus_.store(TransportState::Connecting); client_->connect(con); return Firebolt::Error::None; @@ -211,13 +213,24 @@ Firebolt::Error Transport::connect(std::string url, MessageCallback onMessage, C Firebolt::Error Transport::disconnect() { - if (connectionStatus_ == TransportState::NotStarted) + if (connectionStatus_.load() == TransportState::NotStarted) { return Firebolt::Error::None; } client_->stop_perpetual(); - if (connectionStatus_ == TransportState::Connected) + if (connectionStatus_.load() == TransportState::Connecting) + { + // A connect attempt is in-flight (DNS / TCP / WS handshake). stop_perpetual() + // alone does not cancel pending async operations; client_->stop() forces the + // asio io_service to exit run() immediately so connectionThread_ can be joined + // without waiting for the full OS-level TCP timeout. + // onFail will not fire after stop(), but the gateway's connect() wait has already + // timed out or been woken via disconnectRequested_, so no callback is needed here. + FIREBOLT_LOG_DEBUG("Transport", "[disconnect] aborting in-progress connect attempt via stop()"); + client_->stop(); + } + else if (connectionStatus_.load() == TransportState::Connected) { // Shorten the close-handshake timeout so that join() below does not block // for the full websocketpp default (5 s) if the gateway is unresponsive. 
@@ -263,7 +276,7 @@ Firebolt::Error Transport::disconnect() .count())); client_ = std::make_unique(); - connectionStatus_ = TransportState::NotStarted; + connectionStatus_.store(TransportState::NotStarted); return Firebolt::Error::None; } @@ -274,7 +287,7 @@ unsigned Transport::getNextMessageID() Firebolt::Error Transport::send(const std::string& method, const nlohmann::json& params, const unsigned id) { - if (connectionStatus_ != TransportState::Connected) + if (connectionStatus_.load() != TransportState::Connected) { return Firebolt::Error::NotConnected; } @@ -332,7 +345,7 @@ void Transport::onMessage(websocketpp::connection_hdl /* hdl */, void Transport::onOpen(websocketpp::client* c, websocketpp::connection_hdl hdl) { - connectionStatus_ = TransportState::Connected; + connectionStatus_.store(TransportState::Connected); client::connection_ptr con = c->get_con_from_hdl(hdl); connectionReceiver_(true, Firebolt::Error::None); @@ -340,14 +353,14 @@ void Transport::onOpen(websocketpp::client* c, void Transport::onClose(websocketpp::client* c, websocketpp::connection_hdl hdl) { - connectionStatus_ = TransportState::Disconnected; + connectionStatus_.store(TransportState::Disconnected); client::connection_ptr con = c->get_con_from_hdl(hdl); connectionReceiver_(false, mapError(con->get_ec())); } void Transport::onFail(websocketpp::client* c, websocketpp::connection_hdl hdl) { - connectionStatus_ = TransportState::Disconnected; + connectionStatus_.store(TransportState::Disconnected); client::connection_ptr con = c->get_con_from_hdl(hdl); connectionReceiver_(false, mapError(con->get_ec())); } diff --git a/src/vendor/websocketpp/transport/asio/endpoint.hpp b/src/vendor/websocketpp/transport/asio/endpoint.hpp new file mode 100644 index 0000000..76f6d5f --- /dev/null +++ b/src/vendor/websocketpp/transport/asio/endpoint.hpp @@ -0,0 +1,1197 @@ +/* + * Copyright (c) 2015, Peter Thorson. All rights reserved. 
+ * + * SPDX-License-Identifier: BSD-3-Clause + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * * Neither the name of the WebSocket++ Project nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL PETER THORSON BE LIABLE FOR ANY + * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES + * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND + * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +#ifndef WEBSOCKETPP_TRANSPORT_ASIO_HPP +#define WEBSOCKETPP_TRANSPORT_ASIO_HPP + +#include +#include +#include + +#include +#include + +#include +#include + +#include +#include + +namespace websocketpp { +namespace transport { +namespace asio { + +/// Asio based endpoint transport component +/** + * transport::asio::endpoint implements an endpoint transport component using + * Asio. 
+ */ +template +class endpoint : public config::socket_type { +public: + /// Type of this endpoint transport component + typedef endpoint type; + + /// Type of the concurrency policy + typedef typename config::concurrency_type concurrency_type; + /// Type of the socket policy + typedef typename config::socket_type socket_type; + /// Type of the error logging policy + typedef typename config::elog_type elog_type; + /// Type of the access logging policy + typedef typename config::alog_type alog_type; + + /// Type of the socket connection component + typedef typename socket_type::socket_con_type socket_con_type; + /// Type of a shared pointer to the socket connection component + typedef typename socket_con_type::ptr socket_con_ptr; + + /// Type of the connection transport component associated with this + /// endpoint transport component + typedef asio::connection transport_con_type; + /// Type of a shared pointer to the connection transport component + /// associated with this endpoint transport component + typedef typename transport_con_type::ptr transport_con_ptr; + + /// Type of a pointer to the ASIO io_service being used + typedef lib::asio::io_service * io_service_ptr; + /// Type of a shared pointer to the acceptor being used + typedef lib::shared_ptr acceptor_ptr; + /// Type of a shared pointer to the resolver being used + typedef lib::shared_ptr resolver_ptr; + /// Type of timer handle + typedef lib::shared_ptr timer_ptr; + /// Type of a shared pointer to an io_service work object + typedef lib::shared_ptr work_ptr; + + /// Type of socket pre-bind handler + typedef lib::function tcp_pre_bind_handler; + + // generate and manage our own io_service + explicit endpoint() + : m_io_service(NULL) + , m_external_io_service(false) + , m_listen_backlog(lib::asio::socket_base::max_connections) + , m_reuse_addr(false) + , m_state(UNINITIALIZED) + { + //std::cout << "transport::asio::endpoint constructor" << std::endl; + } + + ~endpoint() { + // clean up our io_service if 
we were initialized with an internal one. + + // Explicitly destroy local objects + m_acceptor.reset(); + m_resolver.reset(); + m_work.reset(); + if (m_state != UNINITIALIZED && !m_external_io_service) { + delete m_io_service; + } + } + + /// transport::asio objects are moveable but not copyable or assignable. + /// The following code sets this situation up based on whether or not we + /// have C++11 support or not +#ifdef _WEBSOCKETPP_DEFAULT_DELETE_FUNCTIONS_ + endpoint(const endpoint & src) = delete; + endpoint& operator= (const endpoint & rhs) = delete; +#else +private: + endpoint(const endpoint & src); + endpoint & operator= (const endpoint & rhs); +public: +#endif // _WEBSOCKETPP_DEFAULT_DELETE_FUNCTIONS_ + +#ifdef _WEBSOCKETPP_MOVE_SEMANTICS_ + endpoint (endpoint && src) + : config::socket_type(std::move(src)) + , m_tcp_pre_init_handler(src.m_tcp_pre_init_handler) + , m_tcp_post_init_handler(src.m_tcp_post_init_handler) + , m_io_service(src.m_io_service) + , m_external_io_service(src.m_external_io_service) + , m_acceptor(src.m_acceptor) + , m_listen_backlog(lib::asio::socket_base::max_connections) + , m_reuse_addr(src.m_reuse_addr) + , m_elog(src.m_elog) + , m_alog(src.m_alog) + , m_state(src.m_state) + { + src.m_io_service = NULL; + src.m_external_io_service = false; + src.m_acceptor = NULL; + src.m_state = UNINITIALIZED; + } + + /*endpoint & operator= (const endpoint && rhs) { + if (this != &rhs) { + m_io_service = rhs.m_io_service; + m_external_io_service = rhs.m_external_io_service; + m_acceptor = rhs.m_acceptor; + m_listen_backlog = rhs.m_listen_backlog; + m_reuse_addr = rhs.m_reuse_addr; + m_state = rhs.m_state; + + rhs.m_io_service = NULL; + rhs.m_external_io_service = false; + rhs.m_acceptor = NULL; + rhs.m_listen_backlog = lib::asio::socket_base::max_connections; + rhs.m_state = UNINITIALIZED; + + // TODO: this needs to be updated + } + return *this; + }*/ +#endif // _WEBSOCKETPP_MOVE_SEMANTICS_ + + /// Return whether or not the endpoint produces 
secure connections. + bool is_secure() const { + return socket_type::is_secure(); + } + + /// initialize asio transport with external io_service (exception free) + /** + * Initialize the ASIO transport policy for this endpoint using the provided + * io_service object. asio_init must be called exactly once on any endpoint + * that uses transport::asio before it can be used. + * + * @param ptr A pointer to the io_service to use for asio events + * @param ec Set to indicate what error occurred, if any. + */ + void init_asio(io_service_ptr ptr, lib::error_code & ec) { + if (m_state != UNINITIALIZED) { + m_elog->write(log::elevel::library, + "asio::init_asio called from the wrong state"); + using websocketpp::error::make_error_code; + ec = make_error_code(websocketpp::error::invalid_state); + return; + } + + m_alog->write(log::alevel::devel,"asio::init_asio"); + + m_io_service = ptr; + m_external_io_service = true; + m_acceptor.reset(new lib::asio::ip::tcp::acceptor(*m_io_service)); + + m_state = READY; + ec = lib::error_code(); + } + + /// initialize asio transport with external io_service + /** + * Initialize the ASIO transport policy for this endpoint using the provided + * io_service object. asio_init must be called exactly once on any endpoint + * that uses transport::asio before it can be used. + * + * @param ptr A pointer to the io_service to use for asio events + */ + void init_asio(io_service_ptr ptr) { + lib::error_code ec; + init_asio(ptr,ec); + if (ec) { throw exception(ec); } + } + + /// Initialize asio transport with internal io_service (exception free) + /** + * This method of initialization will allocate and use an internally managed + * io_service. + * + * @see init_asio(io_service_ptr ptr) + * + * @param ec Set to indicate what error occurred, if any. + */ + void init_asio(lib::error_code & ec) { + // Use a smart pointer until the call is successful and ownership has + // successfully been taken. Use unique_ptr when available. 
+ // TODO: remove the use of auto_ptr when C++98/03 support is no longer + // necessary. +#ifdef _WEBSOCKETPP_CPP11_MEMORY_ + lib::unique_ptr service(new lib::asio::io_service()); +#else + lib::auto_ptr service(new lib::asio::io_service()); +#endif + init_asio(service.get(), ec); + if( !ec ) service.release(); // Call was successful, transfer ownership + m_external_io_service = false; + } + + /// Initialize asio transport with internal io_service + /** + * This method of initialization will allocate and use an internally managed + * io_service. + * + * @see init_asio(io_service_ptr ptr) + */ + void init_asio() { + // Use a smart pointer until the call is successful and ownership has + // successfully been taken. Use unique_ptr when available. + // TODO: remove the use of auto_ptr when C++98/03 support is no longer + // necessary. +#ifdef _WEBSOCKETPP_CPP11_MEMORY_ + lib::unique_ptr service(new lib::asio::io_service()); +#else + lib::auto_ptr service(new lib::asio::io_service()); +#endif + init_asio( service.get() ); + // If control got this far without an exception, then ownership has successfully been taken + service.release(); + m_external_io_service = false; + } + + /// Sets the tcp pre bind handler + /** + * The tcp pre bind handler is called after the listen acceptor has + * been created but before the socket bind is performed. + * + * @since 0.8.0 + * + * @param h The handler to call on tcp pre bind init. + */ + void set_tcp_pre_bind_handler(tcp_pre_bind_handler h) { + m_tcp_pre_bind_handler = h; + } + + /// Sets the tcp pre init handler + /** + * The tcp pre init handler is called after the raw tcp connection has been + * established but before any additional wrappers (proxy connects, TLS + * handshakes, etc) have been performed. + * + * @since 0.3.0 + * + * @param h The handler to call on tcp pre init. 
     */
    void set_tcp_pre_init_handler(tcp_init_handler h) {
        // Stored on the endpoint; init() copies it onto each connection it
        // initializes.
        m_tcp_pre_init_handler = h;
    }

    /// Sets the tcp pre init handler (deprecated)
    /**
     * The tcp pre init handler is called after the raw tcp connection has been
     * established but before any additional wrappers (proxy connects, TLS
     * handshakes, etc) have been performed.
     *
     * This is a thin forwarder to set_tcp_pre_init_handler.
     *
     * @deprecated Use set_tcp_pre_init_handler instead
     *
     * @param h The handler to call on tcp pre init.
     */
    void set_tcp_init_handler(tcp_init_handler h) {
        set_tcp_pre_init_handler(h);
    }

    /// Sets the tcp post init handler
    /**
     * The tcp post init handler is called after the tcp connection has been
     * established and all additional wrappers (proxy connects, TLS handshakes,
     * etc) have been performed. This is fired before any bytes are read or any
     * WebSocket specific handshake logic has been performed.
     *
     * @since 0.3.0
     *
     * @param h The handler to call on tcp post init.
     */
    void set_tcp_post_init_handler(tcp_init_handler h) {
        // Stored on the endpoint; init() copies it onto each connection it
        // initializes.
        m_tcp_post_init_handler = h;
    }

    /// Sets the maximum length of the queue of pending connections.
    /**
     * Sets the maximum length of the queue of pending connections. Increasing
     * this will allow WebSocket++ to queue additional incoming connections.
     * Setting it higher may prevent failed connections at high connection rates
     * but may cause additional latency.
     *
     * For this value to take effect you may need to adjust operating system
     * settings.
     *
     * New values affect future calls to listen only.
     *
     * The default value is specified as *::asio::socket_base::max_connections
     * which uses the operating system defined maximum queue length. Your OS
     * may restrict or silently lower this value. A value of zero may cause
     * all connections to be rejected.
     *
     * @since 0.3.0
     *
     * @param backlog The maximum length of the queue of pending connections
     */
    void set_listen_backlog(int backlog) {
        // Only read by listen(); an acceptor that is already listening keeps
        // the backlog it was opened with.
        m_listen_backlog = backlog;
    }

    /// Sets whether to use the SO_REUSEADDR flag when opening listening sockets
    /**
     * Specifies whether or not to use the SO_REUSEADDR TCP socket option. What
     * this flag does depends on your operating system.
     *
     * Please consult operating system documentation for more details. There
     * may be security consequences to enabling this option.
     *
     * New values affect future calls to listen only so set this value prior to
     * calling listen.
     *
     * The default is false.
     *
     * @since 0.3.0
     *
     * @param value Whether or not to use the SO_REUSEADDR option
     */
    void set_reuse_addr(bool value) {
        m_reuse_addr = value;
    }

    /// Retrieve a reference to the endpoint's io_service
    /**
     * The io_service may be an internal or external one. This may be used to
     * call methods of the io_service that are not explicitly wrapped by the
     * endpoint.
     *
     * This method is only valid after the endpoint has been initialized with
     * `init_asio`. No error will be returned if it isn't: the constructor sets
     * the stored pointer to NULL, so calling this before `init_asio`
     * dereferences a null pointer.
     *
     * @return A reference to the endpoint's io_service
     */
    lib::asio::io_service & get_io_service() {
        return *m_io_service;
    }

    /// Get local TCP endpoint
    /**
     * Extracts the local endpoint from the acceptor. This represents the
     * address that WebSocket++ is listening on.
     *
     * Sets a bad_descriptor error if the acceptor is not currently listening
     * or otherwise unavailable.
     *
     * @since 0.7.0
     *
     * @param ec Set to indicate what error occurred, if any.
+ * @return The local endpoint + */ + lib::asio::ip::tcp::endpoint get_local_endpoint(lib::asio::error_code & ec) { + if (m_acceptor) { + return m_acceptor->local_endpoint(ec); + } else { + ec = lib::asio::error::make_error_code(lib::asio::error::bad_descriptor); + return lib::asio::ip::tcp::endpoint(); + } + } + + /// Set up endpoint for listening manually (exception free) + /** + * Bind the internal acceptor using the specified settings. The endpoint + * must have been initialized by calling init_asio before listening. + * + * @param ep An endpoint to read settings from + * @param ec Set to indicate what error occurred, if any. + */ + void listen(lib::asio::ip::tcp::endpoint const & ep, lib::error_code & ec) + { + if (m_state != READY) { + m_elog->write(log::elevel::library, + "asio::listen called from the wrong state"); + using websocketpp::error::make_error_code; + ec = make_error_code(websocketpp::error::invalid_state); + return; + } + + m_alog->write(log::alevel::devel,"asio::listen"); + + lib::asio::error_code bec; + + m_acceptor->open(ep.protocol(),bec); + if (bec) {ec = clean_up_listen_after_error(bec);return;} + + m_acceptor->set_option(lib::asio::socket_base::reuse_address(m_reuse_addr),bec); + if (bec) {ec = clean_up_listen_after_error(bec);return;} + + // if a TCP pre-bind handler is present, run it + if (m_tcp_pre_bind_handler) { + ec = m_tcp_pre_bind_handler(m_acceptor); + if (ec) { + ec = clean_up_listen_after_error(ec); + return; + } + } + + m_acceptor->bind(ep,bec); + if (bec) {ec = clean_up_listen_after_error(bec);return;} + + m_acceptor->listen(m_listen_backlog,bec); + if (bec) {ec = clean_up_listen_after_error(bec);return;} + + // Success + m_state = LISTENING; + ec = lib::error_code(); + } + + + + /// Set up endpoint for listening manually + /** + * Bind the internal acceptor using the settings specified by the endpoint e + * + * @param ep An endpoint to read settings from + */ + void listen(lib::asio::ip::tcp::endpoint const & ep) { + 
lib::error_code ec; + listen(ep,ec); + if (ec) { throw exception(ec); } + } + + /// Set up endpoint for listening with protocol and port (exception free) + /** + * Bind the internal acceptor using the given internet protocol and port. + * The endpoint must have been initialized by calling init_asio before + * listening. + * + * Common options include: + * - IPv6 with mapped IPv4 for dual stack hosts lib::asio::ip::tcp::v6() + * - IPv4 only: lib::asio::ip::tcp::v4() + * + * @param internet_protocol The internet protocol to use. + * @param port The port to listen on. + * @param ec Set to indicate what error occurred, if any. + */ + template + void listen(InternetProtocol const & internet_protocol, uint16_t port, + lib::error_code & ec) + { + lib::asio::ip::tcp::endpoint ep(internet_protocol, port); + listen(ep,ec); + } + + /// Set up endpoint for listening with protocol and port + /** + * Bind the internal acceptor using the given internet protocol and port. + * The endpoint must have been initialized by calling init_asio before + * listening. + * + * Common options include: + * - IPv6 with mapped IPv4 for dual stack hosts lib::asio::ip::tcp::v6() + * - IPv4 only: lib::asio::ip::tcp::v4() + * + * @param internet_protocol The internet protocol to use. + * @param port The port to listen on. + */ + template + void listen(InternetProtocol const & internet_protocol, uint16_t port) + { + lib::asio::ip::tcp::endpoint ep(internet_protocol, port); + listen(ep); + } + + /// Set up endpoint for listening on a port (exception free) + /** + * Bind the internal acceptor using the given port. The IPv6 protocol with + * mapped IPv4 for dual stack hosts will be used. If you need IPv4 only use + * the overload that allows specifying the protocol explicitly. + * + * The endpoint must have been initialized by calling init_asio before + * listening. + * + * @param port The port to listen on. + * @param ec Set to indicate what error occurred, if any. 
     */
    void listen(uint16_t port, lib::error_code & ec) {
        // Forwards to the protocol/port overload with IPv6 selected.
        listen(lib::asio::ip::tcp::v6(), port, ec);
    }

    /// Set up endpoint for listening on a port
    /**
     * Bind the internal acceptor using the given port. The IPv6 protocol with
     * mapped IPv4 for dual stack hosts will be used. If you need IPv4 only use
     * the overload that allows specifying the protocol explicitly.
     *
     * The endpoint must have been initialized by calling init_asio before
     * listening.
     *
     * This overload reports failure by throwing `exception` (via the
     * endpoint overload it forwards to) rather than through an error code.
     *
     * @param port The port to listen on.
     */
    void listen(uint16_t port) {
        listen(lib::asio::ip::tcp::v6(), port);
    }

    /// Set up endpoint for listening on a host and service (exception free)
    /**
     * Bind the internal acceptor using the given host and service. More details
     * about what host and service can be are available in the Asio
     * documentation for ip::basic_resolver_query::basic_resolver_query's
     * constructors.
     *
     * The endpoint must have been initialized by calling init_asio before
     * listening.
     *
     * @param host A string identifying a location. May be a descriptive name or
     * a numeric address string.
     * @param service A string identifying the requested service. This may be a
     * descriptive name or a numeric string corresponding to a port number.
     * @param ec Set to indicate what error occurred, if any.
+ */ + void listen(std::string const & host, std::string const & service, + lib::error_code & ec) + { + using lib::asio::ip::tcp; + tcp::resolver r(*m_io_service); + tcp::resolver::query query(host, service); + tcp::resolver::iterator endpoint_iterator = r.resolve(query); + tcp::resolver::iterator end; + if (endpoint_iterator == end) { + m_elog->write(log::elevel::library, + "asio::listen could not resolve the supplied host or service"); + ec = make_error_code(error::invalid_host_service); + return; + } + listen(*endpoint_iterator,ec); + } + + /// Set up endpoint for listening on a host and service + /** + * Bind the internal acceptor using the given host and service. More details + * about what host and service can be are available in the Asio + * documentation for ip::basic_resolver_query::basic_resolver_query's + * constructors. + * + * The endpoint must have been initialized by calling init_asio before + * listening. + * + * @param host A string identifying a location. May be a descriptive name or + * a numeric address string. + * @param service A string identifying the requested service. This may be a + * descriptive name or a numeric string corresponding to a port number. + * @param ec Set to indicate what error occurred, if any. + */ + void listen(std::string const & host, std::string const & service) + { + lib::error_code ec; + listen(host,service,ec); + if (ec) { throw exception(ec); } + } + + /// Stop listening (exception free) + /** + * Stop listening and accepting new connections. This will not end any + * existing connections. + * + * @since 0.3.0-alpha4 + * @param ec A status code indicating an error, if any. 
+ */ + void stop_listening(lib::error_code & ec) { + if (m_state != LISTENING) { + m_elog->write(log::elevel::library, + "asio::listen called from the wrong state"); + using websocketpp::error::make_error_code; + ec = make_error_code(websocketpp::error::invalid_state); + return; + } + + m_acceptor->close(); + m_state = READY; + ec = lib::error_code(); + } + + /// Stop listening + /** + * Stop listening and accepting new connections. This will not end any + * existing connections. + * + * @since 0.3.0-alpha4 + */ + void stop_listening() { + lib::error_code ec; + stop_listening(ec); + if (ec) { throw exception(ec); } + } + + /// Check if the endpoint is listening + /** + * @return Whether or not the endpoint is listening. + */ + bool is_listening() const { + return (m_state == LISTENING); + } + + /// wraps the run method of the internal io_service object + std::size_t run() { + return m_io_service->run(); + } + + /// wraps the run_one method of the internal io_service object + /** + * @since 0.3.0-alpha4 + */ + std::size_t run_one() { + return m_io_service->run_one(); + } + + /// wraps the stop method of the internal io_service object + void stop() { + m_io_service->stop(); + } + + /// wraps the poll method of the internal io_service object + std::size_t poll() { + return m_io_service->poll(); + } + + /// wraps the poll_one method of the internal io_service object + std::size_t poll_one() { + return m_io_service->poll_one(); + } + + /// wraps the reset method of the internal io_service object + void reset() { + m_io_service->reset(); + } + + /// wraps the stopped method of the internal io_service object + bool stopped() const { + return m_io_service->stopped(); + } + + /// Marks the endpoint as perpetual, stopping it from exiting when empty + /** + * Marks the endpoint as perpetual. Perpetual endpoints will not + * automatically exit when they run out of connections to process. To stop + * a perpetual endpoint call `end_perpetual`. 
+ * + * An endpoint may be marked perpetual at any time by any thread. It must be + * called either before the endpoint has run out of work or before it was + * started + * + * @since 0.3.0 + */ + void start_perpetual() { + m_work.reset(new lib::asio::io_service::work(*m_io_service)); + } + + /// Clears the endpoint's perpetual flag, allowing it to exit when empty + /** + * Clears the endpoint's perpetual flag. This will cause the endpoint's run + * method to exit normally when it runs out of connections. If there are + * currently active connections it will not end until they are complete. + * + * @since 0.3.0 + */ + void stop_perpetual() { + m_work.reset(); + } + + /// Call back a function after a period of time. + /** + * Sets a timer that calls back a function after the specified period of + * milliseconds. Returns a handle that can be used to cancel the timer. + * A cancelled timer will return the error code error::operation_aborted + * A timer that expired will return no error. + * + * @param duration Length of time to wait in milliseconds + * @param callback The function to call back when the timer has expired + * @return A handle that can be used to cancel the timer if it is no longer + * needed. + */ + timer_ptr set_timer(long duration, timer_handler callback) { + timer_ptr new_timer = lib::make_shared( + *m_io_service, + lib::asio::milliseconds(duration) + ); + + new_timer->async_wait( + lib::bind( + &type::handle_timer, + this, + new_timer, + callback, + lib::placeholders::_1 + ) + ); + + return new_timer; + } + + /// Timer handler + /** + * The timer pointer is included to ensure the timer isn't destroyed until + * after it has expired. + * + * @param t Pointer to the timer in question + * @param callback The function to call back + * @param ec A status code indicating an error, if any. 
+ */ + void handle_timer(timer_ptr, timer_handler callback, + lib::asio::error_code const & ec) + { + if (ec) { + if (ec == lib::asio::error::operation_aborted) { + callback(make_error_code(transport::error::operation_aborted)); + } else { + m_elog->write(log::elevel::info, + "asio handle_timer error: "+ec.message()); + log_err(log::elevel::info,"asio handle_timer",ec); + callback(socket_con_type::translate_ec(ec)); + } + } else { + callback(lib::error_code()); + } + } + + /// Accept the next connection attempt and assign it to con (exception free) + /** + * @param tcon The connection to accept into. + * @param callback The function to call when the operation is complete. + * @param ec A status code indicating an error, if any. + */ + void async_accept(transport_con_ptr tcon, accept_handler callback, + lib::error_code & ec) + { + if (m_state != LISTENING || !m_acceptor) { + using websocketpp::error::make_error_code; + ec = make_error_code(websocketpp::error::async_accept_not_listening); + return; + } + + m_alog->write(log::alevel::devel, "asio::async_accept"); + + if (config::enable_multithreading) { + m_acceptor->async_accept( + tcon->get_raw_socket(), + tcon->get_strand()->wrap(lib::bind( + &type::handle_accept, + this, + callback, + lib::placeholders::_1 + )) + ); + } else { + m_acceptor->async_accept( + tcon->get_raw_socket(), + lib::bind( + &type::handle_accept, + this, + callback, + lib::placeholders::_1 + ) + ); + } + } + + /// Accept the next connection attempt and assign it to con. + /** + * @param tcon The connection to accept into. + * @param callback The function to call when the operation is complete. + */ + void async_accept(transport_con_ptr tcon, accept_handler callback) { + lib::error_code ec; + async_accept(tcon,callback,ec); + if (ec) { throw exception(ec); } + } +protected: + /// Initialize logging + /** + * The loggers are located in the main endpoint class. As such, the + * transport doesn't have direct access to them. 
This method is called + * by the endpoint constructor to allow shared logging from the transport + * component. These are raw pointers to member variables of the endpoint. + * In particular, they cannot be used in the transport constructor as they + * haven't been constructed yet, and cannot be used in the transport + * destructor as they will have been destroyed by then. + */ + void init_logging(const lib::shared_ptr& a, const lib::shared_ptr& e) { + m_alog = a; + m_elog = e; + } + + void handle_accept(accept_handler callback, lib::asio::error_code const & + asio_ec) + { + lib::error_code ret_ec; + + m_alog->write(log::alevel::devel, "asio::handle_accept"); + + if (asio_ec) { + if (asio_ec == lib::asio::errc::operation_canceled) { + ret_ec = make_error_code(websocketpp::error::operation_canceled); + } else { + log_err(log::elevel::info,"asio handle_accept",asio_ec); + ret_ec = socket_con_type::translate_ec(asio_ec); + } + } + + callback(ret_ec); + } + + /// Initiate a new connection + // TODO: there have to be some more failure conditions here + void async_connect(transport_con_ptr tcon, uri_ptr u, connect_handler cb) { + using namespace lib::asio::ip; + + // Create a resolver + if (!m_resolver) { + m_resolver.reset(new lib::asio::ip::tcp::resolver(*m_io_service)); + } + + tcon->set_uri(u); + + std::string proxy = tcon->get_proxy(); + std::string host; + std::string port; + + if (proxy.empty()) { + host = u->get_host(); + port = u->get_port_str(); + } else { + lib::error_code ec; + + uri_ptr pu = lib::make_shared(proxy); + + if (!pu->get_valid()) { + cb(make_error_code(error::proxy_invalid)); + return; + } + + ec = tcon->proxy_init(u->get_authority()); + if (ec) { + cb(ec); + return; + } + + host = pu->get_host(); + port = pu->get_port_str(); + } + + // If the host is a numeric IP (e.g. 127.0.0.1) use the numeric_host + // resolver flag so that getaddrinfo() uses AI_NUMERICHOST and does NOT + // apply AI_ADDRCONFIG. 
AI_ADDRCONFIG causes resolution to fail on + // embedded devices where the loopback interface is not brought up when + // external network interfaces (eth0/wifi) are all down -- even though + // the gateway is reachable on 127.0.0.1 and the Firebolt gateway daemon + // is running locally. For genuine hostname strings the existing + // AI_ADDRCONFIG (address_configured) flag is preserved. + lib::asio::error_code m_numeric_ec; + lib::asio::ip::address::from_string(host, m_numeric_ec); + const tcp::resolver::query::flags resolve_flags = m_numeric_ec + ? tcp::resolver::query::address_configured + : tcp::resolver::query::numeric_host; + tcp::resolver::query query(host, port, resolve_flags); + + if (m_alog->static_test(log::alevel::devel)) { + m_alog->write(log::alevel::devel, + "starting async DNS resolve for "+host+":"+port); + } + + timer_ptr dns_timer; + + dns_timer = tcon->set_timer( + config::timeout_dns_resolve, + lib::bind( + &type::handle_resolve_timeout, + this, + dns_timer, + cb, + lib::placeholders::_1 + ) + ); + + if (config::enable_multithreading) { + m_resolver->async_resolve( + query, + tcon->get_strand()->wrap(lib::bind( + &type::handle_resolve, + this, + tcon, + dns_timer, + cb, + lib::placeholders::_1, + lib::placeholders::_2 + )) + ); + } else { + m_resolver->async_resolve( + query, + lib::bind( + &type::handle_resolve, + this, + tcon, + dns_timer, + cb, + lib::placeholders::_1, + lib::placeholders::_2 + ) + ); + } + } + + /// DNS resolution timeout handler + /** + * The timer pointer is included to ensure the timer isn't destroyed until + * after it has expired. + * + * @param dns_timer Pointer to the timer in question + * @param callback The function to call back + * @param ec A status code indicating an error, if any. 
     */
    void handle_resolve_timeout(timer_ptr, connect_handler callback,
        lib::error_code const & ec)
    {
        lib::error_code ret_ec;

        if (ec) {
            // A cancelled timer means the resolve completed first; nothing to
            // report and the callback must not be invoked from here.
            if (ec == transport::error::operation_aborted) {
                m_alog->write(log::alevel::devel,
                    "asio handle_resolve_timeout timer cancelled");
                return;
            }

            // Any other timer error is passed to the callback unchanged.
            log_err(log::elevel::devel,"asio handle_resolve_timeout",ec);
            ret_ec = ec;
        } else {
            // Timer expired normally: the resolve took too long.
            ret_ec = make_error_code(transport::error::timeout);
        }

        // Reached for both a genuine timeout and a non-abort timer error.
        m_alog->write(log::alevel::devel,"DNS resolution timed out");
        m_resolver->cancel();
        callback(ret_ec);
    }

    /// DNS resolution completion handler
    /**
     * Cancels the DNS timer and, on success, arms the connect timer and starts
     * an asynchronous connect over the resolved endpoints.
     *
     * @param tcon The transport connection being connected
     * @param dns_timer The timer guarding the DNS resolution
     * @param callback The function to call back on failure; on success it is
     *        handed on to handle_connect / handle_connect_timeout
     * @param ec A status code indicating an error, if any.
     * @param iterator Iterator over the resolved endpoints
     */
    void handle_resolve(transport_con_ptr tcon, timer_ptr dns_timer,
        connect_handler callback, lib::asio::error_code const & ec,
        lib::asio::ip::tcp::resolver::iterator iterator)
    {
        // Either the resolve was cancelled or the DNS timer has already
        // expired; in both cases return without invoking the callback here.
        if (ec == lib::asio::error::operation_aborted ||
            lib::asio::is_neg(dns_timer->expires_from_now()))
        {
            m_alog->write(log::alevel::devel,"async_resolve cancelled");
            return;
        }

        dns_timer->cancel();

        if (ec) {
            log_err(log::elevel::info,"asio async_resolve",ec);
            callback(socket_con_type::translate_ec(ec));
            return;
        }

        // Only build the result listing when devel logging is enabled.
        if (m_alog->static_test(log::alevel::devel)) {
            std::stringstream s;
            s << "Async DNS resolve successful. Results: ";

            lib::asio::ip::tcp::resolver::iterator it, end;
            for (it = iterator; it != end; ++it) {
                s << (*it).endpoint() << " ";
            }

            m_alog->write(log::alevel::devel,s.str());
        }

        m_alog->write(log::alevel::devel,"Starting async connect");

        timer_ptr con_timer;

        // NOTE(review): con_timer is passed to lib::bind before the result of
        // set_timer is assigned to it, so the copy bound into the timeout
        // handler is still empty. handle_connect_timeout leaves that parameter
        // unnamed and does not use it.
        con_timer = tcon->set_timer(
            config::timeout_connect,
            lib::bind(
                &type::handle_connect_timeout,
                this,
                tcon,
                con_timer,
                callback,
                lib::placeholders::_1
            )
        );

        // With multithreading enabled the completion handler is wrapped in the
        // connection's strand; otherwise it is bound directly.
        if (config::enable_multithreading) {
            lib::asio::async_connect(
                tcon->get_raw_socket(),
                iterator,
                tcon->get_strand()->wrap(lib::bind(
                    &type::handle_connect,
                    this,
                    tcon,
                    con_timer,
                    callback,
                    lib::placeholders::_1
                ))
            );
        } else {
            lib::asio::async_connect(
                tcon->get_raw_socket(),
                iterator,
                lib::bind(
                    &type::handle_connect,
                    this,
                    tcon,
                    con_timer,
                    callback,
                    lib::placeholders::_1
                )
            );
        }
    }

    /// Asio connect timeout handler
    /**
     * The timer pointer is included to ensure the timer isn't destroyed until
     * after it has expired.
     *
     * @param tcon Pointer to the transport connection that is being connected
     * @param con_timer Pointer to the timer in question
     * @param callback The function to call back
     * @param ec A status code indicating an error, if any.
     */
    void handle_connect_timeout(transport_con_ptr tcon, timer_ptr,
        connect_handler callback, lib::error_code const & ec)
    {
        lib::error_code ret_ec;

        if (ec) {
            // A cancelled timer means the connect completed first; nothing to
            // report and the callback must not be invoked from here.
            if (ec == transport::error::operation_aborted) {
                m_alog->write(log::alevel::devel,
                    "asio handle_connect_timeout timer cancelled");
                return;
            }

            // Any other timer error is passed to the callback unchanged.
            log_err(log::elevel::devel,"asio handle_connect_timeout",ec);
            ret_ec = ec;
        } else {
            // Timer expired normally: the connect took too long.
            ret_ec = make_error_code(transport::error::timeout);
        }

        // Reached for both a genuine timeout and a non-abort timer error.
        m_alog->write(log::alevel::devel,"TCP connect timed out");
        tcon->cancel_socket_checked();
        callback(ret_ec);
    }

    /// Asio connect completion handler
    /**
     * Cancels the connect timer and reports the outcome of the asynchronous
     * connect to the callback.
     *
     * @param tcon The transport connection being connected
     * @param con_timer The timer guarding the connect
     * @param callback The function to call back with the result
     * @param ec A status code indicating an error, if any.
     */
    void handle_connect(transport_con_ptr tcon, timer_ptr con_timer,
        connect_handler callback, lib::asio::error_code const & ec)
    {
        // Either the connect was cancelled or the connect timer has already
        // expired; in both cases return without invoking the callback here.
        if (ec == lib::asio::error::operation_aborted ||
            lib::asio::is_neg(con_timer->expires_from_now()))
        {
            m_alog->write(log::alevel::devel,"async_connect cancelled");
            return;
        }

        con_timer->cancel();

        if (ec) {
            log_err(log::elevel::info,"asio async_connect",ec);
            callback(socket_con_type::translate_ec(ec));
            return;
        }

        // Only build the message when devel logging is enabled.
        if (m_alog->static_test(log::alevel::devel)) {
            m_alog->write(log::alevel::devel,
                "Async connect to "+tcon->get_remote_endpoint()+" successful.");
        }

        callback(lib::error_code());
    }

    /// Initialize a connection
    /**
     * init is called by an endpoint once for each newly created connection.
     * Its purpose is to give the transport policy the chance to perform any
     * transport specific initialization that couldn't be done via the default
     * constructor.
     *
     * @param tcon A pointer to the transport portion of the connection.
+ * + * @return A status code indicating the success or failure of the operation + */ + lib::error_code init(transport_con_ptr tcon) { + m_alog->write(log::alevel::devel, "transport::asio::init"); + + // Initialize the connection socket component + socket_type::init(lib::static_pointer_cast(tcon)); + + lib::error_code ec; + + ec = tcon->init_asio(m_io_service); + if (ec) {return ec;} + + tcon->set_tcp_pre_init_handler(m_tcp_pre_init_handler); + tcon->set_tcp_post_init_handler(m_tcp_post_init_handler); + + return lib::error_code(); + } +private: + /// Convenience method for logging the code and message for an error_code + template + void log_err(log::level l, char const * msg, error_type const & ec) { + std::stringstream s; + s << msg << " error: " << ec << " (" << ec.message() << ")"; + m_elog->write(l,s.str()); + } + + /// Helper for cleaning up in the listen method after an error + template + lib::error_code clean_up_listen_after_error(error_type const & ec) { + if (m_acceptor->is_open()) { + m_acceptor->close(); + } + log_err(log::elevel::info,"asio listen",ec); + return socket_con_type::translate_ec(ec); + } + + enum state { + UNINITIALIZED = 0, + READY = 1, + LISTENING = 2 + }; + + // Handlers + tcp_pre_bind_handler m_tcp_pre_bind_handler; + tcp_init_handler m_tcp_pre_init_handler; + tcp_init_handler m_tcp_post_init_handler; + + // Network Resources + io_service_ptr m_io_service; + bool m_external_io_service; + acceptor_ptr m_acceptor; + resolver_ptr m_resolver; + work_ptr m_work; + + // Network constants + int m_listen_backlog; + bool m_reuse_addr; + + lib::shared_ptr m_elog; + lib::shared_ptr m_alog; + + // Transport state + state m_state; +}; + +} // namespace asio +} // namespace transport +} // namespace websocketpp + +#endif // WEBSOCKETPP_TRANSPORT_ASIO_HPP diff --git a/test/integration/test-no-network.sh b/test/integration/test-no-network.sh new file mode 100755 index 0000000..72f2226 --- /dev/null +++ b/test/integration/test-no-network.sh @@ -0,0 
+1,90 @@ +#!/usr/bin/env bash +# Copyright 2025 Comcast Cable Communications Management, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +# +# Integration test: ConnectViaNumericLoopbackIP with no external network. +# +# Reproduces the RDK STB failure (RDKEMW-16441): +# websocketpp used AI_ADDRCONFIG by default; on boxes with no active eth0/wifi, +# getaddrinfo("127.0.0.1") returned HOST_NOT_FOUND even though loopback is up. +# +# Isolation strategy (tried in order): +# 1. Docker --network none — loopback-only container; no special kernel privileges needed. +# Uses the same CI image as ./build.sh --docker. +# This is the preferred method and the pattern for future +# network-isolation tests in this repo. +# 2. unshare --net — kernel user namespace; requires +# sysctl kernel.unprivileged_userns_clone=1 (often restricted). +# +# Usage: +# ./test/integration/test-no-network.sh [path-to-utApp] +# +# The test binary defaults to build-dev/test/utApp (built with +tests). +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +BINARY="${1:-$REPO_ROOT/build-dev/test/utApp}" +IMAGE="firebolt-cpp-transport-ci:local" +FILTER="TransportNumericIPUTest.*:TransportIPv6UTest.ConnectViaIPv6LoopbackIP:TransportNumericIPResolverTest.ConnectFailureViaNumericIP" + +if [[ ! 
-x "$BINARY" ]]; then + echo "error: test binary not found: $BINARY" >&2 + echo " Build first: ./build.sh +tests (or ./build.sh --docker +tests)" >&2 + exit 1 +fi + +# Path of the binary relative to REPO_ROOT (used for Docker volume mount). +# If the binary is outside REPO_ROOT the Docker mount won't reach it — fall back +# to the default build-dev path which is always under REPO_ROOT. +if [[ "$BINARY" != "$REPO_ROOT"/* ]]; then + echo "warning: binary '$BINARY' is outside REPO_ROOT — using default build-dev/test/utApp for Docker mode" >&2 + BINARY="$REPO_ROOT/build-dev/test/utApp" +fi +REL_BIN="${BINARY#"$REPO_ROOT"/}" + +run_in_docker() { + if ! docker image inspect "$IMAGE" &>/dev/null; then + echo "Building CI Docker image (one-time)..." + docker build -t "$IMAGE" -f "$REPO_ROOT/.github/Dockerfile" "$REPO_ROOT" + fi + echo "==> Testing in Docker --network none: $BINARY" + docker run --rm --network none \ + --user "$(id -u):$(id -g)" \ + -v "$REPO_ROOT:/workspace" -w /workspace \ + "$IMAGE" \ + "./$REL_BIN" --gtest_filter="$FILTER" +} + +run_with_unshare() { + echo "==> Testing via unshare --net: $BINARY" + unshare --net bash -c " + set -euo pipefail + ip link set lo up + \"$BINARY\" --gtest_filter='$FILTER' + " +} + +if docker info &>/dev/null 2>&1; then + run_in_docker +elif unshare --net true 2>/dev/null; then + run_with_unshare +else + echo "error: neither Docker nor unshare --net is available." 
>&2 + echo " Option 1 (preferred): install/start Docker" >&2 + echo " Option 2: sudo sysctl -w kernel.unprivileged_userns_clone=1" >&2 + exit 1 +fi diff --git a/test/unit/gatewayTest.cpp b/test/unit/gatewayTest.cpp index a333776..e7ad0b2 100644 --- a/test/unit/gatewayTest.cpp +++ b/test/unit/gatewayTest.cpp @@ -19,6 +19,7 @@ #include "firebolt/gateway.h" #include "firebolt/logger.h" #include "utils.h" +#include #include #include #include @@ -698,3 +699,164 @@ TEST_F(GatewayUTest, DisconnectDoesNotHangWhenServerDisappearsWithActiveSubscrip EXPECT_LT(elapsed.count(), 200) << "unsubscribe() took " << elapsed.count() << " ms — blocked waiting for ACK from silent server (bug reproduced)"; } + +// --------------------------------------------------------------------------- +// Retry tests — verify reconnect_max_attempts / reconnect_delay_ms behaviour. +// Uses port 9008 to avoid conflict with GatewayUTest (9003). +// --------------------------------------------------------------------------- + +class GatewayRetryUTest : public ::testing::Test +{ +protected: + using server = websocketpp::server; + using connection_hdl = websocketpp::connection_hdl; + + server m_server; + std::unique_ptr m_serverThread; + bool m_serverStarted = false; + const std::string m_uri = "ws://127.0.0.1:9008"; + + void startServer() + { + try + { + m_server.init_asio(); + m_server.set_reuse_addr(true); + m_server.clear_access_channels(websocketpp::log::alevel::all); + m_server.listen( + websocketpp::lib::asio::ip::tcp::endpoint(websocketpp::lib::asio::ip::address::from_string("127.0.0.1"), + 9008)); + m_server.start_accept(); + m_serverThread = std::make_unique([this]() { m_server.run(); }); + m_serverStarted = true; + } + catch (const websocketpp::exception& ex) + { + FAIL() << "GatewayRetryUTest: server startup failed: " << ex.what(); + } + } + + void TearDown() override + { + GetGatewayInstance().disconnect(); + if (m_serverStarted) + { + m_server.stop_listening(); + m_server.stop(); + if 
(m_serverThread && m_serverThread->joinable()) + m_serverThread->join(); + } + } + + Firebolt::Config retryConfig(unsigned maxAttempts, unsigned delayMs = 50) + { + Firebolt::Config cfg{}; + cfg.wsUrl = m_uri; + cfg.reconnect_max_attempts = maxAttempts; + cfg.reconnect_delay_ms = delayMs; + return cfg; + } +}; + +// Server starts listening only after 2 retry delays have elapsed. +// connect() should keep retrying and ultimately return Error::None. +TEST_F(GatewayRetryUTest, RetryConnectsWhenServerDelayed) +{ + std::thread serverStarter( + [this]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(120)); + startServer(); + }); + + IGateway& gateway = GetGatewayInstance(); + std::promise connectedPromise; + auto connectedFuture = connectedPromise.get_future(); + std::atomic promiseSet{false}; + + Firebolt::Error err = gateway.connect(retryConfig(5, 50), + [&](bool connected, const Firebolt::Error& /*error*/) + { + if (!promiseSet.exchange(true)) + connectedPromise.set_value(connected); + }); + + serverStarter.join(); + + ASSERT_EQ(err, Firebolt::Error::None) << "connect() should succeed after retries"; + ASSERT_EQ(connectedFuture.wait_for(std::chrono::milliseconds(100)), std::future_status::ready); + EXPECT_TRUE(connectedFuture.get()); + // Disconnect before locals go out of scope so TearDown's disconnect() is a + // no-op and cannot invoke this callback after the frame is gone. + gateway.disconnect(); +} + +// connect() with reconnect_max_attempts=0 (no retries) should return +// NotConnected immediately when the server is not listening. 
+TEST_F(GatewayRetryUTest, NoRetryFailsFast) +{ + IGateway& gateway = GetGatewayInstance(); + std::atomic callbackCount{0}; + + auto t0 = std::chrono::steady_clock::now(); + Firebolt::Error err = + gateway.connect(retryConfig(0), [&](bool /*connected*/, const Firebolt::Error& /*error*/) { ++callbackCount; }); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0); + + EXPECT_NE(err, Firebolt::Error::None) << "connect() should fail when no server is listening"; + EXPECT_EQ(callbackCount.load(), 1); + EXPECT_LT(elapsed.count(), 5000) << "No-retry connect should fail quickly"; +} + +// disconnect() called from another thread while the retry loop is sleeping +// between attempts. connect() must return promptly (not after the full delay). +TEST_F(GatewayRetryUTest, DisconnectAbortsRetryDelay) +{ + IGateway& gateway = GetGatewayInstance(); + + // 3 attempts with 500 ms delay — without the abort fix this would take ~1500 ms. + auto cfg = retryConfig(3, 500); + + std::thread disconnecter( + [&gateway]() + { + std::this_thread::sleep_for(std::chrono::milliseconds(200)); + gateway.disconnect(); + }); + + auto t0 = std::chrono::steady_clock::now(); + gateway.connect(cfg, [](bool, const Firebolt::Error&) {}); + auto elapsed = std::chrono::duration_cast(std::chrono::steady_clock::now() - t0); + + disconnecter.join(); + EXPECT_LT(elapsed.count(), 800) << "connect() took " << elapsed.count() + << " ms — retry delay was not aborted by disconnect()"; +} + +// A second connect() while already connected must return AlreadyConnected and +// must NOT invoke the new callback with (false, ...) — doing so would be a +// spurious "disconnected" event to the caller. +TEST_F(GatewayRetryUTest, AlreadyConnectedNoFalseDisconnect) +{ + startServer(); + IGateway& gateway = GetGatewayInstance(); + + // First connect — must succeed. 
+ std::atomic firstCallbacks{0}; + Firebolt::Error firstErr = gateway.connect(retryConfig(0), [&](bool, const Firebolt::Error&) { ++firstCallbacks; }); + ASSERT_EQ(firstErr, Firebolt::Error::None) << "first connect() should succeed"; + + // Second connect() while already connected. + std::atomic falseDisconnects{0}; + Firebolt::Error secondErr = gateway.connect(retryConfig(0), + [&](bool connected, const Firebolt::Error&) + { + if (!connected) + ++falseDisconnects; + }); + + EXPECT_EQ(secondErr, Firebolt::Error::AlreadyConnected); + EXPECT_EQ(falseDisconnects.load(), 0) << "second connect() must not emit a false disconnect event"; + // Disconnect before locals go out of scope (same reasoning as RetryConnectsWhenServerDelayed). + gateway.disconnect(); +} diff --git a/test/unit/transportTest.cpp b/test/unit/transportTest.cpp index 6bd710b..b26b88b 100644 --- a/test/unit/transportTest.cpp +++ b/test/unit/transportTest.cpp @@ -18,6 +18,7 @@ #include "transport.h" #include +#include #include #include #include @@ -696,3 +697,244 @@ TEST_F(TransportCustomServerUTest, MalformedMessageFromServer) err = transport.disconnect(); EXPECT_EQ(err, Firebolt::Error::None); } + +// Regression test for RDKEMW-16441: connecting via a numeric loopback IP +// (e.g. ws://127.0.0.1) must succeed even when external network interfaces +// are all down (no eth0/wifi). +// +// Root cause: websocketpp's resolver::query used default flags which include +// AI_ADDRCONFIG. On RDK STBs with no active external interface, +// getaddrinfo("127.0.0.1") returned HOST_NOT_FOUND because AI_ADDRCONFIG +// requires at least one configured non-loopback interface. +// +// Fix: src/vendor/websocketpp/transport/asio/endpoint.hpp detects numeric-IP +// hosts via asio::ip::address::from_string() and passes +// resolver::query::numeric_host (AI_NUMERICHOST), which bypasses AI_ADDRCONFIG. +// +// This test uses ws://127.0.0.1 (not "localhost") to exercise the numeric +// resolver path. 
In CI (Docker, loopback always up) it passes either way. +// To reproduce the original STB failure, run under +// test/integration/test-no-network.sh (Docker `--network none`, else `unshare --net`). +class TransportNumericIPUTest : public ::testing::Test +{ +protected: + using server = websocketpp::server; + using connection_hdl = websocketpp::connection_hdl; + + server m_server; + std::unique_ptr m_serverThread; + bool m_serverStarted = false; + const std::string m_uri = "ws://127.0.0.1:9005"; + + void SetUp() override + { + try + { + m_server.init_asio(); + m_server.set_reuse_addr(true); + m_server.clear_access_channels(websocketpp::log::alevel::all); + m_server.set_message_handler( + [this](connection_hdl hdl, server::message_ptr msg) + { + try + { + m_server.send(hdl, msg->get_payload(), msg->get_opcode()); + } + catch (const std::exception& ex) + { + ADD_FAILURE() << "Server echo send failed: " << ex.what(); + } + }); + m_server.listen( + websocketpp::lib::asio::ip::tcp::endpoint(websocketpp::lib::asio::ip::address::from_string("127.0.0.1"), + 9005)); + m_server.start_accept(); + m_serverThread = std::make_unique([this]() { m_server.run(); }); + m_serverStarted = true; + } + catch (const websocketpp::exception& ex) + { + FAIL() << "TransportNumericIPUTest: server startup failed: " << ex.what(); + } + } + + void TearDown() override + { + if (m_serverStarted) + { + m_server.stop_listening(); + m_server.stop(); + if (m_serverThread && m_serverThread->joinable()) + m_serverThread->join(); + } + } +}; + +TEST_F(TransportNumericIPUTest, ConnectViaNumericLoopbackIP) +{ + Transport transport; + std::promise connectionPromise; + auto connectionFuture = connectionPromise.get_future(); + std::atomic promiseSet{false}; + + auto onConnectionChange = [&](bool connected, const Firebolt::Error& /*err*/) + { + if (!promiseSet.exchange(true)) + { + connectionPromise.set_value(connected); + } + }; + + auto onMessage = [&](const nlohmann::json& /*msg*/) {}; + + Firebolt::Error err = 
transport.connect(m_uri, onMessage, onConnectionChange); + ASSERT_EQ(err, Firebolt::Error::None); + + auto status = connectionFuture.wait_for(std::chrono::milliseconds(500)); + ASSERT_EQ(status, std::future_status::ready) << "Connection to ws://127.0.0.1 timed out -- " + "AI_ADDRCONFIG bypass may be missing (see RDKEMW-16441)"; + EXPECT_TRUE(connectionFuture.get()); + + err = transport.disconnect(); + EXPECT_EQ(err, Firebolt::Error::None); +} + +// Full data round-trip over a numeric IP. The existing ConnectViaNumericLoopbackIP +// test only checks connection establishment; this one also exercises send/receive. +TEST_F(TransportNumericIPUTest, SendAndReceiveViaNumericIP) +{ + Transport transport; + std::promise connectionPromise; + auto connectionFuture = connectionPromise.get_future(); + std::promise messagePromise; + auto messageFuture = messagePromise.get_future(); + std::atomic connPromiseSet{false}; + + auto onConnectionChange = [&](bool connected, const Firebolt::Error& /*err*/) + { + if (!connPromiseSet.exchange(true)) + connectionPromise.set_value(connected); + }; + auto onMessage = [&](const nlohmann::json& msg) { messagePromise.set_value(msg); }; + + ASSERT_EQ(transport.connect(m_uri, onMessage, onConnectionChange), Firebolt::Error::None); + + ASSERT_EQ(connectionFuture.wait_for(std::chrono::milliseconds(500)), std::future_status::ready) + << "Connection to ws://127.0.0.1 timed out (RDKEMW-16441)"; + ASSERT_TRUE(connectionFuture.get()); + + nlohmann::json params = {{"key", "value"}}; + unsigned msgId = transport.getNextMessageID(); + ASSERT_EQ(transport.send("test.method", params, msgId), Firebolt::Error::None); + + ASSERT_EQ(messageFuture.wait_for(std::chrono::milliseconds(500)), std::future_status::ready) + << "Echo reply timed out"; + nlohmann::json received = messageFuture.get(); + EXPECT_EQ(received["id"], msgId); + EXPECT_EQ(received["method"], "test.method"); + EXPECT_EQ(received["params"]["key"], "value"); + + EXPECT_EQ(transport.disconnect(), 
Firebolt::Error::None); +} + +// Error path: onConnectionChange(false) must fire promptly when nothing is listening +// on the numeric IP — verifies the resolver error path through numeric_host flag. +TEST(TransportNumericIPResolverTest, ConnectFailureViaNumericIP) +{ + Transport transport; + std::promise connectedPromise; + auto connectedFuture = connectedPromise.get_future(); + std::promise errorPromise; + auto errorFuture = errorPromise.get_future(); + std::atomic promiseSet{false}; + + auto onConnectionChange = [&](bool connected, const Firebolt::Error& err) + { + if (!promiseSet.exchange(true)) + { + connectedPromise.set_value(connected); + if (!connected) + errorPromise.set_value(err); + } + }; + auto onMessage = [](const nlohmann::json& /*msg*/) + { FAIL() << "Should not receive a message on a failed connection"; }; + + ASSERT_EQ(transport.connect("ws://127.0.0.1:49152", onMessage, onConnectionChange), Firebolt::Error::None); + + ASSERT_EQ(connectedFuture.wait_for(std::chrono::milliseconds(500)), std::future_status::ready) + << "onConnectionChange callback timed out"; + EXPECT_FALSE(connectedFuture.get()); + + ASSERT_EQ(errorFuture.wait_for(std::chrono::milliseconds(500)), std::future_status::ready); + EXPECT_NE(errorFuture.get(), Firebolt::Error::None); +} + +// IPv6 loopback: ws://[::1]:9006 +// asio::ip::address::from_string("::1") succeeds, so numeric_host is used and +// AI_ADDRCONFIG is bypassed — same fix path as 127.0.0.1 but exercising IPv6. 
+class TransportIPv6UTest : public ::testing::Test +{ +protected: + using server = websocketpp::server; + using connection_hdl = websocketpp::connection_hdl; + + server m_server; + std::unique_ptr m_serverThread; + bool m_serverStarted = false; + const std::string m_uri = "ws://[::1]:9006"; + + void SetUp() override + { + try + { + m_server.init_asio(); + m_server.set_reuse_addr(true); + m_server.clear_access_channels(websocketpp::log::alevel::all); + m_server.listen( + websocketpp::lib::asio::ip::tcp::endpoint(websocketpp::lib::asio::ip::address::from_string("::1"), 9006)); + m_server.start_accept(); + m_serverThread = std::make_unique([this]() { m_server.run(); }); + m_serverStarted = true; + } + catch (const websocketpp::exception& ex) + { + FAIL() << "TransportIPv6UTest: server startup failed: " << ex.what(); + } + } + + void TearDown() override + { + if (m_serverStarted) + { + m_server.stop_listening(); + m_server.stop(); + if (m_serverThread && m_serverThread->joinable()) + m_serverThread->join(); + } + } +}; + +TEST_F(TransportIPv6UTest, ConnectViaIPv6LoopbackIP) +{ + Transport transport; + std::promise connectionPromise; + auto connectionFuture = connectionPromise.get_future(); + std::atomic promiseSet{false}; + + auto onConnectionChange = [&](bool connected, const Firebolt::Error& /*err*/) + { + if (!promiseSet.exchange(true)) + connectionPromise.set_value(connected); + }; + auto onMessage = [](const nlohmann::json& /*msg*/) {}; + + ASSERT_EQ(transport.connect(m_uri, onMessage, onConnectionChange), Firebolt::Error::None); + + ASSERT_EQ(connectionFuture.wait_for(std::chrono::milliseconds(500)), std::future_status::ready) + << "Connection to ws://[::1] timed out -- " + "AI_ADDRCONFIG bypass may be missing for IPv6 numeric addresses (RDKEMW-16441)"; + EXPECT_TRUE(connectionFuture.get()); + + EXPECT_EQ(transport.disconnect(), Firebolt::Error::None); +}