From 020e3e7af7556b1e02dca3bf1d1cd9723416fa80 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Thu, 26 Feb 2026 12:42:28 +0800 Subject: [PATCH 1/7] feat(ams): load process factories via DefaultTableRuntimeFactory ## Summary Wire the new process plugin model into AMS so that table processes are discovered from `ProcessFactory` implementations and scheduled via the existing `ProcessService`. ## Details - Extend `DefaultTableRuntimeFactory` to implement the `TableRuntimeFactory` process APIs: - Aggregate installed `ProcessFactory` instances by `TableFormat` / `Action` and expose derived `ActionCoordinator` plugins via `supportedCoordinators()` - Merge `DefaultTableRuntime.REQUIRED_STATES` with additional states required by all process factories for the same format when building `TableRuntimeCreator` - Keep using `DefaultTableRuntime` without introducing extra runtime types - Introduce `DefaultActionCoordinator` to bridge `ProcessFactory` trigger/recover semantics to the existing scheduler: - Build trigger strategies from `ProcessFactory#triggerStrategy` - Delegate `trigger` and `recoverTableProcess` to the underlying factory - Add `TableProcessFactoryManager` as an `AbstractPluginManager` using the `process-factories` plugin namespace - Refactor `AmoroServiceContainer` startup sequence: - Initialize `TableProcessFactoryManager` and collect all installed `ProcessFactory` instances - Initialize all `TableRuntimeFactory` plugins with the shared list of process factories - Collect all derived `ActionCoordinator`s from table runtime factories and inject them into `ProcessService` - Update `ProcessService` to accept a pre-built list of `ActionCoordinator`s while keeping the original constructors for backward compatibility and tests - Run `mvn spotless:apply` and `mvn -pl amoro-ams -am -DskipTests compile` to ensure style and compilation pass Co-Authored-By: Aime Change-Id: Iaa867503c8b0bf93c2b7f0b8fe7d752e2ddbac67 --- .../amoro/server/AmoroServiceContainer.java | 24 +++- .../amoro/server/process/ProcessService.java | 32 +++-- .../process/TableProcessFactoryManager.java | 36 +++++ .../table/DefaultActionCoordinator.java | 124 ++++++++++++++++++ .../table/DefaultTableRuntimeFactory.java | 101 ++++++++++++-- 5 files changed, 291 insertions(+), 26 deletions(-) create mode 100755 amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java create mode 100755 amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 4fd3e5e9af..406a55ccb9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -32,6 +32,8 @@ import org.apache.amoro.config.Configurations; import org.apache.amoro.config.shade.utils.ConfigShadeUtils; import org.apache.amoro.exception.AmoroRuntimeException; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; import org.apache.amoro.server.catalog.CatalogManager; import org.apache.amoro.server.catalog.DefaultCatalogManager; import org.apache.amoro.server.dashboard.DashboardServer; @@ -47,6 +49,8 @@ import org.apache.amoro.server.persistence.HttpSessionHandlerFactory; import org.apache.amoro.server.persistence.SqlSessionFactoryProvider; import org.apache.amoro.server.process.ProcessService; +import org.apache.amoro.server.process.ProcessService.ExecuteEngineManager; +import org.apache.amoro.server.process.TableProcessFactoryManager; import org.apache.amoro.server.resource.ContainerMetadata; import org.apache.amoro.server.resource.Containers; import org.apache.amoro.server.resource.DefaultOptimizerManager; @@ -75,6 +79,7 @@ import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.utils.IcebergThreadPools; import org.apache.amoro.utils.JacksonUtil; import org.apache.commons.lang3.StringUtils; @@ -96,6 +101,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; +import java.util.stream.Collectors; public class AmoroServiceContainer { @@ -238,7 +244,23 @@ public void startOptimizingService() throws Exception { optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - processService = new ProcessService(serviceConfig, tableService); + // Load process factories and build action coordinators from all table runtime factories. + TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); + tableProcessFactoryManager.initialize(); + List processFactories = tableProcessFactoryManager.installedPlugins(); + + List tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins(); + tableRuntimeFactories.forEach(factory -> factory.initialize(processFactories)); + + List actionCoordinators = + tableRuntimeFactories.stream() + .flatMap(factory -> factory.supportedCoordinators().stream()) + .collect(Collectors.toList()); + + ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); + + processService = + new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager); LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index 252d1574bf..53de2fb3d6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -64,8 +64,8 @@ public class ProcessService extends PersistentBase { new ConcurrentHashMap<>(); private final Map executeEngines = new ConcurrentHashMap<>(); - private final ActionCoordinatorManager actionCoordinatorManager; private final ExecuteEngineManager executeEngineManager; + private final List actionCoordinatorList; private final ProcessRuntimeHandler tableRuntimeHandler = new ProcessRuntimeHandler(); private final ThreadPoolExecutor processExecutionPool = new ThreadPoolExecutor(10, 100, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>()); @@ -74,16 +74,25 @@ public class ProcessService extends PersistentBase { new ConcurrentHashMap<>(); public ProcessService(Configurations serviceConfig, TableService tableService) { - this(serviceConfig, tableService, new ActionCoordinatorManager(), new ExecuteEngineManager()); + this(serviceConfig, tableService, Collections.emptyList(), new ExecuteEngineManager()); } + @Deprecated public ProcessService( Configurations serviceConfig, TableService tableService, ActionCoordinatorManager actionCoordinatorManager, ExecuteEngineManager executeEngineManager) { + this(serviceConfig, tableService, Collections.emptyList(), executeEngineManager); + } + + public ProcessService( + Configurations serviceConfig, + TableService tableService, + List actionCoordinators, + ExecuteEngineManager executeEngineManager) { this.tableService = tableService; - this.actionCoordinatorManager = actionCoordinatorManager; + this.actionCoordinatorList = actionCoordinators; this.executeEngineManager = executeEngineManager; } @@ -148,7 +157,6 @@ public void cancel(TableProcess process) { /** Dispose the service, shutdown engines and clear active processes. */ public void dispose() { - actionCoordinatorManager.close(); executeEngineManager.close(); processExecutionPool.shutdown(); activeTableProcess.clear(); @@ -156,16 +164,12 @@ public void dispose() { private void initialize(List tableRuntimes) { LOG.info("Initializing process service"); - actionCoordinatorManager.initialize(); - actionCoordinatorManager - .installedPlugins() - .forEach( - actionCoordinator -> { - actionCoordinators.put( - actionCoordinator.action().getName(), - new ActionCoordinatorScheduler( - actionCoordinator, tableService, ProcessService.this)); - }); + // Pre-configured coordinators built from TableRuntimeFactory / ProcessFactory + for (ActionCoordinator actionCoordinator : actionCoordinatorList) { + actionCoordinators.put( + actionCoordinator.action().getName(), + new ActionCoordinatorScheduler(actionCoordinator, tableService, ProcessService.this)); + } executeEngineManager.initialize(); executeEngineManager .installedPlugins() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java new file mode 100755 index 0000000000..f54290937d --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/TableProcessFactoryManager.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.process; + +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.server.manager.AbstractPluginManager; + +/** + * Plugin manager for {@link ProcessFactory} implementations. + * + *

