Skip to content

Commit 0122c9f

Browse files
committed
fix cr
1 parent 08eddf7 commit 0122c9f

4 files changed

Lines changed: 17 additions & 10 deletions

File tree

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,11 @@ public void isLeader() {
112112
CompletableFuture<Void> cleanup = pendingCleanup.get();
113113
// Run init on a separate thread to avoid deadlock with
114114
// Curator's EventThread when performing ZK operations.
115+
116+
// Set leader flag before init completes, so when zk found this leader, the
117+
// leader can accept requests
118+
isLeader.set(true);
119+
115120
leaderCallbackExecutor.execute(
116121
() -> {
117122
// Wait for any pending cleanup to finish first.
@@ -133,17 +138,15 @@ public void isLeader() {
133138
LOG.warn(
134139
"Coordinator server {} has been fenced and not become leader successfully.",
135140
serverId);
136-
throw e;
141+
notLeader();
137142
} catch (Exception e) {
138143
LOG.error(
139144
"Failed to initialize leader services for server {}",
140145
serverId,
141146
e);
147+
notLeader();
142148
}
143149
});
144-
// Set leader flag before init completes, so when zk found this leader, the
145-
// leader can accept requests
146-
isLeader.set(true);
147150
}
148151

149152
@Override

fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,9 @@ protected void cleanupCoordinatorLeader() {
348348

349349
try {
350350
// make sure the current coordinator leader node is unregistered.
351+
// Different from ZK disconnection,
352+
// when we actively release the Leader's election,
353+
// we need to manually delete the node
351354
unregisterCoordinatorLeader();
352355
} catch (Throwable t) {
353356
LOG.warn("Failed to unregister coordinator leader from Zookeeper", t);

fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,10 +200,6 @@ public void afterEach(ExtensionContext extensionContext) throws Exception {
200200
}
201201
}
202202
CompletableFuture.allOf(dropFutures.toArray(new CompletableFuture[0])).join();
203-
204-
for (TabletServer tabletServer : tabletServers.values()) {
205-
tabletServer.getReplicaManager().resetCoordinatorEpoch();
206-
}
207203
}
208204

209205
public void start() throws Exception {

fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.junit.jupiter.api.AfterAll;
5656
import org.junit.jupiter.api.AfterEach;
5757
import org.junit.jupiter.api.BeforeAll;
58+
import org.junit.jupiter.api.BeforeEach;
5859
import org.junit.jupiter.api.Test;
5960
import org.junit.jupiter.api.extension.RegisterExtension;
6061
import org.junit.jupiter.params.ParameterizedTest;
@@ -88,15 +89,19 @@ class ZooKeeperClientTest {
8889
private static ZkEpoch zkEpoch;
8990

9091
@BeforeAll
91-
static void beforeAll() throws Exception {
92+
static void beforeAll() {
9293
zookeeperClient =
9394
ZOO_KEEPER_EXTENSION_WRAPPER
9495
.getCustomExtension()
9596
.getZooKeeperClient(NOPErrorHandler.INSTANCE);
96-
zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1");
9797
remoteDataDir = zookeeperClient.getDefaultRemoteDataDir();
9898
}
9999

100+
@BeforeEach
101+
void beforeEach() throws Exception {
102+
zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("tmp");
103+
}
104+
100105
@AfterEach
101106
void afterEach() {
102107
ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot();

0 commit comments

Comments
 (0)