diff --git a/core/src/main/java/kafka/server/share/DelayedShareFetch.java b/core/src/main/java/kafka/server/share/DelayedShareFetch.java index bc8146910ad21..af37723c4219c 100644 --- a/core/src/main/java/kafka/server/share/DelayedShareFetch.java +++ b/core/src/main/java/kafka/server/share/DelayedShareFetch.java @@ -55,6 +55,7 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -263,6 +264,7 @@ private void completeLocalLogShareFetchRequest() { } private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap topicPartitionData) { + List shareFetchPartitionDataList = new ArrayList<>(); try { LinkedHashMap responseData; if (localPartitionsAlreadyFetched.isEmpty()) @@ -276,7 +278,6 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap shareFetchPartitionDataList = new ArrayList<>(); responseData.forEach((topicIdPartition, logReadResult) -> { if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) { shareFetchPartitionDataList.add(new ShareFetchPartitionData( @@ -298,7 +299,7 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap topicIdPartitions) { - if (topicIdPartitions.isEmpty()) { + private Set partitionsWithData(List shareFetchPartitionDataList) { + if (shareFetchPartitionDataList == null || shareFetchPartitionDataList.isEmpty()) { + return Set.of(); + } + Set partitionsWithData = new HashSet<>(); + shareFetchPartitionDataList.forEach(shareFetchPartitionData -> { + if (shareFetchPartitionData.fetchPartitionData() != null && + shareFetchPartitionData.fetchPartitionData().records != null && + shareFetchPartitionData.fetchPartitionData().records.sizeInBytes() > 0) { + partitionsWithData.add(shareFetchPartitionData.topicIdPartition()); + } + }); + return partitionsWithData; + } + + private void releasePartitionLocksAndAddToActionQueue(Set allAcquiredTopicIdPartitions, + Set topicIdPartitionsWithData) { + if (allAcquiredTopicIdPartitions.isEmpty()) { + // topicIdPartitionsWithData set should be a subset of allAcquiredTopicIdPartitions, hence it is safe to return. return; } // Releasing the lock to move ahead with the next request in queue. - releasePartitionLocks(topicIdPartitions); - replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> { + releasePartitionLocks(allAcquiredTopicIdPartitions); + if (topicIdPartitionsWithData.isEmpty()) { + return; + } + replicaManager.addToActionQueue(() -> topicIdPartitionsWithData.forEach(topicIdPartition -> { // If we have a fetch request completed for a share-partition, we release the locks for that partition, // then we should check if there is a pending share fetch request for the share-partition and complete it. // We add the action to delayed actions queue to avoid an infinite call stack, which could happen if @@ -844,15 +865,15 @@ private void releasePartitionLocksAndAddToActionQueue(Set topi */ private void completeRemoteStorageShareFetchRequest() { LinkedHashMap acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>(); + List shareFetchPartitionDataList = new ArrayList<>(); try { - List shareFetchPartitionData = new ArrayList<>(); int readableBytes = 0; for (RemoteFetch remoteFetch : pendingRemoteFetchesOpt.get().remoteFetches()) { if (remoteFetch.remoteFetchResult().isDone()) { RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get(); if (remoteLogReadResult.error().isPresent()) { // If there is any error for the remote fetch topic partition, we populate the error accordingly. - shareFetchPartitionData.add( + shareFetchPartitionDataList.add( new ShareFetchPartitionData( remoteFetch.topicIdPartition(), partitionsAcquired.get(remoteFetch.topicIdPartition()), @@ -863,7 +884,7 @@ private void completeRemoteStorageShareFetchRequest() { FetchDataInfo info = remoteLogReadResult.fetchDataInfo().get(); TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition(); LogReadResult logReadResult = remoteFetch.logReadResult(); - shareFetchPartitionData.add( + shareFetchPartitionDataList.add( new ShareFetchPartitionData( topicIdPartition, partitionsAcquired.get(remoteFetch.topicIdPartition()), @@ -907,7 +928,7 @@ private void completeRemoteStorageShareFetchRequest() { resetFetchOffsetMetadataForRemoteFetchPartitions(acquiredNonRemoteFetchTopicPartitionData, responseData); for (Map.Entry entry : responseData.entrySet()) { if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) { - shareFetchPartitionData.add( + shareFetchPartitionDataList.add( new ShareFetchPartitionData( entry.getKey(), acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()), @@ -925,7 +946,7 @@ private void completeRemoteStorageShareFetchRequest() { shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100)); Map remoteFetchResponse = ShareFetchUtils.processFetchResponse( - shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler); + shareFetch, shareFetchPartitionDataList, sharePartitions, replicaManager, exceptionHandler); shareFetch.maybeComplete(remoteFetchResponse); log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse); } catch (InterruptedException | ExecutionException e) { @@ -937,7 +958,7 @@ private void completeRemoteStorageShareFetchRequest() { } finally { Set topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet()); topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet()); - releasePartitionLocksAndAddToActionQueue(topicIdPartitions); + releasePartitionLocksAndAddToActionQueue(topicIdPartitions, partitionsWithData(shareFetchPartitionDataList)); } } diff --git a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java index 0f5c3b2a545e5..e34589f5a7b7d 100644 --- a/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java +++ b/core/src/test/java/kafka/server/share/DelayedShareFetchTest.java @@ -86,6 +86,7 @@ import static kafka.server.share.PendingRemoteFetches.RemoteFetch; import static kafka.server.share.SharePartitionManagerTest.DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL; import static kafka.server.share.SharePartitionManagerTest.REMOTE_FETCH_MAX_WAIT_MS; +import static kafka.server.share.SharePartitionManagerTest.buildEmptyLogReadResult; import static kafka.server.share.SharePartitionManagerTest.buildLogReadResult; import static kafka.server.share.SharePartitionManagerTest.mockReplicaManagerDelayedShareFetch; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -700,6 +701,112 @@ public void testForceCompleteTriggersDelayedActionsQueue() { } } + @Test + public void testForceCompleteNotTriggersDelayedActionsQueueWhenFetchDataIsEmpty() { + String groupId = "grp"; + Uuid topicId = Uuid.randomUuid(); + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(topicId, new TopicPartition("foo", 0)); + TopicIdPartition tp1 = new TopicIdPartition(topicId, new TopicPartition("foo", 1)); + TopicIdPartition tp2 = new TopicIdPartition(topicId, new TopicPartition("foo", 2)); + List topicIdPartitions1 = List.of(tp0, tp1); + + SharePartition sp0 = mock(SharePartition.class); + SharePartition sp1 = mock(SharePartition.class); + SharePartition sp2 = mock(SharePartition.class); + + LinkedHashMap sharePartitions1 = new LinkedHashMap<>(); + sharePartitions1.put(tp0, sp0); + sharePartitions1.put(tp1, sp1); + sharePartitions1.put(tp2, sp2); + + ShareFetch shareFetch1 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), topicIdPartitions1, BATCH_OPTIMIZED, BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + DelayedOperationPurgatory delayedShareFetchPurgatory = new DelayedOperationPurgatory<>( + "TestShareFetch", mockTimer, replicaManager.localBrokerId(), + DELAYED_SHARE_FETCH_PURGATORY_PURGE_INTERVAL, false, true); + mockReplicaManagerDelayedShareFetch(replicaManager, delayedShareFetchPurgatory); + + List delayedShareFetchWatchKeys = new ArrayList<>(); + topicIdPartitions1.forEach(topicIdPartition -> delayedShareFetchWatchKeys.add(new DelayedShareFetchGroupKey(groupId, topicIdPartition.topicId(), topicIdPartition.partition()))); + + Uuid fetchId1 = Uuid.randomUuid(); + DelayedShareFetch delayedShareFetch1 = DelayedShareFetchTest.DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch1) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions1) + .withFetchId(fetchId1) + .build(); + + // No share partition is available for acquiring initially. + when(sp0.maybeAcquireFetchLock(fetchId1)).thenReturn(false); + when(sp1.maybeAcquireFetchLock(fetchId1)).thenReturn(false); + when(sp2.maybeAcquireFetchLock(fetchId1)).thenReturn(false); + + // We add a delayed share fetch entry to the purgatory which will be waiting for completion since neither of the + // partitions in the share fetch request can be acquired. + delayedShareFetchPurgatory.tryCompleteElseWatch(delayedShareFetch1, delayedShareFetchWatchKeys); + + assertEquals(2, delayedShareFetchPurgatory.watched()); + assertFalse(shareFetch1.isCompleted()); + assertTrue(delayedShareFetch1.lock().tryLock()); + delayedShareFetch1.lock().unlock(); + + ShareFetch shareFetch2 = new ShareFetch(FETCH_PARAMS, groupId, Uuid.randomUuid().toString(), + new CompletableFuture<>(), List.of(tp0, tp1), BATCH_OPTIMIZED, BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + doAnswer(invocation -> buildEmptyLogReadResult(List.of(tp1))).when(replicaManager).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + + PartitionMaxBytesStrategy partitionMaxBytesStrategy = mockPartitionMaxBytes(Set.of(tp1)); + + LinkedHashMap sharePartitions2 = new LinkedHashMap<>(); + sharePartitions2.put(tp0, sp0); + sharePartitions2.put(tp1, sp1); + sharePartitions2.put(tp2, sp2); + + Uuid fetchId2 = Uuid.randomUuid(); + BiConsumer exceptionHandler = mockExceptionHandler(); + DelayedShareFetch delayedShareFetch2 = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch2) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions2) + .withPartitionMaxBytesStrategy(partitionMaxBytesStrategy) + .withFetchId(fetchId2) + .withExceptionHandler(exceptionHandler) + .build()); + + // sp1 can be acquired now. + when(sp1.maybeAcquireFetchLock(fetchId2)).thenReturn(true); + when(sp1.canAcquireRecords()).thenReturn(true); + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class, Mockito.CALLS_REAL_METHODS)) { + ShareFetchResponseData.PartitionData mockedPartitionData = mock(ShareFetchResponseData.PartitionData.class); + // Empty fetched data. + when(mockedPartitionData.records()).thenReturn(MemoryRecords.EMPTY); + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())) + .thenReturn(Map.of(tp0, mockedPartitionData)); + + // when forceComplete is called for delayedShareFetch2, since tp1 is common in between delayed share fetch + // requests, it will not add a "check and complete" action for request key tp1 on the purgatory since no + // data was returned in tp1. + delayedShareFetch2.forceComplete(); + assertTrue(delayedShareFetch2.isCompleted()); + assertTrue(shareFetch2.isCompleted()); + Mockito.verify(exceptionHandler, never()).accept(any(), any()); + Mockito.verify(replicaManager, times(1)).readFromLog( + any(), any(), any(ReplicaQuota.class), anyBoolean()); + assertFalse(delayedShareFetch1.isCompleted()); + Mockito.verify(replicaManager, times(0)).addToActionQueue(any()); + Mockito.verify(replicaManager, times(0)).tryCompleteActions(); + Mockito.verify(delayedShareFetch2, times(1)).releasePartitionLocks(any()); + assertTrue(delayedShareFetch2.lock().tryLock()); + delayedShareFetch2.lock().unlock(); + } + } + @Test public void testCombineLogReadResponse() { String groupId = "grp"; @@ -2162,6 +2269,91 @@ public void testRemoteStorageFetchCompletionPostRegisteringCallbackByTimerTaskCo } } + @Test + public void testRemoteStorageFetchCompletionNotTriggerActionsQueue() { + ReplicaManager replicaManager = mock(ReplicaManager.class); + TopicIdPartition tp0 = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0)); + SharePartition sp0 = mock(SharePartition.class); + + when(sp0.canAcquireRecords()).thenReturn(true); + when(sp0.nextFetchOffset()).thenReturn(10L); + + LinkedHashMap sharePartitions = new LinkedHashMap<>(); + sharePartitions.put(tp0, sp0); + + CompletableFuture> future = new CompletableFuture<>(); + ShareFetch shareFetch = new ShareFetch(FETCH_PARAMS, "grp", Uuid.randomUuid().toString(), + future, List.of(tp0), BATCH_OPTIMIZED, BATCH_SIZE, MAX_FETCH_RECORDS, + BROKER_TOPIC_STATS); + + PendingRemoteFetches pendingRemoteFetches = mock(PendingRemoteFetches.class); + Uuid fetchId = Uuid.randomUuid(); + DelayedShareFetch delayedShareFetch = spy(DelayedShareFetchBuilder.builder() + .withShareFetchData(shareFetch) + .withReplicaManager(replicaManager) + .withSharePartitions(sharePartitions) + .withPartitionMaxBytesStrategy(PartitionMaxBytesStrategy.type(PartitionMaxBytesStrategy.StrategyType.UNIFORM)) + .withPendingRemoteFetches(pendingRemoteFetches) + .withFetchId(fetchId) + .build()); + + LinkedHashMap partitionsAcquired = new LinkedHashMap<>(); + partitionsAcquired.put(tp0, 10L); + // Manually update acquired partitions maps. + delayedShareFetch.updatePartitionsAcquired(partitionsAcquired); + + // Mock remote fetch result. + RemoteFetch remoteFetch = mock(RemoteFetch.class); + when(remoteFetch.topicIdPartition()).thenReturn(tp0); + when(remoteFetch.remoteFetchResult()).thenReturn(CompletableFuture.completedFuture( + new RemoteLogReadResult(Optional.of(REMOTE_FETCH_INFO), Optional.empty())) + ); + when(remoteFetch.logReadResult()).thenReturn(new LogReadResult( + REMOTE_FETCH_INFO, + Optional.empty(), + -1L, + -1L, + -1L, + -1L, + -1L, + OptionalLong.empty(), + OptionalInt.empty(), + Errors.NONE + )); + when(pendingRemoteFetches.remoteFetches()).thenReturn(List.of(remoteFetch)); + when(pendingRemoteFetches.isDone()).thenReturn(false); + + doAnswer(invocationOnMock -> null).when(pendingRemoteFetches).invokeCallbackOnCompletion(any()); + // Mock the behaviour of replica manager such that remote storage fetch completion timer task completes on adding it to the watch queue. + doAnswer(invocationOnMock -> { + TimerTask timerTask = invocationOnMock.getArgument(0); + timerTask.run(); + return null; + }).when(replicaManager).addShareFetchTimerRequest(any()); + + try (MockedStatic mockedShareFetchUtils = Mockito.mockStatic(ShareFetchUtils.class)) { + Map partitionDataMap = new LinkedHashMap<>(); + // 0 bytes fetched data for tp0. + ShareFetchResponseData.PartitionData mockedPartitionData0 = mock(ShareFetchResponseData.PartitionData.class); + when(mockedPartitionData0.records()).thenReturn(MemoryRecords.EMPTY); + partitionDataMap.put(tp0, mockedPartitionData0); + mockedShareFetchUtils.when(() -> ShareFetchUtils.processFetchResponse(any(), any(), any(), any(), any())).thenReturn(partitionDataMap); + + assertFalse(delayedShareFetch.isCompleted()); + delayedShareFetch.forceComplete(); + assertTrue(delayedShareFetch.isCompleted()); + // The future of shareFetch completes. + assertTrue(shareFetch.isCompleted()); + assertEquals(Set.of(tp0), future.join().keySet()); + // Verify the locks are released for tp0 but not added to action queue since no records are fetched from remote storage. + Mockito.verify(delayedShareFetch, times(1)).releasePartitionLocks(Set.of(tp0)); + Mockito.verify(replicaManager, times(0)).addToActionQueue(any()); + assertTrue(delayedShareFetch.outsidePurgatoryCallbackLock()); + assertTrue(delayedShareFetch.lock().tryLock()); + delayedShareFetch.lock().unlock(); + } + } + static void mockTopicIdPartitionToReturnDataEqualToMinBytes(ReplicaManager replicaManager, TopicIdPartition topicIdPartition, int minBytes) { LogOffsetMetadata hwmOffsetMetadata = new LogOffsetMetadata(1, 1, minBytes); LogOffsetSnapshot endOffsetSnapshot = new LogOffsetSnapshot(1, mock(LogOffsetMetadata.class), diff --git a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java index d4699b5dc0999..c6c96f79230c2 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionManagerTest.java @@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.ObjectSerializationCache; import org.apache.kafka.common.record.internal.FileRecords; import org.apache.kafka.common.record.internal.MemoryRecords; +import org.apache.kafka.common.record.internal.Records; import org.apache.kafka.common.record.internal.SimpleRecord; import org.apache.kafka.common.requests.FetchRequest; import org.apache.kafka.common.requests.ShareFetchResponse; @@ -3185,10 +3186,18 @@ private void validateBrokerTopicStatsMetrics( } static Seq> buildLogReadResult(List topicIdPartitions) { + return buildLogReadResult(topicIdPartitions, MemoryRecords.withRecords( + Compression.NONE, new SimpleRecord("test-key".getBytes(), "test-value".getBytes()))); + } + + static Seq> buildEmptyLogReadResult(List topicIdPartitions) { + return buildLogReadResult(topicIdPartitions, MemoryRecords.EMPTY); + } + + static Seq> buildLogReadResult(List topicIdPartitions, Records records) { List> logReadResults = new ArrayList<>(); topicIdPartitions.forEach(topicIdPartition -> logReadResults.add(new Tuple2<>(topicIdPartition, new LogReadResult( - new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), MemoryRecords.withRecords( - Compression.NONE, new SimpleRecord("test-key".getBytes(), "test-value".getBytes()))), + new FetchDataInfo(new LogOffsetMetadata(0, 0, 0), records), Optional.empty(), -1L, -1L,