Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.actiontech.dble.util.DebugUtil;
import com.actiontech.dble.util.PropertiesUtil;
import com.actiontech.dble.util.StringUtil;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.commons.lang.StringUtils;
Expand All @@ -43,6 +42,7 @@ public final class UcoreSender extends AbstractConsulSender {


private volatile UcoreGrpc.UcoreBlockingStub stub = null;
private volatile ManagedChannel channel = null;
private ConcurrentHashMap<String, Thread> lockMap = new ConcurrentHashMap<>();
private List<String> ipList = new ArrayList<>();
private static final String SOURCE_COMPONENT_TYPE = "dble";
Expand All @@ -56,7 +56,7 @@ public void initConInfo() {
} catch (Exception e) {
LOGGER.error("error:", e);
}
Channel channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
}
Expand All @@ -70,7 +70,7 @@ public void initCluster() {
} catch (Exception e) {
LOGGER.error("error:", e);
}
Channel channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
channel = ManagedChannelBuilder.forAddress(getIpList().get(0),
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
if (!skipSyncUcores()) {
Expand Down Expand Up @@ -129,17 +129,18 @@ private void log(String message, Exception e) {
return output.getSessionId();
} catch (Exception e1) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).lockOnSession(input);
replaceChannelAndStub(newChannel, newStub);
return output.getSessionId();
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand Down Expand Up @@ -168,17 +169,18 @@ public void setKV(String path, String value) throws Exception {
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
} catch (Exception e1) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).putKv(input);
replaceChannelAndStub(newChannel, newStub);
return;
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -195,16 +197,17 @@ public KvBean getKV(String path) {
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input);
} catch (Exception e1) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKv(input);
replaceChannelAndStub(newChannel, newStub);
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -230,16 +233,17 @@ public List<KvBean> getKVPath(String path) {
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input);
} catch (Exception e1) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
output = stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
output = newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).getKvTree(input);
replaceChannelAndStub(newChannel, newStub);
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -265,17 +269,18 @@ public void cleanPath(String path) {
} catch (Exception e1) {
boolean flag = false;
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKvTree(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKvTree(input);
flag = true;
replaceChannelAndStub(newChannel, newStub);
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -292,17 +297,18 @@ public void cleanKV(String path) {
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input);
} catch (Exception e1) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).deleteKv(input);
replaceChannelAndStub(newChannel, newStub);
return;
} catch (Exception e2) {
LOGGER.info("connect to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -320,18 +326,19 @@ public SubscribeReturnBean subscribeKvPrefix(SubscribeRequest request) throws Ex
return groupSubscribeResult(output);
} catch (Exception e1) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
UcoreInterface.SubscribeKvPrefixOutput output = stub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeKvPrefix(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
UcoreInterface.SubscribeKvPrefixOutput output = newStub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeKvPrefix(input);
replaceChannelAndStub(newChannel, newStub);
return groupSubscribeResult(output);

} catch (Exception e2) {
LOGGER.info("connect to ucore at " + ip + " failure", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -346,16 +353,17 @@ public void alert(ClusterAlertBean alert) {
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input);
} catch (Exception e) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input);
newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alert(input);
replaceChannelAndStub(newChannel, newStub);
return;
} catch (Exception e2) {
LOGGER.info("alert to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -370,16 +378,17 @@ public boolean alertResolve(ClusterAlertBean alert) {
return true;
} catch (Exception e) {
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alertResolve(input);
newChannel = ManagedChannelBuilder.forAddress(ip, ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).alertResolve(input);
replaceChannelAndStub(newChannel, newStub);
return true;
} catch (Exception e2) {
LOGGER.info("alertResolve to ucore error ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
return false;
}
Expand Down Expand Up @@ -415,17 +424,18 @@ public boolean renewLock(String sessionId) throws Exception {
} catch (Exception e1) {
LOGGER.info("connect to ucore renew error and will retry");
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
stub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).renewSession(input);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS);
newStub.withDeadlineAfter(GENERAL_GRPC_TIMEOUT, TimeUnit.SECONDS).renewSession(input);
replaceChannelAndStub(newChannel, newStub);
return true;
} catch (Exception e2) {
LOGGER.info("connect to ucore renew error " + stub, e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand All @@ -452,16 +462,18 @@ public UcoreInterface.SubscribeNodesOutput subscribeNodes(UcoreInterface.Subscri
} catch (Exception e) {
//the first try failure ,try for all the other ucore ip
for (String ip : getIpList()) {
ManagedChannel channel = null;
ManagedChannel newChannel = null;
try {
channel = ManagedChannelBuilder.forAddress(ip,
newChannel = ManagedChannelBuilder.forAddress(ip,
ClusterConfig.getInstance().getClusterPort()).usePlaintext().build();
stub = UcoreGrpc.newBlockingStub(channel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
return stub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeNodes(subscribeNodesInput);
UcoreGrpc.UcoreBlockingStub newStub = UcoreGrpc.newBlockingStub(newChannel).withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS);
UcoreInterface.SubscribeNodesOutput output = newStub.withDeadlineAfter(GRPC_SUBTIMEOUT, TimeUnit.SECONDS).subscribeNodes(subscribeNodesInput);
replaceChannelAndStub(newChannel, newStub);
return output;
} catch (Exception e2) {
LOGGER.info("try connection IP " + ip + " failure ", e2);
if (channel != null) {
channel.shutdownNow();
if (newChannel != null) {
newChannel.shutdownNow();
}
}
}
Expand Down Expand Up @@ -526,4 +538,13 @@ private boolean skipSyncUcores() {
return false;
}

private synchronized void replaceChannelAndStub(ManagedChannel newChannel, UcoreGrpc.UcoreBlockingStub newStub) {
if (channel != null) {
channel.shutdownNow();
}
channel = newChannel;
stub = newStub;
}


}
Loading