Skip to content

feat: add supervision strategy support for groupedAdjacentBy/Weighted#3181

Open
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-groupedadjacent-supervision
Open

feat: add supervision strategy support for groupedAdjacentBy/Weighted#3181
He-Pin wants to merge 2 commits into
apache:mainfrom
He-Pin:feat/stream-groupedadjacent-supervision

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 25, 2026

Copy link
Copy Markdown
Member

Motivation

Per the stream error handling docs, operators that apply user-provided functions should consult the configured SupervisionStrategy. The groupedAdjacentBy and groupedAdjacentByWeighted operators apply two user functions — a key function f and (for the weighted variant) a costFn — but both were called without a try-catch, so any exception failed the stream unconditionally, even under Supervision.Resume/Restart.

This is part of the meta-issue #3110 (add supervisor strategy support to stream operators that accept user functions). One operator (group) per PR.

Modification

  • Wrap both the costFn and the key function calls in GroupedAdjacentByWeighted with a try/catch (NonFatal) that consults the SupervisionStrategy decider via a shared superviseUserFn helper:
    • Stop → fail the stage (unchanged fail-fast behavior)
    • Resume → skip the offending element, keep the current group
    • Restartdrop the in-progress group (an already-completed group awaiting downstream demand is preserved)
  • The negative-cost and null-key checks are intentionally left unchanged — they remain unconditional failures, because they are output-contract violations (the function returned an invalid value) rather than user-function exceptions. This matches the sibling groupedWeighted/groupedWeightedWithin operators.
  • decider is a lazy val → zero overhead on the happy path.
  • Document supervision adherence on both operators in the Scala/Java DSL scaladoc and the operator reference pages.

Result

groupedAdjacentBy and groupedAdjacentByWeighted now adhere to the SupervisionStrategy attribute for both user functions.

Tests

  • sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedAdjacentByWeightedSpec"14/14 passed, including 8 new directional tests covering both functions: costFn throws (Stop, default Stop, Resume, Restart), key function throws (Stop, Resume, Restart, and Resume-on-last-element flushed at completion).
  • sbt "stream/mimaReportBinaryIssues" — clean (binary compatible)

References

Refs #3110

This is a clean-room implementation written directly for Apache Pekko.

He-Pin added 2 commits June 25, 2026 14:02
Motivation:
The groupedAdjacentBy and groupedAdjacentByWeighted operators apply two
user-provided functions, a key function and a cost function, but both were
called without a try-catch. Any exception failed the stream unconditionally,
ignoring the configured SupervisionStrategy.

Modification:
Wrap both the costFn and the key function calls in GroupedAdjacentByWeighted
with a try-catch that consults the SupervisionStrategy decider via a shared
superviseUserFn helper. Stop fails the stage, Resume skips the offending
element while keeping the current group, and Restart drops the in-progress
group. The negative-cost and null-key contract checks are left unchanged
(they remain unconditional failures, since they are output-contract
violations rather than user-function exceptions). The decider is a lazy val
for zero overhead on the happy path. Update the Scala and Java DSL scaladoc
for both operators and the operator reference pages.

Result:
groupedAdjacentBy and groupedAdjacentByWeighted now adhere to the
SupervisionStrategy attribute for both user functions.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedAdjacentByWeightedSpec" -- 14/14 passed

References:
Refs apache#3110
Motivation:
The weighted variant's documentation explicitly describes what
Resume and Restart do, but the non-weighted groupedAdjacentBy
only said "adheres to SupervisionStrategy" without the detail.

Modification:
Extend the supervision note in groupedAdjacentBy.md and the
groupedAdjacentBy Scaladoc on FlowOps and all four Java DSL
classes (Flow, Source, SubFlow, SubSource) to state that
Resume skips the offending element while Restart drops the
current in-progress group, matching the weighted variant.

Result:
Both groupedAdjacentBy operators document their supervision
semantics with the same level of detail across all DSLs.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowGroupedAdjacentByWeightedSpec" (14/14 passed)

References:
Refs apache#3110
@He-Pin He-Pin marked this pull request as ready for review June 26, 2026 21:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant