Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,15 @@ public RecordIterator<InternalRow> readBatch() throws IOException {
iterators[i] = batch;
}
}
// Expose file path and position when possible so callers that need per-row file
// metadata (e.g. Spark metadata columns and copy-on-write group filtering) can treat
// the assembled row as coming from one deterministic member file of the row-id group.
for (RecordIterator<InternalRow> iterator : iterators) {
if (iterator instanceof FileRecordIterator) {
return new DataEvolutionFileRecordIterator(
row, iterators, (FileRecordIterator<InternalRow>) iterator);
}
}
return new DataEvolutionIterator(row, iterators);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.reader;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.RecordReader.RecordIterator;

/**
* A {@link DataEvolutionIterator} that is also a {@link FileRecordIterator}. The file path and
* returned position are delegated to one designated inner iterator, so a row assembled from
* multiple files reports one deterministic member file of its row-id group and its position within
* that group (all inner iterators are row-aligned).
*/
public class DataEvolutionFileRecordIterator extends DataEvolutionIterator
implements FileRecordIterator<InternalRow> {

private final FileRecordIterator<InternalRow> designated;

public DataEvolutionFileRecordIterator(
DataEvolutionRow row,
RecordIterator<InternalRow>[] iterators,
FileRecordIterator<InternalRow> designated) {
super(row, iterators);
this.designated = designated;
}

@Override
public long returnedPosition() {
return designated.returnedPosition();
}

@Override
public Path filePath() {
return designated.filePath();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package org.apache.paimon.append;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.FileRecordReader;
import org.apache.paimon.reader.RecordReader;

import javax.annotation.Nullable;

import java.io.IOException;
import java.io.UncheckedIOException;

/**
* A record reader that merges all batches from a multi-batch reader into a single concatenated
Expand Down Expand Up @@ -53,7 +57,13 @@ public class ForceSingleBatchReader implements RecordReader<InternalRow> {

public ForceSingleBatchReader(RecordReader<InternalRow> multiBatchReader) {
this.multiBatchReader = multiBatchReader;
this.batch = new ConcatBatch(multiBatchReader);
// Preserve the file-record capability of the wrapped reader: callers like the
// data-evolution union read rely on per-row file path and position (e.g. Spark's
// __paimon_file_path metadata column and copy-on-write group filtering).
this.batch =
multiBatchReader instanceof FileRecordReader
? new FileConcatBatch(multiBatchReader)
: new ConcatBatch(multiBatchReader);
}

@Override
Expand All @@ -71,8 +81,8 @@ public void close() throws IOException {

private static class ConcatBatch implements RecordIterator<InternalRow> {

private final RecordReader<InternalRow> reader;
private RecordIterator<InternalRow> currentBatch;
protected final RecordReader<InternalRow> reader;
protected RecordIterator<InternalRow> currentBatch;

private ConcatBatch(RecordReader<InternalRow> reader) {
this.reader = reader;
Expand Down Expand Up @@ -107,4 +117,42 @@ public void releaseBatch() {
}
}
}

/**
* A {@link ConcatBatch} over a {@link FileRecordReader}, exposing the file path and returned
* position of the batch that produced the current row. Callers may ask for the file path before
* the first {@link #next()}, so the first underlying batch is loaded on demand.
*/
private static class FileConcatBatch extends ConcatBatch
implements FileRecordIterator<InternalRow> {

private FileConcatBatch(RecordReader<InternalRow> reader) {
super(reader);
}

@Override
public long returnedPosition() {
return currentFileBatch().returnedPosition();
}

@Override
public Path filePath() {
return currentFileBatch().filePath();
}

private FileRecordIterator<InternalRow> currentFileBatch() {
if (currentBatch == null) {
try {
currentBatch = reader.readBatch();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
if (currentBatch == null) {
throw new IllegalStateException(
"The file batch is exhausted, file path and position are unavailable.");
}
}
return (FileRecordIterator<InternalRow>) currentBatch;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -645,31 +645,81 @@ Optional<RuntimeException> checkRowIdExistence(
}
}

// Row-id ranges removed by this same commit, keyed by partition and bucket. A
// copy-on-write rewrite (e.g. Spark V2 DELETE on a data-evolution table) deletes whole
// row-id groups and re-adds the surviving rows with their original row ids, so an added
// file may cover only a sub-range of a deleted group and cannot mirror an existing file
// exactly; it is still consistent as long as its range is fully covered by ranges
// deleted in this commit (concurrent rewrites of those files are caught by the regular
// deleted-file conflict checks).
Map<FileRowIdKey, List<Range>> deletedRanges = new HashMap<>();
for (SimpleFileEntry entry : deltaEntries) {
if (entry.kind() == FileKind.DELETE && entry.firstRowId() != null) {
deletedRanges
.computeIfAbsent(
new FileRowIdKey(entry.partition(), entry.bucket(), 0, 0),
k -> new ArrayList<>())
.add(
new Range(
entry.firstRowId(),
entry.firstRowId() + entry.rowCount() - 1));
}
}

for (SimpleFileEntry entry : filesToCheck) {
FileRowIdKey key =
new FileRowIdKey(
entry.partition(),
entry.bucket(),
entry.firstRowId(),
entry.rowCount());
if (!existingIndex.contains(key)) {
return Optional.of(
new RuntimeException(
String.format(
"Row ID existence conflict: file '%s' references "
+ "firstRowId=%d, rowCount=%d in bucket %d, "
+ "but no matching file exists in the current snapshot. "
+ "The referenced file may have been rewritten by a "
+ "concurrent compaction or removed by an overwrite.",
entry.fileName(),
entry.firstRowId(),
entry.rowCount(),
entry.bucket())));
if (existingIndex.contains(key)) {
continue;
}
List<Range> deleted =
deletedRanges.get(new FileRowIdKey(entry.partition(), entry.bucket(), 0, 0));
if (coveredByRanges(
deleted, entry.firstRowId(), entry.firstRowId() + entry.rowCount() - 1)) {
continue;
}
return Optional.of(
new RuntimeException(
String.format(
"Row ID existence conflict: file '%s' references "
+ "firstRowId=%d, rowCount=%d in bucket %d, "
+ "but no matching file exists in the current snapshot. "
+ "The referenced file may have been rewritten by a "
+ "concurrent compaction or removed by an overwrite.",
entry.fileName(),
entry.firstRowId(),
entry.rowCount(),
entry.bucket())));
}
return Optional.empty();
}

/** Whether {@code [from, to]} is fully covered by the union of the given ranges. */
private static boolean coveredByRanges(@Nullable List<Range> ranges, long from, long to) {
if (ranges == null || ranges.isEmpty()) {
return false;
}
ranges.sort(Comparator.comparingLong(range -> range.from));
long cursor = from;
for (Range range : ranges) {
if (range.to < cursor) {
continue;
}
if (range.from > cursor) {
return false;
}
cursor = range.to + 1;
if (cursor > to) {
return true;
}
}
return cursor > to;
}

private static class FileRowIdKey {
private final BinaryRow partition;
private final int bucket;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,43 @@ void testCheckRowIdExistenceBaseFileRewritten() {
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
}

@Test
void testCheckRowIdExistenceAcceptsSubRangeCoveredBySameCommitDeletes() {
ConflictDetection detection = createConflictDetection();

List<SimpleFileEntry> baseEntries = new ArrayList<>();
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));

// Copy-on-write rewrite: the whole group [0, 100) is deleted and the surviving rows are
// re-added as sub-range files [0, 40) and [50, 100) with their original row ids.
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 0L, 40L));
deltaEntries.add(createFileEntryWithRowId("p2", ADD, 50L, 50L));

assertThat(detection.checkRowIdExistence(baseEntries, deltaEntries, 100L)).isEmpty();
}

@Test
void testCheckRowIdExistenceRejectsSubRangeNotCoveredBySameCommitDeletes() {
ConflictDetection detection = createConflictDetection();

List<SimpleFileEntry> baseEntries = new ArrayList<>();
baseEntries.add(createFileEntryWithRowId("f1", ADD, 0L, 100L));
baseEntries.add(createFileEntryWithRowId("f2", ADD, 100L, 100L));

// The added file [50, 150) spills past the deleted group [0, 100): rows of f2 are
// re-added without deleting f2, which would duplicate row ids.
List<SimpleFileEntry> deltaEntries = new ArrayList<>();
deltaEntries.add(createFileEntryWithRowId("f1", DELETE, 0L, 100L));
deltaEntries.add(createFileEntryWithRowId("p1", ADD, 50L, 100L));

Optional<RuntimeException> result =
detection.checkRowIdExistence(baseEntries, deltaEntries, 200L);
assertThat(result).isPresent();
assertThat(result.get().getMessage()).contains("Row ID existence conflict");
}

@Test
void testCheckRowIdExistenceSkipsNewlyAppendedFiles() {
ConflictDetection detection = createConflictDetection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,27 @@ class RowTrackingTest extends RowTrackingTestBase {
}
}
}

