Skip to content

Commit 5ada25f

Browse files
authored
feat:call processingReturn/Completion to close even if task fails to process (#271)
1 parent 818b7c8 commit 5ada25f

2 files changed

Lines changed: 132 additions & 10 deletions

File tree

processor/src/main/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImpl.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -127,27 +127,27 @@ private <P> Completion pushDownStream(List<DecatonProcessor<P>> downstreams, P t
127127
log.error("Exception from tracing", e);
128128
}
129129
nextProcessor.process(nextContext, taskData);
130+
} finally {
130131
try {
131132
traceHandle.processingReturn();
132133
} catch (Exception e) {
133134
log.error("Exception from tracing", e);
134135
}
135-
} finally {
136136
completion = nextContext.deferredCompletion.get();
137137
if (completion == null) {
138138
// If process didn't requested for deferred completion, we understand it as process
139139
// completed synchronously.
140140
completion = CompletionImpl.completedCompletion();
141141
}
142-
}
143142

144-
completion.asFuture().whenComplete((unused, throwable) -> {
145-
try {
146-
traceHandle.processingCompletion();
147-
} catch (Exception e) {
148-
log.error("Exception from tracing", e);
149-
}
150-
});
143+
completion.asFuture().whenComplete((unused, throwable) -> {
144+
try {
145+
traceHandle.processingCompletion();
146+
} catch (Exception e) {
147+
log.error("Exception from tracing", e);
148+
}
149+
});
150+
}
151151
return completion;
152152
}
153153

processor/src/test/java/com/linecorp/decaton/processor/runtime/internal/ProcessingContextImplTest.java

