diff --git a/src/audio_buffer.hpp b/src/audio_buffer.hpp index 50cd40c..3e1b840 100644 --- a/src/audio_buffer.hpp +++ b/src/audio_buffer.hpp @@ -14,6 +14,9 @@ namespace vsdk = ::viam::sdk; constexpr int BUFFER_DURATION_SECONDS = 30; // How much audio history to keep in buffer +// Unit-conversion constant: nanoseconds per millisecond. +constexpr uint64_t NS_PER_MS = 1'000'000; + // Base class for audio buffering - lock-free circular buffer with atomic operations // Can be used by both input (microphone) and output (speaker) models. // There is a 1:1 correspondence between AudioBuffer and viam audio resource diff --git a/src/audio_utils.hpp b/src/audio_utils.hpp index 77c748e..e14b7b1 100644 --- a/src/audio_utils.hpp +++ b/src/audio_utils.hpp @@ -100,6 +100,47 @@ inline PaDeviceIndex findDeviceById(const std::string& id, return paNoDevice; } +// If `device_id` is non-empty, re-resolves it against the current PortAudio device state +// and updates `params.device_index` / `params.device_name` in-place if the device has +// moved (e.g. USB unplug/replug → kernel re-enumerated). +// +// Returns true if the caller should proceed with the restart — either device_id was +// not configured (caller falls back to cached params) or the device was found. Returns +// false only when device_id was configured AND the device is currently missing; in that +// case the caller should skip the actual stream open since it would fail. +// +// `pa` and `resolver` default to nullptr; when null, the real PortAudio interface and +// real DeviceIdResolver are used. Tests inject mocks here. +inline bool resolve_device_id_into_params(const std::string& device_id, + StreamParams& params, + const audio::portaudio::PortAudioInterface* pa, + const std::string& log_prefix, + const audio::device_id::DeviceIdResolver* resolver = nullptr) { + if (device_id.empty()) { + return true; // not configured — proceed with cached params + } + audio::portaudio::RealPortAudio real_pa; + const audio::portaudio::PortAudioInterface& audio_interface = pa ? *pa : real_pa; + audio::device_id::RealDeviceIdResolver real_resolver; + const audio::device_id::DeviceIdResolver& resolver_ref = resolver ? *resolver : real_resolver; + const PaDeviceIndex resolved = findDeviceById(device_id, audio_interface, resolver_ref); + if (resolved == paNoDevice) { + VIAM_SDK_LOG(debug) << log_prefix << " device_id '" << device_id << "' not found on system; skipping restart"; + return false; + } + if (resolved == params.device_index) { + return true; // already pointing at the right device + } + const PaDeviceInfo* const new_info = audio_interface.getDeviceInfo(resolved); + VIAM_SDK_LOG(info) << log_prefix << " device_id '" << device_id << "' moved from index " << params.device_index << " to " << resolved + << " (" << ((new_info && new_info->name) ? new_info->name : "") << "); updating stream params"; + params.device_index = resolved; + if (new_info && new_info->name) { + params.device_name = new_info->name; + } + return true; +} + inline ConfigParams parseConfigAttributes(const viam::sdk::ResourceConfig& cfg) { const auto attrs = cfg.attributes(); ConfigParams params; @@ -288,13 +329,11 @@ inline void openStream(PaStream*& stream, const StreamParams& params, const audi PaError err = audio_interface.isFormatSupported(inputParams, outputParams, params.sample_rate); if (err != paNoError) { std::ostringstream buffer; - buffer << "Audio format not supported by device '" << params.device_name << "' (index " << params.device_index - << "): " << Pa_GetErrorText(err) << "\n" - << "Requested configuration:\n" - << " - Sample rate: " << params.sample_rate << " Hz\n" - << " - Channels: " << params.num_channels << "\n" - << " - Format: 16-bit PCM\n" - << " - Latency: " << params.suggested_latency_seconds << " seconds"; + buffer << "Could not open stream — PortAudio: " << Pa_GetErrorText(err) << " — device '" << params.device_name << "' (index " + << params.device_index << "). " + << "Requested: sample_rate=" << params.sample_rate << "Hz, " + << "channels=" << params.num_channels << ", format=16-bit PCM, " + << "latency=" << params.suggested_latency_seconds << "s"; VIAM_SDK_LOG(error) << buffer.str(); throw std::runtime_error(buffer.str()); } @@ -463,7 +502,7 @@ inline void log_callback_staleness(const std::atomic& last_callback_ti const uint64_t last_cb = last_callback_time_ns.load(); if (last_cb > 0) { const uint64_t now_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); - const uint64_t elapsed_ms = (now_ns - last_cb) / 1'000'000; + const uint64_t elapsed_ms = (now_ns - last_cb) / audio::NS_PER_MS; if (elapsed_ms > STREAM_RESTART_THRESHOLD_MS && now_ns - last_log_ns > STALENESS_LOG_THROTTLE_NS) { last_log_ns = now_ns; if (!stream) { diff --git a/src/microphone.cpp b/src/microphone.cpp index a015c86..92f8bf4 100644 --- a/src/microphone.cpp +++ b/src/microphone.cpp @@ -94,13 +94,25 @@ void Microphone::restart_stalled_stream(const std::shared_ptr= MAX_STREAM_RESTART_ATTEMPTS) { - VIAM_SDK_LOG(error) << "[get_audio] Failed to restart stream after " << MAX_STREAM_RESTART_ATTEMPTS - << " attempts, giving up: " << e.what(); - throw; + if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) { + ++restart_attempts_; } - VIAM_SDK_LOG(error) << "[get_audio] Failed to restart stream (attempt " << restart_attempts_ << "/" << MAX_STREAM_RESTART_ATTEMPTS - << "): " << e.what(); + VIAM_SDK_LOG(error) << "[microphone stall_watcher] Failed to restart stream (attempt " << restart_attempts_ << "/" + << audio::utils::MAX_RESTART_ATTEMPTS << "): " << e.what(); } } @@ -193,6 +202,7 @@ Microphone::Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig c std::lock_guard lock(stream_ctx_mu_); stream_params_ = setup.stream_params; stream_params_.user_data = setup.audio_context.get(); + device_id_ = setup.config_params.device_id; audio::utils::restart_stream(stream_, stream_params_, pa_); latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); audio_context_ = setup.audio_context; @@ -200,10 +210,28 @@ Microphone::Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig c setup.config_params.sample_rate.value_or(setup.stream_params.sample_rate); // User's requested rate, defaults to device rate historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS); } + + watchdog_ = std::make_unique>( + [this]() { + std::lock_guard lock(stream_ctx_mu_); + return audio_context_; + }, + [this]() { + std::lock_guard lock(stream_ctx_mu_); + return restart_attempts_; + }, + [this](const std::shared_ptr& ctx) { restart_stalled_stream(ctx); }, + "[microphone stall_watcher]"); + watchdog_->start(); } Microphone::~Microphone() { VIAM_SDK_LOG(debug) << "[Microphone::~Microphone] Destructor called"; + // Stop and join the watchdog before tearing down the stream so it can't touch a + // half-destroyed audio_context_. + if (watchdog_) { + watchdog_->stop(); + } if (stream_) { PaError err = Pa_StopStream(stream_); if (err != paNoError) { @@ -321,9 +349,11 @@ void Microphone::reconfigure(const viam::sdk::Dependencies& deps, const viam::sd stream_params_ = setup.stream_params; stream_params_.user_data = setup.audio_context.get(); + device_id_ = setup.config_params.device_id; audio::utils::restart_stream(stream_, stream_params_, pa_); latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); audio_context_ = setup.audio_context; + restart_attempts_ = 0; requested_sample_rate_ = setup.config_params.sample_rate.value_or( setup.stream_params.sample_rate); // User's requested rate, defaults to device rate historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS); @@ -444,19 +474,6 @@ void Microphone::get_audio(std::string const& codec, // Wait until we have a full chunk worth of samples if (available_samples < device_samples_per_chunk) { - audio::utils::log_callback_staleness( - stream_context->last_callback_time_ns, "[get_audio]", current_stream, last_staleness_log_ns); - - const uint64_t last_cb = stream_context->last_callback_time_ns.load(); - if (last_cb > 0) { - const uint64_t now_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); - const uint64_t stale_ms = (now_ns - last_cb) / 1'000'000; - if (stale_ms > audio::utils::STREAM_RESTART_THRESHOLD_MS) { - VIAM_SDK_LOG(warn) << "[get_audio] Stream stalled for " << stale_ms << "ms, attempting restart"; - restart_stalled_stream(stream_context); - } - } - const uint64_t overflow_count = stream_context->input_overflow_count.load(); if (overflow_count != last_logged_overflow_count) { VIAM_SDK_LOG(warn) << "[get_audio] Input overflow detected — " << (overflow_count - last_logged_overflow_count) diff --git a/src/microphone.hpp b/src/microphone.hpp index a5cce9d..3ec3a7b 100644 --- a/src/microphone.hpp +++ b/src/microphone.hpp @@ -15,12 +15,12 @@ #include "audio_utils.hpp" #include "portaudio.h" #include "portaudio.hpp" +#include "watchdog.hpp" namespace microphone { namespace vsdk = ::viam::sdk; constexpr double DEFAULT_HISTORICAL_THROTTLE_MS = 50; -constexpr int MAX_STREAM_RESTART_ATTEMPTS = 3; PaDeviceIndex findDeviceByName(const std::string& name, const audio::portaudio::PortAudioInterface& pa); // Calculates the initial read position from a previous timestamp @@ -84,6 +84,15 @@ class Microphone final : public viam::sdk::AudioIn, public viam::sdk::Reconfigur int restart_attempts_; audio::utils::StreamParams stream_params_; + + // Device id from the resource config (empty if user configured by device_name or + // system default). Used by restart_stalled_stream to re-resolve the device's current + // PortAudio index, so we recover from kernel re-enumeration (e.g. USB unplug/replug). + std::string device_id_; + + // Background watchdog that polls audio_context_->last_callback_time_ns and triggers + // restart_stalled_stream when the mic callback has gone silent for too long. + std::unique_ptr> watchdog_; }; /** diff --git a/src/speaker.cpp b/src/speaker.cpp index 3fbb029..454fe62 100644 --- a/src/speaker.cpp +++ b/src/speaker.cpp @@ -27,21 +27,41 @@ Speaker::Speaker(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, au // Set new configuration and start stream under lock { std::lock_guard lock(stream_mu_); - device_name_ = setup.stream_params.device_name; sample_rate_ = setup.stream_params.sample_rate; num_channels_ = setup.stream_params.num_channels; audio_context_ = setup.audio_context; setup.stream_params.user_data = setup.audio_context.get(); - audio::utils::restart_stream(stream_, setup.stream_params, pa_); - latency_ = audio::utils::get_stream_latency(stream_, setup.stream_params, pa_); + stream_params_ = setup.stream_params; + device_id_ = setup.config_params.device_id; + audio::utils::restart_stream(stream_, stream_params_, pa_); + latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); volume_ = setup.config_params.volume; if (volume_) { - audio::volume::set_volume(device_name_, *volume_); + audio::volume::set_volume(stream_params_.device_name, *volume_); } } + + watchdog_ = std::make_unique>( + [this]() { + std::lock_guard lock(stream_mu_); + return audio_context_; + }, + [this]() { + std::lock_guard lock(stream_mu_); + return restart_attempts_; + }, + [this](const std::shared_ptr& ctx) { restart_stalled_stream(ctx); }, + "[speaker stall_watcher]"); + watchdog_->start(); } Speaker::~Speaker() { + // Stop and join the watchdog before tearing down the stream so it can't touch a + // half-destroyed audio_context_. + if (watchdog_) { + watchdog_->stop(); + } + if (stream_) { PaError err = Pa_StopStream(stream_); if (err != paNoError) { @@ -57,6 +77,65 @@ Speaker::~Speaker() { vsdk::Model Speaker::model = {"viam", "system-audio", "speaker"}; +/** + * Tear down the existing stream and bring up a fresh one with the saved params. + * + * Bails out if `playback_context` is no longer the active audio_context_ — that means + * either reconfigure() or another stall recovery already replaced it, so the in-flight + * play() call (if any) is about to exit on its own and we shouldn't race. + * + * On a successful restart, any unplayed audio in the old buffer is discarded — same + * behavior as reconfigure(). The in-flight play() loop will see audio_context_ change + * and return early. + */ +void Speaker::restart_stalled_stream(const std::shared_ptr& playback_context) { + std::lock_guard lock(stream_mu_); + if (playback_context != audio_context_) { + return; + } + + VIAM_SDK_LOG(debug) << "[speaker stall_watcher] Restarting stalled speaker stream"; + + // If device_id was configured, re-resolve before restarting in case the kernel + // re-enumerated and the cached device_index is stale (e.g. USB unplug/replug). + // When the device is missing, skip the actual stream open — PortAudio would just + // spam ALSA errors. Bump attempts so the watchdog enters backoff; once the device + // returns, a backoff retry will resolve and proceed. + if (!audio::utils::resolve_device_id_into_params(device_id_, stream_params_, pa_, "[speaker stall_watcher]")) { + if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) { + ++restart_attempts_; + } + return; + } + + if (stream_) { + try { + audio::utils::abort_stream(stream_, pa_); + } catch (const std::exception& e) { + VIAM_SDK_LOG(error) << "[speaker stall_watcher] Error shutting down stalled stream: " << e.what(); + } + stream_ = nullptr; + } + + const viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, stream_params_.sample_rate, stream_params_.num_channels}; + const auto new_context = std::make_shared(info, audio::BUFFER_DURATION_SECONDS); + + try { + stream_params_.user_data = new_context.get(); + audio::utils::restart_stream(stream_, stream_params_, pa_); + latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); + audio_context_ = new_context; + restart_attempts_ = 0; + VIAM_SDK_LOG(info) << "[speaker stall_watcher] Speaker stream restarted successfully"; + } catch (const std::exception& e) { + if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) { + ++restart_attempts_; + } + VIAM_SDK_LOG(error) << "[speaker stall_watcher] Failed to restart stream (attempt " << restart_attempts_ << "/" + << audio::utils::MAX_RESTART_ATTEMPTS << "): " << e.what(); + } +} + /** * PortAudio callback function - runs on real-time audio thread. * This function must not: @@ -182,7 +261,7 @@ viam::sdk::ProtoStruct Speaker::do_command(const viam::sdk::ProtoStruct& command } std::lock_guard lock(stream_mu_); - audio::volume::set_volume(device_name_, vol); + audio::volume::set_volume(stream_params_.device_name, vol); volume_ = vol; return viam::sdk::ProtoStruct{{"volume", static_cast(vol)}}; @@ -453,15 +532,17 @@ void Speaker::reconfigure(const vsdk::Dependencies& deps, const vsdk::ResourceCo // Otherwise the callback thread may still be accessing the old context // after we destroy it (heap-use-after-free) setup.stream_params.user_data = setup.audio_context.get(); - audio::utils::restart_stream(stream_, setup.stream_params, pa_); - device_name_ = setup.stream_params.device_name; + stream_params_ = setup.stream_params; + audio::utils::restart_stream(stream_, stream_params_, pa_); sample_rate_ = setup.stream_params.sample_rate; num_channels_ = setup.stream_params.num_channels; - latency_ = audio::utils::get_stream_latency(stream_, setup.stream_params, pa_); + latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_); audio_context_ = setup.audio_context; + device_id_ = setup.config_params.device_id; + restart_attempts_ = 0; volume_ = setup.config_params.volume; if (volume_) { - audio::volume::set_volume(device_name_, *volume_); + audio::volume::set_volume(stream_params_.device_name, *volume_); } } VIAM_SDK_LOG(info) << "[reconfigure] Reconfigure completed successfully"; diff --git a/src/speaker.hpp b/src/speaker.hpp index d7dd080..700fe29 100644 --- a/src/speaker.hpp +++ b/src/speaker.hpp @@ -5,14 +5,17 @@ #include #include #include +#include #include #include #include #include #include #include "audio_stream.hpp" +#include "audio_utils.hpp" #include "portaudio.h" #include "portaudio.hpp" +#include "watchdog.hpp" namespace speaker { namespace vsdk = ::viam::sdk; @@ -57,7 +60,6 @@ class Speaker final : public viam::sdk::AudioOut, public viam::sdk::Reconfigurab void reconfigure(const viam::sdk::Dependencies& deps, const viam::sdk::ResourceConfig& cfg); // Member variables - std::string device_name_; double latency_; int sample_rate_; int num_channels_; @@ -78,6 +80,28 @@ class Speaker final : public viam::sdk::AudioOut, public viam::sdk::Reconfigurab // Flag to interrupt playback std::atomic stop_requested_{false}; + + // Saved stream params so the watchdog can rebuild the stream with the same configuration. + audio::utils::StreamParams stream_params_; + + // Device id from the resource config (empty if user configured by device_name or + // system default). Used by restart_stalled_stream to re-resolve the device's current + // PortAudio index, so we recover from kernel re-enumeration (e.g. USB unplug/replug). + std::string device_id_; + + // Counts consecutive failed restart attempts; reset to 0 after a successful restart + // or a reconfigure(). Once it reaches audio::utils::MAX_RESTART_ATTEMPTS, the watchdog + // backs off to slow retries (audio::utils::BACKOFF_INTERVAL) instead of polling + // every audio::utils::POLL_INTERVAL — supports hot-replug recovery without spamming + // the kernel. The counter is capped at MAX so it doesn't grow unbounded. + int restart_attempts_ = 0; + + // Background watchdog that polls audio_context_->last_callback_time_ns and triggers + // restart_stalled_stream when the speaker callback has gone silent for too long, + std::unique_ptr> watchdog_; + + private: + void restart_stalled_stream(const std::shared_ptr& playback_context); }; } // namespace speaker diff --git a/src/watchdog.hpp b/src/watchdog.hpp new file mode 100644 index 0000000..fd852cc --- /dev/null +++ b/src/watchdog.hpp @@ -0,0 +1,149 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace audio { +namespace utils { + +constexpr int MAX_RESTART_ATTEMPTS = 3; +constexpr std::chrono::milliseconds POLL_INTERVAL{200}; +constexpr std::chrono::milliseconds STALL_THRESHOLD{2000}; +// Once the attempts budget is exhausted, the watchdog stops attempting fast restarts and +// instead retries every BACKOFF_INTERVAL. This supports hot-replug scenarios: the device +// may come back later (USB plug-in, driver recovery) and we want to resume recovery +// without forcing the user to reconfigure. +constexpr std::chrono::milliseconds BACKOFF_INTERVAL{2000}; + +// Background watchdog that polls an audio component's `last_callback_time_ns` and +// triggers a restart when the callback has gone silent for too long. Used by both +// Speaker and Microphone — the component-specific bits are passed in as callbacks. +// +// ContextT is the concrete audio context type (InputStreamContext / OutputStreamContext). +// It must expose a `std::atomic last_callback_time_ns` member — both subclasses +// inherit one from AudioBuffer. +template +class StallWatchdog { + public: + // Returns the currently active audio context, or nullptr if no stream is up. + using GetContextFn = std::function()>; + + // Returns the current consecutive-failed-restart count. Watchdog stops calling + // restart_fn once this reaches MAX_RESTART_ATTEMPTS; the latch clears automatically + // once the count drops back below max (e.g. after a reconfigure). + using GetAttemptsFn = std::function; + + // Performs the restart. Argument is the same context get_context returned a moment + // earlier — the component should bail if it has since been swapped (reconfigure / + // peer restart). + using RestartFn = std::function&)>; + + StallWatchdog(GetContextFn get_context, GetAttemptsFn get_attempts, RestartFn restart_fn, std::string log_prefix) + : get_context_(std::move(get_context)), + get_attempts_(std::move(get_attempts)), + restart_fn_(std::move(restart_fn)), + log_prefix_(std::move(log_prefix)) {} + + ~StallWatchdog() { + stop(); + } + + // Spins up the poll thread. Call after the owning component has finished + // constructing its first stream so the first poll sees valid state. + void start() { + thread_ = std::thread([this] { loop(); }); + } + + void stop() { + stop_.store(true); + if (thread_.joinable()) { + thread_.join(); + } + } + + private: + void loop() { + while (!stop_.load()) { + std::this_thread::sleep_for(POLL_INTERVAL); + if (stop_.load()) { + return; + } + + const std::shared_ptr ctx = get_context_(); + if (!ctx) { + continue; + } + + const uint64_t last_cb = ctx->last_callback_time_ns.load(); + if (last_cb == 0) { + // Callback hasn't fired yet — stream just opened, give it time. + continue; + } + + const uint64_t now_ns = static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); + if (now_ns <= last_cb) { + continue; + } + const auto stale = std::chrono::milliseconds((now_ns - last_cb) / audio::NS_PER_MS); + if (stale <= STALL_THRESHOLD) { + continue; + } + + const int attempts = get_attempts_(); + if (attempts >= MAX_RESTART_ATTEMPTS) { + // Budget exhausted — back off to slow retries instead of giving up + // permanently, so hot-replug (device unplugged then plugged back in) + // recovers automatically without a reconfigure. + const auto since_last = std::chrono::milliseconds((now_ns - last_attempt_ns_.load()) / audio::NS_PER_MS); + if (since_last < BACKOFF_INTERVAL) { + if (!backoff_logged_.exchange(true)) { + const auto backoff_seconds = std::chrono::duration_cast(BACKOFF_INTERVAL).count(); + VIAM_SDK_LOG(warn) << log_prefix_ << " Restart budget exhausted; backing off to " << backoff_seconds + << "s retries until the device returns"; + } + continue; + } + VIAM_SDK_LOG(debug) << log_prefix_ << " checking for device"; + } else { + // Attempts is below max — clear the backoff latch so a future exhaustion + // logs the "backing off" message again. + backoff_logged_.store(false); + VIAM_SDK_LOG(warn) << log_prefix_ << " Callback stale for " << stale.count() << "ms, attempting restart"; + } + + last_attempt_ns_.store(now_ns); + try { + restart_fn_(ctx); + } catch (const std::exception& e) { + VIAM_SDK_LOG(error) << log_prefix_ << " restart_fn threw: " << e.what(); + } + } + } + + GetContextFn get_context_; + GetAttemptsFn get_attempts_; + RestartFn restart_fn_; + std::string log_prefix_; + std::thread thread_; + std::atomic stop_{false}; + // Latches once the watchdog has logged the "backing off" message after the attempts + // budget was exhausted, so we don't spam the log every poll while waiting for backoff. + // Cleared automatically once get_attempts drops back below MAX_RESTART_ATTEMPTS + // (typically after a successful restart resets the counter). + std::atomic backoff_logged_{false}; + // Wall-clock time of the most recent restart attempt, used to enforce the + // BACKOFF_INTERVAL gap once the attempts budget is exhausted. Initialized to 0 + // so the very first stale-detection always fires immediately. + std::atomic last_attempt_ns_{0}; +}; + +} // namespace utils +} // namespace audio diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 8c3aa32..cc60d76 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -83,3 +83,4 @@ audio_add_gtest(resample_test.cpp) audio_add_gtest(audio_codec_test.cpp) audio_add_gtest(file_utils_test.cpp) audio_add_gtest(routing_filter_test.cpp) +audio_add_gtest(watchdog_test.cpp) diff --git a/test/audio_utils_test.cpp b/test/audio_utils_test.cpp index 0bb97a9..85a2bff 100644 --- a/test/audio_utils_test.cpp +++ b/test/audio_utils_test.cpp @@ -466,6 +466,108 @@ TEST_F(AudioUtilsTest, AbortStream_ThrowsOnCloseFailure) { EXPECT_THROW(audio::utils::abort_stream(nullptr, mock_pa_.get()), std::runtime_error); } +// --- resolve_device_id_into_params ------------------------------------------------- + +// File-scope helpers shared by the four resolver tests below. +namespace { + +audio::utils::StreamParams params_at(PaDeviceIndex idx, const std::string& name) { + audio::utils::StreamParams p{}; + p.device_index = idx; + p.device_name = name; + return p; +} + +PaDeviceInfo make_device_info(const char* name) { + PaDeviceInfo info{}; + info.name = name; + return info; +} + +// Build a fresh resolver mock and pre-load the "no match by default" behavior the +// resolver tests all want. Returns the unique_ptr so the caller controls lifetime. +std::unique_ptr<::testing::NiceMock> make_resolver_mock() { + auto resolver = std::make_unique<::testing::NiceMock>(); + ON_CALL(*resolver, resolve(::testing::_, ::testing::_)) + .WillByDefault(::testing::Return(std::string{})); + return resolver; +} + +} // namespace + +TEST_F(AudioUtilsTest, ResolveDeviceId_EmptyDeviceIdIsNoOp) { + auto resolver = make_resolver_mock(); + auto params = params_at(7, "old name"); + const bool proceed = + audio::utils::resolve_device_id_into_params(/*device_id=*/"", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_TRUE(proceed) << "empty device_id should proceed with cached params"; + EXPECT_EQ(params.device_index, 7); + EXPECT_EQ(params.device_name, "old name"); +} + +TEST_F(AudioUtilsTest, ResolveDeviceId_NotFoundLeavesParamsUnchanged) { + using ::testing::Return; + using ::testing::_; + + auto resolver = make_resolver_mock(); + EXPECT_CALL(*mock_pa_, getDeviceCount()).WillRepeatedly(Return(2)); + PaDeviceInfo a = make_device_info("device a"); + PaDeviceInfo b = make_device_info("device b"); + EXPECT_CALL(*mock_pa_, getDeviceInfo(0)).WillRepeatedly(Return(&a)); + EXPECT_CALL(*mock_pa_, getDeviceInfo(1)).WillRepeatedly(Return(&b)); + // Resolver default returns "" for every device, so "looking-for-id" never matches. + + auto params = params_at(0, "device a"); + const bool proceed = + audio::utils::resolve_device_id_into_params("looking-for-id", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_FALSE(proceed) << "missing device should signal skip"; + EXPECT_EQ(params.device_index, 0); + EXPECT_EQ(params.device_name, "device a"); +} + +TEST_F(AudioUtilsTest, ResolveDeviceId_AtSameIndexIsNoOp) { + using ::testing::Return; + using ::testing::_; + + auto resolver = make_resolver_mock(); + EXPECT_CALL(*mock_pa_, getDeviceCount()).WillRepeatedly(Return(2)); + PaDeviceInfo a = make_device_info("device a"); + PaDeviceInfo b = make_device_info("device b"); + EXPECT_CALL(*mock_pa_, getDeviceInfo(0)).WillRepeatedly(Return(&a)); + EXPECT_CALL(*mock_pa_, getDeviceInfo(1)).WillRepeatedly(Return(&b)); + + // Resolver: index 0 holds "stable-id"; others stay empty (default). + EXPECT_CALL(*resolver, resolve(0, _)).WillRepeatedly(Return(std::string{"stable-id"})); + + auto params = params_at(0, "device a"); + const bool proceed = + audio::utils::resolve_device_id_into_params("stable-id", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_TRUE(proceed) << "device found at the cached index should proceed"; + EXPECT_EQ(params.device_index, 0); + EXPECT_EQ(params.device_name, "device a"); +} + +TEST_F(AudioUtilsTest, ResolveDeviceId_MovedUpdatesParams) { + using ::testing::Return; + using ::testing::_; + + auto resolver = make_resolver_mock(); + EXPECT_CALL(*mock_pa_, getDeviceCount()).WillRepeatedly(Return(2)); + PaDeviceInfo a = make_device_info("device a"); + PaDeviceInfo b = make_device_info("device b new path"); + EXPECT_CALL(*mock_pa_, getDeviceInfo(0)).WillRepeatedly(Return(&a)); + EXPECT_CALL(*mock_pa_, getDeviceInfo(1)).WillRepeatedly(Return(&b)); + + ON_CALL(*resolver, resolve(1, _)).WillByDefault(Return(std::string{"moving-id"})); + + auto params = params_at(0, "device a"); + const bool proceed = + audio::utils::resolve_device_id_into_params("moving-id", params, mock_pa_.get(), "[test]", resolver.get()); + EXPECT_TRUE(proceed) << "device found at a new index should proceed"; + EXPECT_EQ(params.device_index, 1); + EXPECT_EQ(params.device_name, "device b new path"); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); diff --git a/test/microphone_test.cpp b/test/microphone_test.cpp index f86f49f..b61bf3f 100644 --- a/test/microphone_test.cpp +++ b/test/microphone_test.cpp @@ -1342,7 +1342,8 @@ TEST_F(MicrophoneTest, RestartStalledStream_IncrementsAttemptsOnFailure) { EXPECT_EQ(mic.audio_context_, original_context); // unchanged on failure } -TEST_F(MicrophoneTest, RestartStalledStream_ThrowsAfterThreeFailures) { + +TEST_F(MicrophoneTest, RestartStalledStream_DoesNotThrowAfterMaxAttempts) { auto config = createConfig(); microphone::Microphone mic(test_deps_, config, mock_pa_.get()); @@ -1357,7 +1358,12 @@ TEST_F(MicrophoneTest, RestartStalledStream_ThrowsAfterThreeFailures) { EXPECT_NO_THROW(mic.restart_stalled_stream(original_context)); // attempt 2 EXPECT_EQ(mic.restart_attempts_, 2); - EXPECT_THROW(mic.restart_stalled_stream(original_context), std::runtime_error); // attempt 3 + EXPECT_NO_THROW(mic.restart_stalled_stream(original_context)); // attempt 3 (hits MAX) + EXPECT_EQ(mic.restart_attempts_, audio::utils::MAX_RESTART_ATTEMPTS); + + // Subsequent failures should also not throw and the counter must stay capped. + EXPECT_NO_THROW(mic.restart_stalled_stream(original_context)); + EXPECT_EQ(mic.restart_attempts_, audio::utils::MAX_RESTART_ATTEMPTS); } TEST_F(MicrophoneTest, RestartStalledStream_ResetsAttemptsOnSuccess) { @@ -1475,6 +1481,81 @@ class AudioCallbackTest : public ::testing::Test { EXPECT_EQ(ctx->get_write_position(), 0); } +// Watchdog: when the microphone callback stops firing, the background watcher should +// detect the staleness on its next poll and replace audio_context_ with a fresh one. +TEST_F(MicrophoneTest, WatchdogRestartsStalledStream) { + auto config = createConfig(testDeviceName, 44100, 1); + Dependencies deps{}; + microphone::Microphone mic(deps, config, mock_pa_.get()); + + // Snapshot the audio context the watcher will see initially. + std::shared_ptr initial_context; + { + std::lock_guard lock(mic.stream_ctx_mu_); + initial_context = mic.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Pretend the callback last fired 5 seconds ago — well past the 2s threshold. + test_utils::mark_callback_stale(initial_context); + + // Wait long enough for the watchdog to wake, see the stale timestamp, + // and run restart_stalled_stream. + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(mic.stream_ctx_mu_); + after_context = mic.audio_context_; + after_attempts = mic.restart_attempts_; + } + + EXPECT_NE(after_context.get(), initial_context.get()) + << "watchdog should have replaced audio_context_ after detecting stall"; + EXPECT_EQ(after_attempts, 0) + << "restart should have succeeded with the mock pa returning paNoError"; +} + +// Watchdog: if restart_stream fails (e.g. PortAudio errors), restart_attempts_ should +// climb instead of resetting to 0. The audio_context_ is also NOT swapped because +// restart_stalled_stream only updates it on success. +TEST_F(MicrophoneTest, WatchdogIncrementsAttemptsOnRestartFailure) { + using ::testing::_; + using ::testing::Return; + + auto config = createConfig(testDeviceName, 44100, 1); + Dependencies deps{}; + microphone::Microphone mic(deps, config, mock_pa_.get()); + + std::shared_ptr initial_context; + { + std::lock_guard lock(mic.stream_ctx_mu_); + initial_context = mic.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Make any future startStream call fail. The initial stream is already up; + // this only affects the watcher's restart attempt. + EXPECT_CALL(*mock_pa_, startStream(_)).WillRepeatedly(Return(paInternalError)); + + test_utils::mark_callback_stale(initial_context); + + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(mic.stream_ctx_mu_); + after_context = mic.audio_context_; + after_attempts = mic.restart_attempts_; + } + + EXPECT_GE(after_attempts, 1) << "failed restart should have bumped restart_attempts_"; + EXPECT_EQ(after_context.get(), initial_context.get()) + << "audio_context_ should not be swapped on failed restart"; +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); diff --git a/test/speaker_test.cpp b/test/speaker_test.cpp index 2156be6..6c5e922 100644 --- a/test/speaker_test.cpp +++ b/test/speaker_test.cpp @@ -1,7 +1,10 @@ #include #include #include + +#include #include +#include #include "speaker.hpp" #include "test_utils.hpp" #include "audio_codec.hpp" @@ -1002,6 +1005,103 @@ TEST_F(SpeakerTest, PlayPCM16WithWavHeader) { } +// Watchdog: when the speaker callback stops firing, the background watcher should +// detect the staleness on its next poll and replace audio_context_ with a fresh one. +// We force the stale state by manually setting last_callback_time_ns to a timestamp +// well past STREAM_RESTART_THRESHOLD_MS in the past, then sleep one poll cycle + slack. +TEST_F(SpeakerTest, WatchdogRestartsStalledStream) { + const int sample_rate = 44100; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + // Snapshot the audio context the watcher will see initially. + std::shared_ptr initial_context; + { + std::lock_guard lock(speaker.stream_mu_); + initial_context = speaker.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Pretend the callback last fired 5 seconds ago — well past the 2s threshold. + test_utils::mark_callback_stale(initial_context); + + // Wait long enough for the watchdog to wake, see the stale timestamp, + // and run restart_stalled_stream. + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(speaker.stream_mu_); + after_context = speaker.audio_context_; + after_attempts = speaker.restart_attempts_; + } + + EXPECT_NE(after_context.get(), initial_context.get()) + << "watchdog should have replaced audio_context_ after detecting stall"; + EXPECT_EQ(after_attempts, 0) + << "restart should have succeeded with the mock pa returning paNoError"; +} + +// Watchdog: if restart_stream fails (e.g. PortAudio errors), restart_attempts_ should +// climb instead of resetting to 0. The fresh audio_context_ is also NOT swapped in +// because restart_stalled_stream only updates audio_context_ on success. +TEST_F(SpeakerTest, WatchdogIncrementsAttemptsOnRestartFailure) { + using ::testing::_; + using ::testing::Return; + + const int sample_rate = 44100; + const int num_channels = 1; + + auto attributes = ProtoStruct{}; + attributes["sample_rate"] = static_cast(sample_rate); + attributes["num_channels"] = static_cast(num_channels); + + ResourceConfig config( + "rdk:component:audioout", "", test_name_, attributes, "", + speaker::Speaker::model, LinkConfig{}, log_level::info); + + Dependencies deps{}; + speaker::Speaker speaker(deps, config, mock_pa_.get()); + + std::shared_ptr initial_context; + { + std::lock_guard lock(speaker.stream_mu_); + initial_context = speaker.audio_context_; + } + ASSERT_TRUE(initial_context); + + // Make any future startStream call fail. The initial stream is already up; + // this only affects the watcher's restart attempt. + EXPECT_CALL(*mock_pa_, startStream(_)).WillRepeatedly(Return(paInternalError)); + + test_utils::mark_callback_stale(initial_context); + + test_utils::wait_one_poll(); + + std::shared_ptr after_context; + int after_attempts = -1; + { + std::lock_guard lock(speaker.stream_mu_); + after_context = speaker.audio_context_; + after_attempts = speaker.restart_attempts_; + } + + EXPECT_GE(after_attempts, 1) << "failed restart should have bumped restart_attempts_"; + EXPECT_EQ(after_context.get(), initial_context.get()) + << "audio_context_ should not be swapped on failed restart"; +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); diff --git a/test/test_utils.hpp b/test/test_utils.hpp index 6e58edf..ff5014d 100644 --- a/test/test_utils.hpp +++ b/test/test_utils.hpp @@ -1,10 +1,16 @@ #pragma once +#include +#include +#include +#include + #include #include #include "../src/audio_stream.hpp" #include "../src/device_id.hpp" #include "../src/portaudio.hpp" +#include "../src/watchdog.hpp" #include "portaudio.h" namespace test_utils { @@ -12,6 +18,32 @@ namespace test_utils { // Common test constants constexpr int DEFAULT_DEVICE_SAMPLE_RATE = 44100; // Device's native/default sample rate in tests +// Steady-clock "now" expressed as nanoseconds since the clock's epoch — same idiom as +// the production code uses for last_callback_time_ns. Useful for tests that need to +// fabricate stale timestamps. +inline uint64_t now_ns() { + return static_cast(std::chrono::steady_clock::now().time_since_epoch().count()); +} + +// Sleeps long enough for the watchdog's poll thread to wake at least once and run its +// staleness check + restart_fn. Use this in tests that set last_callback_time_ns to a +// stale value and then need to assert on the watchdog's reaction. +inline void wait_one_poll() { + std::this_thread::sleep_for(audio::utils::POLL_INTERVAL + std::chrono::milliseconds(150)); +} + +// Marks the given audio context's `last_callback_time_ns` to be `ms_ago` milliseconds +// in the past, so the watchdog will see it as stale on its next poll. Works with any +// type that exposes a std::atomic last_callback_time_ns member — both the +// real AudioBuffer subclasses and the FakeContext used in watchdog_test. +template +inline void mark_callback_stale(const std::shared_ptr& ctx, uint64_t ms_ago = 5000) { + const uint64_t now = now_ns(); + if (ms_ago * audio::NS_PER_MS <= now) { + ctx->last_callback_time_ns.store(now - ms_ago * audio::NS_PER_MS); + } +} + // Shared test environment for audio tests // Manages the viam::sdk::Instance lifecycle for all tests class AudioTestEnvironment : public ::testing::Environment { diff --git a/test/watchdog_test.cpp b/test/watchdog_test.cpp new file mode 100644 index 0000000..04d38f0 --- /dev/null +++ b/test/watchdog_test.cpp @@ -0,0 +1,132 @@ +#include + +#include +#include +#include +#include +#include + +#include "test_utils.hpp" +#include "watchdog.hpp" + +namespace { + +// Minimal context type that satisfies the watchdog's only requirement on ContextT: +// a `std::atomic last_callback_time_ns` member. Lets us test the watchdog +// without pulling in the full AudioBuffer / stream context machinery. +struct FakeContext { + std::atomic last_callback_time_ns{0}; +}; + +// Helper: build a context whose callback last fired `ms_ago` milliseconds in the past. +std::shared_ptr make_context_stale_by(uint64_t ms_ago) { + auto ctx = std::make_shared(); + test_utils::mark_callback_stale(ctx, ms_ago); + return ctx; +} + +} // namespace + +// 1. Restart fires when the callback is stale and attempts are below the budget. +TEST(StallWatchdog, RestartFiresWhenStale) { + const auto ctx = make_context_stale_by(5000); // 5s past threshold + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_1]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_GE(restart_calls.load(), 1) << "restart_fn should have been called at least once"; +} + +// Once attempts hit MAX, the backoff gate prevents repeated calls within +// BACKOFF_INTERVAL. The very first poll past MAX still fires (the watchdog needs +// that to discover when the device returns), but subsequent polls within the backoff +// window are skipped. +TEST(StallWatchdog, BackoffGateLimitsRestartsWhenAttemptsExhausted) { + const auto ctx = make_context_stale_by(5000); + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return audio::utils::MAX_RESTART_ATTEMPTS; }, // already at the cap + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_2]"); + wd.start(); + + // Wait long enough for several poll cycles (200ms each). With attempts pinned at MAX + // and BACKOFF_INTERVAL = 5000, only the very first retry should fire — every poll + // for the next 5 seconds should hit the backoff gate and skip. + std::this_thread::sleep_for(std::chrono::milliseconds(1500)); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 1) + << "expected exactly one backoff retry inside the BACKOFF_INTERVAL window; " + << "subsequent polls should have been gated"; +} + +// No restart when last_callback_time_ns == 0 (stream just opened, callback hasn't +// fired yet). The watchdog should treat this as "give it more time", not as stalled. +TEST(StallWatchdog, NoRestartWhenCallbackNeverFired) { + const auto ctx = std::make_shared(); // last_callback_time_ns stays at 0 + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_4]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 0); +} + +// No restart when the callback fired recently (well within threshold). +TEST(StallWatchdog, NoRestartWhenCallbackRecent) { + const auto ctx = make_context_stale_by(100); // 100ms old, threshold is 2000ms + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + [ctx]() { return ctx; }, + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_5]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 0); +} + +// No restart when get_context returns nullptr (no active stream). +TEST(StallWatchdog, NoRestartWhenContextIsNull) { + std::atomic restart_calls{0}; + + audio::utils::StallWatchdog wd( + []() { return std::shared_ptr{}; }, // always null + []() { return 0; }, + [&restart_calls](const std::shared_ptr&) { restart_calls.fetch_add(1); }, + "[wd_test_6]"); + wd.start(); + + test_utils::wait_one_poll(); + wd.stop(); + + EXPECT_EQ(restart_calls.load(), 0); +} + +int main(int argc, char** argv) { + ::testing::InitGoogleTest(&argc, argv); + ::testing::AddGlobalTestEnvironment(new test_utils::AudioTestEnvironment); + return RUN_ALL_TESTS(); +}