Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions docs/src/main/paradox/stream/stream-context.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,17 @@ while grouping operations will keep all contexts from the elements in
the group. Streaming one-to-many operations such as `mapConcat`
associate the original context with each of the produced elements.

The context-preserving filtering and truncating operators include
`filter`, `filterNot`, `collect`, `collectFirst`, `collectWhile`, `collectType`,
`mapOption`, `drop`, `dropWhile`, `dropWithin`, `dropRepeated`, `take`, `takeWhile`,
`takeUntil`, `takeWithin`, `limit`, and `limitWeighted`.

Scala
: @@snip [snip](/docs/src/test/scala/docs/stream/operators/WithContextSpec.scala) { #withContextOperators }

Java
: @@snip [snip](/docs/src/test/java/jdocs/stream/operators/WithContextTest.java) { #withContextOperators }

As an escape hatch, there is a `via` operator that allows you to
insert an arbitrary @apidoc[Flow] that can process the
@scala[tuples]@java[pairs] of elements and context in any way
Expand Down
27 changes: 27 additions & 0 deletions docs/src/test/java/jdocs/stream/operators/WithContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import static org.hamcrest.CoreMatchers.hasItems;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import jdocs.AbstractJavaTest;
Expand Down Expand Up @@ -119,4 +121,29 @@ public void documentAsFlowWithContext() throws Exception {
list, hasItems(Pair.create("yins", 1), Pair.create("zwyi", 2), Pair.create("dryi", 3)));
// #asFlowWithContext
}

@Test
public void documentWithContextOperators() throws Exception {
// #withContextOperators
Collection<Pair<String, Integer>> values =
List.of(
Pair.create("eins", 1),
Pair.create("eins", 2),
Pair.create("zwei", 3),
Pair.create("drei", 4));

SourceWithContext<String, Integer, NotUsed> filtered =
Source.from(values)
.asSourceWithContext(Pair::second)
.map(Pair::first)
.dropRepeated()
.mapOption(
word -> word.startsWith("z") ? Optional.empty() : Optional.of(word.toUpperCase()))
.take(2);

CompletionStage<List<Pair<String, Integer>>> result = filtered.runWith(Sink.seq(), system);
List<Pair<String, Integer>> list = result.toCompletableFuture().get(1, TimeUnit.SECONDS);
assertEquals(List.of(Pair.create("EINS", 1), Pair.create("DREI", 4)), list);
// #withContextOperators
}
}
25 changes: 25 additions & 0 deletions docs/src/test/scala/docs/stream/operators/WithContextSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,29 @@ class WithContextSpec extends PekkoSpec {
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("snie" -> 1, "iewz" -> 2, "ierd" -> 3)
// #asFlowWithContext
}

"use filtering and truncating operators with context" in {
// #withContextOperators
import org.apache.pekko
import pekko.NotUsed
import pekko.stream.scaladsl.Sink
import pekko.stream.scaladsl.Source
import pekko.stream.scaladsl.SourceWithContext
import scala.collection.immutable

val values: immutable.Seq[(String, Int)] =
immutable.Seq("eins" -> 1, "eins" -> 2, "zwei" -> 3, "drei" -> 4)

val filtered: SourceWithContext[String, Int, NotUsed] =
Source(values)
.asSourceWithContext(_._2)
.map(_._1)
.dropRepeated()
.mapOption(word => if (word.startsWith("z")) None else Some(word.toUpperCase))
.take(2)

val result = filtered.runWith(Sink.seq)
result.futureValue should contain theSameElementsInOrderAs immutable.Seq("EINS" -> 1, "DREI" -> 4)
// #withContextOperators
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,14 @@
import static org.apache.pekko.NotUsed.notUsed;
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.pekko.NotUsed;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.StreamTestJupiter;
import org.apache.pekko.testkit.PekkoJUnitJupiterActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
Expand Down Expand Up @@ -70,4 +73,135 @@ public void mustAllowComposingFlows() throws Exception {
assertEquals("1", pairs.get(0).first());
assertEquals(notUsed(), pairs.get(0).second());
}

@Test
public void mustPassContextThroughAdditionalFilteringOperators() throws Exception {
final FlowWithContext<Object, String, Integer, String, NotUsed> collectFlow =
FlowWithContext.<Object, String>create()
.collectType(Integer.class)
.collectWhile(
PFBuilder.<Integer, Integer>create()
.match(Integer.class, value -> value < 3, value -> value * 10)
.build());

final CompletionStage<List<Pair<Integer, String>>> collectResult =
Source.from(
List.of(
new Pair<Object, String>(1, "one"),
new Pair<Object, String>(2, "two"),
new Pair<Object, String>("three", "three-string"),
new Pair<Object, String>(3, "three")))
.via(collectFlow.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(10, "one"), new Pair<>(20, "two")),
collectResult.toCompletableFuture().get(3, TimeUnit.SECONDS));

final FlowWithContext<Object, String, Integer, String, NotUsed> collectFirstFlow =
FlowWithContext.<Object, String>create()
.collectFirst(
PFBuilder.<Object, Integer>create()
.match(Integer.class, value -> value > 1, value -> value * 10)
.build());

final CompletionStage<List<Pair<Integer, String>>> collectFirstResult =
Source.from(
List.of(
new Pair<Object, String>(1, "one"),
new Pair<Object, String>(2, "two"),
new Pair<Object, String>(3, "three")))
.via(collectFirstFlow.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(20, "two")),
collectFirstResult.toCompletableFuture().get(3, TimeUnit.SECONDS));

