|
29 | 29 | import org.apache.iotdb.confignode.client.DataNodeRequestType; |
30 | 30 | import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool; |
31 | 31 | import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool; |
| 32 | +import org.apache.iotdb.confignode.conf.ConfigNodeConfig; |
| 33 | +import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; |
32 | 34 | import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan; |
33 | 35 | import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan; |
34 | 36 | import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp; |
35 | 37 | import org.apache.iotdb.confignode.manager.ConfigManager; |
| 38 | +import org.apache.iotdb.confignode.manager.load.heartbeat.BaseNodeCache; |
36 | 39 | import org.apache.iotdb.confignode.persistence.NodeInfo; |
37 | 40 | import org.apache.iotdb.confignode.procedure.exception.ProcedureException; |
38 | 41 | import org.apache.iotdb.confignode.procedure.scheduler.LockQueue; |
| 42 | +import org.apache.iotdb.consensus.ConsensusFactory; |
39 | 43 | import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq; |
40 | 44 | import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq; |
41 | 45 | import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq; |
|
53 | 57 | public class DataNodeRemoveHandler { |
54 | 58 | private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class); |
55 | 59 |
|
| 60 | + private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); |
| 61 | + |
56 | 62 | private final ConfigManager configManager; |
57 | 63 |
|
58 | 64 | /** region migrate lock */ |
@@ -386,14 +392,20 @@ public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureExcepti |
386 | 392 | public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) { |
387 | 393 | DataNodeToStatusResp dataSet = new DataNodeToStatusResp(); |
388 | 394 | dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode())); |
389 | | - TSStatus status = checkRegionReplication(removeDataNodePlan); |
390 | | - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| 395 | + |
| 396 | + TSStatus status = checkClusterProtocol(); |
| 397 | + if (isFailed(status)) { |
| 398 | + dataSet.setStatus(status); |
| 399 | + return dataSet; |
| 400 | + } |
| 401 | + status = checkRegionReplication(removeDataNodePlan); |
| 402 | + if (isFailed(status)) { |
391 | 403 | dataSet.setStatus(status); |
392 | 404 | return dataSet; |
393 | 405 | } |
394 | 406 |
|
395 | 407 | status = checkDataNodeExist(removeDataNodePlan); |
396 | | - if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { |
| 408 | + if (isFailed(status)) { |
397 | 409 | dataSet.setStatus(status); |
398 | 410 | return dataSet; |
399 | 411 | } |
@@ -433,8 +445,31 @@ private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) { |
433 | 445 | */ |
434 | 446 | private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) { |
435 | 447 | TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
436 | | - int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size(); |
| 448 | + List<TDataNodeLocation> removedDataNodes = removeDataNodePlan.getDataNodeLocations(); |
| | + // NOTE(review): this is the live list held by the plan, not a copy; the removals |
| | + // below mutate the RemoveDataNodePlan itself — confirm that is intended. |
437 | 449 | int allDataNodeSize = configManager.getNodeManager().getRegisteredDataNodeCount(); |
| 450 | + |
| 451 | + // when the configuration is one replication, it will be failed if the data node is not in |
| 452 | + // running state. |
| 453 | + if (CONF.getSchemaReplicationFactor() == 1 || CONF.getDataReplicationFactor() == 1) { |
| 454 | + for (TDataNodeLocation dataNodeLocation : removedDataNodes) { |
| 455 | + // check whether removed data node is in running state |
| 456 | + BaseNodeCache nodeCache = |
| 457 | + configManager.getNodeManager().getNodeCacheMap().get(dataNodeLocation.getDataNodeId()); |
| | + // NOTE(review): nodeCache may be null if this DataNode has no heartbeat-cache entry |
| | + // yet — guard before dereferencing to avoid a NullPointerException. The "Running" |
| | + // literal is a magic string; prefer the NodeStatus enum/constant if one exists. |
| 458 | + if (!nodeCache.getNodeStatus().getStatus().equals("Running")) { |
| | + // BUG(review): removing an element from removedDataNodes inside this enhanced |
| | + // for-loop over the same list throws ConcurrentModificationException on the next |
| | + // iteration. Use an explicit Iterator with iterator.remove(), or |
| | + // removedDataNodes.removeIf(...) after collecting the log output. |
| 459 | + removedDataNodes.remove(dataNodeLocation); |
| 460 | + LOGGER.error( |
| 461 | + "Failed to remove data node {} because it is not in running and the configuration of cluster is one replication", |
| 462 | + dataNodeLocation); |
| 463 | + } |
| | + // NOTE(review): this empty-check sits inside the loop; it can only trigger right |
| | + // after a removal, so hoisting it below the loop would be equivalent and clearer. |
| 464 | + if (removedDataNodes.size() == 0) { |
| 465 | + status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode()); |
| 466 | + status.setMessage("Failed to remove all requested data nodes"); |
| 467 | + return status; |
| 468 | + } |
| 469 | + } |
| 470 | + } |
| 471 | + |
| 472 | + int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size(); |
438 | 473 | if (allDataNodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) { |
439 | 474 | status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode()); |
440 | 475 | status.setMessage( |
@@ -492,4 +527,21 @@ private Optional<TDataNodeLocation> filterDataNodeWithOtherRegionReplica( |
492 | 527 | // TODO replace findAny() by select the low load node. |
493 | 528 | return regionReplicaNodes.stream().filter(e -> !e.equals(filterLocation)).findAny(); |
494 | 529 | } |
| 530 | + |
| 531 | + /** |
| 532 | + * Check the protocol of the cluster, standalone is not supported to remove data node currently |
| 533 | + * |
| 534 | + * @return SUCCEED_STATUS if the Cluster is not standalone protocol, REMOVE_DATANODE_FAILED |
| 535 | + * otherwise |
| 536 | + */ |
| 537 | + private TSStatus checkClusterProtocol() { |
| 538 | + TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()); |
| 539 | + if (CONF.getDataRegionConsensusProtocolClass().equals(ConsensusFactory.StandAloneConsensus) |
| 540 | + || CONF.getSchemaRegionConsensusProtocolClass() |
| 541 | + .equals(ConsensusFactory.StandAloneConsensus)) { |
| 542 | + status.setCode(TSStatusCode.REMOVE_DATANODE_FAILED.getStatusCode()); |
| 543 | + status.setMessage("standalone protocol is not supported to remove data node"); |
| 544 | + } |
| 545 | + return status; |
| 546 | + } |
495 | 547 | } |
0 commit comments