From 54995839fcca57a1dd9189ca67715bbd35ef9a43 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Wed, 24 Jun 2026 13:01:28 +0800 Subject: [PATCH] Ignore stale classic remoting ACKs #3126 (#3158) Stale system-message acknowledgements can arrive after classic remoting reconnects and the sender has reset its resend buffer for the current UID. Treating a cumulative ACK newer than the sender buffer as a protocol violation gates/quarantines an otherwise valid association. Modification: Make AckedSendBuffer ignore cumulative ACKs beyond the highest sequence number it has seen, log and skip those ACKs in ReliableDeliverySupervisor, and add regression coverage for empty/non-empty buffers, stale ACKs with NACKs, and valid ACK processing after a stale ACK. Result: Late ACKs from an old receive buffer no longer trigger HopelessAssociation/quarantine, while real unfulfillable NACKs still fail through the existing ResendUnfulfillableException path. Tests: - sbt "remote / Test / testOnly org.apache.pekko.remote.AckedDeliverySpec" / passed: 13 tests - sbt checkCodeStyle / passed - scalafmt --list --mode diff-ref=origin/main / passed: no files listed - git diff --check / passed: no output - subAgent review / FINAL PASS - qodercli review (non-interactive, log: /tmp/tmo-qoder-review.log) / PASS References: Fixes #3126 --- .../apache/pekko/remote/AckedDelivery.scala | 26 +++++++------- .../org/apache/pekko/remote/Endpoint.scala | 34 ++++++++++++------- .../pekko/remote/AckedDeliverySpec.scala | 11 ++++++ 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala b/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala index f84c2c79166..16fc083272a 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala @@ -116,23 +116,25 @@ final case class AckedSendBuffer[T <: HasSequenceNumber]( /** * Processes an incoming acknowledgement and returns a new buffer with only unacknowledged elements remaining. + * Acknowledgements for sequence numbers newer than this buffer has seen are stale and return this buffer unchanged. * @param ack The received acknowledgement * @return An updated buffer containing the remaining unacknowledged messages */ def acknowledge(ack: Ack): AckedSendBuffer[T] = { - if (ack.cumulativeAck > maxSeq) - throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}") - val newNacked = - if (ack.nacks.isEmpty) Vector.empty + if (ack.cumulativeAck > maxSeq) this + else { + val newNacked = + if (ack.nacks.isEmpty) Vector.empty + else + (nacked ++ nonAcked).filter { m => + ack.nacks(m.seq) + } + if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException else - (nacked ++ nonAcked).filter { m => - ack.nacks(m.seq) - } - if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException - else - this.copy(nonAcked = nonAcked.filter { m => - m.seq > ack.cumulativeAck - }, nacked = newNacked) + this.copy(nonAcked = nonAcked.filter { m => + m.seq > ack.cumulativeAck + }, nacked = newNacked) + } } /** diff --git a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala index 9da404cdadb..221fd47848b 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala @@ -366,20 +366,28 @@ private[remote] class ReliableDeliverySupervisor( case ack: Ack => // If we are not sure about the UID just ignore the ack. Ignoring is fine. if (uidConfirmed) { - try resendBuffer = resendBuffer.acknowledge(ack) - catch { - case NonFatal(e) => - throw new HopelessAssociation( - localAddress, - remoteAddress, - uid, - new IllegalStateException( - s"Error encountered while processing system message " + - s"acknowledgement buffer: $resendBuffer ack: $ack", - e)) - } + if (ack.cumulativeAck > resendBuffer.maxSeq) + log.warning( + "Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].", + remoteAddress, + resendBuffer.maxSeq, + ack.cumulativeAck) + else { + try resendBuffer = resendBuffer.acknowledge(ack) + catch { + case NonFatal(e) => + throw new HopelessAssociation( + localAddress, + remoteAddress, + uid, + new IllegalStateException( + s"Error encountered while processing system message " + + s"acknowledgement buffer: $resendBuffer ack: $ack", + e)) + } - resendNacked() + resendNacked() + } } case AttemptSysMsgRedelivery => if (uidConfirmed) resendAll() diff --git a/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala b/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala index 60494a661b9..cb3f22b62c8 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/AckedDeliverySpec.scala @@ -151,6 +151,17 @@ class AckedDeliverySpec extends PekkoSpec { } + "ignore stale cumulative ack newer than the send buffer" in { + val emptyBuffer = new AckedSendBuffer[Sequenced](10) + emptyBuffer.acknowledge(Ack(SeqNo(2))) should ===(emptyBuffer) + emptyBuffer.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) should ===(emptyBuffer) + + val buffer = emptyBuffer.buffer(msg(0)).buffer(msg(1)) + buffer.acknowledge(Ack(SeqNo(2))) should ===(buffer) + buffer.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) should ===(buffer) + buffer.acknowledge(Ack(SeqNo(2))).acknowledge(Ack(SeqNo(0))).nonAcked should ===(Vector(msg(1))) + } + "keep NACKed messages in buffer if selective nacks are received" in { val b0 = new AckedSendBuffer[Sequenced](10) val msg0 = msg(0)