Skip to content

feat(stream): add alsoTo overload with configurable cancellation propagation#3127

Open
He-Pin wants to merge 7 commits into
mainfrom
feat/alsoTo-propagateCancellation-3104
Open

feat(stream): add alsoTo overload with configurable cancellation propagation#3127
He-Pin wants to merge 7 commits into
mainfrom
feat/alsoTo-propagateCancellation-3104

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 22, 2026

Copy link
Copy Markdown
Member

Motivation

alsoTo uses Broadcast[Out](2, eagerCancel = true) internally. When the side sink fails or cancels, the entire stream is terminated. Users cannot isolate the side sink — for example, a fire-and-forget logging sink should not kill the main business stream when the logging destination is temporarily unavailable.

Fixes #3104.

Modification

Add a new alsoTo(sink, propagateCancellation: Boolean) overload to both Scala and Java DSLs:

  • When propagateCancellation = true (default), behavior is identical to existing alsoTo
  • When propagateCancellation = false, a new ResilientAlsoTo GraphStage is used that:
    • Backpressures when either output backpressures (same contract as alsoTo/Broadcast)
    • When the side sink cancels or fails, logs a warning and continues forwarding to main downstream only
    • When the main downstream cancels, cancels the side sink normally

Changes:

  • New ResilientAlsoTo GraphStage in Graph.scala
  • alsoTo(Graph, Boolean) and alsoToMat(Graph, Boolean)(matF) overloads in FlowOps/FlowOpsMat
  • Java DSL overloads: Flow, Source, SubFlow, SubSource
  • Java DSL alsoToMat(Graph, Boolean, Function2) overloads: Flow, Source
  • FlowWithContext/SourceWithContext overrides
  • DefaultAttributes.resilientAlsoTo for stage naming

Result

Users can now use alsoTo(sink, propagateCancellation = false) to fire-and-forget to a side sink without risking main stream termination. Default behavior is unchanged and fully backwards-compatible.

Tests

  • FlowAlsoToSpec: 11/11 passed — covers both propagation modes, upstream failure, backpressure, side cancellation scenarios
  • DslConsistencySpec: 12/12 passed — Scala/Java DSL consistency verified
  • FlowAlsoToAllSpec: 2/2 passed — no regression
sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec"

References

Fixes #3104

@He-Pin He-Pin requested a review from pjfanning June 22, 2026 17:57
@He-Pin He-Pin marked this pull request as draft June 22, 2026 17:58
He-Pin added 6 commits June 23, 2026 22:34
…agation

Motivation:
alsoTo uses Broadcast[Out](2, eagerCancel = true) internally. When the
side sink fails or cancels, the entire stream is terminated. Users cannot
isolate the side sink — for example, a fire-and-forget logging sink
should not kill the main business stream when the logging destination is
temporarily unavailable.

Modification:
Add a new alsoTo(sink, propagateCancellation: Boolean) overload. When
propagateCancellation is false, a new ResilientAlsoTo GraphStage is used
instead of Broadcast. ResilientAlsoTo backpressures when either output
backpressures (same contract as alsoTo), but when the side sink cancels
or fails, elements continue flowing to the main downstream only. A
warning is logged on side sink failure. Also add alsoToMat overloads for
both Scala and Java DSLs, and update FlowWithContext/SourceWithContext.

Result:
Users can now use alsoTo(sink, propagateCancellation = false) to
fire-and-forget to a side sink without risking main stream termination.
Default behavior (propagateCancellation = true) is unchanged.

Tests:
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec" — 11/11 passed
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.DslConsistencySpec" — 12/12 passed
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToAllSpec" — 2/2 passed

References:
Fixes #3104
…Attributes

Motivation:
Review feedback: @SInCE should be 2.0.0, Java tests were missing, and
DefaultAttributes.resilientAlsoTo was unnecessary.

Modification:
- Fix @SInCE 1.2.0 to @SInCE 2.0.0 across all new alsoTo/alsoToMat
  overloads
- Add Java DSL compilation tests for alsoTo(propagateCancellation)
  and alsoToMat(propagateCancellation) in FlowTest and SourceTest
- Remove DefaultAttributes.resilientAlsoTo; use inline
  Attributes.name("resilientAlsoTo") instead

Result:
Correct version annotation, Java API coverage, cleaner DefaultAttributes.

Tests:
- sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec org.apache.pekko.stream.DslConsistencySpec org.apache.pekko.stream.javadsl.FlowTest org.apache.pekko.stream.javadsl.SourceTest" — 222/222 passed

References:
Refs #3104
…roadcast

