Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,27 @@ public class AmoroManagementConf {
.defaultValue(java.time.Duration.ofSeconds(30))
.withDescription("TTL of HA lease.");

public static final ConfigOption<Integer> HA_BUCKET_ID_TOTAL_COUNT =
ConfigOptions.key("ha.bucket-id.total-count")
.intType()
.defaultValue(100)
.withDescription(
"Total count of bucket IDs for assignment. Bucket IDs range from 1 to this value.");

public static final ConfigOption<Duration> HA_NODE_OFFLINE_TIMEOUT =
ConfigOptions.key("ha.node-offline.timeout")
.durationType()
.defaultValue(Duration.ofMinutes(5))
.withDescription(
"Timeout duration to determine if a node is offline. After this duration, the node's bucket IDs will be reassigned.");

public static final ConfigOption<Duration> HA_ASSIGN_INTERVAL =
ConfigOptions.key("ha.bucket-assign.interval")
.durationType()
.defaultValue(Duration.ofSeconds(60))
.withDescription(
"Interval for bucket assignment service to detect node changes and redistribute bucket IDs.");

public static final ConfigOption<Integer> TABLE_SERVICE_THRIFT_BIND_PORT =
ConfigOptions.key("thrift-server.table-service.bind-port")
.intType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class AmoroServiceContainer {
private Javalin httpServer;
private AmsServiceMetrics amsServiceMetrics;
private HAState haState = HAState.INITIALIZING;
private AmsAssignService amsAssignService;

public AmoroServiceContainer() throws Exception {
initConfig();
Expand Down Expand Up @@ -240,6 +241,20 @@ public void startOptimizingService() throws Exception {

DefaultTableRuntimeFactory defaultRuntimeFactory = new DefaultTableRuntimeFactory();
defaultRuntimeFactory.initialize(processFactories);
// In master-slave mode, create AmsAssignService for bucket assignment
if (IS_MASTER_SLAVE_MODE && haContainer != null) {
try {
// Create and start AmsAssignService for bucket assignment
// The factory will handle different HA types (ZK, database, etc.)
amsAssignService = new AmsAssignService(haContainer, serviceConfig);
amsAssignService.start();
LOG.info("AmsAssignService started for master-slave mode");
} catch (UnsupportedOperationException e) {
LOG.info("Skip AmsAssignService: {}", e.getMessage());
} catch (Exception e) {
LOG.error("Failed to start AmsAssignService", e);
}
}

List<ActionCoordinator> actionCoordinators = defaultRuntimeFactory.supportedCoordinators();
ExecuteEngineManager executeEngineManager = new ExecuteEngineManager();
Expand Down Expand Up @@ -287,6 +302,11 @@ public void disposeOptimizingService() {
LOG.info("Stopping optimizing server[serving:{}] ...", optimizingServiceServer.isServing());
optimizingServiceServer.stop();
}
if (amsAssignService != null) {
LOG.info("Stopping AmsAssignService...");
amsAssignService.stop();
amsAssignService = null;
}
if (tableService != null) {
LOG.info("Stopping table service...");
tableService.dispose();
Expand Down
Loading
Loading