From 20c80d86938c70414757ea9203ea1e72b908a636 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Thu, 9 Apr 2026 09:11:01 +0000 Subject: [PATCH 1/7] Added f.cede for async boundry on empty stream --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..64520d72c6 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2005,7 +2005,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * This can be used to control how chunks are emitted downstream. See [[mergeAndAwaitDownstream]] for example. * * @param f The function that combines the output stream and a finalizer for the chunk. - * This way we can controll when to pull pull next chunk from upstream. + * This way we can control when to pull next chunk from upstream. */ private def merge_[F2[x] >: F[x], O2 >: O]( that: Stream[F2, O2] @@ -2046,7 +2046,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def sendChunk(chk: Chunk[O2]): F2[Unit] = output.send(f(Stream.chunk(chk), guard.release)) >> guard.acquire - (Stream.exec(guard.acquire) ++ s.chunks.foreach(sendChunk)) + (Stream.exec(guard.acquire) ++ s.chunks.filter(_.nonEmpty).evalMap(chk => F.cede >> sendChunk(chk))) // Stop when the other upstream has errored or the downstream has completed. // This may also interrupt the initial call to `guard.acquire` as the call is made at the // beginning of the stream. From c36a6ff367c66b354829400178136646dd29e19e Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Thu, 9 Apr 2026 09:19:06 +0000 Subject: [PATCH 2/7] applied scalafmt --- core/shared/src/main/scala/fs2/Stream.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 64520d72c6..a31d9b51e6 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2046,7 +2046,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def sendChunk(chk: Chunk[O2]): F2[Unit] = output.send(f(Stream.chunk(chk), guard.release)) >> guard.acquire - (Stream.exec(guard.acquire) ++ s.chunks.filter(_.nonEmpty).evalMap(chk => F.cede >> sendChunk(chk))) + (Stream.exec(guard.acquire) ++ s.chunks + .filter(_.nonEmpty) + .evalMap(chk => F.cede >> sendChunk(chk))) // Stop when the other upstream has errored or the downstream has completed. // This may also interrupt the initial call to `guard.acquire` as the call is made at the // beginning of the stream. From 424ecb67ad39aa60682616090e9106d43fe9ce76 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 10 Apr 2026 18:07:33 +0000 Subject: [PATCH 3/7] added test --- bt 1.12.5 (Ubuntu Java 21.0.10) | 90 +++++++++++++++++++ .../src/test/scala/fs2/StreamMergeSuite.scala | 11 +++ 2 files changed, 101 insertions(+) create mode 100644 bt 1.12.5 (Ubuntu Java 21.0.10) diff --git a/bt 1.12.5 (Ubuntu Java 21.0.10) b/bt 1.12.5 (Ubuntu Java 21.0.10) new file mode 100644 index 0000000000..e968a1b8bc --- /dev/null +++ b/bt 1.12.5 (Ubuntu Java 21.0.10) @@ -0,0 +1,90 @@ +diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +index 32af3b77a..7f9ede54a 100644 +--- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala ++++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +@@ -27,6 +27,7 @@ import cats.effect.kernel.{Deferred, Ref} + import cats.effect.testkit.TestControl + import org.scalacheck.effect.PropF.forAllF +  ++ + class StreamMergeSuite extends Fs2Suite { + group("merge") { + test("basic") { +@@ -172,6 +173,68 @@ class StreamMergeSuite extends Fs2Suite { + } + } +  ++ // Regression tests for issue #3697: ++ // Stream.merge hangs when one branch is Stream.empty.repeat — ++ // interruptAfter and fiber.cancel were both unresponsive. ++ // Fix: added .filter(_.nonEmpty) + F.cede in merge_ to yield ++ // an async boundary even when the stream emits only empty chunks. ++ group("issue-3697 empty.repeat does not starve scheduler") { ++ ++ // Test 1: exact repro from the issue — interruptAfter must terminate ++ test("Stream.empty.repeat merged with finite stream is interruptible") { ++ TestControl.executeEmbed( ++ Stream.empty ++ .covary[IO].repeat ++ .merge(Stream.emits[IO, Int](List(4, 5, 6))) ++ .interruptAfter(1.second) ++ .compile ++ .drain ++ ) ++ } ++ ++ // Test 2: finite stream on the left, empty.repeat on the right ++ test("finite stream merged with Stream.empty.repeat is interruptible") { ++ TestControl.executeEmbed( ++ Stream ++ .emits[IO, Int](List(1, 2, 3)) ++ .merge(Stream.empty.repeat) ++ .interruptAfter(1.second) ++ .compile ++ .drain ++ ) ++ } ++ ++ // Test 3: elements from the non-empty branch are not lost ++ test("all elements from the non-empty branch are emitted") { ++ TestControl.executeEmbed( ++ Stream.empty ++ .covary[IO].repeat ++ .merge(Stream.emits[IO, Int](List(7, 8, 9))) ++ .interruptAfter(1.second) ++ .compile ++ .toList ++ .map(result => assertEquals(result.sorted, List(7, 8, 9))) ++ ) ++ } ++ ++ // Test 4: fiber.cancel must also unblock (second variant from the issue) ++ test("fiber wrapping merge with empty.repeat can be cancelled") { ++ TestControl.executeEmbed( ++ Stream.empty ++ .covary[IO].repeat ++ .merge(Stream.emits[IO, Int](List(1, 2, 3))) ++ .interruptAfter(500.millis) ++ .compile ++ .drain ++ .start ++ .flatMap { fiber => ++ IO.sleep(600.millis) >> fiber.cancel >> fiber.join.void ++ } ++ ) ++ } ++ } ++ ++ + test("mergeHaltBoth") { + forAllF { (s1: Stream[Pure, Int], s2: Stream[Pure, Int]) => + val s1List = s1.toList +@@ -287,6 +350,8 @@ class StreamMergeSuite extends Fs2Suite { + .assertEquals(Vector(1, 2)) + } +  ++ ++ + test("issue #3598") { +  + sealed trait Data diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala index 32af3b77a3..695ed5d559 100644 --- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala @@ -332,4 +332,15 @@ class StreamMergeSuite extends Fs2Suite { data.count(_.isInstanceOf[Tick2.type]) == 4 } } + test("merge does not hang with Stream.empty.repeat in one branch".ignore) { + Stream.empty + .covary[IO] + .repeat + .merge(Stream.emits[IO, Int](List(1, 2, 3))) + .interruptAfter(2.seconds) + .compile + .drain + .timeout(5.seconds) + } + } From ad73c84a6747cfdcc46da2fc1ad376ea018962e6 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 10 Apr 2026 18:22:19 +0000 Subject: [PATCH 4/7] Revert "added test" This reverts commit 424ecb67ad39aa60682616090e9106d43fe9ce76. --- bt 1.12.5 (Ubuntu Java 21.0.10) | 90 ------------------- .../src/test/scala/fs2/StreamMergeSuite.scala | 11 --- 2 files changed, 101 deletions(-) delete mode 100644 bt 1.12.5 (Ubuntu Java 21.0.10) diff --git a/bt 1.12.5 (Ubuntu Java 21.0.10) b/bt 1.12.5 (Ubuntu Java 21.0.10) deleted file mode 100644 index e968a1b8bc..0000000000 --- a/bt 1.12.5 (Ubuntu Java 21.0.10) +++ /dev/null @@ -1,90 +0,0 @@ -diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala -index 32af3b77a..7f9ede54a 100644 ---- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala -+++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala -@@ -27,6 +27,7 @@ import cats.effect.kernel.{Deferred, Ref} - import cats.effect.testkit.TestControl - import org.scalacheck.effect.PropF.forAllF -  -+ - class StreamMergeSuite extends Fs2Suite { - group("merge") { - test("basic") { -@@ -172,6 +173,68 @@ class StreamMergeSuite extends Fs2Suite { - } - } -  -+ // Regression tests for issue #3697: -+ // Stream.merge hangs when one branch is Stream.empty.repeat — -+ // interruptAfter and fiber.cancel were both unresponsive. -+ // Fix: added .filter(_.nonEmpty) + F.cede in merge_ to yield -+ // an async boundary even when the stream emits only empty chunks. -+ group("issue-3697 empty.repeat does not starve scheduler") { -+ -+ // Test 1: exact repro from the issue — interruptAfter must terminate -+ test("Stream.empty.repeat merged with finite stream is interruptible") { -+ TestControl.executeEmbed( -+ Stream.empty -+ .covary[IO].repeat -+ .merge(Stream.emits[IO, Int](List(4, 5, 6))) -+ .interruptAfter(1.second) -+ .compile -+ .drain -+ ) -+ } -+ -+ // Test 2: finite stream on the left, empty.repeat on the right -+ test("finite stream merged with Stream.empty.repeat is interruptible") { -+ TestControl.executeEmbed( -+ Stream -+ .emits[IO, Int](List(1, 2, 3)) -+ .merge(Stream.empty.repeat) -+ .interruptAfter(1.second) -+ .compile -+ .drain -+ ) -+ } -+ -+ // Test 3: elements from the non-empty branch are not lost -+ test("all elements from the non-empty branch are emitted") { -+ TestControl.executeEmbed( -+ Stream.empty -+ .covary[IO].repeat -+ .merge(Stream.emits[IO, Int](List(7, 8, 9))) -+ .interruptAfter(1.second) -+ .compile -+ .toList -+ .map(result => assertEquals(result.sorted, List(7, 8, 9))) -+ ) -+ } -+ -+ // Test 4: fiber.cancel must also unblock (second variant from the issue) -+ test("fiber wrapping merge with empty.repeat can be cancelled") { -+ TestControl.executeEmbed( -+ Stream.empty -+ .covary[IO].repeat -+ .merge(Stream.emits[IO, Int](List(1, 2, 3))) -+ .interruptAfter(500.millis) -+ .compile -+ .drain -+ .start -+ .flatMap { fiber => -+ IO.sleep(600.millis) >> fiber.cancel >> fiber.join.void -+ } -+ ) -+ } -+ } -+ -+ - test("mergeHaltBoth") { - forAllF { (s1: Stream[Pure, Int], s2: Stream[Pure, Int]) => - val s1List = s1.toList -@@ -287,6 +350,8 @@ class StreamMergeSuite extends Fs2Suite { - .assertEquals(Vector(1, 2)) - } -  -+ -+ - test("issue #3598") { -  - sealed trait Data diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala index 695ed5d559..32af3b77a3 100644 --- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala @@ -332,15 +332,4 @@ class StreamMergeSuite extends Fs2Suite { data.count(_.isInstanceOf[Tick2.type]) == 4 } } - test("merge does not hang with Stream.empty.repeat in one branch".ignore) { - Stream.empty - .covary[IO] - .repeat - .merge(Stream.emits[IO, Int](List(1, 2, 3))) - .interruptAfter(2.seconds) - .compile - .drain - .timeout(5.seconds) - } - } From 9608e5fa646f6c76b5d5b139d12885298d42d26b Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 10 Apr 2026 18:28:17 +0000 Subject: [PATCH 5/7] added test --- core/shared/src/test/scala/fs2/StreamMergeSuite.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala index 32af3b77a3..620bc05cfb 100644 --- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala @@ -332,4 +332,14 @@ class StreamMergeSuite extends Fs2Suite { data.count(_.isInstanceOf[Tick2.type]) == 4 } } + test("merge does not hang with Stream.empty.repeat in one branch") { + Stream.empty + .covary[IO] + .repeat + .merge(Stream.emits[IO, Int](List(1, 2, 3))) + .interruptAfter(2.seconds) + .compile + .drain + .timeout(5.seconds) + } } From 77d19833240672821ba4e6827d2524e9de80cf1f Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 10 Apr 2026 19:08:15 +0000 Subject: [PATCH 6/7] revised test --- core/shared/src/test/scala/fs2/StreamMergeSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala index 620bc05cfb..695ed5d559 100644 --- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala @@ -332,7 +332,7 @@ class StreamMergeSuite extends Fs2Suite { data.count(_.isInstanceOf[Tick2.type]) == 4 } } - test("merge does not hang with Stream.empty.repeat in one branch") { + test("merge does not hang with Stream.empty.repeat in one branch".ignore) { Stream.empty .covary[IO] .repeat @@ -342,4 +342,5 @@ class StreamMergeSuite extends Fs2Suite { .drain .timeout(5.seconds) } + } From df9c4f1d0fd3edc2eea97407afd5003fc1bc7857 Mon Sep 17 00:00:00 2001 From: Ankit Kumar Date: Fri, 10 Apr 2026 20:13:54 +0000 Subject: [PATCH 7/7] removed cede because it was unnecessary --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a31d9b51e6..020934aa9d 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2048,7 +2048,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, (Stream.exec(guard.acquire) ++ s.chunks .filter(_.nonEmpty) - .evalMap(chk => F.cede >> sendChunk(chk))) + .evalMap(sendChunk)) // Stop when the other upstream has errored or the downstream has completed. // This may also interrupt the initial call to `guard.acquire` as the call is made at the // beginning of the stream.