From b5e817f5b6fc9e9035ac0f5422001b4293ab96f6 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Thu, 5 Mar 2026 20:07:02 +0800 Subject: [PATCH 1/8] [Improvement] Refactor SnapshotExpiring inline executor with ProcessFactory Co-Authored-By: Aime Change-Id: Idfac8a56427baccaeeca27e8f71719d476d7839a --- .gitignore | 9 + .../amoro/server/AmoroServiceContainer.java | 1 - .../process/ActionCoordinatorScheduler.java | 20 ++ .../executor/LocalExecutionEngine.java | 192 ++++++++++++++++++ .../iceberg/IcebergProcessFactory.java | 164 +++++++++++++++ .../iceberg/SnapshotsExpiringProcess.java | 66 ++++++ .../inline/InlineTableExecutors.java | 12 -- .../org.apache.amoro.process.ProcessFactory | 1 + ...moro.server.process.executor.ExecuteEngine | 1 + .../TestConfigurableIntervalExecutors.java | 57 ++++-- .../java/org/apache/amoro/IcebergActions.java | 1 + .../apache/amoro/process/LocalProcess.java | 40 ++++ .../conf/plugins/execute-engines.yaml | 24 +++ .../conf/plugins/process-factories.yaml | 25 +++ 14 files changed, 582 insertions(+), 31 deletions(-) create mode 100755 amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java create mode 100755 amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java create mode 100755 amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java create mode 100755 amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory create mode 100755 amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine create mode 100755 amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java create mode 100755 dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml create mode 100755 dist/src/main/amoro-bin/conf/plugins/process-factories.yaml diff --git a/.gitignore b/.gitignore index c140957d0a..81385cd434 100644 --- a/.gitignore +++ b/.gitignore @@ -78,5 +78,14 @@ public resources !javadoc/resources +# Maven resources directories should be tracked +!*/src/main/resources/ +!*/src/main/resources/** +!*/src/test/resources/ +!*/src/test/resources/** + +# dashboard static resources +amoro-web/src/main/resources/static/ + !dist/src/main/amoro-bin/bin/ !dist/src/main/amoro-bin/conf/ diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 25de6a7afe..ef3ad2642f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -260,7 +260,6 @@ public void startOptimizingService() throws Exception { addHandlerChain(optimizingService.getTableRuntimeHandler()); addHandlerChain(processService.getTableHandlerChain()); addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor()); - addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor()); addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 4e9f79a352..47b49ff1d1 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.process; +import org.apache.amoro.IcebergActions; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; @@ -25,6 +26,7 @@ import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,12 +43,14 @@ public class ActionCoordinatorScheduler extends PeriodicTableScheduler { private final ActionCoordinator coordinator; private final ProcessService processService; + private final long intervalMillis; public ActionCoordinatorScheduler( ActionCoordinator coordinator, TableService tableService, ProcessService processService) { super(coordinator.action(), tableService, coordinator.parallelism()); this.coordinator = coordinator; this.processService = processService; + this.intervalMillis = coordinator.getExecutorDelay(); } /** @@ -112,6 +116,22 @@ protected TableProcess recover(TableRuntime tableRuntime, TableProcessStore proc return coordinator.recoverTableProcess(tableRuntime, processStore); } + @Override + protected CleanupOperation getCleanupOperation() { + if (IcebergActions.EXPIRE_SNAPSHOTS.equals(coordinator.action())) { + return CleanupOperation.SNAPSHOTS_EXPIRING; + } + return CleanupOperation.NONE; + } + + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + if (getCleanupOperation() == CleanupOperation.SNAPSHOTS_EXPIRING) { + return System.currentTimeMillis() - lastCleanupEndTime >= intervalMillis; + } + return super.shouldExecute(lastCleanupEndTime); + } + @Override protected long getExecutorDelay() { return coordinator.getExecutorDelay(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java new file mode 100755 index 0000000000..3ebd552bcb --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.executor; + +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.ProcessStatus; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; + +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Local execution engine that runs {@link LocalProcess} instances in AMS thread pools. + * + *

