5050import com .alibaba .fluss .server .coordinator .event .CoordinatorEventManager ;
5151import com .alibaba .fluss .server .coordinator .event .CreatePartitionEvent ;
5252import com .alibaba .fluss .server .coordinator .event .CreateTableEvent ;
53+ import com .alibaba .fluss .server .coordinator .event .DeadCoordinatorServerEvent ;
5354import com .alibaba .fluss .server .coordinator .event .DeadTabletServerEvent ;
5455import com .alibaba .fluss .server .coordinator .event .DeleteReplicaResponseReceivedEvent ;
5556import com .alibaba .fluss .server .coordinator .event .DropPartitionEvent ;
5657import com .alibaba .fluss .server .coordinator .event .DropTableEvent ;
5758import com .alibaba .fluss .server .coordinator .event .EventProcessor ;
5859import com .alibaba .fluss .server .coordinator .event .FencedCoordinatorEvent ;
60+ import com .alibaba .fluss .server .coordinator .event .NewCoordinatorServerEvent ;
5961import com .alibaba .fluss .server .coordinator .event .NewTabletServerEvent ;
6062import com .alibaba .fluss .server .coordinator .event .NotifyLeaderAndIsrResponseReceivedEvent ;
63+ import com .alibaba .fluss .server .coordinator .event .watcher .CoordinatorServerChangeWatcher ;
6164import com .alibaba .fluss .server .coordinator .event .watcher .TableChangeWatcher ;
6265import com .alibaba .fluss .server .coordinator .event .watcher .TabletServerChangeWatcher ;
6366import com .alibaba .fluss .server .coordinator .statemachine .ReplicaState ;
9295import javax .annotation .concurrent .NotThreadSafe ;
9396
9497import java .util .ArrayList ;
98+ import java .util .Arrays ;
9599import java .util .Collections ;
96100import java .util .HashSet ;
97101import java .util .List ;
@@ -128,6 +132,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
128132 private final LakeTableTieringManager lakeTableTieringManager ;
129133 private final TableChangeWatcher tableChangeWatcher ;
130134 private final CoordinatorChannelManager coordinatorChannelManager ;
135+ private final CoordinatorServerChangeWatcher coordinatorServerChangeWatcher ;
131136 private final TabletServerChangeWatcher tabletServerChangeWatcher ;
132137 private final CoordinatorMetadataCache serverMetadataCache ;
133138 private final CoordinatorRequestBatch coordinatorRequestBatch ;
@@ -137,6 +142,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
137142 private final CompletedSnapshotStoreManager completedSnapshotStoreManager ;
138143
139144 // metrics
145+ private volatile int aliveCoordinatorServerCount ;
140146 private volatile int tabletServerCount ;
141147 private volatile int offlineBucketCount ;
142148 private volatile int tableCount ;
@@ -183,6 +189,8 @@ public CoordinatorEventProcessor(
183189 replicaStateMachine ,
184190 tableBucketStateMachine ,
185191 new RemoteStorageCleaner (conf , ioExecutor ));
192+ this .coordinatorServerChangeWatcher =
193+ new CoordinatorServerChangeWatcher (zooKeeperClient , coordinatorEventManager );
186194 this .tableChangeWatcher = new TableChangeWatcher (zooKeeperClient , coordinatorEventManager );
187195 this .tabletServerChangeWatcher =
188196 new TabletServerChangeWatcher (zooKeeperClient , coordinatorEventManager );
@@ -203,6 +211,8 @@ public CoordinatorEventProcessor(
203211
204212 private void registerMetrics () {
205213 coordinatorMetricGroup .gauge (MetricNames .ACTIVE_COORDINATOR_COUNT , () -> 1 );
214+ coordinatorMetricGroup .gauge (
215+ MetricNames .ALIVE_COORDINATOR_COUNT , () -> aliveCoordinatorServerCount );
206216 coordinatorMetricGroup .gauge (
207217 MetricNames .ACTIVE_TABLET_SERVER_COUNT , () -> tabletServerCount );
208218 coordinatorMetricGroup .gauge (MetricNames .OFFLINE_BUCKET_COUNT , () -> offlineBucketCount );
@@ -219,6 +229,7 @@ public CoordinatorEventManager getCoordinatorEventManager() {
219229 public void startup () {
220230 coordinatorContext .setCoordinatorServerInfo (getCoordinatorServerInfo ());
221231 // start watchers first so that we won't miss node in zk;
232+ coordinatorServerChangeWatcher .start ();
222233 tabletServerChangeWatcher .start ();
223234 tableChangeWatcher .start ();
224235 LOG .info ("Initializing coordinator context." );
@@ -257,7 +268,7 @@ public void shutdown() {
257268 private ServerInfo getCoordinatorServerInfo () {
258269 try {
259270 return zooKeeperClient
260- .getCoordinatorAddress ()
271+ .getCoordinatorLeaderAddress ()
261272 .map (
262273 coordinatorAddress ->
263274 // TODO we set id to 0 as that CoordinatorServer don't support
@@ -285,6 +296,11 @@ public int getCoordinatorEpoch() {
285296
286297 private void initCoordinatorContext () throws Exception {
287298 long start = System .currentTimeMillis ();
299+ // get all coordinator servers
300+ int [] currentCoordinatorServers = zooKeeperClient .getCoordinatorServerList ();
301+ coordinatorContext .setLiveCoordinatorServers (
302+ Arrays .stream (currentCoordinatorServers ).boxed ().collect (Collectors .toSet ()));
303+
288304 // get all tablet server's
289305 int [] currentServers = zooKeeperClient .getSortedTabletServerList ();
290306 List <ServerInfo > tabletServerInfos = new ArrayList <>();
@@ -441,6 +457,7 @@ private void onShutdown() {
441457 tableManager .shutdown ();
442458
443459 // then stop watchers
460+ coordinatorServerChangeWatcher .stop ();
444461 tableChangeWatcher .stop ();
445462 tabletServerChangeWatcher .stop ();
446463 }
@@ -461,6 +478,10 @@ public void process(CoordinatorEvent event) {
461478 (NotifyLeaderAndIsrResponseReceivedEvent ) event );
462479 } else if (event instanceof DeleteReplicaResponseReceivedEvent ) {
463480 processDeleteReplicaResponseReceived ((DeleteReplicaResponseReceivedEvent ) event );
481+ } else if (event instanceof NewCoordinatorServerEvent ) {
482+ processNewCoordinatorServer ((NewCoordinatorServerEvent ) event );
483+ } else if (event instanceof DeadCoordinatorServerEvent ) {
484+ processDeadCoordinatorServer ((DeadCoordinatorServerEvent ) event );
464485 } else if (event instanceof NewTabletServerEvent ) {
465486 processNewTabletServer ((NewTabletServerEvent ) event );
466487 } else if (event instanceof DeadTabletServerEvent ) {
@@ -505,6 +526,7 @@ public void process(CoordinatorEvent event) {
505526 }
506527
507528 private void updateMetrics () {
529+ aliveCoordinatorServerCount = coordinatorContext .getLiveCoordinatorServers ().size ();
508530 tabletServerCount = coordinatorContext .getLiveTabletServers ().size ();
509531 tableCount = coordinatorContext .allTables ().size ();
510532 bucketCount = coordinatorContext .bucketLeaderAndIsr ().size ();
@@ -754,10 +776,33 @@ private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
754776 replicaStateMachine .handleStateChanges (offlineReplicas , OfflineReplica );
755777 }
756778
779+ private void processNewCoordinatorServer (NewCoordinatorServerEvent newCoordinatorServerEvent ) {
780+ int coordinatorServerId = newCoordinatorServerEvent .getServerId ();
781+ if (coordinatorContext .getLiveCoordinatorServers ().contains (coordinatorServerId )) {
782+ return ;
783+ }
784+
785+ // process new coordinator server
786+ LOG .info ("New coordinator server callback for coordinator server {}" , coordinatorServerId );
787+
788+ coordinatorContext .addLiveCoordinatorServer (coordinatorServerId );
789+ }
790+
791+ private void processDeadCoordinatorServer (
792+ DeadCoordinatorServerEvent deadCoordinatorServerEvent ) {
793+ int coordinatorServerId = deadCoordinatorServerEvent .getServerId ();
794+ if (!coordinatorContext .getLiveCoordinatorServers ().contains (coordinatorServerId )) {
795+ return ;
796+ }
797+ // process dead coordinator server
798+ LOG .info ("Coordinator server failure callback for {}." , coordinatorServerId );
799+ coordinatorContext .removeLiveCoordinatorServer (coordinatorServerId );
800+ }
801+
757802 private void processNewTabletServer (NewTabletServerEvent newTabletServerEvent ) {
758803 // NOTE: we won't need to detect bounced tablet servers like Kafka as we won't
759804 // miss the event of tablet server un-register and register again since we can
760- // listener the children created and deleted in zk node.
805+ // listen the children created and deleted in zk node.
761806
762807 // Also, Kafka use broker epoch to make it can reject the LeaderAndIsrRequest,
763808 // UpdateMetadataRequest and StopReplicaRequest
@@ -776,7 +821,7 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
776821 // it may happen during coordinator server initiation, the watcher watch a new tablet
777822 // server register event and put it to event manager, but after that, the coordinator
778823 // server read
779- // all tablet server nodes registered which contain the tablet server a ; in this case,
824+ // all tablet server nodes registered which contain the tablet server; in this case,
780825 // we can ignore it.
781826 return ;
782827 }
0 commit comments