Lines changed: 123 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.Arrays;
3636
import java.util.Collections;
3737
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.CompletionException;
3839
import java.util.concurrent.CountDownLatch;
3940
import java.util.concurrent.ExecutorService;
4041
import java.util.concurrent.Executors;
@@ -475,7 +476,7 @@ public void testTrace_Async() throws InterruptedException {
475476
assertFalse(comp.isComplete());
476477
// The trace for this processor should no longer be "current"
477478
// (because sync execution has finished)
478-
// but it is should still be "open"
479+
// but it should still be "open"
479480
assertNull(TestTracingProvider.getCurrentTraceId());
480481
assertThat(TestTracingProvider.getOpenTraces(), hasItem("testTrace_Async-Async"));
481482

@@ -488,6 +489,127 @@ public void testTrace_Async() throws InterruptedException {
488489
// Trace ID is not propagated unless the implementation does so manually
489490
assertNull(traceDuringAsyncProcessing.get());
490491
} finally {
492+
executor.shutdownNow();
493+
handle.processingCompletion();
494+
}
495+
TestTracingProvider.assertAllTracesWereClosed();
496+
}
497+
498+
@Test
499+
@Timeout(5)
500+
public void testTrace_SyncFailure() {
501+
RecordTraceHandle handle = new TestTraceHandle("testTrace");
502+
final AtomicReference<String> traceDuringProcessing = new AtomicReference<>();
503+
try {
504+
ProcessingContextImpl<HelloTask> context = context(handle,
505+
new NamedProcessor("SyncFail", (ctx, task) -> {
506+
traceDuringProcessing
507+
.set(TestTracingProvider.getCurrentTraceId());
508+
throw new RuntimeException("task failure");
509+
}));
510+
511+
RuntimeException e = assertThrows(RuntimeException.class, () -> context.push(TASK));
512+
assertEquals("task failure", e.getMessage());
513+
assertNull(TestTracingProvider.getCurrentTraceId());
514+
assertEquals("testTrace-SyncFail", traceDuringProcessing.get());
515+
// Trace is closed normally
516+
assertThat(TestTracingProvider.getOpenTraces(), not(hasItem("testTrace-SyncFail")));
517+
} finally {
518+
handle.processingCompletion();
519+
}
520+
TestTracingProvider.assertAllTracesWereClosed();
521+
}
522+
523+
@Test
524+
@Timeout(5)
525+
public void testTrace_AsyncFailure() throws InterruptedException {
526+
CountDownLatch latch = new CountDownLatch(1);
527+
ExecutorService executor = Executors.newSingleThreadExecutor();
528+
RecordTraceHandle handle = new TestTraceHandle("testTrace");
529+
final AtomicReference<String> traceDuringSyncProcessing = new AtomicReference<>();
530+
final AtomicReference<String> traceDuringAsyncProcessing = new AtomicReference<>();
531+
try {
532+
ProcessingContextImpl<HelloTask> context =
533+
context(handle, new NamedProcessor("AsyncFail", (ctx, task) -> {
534+
DeferredCompletion comp = ctx.deferCompletion();
535+
traceDuringSyncProcessing.set(TestTracingProvider.getCurrentTraceId());
536+
executor.execute(() -> {
537+
traceDuringAsyncProcessing.set(TestTracingProvider.getCurrentTraceId());
538+
safeAwait(latch);
539+
comp.complete();
540+
});
541+
// exception outside async call
542+
throw new RuntimeException("task failure");
543+
}));
544+
545+
RuntimeException e = assertThrows(RuntimeException.class, () -> context.push(TASK));
546+
assertEquals("task failure", e.getMessage());
547+
assertNull(TestTracingProvider.getCurrentTraceId());
548+
549+
// Trace is still be "open"
550+
assertThat(TestTracingProvider.getOpenTraces(), hasItem("testTrace-AsyncFail"));
551+
assertEquals("testTrace-AsyncFail", traceDuringSyncProcessing.get());
552+
assertNull(traceDuringAsyncProcessing.get());
553+
554+
latch.countDown();
555+
terminateExecutor(executor);
556+
// Trace is closed normally
557+
assertThat(TestTracingProvider.getOpenTraces(), not(hasItem("testTrace-AsyncFail")));
558+
// Trace ID is not propagated unless the implementation does so manually
559+
assertNull(traceDuringAsyncProcessing.get());
560+
} finally {
561+
executor.shutdownNow();
562+
handle.processingCompletion();
563+
}
564+
TestTracingProvider.assertAllTracesWereClosed();
565+
}
566+
567+
@Test
568+
@Timeout(5)
569+
public void testTrace_AsyncThreadFailure() throws InterruptedException {
570+
CountDownLatch latch = new CountDownLatch(1);
571+
ExecutorService executor = Executors.newSingleThreadExecutor();
572+
RecordTraceHandle handle = new TestTraceHandle("testTrace_AsyncThreadFailure");
573+
final AtomicReference<String> traceDuringSyncProcessing = new AtomicReference<>();
574+
final AtomicReference<String> traceDuringAsyncProcessing = new AtomicReference<>();
575+
final AtomicReference<CompletableFuture<Void>> asyncStageRef = new AtomicReference<>();
576+
try {
577+
ProcessingContextImpl<HelloTask> context =
578+
context(handle, new NamedProcessor("AsyncThreadFail", (ctx, task) -> {
579+
DeferredCompletion comp = ctx.deferCompletion();
580+
traceDuringSyncProcessing.set(TestTracingProvider.getCurrentTraceId());
581+
582+
CompletableFuture<Void> asyncStage = CompletableFuture.runAsync(() -> {
583+
traceDuringAsyncProcessing.set(TestTracingProvider.getCurrentTraceId());
584+
safeAwait(latch);
585+
// inside async call
586+
throw new RuntimeException("async failure");
587+
}, executor);
588+
asyncStageRef.set(asyncStage);
589+
comp.completeWith(asyncStage);
590+
}));
591+
592+
Completion comp = context.push(TASK);
593+
assertFalse(comp.isComplete());
594+
assertNull(TestTracingProvider.getCurrentTraceId());
595+
596+
// Trace is still be "open"
597+
assertThat(TestTracingProvider.getOpenTraces(),
598+
hasItem("testTrace_AsyncThreadFailure-AsyncThreadFail"));
599+
assertEquals("testTrace_AsyncThreadFailure-AsyncThreadFail", traceDuringSyncProcessing.get());
600+
assertNull(traceDuringAsyncProcessing.get());
601+
602+
latch.countDown();
603+
assertThrows(CompletionException.class, () -> asyncStageRef.get().join());
604+
comp.asFuture().toCompletableFuture().join();
605+
assertTrue(comp.isComplete());
606+
// Trace is closed normally
607+
assertThat(TestTracingProvider.getOpenTraces(),
608+
not(hasItem("testTrace_AsyncThreadFailure-AsyncThreadFail")));
609+
// Trace ID is not propagated unless the implementation does so manually
610+
assertNull(traceDuringAsyncProcessing.get());
611+
} finally {
612+
executor.shutdownNow();
491613
handle.processingCompletion();
492614
}
493615
TestTracingProvider.assertAllTracesWereClosed();

0 commit comments

Comments
 (0)