[Improvement]: Load process factories via DefaultTableRuntimeFactory #4100
## Summary

Wire the new process plugin model into AMS so that table processes are discovered from `ProcessFactory` implementations and scheduled via the existing `ProcessService`.

## Details

- Extend `DefaultTableRuntimeFactory` to implement the `TableRuntimeFactory` process APIs:
  - Aggregate installed `ProcessFactory` instances by `TableFormat` / `Action` and expose derived `ActionCoordinator` plugins via `supportedCoordinators()`
  - Merge `DefaultTableRuntime.REQUIRED_STATES` with additional states required by all process factories for the same format when building `TableRuntimeCreator`
  - Keep using `DefaultTableRuntime` without introducing extra runtime types
- Introduce `DefaultActionCoordinator` to bridge `ProcessFactory` trigger/recover semantics to the existing scheduler:
  - Build trigger strategies from `ProcessFactory#triggerStrategy`
  - Delegate `trigger` and `recoverTableProcess` to the underlying factory
- Add `TableProcessFactoryManager` as an `AbstractPluginManager<ProcessFactory>` using the `process-factories` plugin namespace
- Refactor `AmoroServiceContainer` startup sequence:
  - Initialize `TableProcessFactoryManager` and collect all installed `ProcessFactory` instances
  - Initialize all `TableRuntimeFactory` plugins with the shared list of process factories
  - Collect all derived `ActionCoordinator`s from table runtime factories and inject them into `ProcessService`
- Update `ProcessService` to accept a pre-built list of `ActionCoordinator`s while keeping the original constructors for backward compatibility and tests
- Run `mvn spotless:apply` and `mvn -pl amoro-ams -am -DskipTests compile` to ensure style and compilation pass

Co-Authored-By: Aime <aime@bytedance.com>
Change-Id: Iaa867503c8b0bf93c2b7f0b8fe7d752e2ddbac67
…in and use default runtime

## Summary

Decouple `TableRuntimeFactory` from the generic `ActivePlugin` mechanism and make `AmoroServiceContainer` explicitly use `DefaultTableRuntimeFactory` as the default implementation.

## Details

- Change `TableRuntimeFactory` in `amoro-common` so it no longer extends `ActivePlugin`, keeping only process-related APIs:
  - `List<ActionCoordinator> supportedCoordinators()`
  - `void initialize(List<ProcessFactory> factories)`
  - `Optional<TableRuntimeCreator> accept(ServerTableIdentifier, Map<String, String>)`
- Refactor `TableRuntimeFactoryManager` in AMS:
  - Remove inheritance from `AbstractPluginManager<TableRuntimeFactory>` to avoid the `ActivePlugin` constraint
  - Implement a simple manager that wraps a provided `List<TableRuntimeFactory>`
  - Provide a default no-arg constructor that installs a single `DefaultTableRuntimeFactory`
  - Keep `initialize()` as a no-op and `installedPlugins()` as the accessor to preserve existing wiring in `DefaultTableService`
- Update `DefaultTableRuntimeFactory`:
  - Remove `@Override` annotations from `open/close/name` since they no longer implement `ActivePlugin` methods
  - Keep the methods as no-op helpers with the same behavior
- Update `AmoroServiceContainer.startOptimizingService` to use `DefaultTableRuntimeFactory` directly:
  - Construct a `DefaultTableRuntimeFactory` instance explicitly
  - Wrap it into `TableRuntimeFactoryManager` via `Collections.singletonList(tableRuntimeFactory)`
  - Use the resulting singleton list as the only table runtime factory when initializing process factories and collecting `ActionCoordinator`s
- Leave `DefaultTableService` logic unchanged, it still uses `TableRuntimeFactoryManager.installedPlugins()` but now operates over the explicit default factory list
- Ensure `ProcessService` changes from previous step are included in this commit so the module compiles cleanly

## Verification

- Ran `./mvnw -pl amoro-ams -am -DskipTests compile` from repo root
  - Build succeeded for the full AMS reactor
  - `spotless` and `checkstyle` passed with only existing warnings unrelated to this change

Co-Authored-By: Aime <aime@bytedance.com>
Change-Id: Ifa8deef0d2553176300cdef2c0cb073d52ee3303
… default factory

## Summary

- Remove TableRuntimeFactoryManager indirection and wire DefaultTableService directly with a TableRuntimeFactory implementation.
- Simplify DefaultTableRuntimeFactory after decoupling from plugin framework and ActivePlugin lifecycle.
- Inline default table runtime factory wiring into AmoroServiceContainer and tests, and update process/service initialization.

## Details

- DefaultTableService
  - Change constructor to accept a TableRuntimeFactory instead of TableRuntimeFactoryManager.
  - Replace iteration over installed factories with a single factory.accept(...) call when creating TableRuntime instances.
- DefaultTableRuntimeFactory
  - Drop unused ActivePlugin-style methods: open(Map<String, String>), close(), and name().
- AmoroServiceContainer
  - Instantiate a single DefaultTableRuntimeFactory in startOptimizingService and pass it into DefaultTableService.
  - Initialize the defaultRuntimeFactory with ProcessFactory plugins and derive ActionCoordinators from it directly.
- Tests
  - AMSServiceTestBase: construct DefaultTableService with a concrete DefaultTableRuntimeFactory instead of a mocked TableRuntimeFactoryManager.
  - TestDefaultTableRuntimeHandler: hold a DefaultTableRuntimeFactory field and pass it into DefaultTableService for all test setups.
- Cleanup
  - Delete TableRuntimeFactoryManager class and remove all references and related imports across main and test code.
  - Fix spotless formatting violations in the touched files so that `mvn -pl amoro-ams -am -DskipTests compile` passes.

Co-Authored-By: Aime <aime@bytedance.com>
Change-Id: I8acca1841470dfc1bbd87b77e424a34f4d52ae82
zhoujinsong left a comment
The change looks good to me overall. I left some small suggestions, PTAL.
| new ConcurrentHashMap<>(); |
| private final Map<EngineType, ExecuteEngine> executeEngines = new ConcurrentHashMap<>(); |
| private final ActionCoordinatorManager actionCoordinatorManager; |
It seems the ActionCoordinatorManager class can be dropped now.
| /** Table runtime factory. */ |
| public interface TableRuntimeFactory extends ActivePlugin { |
| public interface TableRuntimeFactory { |
Do we still need other table runtime factory implementations now, or is the default the only one?
| } |
| public ProcessService( |
| Configurations serviceConfig, |
The `serviceConfig` parameter does not seem to be used.
| new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); |
| processService = new ProcessService(serviceConfig, tableService); |
| // Load process factories and build action coordinators from default table runtime factory. |
// Load process factories and build action coordinators from default table runtime factory.
TableProcessFactoryManager tableProcessFactoryManager = new TableProcessFactoryManager();
tableProcessFactoryManager.initialize();
List<ProcessFactory> processFactories = tableProcessFactoryManager.installedPlugins();
DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
tableService = new DefaultTableService(serviceConfig, catalogManager, defaultRuntimeFactory);
processService =
    new ProcessService(serviceConfig, tableService, actionCoordinators, executeEngineManager);
optimizingService =
    new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
How about changing the code structure like this?
…y' into upstream/table-runtime-factory
| } |
| // 2) Extra states from all process factories for this format (if any) |
| Map<Action, ProcessFactory> byAction = factoriesByFormat.get(format); |
If DefaultTableRuntime.REQUIRED_STATES declares a state key with the same name as a ProcessFactory's required state but with a different type or default value, the factory's key silently overwrites the built-in one.
Suggestion: Add a type-compatibility check or throw on conflicting keys:
I will add a check to throw an exception when conflicts occur.
When a single ProcessFactory supports multiple actions for the same format, `byAction.values()` will contain that factory instance multiple times. Each invocation of `factory.requiredStates()` produces the same keys, which will hit the `merged.containsKey` check and throw an `IllegalStateException`.
Fix: Deduplicate factories before iterating?
I have deduplicated the factories.
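For reference, the two fixes discussed in this thread (reject conflicting keys, visit each factory once) could be combined roughly as in the sketch below. It uses simplified stand-in types: `StateKey`, `Factory`, and `RequiredStateMerge` are hypothetical placeholders, not Amoro's real classes, and treating an identical re-declaration as harmless is an assumption of this sketch rather than something the PR states.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class RequiredStateMerge {

  /** Hypothetical stand-in for a state key: name, value type, default value. */
  public static final class StateKey {
    public final String name;
    public final Class<?> type;
    public final Object defaultValue;

    public StateKey(String name, Class<?> type, Object defaultValue) {
      this.name = name;
      this.type = type;
      this.defaultValue = defaultValue;
    }

    boolean sameDefinition(StateKey other) {
      return type.equals(other.type) && Objects.equals(defaultValue, other.defaultValue);
    }
  }

  /** Hypothetical stand-in for the part of ProcessFactory that matters here. */
  public interface Factory {
    List<StateKey> requiredStates();
  }

  public static Map<String, StateKey> merge(
      List<StateKey> builtIn, Collection<Factory> byActionValues) {
    Map<String, StateKey> merged = new LinkedHashMap<>();
    for (StateKey key : builtIn) {
      merged.put(key.name, key);
    }

    // A factory registered under several actions appears several times in
    // byAction.values(); the set keeps only the first occurrence of each.
    Set<Factory> uniqueFactories = new LinkedHashSet<>(byActionValues);

    for (Factory factory : uniqueFactories) {
      for (StateKey key : factory.requiredStates()) {
        StateKey existing = merged.putIfAbsent(key.name, key);
        // Fail fast instead of silently overwriting a built-in definition.
        if (existing != null && !existing.sameDefinition(key)) {
          throw new IllegalStateException("Conflicting state key: " + key.name);
        }
      }
    }
    return merged;
  }
}
```

A stricter variant would throw on any repeated key name, which is closer to the `merged.containsKey` check mentioned above; in that case the deduplication step is what keeps a multi-action factory from tripping the check.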
| return "default"; |
| } |
| /** Coordinators derived from all installed process factories. */ |
| private final List<ActionCoordinator> supportedCoordinators = Lists.newArrayList(); |
Callers can accidentally mutate the internal supportedCoordinators list. Return Collections.unmodifiableList(supportedCoordinators) instead.
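A minimal sketch of that suggestion, using `String` as a stand-in for `ActionCoordinator` and a hypothetical `CoordinatorHolder` class in place of the real factory:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CoordinatorHolder {
  // Strings stand in for ActionCoordinator instances in this sketch.
  private final List<String> supportedCoordinators = new ArrayList<>();

  public void register(String coordinator) {
    supportedCoordinators.add(coordinator);
  }

  /** Returns a read-only view; mutating calls throw UnsupportedOperationException. */
  public List<String> supportedCoordinators() {
    return Collections.unmodifiableList(supportedCoordinators);
  }
}
```

Note that `Collections.unmodifiableList` returns a view rather than a copy, so callers still observe coordinators registered later; they just cannot add or remove entries themselves.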
| @Override |
| public long getNextExecutingTime(TableRuntime tableRuntime) { |
| // Fixed-rate scheduling based on configured trigger interval. |
| return strategy.getTriggerInterval().toMillis(); |
The method returns strategy.getTriggerInterval().toMillis() — a duration, not an absolute timestamp.
Do you mean System.currentTimeMillis() + interval?
The getNextExecutingTime method actually returns the duration. This is a legacy issue.
Maybe we should update the interface in a separate PR?
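To make the two readings in this thread concrete, here is a small sketch of the difference. The class and method names are hypothetical and this is not the Amoro scheduler; it only contrasts a delay in milliseconds (what the thread says the method currently returns) with an absolute epoch timestamp (what the name suggests).

```java
import java.time.Duration;

public class NextExecutionTime {

  /** Legacy reading described in the thread: the value is a delay in milliseconds. */
  public static long asDelayMillis(Duration triggerInterval) {
    return triggerInterval.toMillis();
  }

  /** Reading suggested by the method name: an absolute epoch timestamp in milliseconds. */
  public static long asTimestampMillis(long nowMillis, Duration triggerInterval) {
    return nowMillis + triggerInterval.toMillis();
  }
}
```

Passing `nowMillis` in explicitly (for example from `System.currentTimeMillis()`) is a choice made here to keep the sketch testable. A follow-up PR that changes the interface would need to update every caller that currently treats the return value as a delay.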
## Summary

This PR is a subtask of #4100.

Wire the new process plugin model (introduced by #4097) into AMS so that table processes are discovered from `ProcessFactory` implementations and scheduled via the existing `ProcessService`.

## Details

- Promote `ProcessFactory` to the single extension point for table processes.
  - `DefaultTableRuntimeFactory` aggregates all `ProcessFactory` implementations, exposes derived `ActionCoordinator`s, and merges their `requiredStates()` with `DefaultTableRuntime.REQUIRED_STATES`.
- Simplify `TableRuntimeFactory` so it only describes table runtime & process wiring.
  - It no longer extends `ActivePlugin`; lifecycle methods (`open`/`close`/`name`) are removed from the interface and from `DefaultTableRuntimeFactory`.
- Wire AMS directly to `DefaultTableRuntimeFactory` instead of a generic plugin manager.
  - `AmoroServiceContainer` creates a single `DefaultTableRuntimeFactory`, passes it to `DefaultTableService`, then initializes it with all `ProcessFactory` implementations and uses `supportedCoordinators()` to build the coordinator list for `ProcessService`.
  - `DefaultTableService` now depends on a `TableRuntimeFactory` directly and uses it to create `TableRuntime` instances.
- Clean up `ProcessService` constructors to match the new model.
  - Drop `ActionCoordinatorManager`.
  - The main constructor accepts a `List<ActionCoordinator>`; tests use the 2-argument convenience overload.
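As an illustration of the aggregation step described above, the grouping of factories by table format and action might look like the following sketch. Everything here is a simplified assumption: `Factory` and its `supportedActions()` method are hypothetical stand-ins for `ProcessFactory`, plain strings replace `TableFormat` and `Action`, and rejecting two different factories for the same format/action pair is a choice made for the sketch, not behavior confirmed by this PR.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class FactoryAggregation {

  /** Hypothetical stand-in for ProcessFactory: table format -> actions it handles. */
  public interface Factory {
    Map<String, Set<String>> supportedActions();
  }

  /** Groups factories as format -> action -> factory. */
  public static Map<String, Map<String, Factory>> aggregate(List<Factory> factories) {
    Map<String, Map<String, Factory>> factoriesByFormat = new LinkedHashMap<>();
    for (Factory factory : factories) {
      for (Map.Entry<String, Set<String>> entry : factory.supportedActions().entrySet()) {
        Map<String, Factory> byAction =
            factoriesByFormat.computeIfAbsent(entry.getKey(), k -> new LinkedHashMap<>());
        for (String action : entry.getValue()) {
          Factory previous = byAction.putIfAbsent(action, factory);
          // Assumption of this sketch: at most one factory per (format, action).
          if (previous != null && previous != factory) {
            throw new IllegalStateException(
                "Two factories registered for " + entry.getKey() + "/" + action);
          }
        }
      }
    }
    return factoriesByFormat;
  }
}
```

With a structure like this, a factory that handles several actions for one format shows up once per action in the inner map's values, which is why the required-state merge needs to deduplicate factories before iterating.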