Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.paimon.operation.commit.ManifestEntryChanges;
import org.apache.paimon.operation.commit.RetryCommitResult;
import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult;
import org.apache.paimon.operation.commit.RetryCommitResult.ManifestMergeResult;
import org.apache.paimon.operation.commit.RowIdColumnConflictChecker;
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
import org.apache.paimon.operation.commit.StrictModeChecker;
Expand Down Expand Up @@ -977,10 +978,18 @@ CommitResult tryCommitOnce(
}

// try to merge old manifest files to create base manifest list
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests, manifestFile, partitionType, options);
baseManifestList = manifestList.write(mergeAfterManifests);
ManifestMergeReuse manifestMergeReuse =
tryReuseManifestMergeResult(retryResult, mergeBeforeManifests);
if (manifestMergeReuse == null) {
mergeAfterManifests =
ManifestFileMerger.merge(
mergeBeforeManifests, manifestFile, partitionType, options);
baseManifestList = manifestList.write(mergeAfterManifests);
} else {
mergeBeforeManifests = manifestMergeReuse.preservedManifests;
mergeAfterManifests = manifestMergeReuse.mergeAfterManifests;
baseManifestList = manifestList.write(mergeAfterManifests);
}

if (options.rowTrackingEnabled()) {
if (options.rowTrackingPartitionGroupOnCommit()) {
Expand Down Expand Up @@ -1087,7 +1096,7 @@ CommitResult tryCommitOnce(
} catch (Exception e) {
// commit exception, not sure about the situation and should not clean up the files
LOG.warn("Retry commit for exception.", e);
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e);
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e, null);
}

if (!success) {
Expand All @@ -1101,7 +1110,11 @@ CommitResult tryCommitOnce(
identifier,
commitKind.name(),
commitTime);
return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, null);
return RetryCommitResult.forCommitFail(
latestSnapshot,
baseDataFiles,
null,
new ManifestMergeResult(mergeBeforeManifests, mergeAfterManifests));
}

LOG.info(
Expand All @@ -1123,6 +1136,81 @@ CommitResult tryCommitOnce(
return new SuccessCommitResult();
}

@Nullable
private ManifestMergeReuse tryReuseManifestMergeResult(
@Nullable RetryCommitResult retryResult, List<ManifestFileMeta> currentManifests) {
if (!(retryResult instanceof CommitFailRetryResult)) {
return null;
}

CommitFailRetryResult commitFailRetry = (CommitFailRetryResult) retryResult;
ManifestMergeResult previous = commitFailRetry.manifestMergeResult;
if (previous == null) {
return null;
}
if (previous.mergeBeforeManifests.isEmpty() && !currentManifests.isEmpty()) {
return null;
}

List<ManifestFileMeta> mergeAfterManifests =
replaceMergedManifests(
currentManifests,
previous.mergeBeforeManifests,
previous.mergeAfterManifests);
if (mergeAfterManifests == null) {
return null;
}

return new ManifestMergeReuse(currentManifests, mergeAfterManifests);
}

@Nullable
private List<ManifestFileMeta> replaceMergedManifests(
List<ManifestFileMeta> currentManifests,
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergedManifests) {
List<ManifestFileMeta> remainingMergeBefore = new ArrayList<>(mergeBeforeManifests);
List<ManifestFileMeta> replacementManifests =
new ArrayList<>(
Math.max(
0,
currentManifests.size()
- mergeBeforeManifests.size()
+ mergedManifests.size()));
boolean insertedMergeAfter = false;
for (ManifestFileMeta currentManifest : currentManifests) {
int mergeBeforeIndex = remainingMergeBefore.indexOf(currentManifest);
if (mergeBeforeIndex < 0) {
replacementManifests.add(currentManifest);
continue;
}

remainingMergeBefore.remove(mergeBeforeIndex);
if (!insertedMergeAfter) {
replacementManifests.addAll(mergedManifests);
insertedMergeAfter = true;
}
}

if (!remainingMergeBefore.isEmpty()) {
return null;
}
return replacementManifests;
}

private static class ManifestMergeReuse {

private final List<ManifestFileMeta> preservedManifests;
private final List<ManifestFileMeta> mergeAfterManifests;

private ManifestMergeReuse(
List<ManifestFileMeta> preservedManifests,
List<ManifestFileMeta> mergeAfterManifests) {
this.preservedManifests = preservedManifests;
this.mergeAfterManifests = mergeAfterManifests;
}
}

public boolean replaceManifestList(
Snapshot latest,
long totalRecordCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,13 @@
package org.apache.paimon.operation.commit;

import org.apache.paimon.Snapshot;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.SimpleFileEntry;

import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Need to retry commit of {@link CommitResult}. */
Expand All @@ -35,8 +38,11 @@ private RetryCommitResult(Exception exception) {
}

public static RetryCommitResult forCommitFail(
Snapshot snapshot, List<SimpleFileEntry> baseDataFiles, Exception exception) {
return new CommitFailRetryResult(snapshot, baseDataFiles, exception);
Snapshot snapshot,
List<SimpleFileEntry> baseDataFiles,
Exception exception,
@Nullable ManifestMergeResult manifestMergeResult) {
return new CommitFailRetryResult(snapshot, baseDataFiles, exception, manifestMergeResult);
}

public static RetryCommitResult forRollback(Exception exception) {
Expand All @@ -53,14 +59,33 @@ public static class CommitFailRetryResult extends RetryCommitResult {

public final @Nullable Snapshot latestSnapshot;
public final @Nullable List<SimpleFileEntry> baseDataFiles;
public final @Nullable ManifestMergeResult manifestMergeResult;

private CommitFailRetryResult(
@Nullable Snapshot latestSnapshot,
@Nullable List<SimpleFileEntry> baseDataFiles,
Exception exception) {
Exception exception,
@Nullable ManifestMergeResult manifestMergeResult) {
super(exception);
this.latestSnapshot = latestSnapshot;
this.baseDataFiles = baseDataFiles;
this.manifestMergeResult = manifestMergeResult;
}
}

/** Manifest merge result which can be reused by commit retry. */
public static class ManifestMergeResult {

public final List<ManifestFileMeta> mergeBeforeManifests;
public final List<ManifestFileMeta> mergeAfterManifests;

public ManifestMergeResult(
List<ManifestFileMeta> mergeBeforeManifests,
List<ManifestFileMeta> mergeAfterManifests) {
this.mergeBeforeManifests =
Collections.unmodifiableList(new ArrayList<>(mergeBeforeManifests));
this.mergeAfterManifests =
Collections.unmodifiableList(new ArrayList<>(mergeAfterManifests));
}
}

Expand Down
Loading