Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions quic/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,12 @@ mvfst_add_library(mvfst_common_quic_iobuf_queue
mvfst_common_quic_buffer
)

mvfst_add_library(mvfst_common_event_fd_queue
EXPORTED_DEPS
Folly::folly
mvfst_common_events_eventbase
)

add_subdirectory(events)
add_subdirectory(testutil)
add_subdirectory(third-party)
Expand Down
173 changes: 173 additions & 0 deletions quic/common/EventFdQueue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <fcntl.h>
#include <unistd.h>

#if __has_include(<sys/eventfd.h>)
#include <sys/eventfd.h>
#define QUIC_HAS_EVENTFD 1
#else
#define QUIC_HAS_EVENTFD 0
#endif

#include <folly/Function.h>
#include <folly/ProducerConsumerQueue.h>
#include <folly/io/async/EventBase.h>
#include <folly/io/async/EventHandler.h>

#include <glog/logging.h>

namespace quic {

/**
* SPSC queue with asynchronous wakeup.
*
* The producer calls enqueue() from one thread and flush() once per batch.
* The consumer is driven by a folly::EventBase; setOnReadable() registers
* the callback fired when items are ready.
*
* Wakeup uses eventfd on Linux (EFD_NONBLOCK; counter semantics coalesce
* multiple flush() calls into one wakeup) and a non-blocking self-pipe on
* platforms without eventfd (e.g. macOS).
*/
template <typename T>
class EventFdQueue {
public:
EventFdQueue(folly::EventBase* consumerEvb, size_t capacity)
// ProducerConsumerQueue with size N holds N-1 items; add 1 so the
// external-facing capacity is exact.
: queue_(static_cast<uint32_t>(capacity + 1)) {
initNotifyFds();
handler_ =
std::make_unique<DrainHandler>(this, consumerEvb, readFd());
}

// Begin consuming. Must be called from the consumer EventBase thread.
// Calling registerHandler() (which calls event_add/kevent) from an off-thread
// is not safe on macOS kqueue — the filter may not be visible to the waiting
// kevent() call. Modelled on folly::NotificationQueue::Consumer::startConsuming().
void startConsuming() {
handler_->registerHandler(folly::EventHandler::READ | folly::EventHandler::PERSIST);
}

~EventFdQueue() {
handler_->unregisterHandler();
#if QUIC_HAS_EVENTFD
::close(eventfd_);
#else
::close(pipeFds_[0]);
::close(pipeFds_[1]);
#endif
}

// Producer thread. Returns false if queue is full.
bool enqueue(T item) {
if (!queue_.write(std::move(item))) {
return false;
}
pendingFlush_ = true;
return true;
}

// Producer thread. Signal the consumer once if anything was enqueued since
// last flush. Multiple enqueues coalesce into one wakeup.
void flush() {
if (!pendingFlush_) {
return;
}
pendingFlush_ = false;
#if QUIC_HAS_EVENTFD
uint64_t one = 1;
auto ret = ::write(eventfd_, &one, sizeof(one));
PCHECK(ret == (ssize_t)sizeof(one) || errno == EAGAIN || errno == EWOULDBLOCK);
#else
char one = 1;
auto ret = ::write(pipeFds_[1], &one, 1);
PCHECK(ret == 1 || errno == EAGAIN || errno == EWOULDBLOCK);
#endif
}

// Consumer setup. Must be called before events start firing.
void setOnReadable(folly::Function<void()> cb) {
onReadable_ = std::move(cb);
}

// Consumer thread. Returns false if queue is empty.
bool dequeue(T& out) {
return queue_.read(out);
}

// Approximate number of items currently in the queue. Safe to call from any
// thread; uses the same relaxed loads as folly::ProducerConsumerQueue.
size_t sizeGuess() const {
return queue_.sizeGuess();
}

private:
void initNotifyFds() {
#if QUIC_HAS_EVENTFD
eventfd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);
PCHECK(eventfd_ >= 0) << "eventfd() failed";
#else
PCHECK(::pipe(pipeFds_) == 0) << "pipe() failed";
PCHECK(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK) != -1);
PCHECK(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK) != -1);
#endif
}

int readFd() const {
#if QUIC_HAS_EVENTFD
return eventfd_;
#else
return pipeFds_[0];
#endif
}

// Drain the wakeup fd so it rearms for the next flush().
void drainWakeupFd() {
#if QUIC_HAS_EVENTFD
uint64_t val;
while (::read(eventfd_, &val, sizeof(val)) > 0) {
}
#else
char buf[64];
while (::read(pipeFds_[0], buf, sizeof(buf)) > 0) {
}
#endif
}

class DrainHandler : public folly::EventHandler {
public:
DrainHandler(EventFdQueue* q, folly::EventBase* evb, int fd)
: folly::EventHandler(evb, folly::NetworkSocket::fromFd(fd)), q_(q) {}

void handlerReady(uint16_t /*events*/) noexcept override {
q_->drainWakeupFd();
if (q_->onReadable_) {
q_->onReadable_();
}
}

private:
EventFdQueue* q_;
};

folly::ProducerConsumerQueue<T> queue_;
#if QUIC_HAS_EVENTFD
int eventfd_{-1};
#else
int pipeFds_[2]{-1, -1};
#endif
bool pendingFlush_{false};
std::unique_ptr<DrainHandler> handler_;
folly::Function<void()> onReadable_;
};

} // namespace quic
8 changes: 8 additions & 0 deletions quic/common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ quic_add_test(TARGET QuicCommonUtilTest SOURCES
mvfst_test_utils
${BOOST_LIBRARIES}
)

