Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 14 additions & 13 deletions flume-ng-core/src/main/java/org/apache/flume/SinkRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flume.lifecycle.LifecycleState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flume.Sink.Status;
/**
* <p>
* A driver for {@linkplain Sink sinks} that polls them, attempting to
Expand Down Expand Up @@ -78,12 +78,7 @@ public void start() {

policy.start();

runner = new PollingRunner();

runner.policy = policy;
runner.counterGroup = counterGroup;
runner.shouldStop = new AtomicBoolean();

runner = new PollingRunner(policy,counterGroup);
runnerThread = new Thread(runner);
runnerThread.setName("SinkRunner-PollingRunner-" +
policy.getClass().getSimpleName());
Expand All @@ -96,7 +91,7 @@ public void start() {
public void stop() {

if (runnerThread != null) {
runner.shouldStop.set(true);
runner.shouldStop=true;
runnerThread.interrupt();

while (runnerThread.isAlive()) {
Expand Down Expand Up @@ -133,16 +128,22 @@ public LifecycleState getLifecycleState() {
public static class PollingRunner implements Runnable {

private SinkProcessor policy;
private AtomicBoolean shouldStop;
private volatile boolean shouldStop;
private CounterGroup counterGroup;
PollingRunner(SinkProcessor policy,CounterGroup counterGroup){
this.policy=policy;
this.counterGroup=counterGroup;
this.shouldStop=false;
}

@Override
public void run() {
logger.debug("Polling sink runner starting");

while (!shouldStop.get()) {
try {
if (policy.process().equals(Sink.Status.BACKOFF)) {
while (!shouldStop) {
try {
Status stat=policy.process();
if(stat==null)continue;
if(stat.equals(Sink.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");

Thread.sleep(Math.min(
Expand Down