Skip to content

Commit c77fa10

Browse files
committed
Fix multicast context propagation race in reactor/reactive-streams
1 parent 3d19e7d commit c77fa10

23 files changed

Lines changed: 267 additions & 41 deletions

File tree

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package datadog.trace.bootstrap.instrumentation.reactivestreams;
2+
3+
import datadog.context.Context;
4+
5+
/**
6+
* Value of the {@code (Publisher, HandoffContext)} store used to hand a context from a publisher's
7+
* subscribe to that publisher's own subscriber (or a blocking call).
8+
*
9+
* <p>{@link #threadConfined} deposits are only adopted on the producing thread. The reactor-core
10+
* bridge uses them because a shared publisher — a multicast/replay {@code Sinks.Many} with several
11+
* consumers — is subscribed concurrently, and keying by publisher identity alone would let one
12+
* consumer adopt another's context. This is safe because a subscribe chain runs synchronously on
13+
* one thread. {@link #anyThread} deposits are adopted anywhere, for producers (resilience4j,
14+
* spring-webflux, spring-messaging) that attach to a unique publisher subscribed later, possibly on
15+
* another thread.
16+
*/
17+
public final class HandoffContext {
18+
19+
private static final long ANY_THREAD = 0L;
20+
21+
private final long threadId;
22+
private final Context context;
23+
24+
private HandoffContext(final Context context, final long threadId) {
25+
this.context = context;
26+
this.threadId = threadId;
27+
}
28+
29+
public static HandoffContext anyThread(final Context context) {
30+
return new HandoffContext(context, ANY_THREAD);
31+
}
32+
33+
public static HandoffContext threadConfined(final Context context) {
34+
return new HandoffContext(context, Thread.currentThread().getId());
35+
}
36+
37+
/** The context, or {@code null} if this is a thread-confined deposit read on another thread. */
38+
public Context contextForCurrentThread() {
39+
return threadId == ANY_THREAD || threadId == Thread.currentThread().getId() ? context : null;
40+
}
41+
}

dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/PublisherInstrumentation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import datadog.context.ContextScope;
1414
import datadog.trace.agent.tooling.Instrumenter;
1515
import datadog.trace.bootstrap.InstrumentationContext;
16+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
1617
import net.bytebuddy.asm.Advice;
1718
import net.bytebuddy.description.type.TypeDescription;
1819
import net.bytebuddy.matcher.ElementMatcher;
@@ -55,7 +56,7 @@ public static ContextScope onSubscribe(
5556
return ReactiveStreamsContextPropagation.captureOnSubscribe(
5657
self,
5758
s,
58-
InstrumentationContext.get(Publisher.class, Context.class),
59+
InstrumentationContext.get(Publisher.class, HandoffContext.class),
5960
InstrumentationContext.get(Subscriber.class, Context.class));
6061
}
6162

dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagation.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import datadog.context.Context;
44
import datadog.context.ContextScope;
55
import datadog.trace.bootstrap.ContextStore;
6+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
67
import org.reactivestreams.Publisher;
78
import org.reactivestreams.Subscriber;
89