The engine maintains multiple thread pools keyed by {@link LocalProcess#tag()}. + */ +public class LocalExecutionEngine implements ExecuteEngine { + + public static final String ENGINE_NAME = "local"; + public static final String DEFAULT_POOL = "default"; + public static final String SNAPSHOTS_EXPIRING_POOL = "snapshots-expiring"; + + private final Map pools = new ConcurrentHashMap<>(); + private final Map> activeInstances = new ConcurrentHashMap<>(); + private final Map> cancelingInstances = new ConcurrentHashMap<>(); + + @Override + public EngineType engineType() { + return EngineType.of(ENGINE_NAME); + } + + @Override + public ProcessStatus getStatus(String processIdentifier) { + if (processIdentifier == null || processIdentifier.isEmpty()) { + return ProcessStatus.UNKNOWN; + } + + Map> instances = + cancelingInstances.containsKey(processIdentifier) ? cancelingInstances : activeInstances; + + Future future = instances.get(processIdentifier); + if (future == null) { + return ProcessStatus.KILLED; + } + + if (future.isCancelled()) { + instances.remove(processIdentifier); + return ProcessStatus.CANCELED; + } + + if (future.isDone()) { + instances.remove(processIdentifier); + try { + future.get(); + return ProcessStatus.SUCCESS; + } catch (Exception e) { + return ProcessStatus.FAILED; + } + } + + return cancelingInstances.containsKey(processIdentifier) + ? ProcessStatus.CANCELING + : ProcessStatus.RUNNING; + } + + @Override + public String submitTableProcess(TableProcess tableProcess) { + if (!(tableProcess instanceof LocalProcess)) { + throw new IllegalArgumentException( + "LocalExecutionEngine only supports LocalProcess, but got: " + tableProcess.getClass()); + } + + LocalProcess localProcess = (LocalProcess) tableProcess; + String identifier = UUID.randomUUID().toString(); + + ThreadPoolExecutor executor = getOrCreatePool(localProcess.tag()); + Future future = + executor.submit( + () -> { + localProcess.run(); + return null; + }); + + activeInstances.put(identifier, future); + return identifier; + } + + @Override + public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String processIdentifier) { + Future future = activeInstances.get(processIdentifier); + if (future == null) { + return ProcessStatus.CANCELED; + } + + activeInstances.remove(processIdentifier); + cancelingInstances.put(processIdentifier, future); + + if (future.isDone()) { + try { + future.get(); + return ProcessStatus.SUCCESS; + } catch (Exception e) { + return ProcessStatus.FAILED; + } + } + + if (future.isCancelled()) { + return ProcessStatus.CANCELED; + } + + future.cancel(true); + return ProcessStatus.CANCELING; + } + + @Override + public void open(Map properties) { + String defaultSizeValue = properties == null ? null : properties.get("default.thread-count"); + int defaultSize = parseInt(defaultSizeValue, 10); + pools.put(DEFAULT_POOL, newFixedPool(DEFAULT_POOL, defaultSize)); + + String snapshotsExpiringSizeValue = + properties == null ? null : properties.get("snapshots-expiring.thread-count"); + int snapshotsExpiringSize = parseInt(snapshotsExpiringSizeValue, defaultSize); + pools.put( + SNAPSHOTS_EXPIRING_POOL, + newFixedPool(SNAPSHOTS_EXPIRING_POOL, Math.max(snapshotsExpiringSize, 1))); + } + + @Override + public void close() { + pools.values().forEach(ThreadPoolExecutor::shutdown); + pools.clear(); + activeInstances.clear(); + cancelingInstances.clear(); + } + + @Override + public String name() { + return ENGINE_NAME; + } + + private ThreadPoolExecutor getOrCreatePool(String tag) { + if (tag == null || tag.isEmpty()) { + tag = DEFAULT_POOL; + } + + return pools.computeIfAbsent(tag, t -> newFixedPool(t, 10)); + } + + private ThreadPoolExecutor newFixedPool(String tag, int size) { + return new ThreadPoolExecutor( + size, + size, + 60, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new ThreadFactoryBuilder().setDaemon(false).setNameFormat("local-" + tag + "-%d").build()); + } + + private int parseInt(String value, int defaultValue) { + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java new file mode 100755 index 0000000000..e768cf353b --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.Action; +import org.apache.amoro.IcebergActions; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.ConfigHelpers; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.process.RecoverProcessFailedException; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.process.DefaultTableProcessStore; +import org.apache.amoro.server.process.TableProcessMeta; +import org.apache.amoro.server.process.executor.LocalExecutionEngine; +import org.apache.amoro.server.utils.SnowflakeIdGenerator; + +import java.time.Duration; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** Default process factory for Iceberg-related maintenance actions in AMS. */ +public class IcebergProcessFactory implements ProcessFactory { + + public static final String PLUGIN_NAME = "iceberg"; + + private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(); + + private boolean expireSnapshotsEnabled = true; + private int expireSnapshotsThreadCount = 10; + private Duration expireSnapshotsInterval = Duration.ofHours(1); + + @Override + public Map> supportedActions() { + Set actions = new HashSet<>(); + actions.add(IcebergActions.EXPIRE_SNAPSHOTS); + + Map> supported = new HashMap<>(); + supported.put(TableFormat.ICEBERG, actions); + supported.put(TableFormat.MIXED_ICEBERG, actions); + supported.put(TableFormat.MIXED_HIVE, actions); + return supported; + } + + @Override + public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action) { + if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { + return new ProcessTriggerStrategy( + expireSnapshotsInterval, false, Math.max(expireSnapshotsThreadCount, 1)); + } + + return ProcessTriggerStrategy.METADATA_TRIGGER; + } + + @Override + public Optional trigger(TableRuntime tableRuntime, Action action) { + if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action) + && (!expireSnapshotsEnabled + || !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled())) { + return Optional.empty(); + } + + long processId = idGenerator.generateId(); + TableProcessMeta meta = + TableProcessMeta.of( + processId, + tableRuntime.getTableIdentifier().getId(), + action.getName(), + LocalExecutionEngine.ENGINE_NAME, + Collections.emptyMap()); + + TableProcessStore store = new DefaultTableProcessStore(tableRuntime, meta, action); + + if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { + return Optional.of(new SnapshotsExpiringProcess(tableRuntime, store)); + } + + return Optional.empty(); + } + + @Override + public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) + throws RecoverProcessFailedException { + if (IcebergActions.EXPIRE_SNAPSHOTS.equals(store.getAction())) { + return new SnapshotsExpiringProcess(tableRuntime, store); + } + + throw new RecoverProcessFailedException( + "Unsupported action for IcebergProcessFactory: " + store.getAction()); + } + + @Override + public void open(Map properties) { + if (properties == null || properties.isEmpty()) { + return; + } + + expireSnapshotsEnabled = + parseBoolean(properties.get("expire-snapshots.enabled"), expireSnapshotsEnabled); + expireSnapshotsThreadCount = + parseInt(properties.get("expire-snapshots.thread-count"), expireSnapshotsThreadCount); + expireSnapshotsInterval = + parseDuration(properties.get("expire-snapshots.interval"), expireSnapshotsInterval); + } + + private boolean parseBoolean(String value, boolean defaultValue) { + if (value == null) { + return defaultValue; + } + return Boolean.parseBoolean(value.trim()); + } + + private int parseInt(String value, int defaultValue) { + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value.trim()); + } catch (NumberFormatException e) { + return defaultValue; + } + } + + private Duration parseDuration(String value, Duration defaultValue) { + if (value == null) { + return defaultValue; + } + try { + return ConfigHelpers.TimeUtils.parseDuration(value); + } catch (Exception e) { + return defaultValue; + } + } + + @Override + public void close() {} + + @Override + public String name() { + return PLUGIN_NAME; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java new file mode 100755 index 0000000000..e92e93ab51 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process.iceberg; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.maintainer.TableMaintainer; +import org.apache.amoro.process.LocalProcess; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; +import org.apache.amoro.server.process.executor.LocalExecutionEngine; +import org.apache.amoro.server.table.DefaultTableRuntime; +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Local table process for expiring Iceberg snapshots. */ +public class SnapshotsExpiringProcess extends TableProcess implements LocalProcess { + + private static final Logger LOG = LoggerFactory.getLogger(SnapshotsExpiringProcess.class); + + public SnapshotsExpiringProcess(TableRuntime tableRuntime, TableProcessStore store) { + super(tableRuntime, store); + } + + @Override + public String tag() { + return LocalExecutionEngine.SNAPSHOTS_EXPIRING_POOL; + } + + @Override + public void run() { + try { + AmoroTable amoroTable = tableRuntime.loadTable(); + TableMaintainer tableMaintainer = TableMaintainers.create(amoroTable, tableRuntime); + tableMaintainer.expireSnapshots(); + } catch (Throwable t) { + LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); + } finally { + if (tableRuntime instanceof DefaultTableRuntime) { + ((DefaultTableRuntime) tableRuntime) + .updateLastCleanTime(CleanupOperation.SNAPSHOTS_EXPIRING, System.currentTimeMillis()); + } + } + } + + @Override + protected void closeInternal() {} +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java index 17205bfbfc..17eb72e392 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/InlineTableExecutors.java @@ -25,7 +25,6 @@ public class InlineTableExecutors { private static final InlineTableExecutors instance = new InlineTableExecutors(); - private SnapshotsExpiringExecutor snapshotsExpiringExecutor; private TableRuntimeRefreshExecutor tableRefreshingExecutor; private OrphanFilesCleaningExecutor orphanFilesCleaningExecutor; private DanglingDeleteFilesCleaningExecutor danglingDeleteFilesCleaningExecutor; @@ -41,13 +40,6 @@ public static InlineTableExecutors getInstance() { } public void setup(TableService tableService, Configurations conf) { - if (conf.getBoolean(AmoroManagementConf.EXPIRE_SNAPSHOTS_ENABLED)) { - this.snapshotsExpiringExecutor = - new SnapshotsExpiringExecutor( - tableService, - conf.getInteger(AmoroManagementConf.EXPIRE_SNAPSHOTS_THREAD_COUNT), - conf.get(AmoroManagementConf.EXPIRE_SNAPSHOTS_INTERVAL)); - } if (conf.getBoolean(AmoroManagementConf.CLEAN_ORPHAN_FILES_ENABLED)) { this.orphanFilesCleaningExecutor = new OrphanFilesCleaningExecutor( @@ -98,10 +90,6 @@ public void setup(TableService tableService, Configurations conf) { } } - public SnapshotsExpiringExecutor getSnapshotsExpiringExecutor() { - return snapshotsExpiringExecutor; - } - public TableRuntimeRefreshExecutor getTableRefreshingExecutor() { return tableRefreshingExecutor; } diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory new file mode 100755 index 0000000000..bf27d9016d --- /dev/null +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory @@ -0,0 +1 @@ +org.apache.amoro.server.process.iceberg.IcebergProcessFactory diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine new file mode 100755 index 0000000000..601d7c06ee --- /dev/null +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine @@ -0,0 +1 @@ +org.apache.amoro.server.process.executor.LocalExecutionEngine diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java index 3a20c6101a..d006c7ff20 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestConfigurableIntervalExecutors.java @@ -66,34 +66,55 @@ public void testDanglingDeleteFilesShouldExecuteAfterInterval() { } @Test - public void testSnapshotsExpiringDefaultInterval() { + public void testExpireSnapshotsSchedulerInterval() { Duration interval = Duration.ofHours(1); - SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 1, interval); + TestableActionCoordinatorScheduler scheduler = newExpireSnapshotsScheduler(interval); TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); - Assert.assertEquals( - Duration.ofHours(1).toMillis(), executor.getNextExecutingTime(tableRuntime)); - } - - @Test - public void testSnapshotsExpiringCustomInterval() { - Duration interval = Duration.ofMinutes(30); - SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 1, interval); - - TableRuntime tableRuntime = Mockito.mock(TableRuntime.class); - Assert.assertEquals( - Duration.ofMinutes(30).toMillis(), executor.getNextExecutingTime(tableRuntime)); + Assert.assertEquals(interval.toMillis(), scheduler.getNextExecutingTimePublic(tableRuntime)); } @Test - public void testSnapshotsExpiringShouldExecuteAfterInterval() { + public void testExpireSnapshotsSchedulerShouldExecuteAfterInterval() { Duration interval = Duration.ofHours(2); - SnapshotsExpiringExecutor executor = new SnapshotsExpiringExecutor(null, 1, interval); + TestableActionCoordinatorScheduler scheduler = newExpireSnapshotsScheduler(interval); long now = System.currentTimeMillis(); // 3 hours ago - should execute - Assert.assertTrue(executor.shouldExecute(now - Duration.ofHours(3).toMillis())); + Assert.assertTrue(scheduler.shouldExecutePublic(now - Duration.ofHours(3).toMillis())); // 1 hour ago - should not execute - Assert.assertFalse(executor.shouldExecute(now - Duration.ofHours(1).toMillis())); + Assert.assertFalse(scheduler.shouldExecutePublic(now - Duration.ofHours(1).toMillis())); + } + + private TestableActionCoordinatorScheduler newExpireSnapshotsScheduler(Duration interval) { + org.apache.amoro.process.ActionCoordinator coordinator = + Mockito.mock(org.apache.amoro.process.ActionCoordinator.class); + + Mockito.when(coordinator.action()).thenReturn(org.apache.amoro.IcebergActions.EXPIRE_SNAPSHOTS); + Mockito.when(coordinator.parallelism()).thenReturn(1); + Mockito.when(coordinator.getExecutorDelay()).thenReturn(interval.toMillis()); + Mockito.when(coordinator.getNextExecutingTime(Mockito.any())).thenReturn(interval.toMillis()); + Mockito.when(coordinator.formatSupported(Mockito.any())).thenReturn(true); + Mockito.when(coordinator.enabled(Mockito.any())).thenReturn(true); + Mockito.when(coordinator.trigger(Mockito.any())).thenReturn(java.util.Optional.empty()); + + return new TestableActionCoordinatorScheduler(coordinator); + } + + private static class TestableActionCoordinatorScheduler + extends org.apache.amoro.server.process.ActionCoordinatorScheduler { + + private TestableActionCoordinatorScheduler( + org.apache.amoro.process.ActionCoordinator coordinator) { + super(coordinator, null, null); + } + + long getNextExecutingTimePublic(TableRuntime tableRuntime) { + return getNextExecutingTime(tableRuntime); + } + + boolean shouldExecutePublic(long lastCleanupEndTime) { + return shouldExecute(lastCleanupEndTime); + } } } diff --git a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java index c75c5ac8d0..7b83192602 100644 --- a/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java +++ b/amoro-common/src/main/java/org/apache/amoro/IcebergActions.java @@ -28,4 +28,5 @@ public class IcebergActions { public static final Action DELETE_ORPHANS = Action.register("delete-orphans"); public static final Action SYNC_HIVE = Action.register("sync-hive"); public static final Action EXPIRE_DATA = Action.register("expire-data"); + public static final Action EXPIRE_SNAPSHOTS = Action.register("expire-snapshots"); } diff --git a/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java new file mode 100755 index 0000000000..7dc358ad88 --- /dev/null +++ b/amoro-common/src/main/java/org/apache/amoro/process/LocalProcess.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.process; + +/** + * A {@link TableProcess} that can be executed locally by a thread pool. + * + *

Local processes are executed by {@code ExecuteEngine} implementations which run the logic + * inside the AMS JVM. + */ +public interface LocalProcess extends AmoroProcess { + + /** Execute process logic locally. */ + void run(); + + /** + * Tag used by local execution engines to select a thread pool. + * + * @return pool tag + */ + default String tag() { + return "default"; + } +} diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml new file mode 100755 index 0000000000..a9ed821619 --- /dev/null +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +execute-engines: + - name: local + enabled: true + priority: 100 + properties: + default.thread-count: "10" + snapshots-expiring.thread-count: "10" diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml new file mode 100755 index 0000000000..03ba377a8d --- /dev/null +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +process-factories: + - name: iceberg + enabled: true + priority: 100 + properties: + expire-snapshots.enabled: "true" + expire-snapshots.interval: "1h" + expire-snapshots.thread-count: "10" From 902cb4eb9857ca4d8e0d7cbd022ac97a248844c8 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 15:47:44 +0800 Subject: [PATCH 2/8] Refactor --- .../iceberg/IcebergProcessFactory.java | 122 +++++++----------- .../iceberg/SnapshotsExpiringProcess.java | 28 +++- ...moro.server.process.executor.ExecuteEngine | 2 +- .../apache/amoro/config/Configurations.java | 13 ++ .../amoro/process}/LocalExecutionEngine.java | 77 ++++++----- .../conf/plugins/execute-engines.yaml | 4 +- .../conf/plugins/process-factories.yaml | 1 - 7 files changed, 128 insertions(+), 119 deletions(-) rename {amoro-ams/src/main/java/org/apache/amoro/server/process/executor => amoro-common/src/main/java/org/apache/amoro/process}/LocalExecutionEngine.java (68%) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index e768cf353b..ddebac910e 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -22,91 +22,81 @@ import org.apache.amoro.IcebergActions; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.ConfigHelpers; +import org.apache.amoro.config.ConfigOption; +import org.apache.amoro.config.ConfigOptions; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.process.ExecuteEngine; +import org.apache.amoro.process.LocalExecutionEngine; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.process.ProcessTriggerStrategy; import org.apache.amoro.process.RecoverProcessFailedException; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; -import org.apache.amoro.server.process.DefaultTableProcessStore; -import org.apache.amoro.server.process.TableProcessMeta; -import org.apache.amoro.server.process.executor.LocalExecutionEngine; -import org.apache.amoro.server.utils.SnowflakeIdGenerator; +import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.apache.commons.lang3.tuple.Pair; import java.time.Duration; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** Default process factory for Iceberg-related maintenance actions in AMS. */ public class IcebergProcessFactory implements ProcessFactory { public static final String PLUGIN_NAME = "iceberg"; + public static final ConfigOption SNAPSHOT_EXPIRE_ENABLED = + ConfigOptions.key("expire-snapshots.enabled").booleanType().defaultValue(true); - private final SnowflakeIdGenerator idGenerator = new SnowflakeIdGenerator(); + public static final ConfigOption SNAPSHOT_EXPIRE_INTERVAL = + ConfigOptions.key("expire-snapshot.interval") + .durationType() + .defaultValue(Duration.ofHours(1)); - private boolean expireSnapshotsEnabled = true; - private int expireSnapshotsThreadCount = 10; - private Duration expireSnapshotsInterval = Duration.ofHours(1); + private ExecuteEngine localEngine; + private final Map actions = Maps.newHashMap(); + private final List formats = + Lists.newArrayList(TableFormat.ICEBERG, TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE); + + @Override + public void availableExecuteEngines(Collection allAvailableEngines) { + for (ExecuteEngine engine : allAvailableEngines) { + if (engine instanceof LocalExecutionEngine) { + this.localEngine = engine; + } + } + } @Override public Map> supportedActions() { - Set actions = new HashSet<>(); - actions.add(IcebergActions.EXPIRE_SNAPSHOTS); - - Map> supported = new HashMap<>(); - supported.put(TableFormat.ICEBERG, actions); - supported.put(TableFormat.MIXED_ICEBERG, actions); - supported.put(TableFormat.MIXED_HIVE, actions); - return supported; + return formats.stream() + .map(f -> Pair.of(f, actions.keySet())) + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); } @Override public ProcessTriggerStrategy triggerStrategy(TableFormat format, Action action) { - if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { - return new ProcessTriggerStrategy( - expireSnapshotsInterval, false, Math.max(expireSnapshotsThreadCount, 1)); - } - - return ProcessTriggerStrategy.METADATA_TRIGGER; + return actions.getOrDefault(action, ProcessTriggerStrategy.METADATA_TRIGGER); } @Override public Optional trigger(TableRuntime tableRuntime, Action action) { - if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action) - && (!expireSnapshotsEnabled - || !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled())) { + if (!actions.containsKey(action)) { return Optional.empty(); } - long processId = idGenerator.generateId(); - TableProcessMeta meta = - TableProcessMeta.of( - processId, - tableRuntime.getTableIdentifier().getId(), - action.getName(), - LocalExecutionEngine.ENGINE_NAME, - Collections.emptyMap()); - - TableProcessStore store = new DefaultTableProcessStore(tableRuntime, meta, action); - if (IcebergActions.EXPIRE_SNAPSHOTS.equals(action)) { - return Optional.of(new SnapshotsExpiringProcess(tableRuntime, store)); + return triggerExpireSnapshot(tableRuntime); } - return Optional.empty(); } @Override public TableProcess recover(TableRuntime tableRuntime, TableProcessStore store) throws RecoverProcessFailedException { - if (IcebergActions.EXPIRE_SNAPSHOTS.equals(store.getAction())) { - return new SnapshotsExpiringProcess(tableRuntime, store); - } - throw new RecoverProcessFailedException( "Unsupported action for IcebergProcessFactory: " + store.getAction()); } @@ -116,42 +106,20 @@ public void open(Map properties) { if (properties == null || properties.isEmpty()) { return; } - - expireSnapshotsEnabled = - parseBoolean(properties.get("expire-snapshots.enabled"), expireSnapshotsEnabled); - expireSnapshotsThreadCount = - parseInt(properties.get("expire-snapshots.thread-count"), expireSnapshotsThreadCount); - expireSnapshotsInterval = - parseDuration(properties.get("expire-snapshots.interval"), expireSnapshotsInterval); - } - - private boolean parseBoolean(String value, boolean defaultValue) { - if (value == null) { - return defaultValue; + Configurations configs = Configurations.fromMap(properties); + if (configs.getBoolean(SNAPSHOT_EXPIRE_ENABLED)) { + Duration interval = configs.getDuration(SNAPSHOT_EXPIRE_INTERVAL); + this.actions.put( + IcebergActions.EXPIRE_SNAPSHOTS, ProcessTriggerStrategy.triggerAtFixRate(interval)); } - return Boolean.parseBoolean(value.trim()); } - private int parseInt(String value, int defaultValue) { - if (value == null) { - return defaultValue; - } - try { - return Integer.parseInt(value.trim()); - } catch (NumberFormatException e) { - return defaultValue; + private Optional triggerExpireSnapshot(TableRuntime tableRuntime) { + if (localEngine == null) { + return Optional.empty(); } - } - private Duration parseDuration(String value, Duration defaultValue) { - if (value == null) { - return defaultValue; - } - try { - return ConfigHelpers.TimeUtils.parseDuration(value); - } catch (Exception e) { - return defaultValue; - } + return Optional.of(new SnapshotsExpiringProcess(tableRuntime, localEngine)); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java index e92e93ab51..ccc5639997 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java @@ -18,31 +18,35 @@ package org.apache.amoro.server.process.iceberg; +import org.apache.amoro.Action; import org.apache.amoro.AmoroTable; +import org.apache.amoro.IcebergActions; import org.apache.amoro.TableRuntime; import org.apache.amoro.maintainer.TableMaintainer; +import org.apache.amoro.process.ExecuteEngine; import org.apache.amoro.process.LocalProcess; import org.apache.amoro.process.TableProcess; -import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; -import org.apache.amoro.server.process.executor.LocalExecutionEngine; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Map; + /** Local table process for expiring Iceberg snapshots. */ public class SnapshotsExpiringProcess extends TableProcess implements LocalProcess { private static final Logger LOG = LoggerFactory.getLogger(SnapshotsExpiringProcess.class); - public SnapshotsExpiringProcess(TableRuntime tableRuntime, TableProcessStore store) { - super(tableRuntime, store); + public SnapshotsExpiringProcess(TableRuntime tableRuntime, ExecuteEngine engine) { + super(tableRuntime, engine); } @Override public String tag() { - return LocalExecutionEngine.SNAPSHOTS_EXPIRING_POOL; + return getAction().getName().toLowerCase(); } @Override @@ -62,5 +66,17 @@ public void run() { } @Override - protected void closeInternal() {} + public Action getAction() { + return IcebergActions.EXPIRE_SNAPSHOTS; + } + + @Override + public Map getProcessParameters() { + return Maps.newHashMap(); + } + + @Override + public Map getSummary() { + return Maps.newHashMap(); + } } diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine index 601d7c06ee..936de558ad 100755 --- a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine @@ -1 +1 @@ -org.apache.amoro.server.process.executor.LocalExecutionEngine +org.apache.amoro.process.LocalExecutionEngine diff --git a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java index 74ecf61a12..baaed361c5 100644 --- a/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java +++ b/amoro-common/src/main/java/org/apache/amoro/config/Configurations.java @@ -548,6 +548,19 @@ public long getDurationInMillis(ConfigOption option) { return result; } + public Duration getDuration(ConfigOption option) { + try { + return getOptional(option).orElseGet(option::defaultValue); + } catch (Exception e) { // may be throw java.lang.ArithmeticException: long overflow + throw new ConfigurationException( + option.key(), + String.format( + "Exception when converting duration for config option '%s': %s", + option.key(), e.getMessage()), + e); + } + } + public Optional getOptional(ConfigOption option) { Optional rawValue = getRawValueFromOption(option); Class clazz = option.getClazz(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java similarity index 68% rename from amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java rename to amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java index 3ebd552bcb..ed126af9cb 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/executor/LocalExecutionEngine.java +++ b/amoro-common/src/main/java/org/apache/amoro/process/LocalExecutionEngine.java @@ -16,31 +16,44 @@ * limitations under the License. */ -package org.apache.amoro.server.process.executor; +package org.apache.amoro.process; -import org.apache.amoro.process.LocalProcess; -import org.apache.amoro.process.ProcessStatus; -import org.apache.amoro.process.TableProcess; +import org.apache.amoro.config.ConfigOption; +import org.apache.amoro.config.ConfigOptions; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; /** - * Local execution engine that runs {@link LocalProcess} instances in AMS thread pools. + * Local execution engine that runs {@link org.apache.amoro.process.LocalProcess} instances in AMS + * thread pools. * - *

The engine maintains multiple thread pools keyed by {@link LocalProcess#tag()}. + *

The engine maintains multiple thread pools keyed by {@link + * org.apache.amoro.process.LocalProcess#tag()}. */ public class LocalExecutionEngine implements ExecuteEngine { + private static final Logger LOG = LoggerFactory.getLogger(LocalExecutionEngine.class); public static final String ENGINE_NAME = "local"; public static final String DEFAULT_POOL = "default"; - public static final String SNAPSHOTS_EXPIRING_POOL = "snapshots-expiring"; + public static final String POOL_CONFIG_PREFIX = "pool."; + public static final String POOL_SIZE_SUFFIX = ".thread-count"; + public static final ConfigOption DEFAULT_POOL_SIZE = + ConfigOptions.key(POOL_CONFIG_PREFIX + DEFAULT_POOL + POOL_SIZE_SUFFIX) + .intType() + .defaultValue(10); private final Map pools = new ConcurrentHashMap<>(); private final Map> activeInstances = new ConcurrentHashMap<>(); @@ -95,7 +108,7 @@ public String submitTableProcess(TableProcess tableProcess) { LocalProcess localProcess = (LocalProcess) tableProcess; String identifier = UUID.randomUUID().toString(); - ThreadPoolExecutor executor = getOrCreatePool(localProcess.tag()); + ThreadPoolExecutor executor = getPool(localProcess.tag()); Future future = executor.submit( () -> { @@ -136,16 +149,28 @@ public ProcessStatus tryCancelTableProcess(TableProcess tableProcess, String pro @Override public void open(Map properties) { - String defaultSizeValue = properties == null ? null : properties.get("default.thread-count"); - int defaultSize = parseInt(defaultSizeValue, 10); + Configurations configs = Configurations.fromMap(properties); + int defaultSize = configs.getInteger(DEFAULT_POOL_SIZE); pools.put(DEFAULT_POOL, newFixedPool(DEFAULT_POOL, defaultSize)); - String snapshotsExpiringSizeValue = - properties == null ? null : properties.get("snapshots-expiring.thread-count"); - int snapshotsExpiringSize = parseInt(snapshotsExpiringSizeValue, defaultSize); - pools.put( - SNAPSHOTS_EXPIRING_POOL, - newFixedPool(SNAPSHOTS_EXPIRING_POOL, Math.max(snapshotsExpiringSize, 1))); + Set customPools = + properties.keySet().stream() + .filter(key -> key.startsWith(POOL_CONFIG_PREFIX)) + .map(key -> key.substring(POOL_CONFIG_PREFIX.length())) + .map(key -> key.substring(0, key.indexOf(".") + 1)) + .collect(Collectors.toSet()); + + customPools.forEach( + name -> { + ConfigOption poolSizeOpt = + ConfigOptions.key(POOL_CONFIG_PREFIX + name + POOL_SIZE_SUFFIX) + .intType() + .defaultValue(-1); + int size = configs.getInteger(poolSizeOpt); + Preconditions.checkArgument(size > 0, "Pool thread-count is not configured for %s", name); + pools.put(name, newFixedPool(name, size)); + LOG.info("Initialize local execute pool:{} with size:{}", name, size); + }); } @Override @@ -161,12 +186,11 @@ public String name() { return ENGINE_NAME; } - private ThreadPoolExecutor getOrCreatePool(String tag) { - if (tag == null || tag.isEmpty()) { - tag = DEFAULT_POOL; + private ThreadPoolExecutor getPool(String tag) { + if (pools.containsKey(tag)) { + return pools.get(tag); } - - return pools.computeIfAbsent(tag, t -> newFixedPool(t, 10)); + return pools.get(DEFAULT_POOL); } private ThreadPoolExecutor newFixedPool(String tag, int size) { @@ -178,15 +202,4 @@ private ThreadPoolExecutor newFixedPool(String tag, int size) { new LinkedBlockingQueue<>(), new ThreadFactoryBuilder().setDaemon(false).setNameFormat("local-" + tag + "-%d").build()); } - - private int parseInt(String value, int defaultValue) { - if (value == null) { - return defaultValue; - } - try { - return Integer.parseInt(value); - } catch (NumberFormatException e) { - return defaultValue; - } - } } diff --git a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml index a9ed821619..5c3199f320 100755 --- a/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/execute-engines.yaml @@ -20,5 +20,5 @@ execute-engines: enabled: true priority: 100 properties: - default.thread-count: "10" - snapshots-expiring.thread-count: "10" + pool.default.thread-count: 10 + pool.snapshots-expiring.thread-count: 10 diff --git a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml index 03ba377a8d..e1455c2cf1 100755 --- a/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml +++ b/dist/src/main/amoro-bin/conf/plugins/process-factories.yaml @@ -22,4 +22,3 @@ process-factories: properties: expire-snapshots.enabled: "true" expire-snapshots.interval: "1h" - expire-snapshots.thread-count: "10" From 3f7310fbf7bb1368f976afc34708d402930b77bc Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 15:53:15 +0800 Subject: [PATCH 3/8] fixed --- .../amoro/server/process/iceberg/SnapshotsExpiringProcess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java index ccc5639997..8bea04ae7c 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java @@ -29,7 +29,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.cleanup.CleanupOperation; -import org.apache.paimon.shade.guava30.com.google.common.collect.Maps; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 167f8f39043c945a1dd996dba588df8e082ebe1e Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 16:38:46 +0800 Subject: [PATCH 4/8] check --- .../iceberg/IcebergProcessFactory.java | 10 +++++++++- .../iceberg/SnapshotsExpiringProcess.java | 8 +++----- .../server/table/AbstractTableRuntime.java | 12 +++++++++++ .../server/table/DefaultTableRuntime.java | 2 +- .../cleanup/TableRuntimeCleanupState.java | 3 ++- .../org.apache.amoro.process.ProcessFactory | 18 +++++++++++++++++ ...moro.server.process.executor.ExecuteEngine | 1 - .../java/org/apache/amoro/TableRuntime.java | 20 +++++++++++++++++++ .../org.apache.amoro.process.ExecuteEngine | 19 ++++++++++++++++++ 9 files changed, 84 insertions(+), 9 deletions(-) delete mode 100755 amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine create mode 100755 amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java index ddebac910e..3ebec95fff 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/IcebergProcessFactory.java @@ -32,6 +32,7 @@ import org.apache.amoro.process.RecoverProcessFailedException; import org.apache.amoro.process.TableProcess; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.commons.lang3.tuple.Pair; @@ -115,7 +116,14 @@ public void open(Map properties) { } private Optional triggerExpireSnapshot(TableRuntime tableRuntime) { - if (localEngine == null) { + if (localEngine == null || !tableRuntime.getTableConfiguration().isExpireSnapshotEnabled()) { + return Optional.empty(); + } + + long lastExecuteTime = + tableRuntime.getState(DefaultTableRuntime.CLEANUP_STATE_KEY).getLastSnapshotsExpiringTime(); + ProcessTriggerStrategy strategy = actions.get(IcebergActions.EXPIRE_SNAPSHOTS); + if (System.currentTimeMillis() - lastExecuteTime < strategy.getTriggerInterval().toMillis()) { return Optional.empty(); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java index 8bea04ae7c..9f9503614c 100755 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/iceberg/SnapshotsExpiringProcess.java @@ -28,7 +28,6 @@ import org.apache.amoro.process.TableProcess; import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,10 +57,9 @@ public void run() { } catch (Throwable t) { LOG.error("unexpected expire error of table {} ", tableRuntime.getTableIdentifier(), t); } finally { - if (tableRuntime instanceof DefaultTableRuntime) { - ((DefaultTableRuntime) tableRuntime) - .updateLastCleanTime(CleanupOperation.SNAPSHOTS_EXPIRING, System.currentTimeMillis()); - } + tableRuntime.updateState( + DefaultTableRuntime.CLEANUP_STATE_KEY, + cleanUp -> cleanUp.setLastSnapshotsExpiringTime(System.currentTimeMillis())); } } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java index 05e17adc09..11f5f2f563 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/AbstractTableRuntime.java @@ -25,12 +25,14 @@ import org.apache.amoro.server.persistence.PersistentBase; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.zookeeper3.org.apache.curator.shaded.com.google.common.collect.Maps; +import org.apache.amoro.table.StateKey; import org.apache.amoro.table.TableRuntimeStore; import java.util.List; import java.util.Map; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.stream.Collectors; public abstract class AbstractTableRuntime extends PersistentBase @@ -62,6 +64,16 @@ public Map getTableConfig() { return store().getTableConfig(); } + @Override + public T getState(StateKey key) { + return store().getState(key); + } + + @Override + public void updateState(StateKey key, Function updater) { + store().begin().updateState(key, updater).commit(); + } + @Override public List getProcessStates() { return processContainerMap.values().stream() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java index ee23ab4d3d..9b4b56a95c 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntime.java @@ -74,7 +74,7 @@ public class DefaultTableRuntime extends AbstractTableRuntime { .jsonType(AbstractOptimizingEvaluator.PendingInput.class) .defaultValue(new AbstractOptimizingEvaluator.PendingInput()); - private static final StateKey CLEANUP_STATE_KEY = + public static final StateKey CLEANUP_STATE_KEY = StateKey.stateKey("cleanup_state") .jsonType(TableRuntimeCleanupState.class) .defaultValue(new TableRuntimeCleanupState()); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java index 9dfb98f6ed..f65ae387ca 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableRuntimeCleanupState.java @@ -52,7 +52,8 @@ public long getLastSnapshotsExpiringTime() { return lastSnapshotsExpiringTime; } - public void setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) { + public TableRuntimeCleanupState setLastSnapshotsExpiringTime(long lastSnapshotsExpiringTime) { this.lastSnapshotsExpiringTime = lastSnapshotsExpiringTime; + return this; } } diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory index bf27d9016d..96baa8ebf6 100755 --- a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.process.ProcessFactory @@ -1 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + org.apache.amoro.server.process.iceberg.IcebergProcessFactory diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine deleted file mode 100755 index 936de558ad..0000000000 --- a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.process.executor.ExecuteEngine +++ /dev/null @@ -1 +0,0 @@ -org.apache.amoro.process.LocalExecutionEngine diff --git a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java index 1d96553fed..0059593e85 100644 --- a/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java +++ b/amoro-common/src/main/java/org/apache/amoro/TableRuntime.java @@ -22,9 +22,11 @@ import org.apache.amoro.metrics.MetricRegistry; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.table.StateKey; import java.util.List; import java.util.Map; +import java.util.function.Function; /** * TableRuntime is the key interface for the AMS framework to interact with the table. Typically, it @@ -81,6 +83,24 @@ public interface TableRuntime { */ Map getTableConfig(); + /** + * Get the value of table-runtime state + * + * @param key the state key + * @return value of the state + * @param state value type + */ + T getState(StateKey key); + + /** + * Update the state + * + * @param key key of state + * @param updater value updater of state + * @param value type of state. + */ + void updateState(StateKey key, Function updater); + /** * Register the metric of the table runtime. * diff --git a/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine new file mode 100755 index 0000000000..8859bb6d39 --- /dev/null +++ b/amoro-common/src/main/resources/META-INF/services/org.apache.amoro.process.ExecuteEngine @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.amoro.process.LocalExecutionEngine From 21282823bbbebff331ae2b1a469c0a401acf6a33 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 16:47:12 +0800 Subject: [PATCH 5/8] fix executeEngineManager --- .../org/apache/amoro/server/AmoroServiceContainer.java | 9 ++++++--- .../org/apache/amoro/server/process/ProcessService.java | 1 - 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index ef3ad2642f..d36ec8bfb0 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -34,6 +34,7 @@ import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ExecuteEngine; import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; @@ -241,14 +242,16 @@ public void startOptimizingService() throws Exception { TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); tableProcessFactoryManager.initialize(); List processFactories = tableProcessFactoryManager.installedPlugins(); + ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); + executeEngineManager.initialize(); + List executeEngines = executeEngineManager.installedPlugins(); + processFactories.forEach( + c -> c.availableExecuteEngines(executeEngines)); DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); defaultRuntimeFactory.initialize(processFactories); List actionCoordinators = defaultRuntimeFactory.supportedCoordinators(); - ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); - processFactories.forEach( - c -> c.availableExecuteEngines(executeEngineManager.installedPlugins())); tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); processService = new ProcessService(tableService, actionCoordinators, executeEngineManager); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index 01944e8a26..c3cac4efe2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -128,7 +128,6 @@ private void initialize(List tableRuntimes) { actionCoordinator.action().getName(), new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this)); } - executeEngineManager.initialize(); executeEngineManager .installedPlugins() .forEach( From ec455561165f4cc33e8ab9134f1be6811a4cf461 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 16:59:13 +0800 Subject: [PATCH 6/8] spotless --- .../java/org/apache/amoro/server/AmoroServiceContainer.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index d36ec8bfb0..94734d6089 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -245,8 +245,7 @@ public void startOptimizingService() throws Exception { ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); executeEngineManager.initialize(); List executeEngines = executeEngineManager.installedPlugins(); - processFactories.forEach( - c -> c.availableExecuteEngines(executeEngines)); + processFactories.forEach(c -> c.availableExecuteEngines(executeEngines)); DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); defaultRuntimeFactory.initialize(processFactories); From 3fe47646136767b464983285e16f86c36b1348c7 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 17:18:04 +0800 Subject: [PATCH 7/8] fix ActionCoordinatorScheduler --- .../process/ActionCoordinatorScheduler.java | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 47b49ff1d1..038dd24718 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -18,7 +18,6 @@ package org.apache.amoro.server.process; -import org.apache.amoro.IcebergActions; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; @@ -26,7 +25,6 @@ import org.apache.amoro.process.TableProcessStore; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; -import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -116,22 +114,6 @@ protected TableProcess recover(TableRuntime tableRuntime, TableProcessStore proc return coordinator.recoverTableProcess(tableRuntime, processStore); } - @Override - protected CleanupOperation getCleanupOperation() { - if (IcebergActions.EXPIRE_SNAPSHOTS.equals(coordinator.action())) { - return CleanupOperation.SNAPSHOTS_EXPIRING; - } - return CleanupOperation.NONE; - } - - @Override - protected boolean shouldExecute(Long lastCleanupEndTime) { - if (getCleanupOperation() == CleanupOperation.SNAPSHOTS_EXPIRING) { - return System.currentTimeMillis() - lastCleanupEndTime >= intervalMillis; - } - return super.shouldExecute(lastCleanupEndTime); - } - @Override protected long getExecutorDelay() { return coordinator.getExecutorDelay(); From 7330b1705477e66d7b448b57fc14530ccc32627d Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 13 Mar 2026 17:19:36 +0800 Subject: [PATCH 8/8] fix ActionCoordinatorScheduler --- .../apache/amoro/server/process/ActionCoordinatorScheduler.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java index 038dd24718..4e9f79a352 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ActionCoordinatorScheduler.java @@ -41,14 +41,12 @@ public class ActionCoordinatorScheduler extends PeriodicTableScheduler { private final ActionCoordinator coordinator; private final ProcessService processService; - private final long intervalMillis; public ActionCoordinatorScheduler( ActionCoordinator coordinator, TableService tableService, ProcessService processService) { super(coordinator.action(), tableService, coordinator.parallelism()); this.coordinator = coordinator; this.processService = processService; - this.intervalMillis = coordinator.getExecutorDelay(); } /**