Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/audio_buffer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ namespace vsdk = ::viam::sdk;

constexpr int BUFFER_DURATION_SECONDS = 30; // How much audio history to keep in buffer

// Unit-conversion constant: nanoseconds per millisecond.
constexpr uint64_t NS_PER_MS = 1'000'000;

// Base class for audio buffering - lock-free circular buffer with atomic operations
// Can be used by both input (microphone) and output (speaker) models.
// There is a 1:1 correspondence between AudioBuffer and viam audio resource
Expand Down
55 changes: 47 additions & 8 deletions src/audio_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,47 @@ inline PaDeviceIndex findDeviceById(const std::string& id,
return paNoDevice;
}

// If `device_id` is non-empty, re-resolves it against the current PortAudio device state
// and updates `params.device_index` / `params.device_name` in-place if the device has
// moved (e.g. USB unplug/replug → kernel re-enumerated).
//
// Returns true if the caller should proceed with the restart — either device_id was
// not configured (caller falls back to cached params) or the device was found. Returns
// false only when device_id was configured AND the device is currently missing; in that
// case the caller should skip the actual stream open since it would fail.
//
// `pa` and `resolver` default to nullptr; when null, the real PortAudio interface and
// real DeviceIdResolver are used. Tests inject mocks here.
inline bool resolve_device_id_into_params(const std::string& device_id,
StreamParams& params,
const audio::portaudio::PortAudioInterface* pa,
const std::string& log_prefix,
const audio::device_id::DeviceIdResolver* resolver = nullptr) {
if (device_id.empty()) {
return true; // not configured — proceed with cached params
}
audio::portaudio::RealPortAudio real_pa;
const audio::portaudio::PortAudioInterface& audio_interface = pa ? *pa : real_pa;
audio::device_id::RealDeviceIdResolver real_resolver;
const audio::device_id::DeviceIdResolver& resolver_ref = resolver ? *resolver : real_resolver;
const PaDeviceIndex resolved = findDeviceById(device_id, audio_interface, resolver_ref);
if (resolved == paNoDevice) {
VIAM_SDK_LOG(debug) << log_prefix << " device_id '" << device_id << "' not found on system; skipping restart";
return false;
}
if (resolved == params.device_index) {
return true; // already pointing at the right device
}
const PaDeviceInfo* const new_info = audio_interface.getDeviceInfo(resolved);
VIAM_SDK_LOG(info) << log_prefix << " device_id '" << device_id << "' moved from index " << params.device_index << " to " << resolved
<< " (" << ((new_info && new_info->name) ? new_info->name : "<unnamed>") << "); updating stream params";
params.device_index = resolved;
if (new_info && new_info->name) {
params.device_name = new_info->name;
}
return true;
}

