diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index afb7ad6698dc..7d38c66d5317 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -49,6 +49,7 @@ import org.apache.paimon.operation.commit.ManifestEntryChanges; import org.apache.paimon.operation.commit.RetryCommitResult; import org.apache.paimon.operation.commit.RetryCommitResult.CommitFailRetryResult; +import org.apache.paimon.operation.commit.RetryCommitResult.ManifestMergeResult; import org.apache.paimon.operation.commit.RowIdColumnConflictChecker; import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned; import org.apache.paimon.operation.commit.StrictModeChecker; @@ -958,6 +959,7 @@ CommitResult tryCommitOnce( String indexManifest = null; List mergeBeforeManifests = new ArrayList<>(); List mergeAfterManifests = new ArrayList<>(); + boolean skipManifestMergeOnRetry = false; long nextRowIdStart = firstRowIdStart; try { long previousTotalRecordCount = 0L; @@ -977,9 +979,19 @@ CommitResult tryCommitOnce( } // try to merge old manifest files to create base manifest list - mergeAfterManifests = - ManifestFileMerger.merge( - mergeBeforeManifests, manifestFile, partitionType, options); + ManifestMergeReuse manifestMergeReuse = + tryReuseManifestMergeResult(retryResult, mergeBeforeManifests); + skipManifestMergeOnRetry = manifestMergeReuse == null && retryResult != null; + if (manifestMergeReuse != null) { + mergeBeforeManifests = manifestMergeReuse.preservedManifests; + mergeAfterManifests = manifestMergeReuse.mergeAfterManifests; + } else if (skipManifestMergeOnRetry) { + mergeAfterManifests = mergeBeforeManifests; + } else { + mergeAfterManifests = + ManifestFileMerger.merge( + mergeBeforeManifests, manifestFile, partitionType, options); + } baseManifestList = manifestList.write(mergeAfterManifests); if (options.rowTrackingEnabled()) { @@ -1087,7 +1099,7 @@ CommitResult tryCommitOnce( } catch (Exception e) { // commit exception, not sure about the situation and should not clean up the files LOG.warn("Retry commit for exception.", e); - return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e); + return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, e, null); } if (!success) { @@ -1101,7 +1113,13 @@ CommitResult tryCommitOnce( identifier, commitKind.name(), commitTime); - return RetryCommitResult.forCommitFail(latestSnapshot, baseDataFiles, null); + return RetryCommitResult.forCommitFail( + latestSnapshot, + baseDataFiles, + null, + skipManifestMergeOnRetry + ? null + : new ManifestMergeResult(mergeBeforeManifests, mergeAfterManifests)); } LOG.info( @@ -1123,6 +1141,81 @@ CommitResult tryCommitOnce( return new SuccessCommitResult(); } + @Nullable + private ManifestMergeReuse tryReuseManifestMergeResult( + @Nullable RetryCommitResult retryResult, List currentManifests) { + if (!(retryResult instanceof CommitFailRetryResult)) { + return null; + } + + CommitFailRetryResult commitFailRetry = (CommitFailRetryResult) retryResult; + ManifestMergeResult previous = commitFailRetry.manifestMergeResult; + if (previous == null) { + return null; + } + if (previous.mergeBeforeManifests.isEmpty() && !currentManifests.isEmpty()) { + return null; + } + + List mergeAfterManifests = + replaceMergedManifests( + currentManifests, + previous.mergeBeforeManifests, + previous.mergeAfterManifests); + if (mergeAfterManifests == null) { + return null; + } + + return new ManifestMergeReuse(currentManifests, mergeAfterManifests); + } + + @Nullable + private List replaceMergedManifests( + List currentManifests, + List mergeBeforeManifests, + List mergedManifests) { + List remainingMergeBefore = new ArrayList<>(mergeBeforeManifests); + List replacementManifests = + new ArrayList<>( + Math.max( + 0, + currentManifests.size() + - mergeBeforeManifests.size() + + mergedManifests.size())); + boolean insertedMergeAfter = false; + for (ManifestFileMeta currentManifest : currentManifests) { + int mergeBeforeIndex = remainingMergeBefore.indexOf(currentManifest); + if (mergeBeforeIndex < 0) { + replacementManifests.add(currentManifest); + continue; + } + + remainingMergeBefore.remove(mergeBeforeIndex); + if (!insertedMergeAfter) { + replacementManifests.addAll(mergedManifests); + insertedMergeAfter = true; + } + } + + if (!remainingMergeBefore.isEmpty()) { + return null; + } + return replacementManifests; + } + + private static class ManifestMergeReuse { + + private final List preservedManifests; + private final List mergeAfterManifests; + + private ManifestMergeReuse( + List preservedManifests, + List mergeAfterManifests) { + this.preservedManifests = preservedManifests; + this.mergeAfterManifests = mergeAfterManifests; + } + } + public boolean replaceManifestList( Snapshot latest, long totalRecordCount, diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java index b9e0ab2a2ef0..717df209cedc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/commit/RetryCommitResult.java @@ -19,10 +19,13 @@ package org.apache.paimon.operation.commit; import org.apache.paimon.Snapshot; +import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.SimpleFileEntry; import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; /** Need to retry commit of {@link CommitResult}. */ @@ -35,8 +38,11 @@ private RetryCommitResult(Exception exception) { } public static RetryCommitResult forCommitFail( - Snapshot snapshot, List baseDataFiles, Exception exception) { - return new CommitFailRetryResult(snapshot, baseDataFiles, exception); + Snapshot snapshot, + List baseDataFiles, + Exception exception, + @Nullable ManifestMergeResult manifestMergeResult) { + return new CommitFailRetryResult(snapshot, baseDataFiles, exception, manifestMergeResult); } public static RetryCommitResult forRollback(Exception exception) { @@ -53,14 +59,33 @@ public static class CommitFailRetryResult extends RetryCommitResult { public final @Nullable Snapshot latestSnapshot; public final @Nullable List baseDataFiles; + public final @Nullable ManifestMergeResult manifestMergeResult; private CommitFailRetryResult( @Nullable Snapshot latestSnapshot, @Nullable List baseDataFiles, - Exception exception) { + Exception exception, + @Nullable ManifestMergeResult manifestMergeResult) { super(exception); this.latestSnapshot = latestSnapshot; this.baseDataFiles = baseDataFiles; + this.manifestMergeResult = manifestMergeResult; + } + } + + /** Manifest merge result which can be reused by commit retry. */ + public static class ManifestMergeResult { + + public final List mergeBeforeManifests; + public final List mergeAfterManifests; + + public ManifestMergeResult( + List mergeBeforeManifests, + List mergeAfterManifests) { + this.mergeBeforeManifests = + Collections.unmodifiableList(new ArrayList<>(mergeBeforeManifests)); + this.mergeAfterManifests = + Collections.unmodifiableList(new ArrayList<>(mergeAfterManifests)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index ef40a0fa2c4d..1ef3c5809cc8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -42,8 +42,10 @@ import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.operation.commit.ConflictDetection; +import org.apache.paimon.operation.commit.ManifestEntryChanges; import org.apache.paimon.operation.commit.RetryCommitResult; import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.Schema; @@ -60,6 +62,7 @@ import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FailingFileIO; +import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; @@ -1074,7 +1077,8 @@ public void testCommitTwiceWithDifferentKind() throws Exception { null); // Compact commit.tryCommitOnce( - RetryCommitResult.forCommitFail(firstLatest, Collections.emptyList(), null), + RetryCommitResult.forCommitFail( + firstLatest, Collections.emptyList(), null, null), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), @@ -1125,6 +1129,401 @@ public void testCommitRetryAfterFalseSuccessDoesNotCleanManifest() throws Except assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(1); } + @Test + public void testCommitRetryReusePreviousManifestMergeResultWhenBeforeStillExists() + throws Exception { + TestFileStore store = + createStore( + false, + Collections.singletonMap( + CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), "1")); + + store.commitData( + Collections.singletonList(gen.nextInsert("20211110", 8, 1L, null, "a")), + gen::getPartition, + kv -> 0); + store.commitData( + Collections.singletonList(gen.nextInsert("20211111", 9, 2L, null, "b")), + gen::getPartition, + kv -> 0); + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211110", 9, 3L, null, "c")), + gen::getPartition, + value -> 0, + false, + 23L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + ConflictingSnapshotCommit snapshotCommit = + new ConflictingSnapshotCommit( + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + store.snapshotManager(), + store.manifestListFactory().create(), + store.manifestFileFactory().create(), + false, + conflictAttempts(Collections.emptyList())); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit(store, "retry-reuse-merge", snapshotCommit)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + List finalBaseManifests = + store.manifestListFactory() + .create() + .read( + latestSnapshot.baseManifestList(), + latestSnapshot.baseManifestListSize()); + assertThat(finalBaseManifests) + .containsExactlyElementsOf(snapshotCommit.firstAttemptBaseManifests()); + assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(3); + } + + @Test + public void testCommitRetryReusePreviousManifestMergeResultWhenBeforeExistsNonContiguously() + throws Exception { + TestFileStore store = + createStore( + false, + Collections.singletonMap( + CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), "1")); + + store.commitData( + Collections.singletonList(gen.nextInsert("20211110", 8, 1L, null, "a")), + gen::getPartition, + kv -> 0); + store.commitData( + Collections.singletonList(gen.nextInsert("20211111", 9, 2L, null, "b")), + gen::getPartition, + kv -> 0); + + AtomicReference conflictCommittableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211112", 10, 4L, null, "d")), + gen::getPartition, + value -> 0, + false, + 22L, + null, + Collections.emptyList(), + (commit, committable) -> conflictCommittableRef.set(committable)); + List conflictDeltaFiles = + tableFilesFrom(checkNotNull(conflictCommittableRef.get()), store.options()); + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211110", 9, 3L, null, "c")), + gen::getPartition, + value -> 0, + false, + 23L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + ConflictingSnapshotCommit snapshotCommit = + new ConflictingSnapshotCommit( + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + store.snapshotManager(), + store.manifestListFactory().create(), + store.manifestFileFactory().create(), + false, + conflictAttempts(conflictDeltaFiles)); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit(store, "retry-reuse-non-contiguous", snapshotCommit)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + List expectedBaseManifests = + new ArrayList<>(snapshotCommit.firstAttemptBaseManifests()); + expectedBaseManifests.addAll(snapshotCommit.conflictDeltaManifests()); + List finalBaseManifests = + store.manifestListFactory() + .create() + .read( + latestSnapshot.baseManifestList(), + latestSnapshot.baseManifestListSize()); + assertThat(finalBaseManifests).containsExactlyElementsOf(expectedBaseManifests); + assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(4); + } + + @Test + public void testCommitRetrySkipsManifestMergeWhenPreviousMergeCannotBeReused() + throws Exception { + Map options = new HashMap<>(); + options.put(CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), "1"); + TestFileStore store = createStore(false, options); + + store.commitData( + Collections.singletonList(gen.nextInsert("20211110", 8, 1L, null, "a")), + gen::getPartition, + kv -> 0); + store.commitData( + Collections.singletonList(gen.nextInsert("20211111", 9, 2L, null, "b")), + gen::getPartition, + kv -> 0); + + Snapshot latestBeforeRetry = checkNotNull(store.snapshotManager().latestSnapshot()); + assertThat(store.manifestListFactory().create().readDataManifests(latestBeforeRetry)) + .hasSize(2); + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211110", 9, 3L, null, "c")), + gen::getPartition, + value -> 0, + false, + 23L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + ConflictingSnapshotCommit snapshotCommit = + new ConflictingSnapshotCommit( + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + store.snapshotManager(), + store.manifestListFactory().create(), + store.manifestFileFactory().create(), + true, + conflictAttempts(Collections.emptyList())); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit(store, "retry-reuse-merge", snapshotCommit)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + List finalBaseManifests = + store.manifestListFactory() + .create() + .read( + latestSnapshot.baseManifestList(), + latestSnapshot.baseManifestListSize()); + assertThat(snapshotCommit.conflictBaseManifests()).hasSize(2); + assertThat(finalBaseManifests) + .containsExactlyElementsOf(snapshotCommit.conflictBaseManifests()); + assertThat(finalBaseManifests).isNotEqualTo(snapshotCommit.firstAttemptBaseManifests()); + assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(3); + } + + @Test + public void testCommitRetryReusePreviousManifestMergeResultAcrossMultipleRetries() + throws Exception { + TestFileStore store = + createStore( + false, + Collections.singletonMap( + CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), "1")); + + store.commitData( + Collections.singletonList(gen.nextInsert("20211110", 8, 1L, null, "a")), + gen::getPartition, + kv -> 0); + store.commitData( + Collections.singletonList(gen.nextInsert("20211111", 9, 2L, null, "b")), + gen::getPartition, + kv -> 0); + + AtomicReference firstConflictCommittableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211112", 10, 4L, null, "d")), + gen::getPartition, + value -> 0, + false, + 22L, + null, + Collections.emptyList(), + (commit, committable) -> firstConflictCommittableRef.set(committable)); + List firstConflictDeltaFiles = + tableFilesFrom(checkNotNull(firstConflictCommittableRef.get()), store.options()); + + AtomicReference secondConflictCommittableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211113", 11, 5L, null, "e")), + gen::getPartition, + value -> 0, + false, + 24L, + null, + Collections.emptyList(), + (commit, committable) -> secondConflictCommittableRef.set(committable)); + List secondConflictDeltaFiles = + tableFilesFrom(checkNotNull(secondConflictCommittableRef.get()), store.options()); + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211110", 9, 3L, null, "c")), + gen::getPartition, + value -> 0, + false, + 23L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + ConflictingSnapshotCommit snapshotCommit = + new ConflictingSnapshotCommit( + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + store.snapshotManager(), + store.manifestListFactory().create(), + store.manifestFileFactory().create(), + false, + conflictAttempts(firstConflictDeltaFiles, secondConflictDeltaFiles)); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit(store, "retry-reuse-multiple", snapshotCommit)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + List expectedBaseManifests = + new ArrayList<>(snapshotCommit.attemptBaseManifests(1)); + expectedBaseManifests.addAll(snapshotCommit.conflictDeltaManifests(1)); + List finalBaseManifests = + store.manifestListFactory() + .create() + .read( + latestSnapshot.baseManifestList(), + latestSnapshot.baseManifestListSize()); + assertThat(finalBaseManifests).containsExactlyElementsOf(expectedBaseManifests); + assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(5); + } + + @Test + public void testCommitRetryFromEmptyTableWithConcurrentFirstSnapshot() throws Exception { + TestFileStore store = + createStore( + false, + Collections.singletonMap( + CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), "1")); + + AtomicReference conflictCommittableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211112", 10, 4L, null, "d")), + gen::getPartition, + value -> 0, + false, + 22L, + null, + Collections.emptyList(), + (commit, committable) -> conflictCommittableRef.set(committable)); + List conflictDeltaFiles = + tableFilesFrom(checkNotNull(conflictCommittableRef.get()), store.options()); + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211110", 9, 3L, null, "c")), + gen::getPartition, + value -> 0, + false, + 23L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + ConflictingSnapshotCommit snapshotCommit = + new ConflictingSnapshotCommit( + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + store.snapshotManager(), + store.manifestListFactory().create(), + store.manifestFileFactory().create(), + false, + conflictAttempts(conflictDeltaFiles)); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit(store, "retry-reuse-empty-table", snapshotCommit)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + List finalBaseManifests = + store.manifestListFactory() + .create() + .read( + latestSnapshot.baseManifestList(), + latestSnapshot.baseManifestListSize()); + assertThat(finalBaseManifests) + .containsExactlyElementsOf(snapshotCommit.conflictDeltaManifests()); + assertThat(store.readKvsFromSnapshot(latestSnapshot.id())).hasSize(2); + } + + @Test + public void testCommitRetryReusePreviousManifestMergeResultPreservesDeleteOrder() + throws Exception { + TestFileStore store = + createStore( + false, + Collections.singletonMap( + CoreOptions.MANIFEST_FULL_COMPACTION_FILE_SIZE.key(), "1")); + + store.commitData( + Collections.singletonList(gen.nextInsert("20211110", 8, 1L, null, "a")), + gen::getPartition, + kv -> 0); + store.commitData( + Collections.singletonList(gen.nextInsert("20211111", 9, 2L, null, "b")), + gen::getPartition, + kv -> 0); + + Snapshot latestBeforeRetry = checkNotNull(store.snapshotManager().latestSnapshot()); + List manifestsBeforeRetry = + store.manifestListFactory().create().readDataManifests(latestBeforeRetry); + assertThat(manifestsBeforeRetry).hasSize(2); + ManifestEntry firstEntry = + store.manifestFileFactory() + .create() + .read(manifestsBeforeRetry.get(0).fileName()) + .get(0); + ManifestEntry deleteFirstEntry = + ManifestEntry.create( + FileKind.DELETE, + firstEntry.partition(), + firstEntry.bucket(), + firstEntry.totalBuckets(), + firstEntry.file()); + + AtomicReference committableRef = new AtomicReference<>(); + store.commitDataImpl( + Collections.singletonList(gen.nextInsert("20211110", 9, 3L, null, "c")), + gen::getPartition, + value -> 0, + false, + 23L, + null, + Collections.emptyList(), + (commit, committable) -> committableRef.set(committable)); + + ConflictingSnapshotCommit snapshotCommit = + new ConflictingSnapshotCommit( + new RenamingSnapshotCommit(store.snapshotManager(), Lock.empty()), + store.snapshotManager(), + store.manifestListFactory().create(), + store.manifestFileFactory().create(), + false, + conflictAttempts(Collections.singletonList(deleteFirstEntry))); + try (FileStoreCommitImpl commit = + newCommitWithSnapshotCommit(store, "retry-reuse-delete-order", snapshotCommit)) { + commit.commit(checkNotNull(committableRef.get()), false); + } + + Snapshot latestSnapshot = checkNotNull(store.snapshotManager().latestSnapshot()); + List expectedBaseManifests = + new ArrayList<>(snapshotCommit.firstAttemptBaseManifests()); + expectedBaseManifests.addAll(snapshotCommit.conflictDeltaManifests()); + List finalBaseManifests = + store.manifestListFactory() + .create() + .read( + latestSnapshot.baseManifestList(), + latestSnapshot.baseManifestListSize()); + assertThat(finalBaseManifests).containsExactlyElementsOf(expectedBaseManifests); + assertThat(store.readKvsFromSnapshot(latestSnapshot.id())) + .extracting(kv -> kv.value().getString(6).toString()) + .containsExactlyInAnyOrder("b", "c"); + } + private FileStoreCommitImpl newCommitWithSnapshotCommit( TestFileStore store, String commitUser, SnapshotCommit snapshotCommit) { return newCommitWithSnapshotCommit( @@ -1199,6 +1598,177 @@ private ManifestCommittable indexCommittable( return committable; } + private static List tableFilesFrom( + ManifestCommittable committable, CoreOptions options) { + ManifestEntryChanges changes = new ManifestEntryChanges(options.bucket()); + committable.fileCommittables().forEach(changes::collect); + return new ArrayList<>(changes.appendTableFiles); + } + + @SafeVarargs + private static List> conflictAttempts(List... attempts) { + return Arrays.asList(attempts); + } + + private static class ConflictingSnapshotCommit implements SnapshotCommit { + + private final SnapshotCommit delegate; + private final SnapshotManager snapshotManager; + private final ManifestList manifestList; + private final ManifestFile manifestFile; + private final boolean mergeConflictManifests; + private final List> conflictDeltaFilesByAttempt; + private int commitAttempt = 0; + private final List> attemptBaseManifests; + private final List> conflictDeltaManifestsByAttempt; + private List conflictBaseManifests; + + private ConflictingSnapshotCommit( + SnapshotCommit delegate, + SnapshotManager snapshotManager, + ManifestList manifestList, + ManifestFile manifestFile, + boolean mergeConflictManifests, + List> conflictDeltaFilesByAttempt) { + this.delegate = delegate; + this.snapshotManager = snapshotManager; + this.manifestList = manifestList; + this.manifestFile = manifestFile; + this.mergeConflictManifests = mergeConflictManifests; + this.conflictDeltaFilesByAttempt = conflictDeltaFilesByAttempt; + this.attemptBaseManifests = new ArrayList<>(); + this.conflictDeltaManifestsByAttempt = new ArrayList<>(); + } + + @Override + public boolean commit( + Snapshot snapshot, + String branch, + List statistics) + throws Exception { + if (commitAttempt >= conflictDeltaFilesByAttempt.size()) { + return delegate.commit(snapshot, branch, statistics); + } + + List conflictDeltaFiles = conflictDeltaFilesByAttempt.get(commitAttempt); + commitAttempt++; + attemptBaseManifests.add( + manifestList.read( + snapshot.baseManifestList(), snapshot.baseManifestListSize())); + + Snapshot previousSnapshot = + snapshot.id() == Snapshot.FIRST_SNAPSHOT_ID + ? null + : snapshotManager.snapshot(snapshot.id() - 1); + List previousManifests = + previousSnapshot == null + ? Collections.emptyList() + : manifestList.readDataManifests(previousSnapshot); + conflictBaseManifests = + mergeConflictManifests + ? rewriteManifests(previousManifests) + : previousManifests; + List conflictNewManifests = + conflictDeltaFiles.isEmpty() + ? Collections.emptyList() + : manifestFile.write(conflictDeltaFiles); + conflictDeltaManifestsByAttempt.add(conflictNewManifests); + boolean putConflictNewManifestsInBase = + !mergeConflictManifests + && !conflictNewManifests.isEmpty() + && !previousManifests.isEmpty(); + if (putConflictNewManifestsInBase) { + conflictBaseManifests = new ArrayList<>(); + conflictBaseManifests.add(previousManifests.get(0)); + conflictBaseManifests.addAll(conflictNewManifests); + conflictBaseManifests.addAll( + previousManifests.subList(1, previousManifests.size())); + } + Pair conflictBaseManifestList = manifestList.write(conflictBaseManifests); + long conflictDeltaRecordCount = + conflictDeltaFiles.stream() + .mapToLong( + entry -> + entry.kind() == FileKind.ADD + ? entry.file().rowCount() + : -entry.file().rowCount()) + .sum(); + Pair conflictDeltaManifestList = + manifestList.write( + putConflictNewManifestsInBase + ? Collections.emptyList() + : conflictNewManifests); + Snapshot conflictSnapshot = + new Snapshot( + snapshot.id(), + previousSnapshot == null + ? snapshot.schemaId() + : previousSnapshot.schemaId(), + conflictBaseManifestList.getLeft(), + conflictBaseManifestList.getRight(), + conflictDeltaManifestList.getLeft(), + conflictDeltaManifestList.getRight(), + null, + null, + previousSnapshot == null ? null : previousSnapshot.indexManifest(), + "conflict-user", + Long.MAX_VALUE, + Snapshot.CommitKind.ANALYZE, + System.currentTimeMillis(), + (previousSnapshot == null ? 0L : previousSnapshot.totalRecordCount()) + + conflictDeltaRecordCount, + conflictDeltaRecordCount, + null, + previousSnapshot == null ? null : previousSnapshot.watermark(), + previousSnapshot == null ? null : previousSnapshot.statistics(), + previousSnapshot == null ? null : previousSnapshot.properties(), + previousSnapshot == null ? null : previousSnapshot.nextRowId()); + assertThat(delegate.commit(conflictSnapshot, branch, Collections.emptyList())).isTrue(); + return false; + } + + private List rewriteManifests(List manifests) { + if (manifests.isEmpty()) { + return Collections.emptyList(); + } + + List rewrittenManifests = new ArrayList<>(); + for (ManifestFileMeta manifest : manifests) { + rewrittenManifests.addAll( + manifestFile.write(manifestFile.read(manifest.fileName()))); + } + return rewrittenManifests; + } + + private List firstAttemptBaseManifests() { + return attemptBaseManifests(0); + } + + private List attemptBaseManifests(int attempt) { + assertThat(attemptBaseManifests).hasSizeGreaterThan(attempt); + return attemptBaseManifests.get(attempt); + } + + private List conflictBaseManifests() { + return checkNotNull(conflictBaseManifests); + } + + private List conflictDeltaManifests() { + assertThat(conflictDeltaManifestsByAttempt).isNotEmpty(); + return conflictDeltaManifestsByAttempt.get(conflictDeltaManifestsByAttempt.size() - 1); + } + + private List conflictDeltaManifests(int attempt) { + assertThat(conflictDeltaManifestsByAttempt).hasSizeGreaterThan(attempt); + return conflictDeltaManifestsByAttempt.get(attempt); + } + + @Override + public void close() throws Exception { + delegate.close(); + } + } + private static class FalseSuccessSnapshotCommit implements SnapshotCommit { private final SnapshotCommit delegate;