@@ -13,7 +14,7 @@ private ReactiveStreamsContextPropagation() {}
1314
public static ContextScope captureOnSubscribe(
1415
final Publisher<?> publisher,
1516
final Subscriber<?> subscriber,
16-
final ContextStore<Publisher, Context> publisherContexts,
17+
final ContextStore<Publisher, HandoffContext> publisherContexts,
1718
final ContextStore<Subscriber, Context> subscriberContexts) {
1819
// Don't consume the publisher context until we've verified the subscriber is non-null. For
1920
// subscribe(null), Reactive Streams mandates an NPE after this advice returns. Consuming the
@@ -22,7 +23,8 @@ public static ContextScope captureOnSubscribe(
2223
return null;
2324
}
2425

25-
final Context contextFromPublisher = publisherContexts.remove(publisher);
26+
final HandoffContext handoff = publisherContexts.remove(publisher);
27+
final Context contextFromPublisher = handoff == null ? null : handoff.contextForCurrentThread();
2628
final Context activeContext = Context.current();
2729
final Context context = contextFromPublisher != null ? contextFromPublisher : activeContext;
2830
if (context == Context.root()) {

dd-java-agent/instrumentation/reactive-streams-1.0/src/main/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsModule.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import datadog.context.Context;
77
import datadog.trace.agent.tooling.Instrumenter;
88
import datadog.trace.agent.tooling.InstrumenterModule;
9+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
910
import java.util.HashMap;
1011
import java.util.List;
1112
import java.util.Map;
@@ -31,7 +32,7 @@ public String[] helperClassNames() {
3132
public Map<String, String> contextStore() {
3233
final Map<String, String> store = new HashMap<>();
3334
store.put("org.reactivestreams.Subscriber", Context.class.getName());
34-
store.put("org.reactivestreams.Publisher", Context.class.getName());
35+
store.put("org.reactivestreams.Publisher", HandoffContext.class.getName());
3536
return store;
3637
}
3738

dd-java-agent/instrumentation/reactive-streams-1.0/src/test/java/datadog/trace/instrumentation/reactivestreams/ReactiveStreamsContextPropagationTest.java

Lines changed: 60 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import datadog.context.ContextKey;
99
import datadog.context.ContextScope;
1010
import datadog.trace.bootstrap.ContextStore;
11+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
1112
import java.util.IdentityHashMap;
1213
import java.util.Map;
1314
import org.junit.jupiter.api.Test;
@@ -23,12 +24,12 @@ class ReactiveStreamsContextPropagationTest {
2324
void publisherCapturedContextOverridesActiveContext() {
2425
final Publisher<Object> publisher = subscriber -> {};
2526
final Subscriber<Object> subscriber = new NoopSubscriber();
26-
final ContextStore<Publisher, Context> publisherContexts = new MapContextStore<>();
27+
final ContextStore<Publisher, HandoffContext> publisherContexts = new MapContextStore<>();
2728
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();
2829

29-
// A context was captured on the publisher (e.g. at assembly / cross-thread subscribe).
30+
// A context was handed off on the publisher, confined to this (the producing) thread.
3031
final Context captured = Context.root().with(KEY, "captured");
31-
publisherContexts.put(publisher, captured);
32+
publisherContexts.put(publisher, HandoffContext.threadConfined(captured));
3233

3334
// The current thread already carries a different, non-root active context.
3435
final Context active = Context.root().with(KEY, "active");
@@ -52,11 +53,66 @@ void publisherCapturedContextOverridesActiveContext() {
5253
assertSame(active, Context.current());
5354
}
5455

55-
// The captured context is remembered for the subscriber, and removed from the publisher store.
56+
// The captured context is remembered for the subscriber, and consumed from the publisher store.
5657
assertSame(captured, subscriberContexts.get(subscriber));
5758
assertNull(publisherContexts.get(publisher));
5859
}
5960

61+
@Test
62+
void publisherContextFromAnotherThreadIsIgnored() throws InterruptedException {
63+
// A thread-confined deposit from another thread (concurrent multicast subscribe) must be
64+
// ignored.
65+
final Publisher<Object> publisher = subscriber -> {};
66+
final Subscriber<Object> subscriber = new NoopSubscriber();
67+
final ContextStore<Publisher, HandoffContext> publisherContexts = new MapContextStore<>();
68+
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();
69+
70+
final Context foreign = Context.root().with(KEY, "foreign");
71+
final Thread producer =
72+
new Thread(() -> publisherContexts.put(publisher, HandoffContext.threadConfined(foreign)));
73+
producer.start();
74+
producer.join();
75+
76+
final Context active = Context.root().with(KEY, "active");
77+
try (ContextScope activeScope = active.attach()) {
78+
final ContextScope scope =
79+
ReactiveStreamsContextPropagation.captureOnSubscribe(
80+
publisher, subscriber, publisherContexts, subscriberContexts);
81+
if (scope != null) {
82+
scope.close();
83+
}
84+
}
85+
86+
// The foreign deposit is ignored; the subscriber keeps this thread's active context.
87+
assertSame(active, subscriberContexts.get(subscriber));
88+
}
89+
90+
@Test
91+
void anyThreadPublisherContextIsAdoptedAcrossThreads() throws InterruptedException {
92+
// An any-thread deposit (resilience4j/spring-messaging: attached early, subscribed later) is
93+
// adopted even when the subscribe runs on a different thread.
94+
final Publisher<Object> publisher = subscriber -> {};
95+
final Subscriber<Object> subscriber = new NoopSubscriber();
96+
final ContextStore<Publisher, HandoffContext> publisherContexts = new MapContextStore<>();
97+
final ContextStore<Subscriber, Context> subscriberContexts = new MapContextStore<>();
98+
99+
final Context captured = Context.root().with(KEY, "captured");
100+
final Thread producer =
101+
new Thread(() -> publisherContexts.put(publisher, HandoffContext.anyThread(captured)));
102+
producer.start();
103+
producer.join();
104+
105+
final ContextScope scope =
106+
ReactiveStreamsContextPropagation.captureOnSubscribe(
107+
publisher, subscriber, publisherContexts, subscriberContexts);
108+
if (scope != null) {
109+
scope.close();
110+
}
111+
112+
// The any-thread deposit is adopted despite the cross-thread subscribe.
113+
assertSame(captured, subscriberContexts.get(subscriber));
114+
}
115+
60116
@Test
61117
void signalActivationIsSkippedWhenAnotherContextIsActive() {
62118
final Subscriber<Object> subscriber = new NoopSubscriber();

dd-java-agent/instrumentation/reactor-core-3.1/src/latestDepTest/groovy/ReactorCoreTest.groovy

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,50 @@ class ReactorCoreTest extends InstrumentationSpecification {
383383
"immediate" | Schedulers.immediate()
384384
}
385385

386+
def "subscribe-time context propagates across threads with #name"() {
387+
// Guards that the thread-confined publisher hand-off (HandoffContext) does not break cross-thread
388+
// propagation: the @Trace "addOne" spans run in map's onNext on scheduler threads and must still
389+
// be children of the subscribe-time parent.
390+
when:
391+
runUnderTrace("parent") {
392+
pipeline.call().collectList().block()
393+
}
394+
395+
then:
396+
assertTraces(1) {
397+
trace(3) {
398+
sortSpansByStart()
399+
span {
400+
operationName "parent"
401+
parent()
402+
}
403+
span {
404+
operationName "addOne"
405+
childOf span(0)
406+
}
407+
span {
408+
operationName "addOne"
409+
childOf span(0)
410+
}
411+
}
412+
}
413+
414+
where:
415+
name | pipeline
416+
"publishOn" | {
417+
Flux.just(1, 2).publishOn(Schedulers.parallel()).map(addOne)
418+
}
419+
"subscribeOn" | {
420+
Flux.just(1, 2).subscribeOn(Schedulers.single()).map(addOne)
421+
}
422+
"subscribeOn+publishOn" | {
423+
Flux.just(1, 2)
424+
.subscribeOn(Schedulers.single())
425+
.publishOn(Schedulers.parallel())
426+
.map(addOne)
427+
}
428+
}
429+
386430
def "Context propagation through reactor context with span #spanType"() {
387431
when:
388432
runUnderTrace("parent", {

dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/BlockingPublisherInstrumentation.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,10 @@
55
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf;
66
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
77

8-
import datadog.context.Context;
98
import datadog.context.ContextScope;
109
import datadog.trace.agent.tooling.Instrumenter;
1110
import datadog.trace.bootstrap.InstrumentationContext;
11+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
1212
import net.bytebuddy.asm.Advice;
1313
import net.bytebuddy.description.type.TypeDescription;
1414
import net.bytebuddy.matcher.ElementMatcher;
@@ -42,7 +42,7 @@ public static class BlockingAdvice {
4242
@Advice.OnMethodEnter(suppress = Throwable.class)
4343
public static ContextScope before(@Advice.This final Publisher self) {
4444
return ReactorContextBridge.activateForBlocking(
45-
self, InstrumentationContext.get(Publisher.class, Context.class));
45+
self, InstrumentationContext.get(Publisher.class, HandoffContext.class));
4646
}
4747

4848
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)

dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/CorePublisherInstrumentation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import datadog.context.ContextScope;
1414
import datadog.trace.agent.tooling.Instrumenter;
1515
import datadog.trace.bootstrap.InstrumentationContext;
16+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
1617
import net.bytebuddy.asm.Advice;
1718
import net.bytebuddy.description.type.TypeDescription;
1819
import net.bytebuddy.matcher.ElementMatcher;
@@ -58,7 +59,7 @@ public static ContextScope before(
5859
return ReactorContextBridge.captureOnSubscribe(
5960
self,
6061
subscriber,
61-
InstrumentationContext.get(Publisher.class, Context.class),
62+
InstrumentationContext.get(Publisher.class, HandoffContext.class),
6263
InstrumentationContext.get(Subscriber.class, Context.class));
6364
}
6465

dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/OptimizableOperatorInstrumentation.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import datadog.context.Context;
1111
import datadog.trace.agent.tooling.Instrumenter;
1212
import datadog.trace.bootstrap.InstrumentationContext;
13+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
1314
import net.bytebuddy.asm.Advice;
1415
import net.bytebuddy.description.type.TypeDescription;
1516
import net.bytebuddy.matcher.ElementMatcher;
@@ -54,7 +55,7 @@ public static void onSubscribe(
5455
self,
5556
arg,
5657
s,
57-
InstrumentationContext.get(Publisher.class, Context.class),
58+
InstrumentationContext.get(Publisher.class, HandoffContext.class),
5859
InstrumentationContext.get(Subscriber.class, Context.class));
5960
}
6061
}

dd-java-agent/instrumentation/reactor-core-3.1/src/main/java/datadog/trace/instrumentation/reactor/core/ReactorContextBridge.java

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import datadog.context.ContextScope;
55
import datadog.trace.bootstrap.ContextStore;
66
import datadog.trace.bootstrap.instrumentation.api.WithAgentSpan;
7+
import datadog.trace.bootstrap.instrumentation.reactivestreams.HandoffContext;
78
import org.reactivestreams.Publisher;
89
import org.reactivestreams.Subscriber;
910
import reactor.core.CoreSubscriber;
@@ -48,38 +49,43 @@ public static ContextScope activateStoredContext(
4849
/**
4950
* On subscribe, hands the explicit context recorded for {@code subscriber} (a context-writing
5051
* subscriber) to the publisher store so the reactive-streams layer can propagate it, and attaches
51-
* it.
52+
* it. The deposit is {@linkplain HandoffContext#threadConfined thread-confined} since the
53+
* subscribed publisher may be a concurrently-subscribed shared sink.
5254
*/
5355
public static ContextScope captureOnSubscribe(
5456
final Publisher<?> publisher,
5557
final Subscriber<?> subscriber,
56-
final ContextStore<Publisher, Context> publisherContexts,
58+
final ContextStore<Publisher, HandoffContext> publisherContexts,
5759
final ContextStore<Subscriber, Context> subscriberContexts) {
5860
final Context context = subscriberContexts.get(subscriber);
5961
if (context == null) {
6062
return null;
6163
}
6264

63-
publisherContexts.put(publisher, context);
65+
publisherContexts.put(publisher, HandoffContext.threadConfined(context));
6466
return attachIfRequired(context, Context.current());
6567
}
6668

6769
public static ContextScope activateForBlocking(
68-
final Publisher<?> publisher, final ContextStore<Publisher, Context> publisherContexts) {
69-
return attachIfRequired(publisherContexts.get(publisher), Context.current());
70+
final Publisher<?> publisher,
71+
final ContextStore<Publisher, HandoffContext> publisherContexts) {
72+
final HandoffContext handoff = publisherContexts.get(publisher);
73+
return attachIfRequired(
74+
handoff == null ? null : handoff.contextForCurrentThread(), Context.current());
7075
}
7176

7277
public static void transferToOptimizedSubscriber(
7378
final Publisher<?> publisher,
7479
final Subscriber<?> source,
7580
final Subscriber<?> target,
76-
final ContextStore<Publisher, Context> publisherContexts,
81+
final ContextStore<Publisher, HandoffContext> publisherContexts,
7782
final ContextStore<Subscriber, Context> subscriberContexts) {
7883
if (source == null || target == null) {
7984
return;
8085
}
8186

82-
Context context = publisherContexts.get(publisher);
87+
final HandoffContext handoff = publisherContexts.get(publisher);
88+
Context context = handoff == null ? null : handoff.contextForCurrentThread();
8389
if (context == null) {
8490
context = subscriberContexts.get(source);
8591
}

0 commit comments

Comments
 (0)