From 14582b72b54a9773a0ee2625ff3835a405a75230 Mon Sep 17 00:00:00 2001 From: chennuo Date: Thu, 17 Aug 2023 16:33:43 +0800 Subject: [PATCH] Make job fail instead of finished when consumes exception when use legacy source --- .../rocketmq/legacy/RocketMQSourceFunction.java | 5 ++++- .../connector/rocketmq/legacy/RunningChecker.java | 11 +++++++++++ .../rocketmq/legacy/common/util/RetryUtil.java | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java index e4456433..ea0dbed5 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java @@ -358,9 +358,12 @@ public void run(SourceContext context) throws Exception { } private void awaitTermination() throws InterruptedException { - while (runningChecker.isRunning()) { + while (runningChecker.isRunning() && runningChecker.getError()==null) { Thread.sleep(50); } + if(runningChecker.getError()!=null){ + throw new RuntimeException(runningChecker.getError()); + } } /** diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java index 961a3029..2b4a52fd 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/RunningChecker.java @@ -20,6 +20,7 @@ public class RunningChecker implements Serializable { private volatile boolean isRunning = false; + private volatile Throwable error; public boolean isRunning() { return isRunning; @@ -28,4 +29,14 @@ public boolean isRunning() { public void setRunning(boolean running) { isRunning = running; } + + public void setError(Throwable e) { + this.error = e; + } + + public Throwable getError() { + return this.error; + } + + } diff --git a/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java b/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java index d11105b9..aa4a754d 100644 --- a/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java +++ b/src/main/java/org/apache/flink/connector/rocketmq/legacy/common/util/RetryUtil.java @@ -57,6 +57,7 @@ public static T call(Callable callable, String errorMsg, RunningChecker r } catch (Exception ex) { if (retries >= MAX_ATTEMPTS) { if (null != runningChecker) { + runningChecker.setError(ex); runningChecker.setRunning(false); } throw new RuntimeException(ex);