From 7107c7f97cf71d43a6765b68b47f996a7117e4bf Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sat, 16 May 2026 21:33:51 +0800 Subject: [PATCH 1/6] Fix advanceCursorsIfNecessary() method may lose mark-delete properties --- .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../mledger/impl/ManagedLedgerTest.java | 67 +++++++++++++++++++ 2 files changed, 68 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 4a1a3d12ab075..c0a870ea29a89 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -3281,7 +3281,7 @@ void advanceCursorsIfNecessary(List ledgersToDelete) throws LedgerNo && highestPositionToDelete.compareTo(cursor.getManagedLedger() .getLastConfirmedEntry()) <= 0 && !(!cursor.isDurable() && cursor instanceof NonDurableCursorImpl && ((NonDurableCursorImpl) cursor).isReadCompacted())) { - cursor.asyncMarkDelete(highestPositionToDelete, cursor.getProperties(), new MarkDeleteCallback() { + cursor.asyncMarkDelete(highestPositionToDelete, null, new MarkDeleteCallback() { @Override public void markDeleteComplete(Object ctx) { } diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index bef94ecec5e64..13c8d2ce7d882 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -26,6 +26,7 @@ import static org.mockito.ArgumentMatchers.anyMap; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeast; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; @@ -5409,4 +5410,70 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { // Verify properties are preserved after cursor reset assertEquals(cursor.getProperties(), expectedProperties); } + + @Test + @SuppressWarnings("unchecked") + public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(1); + config.setRetentionTime(0, TimeUnit.SECONDS); + config.setRetentionSizeInMB(0); + + @Cleanup + ManagedLedgerImpl ledger = + (ManagedLedgerImpl) factory.open("testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties", config); + @Cleanup + ManagedCursorImpl durableCursor = (ManagedCursorImpl) ledger.openCursor("durableCursor1"); + @Cleanup + NonDurableCursorImpl realNonDurableCursor = + (NonDurableCursorImpl) ledger.newNonDurableCursor(PositionFactory.EARLIEST); + NonDurableCursorImpl nonDurableCursor = spy(realNonDurableCursor); + + ledger.getCursors().removeCursor(realNonDurableCursor.getName()); + ledger.getCursors().add(nonDurableCursor, null); + + CountDownLatch advanceCursorsMarkDeleteCompletedLatch = new CountDownLatch(1); + CountDownLatch nonDurableCursorMarkDeleteCompletedLatch = new CountDownLatch(1); + + doAnswer(invocation -> { + Map invocationProperties = invocation.getArgument(1); + + // Let the user-triggered nonDurableCursor.markDelete() with properties complete first. + if (invocationProperties != null && invocationProperties.size() == 1) { + try { + return invocation.callRealMethod(); + } finally { + nonDurableCursorMarkDeleteCompletedLatch.countDown(); + } + } + + // Then let the trim-triggered advanceCursorsIfNecessary() mark-delete proceed. + if (invocationProperties == null || invocationProperties.isEmpty()) { + nonDurableCursorMarkDeleteCompletedLatch.await(); + try { + return invocation.callRealMethod(); + } finally { + advanceCursorsMarkDeleteCompletedLatch.countDown(); + } + } + + return invocation.callRealMethod(); + }).when(nonDurableCursor) + .internalAsyncMarkDelete(any(Position.class), nullable(Map.class), any(MarkDeleteCallback.class), + nullable(Object.class), nullable(Runnable.class)); + + ledger.addEntry("entry-1".getBytes(Encoding)); + Position pos2 = ledger.addEntry("entry-2".getBytes(Encoding)); + + // Mark-delete the durable cursor to trigger trimming, which advances non-durable cursors. + durableCursor.markDelete(pos2); + + String propertyKey = "test-property"; + Map properties = new HashMap<>(); + properties.put(propertyKey, 1L); + nonDurableCursor.markDelete(pos2, properties); + + advanceCursorsMarkDeleteCompletedLatch.await(); + assertEquals(nonDurableCursor.getProperties(), properties); + } } From fd3edce6881731705da0d2930edda58f09a55708 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Sun, 17 May 2026 08:56:16 +0800 Subject: [PATCH 2/6] Add mark-delete position assertion --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 13c8d2ce7d882..5105a5fa58a0a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5411,7 +5411,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { assertEquals(cursor.getProperties(), expectedProperties); } - @Test + @Test(invocationCount = 100) @SuppressWarnings("unchecked") public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); @@ -5474,6 +5474,7 @@ public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws nonDurableCursor.markDelete(pos2, properties); advanceCursorsMarkDeleteCompletedLatch.await(); + assertEquals(nonDurableCursor.getMarkDeletedPosition(), pos2); assertEquals(nonDurableCursor.getProperties(), properties); } } From 27a7969386c99363debe171a3ce88f105c057742 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 18 May 2026 10:33:58 +0800 Subject: [PATCH 3/6] Remove invocation count --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 5105a5fa58a0a..16fb1df2f8379 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5411,7 +5411,7 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { assertEquals(cursor.getProperties(), expectedProperties); } - @Test(invocationCount = 100) + @Test @SuppressWarnings("unchecked") public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); From 58009ab530b89a9c7c3337bd59be0a326249e400 Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 18 May 2026 11:24:25 +0800 Subject: [PATCH 4/6] Fix advanceCursorsIfNecessary mark-delete execution order --- .../mledger/impl/ManagedLedgerTest.java | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 16fb1df2f8379..90e8b7de5130f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5432,24 +5432,20 @@ public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws ledger.getCursors().removeCursor(realNonDurableCursor.getName()); ledger.getCursors().add(nonDurableCursor, null); + CountDownLatch advanceCursorsMarkDeleteEnteredLatch = new CountDownLatch(1); + CountDownLatch nonDurableCursorsMarkDeleteCompletedLatch = new CountDownLatch(1); CountDownLatch advanceCursorsMarkDeleteCompletedLatch = new CountDownLatch(1); - CountDownLatch nonDurableCursorMarkDeleteCompletedLatch = new CountDownLatch(1); doAnswer(invocation -> { Map invocationProperties = invocation.getArgument(1); - // Let the user-triggered nonDurableCursor.markDelete() with properties complete first. if (invocationProperties != null && invocationProperties.size() == 1) { - try { - return invocation.callRealMethod(); - } finally { - nonDurableCursorMarkDeleteCompletedLatch.countDown(); - } + return invocation.callRealMethod(); } - // Then let the trim-triggered advanceCursorsIfNecessary() mark-delete proceed. if (invocationProperties == null || invocationProperties.isEmpty()) { - nonDurableCursorMarkDeleteCompletedLatch.await(); + advanceCursorsMarkDeleteEnteredLatch.countDown(); + assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); try { return invocation.callRealMethod(); } finally { @@ -5467,13 +5463,15 @@ public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws // Mark-delete the durable cursor to trigger trimming, which advances non-durable cursors. durableCursor.markDelete(pos2); + assertTrue(advanceCursorsMarkDeleteEnteredLatch.await(5, TimeUnit.SECONDS)); String propertyKey = "test-property"; Map properties = new HashMap<>(); properties.put(propertyKey, 1L); nonDurableCursor.markDelete(pos2, properties); + nonDurableCursorsMarkDeleteCompletedLatch.countDown(); - advanceCursorsMarkDeleteCompletedLatch.await(); + assertTrue(advanceCursorsMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); assertEquals(nonDurableCursor.getMarkDeletedPosition(), pos2); assertEquals(nonDurableCursor.getProperties(), properties); } From 931f7b9e64779fd29dcb29c75787b5669902e86f Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 18 May 2026 12:59:01 +0800 Subject: [PATCH 5/6] Optimize test --- .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 90e8b7de5130f..d02199aa13fd5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5438,11 +5438,6 @@ public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws doAnswer(invocation -> { Map invocationProperties = invocation.getArgument(1); - - if (invocationProperties != null && invocationProperties.size() == 1) { - return invocation.callRealMethod(); - } - if (invocationProperties == null || invocationProperties.isEmpty()) { advanceCursorsMarkDeleteEnteredLatch.countDown(); assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS)); From 3c5f3b0f632f9a41a847dadadbf5521c0043e77b Mon Sep 17 00:00:00 2001 From: Oneby Wang <891734032@qq.com> Date: Mon, 18 May 2026 13:02:40 +0800 Subject: [PATCH 6/6] Add comments --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d02199aa13fd5..5dbdcaa71e8b5 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -5438,6 +5438,7 @@ public void testAdvanceCursorsIfNecessaryNeverLoseMarkDeleteProperties() throws doAnswer(invocation -> { Map invocationProperties = invocation.getArgument(1); + // Pause the advanceCursorsIfNecessary mark-delete so the nonDurableCursor markDelete() can complete first. if (invocationProperties == null || invocationProperties.isEmpty()) { advanceCursorsMarkDeleteEnteredLatch.countDown(); assertTrue(nonDurableCursorsMarkDeleteCompletedLatch.await(5, TimeUnit.SECONDS));