Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 36 additions & 29 deletions remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ private[remote] object ReliableDeliverySupervisor {
case object Ungate
case object AttemptSysMsgRedelivery
final case class GotUid(uid: Int, remoteAddres: Address)
@nowarn("msg=deprecated")
final case class AckFromReader(readerUid: Int, ack: Ack) extends NoSerializationVerificationNeeded

case object IsIdle
case object Idle
Expand Down Expand Up @@ -363,33 +365,9 @@ private[remote] class ReliableDeliverySupervisor(
case IsIdle => // Do not reply, we will Terminate soon, or send a GotUid
case s: Send =>
handleSend(s)
case ack: Ack =>
// If we are not sure about the UID just ignore the ack. Ignoring is fine.
if (uidConfirmed) {
if (ack.cumulativeAck > resendBuffer.maxSeq)
log.warning(
"Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].",
remoteAddress,
resendBuffer.maxSeq,
ack.cumulativeAck)
else {
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e) =>
throw new HopelessAssociation(
localAddress,
remoteAddress,
uid,
new IllegalStateException(
s"Error encountered while processing system message " +
s"acknowledgement buffer: $resendBuffer ack: $ack",
e))
}

resendNacked()
}
}
case AttemptSysMsgRedelivery =>
case AckFromReader(readerUid, ack) => handleAck(readerUid, ack)
case _: Ack => () // Ignore untagged ACKs. EndpointReader attaches its UID before forwarding ACKs.
case AttemptSysMsgRedelivery =>
if (uidConfirmed) resendAll()
case Terminated(_) =>
currentHandle = None
Expand Down Expand Up @@ -525,6 +503,33 @@ private[remote] class ReliableDeliverySupervisor(
resendBuffer.nonAcked.take(settings.SysResendLimit).foreach { writer ! _ }
}

private def handleAck(readerUid: Int, ack: Ack): Unit =
// If we are not sure about the UID just ignore the ack. Ignoring is fine.
if (uidConfirmed && uid.contains(readerUid)) {
if (ack.cumulativeAck > resendBuffer.maxSeq)
log.warning(
"Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].",
remoteAddress,
resendBuffer.maxSeq,
ack.cumulativeAck)
else {
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e) =>
throw new HopelessAssociation(
localAddress,
remoteAddress,
uid,
new IllegalStateException(
s"Error encountered while processing system message " +
s"acknowledgement buffer: $resendBuffer ack: $ack",
e))
}

resendNacked()
}
}

private def tryBuffer(s: Send): Unit =
try {
resendBuffer = resendBuffer.buffer(s)
Expand Down Expand Up @@ -1152,7 +1157,8 @@ private[remote] class EndpointReader(
case InboundPayload(p) if p.length <= transport.maximumPayloadBytes =>
val (ackOption, msgOption) = tryDecodeMessageAndAck(p)

for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) reliableDelivery ! ack
for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor)
reliableDelivery ! ReliableDeliverySupervisor.AckFromReader(uid, ack)

msgOption match {
case Some(msg) =>
Expand Down Expand Up @@ -1200,7 +1206,8 @@ private[remote] class EndpointReader(

case InboundPayload(p) if p.length <= transport.maximumPayloadBytes =>
val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) reliableDelivery ! ack
for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor)
reliableDelivery ! ReliableDeliverySupervisor.AckFromReader(uid, ack)

