Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 15 additions & 11 deletions core/src/main/java/org/apache/iceberg/AllManifestsTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.StructProjection;

/**
Expand Down Expand Up @@ -76,7 +77,8 @@ public class AllManifestsTable extends BaseMetadataTable {
Types.NestedField.required(11, "contains_nan", Types.BooleanType.get()),
Types.NestedField.optional(12, "lower_bound", Types.StringType.get()),
Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
REF_SNAPSHOT_ID);
REF_SNAPSHOT_ID,
Types.NestedField.optional(19, "key_metadata", Types.BinaryType.get()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@szehon-ho Just wondering if you have any comments on adding key_metadata to MANIFEST_FILE_SCHEMA?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not familiar with key_metadata. Which table is it in today? Files/Entries?

Do we need one in the regular (not all) manifests table?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key_metadata is the encryption key metadata. Today it’s in both files and entries. This PR is adding key_metadata for manifest files (i.e., ManifestFile.keyMetadata()) to .all_manifests, since Spark actions read manifests via .all_manifests and need the manifest’s key metadata to open/decrypt encrypted manifest files. I think we should add it to the regular .manifests table too for parity, but I guess it can be done in a followup.


AllManifestsTable(Table table) {
this(table, table.name() + ".all_manifests");
Expand Down Expand Up @@ -138,7 +140,7 @@ protected CloseableIterable<FileScanTask> doPlanFiles() {
io,
schema(),
specs,
snap.manifestListLocation(),
new BaseManifestListFile(snap.manifestListLocation(), snap.keyId()),
filter,
snap.snapshotId());
} else {
Expand All @@ -161,7 +163,7 @@ static class ManifestListReadTask implements DataTask {
private final FileIO io;
private final Schema schema;
private final Map<Integer, PartitionSpec> specs;
private final String manifestListLocation;
private final ManifestListFile manifestList;
private final Expression residual;
private final long referenceSnapshotId;
private DataFile lazyDataFile = null;
Expand All @@ -171,14 +173,14 @@ static class ManifestListReadTask implements DataTask {
FileIO io,
Schema schema,
Map<Integer, PartitionSpec> specs,
String manifestListLocation,
ManifestListFile manifestList,
Expression residual,
long referenceSnapshotId) {
this.dataTableSchema = dataTableSchema;
this.io = io;
this.schema = schema;
this.specs = specs;
this.manifestListLocation = manifestListLocation;
this.manifestList = manifestList;
this.residual = residual;
this.referenceSnapshotId = referenceSnapshotId;
}
Expand All @@ -191,7 +193,7 @@ public List<DeleteFile> deletes() {
@Override
public CloseableIterable<StructLike> rows() {
try (CloseableIterable<ManifestFile> manifests =
InternalData.read(FileFormat.AVRO, io.newInputFile(manifestListLocation))
InternalData.read(FileFormat.AVRO, io.newInputFile(manifestList))
.setRootType(GenericManifestFile.class)
.setCustomType(
ManifestFile.PARTITION_SUMMARIES_ELEMENT_ID, GenericPartitionFieldSummary.class)
Expand All @@ -209,7 +211,8 @@ public CloseableIterable<StructLike> rows() {
return CloseableIterable.transform(rowIterable, projection::wrap);

} catch (IOException e) {
throw new RuntimeIOException(e, "Cannot read manifest list file: %s", manifestListLocation);
throw new RuntimeIOException(
e, "Cannot read manifest list file: %s", manifestList.location());
}
}

Expand All @@ -218,7 +221,7 @@ public DataFile file() {
if (lazyDataFile == null) {
this.lazyDataFile =
DataFiles.builder(PartitionSpec.unpartitioned())
.withInputFile(io.newInputFile(manifestListLocation))
.withInputFile(io.newInputFile(manifestList))
.withRecordCount(1)
.withFormat(FileFormat.AVRO)
.build();
Expand Down Expand Up @@ -271,8 +274,8 @@ Map<Integer, PartitionSpec> specsById() {
return specs;
}

String manifestListLocation() {
return manifestListLocation;
ManifestListFile manifestList() {
return manifestList;
}

long referenceSnapshotId() {
Expand All @@ -295,7 +298,8 @@ static StaticDataTask.Row manifestFileToRow(
manifest.content() == ManifestContent.DELETES ? manifest.existingFilesCount() : 0,
manifest.content() == ManifestContent.DELETES ? manifest.deletedFilesCount() : 0,
ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
referenceSnapshotId);
referenceSnapshotId,
manifest.keyMetadata() == null ? null : ByteBuffers.toByteArray(manifest.keyMetadata()));
}

private static class SnapshotEvaluator {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ class AllManifestsTableTaskParser {
private static final String SCHEMA = "schema";
private static final String SPECS = "partition-specs";
private static final String MANIFEST_LIST_LOCATION = "manifest-list-Location";
private static final String MANIFEST_LIST_KEY_ID = "manifest-list-key-id";
Copy link
Copy Markdown
Contributor

@ggershinsky ggershinsky Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe not. The "manifest-list-Location" key is also different from the spec's version (and from https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SnapshotParser.java#L52). Is this OK?

private static final String RESIDUAL = "residual-filter";
private static final String REFERENCE_SNAPSHOT_ID = "reference-snapshot-id";

Expand All @@ -63,7 +64,10 @@ static void toJson(AllManifestsTable.ManifestListReadTask task, JsonGenerator ge

generator.writeEndArray();

generator.writeStringField(MANIFEST_LIST_LOCATION, task.manifestListLocation());
generator.writeStringField(MANIFEST_LIST_LOCATION, task.manifestList().location());
if (task.manifestList().encryptionKeyID() != null) {
generator.writeStringField(MANIFEST_LIST_KEY_ID, task.manifestList().encryptionKeyID());
}

generator.writeFieldName(RESIDUAL);
ExpressionParser.toJson(task.residual(), generator);
Expand Down Expand Up @@ -92,6 +96,7 @@ static AllManifestsTable.ManifestListReadTask fromJson(JsonNode jsonNode) {

Map<Integer, PartitionSpec> specsById = PartitionUtil.indexSpecs(specsBuilder.build());
String manifestListLocation = JsonUtil.getString(MANIFEST_LIST_LOCATION, jsonNode);
String manifestListKeyId = JsonUtil.getStringOrNull(MANIFEST_LIST_KEY_ID, jsonNode);
Expression residualFilter = ExpressionParser.fromJson(JsonUtil.get(RESIDUAL, jsonNode));
long referenceSnapshotId = JsonUtil.getLong(REFERENCE_SNAPSHOT_ID, jsonNode);

Expand All @@ -100,7 +105,7 @@ static AllManifestsTable.ManifestListReadTask fromJson(JsonNode jsonNode) {
fileIO,
schema,
specsById,
manifestListLocation,
new BaseManifestListFile(manifestListLocation, manifestListKeyId),
residualFilter,
referenceSnapshotId);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private AllManifestsTable.ManifestListReadTask createTask() {
fileIO,
AllManifestsTable.MANIFEST_FILE_SCHEMA,
specsById,
"/path/manifest-list-file.avro",
new BaseManifestListFile("/path/manifest-list-file.avro", "a"),
Expressions.equal("id", 1),
1L);
}
Expand Down Expand Up @@ -121,10 +121,12 @@ private String taskJson() {
+ "{\"id\":11,\"name\":\"contains_nan\",\"required\":true,\"type\":\"boolean\"},"
+ "{\"id\":12,\"name\":\"lower_bound\",\"required\":false,\"type\":\"string\"},"
+ "{\"id\":13,\"name\":\"upper_bound\",\"required\":false,\"type\":\"string\"}]},\"element-required\":true}},"
+ "{\"id\":18,\"name\":\"reference_snapshot_id\",\"required\":true,\"type\":\"long\"}]},"
+ "{\"id\":18,\"name\":\"reference_snapshot_id\",\"required\":true,\"type\":\"long\"},"
+ "{\"id\":19,\"name\":\"key_metadata\",\"required\":false,\"type\":\"binary\"}]},"
+ "\"partition-specs\":[{\"spec-id\":0,\"fields\":[{\"name\":\"data_bucket\","
+ "\"transform\":\"bucket[16]\",\"source-id\":4,\"field-id\":1000}]}],"
+ "\"manifest-list-Location\":\"/path/manifest-list-file.avro\","
+ "\"manifest-list-key-id\":\"a\","
+ "\"residual-filter\":{\"type\":\"eq\",\"term\":\"id\",\"value\":1},"
+ "\"reference-snapshot-id\":1}";
}
Expand All @@ -145,7 +147,9 @@ private void assertTaskEquals(
.isEqualTo(expected.schema().asStruct());

assertThat(actual.specsById()).isEqualTo(expected.specsById());
assertThat(actual.manifestListLocation()).isEqualTo(expected.manifestListLocation());
assertThat(actual.manifestList().location()).isEqualTo(expected.manifestList().location());
assertThat(actual.manifestList().encryptionKeyID())
.isEqualTo(expected.manifestList().encryptionKeyID());
assertThat(actual.residual().toString()).isEqualTo(expected.residual().toString());
assertThat(actual.referenceSnapshotId()).isEqualTo(expected.referenceSnapshotId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,8 @@ protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
"added_snapshot_id as addedSnapshotId",
"key_metadata as keyMetadata")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
private Integer content = null;
private Long sequenceNumber = null;
private Long firstRowId = null;
private byte[] keyMetadata = null;

public static ManifestFileBean fromManifest(ManifestFile manifest) {
ManifestFileBean bean = new ManifestFileBean();
Expand All @@ -48,6 +49,7 @@ public static ManifestFileBean fromManifest(ManifestFile manifest) {
bean.setContent(manifest.content().id());
bean.setSequenceNumber(manifest.sequenceNumber());
bean.setFirstRowId(manifest.firstRowId());
bean.setKeyMetadata(manifest.keyMetadata() == null ? null : manifest.keyMetadata().array());

return bean;
}
Expand Down Expand Up @@ -104,6 +106,14 @@ public void setFirstRowId(Long firstRowId) {
this.firstRowId = firstRowId;
}

/**
 * Returns the key metadata bytes held by this bean.
 *
 * <p>The stored array is returned directly, without copying.
 *
 * @return the key metadata bytes, or {@code null} if none have been set
 */
