Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,12 @@ public class ConfigOptions {
+ " (“50100,50101”), ranges (“50100-50200”) or a combination of both."
+ "This option is deprecated. Please use bind.listeners instead, which provides a more flexible configuration for multiple ports");

/**
 * Id of the coordinator server ({@code coordinator.id}). Integer with no default value, so it
 * must be supplied explicitly in the configuration.
 */
public static final ConfigOption<Integer> COORDINATOR_ID =
key("coordinator.id")
.intType()
.noDefaultValue()
.withDescription("The id for the coordinator server.");

/**
* @deprecated This option is deprecated. Please use {@link ConfigOptions#SERVER_IO_POOL_SIZE}
* instead.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.fluss.exception;

/** Exception thrown when the Coordinator leader epoch is fenced. */
public class CoordinatorEpochFencedException extends RuntimeException {

    // Exceptions are Serializable; pin the serial form explicitly.
    private static final long serialVersionUID = 1L;

    /**
     * Creates the exception with a detail message.
     *
     * @param message description of the fencing condition
     */
    public CoordinatorEpochFencedException(String message) {
        super(message);
    }

    /**
     * Creates the exception with a detail message and the underlying cause, preserving the
     * original stack trace for diagnostics.
     *
     * @param message description of the fencing condition
     * @param cause the exception that triggered the fencing detection
     */
    public CoordinatorEpochFencedException(String message, Throwable cause) {
        super(message, cause);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MetricNames {
// metrics for coordinator server
// --------------------------------------------------------------------------------------------
public static final String ACTIVE_COORDINATOR_COUNT = "activeCoordinatorCount";
public static final String ALIVE_COORDINATOR_COUNT = "aliveCoordinatorCount";
public static final String ACTIVE_TABLET_SERVER_COUNT = "activeTabletServerCount";
public static final String OFFLINE_BUCKET_COUNT = "offlineBucketCount";
public static final String TABLE_COUNT = "tableCount";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public class CoordinatorContext {
private static final Logger LOG = LoggerFactory.getLogger(CoordinatorContext.class);

public static final int INITIAL_COORDINATOR_EPOCH = 0;
public static final int INITIAL_COORDINATOR_EPOCH_ZK_VERSION = 0;

// for simplicity, we just use retry time, may consider make it a configurable value
// and use combine retry times and retry delay
Expand All @@ -67,6 +68,7 @@ public class CoordinatorContext {
// a success deletion.
private final Map<TableBucketReplica, Integer> failDeleteNumbers = new HashMap<>();

private final Set<Integer> liveCoordinatorServers = new HashSet<>();
private final Map<Integer, ServerInfo> liveTabletServers = new HashMap<>();
private final Set<Integer> shuttingDownTabletServers = new HashSet<>();

Expand Down Expand Up @@ -108,13 +110,40 @@ public class CoordinatorContext {

private ServerInfo coordinatorServerInfo = null;
private int coordinatorEpoch = INITIAL_COORDINATOR_EPOCH;
private int coordinatorEpochZkVersion = INITIAL_COORDINATOR_EPOCH_ZK_VERSION;

public CoordinatorContext() {}

/** Returns the current coordinator epoch. */
public int getCoordinatorEpoch() {
return coordinatorEpoch;
}

/** Returns the ZooKeeper node version associated with the current coordinator epoch. */
public int getCoordinatorEpochZkVersion() {
return coordinatorEpochZkVersion;
}

/**
 * Records the coordinator epoch together with the ZooKeeper node version of the epoch node.
 *
 * @param newEpoch the new coordinator epoch
 * @param newZkVersion the ZooKeeper node version associated with that epoch
 */
public void setCoordinatorEpochAndZkVersion(int newEpoch, int newZkVersion) {
    this.coordinatorEpochZkVersion = newZkVersion;
    this.coordinatorEpoch = newEpoch;
}

/**
 * Returns the ids of the coordinator servers currently tracked as alive.
 *
 * <p>NOTE(review): this returns the internal mutable set, not a copy — callers must treat it
 * as read-only; consider wrapping with {@code Collections.unmodifiableSet}.
 */
public Set<Integer> getLiveCoordinatorServers() {
return liveCoordinatorServers;
}

/**
 * Replaces the tracked set of live coordinator servers with the given ids.
 *
 * @param servers ids of the coordinator servers currently alive
 */
public void setLiveCoordinators(Set<Integer> servers) {
    this.liveCoordinatorServers.clear();
    this.liveCoordinatorServers.addAll(servers);
}

/** Marks the given coordinator server id as alive. Adding an already-present id is a no-op. */
public void addLiveCoordinator(int serverId) {
this.liveCoordinatorServers.add(serverId);
}

/** Removes the given coordinator server id from the live set. Unknown ids are a no-op. */
public void removeLiveCoordinator(int serverId) {
this.liveCoordinatorServers.remove(serverId);
}

/**
 * Returns the live tablet servers keyed by server id.
 *
 * <p>NOTE(review): this returns the internal mutable map, not a copy — callers must treat it
 * as read-only.
 */
public Map<Integer, ServerInfo> getLiveTabletServers() {
return liveTabletServers;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@
import org.apache.fluss.server.coordinator.event.CoordinatorEventManager;
import org.apache.fluss.server.coordinator.event.CreatePartitionEvent;
import org.apache.fluss.server.coordinator.event.CreateTableEvent;
import org.apache.fluss.server.coordinator.event.DeadCoordinatorEvent;
import org.apache.fluss.server.coordinator.event.DeadTabletServerEvent;
import org.apache.fluss.server.coordinator.event.DeleteReplicaResponseReceivedEvent;
import org.apache.fluss.server.coordinator.event.DropPartitionEvent;
import org.apache.fluss.server.coordinator.event.DropTableEvent;
import org.apache.fluss.server.coordinator.event.EventProcessor;
import org.apache.fluss.server.coordinator.event.FencedCoordinatorEvent;
import org.apache.fluss.server.coordinator.event.ListRebalanceProgressEvent;
import org.apache.fluss.server.coordinator.event.NewCoordinatorEvent;
import org.apache.fluss.server.coordinator.event.NewTabletServerEvent;
import org.apache.fluss.server.coordinator.event.NotifyKvSnapshotOffsetEvent;
import org.apache.fluss.server.coordinator.event.NotifyLakeTableOffsetEvent;
Expand All @@ -86,6 +88,7 @@
import org.apache.fluss.server.coordinator.event.RemoveServerTagEvent;
import org.apache.fluss.server.coordinator.event.SchemaChangeEvent;
import org.apache.fluss.server.coordinator.event.TableRegistrationChangeEvent;
import org.apache.fluss.server.coordinator.event.watcher.CoordinatorChangeWatcher;
import org.apache.fluss.server.coordinator.event.watcher.TableChangeWatcher;
import org.apache.fluss.server.coordinator.event.watcher.TabletServerChangeWatcher;
import org.apache.fluss.server.coordinator.rebalance.RebalanceManager;
Expand Down Expand Up @@ -128,6 +131,7 @@

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -172,6 +176,7 @@ public class CoordinatorEventProcessor implements EventProcessor {
private final LakeTableTieringManager lakeTableTieringManager;
private final TableChangeWatcher tableChangeWatcher;
private final CoordinatorChannelManager coordinatorChannelManager;
private final CoordinatorChangeWatcher coordinatorChangeWatcher;
private final TabletServerChangeWatcher tabletServerChangeWatcher;
private final CoordinatorMetadataCache serverMetadataCache;
private final CoordinatorRequestBatch coordinatorRequestBatch;
Expand Down Expand Up @@ -224,6 +229,8 @@ public CoordinatorEventProcessor(
tableBucketStateMachine,
new RemoteStorageCleaner(conf, ioExecutor),
ioExecutor);
this.coordinatorChangeWatcher =
new CoordinatorChangeWatcher(zooKeeperClient, coordinatorEventManager);
this.tableChangeWatcher = new TableChangeWatcher(zooKeeperClient, coordinatorEventManager);
this.tabletServerChangeWatcher =
new TabletServerChangeWatcher(zooKeeperClient, coordinatorEventManager);
Expand Down Expand Up @@ -261,6 +268,7 @@ public CoordinatorContext getCoordinatorContext() {
public void startup() {
coordinatorContext.setCoordinatorServerInfo(getCoordinatorServerInfo());
// start watchers first so that we won't miss node in zk;
coordinatorChangeWatcher.start();
tabletServerChangeWatcher.start();
tableChangeWatcher.start();
LOG.info("Initializing coordinator context.");
Expand Down Expand Up @@ -304,14 +312,11 @@ public void shutdown() {
private ServerInfo getCoordinatorServerInfo() {
try {
return zooKeeperClient
.getCoordinatorAddress()
.getCoordinatorLeaderAddress()
.map(
coordinatorAddress ->
// TODO we set id to 0 as that CoordinatorServer don't support
// HA, if we support HA, we need to set id to the config
// CoordinatorServer id to avoid node drift.
new ServerInfo(
0,
coordinatorAddress.getId(),
null, // For coordinatorServer, no rack info
coordinatorAddress.getEndpoints(),
ServerType.COORDINATOR))
Expand All @@ -332,6 +337,12 @@ public int getCoordinatorEpoch() {

private void initCoordinatorContext() throws Exception {
long start = System.currentTimeMillis();
// get all coordinator servers
int[] currentCoordinatorServers = zooKeeperClient.getCoordinatorServerList();
coordinatorContext.setLiveCoordinators(
Arrays.stream(currentCoordinatorServers).boxed().collect(Collectors.toSet()));
LOG.info("Load coordinator servers success when initializing coordinator context.");

// get all tablet server's
int[] currentServers = zooKeeperClient.getSortedTabletServerList();
List<ServerInfo> tabletServerInfos = new ArrayList<>();
Expand Down Expand Up @@ -546,6 +557,7 @@ private void onShutdown() {
tableManager.shutdown();

// then stop watchers
coordinatorChangeWatcher.stop();
tableChangeWatcher.stop();
tabletServerChangeWatcher.stop();
}
Expand All @@ -570,6 +582,10 @@ public void process(CoordinatorEvent event) {
(NotifyLeaderAndIsrResponseReceivedEvent) event);
} else if (event instanceof DeleteReplicaResponseReceivedEvent) {
processDeleteReplicaResponseReceived((DeleteReplicaResponseReceivedEvent) event);
} else if (event instanceof NewCoordinatorEvent) {
processNewCoordinator((NewCoordinatorEvent) event);
} else if (event instanceof DeadCoordinatorEvent) {
processDeadCoordinator((DeadCoordinatorEvent) event);
} else if (event instanceof NewTabletServerEvent) {
processNewTabletServer((NewTabletServerEvent) event);
} else if (event instanceof DeadTabletServerEvent) {
Expand Down Expand Up @@ -981,6 +997,28 @@ private void onReplicaBecomeOffline(Set<TableBucketReplica> offlineReplicas) {
replicaStateMachine.handleStateChanges(offlineReplicas, OfflineReplica);
}

/**
 * Handles a {@link NewCoordinatorEvent}: registers a newly discovered coordinator server in
 * the coordinator context. Ids that are already tracked as live are ignored silently.
 */
private void processNewCoordinator(NewCoordinatorEvent newCoordinatorEvent) {
    int serverId = newCoordinatorEvent.getServerId();
    if (!coordinatorContext.getLiveCoordinatorServers().contains(serverId)) {
        // process new coordinator server
        LOG.info("New coordinator server callback for coordinator server {}", serverId);
        coordinatorContext.addLiveCoordinator(serverId);
    }
}

/**
 * Handles a {@link DeadCoordinatorEvent}: drops a failed coordinator server from the set of
 * live coordinators. Ids that are not currently tracked are ignored silently.
 */
private void processDeadCoordinator(DeadCoordinatorEvent deadCoordinatorEvent) {
    int serverId = deadCoordinatorEvent.getServerId();
    if (coordinatorContext.getLiveCoordinatorServers().contains(serverId)) {
        // process dead coordinator server
        LOG.info("Coordinator server failure callback for {}.", serverId);
        coordinatorContext.removeLiveCoordinator(serverId);
    }
}

private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
// NOTE: we won't need to detect bounced tablet servers like Kafka as we won't
// miss the event of tablet server un-register and register again since we can
Expand All @@ -1003,7 +1041,7 @@ private void processNewTabletServer(NewTabletServerEvent newTabletServerEvent) {
// it may happen during coordinator server initiation, the watcher watch a new tablet
// server register event and put it to event manager, but after that, the coordinator
// server read
// all tablet server nodes registered which contain the tablet server a; in this case,
// all tablet server nodes registered which contain the tablet server; in this case,
// we can ignore it.
return;
}
Expand Down Expand Up @@ -1565,7 +1603,10 @@ private void updateReplicaAssignmentForBucket(
tableAssignment.forEach(
(bucket, replicas) ->
newTableAssignment.put(bucket, new BucketAssignment(replicas)));
zooKeeperClient.updateTableAssignment(tableId, new TableAssignment(newTableAssignment));
zooKeeperClient.updateTableAssignment(
tableId,
new TableAssignment(newTableAssignment),
coordinatorContext.getCoordinatorEpochZkVersion());
} else {
Map<Integer, List<Integer>> partitionAssignment =
coordinatorContext.getPartitionAssignment(
Expand Down Expand Up @@ -1622,7 +1663,8 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
}

try {
zooKeeperClient.batchUpdateLeaderAndIsr(newLeaderAndIsrList);
zooKeeperClient.batchUpdateLeaderAndIsr(
newLeaderAndIsrList, coordinatorContext.getCoordinatorEpochZkVersion());
newLeaderAndIsrList.forEach(
(tableBucket, newLeaderAndIsr) ->
result.add(new AdjustIsrResultForBucket(tableBucket, newLeaderAndIsr)));
Expand All @@ -1633,7 +1675,10 @@ private List<AdjustIsrResultForBucket> tryProcessAdjustIsr(
TableBucket tableBucket = entry.getKey();
LeaderAndIsr newLeaderAndIsr = entry.getValue();
try {
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(
tableBucket,
newLeaderAndIsr,
coordinatorContext.getCoordinatorEpochZkVersion());
} catch (Exception e) {
LOG.error("Error when register leader and isr.", e);
result.add(
Expand Down Expand Up @@ -2151,7 +2196,8 @@ private void updateBucketEpochAndSendRequest(TableBucket tableBucket, List<Integ
LeaderAndIsr newLeaderAndIsr = leaderAndIsr.newLeaderAndIsr(leaderAndIsr.isr());

coordinatorContext.putBucketLeaderAndIsr(tableBucket, newLeaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(tableBucket, newLeaderAndIsr);
zooKeeperClient.updateLeaderAndIsr(
tableBucket, newLeaderAndIsr, coordinatorContext.getCoordinatorEpochZkVersion());

coordinatorRequestBatch.newBatch();
coordinatorRequestBatch.addNotifyLeaderRequestForTabletServers(
Expand Down
Loading