Skip to content

feat: add supervision strategy support for throttle costCalculation#3183

Open
He-Pin wants to merge 1 commit into
apache:mainfrom
He-Pin:feat/stream-throttle-supervision
Open

feat: add supervision strategy support for throttle costCalculation#3183
He-Pin wants to merge 1 commit into
apache:mainfrom
He-Pin:feat/stream-throttle-supervision

Conversation

@He-Pin

@He-Pin He-Pin commented Jun 25, 2026

Copy link
Copy Markdown
Member

Motivation

Per the stream error handling docs, operators that apply user-provided functions should consult the configured SupervisionStrategy. The throttle operator's costCalculation function is user-provided and may throw, but it was called without a try-catch, so any exception failed the stream unconditionally, even under Supervision.Resume/Restart.

This is part of the meta-issue #3110 (add supervisor strategy support to stream operators that accept user functions). One operator per PR.

Modification

  • Wrap costCalculation(elem) in Throttle.onPush() with a try/catch (NonFatal) that consults the SupervisionStrategy decider:
    • Stop → fail the stage (unchanged fail-fast behavior)
    • Resume → drop the offending element, keep the token bucket state
    • Restart → behave the same as Resume (throttle keeps no accumulated per-element state, so there is nothing element-specific to reset)
  • The cost calculation is evaluated before tokenBucket.offer(cost), so a throwing function never consumes rate budget.
  • Add a guard in the timer path so a supervised skip cannot dequeue from an empty buffer after an overflow handler has already mutated it.
  • decider is a lazy val → zero overhead on the happy path.
  • Document supervision adherence on the two costCalculation throttle overloads in the Scala/Java DSL scaladoc and the operator reference page. The element-count throttle overloads are intentionally left undocumented because they use a constant cost and never throw.

Result

throttle now adheres to the SupervisionStrategy attribute for its costCalculation function, with no buffer-underflow on supervised skip paths.

Tests

  • sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowThrottleSpec"28/28 passed, including 6 new tests: explicit Stop, default Stop, Resume (Shaping), Resume (Enforcing), Restart (same as Resume for throttle), and a last-element Resume regression.
  • sbt "stream/mimaReportBinaryIssues" — clean (binary compatible)

References

Refs #3110, Refs #3101

This is a clean-room implementation written directly for Apache Pekko.

Motivation:
The throttle operator's costCalculation function is user-provided and may throw,
but it was called without a try-catch. Any exception failed the stream
unconditionally, ignoring the configured SupervisionStrategy.

Modification:
Wrap costCalculation(elem) in Throttle.onPush() with a try-catch that consults
the SupervisionStrategy decider. Stop fails the stage, and Resume and Restart
both skip the offending element (throttle keeps no accumulated per-element state,
so Restart behaves the same as Resume; the rate-limiting token bucket is
deliberately not reset). The costCalculation is wrapped before tokenBucket.offer
so a throwing function never consumes rate budget. The decider is a lazy val for
zero overhead on the happy path. Update the Scala and Java DSL scaladoc for the
costCalculation throttle overloads and the operator reference page.

Result:
throttle now adheres to the SupervisionStrategy attribute for its costCalculation
function.

Tests:
- sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowThrottleSpec" -- 28/28 passed

References:
Refs apache#3110, Refs apache#3101
@He-Pin He-Pin marked this pull request as ready for review June 26, 2026 20:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant