diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md index 452e012235..430b4fedb2 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md @@ -14,6 +14,12 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`. +By default, cancellation or failure of the attached `Sink` cancels the main stream. The `alsoTo` overload with +`propagateCancellation = false` can be used when the attached `Sink` is a best-effort side sink, such as logging or +metrics, and its cancellation or failure should not terminate the main stream. In that mode, the operator still +backpressures when the side `Sink` is active and backpressuring, but once the side `Sink` cancels or fails, elements +continue to the main downstream only. + ## Reactive Streams semantics @@@div { .callout } @@ -24,8 +30,9 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug **completes** when upstream completes -**cancels** when downstream or `Sink` cancels +**cancels** when downstream cancels. With the default cancellation propagation, the operator also cancels when the +attached `Sink` cancels or fails. With `propagateCancellation = false`, cancellation or failure of the attached `Sink` +does not cancel the main stream. @@@ - diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 1dcce00698..daf88cc4e4 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -81,6 +81,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "mergeGraph", "wireTapGraph", "alsoToGraph", + "resilientAlsoToGraph", "orElseGraph", "divertToGraph", "zipWithGraph", diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 7dd153b278..56dd1177c5 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -1890,6 +1890,16 @@ public void mustBeAbleToUseAlsoTo() { Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseAlsoToWithPropagateCancellation() { + final Flow f = Flow.of(Integer.class).alsoTo(Sink.ignore(), false); + final Flow f2 = Flow.of(Integer.class).alsoTo(Sink.ignore(), true); + final Flow f3 = + Flow.of(Integer.class).alsoToMat(Sink.ignore(), false, (i, n) -> "foo"); + final Flow f4 = + Flow.of(Integer.class).alsoToMat(Sink.ignore(), true, (i, n) -> "foo"); + } + @Test public void mustBeAbleToUseAlsoToAll() { final Flow f = diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java index 9de0b03298..110b65d863 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/GraphDslTest.java @@ -17,6 +17,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; @@ -80,6 +81,27 @@ public void demonstrateBuildSimpleGraph() throws Exception { new String[] {"31", "32", "33", "34", "35", "41", "42", "43", "44", "45"}, res); } + @Test + public void mustKeepFlowingWhenNonEagerBroadcastOutputCancels() throws Exception { + final Source source = Source.from(Arrays.asList(1, 2, 3)); + final Sink>> sink = Sink.seq(); + + final RunnableGraph>> graph = + RunnableGraph.fromGraph( + GraphDSL.create( + sink, + (builder, out) -> { + final UniformFanOutShape broadcast = + builder.add(Broadcast.create(2, true, Collections.singleton(1))); + builder.from(builder.add(source)).viaFanOut(broadcast).to(out); + builder.from(broadcast).to(builder.add(Sink.cancelled())); + return ClosedShape.getInstance(); + })); + + assertEquals( + Arrays.asList(1, 2, 3), graph.run(system).toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test @SuppressWarnings("unused") public void demonstrateConnectErrors() { diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index db3341baea..93b8ad1652 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -1490,6 +1490,16 @@ public void mustBeAbleToUseAlsoTo() { Source.empty().alsoToMat(Sink.ignore(), (i, n) -> "foo"); } + @Test + public void mustBeAbleToUseAlsoToWithPropagateCancellation() { + final Source f = Source.empty().alsoTo(Sink.ignore(), false); + final Source f2 = Source.empty().alsoTo(Sink.ignore(), true); + final Source f3 = + Source.empty().alsoToMat(Sink.ignore(), false, (i, n) -> "foo"); + final Source f4 = + Source.empty().alsoToMat(Sink.ignore(), true, (i, n) -> "foo"); + } + @Test public void mustBeAbleToUseAlsoToAll() { final Source f = diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala index 1befcaefb7..2aa2c3ec01 100755 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/DslConsistencySpec.scala @@ -90,6 +90,7 @@ class DslConsistencySpec extends AnyWordSpec with Matchers { "concatGraph", "prependGraph", "alsoToGraph", + "resilientAlsoToGraph", "wireTapGraph", "orElseGraph", "divertToGraph", diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala new file mode 100644 index 0000000000..c78b25ebb8 --- /dev/null +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowAlsoToSpec.scala @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.scaladsl + +import org.apache.pekko +import pekko.stream.testkit._ +import pekko.stream.testkit.scaladsl.TestSink +import pekko.stream.testkit.scaladsl.TestSource + +import scala.concurrent.duration._ + +class FlowAlsoToSpec extends StreamSpec(""" + pekko.stream.materializer.initial-input-buffer-size = 2 + """) { + + "alsoTo with propagateCancellation=true (default)" must { + + "cancel the stream when side sink cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink).runWith(mainSink) + + mainProbe.request(2) + sideProbe.request(2) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + sideProbe.cancel() + pub.expectCancellation() + } + + "forward elements to both downstreams" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(3) + + pub.sendNext(1) + pub.sendNext(2) + pub.sendNext(3) + + mainProbe.expectNext(1, 2, 3) + sideProbe.expectNext(1, 2, 3) + + pub.sendComplete() + mainProbe.expectComplete() + sideProbe.expectComplete() + } + } + + "alsoTo with propagateCancellation=false" must { + + "continue main stream when side sink cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(4) + sideProbe.request(2) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + sideProbe.expectNext(2) + + sideProbe.cancel() + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendNext(4) + mainProbe.expectNext(4) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "continue main stream when side sink fails" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + val failingSideSink = Flow[Int].map { elem => + if (elem == 1) throw new RuntimeException("side sink failure") + elem + }.to(sideSink) + + src.alsoTo(failingSideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(3) + + pub.sendNext(1) + mainProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "cancel side sink when main downstream cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(1) + sideProbe.request(1) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + mainProbe.cancel() + pub.expectCancellation() + } + + "forward elements to both downstreams before side cancels" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(3) + + pub.sendNext(1) + pub.sendNext(2) + pub.sendNext(3) + + mainProbe.expectNext(1, 2, 3) + sideProbe.expectNext(1, 2, 3) + + pub.sendComplete() + mainProbe.expectComplete() + sideProbe.expectComplete() + } + + "complete normally when upstream completes" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(2) + sideProbe.request(2) + + pub.sendNext(1) + pub.sendNext(2) + pub.sendComplete() + + mainProbe.expectNext(1, 2).expectComplete() + sideProbe.expectNext(1, 2).expectComplete() + } + + "handle side sink cancelling before any element is emitted" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(2) + sideProbe.request(1) + + sideProbe.cancel() + + pub.sendNext(1) + mainProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "propagate upstream failure to both downstreams" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(1) + sideProbe.request(1) + + val ex = new RuntimeException("upstream boom") + pub.sendError(ex) + + mainProbe.expectError(ex) + sideProbe.expectError(ex) + } + + "backpressure when side sink is slow" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(1) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + sideProbe.request(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + sideProbe.expectNext(2) + + sideProbe.request(1) + + pub.sendNext(3) + mainProbe.expectNext(3) + sideProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + sideProbe.expectComplete() + } + + "handle side sink cancelling while pending element exists" in { + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + val (pub, src) = TestSource[Int]().preMaterialize() + + src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink) + + mainProbe.request(3) + sideProbe.request(1) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + pub.sendNext(2) + + sideProbe.cancel() + mainProbe.expectNext(2) + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + } + } +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala index 5817f7a633..0b9b3c3884 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/GraphBroadcastSpec.scala @@ -301,6 +301,51 @@ class GraphBroadcastSpec extends StreamSpec(""" pMain.cancel() pIn.expectCancellation() } + + "continue to remaining outputs when a non-eager output cancels" in { + val (pub, src) = TestSource[Int]().preMaterialize() + val (mainProbe, mainSink) = TestSink[Int]().preMaterialize() + val (sideProbe, sideSink) = TestSink[Int]().preMaterialize() + + RunnableGraph + .fromGraph(GraphDSL.create() { implicit b => + import GraphDSL.Implicits._ + // eagerCancel=true but side output (index 1) is non-eager: its cancellation + // must not tear down the stage. Main output (index 0) keeps flowing. + val bcast = b.add(Broadcast[Int](2, eagerCancel = true, nonEagerCancelOutputs = Set(1))) + src ~> bcast.in + bcast.out(0) ~> mainSink + bcast.out(1) ~> sideSink + ClosedShape + }) + .run() + + mainProbe.request(3) + sideProbe.request(2) + + pub.sendNext(1) + mainProbe.expectNext(1) + sideProbe.expectNext(1) + + pub.sendNext(2) + mainProbe.expectNext(2) + sideProbe.expectNext(2) + + sideProbe.cancel() + + pub.sendNext(3) + mainProbe.expectNext(3) + + pub.sendComplete() + mainProbe.expectComplete() + } + + "reject nonEagerCancelOutputs when eagerCancel is false" in { + val ex = intercept[IllegalArgumentException] { + Broadcast[Int](2, eagerCancel = false, nonEagerCancelOutputs = Set(1)) + } + ex.getMessage should include("requires eagerCancel=true") + } } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 512b8ac022..8e628c917b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -3282,6 +3282,30 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def alsoTo(that: Graph[SinkShape[Out], ?]): javadsl.Flow[In, Out, Mat] = new Flow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 2.0.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. @@ -3317,6 +3341,23 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = new Flow(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @since 2.0.0 + */ + def alsoToMat[M2, M3]( + that: Graph[SinkShape[Out], M2], + propagateCancellation: Boolean, + matF: function.Function2[Mat, M2, M3]): javadsl.Flow[In, Out, M3] = + new Flow(delegate.alsoToMat(that, propagateCancellation)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala index 4af7893d3f..f897bde130 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Graph.scala @@ -209,6 +209,27 @@ object Broadcast { def create[T](outputCount: Int, eagerCancel: Boolean): Graph[UniformFanOutShape[T, T], NotUsed] = scaladsl.Broadcast(outputCount, eagerCancel = eagerCancel) + /** + * Create a new `Broadcast` operator with the specified input type and per-output cancellation behavior. + * + * @param outputCount number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel + * (except those listed in `nonEagerCancelOutputs`). + * @param nonEagerCancelOutputs set of output port indices whose cancellation should not + * trigger upstream cancellation. These outputs are silently + * removed from the broadcast when they cancel, and elements + * continue flowing to the remaining outputs. + * @since 2.0.0 + */ + def create[T]( + outputCount: Int, + eagerCancel: Boolean, + nonEagerCancelOutputs: util.Set[java.lang.Integer]): Graph[UniformFanOutShape[T, T], NotUsed] = + scaladsl.Broadcast( + outputCount, + eagerCancel = eagerCancel, + nonEagerCancelOutputs = nonEagerCancelOutputs.asScala.iterator.map(_.intValue()).toSet) + /** * Create a new `Broadcast` operator with the specified input type. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e6ed914a8e..3a61888770 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -1406,6 +1406,30 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def alsoTo(that: Graph[SinkShape[Out], ?]): javadsl.Source[Out, Mat] = new Source(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 2.0.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): javadsl.Source[Out, Mat] = + new Source(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. @@ -1441,6 +1465,23 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = new Source(delegate.alsoToMat(that)(combinerToScala(matF))) + /** + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @since 2.0.0 + */ + def alsoToMat[M2, M3]( + that: Graph[SinkShape[Out], M2], + propagateCancellation: Boolean, + matF: function.Function2[Mat, M2, M3]): javadsl.Source[Out, M3] = + new Source(delegate.alsoToMat(that, propagateCancellation)(combinerToScala(matF))) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index e2a1644e12..40d38b0081 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -2289,6 +2289,30 @@ final class SubFlow[In, Out, Mat]( def alsoTo(that: Graph[SinkShape[Out], ?]): SubFlow[In, Out, Mat] = new SubFlow(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 2.0.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): SubFlow[In, Out, Mat] = + new SubFlow(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Flow]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index c756c91c56..4259f3e9e2 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -2255,6 +2255,30 @@ final class SubSource[Out, Mat]( def alsoTo(that: Graph[SinkShape[Out], ?]): SubSource[Out, Mat] = new SubSource(delegate.alsoTo(that)) + /** + * Attaches the given [[Sink]] to this [[Source]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream only. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 2.0.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): SubSource[Out, Mat] = + new SubSource(delegate.alsoTo(that, propagateCancellation)) + /** * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that passes * through will also be sent to all those [[Sink]]s. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 4ae16e46ce..15e657c1b9 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -4019,6 +4019,44 @@ trait FlowOps[+Out, +Mat] { FlowShape(bcast.in, bcast.out(0)) } + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * When `propagateCancellation` is `false`, cancellation or failure of the side [[Sink]] + * will not cancel the main stream. Elements will continue to flow to the main downstream + * only. This is useful for fire-and-forget side sinks (e.g. logging) where the side sink's + * availability should not affect the main business stream. + * + * When `propagateCancellation` is `true` (the default), this behaves identically to [[#alsoTo]]. + * + * It is similar to [[#wireTap]] but will backpressure instead of dropping elements when the given [[Sink]] is not ready. + * + * '''Emits when''' element is available and demand exists both from the Sink and the downstream. + * + * '''Backpressures when''' downstream or Sink backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels (the side [[Sink]] is also cancelled). + * When `propagateCancellation` is `true`, cancellation or failure of + * the side [[Sink]] also cancels the downstream. + * + * @since 2.0.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out] = + if (propagateCancellation) alsoTo(that) + else via(resilientAlsoToGraph(that)) + + protected def resilientAlsoToGraph[M]( + that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] = + GraphDSL.createGraph(that) { implicit b => r => + import GraphDSL.Implicits._ + val bcast = b.add(Broadcast[Out](2, eagerCancel = true, nonEagerCancelOutputs = Set(1))) + bcast.out(1) ~> r + FlowShape(bcast.in, bcast.out(0)) + } + /** * Attaches the given [[Sink]]s to this [[Source]], meaning that elements that pass * through will also be sent to the [[Sink]]. @@ -4536,6 +4574,22 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] { def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2])(matF: (Mat, Mat2) => Mat3): ReprMat[Out, Mat3] = viaMat(alsoToGraph(that))(matF) + /** + * Attaches the given [[Sink]] to this [[Flow]], meaning that elements that pass + * through will also be sent to the [[Sink]]. + * + * @see [[#alsoTo]] + * + * It is recommended to use the internally optimized `Keep.left` and `Keep.right` combiners + * where appropriate instead of manually writing functions that pass through one of the values. + * + * @since 2.0.0 + */ + def alsoToMat[Mat2, Mat3](that: Graph[SinkShape[Out], Mat2], propagateCancellation: Boolean)( + matF: (Mat, Mat2) => Mat3): ReprMat[Out, Mat3] = + if (propagateCancellation) viaMat(alsoToGraph(that))(matF) + else viaMat(resilientAlsoToGraph(that))(matF) + /** * Attaches the given [[Sink]] to this [[Flow]], meaning that elements will be sent to the [[Sink]] * instead of being passed through if the predicate `when` returns `true`. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index fd7c1eafb6..40cf9ce25f 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -149,6 +149,10 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In override def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, CtxOut] = FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1))) + override def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, CtxOut] = + FlowWithContext.fromTuples( + delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._1), propagateCancellation)) + override def alsoToContext(that: Graph[SinkShape[CtxOut], ?]): Repr[Out, CtxOut] = FlowWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, CtxOut)) => in._2))) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index 9fafa15e64..65aa7c095d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -94,6 +94,14 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { */ def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, Ctx] + /** + * Data variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] with configurable cancellation propagation. + * + * @see [[pekko.stream.scaladsl.FlowOps.alsoTo]] + * @since 2.0.0 + */ + def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, Ctx] + /** * Context variant of [[pekko.stream.scaladsl.FlowOps.alsoTo]] * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala index 6cde3ffa9f..83c13b5173 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala @@ -609,6 +609,25 @@ object Broadcast { */ def apply[T](outputPorts: Int, eagerCancel: Boolean = false): Broadcast[T] = new Broadcast(outputPorts, eagerCancel) + + /** + * Create a new `Broadcast` with the specified number of output ports and per-output + * cancellation behavior. + * + * @param outputPorts number of output ports + * @param eagerCancel if true, broadcast cancels upstream if any of its downstreams cancel + * (except those listed in `nonEagerCancelOutputs`). + * @param nonEagerCancelOutputs set of output port indices whose cancellation should not + * trigger upstream cancellation. These outputs are silently + * removed from the broadcast when they cancel, and elements + * continue flowing to the remaining outputs. + * @since 2.0.0 + */ + def apply[T]( + outputPorts: Int, + eagerCancel: Boolean, + nonEagerCancelOutputs: Set[Int]): Broadcast[T] = + new Broadcast(outputPorts, eagerCancel, nonEagerCancelOutputs) } /** @@ -627,8 +646,46 @@ object Broadcast { final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] with pekko.stream.TypePreservingFanOut { - // one output might seem counter intuitive but saves us from special handling in other places require(outputPorts >= 1, "A Broadcast must have one or more output ports") + + // Backing field for the 3-arg auxiliary constructor; default is empty (all outputs eager). + // The auxiliary constructor assigns this exactly once during construction, so the value is + // effectively immutable after init. Kept `var` because Scala auxiliary constructors cannot + // assign `val` fields. + private var _nonEagerCancelOutputs: Set[Int] = Set.empty + + /** + * Set of output port indices whose cancellation should not trigger upstream cancellation. + * Only meaningful when `eagerCancel` is `true`; these outputs are silently removed from the + * broadcast when they cancel, and elements continue flowing to the remaining outputs. + * + * @since 2.0.0 + */ + def nonEagerCancelOutputs: Set[Int] = _nonEagerCancelOutputs + + /** + * Create a new `Broadcast` with per-output cancellation behavior. + * + * @param nonEagerCancelOutputs set of output port indices whose cancellation should not + * trigger upstream cancellation when `eagerCancel` is `true`. + * These outputs are silently removed from the broadcast when + * they cancel, and elements continue flowing to the remaining + * outputs. Must be empty when `eagerCancel` is `false` (where + * the parameter would be meaningless). + * @since 2.0.0 + */ + def this(outputPorts: Int, eagerCancel: Boolean, nonEagerCancelOutputs: Set[Int]) = { + this(outputPorts, eagerCancel) + require( + eagerCancel || nonEagerCancelOutputs.isEmpty, + s"nonEagerCancelOutputs [$nonEagerCancelOutputs] requires eagerCancel=true; " + + s"with eagerCancel=false all outputs already behave non-eagerly") + require( + nonEagerCancelOutputs.subsetOf((0 until outputPorts).toSet), + s"nonEagerCancelOutputs [$nonEagerCancelOutputs] must be a subset of [${0 until outputPorts}]") + _nonEagerCancelOutputs = nonEagerCancelOutputs + } + val in: Inlet[T] = Inlet[T]("Broadcast.in") val out: immutable.IndexedSeq[Outlet[T]] = Vector.tabulate(outputPorts)(i => Outlet[T]("Broadcast.out" + i)) override def initialAttributes = DefaultAttributes.broadcast @@ -677,7 +734,7 @@ final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) } override def onDownstreamFinish(cause: Throwable) = { - if (eagerCancel) cancelStage(cause) + if (eagerCancel && !nonEagerCancelOutputs.contains(i)) cancelStage(cause) else { downstreamsRunning -= 1 if (downstreamsRunning == 0) cancelStage(cause) diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 4890b40efe..e97b206696 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -166,6 +166,10 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc override def alsoTo(that: Graph[SinkShape[Out], ?]): Repr[Out, Ctx] = SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1))) + override def alsoTo(that: Graph[SinkShape[Out], ?], propagateCancellation: Boolean): Repr[Out, Ctx] = + SourceWithContext.fromTuples( + delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._1), propagateCancellation)) + override def alsoToContext(that: Graph[SinkShape[Ctx], ?]): Repr[Out, Ctx] = SourceWithContext.fromTuples(delegate.alsoTo(Sink.contramapImpl(that, (in: (Out, Ctx)) => in._2)))