Skip to content

Commit bbe55c6

Browse files
committed
[server] Update leader election process and improve coordinator epoch handling
1 parent cb20334 commit bbe55c6

17 files changed

Lines changed: 120 additions & 151 deletions

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.fluss.server.coordinator.statemachine.BucketState;
2929
import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
3030
import org.apache.fluss.server.metadata.ServerInfo;
31+
import org.apache.fluss.server.zk.ZkEpoch;
3132
import org.apache.fluss.server.zk.data.LeaderAndIsr;
3233
import org.apache.fluss.utils.types.Tuple2;
3334

@@ -108,25 +109,33 @@ public class CoordinatorContext {
108109
/** A mapping from tabletServers to server tag. */
109110
private final Map<Integer, ServerTag> serverTags = new HashMap<>();
110111

112+
/**
113+
* The epoch of the coordinator, which will be incremented whenever the coordinator is elected
114+
* or re-elected.
115+
*/
116+
private final int coordinatorEpoch;
117+
118+
/**
119+
* The coordinator epoch zk version is the zk version when the coordinator epoch is updated in
120+
* Zookeeper.
121+
*/
122+
private final int coordinatorEpochZkVersion;
123+
111124
private ServerInfo coordinatorServerInfo = null;
112-
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
113-
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
114125

115-
public CoordinatorContext() {}
126+
public CoordinatorContext(ZkEpoch zkEpoch) {
127+
this.coordinatorEpoch = zkEpoch.getCoordinatorEpoch();
128+
this.coordinatorEpochZkVersion = zkEpoch.getCoordinatorEpochZkVersion();
129+
}
116130

117131
public int getCoordinatorEpoch() {
118132
return coordinatorEpoch;
119133
}
120134

121-
public int getCoordinatorEpochZkVersion() {
135+
public int getCoordinatorZkVersion() {
122136
return coordinatorEpochZkVersion;
123137
}
124138

125-
public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) {
126-
this.coordinatorEpoch = newEpoch;
127-
this.coordinatorEpochZkVersion = newZkVersion;
128-
}
129-
130139
public Set<String> getLiveCoordinatorServers() {
131140
return liveCoordinatorServers;
132141
}
@@ -719,8 +728,6 @@ private void clearTablesState() {
719728

720729
public void resetContext() {
721730
tablesToBeDeleted.clear();
722-
coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
723-
coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
724731
clearTablesState();
725732
liveTabletServers.clear();
726733
liveCoordinatorServers.clear();

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,6 @@
108108
import org.apache.fluss.server.metadata.ServerInfo;
109109
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
110110
import org.apache.fluss.server.utils.ServerRpcMessageUtils;
111-
import org.apache.fluss.server.zk.ZkEpoch;
112111
import org.apache.fluss.server.zk.ZooKeeperClient;
113112
import org.apache.fluss.server.zk.data.BucketAssignment;
114113
import org.apache.fluss.server.zk.data.LeaderAndIsr;
@@ -189,7 +188,6 @@ public class CoordinatorEventProcessor implements EventProcessor {
189188

190189
public CoordinatorEventProcessor(
191190
ZooKeeperClient zooKeeperClient,
192-
ZkEpoch zkEpoch,
193191
CoordinatorMetadataCache serverMetadataCache,
194192
CoordinatorChannelManager coordinatorChannelManager,
195193
CoordinatorContext coordinatorContext,
@@ -255,9 +253,6 @@ public CoordinatorEventProcessor(
255253
this.ioExecutor = ioExecutor;
256254
this.lakeTableHelper =
257255
new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR));
258-
259-
this.coordinatorContext.setCoordinatorEpochAndZkVersion(
260-
zkEpoch.getCoordinatorEpoch(), zkEpoch.getCoordinatorEpochZkVersion());
261256
}
262257

263258
public CoordinatorEventManager getCoordinatorEventManager() {
@@ -314,6 +309,7 @@ public void shutdown() {
314309
coordinatorEventManager.close();
315310
rebalanceManager.close();
316311
onShutdown();
312+
coordinatorContext.resetContext();
317313
}
318314

319315
private ServerInfo getCoordinatorServerInfo() {
@@ -1626,7 +1622,7 @@ private void updateReplicaAssignmentForBucket(
16261622
zooKeeperClient.updateTableAssignment(
16271623
tableId,
16281624
new TableAssignment(newTableAssignment),
1629-
coordinatorContext.getCoordinatorEpochZkVersion());
1625+
coordinatorContext.getCoordinatorZkVersion());
16301626
} else {
16311627
Map<Integer, List<Integer>> partitionAssignment =
16321628
coordinatorContext.getPartitionAssignment(
@@ -1639,7 +1635,7 @@ private void updateReplicaAssignmentForBucket(
16391635
zooKeeperClient.updatePartitionAssignment(
16401636
partitionId,
16411637
new PartitionAssignment(tableId, newPartitionAssignment),
1642-
coordinatorContext.getCoordinatorEpochZkVersion());
1638+
coordinatorContext.getCoordinatorZkVersion());
16431639
}
16441640
}
16451641

@@ -1686,7 +1682,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16861682

16871683
try {
16881684
zooKeeperClient.batchUpdateLeaderAndIsr(
1689-
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
1685+
newLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion());
16901686
newLeaderAndIsrList.forEach(
16911687
(tableBucket, newLeaderAndIsr) ->
16921688
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
@@ -1700,7 +1696,7 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
17001696
zooKeeperClient.updateLeaderAndIsr(
17011697
tableBucket,
17021698
newLeaderAndIsr,
1703-
coordinatorContext.getCoordinatorEpochZkVersion());
1699+
coordinatorContext.getCoordinatorZkVersion());
17041700
} catch (Exception e) {
17051701
LOG.error("Error when register leader and isr.", e);
17061702
result.add(
@@ -2227,7 +2223,7 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List<Integ
22272223

22282224
coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
22292225
zooKeeperClient.updateLeaderAndIsr(
2230-
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpochZkVersion());
2226+
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorZkVersion());
22312227

22322228
coordinatorRequestBatch.newBatch();
22332229
coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ public class CoordinatorLeaderElection implements AutoCloseable {
5252
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class);
5353

5454
private final String serverId;
55-
private final ZooKeeperClient zkClient;
56-
private final CoordinatorContext coordinatorContext;
5755
private final LeaderLatch leaderLatch;
5856
private final AtomicBoolean isLeader = new AtomicBoolean(false);
5957
// Cached thread pool to run leader init/cleanup callbacks outside Curator's EventThread.
@@ -66,11 +64,8 @@ public class CoordinatorLeaderElection implements AutoCloseable {
6664
private final AtomicReference<CompletableFuture<Void>> pendingCleanup =
6765
new AtomicReference<>(CompletableFuture.completedFuture(null));
6866

69-
public CoordinatorLeaderElection(
70-
ZooKeeperClient zkClient, String serverId, CoordinatorContext coordinatorContext) {
67+
public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) {
7168
this.serverId = serverId;
72-
this.zkClient = zkClient;
73-
this.coordinatorContext = coordinatorContext;
7469
this.leaderLatch =
7570
new LeaderLatch(
7671
zkClient.getCuratorClient(),
@@ -113,10 +108,6 @@ public void isLeader() {
113108
// Run init on a separate thread to avoid deadlock with
114109
// Curator's EventThread when performing ZK operations.
115110

116-
// Set leader flag before init completes, so when zk found this leader, the
117-
// leader can accept requests
118-
isLeader.set(true);
119-
120111
leaderCallbackExecutor.execute(
121112
() -> {
122113
// Wait for any pending cleanup to finish first.
@@ -134,6 +125,9 @@ public void isLeader() {
134125
}
135126
try {
136127
initLeaderServices.run();
128+
// Set leader flag after init completes, so when zk found
129+
// this leader, the leader can accept requests
130+
isLeader.set(true);
137131
} catch (CoordinatorEpochFencedException e) {
138132
LOG.warn(
139133
"Coordinator server {} has been fenced and not become leader successfully.",

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 6 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,6 @@ public class CoordinatorServer extends ServerBase {
141141
@Nullable
142142
private Authorizer authorizer;
143143

144-
@GuardedBy("lock")
145-
private CoordinatorContext coordinatorContext;
146-
147144
@GuardedBy("lock")
148145
private DynamicConfigManager dynamicConfigManager;
149146

@@ -221,11 +218,9 @@ protected void initCoordinatorStandby() throws Exception {
221218
serverId);
222219

223220
this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
224-
this.coordinatorContext = new CoordinatorContext();
225221

226222
// CoordinatorLeaderElection must be created after zkClient is initialized.
227-
this.coordinatorLeaderElection =
228-
new CoordinatorLeaderElection(zkClient, serverId, coordinatorContext);
223+
this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId);
229224

230225
this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true);
231226
this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true);
@@ -298,8 +293,8 @@ protected void initCoordinatorStandby() throws Exception {
298293
protected void initCoordinatorLeader() throws Exception {
299294
// to avoid split-brain
300295
ZkEpoch zkEpoch = zkClient.fenceBecomeCoordinatorLeader(serverId);
301-
302296
registerCoordinatorLeader();
297+
303298
synchronized (lock) {
304299
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
305300
this.rpcClient = RpcClient.create(conf, clientMetricGroup);
@@ -311,14 +306,13 @@ protected void initCoordinatorLeader() throws Exception {
311306
autoPartitionManager.start();
312307

313308
// start coordinator event processor after we register coordinator leader to zk
314-
// so that the event processor can get the coordinator leader node from zk during start
315-
// up.
316-
// in HA for coordinator server, the processor also need to know the leader node during
317-
// start up
309+
// so that the event processor can get the coordinator leader node from zk during
310+
// start up. In HA for coordinator server, the processor also need to know the leader
311+
// node during start up
312+
CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch);
318313
this.coordinatorEventProcessor =
319314
new CoordinatorEventProcessor(
320315
zkClient,
321-
zkEpoch,
322316
metadataCache,
323317
coordinatorChannelManager,
324318
coordinatorContext,
@@ -402,11 +396,6 @@ protected void cleanupCoordinatorLeader() {
402396
LOG.warn("Failed to close client metric group", t);
403397
}
404398

405-
// Reset coordinator context for next election
406-
if (coordinatorContext != null) {
407-
coordinatorContext.resetContext();
408-
}
409-
410399
LOG.info("Coordinator leader services cleaned up successfully.");
411400
}
412401
}
@@ -601,15 +590,6 @@ CompletableFuture<Void> stopServices() {
601590
exception = ExceptionUtils.firstOrSuppressed(t, exception);
602591
}
603592

604-
try {
605-
if (coordinatorContext != null) {
606-
// then reset coordinatorContext
607-
coordinatorContext.resetContext();
608-
}
609-
} catch (Throwable t) {
610-
exception = ExceptionUtils.firstOrSuppressed(t, exception);
611-
}
612-
613593
try {
614594
if (lakeTableTieringManager != null) {
615595
lakeTableTieringManager.close();

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -488,7 +488,7 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
488488
}
489489
try {
490490
zooKeeperClient.batchUpdateLeaderAndIsr(
491-
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
491+
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion());
492492
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
493493
return adjustedLeaderAndIsr;
494494
} catch (Exception e) {

fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -312,9 +312,7 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
312312
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
313313
try {
314314
zooKeeperClient.registerLeaderAndIsr(
315-
tableBucket,
316-
leaderAndIsr,
317-
coordinatorContext.getCoordinatorEpochZkVersion());
315+
tableBucket, leaderAndIsr, coordinatorContext.getCoordinatorZkVersion());
318316
} catch (Exception e) {
319317
LOG.error(
320318
"Fail to create state node for table bucket {} in zookeeper.",
@@ -382,8 +380,7 @@ public void batchHandleOnlineChangeAndInitLeader(Set<TableBucket> tableBuckets)
382380
if (!tableBucketLeadAndIsrInfos.isEmpty()) {
383381
try {
384382
zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition(
385-
tableBucketLeadAndIsrInfos,
386-
coordinatorContext.getCoordinatorEpochZkVersion());
383+
tableBucketLeadAndIsrInfos, coordinatorContext.getCoordinatorZkVersion());
387384
registerSuccessList.addAll(tableBucketLeadAndIsrInfos);
388385
} catch (Exception e) {
389386
LOG.error(
@@ -461,7 +458,7 @@ private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
461458
zooKeeperClient.registerLeaderAndIsr(
462459
info.getTableBucket(),
463460
info.getLeaderAndIsr(),
464-
coordinatorContext.getCoordinatorEpochZkVersion());
461+
coordinatorContext.getCoordinatorZkVersion());
465462
registerSuccessList.add(info);
466463
} catch (Exception e) {
467464
LOG.error(
@@ -506,7 +503,7 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
506503
zooKeeperClient.updateLeaderAndIsr(
507504
tableBucket,
508505
electionResult.leaderAndIsr,
509-
coordinatorContext.getCoordinatorEpochZkVersion());
506+
coordinatorContext.getCoordinatorZkVersion());
510507
} catch (Exception e) {
511508
LOG.error(
512509
"Fail to update bucket LeaderAndIsr for table bucket {}.",

fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,11 @@ public int getMinInSyncReplicas() {
336336
return minInSyncReplicas;
337337
}
338338

339+
@VisibleForTesting
340+
public int getCoordinatorEpoch() {
341+
return coordinatorEpoch;
342+
}
343+
339344
// ============ ServerReconfigurable Implementation ============
340345

341346
@Override
@@ -1880,6 +1885,10 @@ private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch, Strin
18801885
LOG.warn("Ignore the {} request because {}", requestName, errorMessage);
18811886
throw new InvalidCoordinatorException(errorMessage);
18821887
} else {
1888+
LOG.info(
1889+
"Update coordinator epoch from {} to {} for coordinator leader switch.",
1890+
this.coordinatorEpoch,
1891+
requestCoordinatorEpoch);
18831892
this.coordinatorEpoch = requestCoordinatorEpoch;
18841893
}
18851894
}
@@ -1991,11 +2000,6 @@ public TabletServerMetricGroup getServerMetricGroup() {
19912000
return serverMetricGroup;
19922001
}
19932002

1994-
@VisibleForTesting
1995-
public void resetCoordinatorEpoch() {
1996-
this.coordinatorEpoch = CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
1997-
}
1998-
19992003
/**
20002004
* Interface to represent the state of hosted {@link Replica}. We create a concrete (active)
20012005
* {@link Replica} instance when the TabletServer receives a createLogLeader request or

fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,15 @@
1818

1919
package org.apache.fluss.server.zk;
2020

21+
import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH;
22+
import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH_ZK_VERSION;
23+
2124
/** Class for coordinator epoch and coordinator epoch zk version. */
2225
public class ZkEpoch {
26+
27+
public static final ZkEpoch INITIAL_EPOCH =
28+
new ZkEpoch(INITIAL_COORDINATOR_EPOCH, INITIAL_COORDINATOR_EPOCH_ZK_VERSION);
29+
2330
private final int coordinatorEpoch;
2431
private final int coordinatorEpochZkVersion;
2532

0 commit comments

Comments
 (0)