Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,15 @@ import java.util.concurrent.Flow.{Processor, Publisher, Subscriber}
* but their stream counterparts return an (either empty or singleton) stream.
* Other methods, like `zipWithPrevious`, have a more complicated but still pure translation to list methods.
*
* == Scopes and Concurrent Effects ==
*
* Stream combinators which require `F` to have a `Concurrent` instance generally do not propagate open scopes. Finalizers
* may be run concurrently with subsequent operations in the stream. This means that these combinators should not be used
* with streams that must hold resources (eg. a network socket).
*
* The `parJoin` family of methods is designed to allow the elements of streams of resources to be used safely concurrently.
* Beware however that they still do not propagate the resource scope past the `parJoin` combinator.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's an example of this that is harmful?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In intended use cases, I doubt it would happen, but I wanted to make it clear the finalizer behavior of parJoin still doesn't save you from the general pitfalls documented. It just doesn't break like parEvalMap does.

*
* == Type-Class instances and laws of the Stream Operations ==
*
* Laws (using infix syntax):
Expand Down Expand Up @@ -1799,7 +1808,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
): Stream[F2, O] =
interruptWhen(haltWhenTrue.get)

/** Alias for `interruptWhen(haltWhenTrue.discrete)`. */
/** Alias for `interruptWhen(haltWhenTrue.discrete)`.
*
* This method propagates scopes despite requiring `Concurrent`.
*/
def interruptWhen[F2[x] >: F[x]: Concurrent](
haltWhenTrue: Signal[F2, Boolean]
): Stream[F2, O] =
Expand Down Expand Up @@ -2005,7 +2017,7 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
* This can be used to control how chunks are emitted downstream. See [[mergeAndAwaitDownstream]] for example.
*
* @param f The function that combines the output stream and a finalizer for the chunk.
* This way we can controll when to pull pull next chunk from upstream.
* This way we can control when to pull pull next chunk from upstream.
*/
private def merge_[F2[x] >: F[x], O2 >: O](
that: Stream[F2, O2]
Expand Down Expand Up @@ -2460,7 +2472,10 @@ final class Stream[+F[_], +O] private[fs2] (private[fs2] val underlying: Pull[F,
pauseWhen(pauseSignal).mergeHaltBoth(writer)
}

/** Pause this stream when `pauseWhenTrue` is `true`, resume when it's `false`. */
/** Pause this stream when `pauseWhenTrue` is `true`, resume when it's `false`.
*
* This method propagates scopes despite requiring `Concurrent`.
*/
def pauseWhen[F2[x] >: F[x]: Concurrent](
pauseWhenTrue: Signal[F2, Boolean]
): Stream[F2, O] = {
Expand Down