final FlowWithContext<Integer, String, Integer, String, NotUsed> mapOptionFlow =
FlowWithContext.<Integer, String>create()
.mapOption(value -> value == 0 ? Optional.empty() : Optional.of(value * 10));

final CompletionStage<List<Pair<Integer, String>>> mapOptionResult =
Source.from(
List.of(
new Pair<>(0, "zero"),
new Pair<>(1, "one"),
new Pair<>(2, "two"),
new Pair<>(3, "three")))
.via(mapOptionFlow.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(10, "one"), new Pair<>(20, "two"), new Pair<>(30, "three")),
mapOptionResult.toCompletableFuture().get(3, TimeUnit.SECONDS));
}

@Test
public void mustPassContextThroughTruncatingOperators() throws Exception {
final FlowWithContext<Integer, String, Integer, String, NotUsed> flow =
FlowWithContext.<Integer, String>create()
.drop(1)
.dropRepeated()
.dropWhile(value -> value < 2)
.takeUntil(value -> value == 3)
.takeWithin(Duration.ofDays(1))
.take(2)
.limit(2)
.limitWeighted(2, value -> 1L);

final CompletionStage<List<Pair<Integer, String>>> result =
Source.from(
List.of(
new Pair<>(0, "zero"),
new Pair<>(1, "one"),
new Pair<>(1, "one-duplicate"),
new Pair<>(2, "two"),
new Pair<>(3, "three"),
new Pair<>(4, "four")))
.via(flow.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(2, "two"), new Pair<>(3, "three")),
result.toCompletableFuture().get(3, TimeUnit.SECONDS));

final CompletionStage<List<Pair<Integer, String>>> inclusiveResult =
Source.from(List.of(new Pair<>(1, "one"), new Pair<>(2, "two"), new Pair<>(3, "three")))
.via(
FlowWithContext.<Integer, String>create()
.takeWhile(value -> value < 2, true)
.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(1, "one"), new Pair<>(2, "two")),
inclusiveResult.toCompletableFuture().get(3, TimeUnit.SECONDS));

final CompletionStage<List<Pair<Integer, String>>> customPredicateResult =
Source.from(List.of(new Pair<>(1, "one"), new Pair<>(3, "three"), new Pair<>(4, "four")))
.via(
FlowWithContext.<Integer, String>create()
.dropRepeated((left, right) -> left % 2 == right % 2)
.takeWhile(value -> value < 3)
.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(1, "one")),
customPredicateResult.toCompletableFuture().get(3, TimeUnit.SECONDS));

final CompletionStage<List<Pair<Integer, String>>> timedResult =
Source.single(new Pair<>(1, "one"))
.initialDelay(Duration.ofMillis(50))
.via(
FlowWithContext.<Integer, String>create()
.dropWithin(Duration.ofMillis(10))
.takeWithin(Duration.ofDays(1))
.asFlow())
.runWith(Sink.seq(), system);

