-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
I struggle to figure out how to drain a queue properly.
tl;dr Sentinel-based approach fails because sending the sentinel last doesn't guarantee it's read last.
I have a multi-producer queue.
I have multiple consumers, however they (blockingly) dequeue under a mutex, so from the queue perspective it's single-consumer:
(This is simplified - I actually use wait_dequeue_bulk with multiple iterations per under single mutex lock, but the core logic is the same)
void consumer() {
while (true) {
Item item;
{
const std::lock_guard<std::mutex> lock(gMutex);
wait_dequeue(&item);
}
process(item);
}
}Now, I want to drain the queue and exit gracefully.
My initial idea was to:
- Make sure producers are stopped.
- Add the sentinel to the queue (from the "main" thread rather than producer).
- When a consumer reads sentinel, it sets the global "exit_now" flag.
bool gExitNow = false;
void consumer() {
while (true) {
Item item;
{
const std::lock_guard<std::mutex> lock(gMutex);
if (gExitNow) return;
wait_dequeue(&item);
if (item == kSentinel) {
gExitNow = true;
return;
}
}
process(item);
}
}However, the documentation says that the order is not guaranteed in multiple-producer use case, so if there are some items in the queue when I add sentinel, it may happen that the sentinel would come up ahead of time and there are some other items in the queue.
So far I have a few options to work around it, but they don't look elegant.
-
After stopping all producers and before sending the sentinel, busy wait until size_approx() == 0. Doesn't sound exiting, and also I'm not sure whether I can rely on size_approx() not returning 0 when there's something in the queue.
-
Before sending sentinel, set another bool
shutdown_modetotrue. Inshutdown_mode, consumers would start to read non-blockingly rather than blockingly, and exit if there's nothing to read. When a consumer reads sentinel, it skips it but doesn't return (it will return on the next iteration when discovers non-blockingly that there's nothing to read).