Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions docs/src/main/paradox/stream/operators/Source-or-Flow/alsoTo.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug

Attaches the given `Sink` to this `Flow`, meaning that elements that pass through this `Flow` will also be sent to the `Sink`.

By default, cancellation or failure of the attached `Sink` cancels the main stream. The `alsoTo` overload with
`propagateCancellation = false` can be used when the attached `Sink` is a best-effort side sink, such as logging or
metrics, and its cancellation or failure should not terminate the main stream. In that mode, the operator still
backpressures when the side `Sink` is active and backpressuring, but once the side `Sink` cancels or fails, elements
continue to the main downstream only.

## Reactive Streams semantics

@@@div { .callout }
Expand All @@ -24,8 +30,9 @@ Attaches the given `Sink` to this `Flow`, meaning that elements that pass throug

**completes** when upstream completes

**cancels** when downstream or `Sink` cancels
**cancels** when downstream cancels. With the default cancellation propagation, the operator also cancels when the
attached `Sink` cancels or fails. With `propagateCancellation = false`, cancellation or failure of the attached `Sink`
does not cancel the main stream.

@@@


1 change: 1 addition & 0 deletions project/StreamOperatorsIndexGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"mergeGraph",
"wireTapGraph",
"alsoToGraph",
"resilientAlsoToGraph",
"orElseGraph",
"divertToGraph",
"zipWithGraph",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1890,6 +1890,16 @@ public void mustBeAbleToUseAlsoTo() {
Flow.of(Integer.class).alsoToMat(Sink.ignore(), (i, n) -> "foo");
}

@Test
public void mustBeAbleToUseAlsoToWithPropagateCancellation() {
final Flow<Integer, Integer, NotUsed> f = Flow.of(Integer.class).alsoTo(Sink.ignore(), false);
final Flow<Integer, Integer, NotUsed> f2 = Flow.of(Integer.class).alsoTo(Sink.ignore(), true);
final Flow<Integer, Integer, String> f3 =
Flow.of(Integer.class).alsoToMat(Sink.ignore(), false, (i, n) -> "foo");
final Flow<Integer, Integer, String> f4 =
Flow.of(Integer.class).alsoToMat(Sink.ignore(), true, (i, n) -> "foo");
}

@Test
public void mustBeAbleToUseAlsoToAll() {
final Flow<Integer, Integer, NotUsed> f =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -80,6 +81,27 @@ public void demonstrateBuildSimpleGraph() throws Exception {
new String[] {"31", "32", "33", "34", "35", "41", "42", "43", "44", "45"}, res);
}

@Test
public void mustKeepFlowingWhenNonEagerBroadcastOutputCancels() throws Exception {
final Source<Integer, NotUsed> source = Source.from(Arrays.asList(1, 2, 3));
final Sink<Integer, CompletionStage<List<Integer>>> sink = Sink.seq();

final RunnableGraph<CompletionStage<List<Integer>>> graph =
RunnableGraph.fromGraph(
GraphDSL.create(
sink,
(builder, out) -> {
final UniformFanOutShape<Integer, Integer> broadcast =
builder.add(Broadcast.create(2, true, Collections.singleton(1)));
builder.from(builder.add(source)).viaFanOut(broadcast).to(out);
builder.from(broadcast).to(builder.add(Sink.<Integer>cancelled()));
return ClosedShape.getInstance();
}));

assertEquals(
Arrays.asList(1, 2, 3), graph.run(system).toCompletableFuture().get(3, TimeUnit.SECONDS));
}

@Test
@SuppressWarnings("unused")
public void demonstrateConnectErrors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1490,6 +1490,16 @@ public void mustBeAbleToUseAlsoTo() {
Source.<Integer>empty().alsoToMat(Sink.ignore(), (i, n) -> "foo");
}

@Test
public void mustBeAbleToUseAlsoToWithPropagateCancellation() {
final Source<Integer, NotUsed> f = Source.<Integer>empty().alsoTo(Sink.ignore(), false);
final Source<Integer, NotUsed> f2 = Source.<Integer>empty().alsoTo(Sink.ignore(), true);
final Source<Integer, String> f3 =
Source.<Integer>empty().alsoToMat(Sink.ignore(), false, (i, n) -> "foo");
final Source<Integer, String> f4 =
Source.<Integer>empty().alsoToMat(Sink.ignore(), true, (i, n) -> "foo");
}

@Test
public void mustBeAbleToUseAlsoToAll() {
final Source<Integer, NotUsed> f =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ class DslConsistencySpec extends AnyWordSpec with Matchers {
"concatGraph",
"prependGraph",
"alsoToGraph",
"resilientAlsoToGraph",
"wireTapGraph",
"orElseGraph",
"divertToGraph",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.scaladsl

import org.apache.pekko
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.testkit.scaladsl.TestSource

import scala.concurrent.duration._

class FlowAlsoToSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
""") {

"alsoTo with propagateCancellation=true (default)" must {

"cancel the stream when side sink cancels" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink).runWith(mainSink)

mainProbe.request(2)
sideProbe.request(2)

pub.sendNext(1)
mainProbe.expectNext(1)
sideProbe.expectNext(1)

sideProbe.cancel()
pub.expectCancellation()
}

"forward elements to both downstreams" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink).runWith(mainSink)

mainProbe.request(3)
sideProbe.request(3)

pub.sendNext(1)
pub.sendNext(2)
pub.sendNext(3)

mainProbe.expectNext(1, 2, 3)
sideProbe.expectNext(1, 2, 3)

pub.sendComplete()
mainProbe.expectComplete()
sideProbe.expectComplete()
}
}

"alsoTo with propagateCancellation=false" must {

"continue main stream when side sink cancels" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(4)
sideProbe.request(2)

pub.sendNext(1)
mainProbe.expectNext(1)
sideProbe.expectNext(1)

pub.sendNext(2)
mainProbe.expectNext(2)
sideProbe.expectNext(2)

sideProbe.cancel()

pub.sendNext(3)
mainProbe.expectNext(3)

pub.sendNext(4)
mainProbe.expectNext(4)

pub.sendComplete()
mainProbe.expectComplete()
}

"continue main stream when side sink fails" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

val failingSideSink = Flow[Int].map { elem =>
if (elem == 1) throw new RuntimeException("side sink failure")
elem
}.to(sideSink)

src.alsoTo(failingSideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(3)
sideProbe.request(3)

pub.sendNext(1)
mainProbe.expectNext(1)

pub.sendNext(2)
mainProbe.expectNext(2)

pub.sendNext(3)
mainProbe.expectNext(3)

pub.sendComplete()
mainProbe.expectComplete()
}

"cancel side sink when main downstream cancels" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(1)
sideProbe.request(1)

pub.sendNext(1)
mainProbe.expectNext(1)
sideProbe.expectNext(1)

mainProbe.cancel()
pub.expectCancellation()
}

"forward elements to both downstreams before side cancels" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(3)
sideProbe.request(3)

pub.sendNext(1)
pub.sendNext(2)
pub.sendNext(3)

mainProbe.expectNext(1, 2, 3)
sideProbe.expectNext(1, 2, 3)

pub.sendComplete()
mainProbe.expectComplete()
sideProbe.expectComplete()
}

"complete normally when upstream completes" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(2)
sideProbe.request(2)

pub.sendNext(1)
pub.sendNext(2)
pub.sendComplete()

mainProbe.expectNext(1, 2).expectComplete()
sideProbe.expectNext(1, 2).expectComplete()
}

"handle side sink cancelling before any element is emitted" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(2)
sideProbe.request(1)

sideProbe.cancel()

pub.sendNext(1)
mainProbe.expectNext(1)

pub.sendNext(2)
mainProbe.expectNext(2)

pub.sendComplete()
mainProbe.expectComplete()
}

"propagate upstream failure to both downstreams" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(1)
sideProbe.request(1)

val ex = new RuntimeException("upstream boom")
pub.sendError(ex)

mainProbe.expectError(ex)
sideProbe.expectError(ex)
}

"backpressure when side sink is slow" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(3)
sideProbe.request(1)

pub.sendNext(1)
mainProbe.expectNext(1)
sideProbe.expectNext(1)

sideProbe.request(1)

pub.sendNext(2)
mainProbe.expectNext(2)
sideProbe.expectNext(2)

sideProbe.request(1)

pub.sendNext(3)
mainProbe.expectNext(3)
sideProbe.expectNext(3)

pub.sendComplete()
mainProbe.expectComplete()
sideProbe.expectComplete()
}

"handle side sink cancelling while pending element exists" in {
val (mainProbe, mainSink) = TestSink[Int]().preMaterialize()
val (sideProbe, sideSink) = TestSink[Int]().preMaterialize()
val (pub, src) = TestSource[Int]().preMaterialize()

src.alsoTo(sideSink, propagateCancellation = false).runWith(mainSink)

mainProbe.request(3)
sideProbe.request(1)

pub.sendNext(1)
mainProbe.expectNext(1)
sideProbe.expectNext(1)

pub.sendNext(2)

sideProbe.cancel()
mainProbe.expectNext(2)

pub.sendNext(3)
mainProbe.expectNext(3)

pub.sendComplete()
mainProbe.expectComplete()
}
}
}
Loading