From 04bd158b8cc06350eb680abf76263a5941e45495 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Wed, 7 Jan 2026 00:41:22 +0800 Subject: [PATCH 01/15] feat: implemented semaphore and condition variable based statetracker --- .../roc_pipeline/state_tracker.cpp | 99 +++++++++++++++++-- .../roc_pipeline/state_tracker.h | 20 +++- 2 files changed, 109 insertions(+), 10 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index bc2b6ef6f..fbd808d92 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -7,15 +7,74 @@ */ #include "roc_pipeline/state_tracker.h" +#include "roc_core/log.h" #include "roc_core/panic.h" namespace roc { namespace pipeline { StateTracker::StateTracker() - : halt_state_(-1) + : sem_(0) + , halt_state_(-1) , active_sessions_(0) - , pending_packets_(0) { + , pending_packets_(0) + , sem_is_occupied_(0) + , waiting_mask_(0) + , mutex_() + , waiting_con_(mutex_) { +} + +// StateTracker::~StateTracker() { +// mutex_.unlock(); +// } + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { + + mutex_.lock(); + for (;;) { + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + return true; + } + + if (static_cast(get_state()) & state_mask) { + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + return false; + } + + if (sem_is_occupied_.compare_exchange(0, 1)) { + + if (deadline >= 0) { + mutex_.unlock(); + (void)sem_.timed_wait(deadline); + + } else { + mutex_.unlock(); + sem_.wait(); + } + + mutex_.lock(); + sem_is_occupied_ = 0; + waiting_con_.broadcast(); + + } else { + if (deadline >= 0) { + (void)waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } + } + } } sndio::DeviceState StateTracker::get_state() const { @@ -65,22 +124,50 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); + } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + // if (waiting_mask_ != 0 && (static_cast(get_state()) & waiting_mask_)) { + // sem_.post(); + // } + if (sem_is_occupied_) { + roc_log(LogDebug, "signaling"); + sem_.post(); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index b09180b3f..e5d896d2e 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -12,9 +12,11 @@ #ifndef ROC_PIPELINE_STATE_TRACKER_H_ #define ROC_PIPELINE_STATE_TRACKER_H_ -#include "roc_core/atomic_int.h" +#include "roc_core/atomic.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" +#include "roc_core/time.h" #include "roc_sndio/device_defs.h" namespace roc { @@ -32,6 +34,9 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + //! Block until state becomes any of the ones specified by state_mask. + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -63,9 +68,16 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: - core::AtomicInt halt_state_; - core::AtomicInt active_sessions_; - core::AtomicInt pending_packets_; + core::Semaphore sem_; + core::Atomic halt_state_; + core::Atomic active_sessions_; + core::Atomic pending_packets_; + core::Atomic sem_is_occupied_; + core::Atomic waiting_mask_; + core::Mutex mutex_; + core::Cond waiting_con_; + + void signal_state_change(); }; } // namespace pipeline From cc259c0708a2698afecdb3325717f1065284c6ba Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 16:03:04 +0800 Subject: [PATCH 02/15] update: add statetracker wait(), implement semaphore using mutex and cond var in platform without sem_clockwait() to prevent semaphore:timed_wait() to hang when changing sysem time, add test cases for state tracker and semaphore --- SConstruct | 13 +- .../target_nosem/roc_core/semaphore.cpp | 101 ++++++++++ .../target_nosem/roc_core/semaphore.h | 55 ++++++ .../roc_core/target_posix/roc_core/mutex.h | 1 + .../roc_core/semaphore.cpp | 12 +- .../roc_core/semaphore.h | 2 +- .../roc_pipeline/state_tracker.cpp | 15 +- .../roc_pipeline/state_tracker.h | 1 - src/tests/roc_pipeline/test_semaphore.cpp | 165 ++++++++++++++++ src/tests/roc_pipeline/test_state_tracker.cpp | 178 ++++++++++++++++++ 10 files changed, 524 insertions(+), 19 deletions(-) create mode 100644 src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp create mode 100644 src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h rename src/internal_modules/roc_core/{target_posix_ext => target_posix_sem}/roc_core/semaphore.cpp (88%) rename src/internal_modules/roc_core/{target_posix_ext => target_posix_sem}/roc_core/semaphore.h (96%) create mode 100644 src/tests/roc_pipeline/test_semaphore.cpp create mode 100644 src/tests/roc_pipeline/test_state_tracker.cpp diff --git a/SConstruct b/SConstruct index 63f829862..7a705520d 100644 --- a/SConstruct +++ b/SConstruct @@ -782,10 +782,15 @@ else: 'target_posix_pc', ]) - if meta.platform in ['linux', 'unix', 'android']: - env.Append(ROC_TARGETS=[ - 'target_posix_ext', - ]) + if meta.platform in ['linux', 'android', 'unix']: + if 'ROC_HAVE_SEM_CLOCKWAIT' in env['CPPDEFINES']: + env.Append(ROC_TARGETS=[ + 'target_posix_sem', + ]) + else: + env.Append(ROC_TARGETS=[ + 'target_nosem', + ]) if meta.platform in ['linux', 'unix', 'macos', 'windows', 'android']: env.Append(ROC_TARGETS=[ diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp new file mode 100644 index 000000000..2b87b6e84 --- /dev/null +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2020 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "roc_core/semaphore.h" +#include "roc_core/cpu_instructions.h" +#include "roc_core/errno_to_str.h" +#include "roc_core/log.h" +#include "roc_core/panic.h" + +#include +#include + +namespace roc { +namespace core { + +Semaphore::Semaphore(unsigned counter) + : mutex_() + , counter_(counter) + , guard_(0) { + if (int err = pthread_cond_init(&cond_, NULL)) { + roc_panic("sem: pthread_cond_init(): %s", errno_to_str(err).c_str()); + } +} + +Semaphore::~Semaphore() { + while (guard_) { + cpu_relax(); + } + + int err = 0; + if ((err = pthread_cond_destroy(&cond_))) { + roc_panic("sem: pthread_cond_destroy(): %s", errno_to_str(err).c_str()); + } +} + +bool Semaphore::timed_wait(nanoseconds_t deadline) { + if (deadline < 0) { + roc_panic("semaphore: unexpected negative deadline"); + } + + roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline); + roc_log(roc::LogDebug, "time is %" PRId64 "\n", deadline); + roc_log(roc::LogDebug, "now time is %" PRId64 "\n", + core::timestamp(core::ClockMonotonic)); + + struct timespec ts; + ts.tv_sec = time_t(deadline / Second); + ts.tv_nsec = long(deadline % Second); + + int err = 0; + mutex_.lock(); + while (err == 0 && counter_ == 0) { + err = pthread_cond_clockwait(&cond_, &mutex_.mutex_, CLOCK_MONOTONIC, &ts); + printf("finish waiting without sem"); + // roc_log(roc::LogDebug, "finish waiting without sem"); + + if (err != 0 && err != ETIMEDOUT) { + roc_panic("semaphore: pthread_cond_timedwait(): %s", + errno_to_str(err).c_str()); + } + } + + if (err == 0) { + counter_--; + } + mutex_.unlock(); + + // return false when err == ETIMEDOUT + return (err == 0); +} + +void Semaphore::wait() { + int err = 0; + mutex_.lock(); + while (err == 0 && counter_ == 0) { + if ((err = pthread_cond_wait(&cond_, &mutex_.mutex_))) { + roc_panic("semaphore: pthread_cond_wait(): %s", errno_to_str(err).c_str()); + } + } + counter_--; + mutex_.unlock(); +} + +void Semaphore::post() { + ++guard_; + mutex_.lock(); + counter_++; + if (int err = pthread_cond_broadcast(&cond_)) { + roc_panic("cond: pthread_cond_broadcast(): %s", errno_to_str(err).c_str()); + } + mutex_.unlock(); + --guard_; +} + +} // namespace core +} // namespace roc diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h new file mode 100644 index 000000000..8eb70ce6b --- /dev/null +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2020 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_core/target_nosem/roc_core/semaphore.h +//! @brief Semaphore. + +#ifndef ROC_CORE_SEMAPHORE_H_ +#define ROC_CORE_SEMAPHORE_H_ + +#include "roc_core/atomic.h" +#include "roc_core/attributes.h" +#include "roc_core/mutex.h" +#include "roc_core/noncopyable.h" +#include "roc_core/time.h" +#include + +namespace roc { +namespace core { + +//! Semaphore. +class Semaphore : public NonCopyable<> { +public: + //! Initialize semaphore with given counter. + explicit Semaphore(unsigned counter = 0); + + ~Semaphore(); + + //! Wait until the counter becomes non-zero, decrement it, and return true. + //! If deadline expires before the counter becomes non-zero, returns false. + //! Deadline should be in the same time domain as core::timestamp(). + ROC_NODISCARD bool timed_wait(nanoseconds_t deadline); + + //! Wait until the counter becomes non-zero, decrement it, and return. + void wait(); + + //! Increment counter and wake up blocked waits. + //! This method is implemented using mutex for platforms where + void post(); + +private: + pthread_cond_t cond_; + core::Mutex mutex_; + Atomic counter_; + Atomic guard_; +}; + +} // namespace core +} // namespace roc + +#endif // ROC_CORE_SEMAPHORE_H_ diff --git a/src/internal_modules/roc_core/target_posix/roc_core/mutex.h b/src/internal_modules/roc_core/target_posix/roc_core/mutex.h index efc1cecbe..df4d52d86 100644 --- a/src/internal_modules/roc_core/target_posix/roc_core/mutex.h +++ b/src/internal_modules/roc_core/target_posix/roc_core/mutex.h @@ -70,6 +70,7 @@ class Mutex : public NonCopyable<> { private: friend class Cond; + friend class Semaphore; mutable pthread_mutex_t mutex_; mutable AtomicInt guard_; diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp similarity index 88% rename from src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp rename to src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp index 0d9685390..4b888f693 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp @@ -9,6 +9,7 @@ #include "roc_core/semaphore.h" #include "roc_core/cpu_instructions.h" #include "roc_core/errno_to_str.h" +#include "roc_core/log.h" #include "roc_core/panic.h" #include @@ -34,16 +35,17 @@ Semaphore::~Semaphore() { } bool Semaphore::timed_wait(nanoseconds_t deadline) { + printf("Enter function of timed_wait"); if (deadline < 0) { roc_panic("semaphore: unexpected negative deadline"); } - for (;;) { - timespec ts; - ts.tv_sec = long(deadline / Second); - ts.tv_nsec = long(deadline % Second); + timespec ts; + ts.tv_sec = long(deadline / Second); + ts.tv_nsec = long(deadline % Second); - if (sem_timedwait(&sem_, &ts) == 0) { + for (;;) { + if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) { return true; } diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h similarity index 96% rename from src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h rename to src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h index 4eab2fd84..e768fb21b 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h +++ b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h @@ -6,7 +6,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -//! @file roc_core/target_posix_ext/roc_core/semaphore.h +//! @file roc_core/target_posix_sem/roc_core/semaphore.h //! @brief Semaphore. #ifndef ROC_CORE_SEMAPHORE_H_ diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index fbd808d92..aa93da6a6 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -19,7 +19,6 @@ StateTracker::StateTracker() , active_sessions_(0) , pending_packets_(0) , sem_is_occupied_(0) - , waiting_mask_(0) , mutex_() , waiting_con_(mutex_) { } @@ -36,24 +35,24 @@ StateTracker::StateTracker() // Questions: // - When should the function return true vs false bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { - + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + return true; + } + mutex_.lock(); for (;;) { - // If no state is specified in state_mask, return immediately - if (state_mask == 0) { - return true; - } - if (static_cast(get_state()) & state_mask) { + mutex_.unlock(); return true; } if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + mutex_.unlock(); return false; } if (sem_is_occupied_.compare_exchange(0, 1)) { - if (deadline >= 0) { mutex_.unlock(); (void)sem_.timed_wait(deadline); diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index e5d896d2e..1b1ae8b4f 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -73,7 +73,6 @@ class StateTracker : public core::NonCopyable<> { core::Atomic active_sessions_; core::Atomic pending_packets_; core::Atomic sem_is_occupied_; - core::Atomic waiting_mask_; core::Mutex mutex_; core::Cond waiting_con_; diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp new file mode 100644 index 000000000..64cf4e05d --- /dev/null +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -0,0 +1,165 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/semaphore.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(core::nanoseconds_t deadline, core::Semaphore* semaphore) + : r_(0) + , deadline_(deadline) { + semaphore_ = semaphore; + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + virtual void run() { + r_ = true; + semaphore_->timed_wait(deadline_); + r_ = false; + } + core::Atomic r_; + core::nanoseconds_t deadline_; + core::Semaphore* semaphore_; +}; + +} // namespace + +TEST_GROUP(semaphore) {}; + +TEST(semaphore, timeout_test) { + core::Semaphore sem(0); + roc_log(LogDebug, "ready"); + bool result = + sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic)); + if (result) { + roc_log(LogDebug, "true, unlocked by other threads"); + } else { + roc_log(LogDebug, "false, timeout"); + } + CHECK(result == false); +} + +TEST(semaphore, block_test) { + core::Semaphore sem(0); + roc_log(LogDebug, "ready"); + TestThread** threads_ptr = new TestThread*[10]; + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread( + 1 * core::Second + core::timestamp(core::ClockMonotonic), &sem); + threads_ptr[i]->start(); + } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->running()); + } + for (int i = 0; i < 10; i++) { + sem.post(); + } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + for (int i = 0; i < 10; i++) { + CHECK(!threads_ptr[i]->running()); + } + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } +} + +TEST(semaphore, multiple_post_before_wait) { + core::Semaphore sem(0); + + // Post multiple times before any waits + for (int i = 0; i < 5; i++) { + sem.post(); + } + + // All waits should succeed immediately without blocking + for (int i = 0; i < 5; i++) { + bool result = sem.timed_wait(core::timestamp(core::ClockMonotonic)); + CHECK(result == true); + } + + // Next wait should timeout since counter is back to 0 + bool result = + sem.timed_wait(100 * core::Millisecond + core::timestamp(core::ClockMonotonic)); + CHECK(result == false); +} + +TEST(semaphore, concurrent_post_and_wait) { + core::Semaphore sem(0); + const int num_threads = 20; + TestThread** threads_ptr = new TestThread*[num_threads]; + + // Start threads that will wait + for (int i = 0; i < num_threads; i++) { + threads_ptr[i] = new TestThread( + 1 * core::Second + core::timestamp(core::ClockMonotonic), &sem); + threads_ptr[i]->start(); + } + + // Wait for all threads to be running and blocked + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + for (int i = 0; i < num_threads; i++) { + CHECK(threads_ptr[i]->running()); + } + + // Post from multiple iterations to wake them up gradually + for (int i = 0; i < num_threads; i++) { + sem.post(); + } + + // All threads should eventually finish + core::sleep_for(core::ClockMonotonic, core::Millisecond * 200); + for (int i = 0; i < num_threads; i++) { + CHECK(!threads_ptr[i]->running()); + } + + for (int i = 0; i < num_threads; i++) { + threads_ptr[i]->join(); + } +} + +} // namespace pipeline +} // namespace roc diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp new file mode 100644 index 000000000..17b47f1ee --- /dev/null +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -0,0 +1,178 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/semaphore.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(StateTracker& st, unsigned int state_mask, core::nanoseconds_t deadline) + : t_(st) + , r_(0) + , state_mask_(state_mask) + , deadline_(deadline) { + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + void run() { + r_ = true; + t_.wait_state(state_mask_, deadline_); + r_ = false; + } + + StateTracker& t_; + core::Atomic r_; + unsigned int state_mask_; + core::nanoseconds_t deadline_; +}; + +} // namespace + +TEST_GROUP(state_tracker) {}; + +// set a thread that last for 0.5 seconds, wait for 1 second to make it timeout. +TEST(state_tracker, simple_timeout) { + StateTracker state_tracker; + TestThread thr(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Millisecond * 500); + + CHECK(thr.start()); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); + CHECK(!(thr.running())); + thr.join(); +} + +TEST(state_tracker, multiple_timeout) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + + // set threads that last for 1 second + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + + core::Millisecond * 1000); + } + + // wait for start, then check if threads are running + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + // CHECK(threads_ptr[i]->running()); + // roc_log(LogDebug, "check running %d\n", i); + } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 10); + for (int i = 0; i < 10; i++) { + // roc_log(LogDebug, "check running %d\n", i); + CHECK(threads_ptr[i]->running()); + } + + // sleep for 2 seconds, making the threads timeout + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); + + // check if threads are stopped + for (int i = 0; i < 10; i++) { + CHECK(!threads_ptr[i]->running()); + } + + roc_log(LogDebug, "started joining"); + + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + + roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < 10; ++i) { + delete threads_ptr[i]; + } + delete[] threads_ptr; +} + +TEST(state_tracker, multiple_switch) { + StateTracker state_tracker; + TestThread** threads_ptr = new TestThread*[10]; + + // set threads without waiting time + for (int i = 0; i < 10; i++) { + threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); + } + + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->start()); + } + + roc_log(LogDebug, "started running"); + + // wait for threads starting + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + // check if the threads have started + for (int i = 0; i < 10; i++) { + CHECK(threads_ptr[i]->running()); + } + + // register a packet + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + state_tracker.register_packet(); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + // check if the threads have been stopped + for (int i = 0; i < 10; i++) { + CHECK(!(threads_ptr[i]->running())); + } + + roc_log(LogDebug, "started joining"); + for (int i = 0; i < 10; i++) { + threads_ptr[i]->join(); + } + roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < 10; ++i) { + delete threads_ptr[i]; + } + delete[] threads_ptr; +} +} // namespace pipeline +} // namespace roc From b404dad71bd7119dbbbf7117ce052e79ae9f2b73 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 17:06:19 +0800 Subject: [PATCH 03/15] update atomic --- .../roc_core/target_nosem/roc_core/semaphore.h | 6 +++--- src/internal_modules/roc_pipeline/state_tracker.h | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h index 8eb70ce6b..7ae079818 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h @@ -12,7 +12,7 @@ #ifndef ROC_CORE_SEMAPHORE_H_ #define ROC_CORE_SEMAPHORE_H_ -#include "roc_core/atomic.h" +#include "roc_core/atomic_int.h" #include "roc_core/attributes.h" #include "roc_core/mutex.h" #include "roc_core/noncopyable.h" @@ -45,8 +45,8 @@ class Semaphore : public NonCopyable<> { private: pthread_cond_t cond_; core::Mutex mutex_; - Atomic counter_; - Atomic guard_; + AtomicInt counter_; + AtomicInt guard_; }; } // namespace core diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 1b1ae8b4f..116473211 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -12,7 +12,7 @@ #ifndef ROC_PIPELINE_STATE_TRACKER_H_ #define ROC_PIPELINE_STATE_TRACKER_H_ -#include "roc_core/atomic.h" +#include "roc_core/atomic_int.h" #include "roc_core/noncopyable.h" #include "roc_core/semaphore.h" #include "roc_core/stddefs.h" @@ -69,10 +69,10 @@ class StateTracker : public core::NonCopyable<> { private: core::Semaphore sem_; - core::Atomic halt_state_; - core::Atomic active_sessions_; - core::Atomic pending_packets_; - core::Atomic sem_is_occupied_; + core::AtomicInt halt_state_; + core::AtomicInt active_sessions_; + core::AtomicInt pending_packets_; + core::AtomicInt sem_is_occupied_; core::Mutex mutex_; core::Cond waiting_con_; From 9eb58e61314ce46acb0ac39dfe1ff622d2cdf239 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 17:09:58 +0800 Subject: [PATCH 04/15] add missing header --- src/internal_modules/roc_pipeline/state_tracker.h | 1 + 1 file changed, 1 insertion(+) diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 116473211..08a5c2e49 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -17,6 +17,7 @@ #include "roc_core/semaphore.h" #include "roc_core/stddefs.h" #include "roc_core/time.h" +#include "roc_core/cond.h" #include "roc_sndio/device_defs.h" namespace roc { From b2e754d062fbbead2825a394128a4bdaa61601b3 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 17:13:11 +0800 Subject: [PATCH 05/15] fix atomic --- src/tests/roc_pipeline/test_state_tracker.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 17b47f1ee..e194770da 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -11,7 +11,7 @@ #include "roc_address/protocol.h" #include "roc_audio/mixer.h" #include "roc_audio/sample.h" -#include "roc_core/atomic.h" +#include "roc_core/atomic_int.h" #include "roc_core/heap_arena.h" #include "roc_core/noop_arena.h" #include "roc_core/semaphore.h" From 39da1dce54d1f3020a14fd4888000947659f15a4 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 17:14:10 +0800 Subject: [PATCH 06/15] fix atomic --- src/tests/roc_pipeline/test_state_tracker.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index e194770da..4c6c69389 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -63,7 +63,7 @@ class TestThread : public core::Thread { } StateTracker& t_; - core::Atomic r_; + core::AtomicInt r_; unsigned int state_mask_; core::nanoseconds_t deadline_; }; From 57906153349f684f86c34e8d2a2ed5b79f4c59fc Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 21:08:58 +0800 Subject: [PATCH 07/15] update pthread function usage and test cases --- .../target_nosem/roc_core/semaphore.cpp | 24 ++++++------------- .../roc_pipeline/state_tracker.cpp | 1 + src/tests/roc_pipeline/test_semaphore.cpp | 10 ++++---- src/tests/roc_pipeline/test_state_tracker.cpp | 10 ++++---- 4 files changed, 18 insertions(+), 27 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp index 2b87b6e84..cf2941c09 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -43,35 +43,25 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { roc_panic("semaphore: unexpected negative deadline"); } - roc_log(roc::LogDebug, "origin time is %" PRId64 "\n", deadline); - roc_log(roc::LogDebug, "time is %" PRId64 "\n", deadline); - roc_log(roc::LogDebug, "now time is %" PRId64 "\n", - core::timestamp(core::ClockMonotonic)); - struct timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + deadline += (nanoseconds_t)ts.tv_sec * Second + (nanoseconds_t)ts.tv_nsec; ts.tv_sec = time_t(deadline / Second); ts.tv_nsec = long(deadline % Second); int err = 0; mutex_.lock(); while (err == 0 && counter_ == 0) { - err = pthread_cond_clockwait(&cond_, &mutex_.mutex_, CLOCK_MONOTONIC, &ts); - printf("finish waiting without sem"); - // roc_log(roc::LogDebug, "finish waiting without sem"); - - if (err != 0 && err != ETIMEDOUT) { - roc_panic("semaphore: pthread_cond_timedwait(): %s", - errno_to_str(err).c_str()); - } + err = pthread_cond_timedwait(&cond_, &mutex_.mutex_, &ts); } - if (err == 0) { + bool acquired = (counter_ > 0); + if (acquired) { counter_--; } mutex_.unlock(); - // return false when err == ETIMEDOUT - return (err == 0); + return acquired; } void Semaphore::wait() { @@ -98,4 +88,4 @@ void Semaphore::post() { } } // namespace core -} // namespace roc +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index aa93da6a6..f32afa09c 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -42,6 +42,7 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl mutex_.lock(); for (;;) { + //roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), core::timestamp(core::ClockMonotonic), deadline); if (static_cast(get_state()) & state_mask) { mutex_.unlock(); return true; diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp index 64cf4e05d..20da85e98 100644 --- a/src/tests/roc_pipeline/test_semaphore.cpp +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -11,7 +11,7 @@ #include "roc_address/protocol.h" #include "roc_audio/mixer.h" #include "roc_audio/sample.h" -#include "roc_core/atomic.h" +#include "roc_core/atomic_int.h" #include "roc_core/heap_arena.h" #include "roc_core/noop_arena.h" #include "roc_core/semaphore.h" @@ -60,7 +60,7 @@ class TestThread : public core::Thread { semaphore_->timed_wait(deadline_); r_ = false; } - core::Atomic r_; + core::AtomicInt r_; core::nanoseconds_t deadline_; core::Semaphore* semaphore_; }; @@ -88,17 +88,17 @@ TEST(semaphore, block_test) { TestThread** threads_ptr = new TestThread*[10]; for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread( - 1 * core::Second + core::timestamp(core::ClockMonotonic), &sem); + 2 * core::Second + core::timestamp(core::ClockMonotonic), &sem); threads_ptr[i]->start(); } - core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->running()); } for (int i = 0; i < 10; i++) { sem.post(); } - core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 300); for (int i = 0; i < 10; i++) { CHECK(!threads_ptr[i]->running()); } diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 4c6c69389..73238581d 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -76,10 +76,10 @@ TEST_GROUP(state_tracker) {}; TEST(state_tracker, simple_timeout) { StateTracker state_tracker; TestThread thr(state_tracker, sndio::DeviceState_Active, - core::timestamp(core::ClockMonotonic) + core::Millisecond * 500); + core::timestamp(core::ClockMonotonic) + core::Millisecond * 100); CHECK(thr.start()); - core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); CHECK(!(thr.running())); thr.join(); } @@ -92,7 +92,7 @@ TEST(state_tracker, multiple_timeout) { for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) - + core::Millisecond * 1000); + + core::Millisecond * 5000); } // wait for start, then check if threads are running @@ -101,7 +101,7 @@ TEST(state_tracker, multiple_timeout) { // CHECK(threads_ptr[i]->running()); // roc_log(LogDebug, "check running %d\n", i); } - core::sleep_for(core::ClockMonotonic, core::Millisecond * 10); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); for (int i = 0; i < 10; i++) { // roc_log(LogDebug, "check running %d\n", i); CHECK(threads_ptr[i]->running()); @@ -109,7 +109,7 @@ TEST(state_tracker, multiple_timeout) { // sleep for 2 seconds, making the threads timeout roc_log(LogDebug, "started running"); - core::sleep_for(core::ClockMonotonic, core::Millisecond * 2000); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 5000); // check if threads are stopped for (int i = 0; i < 10; i++) { From 59811e291d512241d5f0f21a8a57f9472b58fb8c Mon Sep 17 00:00:00 2001 From: flw5469 Date: Fri, 20 Feb 2026 22:48:28 +0800 Subject: [PATCH 08/15] fix --- .../roc_core/target_nosem/roc_core/semaphore.cpp | 4 ++-- .../roc_pipeline/state_tracker.cpp | 3 ++- src/internal_modules/roc_pipeline/state_tracker.h | 2 +- src/tests/roc_pipeline/test_semaphore.cpp | 14 ++++++++++---- src/tests/roc_pipeline/test_state_tracker.cpp | 9 +++++++-- 5 files changed, 22 insertions(+), 10 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp index cf2941c09..05dc0e2ce 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -44,10 +44,10 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { } struct timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); - deadline += (nanoseconds_t)ts.tv_sec * Second + (nanoseconds_t)ts.tv_nsec; ts.tv_sec = time_t(deadline / Second); ts.tv_nsec = long(deadline % Second); + // roc_log(LogDebug, "loop top, now=%lld, deadline=%lld", + // core::timestamp(core::ClockMonotonic), deadline); int err = 0; mutex_.lock(); diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index f32afa09c..aaa1779f9 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -42,7 +42,8 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl mutex_.lock(); for (;;) { - //roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), core::timestamp(core::ClockMonotonic), deadline); + // roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), + // core::timestamp(core::ClockMonotonic), deadline); if (static_cast(get_state()) & state_mask) { mutex_.unlock(); return true; diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index 08a5c2e49..38159a818 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -13,11 +13,11 @@ #define ROC_PIPELINE_STATE_TRACKER_H_ #include "roc_core/atomic_int.h" +#include "roc_core/cond.h" #include "roc_core/noncopyable.h" #include "roc_core/semaphore.h" #include "roc_core/stddefs.h" #include "roc_core/time.h" -#include "roc_core/cond.h" #include "roc_sndio/device_defs.h" namespace roc { diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp index 20da85e98..db9237a09 100644 --- a/src/tests/roc_pipeline/test_semaphore.cpp +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -57,7 +57,7 @@ class TestThread : public core::Thread { private: virtual void run() { r_ = true; - semaphore_->timed_wait(deadline_); + (void)semaphore_->timed_wait(deadline_); r_ = false; } core::AtomicInt r_; @@ -89,12 +89,17 @@ TEST(semaphore, block_test) { for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread( 2 * core::Second + core::timestamp(core::ClockMonotonic), &sem); - threads_ptr[i]->start(); + (void)threads_ptr[i]->start(); } - core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); + for (int i = 0; i < 10; i++) { + (void)threads_ptr[i]->wait_running(); + } + roc_log(LogDebug, "finish waiting running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->running()); } + roc_log(LogDebug, "finish checking running"); for (int i = 0; i < 10; i++) { sem.post(); } @@ -102,6 +107,7 @@ TEST(semaphore, block_test) { for (int i = 0; i < 10; i++) { CHECK(!threads_ptr[i]->running()); } + roc_log(LogDebug, "finish all finished running"); for (int i = 0; i < 10; i++) { threads_ptr[i]->join(); } @@ -136,7 +142,7 @@ TEST(semaphore, concurrent_post_and_wait) { for (int i = 0; i < num_threads; i++) { threads_ptr[i] = new TestThread( 1 * core::Second + core::timestamp(core::ClockMonotonic), &sem); - threads_ptr[i]->start(); + (void)threads_ptr[i]->start(); } // Wait for all threads to be running and blocked diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 73238581d..0d2da08b8 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -78,6 +78,7 @@ TEST(state_tracker, simple_timeout) { TestThread thr(state_tracker, sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) + core::Millisecond * 100); + (void)thr.wait_running(); CHECK(thr.start()); core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); CHECK(!(thr.running())); @@ -94,6 +95,9 @@ TEST(state_tracker, multiple_timeout) { core::timestamp(core::ClockMonotonic) + core::Millisecond * 5000); } + for (int i = 0; i < 10; i++) { + (void)threads_ptr[i]->wait_running(); + } // wait for start, then check if threads are running for (int i = 0; i < 10; i++) { @@ -138,11 +142,12 @@ TEST(state_tracker, multiple_switch) { for (int i = 0; i < 10; i++) { threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); } - for (int i = 0; i < 10; i++) { CHECK(threads_ptr[i]->start()); } - + for (int i = 0; i < 10; i++) { + (void)threads_ptr[i]->wait_running(); + } roc_log(LogDebug, "started running"); // wait for threads starting From 67786a1e7186ca6353d16704158bdc36b8bf62af Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 21 Feb 2026 01:49:46 +0800 Subject: [PATCH 09/15] fix bug in test case --- .../roc_core/target_nosem/roc_core/semaphore.cpp | 16 ++++++++++++++-- .../roc_pipeline/state_tracker.cpp | 4 ++-- src/tests/roc_pipeline/test_state_tracker.cpp | 3 +-- 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp index 05dc0e2ce..1faf5326a 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -22,8 +22,20 @@ Semaphore::Semaphore(unsigned counter) : mutex_() , counter_(counter) , guard_(0) { - if (int err = pthread_cond_init(&cond_, NULL)) { - roc_panic("sem: pthread_cond_init(): %s", errno_to_str(err).c_str()); + pthread_condattr_t attr; + + if (int err = pthread_condattr_init(&attr)) { + roc_panic("semaphore: pthread_condattr_init(): %s", errno_to_str(err).c_str()); + } + if (int err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)) { + roc_panic("semaphore: pthread_condattr_setclock(): %s", errno_to_str(err).c_str()); + } + if (int err = pthread_cond_init(&cond_, &attr)) { + roc_panic("semaphore: pthread_cond_init(): %s", errno_to_str(err).c_str()); + } + + if (int err = pthread_condattr_destroy(&attr)) { + roc_panic("semaphore: pthread_condattr_destroy(): %s", errno_to_str(err).c_str()); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index aaa1779f9..97709301b 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -42,8 +42,8 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl mutex_.lock(); for (;;) { - // roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), - // core::timestamp(core::ClockMonotonic), deadline); + roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), + core::timestamp(core::ClockMonotonic), deadline); if (static_cast(get_state()) & state_mask) { mutex_.unlock(); return true; diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 0d2da08b8..cf594f788 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -77,9 +77,8 @@ TEST(state_tracker, simple_timeout) { StateTracker state_tracker; TestThread thr(state_tracker, sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) + core::Millisecond * 100); - - (void)thr.wait_running(); CHECK(thr.start()); + (void)thr.wait_running(); core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); CHECK(!(thr.running())); thr.join(); From 87fb07e3085c5cde067b1b72848fb08d8c597bcb Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 21 Feb 2026 01:54:23 +0800 Subject: [PATCH 10/15] fix: remove debug log --- src/internal_modules/roc_pipeline/state_tracker.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index 97709301b..aaa1779f9 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -42,8 +42,8 @@ bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadl mutex_.lock(); for (;;) { - roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), - core::timestamp(core::ClockMonotonic), deadline); + // roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), + // core::timestamp(core::ClockMonotonic), deadline); if (static_cast(get_state()) & state_mask) { mutex_.unlock(); return true; From 91a45c40c2e5ad1c95c3a065476deff198588b74 Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 21 Feb 2026 02:19:31 +0800 Subject: [PATCH 11/15] fix test cases --- .../roc_core/target_nosem/roc_core/semaphore.cpp | 3 ++- src/tests/roc_pipeline/test_state_tracker.cpp | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp index 1faf5326a..39c71530f 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -28,7 +28,8 @@ Semaphore::Semaphore(unsigned counter) roc_panic("semaphore: pthread_condattr_init(): %s", errno_to_str(err).c_str()); } if (int err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)) { - roc_panic("semaphore: pthread_condattr_setclock(): %s", errno_to_str(err).c_str()); + roc_panic("semaphore: pthread_condattr_setclock(): %s", + errno_to_str(err).c_str()); } if (int err = pthread_cond_init(&cond_, &attr)) { roc_panic("semaphore: pthread_cond_init(): %s", errno_to_str(err).c_str()); diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index cf594f788..2f0cfc4b8 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -94,9 +94,6 @@ TEST(state_tracker, multiple_timeout) { core::timestamp(core::ClockMonotonic) + core::Millisecond * 5000); } - for (int i = 0; i < 10; i++) { - (void)threads_ptr[i]->wait_running(); - } // wait for start, then check if threads are running for (int i = 0; i < 10; i++) { @@ -104,6 +101,9 @@ TEST(state_tracker, multiple_timeout) { // CHECK(threads_ptr[i]->running()); // roc_log(LogDebug, "check running %d\n", i); } + for (int i = 0; i < 10; i++) { + (void)threads_ptr[i]->wait_running(); + } core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); for (int i = 0; i < 10; i++) { // roc_log(LogDebug, "check running %d\n", i); From 9981c5934c4a80c19b3129dff9846c07dec4092a Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 21 Feb 2026 02:52:28 +0800 Subject: [PATCH 12/15] add macos specific code --- .../target_nosem/roc_core/semaphore.cpp | 57 +++++++++++++++++-- 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp index 39c71530f..5946d9cb1 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -27,10 +27,14 @@ Semaphore::Semaphore(unsigned counter) if (int err = pthread_condattr_init(&attr)) { roc_panic("semaphore: pthread_condattr_init(): %s", errno_to_str(err).c_str()); } + +#if defined(CLOCK_MONOTONIC) && !defined(__APPLE__) && !defined(__MACH__) if (int err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)) { roc_panic("semaphore: pthread_condattr_setclock(): %s", errno_to_str(err).c_str()); } +#endif + if (int err = pthread_cond_init(&cond_, &attr)) { roc_panic("semaphore: pthread_cond_init(): %s", errno_to_str(err).c_str()); } @@ -41,11 +45,34 @@ Semaphore::Semaphore(unsigned counter) } Semaphore::~Semaphore() { + /* Ensure that signal() and broadcast() are not using condvar. + */ while (guard_) { cpu_relax(); } - int err = 0; + int err; + +#if defined(__APPLE__) && defined(__MACH__) + if ((err = pthread_mutex_lock(&mutex_.mutex_))) { + roc_panic("mutex: pthread_mutex_lock(): %s", errno_to_str(err).c_str()); + } + + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + + err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts); + if (err != 0 && err != ETIMEDOUT) { + roc_panic("mutex: pthread_cond_timedwait_relative_np(): %s", + errno_to_str(err).c_str()); + } + + if ((err = pthread_mutex_unlock(&mutex_.mutex_))) { + roc_panic("mutex: pthread_mutex_unlock(): %s", errno_to_str(err).c_str()); + } +#endif + if ((err = pthread_cond_destroy(&cond_))) { roc_panic("sem: pthread_cond_destroy(): %s", errno_to_str(err).c_str()); } @@ -57,15 +84,29 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { } struct timespec ts; - ts.tv_sec = time_t(deadline / Second); - ts.tv_nsec = long(deadline % Second); - // roc_log(LogDebug, "loop top, now=%lld, deadline=%lld", - // core::timestamp(core::ClockMonotonic), deadline); - int err = 0; + mutex_.lock(); while (err == 0 && counter_ == 0) { +#if defined(__APPLE__) && defined(__MACH__) + // On macOS, convert absolute deadline to relative timeout + nanoseconds_t now = timestamp(ClockMonotonic); + if (deadline <= now) { + err = ETIMEDOUT; + break; + } + nanoseconds_t timeout = deadline - now; + + ts.tv_sec = time_t(timeout / Second); + ts.tv_nsec = long(timeout % Second); + + err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts); +#else + ts.tv_sec = time_t(deadline / Second); + ts.tv_nsec = long(deadline % Second); + err = pthread_cond_timedwait(&cond_, &mutex_.mutex_, &ts); +#endif } bool acquired = (counter_ > 0); @@ -74,6 +115,10 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { } mutex_.unlock(); + if (err != 0 && err != ETIMEDOUT) { + roc_panic("semaphore: pthread_cond_timedwait(): %s", errno_to_str(err).c_str()); + } + return acquired; } From e202d4b5af424daad04e1de49b4aa139f21c44ba Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 21 Feb 2026 02:57:04 +0800 Subject: [PATCH 13/15] fix memory leak --- src/tests/roc_pipeline/test_semaphore.cpp | 57 ++++++++----- src/tests/roc_pipeline/test_state_tracker.cpp | 81 ++++++++++--------- 2 files changed, 80 insertions(+), 58 deletions(-) diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp index db9237a09..66e8b46de 100644 --- a/src/tests/roc_pipeline/test_semaphore.cpp +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -44,6 +44,10 @@ class TestThread : public core::Thread { semaphore_ = semaphore; } + ~TestThread() { + join(); + } + bool running() const { return r_; } @@ -85,31 +89,43 @@ TEST(semaphore, timeout_test) { TEST(semaphore, block_test) { core::Semaphore sem(0); roc_log(LogDebug, "ready"); - TestThread** threads_ptr = new TestThread*[10]; - for (int i = 0; i < 10; i++) { - threads_ptr[i] = new TestThread( + + // Use a vector or array of pointers that we'll clean up + const int num_threads = 10; + TestThread* threads[num_threads]; + + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread( 2 * core::Second + core::timestamp(core::ClockMonotonic), &sem); - (void)threads_ptr[i]->start(); + (void)threads[i]->start(); } - for (int i = 0; i < 10; i++) { - (void)threads_ptr[i]->wait_running(); + + for (int i = 0; i < num_threads; i++) { + (void)threads[i]->wait_running(); } roc_log(LogDebug, "finish waiting running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); - for (int i = 0; i < 10; i++) { - CHECK(threads_ptr[i]->running()); + + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); } roc_log(LogDebug, "finish checking running"); - for (int i = 0; i < 10; i++) { + + for (int i = 0; i < num_threads; i++) { sem.post(); } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 300); - for (int i = 0; i < 10; i++) { - CHECK(!threads_ptr[i]->running()); + + for (int i = 0; i < num_threads; i++) { + CHECK(!threads[i]->running()); } roc_log(LogDebug, "finish all finished running"); - for (int i = 0; i < 10; i++) { - threads_ptr[i]->join(); + + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); + delete threads[i]; } } @@ -136,19 +152,19 @@ TEST(semaphore, multiple_post_before_wait) { TEST(semaphore, concurrent_post_and_wait) { core::Semaphore sem(0); const int num_threads = 20; - TestThread** threads_ptr = new TestThread*[num_threads]; + TestThread* threads[num_threads]; // Start threads that will wait for (int i = 0; i < num_threads; i++) { - threads_ptr[i] = new TestThread( + threads[i] = new TestThread( 1 * core::Second + core::timestamp(core::ClockMonotonic), &sem); - (void)threads_ptr[i]->start(); + (void)threads[i]->start(); } // Wait for all threads to be running and blocked core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); for (int i = 0; i < num_threads; i++) { - CHECK(threads_ptr[i]->running()); + CHECK(threads[i]->running()); } // Post from multiple iterations to wake them up gradually @@ -159,13 +175,14 @@ TEST(semaphore, concurrent_post_and_wait) { // All threads should eventually finish core::sleep_for(core::ClockMonotonic, core::Millisecond * 200); for (int i = 0; i < num_threads; i++) { - CHECK(!threads_ptr[i]->running()); + CHECK(!threads[i]->running()); } for (int i = 0; i < num_threads; i++) { - threads_ptr[i]->join(); + threads[i]->join(); + delete threads[i]; } } } // namespace pipeline -} // namespace roc +} // namespace roc \ No newline at end of file diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 2f0cfc4b8..3609adc7c 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -45,6 +45,10 @@ class TestThread : public core::Thread { , deadline_(deadline) { } + ~TestThread() { + join(); + } + bool running() const { return r_; } @@ -81,33 +85,32 @@ TEST(state_tracker, simple_timeout) { (void)thr.wait_running(); core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); CHECK(!(thr.running())); - thr.join(); + thr.join(); // Thread will be destroyed automatically when going out of scope } TEST(state_tracker, multiple_timeout) { StateTracker state_tracker; - TestThread** threads_ptr = new TestThread*[10]; - + const int num_threads = 10; + TestThread* threads[num_threads]; + // set threads that last for 1 second - for (int i = 0; i < 10; i++) { - threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread(state_tracker, sndio::DeviceState_Active, core::timestamp(core::ClockMonotonic) + core::Millisecond * 5000); } // wait for start, then check if threads are running - for (int i = 0; i < 10; i++) { - CHECK(threads_ptr[i]->start()); - // CHECK(threads_ptr[i]->running()); - // roc_log(LogDebug, "check running %d\n", i); + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->start()); } - for (int i = 0; i < 10; i++) { - (void)threads_ptr[i]->wait_running(); + for (int i = 0; i < num_threads; i++) { + (void)threads[i]->wait_running(); } + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); - for (int i = 0; i < 10; i++) { - // roc_log(LogDebug, "check running %d\n", i); - CHECK(threads_ptr[i]->running()); + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); } // sleep for 2 seconds, making the threads timeout @@ -115,37 +118,39 @@ TEST(state_tracker, multiple_timeout) { core::sleep_for(core::ClockMonotonic, core::Millisecond * 5000); // check if threads are stopped - for (int i = 0; i < 10; i++) { - CHECK(!threads_ptr[i]->running()); + for (int i = 0; i < num_threads; i++) { + CHECK(!threads[i]->running()); } roc_log(LogDebug, "started joining"); - for (int i = 0; i < 10; i++) { - threads_ptr[i]->join(); + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); } roc_log(LogDebug, "finished joining"); - for (int i = 0; i < 10; ++i) { - delete threads_ptr[i]; + for (int i = 0; i < num_threads; ++i) { + delete threads[i]; } - delete[] threads_ptr; } TEST(state_tracker, multiple_switch) { StateTracker state_tracker; - TestThread** threads_ptr = new TestThread*[10]; + const int num_threads = 10; + TestThread* threads[num_threads]; // set threads without waiting time - for (int i = 0; i < 10; i++) { - threads_ptr[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); } - for (int i = 0; i < 10; i++) { - CHECK(threads_ptr[i]->start()); + + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->start()); } - for (int i = 0; i < 10; i++) { - (void)threads_ptr[i]->wait_running(); + + for (int i = 0; i < num_threads; i++) { + (void)threads[i]->wait_running(); } roc_log(LogDebug, "started running"); @@ -153,8 +158,8 @@ TEST(state_tracker, multiple_switch) { core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); // check if the threads have started - for (int i = 0; i < 10; i++) { - CHECK(threads_ptr[i]->running()); + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); } // register a packet @@ -163,20 +168,20 @@ TEST(state_tracker, multiple_switch) { core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); // check if the threads have been stopped - for (int i = 0; i < 10; i++) { - CHECK(!(threads_ptr[i]->running())); + for (int i = 0; i < num_threads; i++) { + CHECK(!(threads[i]->running())); } roc_log(LogDebug, "started joining"); - for (int i = 0; i < 10; i++) { - threads_ptr[i]->join(); + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); } roc_log(LogDebug, "finished joining"); - for (int i = 0; i < 10; ++i) { - delete threads_ptr[i]; + for (int i = 0; i < num_threads; ++i) { + delete threads[i]; } - delete[] threads_ptr; } + } // namespace pipeline -} // namespace roc +} // namespace roc \ No newline at end of file From ec124c935962c5eb1f540183d67474cd0e088fbd Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sat, 21 Feb 2026 02:57:30 +0800 Subject: [PATCH 14/15] format --- .../target_nosem/roc_core/semaphore.cpp | 6 ++--- src/tests/roc_pipeline/test_semaphore.cpp | 22 +++++++++---------- src/tests/roc_pipeline/test_state_tracker.cpp | 20 ++++++++--------- 3 files changed, 24 insertions(+), 24 deletions(-) diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp index 5946d9cb1..3a7103e16 100644 --- a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -96,15 +96,15 @@ bool Semaphore::timed_wait(nanoseconds_t deadline) { break; } nanoseconds_t timeout = deadline - now; - + ts.tv_sec = time_t(timeout / Second); ts.tv_nsec = long(timeout % Second); - + err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts); #else ts.tv_sec = time_t(deadline / Second); ts.tv_nsec = long(deadline % Second); - + err = pthread_cond_timedwait(&cond_, &mutex_.mutex_, &ts); #endif } diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp index 66e8b46de..c293da859 100644 --- a/src/tests/roc_pipeline/test_semaphore.cpp +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -89,43 +89,43 @@ TEST(semaphore, timeout_test) { TEST(semaphore, block_test) { core::Semaphore sem(0); roc_log(LogDebug, "ready"); - + // Use a vector or array of pointers that we'll clean up const int num_threads = 10; TestThread* threads[num_threads]; - + for (int i = 0; i < num_threads; i++) { threads[i] = new TestThread( 2 * core::Second + core::timestamp(core::ClockMonotonic), &sem); (void)threads[i]->start(); } - + for (int i = 0; i < num_threads; i++) { (void)threads[i]->wait_running(); } roc_log(LogDebug, "finish waiting running"); - + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); - + for (int i = 0; i < num_threads; i++) { CHECK(threads[i]->running()); } roc_log(LogDebug, "finish checking running"); - + for (int i = 0; i < num_threads; i++) { sem.post(); } - + core::sleep_for(core::ClockMonotonic, core::Millisecond * 300); - + for (int i = 0; i < num_threads; i++) { CHECK(!threads[i]->running()); } roc_log(LogDebug, "finish all finished running"); - + for (int i = 0; i < num_threads; i++) { threads[i]->join(); - delete threads[i]; + delete threads[i]; } } @@ -180,7 +180,7 @@ TEST(semaphore, concurrent_post_and_wait) { for (int i = 0; i < num_threads; i++) { threads[i]->join(); - delete threads[i]; + delete threads[i]; } } diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp index 3609adc7c..aaa8f972d 100644 --- a/src/tests/roc_pipeline/test_state_tracker.cpp +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -85,19 +85,19 @@ TEST(state_tracker, simple_timeout) { (void)thr.wait_running(); core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); CHECK(!(thr.running())); - thr.join(); // Thread will be destroyed automatically when going out of scope + thr.join(); // Thread will be destroyed automatically when going out of scope } TEST(state_tracker, multiple_timeout) { StateTracker state_tracker; const int num_threads = 10; TestThread* threads[num_threads]; - + // set threads that last for 1 second for (int i = 0; i < num_threads; i++) { threads[i] = new TestThread(state_tracker, sndio::DeviceState_Active, - core::timestamp(core::ClockMonotonic) - + core::Millisecond * 5000); + core::timestamp(core::ClockMonotonic) + + core::Millisecond * 5000); } // wait for start, then check if threads are running @@ -107,7 +107,7 @@ TEST(state_tracker, multiple_timeout) { for (int i = 0; i < num_threads; i++) { (void)threads[i]->wait_running(); } - + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); for (int i = 0; i < num_threads; i++) { CHECK(threads[i]->running()); @@ -131,24 +131,24 @@ TEST(state_tracker, multiple_timeout) { roc_log(LogDebug, "finished joining"); for (int i = 0; i < num_threads; ++i) { - delete threads[i]; + delete threads[i]; } } TEST(state_tracker, multiple_switch) { StateTracker state_tracker; const int num_threads = 10; - TestThread* threads[num_threads]; + TestThread* threads[num_threads]; // set threads without waiting time for (int i = 0; i < num_threads; i++) { threads[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); } - + for (int i = 0; i < num_threads; i++) { CHECK(threads[i]->start()); } - + for (int i = 0; i < num_threads; i++) { (void)threads[i]->wait_running(); } @@ -179,7 +179,7 @@ TEST(state_tracker, multiple_switch) { roc_log(LogDebug, "finished joining"); for (int i = 0; i < num_threads; ++i) { - delete threads[i]; + delete threads[i]; } } From 87a42b37b3703852fc748ebc10cb3d67ccd3434e Mon Sep 17 00:00:00 2001 From: flw5469 Date: Sun, 22 Feb 2026 16:41:26 +0800 Subject: [PATCH 15/15] update test case: extend deadline to prevent semaphore expiring before checking counter --- src/tests/roc_pipeline/test_semaphore.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp index c293da859..e4f385e4f 100644 --- a/src/tests/roc_pipeline/test_semaphore.cpp +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -139,7 +139,8 @@ TEST(semaphore, multiple_post_before_wait) { // All waits should succeed immediately without blocking for (int i = 0; i < 5; i++) { - bool result = sem.timed_wait(core::timestamp(core::ClockMonotonic)); + bool result = sem.timed_wait(core::timestamp(core::ClockMonotonic) + + 100 * core::Millisecond); CHECK(result == true); }