Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions amoro-format-iceberg/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@
<name>Amoro Project Iceberg Format</name>
<url>https://amoro.apache.org</url>

<properties>
<iceberg.version>1.10.1</iceberg.version>
<parquet-avro.version>1.16.0</parquet-avro.version>
<parquet-jackson.version>1.16.0</parquet-jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.amoro</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.iceberg.Table;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;

import java.util.function.Supplier;

Expand Down Expand Up @@ -69,6 +71,28 @@ public OverwriteFiles deleteFile(DataFile file) {
return this;
}

// Note: Do not add @Override here. This method implements the default method
// OverwriteFiles.deleteFiles(DataFileSet, DeleteFileSet) introduced in Iceberg 1.10.x.
// The @Override annotation would fail compilation against Iceberg 1.8.x (hadoop2/spark-3.3
// profiles) where this interface method does not exist. For the same reason the delegate
// call below goes through reflection instead of a direct interface call: a direct call
// would not compile against 1.8.x. On 1.10.x+ the reflective lookup finds and invokes the
// delegate's implementation.
@SuppressWarnings("unused")
public OverwriteFiles deleteFiles(
DataFileSet dataFilesToDelete, DeleteFileSet deleteFilesToDelete) {
try {
// Reflectively invoke overwriteFiles.deleteFiles(DataFileSet, DeleteFileSet).
// On Iceberg versions lacking this method, getMethod throws NoSuchMethodException.
overwriteFiles
.getClass()
.getMethod("deleteFiles", DataFileSet.class, DeleteFileSet.class)
.invoke(overwriteFiles, dataFilesToDelete, deleteFilesToDelete);
} catch (ReflectiveOperationException e) {
// Surface lookup/invocation failures explicitly, preserving the cause, instead of
// silently skipping the deletion.
throw new UnsupportedOperationException(
"Deleting data and delete files is not supported by the underlying implementation", e);
}
// Record each deleted file through this wrapper's per-file hooks — presumably the same
// bookkeeping that deleteFile(DataFile)/deleteFile(DeleteFile) perform; their bodies are
// not visible here, so confirm the hooks stay in sync with the single-file overloads.
dataFilesToDelete.forEach(this::deleteIcebergDataFile);
deleteFilesToDelete.forEach(this::deleteIcebergDeleteFile);
return this;
}

@Override
public OverwriteFiles validateAddedFilesMatchOverwriteFilter() {
overwriteFiles.validateAddedFilesMatchOverwriteFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,20 @@

@RunWith(Parameterized.class)
public class IcebergRewriteExecutorTest extends TableTestBase {
private static final int POSITION_DELETE_FILE_PATH_ID = 2147483546;
private static final int POSITION_DELETE_POS_ID = 2147483545;

private final FileFormat fileFormat;

private RewriteFilesInput scanTask;

private RewriteFilesInput dataScanTask;

// When GenericParquetReaders.buildReader() is called with a schema containing _file / _pos,
// the TypeWithSchemaVisitor in Iceberg does strict schema matching against the Parquet file
// schema
private final Schema posSchema =
new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION);
new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS);

