Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 35 additions & 14 deletions core/src/main/java/kafka/server/share/DelayedShareFetch.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
Expand Down Expand Up @@ -263,6 +264,7 @@ private void completeLocalLogShareFetchRequest() {
}

private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicIdPartition, Long> topicPartitionData) {
List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>();
try {
LinkedHashMap<TopicIdPartition, LogReadResult> responseData;
if (localPartitionsAlreadyFetched.isEmpty())
Expand All @@ -276,7 +278,6 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI

resetFetchOffsetMetadataForRemoteFetchPartitions(topicPartitionData, responseData);

List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>();
responseData.forEach((topicIdPartition, logReadResult) -> {
if (logReadResult.info().delayedRemoteStorageFetch.isEmpty()) {
shareFetchPartitionDataList.add(new ShareFetchPartitionData(
Expand All @@ -298,7 +299,7 @@ private void processAcquiredTopicPartitionsForLocalLogFetch(LinkedHashMap<TopicI
log.error("Error processing delayed share fetch request", e);
handleFetchException(shareFetch, topicPartitionData.keySet(), e);
} finally {
releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet());
releasePartitionLocksAndAddToActionQueue(topicPartitionData.keySet(), partitionsWithData(shareFetchPartitionDataList));
}
}

Expand Down Expand Up @@ -691,7 +692,7 @@ private boolean maybeProcessRemoteFetch(
});
// Release fetch lock for the topic partitions that were acquired but were not a part of remote fetch and add
// them to the delayed actions queue.
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions);
releasePartitionLocksAndAddToActionQueue(nonRemoteFetchTopicPartitions, nonRemoteFetchTopicPartitions);
processRemoteFetchOrException(remoteStorageFetchInfoMap);
// Check if remote fetch can be completed.
return maybeCompletePendingRemoteFetch();
Expand Down Expand Up @@ -809,18 +810,38 @@ private void completeErroneousRemoteShareFetchRequest() {
try {
handleFetchException(shareFetch, partitionsAcquired.keySet(), remoteStorageFetchException.get());
} finally {
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet());
releasePartitionLocksAndAddToActionQueue(partitionsAcquired.keySet(), partitionsAcquired.keySet());
}

}

private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topicIdPartitions) {
if (topicIdPartitions.isEmpty()) {
private Set<TopicIdPartition> partitionsWithData(List<ShareFetchPartitionData> shareFetchPartitionDataList) {
if (shareFetchPartitionDataList == null || shareFetchPartitionDataList.isEmpty()) {
return Set.of();
}
Set<TopicIdPartition> partitionsWithData = new HashSet<>();
shareFetchPartitionDataList.forEach(shareFetchPartitionData -> {
if (shareFetchPartitionData.fetchPartitionData() != null &&
shareFetchPartitionData.fetchPartitionData().records != null &&
shareFetchPartitionData.fetchPartitionData().records.sizeInBytes() > 0) {
partitionsWithData.add(shareFetchPartitionData.topicIdPartition());
}
});
return partitionsWithData;
}

private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> allAcquiredTopicIdPartitions,
Set<TopicIdPartition> topicIdPartitionsWithData) {
if (allAcquiredTopicIdPartitions.isEmpty()) {
// topicIdPartitionsWithData set should be a subset of allAcquiredTopicIdPartitions, hence it is safe to return.
return;
}
// Releasing the lock to move ahead with the next request in queue.
releasePartitionLocks(topicIdPartitions);
replicaManager.addToActionQueue(() -> topicIdPartitions.forEach(topicIdPartition -> {
releasePartitionLocks(allAcquiredTopicIdPartitions);
if (topicIdPartitionsWithData.isEmpty()) {
return;
}
replicaManager.addToActionQueue(() -> topicIdPartitionsWithData.forEach(topicIdPartition -> {
// If we have a fetch request completed for a share-partition, we release the locks for that partition,
// then we should check if there is a pending share fetch request for the share-partition and complete it.
// We add the action to delayed actions queue to avoid an infinite call stack, which could happen if
Expand All @@ -844,15 +865,15 @@ private void releasePartitionLocksAndAddToActionQueue(Set<TopicIdPartition> topi
*/
private void completeRemoteStorageShareFetchRequest() {
LinkedHashMap<TopicIdPartition, Long> acquiredNonRemoteFetchTopicPartitionData = new LinkedHashMap<>();
List<ShareFetchPartitionData> shareFetchPartitionDataList = new ArrayList<>();
try {
List<ShareFetchPartitionData> shareFetchPartitionData = new ArrayList<>();
int readableBytes = 0;
for (RemoteFetch remoteFetch : pendingRemoteFetchesOpt.get().remoteFetches()) {
if (remoteFetch.remoteFetchResult().isDone()) {
RemoteLogReadResult remoteLogReadResult = remoteFetch.remoteFetchResult().get();
if (remoteLogReadResult.error().isPresent()) {
// If there is any error for the remote fetch topic partition, we populate the error accordingly.
shareFetchPartitionData.add(
shareFetchPartitionDataList.add(
new ShareFetchPartitionData(
remoteFetch.topicIdPartition(),
partitionsAcquired.get(remoteFetch.topicIdPartition()),
Expand All @@ -863,7 +884,7 @@ private void completeRemoteStorageShareFetchRequest() {
FetchDataInfo info = remoteLogReadResult.fetchDataInfo().get();
TopicIdPartition topicIdPartition = remoteFetch.topicIdPartition();
LogReadResult logReadResult = remoteFetch.logReadResult();
shareFetchPartitionData.add(
shareFetchPartitionDataList.add(
new ShareFetchPartitionData(
topicIdPartition,
partitionsAcquired.get(remoteFetch.topicIdPartition()),
Expand Down Expand Up @@ -907,7 +928,7 @@ private void completeRemoteStorageShareFetchRequest() {
resetFetchOffsetMetadataForRemoteFetchPartitions(acquiredNonRemoteFetchTopicPartitionData, responseData);
for (Map.Entry<TopicIdPartition, LogReadResult> entry : responseData.entrySet()) {
if (entry.getValue().info().delayedRemoteStorageFetch.isEmpty()) {
shareFetchPartitionData.add(
shareFetchPartitionDataList.add(
new ShareFetchPartitionData(
entry.getKey(),
acquiredNonRemoteFetchTopicPartitionData.get(entry.getKey()),
Expand All @@ -925,7 +946,7 @@ private void completeRemoteStorageShareFetchRequest() {
shareGroupMetrics.recordTopicPartitionsFetchRatio(shareFetch.groupId(), (int) (acquiredRatio * 100));

Map<TopicIdPartition, ShareFetchResponseData.PartitionData> remoteFetchResponse = ShareFetchUtils.processFetchResponse(
shareFetch, shareFetchPartitionData, sharePartitions, replicaManager, exceptionHandler);
shareFetch, shareFetchPartitionDataList, sharePartitions, replicaManager, exceptionHandler);
shareFetch.maybeComplete(remoteFetchResponse);
log.trace("Remote share fetch request completed successfully, response: {}", remoteFetchResponse);
} catch (InterruptedException | ExecutionException e) {
Expand All @@ -937,7 +958,7 @@ private void completeRemoteStorageShareFetchRequest() {
} finally {
Set<TopicIdPartition> topicIdPartitions = new LinkedHashSet<>(partitionsAcquired.keySet());
topicIdPartitions.addAll(acquiredNonRemoteFetchTopicPartitionData.keySet());
releasePartitionLocksAndAddToActionQueue(topicIdPartitions);
releasePartitionLocksAndAddToActionQueue(topicIdPartitions, partitionsWithData(shareFetchPartitionDataList));
}
}

Expand Down
Loading
Loading