diff --git a/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java b/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java index f513bebc19..c16d933781 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java +++ b/flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java @@ -25,7 +25,7 @@ import org.apache.flume.lifecycle.LifecycleState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import org.apache.flume.Sink.Status; /** *
* A driver for {@linkplain Sink sinks} that polls them, attempting to @@ -78,12 +78,7 @@ public void start() { policy.start(); - runner = new PollingRunner(); - - runner.policy = policy; - runner.counterGroup = counterGroup; - runner.shouldStop = new AtomicBoolean(); - + runner = new PollingRunner(policy,counterGroup); runnerThread = new Thread(runner); runnerThread.setName("SinkRunner-PollingRunner-" + policy.getClass().getSimpleName()); @@ -96,7 +91,7 @@ public void start() { public void stop() { if (runnerThread != null) { - runner.shouldStop.set(true); + runner.shouldStop=true; runnerThread.interrupt(); while (runnerThread.isAlive()) { @@ -133,16 +128,22 @@ public LifecycleState getLifecycleState() { public static class PollingRunner implements Runnable { private SinkProcessor policy; - private AtomicBoolean shouldStop; + private volatile boolean shouldStop; private CounterGroup counterGroup; + PollingRunner(SinkProcessor policy,CounterGroup counterGroup){ + this.policy=policy; + this.counterGroup=counterGroup; + this.shouldStop=false; + } @Override public void run() { logger.debug("Polling sink runner starting"); - - while (!shouldStop.get()) { - try { - if (policy.process().equals(Sink.Status.BACKOFF)) { + while (!shouldStop) { + try { + Status stat=policy.process(); + if(stat==null)continue; + if(stat.equals(Sink.Status.BACKOFF)) { counterGroup.incrementAndGet("runner.backoffs"); Thread.sleep(Math.min(