diff --git a/amoro-format-iceberg/pom.xml b/amoro-format-iceberg/pom.xml index 2931d90d8a..2f2283bc03 100644 --- a/amoro-format-iceberg/pom.xml +++ b/amoro-format-iceberg/pom.xml @@ -29,6 +29,12 @@ Amoro Project Iceberg Format https://amoro.apache.org + + 1.10.1 + 1.16.0 + 1.16.0 + + org.apache.amoro diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/op/MixedOverwriteFiles.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/op/MixedOverwriteFiles.java index 8555938791..11974c9be9 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/op/MixedOverwriteFiles.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/op/MixedOverwriteFiles.java @@ -24,6 +24,8 @@ import org.apache.iceberg.Table; import org.apache.iceberg.Transaction; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.util.DataFileSet; +import org.apache.iceberg.util.DeleteFileSet; import java.util.function.Supplier; @@ -69,6 +71,28 @@ public OverwriteFiles deleteFile(DataFile file) { return this; } + // Note: Do not add @Override here. This method implements the default method + // OverwriteFiles.deleteFiles(DataFileSet, DeleteFileSet) introduced in Iceberg 1.10.x. + // The @Override annotation would fail compilation against Iceberg 1.8.x (hadoop2/spark-3.3 + // profiles) where this interface method does not exist. At runtime on 1.10.x+, this method + // is still resolved correctly via dynamic dispatch. + @SuppressWarnings("unused") + public OverwriteFiles deleteFiles( + DataFileSet dataFilesToDelete, DeleteFileSet deleteFilesToDelete) { + try { + overwriteFiles + .getClass() + .getMethod("deleteFiles", DataFileSet.class, DeleteFileSet.class) + .invoke(overwriteFiles, dataFilesToDelete, deleteFilesToDelete); + } catch (ReflectiveOperationException e) { + throw new UnsupportedOperationException( + "Deleting data and delete files is not supported by the underlying implementation", e); + } + dataFilesToDelete.forEach(this::deleteIcebergDataFile); + deleteFilesToDelete.forEach(this::deleteIcebergDeleteFile); + return this; + } + @Override public OverwriteFiles validateAddedFilesMatchOverwriteFilter() { overwriteFiles.validateAddedFilesMatchOverwriteFilter(); diff --git a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java index 37b79036cb..e854990c30 100644 --- a/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java +++ b/amoro-format-iceberg/src/test/java/org/apache/amoro/optimizing/IcebergRewriteExecutorTest.java @@ -65,6 +65,8 @@ @RunWith(Parameterized.class) public class IcebergRewriteExecutorTest extends TableTestBase { + private static final int POSITION_DELETE_FILE_PATH_ID = 2147483546; + private static final int POSITION_DELETE_POS_ID = 2147483545; private final FileFormat fileFormat; @@ -72,8 +74,11 @@ public class IcebergRewriteExecutorTest extends TableTestBase { private RewriteFilesInput dataScanTask; + // When GenericParquetReaders.buildReader() is called with a schema containing _file / _pos, + // the TypeWithSchemaVisitor in Iceberg does strict schema matching against the Parquet file + // schema private final Schema posSchema = - new Schema(MetadataColumns.FILE_PATH, MetadataColumns.ROW_POSITION); + new Schema(MetadataColumns.DELETE_FILE_PATH, MetadataColumns.DELETE_FILE_POS); public IcebergRewriteExecutorTest(boolean hasPartition, FileFormat fileFormat) { super( @@ -109,6 +114,9 @@ private StructLike getPartitionData() { @Before public void initDataAndReader() throws IOException { + Assert.assertEquals(POSITION_DELETE_FILE_PATH_ID, MetadataColumns.DELETE_FILE_PATH.fieldId()); + Assert.assertEquals(POSITION_DELETE_POS_ID, MetadataColumns.DELETE_FILE_POS.fieldId()); + StructLike partitionData = getPartitionData(); OutputFileFactory outputFileFactory = OutputFileFactory.builderFor(getMixedTable().asUnkeyedTable(), 0, 1) diff --git a/amoro-format-mixed/amoro-mixed-hive/pom.xml b/amoro-format-mixed/amoro-mixed-hive/pom.xml index 40a9030cd0..64a0396782 100644 --- a/amoro-format-mixed/amoro-mixed-hive/pom.xml +++ b/amoro-format-mixed/amoro-mixed-hive/pom.xml @@ -30,6 +30,12 @@ Amoro Project Mixed Hive Format https://amoro.apache.org + + 1.10.1 + 1.16.0 + 1.16.0 + + org.apache.amoro diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/op/OverwriteHiveFiles.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/op/OverwriteHiveFiles.java index a5595197ce..295e22c2a2 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/op/OverwriteHiveFiles.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/amoro/hive/op/OverwriteHiveFiles.java @@ -26,6 +26,8 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.util.DataFileSet; +import org.apache.iceberg.util.DeleteFileSet; import java.util.List; @@ -82,6 +84,27 @@ public OverwriteFiles deleteFile(DataFile file) { return this; } + // Note: Do not add @Override here. This method implements the default method + // OverwriteFiles.deleteFiles(DataFileSet, DeleteFileSet) introduced in Iceberg 1.10.x. + // The @Override annotation would fail compilation against Iceberg 1.8.x (hadoop2/spark-3.3 + // profiles) where this interface method does not exist. At runtime on 1.10.x+, this method + // is still resolved correctly via dynamic dispatch. + @SuppressWarnings("unused") + public OverwriteFiles deleteFiles( + DataFileSet dataFilesToDelete, DeleteFileSet deleteFilesToDelete) { + try { + delegate + .getClass() + .getMethod("deleteFiles", DataFileSet.class, DeleteFileSet.class) + .invoke(delegate, dataFilesToDelete, deleteFilesToDelete); + } catch (ReflectiveOperationException e) { + throw new UnsupportedOperationException( + "Deleting data and delete files is not supported by the underlying implementation", e); + } + dataFilesToDelete.stream().filter(this::isHiveDataFile).forEach(this.deleteFiles::add); + return this; + } + @Override public OverwriteFiles validateAddedFilesMatchOverwriteFilter() { delegate.validateAddedFilesMatchOverwriteFilter(); diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/data/parquet/AdaptHiveGenericParquetReaders.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/data/parquet/AdaptHiveGenericParquetReaders.java index ef42d99fa1..76e0b45d07 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/data/parquet/AdaptHiveGenericParquetReaders.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/data/parquet/AdaptHiveGenericParquetReaders.java @@ -50,14 +50,14 @@ public static ParquetValueReader buildReader( @Override protected ParquetValueReader createStructReader( List types, List> fieldReaders, StructType structType) { - return new RecordReader(types, fieldReaders, structType); + return new RecordReader(fieldReaders, structType); } private static class RecordReader extends StructReader { private final StructType structType; - RecordReader(List types, List> readers, StructType struct) { - super(types, readers); + RecordReader(List> readers, StructType struct) { + super(readers); this.structType = struct; } diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetReader.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetReader.java index f1dd2da9de..5f6e087cc4 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetReader.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetReader.java @@ -106,8 +106,6 @@ private static class FileIterator implements CloseableIterator { private final ParquetValueReader model; private final long totalValues; private final boolean reuseContainers; - private final long[] rowGroupsStartRowPos; - private int nextRowGroup = 0; private long nextRowGroupStart = 0; private long valuesRead = 0; @@ -119,7 +117,6 @@ private static class FileIterator implements CloseableIterator { this.model = conf.model(); this.totalValues = conf.totalValues(); this.reuseContainers = conf.reuseContainers(); - this.rowGroupsStartRowPos = conf.startRowPositions(); } @Override @@ -156,11 +153,10 @@ private void advance() { throw new RuntimeIOException(e); } - long rowPosition = rowGroupsStartRowPos[nextRowGroup]; nextRowGroupStart += pages.getRowCount(); nextRowGroup += 1; - model.setPageSource(pages, rowPosition); + model.setPageSource(pages); } @Override diff --git a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetSchemaUtil.java b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetSchemaUtil.java index 9ef8c1cad4..a3f378e5f8 100644 --- a/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetSchemaUtil.java +++ b/amoro-format-mixed/amoro-mixed-hive/src/main/java/org/apache/iceberg/parquet/AdaptHiveParquetSchemaUtil.java @@ -79,7 +79,9 @@ private static Schema convertInternal( public static MessageType pruneColumns(MessageType fileSchema, Schema expectedSchema) { // column order must match the incoming type, so it doesn't matter that the ids are unordered Set selectedIds = TypeUtil.getProjectedIds(expectedSchema); - return (MessageType) ParquetTypeVisitor.visit(fileSchema, new PruneColumns(selectedIds)); + return (MessageType) + TypeWithSchemaVisitor.visit( + expectedSchema.asStruct(), fileSchema, new PruneColumns(selectedIds)); } /** diff --git a/pom.xml b/pom.xml index 070c9c0f1d..5c5279f421 100644 --- a/pom.xml +++ b/pom.xml @@ -396,7 +396,6 @@ parquet-avro ${parquet-avro.version} - org.apache.parquet