7171import org .apache .fluss .server .coordinator .event .CoordinatorEventManager ;
7272import org .apache .fluss .server .coordinator .event .CreatePartitionEvent ;
7373import org .apache .fluss .server .coordinator .event .CreateTableEvent ;
74+ import org .apache .fluss .server .coordinator .event .DeadCoordinatorEvent ;
7475import org .apache .fluss .server .coordinator .event .DeadTabletServerEvent ;
7576import org .apache .fluss .server .coordinator .event .DeleteReplicaResponseReceivedEvent ;
7677import org .apache .fluss .server .coordinator .event .DropPartitionEvent ;
7778import org .apache .fluss .server .coordinator .event .DropTableEvent ;
7879import org .apache .fluss .server .coordinator .event .EventProcessor ;
7980import org .apache .fluss .server .coordinator .event .FencedCoordinatorEvent ;
8081import org .apache .fluss .server .coordinator .event .ListRebalanceProgressEvent ;
82+ import org .apache .fluss .server .coordinator .event .NewCoordinatorEvent ;
8183import org .apache .fluss .server .coordinator .event .NewTabletServerEvent ;
8284import org .apache .fluss .server .coordinator .event .NotifyKvSnapshotOffsetEvent ;
8385import org .apache .fluss .server .coordinator .event .NotifyLakeTableOffsetEvent ;
8688import org .apache .fluss .server .coordinator .event .RemoveServerTagEvent ;
8789import org .apache .fluss .server .coordinator .event .SchemaChangeEvent ;
8890import org .apache .fluss .server .coordinator .event .TableRegistrationChangeEvent ;
91+ import org .apache .fluss .server .coordinator .event .watcher .CoordinatorChangeWatcher ;
8992import org .apache .fluss .server .coordinator .event .watcher .TableChangeWatcher ;
9093import org .apache .fluss .server .coordinator .event .watcher .TabletServerChangeWatcher ;
9194import org .apache .fluss .server .coordinator .lease .KvSnapshotLeaseManager ;
128131
129132import java .time .Duration ;
130133import java .util .ArrayList ;
134+ import java .util .Arrays ;
131135import java .util .Collections ;
132136import java .util .HashMap ;
133137import java .util .HashSet ;
@@ -172,6 +176,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
172176 private final LakeTableTieringManager lakeTableTieringManager ;
173177 private final TableChangeWatcher tableChangeWatcher ;
174178 private final CoordinatorChannelManager coordinatorChannelManager ;
179+ private final CoordinatorChangeWatcher coordinatorChangeWatcher ;
175180 private final TabletServerChangeWatcher tabletServerChangeWatcher ;
176181 private final CoordinatorMetadataCache serverMetadataCache ;
177182 private final CoordinatorRequestBatch coordinatorRequestBatch ;
@@ -224,6 +229,8 @@ public CoordinatorEventProcessor(
224229 tableBucketStateMachine ,
225230 new RemoteStorageCleaner (conf , ioExecutor ),
226231 ioExecutor );
232+ this .coordinatorChangeWatcher =
233+ new CoordinatorChangeWatcher (zooKeeperClient , coordinatorEventManager );
227234 this .tableChangeWatcher = new TableChangeWatcher (zooKeeperClient , coordinatorEventManager );
228235 this .tabletServerChangeWatcher =
229236 new TabletServerChangeWatcher (zooKeeperClient , coordinatorEventManager );
@@ -263,6 +270,7 @@ public CoordinatorContext getCoordinatorContext() {
263270 public void startup () {
264271 coordinatorContext .setCoordinatorServerInfo (getCoordinatorServerInfo ());
265272 // start watchers first so that we won't miss node in zk;
273+ coordinatorChangeWatcher .start ();
266274 tabletServerChangeWatcher .start ();
267275 tableChangeWatcher .start ();
268276 LOG .info ("Initializing coordinator context." );
@@ -306,14 +314,11 @@ public void shutdown() {
306314 private ServerInfo getCoordinatorServerInfo () {
307315 try {
308316 return zooKeeperClient
309- .getCoordinatorAddress ()
317+ .getCoordinatorLeaderAddress ()
310318 .map (
311319 coordinatorAddress ->
312- // TODO we set id to 0 as that CoordinatorServer don't support
313- // HA, if we support HA, we need to set id to the config
314- // CoordinatorServer id to avoid node drift.
315320 new ServerInfo (
316- 0 ,
321+ coordinatorAddress . getId () ,
317322 null , // For coordinatorServer, no rack info
318323 coordinatorAddress .getEndpoints (),
319324 ServerType .COORDINATOR ))
@@ -334,6 +339,12 @@ public int getCoordinatorEpoch() {
334339
335340 private void initCoordinatorContext () throws Exception {
336341 long start = System .currentTimeMillis ();
342+ // get all coordinator servers
343+ int [] currentCoordinatorServers = zooKeeperClient .getCoordinatorServerList ();
344+ coordinatorContext .setLiveCoordinators (
345+ Arrays .stream (currentCoordinatorServers ).boxed ().collect (Collectors .toSet ()));
346+ LOG .info ("Load coordinator servers success when initializing coordinator context." );
347+
337348 // get all tablet server's
338349 int [] currentServers = zooKeeperClient .getSortedTabletServerList ();
339350 List <ServerInfo > tabletServerInfos = new ArrayList <>();
@@ -548,6 +559,7 @@ private void onShutdown() {
548559 tableManager .shutdown ();
549560
550561 // then stop watchers
562+ coordinatorChangeWatcher .stop ();
551563 tableChangeWatcher .stop ();
552564 tabletServerChangeWatcher .stop ();
553565 }
@@ -572,6 +584,10 @@ public void process(CoordinatorEvent event) {
572584 (NotifyLeaderAndIsrResponseReceivedEvent ) event );
573585 } else if (event instanceof DeleteReplicaResponseReceivedEvent ) {
574586 processDeleteReplicaResponseReceived ((DeleteReplicaResponseReceivedEvent ) event );
587+ } else if (event instanceof NewCoordinatorEvent ) {
588+ processNewCoordinator ((NewCoordinatorEvent ) event );
589+ } else if (event instanceof DeadCoordinatorEvent ) {
590+ processDeadCoordinator ((DeadCoordinatorEvent ) event );
575591 } else if (event instanceof NewTabletServerEvent ) {
576592 processNewTabletServer ((NewTabletServerEvent ) event );
577593 } else if (event instanceof DeadTabletServerEvent ) {
@@ -983,6 +999,28 @@ private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
983999 replicaStateMachine .handleStateChanges (offlineReplicas , OfflineReplica );
9841000 }
9851001
1002+ private void processNewCoordinator (NewCoordinatorEvent newCoordinatorEvent ) {
1003+ int coordinatorServerId = newCoordinatorEvent .getServerId ();
1004+ if (coordinatorContext .getLiveCoordinatorServers ().contains (coordinatorServerId )) {
1005+ return ;
1006+ }
1007+
1008+ // process new coordinator server
1009+ LOG .info ("New coordinator server callback for coordinator server {}" , coordinatorServerId );
1010+
1011+ coordinatorContext .addLiveCoordinator (coordinatorServerId );
1012+ }
1013+
1014+ private void processDeadCoordinator (DeadCoordinatorEvent deadCoordinatorEvent ) {
1015+ int coordinatorServerId = deadCoordinatorEvent .getServerId ();
1016+ if (!coordinatorContext .getLiveCoordinatorServers ().contains (coordinatorServerId )) {
1017+ return ;
1018+ }
1019+ // process dead coordinator server
1020+ LOG .info ("Coordinator server failure callback for {}." , coordinatorServerId );
1021+ coordinatorContext .removeLiveCoordinator (coordinatorServerId );
1022+ }
1023+
9861024 private void processNewTabletServer (NewTabletServerEvent newTabletServerEvent ) {
9871025 // NOTE: we won't need to detect bounced tablet servers like Kafka as we won't
9881026 // miss the event of tablet server un-register and register again since we can
@@ -1567,7 +1605,10 @@ private void updateReplicaAssignmentForBucket(
15671605 tableAssignment .forEach (
15681606 (bucket , replicas ) ->
15691607 newTableAssignment .put (bucket , new BucketAssignment (replicas )));
1570- zooKeeperClient .updateTableAssignment (tableId , new TableAssignment (newTableAssignment ));
1608+ zooKeeperClient .updateTableAssignment (
1609+ tableId ,
1610+ new TableAssignment (newTableAssignment ),
1611+ coordinatorContext .getCoordinatorEpochZkVersion ());
15711612 } else {
15721613 Map <Integer , List <Integer >> partitionAssignment =
15731614 coordinatorContext .getPartitionAssignment (
@@ -1624,7 +1665,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16241665 }
16251666
16261667 try {
1627- zooKeeperClient .batchUpdateLeaderAndIsr (newLeaderAndIsrList );
1668+ zooKeeperClient .batchUpdateLeaderAndIsr (
1669+ newLeaderAndIsrList , coordinatorContext .getCoordinatorEpochZkVersion ());
16281670 newLeaderAndIsrList .forEach (
16291671 (tableBucket , newLeaderAndIsr ) ->
16301672 result .add (new AdjustIsrResultForBucket (tableBucket , newLeaderAndIsr )));
@@ -1635,7 +1677,10 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
16351677 TableBucket tableBucket = entry .getKey ();
16361678 LeaderAndIsr newLeaderAndIsr = entry .getValue ();
16371679 try {
1638- zooKeeperClient .updateLeaderAndIsr (tableBucket , newLeaderAndIsr );
1680+ zooKeeperClient .updateLeaderAndIsr (
1681+ tableBucket ,
1682+ newLeaderAndIsr ,
1683+ coordinatorContext .getCoordinatorEpochZkVersion ());
16391684 } catch (Exception e ) {
16401685 LOG .error ("Error when register leader and isr." , e );
16411686 result .add (
@@ -2161,7 +2206,8 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List<Integ
21612206 LeaderAndIsr newLeaderAndIsr = leaderAndIsr .newLeaderAndIsr (leaderAndIsr .isr ());
21622207
21632208 coordinatorContext .putBucketLeaderAndIsr (tableBucket , newLeaderAndIsr );
2164- zooKeeperClient .updateLeaderAndIsr (tableBucket , newLeaderAndIsr );
2209+ zooKeeperClient .updateLeaderAndIsr (
2210+ tableBucket , newLeaderAndIsr , coordinatorContext .getCoordinatorEpochZkVersion ());
21652211
21662212 coordinatorRequestBatch .newBatch ();
21672213 coordinatorRequestBatch .addNotifyLeaderRequestForTabletServers (
0 commit comments