From d17a0c4f6ee53b2132c8e4a966ec483372d00ee8 Mon Sep 17 00:00:00 2001 From: Patrik Wenger Date: Wed, 25 Mar 2026 19:10:02 +0100 Subject: [PATCH 1/3] flush pending writes before reading from buffer When read-ahead pulls more data than requested, subsequent reads can complete from the buffer without calling fill_read_buffer. This skips the flush call inside fill_read_buffer, leaving pending writes unsent. In bidirectional protocols (e.g. ZMTP handshake), this causes deadlock: fiber A's write sits in the buffer while fiber B blocks waiting for it. Move the flush to the top of #read so it always runs, regardless of whether the read buffer already has enough data. --- lib/io/stream/readable.rb | 6 ++++++ test/io/stream/buffered.rb | 25 +++++++++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/lib/io/stream/readable.rb b/lib/io/stream/readable.rb index e627ee1..cd68f52 100644 --- a/lib/io/stream/readable.rb +++ b/lib/io/stream/readable.rb @@ -97,6 +97,12 @@ def read(size = nil, buffer = nil) end if size + # Ensure pending writes are flushed before we read, even when the + # read buffer already has enough data from a previous read-ahead. + # Without this, a bidirectional protocol can deadlock: our write + # sits in the buffer while the peer blocks waiting for it. + flush + until @finished or @read_buffer.bytesize >= size # Compute the amount of data we need to read from the underlying stream: read_size = size - @read_buffer.bytesize diff --git a/test/io/stream/buffered.rb b/test/io/stream/buffered.rb index d8a9cef..3a447c5 100644 --- a/test/io/stream/buffered.rb +++ b/test/io/stream/buffered.rb @@ -1011,6 +1011,8 @@ def after(error = nil) end describe "Socket.pair" do + include Sus::Fixtures::Async::ReactorContext + let(:sockets) {Socket.pair(:UNIX, :STREAM)} let(:client) {IO::Stream::Buffered.wrap(sockets[0])} let(:server) {IO::Stream::Buffered.wrap(sockets[1])} @@ -1022,6 +1024,29 @@ def after(error = nil) it_behaves_like AUnidirectionalStream it_behaves_like ABidirectionalStream + + it "flushes pending writes even when read buffer already has data" do + # When read-ahead pulls more data than requested, a subsequent read can + # complete from the buffer without calling fill_read_buffer — skipping + # flush. This test verifies that pending writes are still flushed. + task_a = reactor.async do + client.write("A1") + data = client.read_exactly(2) + client.write("A2") + client.read_exactly(2) + end + + task_b = reactor.async do + server.write("B1") + server.read_exactly(2) + server.write("B2") + server.read_exactly(2) + end + + # Without the fix, this deadlocks because A2 is never flushed. + task_a.wait + task_b.wait + end end describe TCPSocket do From 5f5b45d5a4e773b88fdcceba1805c76100fabdc2 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 27 Mar 2026 16:15:54 +1300 Subject: [PATCH 2/3] Only flush if required. --- lib/io/stream/readable.rb | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/lib/io/stream/readable.rb b/lib/io/stream/readable.rb index cd68f52..99b9d17 100644 --- a/lib/io/stream/readable.rb +++ b/lib/io/stream/readable.rb @@ -97,18 +97,19 @@ def read(size = nil, buffer = nil) end if size - # Ensure pending writes are flushed before we read, even when the - # read buffer already has enough data from a previous read-ahead. - # Without this, a bidirectional protocol can deadlock: our write - # sits in the buffer while the peer blocks waiting for it. - flush - - until @finished or @read_buffer.bytesize >= size - # Compute the amount of data we need to read from the underlying stream: - read_size = size - @read_buffer.bytesize - - # Don't read less than @minimum_read_size to avoid lots of small reads: - fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size) + if @finished or @read_buffer.bytesize >= size + # We have enough data in the read buffer, but we should still flush pending writes: + self.flush + else + while true + # Compute the amount of data we need to read from the underlying stream: + read_size = size - @read_buffer.bytesize + + # Don't read less than @minimum_read_size to avoid lots of small reads: + fill_read_buffer(read_size > @minimum_read_size ? read_size : @minimum_read_size) + + break if @finished or @read_buffer.bytesize >= size + end end else until @finished From ace3c0533ce2a284e22f87644c09e603b57997a3 Mon Sep 17 00:00:00 2001 From: Samuel Williams Date: Fri, 27 Mar 2026 16:23:06 +1300 Subject: [PATCH 3/3] Update release notes. --- releases.md | 1 + 1 file changed, 1 insertion(+) diff --git a/releases.md b/releases.md index 240fab8..06f3530 100644 --- a/releases.md +++ b/releases.md @@ -3,6 +3,7 @@ ## Unreleased - Remove old OpenSSL method shims. + - Ensure `IO::Stream::Readable#read` calls `#flush` even if buffered data is sufficient to satisfy the read, to maintain consistent behavior. ## v0.11.0