quic_add_test(TARGET EventFdQueueTest SOURCES
EventFdQueueTest.cpp
DEPENDS
Folly::folly
mvfst_common_event_fd_queue
mvfst_common_events_folly_eventbase
)
178 changes: 178 additions & 0 deletions quic/common/test/EventFdQueueTest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#include <quic/common/EventFdQueue.h>

#include <atomic>
#include <thread>
#include <vector>

#include <folly/io/async/ScopedEventBaseThread.h>
#include <gtest/gtest.h>

using namespace quic;

// Items flushed before startConsuming() fires are still delivered because
// eventfd/pipe is level-triggered: once the handler is registered it sees the
// fd is already readable and fires immediately.
TEST(EventFdQueueTest, HandlerFiresAfterStartConsuming) {
folly::ScopedEventBaseThread consumer;
EventFdQueue<int> queue(consumer.getEventBase(), 8);

folly::Baton<> done;
queue.setOnReadable([&] {
int v;
while (queue.dequeue(v)) {
}
done.post();
});
consumer.getEventBase()->runInEventBaseThread([&] { queue.startConsuming(); });

queue.enqueue(42);
queue.flush();

done.wait();
}

TEST(EventFdQueueTest, BasicFifo) {
folly::ScopedEventBaseThread consumer;
EventFdQueue<int> queue(consumer.getEventBase(), 8);

std::vector<int> received;
folly::Baton<> done;

consumer.getEventBase()->runInEventBaseThread([&] {
queue.setOnReadable([&] {
int v;
while (queue.dequeue(v)) {
received.push_back(v);
}
if (received.size() == 4) {
done.post();
}
});
queue.startConsuming();
});

// Give consumer thread time to register handler
/* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10));

EXPECT_TRUE(queue.enqueue(1));
EXPECT_TRUE(queue.enqueue(2));
EXPECT_TRUE(queue.enqueue(3));
EXPECT_TRUE(queue.enqueue(4));
queue.flush();

done.wait();

ASSERT_EQ(received.size(), 4u);
EXPECT_EQ(received[0], 1);
EXPECT_EQ(received[1], 2);
EXPECT_EQ(received[2], 3);
EXPECT_EQ(received[3], 4);
}

// Backpressure is a property of the underlying SPSC queue; no notification
// path (startConsuming / flush) is needed to test it.
TEST(EventFdQueueTest, Backpressure) {
folly::ScopedEventBaseThread consumer;
EventFdQueue<int> queue(consumer.getEventBase(), 4);

// Fill the queue
EXPECT_TRUE(queue.enqueue(1));
EXPECT_TRUE(queue.enqueue(2));
EXPECT_TRUE(queue.enqueue(3));
EXPECT_TRUE(queue.enqueue(4));
// Queue is now full
EXPECT_FALSE(queue.enqueue(5));

// Drain one slot and verify we can enqueue again
int v;
EXPECT_TRUE(queue.dequeue(v));
EXPECT_EQ(v, 1);
EXPECT_TRUE(queue.enqueue(5));
}

TEST(EventFdQueueTest, Coalescing) {
// Multiple flush() calls while consumer is busy should coalesce into one
// wakeup that drains all items.
folly::ScopedEventBaseThread consumer;
EventFdQueue<int> queue(consumer.getEventBase(), 64);

std::atomic<int> wakeups{0};
std::atomic<int> totalDrained{0};
folly::Baton<> done;

consumer.getEventBase()->runInEventBaseThread([&] {
queue.setOnReadable([&] {
wakeups.fetch_add(1);
int v;
while (queue.dequeue(v)) {
totalDrained.fetch_add(1);
}
if (totalDrained.load() == 30) {
done.post();
}
});
queue.startConsuming();
});

/* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10));

// Enqueue 3 batches of 10, flushing after each
for (int batch = 0; batch < 3; batch++) {
for (int i = 0; i < 10; i++) {
EXPECT_TRUE(queue.enqueue(batch * 10 + i));
}
queue.flush();
}

done.wait();
EXPECT_EQ(totalDrained.load(), 30);
// May be 1, 2, or 3 wakeups depending on scheduling; just verify all items
// arrived and count is at least 1.
EXPECT_GE(wakeups.load(), 1);
}

TEST(EventFdQueueTest, NoItemLoss) {
folly::ScopedEventBaseThread consumer;
const int total = 200;
EventFdQueue<int> queue(consumer.getEventBase(), 64);

std::atomic<int> received{0};
folly::Baton<> done;

consumer.getEventBase()->runInEventBaseThread([&] {
queue.setOnReadable([&] {
int v;
while (queue.dequeue(v)) {
if (received.fetch_add(1) + 1 == total) {
done.post();
}
}
});
queue.startConsuming();
});

/* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10));

// Producer sends items in small batches; queue capacity may fill and require
// the consumer to drain before we can continue.
for (int i = 0; i < total; i++) {
while (!queue.enqueue(i)) {
// Queue full — yield and retry
/* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
if ((i + 1) % 10 == 0) {
queue.flush();
}
}
queue.flush();

done.wait();
EXPECT_EQ(received.load(), total);
}
Loading