Process factories are configured via {@code plugins/process-factories.yaml} and are + * responsible for describing how different {@code TableFormat} / {@code Action} combinations should + * be scheduled and executed. + */ +public class TableProcessFactoryManager extends AbstractPluginManager { + + public TableProcessFactoryManager() { + super("process-factories"); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java new file mode 100755 index 0000000000..053e3c1cff --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultActionCoordinator.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.Action; +import org.apache.amoro.TableFormat; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.process.ActionCoordinator; +import org.apache.amoro.process.ProcessFactory; +import org.apache.amoro.process.ProcessTriggerStrategy; +import org.apache.amoro.process.RecoverProcessFailedException; +import org.apache.amoro.process.TableProcess; +import org.apache.amoro.process.TableProcessStore; +import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions; + +import java.util.Map; +import java.util.Optional; + +/** + * Default implementation of {@link ActionCoordinator} that bridges {@link ProcessFactory} + * declarations to the AMS scheduling framework. + */ +public class DefaultActionCoordinator implements ActionCoordinator { + + private final Action action; + private final TableFormat format; + private final ProcessFactory factory; + private final ProcessTriggerStrategy strategy; + + public DefaultActionCoordinator(TableFormat format, Action action, ProcessFactory factory) { + this.action = action; + this.format = format; + this.factory = factory; + this.strategy = factory.triggerStrategy(format, action); + Preconditions.checkArgument( + strategy != null, + "ProcessTriggerStrategy cannot be null for format %s, action %s, factory %s", + format, + action, + factory.name()); + } + + @Override + public String name() { + // No need to be globally unique, this coordinator is not discovered via plugin manager. + return String.format("%s-%s-coordinator", format.name().toLowerCase(), action.getName()); + } + + @Override + public void open(Map properties) { + // No-op: lifecycle is managed by owning TableRuntimeFactory. + } + + @Override + public void close() { + // No-op: nothing to close. + } + + @Override + public boolean formatSupported(TableFormat format) { + return this.format.equals(format); + } + + @Override + public int parallelism() { + return strategy.getTriggerParallelism(); + } + + @Override + public Action action() { + return action; + } + + @Override + public long getNextExecutingTime(TableRuntime tableRuntime) { + // Fixed-rate scheduling based on configured trigger interval. + return strategy.getTriggerInterval().toMillis(); + } + + @Override + public boolean enabled(TableRuntime tableRuntime) { + return formatSupported(tableRuntime.getFormat()); + } + + @Override + public long getExecutorDelay() { + return strategy.getTriggerInterval().toMillis(); + } + + @Override + public Optional trigger(TableRuntime tableRuntime) { + return factory.trigger(tableRuntime, action); + } + + @Override + public TableProcess recoverTableProcess( + TableRuntime tableRuntime, TableProcessStore processStore) { + try { + return factory.recover(tableRuntime, processStore); + } catch (RecoverProcessFailedException e) { + throw new IllegalStateException( + String.format( + "Failed to recover table process for format %s, action %s, table %s", + format, action, tableRuntime.getTableIdentifier()), + e); + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index 41c8041f22..aa46a23005 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -18,6 +18,7 @@ package org.apache.amoro.server.table; +import org.apache.amoro.Action; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; @@ -28,16 +29,37 @@ import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; +/** + * Default {@link TableRuntimeFactory} implementation used by AMS. + * + *

Besides creating {@link DefaultTableRuntime} instances for mixed/iceberg formats, this factory + * also aggregates {@link ProcessFactory} declarations to expose {@link ActionCoordinator} plugins + * for different {@link TableFormat}/{@link Action} combinations. + */ public class DefaultTableRuntimeFactory implements TableRuntimeFactory { + + /** Mapping from table format to its supported actions and corresponding process factory. */ + private final Map> factoriesByFormat = new HashMap<>(); + + /** Coordinators derived from all installed process factories. */ + private final List supportedCoordinators = Lists.newArrayList(); + @Override - public void open(Map properties) {} + public void open(Map properties) { + // No-op: all configuration is handled by process factories themselves. + } @Override - public void close() {} + public void close() { + // No-op + } @Override public String name() { @@ -46,27 +68,84 @@ public String name() { @Override public List supportedCoordinators() { - return Lists.newArrayList(); + return supportedCoordinators; } @Override - public void initialize(List factories) {} + public void initialize(List factories) { + factoriesByFormat.clear(); + supportedCoordinators.clear(); + + for (ProcessFactory factory : factories) { + Map> supported = factory.supportedActions(); + if (supported == null || supported.isEmpty()) { + continue; + } + + for (Map.Entry> entry : supported.entrySet()) { + TableFormat format = entry.getKey(); + Map byAction = + factoriesByFormat.computeIfAbsent(format, k -> new HashMap<>()); + + for (Action action : entry.getValue()) { + ProcessFactory existed = byAction.get(action); + if (existed != null && existed != factory) { + throw new IllegalArgumentException( + String.format( + "ProcessFactory conflict for format %s and action %s, existing: %s, new: %s", + format, action, existed.name(), factory.name())); + } + byAction.put(action, factory); + supportedCoordinators.add(new DefaultActionCoordinator(format, action, factory)); + } + } + } + } @Override public Optional accept( ServerTableIdentifier tableIdentifier, Map tableProperties) { - if (tableIdentifier - .getFormat() - .in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG)) { - return Optional.of(new TableRuntimeCreatorImpl()); + TableFormat format = tableIdentifier.getFormat(); + boolean defaultSupported = + format.in(TableFormat.MIXED_ICEBERG, TableFormat.MIXED_HIVE, TableFormat.ICEBERG); + boolean hasProcessFactories = factoriesByFormat.containsKey(format); + + if (!defaultSupported && !hasProcessFactories) { + return Optional.empty(); } - return Optional.empty(); + + return Optional.of(new TableRuntimeCreatorImpl(format)); } - private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + private class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator { + + private final TableFormat format; + + private TableRuntimeCreatorImpl(TableFormat format) { + this.format = format; + } + @Override public List> requiredStateKeys() { - return DefaultTableRuntime.REQUIRED_STATES; + Map> merged = new LinkedHashMap<>(); + // 1) DefaultTableRuntime required states + for (StateKey stateKey : DefaultTableRuntime.REQUIRED_STATES) { + merged.put(stateKey.getKey(), stateKey); + } + + // 2) Extra states from all process factories for this format (if any) + Map byAction = factoriesByFormat.get(format); + if (byAction != null) { + byAction + .values() + .forEach( + factory -> + factory + .requiredStates() + .forEach(stateKey -> merged.put(stateKey.getKey(), stateKey))); + } + + return Lists.newArrayList(merged.values()); } @Override From 935f399f047d193a71bafcacb211a342528e58ba Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Fri, 27 Feb 2026 22:00:39 +0800 Subject: [PATCH 2/7] refactor(table-runtime): decouple TableRuntimeFactory from ActivePlugin and use default runtime ## Summary Decouple `TableRuntimeFactory` from the generic `ActivePlugin` mechanism and make `AmoroServiceContainer` explicitly use `DefaultTableRuntimeFactory` as the default implementation. ## Details - Change `TableRuntimeFactory` in `amoro-common` so it no longer extends `ActivePlugin`, keeping only process-related APIs: - `List supportedCoordinators()` - `void initialize(List factories)` - `Optional accept(ServerTableIdentifier, Map)` - Refactor `TableRuntimeFactoryManager` in AMS: - Remove inheritance from `AbstractPluginManager` to avoid the `ActivePlugin` constraint - Implement a simple manager that wraps a provided `List` - Provide a default no-arg constructor that installs a single `DefaultTableRuntimeFactory` - Keep `initialize()` as a no-op and `installedPlugins()` as the accessor to preserve existing wiring in `DefaultTableService` - Update `DefaultTableRuntimeFactory`: - Remove `@Override` annotations from `open/close/name` since they no longer implement `ActivePlugin` methods - Keep the methods as no-op helpers with the same behavior - Update `AmoroServiceContainer.startOptimizingService` to use `DefaultTableRuntimeFactory` directly: - Construct a `DefaultTableRuntimeFactory` instance explicitly - Wrap it into `TableRuntimeFactoryManager` via `Collections.singletonList(tableRuntimeFactory)` - Use the resulting singleton list as the only table runtime factory when initializing process factories and collecting `ActionCoordinator`s - Leave `DefaultTableService` logic unchanged, it still uses `TableRuntimeFactoryManager.installedPlugins()` but now operates over the explicit default factory list - Ensure `ProcessService` changes from previous step are included in this commit so the module compiles cleanly ## Verification - Ran `./mvnw -pl amoro-ams -am -DskipTests compile` from repo root - Build succeeded for the full AMS reactor - `spotless` and `checkstyle` passed with only existing warnings unrelated to this change Co-Authored-By: Aime Change-Id: Ifa8deef0d2553176300cdef2c0cb073d52ee3303 --- .../amoro/server/AmoroServiceContainer.java | 7 +++-- .../amoro/server/process/ProcessService.java | 9 ------ .../table/DefaultTableRuntimeFactory.java | 3 -- .../table/TableRuntimeFactoryManager.java | 31 ++++++++++++++++--- .../amoro/table/TableRuntimeFactory.java | 3 +- 5 files changed, 33 insertions(+), 20 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 406a55ccb9..7a801c593e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -57,6 +57,7 @@ import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.scheduler.inline.InlineTableExecutors; import org.apache.amoro.server.table.DefaultTableManager; +import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; @@ -94,6 +95,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -235,8 +237,9 @@ public void transitionToFollower() { } public void startOptimizingService() throws Exception { - TableRuntimeFactoryManager tableRuntimeFactoryManager = new TableRuntimeFactoryManager(); - tableRuntimeFactoryManager.initialize(); + DefaultTableRuntimeFactory tableRuntimeFactory = new DefaultTableRuntimeFactory(); + TableRuntimeFactoryManager tableRuntimeFactoryManager = + new TableRuntimeFactoryManager(Collections.singletonList(tableRuntimeFactory)); tableService = new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index 53de2fb3d6..c2f18730a3 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -77,15 +77,6 @@ public ProcessService(Configurations serviceConfig, TableService tableService) { this(serviceConfig, tableService, Collections.emptyList(), new ExecuteEngineManager()); } - @Deprecated - public ProcessService( - Configurations serviceConfig, - TableService tableService, - ActionCoordinatorManager actionCoordinatorManager, - ExecuteEngineManager executeEngineManager) { - this(serviceConfig, tableService, Collections.emptyList(), executeEngineManager); - } - public ProcessService( Configurations serviceConfig, TableService tableService, diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index aa46a23005..af5f9ad0b2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -51,17 +51,14 @@ public class DefaultTableRuntimeFactory implements TableRuntimeFactory { /** Coordinators derived from all installed process factories. */ private final List supportedCoordinators = Lists.newArrayList(); - @Override public void open(Map properties) { // No-op: all configuration is handled by process factories themselves. } - @Override public void close() { // No-op } - @Override public String name() { return "default"; } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java index 65a9061081..acbb18cb78 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java @@ -18,13 +18,36 @@ package org.apache.amoro.server.table; -import org.apache.amoro.server.manager.AbstractPluginManager; import org.apache.amoro.table.TableRuntimeFactory; -public class TableRuntimeFactoryManager extends AbstractPluginManager { - public static final String PLUGIN_CATEGORY = "table-runtime-factories"; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * Simple manager for {@link TableRuntimeFactory} implementations. + * + *

AMS currently only ships with {@link DefaultTableRuntimeFactory}. This manager wraps a list of + * factories so that existing wiring code in {@link DefaultTableService} can stay unchanged while + * {@link TableRuntimeFactory} is no longer an {@code ActivePlugin}. + */ +public class TableRuntimeFactoryManager { + + private final List factories = new ArrayList<>(); public TableRuntimeFactoryManager() { - super(PLUGIN_CATEGORY); + this(Collections.singletonList(new DefaultTableRuntimeFactory())); + } + + public TableRuntimeFactoryManager(List factories) { + this.factories.addAll(factories); + } + + public void initialize() { + // kept for backward compatibility, no-op for now + } + + public List installedPlugins() { + return factories; } } diff --git a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java index 90e26bd4b6..5799166c59 100644 --- a/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java +++ b/amoro-common/src/main/java/org/apache/amoro/table/TableRuntimeFactory.java @@ -18,7 +18,6 @@ package org.apache.amoro.table; -import org.apache.amoro.ActivePlugin; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; import org.apache.amoro.process.ActionCoordinator; @@ -29,7 +28,7 @@ import java.util.Optional; /** Table runtime factory. */ -public interface TableRuntimeFactory extends ActivePlugin { +public interface TableRuntimeFactory { List supportedCoordinators(); From 8fb278a70fbb6eb552af8fc4d6b4d07372dca45c Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Sat, 28 Feb 2026 17:44:55 +0800 Subject: [PATCH 3/7] refactor(table-runtime): remove TableRuntimeFactoryManager and inline default factory ## Summary - Remove TableRuntimeFactoryManager indirection and wire DefaultTableService directly with a TableRuntimeFactory implementation. - Simplify DefaultTableRuntimeFactory after decoupling from plugin framework and ActivePlugin lifecycle. - Inline default table runtime factory wiring into AmoroServiceContainer and tests, and update process/service initialization. ## Details - DefaultTableService - Change constructor to accept a TableRuntimeFactory instead of TableRuntimeFactoryManager. - Replace iteration over installed factories with a single factory.accept(...) call when creating TableRuntime instances. - DefaultTableRuntimeFactory - Drop unused ActivePlugin-style methods: open(Map), close(), and name(). - AmoroServiceContainer - Instantiate a single DefaultTableRuntimeFactory in startOptimizingService and pass it into DefaultTableService. - Initialize the defaultRuntimeFactory with ProcessFactory plugins and derive ActionCoordinators from it directly. - Tests - AMSServiceTestBase: construct DefaultTableService with a concrete DefaultTableRuntimeFactory instead of a mocked TableRuntimeFactoryManager. - TestDefaultTableRuntimeHandler: hold a DefaultTableRuntimeFactory field and pass it into DefaultTableService for all test setups. - Cleanup - Delete TableRuntimeFactoryManager class and remove all references and related imports across main and test code. - Fix spotless formatting violations in the touched files so that `mvn -pl amoro-ams -am -DskipTests compile` passes. Co-Authored-By: Aime Change-Id: I8acca1841470dfc1bbd87b77e424a34f4d52ae82 --- .../amoro/server/AmoroServiceContainer.java | 21 ++------ .../table/DefaultTableRuntimeFactory.java | 12 ----- .../server/table/DefaultTableService.java | 35 ++++++------ .../table/TableRuntimeFactoryManager.java | 53 ------------------- .../amoro/server/AMSServiceTestBase.java | 12 +---- .../table/TestDefaultTableRuntimeHandler.java | 17 ++---- 6 files changed, 28 insertions(+), 122 deletions(-) delete mode 100644 amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 7a801c593e..b916731eee 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -61,7 +61,6 @@ import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; -import org.apache.amoro.server.table.TableRuntimeFactoryManager; import org.apache.amoro.server.table.TableService; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; @@ -80,7 +79,6 @@ import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportException; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.TTransportFactory; import org.apache.amoro.shade.thrift.org.apache.thrift.transport.layered.TFramedTransport; -import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.utils.IcebergThreadPools; import org.apache.amoro.utils.JacksonUtil; import org.apache.commons.lang3.StringUtils; @@ -95,7 +93,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -103,7 +100,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; -import java.util.stream.Collectors; public class AmoroServiceContainer { @@ -237,28 +233,21 @@ public void transitionToFollower() { } public void startOptimizingService() throws Exception { - DefaultTableRuntimeFactory tableRuntimeFactory = new DefaultTableRuntimeFactory(); - TableRuntimeFactoryManager tableRuntimeFactoryManager = - new TableRuntimeFactoryManager(Collections.singletonList(tableRuntimeFactory)); + DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); - tableService = - new DefaultTableService(serviceConfig, catalogManager, tableRuntimeFactoryManager); + tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - // Load process factories and build action coordinators from all table runtime factories. + // Load process factories and build action coordinators from default table runtime factory. TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); tableProcessFactoryManager.initialize(); List processFactories = tableProcessFactoryManager.installedPlugins(); - List tableRuntimeFactories = tableRuntimeFactoryManager.installedPlugins(); - tableRuntimeFactories.forEach(factory -> factory.initialize(processFactories)); + defaultRuntimeFactory.initialize(processFactories); - List actionCoordinators = - tableRuntimeFactories.stream() - .flatMap(factory -> factory.supportedCoordinators().stream()) - .collect(Collectors.toList()); + List actionCoordinators = defaultRuntimeFactory.supportedCoordinators(); ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index af5f9ad0b2..e148aca4b0 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -51,18 +51,6 @@ public class DefaultTableRuntimeFactory implements TableRuntimeFactory { /** Coordinators derived from all installed process factories. */ private final List supportedCoordinators = Lists.newArrayList(); - public void open(Map properties) { - // No-op: all configuration is handled by process factories themselves. - } - - public void close() { - // No-op - } - - public String name() { - return "default"; - } - @Override public List supportedCoordinators() { return supportedCoordinators; diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 464cd5601d..a247b18041 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -44,6 +44,7 @@ import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.apache.amoro.shade.guava32.com.google.common.collect.Sets; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableSummary; import org.apache.amoro.utils.TablePropertyUtil; import org.slf4j.Logger; @@ -85,19 +86,19 @@ public class DefaultTableService extends PersistentBase implements TableService private final CompletableFuture initialized = new CompletableFuture<>(); private final Configurations serverConfiguration; private final CatalogManager catalogManager; - private final TableRuntimeFactoryManager tableRuntimeFactoryManager; + private final TableRuntimeFactory tableRuntimeFactory; private RuntimeHandlerChain headHandler; private ExecutorService tableExplorerExecutors; public DefaultTableService( Configurations configuration, CatalogManager catalogManager, - TableRuntimeFactoryManager tableRuntimeFactoryManager) { + TableRuntimeFactory tableRuntimeFactory) { this.catalogManager = catalogManager; this.externalCatalogRefreshingInterval = configuration.get(AmoroManagementConf.REFRESH_EXTERNAL_CATALOGS_INTERVAL).toMillis(); this.serverConfiguration = configuration; - this.tableRuntimeFactoryManager = tableRuntimeFactoryManager; + this.tableRuntimeFactory = tableRuntimeFactory; } @Override @@ -515,21 +516,19 @@ private Optional createTableRuntime( ServerTableIdentifier identifier, TableRuntimeMeta runtimeMeta, List restoredStates) { - return tableRuntimeFactoryManager.installedPlugins().stream() - .map(f -> f.accept(identifier, runtimeMeta.getTableConfig())) - .filter(Optional::isPresent) - .map(Optional::get) - .findFirst() - .map( - creator -> { - DefaultTableRuntimeStore store = - new DefaultTableRuntimeStore( - identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates); - store.setRuntimeHandler(this); - TableRuntime tableRuntime = creator.create(store); - store.setTableRuntime(tableRuntime); - return tableRuntime; - }); + Optional creatorOpt = + tableRuntimeFactory.accept(identifier, runtimeMeta.getTableConfig()); + + return creatorOpt.map( + creator -> { + DefaultTableRuntimeStore store = + new DefaultTableRuntimeStore( + identifier, runtimeMeta, creator.requiredStateKeys(), restoredStates); + store.setRuntimeHandler(this); + TableRuntime tableRuntime = creator.create(store); + store.setTableRuntime(tableRuntime); + return tableRuntime; + }); } private void revertTableRuntimeAdded( diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java deleted file mode 100644 index acbb18cb78..0000000000 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableRuntimeFactoryManager.java +++ /dev/null @@ -1,53 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.amoro.server.table; - -import org.apache.amoro.table.TableRuntimeFactory; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -/** - * Simple manager for {@link TableRuntimeFactory} implementations. - * - *

AMS currently only ships with {@link DefaultTableRuntimeFactory}. This manager wraps a list of - * factories so that existing wiring code in {@link DefaultTableService} can stay unchanged while - * {@link TableRuntimeFactory} is no longer an {@code ActivePlugin}. - */ -public class TableRuntimeFactoryManager { - - private final List factories = new ArrayList<>(); - - public TableRuntimeFactoryManager() { - this(Collections.singletonList(new DefaultTableRuntimeFactory())); - } - - public TableRuntimeFactoryManager(List factories) { - this.factories.addAll(factories); - } - - public void initialize() { - // kept for backward compatibility, no-op for now - } - - public List installedPlugins() { - return factories; - } -} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 9f3d25b3f7..bd8e4cdec8 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -26,12 +26,9 @@ import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.DefaultTableRuntimeFactory; import org.apache.amoro.server.table.DefaultTableService; -import org.apache.amoro.server.table.TableRuntimeFactoryManager; -import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; -import org.mockito.Mockito; import java.time.Duration; @@ -40,22 +37,16 @@ public abstract class AMSServiceTestBase extends AMSManagerTestBase { private static DefaultOptimizingService OPTIMIZING_SERVICE = null; private static ProcessService PROCESS_SERVICE = null; - private static TableRuntimeFactoryManager tableRuntimeFactoryManager = null; - @BeforeClass public static void initTableService() { DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory(); - tableRuntimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); - Mockito.when(tableRuntimeFactoryManager.installedPlugins()) - .thenReturn(Lists.newArrayList(runtimeFactory)); try { Configurations configurations = new Configurations(); configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); configurations.set( AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L)); TABLE_SERVICE = - new DefaultTableService( - new Configurations(), CATALOG_MANAGER, tableRuntimeFactoryManager); + new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); @@ -80,7 +71,6 @@ public static void disposeTableService() { TABLE_SERVICE.dispose(); MetricManager.dispose(); EventsManager.dispose(); - tableRuntimeFactoryManager = null; } protected DefaultTableService tableService() { diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java index 792faebcf7..898b3d0151 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java @@ -41,7 +41,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import org.mockito.Mockito; import java.util.List; @@ -49,7 +48,7 @@ public class TestDefaultTableRuntimeHandler extends AMSTableTestBase { private DefaultTableService tableService; - private final TableRuntimeFactoryManager runtimeFactoryManager; + private final DefaultTableRuntimeFactory runtimeFactory; @Parameterized.Parameters(name = "{0}, {1}") public static Object[] parameters() { @@ -66,16 +65,12 @@ public static Object[] parameters() { public TestDefaultTableRuntimeHandler( CatalogTestHelper catalogTestHelper, TableTestHelper tableTestHelper) { super(catalogTestHelper, tableTestHelper, false); - DefaultTableRuntimeFactory runtimeFactory = new DefaultTableRuntimeFactory(); - runtimeFactoryManager = Mockito.mock(TableRuntimeFactoryManager.class); - Mockito.when(runtimeFactoryManager.installedPlugins()) - .thenReturn(Lists.newArrayList(runtimeFactory)); + this.runtimeFactory = new DefaultTableRuntimeFactory(); } @Test public void testInitialize() throws Exception { - tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -94,8 +89,7 @@ public void testInitialize() throws Exception { Assert.assertTrue(handler.isDisposed()); // initialize with a history table - tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); @@ -132,8 +126,7 @@ public void testInitialize() throws Exception { @Test public void testRefreshUpdatesOptimizerGroup() throws Exception { - tableService = - new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactoryManager); + tableService = new DefaultTableService(new Configurations(), CATALOG_MANAGER, runtimeFactory); TestHandler handler = new TestHandler(); tableService.addHandlerChain(handler); tableService.initialize(); From 0ca943c8f8d3176f1e24ebfc0d3501f6e76fa9f3 Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Wed, 4 Mar 2026 21:21:01 +0800 Subject: [PATCH 4/7] fix comment --- .../apache/amoro/server/AmoroServiceContainer.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index b916731eee..5f3365c899 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -233,26 +233,22 @@ public void transitionToFollower() { } public void startOptimizingService() throws Exception { - DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); - - tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); - - optimizingService = - new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); - // Load process factories and build action coordinators from default table runtime factory. TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager(); tableProcessFactoryManager.initialize(); List processFactories = tableProcessFactoryManager.installedPlugins(); + DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory(); defaultRuntimeFactory.initialize(processFactories); List actionCoordinators = defaultRuntimeFactory.supportedCoordinators(); - ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); + tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); processService = new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager); + optimizingService = + new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); LOG.info("Setting up AMS table executors..."); InlineTableExecutors.getInstance().setup(tableService, serviceConfig); From 425326a156ff5ff12f447f8bbfe4cd70e6766ddd Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Wed, 4 Mar 2026 21:25:45 +0800 Subject: [PATCH 5/7] fix comment --- .../org/apache/amoro/server/process/ProcessService.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index c2f18730a3..bf4ab6564b 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -548,13 +548,6 @@ protected void doDispose() { } } - /** Manager for {@link ActionCoordinator} plugins. */ - public static class ActionCoordinatorManager extends AbstractPluginManager { - public ActionCoordinatorManager() { - super("action-coordinators"); - } - } - /** Manager for {@link ExecuteEngine} plugins. */ public static class ExecuteEngineManager extends AbstractPluginManager { public ExecuteEngineManager() { From bc8175d706d37ad7722445a9f16a340f61638afc Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Thu, 5 Mar 2026 12:10:44 +0800 Subject: [PATCH 6/7] fix comment --- .../amoro/server/AmoroServiceContainer.java | 3 +-- .../amoro/server/process/ProcessService.java | 6 ++---- .../server/table/DefaultTableRuntimeFactory.java | 16 ++++++++++++++-- .../apache/amoro/server/AMSServiceTestBase.java | 2 +- 4 files changed, 18 insertions(+), 9 deletions(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index 5f3365c899..6fa3293e99 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -245,8 +245,7 @@ public void startOptimizingService() throws Exception { ExecuteEngineManager executeEngineManager = new ExecuteEngineManager(); tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory); - processService = - new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager); + processService = new ProcessService(tableService, actionCoordinators, executeEngineManager); optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java index bf4ab6564b..79881c61ba 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/process/ProcessService.java @@ -23,7 +23,6 @@ import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableFormat; import org.apache.amoro.TableRuntime; -import org.apache.amoro.config.Configurations; import org.apache.amoro.config.TableConfiguration; import org.apache.amoro.process.ActionCoordinator; import org.apache.amoro.process.ProcessEvent; @@ -73,12 +72,11 @@ public class ProcessService extends PersistentBase { private final Map> activeTableProcess = new ConcurrentHashMap<>(); - public ProcessService(Configurations serviceConfig, TableService tableService) { - this(serviceConfig, tableService, Collections.emptyList(), new ExecuteEngineManager()); + public ProcessService(TableService tableService) { + this(tableService, Collections.emptyList(), new ExecuteEngineManager()); } public ProcessService( - Configurations serviceConfig, TableService tableService, List actionCoordinators, ExecuteEngineManager executeEngineManager) { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index e148aca4b0..d2a622208e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -29,6 +29,7 @@ import org.apache.amoro.table.TableRuntimeFactory; import org.apache.amoro.table.TableRuntimeStore; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; @@ -53,7 +54,7 @@ public class DefaultTableRuntimeFactory implements TableRuntimeFactory { @Override public List supportedCoordinators() { - return supportedCoordinators; + return Collections.unmodifiableList(supportedCoordinators); } @Override @@ -127,7 +128,18 @@ public List> requiredStateKeys() { factory -> factory .requiredStates() - .forEach(stateKey -> merged.put(stateKey.getKey(), stateKey))); + .forEach( + stateKey -> { + if (merged.containsKey(stateKey.getKey())) { + throw new IllegalStateException( + "Failed to initialize table runtime creator, due to stateKey: " + + stateKey.getKey() + + " declared by process-factory: " + + factory.name() + + " has already been defined."); + } + merged.put(stateKey.getKey(), stateKey); + })); } return Lists.newArrayList(merged.values()); diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index bd8e4cdec8..f7266ba04d 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -50,7 +50,7 @@ public static void initTableService() { OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); - PROCESS_SERVICE = new ProcessService(configurations, TABLE_SERVICE); + PROCESS_SERVICE = new ProcessService(TABLE_SERVICE); TABLE_SERVICE.addHandlerChain(OPTIMIZING_SERVICE.getTableRuntimeHandler()); TABLE_SERVICE.addHandlerChain(PROCESS_SERVICE.getTableHandlerChain()); From 9e435348fba90b34692ff8693c35ab08c5233a7a Mon Sep 17 00:00:00 2001 From: "zhangyongxiang.alpha" Date: Thu, 5 Mar 2026 15:08:33 +0800 Subject: [PATCH 7/7] fix comment --- .../apache/amoro/server/table/DefaultTableRuntimeFactory.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java index d2a622208e..83050799a2 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java @@ -122,7 +122,9 @@ public List> requiredStateKeys() { // 2) Extra states from all process factories for this format (if any) Map byAction = factoriesByFormat.get(format); if (byAction != null) { - byAction + Map deFactories = new HashMap<>(); + byAction.forEach((a, f) -> deFactories.putIfAbsent(f.name(), f)); + deFactories .values() .forEach( factory ->