diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 25de6a7afe..94734d6089 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -34,6 +34,7 @@ import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ExecuteEngine; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; @@ -241,14 +242,15 @@ public void startOptimizingService() throws Exception { TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); tableProcessFactoryManager.initialize(); List processFactories = tableProcessFactoryManager.installedPlugins(); + ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); + executeEngineManager.initialize(); + List executeEngines = executeEngineManager.installedPlugins(); + processFactories.forEach(c -> c.availableExecuteEngines(executeEngines)); DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); defaultRuntimeFactory.initialize(processFactories); List actionCoordinators = defaultRuntimeFactory.supportedCoordinators(); - ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); - processFactories.forEach( - c -> c.availableExecuteEngines(executeEngineManager.installedPlugins())); tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); processService = new ProcessService(tableService, actionCoordinators, executeEngineManager); @@ -260,7 +262,6 @@ public void startOptimizingService() throws Exception { 
addHandlerChain(optimizingService.getTableRuntimeHandler()); addHandlerChain(processService.getTableHandlerChain()); addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index 01944e8a26..c3cac4efe2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -128,7 +128,6 @@ private void initialize(List tableRuntimes) { actionCoordinator.action().getName(), new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this)); } - executeEngineManager.initialize(); executeEngineManager .installedPlugins() .forEach( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java new file mode 100755 index 0000000000..18de241db8 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.Action; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.ConfigOption; +import org.apache.amoro.config.ConfigOptions; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalExecutionEngine; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.process.RecoverProcessFailedException; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.commons.lang3.tuple.Pair; + +import java.time.Duration; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** Default process factory for Iceberg-related maintenance actions in AMS. 
*/ +public class IcebergProcessFactory implements ProcessFactory { + + public static final String PLUGIN_NAME = "iceberg"; + public static final ConfigOption SNAPSHOT_EXPIRE_ENABLED = + ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true); + + public static final ConfigOption SNAPSHOT_EXPIRE_INTERVAL = + ConfigOptions.key("expire-snapshots.interval") + .durationType() + .defaultValue(Duration.ofHours(1)); + + private ExecuteEngine localEngine; + private final Map actions = Maps.newHashMap(); + private final List formats = + Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE); + + @Override + public void availableExecuteEngines(Collection allAvailableEngines) { + for (ExecuteEngine engine : allAvailableEngines) { + if (engine instanceof LocalExecutionEngine) { + this.localEngine = engine; + } + } + } + + @Override + public Map> supportedActions() { + return formats.stream() + .map(f -> Pair.of(f, actions.keySet())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); + } + + @Override + public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action) { + return actions.getOrDefault(action, ProcessTriggerStrategy.METADATA_TRIGGER); + } + + @Override + public Optional trigger(TableRuntime tableRuntime, Action action) { + if (!actions.containsKey(action)) { + return Optional.empty(); + } + + if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { + return triggerExpireSnapshot(tableRuntime); + } + return Optional.empty(); + } + + @Override + public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) + throws RecoverProcessFailedException { + throw new RecoverProcessFailedException( + "Unsupported action for IcebergProcessFactory: " + store.getAction()); + } + + @Override + public void open(Map properties) { + if (properties == null || properties.isEmpty()) { + return; + } + Configurations configs = Configurations.fromMap(properties); + if 
(configs.getBoolean(SNAPSHOT_EXPIRE_ENABLED)) { + Duration interval = configs.getDuration(SNAPSHOT_EXPIRE_INTERVAL); + this.actions.put( + IcebergActions.EXPIRE_SNAPSHOTS, ProcessTriggerStrategy.triggerAtFixRate(interval)); + } + } + + private Optional triggerExpireSnapshot(TableRuntime tableRuntime) { + if (localEngine == null || !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled()) { + return Optional.empty(); + } + + long lastExecuteTime = + tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastSnapshotsExpiringTime(); + ProcessTriggerStrategy strategy = actions.get(IcebergActions.EXPIRE_SNAPSHOTS); + if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) { + return Optional.empty(); + } + + return Optional.of(new SnapshotsExpiringProcess(tableRuntime, localEngine)); + } + + @Override + public void close() {} + + @Override + public String name() { + return PLUGIN_NAME; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java new file mode 100755 index 0000000000..3aea51d9f8 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.Action; +import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.maintainer.TableMaintainer; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; + +/** Local table process for expiring Iceberg snapshots. 
*/ +public class SnapshotsExpiringProcess extends TableProcess implements LocalProcess { + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotsExpiringProcess.class); + + public SnapshotsExpiringProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + super(tableRuntime, engine); + } + + @Override + public String tag() { + return getAction().getName().toLowerCase(); + } + + @Override + public void run() { + try { + AmoroTable amoroTable = tableRuntime.loadTable(); + TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime); + tableMaintainer.expireSnapshots(); + tableRuntime.updateState( + DefaultTableRuntime.CLEANUP_STATE_KEY, + cleanUp -> cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis())); + } catch (Throwable t) { + LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); + } + } + + @Override + public Action getAction() { + return IcebergActions.EXPIRE_SNAPSHOTS; + } + + @Override + public Map getProcessParameters() { + return Maps.newHashMap(); + } + + @Override + public Map getSummary() { + return Maps.newHashMap(); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index 17205bfbfc..17eb72e392 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -25,7 +25,6 @@ public class InlineTableExecutors { private static final InlineTableExecutors instance = new InlineTableExecutors(); - private SnapshotsExpiringExecutor snapshotsExpiringExecutor; private TableRuntimeRefreshExecutor tableRefreshingExecutor; private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor; @@ -41,13 +40,6 @@ public 
static InlineTableExecutors getInstance() { } public void setup(TableService tableService, Configurations conf) { - if (conf.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) { - this.snapshotsExpiringExecutor = - new SnapshotsExpiringExecutor( - tableService, - conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT), - conf.get(AmoroManagementConf.EXPIRE_SNAPSHOTS_INTERVAL)); - } if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) { this.orphanFilesCleaningExecutor = new OrphanFilesCleaningExecutor( @@ -98,10 +90,6 @@ public void setup(TableService tableService, Configurations conf) { } } - public SnapshotsExpiringExecutor getSnapshotsExpiringExecutor() { - return snapshotsExpiringExecutor; - } - public TableRuntimeRefreshExecutor getTableRefreshingExecutor() { return tableRefreshingExecutor; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java index 05e17adc09..11f5f2f563 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java @@ -25,12 +25,14 @@ import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; +import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeStore; import java.util.List; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.stream.Collectors; public abstract class AbstractTableRuntime extends PersistentBase @@ -62,6 +64,16 @@ public Map getTableConfig() { return store().getTableConfig(); } + @Override + public T getState(StateKey key) { + return 
store().getState(key); + } + + @Override + public void updateState(StateKey key, Function updater) { + store().begin().updateState(key, updater).commit(); + } + @Override public List getProcessStates() { return processContainerMap.values().stream() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index ee23ab4d3d..9b4b56a95c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -74,7 +74,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime { .jsonType(AbstractOptimizingEvaluator.PendingInput.class) .defaultValue(new AbstractOptimizingEvaluator.PendingInput()); - private static final StateKey CLEANUP_STATE_KEY = + public static final StateKey CLEANUP_STATE_KEY = StateKey.stateKey("cleanup_state") .jsonType(TableRuntimeCleanupState.class) .defaultValue(new TableRuntimeCleanupState()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java index 9dfb98f6ed..f65ae387ca 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java @@ -52,7 +52,8 @@ public long getLastSnapshotsExpiringTime() { return lastSnapshotsExpiringTime; } - public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) { + public TableRuntimeCleanupState setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) { this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime; + return this; } } diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory 
b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory new file mode 100755 index 0000000000..96baa8ebf6 --- /dev/null +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.amoro.server.process.iceberg.IcebergProcessFactory diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java new file mode 100644 index 0000000000..35151ce3d9 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/process/iceberg/TestIcebergProcessFactory.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; + +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.process.LocalExecutionEngine; +import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.cleanup.TableRuntimeCleanupState; +import org.junit.Assert; +import org.junit.Test; + +import java.time.Duration; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class TestIcebergProcessFactory { + + @Test + public void testOpenAndSupportedActions() { + IcebergProcessFactory factory = new IcebergProcessFactory(); + + Map properties = new HashMap<>(); + properties.put("expire-snapshots.enabled", "true"); + properties.put("expire-snapshots.interval", "1h"); + + factory.open(properties); + + Map> supported = factory.supportedActions(); + Assert.assertTrue(supported.get(TableFormat.ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS)); + Assert.assertTrue( + supported.get(TableFormat.MIXED_ICEBERG).contains(IcebergActions.EXPIRE_SNAPSHOTS)); + Assert.assertTrue( + supported.get(TableFormat.MIXED_HIVE).contains(IcebergActions.EXPIRE_SNAPSHOTS)); + + ProcessTriggerStrategy strategy = + factory.triggerStrategy(TableFormat.ICEBERG, 
IcebergActions.EXPIRE_SNAPSHOTS); + Assert.assertEquals(Duration.ofHours(1), strategy.getTriggerInterval()); + } + + @Test + public void testTriggerExpireSnapshotWhenDue() { + IcebergProcessFactory factory = new IcebergProcessFactory(); + + Map properties = new HashMap<>(); + properties.put("expire-snapshots.enabled", "true"); + properties.put("expire-snapshots.interval", "1h"); + factory.open(properties); + + LocalExecutionEngine localEngine = mock(LocalExecutionEngine.class); + doReturn(LocalExecutionEngine.ENGINE_NAME).when(localEngine).name(); + factory.availableExecuteEngines(Arrays.asList(localEngine)); + + TableConfiguration tableConfiguration = new TableConfiguration().setExpireSnapshotEnabled(true); + TableRuntimeCleanupState cleanupState = + new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0); + + TableRuntime runtime = mock(TableRuntime.class); + doReturn(tableConfiguration).when(runtime).getTableConfiguration(); + doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); + + Optional process = + factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS); + + Assert.assertTrue(process.isPresent()); + Assert.assertTrue(process.get() instanceof SnapshotsExpiringProcess); + Assert.assertEquals(LocalExecutionEngine.ENGINE_NAME, process.get().getExecutionEngine()); + } + + @Test + public void testTriggerExpireSnapshotNotDue() { + IcebergProcessFactory factory = new IcebergProcessFactory(); + + Map properties = new HashMap<>(); + properties.put("expire-snapshots.enabled", "true"); + properties.put("expire-snapshots.interval", "1h"); + factory.open(properties); + + factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class))); + + TableConfiguration tableConfiguration = new TableConfiguration().setExpireSnapshotEnabled(true); + long now = System.currentTimeMillis(); + TableRuntimeCleanupState cleanupState = + new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(now); + + TableRuntime runtime = 
mock(TableRuntime.class); + doReturn(tableConfiguration).when(runtime).getTableConfiguration(); + doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); + + Optional process = + factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS); + + Assert.assertFalse(process.isPresent()); + } + + @Test + public void testTriggerExpireSnapshotDisabled() { + IcebergProcessFactory factory = new IcebergProcessFactory(); + + Map properties = new HashMap<>(); + properties.put("expire-snapshots.enabled", "true"); + properties.put("expire-snapshots.interval", "1h"); + factory.open(properties); + + factory.availableExecuteEngines(Arrays.asList(mock(LocalExecutionEngine.class))); + + TableConfiguration tableConfiguration = + new TableConfiguration().setExpireSnapshotEnabled(false); + TableRuntimeCleanupState cleanupState = + new TableRuntimeCleanupState().setLastSnapshotsExpiringTime(0); + + TableRuntime runtime = mock(TableRuntime.class); + doReturn(tableConfiguration).when(runtime).getTableConfiguration(); + doReturn(cleanupState).when(runtime).getState(DefaultTableRuntime.CLEANUP_STATE_KEY); + + Optional process = + factory.trigger(runtime, IcebergActions.EXPIRE_SNAPSHOTS); + + Assert.assertFalse(process.isPresent()); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java index 3a20c6101a..a3d7b25152 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java @@ -64,36 +64,4 @@ public void testDanglingDeleteFilesShouldExecuteAfterInterval() { // 5 hours ago - should not execute Assert.assertFalse(executor.shouldExecute(now - Duration.ofHours(5).toMillis())); } - - @Test - public void 
testSnapshotsExpiringDefaultInterval() { - Duration interval = Duration.ofHours(1); - SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 1, interval); - - TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); - Assert.assertEquals( - Duration.ofHours(1).toMillis(), executor.getNextExecutingTime(tableRuntime)); - } - - @Test - public void testSnapshotsExpiringCustomInterval() { - Duration interval = Duration.ofMinutes(30); - SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 1, interval); - - TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); - Assert.assertEquals( - Duration.ofMinutes(30).toMillis(), executor.getNextExecutingTime(tableRuntime)); - } - - @Test - public void testSnapshotsExpiringShouldExecuteAfterInterval() { - Duration interval = Duration.ofHours(2); - SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 1, interval); - - long now = System.currentTimeMillis(); - // 3 hours ago - should execute - Assert.assertTrue(executor.shouldExecute(now - Duration.ofHours(3).toMillis())); - // 1 hour ago - should not execute - Assert.assertFalse(executor.shouldExecute(now - Duration.ofHours(1).toMillis())); - } } diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index c75c5ac8d0..7b83192602 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -28,4 +28,5 @@ public class IcebergActions { public static final Action DELETE_ORPHANS = Action.register("delete-orphans"); public static final Action SYNC_HIVE = Action.register("sync-hive"); public static final Action EXPIRE_DATA = Action.register("expire-data"); + public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots"); } diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java 
b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 1d96553fed..0059593e85 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -22,9 +22,11 @@ import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.table.StateKey; import java.util.List; import java.util.Map; +import java.util.function.Function; /** * TableRuntime is the key interface for the AMS framework to interact with the table. Typically, it @@ -81,6 +83,24 @@ public interface TableRuntime { */ Map getTableConfig(); + /** + * Get the value of table-runtime state + * + * @param key the state key + * @return value of the state + * @param state value type + */ + T getState(StateKey key); + + /** + * Update the state + * + * @param key key of state + * @param updater value updater of state + * @param value type of state. + */ + void updateState(StateKey key, Function updater); + /** * Register the metric of the table runtime. 
* diff --git a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java index 74ecf61a12..baaed361c5 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java @@ -548,6 +548,19 @@ public long getDurationInMillis(ConfigOption option) { return result; } + public Duration getDuration(ConfigOption option) { + try { + return getOptional(option).orElseGet(option::defaultValue); + } catch (Exception e) { // may be throw java.lang.ArithmeticException: long overflow + throw new ConfigurationException( + option.key(), + String.format( + "Exception when converting duration for config option '%s': %s", + option.key(), e.getMessage()), + e); + } + } + public Optional getOptional(ConfigOption option) { Optional rawValue = getRawValueFromOption(option); Class clazz = option.getClazz(); diff --git a/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java new file mode 100755 index 0000000000..5999de9f01 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +import org.apache.amoro.config.ConfigOption; +import org.apache.amoro.config.ConfigOptions; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; +import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.time.Duration; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Local execution engine that runs {@link org.apache.amoro.process.LocalProcess} instances in AMS + * thread pools. + * + *

The engine maintains multiple thread pools keyed by {@link
+ * org.apache.amoro.process.LocalProcess#tag()}.
+ *
+ * <p>Pool names read from the configuration are stored lower-cased. A process whose tag matches no
+ * configured pool runs in the {@value #DEFAULT_POOL} pool. The status of a finished process is kept
+ * for {@link #PROCESS_STATUS_TTL} and evicted lazily whenever a status is queried or a process is
+ * submitted.
+ */
+public class LocalExecutionEngine implements ExecuteEngine {
+  private static final Logger LOG = LoggerFactory.getLogger(LocalExecutionEngine.class);
+
+  public static final String ENGINE_NAME = "local";
+  public static final String DEFAULT_POOL = "default";
+  public static final String POOL_CONFIG_PREFIX = "pool.";
+  public static final String POOL_SIZE_SUFFIX = ".thread-count";
+  public static final ConfigOption<Integer> DEFAULT_POOL_SIZE =
+      ConfigOptions.key(POOL_CONFIG_PREFIX + DEFAULT_POOL + POOL_SIZE_SUFFIX)
+          .intType()
+          .defaultValue(10);
+  public static final ConfigOption<Duration> PROCESS_STATUS_TTL =
+      ConfigOptions.key("process.status.ttl").durationType().defaultValue(Duration.ofHours(4));
+
+  /** Thread pools by pool name; holds the {@link #DEFAULT_POOL} entry between open and close. */
+  private final Map<String, ThreadPoolExecutor> pools = new ConcurrentHashMap<>();
+
+  /** Submitted processes by the identifier handed back from {@link #submitTableProcess}. */
+  private final ConcurrentHashMap<String, ProcessHolder> processes = new ConcurrentHashMap<>();
+
+  /** Retention of finished statuses in milliseconds; written by open, read by caller threads. */
+  private volatile long statusTtl = PROCESS_STATUS_TTL.defaultValue().toMillis();
+
+  @Override
+  public EngineType engineType() {
+    return EngineType.of(ENGINE_NAME);
+  }
+
+  /**
+   * Looks up the status of a previously submitted process.
+   *
+   * @param processIdentifier identifier returned by {@link #submitTableProcess}
+   * @return the tracked status, or {@link ProcessStatus#UNKNOWN} when the identifier is null,
+   *     empty, never issued by this engine, or already expired
+   */
+  @Override
+  public ProcessStatus getStatus(String processIdentifier) {
+    if (processIdentifier == null || processIdentifier.isEmpty()) {
+      return ProcessStatus.UNKNOWN;
+    }
+    expire();
+
+    ProcessHolder process = processes.get(processIdentifier);
+    return process == null ? ProcessStatus.UNKNOWN : process.getStatus();
+  }
+
+  /**
+   * Schedules a {@link LocalProcess} on the pool selected by its tag.
+   *
+   * @param tableProcess process to run; must implement {@link LocalProcess}
+   * @return a freshly generated identifier to use with {@link #getStatus} and {@link
+   *     #tryCancelTableProcess}
+   * @throws IllegalArgumentException if the process is null or not a {@link LocalProcess}
+   * @throws IllegalStateException if the engine has not been opened or is already closed
+   */
+  @Override
+  public String submitTableProcess(TableProcess tableProcess) {
+    if (!(tableProcess instanceof LocalProcess)) {
+      throw new IllegalArgumentException(
+          "LocalExecutionEngine only supports LocalProcess, but got: "
+              + (tableProcess == null ? null : tableProcess.getClass()));
+    }
+
+    LocalProcess localProcess = (LocalProcess) tableProcess;
+    ThreadPoolExecutor executor = getPool(localProcess.tag());
+    String identifier = UUID.randomUUID().toString();
+
+    // The executor's own Future is kept next to the tracking future because only the former can
+    // interrupt the worker thread on cancellation.
+    CompletableFuture<Void> completion = new CompletableFuture<>();
+    java.util.concurrent.Future<?> task =
+        executor.submit(
+            () -> {
+              try {
+                localProcess.run();
+                completion.complete(null);
+              } catch (Throwable t) {
+                completion.completeExceptionally(t);
+              }
+            });
+    processes.put(identifier, new ProcessHolder(completion, task));
+    expire();
+    return identifier;
+  }
+
+  /**
+   * Cancels a process if it has not finished yet.
+   *
+   * @param tableProcess the process being cancelled (not consulted; lookup is by identifier)
+   * @param processIdentifier identifier returned by {@link #submitTableProcess}
+   * @return the status after the cancel attempt, or {@link ProcessStatus#UNKNOWN} when the
+   *     identifier is null, empty or not tracked
+   */
+  @Override
+  public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String processIdentifier) {
+    // ConcurrentHashMap rejects null keys, so guard the same way getStatus does.
+    if (processIdentifier == null || processIdentifier.isEmpty()) {
+      return ProcessStatus.UNKNOWN;
+    }
+    ProcessHolder holder = processes.get(processIdentifier);
+    if (holder == null) {
+      return ProcessStatus.UNKNOWN;
+    }
+    holder.cancel(); // no-op once the process has finished
+    return holder.getStatus();
+  }
+
+  /**
+   * Creates the default pool plus one pool per {@code pool.<name>.*} key found in the properties.
+   *
+   * @param properties engine properties; every custom pool must define a positive {@code
+   *     pool.<name>.thread-count}
+   * @throws IllegalArgumentException if a custom pool has no positive thread count
+   */
+  @Override
+  public void open(Map<String, String> properties) {
+    Configurations configs = Configurations.fromMap(properties);
+    // Read the TTL first so a malformed value fails before any thread pool is created.
+    this.statusTtl = configs.getDurationInMillis(PROCESS_STATUS_TTL);
+    registerPool(DEFAULT_POOL, configs.getInteger(DEFAULT_POOL_SIZE));
+
+    Set<String> customPools =
+        properties.keySet().stream()
+            .filter(key -> key.startsWith(POOL_CONFIG_PREFIX))
+            .map(LocalExecutionEngine::poolNameOf)
+            .filter(name -> !DEFAULT_POOL.equalsIgnoreCase(name))
+            .collect(Collectors.toSet());
+
+    for (String name : customPools) {
+      // Pool names are normalized to lower case, so resolve the size key case-insensitively
+      // against the keys that were actually configured.
+      String expectedKey = POOL_CONFIG_PREFIX + name + POOL_SIZE_SUFFIX;
+      String sizeKey =
+          properties.keySet().stream()
+              .filter(expectedKey::equalsIgnoreCase)
+              .findFirst()
+              .orElse(expectedKey);
+      ConfigOption<Integer> poolSizeOpt = ConfigOptions.key(sizeKey).intType().defaultValue(-1);
+      int size = configs.getInteger(poolSizeOpt);
+      Preconditions.checkArgument(size > 0, "Pool thread-count is not configured for %s", name);
+      registerPool(name, size);
+      LOG.info("Initialize local execute pool:{} with size:{}", name, size);
+    }
+  }
+
+  @Override
+  public void close() {
+    pools.values().forEach(ThreadPoolExecutor::shutdown);
+    pools.clear();
+  }
+
+  @Override
+  public String name() {
+    return ENGINE_NAME;
+  }
+
+  /**
+   * Extracts the lower-cased pool name from a {@code pool.<name>[.<option>]} key. A key without a
+   * second dot yields everything after the prefix instead of failing on a negative index.
+   */
+  private static String poolNameOf(String key) {
+    String rest = key.substring(POOL_CONFIG_PREFIX.length());
+    int dot = rest.indexOf('.');
+    String name = dot < 0 ? rest : rest.substring(0, dot);
+    return name.toLowerCase(java.util.Locale.ROOT);
+  }
+
+  /** Installs a pool under the given name, shutting down any pool it replaces. */
+  private void registerPool(String name, int size) {
+    ThreadPoolExecutor replaced = pools.put(name, newFixedPool(name, size));
+    if (replaced != null) {
+      replaced.shutdown();
+    }
+  }
+
+  /** Returns the pool for the tag, falling back to the default pool for null or unknown tags. */
+  private ThreadPoolExecutor getPool(String tag) {
+    ThreadPoolExecutor pool = tag == null ? null : pools.get(tag);
+    if (pool == null) {
+      pool = pools.get(DEFAULT_POOL);
+    }
+    if (pool == null) {
+      throw new IllegalStateException("LocalExecutionEngine is not opened or already closed");
+    }
+    return pool;
+  }
+
+  private ThreadPoolExecutor newFixedPool(String tag, int size) {
+    return new ThreadPoolExecutor(
+        size,
+        size,
+        60,
+        TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setDaemon(false).setNameFormat("local-" + tag + "-%d").build());
+  }
+
+  /** Drops every process that finished more than {@code statusTtl} milliseconds ago. */
+  private void expire() {
+    long threshold = System.currentTimeMillis() - statusTtl;
+    processes
+        .values()
+        .removeIf(
+            holder -> {
+              long finishedAt = holder.finishTime();
+              return finishedAt > 0 && finishedAt < threshold;
+            });
+  }
+
+  /** Tracks the status, finish time and failure text of one submitted process. */
+  private static class ProcessHolder {
+    private final CompletableFuture<Void> future;
+    private final java.util.concurrent.Future<?> task;
+    private final AtomicLong finishTime = new AtomicLong(-1);
+    private final AtomicReference<ProcessStatus> status =
+        new AtomicReference<>(ProcessStatus.RUNNING);
+    private final AtomicReference<String> failedInfo = new AtomicReference<>("");
+
+    /**
+     * @param future completed by the worker when the process ends, or by {@link #cancel()}
+     * @param task handle returned by the executor, used to interrupt the worker on cancel
+     */
+    ProcessHolder(CompletableFuture<Void> future, java.util.concurrent.Future<?> task) {
+      this.future = future;
+      this.task = task;
+      this.future.whenComplete((v, t) -> onComplete(t));
+    }
+
+    private void onComplete(Throwable e) {
+      finishTime.compareAndSet(-1, System.currentTimeMillis());
+      if (e != null) {
+        // CAS keeps an earlier CANCELED status instead of overwriting it with FAILED.
+        status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.FAILED);
+        failedInfo.compareAndSet("", exceptionToString(e));
+      } else {
+        status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.SUCCESS);
+      }
+    }
+
+    private static String exceptionToString(Throwable throwable) {
+      StringWriter sw = new StringWriter();
+      throwable.printStackTrace(new PrintWriter(sw));
+      return sw.toString();
+    }
+
+    public ProcessStatus getStatus() {
+      return status.get();
+    }
+
+    /** Marks the process canceled and interrupts its worker; does nothing once finished. */
+    public void cancel() {
+      if (finishTime() > 0) {
+        return;
+      }
+      status.compareAndSet(ProcessStatus.RUNNING, ProcessStatus.CANCELED);
+      // CompletableFuture#cancel never interrupts the running thread, so cancel the executor's
+      // task to deliver the interrupt (or to keep a still-queued task from starting).
+      task.cancel(true);
+      // Completing the tracking future records the finish time via onComplete.
+      future.cancel(true);
+    }
+
+    public long finishTime() {
+      return this.finishTime.get();
+    }
+  }
+}
diff --git a/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java new file mode 100755 index 0000000000..7dc358ad88 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java @@ -0,0
+1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.process;
+
+/**
+ * A {@link TableProcess} that can be executed locally by a thread pool.
+ *
+ * <p>Local processes are executed by {@code ExecuteEngine} implementations which run the logic
+ * inside the AMS JVM.
+ */
+public interface LocalProcess extends AmoroProcess {
+
+  /** Execute process logic locally. */
+  void run();
+
+  /**
+   * Tag used by local execution engines to select a thread pool.
+   *
+   * @return pool tag; {@code "default"} unless an implementation overrides this method
+   */
+  default String tag() {
+    return "default";
+  }
+}
diff --git a/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine new file mode 100755 index 0000000000..8859bb6d39 --- /dev/null +++ b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+# + +org.apache.amoro.process.LocalExecutionEngine diff --git a/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java new file mode 100644 index 0000000000..d6930622dc --- /dev/null +++ b/amoro-common/src/test/java/org/apache/amoro/process/TestLocalExecutionEngine.java @@ -0,0 +1,207 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.amoro.process;
+
+import static org.mockito.Mockito.mock;
+
+import org.apache.amoro.Action;
+import org.apache.amoro.TableRuntime;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+/** Tests pool selection, cancellation, failure reporting and status expiry of the local engine. */
+public class TestLocalExecutionEngine {
+
+  private LocalExecutionEngine engine;
+
+  @After
+  public void tearDown() {
+    if (engine != null) {
+      engine.close();
+    }
+  }
+
+  @Test
+  public void testSubmitUsesCustomPoolByTag() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    String threadName = runAndCaptureThreadName("snapshots-expiring");
+
+    Assert.assertTrue(
+        "should run in custom pool", threadName.startsWith("local-snapshots-expiring-"));
+  }
+
+  @Test
+  public void testUnknownTagFallsBackToDefaultPool() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    String threadName = runAndCaptureThreadName("not-configured");
+
+    Assert.assertTrue("should run in default pool", threadName.startsWith("local-default-"));
+  }
+
+  @Test
+  public void testCancelRunningProcess() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    CountDownLatch blockLatch = new CountDownLatch(1);
+    try {
+      LocalProcessTableProcess process =
+          new LocalProcessTableProcess(
+              mock(TableRuntime.class),
+              engine,
+              "default",
+              () -> {
+                try {
+                  blockLatch.await();
+                } catch (InterruptedException e) {
+                  // Restore the flag so the worker observes the interruption.
+                  Thread.currentThread().interrupt();
+                }
+              });
+
+      String identifier = engine.submitTableProcess(process);
+
+      // Process may not be scheduled yet, but holder is already created.
+      Assert.assertEquals(ProcessStatus.RUNNING, engine.getStatus(identifier));
+
+      Assert.assertEquals(
+          ProcessStatus.CANCELED, engine.tryCancelTableProcess(process, identifier));
+
+      // The canceled status must stick once the completion callback has run.
+      waitForStatus(identifier, ProcessStatus.CANCELED, 5000);
+    } finally {
+      // Release the worker even when cancellation did not interrupt it, so no thread stays
+      // blocked after the test.
+      blockLatch.countDown();
+    }
+  }
+
+  @Test
+  public void testFinishedStatusExpired() throws Exception {
+    engine = createEngineWithTtl("100ms");
+
+    CountDownLatch finished = new CountDownLatch(1);
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(
+            mock(TableRuntime.class), engine, "default", finished::countDown);
+
+    String identifier = engine.submitTableProcess(process);
+
+    Assert.assertTrue("process should run", finished.await(5, TimeUnit.SECONDS));
+
+    // A tracked identifier only reports UNKNOWN after its finished entry was evicted, so polling
+    // for UNKNOWN verifies expiry without racing the short TTL or relying on a fixed sleep.
+    waitForStatus(identifier, ProcessStatus.UNKNOWN, 5000);
+  }
+
+  @Test
+  public void testFailedProcessStatus() throws Exception {
+    engine = createEngineWithTtl("1h");
+
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(
+            mock(TableRuntime.class),
+            engine,
+            "default",
+            () -> {
+              throw new RuntimeException("boom");
+            });
+
+    String identifier = engine.submitTableProcess(process);
+
+    waitForStatus(identifier, ProcessStatus.FAILED, 5000);
+  }
+
+  @Test
+  public void testGetStatusForInvalidIdentifier() {
+    engine = createEngineWithTtl("1h");
+
+    Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(null));
+    Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus(""));
+    Assert.assertEquals(ProcessStatus.UNKNOWN, engine.getStatus("not-exist"));
+  }
+
+  /** Opens an engine with single-threaded default and snapshots-expiring pools. */
+  private LocalExecutionEngine createEngineWithTtl(String ttl) {
+    LocalExecutionEngine localEngine = new LocalExecutionEngine();
+    Map<String, String> properties = new HashMap<>();
+    properties.put("pool.default.thread-count", "1");
+    properties.put("pool.snapshots-expiring.thread-count", "1");
+    properties.put("process.status.ttl", ttl);
+    localEngine.open(properties);
+    return localEngine;
+  }
+
+  /** Runs a process with the given tag to completion and returns its worker thread name. */
+  private String runAndCaptureThreadName(String tag) throws InterruptedException {
+    CountDownLatch started = new CountDownLatch(1);
+    AtomicReference<String> threadName = new AtomicReference<>();
+
+    LocalProcessTableProcess process =
+        new LocalProcessTableProcess(
+            mock(TableRuntime.class),
+            engine,
+            tag,
+            () -> {
+              threadName.set(Thread.currentThread().getName());
+              started.countDown();
+            });
+
+    String identifier = engine.submitTableProcess(process);
+
+    Assert.assertTrue("process should start", started.await(5, TimeUnit.SECONDS));
+    waitForStatus(identifier, ProcessStatus.SUCCESS, 5000);
+    return threadName.get();
+  }
+
+  /** Polls the engine until the status matches, failing the test once the timeout elapses. */
+  private void waitForStatus(String identifier, ProcessStatus expected, long timeoutMillis)
+      throws InterruptedException {
+    // nanoTime is monotonic, so the deadline is unaffected by wall-clock adjustments.
+    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
+    ProcessStatus status = engine.getStatus(identifier);
+    while (status != expected) {
+      if (System.nanoTime() - deadline >= 0) {
+        Assert.fail("Timeout waiting for status " + expected + ", current=" + status);
+      }
+      Thread.sleep(10);
+      status = engine.getStatus(identifier);
+    }
+  }
+
+  /** Minimal {@link TableProcess} that delegates {@link LocalProcess#run()} to a runnable. */
+  private static class LocalProcessTableProcess extends TableProcess implements LocalProcess {
+
+    private final String tag;
+    private final Runnable runnable;
+
+    LocalProcessTableProcess(
+        TableRuntime tableRuntime, ExecuteEngine engine, String tag, Runnable runnable) {
+      super(tableRuntime, engine);
+      this.tag = tag;
+      this.runnable = runnable;
+    }
+
+    @Override
+    public void run() {
+      runnable.run();
+    }
+
+    @Override
+    public String tag() {
+      return tag;
+    }
+
+    @Override
+    public Action getAction() {
+      return Action.register("TEST");
+    }
+
+    @Override
+    public Map<String, String> getProcessParameters() {
+      return new HashMap<>();
+    }
+
+    @Override
+    public Map<String, String> getSummary() {
+      return new HashMap<>();
+    }
+  }
+}
diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml new file mode 100755 index 0000000000..5c3199f320 --- /dev/null +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License.
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +execute-engines: + - name: local + enabled: true + priority: 100 + properties: + pool.default.thread-count: 10 + pool.snapshots-expiring.thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml new file mode 100755 index 0000000000..e1455c2cf1 --- /dev/null +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +process-factories: + - name: iceberg + enabled: true + priority: 100 + properties: + expire-snapshots.enabled: "true" + expire-snapshots.interval: "1h"