diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index e4456433..ea0dbed5 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -358,9 +358,12 @@ public void run(SourceContext context) throws Exception { } private void awaitTermination() throws InterruptedException { - while (runningChecker.isRunning()) { + while (runningChecker.isRunning() && runningChecker.getError()==null) { Thread.sleep(50); } + if(runningChecker.getError()!=null){ + throw new RuntimeException(runningChecker.getError()); + } } /** diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java index 961a3029..2b4a52fd 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java @@ -20,6 +20,7 @@ public class RunningChecker implements Serializable { private volatile boolean isRunning = false; + private volatile Throwable error; public boolean isRunning() { return isRunning; @@ -28,4 +29,14 @@ public boolean isRunning() { public void setRunning(boolean running) { isRunning = running; } + + public void setError(Throwable e) { + this.error = e; + } + + public Throwable getError() { + return this.error; + } + + } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java index d11105b9..aa4a754d 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java @@ -57,6 +57,7 @@ public static T call(Callable callable, String errorMsg, RunningChecker r } catch (Exception ex) { if (retries >= MAX_ATTEMPTS) { if (null != runningChecker) { + runningChecker.setError(ex); runningChecker.setRunning(false); } throw new RuntimeException(ex);