|
70 | 70 | import org.apache.iotdb.session.pool.SessionPool; |
71 | 71 | import org.apache.iotdb.session.pool.TableSessionPoolBuilder; |
72 | 72 |
|
| 73 | +import org.apache.thrift.TConfiguration; |
73 | 74 | import org.apache.thrift.TException; |
| 75 | +import org.apache.thrift.transport.TSocket; |
74 | 76 | import org.apache.thrift.transport.TTransportException; |
75 | 77 | import org.slf4j.Logger; |
76 | 78 |
|
@@ -1402,7 +1404,7 @@ public void shutdownForciblyAllDataNodes() { |
1402 | 1404 |
|
1403 | 1405 | @Override |
1404 | 1406 | public void ensureNodeStatus( |
1405 | | - final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus) |
| 1407 | + final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatusList) |
1406 | 1408 | throws IllegalStateException { |
1407 | 1409 | Throwable lastException = null; |
1408 | 1410 | for (int i = 0; i < retryCount; i++) { |
@@ -1430,20 +1432,37 @@ public void ensureNodeStatus( |
1430 | 1432 | + node.getClientRpcEndPoint().getPort(), |
1431 | 1433 | node.getDataNodeId())); |
1432 | 1434 | for (int j = 0; j < nodes.size(); j++) { |
1433 | | - final String endpoint = nodes.get(j).getIpAndPortString(); |
| 1435 | + BaseNodeWrapper nodeWrapper = nodes.get(j); |
| 1436 | + String ipAndPortString = nodeWrapper.getIpAndPortString(); |
| 1437 | + final String endpoint = ipAndPortString; |
1434 | 1438 | if (!nodeIds.containsKey(endpoint)) { |
1435 | 1439 | // Node not exist |
1436 | 1440 | // Notice: Never modify this line, since the NodeLocation might be modified in IT |
1437 | 1441 | errorMessages.add("The node " + nodes.get(j).getIpAndPortString() + " is not found!"); |
1438 | 1442 | continue; |
1439 | 1443 | } |
1440 | 1444 | final String status = showClusterResp.getNodeStatus().get(nodeIds.get(endpoint)); |
1441 | | - if (!targetStatus.get(j).getStatus().equals(status)) { |
| 1445 | + final NodeStatus targetStatus = targetStatusList.get(j); |
| 1446 | + if (!targetStatus.getStatus().equals(status)) { |
1442 | 1447 | // Error status |
1443 | 1448 | errorMessages.add( |
1444 | 1449 | String.format( |
1445 | 1450 | "Node %s is in status %s, but expected %s", |
1446 | | - endpoint, status, targetStatus.get(j))); |
| 1451 | + endpoint, status, targetStatusList.get(j))); |
| 1452 | + continue; |
| 1453 | + } |
| 1454 | + if (nodeWrapper instanceof DataNodeWrapper && targetStatus.equals(NodeStatus.Running)) { |
| 1455 | + final String[] ipPort = nodeWrapper.getIpAndPortString().split(":"); |
| 1456 | + final String ip = ipPort[0]; |
| 1457 | + final int port = Integer.parseInt(ipPort[1]); |
| 1458 | + try (TSocket socket = new TSocket(new TConfiguration(), ip, port, 1000)) { |
| 1459 | + socket.open(); |
| 1460 | + } catch (final TTransportException e) { |
| 1461 | + errorMessages.add( |
| 1462 | + String.format( |
| 1463 | + "DataNode %s is not reachable: %s", |
| 1464 | + nodeWrapper.getIpAndPortString(), e.getMessage())); |
| 1465 | + } |
1447 | 1466 | } |
1448 | 1467 | } |
1449 | 1468 | if (errorMessages.isEmpty()) { |
|
0 commit comments