From 58d5780b56d99f080bfa8c55f3b36b688a04b457 Mon Sep 17 00:00:00 2001 From: Justin Reardon Date: Sat, 7 Feb 2026 12:47:46 -0500 Subject: [PATCH] Document scope limitations of concurrent Stream combinators - add general documentation to Stream warning about scopes not extending through operations that use `Concurrent` - add specific documentation for the couple of methods that are safe --- core/shared/src/main/scala/fs2/Stream.scala | 21 ++++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a3642794a8..c01c713337 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -92,6 +92,15 @@ import java.util.concurrent.Flow.{Processor, Publisher, Subscriber} * but their stream counterparts return an (either empty or singleton) stream. * Other methods, like `zipWithPrevious`, have a more complicated but still pure translation to list methods. * + * == Scopes and Concurrent Effects == + * + * Stream combinators which require `F` to have a `Concurrent` instance generally do not propagate open scopes. Finalizers + * may be run concurrently with subsequent operations in the stream. This means that these combinators should not be used + * with streams that must hold resources (eg. a network socket). + * + * The `parJoin` family of methods is designed to allow the elements of streams of resources to be used safely concurrently. + * Beware however that they still do not propagate the resource scope past the `parJoin` combinator. + * * == Type-Class instances and laws of the Stream Operations == * * Laws (using infix syntax): @@ -1799,7 +1808,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, ): Stream[F2, O] = interruptWhen(haltWhenTrue.get) - /** Alias for `interruptWhen(haltWhenTrue.discrete)`. */ + /** Alias for `interruptWhen(haltWhenTrue.discrete)`. + * + * This method propagates scopes despite requiring `Concurrent`. + */ def interruptWhen[F2[x] >: F[x]: Concurrent]( haltWhenTrue: Signal[F2, Boolean] ): Stream[F2, O] = @@ -2005,7 +2017,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, * This can be used to control how chunks are emitted downstream. See [[mergeAndAwaitDownstream]] for example. * * @param f The function that combines the output stream and a finalizer for the chunk. - * This way we can controll when to pull pull next chunk from upstream. + * This way we can control when to pull pull next chunk from upstream. */ private def merge_[F2[x] >: F[x], O2 >: O]( that: Stream[F2, O2] @@ -2460,7 +2472,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F, pauseWhen(pauseSignal).mergeHaltBoth(writer) } - /** Pause this stream when `pauseWhenTrue` is `true`, resume when it's `false`. */ + /** Pause this stream when `pauseWhenTrue` is `true`, resume when it's `false`. + * + * This method propagates scopes despite requiring `Concurrent`. + */ def pauseWhen[F2[x] >: F[x]: Concurrent]( pauseWhenTrue: Signal[F2, Boolean] ): Stream[F2, O] = {