@@ -26,6 +26,7 @@
 import org.apache.iceberg.io.OutputFileFactory;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.util.ArrayUtil;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
@@ -195,6 +196,92 @@ public void testCompactionCanRemoveEqualityDeleteFiles() throws NoSuchTableException
     assertThat(stats.getNumCurrentSnapshotEqualityDeleteFiles()).isEqualTo(0L);
   }
 
+  @Test
+  public void testBudgetedRewriteUsesDataLengthForTaskGrouping() throws NoSuchTableException {
+    createAndInitTable("id int, data string");
+
+    // Create 4 separate data files by appending individually.
+    for (int i = 0; i < 4; i++) {
+      List<SimpleRecord> records =
+          Arrays.asList(
+              new SimpleRecord(i * 2, "data_" + i), new SimpleRecord(i * 2 + 1, "data_" + i));
+      ops.spark()
+          .createDataset(records, Encoders.bean(SimpleRecord.class))
+          .coalesce(1)
+          .writeTo(tableName)
+          .append();
+    }
+
+    // Delete one row from each data file to produce partition-scoped position delete files.
+    // In an unpartitioned table, all position deletes are in the same partition and thus
+    // associated with ALL data files, inflating each task's sizeBytes relative to its length.
+    for (int i = 0; i < 4; i++) {
+      sql("DELETE FROM %s WHERE id = %d", tableName, i * 2);
+    }
+
+    Table table = ops.getTable(tableName);
+
+    // Verify we have 4 data files and position delete files.
+    List<Object[]> dataFileCountResult = sql("SELECT count(*) FROM %s.data_files", tableName);
+    assertThat((long) dataFileCountResult.get(0)[0]).isEqualTo(4L);
+
+    List<Object[]> deleteFileCountResult = sql("SELECT count(*) FROM %s.delete_files", tableName);
+    assertThat((long) deleteFileCountResult.get(0)[0]).isGreaterThanOrEqualTo(4L);
+
+    // Compute the budget as half of the total data file size (by file_size_in_bytes from
+    // metadata, excluding delete file sizes). If the old sizeBytes-based grouping were used,
+    // each task would appear much larger (data + all partition-scoped delete files), and the
+    // budget would cover fewer files.
+    List<Object[]> totalSizeResult =
+        sql("SELECT sum(file_size_in_bytes) FROM %s.data_files", tableName);
+    long totalDataSize = (long) totalSizeResult.get(0)[0];
+    // Add a margin to the total data size; file sizes are roughly the same but can vary by
+    // a few bytes.
+    long margin = totalDataSize / 10;
+    long halfBudget = totalDataSize / 2 + margin;
+
+    // Set target-file-size-bytes to roughly the total size of 2 data files. With the
+    // length-based grouping fix (linkedin/iceberg#233), the 2 rewritten data files are
+    // grouped into a single task and merged into 1 output file. If sizeBytes (data + all
+    // partition-scoped delete files) were used instead, each task would appear much larger
+    // than the target, preventing them from being grouped together and producing 2 separate
+    // output files.
+    long targetSize = halfBudget;
+
+    log.info(
+        "Budgeted rewrite test: totalDataSize={}, halfBudget={}, targetSize={}",
+        totalDataSize,
+        halfBudget,
+        targetSize);
+
+    // Use SparkActions directly instead of ops.rewriteDataFiles() because this test requires
+    // fine-grained control over the budget options (MAX_TOTAL_FILES_SIZE_BYTES,
+    // target-file-size-bytes) that are not exposed through the Operations API.
+    RewriteDataFiles.Result result =
+        SparkActions.get(ops.spark())
+            .rewriteDataFiles(table)
+            .binPack()
+            .option(RewriteDataFiles.MAX_TOTAL_FILES_SIZE_BYTES, Long.toString(halfBudget))
+            .option("target-file-size-bytes", Long.toString(targetSize))
+            .option("min-file-size-bytes", "1")
+            .option("max-file-size-bytes", Long.toString(targetSize * 2))
+            .option("min-input-files", "1")
+            .option("delete-file-threshold", "0")
+            .execute();
+
+    // The budget covers exactly half the data files by length.
+    assertThat(result.rewrittenDataFilesCount()).isEqualTo(2);
+    // With length-based grouping, the 2 data files (total size = targetSize) fit in one group
+    // and merge into 1 output file. With sizeBytes-based grouping, each file's perceived size
+    // would be data_length + totalDeleteSize, far exceeding the target, so they would be
+    // placed in separate groups, producing 2 output files instead.
+    assertThat(result.addedDataFilesCount()).isEqualTo(1);
+
+    // Verify data correctness: only odd-numbered IDs remain (even IDs were deleted).
+    List<Object[]> expected =
+        Arrays.asList(row(1, "data_0"), row(3, "data_1"), row(5, "data_2"), row(7, "data_3"));
+    List<Object[]> actual = sql("SELECT * FROM %s ORDER BY id ASC", tableName);
+    assertThat(actual).containsExactlyElementsOf(expected);
+  }
+
   private void writeEqDeleteRecord(Table table, String delCol, Object delVal) {
     List<Integer> equalityFieldIds = Lists.newArrayList(table.schema().findField(delCol).fieldId());
     Schema eqDeleteRowSchema = table.schema().select(delCol);
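
For reference, a minimal sketch (not part of the patch) of the two size measures this test distinguishes, assuming Iceberg's FileScanTask API where length() covers only the data file while sizeBytes() also counts the attached delete files; the class and method names below are illustrative only:

    import org.apache.iceberg.DeleteFile;
    import org.apache.iceberg.FileScanTask;

    class TaskSizeSketch {
      // Data-only length: what the length-based grouping fix budgets and bins by.
      static long dataOnly(FileScanTask task) {
        return task.length();
      }

      // Perceived size under sizeBytes-based grouping: data length plus the size
      // of every attached delete file. In an unpartitioned table, each
      // partition-scoped position delete attaches to every task, so this value
      // dwarfs dataOnly and no two of the test's data files would fit together
      // under a target of two data files' length.
      static long withDeletes(FileScanTask task) {
        return task.length()
            + task.deletes().stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
      }
    }

With four roughly equal data files, a budget of half the total data length plus margin admits exactly 2 files when measured by dataOnly, which is what the rewrittenDataFilesCount() and addedDataFilesCount() assertions above pin down.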