assertEquals(
List.of(new Pair<>(1, "one")), timedResult.toCompletableFuture().get(3, TimeUnit.SECONDS));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream.javadsl;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.japi.pf.PFBuilder;
import org.apache.pekko.stream.StreamTestJupiter;
import org.apache.pekko.testkit.PekkoJUnitJupiterActorSystemResource;
import org.apache.pekko.testkit.PekkoSpec;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

public class SourceWithContextTest extends StreamTestJupiter {

public SourceWithContextTest() {
super(actorSystemResource);
}

@RegisterExtension
static PekkoJUnitJupiterActorSystemResource actorSystemResource =
new PekkoJUnitJupiterActorSystemResource("SourceWithContextTest", PekkoSpec.testConf());

@Test
public void mustPassContextThroughAdditionalFilteringOperators() throws Exception {
List<Pair<Object, String>> collectInput =
List.of(
new Pair<Object, String>(1, "one"),
new Pair<Object, String>(2, "two"),
new Pair<Object, String>("three", "three-string"),
new Pair<Object, String>(3, "three"));

List<Pair<Integer, String>> collectResult =
SourceWithContext.fromPairs(Source.from(collectInput))
.collectType(Integer.class)
.collectWhile(
PFBuilder.<Integer, Integer>create()
.match(Integer.class, value -> value < 3, value -> value * 10)
.build())
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(List.of(new Pair<>(10, "one"), new Pair<>(20, "two")), collectResult);

List<Pair<Integer, String>> collectFirstResult =
SourceWithContext.fromPairs(Source.from(collectInput))
.collectFirst(
PFBuilder.<Object, Integer>create()
.match(Integer.class, value -> value > 1, value -> value * 10)
.build())
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(List.of(new Pair<>(20, "two")), collectFirstResult);

List<Pair<Integer, String>> mapOptionInput =
List.of(
new Pair<>(0, "zero"),
new Pair<>(1, "one"),
new Pair<>(2, "two"),
new Pair<>(3, "three"));

List<Pair<Integer, String>> mapOptionResult =
SourceWithContext.fromPairs(Source.from(mapOptionInput))
.mapOption(value -> value == 0 ? Optional.empty() : Optional.of(value * 10))
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(
List.of(new Pair<>(10, "one"), new Pair<>(20, "two"), new Pair<>(30, "three")),
mapOptionResult);
}

@Test
public void mustPassContextThroughTruncatingOperators() throws Exception {
List<Pair<Integer, String>> input =
List.of(
new Pair<>(0, "zero"),
new Pair<>(1, "one"),
new Pair<>(1, "one-duplicate"),
new Pair<>(2, "two"),
new Pair<>(3, "three"),
new Pair<>(4, "four"));

List<Pair<Integer, String>> result =
SourceWithContext.fromPairs(Source.from(input))
.drop(1)
.dropRepeated()
.dropWhile(value -> value < 2)
.takeUntil(value -> value == 3)
.takeWithin(Duration.ofDays(1))
.take(2)
.limit(2)
.limitWeighted(2, value -> 1L)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(List.of(new Pair<>(2, "two"), new Pair<>(3, "three")), result);

List<Pair<Integer, String>> inclusiveResult =
SourceWithContext.fromPairs(
Source.from(
List.of(new Pair<>(1, "one"), new Pair<>(2, "two"), new Pair<>(3, "three"))))
.takeWhile(value -> value < 2, true)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(List.of(new Pair<>(1, "one"), new Pair<>(2, "two")), inclusiveResult);

List<Pair<Integer, String>> customPredicateResult =
SourceWithContext.fromPairs(
Source.from(
List.of(new Pair<>(1, "one"), new Pair<>(3, "three"), new Pair<>(4, "four"))))
.dropRepeated((left, right) -> left % 2 == right % 2)
.takeWhile(value -> value < 3)
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(List.of(new Pair<>(1, "one")), customPredicateResult);

List<Pair<Integer, String>> timedResult =
SourceWithContext.fromPairs(
Source.single(new Pair<>(1, "one")).initialDelay(Duration.ofMillis(50)))
.dropWithin(Duration.ofMillis(10))
.takeWithin(Duration.ofDays(1))
.runWith(Sink.seq(), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);

assertEquals(List.of(new Pair<>(1, "one")), timedResult);
}
}
Loading
Loading