[Feature] Coordinator Server Supports coordinator epoch protect#2781
[Feature] Coordinator Server Supports coordinator epoch protect#2781wuchong merged 2 commits intoapache:mainfrom
Conversation
swuferhong
left a comment
There was a problem hiding this comment.
@zcoo Thanks for the great contributions. I left some comments.
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
Outdated
Show resolved
Hide resolved
8fa3cf7 to
bc8713c
Compare
e8dff9d to
c41673e
Compare
There was a problem hiding this comment.
Pull request overview
This PR implements “coordinator epoch” fencing to protect ZooKeeper metadata writes and tablet-server RPCs during coordinator leader changes (HA), aligning with the epoch-protection portion of issue #2778 / FIP-9.
Changes:
- Add a persistent
/coordinators/epochznode and leader-side epoch bumping (“fencing”) on leadership acquisition. - Wrap key ZK metadata mutations (table assignment, leader/isr) with an epoch znode version check via Curator transactions.
- Propagate coordinator epoch through
UpdateMetadataRequest, and update unit/integration tests accordingly.
Reviewed changes
Copilot reviewed 31 out of 31 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java | Updates tests to use expected ZK versions and coordinatorEpoch in LeaderAndIsr. |
| fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java | Resets tablet server replica-manager coordinator epoch between tests. |
| fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java | Updates makeUpdateMetadataRequest calls to include the new epoch parameter. |
| fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java | Updates LeaderAndIsr registration calls to pass expected ZK version. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java | Adds a test-only CoordinatorContext that bypasses epoch-version checks. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java | Uses TestCoordinatorContext + passes expected versions to ZK writes. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java | Updates makeUpdateMetadataRequest calls to include the new epoch parameter. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java | Uses TestCoordinatorContext and passes expected versions to ZK writes. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java | Uses TestCoordinatorContext and passes expected versions to ZK writes. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java | Minor formatting cleanup in test config. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java | Asserts coordinator epoch increments across leader transitions. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java | Uses TestCoordinatorContext in event-processor test setup. |
| fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java | Updates makeUpdateMetadataRequest usage with the new signature. |
| fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java | Adds helpers to build Curator transaction ops (check/create/update/delete). |
| fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java | Implements epoch znode handling + expected-version checks for key metadata mutations. |
| fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java | Introduces a small value object for epoch + epoch-znode-version. |
| fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java | Adds special version constants (match-any/unknown). |
| fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java | Adds CoordinatorEpochZNode path + encode/decode helpers. |
| fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java | Extends UpdateMetadataRequest builder to optionally include coordinatorEpoch. |
| fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java | Adds test-only reset hook for tablet-server coordinator epoch. |
| fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java | Ensures server_id metric variable is stringified. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java | Passes expected epoch-znode version into ZK LeaderAndIsr mutations. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java | Passes expected epoch-znode version into batch ZK LeaderAndIsr updates. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java | Uses match-any ZK version for deletions / initial table-assignment registration. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java | Wires CoordinatorContext into leader election construction. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java | Includes coordinatorEpoch in UpdateMetadata requests sent to tablet servers. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java | Adds leader fencing step that attempts to bump coordinator epoch in ZK. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java | Passes expected epoch-znode version into ZK assignment/LeaderAndIsr writes. |
| fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java | Tracks coordinatorEpoch and coordinatorEpochZkVersion. |
| fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java | Adds a runtime exception type for fencing failures. |
| fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java | Adds coordinator.id configuration option. |
Comments suppressed due to low confidence (1)
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java:162
- Leader fencing is effectively ignored:
fenceBecomeCoordinatorLeader(serverId)can returnOptional.empty()(BadVersion) butinitLeaderServices.run()still executes andisLeaderis set to true, which can allow a fenced coordinator to continue acting as leader. Consider treatingOptional.empty()as a hard failure (e.g., throw aCoordinatorEpochFencedException/ skip init and trigger leadership relinquish) and only markingisLeadertrue after fencing succeeds.
try {
// to avoid split-brain
Optional<ZkEpoch> optionalEpoch =
zkClient.fenceBecomeCoordinatorLeader(serverId);
optionalEpoch.ifPresent(
integer ->
coordinatorContext
.setCoordinatorEpochAndZkVersion(
optionalEpoch
.get()
.getCoordinatorEpoch(),
optionalEpoch
.get()
.getCoordinatorEpochZkVersion()));
initLeaderServices.run();
} catch (CoordinatorEpochFencedException e) {
LOG.warn(
"Coordinator server {} has been fenced and not become leader successfully.",
serverId);
throw e;
} catch (Exception e) {
LOG.error(
"Failed to initialize leader services for server {}",
serverId,
e);
}
});
// Set leader flag before init completes, so when zk found this leader, the
// leader can accept requests
isLeader.set(true);
}
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public void createRecursiveWithEpochCheck( | ||
| String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists) | ||
| throws Exception { | ||
| CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT); |
There was a problem hiding this comment.
createRecursiveWithEpochCheck recursively calls itself with data = null for parent paths, but ZooKeeperOp.createOp(...).forPath(path, data) typically expects a non-null byte array. This can cause NPE/IAE during parent creation. Consider using Curator's create builder that omits data for parent nodes (or pass an explicit empty byte array) and only attach data to the final target node.
| CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT); | |
| byte[] nodeData = data == null ? new byte[0] : data; | |
| CuratorOp createOp = zkOp.createOp(path, nodeData, CreateMode.PERSISTENT); |
| createRecursiveWithEpochCheck( | ||
| parentPath, null, expectedZkVersion, throwIfPathExists); |
There was a problem hiding this comment.
createRecursiveWithEpochCheck propagates throwIfPathExists into the recursive parent creation call. If throwIfPathExists is true for the leaf node, this will also throw when parent paths already exist, which breaks the usual contract of recursive-create (only the target path should be subject to the existence check). Consider passing false when creating parent paths, and applying throwIfPathExists only to the final path creation attempt.
| createRecursiveWithEpochCheck( | |
| parentPath, null, expectedZkVersion, throwIfPathExists); | |
| createRecursiveWithEpochCheck(parentPath, null, expectedZkVersion, false); |
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
Outdated
Show resolved
Hide resolved
wuchong
left a comment
There was a problem hiding this comment.
Thanks @zcoo for the contribution. I left some comments below, besides, I think we should also add a test case to cover the epoch evolution in the cluster.
Missing test coverage for coordinator epoch propagation to TabletServer after leader switch
There is no test that verifies the end-to-end flow:
- Coordinator A is leader with epoch=1, sends requests to TabletServer
- Coordinator A loses leadership
- Coordinator B becomes leader with epoch=2
- TabletServer accepts requests from B (epoch=2) and rejects stale requests from A (epoch=1)
Existing tests cover individual pieces (ZK epoch increment in CoordinatorServerElectionTest, epoch fencing in ReplicaManagerTest) but not the full flow.
Suggestion: Add an integration test in CoordinatorHighAvailabilityITCase:
@Test
void testTabletServerRejectsStaleCoordinatorEpochAfterLeaderSwitch() {
// 1. Start two coordinators, confirm leader
// 2. Record current coordinator epoch
// 3. Kill leader's ZK session, trigger leader switch
// 4. Wait for new leader election (epoch should increment)
// 5. Verify new leader can send requests to TabletServer
// 6. Verify requests with old epoch are rejected with InvalidCoordinatorException
}
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorLeaderElection.java
Show resolved
Hide resolved
fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java
Outdated
Show resolved
Hide resolved
fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java
Outdated
Show resolved
Hide resolved
|
@wuchong @swuferhong Thanks for all your comments. Now they are all addressed. PLAT~ |
0122c9f to
9108e75
Compare
|
@wuchong To improve test coverage for coordinator epoch propagation to TabletServer/Zookeeper after leader switch, I just add 2 test cases in CoordinatorHighAvailabilityITCase: |
200ef9a to
bbe55c6
Compare
|
I pushed a commit fixes a race condition in leader election and refactors coordinator epoch to be immutable. Key Changes
|
bbe55c6 to
1a4384b
Compare
Purpose
Linked issue: close #2778
This is the part 2 pr for coordinator high availability focusing on "coordinator epoch" logic.
Will ready for review when part 1 finish and merge.
Brief change log
see https://cwiki.apache.org/confluence/display/FLUSS/FIP-9%3A+Support+CoordinatorServer+High+Availability
Tests
API and Format
Documentation