From 2b991b367717745284541338f74b596879c217d3 Mon Sep 17 00:00:00 2001 From: guptapratykshh Date: Thu, 19 Feb 2026 17:09:22 +0530 Subject: [PATCH] fix intermittent timeout in parevalmap by ensuring semaphore release on cancellation --- core/shared/src/main/scala/fs2/Stream.scala | 2 +- core/shared/src/test/scala/fs2/ParEvalMapSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..0a0ba34f09 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2396,7 +2396,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, val init = initFork(pushed.complete(()).void) poll(init).onCancel(releaseAndCheckCompletion).flatMap { send => val action = F.catchNonFatal(f(el)).flatten.attempt.flatMap(send) *> pushed.get - F.start(stop.get.race(action) *> releaseAndCheckCompletion) + F.start(stop.get.race(action).void.guarantee(releaseAndCheckCompletion)) } } } diff --git a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala index d394969623..7e036c0c4a 100644 --- a/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala +++ b/core/shared/src/test/scala/fs2/ParEvalMapSuite.scala @@ -184,7 +184,7 @@ class ParEvalMapSuite extends Fs2Suite { group("cancels running computations when error raised") { test("parEvalMapUnordered") { - check(_.parEvalMapUnbounded(identity)) + check(_.parEvalMapUnorderedUnbounded(identity)) } test("parEvalMap") {