feat: add supervision strategy support for delayWith#3182
Open
He-Pin wants to merge 1 commit into
Open
Conversation
Motivation: The delayWith operator computes each element's delay via a user-provided DelayStrategy.nextDelay, but it was called without a try-catch. Any exception failed the stream unconditionally, ignoring the configured SupervisionStrategy. Modification: Wrap delayStrategy.nextDelay(element) in Delay.grabAndPull() with a try-catch that consults the SupervisionStrategy decider. Stop fails the stage, Resume drops the offending element while keeping buffered elements and the strategy, and Restart recreates the (potentially stateful) delay strategy from the supplier. The delayStrategy field became a var to allow recreation. Because a dropped element no longer enqueues, onPush and onTimer now guard their buffer access with a non-empty check so a skip cannot dequeue/peek an empty buffer (onTimer previously could NPE when an overflow handler emptied the buffer while a timer was still armed). The decider is a lazy val for zero overhead on the happy path. Update the delayWith Scala/Java DSL scaladoc and the operator reference page. Result: delayWith now adheres to the SupervisionStrategy attribute with well-defined Resume and Restart semantics, with no buffer-underflow on the supervised paths. Tests: - sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDelaySpec" -- 26/26 passed References: Refs apache#3110
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
Per the stream error handling docs, operators that apply user-provided functions should consult the configured
SupervisionStrategy. ThedelayWithoperator computes each element's delay via a user-providedDelayStrategy.nextDelay, but it was called without a try-catch, so any exception failed the stream unconditionally, even underSupervision.Resume/Restart.This is part of the meta-issue #3110 (add supervisor strategy support to stream operators that accept user functions). One operator per PR.
Modification
delayStrategy.nextDelay(element)inDelay.grabAndPull()with atry/catch(NonFatal) that consults theSupervisionStrategydecider:delayStrategybecame avarso Restart can recreate it.onPushandonTimernow guard their buffer access with a non-empty check. In particular,onTimercould previously hit aNullPointerException(the delay buffer is an unchecked fixed-size buffer) when an overflow handler emptied the buffer while a timer was still armed and anextDelayexception was then skipped. A dedicated regression test reproduces this (it fails without the guard).decideris alazy val→ zero overhead on the happy path.delayWithin the Scala/Java DSL scaladoc and the operator reference page (plaindelayis unaffected — it uses a fixed internal strategy that never throws).Result
delayWithnow adheres to theSupervisionStrategyattribute with well-defined Resume/Restart semantics and no buffer underflow on the supervised paths.Tests
sbt "stream-tests/Test/testOnly org.apache.pekko.stream.scaladsl.FlowDelaySpec"— 26/26 passed, including 6 new tests: Stop, default Stop, Resume (drops element), Restart (recreates the stateful strategy — verified via an instance counter), Resume (keeps the stateful strategy — inverted mirror), and an overflow-handler + throw regression that guards theonTimerunderflow fix.sbt "stream/mimaReportBinaryIssues"— clean (binary compatible)References
Refs #3110
This is a clean-room implementation written directly for Apache Pekko.