44#include " ../utils.hpp"
55#include < atomic>
66
7- namespace tsfqueue ::FAST{
7+ namespace tsfqueue ::FAST {
88#if defined(__MACH__)
99#include < mach/mach.h>
10- class Semaphore_FAST {
11- private:
12- semaphore_t sema;
13- Semaphore_FAST (const Semaphore& other) = delete ;
14- Semaphore_FAST (Semaphore&& other) = delete ;
15- public:
16- Semaphore_FAST (int count = 0 ){
17- assert (count >= 0 );
18- kern_return_t ret = semaphore_create (mach_task_self (), &sema, SYNC_POLICY_FIFO, count);
19- assert (ret == KERN_SUCCESS);
20- }
21- ~Semaphore_FAST (){
22- semaphore_destroy (mach_task_self (), sema);
23- }
24- bool try_get (){
25- return timed_get (0 );
26- }
27- bool timed_get (std::uint64_t time_usecs){
28- mach_timespec_t time;
29- time.tv_sec = static_cast <unsigned int >(time_usecs / 1000000 );
30- time.tv_nsec = static_cast <int >((time_usecs % 1000000 ) * 1000 );
31- kern_return_t ret = semaphore_timedwait (sema, time);
32- return ret == KERN_SUCCESS;
33- }
34- void wait_and_get (){
35- semaphore_wait (sema);
36- }
37- void signal (int times = 1 ){
38- while (times--)
39- while (semaphore_signal (sema) != KERN_SUCCESS);
40- }
41- };
10+ class Semaphore_FAST {
11+ private:
12+ semaphore_t sema;
13+ Semaphore_FAST (const Semaphore &other) = delete ;
14+ Semaphore_FAST (Semaphore &&other) = delete ;
15+
16+ public:
17+ Semaphore_FAST (int count = 0 ) {
18+ assert (count >= 0 );
19+ kern_return_t ret =
20+ semaphore_create (mach_task_self (), &sema, SYNC_POLICY_FIFO, count);
21+ assert (ret == KERN_SUCCESS);
22+ }
23+ ~Semaphore_FAST () { semaphore_destroy (mach_task_self (), sema); }
24+ bool try_get () { return timed_get (0 ); }
25+ bool timed_get (std::uint64_t time_usecs) {
26+ mach_timespec_t time;
27+ time.tv_sec = static_cast <unsigned int >(time_usecs / 1000000 );
28+ time.tv_nsec = static_cast <int >((time_usecs % 1000000 ) * 1000 );
29+ kern_return_t ret = semaphore_timedwait (sema, time);
30+ return ret == KERN_SUCCESS;
31+ }
32+ void wait_and_get () { semaphore_wait (sema); }
33+ void signal (int times = 1 ) {
34+ while (times--)
35+ while (semaphore_signal (sema) != KERN_SUCCESS)
36+ ;
37+ }
38+ };
4239#endif
4340
41+ class Slot_FAST {
42+ private:
43+ std::atomic<int > counter;
44+ Semaphore_FAST sema;
45+ static constexpr int SPIN_COUNT =
46+ 1024 ; // Change this for benchmakrs & set to best possible
47+ inline bool hot_path () {
48+ for (int i = 0 ; i < SPIN_COUNT; i++) {
49+ if (counter.load (std::memory_order_acquire) > 0 ) {
50+ counter.fetch_sub (1 );
51+ return true ;
52+ }
53+ }
54+ return false ;
55+ }
56+ bool get_with_sleep (std::int64_t time_usecs = -1 ) {
57+ if (hot_path ())
58+ return true ;
59+ counter.fetch_sub (1 );
60+ if (time_usecs < 0 ) {
61+ sema.wait_and_get () return true ;
62+ }
63+ if (sema.timed_get (static_cast <std::uint64_t >(time_usecs))) {
64+ return true ;
65+ }
4466
45- class Slot_FAST {
46- private:
47- std::atomic<int > counter;
48- Semaphore_FAST sema;
49- static constexpr int SPIN_COUNT = 1024 ; // Change this for benchmakrs & set to best possible
50- inline bool hot_path (){
51- for (int i = 0 ; i < SPIN_COUNT; i++){
52- if (counter.load (std::memory_order_acquire) > 0 ){
53- counter.fetch_sub (1 );
54- return true ;
55- }
56- }
57- return false ;
58- }
59- bool get_with_sleep (std::int64_t time_usecs = -1 ){
60- if (hot_path ()) return true ;
61- counter.fetch_sub (1 );
62- if (time_usecs < 0 ){
63- sema.wait_and_get ()
64- return true ;
65- }
66- if (sema.timed_get (static_cast <std::uint64_t >(time_usecs))){
67- return true ;
68- }
67+ // Restore the semaphore
68+ // Signal happened just after timeout expired
69+ // Thus we add a while loop which keeps checking until all boundary
70+ // conditions are gone. Super clever.
71+ while (true ) {
72+ int old = counter.fetch_add (1 );
73+ if (old < 0 )
74+ return false ; // Restored successfully
75+ old = counter.fetch_sub (1 );
76+ if (old > 0 && sema.try_get ()) {
77+ return true ;
78+ }
79+ }
80+ }
6981
70- // Restore the semaphore
71- // Signal happened just after timeout expired
72- // Thus we add a while loop which keeps checking until all boundary conditions
73- // are gone. Super clever.
74- while (true ){
75- int old = counter.fetch_add (1 );
76- if (old < 0 ) return false ; // Restored successfully
77- old = counter.fetch_sub (1 );
78- if (old > 0 && sema.try_get ()){
79- return true ;
80- }
81- }
82- }
83- public:
84- Slot_FAST (int count) : counter(count), sema(0 ) {}
85- bool try_get (){
86- // Ok since SPSC so counter cannot be decremented between load and fetch_sub
87- if (counter.load (std::memory_order_acquire) > 0 ){
88- counter.fetch_sub (1 );
89- return true ;
90- }
91- return false ;
92- }
93- bool timed_get (std::int64_t time_usecs){
94- return get_with_sleep (time_usecs);
95- }
96- void wait_and_get (){
97- return get_with_sleep ();
98- }
99- void signal (int times = 1 ){
100- int old = counter.fetch_add (1 );
101- if (old < 0 ) sema.signal ();
102- }
82+ public:
83+ Slot_FAST (int count) : counter(count), sema(0 ) {}
84+ bool try_get () {
85+ // Ok since SPSC so counter cannot be decremented between load and fetch_sub
86+ if (counter.load (std::memory_order_acquire) > 0 ) {
87+ counter.fetch_sub (1 );
88+ return true ;
10389 }
104- };
90+ return false ;
91+ }
92+ bool timed_get (std::int64_t time_usecs) { return get_with_sleep (time_usecs); }
93+ void wait_and_get () { return get_with_sleep (); }
94+ void signal (int times = 1 ) {
95+ int old = counter.fetch_add (1 );
96+ if (old < 0 )
97+ sema.signal ();
98+ }
99+ }
100+ }; // namespace tsfqueue::FAST
105101
106102#endif
0 commit comments