diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..0a0ba34f09 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2396,7 +2396,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val init = initFork(pushed.complete(()).void) poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get - F.start(stop.get.race(action) *> releaseAndCheckCompletion) + F.start(stop.get.race(action).void.guarantee(releaseAndCheckCompletion)) } } } diff --git a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala index d394969623..7e036c0c4a 100644 --- a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala +++ b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala @@ -184,7 +184,7 @@ class ParEvalMapSuite extends Fs2Suite { group("cancels running computations when error raised") { test("parEvalMapUnordered") { - check(_.parEvalMapUnbounded(identity)) + check(_.parEvalMapUnorderedUnbounded(identity)) } test("parEvalMap") {