diff --git a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala index aab9bef792..9dfdf249eb 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala @@ -226,6 +226,8 @@ private[remote] object ReliableDeliverySupervisor { case object Ungate case object AttemptSysMsgRedelivery final case class GotUid(uid: Int, remoteAddres: Address) + @nowarn("msg=deprecated") + final case class AckFromReader(readerUid: Int, ack: Ack) extends NoSerializationVerificationNeeded case object IsIdle case object Idle @@ -363,33 +365,9 @@ private[remote] class ReliableDeliverySupervisor( case IsIdle => // Do not reply, we will Terminate soon, or send a GotUid case s: Send => handleSend(s) - case ack: Ack => - // If we are not sure about the UID just ignore the ack. Ignoring is fine. - if (uidConfirmed) { - if (ack.cumulativeAck > resendBuffer.maxSeq) - log.warning( - "Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].", - remoteAddress, - resendBuffer.maxSeq, - ack.cumulativeAck) - else { - try resendBuffer = resendBuffer.acknowledge(ack) - catch { - case NonFatal(e) => - throw new HopelessAssociation( - localAddress, - remoteAddress, - uid, - new IllegalStateException( - s"Error encountered while processing system message " + - s"acknowledgement buffer: $resendBuffer ack: $ack", - e)) - } - - resendNacked() - } - } - case AttemptSysMsgRedelivery => + case AckFromReader(readerUid, ack) => handleAck(readerUid, ack) + case _: Ack => () // Ignore untagged ACKs. EndpointReader attaches its UID before forwarding ACKs. + case AttemptSysMsgRedelivery => if (uidConfirmed) resendAll() case Terminated(_) => currentHandle = None @@ -525,6 +503,33 @@ private[remote] class ReliableDeliverySupervisor( resendBuffer.nonAcked.take(settings.SysResendLimit).foreach { writer ! _ } } + private def handleAck(readerUid: Int, ack: Ack): Unit = + // If we are not sure about the UID just ignore the ack. Ignoring is fine. + if (uidConfirmed && uid.contains(readerUid)) { + if (ack.cumulativeAck > resendBuffer.maxSeq) + log.warning( + "Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].", + remoteAddress, + resendBuffer.maxSeq, + ack.cumulativeAck) + else { + try resendBuffer = resendBuffer.acknowledge(ack) + catch { + case NonFatal(e) => + throw new HopelessAssociation( + localAddress, + remoteAddress, + uid, + new IllegalStateException( + s"Error encountered while processing system message " + + s"acknowledgement buffer: $resendBuffer ack: $ack", + e)) + } + + resendNacked() + } + } + private def tryBuffer(s: Send): Unit = try { resendBuffer = resendBuffer.buffer(s) @@ -1152,7 +1157,8 @@ private[remote] class EndpointReader( case InboundPayload(p) if p.length <= transport.maximumPayloadBytes => val (ackOption, msgOption) = tryDecodeMessageAndAck(p) - for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) reliableDelivery ! ack + for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) + reliableDelivery ! ReliableDeliverySupervisor.AckFromReader(uid, ack) msgOption match { case Some(msg) => @@ -1200,7 +1206,8 @@ private[remote] class EndpointReader( case InboundPayload(p) if p.length <= transport.maximumPayloadBytes => val (ackOption, msgOption) = tryDecodeMessageAndAck(p) - for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) reliableDelivery ! ack + for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) + reliableDelivery ! ReliableDeliverySupervisor.AckFromReader(uid, ack) if (log.isWarningEnabled) log.warning( diff --git a/remote/src/test/scala/org/apache/pekko/remote/ReliableDeliverySupervisorSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/ReliableDeliverySupervisorSpec.scala new file mode 100644 index 0000000000..d433f03bc0 --- /dev/null +++ b/remote/src/test/scala/org/apache/pekko/remote/ReliableDeliverySupervisorSpec.scala @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.remote + +import java.util.concurrent.ConcurrentHashMap + +import scala.annotation.nowarn +import scala.concurrent.Promise + +import org.apache.pekko +import pekko.actor.{ ActorRef, Address, Nobody, RootActorPath } +import pekko.remote.EndpointManager.{ Link, ResendState, Send } +import pekko.remote.ReliableDeliverySupervisor.AckFromReader +import pekko.remote.transport._ +import pekko.testkit.{ ImplicitSender, PekkoSpec, TestActorRef, TestProbe } +import pekko.util.OptionVal + +import com.typesafe.config.ConfigFactory + +object ReliableDeliverySupervisorSpec { + val config = ConfigFactory.parseString(""" + pekko { + actor.provider = remote + actor.allow-java-serialization = on + actor.warn-about-java-serializer-usage = off + + remote { + artery.enabled = off + warn-about-direct-use = off + use-unsafe-remote-features-outside-cluster = on + + classic { + resend-interval = 30 s + system-message-buffer-size = 10 + system-message-ack-piggyback-timeout = 30 s + + netty.tcp { + hostname = "localhost" + port = 0 + } + } + } + } + """) +} + +@nowarn("msg=deprecated") +class ReliableDeliverySupervisorSpec extends PekkoSpec(ReliableDeliverySupervisorSpec.config) with ImplicitSender { + + private val localAddress = Address("pekko.test", system.name, "localhost", 2551) + private val remoteAddress = Address("pekko.test", "remote-system", "localhost", 2552) + private val oldUid = 1 + private val newUid = 2 + + "ReliableDeliverySupervisor" must { + "ignore ACKs from stale EndpointReader UIDs after a new UID is confirmed" in { + val parentProbe = TestProbe() + val supervisor = newSupervisor(initialUid = oldUid, parentProbe.ref) + val underlying = supervisor.underlyingActor + + supervisor ! ReliableDeliverySupervisor.GotUid(newUid, remoteAddress) + parentProbe.expectMsg(ReliableDeliverySupervisor.GotUid(newUid, remoteAddress)) + underlying.resendBuffer = bufferWith(0L, 1L) + + supervisor ! AckFromReader(oldUid, Ack(SeqNo(0))) + + underlying.resendBuffer.nonAcked.map(_.seq) should ===(Vector(SeqNo(0), SeqNo(1))) + + supervisor ! Ack(SeqNo(0)) + + underlying.resendBuffer.nonAcked.map(_.seq) should ===(Vector(SeqNo(0), SeqNo(1))) + + supervisor ! AckFromReader(newUid, Ack(SeqNo(0))) + + underlying.resendBuffer.nonAcked.map(_.seq) should ===(Vector(SeqNo(1))) + underlying.resendBuffer = new AckedSendBuffer[Send](0) + } + } + + private def newSupervisor(initialUid: Int, parent: ActorRef): TestActorRef[ReliableDeliverySupervisor] = { + val registry = new TestTransport.AssociationRegistry + val underlyingTransport = + new TestTransport(localAddress.copy(protocol = "test"), registry, schemeIdentifier = "test") + underlyingTransport.writeBehavior.pushConstant(true) + val protocolTransport = new PekkoProtocolTransport( + underlyingTransport, + system, + new PekkoProtocolSettings(system.settings.config), + PekkoPduProtobufCodec) + val underlyingHandle = + TestAssociationHandle(localAddress.copy(protocol = "test"), remoteAddress.copy(protocol = "test"), + underlyingTransport, inbound = true) + val handle = new PekkoProtocolHandle( + localAddress, + remoteAddress, + Promise[AssociationHandle.HandleEventListener](), + underlyingHandle, + HandshakeInfo(remoteAddress, initialUid), + TestProbe().ref, + PekkoPduProtobufCodec, + RARP(system).provider.remoteSettings.ProtocolName) + + TestActorRef[ReliableDeliverySupervisor]( + ReliableDeliverySupervisor.props( + handleOrActive = Some(handle), + localAddress, + remoteAddress, + refuseUid = None, + protocolTransport, + RARP(system).provider.remoteSettings, + PekkoPduProtobufCodec, + new ConcurrentHashMap[Link, ResendState]), + parent) + } + + private def bufferWith(seqNumbers: Long*): AckedSendBuffer[Send] = + seqNumbers.foldLeft(new AckedSendBuffer[Send](10)) { (buffer, seq) => + buffer.buffer(send(seq)) + } + + private def send(seq: Long): Send = + Send( + message = s"msg-$seq", + senderOption = OptionVal.None, + recipient = remoteRef, + seqOpt = Some(SeqNo(seq))) + + private def remoteRef: RemoteActorRef = + new RemoteActorRef( + RARP(system).provider.transport, + localAddress, + RootActorPath(remoteAddress) / "user" / "recipient", + Nobody, + props = None, + deploy = None, + acceptProtocolNames = Set("pekko")) +}