From dfe16028871ea0cb6923f10b27f492ce9b585042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 20 Jun 2026 01:39:53 +0800 Subject: [PATCH 1/5] fix(stream): deprecate Source.queue overloads that materialize SourceQueueWithComplete Motivation: Source.queue(bufferSize, overflowStrategy) (both scaladsl and javadsl) materializes a SourceQueueWithComplete whose asynchronous offer future can hang indefinitely under OverflowStrategy.backpressure when the buffer is full and downstream stalls. The same class of issue has caused real-world deadlocks (akka/akka-core#29557) and prompted akka/akka.net#8248 to add a cancellation-aware offer overload. Pekko already has a safer drop-new primitive, BoundedSourceQueue, exposed via Source.queue(bufferSize), and backpressure is covered by Source.actorRefWithBackpressure and MergeHub.source. Upstream Akka has not converged on a new backpressure primitive in 4+ years (akka/akka-core#29801 open since 2021), so we should close the trap at the API surface now. Modification: Add @deprecated(since = "2.0.0") to the four Source.queue overloads that accept an OverflowStrategy (two scaladsl + two javadsl). The Source.queue[T](bufferSize) overloads that return BoundedSourceQueue are left as the recommended path. SourceQueue / SourceQueueWithComplete traits are not deprecated (they are still referenced by the deprecated factory return types and by user code), so only the factory entry points emit the warning. Result: Callers of Source.queue(Int, OverflowStrategy, ...) get a compile-time warning pointing them to Source.queue(bufferSize) for drop-new, or Source.actorRefWithBackpressure / MergeHub.source for backpressure. The hang trap is closed at the API level without breaking binary compatibility. Tests: Not run - API surface change only (annotation addition, no behavior change). Deprecation annotations are enforced by the Scala/Java compiler on every user compilation. References: Fixes apache/pekko#3094 Refs akka/akka-core#29801 (Replace old Source.queue - OPEN, stale since 2021) Refs akka/akka-core#29557 (Async callback tracking OOM in Source.queue) Refs akkadotnet/akka.net#8248 (Akka.NET cancellation-aware Source.Queue offer) --- .../scala/org/apache/pekko/stream/javadsl/Source.scala | 10 ++++++++++ .../org/apache/pekko/stream/scaladsl/Source.scala | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e60a445e62..8f51a6bbaf 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -692,6 +692,11 @@ object Source { * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @deprecated( + "Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = new Source( scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers = 1).mapMaterializedValue(_.asJava)) @@ -730,6 +735,11 @@ object Source { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0. */ + @deprecated( + "Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") def queue[T]( bufferSize: Int, overflowStrategy: OverflowStrategy, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index 4cd4594e1d..a379bf14c8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -1002,6 +1002,11 @@ object Source { * @param bufferSize size of buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ + @deprecated( + "Prefer Source.queue[T](bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = queue(bufferSize, overflowStrategy, maxConcurrentOffers = 1) @@ -1039,6 +1044,11 @@ object Source { * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer * @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0. */ + @deprecated( + "Prefer Source.queue[T](bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " + + "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", + since = "2.0.0") def queue[T]( bufferSize: Int, overflowStrategy: OverflowStrategy, From 43ffb648c3610da1e7855943e96fefda8af9d898 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 20 Jun 2026 01:44:54 +0800 Subject: [PATCH 2/5] fix(stream): suppress deprecation warnings at internal call sites MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: The previous commit added @deprecated to the four Source.queue overloads that accept OverflowStrategy. Three internal call sites still invoke those deprecated overloads, which trips -Xfatal-warnings in Compile (scalac) and -Werror + -Xlint:deprecation (javac). Modification: - scaladsl.Source.queue(Int, OverflowStrategy): @nowarn("msg=deprecated") so its delegation to the 3-arg overload does not retrigger the warning. - javadsl.Source.queue(Int, OverflowStrategy) and the 3-arg overload: @nowarn("msg=deprecated") for the same reason on the scaladsl delegation. - InvokeWithFeedbackBenchmark: @nowarn on the class. The benchmark intentionally measures the deprecated backpressure path. - QueueSourceSpec: @nowarn on the class. The spec exercises every deprecated overload to lock in behavior until the APIs are removed. Result: Compile path passes -Xfatal-warnings and -Werror again. User code calling the deprecated factories still receives the warning. Tests: - sbt "stream / compile" — pass (213 Scala + 5 Java sources) - sbt "stream-tests / Test / compile" — pass (229 Scala + 32 Java) - sbt "bench-jmh / compile" — pass (98 Scala + 1 Java) References: Refs apache/pekko#3094 --- .../org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala | 2 ++ .../org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala | 1 + .../main/scala/org/apache/pekko/stream/javadsl/Source.scala | 2 ++ .../main/scala/org/apache/pekko/stream/scaladsl/Source.scala | 3 ++- 4 files changed, 7 insertions(+), 1 deletion(-) diff --git a/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala b/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala index 17c30e5f94..caba964889 100644 --- a/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala +++ b/bench-jmh/src/main/scala/org/apache/pekko/stream/InvokeWithFeedbackBenchmark.scala @@ -15,6 +15,7 @@ package org.apache.pekko.stream import java.util.concurrent.TimeUnit +import scala.annotation.nowarn import scala.concurrent._ import scala.concurrent.duration._ @@ -27,6 +28,7 @@ import pekko.stream.scaladsl._ @State(Scope.Benchmark) @OutputTimeUnit(TimeUnit.SECONDS) @BenchmarkMode(Array(Mode.Throughput)) +@nowarn("msg=deprecated") class InvokeWithFeedbackBenchmark { implicit val system: ActorSystem = ActorSystem("InvokeWithFeedbackBenchmark") diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala index 559369ce26..320a00d6b6 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/QueueSourceSpec.scala @@ -32,6 +32,7 @@ import pekko.testkit.TestProbe import org.scalatest.time.Span +@nowarn("msg=deprecated") class QueueSourceSpec extends StreamSpec { implicit val ec: ExecutionContextExecutor = system.dispatcher val pause = 300.millis diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 8f51a6bbaf..929223ae60 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -697,6 +697,7 @@ object Source { "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", since = "2.0.0") + @nowarn("msg=deprecated") def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = new Source( scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers = 1).mapMaterializedValue(_.asJava)) @@ -740,6 +741,7 @@ object Source { "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", since = "2.0.0") + @nowarn("msg=deprecated") def queue[T]( bufferSize: Int, overflowStrategy: OverflowStrategy, diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index a379bf14c8..f665bfb96d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl import java.util.concurrent.CompletionStage -import scala.annotation.{ switch, tailrec, varargs } +import scala.annotation.{ nowarn, switch, tailrec, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.{ immutable, AbstractIterator } import scala.concurrent.{ Future, Promise } @@ -1007,6 +1007,7 @@ object Source { "(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " + "or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.", since = "2.0.0") + @nowarn("msg=deprecated") def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] = queue(bufferSize, overflowStrategy, maxConcurrentOffers = 1) From 42550b8884911b0d2a5ac5e147109df0b6ba1177 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 20 Jun 2026 01:54:15 +0800 Subject: [PATCH 3/5] docs(stream): update Source.queue paradox page for BoundedSourceQueue deprecation path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: The Source.queue overloads that accept an OverflowStrategy are now deprecated since 2.0.0. The existing paradox page described both SourceQueue and BoundedSourceQueue as first-class alternatives, which no longer matches the recommended API surface. The Scala #source-queue snippet was also silently calling the deprecated path: BoundedSourceQueue.offer returns QueueOfferResult synchronously, so queue.offer(x).map { case ... } cannot type-check against a BoundedSourceQueue — the example in practice relied on Source.queue(bufferSize, dropNew). Modification: - queue.md: lead with the non-deprecated BoundedSourceQueue overload, add a @@@ warning { } callout flagging the deprecated OverflowStrategy overloads, and add a migration table that maps each OverflowStrategy to a recommended replacement (BoundedSourceQueue, Source.actorRefWithBackpressure, or MergeHub.source). - IntegrationDocSpec.scala: rewrite the Scala #source-queue snippet to match the Java counterpart — BoundedSourceQueue with a direct match on QueueOfferResult, no implicit ExecutionContext. - operators/index.md: drop "or SourceQueue" from the one-line summary so the landing row reflects the non-deprecated API. Result: The docs page now points readers at the recommended API up-front, the deprecated path is clearly flagged with a migration path, and the Scala/Java examples are consistent (both BoundedSourceQueue). Tests: - sbt "docs / Test / compile" — pass (224 Scala + 234 Java sources; the updated #source-queue snippet type-checks against BoundedSourceQueue.offer returning QueueOfferResult) References: Refs apache/pekko#3094 --- .../paradox/stream/operators/Source/queue.md | 70 +++++++++---------- .../main/paradox/stream/operators/index.md | 2 +- .../docs/stream/IntegrationDocSpec.scala | 9 ++- 3 files changed, 37 insertions(+), 44 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source/queue.md b/docs/src/main/paradox/stream/operators/Source/queue.md index 71875dadeb..5b77e6ad1f 100644 --- a/docs/src/main/paradox/stream/operators/Source/queue.md +++ b/docs/src/main/paradox/stream/operators/Source/queue.md @@ -1,63 +1,58 @@ # Source.queue -Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source. +Materialize a @apidoc[BoundedSourceQueue] onto which elements can be pushed for emitting from the source. @ref[Source operators](../index.md#source-operators) -## Signature (`BoundedSourceQueue`) +## Signature -@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.BoundedSourceQueue[T]]" java="#queue(int)" } +@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.BoundedSourceQueue[T]]" java="#queue(int)" } -## Description (`BoundedSourceQueue`) +## Description -The `BoundedSourceQueue` is an optimized variant of the `SourceQueue` which will drop the newest elements when back pressuring and buffer is full. -The `BoundedSourceQueue` will give immediate, synchronous feedback whether an element was accepted or not and is therefore recommended for situations where overload and dropping elements is expected and needs to be handled quickly. +`Source.queue[T](bufferSize)` materializes a @apidoc[BoundedSourceQueue] that gives immediate, synchronous feedback on whether an element was accepted. When the buffer is full the newest element is dropped and `offer` returns @apidoc[QueueOfferResult.Dropped$]. This is the recommended path for bridging an imperative producer with a Pekko Stream. -In contrast, the `SourceQueue` offers more variety of `OverflowStrategies` but feedback is only asynchronously provided through a @scala[`Future`]@java[`CompletionStage`] value. -In cases where elements need to be discarded quickly at times of overload to avoid out-of-memory situations, delivering feedback asynchronously can itself become a problem. -This happens if elements come in faster than the feedback can be delivered in which case the feedback mechanism itself is part of the reason that an out-of-memory situation arises. +The @apidoc[BoundedSourceQueue] contains a buffer that can be used by many producers on different threads. When the buffer is full, it will not accept more elements. The return value of `BoundedSourceQueue.offer` is a @apidoc[QueueOfferResult] (not a @scala[`Future`]@java[`CompletionStage`]), which is important in overload scenarios: delivering acknowledgements asynchronously can itself become a source of out-of-memory errors when elements arrive faster than the feedback can be processed. -In summary, prefer `BoundedSourceQueue` over `SourceQueue` especially in high-load scenarios. -Use `SourceQueue` if you need one of the other `OverflowStrategies`. +If you need **backpressure** towards the producer (rather than dropping), prefer one of: -The `BoundedSourceQueue` contains a buffer that can be used by many producers on different threads. -When the buffer is full, the `BoundedSourceQueue` will not accept more elements. -The return value of `BoundedSourceQueue.offer()` immediately returns a `QueueOfferResult` (as opposed to an asynchronous value returned by `SourceQueue`). -A synchronous result is important in order to avoid situations where offer acknowledgements are handled slower than the rate of which elements are offered, which will eventually lead to an Out Of Memory error. +* @apidoc[Source.actorRefWithBackpressure] — for a single, imperative producer that can wait for ack messages. +* @apidoc[MergeHub.source$] — for multiple producers that should be backpressured individually. -## Example (`BoundedSourceQueue`) +@@@ warning { title="Deprecation notice" } -Scala -: @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue-synchronous } +The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] (and materialize a @apidoc[SourceQueueWithComplete]) are **deprecated since 2.0.0**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks. Use `Source.queue[T](bufferSize)` (this page), @apidoc[Source.actorRefWithBackpressure], or @apidoc[MergeHub.source$] instead. -Java -: @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue-synchronous } +@@@ -## Signature (`SourceQueue`) +## Example -@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy)" } -@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy,maxConcurrentOffers:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy,int)" } +Scala +: @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue } -## Description (`SourceQueue`) +Java +: @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue } -Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains -a buffer, if elements are pushed onto the queue faster than the source is consumed the overflow will be handled with -a strategy specified by the user. Functionality for tracking when an element has been emitted is available through -`SourceQueue.offer`. +A synchronous example that inspects every `QueueOfferResult` directly: -Using `Source.queue` you can push elements to the queue and they will be emitted to the stream if there is -demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer -will be discarded if downstream is terminated. +Scala +: @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue-synchronous } -In combination with the queue, the @ref[`throttle`](./../Source-or-Flow/throttle.md) operator can be used to control the processing to a given limit, e.g. `5 elements` per `3 seconds`. +Java +: @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue-synchronous } -## Example (`SourceQueue`) +## Migrating from the deprecated `Source.queue(Int, OverflowStrategy)` overloads -Scala -: @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue } +| Old call | Replacement | +|----------|-------------| +| `Source.queue(n, OverflowStrategy.dropNew)` | `Source.queue[T](n)` — `BoundedSourceQueue` already drops the newest element. | +| `Source.queue(n, OverflowStrategy.dropHead)` | `Source.queue[T](n)` if drop-new is acceptable. Otherwise build a custom @apidoc[GraphStage] with a FIFO buffer that drops the head. | +| `Source.queue(n, OverflowStrategy.dropTail)` | Same as above; `BoundedSourceQueue` always drops the tail of the pending offer (i.e. the newest). | +| `Source.queue(n, OverflowStrategy.dropBuffer)` | `Source.queue[T](n)` combined with a @apidoc[GraphStage] that clears the buffer on overflow, or rework the producer to tolerate drops. | +| `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call @apidoc[BoundedSourceQueue.fail] with a `BufferOverflowException`. | +| `Source.queue(n, OverflowStrategy.backpressure)` | @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). | -Java -: @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue } +`SourceQueueWithComplete.offer` returned a @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`]; `BoundedSourceQueue.offer` returns `QueueOfferResult` synchronously. Call sites that previously chained `.map`/`.flatMap` on the offer future can usually be rewritten as a direct `match`/`switch` on the result. ## Reactive Streams semantics @@ -68,4 +63,3 @@ Java **completes** when downstream completes @@@ - diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index 011a1e1cf7..d77247fad6 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -33,7 +33,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad |Source|@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.| |Source|@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.| |Source|@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.| -|Source|@ref[queue](Source/queue.md)|Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source.| +|Source|@ref[queue](Source/queue.md)|Materialize a @apidoc[BoundedSourceQueue] onto which elements can be pushed for emitting from the source.| |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.| |Source|@ref[single](Source/single.md)|Stream a single object once.| diff --git a/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 02df48ced3..998e6c5a2d 100644 --- a/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -478,7 +478,7 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) { val bufferSize = 10 val elementsToProcess = 5 - val queue = Source + val queue: BoundedSourceQueue[Int] = Source .queue[Int](bufferSize) .throttle(elementsToProcess, 3.second) .map(x => x * x) @@ -487,16 +487,15 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) { val source = Source(1 to 10) - implicit val ec = system.dispatcher source - .map(x => { - queue.offer(x).map { + .map { x => + queue.offer(x) match { case QueueOfferResult.Enqueued => println(s"enqueued $x") case QueueOfferResult.Dropped => println(s"dropped $x") case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}") case QueueOfferResult.QueueClosed => println("Source Queue closed") } - }) + } .runWith(Sink.ignore) // #source-queue } From 44e3d67c4209935da834138c7647b8e1d812b861 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 20 Jun 2026 02:13:24 +0800 Subject: [PATCH 4/5] docs(stream): broaden deprecation coverage across actor-interop and see-also links MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: A reviewer pointed out that docs/src/main/paradox/stream/actor-interop.md still describes the deprecated OverflowStrategy.backpressure path as the recommended Source.queue usage, while the #source-queue snippet it references was updated in the previous commit to use BoundedSourceQueue with a synchronous match. The surrounding prose (lines 131-161) was therefore inconsistent with the snippet — telling the reader "offer returns a Future" while the snippet showed a direct match on QueueOfferResult. Separately, the "See also" entry for Source.queue in operators/Source/actorRefWithBackpressure.md still referred to the deprecated "SourceQueue" wording, even though the recommended path now materializes a BoundedSourceQueue. A second reviewer pass also flagged that the #source-queue-synchronous Scala snippet still declared `implicit val ec = system.dispatcher` even though the synchronous match on QueueOfferResult does not need an ExecutionContext. This was misleading in an example that is explicitly labeled "synchronous". Modification: - actor-interop.md: prepend a @@@ warning { } callout at the top of the Source.queue subsection flagging the deprecated API and explicitly noting that the referenced snippet has been updated to the non-deprecated path. Existing prose (lines 131-173) is left untouched so historical context is preserved. - actorRefWithBackpressure.md: update the Source.queue "See also" entry wording from "SourceQueue" to "BoundedSourceQueue". - queue.md: keep the original prose and structure intact (both BoundedSourceQueue and SourceQueue sections, both examples, all existing wording). Add a @@@ warning { } callout after the operator summary (post-index-extractor block) that points readers to the recommended API and to the migration table. Add a smaller warning directly above the deprecated Signature(SourceQueue) block. Append the per-OverflowStrategy migration table at the bottom. - index.md: restored (no change vs HEAD~1) — the landing row still mentions both BoundedSourceQueue and SourceQueue. - IntegrationDocSpec.scala: drop `implicit val ec = system.dispatcher` from the #source-queue-synchronous Scala snippet. The synchronous match on QueueOfferResult does not require an ExecutionContext. Result: Readers landing on actor-interop.md or the Source.queue page see a clear deprecation notice before the legacy prose. Operators index entry is unchanged. The SourceQueue subsection of queue.md still documents the deprecated path for users on older versions, with an explicit warning above it. The synchronous Scala snippet no longer declares an unused ExecutionContext. No historical prose was removed. Tests: - sbt "docs / Compile / managedResources" — pass (paradox StreamOperatorsIndexGenerator extracts the summary/category link successfully) - sbt "docs / Test / compile" — pass (1 Scala source incremental recompile after the ec removal; prior run had passed 224 Scala + 234 Java sources) References: Refs apache/pekko#3094 --- docs/src/main/paradox/stream/actor-interop.md | 8 ++ .../Source/actorRefWithBackpressure.md | 2 +- .../paradox/stream/operators/Source/queue.md | 88 +++++++++++++------ .../main/paradox/stream/operators/index.md | 2 +- .../docs/stream/IntegrationDocSpec.scala | 1 - 5 files changed, 70 insertions(+), 31 deletions(-) diff --git a/docs/src/main/paradox/stream/actor-interop.md b/docs/src/main/paradox/stream/actor-interop.md index cd1bee5a16..6a0239003a 100644 --- a/docs/src/main/paradox/stream/actor-interop.md +++ b/docs/src/main/paradox/stream/actor-interop.md @@ -128,6 +128,14 @@ use `Sink.actorRefWithBackpressure` or `ask` in `mapAsync`, though. ### Source.queue +@@@ warning { title="Deprecation notice (since 2.0.0)" } + +The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @apidoc[Source.actorRefWithBackpressure] or @apidoc[MergeHub.source$]. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table. + +The snippet below has been updated to the recommended non-deprecated API, so it uses a synchronous `match`/`switch` on `QueueOfferResult` rather than the `pipe`-the-`Future` pattern described in the surrounding prose. + +@@@ + `Source.queue` is an improvement over `Sink.actorRef`, since it can provide backpressure. The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation. diff --git a/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md b/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md index 9d3601abdf..3061057ead 100644 --- a/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md +++ b/docs/src/main/paradox/stream/operators/Source/actorRefWithBackpressure.md @@ -18,7 +18,7 @@ See also: * @ref[Source.actorRef](../Source/actorRef.md) This operator without backpressure control * @ref[ActorSource.actorRef](../ActorSource/actorRef.md) The operator for the new actors API without backpressure control * @ref[ActorSource.actorRefWithBackpressure](../ActorSource/actorRefWithBackpressure.md) The corresponding operator for the new actors API -* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source +* @ref[Source.queue](../Source/queue.md) Materialize a `BoundedSourceQueue` onto which elements can be pushed for emitting from the source ## Examples diff --git a/docs/src/main/paradox/stream/operators/Source/queue.md b/docs/src/main/paradox/stream/operators/Source/queue.md index 5b77e6ad1f..877622c5c1 100644 --- a/docs/src/main/paradox/stream/operators/Source/queue.md +++ b/docs/src/main/paradox/stream/operators/Source/queue.md @@ -1,31 +1,71 @@ # Source.queue -Materialize a @apidoc[BoundedSourceQueue] onto which elements can be pushed for emitting from the source. +Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source. @ref[Source operators](../index.md#source-operators) -## Signature +@@@ warning { title="Deprecation notice (since 2.0.0)" } -@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.BoundedSourceQueue[T]]" java="#queue(int)" } +The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] (and materialize a @apidoc[SourceQueueWithComplete]) are **deprecated**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks. -## Description +Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement. -`Source.queue[T](bufferSize)` materializes a @apidoc[BoundedSourceQueue] that gives immediate, synchronous feedback on whether an element was accepted. When the buffer is full the newest element is dropped and `offer` returns @apidoc[QueueOfferResult.Dropped$]. This is the recommended path for bridging an imperative producer with a Pekko Stream. +@@@ + +## Signature (`BoundedSourceQueue`) + +@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.BoundedSourceQueue[T]]" java="#queue(int)" } + +## Description (`BoundedSourceQueue`) + +The `BoundedSourceQueue` is an optimized variant of the `SourceQueue` which will drop the newest elements when back pressuring and buffer is full. +The `BoundedSourceQueue` will give immediate, synchronous feedback whether an element was accepted or not and is therefore recommended for situations where overload and dropping elements is expected and needs to be handled quickly. -The @apidoc[BoundedSourceQueue] contains a buffer that can be used by many producers on different threads. When the buffer is full, it will not accept more elements. The return value of `BoundedSourceQueue.offer` is a @apidoc[QueueOfferResult] (not a @scala[`Future`]@java[`CompletionStage`]), which is important in overload scenarios: delivering acknowledgements asynchronously can itself become a source of out-of-memory errors when elements arrive faster than the feedback can be processed. +In contrast, the `SourceQueue` offers more variety of `OverflowStrategies` but feedback is only asynchronously provided through a @scala[`Future`]@java[`CompletionStage`] value. +In cases where elements need to be discarded quickly at times of overload to avoid out-of-memory situations, delivering feedback asynchronously can itself become a problem. +This happens if elements come in faster than the feedback can be delivered in which case the feedback mechanism itself is part of the reason that an out-of-memory situation arises. + +In summary, prefer `BoundedSourceQueue` over `SourceQueue` especially in high-load scenarios. +Use `SourceQueue` if you need one of the other `OverflowStrategies`. + +The `BoundedSourceQueue` contains a buffer that can be used by many producers on different threads. +When the buffer is full, the `BoundedSourceQueue` will not accept more elements. +The return value of `BoundedSourceQueue.offer()` immediately returns a `QueueOfferResult` (as opposed to an asynchronous value returned by `SourceQueue`). +A synchronous result is important in order to avoid situations where offer acknowledgements are handled slower than the rate of which elements are offered, which will eventually lead to an Out Of Memory error. + +## Example (`BoundedSourceQueue`) + +Scala +: @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue-synchronous } -If you need **backpressure** towards the producer (rather than dropping), prefer one of: +Java +: @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue-synchronous } -* @apidoc[Source.actorRefWithBackpressure] — for a single, imperative producer that can wait for ack messages. -* @apidoc[MergeHub.source$] — for multiple producers that should be backpressured individually. +## Signature (`SourceQueue`) -@@@ warning { title="Deprecation notice" } +@@@ warning -The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] (and materialize a @apidoc[SourceQueueWithComplete]) are **deprecated since 2.0.0**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks. Use `Source.queue[T](bufferSize)` (this page), @apidoc[Source.actorRefWithBackpressure], or @apidoc[MergeHub.source$] instead. +These two overloads are deprecated since 2.0.0. See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below. @@@ -## Example +@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy)" } +@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy,maxConcurrentOffers:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy,int)" } + +## Description (`SourceQueue`) + +Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source. The queue contains +a buffer, if elements are pushed onto the queue faster than the source is consumed the overflow will be handled with +a strategy specified by the user. Functionality for tracking when an element has been emitted is available through +`SourceQueue.offer`. + +Using `Source.queue` you can push elements to the queue and they will be emitted to the stream if there is +demand from downstream, otherwise they will be buffered until request for demand is received. Elements in the buffer +will be discarded if downstream is terminated. + +In combination with the queue, the @ref[`throttle`](./../Source-or-Flow/throttle.md) operator can be used to control the processing to a given limit, e.g. `5 elements` per `3 seconds`. + +## Example (`SourceQueue`) Scala : @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue } @@ -33,13 +73,15 @@ Scala Java : @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue } -A synchronous example that inspects every `QueueOfferResult` directly: +## Reactive Streams semantics -Scala -: @@snip [IntegrationDocSpec.scala](/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala) { #source-queue-synchronous } +@@@div { .callout } -Java -: @@snip [IntegrationDocTest.java](/docs/src/test/java/jdocs/stream/IntegrationDocTest.java) { #source-queue-synchronous } +**emits** when there is demand and the queue contains elements + +**completes** when downstream completes + +@@@ ## Migrating from the deprecated `Source.queue(Int, OverflowStrategy)` overloads @@ -47,19 +89,9 @@ Java |----------|-------------| | `Source.queue(n, OverflowStrategy.dropNew)` | `Source.queue[T](n)` — `BoundedSourceQueue` already drops the newest element. | | `Source.queue(n, OverflowStrategy.dropHead)` | `Source.queue[T](n)` if drop-new is acceptable. Otherwise build a custom @apidoc[GraphStage] with a FIFO buffer that drops the head. | -| `Source.queue(n, OverflowStrategy.dropTail)` | Same as above; `BoundedSourceQueue` always drops the tail of the pending offer (i.e. the newest). | +| `Source.queue(n, OverflowStrategy.dropTail)` | Same as above; `BoundedSourceQueue` always drops the newest offer (i.e. the tail). | | `Source.queue(n, OverflowStrategy.dropBuffer)` | `Source.queue[T](n)` combined with a @apidoc[GraphStage] that clears the buffer on overflow, or rework the producer to tolerate drops. | | `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call @apidoc[BoundedSourceQueue.fail] with a `BufferOverflowException`. | | `Source.queue(n, OverflowStrategy.backpressure)` | @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). | `SourceQueueWithComplete.offer` returned a @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`]; `BoundedSourceQueue.offer` returns `QueueOfferResult` synchronously. Call sites that previously chained `.map`/`.flatMap` on the offer future can usually be rewritten as a direct `match`/`switch` on the result. - -## Reactive Streams semantics - -@@@div { .callout } - -**emits** when there is demand and the queue contains elements - -**completes** when downstream completes - -@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index d77247fad6..011a1e1cf7 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -33,7 +33,7 @@ These built-in sources are available from @scala[`org.apache.pekko.stream.scalad |Source|@ref[lazySource](Source/lazySource.md)|Defers creation and materialization of a `Source` until there is demand.| |Source|@ref[maybe](Source/maybe.md)|Create a source that emits once the materialized @scala[`Promise`] @java[`CompletableFuture`] is completed with a value.| |Source|@ref[never](Source/never.md)|Never emit any elements, never complete and never fail.| -|Source|@ref[queue](Source/queue.md)|Materialize a @apidoc[BoundedSourceQueue] onto which elements can be pushed for emitting from the source.| +|Source|@ref[queue](Source/queue.md)|Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be pushed for emitting from the source.| |Source|@ref[range](Source/range.md)|Emit each integer in a range, with an option to take bigger steps than 1.| |Source|@ref[repeat](Source/repeat.md)|Stream a single object repeatedly.| |Source|@ref[single](Source/single.md)|Stream a single object once.| diff --git a/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 998e6c5a2d..2518cf09bd 100644 --- a/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -518,7 +518,6 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) { val fastElements = 1 to 10 - implicit val ec = system.dispatcher fastElements.foreach { x => queue.offer(x) match { case QueueOfferResult.Enqueued => println(s"enqueued $x") From afa7323dbc5175774070dc8a53835f94978b9866 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 20 Jun 2026 03:39:36 +0800 Subject: [PATCH 5/5] fix(docs): resolve invalid @apidoc references in Source.queue deprecation docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: Paradox @apidoc directives only resolve class/object names. Method references (Source.actorRefWithBackpressure, MergeHub.source$), arbitrary text (single imperative producer), and parenthetical text immediately following @apidoc[...] were all misparsed, causing the docs/paradox build to fail in CI. Modification: - Replace @apidoc[Source.actorRefWithBackpressure] with @ref links to the existing operator page. - Replace @apidoc[MergeHub.source$] with plain code (no operator page). - Replace @apidoc[SourceQueueWithComplete] and @apidoc[BoundedSourceQueue.fail] with plain code (class in context / method reference). - Restructure queue.md:9 to remove parentheses immediately after @apidoc[OverflowStrategy] that Paradox misread as anchor syntax. Result: sbt docs/paradox passes. CI paradoxMarkdownToHtml step succeeds. Tests: - sbt docs/paradox — pass References: Fixes CI failure in PR #3095 --- docs/src/main/paradox/stream/actor-interop.md | 2 +- docs/src/main/paradox/stream/operators/Source/queue.md | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/src/main/paradox/stream/actor-interop.md b/docs/src/main/paradox/stream/actor-interop.md index 6a0239003a..0b04d99266 100644 --- a/docs/src/main/paradox/stream/actor-interop.md +++ b/docs/src/main/paradox/stream/actor-interop.md @@ -130,7 +130,7 @@ use `Sink.actorRefWithBackpressure` or `ask` in `mapAsync`, though. @@@ warning { title="Deprecation notice (since 2.0.0)" } -The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @apidoc[Source.actorRefWithBackpressure] or @apidoc[MergeHub.source$]. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table. +The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @ref:[`Source.actorRefWithBackpressure`](operators/Source/actorRefWithBackpressure.md) or `MergeHub.source`. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table. The snippet below has been updated to the recommended non-deprecated API, so it uses a synchronous `match`/`switch` on `QueueOfferResult` rather than the `pipe`-the-`Future` pattern described in the surrounding prose. diff --git a/docs/src/main/paradox/stream/operators/Source/queue.md b/docs/src/main/paradox/stream/operators/Source/queue.md index 877622c5c1..4fc5235162 100644 --- a/docs/src/main/paradox/stream/operators/Source/queue.md +++ b/docs/src/main/paradox/stream/operators/Source/queue.md @@ -6,9 +6,9 @@ Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be p @@@ warning { title="Deprecation notice (since 2.0.0)" } -The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] (and materialize a @apidoc[SourceQueueWithComplete]) are **deprecated**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks. +The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] and materialize a `SourceQueueWithComplete` are **deprecated**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks. -Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement. +Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @ref:[`Source.actorRefWithBackpressure`](actorRefWithBackpressure.md) (single imperative producer) or `MergeHub.source` (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement. @@@ @@ -91,7 +91,7 @@ Java | `Source.queue(n, OverflowStrategy.dropHead)` | `Source.queue[T](n)` if drop-new is acceptable. Otherwise build a custom @apidoc[GraphStage] with a FIFO buffer that drops the head. | | `Source.queue(n, OverflowStrategy.dropTail)` | Same as above; `BoundedSourceQueue` always drops the newest offer (i.e. the tail). | | `Source.queue(n, OverflowStrategy.dropBuffer)` | `Source.queue[T](n)` combined with a @apidoc[GraphStage] that clears the buffer on overflow, or rework the producer to tolerate drops. | -| `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call @apidoc[BoundedSourceQueue.fail] with a `BufferOverflowException`. | -| `Source.queue(n, OverflowStrategy.backpressure)` | @apidoc[Source.actorRefWithBackpressure] (single imperative producer) or @apidoc[MergeHub.source$] (multiple producers). | +| `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call `BoundedSourceQueue.fail` with a `BufferOverflowException`. | +| `Source.queue(n, OverflowStrategy.backpressure)` | @ref:[`Source.actorRefWithBackpressure`](actorRefWithBackpressure.md) (single imperative producer) or `MergeHub.source` (multiple producers). | `SourceQueueWithComplete.offer` returned a @scala[`Future[QueueOfferResult]`]@java[`CompletionStage`]; `BoundedSourceQueue.offer` returns `QueueOfferResult` synchronously. Call sites that previously chained `.map`/`.flatMap` on the offer future can usually be rewritten as a direct `match`/`switch` on the result.