diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..020934aa9d 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2005,7 +2005,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * This can be used to control how chunks are emitted downstream. See [[mergeAndAwaitDownstream]] for example. * * @param f The function that combines the output stream and a finalizer for the chunk. - * This way we can controll when to pull pull next chunk from upstream. + * This way we can control when to pull next chunk from upstream. */ private def merge_[F2[x] >: F[x], O2 >: O]( that: Stream[F2, O2] @@ -2046,7 +2046,9 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, def sendChunk(chk: Chunk[O2]): F2[Unit] = output.send(f(Stream.chunk(chk), guard.release)) >> guard.acquire - (Stream.exec(guard.acquire) ++ s.chunks.foreach(sendChunk)) + (Stream.exec(guard.acquire) ++ s.chunks + .filter(_.nonEmpty) + .evalMap(sendChunk)) // Stop when the other upstream has errored or the downstream has completed. // This may also interrupt the initial call to `guard.acquire` as the call is made at the // beginning of the stream. diff --git a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala index 32af3b77a3..695ed5d559 100644 --- a/core/shared/src/test/scala/fs2/StreamMergeSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamMergeSuite.scala @@ -332,4 +332,15 @@ class StreamMergeSuite extends Fs2Suite { data.count(_.isInstanceOf[Tick2.type]) == 4 } } + test("merge does not hang with Stream.empty.repeat in one branch".ignore) { + Stream.empty + .covary[IO] + .repeat + .merge(Stream.emits[IO, Int](List(1, 2, 3))) + .interruptAfter(2.seconds) + .compile + .drain + .timeout(5.seconds) + } + }