
Conversation

@tom-s-powell (Contributor, Author)

Adding key_metadata to the manifest metadata tables so that they work when using EncryptingFileIO.
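For context, a hedged sketch of what this exposes, reusing the sql/tableName test helpers shown later in this thread together with MetadataTableType.ALL_MANIFESTS; the test itself is illustrative and not part of the PR:

@TestTemplate
public void testManifestKeyMetadataExposed() {
  List<Object[]> rows =
      sql("SELECT path, key_metadata FROM %s.%s", tableName, MetadataTableType.ALL_MANIFESTS);
  assertThat(rows).isNotEmpty();
  // For an encrypted table, manifest-level key metadata should be non-null.
  assertThat(rows).allSatisfy(row -> assertThat(row[1]).isNotNull());
}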

@huaxingao (Contributor)

cc @ggershinsky

@huaxingao (Contributor)

@tom-s-powell Thanks for the PR! The fix makes sense to me. Would it be possible to add a regression test that fails without this change and passes with it?

@tom-s-powell (Contributor, Author)

We are also going to need #14751 for a fully testable change.

The test I would like to add to https://github.com/apache/iceberg/blob/main/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestTableEncryption.java is:

@TestTemplate
public void testDropTableWithPurge() {
  List<Object[]> dataFileTable =
      sql("SELECT file_path FROM %s.%s", tableName, MetadataTableType.ALL_DATA_FILES);
  List<String> dataFiles =
      dataFileTable.stream()
          .map(row -> (String) row[0])
          .collect(Collectors.toList());
  assertThat(dataFiles).isNotEmpty();
  assertThat(dataFiles)
      .allSatisfy(filePath -> assertThat(localInput(filePath).exists()).isTrue());

  sql("DROP TABLE %s PURGE", tableName);

  assertThatThrownBy(() -> catalog.loadTable(tableIdent))
      .isInstanceOf(NoSuchTableException.class);
  assertThat(dataFiles)
      .allSatisfy(filePath -> assertThat(localInput(filePath).exists()).isFalse());
}

Without the changes in #14751 we get the following:

Caused by: java.lang.IllegalStateException: Cannot return the encryption keys after serialization
	at org.apache.iceberg.encryption.StandardEncryptionManager.encryptionKeys(StandardEncryptionManager.java:170)
	at org.apache.iceberg.encryption.EncryptionUtil.decryptManifestListKeyMetadata(EncryptionUtil.java:132)
	at org.apache.iceberg.BaseManifestListFile.decryptKeyMetadata(BaseManifestListFile.java:47)
	at org.apache.iceberg.encryption.EncryptingFileIO.newInputFile(EncryptingFileIO.java:115)
	at org.apache.iceberg.AllManifestsTable$ManifestListReadTask.file(AllManifestsTable.java:223)
	at org.apache.iceberg.AllManifestsTable$ManifestListReadTask.file(AllManifestsTable.java:160)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:87)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:43)

With the changes from #14751 but without the changes in this PR, we get the following:

org.apache.iceberg.exceptions.RuntimeIOException: Failed to open file: file:/var/folders/2v/qzfl1x_137l3dyycx4j_d_29081c3k/T/hive5130984441689838261/table/metadata/snap-1670846233047793002-1-e953eed2-338c-45fc-8060-6722e78ea54a.avro
	at app//org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:113)
	at app//org.apache.iceberg.avro.AvroIterable.iterator(AvroIterable.java:78)
	at app//org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
	at app//org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
	at app//org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
	at app//org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
	at app//org.apache.iceberg.io.CloseableIterable$7$1.<init>(CloseableIterable.java:205)
	at app//org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:204)
	at app//org.apache.iceberg.io.CloseableIterable$7.iterator(CloseableIterable.java:196)
	at app//org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:34)
	at app//org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:89)
	at app//org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:99)
	at app//org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:43)
	at app//org.apache.iceberg.spark.source.BaseReader.next(BaseReader.java:141)
	at app//org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:148)
	at app//org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:186)
	at app//org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1(DataSourceRDD.scala:72)
	at app//org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.$anonfun$hasNext$1$adapted(DataSourceRDD.scala:72)
	at app//scala.Option.exists(Option.scala:406)
	at app//org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
	at app//org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.advanceToNextIter(DataSourceRDD.scala:103)
	at app//org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:72)
	at app//org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at app//scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at app//org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at app//org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)
	at app//scala.collection.Iterator$$anon$9.hasNext(Iterator.scala:593)
	at app//org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:143)
	at app//org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
	at app//org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:111)
	at app//org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
	at app//org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)
	at app//org.apache.spark.scheduler.Task.run(Task.scala:147)
	at app//org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:647)
	at app//org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:80)
	at app//org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:77)
	at app//org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:99)
	at app//org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:650)
	at java.base@21.0.5/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base@21.0.5/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base@21.0.5/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.avro.InvalidAvroMagicException: Not an Avro data file
	at app//org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:79)
	at app//org.apache.iceberg.avro.AvroIterable.newFileReader(AvroIterable.java:104)
	... 42 more

private static final String SCHEMA = "schema";
private static final String SPECS = "partition-specs";
private static final String MANIFEST_LIST_LOCATION = "manifest-list-Location";
private static final String MANIFEST_LIST_KEY_ID = "manifest-list-key-id";
@ggershinsky (Contributor), Dec 8, 2025:

Maybe not. The "manifest-list-Location" string also differs from the spec's version (and from https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/SnapshotParser.java#L52). Is this OK?

@tom-s-powell (Contributor, Author)

@huaxingao curious if you had more thoughts on this

  ManifestsTable.partitionSummariesToRows(spec, manifest.partitions()),
- referenceSnapshotId);
+ referenceSnapshotId,
+ manifest.keyMetadata() == null ? null : manifest.keyMetadata().array());
@huaxingao (Contributor):

nit: manifest.keyMetadata().array() assumes the buffer is array-backed. To be defensive against non-array-backed ByteBuffers, could we use ByteBuffers.toByteArray(manifest.keyMetadata()) here instead?
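For illustration, a minimal standalone sketch of why .array() is unsafe here: direct and read-only ByteBuffers report hasArray() == false and throw from array(), while copying the remaining bytes (as a helper like org.apache.iceberg.util.ByteBuffers.toByteArray does) works for any buffer:

import java.nio.ByteBuffer;

public class ByteBufferArrayDemo {
  public static void main(String[] args) {
    ByteBuffer direct = ByteBuffer.allocateDirect(4); // direct buffer: no accessible backing array
    System.out.println(direct.hasArray()); // false
    // direct.array() would throw UnsupportedOperationException here.

    ByteBuffer readOnly = ByteBuffer.wrap(new byte[] {1, 2, 3}).asReadOnlyBuffer();
    System.out.println(readOnly.hasArray()); // false
    // readOnly.array() would throw ReadOnlyBufferException here.

    // Copying the remaining bytes works regardless of how the buffer is backed.
    byte[] copy = new byte[readOnly.remaining()];
    readOnly.duplicate().get(copy);
    System.out.println(copy.length); // 3
  }
}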

@tom-s-powell (Contributor, Author):

Switched to that @huaxingao

  Types.NestedField.optional(13, "upper_bound", Types.StringType.get())))),
- REF_SNAPSHOT_ID);
+ REF_SNAPSHOT_ID,
+ Types.NestedField.optional(19, "key_metadata", Types.BinaryType.get()));
Contributor:

@szehon-ho Just wondering if you have any comments on adding key_metadata in MANIFEST_FILE_SCHEMA

@szehon-ho (Member):

I am not familiar with key_metadata. Which table is it in today? Files/Entries?

Do we need one in the regular (not all) manifests table?

Contributor:

key_metadata is the encryption key metadata. Today it’s in both files and entries. This PR is adding key_metadata for manifest files (i.e., ManifestFile.keyMetadata()) to .all_manifests, since Spark actions read manifests via .all_manifests and need the manifest’s key metadata to open/decrypt encrypted manifest files. I think we should add it to the regular .manifests table too for parity, but I guess it can be done in a followup.
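For concreteness, a hedged sketch (same assumed test helpers; column names per the files and all_manifests metadata tables) contrasting the two levels of key metadata:

// key_metadata on the files table today: per-data-file encryption key metadata.
List<Object[]> dataFileKeys =
    sql("SELECT file_path, key_metadata FROM %s.%s", tableName, MetadataTableType.FILES);
// key_metadata on all_manifests after this PR: per-manifest key metadata,
// which EncryptingFileIO needs in order to open the manifest files themselves.
List<Object[]> manifestKeys =
    sql("SELECT path, key_metadata FROM %s.%s", tableName, MetadataTableType.ALL_MANIFESTS);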

@tom-s-powell (Contributor, Author)

@huaxingao, are you happy with this in its current state, perhaps with a follow-up to add it to the other tables?

@huaxingao (Contributor) left a comment:

LGTM

@huaxingao merged commit aee8900 into apache:main on Jan 7, 2026. 44 checks passed.
@huaxingao (Contributor)

Thanks @tom-s-powell for the PR! Thanks @ggershinsky for the review!

@huaxingao (Contributor)

@tom-s-powell Iceberg now supports Spark 4.1. Could you please open a follow-up PR to port these changes to 4.1?
