Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions remote/src/main/scala/org/apache/pekko/remote/AckedDelivery.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,23 +116,25 @@ final case class AckedSendBuffer[T <: HasSequenceNumber](

/**
* Processes an incoming acknowledgement and returns a new buffer with only unacknowledged elements remaining.
* Acknowledgements for sequence numbers newer than this buffer has seen are stale and return this buffer unchanged.
* @param ack The received acknowledgement
* @return An updated buffer containing the remaining unacknowledged messages
*/
def acknowledge(ack: Ack): AckedSendBuffer[T] = {
if (ack.cumulativeAck > maxSeq)
throw new IllegalArgumentException(s"Highest SEQ so far was $maxSeq but cumulative ACK is ${ack.cumulativeAck}")
val newNacked =
if (ack.nacks.isEmpty) Vector.empty
if (ack.cumulativeAck > maxSeq) this
else {
val newNacked =
if (ack.nacks.isEmpty) Vector.empty
else
(nacked ++ nonAcked).filter { m =>
ack.nacks(m.seq)
}
if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException
else
(nacked ++ nonAcked).filter { m =>
ack.nacks(m.seq)
}
if (newNacked.size < ack.nacks.size) throw new ResendUnfulfillableException
else
this.copy(nonAcked = nonAcked.filter { m =>
m.seq > ack.cumulativeAck
}, nacked = newNacked)
this.copy(nonAcked = nonAcked.filter { m =>
m.seq > ack.cumulativeAck
}, nacked = newNacked)
}
}

/**
Expand Down
34 changes: 21 additions & 13 deletions remote/src/main/scala/org/apache/pekko/remote/Endpoint.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,20 +366,28 @@ private[remote] class ReliableDeliverySupervisor(
case ack: Ack =>
// If we are not sure about the UID just ignore the ack. Ignoring is fine.
if (uidConfirmed) {
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e) =>
throw new HopelessAssociation(
localAddress,
remoteAddress,
uid,
new IllegalStateException(
s"Error encountered while processing system message " +
s"acknowledgement buffer: $resendBuffer ack: $ack",
e))
}
if (ack.cumulativeAck > resendBuffer.maxSeq)
log.warning(
"Ignoring stale system message acknowledgement from remote system [{}]. Highest SEQ so far was [{}] but cumulative ACK is [{}].",
remoteAddress,
resendBuffer.maxSeq,
ack.cumulativeAck)
else {
try resendBuffer = resendBuffer.acknowledge(ack)
catch {
case NonFatal(e) =>
throw new HopelessAssociation(
localAddress,
remoteAddress,
uid,
new IllegalStateException(
s"Error encountered while processing system message " +
s"acknowledgement buffer: $resendBuffer ack: $ack",
e))
}

resendNacked()
resendNacked()
}
}
case AttemptSysMsgRedelivery =>
if (uidConfirmed) resendAll()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,17 @@ class AckedDeliverySpec extends PekkoSpec {

}

"ignore stale cumulative ack newer than the send buffer" in {
val emptyBuffer = new AckedSendBuffer[Sequenced](10)
emptyBuffer.acknowledge(Ack(SeqNo(2))) should ===(emptyBuffer)
emptyBuffer.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) should ===(emptyBuffer)

val buffer = emptyBuffer.buffer(msg(0)).buffer(msg(1))
buffer.acknowledge(Ack(SeqNo(2))) should ===(buffer)
buffer.acknowledge(Ack(SeqNo(2), nacks = Set(SeqNo(0)))) should ===(buffer)
buffer.acknowledge(Ack(SeqNo(2))).acknowledge(Ack(SeqNo(0))).nonAcked should ===(Vector(msg(1)))
}

"keep NACKed messages in buffer if selective nacks are received" in {
val b0 = new AckedSendBuffer[Sequenced](10)
val msg0 = msg(0)
Expand Down