|
| 1 | +From c3c9abdf3acad91028da5aa87470bfac4e2525a3 Mon Sep 17 00:00:00 2001 |
| 2 | +From: Razvan-Daniel Mihai <84674+razvan@users.noreply.github.com> |
| 3 | +Date: Mon, 30 Mar 2026 10:56:06 +0200 |
| 4 | +Subject: Fix deleting incorrect records in Delta Lake |
| 5 | + |
| 6 | +This is a backport of https://github.com/trinodb/trino/pull/28907. The |
| 7 | +upstream fix is only available starting with Trino 481. |
| 8 | +--- |
| 9 | + .../plugin/deltalake/DeltaLakeMetadata.java | 2 +- |
| 10 | + .../DeltaLakePageSourceProvider.java | 2 +- |
| 11 | + .../deltalake/DeltaLakeTableHandle.java | 27 ++++++++++++++++++ |
| 12 | + .../plugin/deltalake/TestDeltaLakeBasic.java | 21 ++++++++++++++ |
| 13 | + .../deltalake/large_parquet_file/README.md | 12 ++++++++ |
| 14 | + .../_delta_log/00000000000000000000.json | 3 ++ |
| 15 | + .../_delta_log/00000000000000000001.json | 2 ++ |
| 16 | + ...4d6b-b4e5-eb56bab9ca7f-c000.snappy.parquet | Bin 0 -> 728 bytes |
| 17 | + 8 files changed, 67 insertions(+), 2 deletions(-) |
| 18 | + create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/README.md |
| 19 | + create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000000.json |
| 20 | + create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000001.json |
| 21 | + create mode 100644 plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/part-00000-2e1c15db-7523-4d6b-b4e5-eb56bab9ca7f-c000.snappy.parquet |
| 22 | + |
| 23 | +diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java |
| 24 | +index 1f336bd1257..98892ff6110 100644 |
| 25 | +--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java |
| 26 | ++++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java |
| 27 | +@@ -2591,7 +2591,7 @@ public class DeltaLakeMetadata |
| 28 | + DeltaLakeInsertTableHandle insertHandle = createInsertHandle(retryMode, handle, inputColumns); |
| 29 | + |
| 30 | + Map<String, DeletionVectorEntry> deletionVectors = loadDeletionVectors(session, handle); |
| 31 | +- return new DeltaLakeMergeTableHandle(handle, insertHandle, deletionVectors, findShallowCloneSourceTableLocation(session, handle)); |
| 32 | ++ return new DeltaLakeMergeTableHandle(handle.forMerge(), insertHandle, deletionVectors, findShallowCloneSourceTableLocation(session, handle)); |
| 33 | + } |
| 34 | + |
| 35 | + private Optional<String> findShallowCloneSourceTableLocation(ConnectorSession session, DeltaLakeTableHandle handle) |
| 36 | +diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java |
| 37 | +index 9e0f75ec116..906efe496f0 100644 |
| 38 | +--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java |
| 39 | ++++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakePageSourceProvider.java |
| 40 | +@@ -226,7 +226,7 @@ public class DeltaLakePageSourceProvider |
| 41 | + .withMaxReadBlockSize(getParquetMaxReadBlockSize(session)) |
| 42 | + .withMaxReadBlockRowCount(getParquetMaxReadBlockRowCount(session)) |
| 43 | + .withSmallFileThreshold(getParquetSmallFileThreshold(session)) |
| 44 | +- .withUseColumnIndex(split.getDeletionVector().isEmpty() && isParquetUseColumnIndex(session)) |
| 45 | ++ .withUseColumnIndex(table.getWriteType().isEmpty() && split.getDeletionVector().isEmpty() && isParquetUseColumnIndex(session)) |
| 46 | + .withIgnoreStatistics(isParquetIgnoreStatistics(session)) |
| 47 | + .withVectorizedDecodingEnabled(isParquetVectorizedDecodingEnabled(session)) |
| 48 | + .build(); |
| 49 | +diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java |
| 50 | +index 205cfcec48e..6583623858a 100644 |
| 51 | +--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java |
| 52 | ++++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java |
| 53 | +@@ -31,6 +31,7 @@ import java.util.Optional; |
| 54 | + import java.util.Set; |
| 55 | + |
| 56 | + import static com.google.common.base.Preconditions.checkArgument; |
| 57 | ++import static io.trino.plugin.deltalake.DeltaLakeTableHandle.WriteType.MERGE; |
| 58 | + import static io.trino.plugin.deltalake.DeltaLakeTableHandle.WriteType.UPDATE; |
| 59 | + import static java.util.Objects.requireNonNull; |
| 60 | + |
| 61 | +@@ -40,6 +41,7 @@ public class DeltaLakeTableHandle |
| 62 | + // Insert is not included here because it uses a separate TableHandle type |
| 63 | + public enum WriteType |
| 64 | + { |
| 65 | ++ MERGE, |
| 66 | + UPDATE, |
| 67 | + DELETE |
| 68 | + } |
| 69 | +@@ -212,6 +214,31 @@ public class DeltaLakeTableHandle |
| 70 | + vendedCredentials); |
| 71 | + } |
| 72 | + |
| 73 | ++ public DeltaLakeTableHandle forMerge() |
| 74 | ++ { |
| 75 | ++ return new DeltaLakeTableHandle( |
| 76 | ++ schemaName, |
| 77 | ++ tableName, |
| 78 | ++ managed, |
| 79 | ++ location, |
| 80 | ++ metadataEntry, |
| 81 | ++ protocolEntry, |
| 82 | ++ enforcedPartitionConstraint, |
| 83 | ++ nonPartitionConstraint, |
| 84 | ++ constraintColumns, |
| 85 | ++ Optional.of(MERGE), |
| 86 | ++ projectedColumns, |
| 87 | ++ updatedColumns, |
| 88 | ++ updateRowIdColumns, |
| 89 | ++ analyzeHandle, |
| 90 | ++ recordScannedFiles, |
| 91 | ++ isOptimize, |
| 92 | ++ maxScannedFileSize, |
| 93 | ++ readVersion, |
| 94 | ++ timeTravel, |
| 95 | ++ vendedCredentials); |
| 96 | ++ } |
| 97 | ++ |
| 98 | + @Override |
| 99 | + public SchemaTableName schemaTableName() |
| 100 | + { |
| 101 | +diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java |
| 102 | +index 27db0930313..cd01bfe1de2 100644 |
| 103 | +--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java |
| 104 | ++++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeBasic.java |
| 105 | +@@ -1537,6 +1537,27 @@ public class TestDeltaLakeBasic |
| 106 | + assertUpdate("DROP TABLE " + tableName); |
| 107 | + } |
| 108 | + |
| 109 | ++ @Test // regression test for https://github.com/trinodb/trino/issues/28885 |
| 110 | ++ public void testDeleteFromLargeParquetFile() |
| 111 | ++ throws Exception |
| 112 | ++ { |
| 113 | ++ String tableName = "delete_from_large_parquet_file_" + randomNameSuffix(); |
| 114 | ++ |
| 115 | ++ Path tableLocation = catalogDir.resolve(tableName); |
| 116 | ++ copyDirectoryContents(new File(Resources.getResource("deltalake/large_parquet_file").toURI()).toPath(), tableLocation); |
| 117 | ++ assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri())); |
| 118 | ++ |
| 119 | ++ assertThat(query("SELECT count(*) FROM " + tableName + " WHERE data = 5")) |
| 120 | ++ .matches("VALUES BIGINT '5000'"); |
| 121 | ++ |
| 122 | ++ assertUpdate("DELETE FROM " + tableName + " WHERE data = 5", 5000); |
| 123 | ++ |
| 124 | ++ assertThat(query("SELECT count(*) FROM " + tableName + " WHERE data = 5")) |
| 125 | ++ .matches("VALUES BIGINT '0'"); |
| 126 | ++ |
| 127 | ++ assertUpdate("DROP TABLE " + tableName); |
| 128 | ++ } |
| 129 | ++ |
| 130 | + /** |
| 131 | + * @see deltalake.liquid_clustering |
| 132 | + */ |
| 133 | +diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/README.md b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/README.md |
| 134 | +new file mode 100644 |
| 135 | +index 00000000000..c06589d44ad |
| 136 | +--- /dev/null |
| 137 | ++++ b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/README.md |
| 138 | +@@ -0,0 +1,12 @@ |
| 139 | ++Data generated using Delta Lake 4.0.0: |
| 140 | ++ |
| 141 | ++```sql |
| 142 | ++CREATE TABLE test_large_parquet |
| 143 | ++(data INT) |
| 144 | ++USING delta |
| 145 | ++LOCATION 's3://test-bucket/test_large_parquet'; |
| 146 | ++ |
| 147 | ++INSERT INTO test_large_parquet |
| 148 | ++SELECT id / 5000 FROM RANGE(0, 50000) |
| 149 | ++DISTRIBUTE BY 1; |
| 150 | ++``` |
| 151 | +diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000000.json b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000000.json |
| 152 | +new file mode 100644 |
| 153 | +index 00000000000..5f057f64adc |
| 154 | +--- /dev/null |
| 155 | ++++ b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000000.json |
| 156 | +@@ -0,0 +1,3 @@ |
| 157 | ++{"commitInfo":{"timestamp":1774686576505,"operation":"CREATE TABLE","operationParameters":{"partitionBy":"[]","clusterBy":"[]","description":null,"isManaged":"false","properties":"{}"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"1cd74e1c-d2e0-4b5a-a1ec-cb160b52c0c9"}} |
| 158 | ++{"metaData":{"id":"26c72ddc-b89c-424c-8099-44e8da080d57","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"data\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1774686576349}} |
| 159 | ++{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} |
| 160 | +diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000001.json b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000001.json |
| 161 | +new file mode 100644 |
| 162 | +index 00000000000..db2d77c6173 |
| 163 | +--- /dev/null |
| 164 | ++++ b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/_delta_log/00000000000000000001.json |
| 165 | +@@ -0,0 +1,2 @@ |
| 166 | ++{"commitInfo":{"timestamp":1774686583256,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"50000","numOutputBytes":"728"},"engineInfo":"Apache-Spark/4.0.0 Delta-Lake/4.0.0","txnId":"f231fd6c-67d7-4d6c-b0af-5f9801b16769"}} |
| 167 | ++{"add":{"path":"part-00000-2e1c15db-7523-4d6b-b4e5-eb56bab9ca7f-c000.snappy.parquet","partitionValues":{},"size":728,"modificationTime":1774686583000,"dataChange":true,"stats":"{\"numRecords\":50000,\"minValues\":{\"data\":0},\"maxValues\":{\"data\":9},\"nullCount\":{\"data\":0}}"}} |
| 168 | +diff --git a/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/part-00000-2e1c15db-7523-4d6b-b4e5-eb56bab9ca7f-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/deltalake/large_parquet_file/part-00000-2e1c15db-7523-4d6b-b4e5-eb56bab9ca7f-c000.snappy.parquet |
| 169 | +new file mode 100644 |
| 170 | +index 0000000000000000000000000000000000000000..a93217a5925798e1f1ccb31a67c40bd10bf7dd20 |
| 171 | +GIT binary patch |
| 172 | +literal 728 |
| 173 | +zcmZ{f!D|yi6vn@uOtxDPz4R?Rut*p%v=WzWn@y8Ygx<u%5=w7Hbdzq{&?HSa8%0V$ |
| 174 | +zdJ;5cPo9G8Me*cOatwke@t;s{UKBiw%-Vt=9v;Jc-}k=xz2^3gOPrdtOJCo-`gnDV |
| 175 | +zR)_<uGXT&4FaS9K96%m`4qyp@0bm&bfo_sR@1B17&TLxD8FA9d0H~=}7IUVt#7E+5 |
| 176 | +ziO&*0cj2FY`uW`Wk3X0AJn{31uP6Qzfv%IJ58vKC%iEOBw9EW!HI<*vG>s>|k@(BP |
| 177 | +zWsHez3a&J+7)6a|_==flz)V!tyt<gP#hj%xO>`7yR_RQG*fP&FdSC+PXZ0%0e9vgt |
| 178 | +zeDZ?>#6;=NmC-PWj_!}zhhaY;24Xju#rK0afSFEpVGFwmo0#wnor^yTF(sM5n0cNn |
| 179 | +zo@qtHwlW|~vBJdaUmZk=IB}H>buhNpvfLulsScR0Io&p2<S5ag0@JBpG|cndidwX+ |
| 180 | +z<LJN%hC%x<bTUroVHl0N#{(;0amr2^*4jUg(Miu42GPkljGgHCk<$<3K%Haloyz#} |
| 181 | +zFq92BilcEmmL=KghQ0lfZ0w!NLC{x{a>~-Bu{(&vgD{dMIT-hPL8}+aMjVa9lDtfn |
| 182 | +zjnm2G-UQa&i$8DmqrwGb!F62ESud=%#@*h2q2|@YpzT(@R=L)xuWxn&x8l{y?Tw9A |
| 183 | +a+v`*|tCh0XX;o`$!nOgt)PUypj{6$`)ub^1 |
| 184 | + |
| 185 | +literal 0 |
| 186 | +HcmV?d00001 |
| 187 | + |
0 commit comments