Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.exception;

/**
 * Exception thrown when an operation is rejected because the coordinator leader epoch it
 * carries is stale, i.e. the coordinator has been fenced by a newer leader election.
 */
public class CoordinatorEpochFencedException extends RuntimeException {

    /**
     * Creates the exception with a human-readable description of the fencing condition.
     *
     * @param message description of why the coordinator epoch was fenced
     */
    public CoordinatorEpochFencedException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a description and the underlying cause (for example the
     * ZooKeeper version-mismatch error that revealed the fencing).
     *
     * @param message description of why the coordinator epoch was fenced
     * @param cause the underlying error that triggered the fencing
     */
    public CoordinatorEpochFencedException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.fluss.server.coordinator.statemachine.BucketState;
import org.apache.fluss.server.coordinator.statemachine.ReplicaState;
import org.apache.fluss.server.metadata.ServerInfo;
import org.apache.fluss.server.zk.ZkEpoch;
import org.apache.fluss.server.zk.data.LeaderAndIsr;
import org.apache.fluss.utils.types.Tuple2;

Expand Down Expand Up @@ -55,6 +56,7 @@ public class CoordinatorContext {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);

public static final int INITIAL_COORDINATOR_EPOCH = 0;
public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0;

// for simplicity, we just use retry time, may consider make it a configurable value
// and use combine retry times and retry delay
Expand Down Expand Up @@ -107,15 +109,33 @@ public class CoordinatorContext {
/** A mapping from tabletServers to server tag. */
private final Map<Integer, ServerTag> serverTags = new HashMap<>();

/**
* The epoch of the coordinator, which will be incremented whenever the coordinator is elected
* or re-elected.
*/
private final int coordinatorEpoch;

/**
* The coordinator epoch zk version is the zk version when the coordinator epoch is updated in
* Zookeeper.
*/
private final int coordinatorEpochZkVersion;

private ServerInfo coordinatorServerInfo = null;
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;

public CoordinatorContext() {}
/**
 * Creates a context bound to one coordinator leadership term.
 *
 * @param zkEpoch the coordinator epoch and its ZooKeeper version as read when this server
 *     won the leader election
 */
public CoordinatorContext(ZkEpoch zkEpoch) {
    this.coordinatorEpochZkVersion = zkEpoch.getCoordinatorEpochZkVersion();
    this.coordinatorEpoch = zkEpoch.getCoordinatorEpoch();
}

/**
 * Returns the coordinator epoch assigned to this leadership term.
 *
 * @return the current coordinator epoch
 */
public int getCoordinatorEpoch() {
    return this.coordinatorEpoch;
}

/**
 * Returns the ZooKeeper node version recorded when the coordinator epoch was written,
 * used as the expected version for fenced (compare-and-set) ZooKeeper updates.
 *
 * @return the zk version of the coordinator epoch node
 */
public int getCoordinatorZkVersion() {
    return this.coordinatorEpochZkVersion;
}

/**
 * Returns the set of currently live coordinator servers tracked by this context.
 *
 * @return the live coordinator servers
 */
public Set<String> getLiveCoordinatorServers() {
    return this.liveCoordinatorServers;
}
Expand Down Expand Up @@ -708,12 +728,11 @@ private void clearTablesState() {

/**
 * Clears all leadership-scoped state so the context is inert after this server loses or
 * relinquishes coordinator leadership. The coordinator epoch and its zk version are final
 * (they belong to a single leadership term) and therefore are intentionally not reset here.
 */
public void resetContext() {
    tablesToBeDeleted.clear();
    clearTablesState();
    liveTabletServers.clear();
    // NOTE(review): the previous version cleared liveCoordinatorServers twice and tried to
    // zero the (now final) coordinatorEpoch field; both removed.
    liveCoordinatorServers.clear();
    shuttingDownTabletServers.clear();
    serverTags.clear();
}

public int getTotalPartitionCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ public void shutdown() {
coordinatorEventManager.close();
rebalanceManager.close();
onShutdown();
coordinatorContext.resetContext();
}

private ServerInfo getCoordinatorServerInfo() {
Expand Down Expand Up @@ -1618,7 +1619,10 @@ private void updateReplicaAssignmentForBucket(
tableAssignment.forEach(
(bucket, replicas) ->
newTableAssignment.put(bucket, new BucketAssignment(replicas)));
zooKeeperClient.updateTableAssignment(tableId, new TableAssignment(newTableAssignment));
zooKeeperClient.updateTableAssignment(
tableId,
new TableAssignment(newTableAssignment),
coordinatorContext.getCoordinatorZkVersion());
} else {
Map<Integer, List<Integer>> partitionAssignment =
coordinatorContext.getPartitionAssignment(
Expand All @@ -1629,7 +1633,9 @@ private void updateReplicaAssignmentForBucket(
(bucket, replicas) ->
newPartitionAssignment.put(bucket, new BucketAssignment(replicas)));
zooKeeperClient.updatePartitionAssignment(
partitionId, new PartitionAssignment(tableId, newPartitionAssignment));
partitionId,
new PartitionAssignment(tableId, newPartitionAssignment),
coordinatorContext.getCoordinatorZkVersion());
}
}

Expand Down Expand Up @@ -1675,7 +1681,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
}

try {
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
zooKeeperClient.batchUpdateLeaderAndIsr(
newLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion());
newLeaderAndIsrList.forEach(
(tableBucket, newLeaderAndIsr) ->
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
Expand All @@ -1686,7 +1693,10 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
TableBucket tableBucket = entry.getKey();
LeaderAndIsr newLeaderAndIsr = entry.getValue();
try {
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(
tableBucket,
newLeaderAndIsr,
coordinatorContext.getCoordinatorZkVersion());
} catch (Exception e) {
LOG.error("Error when register leader and isr.", e);
result.add(
Expand Down Expand Up @@ -2212,7 +2222,8 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List<Integ
LeaderAndIsr newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(leaderAndIsr.isr());

coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorZkVersion());

coordinatorRequestBatch.newBatch();
coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.fluss.server.coordinator;

import org.apache.fluss.exception.CoordinatorEpochFencedException;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.data.ZkData;
import org.apache.fluss.shaded.curator5.org.apache.curator.framework.recipes.leader.LeaderLatch;
Expand Down Expand Up @@ -106,6 +107,7 @@ public void isLeader() {
CompletableFuture<Void> cleanup = pendingCleanup.get();
// Run init on a separate thread to avoid deadlock with
// Curator's EventThread when performing ZK operations.

leaderCallbackExecutor.execute(
() -> {
// Wait for any pending cleanup to finish first.
Expand All @@ -123,16 +125,22 @@ public void isLeader() {
}
try {
initLeaderServices.run();
// Set leader flag after init completes, so when zk found
// this leader, the leader can accept requests
isLeader.set(true);
} catch (CoordinatorEpochFencedException e) {
LOG.warn(
"Coordinator server {} has been fenced and not become leader successfully.",
serverId);
notLeader();
} catch (Exception e) {
LOG.error(
"Failed to initialize leader services for server {}",
serverId,
e);
notLeader();
}
});
// Set leader flag before init completes, so when zk found this leader, the
// leader can accept requests
isLeader.set(true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,7 @@ private UpdateMetadataRequest buildUpdateMetadataRequest() {
// tablet servers.
return makeUpdateMetadataRequest(
coordinatorContext.getCoordinatorServerInfo(),
coordinatorContext.getCoordinatorEpoch(),
new HashSet<>(coordinatorContext.getLiveTabletServers().values()),
tableMetadataList,
partitionMetadataList);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.fluss.server.metrics.ServerMetricUtils;
import org.apache.fluss.server.metrics.group.CoordinatorMetricGroup;
import org.apache.fluss.server.metrics.group.LakeTieringMetricGroup;
import org.apache.fluss.server.zk.ZkEpoch;
import org.apache.fluss.server.zk.ZooKeeperClient;
import org.apache.fluss.server.zk.ZooKeeperUtils;
import org.apache.fluss.server.zk.data.CoordinatorAddress;
Expand Down Expand Up @@ -140,9 +141,6 @@ public class CoordinatorServer extends ServerBase {
@Nullable
private Authorizer authorizer;

@GuardedBy("lock")
private CoordinatorContext coordinatorContext;

@GuardedBy("lock")
private DynamicConfigManager dynamicConfigManager;

Expand Down Expand Up @@ -232,7 +230,6 @@ protected void initCoordinatorStandby() throws Exception {

dynamicConfigManager.startup();

this.coordinatorContext = new CoordinatorContext();
this.metadataCache = new CoordinatorMetadataCache();

this.authorizer = AuthorizerLoader.createAuthorizer(conf, zkClient, pluginManager);
Expand Down Expand Up @@ -294,6 +291,9 @@ protected void initCoordinatorStandby() throws Exception {
}

protected void initCoordinatorLeader() throws Exception {
// to avoid split-brain
ZkEpoch zkEpoch = zkClient.fenceBecomeCoordinatorLeader(serverId);
registerCoordinatorLeader();

synchronized (lock) {
this.clientMetricGroup = new ClientMetricGroup(metricRegistry, SERVER_NAME);
Expand All @@ -305,12 +305,11 @@ protected void initCoordinatorLeader() throws Exception {
new AutoPartitionManager(metadataCache, metadataManager, conf);
autoPartitionManager.start();

registerCoordinatorLeader();
// start coordinator event processor after we register coordinator leader to zk
// so that the event processor can get the coordinator leader node from zk during start
// up.
// in HA for coordinator server, the processor also need to know the leader node during
// start up
// so that the event processor can get the coordinator leader node from zk during
// start up. In HA for coordinator server, the processor also need to know the leader
// node during start up
CoordinatorContext coordinatorContext = new CoordinatorContext(zkEpoch);
this.coordinatorEventProcessor =
new CoordinatorEventProcessor(
zkClient,
Expand All @@ -337,10 +336,20 @@ protected void initCoordinatorLeader() throws Exception {
* from leader to standby. It cleans up leader-only resources while keeping the server running
* as a standby, ready to participate in future elections.
*/
protected void cleanupCoordinatorLeader() throws Exception {
protected void cleanupCoordinatorLeader() {
synchronized (lock) {
LOG.info("Cleaning up coordinator leader services.");

try {
// make sure the current coordinator leader node is unregistered.
// Different from ZK disconnection,
// when we actively release the Leader's election,
// we need to manually delete the node
unregisterCoordinatorLeader();
} catch (Throwable t) {
LOG.warn("Failed to unregister coordinator leader from Zookeeper", t);
}

// Clean up leader-specific resources in reverse order of initialization
try {
if (coordinatorEventProcessor != null) {
Expand Down Expand Up @@ -387,11 +396,6 @@ protected void cleanupCoordinatorLeader() throws Exception {
LOG.warn("Failed to close client metric group", t);
}

// Reset coordinator context for next election
if (coordinatorContext != null) {
coordinatorContext.resetContext();
}

LOG.info("Coordinator leader services cleaned up successfully.");
}
}
Expand Down Expand Up @@ -427,6 +431,11 @@ private void registerCoordinatorLeader() throws Exception {
"coordinator leader", () -> zkClient.registerCoordinatorLeader(coordinatorAddress));
}

/**
 * Removes this server's coordinator-leader registration node from ZooKeeper.
 *
 * @throws Exception if the ZooKeeper operation fails
 */
private void unregisterCoordinatorLeader() throws Exception {
    zkClient.unregisterCoordinatorLeader(buildCoordinatorAddress());
}

private CoordinatorAddress buildCoordinatorAddress() {
List<Endpoint> bindEndpoints = rpcServer.getBindEndpoints();
return new CoordinatorAddress(
Expand Down Expand Up @@ -581,15 +590,6 @@ CompletableFuture<Void> stopServices() {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
if (coordinatorContext != null) {
// then reset coordinatorContext
coordinatorContext.resetContext();
}
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
}

try {
if (lakeTableTieringManager != null) {
lakeTableTieringManager.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ private Map<TableBucketReplica, LeaderAndIsr> doRemoveReplicaFromIsr(
toUpdateLeaderAndIsrList.put(tableBucket, adjustLeaderAndIsr);
}
try {
zooKeeperClient.batchUpdateLeaderAndIsr(toUpdateLeaderAndIsrList);
zooKeeperClient.batchUpdateLeaderAndIsr(
toUpdateLeaderAndIsrList, coordinatorContext.getCoordinatorZkVersion());
toUpdateLeaderAndIsrList.forEach(coordinatorContext::putBucketLeaderAndIsr);
return adjustedLeaderAndIsr;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,8 @@ private Optional<ElectionResult> initLeaderForTableBuckets(
ElectionResult electionResult = optionalElectionResult.get();
LeaderAndIsr leaderAndIsr = electionResult.leaderAndIsr;
try {
zooKeeperClient.registerLeaderAndIsr(tableBucket, leaderAndIsr);
zooKeeperClient.registerLeaderAndIsr(
tableBucket, leaderAndIsr, coordinatorContext.getCoordinatorZkVersion());
} catch (Exception e) {
LOG.error(
"Fail to create state node for table bucket {} in zookeeper.",
Expand Down Expand Up @@ -379,7 +380,7 @@ public void batchHandleOnlineChangeAndInitLeader(Set<TableBucket> tableBuckets)
if (!tableBucketLeadAndIsrInfos.isEmpty()) {
try {
zooKeeperClient.batchRegisterLeaderAndIsrForTablePartition(
tableBucketLeadAndIsrInfos);
tableBucketLeadAndIsrInfos, coordinatorContext.getCoordinatorZkVersion());
registerSuccessList.addAll(tableBucketLeadAndIsrInfos);
} catch (Exception e) {
LOG.error(
Expand Down Expand Up @@ -454,7 +455,10 @@ private List<RegisterTableBucketLeadAndIsrInfo> tryRegisterLeaderAndIsrOneByOne(
List<RegisterTableBucketLeadAndIsrInfo> registerSuccessList = new ArrayList<>();
for (RegisterTableBucketLeadAndIsrInfo info : registerList) {
try {
zooKeeperClient.registerLeaderAndIsr(info.getTableBucket(), info.getLeaderAndIsr());
zooKeeperClient.registerLeaderAndIsr(
info.getTableBucket(),
info.getLeaderAndIsr(),
coordinatorContext.getCoordinatorZkVersion());
registerSuccessList.add(info);
} catch (Exception e) {
LOG.error(
Expand Down Expand Up @@ -496,7 +500,10 @@ private Optional<ElectionResult> electNewLeaderForTableBuckets(
}
ElectionResult electionResult = optionalElectionResult.get();
try {
zooKeeperClient.updateLeaderAndIsr(tableBucket, electionResult.leaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(
tableBucket,
electionResult.leaderAndIsr,
coordinatorContext.getCoordinatorZkVersion());
} catch (Exception e) {
LOG.error(
"Fail to update bucket LeaderAndIsr for table bucket {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ protected String getGroupName(CharacterFilter filter) {
/**
 * Populates the metric variables that identify this coordinator server: cluster id, host
 * name, and server id.
 */
protected final void putVariables(Map<String, String> variables) {
    variables.put("cluster_id", clusterId);
    variables.put("host", hostname);
    // Normalize serverId to its string form for the metrics tag; the stripped diff left a
    // stale duplicate put of the raw serverId, which is removed here.
    variables.put("server_id", String.valueOf(serverId));
}

public CoordinatorEventMetricGroup getOrAddEventTypeMetricGroup(
Expand Down
Loading