Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ Run the given function when the first element is received.

Run the given function when the first element is received.

The `doOnFirst` operator adheres to the ActorAttributes.SupervisionStrategy attribute. On `Supervision.Resume` the failing first element is dropped and the function is not retried; on `Supervision.Restart` the next element is treated as the first.

## Examples

Scala
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.pekko.stream.scaladsl

import org.apache.pekko.Done
import org.apache.pekko.stream.testkit._
import java.util.concurrent.atomic.AtomicInteger

import org.apache.pekko
import pekko.Done
import pekko.stream.{ ActorAttributes, Supervision }
import pekko.stream.testkit._

class FlowDoOnFirstSpec extends StreamSpec("""
pekko.stream.materializer.initial-input-buffer-size = 2
Expand All @@ -27,7 +31,7 @@ class FlowDoOnFirstSpec extends StreamSpec("""
"A DoOnFirst" must {

"can only invoke on first" in {
val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
val invoked = new AtomicInteger(0)
Source(1 to 10)
.via(Flow[Int].doOnFirst(invoked.addAndGet))
.runWith(Sink.ignore)
Expand All @@ -37,7 +41,7 @@ class FlowDoOnFirstSpec extends StreamSpec("""
}

"will not invoke on empty stream" in {
val invoked = new java.util.concurrent.atomic.AtomicInteger(0)
val invoked = new AtomicInteger(0)
Source.empty
.via(Flow[Int].doOnFirst(invoked.addAndGet))
.runWith(Sink.ignore)
Expand All @@ -46,6 +50,77 @@ class FlowDoOnFirstSpec extends StreamSpec("""
invoked.get() shouldBe 0
}

"fail the stream when f throws and supervision is Stop" in {
val ex = new RuntimeException("doOnFirst-stop")
val result = Source(1 to 5)
.via(Flow[Int].doOnFirst(_ => throw ex))
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
.runWith(Sink.ignore)

result.failed.futureValue shouldBe ex
}

"fail the stream when f throws and no supervision attribute is set (default Stop)" in {
val ex = new RuntimeException("doOnFirst-default")
val result = Source(1 to 5)
.via(Flow[Int].doOnFirst(_ => throw ex))
.runWith(Sink.ignore)

result.failed.futureValue shouldBe ex
}

"resume drops the first element, does not retry f, and passes the rest through" in {
val invocationCount = new AtomicInteger(0)
val ex = new RuntimeException("doOnFirst-resume")

val result = Source(1 to 5)
.via(Flow[Int].doOnFirst { _ =>
if (invocationCount.incrementAndGet() == 1) throw ex
})
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
.runWith(Sink.seq)
.futureValue

result shouldBe Seq(2, 3, 4, 5)
invocationCount.get() shouldBe 1
}

"restart re-arms so f is retried on the next element" in {
val invocationCount = new AtomicInteger(0)
val ex = new RuntimeException("doOnFirst-restart")

val result = Source(1 to 3)
.via(Flow[Int].doOnFirst { _ =>
invocationCount.incrementAndGet()
throw ex
})
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
.futureValue

result shouldBe Seq.empty
invocationCount.get() shouldBe 3
}

"restart and continue with pass-through once f succeeds on retry" in {
// Element 1 triggers f which throws -> Restart drops element 1 and re-arms.
// Element 2 triggers f which succeeds -> handler switches to pass-through.
// Elements 3 and 4 pass through without invoking f.
val invocationCount = new AtomicInteger(0)
val ex = new RuntimeException("doOnFirst-restart-then-ok")

val result = Source(1 to 4)
.via(Flow[Int].doOnFirst { _ =>
if (invocationCount.incrementAndGet() == 1) throw ex
})
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
.runWith(Sink.seq)
.futureValue

result shouldBe Seq(2, 3, 4)
invocationCount.get() shouldBe 2
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

package org.apache.pekko.stream.impl.fusing

import scala.util.control.NonFatal

import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.{ Attributes, FlowShape, Inlet, Outlet }
import pekko.stream.Attributes.SourceLocation
import pekko.stream.Supervision
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

Expand All @@ -34,20 +38,40 @@ import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

override def initialAttributes: Attributes = DefaultAttributes.doOnFirst and SourceLocation.forLambda(f)

override def createLogic(inheritedAttributes: org.apache.pekko.stream.Attributes) =
override def createLogic(inheritedAttributes: Attributes) =
new GraphStageLogic(shape) with InHandler with OutHandler {
self =>
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider

final override def onPush(): Unit = push(out, grab(in))
final override def onPull(): Unit = pull(in)
setHandler(out, this)
setHandler(in,
new InHandler {
override def onPush(): Unit = {
setHandler(in, self)
val elem = grab(in)
f(elem)
push(out, elem)

// Resume consumes the one-shot and switches to pass-through; Restart keeps this handler armed for retry.
val firstHandler: InHandler = new InHandler {
override def onPush(): Unit = {
val elem = grab(in)
try f(elem)
catch {
case NonFatal(ex) =>
decider(ex) match {
case Supervision.Stop =>
failStage(ex)
return
case Supervision.Resume =>
setHandler(in, self)
pull(in)
return
case Supervision.Restart =>
pull(in)
return
}
}
})
setHandler(in, self)
push(out, elem)
}
}

setHandler(in, firstHandler)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1150,6 +1150,10 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* Run the given function when the first element is received.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* first element is dropped and the function is not retried; on `Supervision.Restart` the next
* element is treated as the first.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3046,6 +3046,10 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
/**
* Run the given function when the first element is received.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* first element is dropped and the function is not retried; on `Supervision.Restart` the next
* element is treated as the first.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,10 @@ final class SubFlow[In, Out, Mat](
/**
* Run the given function when the first element is received.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* first element is dropped and the function is not retried; on `Supervision.Restart` the next
* element is treated as the first.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,10 @@ final class SubSource[Out, Mat](
/**
* Run the given function when the first element is received.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* first element is dropped and the function is not retried; on `Supervision.Restart` the next
* element is treated as the first.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1594,6 +1594,10 @@ trait FlowOps[+Out, +Mat] {
/**
* Run the given function when the first element is received.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. On `Supervision.Resume` the
* first element is dropped and the function is not retried; on `Supervision.Restart` the next
* element is treated as the first.
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
Expand Down