From 0069f2e83eaf81b529b929fdbc165cb3abdbf826 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 22:39:15 +0800 Subject: [PATCH] feat(stream): add withContext filtering and truncating operators Motivation: SourceWithContext and FlowWithContext were missing several safe filtering and truncating operators that ordinary Source and Flow already provide. Additionally, PR #3164 accidentally regressed ConfigSSLEngineProvider from eager val back to lazy val, breaking the fail-fast behavior added by #3165. Modification: Add context-preserving mapOption, collectFirst, collectWhile, collectType, drop, dropWithin, dropRepeated, dropWhile, take, takeWhile, takeUntil, takeWithin, limit, and limitWeighted operators to the Scala and Java withContext DSLs. Add Scala withFilter for for-comprehension filtering. Add @since 2.0.0 to all 49 new API methods. Restore eager SSLContext init in classic remoting's ConfigSSLEngineProvider. Result: Users can keep automatic context propagation while using these operators directly on SourceWithContext and FlowWithContext. ConfigSSLEngineProvider once again surfaces keystore errors at construction time. Tests: - scalafmt --mode diff-ref=origin/main - sbt headerCreateAll - sbt +headerCheckAll - sbt "+stream / mimaReportBinaryIssues" - sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceWithContextSpec org.apache.pekko.stream.scaladsl.FlowWithContextSpec org.apache.pekko.stream.javadsl.FlowWithContextTest org.apache.pekko.stream.javadsl.SourceWithContextTest" - sbt "docs / Test / testOnly docs.stream.operators.WithContextSpec jdocs.stream.operators.WithContextTest" - sbt docs/paradox - git diff --check - sbt validatePullRequest References: Fixes #3177 --- .../src/main/paradox/stream/stream-context.md | 11 ++ .../stream/operators/WithContextTest.java | 27 +++ .../stream/operators/WithContextSpec.scala | 25 +++ .../stream/javadsl/FlowWithContextTest.java | 134 ++++++++++++++ .../stream/javadsl/SourceWithContextTest.java | 158 +++++++++++++++++ .../stream/scaladsl/FlowWithContextSpec.scala | 92 ++++++++++ .../scaladsl/SourceWithContextSpec.scala | 93 ++++++++++ .../stream/javadsl/FlowWithContext.scala | 157 ++++++++++++++++ .../stream/javadsl/SourceWithContext.scala | 153 ++++++++++++++++ .../stream/scaladsl/FlowWithContextOps.scala | 167 ++++++++++++++++++ 10 files changed, 1017 insertions(+) create mode 100644 stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceWithContextTest.java diff --git a/docs/src/main/paradox/stream/stream-context.md b/docs/src/main/paradox/stream/stream-context.md index b91c06864c3..bb90adecf2d 100644 --- a/docs/src/main/paradox/stream/stream-context.md +++ b/docs/src/main/paradox/stream/stream-context.md @@ -37,6 +37,17 @@ while grouping operations will keep all contexts from the elements in the group. Streaming one-to-many operations such as `mapConcat` associate the original context with each of the produced elements. +The context-preserving filtering and truncating operators include +`filter`, `filterNot`, `collect`, `collectFirst`, `collectWhile`, `collectType`, +`mapOption`, `drop`, `dropWhile`, `dropWithin`, `dropRepeated`, `take`, `takeWhile`, +`takeUntil`, `takeWithin`, `limit`, and `limitWeighted`. + +Scala +: @@snip [snip](/docs/src/test/scala/docs/stream/operators/WithContextSpec.scala) { #withContextOperators } + +Java +: @@snip [snip](/docs/src/test/java/jdocs/stream/operators/WithContextTest.java) { #withContextOperators } + As an escape hatch, there is a `via` operator that allows you to insert an arbitrary @apidoc[Flow] that can process the @scala[tuples]@java[pairs] of elements and context in any way diff --git a/docs/src/test/java/jdocs/stream/operators/WithContextTest.java b/docs/src/test/java/jdocs/stream/operators/WithContextTest.java index aaa5235102d..47326cbb96a 100644 --- a/docs/src/test/java/jdocs/stream/operators/WithContextTest.java +++ b/docs/src/test/java/jdocs/stream/operators/WithContextTest.java @@ -15,9 +15,11 @@ import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; import java.util.Collection; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import jdocs.AbstractJavaTest; @@ -119,4 +121,29 @@ public void documentAsFlowWithContext() throws Exception { list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3))); // #asFlowWithContext } + + @Test + public void documentWithContextOperators() throws Exception { + // #withContextOperators + Collection> values = + List.of( + Pair.create("eins", 1), + Pair.create("eins", 2), + Pair.create("zwei", 3), + Pair.create("drei", 4)); + + SourceWithContext filtered = + Source.from(values) + .asSourceWithContext(Pair::second) + .map(Pair::first) + .dropRepeated() + .mapOption( + word -> word.startsWith("z") ? Optional.empty() : Optional.of(word.toUpperCase())) + .take(2); + + CompletionStage>> result = filtered.runWith(Sink.seq(), system); + List> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS); + assertEquals(List.of(Pair.create("EINS", 1), Pair.create("DREI", 4)), list); + // #withContextOperators + } } diff --git a/docs/src/test/scala/docs/stream/operators/WithContextSpec.scala b/docs/src/test/scala/docs/stream/operators/WithContextSpec.scala index 7ff27297a88..7fe9d22499d 100644 --- a/docs/src/test/scala/docs/stream/operators/WithContextSpec.scala +++ b/docs/src/test/scala/docs/stream/operators/WithContextSpec.scala @@ -89,4 +89,29 @@ class WithContextSpec extends PekkoSpec { result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3) // #asFlowWithContext } + + "use filtering and truncating operators with context" in { + // #withContextOperators + import org.apache.pekko + import pekko.NotUsed + import pekko.stream.scaladsl.Sink + import pekko.stream.scaladsl.Source + import pekko.stream.scaladsl.SourceWithContext + import scala.collection.immutable + + val values: immutable.Seq[(String, Int)] = + immutable.Seq("eins" -> 1, "eins" -> 2, "zwei" -> 3, "drei" -> 4) + + val filtered: SourceWithContext[String, Int, NotUsed] = + Source(values) + .asSourceWithContext(_._2) + .map(_._1) + .dropRepeated() + .mapOption(word => if (word.startsWith("z")) None else Some(word.toUpperCase)) + .take(2) + + val result = filtered.runWith(Sink.seq) + result.futureValue should contain theSameElementsInOrderAs immutable.Seq("EINS" -> 1, "DREI" -> 4) + // #withContextOperators + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowWithContextTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowWithContextTest.java index 76a4012f8bf..8a887a6d1bd 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowWithContextTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowWithContextTest.java @@ -16,11 +16,14 @@ import static org.apache.pekko.NotUsed.notUsed; import static org.junit.jupiter.api.Assertions.assertEquals; +import java.time.Duration; import java.util.List; +import java.util.Optional; import java.util.concurrent.CompletionStage; import java.util.concurrent.TimeUnit; import org.apache.pekko.NotUsed; import org.apache.pekko.japi.Pair; +import org.apache.pekko.japi.pf.PFBuilder; import org.apache.pekko.stream.StreamTestJupiter; import org.apache.pekko.testkit.PekkoJUnitJupiterActorSystemResource; import org.apache.pekko.testkit.PekkoSpec; @@ -70,4 +73,135 @@ public void mustAllowComposingFlows() throws Exception { assertEquals("1", pairs.get(0).first()); assertEquals(notUsed(), pairs.get(0).second()); } + + @Test + public void mustPassContextThroughAdditionalFilteringOperators() throws Exception { + final FlowWithContext collectFlow = + FlowWithContext.create() + .collectType(Integer.class) + .collectWhile( + PFBuilder.create() + .match(Integer.class, value -> value < 3, value -> value * 10) + .build()); + + final CompletionStage>> collectResult = + Source.from( + List.of( + new Pair(1, "one"), + new Pair(2, "two"), + new Pair("three", "three-string"), + new Pair(3, "three"))) + .via(collectFlow.asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(10, "one"), new Pair<>(20, "two")), + collectResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + + final FlowWithContext collectFirstFlow = + FlowWithContext.create() + .collectFirst( + PFBuilder.create() + .match(Integer.class, value -> value > 1, value -> value * 10) + .build()); + + final CompletionStage>> collectFirstResult = + Source.from( + List.of( + new Pair(1, "one"), + new Pair(2, "two"), + new Pair(3, "three"))) + .via(collectFirstFlow.asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(20, "two")), + collectFirstResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + + final FlowWithContext mapOptionFlow = + FlowWithContext.create() + .mapOption(value -> value == 0 ? Optional.empty() : Optional.of(value * 10)); + + final CompletionStage>> mapOptionResult = + Source.from( + List.of( + new Pair<>(0, "zero"), + new Pair<>(1, "one"), + new Pair<>(2, "two"), + new Pair<>(3, "three"))) + .via(mapOptionFlow.asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(10, "one"), new Pair<>(20, "two"), new Pair<>(30, "three")), + mapOptionResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustPassContextThroughTruncatingOperators() throws Exception { + final FlowWithContext flow = + FlowWithContext.create() + .drop(1) + .dropRepeated() + .dropWhile(value -> value < 2) + .takeUntil(value -> value == 3) + .takeWithin(Duration.ofDays(1)) + .take(2) + .limit(2) + .limitWeighted(2, value -> 1L); + + final CompletionStage>> result = + Source.from( + List.of( + new Pair<>(0, "zero"), + new Pair<>(1, "one"), + new Pair<>(1, "one-duplicate"), + new Pair<>(2, "two"), + new Pair<>(3, "three"), + new Pair<>(4, "four"))) + .via(flow.asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(2, "two"), new Pair<>(3, "three")), + result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + + final CompletionStage>> inclusiveResult = + Source.from(List.of(new Pair<>(1, "one"), new Pair<>(2, "two"), new Pair<>(3, "three"))) + .via( + FlowWithContext.create() + .takeWhile(value -> value < 2, true) + .asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(1, "one"), new Pair<>(2, "two")), + inclusiveResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + + final CompletionStage>> customPredicateResult = + Source.from(List.of(new Pair<>(1, "one"), new Pair<>(3, "three"), new Pair<>(4, "four"))) + .via( + FlowWithContext.create() + .dropRepeated((left, right) -> left % 2 == right % 2) + .takeWhile(value -> value < 3) + .asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(1, "one")), + customPredicateResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + + final CompletionStage>> timedResult = + Source.single(new Pair<>(1, "one")) + .initialDelay(Duration.ofMillis(50)) + .via( + FlowWithContext.create() + .dropWithin(Duration.ofMillis(10)) + .takeWithin(Duration.ofDays(1)) + .asFlow()) + .runWith(Sink.seq(), system); + + assertEquals( + List.of(new Pair<>(1, "one")), timedResult.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceWithContextTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceWithContextTest.java new file mode 100644 index 00000000000..ea47272e6fa --- /dev/null +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceWithContextTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream.javadsl; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import org.apache.pekko.japi.Pair; +import org.apache.pekko.japi.pf.PFBuilder; +import org.apache.pekko.stream.StreamTestJupiter; +import org.apache.pekko.testkit.PekkoJUnitJupiterActorSystemResource; +import org.apache.pekko.testkit.PekkoSpec; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class SourceWithContextTest extends StreamTestJupiter { + + public SourceWithContextTest() { + super(actorSystemResource); + } + + @RegisterExtension + static PekkoJUnitJupiterActorSystemResource actorSystemResource = + new PekkoJUnitJupiterActorSystemResource("SourceWithContextTest", PekkoSpec.testConf()); + + @Test + public void mustPassContextThroughAdditionalFilteringOperators() throws Exception { + List> collectInput = + List.of( + new Pair(1, "one"), + new Pair(2, "two"), + new Pair("three", "three-string"), + new Pair(3, "three")); + + List> collectResult = + SourceWithContext.fromPairs(Source.from(collectInput)) + .collectType(Integer.class) + .collectWhile( + PFBuilder.create() + .match(Integer.class, value -> value < 3, value -> value * 10) + .build()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(List.of(new Pair<>(10, "one"), new Pair<>(20, "two")), collectResult); + + List> collectFirstResult = + SourceWithContext.fromPairs(Source.from(collectInput)) + .collectFirst( + PFBuilder.create() + .match(Integer.class, value -> value > 1, value -> value * 10) + .build()) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(List.of(new Pair<>(20, "two")), collectFirstResult); + + List> mapOptionInput = + List.of( + new Pair<>(0, "zero"), + new Pair<>(1, "one"), + new Pair<>(2, "two"), + new Pair<>(3, "three")); + + List> mapOptionResult = + SourceWithContext.fromPairs(Source.from(mapOptionInput)) + .mapOption(value -> value == 0 ? Optional.empty() : Optional.of(value * 10)) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals( + List.of(new Pair<>(10, "one"), new Pair<>(20, "two"), new Pair<>(30, "three")), + mapOptionResult); + } + + @Test + public void mustPassContextThroughTruncatingOperators() throws Exception { + List> input = + List.of( + new Pair<>(0, "zero"), + new Pair<>(1, "one"), + new Pair<>(1, "one-duplicate"), + new Pair<>(2, "two"), + new Pair<>(3, "three"), + new Pair<>(4, "four")); + + List> result = + SourceWithContext.fromPairs(Source.from(input)) + .drop(1) + .dropRepeated() + .dropWhile(value -> value < 2) + .takeUntil(value -> value == 3) + .takeWithin(Duration.ofDays(1)) + .take(2) + .limit(2) + .limitWeighted(2, value -> 1L) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(List.of(new Pair<>(2, "two"), new Pair<>(3, "three")), result); + + List> inclusiveResult = + SourceWithContext.fromPairs( + Source.from( + List.of(new Pair<>(1, "one"), new Pair<>(2, "two"), new Pair<>(3, "three")))) + .takeWhile(value -> value < 2, true) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(List.of(new Pair<>(1, "one"), new Pair<>(2, "two")), inclusiveResult); + + List> customPredicateResult = + SourceWithContext.fromPairs( + Source.from( + List.of(new Pair<>(1, "one"), new Pair<>(3, "three"), new Pair<>(4, "four")))) + .dropRepeated((left, right) -> left % 2 == right % 2) + .takeWhile(value -> value < 3) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(List.of(new Pair<>(1, "one")), customPredicateResult); + + List> timedResult = + SourceWithContext.fromPairs( + Source.single(new Pair<>(1, "one")).initialDelay(Duration.ofMillis(50))) + .dropWithin(Duration.ofMillis(10)) + .takeWithin(Duration.ofDays(1)) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .get(3, TimeUnit.SECONDS); + + assertEquals(List.of(new Pair<>(1, "one")), timedResult); + } +} diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala index fd27c272381..0325cc1650a 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala @@ -81,6 +81,98 @@ class FlowWithContextSpec extends StreamSpec { .expectError(boom) } + "pass through contexts using additional filtering operators" in { + val flow = FlowWithContext[Any, String] + .collectType[Int] + .collectWhile { case value if value < 3 => value * 10 } + + SourceWithContext + .fromTuples(Source(Vector((1: Any, "one"), (2: Any, "two"), ("three": Any, "three"), (3: Any, "three-int")))) + .via(flow) + .runWith(TestSink[(Int, String)]()) + .request(3) + .expectNext((10, "one"), (20, "two")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((0, "zero"), (1, "one"), (2, "two"), (3, "three")))) + .via(FlowWithContext[Int, String].mapOption(value => if (value == 0) None else Some(value * 10))) + .runWith(TestSink[(Int, String)]()) + .request(3) + .expectNext((10, "one"), (20, "two"), (30, "three")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1: Any, "one"), (2: Any, "two"), (3: Any, "three")))) + .via(FlowWithContext[Any, String].collectFirst { case value: Int if value > 1 => value * 10 }) + .runWith(TestSink[(Int, String)]()) + .request(1) + .expectNext((20, "two")) + .expectComplete() + + val forComprehensionFlow = + for { + value <- FlowWithContext[Int, String] + if value % 2 == 1 + } yield value * 10 + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (2, "two"), (3, "three")))) + .via(forComprehensionFlow) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((10, "one"), (30, "three")) + .expectComplete() + } + + "pass through contexts using truncating operators" in { + val flow = FlowWithContext[Int, String] + .drop(1) + .dropRepeated() + .dropWhile(_ < 2) + .takeUntil(_ == 3) + .takeWithin(1.day) + .take(2) + .limit(2) + .limitWeighted(2)(_ => 1) + + SourceWithContext + .fromTuples(Source(Vector((0, "zero"), (1, "one"), (1, "one-duplicate"), (2, "two"), (3, "three"), + (4, "four")))) + .via(flow) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((2, "two"), (3, "three")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (2, "two"), (3, "three")))) + .via(FlowWithContext[Int, String].takeWhile(_ < 2, inclusive = true)) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((1, "one"), (2, "two")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (3, "three"), (4, "four")))) + .via( + FlowWithContext[Int, String] + .dropRepeated((left, right) => left % 2 == right % 2) + .takeWhile(_ < 3)) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((1, "one")) + .expectComplete() + + SourceWithContext + .fromTuples(Source.single((1, "one")).initialDelay(50.millis)) + .via(FlowWithContext[Int, String].dropWithin(10.millis).takeWithin(1.day)) + .runWith(TestSink[(Int, String)]()) + .request(1) + .expectNext((1, "one")) + .expectComplete() + } + "pass through all data when using alsoTo" in { // alsoTo feeds an asynchronous side Sink, which may still be draining when the // main stream completes. Poll until it has observed every element instead of diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala index 02b9c9e5356..363a0133cc7 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala @@ -78,6 +78,99 @@ class SourceWithContextSpec extends StreamSpec { .expectComplete() } + "pass through contexts using additional filtering operators" in { + SourceWithContext + .fromTuples(Source(Vector((0, "zero"), (1, "one"), (2, "two"), (3, "three")))) + .mapOption { + case 0 => None + case value => Some(value * 10) + } + .runWith(TestSink[(Int, String)]()) + .request(3) + .expectNext((10, "one"), (20, "two"), (30, "three")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (2, "two"), (3, "three")))) + .collectFirst { case value if value > 1 => value * 10 } + .runWith(TestSink[(Int, String)]()) + .request(1) + .expectNext((20, "two")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (2, "two"), (3, "three")))) + .collectWhile { case value if value < 3 => value * 10 } + .runWith(TestSink[(Int, String)]()) + .request(3) + .expectNext((10, "one"), (20, "two")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1: Any, "one"), ("two": Any, "two"), (3: Any, "three")))) + .collectType[Int] + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((1, "one"), (3, "three")) + .expectComplete() + + val forComprehensionResult = + for { + value <- SourceWithContext.fromTuples(Source(Vector((1, "one"), (2, "two"), (3, "three")))) + if value % 2 == 1 + } yield value * 10 + + forComprehensionResult + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((10, "one"), (30, "three")) + .expectComplete() + } + + "pass through contexts using truncating operators" in { + SourceWithContext + .fromTuples(Source(Vector((0, "zero"), (1, "one"), (1, "one-duplicate"), (2, "two"), (3, "three"), + (4, "four")))) + .drop(1) + .dropRepeated() + .dropWhile(_ < 2) + .takeUntil(_ == 3) + .takeWithin(1.day) + .take(2) + .limit(2) + .limitWeighted(2)(_ => 1) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((2, "two"), (3, "three")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (2, "two"), (3, "three")))) + .takeWhile(_ < 2, inclusive = true) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((1, "one"), (2, "two")) + .expectComplete() + + SourceWithContext + .fromTuples(Source(Vector((1, "one"), (3, "three"), (4, "four")))) + .dropRepeated((left, right) => left % 2 == right % 2) + .takeWhile(_ < 3) + .runWith(TestSink[(Int, String)]()) + .request(2) + .expectNext((1, "one")) + .expectComplete() + + SourceWithContext + .fromTuples(Source.single((1, "one")).initialDelay(50.millis)) + .dropWithin(10.millis) + .takeWithin(1.day) + .runWith(TestSink[(Int, String)]()) + .request(1) + .expectNext((1, "one")) + .expectComplete() + } + "pass through all data when using alsoTo" in { // alsoTo feeds an asynchronous side Sink, which may still be draining when the // main stream completes. Poll until it has observed every element instead of diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index 00675baa843..68c6699fe3d 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -21,6 +21,7 @@ import scala.jdk.CollectionConverters._ import scala.jdk.DurationConverters._ import scala.jdk.FutureConverters._ import scala.jdk.OptionConverters._ +import scala.reflect.ClassTag import org.apache.pekko import pekko.annotation.ApiMayChange @@ -168,6 +169,39 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( def collect[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.collect(pf)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.collectFirst]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Flow.collectFirst]] + * @since 2.0.0 + */ + def collectFirst[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = + viaScala(_.collectFirst(pf)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.collectWhile]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Flow.collectWhile]] + * @since 2.0.0 + */ + def collectWhile[Out2](pf: PartialFunction[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = + viaScala(_.collectWhile(pf)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.collectType]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Flow.collectType]] + * @since 2.0.0 + */ + def collectType[Out2](clazz: Class[Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = + viaScala(_.collectType[Out2](ClassTag[Out2](clazz))) + /** * Context-preserving variant of [[pekko.stream.javadsl.Flow.filter]]. * @@ -188,6 +222,51 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( def filterNot(p: function.Predicate[Out]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = viaScala(_.filterNot(p.test)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.dropRepeated]]. + * + * @see [[pekko.stream.javadsl.Flow.dropRepeated]] + * @since 2.0.0 + */ + def dropRepeated(): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.dropRepeated()) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.dropRepeated]]. + * + * @see [[pekko.stream.javadsl.Flow.dropRepeated]] + * @since 2.0.0 + */ + def dropRepeated(p: function.Function2[Out, Out, Boolean]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.dropRepeated((left, right) => p.apply(left, right))) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.dropWhile]]. + * + * @see [[pekko.stream.javadsl.Flow.dropWhile]] + * @since 2.0.0 + */ + def dropWhile(p: function.Predicate[Out]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.dropWhile(p.test)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.drop]]. + * + * @see [[pekko.stream.javadsl.Flow.drop]] + * @since 2.0.0 + */ + def drop(n: Long): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.drop(n)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.dropWithin]]. + * + * @see [[pekko.stream.javadsl.Flow.dropWithin]] + * @since 2.0.0 + */ + def dropWithin(duration: java.time.Duration): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.dropWithin(duration.toScala)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Flow.grouped]]. * @@ -209,6 +288,18 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.map(f.apply)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapOption]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Flow.mapOption]] + * @since 2.0.0 + */ + def mapOption[Out2]( + f: function.Function[Out, Optional[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = + viaScala(_.mapOption(out => f.apply(out).toScala)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsync]]. * @@ -285,6 +376,25 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( viaScala(_.mapContext(extractContext.apply)) } + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.limit]]. + * + * @see [[pekko.stream.javadsl.Flow.limit]] + * @since 2.0.0 + */ + def limit(max: Long): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.limit(max)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.limitWeighted]]. + * + * @see [[pekko.stream.javadsl.Flow.limitWeighted]] + * @since 2.0.0 + */ + def limitWeighted(max: Long, costFn: function.Function[Out, java.lang.Long]) + : FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.limitWeighted(max)(costFn.apply)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Flow.sliding]]. * @@ -377,6 +487,53 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( marker: function.Function2[Out, CtxOut, LogMarker]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.takeWhile]]. + * + * @see [[pekko.stream.javadsl.Flow.takeWhile]] + * @since 2.0.0 + */ + def takeWhile( + p: function.Predicate[Out], + inclusive: Boolean): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.takeWhile(p.test, inclusive)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.takeWhile]]. + * + * @see [[pekko.stream.javadsl.Flow.takeWhile]] + * @since 2.0.0 + */ + def takeWhile(p: function.Predicate[Out]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + takeWhile(p, inclusive = false) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.takeUntil]]. + * + * @see [[pekko.stream.javadsl.Flow.takeUntil]] + * @since 2.0.0 + */ + def takeUntil(p: function.Predicate[Out]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.takeUntil(p.test)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.take]]. + * + * @see [[pekko.stream.javadsl.Flow.take]] + * @since 2.0.0 + */ + def take(n: Long): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.take(n)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Flow.takeWithin]]. + * + * @see [[pekko.stream.javadsl.Flow.takeWithin]] + * @since 2.0.0 + */ + def takeWithin(duration: java.time.Duration): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = + viaScala(_.takeWithin(duration.toScala)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Flow.throttle]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index f0593e4420e..e46bff7ae0c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -21,6 +21,7 @@ import scala.jdk.CollectionConverters._ import scala.jdk.DurationConverters._ import scala.jdk.FutureConverters._ import scala.jdk.OptionConverters._ +import scala.reflect.ClassTag import org.apache.pekko import pekko.actor.ClassicActorSystemProvider @@ -165,6 +166,39 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def collect[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.collect(pf)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.collectFirst]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Source.collectFirst]] + * @since 2.0.0 + */ + def collectFirst[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = + viaScala(_.collectFirst(pf)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.collectWhile]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Source.collectWhile]] + * @since 2.0.0 + */ + def collectWhile[Out2](pf: PartialFunction[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = + viaScala(_.collectWhile(pf)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.collectType]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Source.collectType]] + * @since 2.0.0 + */ + def collectType[Out2](clazz: Class[Out2]): SourceWithContext[Out2, Ctx, Mat] = + viaScala(_.collectType[Out2](ClassTag[Out2](clazz))) + /** * Context-preserving variant of [[pekko.stream.javadsl.Source.filter]]. * @@ -185,6 +219,51 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def filterNot(p: function.Predicate[Out]): SourceWithContext[Out, Ctx, Mat] = viaScala(_.filterNot(p.test)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.dropRepeated]]. + * + * @see [[pekko.stream.javadsl.Source.dropRepeated]] + * @since 2.0.0 + */ + def dropRepeated(): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.dropRepeated()) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.dropRepeated]]. + * + * @see [[pekko.stream.javadsl.Source.dropRepeated]] + * @since 2.0.0 + */ + def dropRepeated(p: function.Function2[Out, Out, Boolean]): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.dropRepeated((left, right) => p.apply(left, right))) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.dropWhile]]. + * + * @see [[pekko.stream.javadsl.Source.dropWhile]] + * @since 2.0.0 + */ + def dropWhile(p: function.Predicate[Out]): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.dropWhile(p.test)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.drop]]. + * + * @see [[pekko.stream.javadsl.Source.drop]] + * @since 2.0.0 + */ + def drop(n: Long): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.drop(n)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.dropWithin]]. + * + * @see [[pekko.stream.javadsl.Source.dropWithin]] + * @since 2.0.0 + */ + def dropWithin(duration: java.time.Duration): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.dropWithin(duration.toScala)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Source.grouped]]. * @@ -204,6 +283,17 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.map(f.apply)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.mapOption]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.javadsl.Source.mapOption]] + * @since 2.0.0 + */ + def mapOption[Out2](f: function.Function[Out, Optional[Out2]]): SourceWithContext[Out2, Ctx, Mat] = + viaScala(_.mapOption(out => f.apply(out).toScala)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsync]]. * @@ -276,6 +366,24 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def mapContext[Ctx2](extractContext: function.Function[Ctx, Ctx2]): SourceWithContext[Out, Ctx2, Mat] = viaScala(_.mapContext(extractContext.apply)) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.limit]]. + * + * @see [[pekko.stream.javadsl.Source.limit]] + * @since 2.0.0 + */ + def limit(max: Int): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.limit(max)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.limitWeighted]]. + * + * @see [[pekko.stream.javadsl.Source.limitWeighted]] + * @since 2.0.0 + */ + def limitWeighted(max: Long, costFn: function.Function[Out, java.lang.Long]): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.limitWeighted(max)(costFn.apply)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Source.sliding]]. * @@ -362,6 +470,51 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def logWithMarker(name: String, marker: function.Function2[Out, Ctx, LogMarker]): SourceWithContext[Out, Ctx, Mat] = this.logWithMarker(name, marker, ConstantFun.javaIdentityFunction[Out], null) + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.takeWhile]]. + * + * @see [[pekko.stream.javadsl.Source.takeWhile]] + * @since 2.0.0 + */ + def takeWhile(p: function.Predicate[Out], inclusive: Boolean): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.takeWhile(p.test, inclusive)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.takeWhile]]. + * + * @see [[pekko.stream.javadsl.Source.takeWhile]] + * @since 2.0.0 + */ + def takeWhile(p: function.Predicate[Out]): SourceWithContext[Out, Ctx, Mat] = + takeWhile(p, inclusive = false) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.takeUntil]]. + * + * @see [[pekko.stream.javadsl.Source.takeUntil]] + * @since 2.0.0 + */ + def takeUntil(p: function.Predicate[Out]): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.takeUntil(p.test)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.take]]. + * + * @see [[pekko.stream.javadsl.Source.take]] + * @since 2.0.0 + */ + def take(n: Long): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.take(n)) + + /** + * Context-preserving variant of [[pekko.stream.javadsl.Source.takeWithin]]. + * + * @see [[pekko.stream.javadsl.Source.takeWithin]] + * @since 2.0.0 + */ + def takeWithin(duration: java.time.Duration): SourceWithContext[Out, Ctx, Mat] = + viaScala(_.takeWithin(duration.toScala)) + /** * Context-preserving variant of [[pekko.stream.javadsl.Source.throttle]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala index 9fafa15e642..3a1193cf86c 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala @@ -18,6 +18,7 @@ import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import scala.reflect.ClassTag import org.apache.pekko import pekko.NotUsed @@ -126,6 +127,17 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { def map[Out2](f: Out => Out2): Repr[Out2, Ctx] = via(flow.map { case (e, ctx) => (f(e), ctx) }) + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapOption]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.scaladsl.FlowOps.mapOption]] + * @since 2.0.0 + */ + def mapOption[Out2](f: Out => Option[Out2]): Repr[Out2, Ctx] = + via(flow.mapOption { case (e, ctx) => f(e).map(_ -> ctx) }) + /** * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapError]]. * @@ -206,6 +218,161 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { def filterNot(pred: Out => Boolean): Repr[Out, Ctx] = collect { case e if !pred(e) => e } + /** + * Alias for [[filter]], added to enable filtering in for comprehensions. + * + * @see [[pekko.stream.scaladsl.FlowOps.withFilter]] + * @since 2.0.0 + */ + @ApiMayChange + def withFilter(pred: Out => Boolean): Repr[Out, Ctx] = + filter(pred) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropRepeated]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.dropRepeated]] + * @since 2.0.0 + */ + def dropRepeated(): Repr[Out, Ctx] = + dropRepeated(ConstantFun.scalaAnyTwoEquals) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropRepeated]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.dropRepeated]] + * @since 2.0.0 + */ + def dropRepeated(pred: (Out, Out) => Boolean): Repr[Out, Ctx] = + via(flow.dropRepeated((left, right) => pred(left._1, right._1))) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeWhile]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.takeWhile]] + * @since 2.0.0 + */ + def takeWhile(pred: Out => Boolean): Repr[Out, Ctx] = + takeWhile(pred, inclusive = false) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeUntil]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.takeUntil]] + * @since 2.0.0 + */ + def takeUntil(pred: Out => Boolean): Repr[Out, Ctx] = + takeWhile(!pred(_), inclusive = true) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeWhile]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.takeWhile]] + * @since 2.0.0 + */ + def takeWhile(pred: Out => Boolean, inclusive: Boolean): Repr[Out, Ctx] = + via(flow.takeWhile({ case (e, _) => pred(e) }, inclusive)) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropWhile]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.dropWhile]] + * @since 2.0.0 + */ + def dropWhile(pred: Out => Boolean): Repr[Out, Ctx] = + via(flow.dropWhile { case (e, _) => pred(e) }) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collectFirst]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.scaladsl.FlowOps.collectFirst]] + * @since 2.0.0 + */ + def collectFirst[Out2](f: PartialFunction[Out, Out2]): Repr[Out2, Ctx] = + via(flow.collectFirst { + case (e, ctx) if f.isDefinedAt(e) => (f(e), ctx) + }) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collectWhile]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.scaladsl.FlowOps.collectWhile]] + * @since 2.0.0 + */ + def collectWhile[Out2](f: PartialFunction[Out, Out2]): Repr[Out2, Ctx] = + via(flow.collectWhile { + case (e, ctx) if f.isDefinedAt(e) => (f(e), ctx) + }) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collectType]]. + * + * Note, that the context of elements that are filtered out is skipped as well. + * + * @see [[pekko.stream.scaladsl.FlowOps.collectType]] + * @since 2.0.0 + */ + def collectType[Out2](implicit tag: ClassTag[Out2]): Repr[Out2, Ctx] = + collect { case tag(e) => e } + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.drop]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.drop]] + * @since 2.0.0 + */ + def drop(n: Long): Repr[Out, Ctx] = + via(flow.drop(n)) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.dropWithin]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.dropWithin]] + * @since 2.0.0 + */ + def dropWithin(d: FiniteDuration): Repr[Out, Ctx] = + via(flow.dropWithin(d)) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.take]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.take]] + * @since 2.0.0 + */ + def take(n: Long): Repr[Out, Ctx] = + via(flow.take(n)) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.takeWithin]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.takeWithin]] + * @since 2.0.0 + */ + def takeWithin(d: FiniteDuration): Repr[Out, Ctx] = + via(flow.takeWithin(d)) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.limit]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.limit]] + * @since 2.0.0 + */ + def limit(max: Long): Repr[Out, Ctx] = + limitWeighted(max)(_ => 1) + + /** + * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.limitWeighted]]. + * + * @see [[pekko.stream.scaladsl.FlowOps.limitWeighted]] + * @since 2.0.0 + */ + def limitWeighted(max: Long)(costFn: Out => Long): Repr[Out, Ctx] = + via(flow.limitWeighted(max) { case (e, _) => costFn(e) }) + /** * Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.grouped]]. *