diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java b/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java new file mode 100644 index 0000000000..0e0e4e409a --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/CoordinatorEpochFencedException.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** Exception thrown when the Coordinator leader epoch is fenced. 
*/ +public class CoordinatorEpochFencedException extends RuntimeException { + public CoordinatorEpochFencedException(String message) { + super(message); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java index 57ed115289..92dfc5ee5c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorContext.java @@ -28,6 +28,7 @@ import org.apache.fluss.server.coordinator.statemachine.BucketState; import org.apache.fluss.server.coordinator.statemachine.ReplicaState; import org.apache.fluss.server.metadata.ServerInfo; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.utils.types.Tuple2; @@ -55,6 +56,7 @@ public class CoordinatorContext { private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class); public static final int INITIAL_COORDINATOR_EPOCH = 0; + public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0; // for simplicity, we just use retry time, may consider make it a configurable value // and use combine retry times and retry delay @@ -107,15 +109,33 @@ public class CoordinatorContext { /** A mapping from tabletServers to server tag. */ private final Map serverTags = new HashMap<>(); + /** + * The epoch of the coordinator, which will be incremented whenever the coordinator is elected + * or re-elected. + */ + private final int coordinatorEpoch; + + /** + * The coordinator epoch zk version is the zk version when the coordinator epoch is updated in + * Zookeeper. 
+ */ + private final int coordinatorEpochZkVersion; + private ServerInfo coordinatorServerInfo = null; - private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH; - public CoordinatorContext() {} + public CoordinatorContext(ZkEpoch zkEpoch) { + this.coordinatorEpoch = zkEpoch.getCoordinatorEpoch(); + this.coordinatorEpochZkVersion = zkEpoch.getCoordinatorEpochZkVersion(); + } public int getCoordinatorEpoch() { return coordinatorEpoch; } + public int getCoordinatorZkVersion() { + return coordinatorEpochZkVersion; + } + public Set getLiveCoordinatorServers() { return liveCoordinatorServers; } @@ -708,12 +728,11 @@ private void clearTablesState() { public void resetContext() { tablesToBeDeleted.clear(); - coordinatorEpoch = 0; clearTablesState(); liveTabletServers.clear(); + liveCoordinatorServers.clear(); shuttingDownTabletServers.clear(); serverTags.clear(); - liveCoordinatorServers.clear(); } public int getTotalPartitionCount() { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java index 4f53ba666e..7ae553caa7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessor.java @@ -309,6 +309,7 @@ public void shutdown() { coordinatorEventManager.close(); rebalanceManager.close(); onShutdown(); + coordinatorContext.resetContext(); } private ServerInfo getCoordinatorServerInfo() { @@ -1618,7 +1619,10 @@ private void updateReplicaAssignmentForBucket( tableAssignment.forEach( (bucket, replicas) -> newTableAssignment.put(bucket, new BucketAssignment(replicas))); - zooKeeperClient.updateTableAssignment(tableId, new TableAssignment(newTableAssignment)); + zooKeeperClient.updateTableAssignment( + tableId, + new TableAssignment(newTableAssignment), + 
coordinatorContext.getCoordinatorZkVersion()); } else { Map> partitionAssignment = coordinatorContext.getPartitionAssignment( @@ -1629,7 +1633,9 @@ private void updateReplicaAssignmentForBucket( (bucket, replicas) -> newPartitionAssignment.put(bucket, new BucketAssignment(replicas))); zooKeeperClient.updatePartitionAssignment( - partitionId, new PartitionAssignment(tableId, newPartitionAssignment)); + partitionId, + new PartitionAssignment(tableId, newPartitionAssignment), + coordinatorContext.getCoordinatorZkVersion()); } } @@ -1675,7 +1681,8 @@ private List tryProcessAdjustIsr( } try { - zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList); + zooKeeperClient.batchUpdateLeaderAndIsr( + newLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion()); newLeaderAndIsrList.forEach( (tableBucket, newLeaderAndIsr) -> result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr))); @@ -1686,7 +1693,10 @@ private List tryProcessAdjustIsr( TableBucket tableBucket = entry.getKey(); LeaderAndIsr newLeaderAndIsr = entry.getValue(); try { - zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr); + zooKeeperClient.updateLeaderAndIsr( + tableBucket, + newLeaderAndIsr, + coordinatorContext.getCoordinatorZkVersion()); } catch (Exception e) { LOG.error("Error when register leader and isr.", e); result.add( @@ -2212,7 +2222,8 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List cleanup = pendingCleanup.get(); // Run init on a separate thread to avoid deadlock with // Curator's EventThread when performing ZK operations. + leaderCallbackExecutor.execute( () -> { // Wait for any pending cleanup to finish first. 
@@ -123,16 +125,22 @@ public void isLeader() { } try { initLeaderServices.run(); + // Set leader flag after init completes, so when zk found + // this leader, the leader can accept requests + isLeader.set(true); + } catch (CoordinatorEpochFencedException e) { + LOG.warn( + "Coordinator server {} has been fenced and not become leader successfully.", + serverId); + notLeader(); } catch (Exception e) { LOG.error( "Failed to initialize leader services for server {}", serverId, e); + notLeader(); } }); - // Set leader flag before init completes, so when zk found this leader, the - // leader can accept requests - isLeader.set(true); } @Override diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java index 60eaa988c9..3441053671 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorRequestBatch.java @@ -683,6 +683,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() { // tablet servers. 
return makeUpdateMetadataRequest( coordinatorContext.getCoordinatorServerInfo(), + coordinatorContext.getCoordinatorEpoch(), new HashSet<>(coordinatorContext.getLiveTabletServers().values()), tableMetadataList, partitionMetadataList); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index 25a0d866fe..f44bbd7a8d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -39,6 +39,7 @@ import org.apache.fluss.server.metrics.ServerMetricUtils; import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup; import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperUtils; import org.apache.fluss.server.zk.data.CoordinatorAddress; @@ -140,9 +141,6 @@ public class CoordinatorServer extends ServerBase { @Nullable private Authorizer authorizer; - @GuardedBy("lock") - private CoordinatorContext coordinatorContext; - @GuardedBy("lock") private DynamicConfigManager dynamicConfigManager; @@ -232,7 +230,6 @@ protected void initCoordinatorStandby() throws Exception { dynamicConfigManager.startup(); - this.coordinatorContext = new CoordinatorContext(); this.metadataCache = new CoordinatorMetadataCache(); this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager); @@ -294,6 +291,9 @@ protected void initCoordinatorStandby() throws Exception { } protected void initCoordinatorLeader() throws Exception { + // to avoid split-brain + ZkEpoch zkEpoch = zkClient.fenceBecomeCoordinatorLeader(serverId); + registerCoordinatorLeader(); synchronized (lock) { this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME); @@ -305,12 +305,11 
@@ protected void initCoordinatorLeader() throws Exception { new AutoPartitionManager(metadataCache, metadataManager, conf); autoPartitionManager.start(); - registerCoordinatorLeader(); // start coordinator event processor after we register coordinator leader to zk - // so that the event processor can get the coordinator leader node from zk during start - // up. - // in HA for coordinator server, the processor also need to know the leader node during - // start up + // so that the event processor can get the coordinator leader node from zk during + // start up. In HA for coordinator server, the processor also need to know the leader + // node during start up + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); this.coordinatorEventProcessor = new CoordinatorEventProcessor( zkClient, @@ -337,10 +336,20 @@ protected void initCoordinatorLeader() throws Exception { * from leader to standby. It cleans up leader-only resources while keeping the server running * as a standby, ready to participate in future elections. */ - protected void cleanupCoordinatorLeader() throws Exception { + protected void cleanupCoordinatorLeader() { synchronized (lock) { LOG.info("Cleaning up coordinator leader services."); + try { + // make sure the current coordinator leader node is unregistered. 
+ // Different from ZK disconnection, + // when we actively release the Leader's election, + // we need to manually delete the node + unregisterCoordinatorLeader(); + } catch (Throwable t) { + LOG.warn("Failed to unregister coordinator leader from Zookeeper", t); + } + // Clean up leader-specific resources in reverse order of initialization try { if (coordinatorEventProcessor != null) { @@ -387,11 +396,6 @@ protected void cleanupCoordinatorLeader() throws Exception { LOG.warn("Failed to close client metric group", t); } - // Reset coordinator context for next election - if (coordinatorContext != null) { - coordinatorContext.resetContext(); - } - LOG.info("Coordinator leader services cleaned up successfully."); } } @@ -427,6 +431,11 @@ private void registerCoordinatorLeader() throws Exception { "coordinator leader", () -> zkClient.registerCoordinatorLeader(coordinatorAddress)); } + private void unregisterCoordinatorLeader() throws Exception { + CoordinatorAddress coordinatorAddress = buildCoordinatorAddress(); + zkClient.unregisterCoordinatorLeader(coordinatorAddress); + } + private CoordinatorAddress buildCoordinatorAddress() { List bindEndpoints = rpcServer.getBindEndpoints(); return new CoordinatorAddress( @@ -581,15 +590,6 @@ CompletableFuture stopServices() { exception = ExceptionUtils.firstOrSuppressed(t, exception); } - try { - if (coordinatorContext != null) { - // then reset coordinatorContext - coordinatorContext.resetContext(); - } - } catch (Throwable t) { - exception = ExceptionUtils.firstOrSuppressed(t, exception); - } - try { if (lakeTableTieringManager != null) { lakeTableTieringManager.close(); diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java index 091c2cbc9c..63f5dd9652 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java 
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachine.java @@ -487,7 +487,8 @@ private Map doRemoveReplicaFromIsr( toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr); } try { - zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList); + zooKeeperClient.batchUpdateLeaderAndIsr( + toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion()); toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr); return adjustedLeaderAndIsr; } catch (Exception e) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java index 85dcc434f4..300e98f08f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachine.java @@ -311,7 +311,8 @@ private Optional initLeaderForTableBuckets( ElectionResult electionResult = optionalElectionResult.get(); LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr; try { - zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zooKeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, coordinatorContext.getCoordinatorZkVersion()); } catch (Exception e) { LOG.error( "Fail to create state node for table bucket {} in zookeeper.", @@ -379,7 +380,7 @@ public void batchHandleOnlineChangeAndInitLeader(Set tableBuckets) if (!tableBucketLeadAndIsrInfos.isEmpty()) { try { zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition( - tableBucketLeadAndIsrInfos); + tableBucketLeadAndIsrInfos, coordinatorContext.getCoordinatorZkVersion()); registerSuccessList.addAll(tableBucketLeadAndIsrInfos); } catch (Exception e) { LOG.error( @@ -454,7 +455,10 @@ private List tryRegisterLeaderAndIsrOneByOne( List registerSuccessList = new 
ArrayList<>(); for (RegisterTableBucketLeadAndIsrInfo info : registerList) { try { - zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr()); + zooKeeperClient.registerLeaderAndIsr( + info.getTableBucket(), + info.getLeaderAndIsr(), + coordinatorContext.getCoordinatorZkVersion()); registerSuccessList.add(info); } catch (Exception e) { LOG.error( @@ -496,7 +500,10 @@ private Optional electNewLeaderForTableBuckets( } ElectionResult electionResult = optionalElectionResult.get(); try { - zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr); + zooKeeperClient.updateLeaderAndIsr( + tableBucket, + electionResult.leaderAndIsr, + coordinatorContext.getCoordinatorZkVersion()); } catch (Exception e) { LOG.error( "Fail to update bucket LeaderAndIsr for table bucket {}.", diff --git a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java index 72ff328a7d..5ae0999230 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/metrics/group/CoordinatorMetricGroup.java @@ -71,7 +71,7 @@ protected String getGroupName(CharacterFilter filter) { protected final void putVariables(Map variables) { variables.put("cluster_id", clusterId); variables.put("host", hostname); - variables.put("server_id", serverId); + variables.put("server_id", String.valueOf(serverId)); } public CoordinatorEventMetricGroup getOrAddEventTypeMetricGroup( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java index ee3237032c..19ab3f936a 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java @@ 
-336,6 +336,11 @@ public int getMinInSyncReplicas() { return minInSyncReplicas; } + @VisibleForTesting + public int getCoordinatorEpoch() { + return coordinatorEpoch; + } + // ============ ServerReconfigurable Implementation ============ @Override @@ -1879,9 +1884,14 @@ private void validateAndApplyCoordinatorEpoch(int requestCoordinatorEpoch, Strin requestCoordinatorEpoch, requestName, this.coordinatorEpoch); LOG.warn("Ignore the {} request because {}", requestName, errorMessage); throw new InvalidCoordinatorException(errorMessage); - } else { + } else if (requestCoordinatorEpoch > this.coordinatorEpoch) { + LOG.info( + "Update coordinator epoch from {} to {} for coordinator leader switch.", + this.coordinatorEpoch, + requestCoordinatorEpoch); this.coordinatorEpoch = requestCoordinatorEpoch; } + // ignore equal case } private void dropEmptyTableOrPartitionDir(Path dir, long id, String dirType) { diff --git a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java index 02d02c29ee..d37c36148e 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/utils/ServerRpcMessageUtils.java @@ -461,6 +461,7 @@ public static MetadataResponse buildMetadataResponse( public static UpdateMetadataRequest makeUpdateMetadataRequest( @Nullable ServerInfo coordinatorServer, + @Nullable Integer coordinatorEpoch, Set aliveTableServers, List tableMetadataList, List partitionMetadataList) { @@ -503,6 +504,9 @@ public static UpdateMetadataRequest makeUpdateMetadataRequest( updateMetadataRequest.addAllTableMetadatas(pbTableMetadataList); updateMetadataRequest.addAllPartitionMetadatas(pbPartitionMetadataList); + if (coordinatorEpoch != null) { + updateMetadataRequest.setCoordinatorEpoch(coordinatorEpoch); + } return updateMetadataRequest; } diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java new file mode 100644 index 0000000000..3f39843733 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZkEpoch.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk; + +import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH; +import static org.apache.fluss.server.coordinator.CoordinatorContext.INITIAL_COORDINATOR_EPOCH_ZK_VERSION; + +/** Class for coordinator epoch and coordinator epoch zk version. 
*/ +public class ZkEpoch { + + public static final ZkEpoch INITIAL_EPOCH = + new ZkEpoch(INITIAL_COORDINATOR_EPOCH, INITIAL_COORDINATOR_EPOCH_ZK_VERSION); + + private final int coordinatorEpoch; + private final int coordinatorEpochZkVersion; + + public ZkEpoch(int coordinatorEpoch, int coordinatorEpochZkVersion) { + this.coordinatorEpoch = coordinatorEpoch; + this.coordinatorEpochZkVersion = coordinatorEpochZkVersion; + } + + public ZkEpoch nextZkEpoch() { + return new ZkEpoch(coordinatorEpoch + 1, coordinatorEpochZkVersion + 1); + } + + public int getCoordinatorEpoch() { + return coordinatorEpoch; + } + + public int getCoordinatorEpochZkVersion() { + return coordinatorEpochZkVersion; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index 78667ca48d..cb34a2f2ed 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -22,6 +22,7 @@ import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.FlussConfigUtils; +import org.apache.fluss.exception.CoordinatorEpochFencedException; import org.apache.fluss.metadata.DatabaseSummary; import org.apache.fluss.metadata.PhysicalTablePath; import org.apache.fluss.metadata.ResolvedPartitionSpec; @@ -34,6 +35,7 @@ import org.apache.fluss.security.acl.Resource; import org.apache.fluss.security.acl.ResourceType; import org.apache.fluss.server.authorizer.DefaultAuthorizer.VersionedAcls; +import org.apache.fluss.server.coordinator.CoordinatorContext; import org.apache.fluss.server.entity.RegisterTableBucketLeadAndIsrInfo; import org.apache.fluss.server.metadata.BucketMetadata; import org.apache.fluss.server.zk.ZkAsyncRequest.ZkCheckExistsRequest; @@ -86,6 +88,7 @@ import org.apache.fluss.server.zk.data.ZkData.TableZNode; import 
org.apache.fluss.server.zk.data.ZkData.TablesZNode; import org.apache.fluss.server.zk.data.ZkData.WriterIdZNode; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.server.zk.data.lake.LakeTable; import org.apache.fluss.server.zk.data.lake.LakeTableSnapshot; import org.apache.fluss.server.zk.data.lease.KvSnapshotLeaseMetadata; @@ -127,11 +130,48 @@ import static java.util.stream.Collectors.toMap; import static org.apache.fluss.metadata.ResolvedPartitionSpec.fromPartitionName; +import static org.apache.fluss.server.zk.ZooKeeperOp.multiRequest; import static org.apache.fluss.utils.Preconditions.checkNotNull; /** * This class includes methods for write/read various metadata (leader address, tablet server * registration, table assignment, table, schema) in Zookeeper. + * + *

In some methods, 'expectedZkVersion' is used to perform an epoch Zookeeper version check. + * Conditions requiring epoch checks (all must be met): + * + *

+ * ┌─────────────────────────────────────────────────────────┐
+ * │ 1. Invoked by the Coordinator (not the TabletServer)    │
+ * │ 2. Operates on persistent nodes (not ephemeral)         │
+ * │ 3. Constitutes a "control plane" operation:             │
+ * │    partition assignment or LeaderAndIsr election        │
+ * │ 4. Concurrent access to the same path by old and new    │
+ * │    leaders during leader failover                       │
+ * │ 5. No other mechanisms (optimistic locking,             │
+ * │    idempotency, or reloading) provide fallback          │
+ * └─────────────────────────────────────────────────────────┘
+ * 
+ * + *

In practice, only two types of operations truly require this: + * + *

    + *
  • CRUD for Table/Partition Assignment (assignment decisions). + *
  • CRUD for LeaderAndIsr (leader election results). + *
+ * + *

These operations are inevitably executed concurrently by the old and new coordinators during + * failover (as the new leader immediately reassigns partitions), and overwrites cannot be + * automatically recovered. + * + *

All other operations do not require epoch checks because: + * + *

    + *
  • DDL operations are protected against concurrency by client reconnection mechanisms. + *
  • TabletServer operations are unaffected by coordinator failovers. + *
  • ACLs and Configs have their own version control or idempotency guarantees. + *
  • Ephemeral nodes are managed via session lifecycle. + *
*/ @Internal public class ZooKeeperClient implements AutoCloseable { @@ -144,6 +184,7 @@ public class ZooKeeperClient implements AutoCloseable { private final CuratorFrameworkWithUnhandledErrorListener curatorFrameworkWrapper; private final CuratorFramework zkClient; + private final ZooKeeperOp zkOp; private final ZkSequenceIDCounter tableIdCounter; private final ZkSequenceIDCounter partitionIdCounter; private final ZkSequenceIDCounter writerIdCounter; @@ -158,6 +199,7 @@ public ZooKeeperClient( Configuration configuration) { this.curatorFrameworkWrapper = curatorFrameworkWrapper; this.zkClient = curatorFrameworkWrapper.asCuratorFramework(); + this.zkOp = new ZooKeeperOp(zkClient); this.tableIdCounter = new ZkSequenceIDCounter(zkClient, TableSequenceIdZNode.path()); this.partitionIdCounter = new ZkSequenceIDCounter(zkClient, PartitionSequenceIdZNode.path()); @@ -197,6 +239,49 @@ public void registerCoordinatorServer(CoordinatorAddress coordinatorAddress) thr LOG.info("Registered Coordinator server {} at path {}.", coordinatorAddress, path); } + /** + * Become coordinator leader. This method is a step after electCoordinatorLeader() and before + * registerCoordinatorLeader(). This is to ensure the coordinator get and update the coordinator + * epoch and coordinator epoch zk version. + */ + public ZkEpoch fenceBecomeCoordinatorLeader(String coordinatorId) throws Exception { + ensureEpochZnodeExists(); + + try { + ZkEpoch getEpoch = getCurrentEpoch(); + int currentCoordinatorEpoch = getEpoch.getCoordinatorEpoch(); + int currentCoordinatorEpochZkVersion = getEpoch.getCoordinatorEpochZkVersion(); + int newCoordinatorEpoch = currentCoordinatorEpoch + 1; + LOG.info( + "Coordinator leader {} tries to update epoch. 
Current epoch={}, Zookeeper version={}, new epoch={}", + coordinatorId, + currentCoordinatorEpoch, + currentCoordinatorEpochZkVersion, + newCoordinatorEpoch); + + // atomically update epoch + Stat stat = + zkClient.setData() + .withVersion(currentCoordinatorEpochZkVersion) + .forPath( + ZkData.CoordinatorEpochZNode.path(), + ZkData.CoordinatorEpochZNode.encode(newCoordinatorEpoch)); + + LOG.info( + "Coordinator leader has updated epoch. Current epoch={}, Zookeeper version={}", + newCoordinatorEpoch, + stat.getVersion()); + + return new ZkEpoch(newCoordinatorEpoch, stat.getVersion()); + } catch (KeeperException.BadVersionException e) { + // Other coordinator leader has updated epoch. + // If this happens, it means our fence is in effect. + LOG.info("Coordinator leader {} failed to update epoch.", coordinatorId); + throw new CoordinatorEpochFencedException( + "Coordinator leader election has been fenced."); + } + } + /** Register a coordinator leader to ZK. */ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) throws Exception { String path = ZkData.CoordinatorLeaderZNode.path(); @@ -207,6 +292,23 @@ public void registerCoordinatorLeader(CoordinatorAddress coordinatorAddress) thr LOG.info("Registered Coordinator leader {} at path {}.", coordinatorAddress, path); } + /** Manually unregister a coordinator leader from ZK. 
*/ + public void unregisterCoordinatorLeader(CoordinatorAddress coordinatorAddress) + throws Exception { + String path = ZkData.CoordinatorLeaderZNode.path(); + Stat stat = new Stat(); + byte[] bytes = zkClient.getData().storingStatIn(stat).forPath(path); + if (bytes == null || bytes.length == 0) { + // not exists, not need to unregister + return; + } + CoordinatorAddress storedAddress = ZkData.CoordinatorLeaderZNode.decode(bytes); + if (storedAddress.getId().equals(coordinatorAddress.getId())) { + zkClient.delete().withVersion(stat.getVersion()).forPath(path); + LOG.info("Unregistered Coordinator leader {} at path {}.", coordinatorAddress, path); + } + } + /** Get the leader address registered in ZK. */ public Optional getCoordinatorLeaderAddress() throws Exception { Optional bytes = getOrEmpty(ZkData.CoordinatorLeaderZNode.path()); @@ -221,6 +323,35 @@ public List getCoordinatorServerList() throws Exception { return getChildren(ZkData.CoordinatorIdsZNode.path()); } + /** Ensure epoch znode exists. */ + public void ensureEpochZnodeExists() throws Exception { + String path = ZkData.CoordinatorEpochZNode.path(); + if (zkClient.checkExists().forPath(path) == null) { + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath( + path, + ZkData.CoordinatorEpochZNode.encode( + CoordinatorContext.INITIAL_COORDINATOR_EPOCH - 1)); + } catch (KeeperException.NodeExistsException e) { + // can be ignored when two coordinator almost simultaneously create the epoch znode + } + } + } + + /** Get epoch now in ZK. 
*/ + public ZkEpoch getCurrentEpoch() throws Exception { + Stat currentStat = new Stat(); + byte[] bytes = + zkClient.getData() + .storingStatIn(currentStat) + .forPath(ZkData.CoordinatorEpochZNode.path()); + int currentEpoch = ZkData.CoordinatorEpochZNode.decode(bytes); + int currentVersion = currentStat.getVersion(); + return new ZkEpoch(currentEpoch, currentVersion); + } // -------------------------------------------------------------------------------------------- // Tablet server // -------------------------------------------------------------------------------------------- @@ -329,17 +460,26 @@ public Map getPartitionsAssignments(Collection "partition assignment"); } - public void updateTableAssignment(long tableId, TableAssignment tableAssignment) - throws Exception { + public void updateTableAssignment( + long tableId, TableAssignment tableAssignment, int expectedZkVersion) throws Exception { String path = TableIdZNode.path(tableId); - zkClient.setData().forPath(path, TableIdZNode.encode(tableAssignment)); + byte[] data = TableIdZNode.encode(tableAssignment); + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.debug("Updated table assignment {} for table id {}.", tableAssignment, tableId); } - public void updatePartitionAssignment(long partitionId, PartitionAssignment partitionAssignment) + public void updatePartitionAssignment( + long partitionId, PartitionAssignment partitionAssignment, int expectedZkVersion) throws Exception { String path = PartitionIdZNode.path(partitionId); - zkClient.setData().forPath(path, PartitionIdZNode.encode(partitionAssignment)); + byte[] data = PartitionIdZNode.encode(partitionAssignment); + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.debug( "Updated partition assignment {} for partition 
id {}.", partitionAssignment, @@ -363,18 +503,20 @@ public void deletePartitionAssignment(long partitionId) throws Exception { // -------------------------------------------------------------------------------------------- /** Register bucket LeaderAndIsr to ZK. */ - public void registerLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr) + public void registerLeaderAndIsr( + TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int expectedZkVersion) throws Exception { + String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.create() - .creatingParentsIfNeeded() - .withMode(CreateMode.PERSISTENT) - .forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr)); + byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); + + createRecursiveWithEpochCheck(path, data, expectedZkVersion, false); LOG.info("Registered {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); } public void batchRegisterLeaderAndIsrForTablePartition( - List registerList) throws Exception { + List registerList, int expectedZkVersion) + throws Exception { if (registerList.isEmpty()) { return; } @@ -410,12 +552,14 @@ public void batchRegisterLeaderAndIsrForTablePartition( ops.add(parentNodeCreate); ops.add(currentNodeCreate); if (ops.size() == MAX_BATCH_SIZE) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); ops.clear(); } } if (!ops.isEmpty()) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); } LOG.info( "Batch registered leadAndIsr for tableId: {}, partitionId: {}, partitionName: {} in Zookeeper.", @@ -447,14 +591,21 @@ public Map getLeaderAndIsrs(Collection t "leader and isr"); } - public void updateLeaderAndIsr(TableBucket tableBucket, LeaderAndIsr leaderAndIsr) + public void updateLeaderAndIsr( + TableBucket tableBucket, LeaderAndIsr leaderAndIsr, int 
expectedZkVersion) throws Exception { String path = LeaderAndIsrZNode.path(tableBucket); - zkClient.setData().forPath(path, LeaderAndIsrZNode.encode(leaderAndIsr)); + byte[] data = LeaderAndIsrZNode.encode(leaderAndIsr); + + CuratorOp updateOp = zkOp.updateOp(path, data); + List ops = wrapRequestWithEpochCheck(updateOp, expectedZkVersion); + + zkClient.transaction().forOperations(ops); LOG.info("Updated {} for bucket {} in Zookeeper.", leaderAndIsr, tableBucket); } - public void batchUpdateLeaderAndIsr(Map leaderAndIsrList) + public void batchUpdateLeaderAndIsr( + Map leaderAndIsrList, int expectedZkVersion) throws Exception { if (leaderAndIsrList.isEmpty()) { return; @@ -471,16 +622,18 @@ public void batchUpdateLeaderAndIsr(Map leaderAndIsrL CuratorOp updateOp = zkClient.transactionOp().setData().forPath(path, data); ops.add(updateOp); if (ops.size() == MAX_BATCH_SIZE) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); ops.clear(); } } if (!ops.isEmpty()) { - zkClient.transaction().forOperations(ops); + List wrapOps = wrapRequestsWithEpochCheck(ops, expectedZkVersion); + zkClient.transaction().forOperations(wrapOps); } } - public void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception { + protected void deleteLeaderAndIsr(TableBucket tableBucket) throws Exception { String path = LeaderAndIsrZNode.path(tableBucket); zkClient.delete().forPath(path); LOG.info("Deleted LeaderAndIsr for bucket {} in Zookeeper.", tableBucket); @@ -1781,6 +1934,79 @@ public static Map> processGetChildrenResponses( return result; } + /** + * Create a node (recursively if the parent path does not exist) with Zk epoch version check. 
+ * + * @param path the path to create + * @param data the data to write + * @param throwIfPathExists whether to throw exception if path exists + * @throws Exception if any error occurs + */ + public void createRecursiveWithEpochCheck( + String path, byte[] data, int expectedZkVersion, boolean throwIfPathExists) + throws Exception { + CuratorOp createOp = zkOp.createOp(path, data, CreateMode.PERSISTENT); + List ops = wrapRequestWithEpochCheck(createOp, expectedZkVersion); + + try { + // try to directly create + zkClient.transaction().forOperations(ops); + } catch (KeeperException.NodeExistsException e) { + // should not exist + if (throwIfPathExists) { + throw e; + } + } catch (KeeperException.NoNodeException e) { + // if parent does not exist, create parent first + int indexOfLastSlash = path.lastIndexOf("/"); + if (indexOfLastSlash == -1) { + throw new IllegalArgumentException("Invalid path: " + path); + } else if (indexOfLastSlash == 0) { + // root path can be directly created without fence + try { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(path); + } catch (KeeperException.NodeExistsException ignored) { + // ignore + } + } else { + // indexOfLastSlash > 0 + String parentPath = path.substring(0, indexOfLastSlash); + createRecursiveWithEpochCheck( + parentPath, null, expectedZkVersion, throwIfPathExists); + // After creating parent (or if parent is root), retry creating the original path + zkClient.transaction().forOperations(ops); + } + } catch (KeeperException.BadVersionException e) { + LOG.error("Bad version for path {}, expected version {} ", path, expectedZkVersion); + throw e; + } + } + + public List wrapRequestWithEpochCheck(CuratorOp request, int expectedZkVersion) + throws Exception { + return wrapRequestsWithEpochCheck(Collections.singletonList(request), expectedZkVersion); + } + + public List wrapRequestsWithEpochCheck( + List requestList, int expectedZkVersion) throws Exception { + if 
(ZkVersion.MATCH_ANY_VERSION.getVersion() == expectedZkVersion) { + return requestList; + } else if (expectedZkVersion >= 0) { + CuratorOp checkOp = + zkOp.checkOp(ZkData.CoordinatorEpochZNode.path(), expectedZkVersion); + return multiRequest(checkOp, requestList); + } else { + throw new IllegalArgumentException( + "Expected coordinator epoch zkVersion " + + expectedZkVersion + + " should be non-negative or equal to " + + ZkVersion.MATCH_ANY_VERSION.getVersion()); + } + } + // -------------------------------------------------------------------------------------------- // Producer Offset Snapshot // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java new file mode 100644 index 0000000000..e44670d643 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperOp.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk; + +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.fluss.shaded.curator5.org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.CreateMode; + +import java.util.ArrayList; +import java.util.List; + +/** This class contains some utility methods for wrap/unwrap operations for Zookeeper. */ +public class ZooKeeperOp { + private final CuratorFramework zkClient; + + public ZooKeeperOp(CuratorFramework zkClient) { + this.zkClient = zkClient; + } + + public CuratorOp checkOp(String path, int expectZkVersion) throws Exception { + return zkClient.transactionOp().check().withVersion(expectZkVersion).forPath(path); + } + + public CuratorOp createOp(String path, byte[] data, CreateMode createMode) throws Exception { + return zkClient.transactionOp().create().withMode(createMode).forPath(path, data); + } + + public CuratorOp updateOp(String path, byte[] data) throws Exception { + return zkClient.transactionOp().setData().forPath(path, data); + } + + public CuratorOp deleteOp(String path) throws Exception { + return zkClient.transactionOp().delete().forPath(path); + } + + public static List multiRequest(CuratorOp op1, CuratorOp op2) { + List ops = new ArrayList<>(); + ops.add(op1); + ops.add(op2); + return ops; + } + + public static List multiRequest(CuratorOp op, List ops) { + List result = new ArrayList<>(); + result.add(op); + result.addAll(ops); + return result; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 32b5a7c01d..941512e0f2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -366,6 +366,24 @@ public static CoordinatorAddress decode(byte[] json) { } } + /** + * The znode for 
the coordinator epoch. The znode path is: + * + *

/coordinators/epoch + */ + public static final class CoordinatorEpochZNode { + public static String path() { + return "/coordinators/epoch"; + } + + public static byte[] encode(int epoch) { + return String.valueOf(epoch).getBytes(StandardCharsets.UTF_8); + } + + public static int decode(byte[] bytes) { + return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)); + } + } // ------------------------------------------------------------------------------------------ // ZNodes under "/tabletservers/" // ------------------------------------------------------------------------------------------ diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java new file mode 100644 index 0000000000..26242df0f3 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkVersion.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +/** Enum to represent the type of special Zookeeper version. 
*/ +public enum ZkVersion { + MATCH_ANY_VERSION(-1), + UNKNOWN_VERSION(-2); + + private final int version; + + ZkVersion(int version) { + this.version = version; + } + + public int getVersion() { + return version; + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java index 9d8eda15bf..31c07d75cd 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorChannelManagerTest.java @@ -92,6 +92,7 @@ private void checkSendRequest( // we use update metadata request to test for simplicity UpdateMetadataRequest updateMetadataRequest = makeUpdateMetadataRequest( + null, null, Collections.emptySet(), Collections.emptyList(), diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java index 5c321dfffa..2b788bdb48 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorContextTest.java @@ -22,6 +22,7 @@ import org.apache.fluss.metadata.TableDescriptor; import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.types.DataTypes; import org.junit.jupiter.api.Test; @@ -37,7 +38,7 @@ class CoordinatorContextTest { @Test void testGetLakeTableCount() { - CoordinatorContext context = new CoordinatorContext(); + CoordinatorContext context = new CoordinatorContext(ZkEpoch.INITIAL_EPOCH); // Initially, there should be no tables assertThat(context.allTables()).isEmpty(); diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java index 997a8738f6..63f0b5fd90 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorEventProcessorTest.java @@ -65,6 +65,7 @@ import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.tablet.TestTabletServerGateway; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.BucketAssignment; @@ -148,6 +149,7 @@ class CoordinatorEventProcessorTest { private static ZooKeeperClient zookeeperClient; private static MetadataManager metadataManager; + private static ZkEpoch zkEpoch; private CoordinatorEventProcessor eventProcessor; private final String defaultDatabase = "db"; @@ -176,6 +178,7 @@ static void baseBeforeAll() throws Exception { new CoordinatorAddress( "2", Endpoint.fromListenersString("CLIENT://localhost:10012"))); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("2"); // register 3 tablet servers for (int i = 0; i < 3; i++) { zookeeperClient.registerTabletServer( @@ -1060,7 +1063,7 @@ private CoordinatorEventProcessor buildCoordinatorEventProcessor() { zookeeperClient, serverMetadataCache, testCoordinatorChannelManager, - new CoordinatorContext(), + new CoordinatorContext(zkEpoch), autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java index 
5d2bcd5745..68e46e3590 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorHighAvailabilityITCase.java @@ -23,18 +23,25 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.NotCoordinatorLeaderException; +import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.GatewayClientProxy; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.CoordinatorGateway; +import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; +import org.apache.fluss.server.tablet.TabletServer; import org.apache.fluss.server.zk.NOPErrorHandler; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.CoordinatorAddress; +import org.apache.fluss.server.zk.data.LeaderAndIsr; import org.apache.fluss.shaded.curator5.org.apache.curator.framework.CuratorFramework; +import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.KeeperException; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.Watcher; import org.apache.fluss.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper; import org.apache.fluss.testutils.common.AllCallbackWrapper; @@ -44,7 +51,9 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import org.junit.jupiter.api.io.TempDir; +import java.nio.file.Path; import java.time.Duration; import java.util.Arrays; import 
java.util.List; @@ -78,8 +87,11 @@ class CoordinatorHighAvailabilityITCase { private CoordinatorServer coordinatorServer1; private CoordinatorServer coordinatorServer2; + private TabletServer tabletServer; private RpcClient rpcClient; + @TempDir Path tempDir; + @BeforeAll static void baseBeforeAll() { zookeeperClient = @@ -102,6 +114,9 @@ void tearDown() throws Exception { if (coordinatorServer2 != null) { coordinatorServer2.close(); } + if (tabletServer != null) { + tabletServer.close(); + } if (rpcClient != null) { rpcClient.close(); } @@ -242,6 +257,120 @@ void testMultipleLeadershipLossAndRecovery() throws Exception { createGatewayForServer(finalLeader).metadata(new MetadataRequest()).get(); } + @Test + void testTabletServerRejectsStaleCoordinatorEpochAfterLeaderSwitch() throws Exception { + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + tabletServer = new TabletServer(createTabletServerConfiguration()); + + coordinatorServer1.start(); + coordinatorServer2.start(); + tabletServer.start(); + + waitUntilCoordinatorServerElected(); + CoordinatorAddress firstLeaderAddr = zookeeperClient.getCoordinatorLeaderAddress().get(); + + CoordinatorServer leader = findServerById(firstLeaderAddr.getId()); + CoordinatorServer standby = findServerByNotId(firstLeaderAddr.getId()); + + // Record old coordinator epoch before killing the leader + int oldCoordinatorEpoch = leader.getCoordinatorEventProcessor().getCoordinatorEpoch(); + + killZkSession(leader); + waitUntilNewLeaderElected(leader.getServerId()); + assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId()) + .as("After killing leader, standby should become leader") + .isEqualTo(standby.getServerId()); + + int newCoordinatorEpoch = standby.getCoordinatorEventProcessor().getCoordinatorEpoch(); + // wait the new coordinator notifies the tablet server about the new coordinator epoch + waitUntil( + () -> 
tabletServer.getReplicaManager().getCoordinatorEpoch() == newCoordinatorEpoch, + Duration.ofSeconds(30), + "Tablet server did not update to new coordinator epoch"); + + TabletServerGateway tsGateway = createGatewayForTabletServer(tabletServer); + + // Send request with old coordinator epoch — tablet server should reject it + assertThatThrownBy( + () -> + tsGateway + .updateMetadata( + new UpdateMetadataRequest() + .setCoordinatorEpoch(oldCoordinatorEpoch)) + .get()) + .satisfies( + t -> + assertThat(getRootCause(t)) + .isInstanceOf(InvalidCoordinatorException.class)); + } + + @Test + void testZooKeeperRejectsStaleCoordinatorRequestAfterLeaderSwitch() throws Exception { + // 1. Start two coordinators, confirm leader + // 2. Record current coordinator epoch + // 3. Kill leader's ZK session, trigger leader switch + // 4. Wait for new leader election (epoch should increment) + // 5. Verify new leader can send requests to ZooKeeper + // 6. Verify requests with old ZkVersion epoch are rejected with BadVersionException + coordinatorServer1 = new CoordinatorServer(createConfiguration()); + coordinatorServer2 = new CoordinatorServer(createConfiguration()); + + coordinatorServer1.start(); + coordinatorServer2.start(); + + waitUntilCoordinatorServerElected(); + CoordinatorAddress firstLeaderAddr = zookeeperClient.getCoordinatorLeaderAddress().get(); + + CoordinatorServer leader = findServerById(firstLeaderAddr.getId()); + CoordinatorServer standby = findServerByNotId(firstLeaderAddr.getId()); + + killZkSession(leader); + waitUntilNewLeaderElected(leader.getServerId()); + assertThat(zookeeperClient.getCoordinatorLeaderAddress().get().getId()) + .as("After killing leader, standby should become leader") + .isEqualTo(standby.getServerId()); + + TableBucket tableBucket = new TableBucket(1, 1); + LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(2, 3), 0, 0); + + int newLeaderEpochZkVersion = + standby.getCoordinatorEventProcessor() + .getCoordinatorContext() + 
.getCoordinatorZkVersion(); + int oldLeaderEpochZkVersion = + leader.getCoordinatorEventProcessor() + .getCoordinatorContext() + .getCoordinatorZkVersion(); + + assertThatThrownBy( + () -> + leader.getZooKeeperClient() + .registerLeaderAndIsr( + tableBucket, leaderAndIsr, oldLeaderEpochZkVersion)) + .satisfies( + t -> + assertThat(getRootCause(t)) + .isInstanceOf(KeeperException.BadVersionException.class)); + standby.getZooKeeperClient() + .registerLeaderAndIsr(tableBucket, leaderAndIsr, newLeaderEpochZkVersion); + assertThat(zookeeperClient.getLeaderAndIsr(tableBucket)).hasValue(leaderAndIsr); + } + + private TabletServerGateway createGatewayForTabletServer(TabletServer server) { + List endpoints = server.getRpcServer().getBindEndpoints(); + Endpoint endpoint = endpoints.get(0); + ServerNode serverNode = + new ServerNode( + server.getServerId(), + endpoint.getHost(), + endpoint.getPort(), + ServerType.TABLET_SERVER); + + return GatewayClientProxy.createGatewayProxy( + () -> serverNode, rpcClient, TabletServerGateway.class); + } + private CoordinatorGateway createGatewayForServer(CoordinatorServer server) { List endpoints = server.getRpcServer().getBindEndpoints(); Endpoint endpoint = endpoints.get(0); @@ -275,11 +404,30 @@ private static Configuration createConfiguration() { return configuration; } - private void waitUntilCoordinatorServerElected() { + private Configuration createTabletServerConfiguration() { + Configuration configuration = createConfiguration(); + configuration.set(ConfigOptions.TABLET_SERVER_ID, 0); + configuration.setString(ConfigOptions.DATA_DIR, tempDir.toAbsolutePath().toString()); + return configuration; + } + + private void waitUntilCoordinatorServerElected() throws Exception { waitUntil( () -> zookeeperClient.getCoordinatorLeaderAddress().isPresent(), Duration.ofMinutes(1), "Fail to wait coordinator server elected"); + waitUntilCoordinatorLeaderReady(); + } + + private void waitUntilCoordinatorLeaderReady() throws Exception { + 
CoordinatorAddress currentLeaderAddress = + zookeeperClient.getCoordinatorLeaderAddress().get(); + CoordinatorServer currentLeader = findServerById(currentLeaderAddress.getId()); + // we need to wait some time for the coordinator leader start services after election + waitUntil( + () -> currentLeader.getCoordinatorService().isLeader(), + Duration.ofSeconds(30), + "Coordinator leader did not recognize itself as leader"); } private CoordinatorServer findServerById(String serverId) { @@ -355,7 +503,7 @@ private void killZkSession(CoordinatorServer server) throws Exception { } /** Waits until a new coordinator leader is elected that is different from the given old one. */ - private void waitUntilNewLeaderElected(String oldLeaderId) { + private void waitUntilNewLeaderElected(String oldLeaderId) throws Exception { waitUntil( () -> { try { @@ -369,6 +517,7 @@ private void waitUntilNewLeaderElected(String oldLeaderId) { }, Duration.ofMinutes(1), "Fail to wait for new coordinator leader to be elected"); + waitUntilCoordinatorLeaderReady(); } /** Verifies that the given server can process requests as a leader. 
*/ diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java index cf832a9555..e771823934 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerElectionTest.java @@ -83,6 +83,8 @@ void testCoordinatorServerElection() throws Exception { } } assertThat(firstLeader).isNotNull(); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH); firstLeader.close(); firstLeader.start(); @@ -91,6 +93,8 @@ void testCoordinatorServerElection() throws Exception { CoordinatorAddress secondLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); assertThat(secondLeaderAddress).isNotEqualTo(firstLeaderAddress); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 1); CoordinatorServer secondLeader = null; for (CoordinatorServer coordinatorServer : coordinatorServerList) { @@ -117,6 +121,8 @@ void testCoordinatorServerElection() throws Exception { CoordinatorAddress thirdLeaderAddress = zookeeperClient.getCoordinatorLeaderAddress().get(); assertThat(thirdLeaderAddress.getId()).isEqualTo(firstLeaderAddress.getId()); + assertThat(zookeeperClient.getCurrentEpoch().getCoordinatorEpoch()) + .isEqualTo(CoordinatorContext.INITIAL_COORDINATOR_EPOCH + 2); } /** Create a configuration with Zookeeper address setting. 
*/ diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java index ae39bd185f..bd060d6eed 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/CoordinatorServerITCase.java @@ -63,7 +63,6 @@ protected Configuration getServerConfig() { ConfigOptions.BIND_LISTENERS, String.format("%s://%s:%d", DEFAULT_LISTENER_NAME, HOSTNAME, getPort())); conf.set(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); - return conf; } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java index 9f6c130640..b2abac8307 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerITCase.java @@ -514,6 +514,7 @@ void testMetadata(boolean isCoordinatorServer) throws Exception { .updateMetadata( makeUpdateMetadataRequest( coordinatorServerInfo, + null, new HashSet<>(tabletServerInfos), Collections.emptyList(), Collections.emptyList())) @@ -652,6 +653,10 @@ void testMetadataCompatibility(boolean isCoordinatorServer) throws Exception { .updateMetadata( makeLegacyUpdateMetadataRequest( Optional.of(coordinatorServerInfo), + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorEventProcessor() + .getCoordinatorEpoch(), new HashSet<>(tabletServerInfos))) .get(); } @@ -873,7 +878,9 @@ private static Schema newPkSchema() { @SuppressWarnings("OptionalUsedAsFieldOrParameterType") private UpdateMetadataRequest makeLegacyUpdateMetadataRequest( - Optional coordinatorServer, Set aliveTableServers) { + Optional coordinatorServer, + int coordinatorEpoch, + Set aliveTableServers) { 
UpdateMetadataRequest updateMetadataRequest = new UpdateMetadataRequest(); Set aliveTableServerNodes = new HashSet<>(); for (ServerInfo serverInfo : aliveTableServers) { @@ -895,6 +902,7 @@ private UpdateMetadataRequest makeLegacyUpdateMetadataRequest( node -> { Endpoint endpoint = node.endpoints().get(0); updateMetadataRequest + .setCoordinatorEpoch(coordinatorEpoch) .setCoordinatorServer() .setNodeId(node.id()) .setHost(endpoint.getHost()) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java index 024b12b336..f5357c549f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TableManagerTest.java @@ -31,6 +31,7 @@ import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.BucketAssignment; @@ -78,6 +79,7 @@ class TableManagerTest { new AllCallbackWrapper<>(new ZooKeeperExtension()); private static ZooKeeperClient zookeeperClient; + private static ZkEpoch zkEpoch; private static ExecutorService ioExecutor; private CoordinatorContext coordinatorContext; @@ -86,11 +88,12 @@ class TableManagerTest { private TestCoordinatorChannelManager testCoordinatorChannelManager; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); ioExecutor = Executors.newFixedThreadPool(1); } @@ -113,7 +116,7 @@ static void afterAll() { private 
void initTableManager() { testingEventManager = new TestingEventManager(); - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new CoordinatorContext(zkEpoch); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); Configuration conf = new Configuration(); conf.setString(ConfigOptions.REMOTE_DATA_DIR, "/tmp/fluss/remote-data"); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java index a961dc393e..6ae36b8dee 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/rebalance/RebalanceManagerTest.java @@ -30,6 +30,7 @@ import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.RebalanceTask; @@ -60,6 +61,7 @@ public class RebalanceManagerTest { private static ZooKeeperClient zookeeperClient; private static MetadataManager metadataManager; + private static ZkEpoch zkEpoch; private CoordinatorMetadataCache serverMetadataCache; private TestCoordinatorChannelManager testCoordinatorChannelManager; @@ -69,11 +71,12 @@ public class RebalanceManagerTest { private KvSnapshotLeaseManager kvSnapshotLeaseManager; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); } @BeforeEach @@ -136,7 +139,7 @@ private CoordinatorEventProcessor 
buildCoordinatorEventProcessor() { zookeeperClient, serverMetadataCache, testCoordinatorChannelManager, - new CoordinatorContext(), + new CoordinatorContext(zkEpoch), autoPartitionManager, lakeTableTieringManager, TestingMetricGroups.COORDINATOR_METRICS, diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java index c7c7591d5a..f1ad2fe1c1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/ReplicaStateMachineTest.java @@ -34,9 +34,11 @@ import org.apache.fluss.server.entity.DeleteReplicaResultForBucket; import org.apache.fluss.server.metadata.ServerInfo; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.BeforeAll; @@ -69,18 +71,20 @@ class ReplicaStateMachineTest { new AllCallbackWrapper<>(new ZooKeeperExtension()); private static ZooKeeperClient zookeeperClient; + private static ZkEpoch zkEpoch; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); } @Test void testStartup() { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); // init coordinator server context with a table assignment TableBucket tableBucket = new 
TableBucket(1, 0); @@ -105,7 +109,7 @@ void testStartup() { @Test void testReplicaStateChange() { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); // test check valid replica state change @@ -134,7 +138,7 @@ void testReplicaStateChange() { @Test void testDeleteReplicaStateChange() { Map isReplicaDeleteSuccess = new HashMap<>(); - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers( CoordinatorTestUtils.createServers(Arrays.asList(0, 1))); // use a context that will return a gateway that always get success ack @@ -165,7 +169,7 @@ void testDeleteReplicaStateChange() { } // now, we change a context that some gateway will return exception - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers( CoordinatorTestUtils.createServers(Arrays.asList(0, 1))); coordinatorContext.putBucketLeaderAndIsr(tableBucket1, new LeaderAndIsr(0, 0)); @@ -191,7 +195,7 @@ void testDeleteReplicaStateChange() { @Test void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); @@ -216,7 +220,8 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { } // put leader and isr LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, 
leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr); @@ -230,7 +235,7 @@ void testOfflineReplicasShouldBeRemovedFromIsr() throws Exception { @Test void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { - CoordinatorContext coordinatorContext = new CoordinatorContext(); + CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch); coordinatorContext.setLiveTabletServers(createServers(new int[] {0, 1, 2})); ReplicaStateMachine replicaStateMachine = createReplicaStateMachine(coordinatorContext); @@ -253,7 +258,8 @@ void testOfflineReplicaShouldBeRemovedFromIsr() throws Exception { } // put leader and isr LeaderAndIsr leaderAndIsr = new LeaderAndIsr(0, 0, Arrays.asList(0, 1, 2), 0, 0); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, ZkVersion.MATCH_ANY_VERSION.getVersion()); coordinatorContext.updateBucketReplicaAssignment(tableBucket, Arrays.asList(0, 1, 2)); coordinatorContext.putBucketLeaderAndIsr(tableBucket, leaderAndIsr); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java index bbac45c6b3..2ea3718ffb 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/statemachine/TableBucketStateMachineTest.java @@ -41,9 +41,11 @@ import org.apache.fluss.server.metadata.CoordinatorMetadataCache; import org.apache.fluss.server.metrics.group.TestingMetricGroups; import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZkEpoch; import 
org.apache.fluss.server.zk.ZooKeeperClient; import org.apache.fluss.server.zk.ZooKeeperExtension; import org.apache.fluss.server.zk.data.LeaderAndIsr; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.shaded.guava32.com.google.common.collect.Sets; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.apache.fluss.utils.clock.SystemClock; @@ -54,7 +56,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import java.io.IOException; import java.time.Duration; import java.util.Arrays; import java.util.Collections; @@ -84,6 +85,8 @@ class TableBucketStateMachineTest { private static ZooKeeperClient zookeeperClient; private static CoordinatorContext coordinatorContext; + private static ZkEpoch zkEpoch; + private TestCoordinatorChannelManager testCoordinatorChannelManager; private CoordinatorRequestBatch coordinatorRequestBatch; private AutoPartitionManager autoPartitionManager; @@ -92,20 +95,21 @@ class TableBucketStateMachineTest { private KvSnapshotLeaseManager kvSnapshotLeaseManager; @BeforeAll - static void baseBeforeAll() { + static void baseBeforeAll() throws Exception { zookeeperClient = ZOO_KEEPER_EXTENSION_WRAPPER .getCustomExtension() .getZooKeeperClient(NOPErrorHandler.INSTANCE); + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("1"); } @BeforeEach - void beforeEach() throws IOException { + void beforeEach() { Configuration conf = new Configuration(); conf.setString(ConfigOptions.COORDINATOR_HOST, "localhost"); String remoteDir = "/tmp/fluss/remote-data"; conf.setString(ConfigOptions.REMOTE_DATA_DIR, remoteDir); - coordinatorContext = new CoordinatorContext(); + coordinatorContext = new CoordinatorContext(zkEpoch); testCoordinatorChannelManager = new TestCoordinatorChannelManager(); coordinatorRequestBatch = new CoordinatorRequestBatch( @@ -157,9 +161,13 @@ void testStartup() throws Exception { // create LeaderAndIsr for t10/t11 info in zk, 
zookeeperClient.registerLeaderAndIsr( - new TableBucket(t1Id, 0), new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0)); + new TableBucket(t1Id, 0), + new LeaderAndIsr(0, 0, Arrays.asList(0, 1), 0, 0), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - new TableBucket(t1Id, 1), new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0)); + new TableBucket(t1Id, 1), + new LeaderAndIsr(2, 0, Arrays.asList(2, 3), 0, 0), + ZkVersion.MATCH_ANY_VERSION.getVersion()); // update the LeaderAndIsr to context coordinatorContext.putBucketLeaderAndIsr( t1b0, zookeeperClient.getLeaderAndIsr(new TableBucket(t1Id, 0)).get()); diff --git a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java index aac6f19ed4..5597bb01b1 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/metadata/ZkBasedMetadataProviderTest.java @@ -36,6 +36,7 @@ import org.apache.fluss.server.zk.data.PartitionAssignment; import org.apache.fluss.server.zk.data.TableAssignment; import org.apache.fluss.server.zk.data.TableRegistration; +import org.apache.fluss.server.zk.data.ZkVersion; import org.apache.fluss.testutils.common.AllCallbackWrapper; import org.junit.jupiter.api.AfterAll; @@ -110,8 +111,10 @@ void testGetTableMetadataFromZk() throws Exception { LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000); LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3, 4), 200, 2000); - zookeeperClient.registerLeaderAndIsr(tableBucket0, leaderAndIsr0); - zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1); + zookeeperClient.registerLeaderAndIsr( + tableBucket0, leaderAndIsr0, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + tableBucket1, leaderAndIsr1, 
ZkVersion.MATCH_ANY_VERSION.getVersion()); List tablesMetadataFromZK = metadataProvider.getTablesMetadataFromZK( @@ -176,8 +179,10 @@ void testGetPartitionMetadataFromZk() throws Exception { LeaderAndIsr leaderAndIsr0 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000); LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000); - zookeeperClient.registerLeaderAndIsr(partitionBucket0, leaderAndIsr0); - zookeeperClient.registerLeaderAndIsr(partitionBucket1, leaderAndIsr1); + zookeeperClient.registerLeaderAndIsr( + partitionBucket0, leaderAndIsr0, ZkVersion.MATCH_ANY_VERSION.getVersion()); + zookeeperClient.registerLeaderAndIsr( + partitionBucket1, leaderAndIsr1, ZkVersion.MATCH_ANY_VERSION.getVersion()); // Test getPartitionMetadataFromZkAsync PhysicalTablePath partitionPath = PhysicalTablePath.of(tablePath, partitionName); @@ -275,11 +280,17 @@ void testBatchGetPartitionMetadataFromZkAsync() throws Exception { TableBucket bucket3 = new TableBucket(tableId2, partitionId3, 0); zookeeperClient.registerLeaderAndIsr( - bucket1, new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000)); + bucket1, + new LeaderAndIsr(1, 10, Arrays.asList(1, 2), 100, 1000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - bucket2, new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000)); + bucket2, + new LeaderAndIsr(2, 20, Arrays.asList(2, 3), 200, 2000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); zookeeperClient.registerLeaderAndIsr( - bucket3, new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000)); + bucket3, + new LeaderAndIsr(1, 30, Arrays.asList(1, 3), 300, 3000), + ZkVersion.MATCH_ANY_VERSION.getVersion()); // Test getPartitionsMetadataFromZK List partitionPaths = diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java index 1137e499a2..7bdd781072 100644 --- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TabletServiceITCase.java @@ -890,6 +890,7 @@ void testBecomeLeaderOrFollowerWithOneTabletServerOffline() throws Exception { .updateMetadata( makeUpdateMetadataRequest( coordinatorServerInfo, + null, newTabletServerInfos, Collections.emptyList(), Collections.emptyList())) diff --git a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java index 9498419e0e..74a0415963 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/testutils/FlussClusterExtension.java @@ -942,11 +942,16 @@ public CoordinatorServer getCoordinatorServer() { return coordinatorServer; } - public void waitUntilCoordinatorServerElected() throws Exception { + private void waitUntilCoordinatorServerElected() { waitUntil( () -> zooKeeperClient.getCoordinatorLeaderAddress().isPresent(), Duration.ofSeconds(10), "Fail to wait coordinator server elected"); + // we need to wait some time for the coordinator leader to start services after election + waitUntil( + () -> coordinatorServer.getCoordinatorService().isLeader(), + Duration.ofSeconds(10), + "Coordinator leader did not recognize itself as leader"); } // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java index c867b8dce0..16f59ddd86 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/ZooKeeperClientTest.java @@ -55,6 +55,7 @@ import org.junit.jupiter.api.AfterAll; import
org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; import org.junit.jupiter.params.ParameterizedTest; @@ -85,6 +86,7 @@ class ZooKeeperClientTest { private static ZooKeeperClient zookeeperClient; private static String remoteDataDir; + private static ZkEpoch zkEpoch; @BeforeAll static void beforeAll() { @@ -95,6 +97,12 @@ static void beforeAll() { remoteDataDir = zookeeperClient.getDefaultRemoteDataDir(); } + @BeforeEach + void beforeEach() throws Exception { + // create epoch node in beforeEach(), because it will always be cleaned up in afterEach() + zkEpoch = zookeeperClient.fenceBecomeCoordinatorLeader("tmp"); + } + @AfterEach void afterEach() { ZOO_KEEPER_EXTENSION_WRAPPER.getCustomExtension().cleanupRoot(); @@ -176,7 +184,8 @@ void testTabletAssignments() throws Exception { // test update TableAssignment tableAssignment3 = TableAssignment.builder().add(3, BucketAssignment.of(1, 5)).build(); - zookeeperClient.updateTableAssignment(tableId1, tableAssignment3); + zookeeperClient.updateTableAssignment( + tableId1, tableAssignment3, zkEpoch.getCoordinatorEpochZkVersion()); assertThat(zookeeperClient.getTableAssignment(tableId1)).contains(tableAssignment3); // test delete @@ -193,19 +202,22 @@ void testLeaderAndIsr() throws Exception { assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).isEmpty(); // try to register bucket leaderAndIsr - LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 100, 1000); - LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 100, 1000); + LeaderAndIsr leaderAndIsr1 = new LeaderAndIsr(1, 10, Arrays.asList(1, 2, 3), 0, 1000); + LeaderAndIsr leaderAndIsr2 = new LeaderAndIsr(2, 10, Arrays.asList(4, 5, 6), 0, 1000); - zookeeperClient.registerLeaderAndIsr(tableBucket1, leaderAndIsr1); - 
zookeeperClient.registerLeaderAndIsr(tableBucket2, leaderAndIsr2); + zookeeperClient.registerLeaderAndIsr( + tableBucket1, leaderAndIsr1, zkEpoch.getCoordinatorEpochZkVersion()); + zookeeperClient.registerLeaderAndIsr( + tableBucket2, leaderAndIsr2, zkEpoch.getCoordinatorEpochZkVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket2)).hasValue(leaderAndIsr2); assertThat(zookeeperClient.getLeaderAndIsrs(Arrays.asList(tableBucket1, tableBucket2))) .containsValues(leaderAndIsr1, leaderAndIsr2); // test update - leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 200, 2000); - zookeeperClient.updateLeaderAndIsr(tableBucket1, leaderAndIsr1); + leaderAndIsr1 = new LeaderAndIsr(2, 20, Collections.emptyList(), 0, 2000); + zookeeperClient.updateLeaderAndIsr( + tableBucket1, leaderAndIsr1, zkEpoch.getCoordinatorEpochZkVersion()); assertThat(zookeeperClient.getLeaderAndIsr(tableBucket1)).hasValue(leaderAndIsr1); // test delete @@ -222,7 +234,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep TableBucket tableBucket = isPartitionTable ? 
new TableBucket(1, 2L, i) : new TableBucket(1, i); LeaderAndIsr leaderAndIsr = - new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000); + new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000); leaderAndIsrList.add(leaderAndIsr); RegisterTableBucketLeadAndIsrInfo info = isPartitionTable @@ -233,7 +245,8 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep tableBucketInfo.add(info); } // batch create - zookeeperClient.batchRegisterLeaderAndIsrForTablePartition(tableBucketInfo); + zookeeperClient.batchRegisterLeaderAndIsrForTablePartition( + tableBucketInfo, zkEpoch.getCoordinatorEpochZkVersion()); for (int i = 0; i < 100; i++) { // each should register successful @@ -263,7 +276,7 @@ void testBatchCreateAndUpdateLeaderAndIsr(boolean isPartitionTable) throws Excep entry.setValue(adjustLeaderAndIsr); }); // batch update - zookeeperClient.batchUpdateLeaderAndIsr(updateMap); + zookeeperClient.batchUpdateLeaderAndIsr(updateMap, zkEpoch.getCoordinatorEpochZkVersion()); for (int i = 0; i < 100; i++) { // each should update successful Optional optionalLeaderAndIsr = @@ -282,9 +295,10 @@ void testBatchUpdateLeaderAndIsr() throws Exception { for (int i = 0; i < totalCount; i++) { TableBucket tableBucket = new TableBucket(1, i); LeaderAndIsr leaderAndIsr = - new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 100, 1000); + new LeaderAndIsr(i, 10, Arrays.asList(i + 1, i + 2, i + 3), 0, 1000); leaderAndIsrList.put(tableBucket, leaderAndIsr); - zookeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr); + zookeeperClient.registerLeaderAndIsr( + tableBucket, leaderAndIsr, zkEpoch.getCoordinatorEpochZkVersion()); } // try to batch update @@ -299,10 +313,11 @@ void testBatchUpdateLeaderAndIsr() throws Exception { old.leader() + 1, old.leaderEpoch() + 1, old.isr(), - old.coordinatorEpoch() + 1, + old.coordinatorEpoch(), old.bucketEpoch() + 1); })); - 
zookeeperClient.batchUpdateLeaderAndIsr(updateLeaderAndIsrList); + zookeeperClient.batchUpdateLeaderAndIsr( + updateLeaderAndIsrList, zkEpoch.getCoordinatorEpochZkVersion()); for (Map.Entry entry : updateLeaderAndIsrList.entrySet()) { TableBucket tableBucket = entry.getKey(); LeaderAndIsr leaderAndIsr = entry.getValue();