Skip to content

Commit bc8713c

Browse files
committed
[server] Coordinator Server Supports High-Available
1 parent c49b848 commit bc8713c

35 files changed

Lines changed: 679 additions & 108 deletions

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -392,6 +392,12 @@ public class ConfigOptions {
392392
+ " (“50100,50101”), ranges (“50100-50200”) or a combination of both."
393393
+ "This option is deprecated. Please use bind.listeners instead, which provides a more flexible configuration for multiple ports");
394394

395+
public static final ConfigOption<Integer> COORDINATOR_ID =
396+
key("coordinator.id")
397+
.intType()
398+
.noDefaultValue()
399+
.withDescription("The id for the coordinator server.");
400+
395401
/**
396402
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
397403
* instead.
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.fluss.exception;
20+
21+
/** Exception thrown when the Coordinator leader epoch is fenced. */
22+
public class CoordinatorEpochFencedException extends RuntimeException {
23+
public CoordinatorEpochFencedException(String message) {
24+
super(message);
25+
}
26+
}

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public class CoordinatorContext {
5555
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);
5656

5757
public static final int INITIAL_COORDINATOR_EPOCH = 0;
58+
public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0;
5859

5960
// for simplicity, we just use retry time, may consider make it a configurable value
6061
// and use combine retry times and retry delay
@@ -109,13 +110,23 @@ public class CoordinatorContext {
109110

110111
private ServerInfo coordinatorServerInfo = null;
111112
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
113+
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
112114

113115
public CoordinatorContext() {}
114116

115117
public int getCoordinatorEpoch() {
116118
return coordinatorEpoch;
117119
}
118120

121+
public int getCoordinatorEpochZkVersion() {
122+
return coordinatorEpochZkVersion;
123+
}
124+
125+
public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) {
126+
this.coordinatorEpoch = newEpoch;
127+
this.coordinatorEpochZkVersion = newZkVersion;
128+
}
129+
119130
public Set<String> getLiveCoordinatorServers() {
120131
return liveCoordinatorServers;
121132
}
@@ -709,11 +720,12 @@ private void clearTablesState() {
709720
public void resetContext() {
710721
tablesToBeDeleted.clear();
711722
coordinatorEpoch = 0;
723+
coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
712724
clearTablesState();
713725
liveTabletServers.clear();
726+
liveCoordinatorServers.clear();
714727
shuttingDownTabletServers.clear();
715728
serverTags.clear();
716-
liveCoordinatorServers.clear();
717729
}
718730

719731
public int getTotalPartitionCount() {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1602,7 +1602,10 @@ private void updateReplicaAssignmentForBucket(
16021602
tableAssignment.forEach(
16031603
(bucket, replicas) ->
16041604
newTableAssignment.put(bucket, new BucketAssignment(replicas)));
1605-
zooKeeperClient.updateTableAssignment(tableId, new TableAssignment(newTableAssignment));
1605+
zooKeeperClient.updateTableAssignment(
1606+
tableId,
1607+
new TableAssignment(newTableAssignment),
1608+
coordinatorContext.getCoordinatorEpochZkVersion());
16061609
} else {
16071610
Map<Integer, List<Integer>> partitionAssignment =
16081611
coordinatorContext.getPartitionAssignment(
@@ -1659,7 +1662,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16591662
}
16601663

16611664
try {
1662-
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
1665+
zooKeeperClient.batchUpdateLeaderAndIsr(
1666+
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
16631667
newLeaderAndIsrList.forEach(
16641668
(tableBucket, newLeaderAndIsr) ->
16651669
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1670,7 +1674,10 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16701674
TableBucket tableBucket = entry.getKey();
16711675
LeaderAndIsr newLeaderAndIsr = entry.getValue();
16721676
try {
1673-
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
1677+
zooKeeperClient.updateLeaderAndIsr(
1678+
tableBucket,
1679+
newLeaderAndIsr,
1680+
coordinatorContext.getCoordinatorEpochZkVersion());
16741681
} catch (Exception e) {
16751682
LOG.error("Error when register leader and isr.", e);
16761683
result.add(
@@ -2196,7 +2203,8 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List<Integ
21962203
LeaderAndIsr newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(leaderAndIsr.isr());
21972204

21982205
coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
2199-
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
2206+
zooKeeperClient.updateLeaderAndIsr(
2207+
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpochZkVersion());
22002208

22012209
coordinatorRequestBatch.newBatch();
22022210
coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.fluss.server.coordinator;
2020

21+
import org.apache.fluss.exception.CoordinatorEpochFencedException;
22+
import org.apache.fluss.server.zk.ZkEpoch;
2123
import org.apache.fluss.server.zk.ZooKeeperClient;
2224
import org.apache.fluss.server.zk.data.ZkData;
2325
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
@@ -26,6 +28,7 @@
2628
import org.slf4j.Logger;
2729
import org.slf4j.LoggerFactory;
2830

31+
import java.util.Optional;
2932
import java.util.concurrent.CompletableFuture;
3033
import java.util.concurrent.ExecutorService;
3134
import java.util.concurrent.Executors;
@@ -51,6 +54,8 @@ public class CoordinatorLeaderElection implements AutoCloseable {
5154
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class);
5255

5356
private final String serverId;
57+
private final ZooKeeperClient zkClient;
58+
private final CoordinatorContext coordinatorContext;
5459
private final LeaderLatch leaderLatch;
5560
private final AtomicBoolean isLeader = new AtomicBoolean(false);
5661
// Cached thread pool to run leader init/cleanup callbacks outside Curator's EventThread.
@@ -63,8 +68,11 @@ public class CoordinatorLeaderElection implements AutoCloseable {
6368
private final AtomicReference<CompletableFuture<Void>> pendingCleanup =
6469
new AtomicReference<>(CompletableFuture.completedFuture(null));
6570

66-
public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) {
71+
public CoordinatorLeaderElection(
72+
ZooKeeperClient zkClient, String serverId, CoordinatorContext coordinatorContext) {
6773
this.serverId = serverId;
74+
this.zkClient = zkClient;
75+
this.coordinatorContext = coordinatorContext;
6876
this.leaderLatch =
6977
new LeaderLatch(
7078
zkClient.getCuratorClient(),
@@ -122,7 +130,25 @@ public void isLeader() {
122130
e);
123131
}
124132
try {
133+
// to avoid split-brain
134+
Optional<ZkEpoch> optionalEpoch =
135+
zkClient.fenceBecomeCoordinatorLeader(serverId);
136+
optionalEpoch.ifPresent(
137+
integer ->
138+
coordinatorContext
139+
.setCoordinatorEpochAndZkVersion(
140+
optionalEpoch
141+
.get()
142+
.getCoordinatorEpoch(),
143+
optionalEpoch
144+
.get()
145+
.getCoordinatorEpochZkVersion()));
125146
initLeaderServices.run();
147+
} catch (CoordinatorEpochFencedException e) {
148+
LOG.warn(
149+
"Coordinator server {} has been fenced and not become leader successfully.",
150+
serverId);
151+
throw e;
126152
} catch (Exception e) {
127153
LOG.error(
128154
"Failed to initialize leader services for server {}",

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,6 +683,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
683683
// tablet servers.
684684
return makeUpdateMetadataRequest(
685685
coordinatorContext.getCoordinatorServerInfo(),
686+
coordinatorContext.getCoordinatorEpoch(),
686687
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
687688
tableMetadataList,
688689
partitionMetadataList);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,6 @@
5959
import java.util.ArrayList;
6060
import java.util.Collection;
6161
import java.util.List;
62-
import java.util.UUID;
6362
import java.util.concurrent.CompletableFuture;
6463
import java.util.concurrent.ExecutorService;
6564
import java.util.concurrent.Executors;
@@ -220,9 +219,11 @@ protected void initCoordinatorStandby() throws Exception {
220219
serverId);
221220

222221
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
222+
this.coordinatorContext = new CoordinatorContext();
223223

224224
// CoordinatorLeaderElection must be created after zkClient is initialized.
225-
this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId);
225+
this.coordinatorLeaderElection =
226+
new CoordinatorLeaderElection(zkClient, serverId, coordinatorContext);
226227

227228
this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true);
228229
this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true);
@@ -232,7 +233,6 @@ protected void initCoordinatorStandby() throws Exception {
232233

233234
dynamicConfigManager.startup();
234235

235-
this.coordinatorContext = new CoordinatorContext();
236236
this.metadataCache = new CoordinatorMetadataCache();
237237

238238
this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);

fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.fluss.server.zk.data.PartitionRegistration;
5555
import org.apache.fluss.server.zk.data.TableAssignment;
5656
import org.apache.fluss.server.zk.data.TableRegistration;
57+
import org.apache.fluss.server.zk.data.ZkVersion;
5758
import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException;
5859
import org.apache.fluss.utils.function.RunnableWithException;
5960
import org.apache.fluss.utils.function.ThrowingRunnable;
@@ -337,7 +338,9 @@ public void completeDeleteTable(long tableId) {
337338
// delete bucket assignments node, which will also delete the bucket state node,
338339
// so that all the zk nodes related to this table are deleted.
339340
rethrowIfIsNotNoNodeException(
340-
() -> zookeeperClient.deleteTableAssignment(tableId),
341+
() ->
342+
zookeeperClient.deleteTableAssignment(
343+
tableId, ZkVersion.MATCH_ANY_VERSION.getVersion()),
341344
String.format("Delete tablet assignment meta fail for table %s.", tableId));
342345
}
343346

@@ -346,7 +349,9 @@ public void completeDeletePartition(long partitionId) {
346349
// delete partition assignments node, which will also delete the bucket state node,
347350
// so that all the zk nodes related to this partition are deleted.
348351
rethrowIfIsNotNoNodeException(
349-
() -> zookeeperClient.deletePartitionAssignment(partitionId),
352+
() ->
353+
zookeeperClient.deletePartitionAssignment(
354+
partitionId, ZkVersion.MATCH_ANY_VERSION.getVersion()),
350355
String.format("Delete tablet assignment meta fail for partition %s.", partitionId));
351356
}
352357

@@ -399,7 +404,8 @@ public long createTable(
399404
long tableId = zookeeperClient.getTableIdAndIncrement();
400405
if (tableAssignment != null) {
401406
// register table assignment
402-
zookeeperClient.registerTableAssignment(tableId, tableAssignment);
407+
zookeeperClient.registerTableAssignment(
408+
tableId, tableAssignment, ZkVersion.MATCH_ANY_VERSION.getVersion());
403409
}
404410
// register the table
405411
zookeeperClient.registerTable(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
487487
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
488488
}
489489
try {
490-
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
490+
zooKeeperClient.batchUpdateLeaderAndIsr(
491+
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
491492
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
492493
return adjustedLeaderAndIsr;
493494
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,10 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
311311
ElectionResult electionResult = optionalElectionResult.get();
312312
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
313313
try {
314-
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
314+
zooKeeperClient.registerLeaderAndIsr(
315+
tableBucket,
316+
leaderAndIsr,
317+
coordinatorContext.getCoordinatorEpochZkVersion());
315318
} catch (Exception e) {
316319
LOG.error(
317320
"Fail to create state node for table bucket {} in zookeeper.",
@@ -379,7 +382,8 @@ public void batchHandleOnlineChangeAndInitLeader(Set<TableBucket> tableBuckets)
379382
if (!tableBucketLeadAndIsrInfos.isEmpty()) {
380383
try {
381384
zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition(
382-
tableBucketLeadAndIsrInfos);
385+
tableBucketLeadAndIsrInfos,
386+
coordinatorContext.getCoordinatorEpochZkVersion());
383387
registerSuccessList.addAll(tableBucketLeadAndIsrInfos);
384388
} catch (Exception e) {
385389
LOG.error(
@@ -454,7 +458,10 @@ private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
454458
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
455459
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
456460
try {
457-
zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr());
461+
zooKeeperClient.registerLeaderAndIsr(
462+
info.getTableBucket(),
463+
info.getLeaderAndIsr(),
464+
coordinatorContext.getCoordinatorEpochZkVersion());
458465
registerSuccessList.add(info);
459466
} catch (Exception e) {
460467
LOG.error(
@@ -496,7 +503,10 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
496503
}
497504
ElectionResult electionResult = optionalElectionResult.get();
498505
try {
499-
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
506+
zooKeeperClient.updateLeaderAndIsr(
507+
tableBucket,
508+
electionResult.leaderAndIsr,
509+
coordinatorContext.getCoordinatorEpochZkVersion());
500510
} catch (Exception e) {
501511
LOG.error(
502512
"Fail to update bucket LeaderAndIsr for table bucket {}.",

0 commit comments

Comments
 (0)