From 46b8dd51a97532dbfe314da7552d2a2f2dd0c6bd Mon Sep 17 00:00:00 2001 From: tianruixiang Date: Fri, 6 Mar 2026 18:05:59 +0800 Subject: [PATCH] shutdown the old channel --- .../cluster/general/impl/UcoreSender.java | 159 ++++++++++-------- 1 file changed, 90 insertions(+), 69 deletions(-) diff --git a/src/main/java/com/actiontech/dble/cluster/general/impl/UcoreSender.java b/src/main/java/com/actiontech/dble/cluster/general/impl/UcoreSender.java index 2c4565a7b..9c0109396 100644 --- a/src/main/java/com/actiontech/dble/cluster/general/impl/UcoreSender.java +++ b/src/main/java/com/actiontech/dble/cluster/general/impl/UcoreSender.java @@ -19,7 +19,6 @@ import com.actiontech.dble.util.DebugUtil; import com.actiontech.dble.util.PropertiesUtil; import com.actiontech.dble.util.StringUtil; -import io.grpc.Channel; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import org.apache.commons.lang.StringUtils; @@ -43,6 +42,7 @@ public final class UcoreSender extends AbstractConsulSender { private volatile UcoreGrpc.UcoreBlockingStub stub = null; + private volatile ManagedChannel channel = null; private ConcurrentHashMap lockMap = new ConcurrentHashMap<>(); private List ipList = new ArrayList<>(); private static final String SOURCE_COMPONENT_TYPE = "dble"; @@ -56,7 +56,7 @@ public void initConInfo() { } catch (Exception e) { LOGGER.error("error:", e); } - Channel channel = ManagedChannelBuilder.forAddress(getIpList().get(0), + channel = ManagedChannelBuilder.forAddress(getIpList().get(0), ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); } @@ -70,7 +70,7 @@ public void initCluster() { } catch (Exception e) { LOGGER.error("error:", e); } - Channel channel = ManagedChannelBuilder.forAddress(getIpList().get(0), + channel = ManagedChannelBuilder.forAddress(getIpList().get(0), ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); if (!skipSyncUcores()) { @@ -129,17 +129,18 @@ private void log(String message, Exception e) { return output.getSessionId(); } catch (Exception e1) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input); + replaceChannelAndStub(newChannel, newStub); return output.getSessionId(); } catch (Exception e2) { LOGGER.info("connect to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -168,17 +169,18 @@ public void setKV(String path, String value) throws Exception { stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input); } catch (Exception e1) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input); + replaceChannelAndStub(newChannel, newStub); return; } catch (Exception e2) { LOGGER.info("connect to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -195,16 +197,17 @@ public KvBean getKV(String path) { output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input); } catch (Exception e1) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input); + replaceChannelAndStub(newChannel, newStub); } catch (Exception e2) { LOGGER.info("connect to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -230,16 +233,17 @@ public List getKVPath(String path) { output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input); } catch (Exception e1) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input); + replaceChannelAndStub(newChannel, newStub); } catch (Exception e2) { LOGGER.info("connect to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -265,17 +269,18 @@ public void cleanPath(String path) { } catch (Exception e1) { boolean flag = false; for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKvTree(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKvTree(input); flag = true; + replaceChannelAndStub(newChannel, newStub); } catch (Exception e2) { LOGGER.info("connect to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -292,17 +297,18 @@ public void cleanKV(String path) { stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input); } catch (Exception e1) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input); + replaceChannelAndStub(newChannel, newStub); return; } catch (Exception e2) { LOGGER.info("connect to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -320,18 +326,19 @@ public SubscribeReturnBean subscribeKvPrefix(SubscribeRequest request) throws Ex return groupSubscribeResult(output); } catch (Exception e1) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS); - UcoreInterface.SubscribeKvPrefixOutput output = stub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeKvPrefix(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS); + UcoreInterface.SubscribeKvPrefixOutput output = newStub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeKvPrefix(input); + replaceChannelAndStub(newChannel, newStub); return groupSubscribeResult(output); } catch (Exception e2) { LOGGER.info("connect to ucore at " + ip + " failure", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -346,16 +353,17 @@ public void alert(ClusterAlertBean alert) { stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input); } catch (Exception e) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input); + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input); + replaceChannelAndStub(newChannel, newStub); return; } catch (Exception e2) { LOGGER.info("alert to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -370,16 +378,17 @@ public boolean alertResolve(ClusterAlertBean alert) { return true; } catch (Exception e) { for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alertResolve(input); + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alertResolve(input); + replaceChannelAndStub(newChannel, newStub); return true; } catch (Exception e2) { LOGGER.info("alertResolve to ucore error ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } return false; } @@ -415,17 +424,18 @@ public boolean renewLock(String sessionId) throws Exception { } catch (Exception e1) { LOGGER.info("connect to ucore renew error and will retry"); for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); - stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).renewSession(input); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS); + newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).renewSession(input); + replaceChannelAndStub(newChannel, newStub); return true; } catch (Exception e2) { LOGGER.info("connect to ucore renew error " + stub, e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -452,16 +462,18 @@ public UcoreInterface.SubscribeNodesOutput subscribeNodes(UcoreInterface.Subscri } catch (Exception e) { //the first try failure ,try for all the other ucore ip for (String ip : getIpList()) { - ManagedChannel channel = null; + ManagedChannel newChannel = null; try { - channel = ManagedChannelBuilder.forAddress(ip, + newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build(); - stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS); - return stub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeNodes(subscribeNodesInput); + UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS); + UcoreInterface.SubscribeNodesOutput output = newStub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeNodes(subscribeNodesInput); + replaceChannelAndStub(newChannel, newStub); + return output; } catch (Exception e2) { LOGGER.info("try connection IP " + ip + " failure ", e2); - if (channel != null) { - channel.shutdownNow(); + if (newChannel != null) { + newChannel.shutdownNow(); } } } @@ -526,4 +538,13 @@ private boolean skipSyncUcores() { return false; } + private synchronized void replaceChannelAndStub(ManagedChannel newChannel, UcoreGrpc.UcoreBlockingStub newStub) { + if (channel != null) { + channel.shutdownNow(); + } + channel = newChannel; + stub = newStub; + } + + }