if (log.isWarningEnabled)
log.warning(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.remote

import java.util.concurrent.ConcurrentHashMap

import scala.annotation.nowarn
import scala.concurrent.Promise

import org.apache.pekko
import pekko.actor.{ ActorRef, Address, Nobody, RootActorPath }
import pekko.remote.EndpointManager.{ Link, ResendState, Send }
import pekko.remote.ReliableDeliverySupervisor.AckFromReader
import pekko.remote.transport._
import pekko.testkit.{ ImplicitSender, PekkoSpec, TestActorRef, TestProbe }
import pekko.util.OptionVal

import com.typesafe.config.ConfigFactory

object ReliableDeliverySupervisorSpec {
val config = ConfigFactory.parseString("""
pekko {
actor.provider = remote
actor.allow-java-serialization = on
actor.warn-about-java-serializer-usage = off

remote {
artery.enabled = off
warn-about-direct-use = off
use-unsafe-remote-features-outside-cluster = on

classic {
resend-interval = 30 s
system-message-buffer-size = 10
system-message-ack-piggyback-timeout = 30 s

netty.tcp {
hostname = "localhost"
port = 0
}
}
}
}
""")
}

@nowarn("msg=deprecated")
class ReliableDeliverySupervisorSpec extends PekkoSpec(ReliableDeliverySupervisorSpec.config) with ImplicitSender {

private val localAddress = Address("pekko.test", system.name, "localhost", 2551)
private val remoteAddress = Address("pekko.test", "remote-system", "localhost", 2552)
private val oldUid = 1
private val newUid = 2

"ReliableDeliverySupervisor" must {
"ignore ACKs from stale EndpointReader UIDs after a new UID is confirmed" in {
val parentProbe = TestProbe()
val supervisor = newSupervisor(initialUid = oldUid, parentProbe.ref)
val underlying = supervisor.underlyingActor

supervisor ! ReliableDeliverySupervisor.GotUid(newUid, remoteAddress)
parentProbe.expectMsg(ReliableDeliverySupervisor.GotUid(newUid, remoteAddress))
underlying.resendBuffer = bufferWith(0L, 1L)

supervisor ! AckFromReader(oldUid, Ack(SeqNo(0)))

underlying.resendBuffer.nonAcked.map(_.seq) should ===(Vector(SeqNo(0), SeqNo(1)))

supervisor ! Ack(SeqNo(0))

underlying.resendBuffer.nonAcked.map(_.seq) should ===(Vector(SeqNo(0), SeqNo(1)))

supervisor ! AckFromReader(newUid, Ack(SeqNo(0)))

underlying.resendBuffer.nonAcked.map(_.seq) should ===(Vector(SeqNo(1)))
underlying.resendBuffer = new AckedSendBuffer[Send](0)
}
}

private def newSupervisor(initialUid: Int, parent: ActorRef): TestActorRef[ReliableDeliverySupervisor] = {
val registry = new TestTransport.AssociationRegistry
val underlyingTransport =
new TestTransport(localAddress.copy(protocol = "test"), registry, schemeIdentifier = "test")
underlyingTransport.writeBehavior.pushConstant(true)
val protocolTransport = new PekkoProtocolTransport(
underlyingTransport,
system,
new PekkoProtocolSettings(system.settings.config),
PekkoPduProtobufCodec)
val underlyingHandle =
TestAssociationHandle(localAddress.copy(protocol = "test"), remoteAddress.copy(protocol = "test"),
underlyingTransport, inbound = true)
val handle = new PekkoProtocolHandle(
localAddress,
remoteAddress,
Promise[AssociationHandle.HandleEventListener](),
underlyingHandle,
HandshakeInfo(remoteAddress, initialUid),
TestProbe().ref,
PekkoPduProtobufCodec,
RARP(system).provider.remoteSettings.ProtocolName)

TestActorRef[ReliableDeliverySupervisor](
ReliableDeliverySupervisor.props(
handleOrActive = Some(handle),
localAddress,
remoteAddress,
refuseUid = None,
protocolTransport,
RARP(system).provider.remoteSettings,
PekkoPduProtobufCodec,
new ConcurrentHashMap[Link, ResendState]),
parent)
}

private def bufferWith(seqNumbers: Long*): AckedSendBuffer[Send] =
seqNumbers.foldLeft(new AckedSendBuffer[Send](10)) { (buffer, seq) =>
buffer.buffer(send(seq))
}

private def send(seq: Long): Send =
Send(
message = s"msg-$seq",
senderOption = OptionVal.None,
recipient = remoteRef,
seqOpt = Some(SeqNo(seq)))

private def remoteRef: RemoteActorRef =
new RemoteActorRef(
RARP(system).provider.transport,
localAddress,
RootActorPath(remoteAddress) / "user" / "recipient",
Nobody,
props = None,
deploy = None,
acceptProtocolNames = Set("pekko"))
}