diff --git a/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala b/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala index f84c2c79166..16fc083272a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala @@ -116,23 +116,25 @@ final case class AckedSendBuffer[T <: HasSequenceNumber]( /** * Processes an incoming acknowledgement and returns a new buffer with only unacknowledged elements remaining. + * Acknowledgements for sequence numbers newer than this buffer has seen are stale and return this buffer unchanged. * @param ack The received acknowledgement * @return An updated buffer containing the remaining unacknowledged messages */ def acknowledge(ack: Ack): AckedSendBuffer[T] = { - if (ack.cumulativeAck > maxSeq) - throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}") - val newNacked = - if (ack.nacks.isEmpty) Vector.empty + if (ack.cumulativeAck > maxSeq) this + else { + val newNacked = + if (ack.nacks.isEmpty) Vector.empty + else + (nacked ++ nonAcked).filter { m => + ack.nacks(m.seq) + } + if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException else - (nacked ++ nonAcked).filter { m => - ack.nacks(m.seq) - } - if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException - else - this.copy(nonAcked = nonAcked.filter { m => - m.seq > ack.cumulativeAck - }, nacked = newNacked) + this.copy(nonAcked = nonAcked.filter { m => + m.seq > ack.cumulativeAck + }, nacked = newNacked) + } } /** diff --git a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala index 9da404cdadb..221fd47848b 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala @@ -366,20 +366,28 @@ private[remote] class ReliableDeliverySupervisor( case ack: Ack => // If we are not sure about the UID just ignore the ack. Ignoring is fine. if (uidConfirmed) { - try resendBuffer = resendBuffer.acknowledge(ack) - catch { - case NonFatal(e) => - throw new HopelessAssociation( - localAddress, - remoteAddress, - uid, - new IllegalStateException( - s"Error encountered while processing system message " + - s"acknowledgement buffer: $resendBuffer ack: $ack", - e)) - } + if (ack.cumulativeAck > resendBuffer.maxSeq) + log.warning( + "Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].", + remoteAddress, + resendBuffer.maxSeq, + ack.cumulativeAck) + else { + try resendBuffer = resendBuffer.acknowledge(ack) + catch { + case NonFatal(e) => + throw new HopelessAssociation( + localAddress, + remoteAddress, + uid, + new IllegalStateException( + s"Error encountered while processing system message " + + s"acknowledgement buffer: $resendBuffer ack: $ack", + e)) + } - resendNacked() + resendNacked() + } } case AttemptSysMsgRedelivery => if (uidConfirmed) resendAll() diff --git a/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala b/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala index 60494a661b9..cb3f22b62c8 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala @@ -151,6 +151,17 @@ class AckedDeliverySpec extends PekkoSpec { } + "ignore stale cumulative ack newer than the send buffer" in { + val emptyBuffer = new AckedSendBuffer[Sequenced](10) + emptyBuffer.acknowledge(Ack(SeqNo(2))) should ===(emptyBuffer) + emptyBuffer.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) should ===(emptyBuffer) + + val buffer = emptyBuffer.buffer(msg(0)).buffer(msg(1)) + buffer.acknowledge(Ack(SeqNo(2))) should ===(buffer) + buffer.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) should ===(buffer) + buffer.acknowledge(Ack(SeqNo(2))).acknowledge(Ack(SeqNo(0))).nonAcked should ===(Vector(msg(1))) + } + "keep NACKed messages in buffer if selective nacks are received" in { val b0 = new AckedSendBuffer[Sequenced](10) val msg0 = msg(0)