diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala index 17c30e5f948..caba9648898 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala @@ -15,6 +15,7 @@ package org.apache.pekko.stream import java.util.concurrent.TimeUnit +import scala.annotation.nowarn import scala.concurrent._ import scala.concurrent.duration._ @@ -27,6 +28,7 @@ import pekko.stream.scaladsl._ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.SECONDS) @BenchmarkMode(Array(Mode.Throughput)) +@nowarn("msg=deprecated") class InvokeWithFeedbackBenchmark { implicit val system: ActorSystem = ActorSystem("InvokeWithFeedbackBenchmark") diff --git a/docs/src/main/paradox/stream/actor-interop.md b/docs/src/main/paradox/stream/actor-interop.md index cd1bee5a163..0b04d992668 100644 --- a/docs/src/main/paradox/stream/actor-interop.md +++ b/docs/src/main/paradox/stream/actor-interop.md @@ -128,6 +128,14 @@ use `Sink.actorRefWithBackpressure` or `ask` in `mapAsync`, though. ### Source.queue +@@@ warning { title="Deprecation notice (since 2.0.0)" } + +The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @ref:[`Source.actorRefWithBackpressure`](operators/Source/actorRefWithBackpressure.md) or `MergeHub.source`. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table. + +The snippet below has been updated to the recommended non-deprecated API, so it uses a synchronous `match`/`switch` on `QueueOfferResult` rather than the `pipe`-the-`Future` pattern described in the surrounding prose. + +@@@ + `Source.queue` is an improvement over `Sink.actorRef`, since it can provide backpressure. The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation. diff --git a/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md b/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md index 9d3601abdf5..3061057eadb 100644 --- a/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md +++ b/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md @@ -18,7 +18,7 @@ See also: * @ref[Source.actorRef](../Source/actorRef.md) This operator without backpressure control * @ref[ActorSource.actorRef](../ActorSource/actorRef.md) The operator for the new actors API without backpressure control * @ref[ActorSource.actorRefWithBackpressure](../ActorSource/actorRefWithBackpressure.md) The corresponding operator for the new actors API -* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source +* @ref[Source.queue](../Source/queue.md) Materialize a `BoundedSourceQueue` onto which elements can be pushed for emitting from the source ## Examples diff --git a/docs/src/main/paradox/stream/operators/Source/queue.md b/docs/src/main/paradox/stream/operators/Source/queue.md index 71875dadeb7..4fc52351622 100644 --- a/docs/src/main/paradox/stream/operators/Source/queue.md +++ b/docs/src/main/paradox/stream/operators/Source/queue.md @@ -4,6 +4,14 @@ Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be p @ref[Source operators](../index.md#source-operators) +@@@ warning { title="Deprecation notice (since 2.0.0)" } + +The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] and materialize a `SourceQueueWithComplete` are **deprecated**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks. + +Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @ref:[`Source.actorRefWithBackpressure`](actorRefWithBackpressure.md) (single imperative producer) or `MergeHub.source` (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement. + +@@@ + ## Signature (`BoundedSourceQueue`) @apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.BoundedSourceQueue[T]]" java="#queue(int)" } @@ -35,6 +43,12 @@ Java ## Signature (`SourceQueue`) +@@@ warning + +These two overloads are deprecated since 2.0.0. See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below. + +@@@ + @apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy)" } @apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy,maxConcurrentOffers:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy,int)" } @@ -69,3 +83,15 @@ Java @@@ +## Migrating from the deprecated `Source.queue(Int, OverflowStrategy)` overloads + +| Old call | Replacement | +|----------|-------------| +| `Source.queue(n, OverflowStrategy.dropNew)` | `Source.queue[T](n)` — `BoundedSourceQueue` already drops the newest element. | +| `Source.queue(n, OverflowStrategy.dropHead)` | `Source.queue[T](n)` if drop-new is acceptable. Otherwise build a custom @apidoc[GraphStage] with a FIFO buffer that drops the head. | +| `Source.queue(n, OverflowStrategy.dropTail)` | Same as above; `BoundedSourceQueue` always drops the newest offer (i.e. the tail). | +| `Source.queue(n, OverflowStrategy.dropBuffer)` | `Source.queue[T](n)` combined with a @apidoc[GraphStage] that clears the buffer on overflow, or rework the producer to tolerate drops. | +| `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call `BoundedSourceQueue.fail` with a `BufferOverflowException`. | +| `Source.queue(n, OverflowStrategy.backpressure)` | @ref:[`Source.actorRefWithBackpressure`](actorRefWithBackpressure.md) (single imperative producer) or `MergeHub.source` (multiple producers). | + +`SourceQueueWithComplete.offer` returned a @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`]; `BoundedSourceQueue.offer` returns `QueueOfferResult` synchronously. Call sites that previously chained `.map`/`.flatMap` on the offer future can usually be rewritten as a direct `match`/`switch` on the result. diff --git a/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 02df48ced34..2518cf09bda 100644 --- a/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -478,7 +478,7 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) { val bufferSize = 10 val elementsToProcess = 5 - val queue = Source + val queue: BoundedSourceQueue[Int] = Source .queue[Int](bufferSize) .throttle(elementsToProcess, 3.second) .map(x => x * x) @@ -487,16 +487,15 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) { val source = Source(1 to 10) - implicit val ec = system.dispatcher source - .map(x => { - queue.offer(x).map { + .map { x => + queue.offer(x) match { case QueueOfferResult.Enqueued => println(s"enqueued $x") case QueueOfferResult.Dropped => println(s"dropped $x") case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}") case QueueOfferResult.QueueClosed => println("Source Queue closed") } - }) + } .runWith(Sink.ignore) // #source-queue } @@ -519,7 +518,6 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) { val fastElements = 1 to 10 - implicit val ec = system.dispatcher fastElements.foreach { x => queue.offer(x) match { case QueueOfferResult.Enqueued => println(s"enqueued $x") diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala index 559369ce261..320a00d6b6e 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala @@ -32,6 +32,7 @@ import pekko.testkit.TestProbe import org.scalatest.time.Span +@nowarn("msg=deprecated") class QueueSourceSpec extends StreamSpec { implicit val ec: ExecutionContextExecutor = system.dispatcher val pause = 300.millis diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e60a445e62c..929223ae60f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -692,6 +692,12 @@ object Source { * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @deprecated( + "Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") + @nowarn("msg=deprecated") def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = new Source( scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers = 1).mapMaterializedValue(_.asJava)) @@ -730,6 +736,12 @@ object Source { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0. */ + @deprecated( + "Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") + @nowarn("msg=deprecated") def queue[T]( bufferSize: Int, overflowStrategy: OverflowStrategy, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 4cd4594e1d0..f665bfb96d4 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.CompletionStage -import scala.annotation.{ switch, tailrec, varargs } +import scala.annotation.{ nowarn, switch, tailrec, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.{ immutable, AbstractIterator } import scala.concurrent.{ Future, Promise } @@ -1002,6 +1002,12 @@ object Source { * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @deprecated( + "Prefer Source.queue[T](bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") + @nowarn("msg=deprecated") def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = queue(bufferSize, overflowStrategy, maxConcurrentOffers = 1) @@ -1039,6 +1045,11 @@ object Source { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0. */ + @deprecated( + "Prefer Source.queue[T](bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") def queue[T]( bufferSize: Int, overflowStrategy: OverflowStrategy,