Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) {
}

private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, MixedTable table) {
// only evaluate pending input when optimizing is enabled and in idle state
OptimizingConfig optimizingConfig = tableRuntime.getOptimizingConfig();
boolean optimizingEnabled = optimizingConfig.isEnabled();
if (optimizingEnabled && tableRuntime.getOptimizingStatus().equals(OptimizingStatus.IDLE)) {

// Evaluate pending input and collect table summary when optimizing is enabled and idle
if (optimizingConfig.isMetadataBasedTriggerEnabled()
&& !MetadataBasedEvaluationEvent.isEvaluatingNecessary(
optimizingConfig, table, tableRuntime.getLastPlanTime())) {
Expand Down Expand Up @@ -102,11 +101,21 @@ private boolean tryEvaluatingPendingInput(DefaultTableRuntime tableRuntime, Mixe

tableRuntime.setTableSummary(evaluator.getPendingInput());
return evaluatorIsNecessary;
} else if (!optimizingEnabled && optimizingConfig.isTableSummaryEnabled()) {
// Collect table summary metrics even when optimizing is disabled
logger.debug(
"{} collecting table summary (optimizing disabled, tableSummary enabled)",
tableRuntime.getTableIdentifier());
AbstractOptimizingEvaluator evaluator =
IcebergTableUtil.createOptimizingEvaluator(tableRuntime, table, maxPendingPartitions);
AbstractOptimizingEvaluator.PendingInput summary = evaluator.getPendingInput();
logger.debug("{} table summary collected: {}", tableRuntime.getTableIdentifier(), summary);
tableRuntime.setTableSummary(summary);
return false;
} else if (!optimizingEnabled) {
logger.debug(
"{} optimizing is not enabled, skip evaluating pending input",
tableRuntime.getTableIdentifier());
// indicates no optimization demand now
return false;
} else {
logger.debug(
Expand Down Expand Up @@ -147,21 +156,27 @@ public void execute(TableRuntime tableRuntime) {
AmoroTable<?> table = loadTable(tableRuntime);
defaultTableRuntime.refresh(table);
MixedTable mixedTable = (MixedTable) table.originalTable();
// Check if there is any optimizing demand now.
boolean snapshotChanged =
(mixedTable.isKeyedTable()
&& (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId()
|| lastOptimizedChangeSnapshotId
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId());
OptimizingConfig optimizingConfig = defaultTableRuntime.getOptimizingConfig();
boolean tableSummaryOnly =
!optimizingConfig.isEnabled() && optimizingConfig.isTableSummaryEnabled();
boolean hasOptimizingDemand = false;
if ((mixedTable.isKeyedTable()
&& (lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId()
|| lastOptimizedChangeSnapshotId
!= defaultTableRuntime.getCurrentChangeSnapshotId()))
|| (mixedTable.isUnkeyedTable()
&& lastOptimizedSnapshotId != defaultTableRuntime.getCurrentSnapshotId())) {
if (snapshotChanged) {
hasOptimizingDemand = tryEvaluatingPendingInput(defaultTableRuntime, mixedTable);
} else {
logger.debug("{} optimizing is not necessary", defaultTableRuntime.getTableIdentifier());
}

// Update adaptive interval according to evaluated result.
if (defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) {
// Skip adaptive interval for table-summary-only mode to maintain fixed collection interval.
if (!tableSummaryOnly
&& defaultTableRuntime.getOptimizingConfig().isRefreshTableAdaptiveEnabled(interval)) {
defaultTableRuntime.setLatestEvaluatedNeedOptimizing(hasOptimizingDemand);
long newInterval = getAdaptiveExecutingInterval(defaultTableRuntime);
defaultTableRuntime.setLatestRefreshInterval(newInterval);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,11 @@ public static OptimizingConfig parseOptimizingConfig(Map<String, String> propert
properties,
TableProperties.ENABLE_SELF_OPTIMIZING,
TableProperties.ENABLE_SELF_OPTIMIZING_DEFAULT))
.setTableSummaryEnabled(
CompatiblePropertyUtil.propertyAsBoolean(
properties,
TableProperties.TABLE_SUMMARY_ENABLED,
TableProperties.TABLE_SUMMARY_ENABLED_DEFAULT))
.setAllowPartialCommit(
CompatiblePropertyUtil.propertyAsBoolean(
properties,
Expand Down
Loading