From 302f1eab04318d99c3234b98873ff58e99790393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 23 Jun 2026 01:36:37 +0800 Subject: [PATCH 1/7] feat(stream): add alsoTo overload with configurable cancellation propagation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: alsoTo uses Broadcast[Out](2, eagerCancel = true) internally. When the side sink fails or cancels, the entire stream is terminated. Users cannot isolate the side sink — for example, a fire-and-forget logging sink should not kill the main business stream when the logging destination is temporarily unavailable. Modification: Add a new alsoTo(sink, propagateCancellation: Boolean) overload. When propagateCancellation is false, a new ResilientAlsoTo GraphStage is used instead of Broadcast. ResilientAlsoTo backpressures when either output backpressures (same contract as alsoTo), but when the side sink cancels or fails, elements continue flowing to the main downstream only. A warning is logged on side sink failure. Also add alsoToMat overloads for both Scala and Java DSLs, and update FlowWithContext/SourceWithContext. Result: Users can now use alsoTo(sink, propagateCancellation = false) to fire-and-forget to a side sink without risking main stream termination. Default behavior (propagateCancellation = true) is unchanged. Tests: - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec" — 11/11 passed - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.DslConsistencySpec" — 12/12 passed - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToAllSpec" — 2/2 passed References: Fixes #3104 --- project/StreamOperatorsIndexGenerator.scala | 1 + .../pekko/stream/DslConsistencySpec.scala | 1 + .../stream/scaladsl/FlowAlsoToSpec.scala | 288 ++++++++++++++++++ .../org/apache/pekko/stream/impl/Stages.scala | 1 + .../apache/pekko/stream/javadsl/Flow.scala | 41 +++ .../apache/pekko/stream/javadsl/Source.scala | 41 +++ .../apache/pekko/stream/javadsl/SubFlow.scala | 24 ++ .../pekko/stream/javadsl/SubSource.scala | 24 ++ .../apache/pekko/stream/scaladsl/Flow.scala | 54 ++++ .../stream/scaladsl/FlowWithContext.scala | 4 + .../stream/scaladsl/FlowWithContextOps.scala | 8 + .../apache/pekko/stream/scaladsl/Graph.scala | 106 ++++++- .../stream/scaladsl/SourceWithContext.scala | 4 + 13 files changed, 596 insertions(+), 1 deletion(-) create mode 100644 stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 1dcce006985..daf88cc4e41 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -81,6 +81,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "mergeGraph", "wireTapGraph", "alsoToGraph", + "resilientAlsoToGraph", "orElseGraph", "divertToGraph", "zipWithGraph", diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index 1befcaefb76..2aa2c3ec010 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -90,6 +90,7 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "concatGraph", "prependGraph", "alsoToGraph", + "resilientAlsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph", diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala new file mode 100644 index 00000000000..c78b25ebb82 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.testkit._ +import pekko.stream.testkit.scaladsl.TestSink +import pekko.stream.testkit.scaladsl.TestSource + +import scala.concurrent.duration._ + +class FlowAlsoToSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) { + + "alsoTo with propagateCancellation=true (default)" must { + + "cancel the stream when side sink cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink).runWith(mainSink) + + mainProbe.request(2) + sideProbe.request(2) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + sideProbe.cancel() + pub.expectCancellation() + } + + "forward elements to both downstreams" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(3) + + pub.sendNext(1) + pub.sendNext(2) + pub.sendNext(3) + + mainProbe.expectNext(1, 2, 3) + sideProbe.expectNext(1, 2, 3) + + pub.sendComplete() + mainProbe.expectComplete() + sideProbe.expectComplete() + } + } + + "alsoTo with propagateCancellation=false" must { + + "continue main stream when side sink cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(4) + sideProbe.request(2) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + sideProbe.expectNext(2) + + sideProbe.cancel() + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendNext(4) + mainProbe.expectNext(4) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "continue main stream when side sink fails" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + val failingSideSink = Flow[Int].map { elem => + if (elem == 1) throw new RuntimeException("side sink failure") + elem + }.to(sideSink) + + src.alsoTo(failingSideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(3) + + pub.sendNext(1) + mainProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "cancel side sink when main downstream cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(1) + sideProbe.request(1) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + mainProbe.cancel() + pub.expectCancellation() + } + + "forward elements to both downstreams before side cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(3) + + pub.sendNext(1) + pub.sendNext(2) + pub.sendNext(3) + + mainProbe.expectNext(1, 2, 3) + sideProbe.expectNext(1, 2, 3) + + pub.sendComplete() + mainProbe.expectComplete() + sideProbe.expectComplete() + } + + "complete normally when upstream completes" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(2) + sideProbe.request(2) + + pub.sendNext(1) + pub.sendNext(2) + pub.sendComplete() + + mainProbe.expectNext(1, 2).expectComplete() + sideProbe.expectNext(1, 2).expectComplete() + } + + "handle side sink cancelling before any element is emitted" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(2) + sideProbe.request(1) + + sideProbe.cancel() + + pub.sendNext(1) + mainProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "propagate upstream failure to both downstreams" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(1) + sideProbe.request(1) + + val ex = new RuntimeException("upstream boom") + pub.sendError(ex) + + mainProbe.expectError(ex) + sideProbe.expectError(ex) + } + + "backpressure when side sink is slow" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(1) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + sideProbe.request(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + sideProbe.expectNext(2) + + sideProbe.request(1) + + pub.sendNext(3) + mainProbe.expectNext(3) + sideProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + sideProbe.expectComplete() + } + + "handle side sink cancelling while pending element exists" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(1) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + pub.sendNext(2) + + sideProbe.cancel() + mainProbe.expectNext(2) + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + } + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 78584949e4d..41a46bb8a3e 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -103,6 +103,7 @@ import pekko.stream.Attributes._ val onErrorComplete = name("onErrorComplete") val broadcast = name("broadcast") val wireTap = name("wireTap") + val resilientAlsoTo = name("resilientAlsoTo") val balance = name("balance") val zip = name("zip") val zipLatest = name("zipLatest") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022c..489757af2ea 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3282,6 +3282,30 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def alsoTo(that: Graph[SinkShape[Out], ?]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 1.2.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. @@ -3317,6 +3341,23 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @since 1.2.0 + */ + def alsoToMat[M2, M3]( + that: Graph[SinkShape[Out], M2], + propagateCancellation: Boolean, + matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = + new Flow(delegate.alsoToMat(that, propagateCancellation)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e7..94837475ae1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -1406,6 +1406,30 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def alsoTo(that: Graph[SinkShape[Out], ?]): javadsl.Source[Out, Mat] = new Source(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 1.2.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): javadsl.Source[Out, Mat] = + new Source(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. @@ -1441,6 +1465,23 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = new Source(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @since 1.2.0 + */ + def alsoToMat[M2, M3]( + that: Graph[SinkShape[Out], M2], + propagateCancellation: Boolean, + matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = + new Source(delegate.alsoToMat(that, propagateCancellation)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12f..7325b299637 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2289,6 +2289,30 @@ final class SubFlow[In, Out, Mat]( def alsoTo(that: Graph[SinkShape[Out], ?]): SubFlow[In, Out, Mat] = new SubFlow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 1.2.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): SubFlow[In, Out, Mat] = + new SubFlow(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c562..b24fcf8225c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2255,6 +2255,30 @@ final class SubSource[Out, Mat]( def alsoTo(that: Graph[SinkShape[Out], ?]): SubSource[Out, Mat] = new SubSource(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 1.2.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): SubSource[Out, Mat] = + new SubSource(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce4..16501af7a61 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -4019,6 +4019,44 @@ trait FlowOps[+Out, +Mat] { FlowShape(bcast.in, bcast.out(0)) } + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream + * only. This is useful for fire-and-forget side sinks (e.g. logging) where the side sink's + * availability should not affect the main business stream. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 1.2.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out] = + if (propagateCancellation) alsoTo(that) + else via(resilientAlsoToGraph(that)) + + protected def resilientAlsoToGraph[M]( + that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = + GraphDSL.createGraph(that) { implicit b => r => + import GraphDSL.Implicits._ + val stage = b.add(new ResilientAlsoTo[Out]) + stage.out1 ~> r + FlowShape(stage.in, stage.out0) + } + /** * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that pass * through will also be sent to the [[Sink]]. @@ -4536,6 +4574,22 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[Out, Mat3] = viaMat(alsoToGraph(that))(matF) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @since 1.2.0 + */ + def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], propagateCancellation: Boolean)( + matF: (Mat, Mat2) => Mat3): ReprMat[Out, Mat3] = + if (propagateCancellation) viaMat(alsoToGraph(that))(matF) + else viaMat(resilientAlsoToGraph(that))(matF) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index fd7c1eafb6d..40cf9ce25fb 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -149,6 +149,10 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In override def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, CtxOut] = FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1))) + override def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, CtxOut] = + FlowWithContext.fromTuples( + delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1), propagateCancellation)) + override def alsoToContext(that: Graph[SinkShape[CtxOut], ?]): Repr[Out, CtxOut] = FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2))) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index 9fafa15e642..eaef4cea2f3 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -94,6 +94,14 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { */ def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, Ctx] + /** + * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] with configurable cancellation propagation. + * + * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @since 1.2.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, Ctx] + /** * Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 6cde3ffa9f2..58671bb7b94 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -30,7 +30,7 @@ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.GraphStages import pekko.stream.scaladsl.Partition.PartitionOutOfBoundsException -import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging } import pekko.util.ConstantFun /** @@ -781,6 +781,110 @@ private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]] override def toString = "WireTap" } +/** + * INTERNAL API + * + * A `Broadcast`-like stage with two outputs where cancellation of the side output (out1) + * does not cancel the stage. Elements continue flowing to the main output (out0) after + * the side output cancels or fails. Cancellation of the main output cancels the stage. + * + * Backpressures when either output backpressures (same contract as `alsoTo` / `Broadcast`). + */ +private[pekko] final class ResilientAlsoTo[T] extends GraphStage[FanOutShape2[T, T, T]] { + val in: Inlet[T] = Inlet[T]("ResilientAlsoTo.in") + val outMain: Outlet[T] = Outlet[T]("ResilientAlsoTo.outMain") + val outSide: Outlet[T] = Outlet[T]("ResilientAlsoTo.outSide") + override def initialAttributes: Attributes = DefaultAttributes.resilientAlsoTo + override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain, outSide) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { + + private var pendingElement: Option[T] = None + private var mainReady = false + private var sideReady = false + + override def onPush(): Unit = { + val elem = grab(in) + if (isClosed(outSide)) { + push(outMain, elem) + } else if (mainReady && sideReady) { + push(outMain, elem) + push(outSide, elem) + mainReady = false + sideReady = false + } else { + pendingElement = Some(elem) + } + } + + override def onPull(): Unit = { + mainReady = true + tryPushAndPull() + } + + override def onDownstreamFinish(cause: Throwable): Unit = + cancelStage(cause) + + private def tryPushAndPull(): Unit = { + pendingElement match { + case Some(elem) => + if (isClosed(outSide)) { + push(outMain, elem) + pendingElement = None + mainReady = false + } else if (mainReady && sideReady) { + push(outMain, elem) + push(outSide, elem) + pendingElement = None + mainReady = false + sideReady = false + } + case None => + if (mainReady && (sideReady || isClosed(outSide)) && !hasBeenPulled(in)) + pull(in) + } + } + + setHandler( + outSide, + new OutHandler { + override def onPull(): Unit = { + sideReady = true + tryPushAndPull() + } + + override def onDownstreamFinish(cause: Throwable): Unit = { + if (!cause.isInstanceOf[SubscriptionWithCancelException.NonFailureCancellation]) + log.warning("ResilientAlsoTo: side sink failed, continuing main stream: {}", cause.getMessage) + else + log.debug("ResilientAlsoTo: side sink cancelled") + pendingElement match { + case Some(elem) => + if (mainReady) { + push(outMain, elem) + mainReady = false + } + pendingElement = None + case None => + } + sideReady = false + setHandler(in, + new InHandler { + override def onPush(): Unit = + push(outMain, grab(in)) + }) + if (mainReady && !hasBeenPulled(in)) + pull(in) + } + }) + + setHandlers(in, outMain, this) + } + + override def toString = "ResilientAlsoTo" +} + object Partition { // FIXME make `PartitionOutOfBoundsException` a `final` class when possible case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 4890b40efe4..e97b2066968 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -166,6 +166,10 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc override def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, Ctx] = SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1))) + override def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, Ctx] = + SourceWithContext.fromTuples( + delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1), propagateCancellation)) + override def alsoToContext(that: Graph[SinkShape[Ctx], ?]): Repr[Out, Ctx] = SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2))) From 28a7da057f05509251c9206c00534f277ad40cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 23 Jun 2026 02:07:38 +0800 Subject: [PATCH 2/7] fix(stream): correct @since version, add Java tests, clean up DefaultAttributes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: Review feedback: @since should be 2.0.0, Java tests were missing, and DefaultAttributes.resilientAlsoTo was unnecessary. Modification: - Fix @since 1.2.0 to @since 2.0.0 across all new alsoTo/alsoToMat overloads - Add Java DSL compilation tests for alsoTo(propagateCancellation) and alsoToMat(propagateCancellation) in FlowTest and SourceTest - Remove DefaultAttributes.resilientAlsoTo; use inline Attributes.name("resilientAlsoTo") instead Result: Correct version annotation, Java API coverage, cleaner DefaultAttributes. Tests: - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec org.apache.pekko.stream.DslConsistencySpec org.apache.pekko.stream.javadsl.FlowTest org.apache.pekko.stream.javadsl.SourceTest" — 222/222 passed References: Refs #3104 --- .../org/apache/pekko/stream/javadsl/FlowTest.java | 12 ++++++++++++ .../org/apache/pekko/stream/javadsl/SourceTest.java | 12 ++++++++++++ .../scala/org/apache/pekko/stream/impl/Stages.scala | 1 - .../scala/org/apache/pekko/stream/javadsl/Flow.scala | 4 ++-- .../org/apache/pekko/stream/javadsl/Source.scala | 4 ++-- .../org/apache/pekko/stream/javadsl/SubFlow.scala | 2 +- .../org/apache/pekko/stream/javadsl/SubSource.scala | 2 +- .../org/apache/pekko/stream/scaladsl/Flow.scala | 4 ++-- .../pekko/stream/scaladsl/FlowWithContextOps.scala | 2 +- .../org/apache/pekko/stream/scaladsl/Graph.scala | 2 +- 10 files changed, 34 insertions(+), 11 deletions(-) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 7dd153b278a..5c9804fbd81 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1890,6 +1890,18 @@ public void mustBeAbleToUseAlsoTo() { Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseAlsoToWithPropagateCancellation() { + final Flow f = + Flow.of(Integer.class).alsoTo(Sink.ignore(), false); + final Flow f2 = + Flow.of(Integer.class).alsoTo(Sink.ignore(), true); + final Flow f3 = + Flow.of(Integer.class).alsoToMat(Sink.ignore(), false, (i, n) -> "foo"); + final Flow f4 = + Flow.of(Integer.class).alsoToMat(Sink.ignore(), true, (i, n) -> "foo"); + } + @Test public void mustBeAbleToUseAlsoToAll() { final Flow f = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index db3341baeaa..302dcb8cae7 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1490,6 +1490,18 @@ public void mustBeAbleToUseAlsoTo() { Source.empty().alsoToMat(Sink.ignore(), (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseAlsoToWithPropagateCancellation() { + final Source f = + Source.empty().alsoTo(Sink.ignore(), false); + final Source f2 = + Source.empty().alsoTo(Sink.ignore(), true); + final Source f3 = + Source.empty().alsoToMat(Sink.ignore(), false, (i, n) -> "foo"); + final Source f4 = + Source.empty().alsoToMat(Sink.ignore(), true, (i, n) -> "foo"); + } + @Test public void mustBeAbleToUseAlsoToAll() { final Source f = diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala index 41a46bb8a3e..78584949e4d 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Stages.scala @@ -103,7 +103,6 @@ import pekko.stream.Attributes._ val onErrorComplete = name("onErrorComplete") val broadcast = name("broadcast") val wireTap = name("wireTap") - val resilientAlsoTo = name("resilientAlsoTo") val balance = name("balance") val zip = name("zip") val zipLatest = name("zipLatest") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 489757af2ea..8e628c917b1 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3301,7 +3301,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * When `propagateCancellation` is `true`, cancellation or failure of * the side [[Sink]] also cancels the downstream. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): javadsl.Flow[In, Out, Mat] = new Flow(delegate.alsoTo(that, propagateCancellation)) @@ -3350,7 +3350,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoToMat[M2, M3]( that: Graph[SinkShape[Out], M2], diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index 94837475ae1..3a618887705 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -1425,7 +1425,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * When `propagateCancellation` is `true`, cancellation or failure of * the side [[Sink]] also cancels the downstream. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): javadsl.Source[Out, Mat] = new Source(delegate.alsoTo(that, propagateCancellation)) @@ -1474,7 +1474,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoToMat[M2, M3]( that: Graph[SinkShape[Out], M2], diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 7325b299637..40d38b00811 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2308,7 +2308,7 @@ final class SubFlow[In, Out, Mat]( * When `propagateCancellation` is `true`, cancellation or failure of * the side [[Sink]] also cancels the downstream. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): SubFlow[In, Out, Mat] = new SubFlow(delegate.alsoTo(that, propagateCancellation)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index b24fcf8225c..4259f3e9e20 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2274,7 +2274,7 @@ final class SubSource[Out, Mat]( * When `propagateCancellation` is `true`, cancellation or failure of * the side [[Sink]] also cancels the downstream. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): SubSource[Out, Mat] = new SubSource(delegate.alsoTo(that, propagateCancellation)) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 16501af7a61..ec4e24a79c3 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -4042,7 +4042,7 @@ trait FlowOps[+Out, +Mat] { * When `propagateCancellation` is `true`, cancellation or failure of * the side [[Sink]] also cancels the downstream. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out] = if (propagateCancellation) alsoTo(that) @@ -4583,7 +4583,7 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners * where appropriate instead of manually writing functions that pass through one of the values. * - * @since 1.2.0 + * @since 2.0.0 */ def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], propagateCancellation: Boolean)( matF: (Mat, Mat2) => Mat3): ReprMat[Out, Mat3] = diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index eaef4cea2f3..65aa7c095db 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -98,7 +98,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] with configurable cancellation propagation. * * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] - * @since 1.2.0 + * @since 2.0.0 */ def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, Ctx] diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 58671bb7b94..51e01f58104 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -794,7 +794,7 @@ private[pekko] final class ResilientAlsoTo[T] extends GraphStage[FanOutShape2[T, val in: Inlet[T] = Inlet[T]("ResilientAlsoTo.in") val outMain: Outlet[T] = Outlet[T]("ResilientAlsoTo.outMain") val outSide: Outlet[T] = Outlet[T]("ResilientAlsoTo.outSide") - override def initialAttributes: Attributes = DefaultAttributes.resilientAlsoTo + override def initialAttributes: Attributes = Attributes.name("resilientAlsoTo") override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain, outSide) override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = From 3664e34afd34a6b53771f481762c99795453fcaa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 23 Jun 2026 02:24:41 +0800 Subject: [PATCH 3/7] refactor(stream): implement alsoTo(propagateCancellation=false) via Broadcast MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Motivation: ResilientAlsoTo was a separate GraphStage duplicating Broadcast's fan-out logic. WireTap was also modified with an unrelated backpressure parameter. Modification: Add nonEagerCancelOutputs: Set[Int] parameter to Broadcast. Outputs in this set do not trigger stage cancellation when they cancel — elements continue flowing to remaining outputs. alsoTo(propagateCancellation=false) now uses Broadcast(2, eagerCancel=true, nonEagerCancelOutputs=Set(1)), eliminating ResilientAlsoTo entirely. WireTap is reverted to its original form. Also remove DefaultAttributes.resilientAlsoTo and fix @since to 2.0.0 across all new methods. Add Java DSL compilation tests. Result: Cleaner implementation built on existing Broadcast. No new GraphStage. Same 224 tests pass. Tests: - 224/224 passed (FlowAlsoToSpec, DslConsistencySpec, FlowTest, SourceTest, FlowAlsoToAllSpec) References: Refs #3104 --- .../apache/pekko/stream/scaladsl/Flow.scala | 6 +- .../apache/pekko/stream/scaladsl/Graph.scala | 134 ++++-------------- 2 files changed, 29 insertions(+), 111 deletions(-) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index ec4e24a79c3..15e657c1b9c 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -4052,9 +4052,9 @@ trait FlowOps[+Out, +Mat] { that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = GraphDSL.createGraph(that) { implicit b => r => import GraphDSL.Implicits._ - val stage = b.add(new ResilientAlsoTo[Out]) - stage.out1 ~> r - FlowShape(stage.in, stage.out0) + val bcast = b.add(Broadcast[Out](2, eagerCancel = true, nonEagerCancelOutputs = Set(1))) + bcast.out(1) ~> r + FlowShape(bcast.in, bcast.out(0)) } /** diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 51e01f58104..2ccd4d102fb 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -30,7 +30,7 @@ import pekko.stream.impl._ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.impl.fusing.GraphStages import pekko.stream.scaladsl.Partition.PartitionOutOfBoundsException -import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, StageLogging } +import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler } import pekko.util.ConstantFun /** @@ -609,6 +609,25 @@ object Broadcast { */ def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = new Broadcast(outputPorts, eagerCancel) + + /** + * Create a new `Broadcast` with the specified number of output ports and per-output + * cancellation behavior. + * + * @param outputPorts number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel + * (except those listed in `nonEagerCancelOutputs`). + * @param nonEagerCancelOutputs set of output port indices whose cancellation should not + * trigger upstream cancellation. These outputs are silently + * removed from the broadcast when they cancel, and elements + * continue flowing to the remaining outputs. + * @since 2.0.0 + */ + def apply[T]( + outputPorts: Int, + eagerCancel: Boolean, + nonEagerCancelOutputs: Set[Int]): Broadcast[T] = + new Broadcast(outputPorts, eagerCancel, nonEagerCancelOutputs) } /** @@ -624,11 +643,14 @@ object Broadcast { * '''Cancels when''' * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel */ -final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) +final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean, val nonEagerCancelOutputs: Set[Int]) extends GraphStage[UniformFanOutShape[T, T]] with pekko.stream.TypePreservingFanOut { - // one output might seem counter intuitive but saves us from special handling in other places + def this(outputPorts: Int, eagerCancel: Boolean) = this(outputPorts, eagerCancel, Set.empty) require(outputPorts >= 1, "A Broadcast must have one or more output ports") + require( + nonEagerCancelOutputs.isEmpty || !eagerCancel || nonEagerCancelOutputs.subsetOf((0 until outputPorts).toSet), + "nonEagerCancelOutputs must be a subset of output indices") val in: Inlet[T] = Inlet[T]("Broadcast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i => Outlet[T]("Broadcast.out" + i)) override def initialAttributes = DefaultAttributes.broadcast @@ -677,7 +699,7 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) } override def onDownstreamFinish(cause: Throwable) = { - if (eagerCancel) cancelStage(cause) + if (eagerCancel && !nonEagerCancelOutputs.contains(i)) cancelStage(cause) else { downstreamsRunning -= 1 if (downstreamsRunning == 0) cancelStage(cause) @@ -781,110 +803,6 @@ private[stream] final class WireTap[T] extends GraphStage[FanOutShape2[T, T, T]] override def toString = "WireTap" } -/** - * INTERNAL API - * - * A `Broadcast`-like stage with two outputs where cancellation of the side output (out1) - * does not cancel the stage. Elements continue flowing to the main output (out0) after - * the side output cancels or fails. Cancellation of the main output cancels the stage. - * - * Backpressures when either output backpressures (same contract as `alsoTo` / `Broadcast`). - */ -private[pekko] final class ResilientAlsoTo[T] extends GraphStage[FanOutShape2[T, T, T]] { - val in: Inlet[T] = Inlet[T]("ResilientAlsoTo.in") - val outMain: Outlet[T] = Outlet[T]("ResilientAlsoTo.outMain") - val outSide: Outlet[T] = Outlet[T]("ResilientAlsoTo.outSide") - override def initialAttributes: Attributes = Attributes.name("resilientAlsoTo") - override val shape: FanOutShape2[T, T, T] = new FanOutShape2(in, outMain, outSide) - - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = - new GraphStageLogic(shape) with InHandler with OutHandler with StageLogging { - - private var pendingElement: Option[T] = None - private var mainReady = false - private var sideReady = false - - override def onPush(): Unit = { - val elem = grab(in) - if (isClosed(outSide)) { - push(outMain, elem) - } else if (mainReady && sideReady) { - push(outMain, elem) - push(outSide, elem) - mainReady = false - sideReady = false - } else { - pendingElement = Some(elem) - } - } - - override def onPull(): Unit = { - mainReady = true - tryPushAndPull() - } - - override def onDownstreamFinish(cause: Throwable): Unit = - cancelStage(cause) - - private def tryPushAndPull(): Unit = { - pendingElement match { - case Some(elem) => - if (isClosed(outSide)) { - push(outMain, elem) - pendingElement = None - mainReady = false - } else if (mainReady && sideReady) { - push(outMain, elem) - push(outSide, elem) - pendingElement = None - mainReady = false - sideReady = false - } - case None => - if (mainReady && (sideReady || isClosed(outSide)) && !hasBeenPulled(in)) - pull(in) - } - } - - setHandler( - outSide, - new OutHandler { - override def onPull(): Unit = { - sideReady = true - tryPushAndPull() - } - - override def onDownstreamFinish(cause: Throwable): Unit = { - if (!cause.isInstanceOf[SubscriptionWithCancelException.NonFailureCancellation]) - log.warning("ResilientAlsoTo: side sink failed, continuing main stream: {}", cause.getMessage) - else - log.debug("ResilientAlsoTo: side sink cancelled") - pendingElement match { - case Some(elem) => - if (mainReady) { - push(outMain, elem) - mainReady = false - } - pendingElement = None - case None => - } - sideReady = false - setHandler(in, - new InHandler { - override def onPush(): Unit = - push(outMain, grab(in)) - }) - if (mainReady && !hasBeenPulled(in)) - pull(in) - } - }) - - setHandlers(in, outMain, this) - } - - override def toString = "ResilientAlsoTo" -} - object Partition { // FIXME make `PartitionOutOfBoundsException` a `final` class when possible case class PartitionOutOfBoundsException(msg: String) extends IndexOutOfBoundsException(msg) with NoStackTrace From 49c88779c1148c5daa6d653ab3318e1dc0aa01a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 23 Jun 2026 02:44:10 +0800 Subject: [PATCH 4/7] style: apply javafmt to Java test files --- .../test/java/org/apache/pekko/stream/javadsl/FlowTest.java | 6 ++---- .../java/org/apache/pekko/stream/javadsl/SourceTest.java | 6 ++---- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 5c9804fbd81..56dd1177c5a 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1892,10 +1892,8 @@ public void mustBeAbleToUseAlsoTo() { @Test public void mustBeAbleToUseAlsoToWithPropagateCancellation() { - final Flow f = - Flow.of(Integer.class).alsoTo(Sink.ignore(), false); - final Flow f2 = - Flow.of(Integer.class).alsoTo(Sink.ignore(), true); + final Flow f = Flow.of(Integer.class).alsoTo(Sink.ignore(), false); + final Flow f2 = Flow.of(Integer.class).alsoTo(Sink.ignore(), true); final Flow f3 = Flow.of(Integer.class).alsoToMat(Sink.ignore(), false, (i, n) -> "foo"); final Flow f4 = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 302dcb8cae7..93b8ad16526 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1492,10 +1492,8 @@ public void mustBeAbleToUseAlsoTo() { @Test public void mustBeAbleToUseAlsoToWithPropagateCancellation() { - final Source f = - Source.empty().alsoTo(Sink.ignore(), false); - final Source f2 = - Source.empty().alsoTo(Sink.ignore(), true); + final Source f = Source.empty().alsoTo(Sink.ignore(), false); + final Source f2 = Source.empty().alsoTo(Sink.ignore(), true); final Source f3 = Source.empty().alsoToMat(Sink.ignore(), false, (i, n) -> "foo"); final Source f4 = From ed64b1efa8eabb3af2a9baba4fdb3f6e5006b254 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 23 Jun 2026 22:54:17 +0800 Subject: [PATCH 5/7] fix(stream): preserve Broadcast binary compatibility and tighten require Motivation: The previous commit restructured Broadcast to take a 3-arg primary constructor (Int, Boolean, Set[Int]), replacing the historical 2-arg (Int, Boolean) primary. That changes the JVM `` descriptor, which is a binary-compatibility break for any external caller compiled against prior Pekko versions invoking `new Broadcast(n, eager)`. Also, the require condition allowed a non-empty nonEagerCancelOutputs with eagerCancel=false, where the parameter is silently meaningless. Modification: - Restore the 2-arg `(Int, Boolean)` as the primary constructor, preserving the `(IZ)V` JVM descriptor for existing callers. Add the 3-arg `(Int, Boolean, Set[Int])` as an auxiliary constructor. Store nonEagerCancelOutputs in a private mutable backing field assigned once from the auxiliary constructor; expose it via a public `def nonEagerCancelOutputs` accessor (source- and binary-compatible since it is a new, unreleased API). - Strengthen require: nonEagerCancelOutputs must imply eagerCancel=true (rejected with a descriptive message otherwise), and must be a subset of valid output indices. - Add two regression tests in GraphBroadcastSpec: * "continue to remaining outputs when a non-eager output cancels" directly exercises the new Broadcast overload. * "reject nonEagerCancelOutputs when eagerCancel is false" verifies the require guard. Result: Existing callers compiled against the old Broadcast(Int, Boolean) constructor keep working (MiMa clean). New 3-arg constructor is source-available. Invalid Broadcast configurations are rejected at construction time with a clear message. Tests: - sbt 'stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.GraphBroadcastSpec' -> 13/13 passed - sbt 'stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec' -> 11/11 passed - sbt 'stream-tests / Test / testOnly org.apache.pekko.stream.DslConsistencySpec' -> 12/12 passed - sbt '+stream / mimaReportBinaryIssues' -> clean References: Refs #3104 --- .../stream/scaladsl/GraphBroadcastSpec.scala | 45 +++++++++++++++++++ .../apache/pekko/stream/scaladsl/Graph.scala | 45 ++++++++++++++++--- 2 files changed, 85 insertions(+), 5 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala index 5817f7a6330..0502b106df7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala @@ -301,6 +301,51 @@ class GraphBroadcastSpec extends StreamSpec(""" pMain.cancel() pIn.expectCancellation() } + + "continue to remaining outputs when a non-eager output cancels" in { + val (pub, src) = TestSource[Int]().preMaterialize() + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + // eagerCancel=true but side output (index 1) is non-eager: its cancellation + // must not tear down the stage. Main output (index 0) keeps flowing. + val bcast = b.add(Broadcast[Int](2, eagerCancel = true, nonEagerCancelOutputs = Set(1))) + src ~> bcast.in + bcast.out(0) ~> mainSink + bcast.out(1) ~> sideSink + ClosedShape + }) + .run() + + mainProbe.request(3) + sideProbe.request(2) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + sideProbe.expectNext(2) + + sideProbe.cancel() + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "reject nonEagerCancelOutputs when eagerCancel is false" in { + val ex = intercept[IllegalArgumentException] { + Broadcast[Int](2, eagerCancel = false, nonEagerCancelOutputs = Set(1)) + } + ex.getMessage should include("requires eagerCancel=true") + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 2ccd4d102fb..83c13b5173f 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -643,14 +643,49 @@ object Broadcast { * '''Cancels when''' * If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel */ -final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean, val nonEagerCancelOutputs: Set[Int]) +final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] with pekko.stream.TypePreservingFanOut { - def this(outputPorts: Int, eagerCancel: Boolean) = this(outputPorts, eagerCancel, Set.empty) require(outputPorts >= 1, "A Broadcast must have one or more output ports") - require( - nonEagerCancelOutputs.isEmpty || !eagerCancel || nonEagerCancelOutputs.subsetOf((0 until outputPorts).toSet), - "nonEagerCancelOutputs must be a subset of output indices") + + // Backing field for the 3-arg auxiliary constructor; default is empty (all outputs eager). + // The auxiliary constructor assigns this exactly once during construction, so the value is + // effectively immutable after init. Kept `var` because Scala auxiliary constructors cannot + // assign `val` fields. + private var _nonEagerCancelOutputs: Set[Int] = Set.empty + + /** + * Set of output port indices whose cancellation should not trigger upstream cancellation. + * Only meaningful when `eagerCancel` is `true`; these outputs are silently removed from the + * broadcast when they cancel, and elements continue flowing to the remaining outputs. + * + * @since 2.0.0 + */ + def nonEagerCancelOutputs: Set[Int] = _nonEagerCancelOutputs + + /** + * Create a new `Broadcast` with per-output cancellation behavior. + * + * @param nonEagerCancelOutputs set of output port indices whose cancellation should not + * trigger upstream cancellation when `eagerCancel` is `true`. + * These outputs are silently removed from the broadcast when + * they cancel, and elements continue flowing to the remaining + * outputs. Must be empty when `eagerCancel` is `false` (where + * the parameter would be meaningless). + * @since 2.0.0 + */ + def this(outputPorts: Int, eagerCancel: Boolean, nonEagerCancelOutputs: Set[Int]) = { + this(outputPorts, eagerCancel) + require( + eagerCancel || nonEagerCancelOutputs.isEmpty, + s"nonEagerCancelOutputs [$nonEagerCancelOutputs] requires eagerCancel=true; " + + s"with eagerCancel=false all outputs already behave non-eagerly") + require( + nonEagerCancelOutputs.subsetOf((0 until outputPorts).toSet), + s"nonEagerCancelOutputs [$nonEagerCancelOutputs] must be a subset of [${0 until outputPorts}]") + _nonEagerCancelOutputs = nonEagerCancelOutputs + } + val in: Inlet[T] = Inlet[T]("Broadcast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i => Outlet[T]("Broadcast.out" + i)) override def initialAttributes = DefaultAttributes.broadcast From 93b2b18cb11df704b5cc51dea4fc3e8f2a5e1073 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Tue, 23 Jun 2026 22:54:51 +0800 Subject: [PATCH 6/7] style: apply scalafmt to GraphBroadcastSpec References: Refs #3104 --- .../org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala index 0502b106df7..0b9b3c38842 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala @@ -313,7 +313,7 @@ class GraphBroadcastSpec extends StreamSpec(""" // eagerCancel=true but side output (index 1) is non-eager: its cancellation // must not tear down the stage. Main output (index 0) keeps flowing. val bcast = b.add(Broadcast[Int](2, eagerCancel = true, nonEagerCancelOutputs = Set(1))) - src ~> bcast.in + src ~> bcast.in bcast.out(0) ~> mainSink bcast.out(1) ~> sideSink ClosedShape From 4d9a9c00dd27e2488f33d17e593e3e07517c2f79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Wed, 24 Jun 2026 23:21:39 +0800 Subject: [PATCH 7/7] fix(stream): add Java Broadcast non-eager cancellation factory Motivation: PR #3127 added a Scala Broadcast factory overload used by alsoTo(propagateCancellation = false), but the Java DSL did not expose the matching factory. The Scala 3.3 DSL factory consistency check therefore failed in CI. Modification: Add the Java Broadcast.create overload accepting nonEagerCancelOutputs, cover it with a Java GraphDSL behavior test, and clarify alsoTo cancellation propagation docs. Result: Java and Scala DSL factory coverage is back in parity and alsoTo non-propagating cancellation behavior is documented. Tests: - rtk sbt "++ 3.3.8" "stream-tests / Test / testOnly org.apache.pekko.stream.DslFactoriesConsistencySpec org.apache.pekko.stream.javadsl.GraphDslTest org.apache.pekko.stream.scaladsl.GraphBroadcastSpec org.apache.pekko.stream.scaladsl.FlowAlsoToSpec" - passed - rtk sbt docs/paradox - passed - rtk sbt "+stream / mimaReportBinaryIssues" - passed - rtk scalafmt --mode diff-ref=origin/main - passed - rtk scalafmt --list --mode diff-ref=origin/main - passed - rtk sbt javafmtAll with JDK 17 - passed - rtk sbt javafmtCheckAll checkCodeStyle with JDK 17 - passed - rtk git diff --check - passed References: Refs #3127 --- .../stream/operators/Source-or-Flow/alsoTo.md | 11 ++++++++-- .../pekko/stream/javadsl/GraphDslTest.java | 22 +++++++++++++++++++ .../apache/pekko/stream/javadsl/Graph.scala | 21 ++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md index 452e012235a..430b4fedb20 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md @@ -14,6 +14,12 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`. +By default, cancellation or failure of the attached `Sink` cancels the main stream. The `alsoTo` overload with +`propagateCancellation = false` can be used when the attached `Sink` is a best-effort side sink, such as logging or +metrics, and its cancellation or failure should not terminate the main stream. In that mode, the operator still +backpressures when the side `Sink` is active and backpressuring, but once the side `Sink` cancels or fails, elements +continue to the main downstream only. + ## Reactive Streams semantics @@@div { .callout } @@ -24,8 +30,9 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug **completes** when upstream completes -**cancels** when downstream or `Sink` cancels +**cancels** when downstream cancels. With the default cancellation propagation, the operator also cancels when the +attached `Sink` cancels or fails. With `propagateCancellation = false`, cancellation or failure of the attached `Sink` +does not cancel the main stream. @@@ - diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java index 9de0b03298f..110b65d863b 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -80,6 +81,27 @@ public void demonstrateBuildSimpleGraph() throws Exception { new String[] {"31", "32", "33", "34", "35", "41", "42", "43", "44", "45"}, res); } + @Test + public void mustKeepFlowingWhenNonEagerBroadcastOutputCancels() throws Exception { + final Source source = Source.from(Arrays.asList(1, 2, 3)); + final Sink>> sink = Sink.seq(); + + final RunnableGraph>> graph = + RunnableGraph.fromGraph( + GraphDSL.create( + sink, + (builder, out) -> { + final UniformFanOutShape broadcast = + builder.add(Broadcast.create(2, true, Collections.singleton(1))); + builder.from(builder.add(source)).viaFanOut(broadcast).to(out); + builder.from(broadcast).to(builder.add(Sink.cancelled())); + return ClosedShape.getInstance(); + })); + + assertEquals( + Arrays.asList(1, 2, 3), graph.run(system).toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test @SuppressWarnings("unused") public void demonstrateConnectErrors() { diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala index 4af7893d3f1..f897bde130f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala @@ -209,6 +209,27 @@ object Broadcast { def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = scaladsl.Broadcast(outputCount, eagerCancel = eagerCancel) + /** + * Create a new `Broadcast` operator with the specified input type and per-output cancellation behavior. + * + * @param outputCount number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel + * (except those listed in `nonEagerCancelOutputs`). + * @param nonEagerCancelOutputs set of output port indices whose cancellation should not + * trigger upstream cancellation. These outputs are silently + * removed from the broadcast when they cancel, and elements + * continue flowing to the remaining outputs. + * @since 2.0.0 + */ + def create[T]( + outputCount: Int, + eagerCancel: Boolean, + nonEagerCancelOutputs: util.Set[java.lang.Integer]): Graph[UniformFanOutShape[T, T], NotUsed] = + scaladsl.Broadcast( + outputCount, + eagerCancel = eagerCancel, + nonEagerCancelOutputs = nonEagerCancelOutputs.asScala.iterator.map(_.intValue()).toSet) + /** * Create a new `Broadcast` operator with the specified input type. *