diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4a1a3d12ab075..c0a870ea29a89 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3281,7 +3281,7 @@ void advanceCursorsIfNecessary(List ledgersToDelete) throws LedgerNo && highestPositionToDelete.compareTo(cursor.getManagedLedger() .getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) { - cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() { + cursor.asyncMarkDelete(highestPositionToDelete, null, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index bef94ecec5e64..5dbdcaa71e8b5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -5409,4 +5410,65 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), expectedProperties); } + + @Test + @SuppressWarnings("unchecked") + public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(0, TimeUnit.SECONDS); + config.setRetentionSizeInMB(0); + + @Cleanup + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties", config); + @Cleanup + ManagedCursorImpl durableCursor = (ManagedCursorImpl) ledger.openCursor("durableCursor1"); + @Cleanup + NonDurableCursorImpl realNonDurableCursor = + (NonDurableCursorImpl) ledger.newNonDurableCursor(PositionFactory.EARLIEST); + NonDurableCursorImpl nonDurableCursor = spy(realNonDurableCursor); + + ledger.getCursors().removeCursor(realNonDurableCursor.getName()); + ledger.getCursors().add(nonDurableCursor, null); + + CountDownLatch advanceCursorsMarkDeleteEnteredLatch = new CountDownLatch(1); + CountDownLatch nonDurableCursorsMarkDeleteCompletedLatch = new CountDownLatch(1); + CountDownLatch advanceCursorsMarkDeleteCompletedLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + Map invocationProperties = invocation.getArgument(1); + // Pause the advanceCursorsIfNecessary mark-delete so the nonDurableCursor markDelete() can complete first. + if (invocationProperties == null || invocationProperties.isEmpty()) { + advanceCursorsMarkDeleteEnteredLatch.countDown(); + assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); + try { + return invocation.callRealMethod(); + } finally { + advanceCursorsMarkDeleteCompletedLatch.countDown(); + } + } + + return invocation.callRealMethod(); + }).when(nonDurableCursor) + .internalAsyncMarkDelete(any(Position.class), nullable(Map.class), any(MarkDeleteCallback.class), + nullable(Object.class), nullable(Runnable.class)); + + ledger.addEntry("entry-1".getBytes(Encoding)); + Position pos2 = ledger.addEntry("entry-2".getBytes(Encoding)); + + // Mark-delete the durable cursor to trigger trimming, which advances non-durable cursors. + durableCursor.markDelete(pos2); + assertTrue(advanceCursorsMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); + + String propertyKey = "test-property"; + Map properties = new HashMap<>(); + properties.put(propertyKey, 1L); + nonDurableCursor.markDelete(pos2, properties); + nonDurableCursorsMarkDeleteCompletedLatch.countDown(); + + assertTrue(advanceCursorsMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); + assertEquals(nonDurableCursor.getMarkDeletedPosition(), pos2); + assertEquals(nonDurableCursor.getProperties(), properties); + } }