public IcebergRewriteExecutorTest(boolean hasPartition, FileFormat fileFormat) {
super(
Expand Down Expand Up @@ -109,6 +114,9 @@ private StructLike getPartitionData() {

@Before
public void initDataAndReader() throws IOException {
Assert.assertEquals(POSITION_DELETE_FILE_PATH_ID, MetadataColumns.DELETE_FILE_PATH.fieldId());
Assert.assertEquals(POSITION_DELETE_POS_ID, MetadataColumns.DELETE_FILE_POS.fieldId());

StructLike partitionData = getPartitionData();
OutputFileFactory outputFileFactory =
OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1)
Expand Down
6 changes: 6 additions & 0 deletions amoro-format-mixed/amoro-mixed-hive/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@
<name>Amoro Project Mixed Hive Format</name>
<url>https://amoro.apache.org</url>

<properties>
<iceberg.version>1.10.1</iceberg.version>
<parquet-avro.version>1.16.0</parquet-avro.version>
<parquet-jackson.version>1.16.0</parquet-jackson.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.amoro</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.iceberg.Transaction;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.DataFileSet;
import org.apache.iceberg.util.DeleteFileSet;

import java.util.List;

Expand Down Expand Up @@ -82,6 +84,27 @@ public OverwriteFiles deleteFile(DataFile file) {
return this;
}

// Note: Do not add @Override here. This method implements the default method
// OverwriteFiles.deleteFiles(DataFileSet, DeleteFileSet) introduced in Iceberg 1.10.x.
// The @Override annotation would fail compilation against Iceberg 1.8.x (hadoop2/spark-3.3
// profiles) where this interface method does not exist. For the same reason the delegate
// call below goes through reflection instead of a direct interface call: a direct call
// would not compile against 1.8.x. On 1.10.x+ the reflective lookup finds and invokes the
// delegate's implementation.
@SuppressWarnings("unused")
public OverwriteFiles deleteFiles(
DataFileSet dataFilesToDelete, DeleteFileSet deleteFilesToDelete) {
try {
// Reflectively invoke delegate.deleteFiles(DataFileSet, DeleteFileSet).
// On Iceberg versions lacking this method, getMethod throws NoSuchMethodException.
delegate
.getClass()
.getMethod("deleteFiles", DataFileSet.class, DeleteFileSet.class)
.invoke(delegate, dataFilesToDelete, deleteFilesToDelete);
} catch (ReflectiveOperationException e) {
// Surface lookup/invocation failures explicitly, preserving the cause, instead of
// silently skipping the deletion.
throw new UnsupportedOperationException(
"Deleting data and delete files is not supported by the underlying implementation", e);
}
// Track only the Hive-located data files in this wrapper's `deleteFiles` collection
// (a field, despite sharing its name with this method). Position/equality delete files
// are intentionally not tracked here — presumably Hive-side cleanup only concerns data
// files; confirm against the single-file deleteFile(DataFile) path.
dataFilesToDelete.stream().filter(this::isHiveDataFile).forEach(this.deleteFiles::add);
return this;
}

@Override
public OverwriteFiles validateAddedFilesMatchOverwriteFilter() {
delegate.validateAddedFilesMatchOverwriteFilter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public static ParquetValueReader<Record> buildReader(
@Override
protected ParquetValueReader<Record> createStructReader(
List<Type> types, List<ParquetValueReader<?>> fieldReaders, StructType structType) {
return new RecordReader(types, fieldReaders, structType);
return new RecordReader(fieldReaders, structType);
}

private static class RecordReader extends StructReader<Record, Record> {
private final StructType structType;

RecordReader(List<Type> types, List<ParquetValueReader<?>> readers, StructType struct) {
super(types, readers);
RecordReader(List<ParquetValueReader<?>> readers, StructType struct) {
super(readers);
this.structType = struct;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
private final ParquetValueReader<T> model;
private final long totalValues;
private final boolean reuseContainers;
private final long[] rowGroupsStartRowPos;

private int nextRowGroup = 0;
private long nextRowGroupStart = 0;
private long valuesRead = 0;
Expand All @@ -119,7 +117,6 @@ private static class FileIterator<T> implements CloseableIterator<T> {
this.model = conf.model();
this.totalValues = conf.totalValues();
this.reuseContainers = conf.reuseContainers();
this.rowGroupsStartRowPos = conf.startRowPositions();
}

@Override
Expand Down Expand Up @@ -156,11 +153,10 @@ private void advance() {
throw new RuntimeIOException(e);
}

long rowPosition = rowGroupsStartRowPos[nextRowGroup];
nextRowGroupStart += pages.getRowCount();
nextRowGroup += 1;

model.setPageSource(pages, rowPosition);
model.setPageSource(pages);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ private static Schema convertInternal(
public static MessageType pruneColumns(MessageType fileSchema, Schema expectedSchema) {
// column order must match the incoming type, so it doesn't matter that the ids are unordered
Set<Integer> selectedIds = TypeUtil.getProjectedIds(expectedSchema);
return (MessageType) ParquetTypeVisitor.visit(fileSchema, new PruneColumns(selectedIds));
return (MessageType)
TypeWithSchemaVisitor.visit(
expectedSchema.asStruct(), fileSchema, new PruneColumns(selectedIds));
}

/**
Expand Down
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,6 @@
<artifactId>parquet-avro</artifactId>
<version>${parquet-avro.version}</version>
</dependency>

<!-- TODO: Remove this parquet-jackson when upgrade JDK to 17 or higher -->
<dependency>
<groupId>org.apache.parquet</groupId>
Expand Down
Loading