Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/fibers/fiber.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,9 @@ bool FiberScheduler::ProcessorState::enqueueIo(IoFuture * future, Setup && setup
{
setup(sqe);

// Contract: when future is non-null, enqueueIo writes it as user_data after
// setup runs; when future is null, the setup callback is responsible for
// setting user_data itself (typically to CQE_TAG_CANCEL to ignore the CQE).
if (future)
{
::io_uring_sqe_set_data(sqe, future);
Expand Down Expand Up @@ -1263,7 +1266,13 @@ void FiberScheduler::poll(int fd, uint32_t events, uint64_t * triggeredEvents, I
void FiberScheduler::cancelIo(IoFuture * future) noexcept
{
// Drop the result pointer first — presumably so a completion racing with the
// cancel cannot write through it (the regression test below depends on no
// write reaching a caller that has already returned). TODO confirm against
// handleCompletionQueue.
future->result = nullptr;
// Pre-fix form (removed): no user_data set on the cancel SQE, so it inherited
// whatever stale pointer the reused SQ ring slot still held.
enqueueIo(nullptr, [=](io_uring_sqe * sqe) noexcept { ::io_uring_prep_cancel(sqe, future, 0); });
// Fixed form: per enqueueIo's contract, a null future means the setup callback
// must set user_data itself.
enqueueIo(
nullptr,
[=](io_uring_sqe * sqe) noexcept
{
::io_uring_prep_cancel(sqe, future, 0);
// io_uring_initialize_sqe does not clear user_data; tag the cancel SQE
// explicitly so its CQE is ignored instead of being dispatched with a
// stale IoFuture* left over from a previously-completed operation.
::io_uring_sqe_set_data64(sqe, CQE_TAG_CANCEL);
});
}

void FiberScheduler::sleep(uint64_t nanoseconds, SleepFuture * future) noexcept
Expand Down
87 changes: 87 additions & 0 deletions src/fibers/tests/fiber-test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,93 @@ TEST(Fiber, WorkStealing)
EXPECT_FALSE(cpus.empty());
}

// Regression coverage for cancelIo's explicit CQE_TAG_CANCEL tagging.
//
// io_uring_initialize_sqe leaves user_data untouched, and SQ ring slots are
// reused once enough submissions have passed through the 128-entry ring. A
// cancel SQE that never calls io_uring_sqe_set_data therefore inherits
// whatever IoFuture* a previously-completed operation left in its slot.
// handleCompletionQueue would then treat the cancel CQE as a genuine IO
// completion: writing cqe->res through future->result and signalling a future
// the caller already finished with, potentially scribbling over unrelated
// stack memory.
//
// Strategy: complete enough IOs to wrap the ring, then cancel one more IO and
// verify none of the already-completed futures are touched again. Without the
// fix, the cancel's success CQE (res == 0) lands on a stale result pointer and
// erases one of the sentinel values planted below.
TEST(Fiber, cancelDoesNotResignalCompletedFutures)
{
    int pipeFds[2];
    const int pipeRc = ::pipe(pipeFds);
    ASSERT_EQ(pipeRc, 0);

    struct Params
    {
        int readFd;
        int writeFd;

        static int fiberMain(Params * p) noexcept
        {
            // Issue more polls than the 128-entry SQ ring holds so every slot
            // is recycled at least once, leaving stale IoFuture* values behind
            // in user_data.
            static constexpr uint32_t COUNT = 256;
            FiberScheduler::IoFuture futures[COUNT];
            uint64_t triggered[COUNT] = {};

            for (uint32_t i = 0; i != COUNT; ++i)
            {
                FiberScheduler::poll(p->readFd, POLLIN, &triggered[i], &futures[i]);
            }

            const char payload = 1;
            const ssize_t written = ::write(p->writeFd, &payload, 1);
            EXPECT_EQ(written, 1);

            for (uint32_t i = 0; i != COUNT; ++i)
            {
                futures[i].wait();
            }

            // Consume the byte so the upcoming poll has nothing to report and
            // genuinely blocks.
            char sink;
            const ssize_t consumed = ::read(p->readFd, &sink, 1);
            EXPECT_EQ(consumed, 1);

            // Plant a sentinel in every result slot. A misdispatched cancel
            // CQE would overwrite one of these with cqe->res (0 for a
            // successful cancel), destroying the sentinel.
            static constexpr uint64_t SENTINEL = 0xCAFEBABE;
            for (uint32_t i = 0; i != COUNT; ++i)
            {
                triggered[i] = SENTINEL;
            }

            // A fresh poll lands in a ring slot whose user_data still points
            // at one of the completed futures above; cancel it immediately.
            uint64_t cancelTriggered = 0;
            FiberScheduler::IoFuture cancelFuture;
            FiberScheduler::poll(p->readFd, POLLIN, &cancelTriggered, &cancelFuture);
            cancelFuture.cancel();
            const int cancelRc = cancelFuture.wait();
            EXPECT_TRUE(cancelRc == 0 || cancelRc == ECANCELED);

            // Every sentinel must be intact: no stale-user_data CQE was
            // dispatched as a real IO completion.
            for (uint32_t i = 0; i != COUNT; ++i)
            {
                EXPECT_EQ(triggered[i], SENTINEL) << "future " << i << " was re-touched";
            }

            return 0;
        }
    };

    const int runRc = FiberScheduler::run(Params::fiberMain, {pipeFds[0], pipeFds[1]});
    ASSERT_EQ(runRc, 0);

    ::close(pipeFds[0]);
    ::close(pipeFds[1]);
}

// A single fiber posting more async polls than the SQE ring capacity (128)
// without waiting exposes exhaustion: with SQPOLL the kernel thread is pinned
// to the same CPU and cannot consume SQEs while the fiber runs.
Expand Down
Loading