diff --git a/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes new file mode 100644 index 0000000000..eafe5a194b --- /dev/null +++ b/stream/src/main/mima-filters/2.0.x.backwards.excludes/remove-fanout-publisher-sink.excludes @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Remove the remaining SinkModule wrapper around the GraphStage fanout publisher bridge +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.stream.impl.FanoutPublisherSink") diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala index 5d61e9f5bf..b05574af60 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala @@ -108,26 +108,6 @@ import org.reactivestreams.Subscriber new PublisherSink[In](attr, amendShape(attr)) } -/** - * INTERNAL API - */ -@InternalApi private[pekko] final class FanoutPublisherSink[In](val attributes: Attributes, shape: SinkShape[In]) - extends SinkModule[In, Publisher[In]](shape) { - - override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = { - context.materializer.materialize( - Source - // Keep the SinkModule ABI but route the runtime through the new GraphStage bridge instead - // of reviving the legacy FanoutProcessorImpl / ActorPublisher path. - .asSubscriber[In] - .toMat(Sink.fromGraph(new FanoutPublisherBridgeStage[In]))(Keep.both), - context.effectiveAttributes) - } - - override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] = - new FanoutPublisherSink[In](attr, amendShape(attr)) -} - /** * INTERNAL API * Attaches a subscriber to this stream. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala index d4e78bebf9..c61c7959ea 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala @@ -302,9 +302,8 @@ object Sink { * reject any additional `Subscriber`s. */ def asPublisher[T](fanout: Boolean): Sink[T, Publisher[T]] = - fromGraph( - if (fanout) new FanoutPublisherSink[T](DefaultAttributes.fanoutPublisherSink, shape("FanoutPublisherSink")) - else new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) + if (fanout) Flow[T].async.toMat(Sink.fromGraph(new FanoutPublisherBridgeStage[T]))(Keep.right) + else fromGraph(new PublisherSink[T](DefaultAttributes.publisherSink, shape("PublisherSink"))) /** * A `Sink` that materializes this `Sink` itself as a `Source`.