Motivation:
ResilientAlsoTo was a separate GraphStage duplicating Broadcast's
fan-out logic. WireTap was also modified with an unrelated backpressure
parameter.

Modification:
Add nonEagerCancelOutputs: Set[Int] parameter to Broadcast. Outputs in
this set do not trigger stage cancellation when they cancel — elements
continue flowing to remaining outputs. alsoTo(propagateCancellation=false)
now uses Broadcast(2, eagerCancel=true, nonEagerCancelOutputs=Set(1)),
eliminating ResilientAlsoTo entirely. WireTap is reverted to its
original form. Also remove DefaultAttributes.resilientAlsoTo and fix
@SInCE to 2.0.0 across all new methods. Add Java DSL compilation tests.

Result:
Cleaner implementation built on existing Broadcast. No new GraphStage.
Same 224 tests pass.

Tests:
- 224/224 passed (FlowAlsoToSpec, DslConsistencySpec, FlowTest, SourceTest, FlowAlsoToAllSpec)

References:
Refs #3104
Motivation:
The previous commit restructured Broadcast to take a 3-arg primary
constructor (Int, Boolean, Set[Int]), replacing the historical 2-arg
(Int, Boolean) primary. That changes the JVM `<init>` descriptor, which
is a binary-compatibility break for any external caller compiled against
prior Pekko versions invoking `new Broadcast(n, eager)`. Also, the
require condition allowed a non-empty nonEagerCancelOutputs with
eagerCancel=false, where the parameter is silently meaningless.

Modification:
- Restore the 2-arg `(Int, Boolean)` as the primary constructor,
  preserving the `<init>(IZ)V` JVM descriptor for existing callers.
  Add the 3-arg `(Int, Boolean, Set[Int])` as an auxiliary
  constructor. Store nonEagerCancelOutputs in a private mutable
  backing field assigned once from the auxiliary constructor; expose
  it via a public `def nonEagerCancelOutputs` accessor (source- and
  binary-compatible since it is a new, unreleased API).
- Strengthen require: nonEagerCancelOutputs must imply eagerCancel=true
  (rejected with a descriptive message otherwise), and must be a
  subset of valid output indices.
- Add two regression tests in GraphBroadcastSpec:
  * "continue to remaining outputs when a non-eager output cancels"
    directly exercises the new Broadcast overload.
  * "reject nonEagerCancelOutputs when eagerCancel is false" verifies
    the require guard.

Result:
Existing callers compiled against the old Broadcast(Int, Boolean)
constructor keep working (MiMa clean). New 3-arg constructor is
source-available. Invalid Broadcast configurations are rejected at
construction time with a clear message.

Tests:
- sbt 'stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.GraphBroadcastSpec' -> 13/13 passed
- sbt 'stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.FlowAlsoToSpec' -> 11/11 passed
- sbt 'stream-tests / Test / testOnly org.apache.pekko.stream.DslConsistencySpec' -> 12/12 passed
- sbt '+stream / mimaReportBinaryIssues' -> clean

References:
Refs #3104
@He-Pin He-Pin force-pushed the feat/alsoTo-propagateCancellation-3104 branch from 6c3c249 to 93b2b18 Compare June 23, 2026 14:55
Motivation:
PR #3127 added a Scala Broadcast factory overload used by alsoTo(propagateCancellation = false), but the Java DSL did not expose the matching factory. The Scala 3.3 DSL factory consistency check therefore failed in CI.

Modification:
Add the Java Broadcast.create overload accepting nonEagerCancelOutputs, cover it with a Java GraphDSL behavior test, and clarify alsoTo cancellation propagation docs.

Result:
Java and Scala DSL factory coverage is back in parity and alsoTo non-propagating cancellation behavior is documented.

Tests:
- rtk sbt "++ 3.3.8" "stream-tests / Test / testOnly org.apache.pekko.stream.DslFactoriesConsistencySpec org.apache.pekko.stream.javadsl.GraphDslTest org.apache.pekko.stream.scaladsl.GraphBroadcastSpec org.apache.pekko.stream.scaladsl.FlowAlsoToSpec" - passed
- rtk sbt docs/paradox - passed
- rtk sbt "+stream / mimaReportBinaryIssues" - passed
- rtk scalafmt --mode diff-ref=origin/main - passed
- rtk scalafmt --list --mode diff-ref=origin/main - passed
- rtk sbt javafmtAll with JDK 17 - passed
- rtk sbt javafmtCheckAll checkCodeStyle with JDK 17 - passed
- rtk git diff --check - passed

References:
Refs #3127
@He-Pin He-Pin marked this pull request as ready for review June 24, 2026 15:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

AlsoTo should support configurable downstream failure propagation

1 participant