diff --git a/quic/common/CMakeLists.txt b/quic/common/CMakeLists.txt index 44dfea018..4973c9dfa 100644 --- a/quic/common/CMakeLists.txt +++ b/quic/common/CMakeLists.txt @@ -151,6 +151,12 @@ mvfst_add_library(mvfst_common_quic_iobuf_queue mvfst_common_quic_buffer ) +mvfst_add_library(mvfst_common_event_fd_queue + EXPORTED_DEPS + Folly::folly + mvfst_common_events_eventbase +) + add_subdirectory(events) add_subdirectory(testutil) add_subdirectory(third-party) diff --git a/quic/common/EventFdQueue.h b/quic/common/EventFdQueue.h new file mode 100644 index 000000000..d221a0a5f --- /dev/null +++ b/quic/common/EventFdQueue.h @@ -0,0 +1,173 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include + +#if __has_include() +#include +#define QUIC_HAS_EVENTFD 1 +#else +#define QUIC_HAS_EVENTFD 0 +#endif + +#include +#include +#include +#include + +#include + +namespace quic { + +/** + * SPSC queue with asynchronous wakeup. + * + * The producer calls enqueue() from one thread and flush() once per batch. + * The consumer is driven by a folly::EventBase; setOnReadable() registers + * the callback fired when items are ready. + * + * Wakeup uses eventfd on Linux (EFD_NONBLOCK; counter semantics coalesce + * multiple flush() calls into one wakeup) and a non-blocking self-pipe on + * platforms without eventfd (e.g. macOS). + */ +template +class EventFdQueue { + public: + EventFdQueue(folly::EventBase* consumerEvb, size_t capacity) + // ProducerConsumerQueue with size N holds N-1 items; add 1 so the + // external-facing capacity is exact. + : queue_(static_cast(capacity + 1)) { + initNotifyFds(); + handler_ = + std::make_unique(this, consumerEvb, readFd()); + } + + // Begin consuming. Must be called from the consumer EventBase thread. + // Calling registerHandler() (which calls event_add/kevent) from an off-thread + // is not safe on macOS kqueue — the filter may not be visible to the waiting + // kevent() call. Modelled on folly::NotificationQueue::Consumer::startConsuming(). + void startConsuming() { + handler_->registerHandler(folly::EventHandler::READ | folly::EventHandler::PERSIST); + } + + ~EventFdQueue() { + handler_->unregisterHandler(); +#if QUIC_HAS_EVENTFD + ::close(eventfd_); +#else + ::close(pipeFds_[0]); + ::close(pipeFds_[1]); +#endif + } + + // Producer thread. Returns false if queue is full. + bool enqueue(T item) { + if (!queue_.write(std::move(item))) { + return false; + } + pendingFlush_ = true; + return true; + } + + // Producer thread. Signal the consumer once if anything was enqueued since + // last flush. Multiple enqueues coalesce into one wakeup. + void flush() { + if (!pendingFlush_) { + return; + } + pendingFlush_ = false; +#if QUIC_HAS_EVENTFD + uint64_t one = 1; + auto ret = ::write(eventfd_, &one, sizeof(one)); + PCHECK(ret == (ssize_t)sizeof(one) || errno == EAGAIN || errno == EWOULDBLOCK); +#else + char one = 1; + auto ret = ::write(pipeFds_[1], &one, 1); + PCHECK(ret == 1 || errno == EAGAIN || errno == EWOULDBLOCK); +#endif + } + + // Consumer setup. Must be called before events start firing. + void setOnReadable(folly::Function cb) { + onReadable_ = std::move(cb); + } + + // Consumer thread. Returns false if queue is empty. + bool dequeue(T& out) { + return queue_.read(out); + } + + // Approximate number of items currently in the queue. Safe to call from any + // thread; uses the same relaxed loads as folly::ProducerConsumerQueue. + size_t sizeGuess() const { + return queue_.sizeGuess(); + } + + private: + void initNotifyFds() { +#if QUIC_HAS_EVENTFD + eventfd_ = ::eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); + PCHECK(eventfd_ >= 0) << "eventfd() failed"; +#else + PCHECK(::pipe(pipeFds_) == 0) << "pipe() failed"; + PCHECK(::fcntl(pipeFds_[0], F_SETFL, O_NONBLOCK) != -1); + PCHECK(::fcntl(pipeFds_[1], F_SETFL, O_NONBLOCK) != -1); +#endif + } + + int readFd() const { +#if QUIC_HAS_EVENTFD + return eventfd_; +#else + return pipeFds_[0]; +#endif + } + + // Drain the wakeup fd so it rearms for the next flush(). + void drainWakeupFd() { +#if QUIC_HAS_EVENTFD + uint64_t val; + while (::read(eventfd_, &val, sizeof(val)) > 0) { + } +#else + char buf[64]; + while (::read(pipeFds_[0], buf, sizeof(buf)) > 0) { + } +#endif + } + + class DrainHandler : public folly::EventHandler { + public: + DrainHandler(EventFdQueue* q, folly::EventBase* evb, int fd) + : folly::EventHandler(evb, folly::NetworkSocket::fromFd(fd)), q_(q) {} + + void handlerReady(uint16_t /*events*/) noexcept override { + q_->drainWakeupFd(); + if (q_->onReadable_) { + q_->onReadable_(); + } + } + + private: + EventFdQueue* q_; + }; + + folly::ProducerConsumerQueue queue_; +#if QUIC_HAS_EVENTFD + int eventfd_{-1}; +#else + int pipeFds_[2]{-1, -1}; +#endif + bool pendingFlush_{false}; + std::unique_ptr handler_; + folly::Function onReadable_; +}; + +} // namespace quic diff --git a/quic/common/test/CMakeLists.txt b/quic/common/test/CMakeLists.txt index 60e1c65c4..1a067d52f 100644 --- a/quic/common/test/CMakeLists.txt +++ b/quic/common/test/CMakeLists.txt @@ -84,3 +84,11 @@ quic_add_test(TARGET QuicCommonUtilTest SOURCES mvfst_test_utils ${BOOST_LIBRARIES} ) + +quic_add_test(TARGET EventFdQueueTest SOURCES + EventFdQueueTest.cpp + DEPENDS + Folly::folly + mvfst_common_event_fd_queue + mvfst_common_events_folly_eventbase +) diff --git a/quic/common/test/EventFdQueueTest.cpp b/quic/common/test/EventFdQueueTest.cpp new file mode 100644 index 000000000..8c9182f14 --- /dev/null +++ b/quic/common/test/EventFdQueueTest.cpp @@ -0,0 +1,178 @@ +/* + * Copyright (c) Meta Platforms, Inc. and affiliates. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include + +#include +#include +#include + +#include +#include + +using namespace quic; + +// Items flushed before startConsuming() fires are still delivered because +// eventfd/pipe is level-triggered: once the handler is registered it sees the +// fd is already readable and fires immediately. +TEST(EventFdQueueTest, HandlerFiresAfterStartConsuming) { + folly::ScopedEventBaseThread consumer; + EventFdQueue queue(consumer.getEventBase(), 8); + + folly::Baton<> done; + queue.setOnReadable([&] { + int v; + while (queue.dequeue(v)) { + } + done.post(); + }); + consumer.getEventBase()->runInEventBaseThread([&] { queue.startConsuming(); }); + + queue.enqueue(42); + queue.flush(); + + done.wait(); +} + +TEST(EventFdQueueTest, BasicFifo) { + folly::ScopedEventBaseThread consumer; + EventFdQueue queue(consumer.getEventBase(), 8); + + std::vector received; + folly::Baton<> done; + + consumer.getEventBase()->runInEventBaseThread([&] { + queue.setOnReadable([&] { + int v; + while (queue.dequeue(v)) { + received.push_back(v); + } + if (received.size() == 4) { + done.post(); + } + }); + queue.startConsuming(); + }); + + // Give consumer thread time to register handler + /* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + EXPECT_TRUE(queue.enqueue(1)); + EXPECT_TRUE(queue.enqueue(2)); + EXPECT_TRUE(queue.enqueue(3)); + EXPECT_TRUE(queue.enqueue(4)); + queue.flush(); + + done.wait(); + + ASSERT_EQ(received.size(), 4u); + EXPECT_EQ(received[0], 1); + EXPECT_EQ(received[1], 2); + EXPECT_EQ(received[2], 3); + EXPECT_EQ(received[3], 4); +} + +// Backpressure is a property of the underlying SPSC queue; no notification +// path (startConsuming / flush) is needed to test it. +TEST(EventFdQueueTest, Backpressure) { + folly::ScopedEventBaseThread consumer; + EventFdQueue queue(consumer.getEventBase(), 4); + + // Fill the queue + EXPECT_TRUE(queue.enqueue(1)); + EXPECT_TRUE(queue.enqueue(2)); + EXPECT_TRUE(queue.enqueue(3)); + EXPECT_TRUE(queue.enqueue(4)); + // Queue is now full + EXPECT_FALSE(queue.enqueue(5)); + + // Drain one slot and verify we can enqueue again + int v; + EXPECT_TRUE(queue.dequeue(v)); + EXPECT_EQ(v, 1); + EXPECT_TRUE(queue.enqueue(5)); +} + +TEST(EventFdQueueTest, Coalescing) { + // Multiple flush() calls while consumer is busy should coalesce into one + // wakeup that drains all items. + folly::ScopedEventBaseThread consumer; + EventFdQueue queue(consumer.getEventBase(), 64); + + std::atomic wakeups{0}; + std::atomic totalDrained{0}; + folly::Baton<> done; + + consumer.getEventBase()->runInEventBaseThread([&] { + queue.setOnReadable([&] { + wakeups.fetch_add(1); + int v; + while (queue.dequeue(v)) { + totalDrained.fetch_add(1); + } + if (totalDrained.load() == 30) { + done.post(); + } + }); + queue.startConsuming(); + }); + + /* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + // Enqueue 3 batches of 10, flushing after each + for (int batch = 0; batch < 3; batch++) { + for (int i = 0; i < 10; i++) { + EXPECT_TRUE(queue.enqueue(batch * 10 + i)); + } + queue.flush(); + } + + done.wait(); + EXPECT_EQ(totalDrained.load(), 30); + // May be 1, 2, or 3 wakeups depending on scheduling; just verify all items + // arrived and count is at least 1. + EXPECT_GE(wakeups.load(), 1); +} + +TEST(EventFdQueueTest, NoItemLoss) { + folly::ScopedEventBaseThread consumer; + const int total = 200; + EventFdQueue queue(consumer.getEventBase(), 64); + + std::atomic received{0}; + folly::Baton<> done; + + consumer.getEventBase()->runInEventBaseThread([&] { + queue.setOnReadable([&] { + int v; + while (queue.dequeue(v)) { + if (received.fetch_add(1) + 1 == total) { + done.post(); + } + } + }); + queue.startConsuming(); + }); + + /* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + // Producer sends items in small batches; queue capacity may fill and require + // the consumer to drain before we can continue. + for (int i = 0; i < total; i++) { + while (!queue.enqueue(i)) { + // Queue full — yield and retry + /* sleep override */ std::this_thread::sleep_for(std::chrono::milliseconds(1)); + } + if ((i + 1) % 10 == 0) { + queue.flush(); + } + } + queue.flush(); + + done.wait(); + EXPECT_EQ(received.load(), total); +}