From e07f22e23d9a34dfab52c50b92c2035adf3a243b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Thu, 25 Jun 2026 04:09:13 +0800 Subject: [PATCH 1/2] Remove fanout publisher sink wrapper Motivation: Issue #2860 tracks moving stream internals away from old infrastructure. Sink.asPublisher(fanout = true) still kept an internal SinkModule wrapper around the GraphStage fanout publisher bridge. Modification: Build the fanout publisher sink directly from Flow.async and FanoutPublisherBridgeStage, preserving the async boundary while deleting FanoutPublisherSink. Add a MiMa filter for the removed internal class. Result: The fanout publisher path no longer needs a dedicated SinkModule wrapper, while the public Sink.asPublisher API remains unchanged. Tests: - rtk env JAVA_HOME=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home PATH=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home/bin:$PATH sbt "stream-tests / Test / testOnly org.apache.pekko.stream.impl.FanoutPublisherBehaviorSpec" - passed - rtk env JAVA_HOME=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home PATH=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home/bin:$PATH sbt "stream-tests / Test / testOnly org.apache.pekko.stream.scaladsl.PublisherSinkSpec org.apache.pekko.stream.scaladsl.SinkSpec org.apache.pekko.stream.scaladsl.FlowSpec org.apache.pekko.stream.impl.TimeoutsSpec" - passed - rtk env JAVA_HOME=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home PATH=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home/bin:$PATH sbt "stream-tests-tck / Test / testOnly org.apache.pekko.stream.tck.FanoutPublisherTest" - passed - rtk env JAVA_HOME=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home PATH=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home/bin:$PATH sbt "stream / mimaReportBinaryIssues" - passed - rtk env JAVA_HOME=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home PATH=/Users/hepin/Library/Java/JavaVirtualMachines/azul-17.0.17/Contents/Home/bin:$PATH sbt "+headerCheckAll" - passed - rtk scalafmt --list --mode diff-ref=origin/main - passed - rtk git diff --check - passed References: Refs #2860 --- .../remove-fanout-publisher-sink.excludes | 2 ++ .../org/apache/pekko/stream/impl/Sinks.scala | 20 ------------------- .../apache/pekko/stream/scaladsl/Sink.scala | 5 ++--- 3 files changed, 4 insertions(+), 23 deletions(-) create mode 100644 stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes new file mode 100644 index 0000000000..da6f97b090 --- /dev/null +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes @@ -0,0 +1,2 @@ +# Remove the remaining SinkModule wrapper around the GraphStage fanout publisher bridge +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutPublisherSink") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala index 5d61e9f5bf..b05574af60 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala @@ -108,26 +108,6 @@ import org.reactivestreams.Subscriber new PublisherSink[In](attr, amendShape(attr)) } -/** - * INTERNAL API - */ -@InternalApi private[pekko] final class FanoutPublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) - extends SinkModule[In, Publisher[In]](shape) { - - override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { - context.materializer.materialize( - Source - // Keep the SinkModule ABI but route the runtime through the new GraphStage bridge instead - // of reviving the legacy FanoutProcessorImpl / ActorPublisher path. - .asSubscriber[In] - .toMat(Sink.fromGraph(new FanoutPublisherBridgeStage[In]))(Keep.both), - context.effectiveAttributes) - } - - override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] = - new FanoutPublisherSink[In](attr, amendShape(attr)) -} - /** * INTERNAL API * Attaches a subscriber to this stream. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index d4e78bebf9..c61c7959ea 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -302,9 +302,8 @@ object Sink { * reject any additional `Subscriber`s. */ def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]] = - fromGraph( - if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) - else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + if (fanout) Flow[T].async.toMat(Sink.fromGraph(new FanoutPublisherBridgeStage[T]))(Keep.right) + else fromGraph(new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) /** * A `Sink` that materializes this `Sink` itself as a `Source`. From eb34e9683f708a10b6f8d885d96c3075ba6068db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=99=8E=E9=B8=A3?= Date: Sat, 27 Jun 2026 06:21:48 +0800 Subject: [PATCH 2/2] chore: add Apache license header to fanout publisher sink MiMa filter Motivation: All other .excludes files in stream/src/main/mima-filters/ 2.0.x.backwards.excludes/ carry the standard Apache 2.0 header; the newly added remove-fanout-publisher-sink.excludes did not. Modification: Prepend the 16-line license block to match neighboring files. Result: Directory consistency with the established license-header convention. Tests: Not run - docs only (MiMa filter is metadata, not executable code). References: Refs #3176 --- .../remove-fanout-publisher-sink.excludes | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes index da6f97b090..eafe5a194b 100644 --- a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes @@ -1,2 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + # Remove the remaining SinkModule wrapper around the GraphStage fanout publisher bridge ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutPublisherSink")