Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,14 @@ public
resources
!javadoc/resources

# Maven resources directories should be tracked
!*/src/main/resources/
!*/src/main/resources/**
!*/src/test/resources/
!*/src/test/resources/**

# dashboard static resources
amoro-web/src/main/resources/static/

!dist/src/main/amoro-bin/bin/
!dist/src/main/amoro-bin/conf/
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.amoro.config.shade.utils.ConfigShadeUtils;
import org.apache.amoro.exception.AmoroRuntimeException;
import org.apache.amoro.process.ActionCoordinator;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.server.catalog.CatalogManager;
import org.apache.amoro.server.catalog.DefaultCatalogManager;
Expand Down Expand Up @@ -241,14 +242,15 @@ public void startOptimizingService() throws Exception {
TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
executeEngineManager.initialize();
List<ExecuteEngine> executeEngines = executeEngineManager.installedPlugins();
processFactories.forEach(c -> c.availableExecuteEngines(executeEngines));

DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);

List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
processFactories.forEach(
c -> c.availableExecuteEngines(executeEngineManager.installedPlugins()));

tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
processService = new ProcessService(tableService, actionCoordinators, executeEngineManager);
Expand All @@ -260,7 +262,6 @@ public void startOptimizingService() throws Exception {
addHandlerChain(optimizingService.getTableRuntimeHandler());
addHandlerChain(processService.getTableHandlerChain());
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ private void initialize(List<TableRuntime> tableRuntimes) {
actionCoordinator.action().getName(),
new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this));
}
executeEngineManager.initialize();
executeEngineManager
.installedPlugins()
.forEach(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.process.iceberg;

import org.apache.amoro.Action;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableFormat;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.config.ConfigOption;
import org.apache.amoro.config.ConfigOptions;
import org.apache.amoro.config.Configurations;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.LocalExecutionEngine;
import org.apache.amoro.process.ProcessFactory;
import org.apache.amoro.process.ProcessTriggerStrategy;
import org.apache.amoro.process.RecoverProcessFailedException;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.process.TableProcessStore;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.apache.commons.lang3.tuple.Pair;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/** Default process factory for Iceberg-related maintenance actions in AMS. */
public class IcebergProcessFactory implements ProcessFactory {

public static final String PLUGIN_NAME = "iceberg";
public static final ConfigOption<Boolean> SNAPSHOT_EXPIRE_ENABLED =
ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true);

public static final ConfigOption<Duration> SNAPSHOT_EXPIRE_INTERVAL =
ConfigOptions.key("expire-snapshot.interval")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

YAML is expire-snapshots.interval

.durationType()
.defaultValue(Duration.ofHours(1));

private ExecuteEngine localEngine;
private final Map<Action, ProcessTriggerStrategy> actions = Maps.newHashMap();
private final List<TableFormat> formats =
Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE);

@Override
public void availableExecuteEngines(Collection<ExecuteEngine> allAvailableEngines) {
for (ExecuteEngine engine : allAvailableEngines) {
if (engine instanceof LocalExecutionEngine) {
this.localEngine = engine;
}
}
}

@Override
public Map<TableFormat, Set<Action>> supportedActions() {
return formats.stream()
.map(f -> Pair.of(f, actions.keySet()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

@Override
public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action) {
return actions.getOrDefault(action, ProcessTriggerStrategy.METADATA_TRIGGER);
}

@Override
public Optional<TableProcess> trigger(TableRuntime tableRuntime, Action action) {
if (!actions.containsKey(action)) {
return Optional.empty();
}

if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) {
return triggerExpireSnapshot(tableRuntime);
}
return Optional.empty();
}

@Override
public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store)
throws RecoverProcessFailedException {
throw new RecoverProcessFailedException(
"Unsupported action for IcebergProcessFactory: " + store.getAction());
}

@Override
public void open(Map<String, String> properties) {
if (properties == null || properties.isEmpty()) {
return;
}
Configurations configs = Configurations.fromMap(properties);
if (configs.getBoolean(SNAPSHOT_EXPIRE_ENABLED)) {
Duration interval = configs.getDuration(SNAPSHOT_EXPIRE_INTERVAL);
this.actions.put(
IcebergActions.EXPIRE_SNAPSHOTS, ProcessTriggerStrategy.triggerAtFixRate(interval));
}
}

private Optional<TableProcess> triggerExpireSnapshot(TableRuntime tableRuntime) {
if (localEngine == null || !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled()) {
return Optional.empty();
}

long lastExecuteTime =
tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastSnapshotsExpiringTime();
ProcessTriggerStrategy strategy = actions.get(IcebergActions.EXPIRE_SNAPSHOTS);
if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) {
return Optional.empty();
}

return Optional.of(new SnapshotsExpiringProcess(tableRuntime, localEngine));
}

@Override
public void close() {}

@Override
public String name() {
return PLUGIN_NAME;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.amoro.server.process.iceberg;

import org.apache.amoro.Action;
import org.apache.amoro.AmoroTable;
import org.apache.amoro.IcebergActions;
import org.apache.amoro.TableRuntime;
import org.apache.amoro.maintainer.TableMaintainer;
import org.apache.amoro.process.ExecuteEngine;
import org.apache.amoro.process.LocalProcess;
import org.apache.amoro.process.TableProcess;
import org.apache.amoro.server.optimizing.maintainer.TableMaintainers;
import org.apache.amoro.server.table.DefaultTableRuntime;
import org.apache.amoro.shade.guava32.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

/** Local table process for expiring Iceberg snapshots. */
public class SnapshotsExpiringProcess extends TableProcess implements LocalProcess {

private static final Logger LOG = LoggerFactory.getLogger(SnapshotsExpiringProcess.class);

public SnapshotsExpiringProcess(TableRuntime tableRuntime, ExecuteEngine engine) {
super(tableRuntime, engine);
}

@Override
public String tag() {
return getAction().getName().toLowerCase();
}

@Override
public void run() {
try {
AmoroTable<?> amoroTable = tableRuntime.loadTable();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that the new scheduling path no longer preserves the old “run, then record cleanup time” behavior for snapshot expiration.

In the old implementation, SnapshotsExpiringExecutor.java executed tableMaintainer.expireSnapshots() synchronously. Only after that finished did PeriodicTableScheduler.java (line 125) update lastCleanTime and schedule the next run. So the interval was effectively measured from the end of the previous cleanup.

In the new path, ActionCoordinatorScheduler.java (line 103) only submits/registers a process and returns immediately. After that return, PeriodicTableScheduler still updates lastCleanTime right away, even though the real cleanup work has not finished yet. The actual cleanup now happens later in SnapshotsExpiringProcess.java (line 53).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Building on your observation — the async submission also introduces a state-loss issue in LocalExecutionEngine.getStatus().

getStatus() removes the Future from the map on terminal states (isDone/isCancelled), making it non-idempotent:

Call 1: future.isDone() == true → remove → SUCCESS
Call 2: future == null          → KILLED  (wrong!)

TableProcessExecutor polls getStatus() in a loop (line 107), so if any retry or concurrent access queries the same identifier twice after completion, it gets KILLED instead of the real result.

There's also a TOCTOU race between containsKey and get across cancelingInstances/activeInstances (lines 67-70), since the compound check-then-act isn't atomic even with ConcurrentHashMap.

TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime);
tableMaintainer.expireSnapshots();
} catch (Throwable t) {
LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t);
} finally {
tableRuntime.updateState(
DefaultTableRuntime.CLEANUP_STATE_KEY,
cleanUp -> cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis()));
}
}

@Override
public Action getAction() {
return IcebergActions.EXPIRE_SNAPSHOTS;
}

@Override
public Map<String, String> getProcessParameters() {
return Maps.newHashMap();
}

@Override
public Map<String, String> getSummary() {
return Maps.newHashMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
public class InlineTableExecutors {

private static final InlineTableExecutors instance = new InlineTableExecutors();
private SnapshotsExpiringExecutor snapshotsExpiringExecutor;
private TableRuntimeRefreshExecutor tableRefreshingExecutor;
private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor;
private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor;
Expand All @@ -41,13 +40,6 @@ public static InlineTableExecutors getInstance() {
}

public void setup(TableService tableService, Configurations conf) {
if (conf.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) {
this.snapshotsExpiringExecutor =
new SnapshotsExpiringExecutor(
tableService,
conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT),
conf.get(AmoroManagementConf.EXPIRE_SNAPSHOTS_INTERVAL));
}
if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) {
this.orphanFilesCleaningExecutor =
new OrphanFilesCleaningExecutor(
Expand Down Expand Up @@ -98,10 +90,6 @@ public void setup(TableService tableService, Configurations conf) {
}
}

public SnapshotsExpiringExecutor getSnapshotsExpiringExecutor() {
return snapshotsExpiringExecutor;
}

public TableRuntimeRefreshExecutor getTableRefreshingExecutor() {
return tableRefreshingExecutor;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@
import org.apache.amoro.server.persistence.PersistentBase;
import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps;
import org.apache.amoro.table.StateKey;
import org.apache.amoro.table.TableRuntimeStore;

import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;
import java.util.stream.Collectors;

public abstract class AbstractTableRuntime extends PersistentBase
Expand Down Expand Up @@ -62,6 +64,16 @@ public Map<String, String> getTableConfig() {
return store().getTableConfig();
}

@Override
public <T> T getState(StateKey<T> key) {
return store().getState(key);
}

@Override
public <T> void updateState(StateKey<T> key, Function<T, T> updater) {
store().begin().updateState(key, updater).commit();
}

@Override
public List<TableProcessStore> getProcessStates() {
return processContainerMap.values().stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime {
.jsonType(AbstractOptimizingEvaluator.PendingInput.class)
.defaultValue(new AbstractOptimizingEvaluator.PendingInput());

private static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
public static final StateKey<TableRuntimeCleanupState> CLEANUP_STATE_KEY =
StateKey.stateKey("cleanup_state")
.jsonType(TableRuntimeCleanupState.class)
.defaultValue(new TableRuntimeCleanupState());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ public long getLastSnapshotsExpiringTime() {
return lastSnapshotsExpiringTime;
}

public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
public TableRuntimeCleanupState setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) {
this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime;
return this;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

org.apache.amoro.server.process.iceberg.IcebergProcessFactory
Loading
Loading