public byte[] getKeyMetadata() {
return keyMetadata;
}

/**
 * Sets the key metadata bytes held by this bean.
 *
 * <p>The array reference is stored as given, without copying.
 *
 * @param keyMetadata the key metadata bytes; may be {@code null}
 */
public void setKeyMetadata(byte[] keyMetadata) {
this.keyMetadata = keyMetadata;
}

@Override
public String path() {
return path;
Expand Down Expand Up @@ -176,7 +186,7 @@ public List<PartitionFieldSummary> partitions() {

@Override
public ByteBuffer keyMetadata() {
return null;
return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
"added_snapshot_id as addedSnapshotId",
"key_metadata as keyMetadata")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
private Integer content = null;
private Long sequenceNumber = null;
private Long firstRowId = null;
private byte[] keyMetadata = null;

public static ManifestFileBean fromManifest(ManifestFile manifest) {
ManifestFileBean bean = new ManifestFileBean();
Expand All @@ -48,6 +49,7 @@ public static ManifestFileBean fromManifest(ManifestFile manifest) {
bean.setContent(manifest.content().id());
bean.setSequenceNumber(manifest.sequenceNumber());
bean.setFirstRowId(manifest.firstRowId());
bean.setKeyMetadata(manifest.keyMetadata() == null ? null : manifest.keyMetadata().array());

return bean;
}
Expand Down Expand Up @@ -104,6 +106,14 @@ public void setFirstRowId(Long firstRowId) {
this.firstRowId = firstRowId;
}

/**
 * Returns the key metadata bytes held by this bean.
 *
 * <p>The stored array is returned directly, without copying.
 *
 * @return the key metadata bytes, or {@code null} if none have been set
 */
public byte[] getKeyMetadata() {
return keyMetadata;
}

/**
 * Sets the key metadata bytes held by this bean.
 *
 * <p>The array reference is stored as given, without copying.
 *
 * @param keyMetadata the key metadata bytes; may be {@code null}
 */
public void setKeyMetadata(byte[] keyMetadata) {
this.keyMetadata = keyMetadata;
}

@Override
public String path() {
return path;
Expand Down Expand Up @@ -176,7 +186,7 @@ public List<PartitionFieldSummary> partitions() {

@Override
public ByteBuffer keyMetadata() {
return null;
return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ protected Dataset<FileInfo> contentFileDS(Table table, Set<Long> snapshotIds) {
"length",
"0 as sequenceNumber",
"partition_spec_id as partitionSpecId",
"added_snapshot_id as addedSnapshotId")
"added_snapshot_id as addedSnapshotId",
"key_metadata as keyMetadata")
.dropDuplicates("path")
.repartition(numShufflePartitions) // avoid adaptive execution combining tasks
.as(ManifestFileBean.ENCODER);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ManifestFileBean implements ManifestFile, Serializable {
private Integer content = null;
private Long sequenceNumber = null;
private Long firstRowId = null;
private byte[] keyMetadata = null;

public static ManifestFileBean fromManifest(ManifestFile manifest) {
ManifestFileBean bean = new ManifestFileBean();
Expand All @@ -48,6 +49,7 @@ public static ManifestFileBean fromManifest(ManifestFile manifest) {
bean.setContent(manifest.content().id());
bean.setSequenceNumber(manifest.sequenceNumber());
bean.setFirstRowId(manifest.firstRowId());
bean.setKeyMetadata(manifest.keyMetadata() == null ? null : manifest.keyMetadata().array());

return bean;
}
Expand Down Expand Up @@ -104,6 +106,14 @@ public void setFirstRowId(Long firstRowId) {
this.firstRowId = firstRowId;
}

/**
 * Returns the key metadata bytes held by this bean.
 *
 * <p>The stored array is returned directly, without copying.
 *
 * @return the key metadata bytes, or {@code null} if none have been set
 */
public byte[] getKeyMetadata() {
return keyMetadata;
}

/**
 * Sets the key metadata bytes held by this bean.
 *
 * <p>The array reference is stored as given, without copying.
 *
 * @param keyMetadata the key metadata bytes; may be {@code null}
 */
public void setKeyMetadata(byte[] keyMetadata) {
this.keyMetadata = keyMetadata;
}

@Override
public String path() {
return path;
Expand Down Expand Up @@ -176,7 +186,7 @@ public List<PartitionFieldSummary> partitions() {

@Override
public ByteBuffer keyMetadata() {
return null;
return keyMetadata == null ? null : ByteBuffer.wrap(keyMetadata);
}

@Override
Expand Down