test("Data Evolution: Spark 3.5 keeps data-evolution tables off the V2 row-level path") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("t") {
sql(
"CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
"('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
assert(!SparkTable.of(loadTable("t")).isInstanceOf[SupportsRowLevelOperations])

sql("INSERT INTO t VALUES (1, 1), (2, 2)")
assert(
intercept[RuntimeException] {
sql("DELETE FROM t WHERE id = 2")
}.getMessage
.contains("Delete operation is not supported when data evolution is enabled yet."))
assert(
intercept[RuntimeException] {
sql("UPDATE t SET data = 20 WHERE id = 2")
}.getMessage
.contains("Update operation is not supported when data evolution is enabled yet."))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,53 @@

package org.apache.paimon.spark.sql

class RowTrackingTest extends RowTrackingTestBase {}
import org.apache.spark.sql.Row

class RowTrackingTest extends RowTrackingTestBase {

test("Data Evolution: Spark 4.0 uses V2 copy-on-write for DELETE and UPDATE") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("t") {
sql(
"CREATE TABLE t (id INT, data INT) TBLPROPERTIES " +
"('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')")
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (1, 1), (2, 2)")
sql("INSERT INTO t SELECT /*+ REPARTITION(1) */ * FROM VALUES (3, 3), (4, 4)")

sql("DELETE FROM t WHERE id = 2")
checkAnswer(
sql("SELECT *, _ROW_ID FROM t ORDER BY id"),
Seq(Row(1, 1, 0), Row(3, 3, 2), Row(4, 4, 3))
)

sql("UPDATE t SET data = 30 WHERE id = 3")
checkAnswer(
sql("SELECT *, _ROW_ID FROM t ORDER BY id"),
Seq(Row(1, 1, 0), Row(3, 30, 2), Row(4, 4, 3))
)
}
}
}

test("Data Evolution: partition column update is rejected") {
withSparkSQLConf("spark.paimon.write.use-v2-write" -> "true") {
withTable("t") {
sql("""
|CREATE TABLE t (id INT, data INT, dt STRING)
|PARTITIONED BY (dt)
|TBLPROPERTIES ('row-tracking.enabled' = 'true', 'data-evolution.enabled' = 'true')
|""".stripMargin)
sql("INSERT INTO t VALUES (1, 1, 'p1'), (2, 2, 'p2')")

assert(
intercept[Exception] {
sql("UPDATE t SET dt = 'p3' WHERE id = 1")
}.getMessage
.contains("Update to partition columns is not supported for data evolution tables"))

sql("UPDATE t SET data = 10 WHERE id = 1")
checkAnswer(sql("SELECT * FROM t ORDER BY id"), Seq(Row(1, 10, "p1"), Row(2, 2, "p2")))
}
}
}
}
Loading