Add withContext operators#3199
Open
He-Pin wants to merge 3 commits into
Open
Conversation
dd149a7 to
38b219b
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
This PR expands the Pekko Streams “withContext” DSL so SourceWithContext / FlowWithContext can use the same filtering and truncation-style operators as ordinary Source / Flow, while preserving (and correctly pairing) the context values through those operations.
Changes:
- Added context-preserving filtering/truncation operators to the Scala
FlowWithContextOps(shared by ScalaFlowWithContextandSourceWithContext), includingmapOption,collectFirst/While/Type,drop*,take*,limit*, pluswithFilterfor Scala for-comprehensions. - Added corresponding operators to the Java
SourceWithContext/FlowWithContextAPIs. - Added Scala + Java tests and updated the context-propagation documentation with compiled snippets covering the new operators.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala | Adds context-preserving filtering/truncation operators (and withFilter) to the shared Scala withContext ops. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala | Exposes the new operator set on the Java SourceWithContext wrapper via delegation to Scala DSL. |
| stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala | Exposes the new operator set on the Java FlowWithContext wrapper via delegation to Scala DSL. |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SourceWithContextSpec.scala | Adds Scala tests asserting correct context pairing through the new filtering/truncation operators (incl. for-comprehension filtering). |
| stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowWithContextSpec.scala | Adds Scala tests for the new FlowWithContext operators and context preservation. |
| stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceWithContextTest.java | New Java test coverage for the added SourceWithContext operators and context preservation. |
| stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowWithContextTest.java | Adds Java test coverage for the new FlowWithContext operators and context preservation. |
| docs/src/test/scala/docs/stream/operators/WithContextSpec.scala | Adds compiled Scala documentation snippet demonstrating filtering/truncation operators with context. |
| docs/src/test/java/jdocs/stream/operators/WithContextTest.java | Adds compiled Java documentation snippet demonstrating filtering/truncation operators with context. |
| docs/src/main/paradox/stream/stream-context.md | Documents the supported context-preserving filtering/truncation operators and references the compiled snippets. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
He-Pin
added a commit
to He-Pin/incubator-pekko
that referenced
this pull request
Jun 27, 2026
…fast Motivation: PR apache#3164 accidentally changed ConfigSSLEngineProvider.sslContext from eager val back to lazy val (it was based on a branch predating apache#3165). This regressed the fail-fast behavior added by apache#3165: keystore and truststore errors are now only discovered on the first connection attempt instead of at provider construction, and the regression test ConfigSSLEngineProviderSpec started failing on CI with "Expected exception RemoteTransportException to be thrown, but no exception was thrown". Modification: Change `private lazy val sslContext` back to `private val sslContext` in classic remoting's ConfigSSLEngineProvider so SSL initialization runs during construction and surfaces misconfiguration immediately. Result: ConfigSSLEngineProvider once again throws RemoteTransportException at construction time when the keystore cannot be loaded, matching the behavior asserted by ConfigSSLEngineProviderSpec and the intent of PR apache#3165. Tests: - sbt "remote / Test / testOnly org.apache.pekko.remote.ConfigSSLEngineProviderSpec org.apache.pekko.remote.Ticket1978ConfigSpec" Tests: succeeded 3, failed 0 References: Fixes CI failure on PR apache#3199; regression from apache#3164 over apache#3165
pjfanning
reviewed
Jun 27, 2026
| * | ||
| * Note, that the context of elements that are filtered out is skipped as well. | ||
| * | ||
| * @see [[pekko.stream.javadsl.Flow.collectFirst]] |
Member
There was a problem hiding this comment.
add @since 2.0.0 on new API functions
can you also rebase to pick up latest merges? There is at least one change in this PR that is not needed in this PR and is merged to main branch already.
Motivation: SourceWithContext and FlowWithContext were missing several safe filtering and truncating operators that ordinary Source and Flow already provide. Modification: Add context-preserving mapOption, collect*, drop/take, dropWithin/takeWithin, dropRepeated, takeUntil, and limit operators to Scala and Java withContext DSLs. Add Scala and Java regression coverage proving contexts stay paired with emitted elements, including representative overloads. Document the supported filtering and truncating operators in the context propagation guide with compiled Scala and Java snippets. Result: Users can use these operators directly on SourceWithContext and FlowWithContext without dropping into tuple-based streams. Tests: - rtk scalafmt --mode diff-ref=origin/main - rtk scalafmt --list --mode diff-ref=origin/main - rtk bash -lc "export JAVA_HOME=$(/usr/libexec/java_home -v 17); export PATH=$JAVA_HOME/bin:$PATH; sbt stream-tests/javafmtAll docs/javafmtAll" - rtk bash -lc "export JAVA_HOME=$(/usr/libexec/java_home -v 17); export PATH=$JAVA_HOME/bin:$PATH; sbt stream-tests/javafmtCheckAll docs/javafmtCheckAll" - rtk sbt headerCreateAll - rtk sbt +headerCheckAll - rtk sbt "+stream / mimaReportBinaryIssues" - rtk sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceWithContextSpec org.apache.pekko.stream.scaladsl.FlowWithContextSpec org.apache.pekko.stream.javadsl.FlowWithContextTest org.apache.pekko.stream.javadsl.SourceWithContextTest" - rtk sbt "docs / Test / testOnly docs.stream.operators.WithContextSpec jdocs.stream.operators.WithContextTest" - rtk sbt docs/paradox - rtk git diff --check - rtk sbt validatePullRequest (stopped after unrelated local environment failures: leveldbjni native library missing for arm64 and ConfigSSLEngineProviderSpec local failure) References: Fixes apache#3177
…fast Motivation: PR apache#3164 accidentally changed ConfigSSLEngineProvider.sslContext from eager val back to lazy val (it was based on a branch predating apache#3165). This regressed the fail-fast behavior added by apache#3165: keystore and truststore errors are now only discovered on the first connection attempt instead of at provider construction, and the regression test ConfigSSLEngineProviderSpec started failing on CI with "Expected exception RemoteTransportException to be thrown, but no exception was thrown". Modification: Change `private lazy val sslContext` back to `private val sslContext` in classic remoting's ConfigSSLEngineProvider so SSL initialization runs during construction and surfaces misconfiguration immediately. Result: ConfigSSLEngineProvider once again throws RemoteTransportException at construction time when the keystore cannot be loaded, matching the behavior asserted by ConfigSSLEngineProviderSpec and the intent of PR apache#3165. Tests: - sbt "remote / Test / testOnly org.apache.pekko.remote.ConfigSSLEngineProviderSpec org.apache.pekko.remote.Ticket1978ConfigSpec" Tests: succeeded 3, failed 0 References: Fixes CI failure on PR apache#3199; regression from apache#3164 over apache#3165
Address review comment by @pjfanning on apache#3199: apache#3199 (review) Add `@since 2.0.0` to the scaladoc of every new API method introduced by the withContext operators commit, across the Java and Scala DSLs: - javadsl.FlowWithContext (16 methods) - javadsl.SourceWithContext (16 methods) - scaladsl.FlowWithContextOps (17 methods) Total: 49 @SInCE annotations added.
abebe9c to
a46c7b9
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
SourceWithContextandFlowWithContextwere missing safe filtering and truncating operators available on ordinarySourceandFlow.Modification
Add context-preserving
mapOption,collectFirst,collectWhile,collectType,drop,dropWithin,dropRepeated,dropWhile,take,takeWhile,takeUntil,takeWithin,limit, andlimitWeightedoperators to the Scala and Java withContext DSLs. Add ScalawithFilterfor for-comprehension filtering.Add Scala and Java tests covering the new operators, representative overloads, and context pairing. Document the supported filtering and truncating operators in the context propagation guide with compiled Scala and Java snippets.
Result
Users can keep automatic context propagation while using these operators directly on
SourceWithContextandFlowWithContext.Tests
rtk scalafmt --mode diff-ref=origin/mainrtk scalafmt --list --mode diff-ref=origin/mainrtk bash -lc "export JAVA_HOME=$(/usr/libexec/java_home -v 17); export PATH=\"$JAVA_HOME/bin:$PATH\"; sbt \"stream-tests / javafmtAll\" \"docs / javafmtAll\""rtk bash -lc "export JAVA_HOME=$(/usr/libexec/java_home -v 17); export PATH=\"$JAVA_HOME/bin:$PATH\"; sbt \"stream-tests / javafmtCheckAll\" \"docs / javafmtCheckAll\""rtk sbt headerCreateAllrtk sbt +headerCheckAllrtk sbt "+stream / mimaReportBinaryIssues"rtk sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.SourceWithContextSpec org.apache.pekko.stream.scaladsl.FlowWithContextSpec org.apache.pekko.stream.javadsl.FlowWithContextTest org.apache.pekko.stream.javadsl.SourceWithContextTest"rtk sbt "docs / Test / testOnly docs.stream.operators.WithContextSpec jdocs.stream.operators.WithContextTest"rtk sbt docs/paradoxrtk git diff --checkrtk sbt validatePullRequestattempted locally, then stopped after unrelated local environment failures:leveldbjninative library missing arm64 support in persistence/leveldb tests, andConfigSSLEngineProviderSpecfailing outside this change.References
Fixes #3177