Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion third_party/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
This folder contains source codes from following open source projects:

* pfr: https://github.com/boostorg/pfr/releases/tag/boost-1.90.0
* moodycamel: https://github.com/cameron314/concurrentqueue/releases/tag/v1.0.4
* moodycamel: https://github.com/cameron314/concurrentqueue/releases/tag/v1.0.5
* rapidjson: https://github.com/Tencent/rapidjson/commit/083f359f5c36198accc2b9360ce1e32a333231d9 with RAPIDJSON_HAS_STDSTRING defined in rapidjson.h
* date: https://github.com/HowardHinnant/date/tree/v3.0.1
* uboost_coro: see uboost_coro/README.md
Expand Down
2 changes: 1 addition & 1 deletion third_party/moodycamel/README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
These headers are imported from https://github.com/cameron314/concurrentqueue

Commit: https://github.com/cameron314/concurrentqueue/commit/6dd38b8a1dbaa7863aa907045f32308a56a6ff5d
Commit: https://github.com/cameron314/concurrentqueue/commit/9afb99746f0f5fc94ac8aef737053ae0481ba8d1

# moodycamel::ConcurrentQueue<T>

Expand Down
54 changes: 32 additions & 22 deletions third_party/moodycamel/include/moodycamel/blockingconcurrentqueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,18 @@ class BlockingConcurrentQueue
// includes making the memory effects of construction visible, possibly with a
// memory barrier).
explicit BlockingConcurrentQueue(size_t capacity = 6 * BLOCK_SIZE)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
: inner(capacity), sema(create<LightweightSemaphore, ssize_t, int>(0, static_cast<int>(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
assert(reinterpret_cast<ConcurrentQueue*>(reinterpret_cast<BlockingConcurrentQueue*>(1)) == &(reinterpret_cast<BlockingConcurrentQueue*>(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
}

BlockingConcurrentQueue(size_t minCapacity, size_t maxExplicitProducers, size_t maxImplicitProducers)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, (int)Traits::MAX_SEMA_SPINS), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
: inner(minCapacity, maxExplicitProducers, maxImplicitProducers), sema(create<LightweightSemaphore, ssize_t, int>(0, static_cast<int>(Traits::MAX_SEMA_SPINS)), &BlockingConcurrentQueue::template destroy<LightweightSemaphore>)
{
assert(reinterpret_cast<ConcurrentQueue*>((BlockingConcurrentQueue*)1) == &((BlockingConcurrentQueue*)1)->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
assert(reinterpret_cast<ConcurrentQueue*>(reinterpret_cast<BlockingConcurrentQueue*>(1)) == &(reinterpret_cast<BlockingConcurrentQueue*>(1))->inner && "BlockingConcurrentQueue must have ConcurrentQueue as its first member");
if (!sema) {
MOODYCAMEL_THROW(std::bad_alloc());
}
Expand Down Expand Up @@ -179,12 +179,13 @@ class BlockingConcurrentQueue
inline bool enqueue_bulk(It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
}

// Enqueues several items using an explicit producer token.
// Allocates memory if required. Only fails if memory allocation fails
// (or Traits::MAX_SUBQUEUE_SIZE has been defined and would be surpassed).
Expand All @@ -195,7 +196,8 @@ class BlockingConcurrentQueue
inline bool enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if ((details::likely)(inner.enqueue_bulk(token, std::forward<It>(itemFirst), count))) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
Expand Down Expand Up @@ -264,12 +266,13 @@ class BlockingConcurrentQueue
inline bool try_enqueue_bulk(It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
}

// Enqueues several items using an explicit producer token.
// Does not allocate memory. Fails if not enough room to enqueue.
// Note: Use std::make_move_iterator if the elements should be moved
Expand All @@ -279,7 +282,8 @@ class BlockingConcurrentQueue
inline bool try_enqueue_bulk(producer_token_t const& token, It itemFirst, size_t count)
{
if (inner.try_enqueue_bulk(token, std::forward<It>(itemFirst), count)) {
sema->signal((LightweightSemaphore::ssize_t)(ssize_t)count);
assert(static_cast<ssize_t>(count) >= 0);
sema->signal(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(count)));
return true;
}
return false;
Expand Down Expand Up @@ -327,13 +331,14 @@ class BlockingConcurrentQueue
inline size_t try_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->tryWaitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}

// Attempts to dequeue several elements from the queue using an explicit consumer token.
// Returns the number of items actually dequeued.
// Returns 0 if all producer streams appeared empty at the time they
Expand All @@ -343,7 +348,8 @@ class BlockingConcurrentQueue
inline size_t try_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->tryWaitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->tryWaitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand Down Expand Up @@ -447,13 +453,14 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk(It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
return count;
}

// Attempts to dequeue several elements from the queue.
// Returns the number of items actually dequeued, which can
// be 0 if the timeout expires while waiting for elements,
Expand All @@ -465,7 +472,8 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk_timed(It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max)), timeout_usecs));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(itemFirst, max - count);
}
Expand All @@ -492,7 +500,8 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk(consumer_token_t& token, It itemFirst, size_t max)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max))));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand All @@ -510,7 +519,8 @@ class BlockingConcurrentQueue
inline size_t wait_dequeue_bulk_timed(consumer_token_t& token, It itemFirst, size_t max, std::int64_t timeout_usecs)
{
size_t count = 0;
max = (size_t)sema->waitMany((LightweightSemaphore::ssize_t)(ssize_t)max, timeout_usecs);
assert(static_cast<ssize_t>(max) >= 0);
max = static_cast<size_t>(sema->waitMany(static_cast<LightweightSemaphore::ssize_t>(static_cast<ssize_t>(max)), timeout_usecs));
while (count != max) {
count += inner.template try_dequeue_bulk<It&>(token, itemFirst, max - count);
}
Expand All @@ -537,7 +547,7 @@ class BlockingConcurrentQueue
// Thread-safe.
inline size_t size_approx() const
{
return (size_t)sema->availableApprox();
return static_cast<size_t>(sema->availableApprox());
}


Expand All @@ -548,7 +558,7 @@ class BlockingConcurrentQueue
{
return ConcurrentQueue::is_lock_free();
}


private:
template<typename U, typename A1, typename A2>
Expand All @@ -557,7 +567,7 @@ class BlockingConcurrentQueue
void* p = (Traits::malloc)(sizeof(U));
return p != nullptr ? new (p) U(std::forward<A1>(a1), std::forward<A2>(a2)) : nullptr;
}

template<typename U>
static inline void destroy(U* p)
{
Expand All @@ -566,7 +576,7 @@ class BlockingConcurrentQueue
}
(Traits::free)(p);
}

private:
ConcurrentQueue inner;
std::unique_ptr<LightweightSemaphore, void (*)(LightweightSemaphore*)> sema;
Expand Down
Loading
Loading