Skip to content

Commit 84c1db7

Browse files
committed
user LeaderLatch instead of LeaderSelector
1 parent e237eee commit 84c1db7

1 file changed

Lines changed: 47 additions & 54 deletions

File tree

fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -20,78 +20,71 @@
2020

2121
import com.alibaba.fluss.server.zk.data.ZkData;
2222
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework;
23-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderSelector;
24-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderSelectorListener;
25-
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.state.ConnectionState;
26-
import com.alibaba.fluss.utils.concurrent.ExecutorThreadFactory;
23+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
24+
import com.alibaba.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatchListener;
2725

2826
import org.slf4j.Logger;
2927
import org.slf4j.LoggerFactory;
3028

31-
import java.util.concurrent.Executors;
32-
import java.util.concurrent.ScheduledExecutorService;
33-
import java.util.concurrent.TimeUnit;
29+
import java.io.IOException;
30+
import java.util.concurrent.atomic.AtomicBoolean;
3431

3532
/** Using by coordinator server. Coordinator servers listen ZK node and elect leadership. */
36-
public class CoordinatorLeaderElection {
33+
public class CoordinatorLeaderElection implements AutoCloseable {
3734
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorLeaderElection.class);
3835

39-
private final CuratorFramework zkClient;
4036
private final int serverId;
41-
private final ScheduledExecutorService executor;
37+
private final LeaderLatch leaderLatch;
38+
private final AtomicBoolean isLeader = new AtomicBoolean(false);
4239

4340
public CoordinatorLeaderElection(CuratorFramework zkClient, int serverId) {
44-
this(
45-
zkClient,
46-
serverId,
47-
Executors.newSingleThreadScheduledExecutor(
48-
new ExecutorThreadFactory("fluss-coordinator-leader-election")));
49-
}
50-
51-
protected CoordinatorLeaderElection(
52-
CuratorFramework zkClient, int serverId, ScheduledExecutorService executor) {
53-
this.zkClient = zkClient;
5441
this.serverId = serverId;
55-
this.executor = executor;
42+
this.leaderLatch =
43+
new LeaderLatch(
44+
zkClient, ZkData.CoordinatorElectionZNode.path(), String.valueOf(serverId));
5645
}
5746

5847
public void startElectLeader(Runnable initLeaderServices) {
59-
executor.schedule(() -> electLeader(initLeaderServices), 0, TimeUnit.MILLISECONDS);
60-
}
48+
leaderLatch.addListener(
49+
new LeaderLatchListener() {
50+
@Override
51+
public void isLeader() {
52+
LOG.info("Coordinator server {} has become the leader.", serverId);
53+
isLeader.set(true);
54+
}
55+
56+
@Override
57+
public void notLeader() {
58+
LOG.warn("Coordinator server {} has lost the leadership.", serverId);
59+
isLeader.set(false);
60+
}
61+
});
6162

62-
private void electLeader(Runnable initLeaderServices) {
63-
LeaderSelector leaderSelector =
64-
new LeaderSelector(
65-
zkClient,
66-
ZkData.CoordinatorElectionZNode.path(),
67-
new LeaderSelectorListener() {
68-
@Override
69-
public void takeLeadership(CuratorFramework client) {
70-
LOG.info(
71-
"Coordinator server {} win the leader in election now.",
72-
serverId);
73-
initLeaderServices.run();
63+
try {
64+
leaderLatch.start();
65+
LOG.info("Coordinator server {} started leader election.", serverId);
7466

75-
// Do not return, otherwise the leader will be released immediately.
76-
while (true) {
77-
try {
78-
Thread.sleep(1000);
79-
} catch (InterruptedException e) {
80-
}
81-
}
82-
}
67+
// todo: Currently, we await the leader latch and do nothing until it becomes leader.
68+
// Later we can make it as a hot backup server to continuously synchronize metadata from
69+
// Zookeeper, which save time from initializing context
70+
leaderLatch.await();
71+
initLeaderServices.run();
8372

84-
@Override
85-
public void stateChanged(
86-
CuratorFramework client, ConnectionState newState) {
87-
if (newState == ConnectionState.LOST) {
88-
LOG.info("Coordinator leader {} lost connection", serverId);
89-
}
90-
}
91-
});
73+
} catch (Exception e) {
74+
LOG.error("Failed to start LeaderLatch for server {}", serverId, e);
75+
throw new RuntimeException("Leader election start failed", e);
76+
}
77+
}
78+
79+
@Override
80+
public void close() throws IOException {
81+
LOG.info("Closing LeaderLatch for server {}.", serverId);
82+
if (leaderLatch != null) {
83+
leaderLatch.close();
84+
}
85+
}
9286

93-
// allow reelection
94-
leaderSelector.autoRequeue();
95-
leaderSelector.start();
87+
public boolean isLeader() {
88+
return this.isLeader.get();
9689
}
9790
}

0 commit comments

Comments
 (0)