-
Notifications
You must be signed in to change notification settings - Fork 335
Alexeyk/runnable future fix netty #11350
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
47cc821
77b14f2
8d73579
e8b08b4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ | |
| import static datadog.trace.instrumentation.java.concurrent.ConcurrentInstrumentationNames.EXECUTOR_INSTRUMENTATION_NAME; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isDeclaredBy; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isTypeInitializer; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import com.google.auto.service.AutoService; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
|
|
@@ -45,6 +46,7 @@ public AsyncPropagatingDisableInstrumentation() { | |
| nameEndsWith("io.grpc.internal.ManagedChannelImpl"); | ||
| private static final ElementMatcher<TypeDescription> REACTOR_DISABLED_TYPE_INITIALIZERS = | ||
| namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask"); | ||
| private static final String VERTX_IMPL = "io.vertx.core.impl.VertxImpl"; | ||
|
|
||
| @Override | ||
| public boolean onlyMatchKnownTypes() { | ||
|
|
@@ -77,7 +79,8 @@ public String[] knownMatchingTypes() { | |
| "net.sf.ehcache.store.disk.DiskStorageFactory", | ||
| "org.springframework.jms.listener.DefaultMessageListenerContainer", | ||
| "org.apache.activemq.broker.TransactionBroker", | ||
| "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager" | ||
| "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", | ||
| VERTX_IMPL | ||
| }; | ||
| } | ||
|
|
||
|
|
@@ -170,6 +173,19 @@ public void methodAdvice(MethodTransformer transformer) { | |
| named( | ||
| "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager"))), | ||
| advice); | ||
| // Vert.x can schedule long-running internal timers while a request span is active. | ||
| // Suppress propagation only for Vert.x-owned timer handlers so user timers still keep context. | ||
| String disableVertxInternalTimerAdvice = | ||
| getClass().getName() + "$DisableVertxInternalTimerAdvice"; | ||
| transformer.applyAdvice( | ||
| named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(4)), | ||
| disableVertxInternalTimerAdvice); | ||
| transformer.applyAdvice( | ||
| named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(6)), | ||
| disableVertxInternalTimerAdvice); | ||
| transformer.applyAdvice( | ||
| named("scheduleTimeout").and(isDeclaredBy(named(VERTX_IMPL))).and(takesArguments(7)), | ||
| disableVertxInternalTimerAdvice); | ||
| transformer.applyAdvice( | ||
| isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice); | ||
| } | ||
|
|
@@ -192,4 +208,53 @@ public static void after(@Advice.Enter boolean wasDisabled) { | |
| } | ||
| } | ||
| } | ||
|
|
||
| public static class DisableVertxInternalTimerAdvice { | ||
|
|
||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static boolean before(@Advice.AllArguments Object[] args) { | ||
| for (Object arg : args) { | ||
| if (isVertxInternalHandler(arg)) { | ||
| return DisableAsyncAdvice.before(); | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void after(@Advice.Enter boolean wasDisabled) { | ||
| DisableAsyncAdvice.after(wasDisabled); | ||
| } | ||
|
|
||
| private static boolean isVertxInternalHandler(Object arg) { | ||
| if (arg == null || !arg.getClass().getName().startsWith("io.vertx.")) { | ||
| return false; | ||
| } | ||
| return implementsVertxHandler(arg.getClass()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this just checking whether the argument implements `io.vertx.core.Handler`? Even so, I think `isAssignableFrom` would likely be better. |
||
| } | ||
|
|
||
| private static boolean implementsVertxHandler(Class<?> clazz) { | ||
| while (clazz != null) { | ||
| for (Class<?> iface : clazz.getInterfaces()) { | ||
| if (isVertxHandler(iface)) { | ||
| return true; | ||
| } | ||
| } | ||
| clazz = clazz.getSuperclass(); | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private static boolean isVertxHandler(Class<?> iface) { | ||
| if ("io.vertx.core.Handler".equals(iface.getName())) { | ||
| return true; | ||
| } | ||
| for (Class<?> parent : iface.getInterfaces()) { | ||
| if (isVertxHandler(parent)) { | ||
| return true; | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,6 +22,7 @@ | |
| import datadog.trace.agent.tooling.ExcludeFilterProvider; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.agent.tooling.InstrumenterModule; | ||
| import datadog.trace.bootstrap.ContextStore; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentScope; | ||
| import datadog.trace.bootstrap.instrumentation.java.concurrent.ExcludeFilter; | ||
|
|
@@ -31,6 +32,8 @@ | |
| import java.util.Map; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.RunnableFuture; | ||
| import java.util.concurrent.ScheduledFuture; | ||
| import java.util.concurrent.TimeUnit; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.description.type.TypeDescription; | ||
| import net.bytebuddy.matcher.ElementMatcher; | ||
|
|
@@ -140,14 +143,31 @@ public static final class Construct { | |
|
|
||
| @Advice.OnMethodExit | ||
| public static <T> void captureScope(@Advice.This RunnableFuture<T> task) { | ||
| capture(InstrumentationContext.get(RunnableFuture.class, State.class), task); | ||
| ContextStore<RunnableFuture, State> contextStore = | ||
| InstrumentationContext.get(RunnableFuture.class, State.class); | ||
| capture(contextStore, task); | ||
| } | ||
| } | ||
|
|
||
| public static final class Run { | ||
| @Advice.OnMethodEnter | ||
| public static <T> AgentScope activate(@Advice.This RunnableFuture<T> task) { | ||
| return startTaskScope(InstrumentationContext.get(RunnableFuture.class, State.class), task); | ||
| ContextStore<RunnableFuture, State> contextStore = | ||
| InstrumentationContext.get(RunnableFuture.class, State.class); | ||
|
|
||
| // Netty 4.1.44+ invokes ScheduledFutureTask.run() once to self-enqueue | ||
| // delayed tasks scheduled from outside the event loop, then again when | ||
| // the deadline elapses. Only skip the first call when there is a captured | ||
| // continuation to preserve for the actual fire. | ||
| State state = contextStore.get(task); | ||
| if (task instanceof ScheduledFuture | ||
| && task.getClass().getName().endsWith(".netty.util.concurrent.ScheduledFutureTask")) { | ||
| long delayNanos = ((ScheduledFuture<?>) task).getDelay(TimeUnit.NANOSECONDS); | ||
| if (delayNanos > 0 && state != null && state.getSpan() != null) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Question: is `delayNanos` always > 0 at this moment? |
||
| return null; | ||
| } | ||
| } | ||
| return startTaskScope(state); | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,149 @@ | ||
| package executor; | ||
|
|
||
| import static datadog.trace.agent.test.assertions.SpanMatcher.span; | ||
| import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; | ||
| import static datadog.trace.agent.test.assertions.TraceMatcher.trace; | ||
| import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; | ||
| import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; | ||
| import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; | ||
| import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||
| import static java.util.concurrent.TimeUnit.SECONDS; | ||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||
| import static org.junit.jupiter.api.Assertions.assertTrue; | ||
| import static org.junit.jupiter.api.Assumptions.assumeTrue; | ||
|
|
||
| import datadog.trace.agent.test.AbstractInstrumentationTest; | ||
| import datadog.trace.api.Trace; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentScope; | ||
| import datadog.trace.bootstrap.instrumentation.api.AgentSpan; | ||
| import io.netty.util.Version; | ||
| import io.netty.util.concurrent.DefaultEventExecutorGroup; | ||
| import io.netty.util.concurrent.EventExecutor; | ||
| import java.util.Map; | ||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import org.junit.jupiter.api.Test; | ||
|
|
||
| class NettyScheduledFutureTaskContextPropagationTest extends AbstractInstrumentationTest { | ||
| @Test | ||
| void testNettyVersionCompatible() { | ||
| assertFalse(isCompatibleVersion("4.0.0.Final")); | ||
| assertFalse(isCompatibleVersion("4.1.9.Final")); | ||
| assertFalse(isCompatibleVersion("4.1.43.Final")); | ||
| assertTrue(isCompatibleVersion("4.1.44.Final")); | ||
| assertTrue(isCompatibleVersion("4.2.13.Final")); | ||
| assertTrue(isCompatibleVersion("5.0.0.Alpha2")); | ||
| assertTrue(isCompatibleVersion("5.0.0.Final")); | ||
| } | ||
|
|
||
| @Test | ||
| void testDelayedScheduledFutureTaskActivatesCapturedContinuationWhenDelayExpires() | ||
| throws Exception { | ||
| assumeTrue(hasCompatibleVersion()); | ||
|
|
||
| try (CloseableDefaultEventExecutorGroup group = new CloseableDefaultEventExecutorGroup()) { | ||
| EventExecutor executor = group.next(); | ||
| BlockingTraceableTask task = new BlockingTraceableTask(); | ||
| AgentSpan parent = startSpan("test", "parent"); | ||
|
|
||
| // Netty 4.1.44+ calls ScheduledFutureTask.run() once while enqueueing a delayed task and | ||
| // again when the delay expires. The continuation captured here must survive the enqueue run. | ||
| try (AgentScope ignored = activateSpan(parent)) { | ||
| executor.schedule(task, 50, MILLISECONDS); | ||
| } finally { | ||
| parent.finish(); | ||
| } | ||
|
|
||
| // When the delayed task actually runs, instrumentation should activate the captured | ||
| // continuation so traced work in the task remains a child of the scheduling span. | ||
| assertTrue(task.started.await(5, SECONDS)); | ||
| try { | ||
| assertTrue(task.sawActiveSpan.get()); | ||
| } finally { | ||
| task.proceed.countDown(); | ||
| } | ||
| assertTrue(task.finished.await(5, SECONDS)); | ||
|
|
||
| assertTraces( | ||
| trace( | ||
| SORT_BY_START_TIME, | ||
| span().root().operationName("parent"), | ||
| span().childOfPrevious().operationName("asyncChild"))); | ||
| } | ||
| } | ||
|
|
||
| private static boolean hasCompatibleVersion() { | ||
| for (Map.Entry<String, Version> entry : Version.identify().entrySet()) { | ||
| if (entry.getKey().startsWith("netty-")) { | ||
| return isCompatibleVersion(entry.getValue().artifactVersion()); | ||
| } | ||
| } | ||
| return false; | ||
| } | ||
|
|
||
| private static boolean isCompatibleVersion(String version) { | ||
| String[] parts = version.split("\\."); | ||
| if (parts.length < 3) { | ||
| return false; | ||
| } | ||
|
|
||
| int major = Integer.parseInt(parts[0]); | ||
| int minor = Integer.parseInt(parts[1]); | ||
| int patch = Integer.parseInt(parts[2]); | ||
|
|
||
| if (major > 4) { | ||
| return true; | ||
| } | ||
|
|
||
| if (major != 4) { | ||
| return false; | ||
| } | ||
|
|
||
| if (minor > 1) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think that this is missing otherwise will make it pass things like |
||
| return true; | ||
| } | ||
|
|
||
| // Since 4.1.44+ Netty uses a self-enqueue path for delayed tasks. | ||
| return patch >= 44; | ||
| } | ||
|
|
||
| private static final class CloseableDefaultEventExecutorGroup extends DefaultEventExecutorGroup | ||
| implements AutoCloseable { | ||
| private CloseableDefaultEventExecutorGroup() { | ||
| super(1); | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| try { | ||
| shutdownGracefully(0, 1, SECONDS).sync(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| private static final class BlockingTraceableTask implements Runnable { | ||
| private final CountDownLatch started = new CountDownLatch(1); | ||
| private final CountDownLatch proceed = new CountDownLatch(1); | ||
| private final CountDownLatch finished = new CountDownLatch(1); | ||
| private final AtomicBoolean sawActiveSpan = new AtomicBoolean(); | ||
|
|
||
| @Override | ||
| public void run() { | ||
| sawActiveSpan.set(activeSpan() != null); | ||
| started.countDown(); | ||
| try { | ||
| proceed.await(5, SECONDS); | ||
| asyncChild(); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| } finally { | ||
| finished.countDown(); | ||
| } | ||
| } | ||
|
|
||
| @Trace(operationName = "asyncChild") | ||
| private void asyncChild() {} | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How often is this called? I'm worried that the reflective type checking could negatively impact throughput.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the usual `isInstance`/`isAssignableFrom` methods are not available then we should at least consider using a `ClassValue` to cache the result per-lookup-class.

Also this advice snippet is referring to a method in the surrounding advice class, which is a big no-no.
This would likely result in an error at runtime because the advice bytecode is lifted and patched into the library's class. It won't have access to the surrounding methods in this class because it is no longer part of the class (it has literally been cut out and pasted somewhere else.)
Even if somehow it did find the original class (say you mistakenly added the advice class as a helper) - that would result in the advice class getting pinned, and with it the entire instrumentation class-loader which means advice and instrumentation classes wouldn't unload.
The only code that advice snippets should call is:
`internal-api` etc.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can see this in the failing instrumentation tests: