From 3a6a1bac30c2cbe5b135289f345320bf9654f1bd Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 5 May 2026 10:56:25 -0400 Subject: [PATCH 1/3] add watchdog --- src/audio_buffer.hpp | 3 + src/audio_utils.hpp | 57 ++++++++++++-- src/microphone.cpp | 69 +++++++++++------ src/microphone.hpp | 11 ++- src/speaker.cpp | 102 ++++++++++++++++++++++--- src/speaker.hpp | 26 ++++++- src/watchdog.hpp | 154 ++++++++++++++++++++++++++++++++++++++ test/CMakeLists.txt | 1 + test/audio_utils_test.cpp | 102 +++++++++++++++++++++++++ test/microphone_test.cpp | 85 ++++++++++++++++++++- test/speaker_test.cpp | 100 +++++++++++++++++++++++++ test/test_utils.hpp | 32 ++++++++ test/watchdog_test.cpp | 132 ++++++++++++++++++++++++++++++++ 13 files changed, 830 insertions(+), 44 deletions(-) create mode 100644 src/watchdog.hpp create mode 100644 test/watchdog_test.cpp diff --git a/src/audio_buffer.hpp b/src/audio_buffer.hpp index 50cd40c..3e1b840 100644 --- a/src/audio_buffer.hpp +++ b/src/audio_buffer.hpp @@ -14,6 +14,9 @@ namespace vsdk = ::viam::sdk; constexpr int BUFFER_DURATION_SECONDS = 30; // How much audio history to keep in buffer +// Unit-conversion constant: nanoseconds per millisecond. +constexpr uint64_t NS_PER_MS = 1'000'000; + // Base class for audio buffering - lock-free circular buffer with atomic operations // Can be used by both input (microphone) and output (speaker) models. // There is a 1:1 correspondence between AudioBuffer and viam audio resource diff --git a/src/audio_utils.hpp b/src/audio_utils.hpp index 77c748e..5fc7643 100644 --- a/src/audio_utils.hpp +++ b/src/audio_utils.hpp @@ -100,6 +100,49 @@ inline PaDeviceIndex findDeviceById(const std::string& id, return paNoDevice; } +// Result of resolve_device_id_into_params. +enum class ResolveStatus { + NotConfigured, // `device_id` was empty — caller should proceed with cached params + Found, // device_id resolved (params updated in place if the device moved) + NotFound, // device_id was configured but no matching device exists on the system (might be unplugged) +}; + +// If `device_id` is non-empty, re-resolves it against the current PortAudio device state +// and updates `params.device_index` / `params.device_name` in-place if the device has +// moved (e.g. USB unplug/replug → kernel re-enumerated). Returns a ResolveStatus the +// caller can use to decide whether the restart is worth attempting. +inline ResolveStatus resolve_device_id_into_params(const std::string& device_id, + StreamParams& params, + const audio::portaudio::PortAudioInterface* pa, + const std::string& log_prefix, + const audio::device_id::DeviceIdResolver* resolver = nullptr) { + if (device_id.empty()) { + return ResolveStatus::NotConfigured; + } + audio::portaudio::RealPortAudio real_pa; + const audio::portaudio::PortAudioInterface& audio_interface = pa ? *pa : real_pa; + audio::device_id::RealDeviceIdResolver real_resolver; + const audio::device_id::DeviceIdResolver& resolver_ref = resolver ? *resolver : real_resolver; + const PaDeviceIndex resolved = findDeviceById(device_id, audio_interface, resolver_ref); + if (resolved == paNoDevice) { + VIAM_SDK_LOG(warn) << log_prefix << " device_id '" << device_id + << "' not found on system; skipping restart"; + return ResolveStatus::NotFound; + } + if (resolved == params.device_index) { + return ResolveStatus::Found; + } + const PaDeviceInfo* const new_info = audio_interface.getDeviceInfo(resolved); + VIAM_SDK_LOG(info) << log_prefix << " device_id '" << device_id << "' moved from index " + << params.device_index << " to " << resolved << " (" + << ((new_info && new_info->name) ? new_info->name : "") << "); updating stream params"; + params.device_index = resolved; + if (new_info && new_info->name) { + params.device_name = new_info->name; + } + return ResolveStatus::Found; +} + inline ConfigParams parseConfigAttributes(const viam::sdk::ResourceConfig& cfg) { const auto attrs = cfg.attributes(); ConfigParams params; @@ -288,13 +331,11 @@ inline void openStream(PaStream*& stream, const StreamParams& params, const audi PaError err = audio_interface.isFormatSupported(inputParams, outputParams, params.sample_rate); if (err != paNoError) { std::ostringstream buffer; - buffer << "Audio format not supported by device '" << params.device_name << "' (index " << params.device_index - << "): " << Pa_GetErrorText(err) << "\n" - << "Requested configuration:\n" - << " - Sample rate: " << params.sample_rate << " Hz\n" - << " - Channels: " << params.num_channels << "\n" - << " - Format: 16-bit PCM\n" - << " - Latency: " << params.suggested_latency_seconds << " seconds"; + buffer << "Could not open stream — PortAudio: " << Pa_GetErrorText(err) << " — device '" + << params.device_name << "' (index " << params.device_index << "). " + << "Requested: sample_rate=" << params.sample_rate << "Hz, " + << "channels=" << params.num_channels << ", format=16-bit PCM, " + << "latency=" << params.suggested_latency_seconds << "s"; VIAM_SDK_LOG(error) << buffer.str(); throw std::runtime_error(buffer.str()); } @@ -463,7 +504,7 @@ inline void log_callback_staleness(const std::atomic& last_callback_ti const uint64_t last_cb = last_callback_time_ns.load(); if (last_cb > 0) { const uint64_t now_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); - const uint64_t elapsed_ms = (now_ns - last_cb) / 1'000'000; + const uint64_t elapsed_ms = (now_ns - last_cb) / audio::NS_PER_MS; if (elapsed_ms > STREAM_RESTART_THRESHOLD_MS && now_ns - last_log_ns > STALENESS_LOG_THROTTLE_NS) { last_log_ns = now_ns; if (!stream) { diff --git a/src/microphone.cpp b/src/microphone.cpp index a015c86..0a61331 100644 --- a/src/microphone.cpp +++ b/src/microphone.cpp @@ -94,13 +94,27 @@ void Microphone::restart_stalled_stream(const std::shared_ptr= MAX_STREAM_RESTART_ATTEMPTS) { - VIAM_SDK_LOG(error) << "[get_audio] Failed to restart stream after " << MAX_STREAM_RESTART_ATTEMPTS - << " attempts, giving up: " << e.what(); - throw; + if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) { + ++restart_attempts_; } - VIAM_SDK_LOG(error) << "[get_audio] Failed to restart stream (attempt " << restart_attempts_ << "/" << MAX_STREAM_RESTART_ATTEMPTS - << "): " << e.what(); + VIAM_SDK_LOG(error) << "[microphone stall_watcher] Failed to restart stream (attempt " << restart_attempts_ << "/" + << audio::utils::MAX_RESTART_ATTEMPTS << "): " << e.what(); + // The watchdog tracks attempts and applies backoff once we hit MAX — no need + // to throw here, which would just re-emit the same error from the watchdog's + // catch handler. } } @@ -193,6 +207,7 @@ Microphone::Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig c std::lock_guard lock(stream_ctx_mu_); stream_params_ = setup.stream_params; stream_params_.user_data = setup.audio_context.get(); + device_id_ = setup.config_params.device_id; audio::utils::restart_stream(stream_, stream_params_, pa_); latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); audio_context_ = setup.audio_context; @@ -200,10 +215,28 @@ Microphone::Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig c setup.config_params.sample_rate.value_or(setup.stream_params.sample_rate); // User's requested rate, defaults to device rate historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS); } + + watchdog_ = std::make_unique>( + [this]() { + std::lock_guard lock(stream_ctx_mu_); + return audio_context_; + }, + [this]() { + std::lock_guard lock(stream_ctx_mu_); + return restart_attempts_; + }, + [this](const std::shared_ptr& ctx) { restart_stalled_stream(ctx); }, + "[microphone stall_watcher]"); + watchdog_->start(); } Microphone::~Microphone() { VIAM_SDK_LOG(debug) << "[Microphone::~Microphone] Destructor called"; + // Stop and join the watchdog before tearing down the stream so it can't touch a + // half-destroyed audio_context_. + if (watchdog_) { + watchdog_->stop(); + } if (stream_) { PaError err = Pa_StopStream(stream_); if (err != paNoError) { @@ -321,9 +354,11 @@ void Microphone::reconfigure(const viam::sdk::Dependencies& deps, const viam::sd stream_params_ = setup.stream_params; stream_params_.user_data = setup.audio_context.get(); + device_id_ = setup.config_params.device_id; audio::utils::restart_stream(stream_, stream_params_, pa_); latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); audio_context_ = setup.audio_context; + restart_attempts_ = 0; requested_sample_rate_ = setup.config_params.sample_rate.value_or( setup.stream_params.sample_rate); // User's requested rate, defaults to device rate historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS); @@ -444,18 +479,6 @@ void Microphone::get_audio(std::string const& codec, // Wait until we have a full chunk worth of samples if (available_samples < device_samples_per_chunk) { - audio::utils::log_callback_staleness( - stream_context->last_callback_time_ns, "[get_audio]", current_stream, last_staleness_log_ns); - - const uint64_t last_cb = stream_context->last_callback_time_ns.load(); - if (last_cb > 0) { - const uint64_t now_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); - const uint64_t stale_ms = (now_ns - last_cb) / 1'000'000; - if (stale_ms > audio::utils::STREAM_RESTART_THRESHOLD_MS) { - VIAM_SDK_LOG(warn) << "[get_audio] Stream stalled for " << stale_ms << "ms, attempting restart"; - restart_stalled_stream(stream_context); - } - } const uint64_t overflow_count = stream_context->input_overflow_count.load(); if (overflow_count != last_logged_overflow_count) { diff --git a/src/microphone.hpp b/src/microphone.hpp index a5cce9d..3ec3a7b 100644 --- a/src/microphone.hpp +++ b/src/microphone.hpp @@ -15,12 +15,12 @@ #include "audio_utils.hpp" #include "portaudio.h" #include "portaudio.hpp" +#include "watchdog.hpp" namespace microphone { namespace vsdk = ::viam::sdk; constexpr double DEFAULT_HISTORICAL_THROTTLE_MS = 50; -constexpr int MAX_STREAM_RESTART_ATTEMPTS = 3; PaDeviceIndex findDeviceByName(const std::string& name, const audio::portaudio::PortAudioInterface& pa); // Calculates the initial read position from a previous timestamp @@ -84,6 +84,15 @@ class Microphone final : public viam::sdk::AudioIn, public viam::sdk::Reconfigur int restart_attempts_; audio::utils::StreamParams stream_params_; + + // Device id from the resource config (empty if user configured by device_name or + // system default). Used by restart_stalled_stream to re-resolve the device's current + // PortAudio index, so we recover from kernel re-enumeration (e.g. USB unplug/replug). + std::string device_id_; + + // Background watchdog that polls audio_context_->last_callback_time_ns and triggers + // restart_stalled_stream when the mic callback has gone silent for too long. + std::unique_ptr> watchdog_; }; /** diff --git a/src/speaker.cpp b/src/speaker.cpp index 3fbb029..3cd462e 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -27,21 +27,41 @@ Speaker::Speaker(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, au // Set new configuration and start stream under lock { std::lock_guard lock(stream_mu_); - device_name_ = setup.stream_params.device_name; sample_rate_ = setup.stream_params.sample_rate; num_channels_ = setup.stream_params.num_channels; audio_context_ = setup.audio_context; setup.stream_params.user_data = setup.audio_context.get(); - audio::utils::restart_stream(stream_, setup.stream_params, pa_); - latency_ = audio::utils::get_stream_latency(stream_, setup.stream_params, pa_); + stream_params_ = setup.stream_params; + device_id_ = setup.config_params.device_id; + audio::utils::restart_stream(stream_, stream_params_, pa_); + latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); volume_ = setup.config_params.volume; if (volume_) { - audio::volume::set_volume(device_name_, *volume_); + audio::volume::set_volume(stream_params_.device_name, *volume_); } } + + watchdog_ = std::make_unique>( + [this]() { + std::lock_guard lock(stream_mu_); + return audio_context_; + }, + [this]() { + std::lock_guard lock(stream_mu_); + return restart_attempts_; + }, + [this](const std::shared_ptr& ctx) { restart_stalled_stream(ctx); }, + "[speaker stall_watcher]"); + watchdog_->start(); } Speaker::~Speaker() { + // Stop and join the watchdog before tearing down the stream so it can't touch a + // half-destroyed audio_context_. + if (watchdog_) { + watchdog_->stop(); + } + if (stream_) { PaError err = Pa_StopStream(stream_); if (err != paNoError) { @@ -57,6 +77,68 @@ Speaker::~Speaker() { vsdk::Model Speaker::model = {"viam", "system-audio", "speaker"}; +/** + * Tear down the existing stream and bring up a fresh one with the saved params. + * + * Bails out if `playback_context` is no longer the active audio_context_ — that means + * either reconfigure() or another stall recovery already replaced it, so the in-flight + * play() call (if any) is about to exit on its own and we shouldn't race. + * + * On a successful restart, any unplayed audio in the old buffer is discarded — same + * behavior as reconfigure(). The in-flight play() loop will see audio_context_ change + * and return early. + */ +void Speaker::restart_stalled_stream(const std::shared_ptr& playback_context) { + std::lock_guard lock(stream_mu_); + if (playback_context != audio_context_) { + return; + } + + VIAM_SDK_LOG(warn) << "[speaker stall_watcher] Restarting stalled speaker stream"; + + // If device_id was configured, re-resolve before restarting in case the kernel + // re-enumerated and the cached device_index is stale (e.g. USB unplug/replug). + const auto resolve_status = + audio::utils::resolve_device_id_into_params(device_id_, stream_params_, pa_, "[speaker stall_watcher]"); + if (resolve_status == audio::utils::ResolveStatus::NotFound) { + // Device is gone. Don't bother trying to open a stream against a stale path — + // PortAudio would just spam ALSA errors. Bump attempts so the watchdog enters + // backoff; once the device returns, a backoff retry will resolve and proceed. + if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) { + ++restart_attempts_; + } + return; + } + + if (stream_) { + try { + audio::utils::abort_stream(stream_, pa_); + } catch (const std::exception& e) { + VIAM_SDK_LOG(error) << "[speaker stall_watcher] Error shutting down stalled stream: " << e.what(); + } + stream_ = nullptr; + } + + const viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, stream_params_.sample_rate, stream_params_.num_channels}; + const auto new_context = std::make_shared(info, audio::BUFFER_DURATION_SECONDS); + + try { + stream_params_.user_data = new_context.get(); + audio::utils::restart_stream(stream_, stream_params_, pa_); + latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); + audio_context_ = new_context; + restart_attempts_ = 0; + VIAM_SDK_LOG(info) << "[speaker stall_watcher] Speaker stream restarted successfully"; + } catch (const std::exception& e) { + if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) { + ++restart_attempts_; + } + VIAM_SDK_LOG(error) << "[speaker stall_watcher] Failed to restart stream (attempt " << restart_attempts_ << "/" + << audio::utils::MAX_RESTART_ATTEMPTS << "): " << e.what(); + // Watchdog has no caller to throw to — keep going, it'll try again next tick. + } +} + /** * PortAudio callback function - runs on real-time audio thread. * This function must not: @@ -182,7 +264,7 @@ viam::sdk::ProtoStruct Speaker::do_command(const viam::sdk::ProtoStruct& command } std::lock_guard lock(stream_mu_); - audio::volume::set_volume(device_name_, vol); + audio::volume::set_volume(stream_params_.device_name, vol); volume_ = vol; return viam::sdk::ProtoStruct{{"volume", static_cast(vol)}}; @@ -453,15 +535,17 @@ void Speaker::reconfigure(const vsdk::Dependencies& deps, const vsdk::ResourceCo // Otherwise the callback thread may still be accessing the old context // after we destroy it (heap-use-after-free) setup.stream_params.user_data = setup.audio_context.get(); - audio::utils::restart_stream(stream_, setup.stream_params, pa_); - device_name_ = setup.stream_params.device_name; + stream_params_ = setup.stream_params; + audio::utils::restart_stream(stream_, stream_params_, pa_); sample_rate_ = setup.stream_params.sample_rate; num_channels_ = setup.stream_params.num_channels; - latency_ = audio::utils::get_stream_latency(stream_, setup.stream_params, pa_); + latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); audio_context_ = setup.audio_context; + device_id_ = setup.config_params.device_id; + restart_attempts_ = 0; volume_ = setup.config_params.volume; if (volume_) { - audio::volume::set_volume(device_name_, *volume_); + audio::volume::set_volume(stream_params_.device_name, *volume_); } } VIAM_SDK_LOG(info) << "[reconfigure] Reconfigure completed successfully"; diff --git a/src/speaker.hpp b/src/speaker.hpp index d7dd080..498a5a7 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -5,14 +5,17 @@ #include #include #include +#include #include #include #include #include #include #include "audio_stream.hpp" +#include "audio_utils.hpp" #include "portaudio.h" #include "portaudio.hpp" +#include "watchdog.hpp" namespace speaker { namespace vsdk = ::viam::sdk; @@ -57,7 +60,6 @@ class Speaker final : public viam::sdk::AudioOut, public viam::sdk::Reconfigurab void reconfigure(const viam::sdk::Dependencies& deps, const viam::sdk::ResourceConfig& cfg); // Member variables - std::string device_name_; double latency_; int sample_rate_; int num_channels_; @@ -78,6 +80,28 @@ class Speaker final : public viam::sdk::AudioOut, public viam::sdk::Reconfigurab // Flag to interrupt playback std::atomic stop_requested_{false}; + + // Saved stream params so the watchdog can rebuild the stream with the same configuration. + audio::utils::StreamParams stream_params_; + + // Device id from the resource config (empty if user configured by device_name or + // system default). Used by restart_stalled_stream to re-resolve the device's current + // PortAudio index, so we recover from kernel re-enumeration (e.g. USB unplug/replug). + std::string device_id_; + + // Counts consecutive failed restart attempts; reset to 0 after a successful restart + // or a reconfigure(). Once it reaches audio::utils::MAX_RESTART_ATTEMPTS, the watchdog + // backs off to slow retries (audio::utils::BACKOFF_INTERVAL_MS) instead of polling + // every audio::utils::POLL_INTERVAL — supports hot-replug recovery without spamming + // the kernel. The counter is capped at MAX so it doesn't grow unbounded. + int restart_attempts_ = 0; + + // Background watchdog that polls audio_context_->last_callback_time_ns and triggers + // restart_stalled_stream when the speaker callback has gone silent for too long, + std::unique_ptr> watchdog_; + + private: + void restart_stalled_stream(const std::shared_ptr& playback_context); }; } // namespace speaker diff --git a/src/watchdog.hpp b/src/watchdog.hpp new file mode 100644 index 0000000..54245a2 --- /dev/null +++ b/src/watchdog.hpp @@ -0,0 +1,154 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace audio { +namespace utils { + +constexpr int MAX_RESTART_ATTEMPTS = 3; +constexpr std::chrono::milliseconds POLL_INTERVAL{200}; +constexpr uint64_t STALL_THRESHOLD_MS = 2000; +// Once the attempts budget is exhausted, the watchdog stops attempting fast restarts and +// instead retries every BACKOFF_INTERVAL_MS. This supports hot-replug scenarios: the +// device may come back later (USB plug-in, driver recovery) and we want to resume +// recovery without forcing the user to reconfigure. Each backoff retry costs the same +// as a normal restart attempt — milliseconds — so the long interval is purely about +// not flooding logs while waiting. +constexpr uint64_t BACKOFF_INTERVAL_MS = 5000; + +// Background watchdog that polls an audio component's `last_callback_time_ns` and +// triggers a restart when the callback has gone silent for too long. Used by both +// Speaker and Microphone — the component-specific bits are passed in as callbacks. +// +// ContextT is the concrete audio context type (InputStreamContext / OutputStreamContext). +// It must expose a `std::atomic last_callback_time_ns` member — both subclasses +// inherit one from AudioBuffer. +// +// Lifecycle: construct → start() → (optional) stop() → destruct. start() and stop() are +// not thread-safe; call them from the owning component only. The destructor joins the +// poll thread. +template +class StallWatchdog { + public: + // Returns the currently active audio context, or nullptr if no stream is up. + using GetContextFn = std::function()>; + + // Returns the current consecutive-failed-restart count. Watchdog stops calling + // restart_fn once this reaches MAX_RESTART_ATTEMPTS; the latch clears automatically + // once the count drops back below max (e.g. after a reconfigure). + using GetAttemptsFn = std::function; + + // Performs the restart. Argument is the same context get_context returned a moment + // earlier — the component should bail if it has since been swapped (reconfigure / + // peer restart). + using RestartFn = std::function&)>; + + StallWatchdog(GetContextFn get_context, GetAttemptsFn get_attempts, RestartFn restart_fn, std::string log_prefix) + : get_context_(std::move(get_context)), + get_attempts_(std::move(get_attempts)), + restart_fn_(std::move(restart_fn)), + log_prefix_(std::move(log_prefix)) {} + + ~StallWatchdog() { stop(); } + + // Spins up the poll thread. Call after the owning component has finished + // constructing its first stream so the first poll sees valid state. + void start() { + thread_ = std::thread([this] { loop(); }); + } + + // Signals stop and joins the poll thread. Safe to call multiple times. + // The destructor calls this automatically; explicit stop() is for callers + // that need to control teardown ordering. + void stop() { + stop_.store(true); + if (thread_.joinable()) { + thread_.join(); + } + } + + private: + void loop() { + while (!stop_.load()) { + std::this_thread::sleep_for(POLL_INTERVAL); + if (stop_.load()) { + return; + } + + const std::shared_ptr ctx = get_context_(); + if (!ctx) { + continue; + } + + const uint64_t last_cb = ctx->last_callback_time_ns.load(); + if (last_cb == 0) { + // Callback hasn't fired yet — stream just opened, give it time. + continue; + } + + const uint64_t now_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); + if (now_ns <= last_cb) { + continue; + } + const uint64_t stale_ms = (now_ns - last_cb) / audio::NS_PER_MS; + if (stale_ms <= STALL_THRESHOLD_MS) { + continue; + } + + const int attempts = get_attempts_(); + if (attempts >= MAX_RESTART_ATTEMPTS) { + // Budget exhausted — back off to slow retries instead of giving up + // permanently, so hot-replug (device unplugged then plugged back in) + // recovers automatically without a reconfigure. + if (now_ns - last_attempt_ns_.load() < BACKOFF_INTERVAL_MS * audio::NS_PER_MS) { + if (!backoff_logged_.exchange(true)) { + VIAM_SDK_LOG(warn) << log_prefix_ << " Restart budget exhausted; backing off to " + << BACKOFF_INTERVAL_MS / 1000 << "s retries until the device returns"; + } + continue; + } + VIAM_SDK_LOG(info) << log_prefix_ << " Backoff retry (attempts=" << attempts << "); checking for device"; + } else { + // Attempts is below max — clear the backoff latch so a future exhaustion + // logs the "backing off" message again. + backoff_logged_.store(false); + VIAM_SDK_LOG(warn) << log_prefix_ << " Callback stale for " << stale_ms << "ms, attempting restart"; + } + + last_attempt_ns_.store(now_ns); + try { + restart_fn_(ctx); + } catch (const std::exception& e) { + VIAM_SDK_LOG(error) << log_prefix_ << " restart_fn threw: " << e.what(); + } + } + } + + GetContextFn get_context_; + GetAttemptsFn get_attempts_; + RestartFn restart_fn_; + std::string log_prefix_; + std::thread thread_; + std::atomic stop_{false}; + // Latches once the watchdog has logged the "backing off" message after the attempts + // budget was exhausted, so we don't spam the log every poll while waiting for backoff. + // Cleared automatically once get_attempts drops back below MAX_RESTART_ATTEMPTS + // (typically after a successful restart resets the counter). + std::atomic backoff_logged_{false}; + // Wall-clock time of the most recent restart attempt, used to enforce the + // BACKOFF_INTERVAL_MS gap once the attempts budget is exhausted. Initialized to 0 + // so the very first stale-detection always fires immediately. + std::atomic last_attempt_ns_{0}; +}; + +} // namespace utils +} // namespace audio diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8c3aa32..cc60d76 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -83,3 +83,4 @@ audio_add_gtest(resample_test.cpp) audio_add_gtest(audio_codec_test.cpp) audio_add_gtest(file_utils_test.cpp) audio_add_gtest(routing_filter_test.cpp) +audio_add_gtest(watchdog_test.cpp) diff --git a/test/audio_utils_test.cpp b/test/audio_utils_test.cpp index 0bb97a9..f4620f7 100644 --- a/test/audio_utils_test.cpp +++ b/test/audio_utils_test.cpp @@ -466,6 +466,108 @@ TEST_F(AudioUtilsTest, AbortStream_ThrowsOnCloseFailure) { EXPECT_THROW(audio::utils::abort_stream(nullptr, mock_pa_.get()), std::runtime_error); } +// --- resolve_device_id_into_params ------------------------------------------------- + +// File-scope helpers shared by the four resolver tests below. +namespace { + +audio::utils::StreamParams params_at(PaDeviceIndex idx, const std::string& name) { + audio::utils::StreamParams p{}; + p.device_index = idx; + p.device_name = name; + return p; +} + +PaDeviceInfo make_device_info(const char* name) { + PaDeviceInfo info{}; + info.name = name; + return info; +} + +// Build a fresh resolver mock and pre-load the "no match by default" behavior the +// resolver tests all want. Returns the unique_ptr so the caller controls lifetime. +std::unique_ptr<::testing::NiceMock> make_resolver_mock() { + auto resolver = std::make_unique<::testing::NiceMock>(); + ON_CALL(*resolver, resolve(::testing::_, ::testing::_)) + .WillByDefault(::testing::Return(std::string{})); + return resolver; +} + +} // namespace + +TEST_F(AudioUtilsTest, ResolveDeviceId_EmptyDeviceIdIsNoOp) { + auto resolver = make_resolver_mock(); + auto params = params_at(7, "old name"); + const auto status = + audio::utils::resolve_device_id_into_params(/*device_id=*/"", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_EQ(status, audio::utils::ResolveStatus::NotConfigured); + EXPECT_EQ(params.device_index, 7); + EXPECT_EQ(params.device_name, "old name"); +} + +TEST_F(AudioUtilsTest, ResolveDeviceId_NotFoundLeavesParamsUnchanged) { + using ::testing::Return; + using ::testing::_; + + auto resolver = make_resolver_mock(); + EXPECT_CALL(*mock_pa_, getDeviceCount()).WillRepeatedly(Return(2)); + PaDeviceInfo a = make_device_info("device a"); + PaDeviceInfo b = make_device_info("device b"); + EXPECT_CALL(*mock_pa_, getDeviceInfo(0)).WillRepeatedly(Return(&a)); + EXPECT_CALL(*mock_pa_, getDeviceInfo(1)).WillRepeatedly(Return(&b)); + // Resolver default returns "" for every device, so "looking-for-id" never matches. + + auto params = params_at(0, "device a"); + const auto status = + audio::utils::resolve_device_id_into_params("looking-for-id", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_EQ(status, audio::utils::ResolveStatus::NotFound); + EXPECT_EQ(params.device_index, 0); + EXPECT_EQ(params.device_name, "device a"); +} + +TEST_F(AudioUtilsTest, ResolveDeviceId_AtSameIndexIsNoOp) { + using ::testing::Return; + using ::testing::_; + + auto resolver = make_resolver_mock(); + EXPECT_CALL(*mock_pa_, getDeviceCount()).WillRepeatedly(Return(2)); + PaDeviceInfo a = make_device_info("device a"); + PaDeviceInfo b = make_device_info("device b"); + EXPECT_CALL(*mock_pa_, getDeviceInfo(0)).WillRepeatedly(Return(&a)); + EXPECT_CALL(*mock_pa_, getDeviceInfo(1)).WillRepeatedly(Return(&b)); + + // Resolver: index 0 holds "stable-id"; others stay empty (default). + EXPECT_CALL(*resolver, resolve(0, _)).WillRepeatedly(Return(std::string{"stable-id"})); + + auto params = params_at(0, "device a"); + const auto status = + audio::utils::resolve_device_id_into_params("stable-id", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_EQ(status, audio::utils::ResolveStatus::Found); + EXPECT_EQ(params.device_index, 0); + EXPECT_EQ(params.device_name, "device a"); +} + +TEST_F(AudioUtilsTest, ResolveDeviceId_MovedUpdatesParams) { + using ::testing::Return; + using ::testing::_; + + auto resolver = make_resolver_mock(); + EXPECT_CALL(*mock_pa_, getDeviceCount()).WillRepeatedly(Return(2)); + PaDeviceInfo a = make_device_info("device a"); + PaDeviceInfo b = make_device_info("device b new path"); + EXPECT_CALL(*mock_pa_, getDeviceInfo(0)).WillRepeatedly(Return(&a)); + EXPECT_CALL(*mock_pa_, getDeviceInfo(1)).WillRepeatedly(Return(&b)); + + ON_CALL(*resolver, resolve(1, _)).WillByDefault(Return(std::string{"moving-id"})); + + auto params = params_at(0, "device a"); + const auto status = + audio::utils::resolve_device_id_into_params("moving-id", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_EQ(status, audio::utils::ResolveStatus::Found); + EXPECT_EQ(params.device_index, 1); + EXPECT_EQ(params.device_name, "device b new path"); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); diff --git a/test/microphone_test.cpp b/test/microphone_test.cpp index f86f49f..b61bf3f 100644 --- a/test/microphone_test.cpp +++ b/test/microphone_test.cpp @@ -1342,7 +1342,8 @@ TEST_F(MicrophoneTest, RestartStalledStream_IncrementsAttemptsOnFailure) { EXPECT_EQ(mic.audio_context_, original_context); // unchanged on failure } -TEST_F(MicrophoneTest, RestartStalledStream_ThrowsAfterThreeFailures) { + +TEST_F(MicrophoneTest, RestartStalledStream_DoesNotThrowAfterMaxAttempts) { auto config = createConfig(); microphone::Microphone mic(test_deps_, config, mock_pa_.get()); @@ -1357,7 +1358,12 @@ TEST_F(MicrophoneTest, RestartStalledStream_ThrowsAfterThreeFailures) { EXPECT_NO_THROW(mic.restart_stalled_stream(original_context)); // attempt 2 EXPECT_EQ(mic.restart_attempts_, 2); - EXPECT_THROW(mic.restart_stalled_stream(original_context), std::runtime_error); // attempt 3 + EXPECT_NO_THROW(mic.restart_stalled_stream(original_context)); // attempt 3 (hits MAX) + EXPECT_EQ(mic.restart_attempts_, audio::utils::MAX_RESTART_ATTEMPTS); + + // Subsequent failures should also not throw and the counter must stay capped. + EXPECT_NO_THROW(mic.restart_stalled_stream(original_context)); + EXPECT_EQ(mic.restart_attempts_, audio::utils::MAX_RESTART_ATTEMPTS); } TEST_F(MicrophoneTest, RestartStalledStream_ResetsAttemptsOnSuccess) { @@ -1475,6 +1481,81 @@ class AudioCallbackTest : public ::testing::Test { EXPECT_EQ(ctx->get_write_position(), 0); } +// Watchdog: when the microphone callback stops firing, the background watcher should +// detect the staleness on its next poll and replace audio_context_ with a fresh one. +TEST_F(MicrophoneTest, WatchdogRestartsStalledStream) { + auto config = createConfig(testDeviceName, 44100, 1); + Dependencies deps{}; + microphone::Microphone mic(deps, config, mock_pa_.get()); + + // Snapshot the audio context the watcher will see initially. + std::shared_ptr initial_context; + { + std::lock_guard lock(mic.stream_ctx_mu_); + initial_context = mic.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Pretend the callback last fired 5 seconds ago — well past the 2s threshold. + test_utils::mark_callback_stale(initial_context); + + // Wait long enough for the watchdog to wake, see the stale timestamp, + // and run restart_stalled_stream. + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(mic.stream_ctx_mu_); + after_context = mic.audio_context_; + after_attempts = mic.restart_attempts_; + } + + EXPECT_NE(after_context.get(), initial_context.get()) + << "watchdog should have replaced audio_context_ after detecting stall"; + EXPECT_EQ(after_attempts, 0) + << "restart should have succeeded with the mock pa returning paNoError"; +} + +// Watchdog: if restart_stream fails (e.g. PortAudio errors), restart_attempts_ should +// climb instead of resetting to 0. The audio_context_ is also NOT swapped because +// restart_stalled_stream only updates it on success. +TEST_F(MicrophoneTest, WatchdogIncrementsAttemptsOnRestartFailure) { + using ::testing::_; + using ::testing::Return; + + auto config = createConfig(testDeviceName, 44100, 1); + Dependencies deps{}; + microphone::Microphone mic(deps, config, mock_pa_.get()); + + std::shared_ptr initial_context; + { + std::lock_guard lock(mic.stream_ctx_mu_); + initial_context = mic.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Make any future startStream call fail. The initial stream is already up; + // this only affects the watcher's restart attempt. + EXPECT_CALL(*mock_pa_, startStream(_)).WillRepeatedly(Return(paInternalError)); + + test_utils::mark_callback_stale(initial_context); + + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(mic.stream_ctx_mu_); + after_context = mic.audio_context_; + after_attempts = mic.restart_attempts_; + } + + EXPECT_GE(after_attempts, 1) << "failed restart should have bumped restart_attempts_"; + EXPECT_EQ(after_context.get(), initial_context.get()) + << "audio_context_ should not be swapped on failed restart"; +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); diff --git a/test/speaker_test.cpp b/test/speaker_test.cpp index 2156be6..6c5e922 100644 --- a/test/speaker_test.cpp +++ b/test/speaker_test.cpp @@ -1,7 +1,10 @@ #include #include #include + +#include #include +#include #include "speaker.hpp" #include "test_utils.hpp" #include "audio_codec.hpp" @@ -1002,6 +1005,103 @@ TEST_F(SpeakerTest, PlayPCM16WithWavHeader) { } +// Watchdog: when the speaker callback stops firing, the background watcher should +// detect the staleness on its next poll and replace audio_context_ with a fresh one. +// We force the stale state by manually setting last_callback_time_ns to a timestamp +// well past STREAM_RESTART_THRESHOLD_MS in the past, then sleep one poll cycle + slack. +TEST_F(SpeakerTest, WatchdogRestartsStalledStream) { + const int sample_rate = 44100; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + // Snapshot the audio context the watcher will see initially. + std::shared_ptr initial_context; + { + std::lock_guard lock(speaker.stream_mu_); + initial_context = speaker.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Pretend the callback last fired 5 seconds ago — well past the 2s threshold. + test_utils::mark_callback_stale(initial_context); + + // Wait long enough for the watchdog to wake, see the stale timestamp, + // and run restart_stalled_stream. + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(speaker.stream_mu_); + after_context = speaker.audio_context_; + after_attempts = speaker.restart_attempts_; + } + + EXPECT_NE(after_context.get(), initial_context.get()) + << "watchdog should have replaced audio_context_ after detecting stall"; + EXPECT_EQ(after_attempts, 0) + << "restart should have succeeded with the mock pa returning paNoError"; +} + +// Watchdog: if restart_stream fails (e.g. PortAudio errors), restart_attempts_ should +// climb instead of resetting to 0. The fresh audio_context_ is also NOT swapped in +// because restart_stalled_stream only updates audio_context_ on success. +TEST_F(SpeakerTest, WatchdogIncrementsAttemptsOnRestartFailure) { + using ::testing::_; + using ::testing::Return; + + const int sample_rate = 44100; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + std::shared_ptr initial_context; + { + std::lock_guard lock(speaker.stream_mu_); + initial_context = speaker.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Make any future startStream call fail. The initial stream is already up; + // this only affects the watcher's restart attempt. + EXPECT_CALL(*mock_pa_, startStream(_)).WillRepeatedly(Return(paInternalError)); + + test_utils::mark_callback_stale(initial_context); + + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(speaker.stream_mu_); + after_context = speaker.audio_context_; + after_attempts = speaker.restart_attempts_; + } + + EXPECT_GE(after_attempts, 1) << "failed restart should have bumped restart_attempts_"; + EXPECT_EQ(after_context.get(), initial_context.get()) + << "audio_context_ should not be swapped on failed restart"; +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); diff --git a/test/test_utils.hpp b/test/test_utils.hpp index 6e58edf..ff5014d 100644 --- a/test/test_utils.hpp +++ b/test/test_utils.hpp @@ -1,10 +1,16 @@ #pragma once +#include +#include +#include +#include + #include #include #include "../src/audio_stream.hpp" #include "../src/device_id.hpp" #include "../src/portaudio.hpp" +#include "../src/watchdog.hpp" #include "portaudio.h" namespace test_utils { @@ -12,6 +18,32 @@ namespace test_utils { // Common test constants constexpr int DEFAULT_DEVICE_SAMPLE_RATE = 44100; // Device's native/default sample rate in tests +// Steady-clock "now" expressed as nanoseconds since the clock's epoch — same idiom as +// the production code uses for last_callback_time_ns. Useful for tests that need to +// fabricate stale timestamps. +inline uint64_t now_ns() { + return static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); +} + +// Sleeps long enough for the watchdog's poll thread to wake at least once and run its +// staleness check + restart_fn. Use this in tests that set last_callback_time_ns to a +// stale value and then need to assert on the watchdog's reaction. +inline void wait_one_poll() { + std::this_thread::sleep_for(audio::utils::POLL_INTERVAL + std::chrono::milliseconds(150)); +} + +// Marks the given audio context's `last_callback_time_ns` to be `ms_ago` milliseconds +// in the past, so the watchdog will see it as stale on its next poll. Works with any +// type that exposes a std::atomic last_callback_time_ns member — both the +// real AudioBuffer subclasses and the FakeContext used in watchdog_test. +template +inline void mark_callback_stale(const std::shared_ptr& ctx, uint64_t ms_ago = 5000) { + const uint64_t now = now_ns(); + if (ms_ago * audio::NS_PER_MS <= now) { + ctx->last_callback_time_ns.store(now - ms_ago * audio::NS_PER_MS); + } +} + // Shared test environment for audio tests // Manages the viam::sdk::Instance lifecycle for all tests class AudioTestEnvironment : public ::testing::Environment { diff --git a/test/watchdog_test.cpp b/test/watchdog_test.cpp new file mode 100644 index 0000000..674c567 --- /dev/null +++ b/test/watchdog_test.cpp @@ -0,0 +1,132 @@ +#include + +#include +#include +#include +#include +#include + +#include "test_utils.hpp" +#include "watchdog.hpp" + +namespace { + +// Minimal context type that satisfies the watchdog's only requirement on ContextT: +// a `std::atomic last_callback_time_ns` member. Lets us test the watchdog +// without pulling in the full AudioBuffer / stream context machinery. +struct FakeContext { + std::atomic last_callback_time_ns{0}; +}; + +// Helper: build a context whose callback last fired `ms_ago` milliseconds in the past. +std::shared_ptr make_context_stale_by(uint64_t ms_ago) { + auto ctx = std::make_shared(); + test_utils::mark_callback_stale(ctx, ms_ago); + return ctx; +} + +} // namespace + +// 1. Restart fires when the callback is stale and attempts are below the budget. +TEST(StallWatchdog, RestartFiresWhenStale) { + const auto ctx = make_context_stale_by(5000); // 5s past threshold + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_1]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_GE(restart_calls.load(), 1) << "restart_fn should have been called at least once"; +} + +// Once attempts hit MAX, the backoff gate prevents repeated calls within +// BACKOFF_INTERVAL_MS. The very first poll past MAX still fires (the watchdog needs +// that to discover when the device returns), but subsequent polls within the backoff +// window are skipped. +TEST(StallWatchdog, BackoffGateLimitsRestartsWhenAttemptsExhausted) { + const auto ctx = make_context_stale_by(5000); + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return audio::utils::MAX_RESTART_ATTEMPTS; }, // already at the cap + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_2]"); + wd.start(); + + // Wait long enough for several poll cycles (200ms each). With attempts pinned at MAX + // and BACKOFF_INTERVAL_MS = 5000, only the very first retry should fire — every poll + // for the next 5 seconds should hit the backoff gate and skip. + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 1) + << "expected exactly one backoff retry inside the BACKOFF_INTERVAL_MS window; " + << "subsequent polls should have been gated"; +} + +// No restart when last_callback_time_ns == 0 (stream just opened, callback hasn't +// fired yet). The watchdog should treat this as "give it more time", not as stalled. +TEST(StallWatchdog, NoRestartWhenCallbackNeverFired) { + const auto ctx = std::make_shared(); // last_callback_time_ns stays at 0 + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_4]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 0); +} + +// No restart when the callback fired recently (well within threshold). +TEST(StallWatchdog, NoRestartWhenCallbackRecent) { + const auto ctx = make_context_stale_by(100); // 100ms old, threshold is 2000ms + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_5]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 0); +} + +// No restart when get_context returns nullptr (no active stream). +TEST(StallWatchdog, NoRestartWhenContextIsNull) { + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + []() { return std::shared_ptr{}; }, // always null + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_6]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 0); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); + return RUN_ALL_TESTS(); +} From 1e356e329424acbee7979349d1f64cde1cc0915f Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 5 May 2026 11:32:39 -0400 Subject: [PATCH 2/3] lint/clean up logs --- src/audio_utils.hpp | 48 +++++++++++++++++++-------------------- src/microphone.cpp | 16 ++++--------- src/speaker.cpp | 13 ++++------- src/speaker.hpp | 2 +- src/watchdog.hpp | 41 +++++++++++++++------------------ test/audio_utils_test.cpp | 16 ++++++------- test/watchdog_test.cpp | 6 ++--- 7 files changed, 63 insertions(+), 79 deletions(-) diff --git a/src/audio_utils.hpp b/src/audio_utils.hpp index 5fc7643..e14b7b1 100644 --- a/src/audio_utils.hpp +++ b/src/audio_utils.hpp @@ -100,24 +100,24 @@ inline PaDeviceIndex findDeviceById(const std::string& id, return paNoDevice; } -// Result of resolve_device_id_into_params. -enum class ResolveStatus { - NotConfigured, // `device_id` was empty — caller should proceed with cached params - Found, // device_id resolved (params updated in place if the device moved) - NotFound, // device_id was configured but no matching device exists on the system (might be unplugged) -}; - // If `device_id` is non-empty, re-resolves it against the current PortAudio device state // and updates `params.device_index` / `params.device_name` in-place if the device has -// moved (e.g. USB unplug/replug → kernel re-enumerated). Returns a ResolveStatus the -// caller can use to decide whether the restart is worth attempting. -inline ResolveStatus resolve_device_id_into_params(const std::string& device_id, - StreamParams& params, - const audio::portaudio::PortAudioInterface* pa, - const std::string& log_prefix, - const audio::device_id::DeviceIdResolver* resolver = nullptr) { +// moved (e.g. USB unplug/replug → kernel re-enumerated). +// +// Returns true if the caller should proceed with the restart — either device_id was +// not configured (caller falls back to cached params) or the device was found. Returns +// false only when device_id was configured AND the device is currently missing; in that +// case the caller should skip the actual stream open since it would fail. +// +// `pa` and `resolver` default to nullptr; when null, the real PortAudio interface and +// real DeviceIdResolver are used. Tests inject mocks here. +inline bool resolve_device_id_into_params(const std::string& device_id, + StreamParams& params, + const audio::portaudio::PortAudioInterface* pa, + const std::string& log_prefix, + const audio::device_id::DeviceIdResolver* resolver = nullptr) { if (device_id.empty()) { - return ResolveStatus::NotConfigured; + return true; // not configured — proceed with cached params } audio::portaudio::RealPortAudio real_pa; const audio::portaudio::PortAudioInterface& audio_interface = pa ? *pa : real_pa; @@ -125,22 +125,20 @@ inline ResolveStatus resolve_device_id_into_params(const std::string& device_id, const audio::device_id::DeviceIdResolver& resolver_ref = resolver ? *resolver : real_resolver; const PaDeviceIndex resolved = findDeviceById(device_id, audio_interface, resolver_ref); if (resolved == paNoDevice) { - VIAM_SDK_LOG(warn) << log_prefix << " device_id '" << device_id - << "' not found on system; skipping restart"; - return ResolveStatus::NotFound; + VIAM_SDK_LOG(debug) << log_prefix << " device_id '" << device_id << "' not found on system; skipping restart"; + return false; } if (resolved == params.device_index) { - return ResolveStatus::Found; + return true; // already pointing at the right device } const PaDeviceInfo* const new_info = audio_interface.getDeviceInfo(resolved); - VIAM_SDK_LOG(info) << log_prefix << " device_id '" << device_id << "' moved from index " - << params.device_index << " to " << resolved << " (" - << ((new_info && new_info->name) ? new_info->name : "") << "); updating stream params"; + VIAM_SDK_LOG(info) << log_prefix << " device_id '" << device_id << "' moved from index " << params.device_index << " to " << resolved + << " (" << ((new_info && new_info->name) ? new_info->name : "") << "); updating stream params"; params.device_index = resolved; if (new_info && new_info->name) { params.device_name = new_info->name; } - return ResolveStatus::Found; + return true; } inline ConfigParams parseConfigAttributes(const viam::sdk::ResourceConfig& cfg) { @@ -331,8 +329,8 @@ inline void openStream(PaStream*& stream, const StreamParams& params, const audi PaError err = audio_interface.isFormatSupported(inputParams, outputParams, params.sample_rate); if (err != paNoError) { std::ostringstream buffer; - buffer << "Could not open stream — PortAudio: " << Pa_GetErrorText(err) << " — device '" - << params.device_name << "' (index " << params.device_index << "). " + buffer << "Could not open stream — PortAudio: " << Pa_GetErrorText(err) << " — device '" << params.device_name << "' (index " + << params.device_index << "). " << "Requested: sample_rate=" << params.sample_rate << "Hz, " << "channels=" << params.num_channels << ", format=16-bit PCM, " << "latency=" << params.suggested_latency_seconds << "s"; diff --git a/src/microphone.cpp b/src/microphone.cpp index 0a61331..92f8bf4 100644 --- a/src/microphone.cpp +++ b/src/microphone.cpp @@ -94,16 +94,14 @@ void Microphone::restart_stalled_stream(const std::shared_ptrinput_overflow_count.load(); if (overflow_count != last_logged_overflow_count) { VIAM_SDK_LOG(warn) << "[get_audio] Input overflow detected — " << (overflow_count - last_logged_overflow_count) diff --git a/src/speaker.cpp b/src/speaker.cpp index 3cd462e..454fe62 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -94,16 +94,14 @@ void Speaker::restart_stalled_stream(const std::shared_ptr last_callback_time_ns` member — both subclasses // inherit one from AudioBuffer. -// -// Lifecycle: construct → start() → (optional) stop() → destruct. start() and stop() are -// not thread-safe; call them from the owning component only. The destructor joins the -// poll thread. template class StallWatchdog { public: @@ -58,7 +52,9 @@ class StallWatchdog { restart_fn_(std::move(restart_fn)), log_prefix_(std::move(log_prefix)) {} - ~StallWatchdog() { stop(); } + ~StallWatchdog() { + stop(); + } // Spins up the poll thread. Call after the owning component has finished // constructing its first stream so the first poll sees valid state. @@ -66,9 +62,6 @@ class StallWatchdog { thread_ = std::thread([this] { loop(); }); } - // Signals stop and joins the poll thread. Safe to call multiple times. - // The destructor calls this automatically; explicit stop() is for callers - // that need to control teardown ordering. void stop() { stop_.store(true); if (thread_.joinable()) { @@ -99,8 +92,8 @@ class StallWatchdog { if (now_ns <= last_cb) { continue; } - const uint64_t stale_ms = (now_ns - last_cb) / audio::NS_PER_MS; - if (stale_ms <= STALL_THRESHOLD_MS) { + const auto stale = std::chrono::milliseconds((now_ns - last_cb) / audio::NS_PER_MS); + if (stale <= STALL_THRESHOLD) { continue; } @@ -109,19 +102,21 @@ class StallWatchdog { // Budget exhausted — back off to slow retries instead of giving up // permanently, so hot-replug (device unplugged then plugged back in) // recovers automatically without a reconfigure. - if (now_ns - last_attempt_ns_.load() < BACKOFF_INTERVAL_MS * audio::NS_PER_MS) { + const auto since_last = std::chrono::milliseconds((now_ns - last_attempt_ns_.load()) / audio::NS_PER_MS); + if (since_last < BACKOFF_INTERVAL) { if (!backoff_logged_.exchange(true)) { - VIAM_SDK_LOG(warn) << log_prefix_ << " Restart budget exhausted; backing off to " - << BACKOFF_INTERVAL_MS / 1000 << "s retries until the device returns"; + const auto backoff_seconds = std::chrono::duration_cast(BACKOFF_INTERVAL).count(); + VIAM_SDK_LOG(warn) << log_prefix_ << " Restart budget exhausted; backing off to " << backoff_seconds + << "s retries until the device returns"; } continue; } - VIAM_SDK_LOG(info) << log_prefix_ << " Backoff retry (attempts=" << attempts << "); checking for device"; + VIAM_SDK_LOG(debug) << log_prefix_ << " Backoff retry (attempts=" << attempts << "); checking for device"; } else { // Attempts is below max — clear the backoff latch so a future exhaustion // logs the "backing off" message again. backoff_logged_.store(false); - VIAM_SDK_LOG(warn) << log_prefix_ << " Callback stale for " << stale_ms << "ms, attempting restart"; + VIAM_SDK_LOG(warn) << log_prefix_ << " Callback stale for " << stale.count() << "ms, attempting restart"; } last_attempt_ns_.store(now_ns); @@ -145,7 +140,7 @@ class StallWatchdog { // (typically after a successful restart resets the counter). std::atomic backoff_logged_{false}; // Wall-clock time of the most recent restart attempt, used to enforce the - // BACKOFF_INTERVAL_MS gap once the attempts budget is exhausted. Initialized to 0 + // BACKOFF_INTERVAL gap once the attempts budget is exhausted. Initialized to 0 // so the very first stale-detection always fires immediately. std::atomic last_attempt_ns_{0}; }; diff --git a/test/audio_utils_test.cpp b/test/audio_utils_test.cpp index f4620f7..85a2bff 100644 --- a/test/audio_utils_test.cpp +++ b/test/audio_utils_test.cpp @@ -498,9 +498,9 @@ std::unique_ptr<::testing::NiceMock> make_reso TEST_F(AudioUtilsTest, ResolveDeviceId_EmptyDeviceIdIsNoOp) { auto resolver = make_resolver_mock(); auto params = params_at(7, "old name"); - const auto status = + const bool proceed = audio::utils::resolve_device_id_into_params(/*device_id=*/"", params, mock_pa_.get(), "[test]", resolver.get()); - EXPECT_EQ(status, audio::utils::ResolveStatus::NotConfigured); + EXPECT_TRUE(proceed) << "empty device_id should proceed with cached params"; EXPECT_EQ(params.device_index, 7); EXPECT_EQ(params.device_name, "old name"); } @@ -518,9 +518,9 @@ TEST_F(AudioUtilsTest, ResolveDeviceId_NotFoundLeavesParamsUnchanged) { // Resolver default returns "" for every device, so "looking-for-id" never matches. auto params = params_at(0, "device a"); - const auto status = + const bool proceed = audio::utils::resolve_device_id_into_params("looking-for-id", params, mock_pa_.get(), "[test]", resolver.get()); - EXPECT_EQ(status, audio::utils::ResolveStatus::NotFound); + EXPECT_FALSE(proceed) << "missing device should signal skip"; EXPECT_EQ(params.device_index, 0); EXPECT_EQ(params.device_name, "device a"); } @@ -540,9 +540,9 @@ TEST_F(AudioUtilsTest, ResolveDeviceId_AtSameIndexIsNoOp) { EXPECT_CALL(*resolver, resolve(0, _)).WillRepeatedly(Return(std::string{"stable-id"})); auto params = params_at(0, "device a"); - const auto status = + const bool proceed = audio::utils::resolve_device_id_into_params("stable-id", params, mock_pa_.get(), "[test]", resolver.get()); - EXPECT_EQ(status, audio::utils::ResolveStatus::Found); + EXPECT_TRUE(proceed) << "device found at the cached index should proceed"; EXPECT_EQ(params.device_index, 0); EXPECT_EQ(params.device_name, "device a"); } @@ -561,9 +561,9 @@ TEST_F(AudioUtilsTest, ResolveDeviceId_MovedUpdatesParams) { ON_CALL(*resolver, resolve(1, _)).WillByDefault(Return(std::string{"moving-id"})); auto params = params_at(0, "device a"); - const auto status = + const bool proceed = audio::utils::resolve_device_id_into_params("moving-id", params, mock_pa_.get(), "[test]", resolver.get()); - EXPECT_EQ(status, audio::utils::ResolveStatus::Found); + EXPECT_TRUE(proceed) << "device found at a new index should proceed"; EXPECT_EQ(params.device_index, 1); EXPECT_EQ(params.device_name, "device b new path"); } diff --git a/test/watchdog_test.cpp b/test/watchdog_test.cpp index 674c567..04d38f0 100644 --- a/test/watchdog_test.cpp +++ b/test/watchdog_test.cpp @@ -46,7 +46,7 @@ TEST(StallWatchdog, RestartFiresWhenStale) { } // Once attempts hit MAX, the backoff gate prevents repeated calls within -// BACKOFF_INTERVAL_MS. The very first poll past MAX still fires (the watchdog needs +// BACKOFF_INTERVAL. The very first poll past MAX still fires (the watchdog needs // that to discover when the device returns), but subsequent polls within the backoff // window are skipped. TEST(StallWatchdog, BackoffGateLimitsRestartsWhenAttemptsExhausted) { @@ -61,13 +61,13 @@ TEST(StallWatchdog, BackoffGateLimitsRestartsWhenAttemptsExhausted) { wd.start(); // Wait long enough for several poll cycles (200ms each). With attempts pinned at MAX - // and BACKOFF_INTERVAL_MS = 5000, only the very first retry should fire — every poll + // and BACKOFF_INTERVAL = 5000, only the very first retry should fire — every poll // for the next 5 seconds should hit the backoff gate and skip. std::this_thread::sleep_for(std::chrono::milliseconds(1500)); wd.stop(); EXPECT_EQ(restart_calls.load(), 1) - << "expected exactly one backoff retry inside the BACKOFF_INTERVAL_MS window; " + << "expected exactly one backoff retry inside the BACKOFF_INTERVAL window; " << "subsequent polls should have been gated"; } From e07fec232c9010101c67fada28bfa1ed435eb74a Mon Sep 17 00:00:00 2001 From: oliviamiller <106617921+oliviamiller@users.noreply.github.com> Date: Tue, 5 May 2026 11:35:35 -0400 Subject: [PATCH 3/3] fix log --- src/watchdog.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/watchdog.hpp b/src/watchdog.hpp index 35c1d39..fd852cc 100644 --- a/src/watchdog.hpp +++ b/src/watchdog.hpp @@ -111,7 +111,7 @@ class StallWatchdog { } continue; } - VIAM_SDK_LOG(debug) << log_prefix_ << " Backoff retry (attempts=" << attempts << "); checking for device"; + VIAM_SDK_LOG(debug) << log_prefix_ << " checking for device"; } else { // Attempts is below max — clear the backoff latch so a future exhaustion // logs the "backing off" message again.