inline ConfigParams parseConfigAttributes(const viam::sdk::ResourceConfig& cfg) {
const auto attrs = cfg.attributes();
ConfigParams params;
Expand Down Expand Up @@ -288,13 +329,11 @@ inline void openStream(PaStream*& stream, const StreamParams& params, const audi
PaError err = audio_interface.isFormatSupported(inputParams, outputParams, params.sample_rate);
if (err != paNoError) {
std::ostringstream buffer;
buffer << "Audio format not supported by device '" << params.device_name << "' (index " << params.device_index
<< "): " << Pa_GetErrorText(err) << "\n"
<< "Requested configuration:\n"
<< " - Sample rate: " << params.sample_rate << " Hz\n"
<< " - Channels: " << params.num_channels << "\n"
<< " - Format: 16-bit PCM\n"
<< " - Latency: " << params.suggested_latency_seconds << " seconds";
buffer << "Could not open stream — PortAudio: " << Pa_GetErrorText(err) << " — device '" << params.device_name << "' (index "
<< params.device_index << "). "
<< "Requested: sample_rate=" << params.sample_rate << "Hz, "
<< "channels=" << params.num_channels << ", format=16-bit PCM, "
<< "latency=" << params.suggested_latency_seconds << "s";
VIAM_SDK_LOG(error) << buffer.str();
throw std::runtime_error(buffer.str());
}
Expand Down Expand Up @@ -463,7 +502,7 @@ inline void log_callback_staleness(const std::atomic<uint64_t>& last_callback_ti
const uint64_t last_cb = last_callback_time_ns.load();
if (last_cb > 0) {
const uint64_t now_ns = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
const uint64_t elapsed_ms = (now_ns - last_cb) / 1'000'000;
const uint64_t elapsed_ms = (now_ns - last_cb) / audio::NS_PER_MS;
if (elapsed_ms > STREAM_RESTART_THRESHOLD_MS && now_ns - last_log_ns > STALENESS_LOG_THROTTLE_NS) {
last_log_ns = now_ns;
if (!stream) {
Expand Down
65 changes: 41 additions & 24 deletions src/microphone.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,25 @@ void Microphone::restart_stalled_stream(const std::shared_ptr<audio::InputStream
return;
}

VIAM_SDK_LOG(warn) << "[get_audio] Restarting stalled stream (attempt " << restart_attempts_ + 1 << "/" << MAX_STREAM_RESTART_ATTEMPTS
<< ")";
VIAM_SDK_LOG(debug) << "[microphone stall_watcher] Restarting stalled stream";

// If device_id was configured, re-resolve before restarting in case the kernel
// re-enumerated and the cached device_index is stale (e.g. USB unplug/replug).
// When the device is missing, skip the actual stream open — PortAudio would just
// spam ALSA errors. Bump attempts so the watchdog enters backoff; once the device
// returns, a backoff retry will resolve and proceed.
if (!audio::utils::resolve_device_id_into_params(device_id_, stream_params_, pa_, "[microphone stall_watcher]")) {
if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) {
++restart_attempts_;
}
return;
}

if (stream_) {
try {
audio::utils::abort_stream(stream_, pa_);
} catch (const std::exception& e) {
VIAM_SDK_LOG(error) << "[get_audio] Error shutting down stalled stream: " << e.what();
VIAM_SDK_LOG(error) << "[microphone stall_watcher] Error shutting down stalled stream: " << e.what();
}
stream_ = nullptr;
}
Expand All @@ -114,16 +126,13 @@ void Microphone::restart_stalled_stream(const std::shared_ptr<audio::InputStream
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
audio_context_ = new_context;
restart_attempts_ = 0;
VIAM_SDK_LOG(info) << "[get_audio] Stream restarted successfully";
VIAM_SDK_LOG(info) << "[microphone stall_watcher] Stream restarted successfully";
} catch (const std::exception& e) {
++restart_attempts_;
if (restart_attempts_ >= MAX_STREAM_RESTART_ATTEMPTS) {
VIAM_SDK_LOG(error) << "[get_audio] Failed to restart stream after " << MAX_STREAM_RESTART_ATTEMPTS
<< " attempts, giving up: " << e.what();
throw;
if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) {
++restart_attempts_;
}
VIAM_SDK_LOG(error) << "[get_audio] Failed to restart stream (attempt " << restart_attempts_ << "/" << MAX_STREAM_RESTART_ATTEMPTS
<< "): " << e.what();
VIAM_SDK_LOG(error) << "[microphone stall_watcher] Failed to restart stream (attempt " << restart_attempts_ << "/"
<< audio::utils::MAX_RESTART_ATTEMPTS << "): " << e.what();
}
}

Expand Down Expand Up @@ -193,17 +202,36 @@ Microphone::Microphone(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig c
std::lock_guard<std::mutex> lock(stream_ctx_mu_);
stream_params_ = setup.stream_params;
stream_params_.user_data = setup.audio_context.get();
device_id_ = setup.config_params.device_id;
audio::utils::restart_stream(stream_, stream_params_, pa_);
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
audio_context_ = setup.audio_context;
requested_sample_rate_ =
setup.config_params.sample_rate.value_or(setup.stream_params.sample_rate); // User's requested rate, defaults to device rate
historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS);
}

watchdog_ = std::make_unique<audio::utils::StallWatchdog<audio::InputStreamContext>>(
[this]() {
std::lock_guard<std::mutex> lock(stream_ctx_mu_);
return audio_context_;
},
[this]() {
std::lock_guard<std::mutex> lock(stream_ctx_mu_);
return restart_attempts_;
},
[this](const std::shared_ptr<audio::InputStreamContext>& ctx) { restart_stalled_stream(ctx); },
"[microphone stall_watcher]");
watchdog_->start();
}

Microphone::~Microphone() {
VIAM_SDK_LOG(debug) << "[Microphone::~Microphone] Destructor called";
// Stop and join the watchdog before tearing down the stream so it can't touch a
// half-destroyed audio_context_.
if (watchdog_) {
watchdog_->stop();
}
if (stream_) {
PaError err = Pa_StopStream(stream_);
if (err != paNoError) {
Expand Down Expand Up @@ -321,9 +349,11 @@ void Microphone::reconfigure(const viam::sdk::Dependencies& deps, const viam::sd

stream_params_ = setup.stream_params;
stream_params_.user_data = setup.audio_context.get();
device_id_ = setup.config_params.device_id;
audio::utils::restart_stream(stream_, stream_params_, pa_);
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
audio_context_ = setup.audio_context;
restart_attempts_ = 0;
requested_sample_rate_ = setup.config_params.sample_rate.value_or(
setup.stream_params.sample_rate); // User's requested rate, defaults to device rate
historical_throttle_ms_ = setup.config_params.historical_throttle_ms.value_or(DEFAULT_HISTORICAL_THROTTLE_MS);
Expand Down Expand Up @@ -444,19 +474,6 @@ void Microphone::get_audio(std::string const& codec,

// Wait until we have a full chunk worth of samples
if (available_samples < device_samples_per_chunk) {
audio::utils::log_callback_staleness(
stream_context->last_callback_time_ns, "[get_audio]", current_stream, last_staleness_log_ns);

const uint64_t last_cb = stream_context->last_callback_time_ns.load();
if (last_cb > 0) {
const uint64_t now_ns = static_cast<uint64_t>(std::chrono::steady_clock::now().time_since_epoch().count());
const uint64_t stale_ms = (now_ns - last_cb) / 1'000'000;
if (stale_ms > audio::utils::STREAM_RESTART_THRESHOLD_MS) {
VIAM_SDK_LOG(warn) << "[get_audio] Stream stalled for " << stale_ms << "ms, attempting restart";
restart_stalled_stream(stream_context);
}
}

const uint64_t overflow_count = stream_context->input_overflow_count.load();
if (overflow_count != last_logged_overflow_count) {
VIAM_SDK_LOG(warn) << "[get_audio] Input overflow detected — " << (overflow_count - last_logged_overflow_count)
Expand Down
11 changes: 10 additions & 1 deletion src/microphone.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
#include "audio_utils.hpp"
#include "portaudio.h"
#include "portaudio.hpp"
#include "watchdog.hpp"

namespace microphone {
namespace vsdk = ::viam::sdk;

constexpr double DEFAULT_HISTORICAL_THROTTLE_MS = 50;
constexpr int MAX_STREAM_RESTART_ATTEMPTS = 3;
PaDeviceIndex findDeviceByName(const std::string& name, const audio::portaudio::PortAudioInterface& pa);

// Calculates the initial read position from a previous timestamp
Expand Down Expand Up @@ -84,6 +84,15 @@ class Microphone final : public viam::sdk::AudioIn, public viam::sdk::Reconfigur
int restart_attempts_;

audio::utils::StreamParams stream_params_;

// Device id from the resource config (empty if user configured by device_name or
// system default). Used by restart_stalled_stream to re-resolve the device's current
// PortAudio index, so we recover from kernel re-enumeration (e.g. USB unplug/replug).
std::string device_id_;

// Background watchdog that polls audio_context_->last_callback_time_ns and triggers
// restart_stalled_stream when the mic callback has gone silent for too long.
std::unique_ptr<audio::utils::StallWatchdog<audio::InputStreamContext>> watchdog_;
};

/**
Expand Down
99 changes: 90 additions & 9 deletions src/speaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,41 @@ Speaker::Speaker(viam::sdk::Dependencies deps, viam::sdk::ResourceConfig cfg, au
// Set new configuration and start stream under lock
{
std::lock_guard<std::mutex> lock(stream_mu_);
device_name_ = setup.stream_params.device_name;
sample_rate_ = setup.stream_params.sample_rate;
num_channels_ = setup.stream_params.num_channels;
audio_context_ = setup.audio_context;
setup.stream_params.user_data = setup.audio_context.get();
audio::utils::restart_stream(stream_, setup.stream_params, pa_);
latency_ = audio::utils::get_stream_latency(stream_, setup.stream_params, pa_);
stream_params_ = setup.stream_params;
device_id_ = setup.config_params.device_id;
audio::utils::restart_stream(stream_, stream_params_, pa_);
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
volume_ = setup.config_params.volume;
if (volume_) {
audio::volume::set_volume(device_name_, *volume_);
audio::volume::set_volume(stream_params_.device_name, *volume_);
}
}

watchdog_ = std::make_unique<audio::utils::StallWatchdog<audio::OutputStreamContext>>(
[this]() {
std::lock_guard<std::mutex> lock(stream_mu_);
return audio_context_;
},
[this]() {
std::lock_guard<std::mutex> lock(stream_mu_);
return restart_attempts_;
},
[this](const std::shared_ptr<audio::OutputStreamContext>& ctx) { restart_stalled_stream(ctx); },
"[speaker stall_watcher]");
watchdog_->start();
}

Speaker::~Speaker() {
// Stop and join the watchdog before tearing down the stream so it can't touch a
// half-destroyed audio_context_.
if (watchdog_) {
watchdog_->stop();
}

if (stream_) {
PaError err = Pa_StopStream(stream_);
if (err != paNoError) {
Expand All @@ -57,6 +77,65 @@ Speaker::~Speaker() {

vsdk::Model Speaker::model = {"viam", "system-audio", "speaker"};

/**
* Tear down the existing stream and bring up a fresh one with the saved params.
*
* Bails out if `playback_context` is no longer the active audio_context_ — that means
* either reconfigure() or another stall recovery already replaced it, so the in-flight
* play() call (if any) is about to exit on its own and we shouldn't race.
*
* On a successful restart, any unplayed audio in the old buffer is discarded — same
* behavior as reconfigure(). The in-flight play() loop will see audio_context_ change
* and return early.
*/
void Speaker::restart_stalled_stream(const std::shared_ptr<audio::OutputStreamContext>& playback_context) {
std::lock_guard<std::mutex> lock(stream_mu_);
if (playback_context != audio_context_) {
return;
}

VIAM_SDK_LOG(debug) << "[speaker stall_watcher] Restarting stalled speaker stream";

// If device_id was configured, re-resolve before restarting in case the kernel
// re-enumerated and the cached device_index is stale (e.g. USB unplug/replug).
// When the device is missing, skip the actual stream open — PortAudio would just
// spam ALSA errors. Bump attempts so the watchdog enters backoff; once the device
// returns, a backoff retry will resolve and proceed.
if (!audio::utils::resolve_device_id_into_params(device_id_, stream_params_, pa_, "[speaker stall_watcher]")) {
if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) {
++restart_attempts_;
}
return;
}

if (stream_) {
try {
audio::utils::abort_stream(stream_, pa_);
} catch (const std::exception& e) {
VIAM_SDK_LOG(error) << "[speaker stall_watcher] Error shutting down stalled stream: " << e.what();
}
stream_ = nullptr;
}

const viam::sdk::audio_info info{viam::sdk::audio_codecs::PCM_16, stream_params_.sample_rate, stream_params_.num_channels};
const auto new_context = std::make_shared<audio::OutputStreamContext>(info, audio::BUFFER_DURATION_SECONDS);

try {
stream_params_.user_data = new_context.get();
audio::utils::restart_stream(stream_, stream_params_, pa_);
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
audio_context_ = new_context;
restart_attempts_ = 0;
VIAM_SDK_LOG(info) << "[speaker stall_watcher] Speaker stream restarted successfully";
} catch (const std::exception& e) {
if (restart_attempts_ < audio::utils::MAX_RESTART_ATTEMPTS) {
++restart_attempts_;
}
VIAM_SDK_LOG(error) << "[speaker stall_watcher] Failed to restart stream (attempt " << restart_attempts_ << "/"
<< audio::utils::MAX_RESTART_ATTEMPTS << "): " << e.what();
}
}

/**
* PortAudio callback function - runs on real-time audio thread.
* This function must not:
Expand Down Expand Up @@ -182,7 +261,7 @@ viam::sdk::ProtoStruct Speaker::do_command(const viam::sdk::ProtoStruct& command
}

std::lock_guard<std::mutex> lock(stream_mu_);
audio::volume::set_volume(device_name_, vol);
audio::volume::set_volume(stream_params_.device_name, vol);
volume_ = vol;

return viam::sdk::ProtoStruct{{"volume", static_cast<double>(vol)}};
Expand Down Expand Up @@ -453,15 +532,17 @@ void Speaker::reconfigure(const vsdk::Dependencies& deps, const vsdk::ResourceCo
// Otherwise the callback thread may still be accessing the old context
// after we destroy it (heap-use-after-free)
setup.stream_params.user_data = setup.audio_context.get();
audio::utils::restart_stream(stream_, setup.stream_params, pa_);
device_name_ = setup.stream_params.device_name;
stream_params_ = setup.stream_params;
audio::utils::restart_stream(stream_, stream_params_, pa_);
sample_rate_ = setup.stream_params.sample_rate;
num_channels_ = setup.stream_params.num_channels;
latency_ = audio::utils::get_stream_latency(stream_, setup.stream_params, pa_);
latency_ = audio::utils::get_stream_latency(stream_, stream_params_, pa_);
audio_context_ = setup.audio_context;
device_id_ = setup.config_params.device_id;
restart_attempts_ = 0;
volume_ = setup.config_params.volume;
if (volume_) {
audio::volume::set_volume(device_name_, *volume_);
audio::volume::set_volume(stream_params_.device_name, *volume_);
}
}
VIAM_SDK_LOG(info) << "[reconfigure] Reconfigure completed successfully";
Expand Down
Loading
Loading