diff --git a/SConstruct b/SConstruct index 63f829862..7a705520d 100644 --- a/SConstruct +++ b/SConstruct @@ -782,10 +782,15 @@ else: 'target_posix_pc', ]) - if meta.platform in ['linux', 'unix', 'android']: - env.Append(ROC_TARGETS=[ - 'target_posix_ext', - ]) + if meta.platform in ['linux', 'android', 'unix']: + if 'ROC_HAVE_SEM_CLOCKWAIT' in env['CPPDEFINES']: + env.Append(ROC_TARGETS=[ + 'target_posix_sem', + ]) + else: + env.Append(ROC_TARGETS=[ + 'target_nosem', + ]) if meta.platform in ['linux', 'unix', 'macos', 'windows', 'android']: env.Append(ROC_TARGETS=[ diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp new file mode 100644 index 000000000..3a7103e16 --- /dev/null +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.cpp @@ -0,0 +1,149 @@ +/* + * Copyright (c) 2020 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "roc_core/semaphore.h" +#include "roc_core/cpu_instructions.h" +#include "roc_core/errno_to_str.h" +#include "roc_core/log.h" +#include "roc_core/panic.h" + +#include +#include + +namespace roc { +namespace core { + +Semaphore::Semaphore(unsigned counter) + : mutex_() + , counter_(counter) + , guard_(0) { + pthread_condattr_t attr; + + if (int err = pthread_condattr_init(&attr)) { + roc_panic("semaphore: pthread_condattr_init(): %s", errno_to_str(err).c_str()); + } + +#if defined(CLOCK_MONOTONIC) && !defined(__APPLE__) && !defined(__MACH__) + if (int err = pthread_condattr_setclock(&attr, CLOCK_MONOTONIC)) { + roc_panic("semaphore: pthread_condattr_setclock(): %s", + errno_to_str(err).c_str()); + } +#endif + + if (int err = pthread_cond_init(&cond_, &attr)) { + roc_panic("semaphore: pthread_cond_init(): %s", errno_to_str(err).c_str()); + } + + if (int err = pthread_condattr_destroy(&attr)) { + roc_panic("semaphore: pthread_condattr_destroy(): %s", errno_to_str(err).c_str()); + } +} + +Semaphore::~Semaphore() { + /* Ensure that signal() and broadcast() are not using condvar. + */ + while (guard_) { + cpu_relax(); + } + + int err; + +#if defined(__APPLE__) && defined(__MACH__) + if ((err = pthread_mutex_lock(&mutex_.mutex_))) { + roc_panic("mutex: pthread_mutex_lock(): %s", errno_to_str(err).c_str()); + } + + struct timespec ts; + ts.tv_sec = 0; + ts.tv_nsec = 1; + + err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts); + if (err != 0 && err != ETIMEDOUT) { + roc_panic("mutex: pthread_cond_timedwait_relative_np(): %s", + errno_to_str(err).c_str()); + } + + if ((err = pthread_mutex_unlock(&mutex_.mutex_))) { + roc_panic("mutex: pthread_mutex_unlock(): %s", errno_to_str(err).c_str()); + } +#endif + + if ((err = pthread_cond_destroy(&cond_))) { + roc_panic("sem: pthread_cond_destroy(): %s", errno_to_str(err).c_str()); + } +} + +bool Semaphore::timed_wait(nanoseconds_t deadline) { + if (deadline < 0) { + roc_panic("semaphore: unexpected negative deadline"); + } + + struct timespec ts; + int err = 0; + + mutex_.lock(); + while (err == 0 && counter_ == 0) { +#if defined(__APPLE__) && defined(__MACH__) + // On macOS, convert absolute deadline to relative timeout + nanoseconds_t now = timestamp(ClockMonotonic); + if (deadline <= now) { + err = ETIMEDOUT; + break; + } + nanoseconds_t timeout = deadline - now; + + ts.tv_sec = time_t(timeout / Second); + ts.tv_nsec = long(timeout % Second); + + err = pthread_cond_timedwait_relative_np(&cond_, &mutex_.mutex_, &ts); +#else + ts.tv_sec = time_t(deadline / Second); + ts.tv_nsec = long(deadline % Second); + + err = pthread_cond_timedwait(&cond_, &mutex_.mutex_, &ts); +#endif + } + + bool acquired = (counter_ > 0); + if (acquired) { + counter_--; + } + mutex_.unlock(); + + if (err != 0 && err != ETIMEDOUT) { + roc_panic("semaphore: pthread_cond_timedwait(): %s", errno_to_str(err).c_str()); + } + + return acquired; +} + +void Semaphore::wait() { + int err = 0; + mutex_.lock(); + while (err == 0 && counter_ == 0) { + if ((err = pthread_cond_wait(&cond_, &mutex_.mutex_))) { + roc_panic("semaphore: pthread_cond_wait(): %s", errno_to_str(err).c_str()); + } + } + counter_--; + mutex_.unlock(); +} + +void Semaphore::post() { + ++guard_; + mutex_.lock(); + counter_++; + if (int err = pthread_cond_broadcast(&cond_)) { + roc_panic("cond: pthread_cond_broadcast(): %s", errno_to_str(err).c_str()); + } + mutex_.unlock(); + --guard_; +} + +} // namespace core +} // namespace roc \ No newline at end of file diff --git a/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h new file mode 100644 index 000000000..7ae079818 --- /dev/null +++ b/src/internal_modules/roc_core/target_nosem/roc_core/semaphore.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) 2020 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +//! @file roc_core/target_nosem/roc_core/semaphore.h +//! @brief Semaphore. + +#ifndef ROC_CORE_SEMAPHORE_H_ +#define ROC_CORE_SEMAPHORE_H_ + +#include "roc_core/atomic_int.h" +#include "roc_core/attributes.h" +#include "roc_core/mutex.h" +#include "roc_core/noncopyable.h" +#include "roc_core/time.h" +#include + +namespace roc { +namespace core { + +//! Semaphore. +class Semaphore : public NonCopyable<> { +public: + //! Initialize semaphore with given counter. + explicit Semaphore(unsigned counter = 0); + + ~Semaphore(); + + //! Wait until the counter becomes non-zero, decrement it, and return true. + //! If deadline expires before the counter becomes non-zero, returns false. + //! Deadline should be in the same time domain as core::timestamp(). + ROC_NODISCARD bool timed_wait(nanoseconds_t deadline); + + //! Wait until the counter becomes non-zero, decrement it, and return. + void wait(); + + //! Increment counter and wake up blocked waits. + //! This method is implemented using mutex for platforms where + void post(); + +private: + pthread_cond_t cond_; + core::Mutex mutex_; + AtomicInt counter_; + AtomicInt guard_; +}; + +} // namespace core +} // namespace roc + +#endif // ROC_CORE_SEMAPHORE_H_ diff --git a/src/internal_modules/roc_core/target_posix/roc_core/mutex.h b/src/internal_modules/roc_core/target_posix/roc_core/mutex.h index efc1cecbe..df4d52d86 100644 --- a/src/internal_modules/roc_core/target_posix/roc_core/mutex.h +++ b/src/internal_modules/roc_core/target_posix/roc_core/mutex.h @@ -70,6 +70,7 @@ class Mutex : public NonCopyable<> { private: friend class Cond; + friend class Semaphore; mutable pthread_mutex_t mutex_; mutable AtomicInt guard_; diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp similarity index 88% rename from src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp rename to src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp index 0d9685390..4b888f693 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.cpp +++ b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.cpp @@ -9,6 +9,7 @@ #include "roc_core/semaphore.h" #include "roc_core/cpu_instructions.h" #include "roc_core/errno_to_str.h" +#include "roc_core/log.h" #include "roc_core/panic.h" #include @@ -34,16 +35,17 @@ Semaphore::~Semaphore() { } bool Semaphore::timed_wait(nanoseconds_t deadline) { + printf("Enter function of timed_wait"); if (deadline < 0) { roc_panic("semaphore: unexpected negative deadline"); } - for (;;) { - timespec ts; - ts.tv_sec = long(deadline / Second); - ts.tv_nsec = long(deadline % Second); + timespec ts; + ts.tv_sec = long(deadline / Second); + ts.tv_nsec = long(deadline % Second); - if (sem_timedwait(&sem_, &ts) == 0) { + for (;;) { + if (sem_clockwait(&sem_, CLOCK_MONOTONIC, &ts) == 0) { return true; } diff --git a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h similarity index 96% rename from src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h rename to src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h index 4eab2fd84..e768fb21b 100644 --- a/src/internal_modules/roc_core/target_posix_ext/roc_core/semaphore.h +++ b/src/internal_modules/roc_core/target_posix_sem/roc_core/semaphore.h @@ -6,7 +6,7 @@ * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -//! @file roc_core/target_posix_ext/roc_core/semaphore.h +//! @file roc_core/target_posix_sem/roc_core/semaphore.h //! @brief Semaphore. #ifndef ROC_CORE_SEMAPHORE_H_ diff --git a/src/internal_modules/roc_pipeline/state_tracker.cpp b/src/internal_modules/roc_pipeline/state_tracker.cpp index bc2b6ef6f..aaa1779f9 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.cpp +++ b/src/internal_modules/roc_pipeline/state_tracker.cpp @@ -7,15 +7,75 @@ */ #include "roc_pipeline/state_tracker.h" +#include "roc_core/log.h" #include "roc_core/panic.h" namespace roc { namespace pipeline { StateTracker::StateTracker() - : halt_state_(-1) + : sem_(0) + , halt_state_(-1) , active_sessions_(0) - , pending_packets_(0) { + , pending_packets_(0) + , sem_is_occupied_(0) + , mutex_() + , waiting_con_(mutex_) { +} + +// StateTracker::~StateTracker() { +// mutex_.unlock(); +// } + +// This method should block until the state becomes any of the states specified by the +// mask, or deadline expires. E.g. if mask is ACTIVE | PAUSED, it should block until state +// becomes either ACTIVE or PAUSED. (Currently only two states are used, but later more +// states will be needed). Deadline should be an absolute timestamp. + +// Questions: +// - When should the function return true vs false +bool StateTracker::wait_state(unsigned int state_mask, core::nanoseconds_t deadline) { + // If no state is specified in state_mask, return immediately + if (state_mask == 0) { + return true; + } + + mutex_.lock(); + for (;;) { + // roc_log(LogDebug, "loop top, state=%d, now=%lld, deadline=%lld", get_state(), + // core::timestamp(core::ClockMonotonic), deadline); + if (static_cast(get_state()) & state_mask) { + mutex_.unlock(); + return true; + } + + if (deadline >= 0 && deadline <= core::timestamp(core::ClockMonotonic)) { + mutex_.unlock(); + return false; + } + + if (sem_is_occupied_.compare_exchange(0, 1)) { + if (deadline >= 0) { + mutex_.unlock(); + (void)sem_.timed_wait(deadline); + + } else { + mutex_.unlock(); + sem_.wait(); + } + + mutex_.lock(); + sem_is_occupied_ = 0; + waiting_con_.broadcast(); + + } else { + if (deadline >= 0) { + (void)waiting_con_.timed_wait(deadline); + } else { + waiting_con_.wait(); + } + } + } } sndio::DeviceState StateTracker::get_state() const { @@ -65,22 +125,50 @@ size_t StateTracker::num_sessions() const { } void StateTracker::register_session() { - active_sessions_++; + if (active_sessions_++ == 0) { + signal_state_change(); + } } void StateTracker::unregister_session() { - if (--active_sessions_ < 0) { + int prev_sessions = active_sessions_--; + if (prev_sessions == 0) { roc_panic("state tracker: unpaired register/unregister session"); + } else if (prev_sessions == 1 && pending_packets_ == 0) { + signal_state_change(); } + + // if (--active_sessions_ < 0) { + // roc_panic("state tracker: unpaired register/unregister session"); + // } } void StateTracker::register_packet() { - pending_packets_++; + if (pending_packets_++ == 0 && active_sessions_ == 0) { + signal_state_change(); + } } void StateTracker::unregister_packet() { - if (--pending_packets_ < 0) { + int prev_packets = pending_packets_--; + if (prev_packets == 0) { roc_panic("state tracker: unpaired register/unregister packet"); + } else if (prev_packets == 1 && active_sessions_ == 0) { + signal_state_change(); + } + + // if (--pending_packets_ < 0) { + // roc_panic("state tracker: unpaired register/unregister packet"); + // } +} + +void StateTracker::signal_state_change() { + // if (waiting_mask_ != 0 && (static_cast(get_state()) & waiting_mask_)) { + // sem_.post(); + // } + if (sem_is_occupied_) { + roc_log(LogDebug, "signaling"); + sem_.post(); } } diff --git a/src/internal_modules/roc_pipeline/state_tracker.h b/src/internal_modules/roc_pipeline/state_tracker.h index b09180b3f..38159a818 100644 --- a/src/internal_modules/roc_pipeline/state_tracker.h +++ b/src/internal_modules/roc_pipeline/state_tracker.h @@ -13,8 +13,11 @@ #define ROC_PIPELINE_STATE_TRACKER_H_ #include "roc_core/atomic_int.h" +#include "roc_core/cond.h" #include "roc_core/noncopyable.h" +#include "roc_core/semaphore.h" #include "roc_core/stddefs.h" +#include "roc_core/time.h" #include "roc_sndio/device_defs.h" namespace roc { @@ -32,6 +35,9 @@ class StateTracker : public core::NonCopyable<> { //! Initialize all counters to zero. StateTracker(); + //! Block until state becomes any of the ones specified by state_mask. + bool wait_state(unsigned state_mask, core::nanoseconds_t deadline); + //! Compute current state. sndio::DeviceState get_state() const; @@ -63,9 +69,15 @@ class StateTracker : public core::NonCopyable<> { void unregister_packet(); private: + core::Semaphore sem_; core::AtomicInt halt_state_; core::AtomicInt active_sessions_; core::AtomicInt pending_packets_; + core::AtomicInt sem_is_occupied_; + core::Mutex mutex_; + core::Cond waiting_con_; + + void signal_state_change(); }; } // namespace pipeline diff --git a/src/tests/roc_pipeline/test_semaphore.cpp b/src/tests/roc_pipeline/test_semaphore.cpp new file mode 100644 index 000000000..e4f385e4f --- /dev/null +++ b/src/tests/roc_pipeline/test_semaphore.cpp @@ -0,0 +1,189 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic_int.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/semaphore.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(core::nanoseconds_t deadline, core::Semaphore* semaphore) + : r_(0) + , deadline_(deadline) { + semaphore_ = semaphore; + } + + ~TestThread() { + join(); + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + virtual void run() { + r_ = true; + (void)semaphore_->timed_wait(deadline_); + r_ = false; + } + core::AtomicInt r_; + core::nanoseconds_t deadline_; + core::Semaphore* semaphore_; +}; + +} // namespace + +TEST_GROUP(semaphore) {}; + +TEST(semaphore, timeout_test) { + core::Semaphore sem(0); + roc_log(LogDebug, "ready"); + bool result = + sem.timed_wait(1 * core::Second + core::timestamp(core::ClockMonotonic)); + if (result) { + roc_log(LogDebug, "true, unlocked by other threads"); + } else { + roc_log(LogDebug, "false, timeout"); + } + CHECK(result == false); +} + +TEST(semaphore, block_test) { + core::Semaphore sem(0); + roc_log(LogDebug, "ready"); + + // Use a vector or array of pointers that we'll clean up + const int num_threads = 10; + TestThread* threads[num_threads]; + + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread( + 2 * core::Second + core::timestamp(core::ClockMonotonic), &sem); + (void)threads[i]->start(); + } + + for (int i = 0; i < num_threads; i++) { + (void)threads[i]->wait_running(); + } + roc_log(LogDebug, "finish waiting running"); + + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); + } + roc_log(LogDebug, "finish checking running"); + + for (int i = 0; i < num_threads; i++) { + sem.post(); + } + + core::sleep_for(core::ClockMonotonic, core::Millisecond * 300); + + for (int i = 0; i < num_threads; i++) { + CHECK(!threads[i]->running()); + } + roc_log(LogDebug, "finish all finished running"); + + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); + delete threads[i]; + } +} + +TEST(semaphore, multiple_post_before_wait) { + core::Semaphore sem(0); + + // Post multiple times before any waits + for (int i = 0; i < 5; i++) { + sem.post(); + } + + // All waits should succeed immediately without blocking + for (int i = 0; i < 5; i++) { + bool result = sem.timed_wait(core::timestamp(core::ClockMonotonic) + + 100 * core::Millisecond); + CHECK(result == true); + } + + // Next wait should timeout since counter is back to 0 + bool result = + sem.timed_wait(100 * core::Millisecond + core::timestamp(core::ClockMonotonic)); + CHECK(result == false); +} + +TEST(semaphore, concurrent_post_and_wait) { + core::Semaphore sem(0); + const int num_threads = 20; + TestThread* threads[num_threads]; + + // Start threads that will wait + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread( + 1 * core::Second + core::timestamp(core::ClockMonotonic), &sem); + (void)threads[i]->start(); + } + + // Wait for all threads to be running and blocked + core::sleep_for(core::ClockMonotonic, core::Millisecond * 100); + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); + } + + // Post from multiple iterations to wake them up gradually + for (int i = 0; i < num_threads; i++) { + sem.post(); + } + + // All threads should eventually finish + core::sleep_for(core::ClockMonotonic, core::Millisecond * 200); + for (int i = 0; i < num_threads; i++) { + CHECK(!threads[i]->running()); + } + + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); + delete threads[i]; + } +} + +} // namespace pipeline +} // namespace roc \ No newline at end of file diff --git a/src/tests/roc_pipeline/test_state_tracker.cpp b/src/tests/roc_pipeline/test_state_tracker.cpp new file mode 100644 index 000000000..aaa8f972d --- /dev/null +++ b/src/tests/roc_pipeline/test_state_tracker.cpp @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2023 Roc Streaming authors + * + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ + +#include "test_harness.h" + +#include "roc_address/protocol.h" +#include "roc_audio/mixer.h" +#include "roc_audio/sample.h" +#include "roc_core/atomic_int.h" +#include "roc_core/heap_arena.h" +#include "roc_core/noop_arena.h" +#include "roc_core/semaphore.h" +#include "roc_core/thread.h" +#include "roc_core/time.h" +#include "roc_pipeline/config.h" +#include "roc_pipeline/receiver_endpoint.h" +#include "roc_pipeline/receiver_session_group.h" + +namespace roc { +namespace pipeline { + +namespace { + +enum { PacketSz = 512 }; + +core::HeapArena arena; + +packet::PacketFactory packet_factory(arena, PacketSz); +audio::FrameFactory frame_factory(arena, PacketSz * sizeof(audio::sample_t)); + +audio::ProcessorMap processor_map(arena); +rtp::EncodingMap encoding_map(arena); + +class TestThread : public core::Thread { +public: + TestThread(StateTracker& st, unsigned int state_mask, core::nanoseconds_t deadline) + : t_(st) + , r_(0) + , state_mask_(state_mask) + , deadline_(deadline) { + } + + ~TestThread() { + join(); + } + + bool running() const { + return r_; + } + + void wait_running() { + while (!r_) { + core::sleep_for(core::ClockMonotonic, core::Microsecond); + } + } + +private: + void run() { + r_ = true; + t_.wait_state(state_mask_, deadline_); + r_ = false; + } + + StateTracker& t_; + core::AtomicInt r_; + unsigned int state_mask_; + core::nanoseconds_t deadline_; +}; + +} // namespace + +TEST_GROUP(state_tracker) {}; + +// set a thread that last for 0.5 seconds, wait for 1 second to make it timeout. +TEST(state_tracker, simple_timeout) { + StateTracker state_tracker; + TestThread thr(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + core::Millisecond * 100); + CHECK(thr.start()); + (void)thr.wait_running(); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + CHECK(!(thr.running())); + thr.join(); // Thread will be destroyed automatically when going out of scope +} + +TEST(state_tracker, multiple_timeout) { + StateTracker state_tracker; + const int num_threads = 10; + TestThread* threads[num_threads]; + + // set threads that last for 1 second + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread(state_tracker, sndio::DeviceState_Active, + core::timestamp(core::ClockMonotonic) + + core::Millisecond * 5000); + } + + // wait for start, then check if threads are running + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->start()); + } + for (int i = 0; i < num_threads; i++) { + (void)threads[i]->wait_running(); + } + + core::sleep_for(core::ClockMonotonic, core::Millisecond * 1000); + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); + } + + // sleep for 2 seconds, making the threads timeout + roc_log(LogDebug, "started running"); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 5000); + + // check if threads are stopped + for (int i = 0; i < num_threads; i++) { + CHECK(!threads[i]->running()); + } + + roc_log(LogDebug, "started joining"); + + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); + } + + roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < num_threads; ++i) { + delete threads[i]; + } +} + +TEST(state_tracker, multiple_switch) { + StateTracker state_tracker; + const int num_threads = 10; + TestThread* threads[num_threads]; + + // set threads without waiting time + for (int i = 0; i < num_threads; i++) { + threads[i] = new TestThread(state_tracker, sndio::DeviceState_Active, -1); + } + + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->start()); + } + + for (int i = 0; i < num_threads; i++) { + (void)threads[i]->wait_running(); + } + roc_log(LogDebug, "started running"); + + // wait for threads starting + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + // check if the threads have started + for (int i = 0; i < num_threads; i++) { + CHECK(threads[i]->running()); + } + + // register a packet + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + state_tracker.register_packet(); + core::sleep_for(core::ClockMonotonic, core::Millisecond * 500); + + // check if the threads have been stopped + for (int i = 0; i < num_threads; i++) { + CHECK(!(threads[i]->running())); + } + + roc_log(LogDebug, "started joining"); + for (int i = 0; i < num_threads; i++) { + threads[i]->join(); + } + roc_log(LogDebug, "finished joining"); + + for (int i = 0; i < num_threads; ++i) { + delete threads[i]; + } +} + +} // namespace pipeline +} // namespace roc \ No newline at end of file