From 9e065cd77d08a850d389c2ac41d03be3db93054f Mon Sep 17 00:00:00 2001 From: "whenzhou.zc" Date: Thu, 24 Jul 2025 16:08:18 +0800 Subject: [PATCH 1/2] [server] Coordinator supports coordinator epoch protection (#2781) --- .../CoordinatorEpochFencedException.java | 26 ++ .../coordinator/CoordinatorContext.java | 16 +- .../CoordinatorEventProcessor.java | 25 +- .../CoordinatorLeaderElection.java | 22 +- .../coordinator/CoordinatorRequestBatch.java | 1 + .../server/coordinator/CoordinatorServer.java | 28 +- .../statemachine/ReplicaStateMachine.java | 3 +- .../statemachine/TableBucketStateMachine.java | 18 +- .../metrics/group/CoordinatorMetricGroup.java | 2 +- .../fluss/server/replica/ReplicaManager.java | 5 + .../server/utils/ServerRpcMessageUtils.java | 4 + .../org/apache/fluss/server/zk/ZkEpoch.java | 42 +++ .../fluss/server/zk/ZooKeeperClient.java | 249 ++++++++++++++++-- .../apache/fluss/server/zk/ZooKeeperOp.java | 64 +++++ .../apache/fluss/server/zk/data/ZkData.java | 18 ++ .../fluss/server/zk/data/ZkVersion.java | 35 +++ .../CoordinatorChannelManagerTest.java | 1 + .../CoordinatorEventProcessorTest.java | 6 +- .../CoordinatorHighAvailabilityITCase.java | 138 ++++++++++ .../CoordinatorServerElectionTest.java | 6 + .../coordinator/CoordinatorServerITCase.java | 1 - .../coordinator/TableManagerITCase.java | 10 +- .../server/coordinator/TableManagerTest.java | 2 +- .../coordinator/TestCoordinatorContext.java | 39 +++ .../rebalance/RebalanceManagerTest.java | 6 +- .../statemachine/ReplicaStateMachineTest.java | 18 +- .../TableBucketStateMachineTest.java | 21 +- .../metadata/ZkBasedMetadataProviderTest.java | 25 +- .../server/tablet/TabletServiceITCase.java | 1 + .../fluss/server/zk/ZooKeeperClientTest.java | 43 ++- 30 files changed, 798 insertions(+), 77 deletions(-) create mode 100644 fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java create 
mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java create mode 100644 fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java create mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java b/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java new file mode 100644 index 0000000000..0e0e4e409a --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when the Coordinator leader epoch is fenced. 
*/ +public class CoordinatorEpochFencedException extends RuntimeException { + public CoordinatorEpochFencedException(String message) { + super(message); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index 57ed115289..138ddb1756 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -55,6 +55,7 @@ public class CoordinatorContext { private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class); public static final int INITIAL_COORDINATOR_EPOCH = 0; + public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0; // for simplicity, we just use retry time, may consider make it a configurable value // and use combine retry times and retry delay @@ -109,6 +110,7 @@ public class CoordinatorContext { private ServerInfo coordinatorServerInfo = null; private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; + private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION; public CoordinatorContext() {} @@ -116,6 +118,15 @@ public int getCoordinatorEpoch() { return coordinatorEpoch; } + public int getCoordinatorEpochZkVersion() { + return coordinatorEpochZkVersion; + } + + public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) { + this.coordinatorEpoch = newEpoch; + this.coordinatorEpochZkVersion = newZkVersion; + } + public Set getLiveCoordinatorServers() { return liveCoordinatorServers; } @@ -708,12 +719,13 @@ private void clearTablesState() { public void resetContext() { tablesToBeDeleted.clear(); - coordinatorEpoch = 0; + coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; + coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION; clearTablesState(); liveTabletServers.clear(); + liveCoordinatorServers.clear(); 
shuttingDownTabletServers.clear(); serverTags.clear(); - liveCoordinatorServers.clear(); } public int getTotalPartitionCount() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 4f53ba666e..423d27622b 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -108,6 +108,7 @@ import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.utils.ServerRpcMessageUtils; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.BucketAssignment; import org.apache.fluss.server.zk.data.LeaderAndIsr; @@ -188,6 +189,7 @@ public class CoordinatorEventProcessor implements EventProcessor { public CoordinatorEventProcessor( ZooKeeperClient zooKeeperClient, + ZkEpoch zkEpoch, CoordinatorMetadataCache serverMetadataCache, CoordinatorChannelManager coordinatorChannelManager, CoordinatorContext coordinatorContext, @@ -253,6 +255,9 @@ public CoordinatorEventProcessor( this.ioExecutor = ioExecutor; this.lakeTableHelper = new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)); + + this.coordinatorContext.setCoordinatorEpochAndZkVersion( + zkEpoch.getCoordinatorEpoch(), zkEpoch.getCoordinatorEpochZkVersion()); } public CoordinatorEventManager getCoordinatorEventManager() { @@ -1618,7 +1623,10 @@ private void updateReplicaAssignmentForBucket( tableAssignment.forEach( (bucket, replicas) -> newTableAssignment.put(bucket, new BucketAssignment(replicas))); - zooKeeperClient.updateTableAssignment(tableId, new TableAssignment(newTableAssignment)); + zooKeeperClient.updateTableAssignment( + 
tableId, + new TableAssignment(newTableAssignment), + coordinatorContext.getCoordinatorEpochZkVersion()); } else { Map> partitionAssignment = coordinatorContext.getPartitionAssignment( @@ -1629,7 +1637,9 @@ private void updateReplicaAssignmentForBucket( (bucket, replicas) -> newPartitionAssignment.put(bucket, new BucketAssignment(replicas))); zooKeeperClient.updatePartitionAssignment( - partitionId, new PartitionAssignment(tableId, newPartitionAssignment)); + partitionId, + new PartitionAssignment(tableId, newPartitionAssignment), + coordinatorContext.getCoordinatorEpochZkVersion()); } } @@ -1675,7 +1685,8 @@ private List tryProcessAdjustIsr( } try { - zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList); + zooKeeperClient.batchUpdateLeaderAndIsr( + newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion()); newLeaderAndIsrList.forEach( (tableBucket, newLeaderAndIsr) -> result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr))); @@ -1686,7 +1697,10 @@ private List tryProcessAdjustIsr( TableBucket tableBucket = entry.getKey(); LeaderAndIsr newLeaderAndIsr = entry.getValue(); try { - zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr); + zooKeeperClient.updateLeaderAndIsr( + tableBucket, + newLeaderAndIsr, + coordinatorContext.getCoordinatorEpochZkVersion()); } catch (Exception e) { LOG.error("Error when register leader and isr.", e); result.add( @@ -2212,7 +2226,8 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List> pendingCleanup = new AtomicReference<>(CompletableFuture.completedFuture(null)); - public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { + public CoordinatorLeaderElection( + ZooKeeperClient zkClient, String serverId, CoordinatorContext coordinatorContext) { this.serverId = serverId; + this.zkClient = zkClient; + this.coordinatorContext = coordinatorContext; this.leaderLatch = new LeaderLatch( zkClient.getCuratorClient(), @@ -106,6 +112,11 @@ public void 
isLeader() { CompletableFuture cleanup = pendingCleanup.get(); // Run init on a separate thread to avoid deadlock with // Curator's EventThread when performing ZK operations. + + // Set leader flag before init completes, so when zk found this leader, the + // leader can accept requests + isLeader.set(true); + leaderCallbackExecutor.execute( () -> { // Wait for any pending cleanup to finish first. @@ -123,16 +134,19 @@ public void isLeader() { } try { initLeaderServices.run(); + } catch (CoordinatorEpochFencedException e) { + LOG.warn( + "Coordinator server {} has been fenced and not become leader successfully.", + serverId); + notLeader(); } catch (Exception e) { LOG.error( "Failed to initialize leader services for server {}", serverId, e); + notLeader(); } }); - // Set leader flag before init completes, so when zk found this leader, the - // leader can accept requests - isLeader.set(true); } @Override diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 60eaa988c9..3441053671 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -683,6 +683,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { // tablet servers. 
return makeUpdateMetadataRequest( coordinatorContext.getCoordinatorServerInfo(), + coordinatorContext.getCoordinatorEpoch(), new HashSet<>(coordinatorContext.getLiveTabletServers().values()), tableMetadataList, partitionMetadataList); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 25a0d866fe..a54fc2d58e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -39,6 +39,7 @@ import org.apache.fluss.server.metrics.ServerMetricUtils; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperUtils; import org.apache.fluss.server.zk.data.CoordinatorAddress; @@ -220,9 +221,11 @@ protected void initCoordinatorStandby() throws Exception { serverId); this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); + this.coordinatorContext = new CoordinatorContext(); // CoordinatorLeaderElection must be created after zkClient is initialized. 
- this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); + this.coordinatorLeaderElection = + new CoordinatorLeaderElection(zkClient, serverId, coordinatorContext); this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true); this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true); @@ -232,7 +235,6 @@ protected void initCoordinatorStandby() throws Exception { dynamicConfigManager.startup(); - this.coordinatorContext = new CoordinatorContext(); this.metadataCache = new CoordinatorMetadataCache(); this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager); @@ -294,7 +296,10 @@ protected void initCoordinatorStandby() throws Exception { } protected void initCoordinatorLeader() throws Exception { + // to avoid split-brain + ZkEpoch zkEpoch = zkClient.fenceBecomeCoordinatorLeader(serverId); + registerCoordinatorLeader(); synchronized (lock) { this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME); this.rpcClient = RpcClient.create(conf, clientMetricGroup); @@ -305,7 +310,6 @@ protected void initCoordinatorLeader() throws Exception { new AutoPartitionManager(metadataCache, metadataManager, conf); autoPartitionManager.start(); - registerCoordinatorLeader(); // start coordinator event processor after we register coordinator leader to zk // so that the event processor can get the coordinator leader node from zk during start // up. @@ -314,6 +318,7 @@ protected void initCoordinatorLeader() throws Exception { this.coordinatorEventProcessor = new CoordinatorEventProcessor( zkClient, + zkEpoch, metadataCache, coordinatorChannelManager, coordinatorContext, @@ -337,10 +342,20 @@ protected void initCoordinatorLeader() throws Exception { * from leader to standby. It cleans up leader-only resources while keeping the server running * as a standby, ready to participate in future elections. 
*/ - protected void cleanupCoordinatorLeader() throws Exception { + protected void cleanupCoordinatorLeader() { synchronized (lock) { LOG.info("Cleaning up coordinator leader services."); + try { + // make sure the current coordinator leader node is unregistered. + // Different from ZK disconnection, + // when we actively release the Leader's election, + // we need to manually delete the node + unregisterCoordinatorLeader(); + } catch (Throwable t) { + LOG.warn("Failed to unregister coordinator leader from Zookeeper", t); + } + // Clean up leader-specific resources in reverse order of initialization try { if (coordinatorEventProcessor != null) { @@ -427,6 +442,11 @@ private void registerCoordinatorLeader() throws Exception { "coordinator leader", () -> zkClient.registerCoordinatorLeader(coordinatorAddress)); } + private void unregisterCoordinatorLeader() throws Exception { + CoordinatorAddress coordinatorAddress = buildCoordinatorAddress(); + zkClient.unregisterCoordinatorLeader(coordinatorAddress); + } + private CoordinatorAddress buildCoordinatorAddress() { List bindEndpoints = rpcServer.getBindEndpoints(); return new CoordinatorAddress( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java index 091c2cbc9c..2416b163b5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java @@ -487,7 +487,8 @@ private Map doRemoveReplicaFromIsr( toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr); } try { - zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList); + zooKeeperClient.batchUpdateLeaderAndIsr( + toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion()); 
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr); return adjustedLeaderAndIsr; } catch (Exception e) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java index 85dcc434f4..b01e65c92d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java @@ -311,7 +311,10 @@ private Optional initLeaderForTableBuckets( ElectionResult electionResult = optionalElectionResult.get(); LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr; try { - zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zooKeeperClient.registerLeaderAndIsr( + tableBucket, + leaderAndIsr, + coordinatorContext.getCoordinatorEpochZkVersion()); } catch (Exception e) { LOG.error( "Fail to create state node for table bucket {} in zookeeper.", @@ -379,7 +382,8 @@ public void batchHandleOnlineChangeAndInitLeader(Set tableBuckets) if (!tableBucketLeadAndIsrInfos.isEmpty()) { try { zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition( - tableBucketLeadAndIsrInfos); + tableBucketLeadAndIsrInfos, + coordinatorContext.getCoordinatorEpochZkVersion()); registerSuccessList.addAll(tableBucketLeadAndIsrInfos); } catch (Exception e) { LOG.error( @@ -454,7 +458,10 @@ private List tryRegisterLeaderAndIsrOneByOne( List registerSuccessList = new ArrayList<>(); for (RegisterTableBucketLeadAndIsrInfo info : registerList) { try { - zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr()); + zooKeeperClient.registerLeaderAndIsr( + info.getTableBucket(), + info.getLeaderAndIsr(), + coordinatorContext.getCoordinatorEpochZkVersion()); registerSuccessList.add(info); } catch (Exception e) { LOG.error( @@ -496,7 +503,10 @@ 
private Optional electNewLeaderForTableBuckets( } ElectionResult electionResult = optionalElectionResult.get(); try { - zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr); + zooKeeperClient.updateLeaderAndIsr( + tableBucket, + electionResult.leaderAndIsr, + coordinatorContext.getCoordinatorEpochZkVersion()); } catch (Exception e) { LOG.error( "Fail to update bucket LeaderAndIsr for table bucket {}.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java index 72ff328a7d..5ae0999230 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java @@ -71,7 +71,7 @@ protected String getGroupName(CharacterFilter filter) { protected final void putVariables(Map variables) { variables.put("cluster_id", clusterId); variables.put("host", hostname); - variables.put("server_id", serverId); + variables.put("server_id", String.valueOf(serverId)); } public CoordinatorEventMetricGroup getOrAddEventTypeMetricGroup( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index ee3237032c..6960b51910 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -1991,6 +1991,11 @@ public TabletServerMetricGroup getServerMetricGroup() { return serverMetricGroup; } + @VisibleForTesting + public void resetCoordinatorEpoch() { + this.coordinatorEpoch = CoordinatorContext.INITIAL_COORDINATOR_EPOCH; + } + /** * Interface to represent the state of hosted {@link Replica}. 
We create a concrete (active) * {@link Replica} instance when the TabletServer receives a createLogLeader request or diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 02d02c29ee..d37c36148e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -461,6 +461,7 @@ public static MetadataResponse buildMetadataResponse( public static UpdateMetadataRequest makeUpdateMetadataRequest( @Nullable ServerInfo coordinatorServer, + @Nullable Integer coordinatorEpoch, Set aliveTableServers, List tableMetadataList, List partitionMetadataList) { @@ -503,6 +504,9 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest( updateMetadataRequest.addAllTableMetadatas(pbTableMetadataList); updateMetadataRequest.addAllPartitionMetadatas(pbPartitionMetadataList); + if (coordinatorEpoch != null) { + updateMetadataRequest.setCoordinatorEpoch(coordinatorEpoch); + } return updateMetadataRequest; } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java new file mode 100644 index 0000000000..f3d401a255 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk; + +/** Class for coordinator epoch and coordinator epoch zk version. */ +public class ZkEpoch { + private final int coordinatorEpoch; + private final int coordinatorEpochZkVersion; + + public ZkEpoch(int coordinatorEpoch, int coordinatorEpochZkVersion) { + this.coordinatorEpoch = coordinatorEpoch; + this.coordinatorEpochZkVersion = coordinatorEpochZkVersion; + } + + public ZkEpoch nextZkEpoch() { + return new ZkEpoch(coordinatorEpoch + 1, coordinatorEpochZkVersion + 1); + } + + public int getCoordinatorEpoch() { + return coordinatorEpoch; + } + + public int getCoordinatorEpochZkVersion() { + return coordinatorEpochZkVersion; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 78667ca48d..3c8b58fa2c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.FlussConfigUtils; +import org.apache.fluss.exception.CoordinatorEpochFencedException; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.ResolvedPartitionSpec; @@ -34,6 +35,7 @@ import org.apache.fluss.security.acl.Resource; import org.apache.fluss.security.acl.ResourceType; import 
org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls; +import org.apache.fluss.server.coordinator.CoordinatorContext; import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo; import org.apache.fluss.server.metadata.BucketMetadata; import org.apache.fluss.server.zk.ZkAsyncRequest.ZkCheckExistsRequest; @@ -86,6 +88,7 @@ import org.apache.fluss.server.zk.data.ZkData.TableZNode; import org.apache.fluss.server.zk.data.ZkData.TablesZNode; import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadata; @@ -127,11 +130,33 @@ import static java.util.stream.Collectors.toMap; import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName; +import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** * This class includes methods for write/read various metadata (leader address, tablet server * registration, table assignment, table, schema) in Zookeeper. + * + *

In some methods, 'expectedZkVersion' is used to perform an epoch Zookeeper version check. + * An epoch check is required only when all of the following conditions are met: + * 1. The method is invoked by the Coordinator (not the TabletServer). + * 2. It operates on persistent nodes (not ephemeral ones). + * 3. It is a "control plane" operation, i.e. partition assignment or LeaderAndIsr election. + * 4. The old and new leaders may concurrently access the same path during leader failover. + * 5. No other mechanism (optimistic locking, idempotency, or reloading) + * provides a fallback. + * + *

In practice, only two types of operations truly require this: (1) CRUD for Table/Partition + * Assignment (assignment decisions), and (2) CRUD for LeaderAndIsr (leader election results). + * + *

These operations are inevitably executed concurrently by the old and new coordinators during + * failover (as the new leader immediately reassigns partitions), and overwrites cannot be + * automatically recovered. + * + *

All other operations do not require epoch checks because: - DDL operations are protected + * against concurrency by client reconnection mechanisms. - TabletServer operations are unaffected + * by coordinator failovers. - ACLs and Configs have their own version control or idempotency + * guarantees. - Ephemeral nodes are managed via session lifecycle. */ @Internal public class ZooKeeperClient implements AutoCloseable { @@ -144,6 +169,7 @@ public class ZooKeeperClient implements AutoCloseable { private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private final CuratorFramework zkClient; + private final ZooKeeperOp zkOp; private final ZkSequenceIDCounter tableIdCounter; private final ZkSequenceIDCounter partitionIdCounter; private final ZkSequenceIDCounter writerIdCounter; @@ -158,6 +184,7 @@ public ZooKeeperClient( Configuration configuration) { this.curatorFrameworkWrapper = curatorFrameworkWrapper; this.zkClient = curatorFrameworkWrapper.asCuratorFramework(); + this.zkOp = new ZooKeeperOp(zkClient); this.tableIdCounter = new ZkSequenceIDCounter(zkClient, TableSequenceIdZNode.path()); this.partitionIdCounter = new ZkSequenceIDCounter(zkClient, PartitionSequenceIdZNode.path()); @@ -197,6 +224,49 @@ public void registerCoordinatorServer(CoordinatorAddress coordinatorAddress) thr LOG.info("Registered Coordinator server {} at path {}.", coordinatorAddress, path); } + /** + * Become coordinator leader. This method is a step after electCoordinatorLeader() and before + * registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator + * epoch and coordinator epoch zk version. 
+ */ + public ZkEpoch fenceBecomeCoordinatorLeader(String coordinatorId) throws Exception { + ensureEpochZnodeExists(); + + try { + ZkEpoch getEpoch = getCurrentEpoch(); + int currentCoordinatorEpoch = getEpoch.getCoordinatorEpoch(); + int currentCoordinatorEpochZkVersion = getEpoch.getCoordinatorEpochZkVersion(); + int newCoordinatorEpoch = currentCoordinatorEpoch + 1; + LOG.info( + "Coordinator leader {} tries to update epoch. Current epoch={}, Zookeeper version={}, new epoch={}", + coordinatorId, + currentCoordinatorEpoch, + currentCoordinatorEpochZkVersion, + newCoordinatorEpoch); + + // atomically update epoch + Stat stat = + zkClient.setData() + .withVersion(currentCoordinatorEpochZkVersion) + .forPath( + ZkData.CoordinatorEpochZNode.path(), + ZkData.CoordinatorEpochZNode.encode(newCoordinatorEpoch)); + + LOG.info( + "Coordinator leader has updated epoch. Current epoch={}, Zookeeper version={}", + newCoordinatorEpoch, + stat.getVersion()); + + return new ZkEpoch(newCoordinatorEpoch, stat.getVersion()); + } catch (KeeperException.BadVersionException e) { + // Other coordinator leader has updated epoch. + // If this happens, it means our fence is in effect. + LOG.info("Coordinator leader {} failed to update epoch.", coordinatorId); + throw new CoordinatorEpochFencedException( + "Coordinator leader election has been fenced."); + } + } + /** Register a coordinator leader to ZK. */ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) throws Exception { String path = ZkData.CoordinatorLeaderZNode.path(); @@ -207,6 +277,23 @@ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) thr LOG.info("Registered Coordinator leader {} at path {}.", coordinatorAddress, path); } + /** Manually unregister a coordinator leader from ZK. 
*/ + public void unregisterCoordinatorLeader(CoordinatorAddress coordinatorAddress) + throws Exception { + String path = ZkData.CoordinatorLeaderZNode.path(); + Stat stat = new Stat(); + byte[] bytes = zkClient.getData().storingStatIn(stat).forPath(path); + if (bytes == null || bytes.length == 0) { + // not exists, not need to unregister + return; + } + CoordinatorAddress storedAddress = ZkData.CoordinatorLeaderZNode.decode(bytes); + if (storedAddress.getId().equals(coordinatorAddress.getId())) { + zkClient.delete().withVersion(stat.getVersion()).forPath(path); + LOG.info("Unregistered Coordinator leader {} at path {}.", coordinatorAddress, path); + } + } + /** Get the leader address registered in ZK. */ public Optional getCoordinatorLeaderAddress() throws Exception { Optional bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path()); @@ -221,6 +308,35 @@ public List getCoordinatorServerList() throws Exception { return getChildren(ZkData.CoordinatorIdsZNode.path()); } + /** Ensure epoch znode exists. */ + public void ensureEpochZnodeExists() throws Exception { + String path = ZkData.CoordinatorEpochZNode.path(); + if (zkClient.checkExists().forPath(path) == null) { + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath( + path, + ZkData.CoordinatorEpochZNode.encode( + CoordinatorContext.INITIAL_COORDINATOR_EPOCH - 1)); + } catch (KeeperException.NodeExistsException e) { + // can be ignored when two coordinator almost simultaneously create the epoch znode + } + } + } + + /** Get epoch now in ZK. 
*/ + public ZkEpoch getCurrentEpoch() throws Exception { + Stat currentStat = new Stat(); + byte[] bytes = + zkClient.getData() + .storingStatIn(currentStat) + .forPath(ZkData.CoordinatorEpochZNode.path()); + int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes); + int currentVersion = currentStat.getVersion(); + return new ZkEpoch(currentEpoch, currentVersion); + } // -------------------------------------------------------------------------------------------- // Tablet server // -------------------------------------------------------------------------------------------- @@ -329,17 +445,26 @@ public Map getPartitionsAssignments(Collection "partition assignment"); } - public void updateTableAssignment(long tableId, TableAssignment tableAssignment) - throws Exception { + public void updateTableAssignment( + long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception { String path = TableIdZNode.path(tableId); - zkClient.setData().forPath(path, TableIdZNode.encode(tableAssignment)); + byte[] data = TableIdZNode.encode(tableAssignment); + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.debug("Updated table assignment {} for table id {}.", tableAssignment, tableId); } - public void updatePartitionAssignment(long partitionId, PartitionAssignment partitionAssignment) + public void updatePartitionAssignment( + long partitionId, PartitionAssignment partitionAssignment, int expectedZkVersion) throws Exception { String path = PartitionIdZNode.path(partitionId); - zkClient.setData().forPath(path, PartitionIdZNode.encode(partitionAssignment)); + byte[] data = PartitionIdZNode.encode(partitionAssignment); + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.debug( "Updated partition assignment {} for partition 
id {}.", partitionAssignment, @@ -363,18 +488,20 @@ public void deletePartitionAssignment(long partitionId) throws Exception { // -------------------------------------------------------------------------------------------- /** Register bucket LeaderAndIsr to ZK. */ - public void registerLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr) + public void registerLeaderAndIsr( + TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion) throws Exception { + String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr)); + byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); + + createRecursiveWithEpochCheck(path, data, expectedZkVersion, false); LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); } public void batchRegisterLeaderAndIsrForTablePartition( - List registerList) throws Exception { + List registerList, int expectedZkVersion) + throws Exception { if (registerList.isEmpty()) { return; } @@ -410,12 +537,14 @@ public void batchRegisterLeaderAndIsrForTablePartition( ops.add(parentNodeCreate); ops.add(currentNodeCreate); if (ops.size() == MAX_BATCH_SIZE) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); ops.clear(); } } if (!ops.isEmpty()) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); } LOG.info( "Batch registered leadAndIsr for tableId: {}, partitionId: {}, partitionName: {} in Zookeeper.", @@ -447,14 +576,21 @@ public Map getLeaderAndIsrs(Collection t "leader and isr"); } - public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr) + public void updateLeaderAndIsr( + TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int 
expectedZkVersion) throws Exception { String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr)); + byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); + + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); } - public void batchUpdateLeaderAndIsr(Map leaderAndIsrList) + public void batchUpdateLeaderAndIsr( + Map leaderAndIsrList, int expectedZkVersion) throws Exception { if (leaderAndIsrList.isEmpty()) { return; @@ -471,16 +607,18 @@ public void batchUpdateLeaderAndIsr(Map leaderAndIsrL CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data); ops.add(updateOp); if (ops.size() == MAX_BATCH_SIZE) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); ops.clear(); } } if (!ops.isEmpty()) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); } } - public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception { + protected void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception { String path = LeaderAndIsrZNode.path(tableBucket); zkClient.delete().forPath(path); LOG.info("Deleted LeaderAndIsr for bucket {} in Zookeeper.", tableBucket); @@ -1781,6 +1919,79 @@ public static Map> processGetChildrenResponses( return result; } + /** + * create a node (recursively if parent path not exists) with Zk epoch version check. 
+ * + * @param path the path to create + * @param data the data to write + * @param throwIfPathExists whether to throw an exception if the path already exists + * @throws Exception if any error occurs + */ + public void createRecursiveWithEpochCheck( + String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists) + throws Exception { + CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT); + List ops = wrapRequestWithEpochCheck(createOp, expectedZkVersion); + + try { + // try to directly create + zkClient.transaction().forOperations(ops); + } catch (KeeperException.NodeExistsException e) { + // should not exist + if (throwIfPathExists) { + throw e; + } + } catch (KeeperException.NoNodeException e) { + // if parent does not exist, create parent first + int indexOfLastSlash = path.lastIndexOf("/"); + if (indexOfLastSlash == -1) { + throw new IllegalArgumentException("Invalid path: " + path); + } else if (indexOfLastSlash == 0) { + // a root-level path can be created directly without the fence + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + } catch (KeeperException.NodeExistsException ignored) { + // ignore + } + } else { + // indexOfLastSlash > 0 + String parentPath = path.substring(0, indexOfLastSlash); + createRecursiveWithEpochCheck( + parentPath, null, expectedZkVersion, throwIfPathExists); + // After creating the parent, retry creating the original path + zkClient.transaction().forOperations(ops); + } + } catch (KeeperException.BadVersionException e) { + LOG.error("Bad version for path {}, expected version {} ", path, expectedZkVersion); + throw e; + } + } + + public List wrapRequestWithEpochCheck(CuratorOp request, int expectedZkVersion) + throws Exception { + return wrapRequestsWithEpochCheck(Collections.singletonList(request), expectedZkVersion); + } + + public List wrapRequestsWithEpochCheck( + List requestList, int expectedZkVersion) throws Exception { + if 
(ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) { + return requestList; + } else if (expectedZkVersion >= 0) { + CuratorOp checkOp = + zkOp.checkOp(ZkData.CoordinatorEpochZNode.path(), expectedZkVersion); + return multiRequest(checkOp, requestList); + } else { + throw new IllegalArgumentException( + "Expected coordinator epoch zkVersion " + + expectedZkVersion + + " should be non-negative or equal to " + + ZkVersion.MATCH_ANY_VERSION.getVersion()); + } + } + // -------------------------------------------------------------------------------------------- // Producer Offset Snapshot // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java new file mode 100644 index 0000000000..e44670d643 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk; + +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode; + +import java.util.ArrayList; +import java.util.List; + +/** Utility for building ZooKeeper transaction operations and combining them into lists. */ +public class ZooKeeperOp { + private final CuratorFramework zkClient; + + public ZooKeeperOp(CuratorFramework zkClient) { + this.zkClient = zkClient; + } + + public CuratorOp checkOp(String path, int expectZkVersion) throws Exception { + return zkClient.transactionOp().check().withVersion(expectZkVersion).forPath(path); + } + + public CuratorOp createOp(String path, byte[] data, CreateMode createMode) throws Exception { + return zkClient.transactionOp().create().withMode(createMode).forPath(path, data); + } + + public CuratorOp updateOp(String path, byte[] data) throws Exception { + return zkClient.transactionOp().setData().forPath(path, data); + } + + public CuratorOp deleteOp(String path) throws Exception { + return zkClient.transactionOp().delete().forPath(path); + } + + public static List multiRequest(CuratorOp op1, CuratorOp op2) { + List ops = new ArrayList<>(); + ops.add(op1); + ops.add(op2); + return ops; + } + + public static List multiRequest(CuratorOp op, List ops) { + List result = new ArrayList<>(); + result.add(op); + result.addAll(ops); + return result; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 32b5a7c01d..941512e0f2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -366,6 +366,24 @@ public static CoordinatorAddress decode(byte[] json) { } } + /** + * The znode for 
the coordinator epoch. The znode path is: + * + * <p>
/coordinators/epoch + */ + public static final class CoordinatorEpochZNode { + public static String path() { + return "/coordinators/epoch"; + } + + public static byte[] encode(int epoch) { + return String.valueOf(epoch).getBytes(StandardCharsets.UTF_8); + } + + public static int decode(byte[] bytes) { + return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)); + } + } // ------------------------------------------------------------------------------------------ // ZNodes under "/tabletservers/" // ------------------------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java new file mode 100644 index 0000000000..26242df0f3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +/** Enum to represent the type of special Zookeeper version. 
*/ +public enum ZkVersion { + MATCH_ANY_VERSION(-1), + UNKNOWN_VERSION(-2); + + private final int version; + + ZkVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java index 9d8eda15bf..31c07d75cd 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java @@ -92,6 +92,7 @@ private void checkSendRequest( // we use update metadata request to test for simplicity UpdateMetadataRequest updateMetadataRequest = makeUpdateMetadataRequest( + null, null, Collections.emptySet(), Collections.emptyList(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 997a8738f6..cd036b3641 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -65,6 +65,7 @@ import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.tablet.TestTabletServerGateway; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.BucketAssignment; @@ -148,6 +149,7 @@ class CoordinatorEventProcessorTest { private static ZooKeeperClient zookeeperClient; private static MetadataManager metadataManager; + private static ZkEpoch zkEpoch; private CoordinatorEventProcessor 
eventProcessor; private final String defaultDatabase = "db"; @@ -176,6 +178,7 @@ static void baseBeforeAll() throws Exception { new CoordinatorAddress( "2", Endpoint.fromListenersString("CLIENT://localhost:10012"))); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("2"); // register 3 tablet servers for (int i = 0; i < 3; i++) { zookeeperClient.registerTabletServer( @@ -1058,9 +1061,10 @@ private void verifyIsr(TableBucket tb, int expectedLeader, List expecte private CoordinatorEventProcessor buildCoordinatorEventProcessor() { return new CoordinatorEventProcessor( zookeeperClient, + zkEpoch, serverMetadataCache, testCoordinatorChannelManager, - new CoordinatorContext(), + new TestCoordinatorContext(), autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java index 5d2bcd5745..e974ec2d97 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java @@ -23,18 +23,25 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.NotCoordinatorLeaderException; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import 
org.apache.fluss.rpc.metrics.TestingClientMetricGroup; +import org.apache.fluss.server.tablet.TabletServer; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.CoordinatorAddress; +import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.Watcher; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper; import org.apache.fluss.testutils.common.AllCallbackWrapper; @@ -44,7 +51,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import java.util.List; @@ -78,8 +87,11 @@ class CoordinatorHighAvailabilityITCase { private CoordinatorServer coordinatorServer1; private CoordinatorServer coordinatorServer2; + private TabletServer tabletServer; private RpcClient rpcClient; + @TempDir Path tempDir; + @BeforeAll static void baseBeforeAll() { zookeeperClient = @@ -102,6 +114,9 @@ void tearDown() throws Exception { if (coordinatorServer2 != null) { coordinatorServer2.close(); } + if (tabletServer != null) { + tabletServer.close(); + } if (rpcClient != null) { rpcClient.close(); } @@ -242,6 +257,122 @@ void testMultipleLeadershipLossAndRecovery() throws Exception { createGatewayForServer(finalLeader).metadata(new MetadataRequest()).get(); } + @Test + void testTabletServerRejectsStaleCoordinatorEpochAfterLeaderSwitch() throws Exception { + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + tabletServer = new 
TabletServer(createTabletServerConfiguration()); + + coordinatorServer1.start(); + coordinatorServer2.start(); + tabletServer.start(); + + waitUntilCoordinatorServerElected(); + CoordinatorAddress firstLeaderAddr = zookeeperClient.getCoordinatorLeaderAddress().get(); + + CoordinatorServer leader = findServerById(firstLeaderAddr.getId()); + CoordinatorServer standby = findServerByNotId(firstLeaderAddr.getId()); + + // Record old coordinator epoch before killing the leader + int oldCoordinatorEpoch = leader.getCoordinatorEventProcessor().getCoordinatorEpoch(); + + killZkSession(leader); + waitUntilNewLeaderElected(leader.getServerId()); + assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId()) + .as("After killing leader, standby should become leader") + .isEqualTo(standby.getServerId()); + + int newCoordinatorEpoch = standby.getCoordinatorEventProcessor().getCoordinatorEpoch(); + + TabletServerGateway tsGateway = createGatewayForTabletServer(tabletServer); + + // Send request with new coordinator epoch first to ensure the tablet server + // has updated its stored epoch to the latest value + tsGateway + .updateMetadata( + new UpdateMetadataRequest().setCoordinatorEpoch(newCoordinatorEpoch)) + .get(); + + // Send request with old coordinator epoch — tablet server should reject it + assertThatThrownBy( + () -> + tsGateway + .updateMetadata( + new UpdateMetadataRequest() + .setCoordinatorEpoch(oldCoordinatorEpoch)) + .get()) + .satisfies( + t -> + assertThat(getRootCause(t)) + .isInstanceOf(InvalidCoordinatorException.class)); + } + + @Test + void testZooKeeperRejectsStaleCoordinatorRequestAfterLeaderSwitch() throws Exception { + // 1. Start two coordinators, confirm leader + // 2. Record current coordinator epoch + // 3. Kill leader's ZK session, trigger leader switch + // 4. Wait for new leader election (epoch should increment) + // 5. Verify new leader can send requests to ZooKeeper + // 6. 
Verify requests with old ZkVersion epoch are rejected with BadVersionException + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + + coordinatorServer1.start(); + coordinatorServer2.start(); + + waitUntilCoordinatorServerElected(); + CoordinatorAddress firstLeaderAddr = zookeeperClient.getCoordinatorLeaderAddress().get(); + + CoordinatorServer leader = findServerById(firstLeaderAddr.getId()); + CoordinatorServer standby = findServerByNotId(firstLeaderAddr.getId()); + + killZkSession(leader); + waitUntilNewLeaderElected(leader.getServerId()); + assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId()) + .as("After killing leader, standby should become leader") + .isEqualTo(standby.getServerId()); + + TableBucket tableBucket = new TableBucket(1, 1); + LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(2, 3), 0, 0); + + int newLeaderEpochZkVersion = + standby.getCoordinatorEventProcessor() + .getCoordinatorContext() + .getCoordinatorEpochZkVersion(); + int oldLeaderEpochZkVersion = + leader.getCoordinatorEventProcessor() + .getCoordinatorContext() + .getCoordinatorEpochZkVersion(); + + assertThatThrownBy( + () -> + leader.getZooKeeperClient() + .registerLeaderAndIsr( + tableBucket, leaderAndIsr, oldLeaderEpochZkVersion)) + .satisfies( + t -> + assertThat(getRootCause(t)) + .isInstanceOf(KeeperException.BadVersionException.class)); + standby.getZooKeeperClient() + .registerLeaderAndIsr(tableBucket, leaderAndIsr, newLeaderEpochZkVersion); + assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(leaderAndIsr); + } + + private TabletServerGateway createGatewayForTabletServer(TabletServer server) { + List endpoints = server.getRpcServer().getBindEndpoints(); + Endpoint endpoint = endpoints.get(0); + ServerNode serverNode = + new ServerNode( + server.getServerId(), + endpoint.getHost(), + endpoint.getPort(), + ServerType.TABLET_SERVER); + + return 
GatewayClientProxy.createGatewayProxy( + () -> serverNode, rpcClient, TabletServerGateway.class); + } + private CoordinatorGateway createGatewayForServer(CoordinatorServer server) { List endpoints = server.getRpcServer().getBindEndpoints(); Endpoint endpoint = endpoints.get(0); @@ -275,6 +406,13 @@ private static Configuration createConfiguration() { return configuration; } + private Configuration createTabletServerConfiguration() { + Configuration configuration = createConfiguration(); + configuration.set(ConfigOptions.TABLET_SERVER_ID, 0); + configuration.setString(ConfigOptions.DATA_DIR, tempDir.toAbsolutePath().toString()); + return configuration; + } + private void waitUntilCoordinatorServerElected() { waitUntil( () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java index cf832a9555..e771823934 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java @@ -83,6 +83,8 @@ void testCoordinatorServerElection() throws Exception { } } assertThat(firstLeader).isNotNull(); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH); firstLeader.close(); firstLeader.start(); @@ -91,6 +93,8 @@ void testCoordinatorServerElection() throws Exception { CoordinatorAddress secondLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1); CoordinatorServer secondLeader = null; for (CoordinatorServer coordinatorServer : 
coordinatorServerList) { @@ -117,6 +121,8 @@ void testCoordinatorServerElection() throws Exception { CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId()); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2); } /** Create a configuration with Zookeeper address setting. */ diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java index ae39bd185f..bd060d6eed 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java @@ -63,7 +63,6 @@ protected Configuration getServerConfig() { ConfigOptions.BIND_LISTENERS, String.format("%s://%s:%d", DEFAULT_LISTENER_NAME, HOSTNAME, getPort())); conf.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); - return conf; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index 9f6c130640..b2abac8307 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -514,6 +514,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception { .updateMetadata( makeUpdateMetadataRequest( coordinatorServerInfo, + null, new HashSet<>(tabletServerInfos), Collections.emptyList(), Collections.emptyList())) @@ -652,6 +653,10 @@ void testMetadataCompatibility(boolean isCoordinatorServer) throws Exception { .updateMetadata( makeLegacyUpdateMetadataRequest( Optional.of(coordinatorServerInfo), + 
FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorEventProcessor() + .getCoordinatorEpoch(), new HashSet<>(tabletServerInfos))) .get(); } @@ -873,7 +878,9 @@ private static Schema newPkSchema() { @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private UpdateMetadataRequest makeLegacyUpdateMetadataRequest( - Optional coordinatorServer, Set aliveTableServers) { + Optional coordinatorServer, + int coordinatorEpoch, + Set aliveTableServers) { UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest(); Set aliveTableServerNodes = new HashSet<>(); for (ServerInfo serverInfo : aliveTableServers) { @@ -895,6 +902,7 @@ private UpdateMetadataRequest makeLegacyUpdateMetadataRequest( node -> { Endpoint endpoint = node.endpoints().get(0); updateMetadataRequest + .setCoordinatorEpoch(coordinatorEpoch) .setCoordinatorServer() .setNodeId(node.id()) .setHost(endpoint.getHost()) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java index 024b12b336..2c30fc7482 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java @@ -113,7 +113,7 @@ static void afterAll() { private void initTableManager() { testingEventManager = new TestingEventManager(); - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new TestCoordinatorContext(); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java new file mode 100644 index 0000000000..b931d564ff --- /dev/null +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.server.zk.data.ZkVersion; + +/** A coordinator context for test purposes whose epoch can be set manually. */ +public class TestCoordinatorContext extends CoordinatorContext { + public TestCoordinatorContext() { + // When creating or modifying a ZooKeeper node, the epoch node version should be checked + // to ensure the coordinator is still holding the leadership. 
However, in the test + // cases, we don't register epoch node, so we skip the check process by setting + // "coordinatorEpochZkVersion" to ZkVersion.MATCH_ANY_VERSION + super(); + this.setCoordinatorEpochAndZkVersion( + INITIAL_COORDINATOR_EPOCH, ZkVersion.MATCH_ANY_VERSION.getVersion()); + } + + public TestCoordinatorContext(int coordinatorEpoch, int coordinatorEpochZkVersion) { + super(); + this.setCoordinatorEpochAndZkVersion(coordinatorEpoch, coordinatorEpochZkVersion); + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java index a961dc393e..57ba9fff7a 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java @@ -30,6 +30,7 @@ import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.RebalanceTask; @@ -60,6 +61,7 @@ public class RebalanceManagerTest { private static ZooKeeperClient zookeeperClient; private static MetadataManager metadataManager; + private static ZkEpoch zkEpoch; private CoordinatorMetadataCache serverMetadataCache; private TestCoordinatorChannelManager testCoordinatorChannelManager; @@ -69,11 +71,12 @@ public class RebalanceManagerTest { private KvSnapshotLeaseManager kvSnapshotLeaseManager; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = 
zookeeperClient.fenceBecomeCoordinatorLeader("1"); } @BeforeEach @@ -134,6 +137,7 @@ void testRebalanceWithoutTask() throws Exception { private CoordinatorEventProcessor buildCoordinatorEventProcessor() { return new CoordinatorEventProcessor( zookeeperClient, + zkEpoch, serverMetadataCache, testCoordinatorChannelManager, new CoordinatorContext(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java index c7c7591d5a..4c89b5722e 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java @@ -30,6 +30,7 @@ import org.apache.fluss.server.coordinator.CoordinatorRequestBatch; import org.apache.fluss.server.coordinator.CoordinatorTestUtils; import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; +import org.apache.fluss.server.coordinator.TestCoordinatorContext; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; @@ -37,6 +38,7 @@ import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.BeforeAll; @@ -80,7 +82,7 @@ static void baseBeforeAll() { @Test void testStartup() { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); // init coordinator server context with a table assignment TableBucket tableBucket = new TableBucket(1, 0); @@ -105,7 +107,7 
@@ void testStartup() { @Test void testReplicaStateChange() { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); // test check valid replica state change @@ -134,7 +136,7 @@ void testReplicaStateChange() { @Test void testDeleteReplicaStateChange() { Map isReplicaDeleteSuccess = new HashMap<>(); - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); coordinatorContext.setLiveTabletServers( CoordinatorTestUtils.createServers(Arrays.asList(0, 1))); // use a context that will return a gateway that always get success ack @@ -191,7 +193,7 @@ void testDeleteReplicaStateChange() { @Test void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new TestCoordinatorContext(); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); @@ -216,7 +218,8 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { } // put leader and isr LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr); @@ -230,7 +233,7 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { @Test void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + 
CoordinatorContext coordinatorContext = new TestCoordinatorContext(); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); @@ -253,7 +256,8 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { } // put leader and isr LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index bbac45c6b3..d6697f8cb3 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -34,6 +34,7 @@ import org.apache.fluss.server.coordinator.LakeTableTieringManager; import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; +import org.apache.fluss.server.coordinator.TestCoordinatorContext; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElection.ControlledShutdownLeaderElection; @@ -41,9 +42,11 @@ import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import 
org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.shaded.guava32.com.google.common.collect.Sets; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.utils.clock.SystemClock; @@ -84,6 +87,8 @@ class TableBucketStateMachineTest { private static ZooKeeperClient zookeeperClient; private static CoordinatorContext coordinatorContext; + private static ZkEpoch zkEpoch; + private TestCoordinatorChannelManager testCoordinatorChannelManager; private CoordinatorRequestBatch coordinatorRequestBatch; private AutoPartitionManager autoPartitionManager; @@ -92,11 +97,12 @@ class TableBucketStateMachineTest { private KvSnapshotLeaseManager kvSnapshotLeaseManager; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); } @BeforeEach @@ -105,7 +111,7 @@ void beforeEach() throws IOException { conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost"); String remoteDir = "/tmp/fluss/remote-data"; conf.setString(ConfigOptions.REMOTE_DATA_DIR, remoteDir); - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new TestCoordinatorContext(); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); coordinatorRequestBatch = new CoordinatorRequestBatch( @@ -157,9 +163,13 @@ void testStartup() throws Exception { // create LeaderAndIsr for t10/t11 info in zk, zookeeperClient.registerLeaderAndIsr( - new TableBucket(t1Id, 0), new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0)); + new TableBucket(t1Id, 0), + new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0), + 
ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - new TableBucket(t1Id, 1), new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0)); + new TableBucket(t1Id, 1), + new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0), + ZkVersion.MATCH_ANY_VERSION.getVersion()); // update the LeaderAndIsr to context coordinatorContext.putBucketLeaderAndIsr( t1b0, zookeeperClient.getLeaderAndIsr(new TableBucket(t1Id, 0)).get()); @@ -223,6 +233,8 @@ void testStateChangeToOnline() throws Exception { coordinatorContext.putTablePath(tableId, fakeTablePath); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketState(tableBucket, NewBucket); + coordinatorContext.setCoordinatorEpochAndZkVersion( + 0, ZkVersion.MATCH_ANY_VERSION.getVersion()); // case1: init a new leader for NewBucket to OnlineBucket tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), OnlineBucket); // non any alive servers, the state change fail @@ -268,6 +280,7 @@ void testStateChangeToOnline() throws Exception { CoordinatorEventProcessor coordinatorEventProcessor = new CoordinatorEventProcessor( zookeeperClient, + zkEpoch, serverMetadataCache, new CoordinatorChannelManager( RpcClient.create( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java index aac6f19ed4..5597bb01b1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java @@ -36,6 +36,7 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; import 
org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.AfterAll; @@ -110,8 +111,10 @@ void testGetTableMetadataFromZk() throws Exception { LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000); LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3, 4), 200, 2000); - zookeeperClient.registerLeaderAndIsr(tableBucket0, leaderAndIsr0); - zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1); + zookeeperClient.registerLeaderAndIsr( + tableBucket0, leaderAndIsr0, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + tableBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion()); List tablesMetadataFromZK = metadataProvider.getTablesMetadataFromZK( @@ -176,8 +179,10 @@ void testGetPartitionMetadataFromZk() throws Exception { LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000); LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000); - zookeeperClient.registerLeaderAndIsr(partitionBucket0, leaderAndIsr0); - zookeeperClient.registerLeaderAndIsr(partitionBucket1, leaderAndIsr1); + zookeeperClient.registerLeaderAndIsr( + partitionBucket0, leaderAndIsr0, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + partitionBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion()); // Test getPartitionMetadataFromZkAsync PhysicalTablePath partitionPath = PhysicalTablePath.of(tablePath, partitionName); @@ -275,11 +280,17 @@ void testBatchGetPartitionMetadataFromZkAsync() throws Exception { TableBucket bucket3 = new TableBucket(tableId2, partitionId3, 0); zookeeperClient.registerLeaderAndIsr( - bucket1, new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000)); + bucket1, + new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - bucket2, new LeaderAndIsr(2, 20, 
Arrays.asList(2, 3), 200, 2000)); + bucket2, + new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - bucket3, new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000)); + bucket3, + new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); // Test getPartitionsMetadataFromZK List partitionPaths = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 1137e499a2..7bdd781072 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -890,6 +890,7 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception { .updateMetadata( makeUpdateMetadataRequest( coordinatorServerInfo, + null, newTabletServerInfos, Collections.emptyList(), Collections.emptyList())) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index c867b8dce0..16f59ddd86 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -55,6 +55,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -85,6 +86,7 @@ class ZooKeeperClientTest { private static ZooKeeperClient zookeeperClient; private static String remoteDataDir; + private static ZkEpoch zkEpoch; @BeforeAll static void beforeAll() { @@ -95,6 +97,12 @@ 
static void beforeAll() { remoteDataDir = zookeeperClient.getDefaultRemoteDataDir(); } + @BeforeEach + void beforeEach() throws Exception { + // create epoch node in beforeEach(), because it will always be cleaned up in afterEach() + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("tmp"); + } + @AfterEach void afterEach() { ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot(); @@ -176,7 +184,8 @@ void testTabletAssignments() throws Exception { // test update TableAssignment tableAssignment3 = TableAssignment.builder().add(3, BucketAssignment.of(1, 5)).build(); - zookeeperClient.updateTableAssignment(tableId1, tableAssignment3); + zookeeperClient.updateTableAssignment( + tableId1, tableAssignment3, zkEpoch.getCoordinatorEpochZkVersion()); assertThat(zookeeperClient.getTableAssignment(tableId1)).contains(tableAssignment3); // test delete @@ -193,19 +202,22 @@ void testLeaderAndIsr() throws Exception { assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty(); // try to register bucket leaderAndIsr - LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000); - LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 100, 1000); + LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 0, 1000); + LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 0, 1000); - zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1); - zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2); + zookeeperClient.registerLeaderAndIsr( + tableBucket1, leaderAndIsr1, zkEpoch.getCoordinatorEpochZkVersion()); + zookeeperClient.registerLeaderAndIsr( + tableBucket2, leaderAndIsr2, zkEpoch.getCoordinatorEpochZkVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).hasValue(leaderAndIsr2); assertThat(zookeeperClient.getLeaderAndIsrs(Arrays.asList(tableBucket1, 
tableBucket2))) .containsValues(leaderAndIsr1, leaderAndIsr2); // test update - leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 200, 2000); - zookeeperClient.updateLeaderAndIsr(tableBucket1, leaderAndIsr1); + leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 0, 2000); + zookeeperClient.updateLeaderAndIsr( + tableBucket1, leaderAndIsr1, zkEpoch.getCoordinatorEpochZkVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1); // test delete @@ -222,7 +234,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep TableBucket tableBucket = isPartitionTable ? new TableBucket(1, 2L, i) : new TableBucket(1, i); LeaderAndIsr leaderAndIsr = - new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000); + new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000); leaderAndIsrList.add(leaderAndIsr); RegisterTableBucketLeadAndIsrInfo info = isPartitionTable @@ -233,7 +245,8 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep tableBucketInfo.add(info); } // batch create - zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(tableBucketInfo); + zookeeperClient.batchRegisterLeaderAndIsrForTablePartition( + tableBucketInfo, zkEpoch.getCoordinatorEpochZkVersion()); for (int i = 0; i < 100; i++) { // each should register successful @@ -263,7 +276,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep entry.setValue(adjustLeaderAndIsr); }); // batch update - zookeeperClient.batchUpdateLeaderAndIsr(updateMap); + zookeeperClient.batchUpdateLeaderAndIsr(updateMap, zkEpoch.getCoordinatorEpochZkVersion()); for (int i = 0; i < 100; i++) { // each should update successful Optional optionalLeaderAndIsr = @@ -282,9 +295,10 @@ void testBatchUpdateLeaderAndIsr() throws Exception { for (int i = 0; i < totalCount; i++) { TableBucket tableBucket = new TableBucket(1, i); LeaderAndIsr leaderAndIsr = - new 
LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000); + new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000); leaderAndIsrList.put(tableBucket, leaderAndIsr); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, zkEpoch.getCoordinatorEpochZkVersion()); } // try to batch update @@ -299,10 +313,11 @@ void testBatchUpdateLeaderAndIsr() throws Exception { old.leader() + 1, old.leaderEpoch() + 1, old.isr(), - old.coordinatorEpoch() + 1, + old.coordinatorEpoch(), old.bucketEpoch() + 1); })); - zookeeperClient.batchUpdateLeaderAndIsr(updateLeaderAndIsrList); + zookeeperClient.batchUpdateLeaderAndIsr( + updateLeaderAndIsrList, zkEpoch.getCoordinatorEpochZkVersion()); for (Map.Entry entry : updateLeaderAndIsrList.entrySet()) { TableBucket tableBucket = entry.getKey(); LeaderAndIsr leaderAndIsr = entry.getValue(); From 1a4384b95064644c4fdf0e8c5c05779ac2eb96d9 Mon Sep 17 00:00:00 2001 From: Jark Wu Date: Thu, 9 Apr 2026 15:05:09 +0800 Subject: [PATCH 2/2] [server] Update leader election process and improve coordinator epoch handling --- .../coordinator/CoordinatorContext.java | 29 ++++++++------ .../CoordinatorEventProcessor.java | 16 +++----- .../CoordinatorLeaderElection.java | 14 ++----- .../server/coordinator/CoordinatorServer.java | 32 +++------------ .../statemachine/ReplicaStateMachine.java | 2 +- .../statemachine/TableBucketStateMachine.java | 11 ++---- .../fluss/server/replica/ReplicaManager.java | 17 +++++--- .../org/apache/fluss/server/zk/ZkEpoch.java | 7 ++++ .../fluss/server/zk/ZooKeeperClient.java | 37 ++++++++++++------ .../coordinator/CoordinatorContextTest.java | 3 +- .../CoordinatorEventProcessorTest.java | 3 +- .../CoordinatorHighAvailabilityITCase.java | 33 ++++++++++------ .../server/coordinator/TableManagerTest.java | 7 +++- .../coordinator/TestCoordinatorContext.java | 39 ------------------- .../rebalance/RebalanceManagerTest.java | 3 +- 
.../statemachine/ReplicaStateMachineTest.java | 18 +++++---- .../TableBucketStateMachineTest.java | 9 +---- .../testutils/FlussClusterExtension.java | 7 +++- 18 files changed, 132 insertions(+), 155 deletions(-) delete mode 100644 fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index 138ddb1756..92dfc5ee5c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -28,6 +28,7 @@ import org.apache.fluss.server.coordinator.statemachine.BucketState; import org.apache.fluss.server.coordinator.statemachine.ReplicaState; import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.utils.types.Tuple2; @@ -108,25 +109,33 @@ public class CoordinatorContext { /** A mapping from tabletServers to server tag. */ private final Map serverTags = new HashMap<>(); + /** + * The epoch of the coordinator, which will be incremented whenever the coordinator is elected + * or re-elected. + */ + private final int coordinatorEpoch; + + /** + * The coordinator epoch zk version is the zk version when the coordinator epoch is updated in + * Zookeeper. 
+ */ + private final int coordinatorEpochZkVersion; + private ServerInfo coordinatorServerInfo = null; - private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; - private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION; - public CoordinatorContext() {} + public CoordinatorContext(ZkEpoch zkEpoch) { + this.coordinatorEpoch = zkEpoch.getCoordinatorEpoch(); + this.coordinatorEpochZkVersion = zkEpoch.getCoordinatorEpochZkVersion(); + } public int getCoordinatorEpoch() { return coordinatorEpoch; } - public int getCoordinatorEpochZkVersion() { + public int getCoordinatorZkVersion() { return coordinatorEpochZkVersion; } - public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) { - this.coordinatorEpoch = newEpoch; - this.coordinatorEpochZkVersion = newZkVersion; - } - public Set getLiveCoordinatorServers() { return liveCoordinatorServers; } @@ -719,8 +728,6 @@ private void clearTablesState() { public void resetContext() { tablesToBeDeleted.clear(); - coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; - coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION; clearTablesState(); liveTabletServers.clear(); liveCoordinatorServers.clear(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 423d27622b..7ae553caa7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -108,7 +108,6 @@ import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.utils.ServerRpcMessageUtils; -import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.data.BucketAssignment; import 
org.apache.fluss.server.zk.data.LeaderAndIsr; @@ -189,7 +188,6 @@ public class CoordinatorEventProcessor implements EventProcessor { public CoordinatorEventProcessor( ZooKeeperClient zooKeeperClient, - ZkEpoch zkEpoch, CoordinatorMetadataCache serverMetadataCache, CoordinatorChannelManager coordinatorChannelManager, CoordinatorContext coordinatorContext, @@ -255,9 +253,6 @@ public CoordinatorEventProcessor( this.ioExecutor = ioExecutor; this.lakeTableHelper = new LakeTableHelper(zooKeeperClient, conf.getString(ConfigOptions.REMOTE_DATA_DIR)); - - this.coordinatorContext.setCoordinatorEpochAndZkVersion( - zkEpoch.getCoordinatorEpoch(), zkEpoch.getCoordinatorEpochZkVersion()); } public CoordinatorEventManager getCoordinatorEventManager() { @@ -314,6 +309,7 @@ public void shutdown() { coordinatorEventManager.close(); rebalanceManager.close(); onShutdown(); + coordinatorContext.resetContext(); } private ServerInfo getCoordinatorServerInfo() { @@ -1626,7 +1622,7 @@ private void updateReplicaAssignmentForBucket( zooKeeperClient.updateTableAssignment( tableId, new TableAssignment(newTableAssignment), - coordinatorContext.getCoordinatorEpochZkVersion()); + coordinatorContext.getCoordinatorZkVersion()); } else { Map> partitionAssignment = coordinatorContext.getPartitionAssignment( @@ -1639,7 +1635,7 @@ private void updateReplicaAssignmentForBucket( zooKeeperClient.updatePartitionAssignment( partitionId, new PartitionAssignment(tableId, newPartitionAssignment), - coordinatorContext.getCoordinatorEpochZkVersion()); + coordinatorContext.getCoordinatorZkVersion()); } } @@ -1686,7 +1682,7 @@ private List tryProcessAdjustIsr( try { zooKeeperClient.batchUpdateLeaderAndIsr( - newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion()); + newLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion()); newLeaderAndIsrList.forEach( (tableBucket, newLeaderAndIsr) -> result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr))); @@ -1700,7 +1696,7 @@ private 
List tryProcessAdjustIsr( zooKeeperClient.updateLeaderAndIsr( tableBucket, newLeaderAndIsr, - coordinatorContext.getCoordinatorEpochZkVersion()); + coordinatorContext.getCoordinatorZkVersion()); } catch (Exception e) { LOG.error("Error when register leader and isr.", e); result.add( @@ -2227,7 +2223,7 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List> pendingCleanup = new AtomicReference<>(CompletableFuture.completedFuture(null)); - public CoordinatorLeaderElection( - ZooKeeperClient zkClient, String serverId, CoordinatorContext coordinatorContext) { + public CoordinatorLeaderElection(ZooKeeperClient zkClient, String serverId) { this.serverId = serverId; - this.zkClient = zkClient; - this.coordinatorContext = coordinatorContext; this.leaderLatch = new LeaderLatch( zkClient.getCuratorClient(), @@ -113,10 +108,6 @@ public void isLeader() { // Run init on a separate thread to avoid deadlock with // Curator's EventThread when performing ZK operations. - // Set leader flag before init completes, so when zk found this leader, the - // leader can accept requests - isLeader.set(true); - leaderCallbackExecutor.execute( () -> { // Wait for any pending cleanup to finish first. 
@@ -134,6 +125,9 @@ public void isLeader() { } try { initLeaderServices.run(); + // Set leader flag after init completes, so when zk found + // this leader, the leader can accept requests + isLeader.set(true); } catch (CoordinatorEpochFencedException e) { LOG.warn( "Coordinator server {} has been fenced and not become leader successfully.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index a54fc2d58e..f44bbd7a8d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -141,9 +141,6 @@ public class CoordinatorServer extends ServerBase { @Nullable private Authorizer authorizer; - @GuardedBy("lock") - private CoordinatorContext coordinatorContext; - @GuardedBy("lock") private DynamicConfigManager dynamicConfigManager; @@ -221,11 +218,9 @@ protected void initCoordinatorStandby() throws Exception { serverId); this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); - this.coordinatorContext = new CoordinatorContext(); // CoordinatorLeaderElection must be created after zkClient is initialized. 
- this.coordinatorLeaderElection = - new CoordinatorLeaderElection(zkClient, serverId, coordinatorContext); + this.coordinatorLeaderElection = new CoordinatorLeaderElection(zkClient, serverId); this.lakeCatalogDynamicLoader = new LakeCatalogDynamicLoader(conf, pluginManager, true); this.dynamicConfigManager = new DynamicConfigManager(zkClient, conf, true); @@ -298,8 +293,8 @@ protected void initCoordinatorStandby() throws Exception { protected void initCoordinatorLeader() throws Exception { // to avoid split-brain ZkEpoch zkEpoch = zkClient.fenceBecomeCoordinatorLeader(serverId); - registerCoordinatorLeader(); + synchronized (lock) { this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME); this.rpcClient = RpcClient.create(conf, clientMetricGroup); @@ -311,14 +306,13 @@ protected void initCoordinatorLeader() throws Exception { autoPartitionManager.start(); // start coordinator event processor after we register coordinator leader to zk - // so that the event processor can get the coordinator leader node from zk during start - // up. - // in HA for coordinator server, the processor also need to know the leader node during - // start up + // so that the event processor can get the coordinator leader node from zk during + // start up. 
In HA for coordinator server, the processor also need to know the leader + // node during start up + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); this.coordinatorEventProcessor = new CoordinatorEventProcessor( zkClient, - zkEpoch, metadataCache, coordinatorChannelManager, coordinatorContext, @@ -402,11 +396,6 @@ protected void cleanupCoordinatorLeader() { LOG.warn("Failed to close client metric group", t); } - // Reset coordinator context for next election - if (coordinatorContext != null) { - coordinatorContext.resetContext(); - } - LOG.info("Coordinator leader services cleaned up successfully."); } } @@ -601,15 +590,6 @@ CompletableFuture stopServices() { exception = ExceptionUtils.firstOrSuppressed(t, exception); } - try { - if (coordinatorContext != null) { - // then reset coordinatorContext - coordinatorContext.resetContext(); - } - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); - } - try { if (lakeTableTieringManager != null) { lakeTableTieringManager.close(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java index 2416b163b5..63f5dd9652 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java @@ -488,7 +488,7 @@ private Map doRemoveReplicaFromIsr( } try { zooKeeperClient.batchUpdateLeaderAndIsr( - toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion()); + toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion()); toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr); return adjustedLeaderAndIsr; } catch (Exception e) { diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java index b01e65c92d..300e98f08f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java @@ -312,9 +312,7 @@ private Optional initLeaderForTableBuckets( LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr; try { zooKeeperClient.registerLeaderAndIsr( - tableBucket, - leaderAndIsr, - coordinatorContext.getCoordinatorEpochZkVersion()); + tableBucket, leaderAndIsr, coordinatorContext.getCoordinatorZkVersion()); } catch (Exception e) { LOG.error( "Fail to create state node for table bucket {} in zookeeper.", @@ -382,8 +380,7 @@ public void batchHandleOnlineChangeAndInitLeader(Set tableBuckets) if (!tableBucketLeadAndIsrInfos.isEmpty()) { try { zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition( - tableBucketLeadAndIsrInfos, - coordinatorContext.getCoordinatorEpochZkVersion()); + tableBucketLeadAndIsrInfos, coordinatorContext.getCoordinatorZkVersion()); registerSuccessList.addAll(tableBucketLeadAndIsrInfos); } catch (Exception e) { LOG.error( @@ -461,7 +458,7 @@ private List tryRegisterLeaderAndIsrOneByOne( zooKeeperClient.registerLeaderAndIsr( info.getTableBucket(), info.getLeaderAndIsr(), - coordinatorContext.getCoordinatorEpochZkVersion()); + coordinatorContext.getCoordinatorZkVersion()); registerSuccessList.add(info); } catch (Exception e) { LOG.error( @@ -506,7 +503,7 @@ private Optional electNewLeaderForTableBuckets( zooKeeperClient.updateLeaderAndIsr( tableBucket, electionResult.leaderAndIsr, - coordinatorContext.getCoordinatorEpochZkVersion()); + coordinatorContext.getCoordinatorZkVersion()); } catch (Exception e) { LOG.error( "Fail to update bucket LeaderAndIsr for table 
bucket {}.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index 6960b51910..19ab3f936a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ -336,6 +336,11 @@ public int getMinInSyncReplicas() { return minInSyncReplicas; } + @VisibleForTesting + public int getCoordinatorEpoch() { + return coordinatorEpoch; + } + // ============ ServerReconfigurable Implementation ============ @Override @@ -1879,9 +1884,14 @@ private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch, Strin requestCoordinatorEpoch, requestName, this.coordinatorEpoch); LOG.warn("Ignore the {} request because {}", requestName, errorMessage); throw new InvalidCoordinatorException(errorMessage); - } else { + } else if (requestCoordinatorEpoch > this.coordinatorEpoch) { + LOG.info( + "Update coordinator epoch from {} to {} for coordinator leader switch.", + this.coordinatorEpoch, + requestCoordinatorEpoch); this.coordinatorEpoch = requestCoordinatorEpoch; } + // ignore equal case } private void dropEmptyTableOrPartitionDir(Path dir, long id, String dirType) { @@ -1991,11 +2001,6 @@ public TabletServerMetricGroup getServerMetricGroup() { return serverMetricGroup; } - @VisibleForTesting - public void resetCoordinatorEpoch() { - this.coordinatorEpoch = CoordinatorContext.INITIAL_COORDINATOR_EPOCH; - } - /** * Interface to represent the state of hosted {@link Replica}. 
We create a concrete (active) * {@link Replica} instance when the TabletServer receives a createLogLeader request or diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java index f3d401a255..3f39843733 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java @@ -18,8 +18,15 @@ package org.apache.fluss.server.zk; +import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; +import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH_ZK_VERSION; + /** Class for coordinator epoch and coordinator epoch zk version. */ public class ZkEpoch { + + public static final ZkEpoch INITIAL_EPOCH = + new ZkEpoch(INITIAL_COORDINATOR_EPOCH, INITIAL_COORDINATOR_EPOCH_ZK_VERSION); + private final int coordinatorEpoch; private final int coordinatorEpochZkVersion; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 3c8b58fa2c..cb34a2f2ed 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -139,24 +139,39 @@ * *

<p>In some method, 'expectedZkVersion' is used to execute an epoch Zookeeper version check. * Conditions requiring epoch checks (all must be met): - * ┌─────────────────────────────────────────────────────────┐ │ 1. Invoked by the Coordinator (not - * the TabletServer) │ │ 2. Operates on persistent nodes (not ephemeral) │ │ 3. Constitutes a - * "control plane" operation: │ │ partition assignment or LeaderAndIsr election │ │ 4. Concurrent - * access to the same path by old and new │ │ leaders during leader failover │ │ 5. No other - * mechanisms (optimistic locking, │ │ idempotency, or reloading) provide fallback │ + * 

<pre>
+ * ┌─────────────────────────────────────────────────────────┐
+ * │ 1. Invoked by the Coordinator (not the TabletServer)    │
+ * │ 2. Operates on persistent nodes (not ephemeral)         │
+ * │ 3. Constitutes a "control plane" operation:             │
+ * │    partition assignment or LeaderAndIsr election        │
+ * │ 4. Concurrent access to the same path by old and new    │
+ * │    leaders during leader failover                       │
+ * │ 5. No other mechanisms (optimistic locking,             │
+ * │    idempotency, or reloading) provide fallback          │
  * └─────────────────────────────────────────────────────────┘
+ * </pre>
+ * + *

<p>In practice, only two types of operations truly require this: * - *

<p>In practice, only two types of operations truly require this: - CRUD for Table/Partition - * Assignment (assignment decisions). - CRUD for LeaderAndIsr (leader election results). + *

<ul>
+ *   <li>CRUD for Table/Partition Assignment (assignment decisions).
+ *   <li>CRUD for LeaderAndIsr (leader election results).
+ * </ul>
* *

<p>These operations are inevitably executed concurrently by the old and new coordinators during * failover (as the new leader immediately reassigns partitions), and overwrites cannot be * automatically recovered. * - *

<p>All other operations do not require epoch checks because: - DDL operations are protected - * against concurrency by client reconnection mechanisms. - TabletServer operations are unaffected - * by coordinator failovers. - ACLs and Configs have their own version control or idempotency - * guarantees. - Ephemeral nodes are managed via session lifecycle. + *

<p>All other operations do not require epoch checks because: + * 

<ul>
+ *   <li>DDL operations are protected against concurrency by client reconnection mechanisms.
+ *   <li>TabletServer operations are unaffected by coordinator failovers.
+ *   <li>ACLs and Configs have their own version control or idempotency guarantees.
+ *   <li>Ephemeral nodes are managed via session lifecycle.
+ * </ul>
*/ @Internal public class ZooKeeperClient implements AutoCloseable { diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java index 5c321dfffa..2b788bdb48 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; @@ -37,7 +38,7 @@ class CoordinatorContextTest { @Test void testGetLakeTableCount() { - CoordinatorContext context = new CoordinatorContext(); + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); // Initially, there should be no tables assertThat(context.allTables()).isEmpty(); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index cd036b3641..63f0b5fd90 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -1061,10 +1061,9 @@ private void verifyIsr(TableBucket tb, int expectedLeader, List expecte private CoordinatorEventProcessor buildCoordinatorEventProcessor() { return new CoordinatorEventProcessor( zookeeperClient, - zkEpoch, serverMetadataCache, testCoordinatorChannelManager, - new TestCoordinatorContext(), + new CoordinatorContext(zkEpoch), autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java index e974ec2d97..68e46e3590 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java @@ -283,16 +283,14 @@ void testTabletServerRejectsStaleCoordinatorEpochAfterLeaderSwitch() throws Exce .isEqualTo(standby.getServerId()); int newCoordinatorEpoch = standby.getCoordinatorEventProcessor().getCoordinatorEpoch(); + // wait the new coordinator notifies the tablet server about the new coordinator epoch + waitUntil( + () -> tabletServer.getReplicaManager().getCoordinatorEpoch() == newCoordinatorEpoch, + Duration.ofSeconds(30), + "Tablet server did not update to new coordinator epoch"); TabletServerGateway tsGateway = createGatewayForTabletServer(tabletServer); - // Send request with new coordinator epoch first to ensure the tablet server - // has updated its stored epoch to the latest value - tsGateway - .updateMetadata( - new UpdateMetadataRequest().setCoordinatorEpoch(newCoordinatorEpoch)) - .get(); - // Send request with old coordinator epoch — tablet server should reject it assertThatThrownBy( () -> @@ -339,11 +337,11 @@ void testZooKeeperRejectsStaleCoordinatorRequestAfterLeaderSwitch() throws Excep int newLeaderEpochZkVersion = standby.getCoordinatorEventProcessor() .getCoordinatorContext() - .getCoordinatorEpochZkVersion(); + .getCoordinatorZkVersion(); int oldLeaderEpochZkVersion = leader.getCoordinatorEventProcessor() .getCoordinatorContext() - .getCoordinatorEpochZkVersion(); + .getCoordinatorZkVersion(); assertThatThrownBy( () -> @@ -413,11 +411,23 @@ private Configuration createTabletServerConfiguration() { return configuration; } - private void waitUntilCoordinatorServerElected() { + private 
void waitUntilCoordinatorServerElected() throws Exception { waitUntil( () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), Duration.ofMinutes(1), "Fail to wait coordinator server elected"); + waitUntilCoordinatorLeaderReady(); + } + + private void waitUntilCoordinatorLeaderReady() throws Exception { + CoordinatorAddress currentLeaderAddress = + zookeeperClient.getCoordinatorLeaderAddress().get(); + CoordinatorServer currentLeader = findServerById(currentLeaderAddress.getId()); + // we need to wait some time for the coordinator leader start services after election + waitUntil( + () -> currentLeader.getCoordinatorService().isLeader(), + Duration.ofSeconds(30), + "Coordinator leader did not recognize itself as leader"); } private CoordinatorServer findServerById(String serverId) { @@ -493,7 +503,7 @@ private void killZkSession(CoordinatorServer server) throws Exception { } /** Waits until a new coordinator leader is elected that is different from the given old one. */ - private void waitUntilNewLeaderElected(String oldLeaderId) { + private void waitUntilNewLeaderElected(String oldLeaderId) throws Exception { waitUntil( () -> { try { @@ -507,6 +517,7 @@ private void waitUntilNewLeaderElected(String oldLeaderId) { }, Duration.ofMinutes(1), "Fail to wait for new coordinator leader to be elected"); + waitUntilCoordinatorLeaderReady(); } /** Verifies that the given server can process requests as a leader. 
*/ diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java index 2c30fc7482..f5357c549f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java @@ -31,6 +31,7 @@ import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.BucketAssignment; @@ -78,6 +79,7 @@ class TableManagerTest { new AllCallbackWrapper<>(new ZooKeeperExtension()); private static ZooKeeperClient zookeeperClient; + private static ZkEpoch zkEpoch; private static ExecutorService ioExecutor; private CoordinatorContext coordinatorContext; @@ -86,11 +88,12 @@ class TableManagerTest { private TestCoordinatorChannelManager testCoordinatorChannelManager; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); ioExecutor = Executors.newFixedThreadPool(1); } @@ -113,7 +116,7 @@ static void afterAll() { private void initTableManager() { testingEventManager = new TestingEventManager(); - coordinatorContext = new TestCoordinatorContext(); + coordinatorContext = new CoordinatorContext(zkEpoch); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java deleted file mode 100644 index b931d564ff..0000000000 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorContext.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.fluss.server.coordinator; - -import org.apache.fluss.server.zk.data.ZkVersion; - -/** A coordinator context for test purpose which can set epoch manually. */ -public class TestCoordinatorContext extends CoordinatorContext { - public TestCoordinatorContext() { - // When create or modify ZooKeeper node, it should check ZooKeeper epoch node version - // to ensure the coordinator is still holding the leadership. 
However, in the test - // cases, we don't register epoch node, so we skip the check process by setting - // "coordinatorEpochZkVersion" to ZkVersion.MATCH_ANY_VERSION - super(); - this.setCoordinatorEpochAndZkVersion( - INITIAL_COORDINATOR_EPOCH, ZkVersion.MATCH_ANY_VERSION.getVersion()); - } - - public TestCoordinatorContext(int coordinatorEpoch, int coordinatorEpochZkVersion) { - super(); - this.setCoordinatorEpochAndZkVersion(coordinatorEpoch, coordinatorEpochZkVersion); - } -} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java index 57ba9fff7a..6ae36b8dee 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java @@ -137,10 +137,9 @@ void testRebalanceWithoutTask() throws Exception { private CoordinatorEventProcessor buildCoordinatorEventProcessor() { return new CoordinatorEventProcessor( zookeeperClient, - zkEpoch, serverMetadataCache, testCoordinatorChannelManager, - new CoordinatorContext(), + new CoordinatorContext(zkEpoch), autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java index 4c89b5722e..f1ad2fe1c1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java @@ -30,11 +30,11 @@ import org.apache.fluss.server.coordinator.CoordinatorRequestBatch; import org.apache.fluss.server.coordinator.CoordinatorTestUtils; import 
org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; -import org.apache.fluss.server.coordinator.TestCoordinatorContext; import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent; import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; @@ -71,18 +71,20 @@ class ReplicaStateMachineTest { new AllCallbackWrapper<>(new ZooKeeperExtension()); private static ZooKeeperClient zookeeperClient; + private static ZkEpoch zkEpoch; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); } @Test void testStartup() { - CoordinatorContext coordinatorContext = new TestCoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); // init coordinator server context with a table assignment TableBucket tableBucket = new TableBucket(1, 0); @@ -107,7 +109,7 @@ void testStartup() { @Test void testReplicaStateChange() { - CoordinatorContext coordinatorContext = new TestCoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); // test check valid replica state change @@ -136,7 +138,7 @@ void testReplicaStateChange() { @Test void testDeleteReplicaStateChange() { Map isReplicaDeleteSuccess = new HashMap<>(); - CoordinatorContext coordinatorContext = new TestCoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); 
coordinatorContext.setLiveTabletServers( CoordinatorTestUtils.createServers(Arrays.asList(0, 1))); // use a context that will return a gateway that always get success ack @@ -167,7 +169,7 @@ void testDeleteReplicaStateChange() { } // now, we change a context that some gateway will return exception - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers( CoordinatorTestUtils.createServers(Arrays.asList(0, 1))); coordinatorContext.putBucketLeaderAndIsr(tableBucket1, new LeaderAndIsr(0, 0)); @@ -193,7 +195,7 @@ void testDeleteReplicaStateChange() { @Test void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new TestCoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); @@ -233,7 +235,7 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { @Test void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new TestCoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index d6697f8cb3..2ea3718ffb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -34,7 +34,6 @@ import org.apache.fluss.server.coordinator.LakeTableTieringManager; import org.apache.fluss.server.coordinator.MetadataManager; import org.apache.fluss.server.coordinator.TestCoordinatorChannelManager; -import org.apache.fluss.server.coordinator.TestCoordinatorContext; import org.apache.fluss.server.coordinator.event.CoordinatorEventManager; import org.apache.fluss.server.coordinator.lease.KvSnapshotLeaseManager; import org.apache.fluss.server.coordinator.statemachine.ReplicaLeaderElection.ControlledShutdownLeaderElection; @@ -57,7 +56,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -106,12 +104,12 @@ static void baseBeforeAll() throws Exception { } @BeforeEach - void beforeEach() throws IOException { + void beforeEach() { Configuration conf = new Configuration(); conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost"); String remoteDir = "/tmp/fluss/remote-data"; conf.setString(ConfigOptions.REMOTE_DATA_DIR, remoteDir); - coordinatorContext = new TestCoordinatorContext(); + coordinatorContext = new CoordinatorContext(zkEpoch); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); coordinatorRequestBatch = new CoordinatorRequestBatch( @@ -233,8 +231,6 @@ void testStateChangeToOnline() throws Exception { coordinatorContext.putTablePath(tableId, fakeTablePath); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketState(tableBucket, NewBucket); - coordinatorContext.setCoordinatorEpochAndZkVersion( - 0, ZkVersion.MATCH_ANY_VERSION.getVersion()); // case1: init a new leader for NewBucket to OnlineBucket tableBucketStateMachine.handleStateChange(Collections.singleton(tableBucket), OnlineBucket); // non 
any alive servers, the state change fail @@ -280,7 +276,6 @@ void testStateChangeToOnline() throws Exception { CoordinatorEventProcessor coordinatorEventProcessor = new CoordinatorEventProcessor( zookeeperClient, - zkEpoch, serverMetadataCache, new CoordinatorChannelManager( RpcClient.create( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 9498419e0e..74a0415963 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -942,11 +942,16 @@ public CoordinatorServer getCoordinatorServer() { return coordinatorServer; } - public void waitUntilCoordinatorServerElected() throws Exception { + private void waitUntilCoordinatorServerElected() { waitUntil( () -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(), Duration.ofSeconds(10), "Fail to wait coordinator server elected"); + // we need to wait some time for the coordinator leader start services after election + waitUntil( + () -> coordinatorServer.getCoordinatorService().isLeader(), + Duration.ofSeconds(10), + "Coordinator leader did not recognize itself as leader"); } // --------------------------------------------------------------------------------------------