Skip to content

Commit 72eef46

Browse files
committed
[Refactor] Introduce Iceberg TableRuntimePlugin
1 parent 75f3a60 commit 72eef46

10 files changed

Lines changed: 298 additions & 181 deletions

File tree

amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,10 @@
5454
import org.apache.amoro.server.scheduler.inline.InlineTableExecutors;
5555
import org.apache.amoro.server.table.DefaultTableManager;
5656
import org.apache.amoro.server.table.DefaultTableService;
57-
import org.apache.amoro.server.table.RuntimeHandlerChain;
57+
import org.apache.amoro.server.table.IcebergTablePlugin;
5858
import org.apache.amoro.server.table.TableManager;
5959
import org.apache.amoro.server.table.TableRuntimeFactoryManager;
60+
import org.apache.amoro.server.table.TableRuntimePlugin;
6061
import org.apache.amoro.server.table.TableService;
6162
import org.apache.amoro.server.terminal.TerminalManager;
6263
import org.apache.amoro.server.utils.ThriftServiceProxy;
@@ -198,7 +199,7 @@ public void waitFollowerShip() throws Exception {
198199
haContainer.waitFollowerShip();
199200
}
200201

201-
public void startRestServices() throws Exception {
202+
public void startRestServices() {
202203
EventsManager.getInstance();
203204
MetricManager.getInstance();
204205

@@ -239,33 +240,32 @@ public void startOptimizingService() throws Exception {
239240
new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService);
240241

241242
processService = new ProcessService(serviceConfig, tableService);
242-
243-
LOG.info("Setting up AMS table executors...");
244-
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
245-
addHandlerChain(optimizingService.getTableRuntimeHandler());
246-
addHandlerChain(processService.getTableHandlerChain());
247-
addHandlerChain(InlineTableExecutors.getInstance().getDataExpiringExecutor());
248-
addHandlerChain(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor());
249-
addHandlerChain(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor());
250-
addHandlerChain(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor());
251-
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingCommitExecutor());
252-
addHandlerChain(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor());
253-
addHandlerChain(InlineTableExecutors.getInstance().getBlockerExpiringExecutor());
254-
addHandlerChain(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor());
255-
addHandlerChain(InlineTableExecutors.getInstance().getTableRefreshingExecutor());
256-
addHandlerChain(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor());
257-
tableService.initialize();
243+
tableService.initialize(initTablePlugins());
258244
LOG.info("AMS table service have been initialized");
259-
tableManager.setTableService(tableService);
260245

261246
initThriftService();
262247
startThriftService();
263248
}
264249

265-
private void addHandlerChain(RuntimeHandlerChain chain) {
266-
if (chain != null) {
267-
tableService.addHandlerChain(chain);
268-
}
250+
private List<TableRuntimePlugin> initTablePlugins() {
251+
LOG.info("Setting up AMS table executors...");
252+
InlineTableExecutors.getInstance().setup(tableService, serviceConfig);
253+
IcebergTablePlugin icebergTablePlugin =
254+
IcebergTablePlugin.builder()
255+
.addHandler(optimizingService.getTableRuntimeHandler())
256+
.addHandler(processService.getTableHandlerChain())
257+
.addHandler(InlineTableExecutors.getInstance().getDataExpiringExecutor())
258+
.addHandler(InlineTableExecutors.getInstance().getSnapshotsExpiringExecutor())
259+
.addHandler(InlineTableExecutors.getInstance().getOrphanFilesCleaningExecutor())
260+
.addHandler(InlineTableExecutors.getInstance().getDanglingDeleteFilesCleaningExecutor())
261+
.addHandler(InlineTableExecutors.getInstance().getOptimizingCommitExecutor())
262+
.addHandler(InlineTableExecutors.getInstance().getOptimizingExpiringExecutor())
263+
.addHandler(InlineTableExecutors.getInstance().getBlockerExpiringExecutor())
264+
.addHandler(InlineTableExecutors.getInstance().getHiveCommitSyncExecutor())
265+
.addHandler(InlineTableExecutors.getInstance().getTableRefreshingExecutor())
266+
.addHandler(InlineTableExecutors.getInstance().getTagsAutoCreatingExecutor())
267+
.build();
268+
return List.of(icebergTablePlugin);
269269
}
270270

271271
public void disposeOptimizingService() {

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void dropTableMetadata(TableIdentifier tableIdentifier, boolean deleteDat
114114
}
115115

116116
ServerTableIdentifier serverTableIdentifier = internalCatalog.dropTable(database, table);
117-
tableService().ifPresent(s -> s.onTableDropped(internalCatalog, serverTableIdentifier));
117+
tableService().ifPresent(s -> s.removeTable(serverTableIdentifier));
118118
}
119119

120120
@Override
@@ -128,7 +128,7 @@ public void createTable(String catalogName, TableMetadata tableMetadata) {
128128
}
129129

130130
TableMetadata metadata = catalog.createTable(tableMetadata);
131-
tableService().ifPresent(s -> s.onTableCreated(catalog, metadata.getTableIdentifier()));
131+
tableService().ifPresent(s -> s.addTable(metadata));
132132
}
133133

134134
@Override

amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableRuntimeFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public String name() {
4242
}
4343

4444
@Override
45-
public Optional<TableRuntimeCreator> accept(
45+
public Optional<Creator> accept(
4646
ServerTableIdentifier tableIdentifier, Map<String, String> tableProperties) {
4747
if (tableIdentifier
4848
.getFormat()
@@ -52,7 +52,7 @@ public Optional<TableRuntimeCreator> accept(
5252
return Optional.empty();
5353
}
5454

55-
private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.TableRuntimeCreator {
55+
private static class TableRuntimeCreatorImpl implements TableRuntimeFactory.Creator {
5656
@Override
5757
public List<StateKey<?>> requiredStateKeys() {
5858
return DefaultTableRuntime.REQUIRED_STATES;

0 commit comments

Comments
 (0)