From c7ac103ac804f01f3531c2e18ceb50344ca6e9a2 Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 27 Feb 2026 20:56:21 +0900 Subject: [PATCH 1/3] [AMORO-4099] Add table-summary metric collection option for non-optimizing tables - Allow collecting table_summary metrics when self-optimizing is disabled by setting table-summary.enabled=true - Fix periodic collection bug: remove optimizingNotNecessary() call in summary-only branch to prevent snapshot gate from blocking subsequent collections - Separate property key from self-optimizing prefix: self-optimizing.table-summary.enabled -> table-summary.enabled - Add debug logging for table summary collection path - Add comprehensive test coverage for summary-only mode Signed-off-by: Jiwon Park --- .../inline/TableRuntimeRefreshExecutor.java | 37 +- .../server/table/TableConfigurations.java | 5 + .../TestTableSummaryWithoutOptimizing.java | 418 ++++++++++++++++++ .../apache/amoro/config/OptimizingConfig.java | 15 + .../apache/amoro/table/TableProperties.java | 4 + 5 files changed, 468 insertions(+), 11 deletions(-) create mode 100644 amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index b6b7e4ba63..8f0cd02dcb 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -70,11 +70,10 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { } private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) { - // only evaluate pending input when optimizing is enabled and in idle state OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig(); boolean optimizingEnabled 
= optimizingConfig.isEnabled(); if (optimizingEnabled && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) { - + // Evaluate pending input and collect table summary when optimizing is enabled and idle if (optimizingConfig.isMetadataBasedTriggerEnabled() && !MetadataBasedEvaluationEvent.isEvaluatingNecessary( optimizingConfig, table, tableRuntime.getLastPlanTime())) { @@ -102,11 +101,21 @@ private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, Mixe tableRuntime.setTableSummary(evaluator.getPendingInput()); return evaluatorIsNecessary; + } else if (!optimizingEnabled && optimizingConfig.isTableSummaryEnabled()) { + // Collect table summary metrics even when optimizing is disabled + logger.debug( + "{} collecting table summary (optimizing disabled, tableSummary enabled)", + tableRuntime.getTableIdentifier()); + AbstractOptimizingEvaluator evaluator = + IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions); + AbstractOptimizingEvaluator.PendingInput summary = evaluator.getPendingInput(); + logger.debug("{} table summary collected: {}", tableRuntime.getTableIdentifier(), summary); + tableRuntime.setTableSummary(summary); + return false; } else if (!optimizingEnabled) { logger.debug( "{} optimizing is not enabled, skip evaluating pending input", tableRuntime.getTableIdentifier()); - // indicates no optimization demand now return false; } else { logger.debug( @@ -147,21 +156,27 @@ public void execute(TableRuntime tableRuntime) { AmoroTable table = loadTable(tableRuntime); defaultTableRuntime.refresh(table); MixedTable mixedTable = (MixedTable) table.originalTable(); - // Check if there is any optimizing demand now. 
+ boolean snapshotChanged = + (mixedTable.isKeyedTable() + && (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId() + || lastOptimizedChangeSnapshotId + != defaultTableRuntime.getCurrentChangeSnapshotId())) + || (mixedTable.isUnkeyedTable() + && lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId()); + OptimizingConfig optimizingConfig = defaultTableRuntime.getOptimizingConfig(); + boolean tableSummaryOnly = + !optimizingConfig.isEnabled() && optimizingConfig.isTableSummaryEnabled(); boolean hasOptimizingDemand = false; - if ((mixedTable.isKeyedTable() - && (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId() - || lastOptimizedChangeSnapshotId - != defaultTableRuntime.getCurrentChangeSnapshotId())) - || (mixedTable.isUnkeyedTable() - && lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) { + if (snapshotChanged || tableSummaryOnly) { hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); } else { logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier()); } // Update adaptive interval according to evaluated result. - if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) { + // Skip adaptive interval for table-summary-only mode to maintain fixed collection interval. 
+ if (!tableSummaryOnly + && defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) { defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand); long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime); defaultTableRuntime.setLatestRefreshInterval(newInterval); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java index 1601166db4..2c403ebf32 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableConfigurations.java @@ -232,6 +232,11 @@ public static OptimizingConfig parseOptimizingConfig(Map propert properties, TableProperties.ENABLE_SELF_OPTIMIZING, TableProperties.ENABLE_SELF_OPTIMIZING_DEFAULT)) + .setTableSummaryEnabled( + CompatiblePropertyUtil.propertyAsBoolean( + properties, + TableProperties.TABLE_SUMMARY_ENABLED, + TableProperties.TABLE_SUMMARY_ENABLED_DEFAULT)) .setAllowPartialCommit( CompatiblePropertyUtil.propertyAsBoolean( properties, diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java new file mode 100644 index 0000000000..444ba125a9 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java @@ -0,0 +1,418 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_HEALTH_SCORE; +import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_FILES; +import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_FILES_SIZE; +import static org.apache.amoro.server.table.TableSummaryMetrics.TABLE_SUMMARY_TOTAL_RECORDS; + +import org.apache.amoro.BasicTableTestHelper; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableTestHelper; +import org.apache.amoro.catalog.BasicCatalogTestHelper; +import org.apache.amoro.catalog.CatalogTestHelper; +import org.apache.amoro.config.OptimizingConfig; +import org.apache.amoro.io.MixedDataTestHelpers; +import org.apache.amoro.metrics.Gauge; +import org.apache.amoro.metrics.Metric; +import org.apache.amoro.metrics.MetricDefine; +import org.apache.amoro.metrics.MetricKey; +import org.apache.amoro.server.manager.MetricManager; +import org.apache.amoro.server.optimizing.OptimizingTestHelpers; +import org.apache.amoro.server.scheduler.inline.TableRuntimeRefreshExecutor; +import org.apache.amoro.shade.guava32.com.google.common.collect.ImmutableMap; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.table.MixedTable; +import org.apache.amoro.table.TableProperties; +import org.apache.amoro.table.UnkeyedTable; +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import 
org.apache.iceberg.DeleteFile; +import org.apache.iceberg.data.Record; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.jupiter.api.Assertions; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** Tests that table_summary metrics are collected when self-optimizing is disabled. */ +@RunWith(Parameterized.class) +public class TestTableSummaryWithoutOptimizing extends AMSTableTestBase { + + @Parameterized.Parameters(name = "{0}, {1}") + public static Object[] parameters() { + return new Object[][] { + {new BasicCatalogTestHelper(TableFormat.ICEBERG), new BasicTableTestHelper(true, false)} + }; + } + + public TestTableSummaryWithoutOptimizing( + CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { + super(catalogTestHelper, tableTestHelper, false); + } + + @Before + public void prepare() { + createDatabase(); + createTable(); + } + + @After + public void clear() { + try { + dropTable(); + dropDatabase(); + } catch (Exception e) { + // ignore + } + } + + private UnkeyedTable loadUnkeyedTable() { + return ((MixedTable) tableService().loadTable(serverTableIdentifier()).originalTable()) + .asUnkeyedTable(); + } + + private void initTableWithFiles() { + UnkeyedTable table = loadUnkeyedTable(); + appendData(table); + appendPosDelete(table); + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + } + + private void appendData(UnkeyedTable table) { + ArrayList records = + Lists.newArrayList( + tableTestHelper().generateTestRecord(1, "111", 0, "2022-01-01T12:00:00"), + tableTestHelper().generateTestRecord(2, "222", 0, "2022-01-01T12:00:00")); + List dataFiles = + OptimizingTestHelpers.appendBase( + table, tableTestHelper().writeBaseStore(table, 0L, 
records, false)); + + AppendFiles appendFiles = table.newAppend(); + dataFiles.forEach(appendFiles::appendFile); + appendFiles.commit(); + } + + private void appendPosDelete(UnkeyedTable table) { + ArrayList records = + Lists.newArrayList( + tableTestHelper().generateTestRecord(3, "333", 0, "2022-01-01T12:00:00"), + tableTestHelper().generateTestRecord(4, "444", 0, "2022-01-01T12:00:00")); + List dataFiles = + OptimizingTestHelpers.appendBase( + table, tableTestHelper().writeBaseStore(table, 0L, records, false)); + List posDeleteFiles = Lists.newArrayList(); + for (DataFile dataFile : dataFiles) { + posDeleteFiles.addAll( + MixedDataTestHelpers.writeBaseStorePosDelete( + table, 0L, dataFile, Collections.singletonList(0L))); + } + OptimizingTestHelpers.appendBasePosDelete(table, posDeleteFiles); + } + + private void refreshPending() { + TableRuntimeRefreshExecutor refresher = + new TableRuntimeRefreshExecutor(tableService(), 1, Integer.MAX_VALUE, Integer.MAX_VALUE); + refresher.execute(getDefaultTableRuntime(serverTableIdentifier().getId())); + refresher.dispose(); + } + + @SuppressWarnings("unchecked") + private Gauge getMetric( + Map metrics, ServerTableIdentifier identifier, MetricDefine define) { + return (Gauge) + metrics.get( + new MetricKey( + define, + ImmutableMap.of( + "catalog", + identifier.getCatalog(), + "database", + identifier.getDatabase(), + "table", + identifier.getTableName()))); + } + + @Test + public void testSummaryCollectedWhenOptimizingDisabledAndSummaryEnabled() { + // First, add files with optimizing enabled (default), then verify metrics via refresh + initTableWithFiles(); + refreshPending(); + + Map metrics = MetricManager.getInstance().getGlobalRegistry().getMetrics(); + ServerTableIdentifier identifier = serverTableIdentifier(); + + Gauge totalFiles = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES); + Gauge totalSize = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES_SIZE); + Gauge totalRecords = getMetric(metrics, 
identifier, TABLE_SUMMARY_TOTAL_RECORDS); + Gauge healthScore = getMetric(metrics, identifier, TABLE_SUMMARY_HEALTH_SCORE); + + // Baseline: metrics should be populated with optimizing enabled + long baselineTotalFiles = totalFiles.getValue(); + long baselineTotalSize = totalSize.getValue(); + Assertions.assertTrue(baselineTotalFiles > 0, "baseline totalFiles should be > 0"); + Assertions.assertTrue(baselineTotalSize > 0, "baseline totalSize should be > 0"); + + // Now disable optimizing and explicitly enable table-summary + UnkeyedTable table = loadUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_SELF_OPTIMIZING, "false") + .set(TableProperties.TABLE_SUMMARY_ENABLED, "true") + .commit(); + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // Append more data to create a new snapshot + appendData(loadUnkeyedTable()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // Execute refresh - should collect summary via summary-only branch + refreshPending(); + + // Verify: metrics must not drop below the baseline (only >= is asserted, not an increase) + Assertions.assertTrue( + totalFiles.getValue() >= baselineTotalFiles, + "totalFiles should be >= baseline after summary-only refresh"); + Assertions.assertTrue( + totalSize.getValue() >= baselineTotalSize, + "totalSize should be >= baseline after summary-only refresh"); + Assertions.assertTrue(totalRecords.getValue() > 0, "totalRecords should be > 0"); + Assertions.assertTrue(healthScore.getValue() >= 0, "healthScore should be >= 0"); + } + + @Test + public void testSummaryNotCollectedWhenOptimizingDisabledWithDefault() { + // Disable optimizing only — table-summary.enabled defaults to false + UnkeyedTable table = loadUnkeyedTable(); + table.updateProperties().set(TableProperties.ENABLE_SELF_OPTIMIZING, "false").commit(); + DefaultTableRuntime runtime = 
getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // Add files and refresh + initTableWithFiles(); + refreshPending(); + + Map metrics = MetricManager.getInstance().getGlobalRegistry().getMetrics(); + ServerTableIdentifier identifier = serverTableIdentifier(); + + Gauge totalFiles = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES); + Gauge healthScore = getMetric(metrics, identifier, TABLE_SUMMARY_HEALTH_SCORE); + + // Metrics should remain at initial values (not collected) + Assertions.assertEquals(0, totalFiles.getValue(), "totalFiles should remain 0"); + Assertions.assertEquals(-1, healthScore.getValue(), "healthScore should remain at initial -1"); + } + + @Test + public void testSummaryCollectedRepeatedlyWithoutNewSnapshots() { + // Setup: add files, then switch to summary-only mode + initTableWithFiles(); + UnkeyedTable table = loadUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_SELF_OPTIMIZING, "false") + .set(TableProperties.TABLE_SUMMARY_ENABLED, "true") + .commit(); + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // First refresh — should collect summary + refreshPending(); + + Map metrics = MetricManager.getInstance().getGlobalRegistry().getMetrics(); + ServerTableIdentifier identifier = serverTableIdentifier(); + Gauge totalFiles = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES); + Gauge healthScore = getMetric(metrics, identifier, TABLE_SUMMARY_HEALTH_SCORE); + + long firstTotalFiles = totalFiles.getValue(); + long firstHealthScore = healthScore.getValue(); + Assertions.assertTrue(firstTotalFiles > 0, "first refresh should collect totalFiles > 0"); + Assertions.assertTrue(firstHealthScore >= 0, "first refresh should collect healthScore >= 0"); + + // Second refresh without any new snapshot — should still collect 
(not blocked by snapshot gate) + refreshPending(); + + Assertions.assertEquals( + firstTotalFiles, + totalFiles.getValue(), + "totalFiles should remain the same after second refresh without new snapshot"); + Assertions.assertEquals( + firstHealthScore, + healthScore.getValue(), + "healthScore should remain the same after second refresh without new snapshot"); + + // Third refresh — still works + refreshPending(); + + Assertions.assertEquals( + firstTotalFiles, + totalFiles.getValue(), + "totalFiles should remain the same after third refresh without new snapshot"); + } + + @Test + public void testSummaryUpdatedAfterNewSnapshotInSummaryOnlyMode() { + // Setup: add initial files + initTableWithFiles(); + + // Switch to summary-only mode + UnkeyedTable table = loadUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_SELF_OPTIMIZING, "false") + .set(TableProperties.TABLE_SUMMARY_ENABLED, "true") + .commit(); + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // First refresh — record baseline + refreshPending(); + + Map metrics = MetricManager.getInstance().getGlobalRegistry().getMetrics(); + ServerTableIdentifier identifier = serverTableIdentifier(); + Gauge totalFiles = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES); + Gauge totalSize = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES_SIZE); + + long baselineTotalFiles = totalFiles.getValue(); + long baselineTotalSize = totalSize.getValue(); + Assertions.assertTrue(baselineTotalFiles > 0, "baseline totalFiles should be > 0"); + + // Append more data — creates a new snapshot + appendData(loadUnkeyedTable()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // Refresh again — metrics should reflect the additional files + refreshPending(); + + Assertions.assertTrue( + totalFiles.getValue() > baselineTotalFiles, + "totalFiles should increase 
after appending more data"); + Assertions.assertTrue( + totalSize.getValue() > baselineTotalSize, + "totalSize should increase after appending more data"); + } + + @Test + public void testPropertyKeyNotFilteredWhenOptimizingDisabled() { + // Verify that table-summary.enabled is parsed independently from self-optimizing.enabled + Map properties = new HashMap<>(); + properties.put(TableProperties.ENABLE_SELF_OPTIMIZING, "false"); + properties.put(TableProperties.TABLE_SUMMARY_ENABLED, "true"); + + OptimizingConfig config = TableConfigurations.parseOptimizingConfig(properties); + Assertions.assertFalse(config.isEnabled(), "optimizing should be disabled"); + Assertions.assertTrue( + config.isTableSummaryEnabled(), + "tableSummaryEnabled should be true independently of optimizing.enabled"); + + // Also verify default: when table-summary.enabled is not set, it defaults to false + Map defaultProperties = new HashMap<>(); + defaultProperties.put(TableProperties.ENABLE_SELF_OPTIMIZING, "false"); + + OptimizingConfig defaultConfig = TableConfigurations.parseOptimizingConfig(defaultProperties); + Assertions.assertFalse(defaultConfig.isEnabled(), "optimizing should be disabled"); + Assertions.assertFalse( + defaultConfig.isTableSummaryEnabled(), "tableSummaryEnabled should default to false"); + } + + @Test + public void testReEnableOptimizingAfterSummaryOnlyMode() { + // Setup: add files, switch to summary-only, collect summary + initTableWithFiles(); + UnkeyedTable table = loadUnkeyedTable(); + table + .updateProperties() + .set(TableProperties.ENABLE_SELF_OPTIMIZING, "false") + .set(TableProperties.TABLE_SUMMARY_ENABLED, "true") + .commit(); + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + refreshPending(); + + Map metrics = MetricManager.getInstance().getGlobalRegistry().getMetrics(); + ServerTableIdentifier identifier = serverTableIdentifier(); + Gauge totalFiles 
= getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES); + Gauge healthScore = getMetric(metrics, identifier, TABLE_SUMMARY_HEALTH_SCORE); + + long summaryOnlyTotalFiles = totalFiles.getValue(); + Assertions.assertTrue(summaryOnlyTotalFiles > 0, "summary-only mode should collect metrics"); + + // Re-enable optimizing + table = loadUnkeyedTable(); + table.updateProperties().set(TableProperties.ENABLE_SELF_OPTIMIZING, "true").commit(); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + + // Append data and refresh — normal optimizing path should work + appendData(loadUnkeyedTable()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + refreshPending(); + + // Metrics should still be collected via the optimizing-enabled branch + Assertions.assertTrue( + totalFiles.getValue() > 0, "totalFiles should be collected after re-enabling optimizing"); + Assertions.assertTrue( + healthScore.getValue() >= 0, + "healthScore should be collected after re-enabling optimizing"); + } + + @Test + public void testSummaryCollectedWithOptimizingEnabled() { + // Verify that table summary is always collected in the optimizing-enabled path, + // regardless of table-summary.enabled setting + initTableWithFiles(); + refreshPending(); + + Map metrics = MetricManager.getInstance().getGlobalRegistry().getMetrics(); + ServerTableIdentifier identifier = serverTableIdentifier(); + Gauge totalFiles = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES); + Gauge totalSize = getMetric(metrics, identifier, TABLE_SUMMARY_TOTAL_FILES_SIZE); + Gauge healthScore = getMetric(metrics, identifier, TABLE_SUMMARY_HEALTH_SCORE); + + // With optimizing enabled (default), summary should always be collected + Assertions.assertTrue(totalFiles.getValue() > 0, "totalFiles should be > 0"); + Assertions.assertTrue(totalSize.getValue() > 0, "totalSize should be > 0"); + Assertions.assertTrue(healthScore.getValue() >= 0, "healthScore should be >= 0"); + + // Append more 
data and refresh — summary should still be collected + long previousTotalFiles = totalFiles.getValue(); + long previousHealthScore = healthScore.getValue(); + appendData(loadUnkeyedTable()); + DefaultTableRuntime runtime = getDefaultTableRuntime(serverTableIdentifier().getId()); + runtime.refresh(tableService().loadTable(serverTableIdentifier())); + refreshPending(); + + Assertions.assertTrue( + totalFiles.getValue() >= previousTotalFiles, + "totalFiles should be >= previous after refresh with optimizing enabled"); + Assertions.assertTrue( + healthScore.getValue() >= 0, + "healthScore should remain valid after refresh with optimizing enabled"); + } +} diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index 39afbdbb5f..caa14ae94c 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -29,6 +29,9 @@ public class OptimizingConfig { // self-optimizing.enabled private boolean enabled; + // self-optimizing.table-summary.enabled + private boolean tableSummaryEnabled; + private boolean allowPartialCommit; // self-optimizing.quota @@ -114,6 +117,15 @@ public OptimizingConfig setEnabled(boolean enabled) { return this; } + public boolean isTableSummaryEnabled() { + return tableSummaryEnabled; + } + + public OptimizingConfig setTableSummaryEnabled(boolean tableSummaryEnabled) { + this.tableSummaryEnabled = tableSummaryEnabled; + return this; + } + public boolean isAllowPartialCommit() { return allowPartialCommit; } @@ -364,6 +376,7 @@ public boolean equals(Object o) { } OptimizingConfig that = (OptimizingConfig) o; return enabled == that.enabled + && tableSummaryEnabled == that.tableSummaryEnabled && allowPartialCommit == that.allowPartialCommit && Double.compare(that.targetQuota, targetQuota) == 0 && maxExecuteRetryCount == that.maxExecuteRetryCount @@ 
-396,6 +409,7 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hashCode( enabled, + tableSummaryEnabled, allowPartialCommit, targetQuota, optimizerGroup, @@ -427,6 +441,7 @@ public int hashCode() { public String toString() { return MoreObjects.toStringHelper(this) .add("enabled", enabled) + .add("tableSummaryEnabled", tableSummaryEnabled) .add("commitOnPartialSuccess", allowPartialCommit) .add("targetQuota", targetQuota) .add("optimizerGroup", optimizerGroup) diff --git a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java index 015c68d480..0905acf428 100644 --- a/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java +++ b/amoro-format-iceberg/src/main/java/org/apache/amoro/table/TableProperties.java @@ -76,6 +76,10 @@ private TableProperties() {} public static final boolean ENABLE_SELF_OPTIMIZING_DEFAULT = true; + public static final String TABLE_SUMMARY_ENABLED = "table-summary.enabled"; + + public static final boolean TABLE_SUMMARY_ENABLED_DEFAULT = false; + public static final String SELF_OPTIMIZING_ALLOW_PARTIAL_COMMIT = "self-optimizing.allow-partial-commit"; public static final boolean SELF_OPTIMIZING_ALLOW_PARTIAL_COMMIT_DEFAULT = false; From 65111c53525380e764369bfe8da7b8c728454d7a Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 6 Mar 2026 20:54:29 +0900 Subject: [PATCH 2/3] Fix review issues: remove double-append bug and redundant tableSummaryOnly gate - Remove duplicate newAppend().commit() in test appendData() since OptimizingTestHelpers.appendBase() already commits - Remove unused AppendFiles import - Remove redundant tableSummaryOnly bypass in execute() since snapshotChanged already covers unoptimized data - Fix misleading test comment Signed-off-by: Jiwon Park --- .../inline/TableRuntimeRefreshExecutor.java | 2 +- .../table/TestTableSummaryWithoutOptimizing.java | 12 +++--------- 2 files 
changed, 4 insertions(+), 10 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java index 8f0cd02dcb..ae9a1ea1d9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/TableRuntimeRefreshExecutor.java @@ -167,7 +167,7 @@ public void execute(TableRuntime tableRuntime) { boolean tableSummaryOnly = !optimizingConfig.isEnabled() && optimizingConfig.isTableSummaryEnabled(); boolean hasOptimizingDemand = false; - if (snapshotChanged || tableSummaryOnly) { + if (snapshotChanged) { hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable); } else { logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java index 444ba125a9..c3a6163363 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestTableSummaryWithoutOptimizing.java @@ -43,7 +43,6 @@ import org.apache.amoro.table.MixedTable; import org.apache.amoro.table.TableProperties; import org.apache.amoro.table.UnkeyedTable; -import org.apache.iceberg.AppendFiles; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.data.Record; @@ -110,13 +109,8 @@ private void appendData(UnkeyedTable table) { Lists.newArrayList( tableTestHelper().generateTestRecord(1, "111", 0, "2022-01-01T12:00:00"), tableTestHelper().generateTestRecord(2, "222", 0, "2022-01-01T12:00:00")); - List dataFiles = - OptimizingTestHelpers.appendBase( - table, 
tableTestHelper().writeBaseStore(table, 0L, records, false)); - - AppendFiles appendFiles = table.newAppend(); - dataFiles.forEach(appendFiles::appendFile); - appendFiles.commit(); + OptimizingTestHelpers.appendBase( + table, tableTestHelper().writeBaseStore(table, 0L, records, false)); } private void appendPosDelete(UnkeyedTable table) { @@ -256,7 +250,7 @@ public void testSummaryCollectedRepeatedlyWithoutNewSnapshots() { Assertions.assertTrue(firstTotalFiles > 0, "first refresh should collect totalFiles > 0"); Assertions.assertTrue(firstHealthScore >= 0, "first refresh should collect healthScore >= 0"); - // Second refresh without any new snapshot — should still collect (not blocked by snapshot gate) + // Second refresh without any new snapshot — still collected because data was never optimized refreshPending(); Assertions.assertEquals( From 7df87bbc215bf78690996f4a61fe138d4311cb4f Mon Sep 17 00:00:00 2001 From: Jiwon Park Date: Fri, 6 Mar 2026 21:06:45 +0900 Subject: [PATCH 3/3] Fix property key comment to match actual table property name Signed-off-by: Jiwon Park --- .../src/main/java/org/apache/amoro/config/OptimizingConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java index caa14ae94c..a8efccd3b0 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/OptimizingConfig.java @@ -29,7 +29,7 @@ public class OptimizingConfig { // self-optimizing.enabled private boolean enabled; - // self-optimizing.table-summary.enabled + // table-summary.enabled private boolean tableSummaryEnabled; private boolean allowPartialCommit;