Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package org.apache.pekko.stream

import java.util.concurrent.TimeUnit

import scala.annotation.nowarn
import scala.concurrent._
import scala.concurrent.duration._

Expand All @@ -27,6 +28,7 @@ import pekko.stream.scaladsl._
@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.SECONDS)
@BenchmarkMode(Array(Mode.Throughput))
@nowarn("msg=deprecated")
class InvokeWithFeedbackBenchmark {
implicit val system: ActorSystem = ActorSystem("InvokeWithFeedbackBenchmark")

Expand Down
8 changes: 8 additions & 0 deletions docs/src/main/paradox/stream/actor-interop.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ use `Sink.actorRefWithBackpressure` or `ask` in `mapAsync`, though.

### Source.queue

@@@ warning { title="Deprecation notice (since 2.0.0)" }

The `Source.queue(Int, OverflowStrategy)` overloads that this section describes — including `OverflowStrategy.backpressure` — are **deprecated** because their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely when downstream stalls. Use `Source.queue[T](bufferSize)` (materializes a @apidoc[BoundedSourceQueue] with synchronous feedback), or for backpressure towards the producer use @ref:[`Source.actorRefWithBackpressure`](operators/Source/actorRefWithBackpressure.md) or `MergeHub.source`. See @ref:[the Source.queue operator page](operators/Source/queue.md) for a per-strategy migration table.

The snippet below has been updated to the recommended non-deprecated API, so it uses a synchronous `match`/`switch` on `QueueOfferResult` rather than the `pipe`-the-`Future` pattern described in the surrounding prose.

@@@

`Source.queue` is an improvement over `Sink.actorRef`, since it can provide backpressure.
The `offer` method returns a @scala[`Future`]@java[`CompletionStage`], which completes with the result of the enqueue operation.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ See also:
* @ref[Source.actorRef](../Source/actorRef.md) This operator without backpressure control
* @ref[ActorSource.actorRef](../ActorSource/actorRef.md) The operator for the new actors API without backpressure control
* @ref[ActorSource.actorRefWithBackpressure](../ActorSource/actorRefWithBackpressure.md) The corresponding operator for the new actors API
* @ref[Source.queue](../Source/queue.md) Materialize a `SourceQueue` onto which elements can be pushed for emitting from the source
* @ref[Source.queue](../Source/queue.md) Materialize a `BoundedSourceQueue` onto which elements can be pushed for emitting from the source

## Examples

Expand Down
26 changes: 26 additions & 0 deletions docs/src/main/paradox/stream/operators/Source/queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,14 @@ Materialize a `BoundedSourceQueue` or `SourceQueue` onto which elements can be p

@ref[Source operators](../index.md#source-operators)

@@@ warning { title="Deprecation notice (since 2.0.0)" }

The `Source.queue` overloads that accept an @apidoc[OverflowStrategy] and materialize a `SourceQueueWithComplete` are **deprecated**. Their asynchronous `offer` @scala[`Future`]@java[`CompletionStage`] can hang indefinitely under `OverflowStrategy.backpressure` when downstream stalls, which has caused real-world deadlocks.

Prefer `Source.queue[T](bufferSize)` (this page), which materializes a @apidoc[BoundedSourceQueue] with synchronous feedback and drop-newest overflow. For backpressure towards the producer, use @ref:[`Source.actorRefWithBackpressure`](actorRefWithBackpressure.md) (single imperative producer) or `MergeHub.source` (multiple producers). See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below for a per-strategy replacement.

@@@

## Signature (`BoundedSourceQueue`)

@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.BoundedSourceQueue[T]]" java="#queue(int)" }
Expand Down Expand Up @@ -35,6 +43,12 @@ Java

## Signature (`SourceQueue`)

@@@ warning

These two overloads are deprecated since 2.0.0. See the [migration table](#migrating-from-the-deprecated-sourcequeueint-overflowstrategy-overloads) below.

@@@

@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy)" }
@apidoc[Source.queue](Source$) { scala="#queue[T](bufferSize:Int,overflowStrategy:org.apache.pekko.stream.OverflowStrategy,maxConcurrentOffers:Int):org.apache.pekko.stream.scaladsl.Source[T,org.apache.pekko.stream.scaladsl.SourceQueueWithComplete[T]]" java="#queue(int,org.apache.pekko.stream.OverflowStrategy,int)" }

Expand Down Expand Up @@ -69,3 +83,15 @@ Java

@@@

## Migrating from the deprecated `Source.queue(Int, OverflowStrategy)` overloads

| Old call | Replacement |
|----------|-------------|
| `Source.queue(n, OverflowStrategy.dropNew)` | `Source.queue[T](n)` — `BoundedSourceQueue` already drops the newest element. |
| `Source.queue(n, OverflowStrategy.dropHead)` | `Source.queue[T](n)` if drop-new is acceptable. Otherwise build a custom @apidoc[GraphStage] with a FIFO buffer that drops the head. |
| `Source.queue(n, OverflowStrategy.dropTail)` | Same as above; `BoundedSourceQueue` always drops the newest offer (i.e. the tail). |
| `Source.queue(n, OverflowStrategy.dropBuffer)` | `Source.queue[T](n)` combined with a @apidoc[GraphStage] that clears the buffer on overflow, or rework the producer to tolerate drops. |
| `Source.queue(n, OverflowStrategy.fail)` | `Source.queue[T](n)` and, on `QueueOfferResult.Dropped`, call `BoundedSourceQueue.fail` with a `BufferOverflowException`. |
| `Source.queue(n, OverflowStrategy.backpressure)` | @ref:[`Source.actorRefWithBackpressure`](actorRefWithBackpressure.md) (single imperative producer) or `MergeHub.source` (multiple producers). |

`SourceQueueWithComplete.offer` returned a @scala[`Future[QueueOfferResult]`]@java[`CompletionStage<QueueOfferResult>`]; `BoundedSourceQueue.offer` returns `QueueOfferResult` synchronously. Call sites that previously chained `.map`/`.flatMap` on the offer future can usually be rewritten as a direct `match`/`switch` on the result.
10 changes: 4 additions & 6 deletions docs/src/test/scala/docs/stream/IntegrationDocSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) {
val bufferSize = 10
val elementsToProcess = 5

val queue = Source
val queue: BoundedSourceQueue[Int] = Source
.queue[Int](bufferSize)
.throttle(elementsToProcess, 3.second)
.map(x => x * x)
Expand All @@ -487,16 +487,15 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) {

val source = Source(1 to 10)

implicit val ec = system.dispatcher
source
.map(x => {
queue.offer(x).map {
.map { x =>
queue.offer(x) match {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
case QueueOfferResult.Dropped => println(s"dropped $x")
case QueueOfferResult.Failure(ex) => println(s"Offer failed ${ex.getMessage}")
case QueueOfferResult.QueueClosed => println("Source Queue closed")
}
})
}
.runWith(Sink.ignore)
// #source-queue
}
Expand All @@ -519,7 +518,6 @@ class IntegrationDocSpec extends PekkoSpec(IntegrationDocSpec.config) {

val fastElements = 1 to 10

implicit val ec = system.dispatcher
fastElements.foreach { x =>
queue.offer(x) match {
case QueueOfferResult.Enqueued => println(s"enqueued $x")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import pekko.testkit.TestProbe

import org.scalatest.time.Span

@nowarn("msg=deprecated")
class QueueSourceSpec extends StreamSpec {
implicit val ec: ExecutionContextExecutor = system.dispatcher
val pause = 300.millis
Expand Down
12 changes: 12 additions & 0 deletions stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,12 @@ object Source {
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
@deprecated(
"Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " +
"(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " +
"or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not 100% sure but will go with the consensus.

Could we consider not deprecating and just being more persuasive in the docs?

I've already run into issues with the methods that we removed already in 2.0.0 and it being far from obvious what the replacement is. For me, if it's not broken, why remove it?
Having alternatives is great.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will cause OOM and FGC at $Work, the real bug, trust me ,I was using it and later migrate to the BoundedSourceQueue one by @jrudolph , the now it works very well, my system is the Taobao Live.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since these messages all end up in the mailbox, if the system is under heavy load, the backpressure mechanism fails; you then see heap memory steadily rising, followed by Full GCs, and eventually the system crashes. I think it’s better to just remove it—how should I put it? The feature works fine under low load, but it’s riddled with bugs under high load. akkadotnet/akka.net#8248 @Aaronontheweb FYI

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I mean is, since this has indeed caused problems in my experience, we could deprecate it first and then remove it after a few years once hardly anyone is using it.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are other overflow stategies - should we consider doing something about OverflowStrategy .backpressure? We've already deprecated and removed OverflowStrategy.dropNew.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It’s futile, because messages must first enter the mailbox before policies can actually be executed; if the actor never gets scheduled, none of those policies take effect. Consequently, an Out-of-Memory (OOM) error occurs as the mailbox grows indefinitely. In my view, this design is fundamentally flawed—it poses serious production safety risks under high load and, quite simply, is prone to causing production incidents.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah these days we have Source.Channel which uses a .NET System.Threading.Channel, which has bounded write / multi or single reader / writer semantics built in. I modified the queue here to accept a cancellation token mostly to bring it in line with .NET idioms for async programming.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, nice! Thanks for the comment. I maintain a Taobao livestreaming system; initially, I used RxJava, but I frequently ran into OOM (Out of Memory) and FGC (Full Garbage Collection) issues during broadcasts by top streamers. Later, I switched to Akka Streams' Source.queue, but I still encountered FGC problems. This had a huge impact on the streams of individual top broadcasters, and the issue wasn't resolved until I finally switched to BoundedSourceQueue. The system is running very smoothly now. thanks @jrudolph

since = "2.0.0")
@nowarn("msg=deprecated")
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
new Source(
scaladsl.Source.queue[T](bufferSize, overflowStrategy, maxConcurrentOffers = 1).mapMaterializedValue(_.asJava))
Expand Down Expand Up @@ -730,6 +736,12 @@ object Source {
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0.
*/
@deprecated(
"Prefer Source.queue(bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " +
"(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " +
"or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.",
since = "2.0.0")
@nowarn("msg=deprecated")
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package org.apache.pekko.stream.scaladsl

import java.util.concurrent.CompletionStage

import scala.annotation.{ switch, tailrec, varargs }
import scala.annotation.{ nowarn, switch, tailrec, varargs }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.{ immutable, AbstractIterator }
import scala.concurrent.{ Future, Promise }
Expand Down Expand Up @@ -1002,6 +1002,12 @@ object Source {
* @param bufferSize size of buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
@deprecated(
"Prefer Source.queue[T](bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " +
"(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " +
"or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.",
since = "2.0.0")
@nowarn("msg=deprecated")
def queue[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, SourceQueueWithComplete[T]] =
queue(bufferSize, overflowStrategy, maxConcurrentOffers = 1)

Expand Down Expand Up @@ -1039,6 +1045,11 @@ object Source {
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
* @param maxConcurrentOffers maximum number of pending offers when buffer is full, should be greater than 0.
*/
@deprecated(
"Prefer Source.queue[T](bufferSize) which materializes a BoundedSourceQueue with synchronous feedback " +
"(dropping the newest element when the buffer is full). For backpressure, use Source.actorRefWithBackpressure " +
"or MergeHub.source instead. See the Pekko Streams documentation for the Source.queue migration guide.",
since = "2.0.0")
def queue[T](
bufferSize: Int,
overflowStrategy: OverflowStrategy,
Expand Down