diff --git a/.claude/skills/investigate-continuation-leakage/SKILL.md b/.claude/skills/investigate-continuation-leakage/SKILL.md new file mode 100644 index 00000000000..b3985e3239a --- /dev/null +++ b/.claude/skills/investigate-continuation-leakage/SKILL.md @@ -0,0 +1,143 @@ +--- +name: investigate-continuation-leakage +description: > + Investigate scope-continuation leaks in an instrumentation. Use when asked to "investigate + continuation leakage", "find scope leaks", "why does this integration leak continuations", + "debug a leaked trace / pendingReferenceCount", or when a test needed strictTraceWrites(false) + to pass. Runs the chosen instrumentation test with the scope-continuation diagnostic enabled, + reads the logged full timeline, recaps the findings, and renders a Gantt or DAG (works whether + or not anything leaked). +user-invocable: true +context: fork +allowed-tools: + - Bash + - Read + - Edit + - Glob + - Grep + - AskUserQuestion +--- + +# Investigate scope-continuation leakage + +dd-trace-java moves trace scopes across threads via *continuations*: a scope is **captured** on +one thread (`ScopeContinuation`, bumping `PendingTrace.pendingReferenceCount`) and later +**activated** and/or **cancelled** on another. A continuation that is never resolved (the classic +leak), resolved twice, resolved after its root span was written, or activated after resolve, keeps +a trace alive or drops a late span — and in tests forces `strictTraceWrites(false)`, masking the +bug instead of locating it. + +The test-time diagnostic in `datadog.trace.agent.test.scopediag` records the lifecycle and logs a +full timeline of every continuation and scope (regardless of whether anything leaked). This skill +drives that diagnostic, reads the logged timeline, recaps it in plain language, and renders a +diagram. **The Java code no longer renders Gantt/Mermaid — you (the LLM) produce the diagram from +the timeline.** + +Background: `docs/superpowers/specs/2026-06-10-scope-continuation-leak-diagnostic-design.md`. +Test-run conventions: `docs/how_to_test.md`. + +## Step 1 — Select the target + +Identify the suspect instrumentation. If the user named it, resolve the module directory under +`dd-java-agent/instrumentation//-` with Glob. If ambiguous, list +the candidate test classes (Glob `**/src/test/**/*Test.{java,groovy}` in the module) and ask the +user which test to run with `AskUserQuestion`. You want one concrete test class (and ideally one +method) plus its Gradle module path, e.g. `:dd-java-agent:instrumentation:google-pubsub-1.116`. + +> **Note:** tracking is now **always-on** for every instrumentation test (`@TrackScopeContinuations` +> sits on the `AbstractInstrumentationTest` / `InstrumentationSpecification` base classes, report-only). +> If that base-class annotation is present, **skip Step 2** — just run the test (Step 3) and read the +> timeline. Only do Step 2 when tracking is *not* already inherited (e.g. the base annotation was +> removed) or you want method-level `failOnLeak=true` enforcement. + +## Step 2 — Enable tracking (only if not already inherited) + +With `Edit`, add the opt-in annotation to the chosen test class (or a single method): + +- Add import `datadog.trace.agent.test.scopediag.TrackScopeContinuations`. +- Annotate the class/method with `@TrackScopeContinuations`. Leave the default `failOnLeak=false` — + you want the report, not a failing test (a red test would still print the report, but the default + keeps the run green so the build doesn't stop early). + +This works for both JUnit 5 Java tests (extension is auto-registered on +`AbstractInstrumentationTest`) and Groovy `InstrumentationSpecification` subclasses. **Record the +exact file path** — you will revert it in Step 7. + +## Step 3 — Run the test, capturing the diagnostic output + +The diagnostic does **not** write a file; at the end of every tracked test it logs the **full +timeline** (`ScopeDiagnosticsReport.renderTimeline()`) — every continuation and scope with its +events, threads, relative timing, and callsites, regardless of whether anything leaked. Run with +output captured: + +```bash +./gradlew :dd-java-agent:instrumentation:-:test --tests '' --info 2>&1 | tee /tmp/scopediag-run.txt +``` + +(For the diagnostic harness's own tests the module is `:dd-java-agent:instrumentation-testing`.) +If the SLF4J line is not visible in console output, read the per-test captured stdout under +`/build/test-results/**/*.xml` (the `` element) or the HTML report under +`/build/reports/tests/`. + +## Step 4 — Collect the diagnostic output + +Grep the captured output for `Scope/continuation timeline` — one block per test. Shape: + +``` +Scope/continuation timeline (N continuations, M scopes; X leaked, Y late, Z double, + W activate-after-resolve | scopes: P never-closed, Q wrong-thread) + +# trace= span= "" src= [ORPHAN] [handoff] {failures} cap->resume= age= + capture +<Δms> @ at + resume +<Δms> @ at <...> + finish +<Δms> @ at <...> (or cancel / DOUBLE / act-fail) + scope# "" open +<Δms> @ close +<Δms> @ (active ) [handoff] {failures} + LEAKED (never finished or cancelled) (only when unresolved) +... +Non-continuation scopes: + scope# ... +``` + +Every record is listed (not just flagged ones), so you can reconstruct the full graph whether or not +anything leaked. `+Δms` is relative to the first recorded event. (`renderSummary()` — the +problem-only view — still backs `assertNoLeaks` failure messages, but the timeline is the feed.) + +## Step 5 — Summarize ("resume") + +Give a plain-language recap: + +- The header counts (leaked / late / double / activate-after-resolve / never-closed / wrong-thread). +- The dominant flow: where continuations are captured (callsite/thread) and where they're resumed / + resolved (thread), plus any thread handoffs. +- For each flagged record (if any): its failure set and capture/open callsite (cite `file:line`). +- A one-line hypothesis when there's a problem: which advice captured the continuation and where it + should have resolved it. + +## Step 6 — Visualize (auto-pick, user may override) + +Build the diagram from the **timeline** (works whether or not there are leaks): + +- **Gantt** — when the signal is **temporal / cross-thread** (thread handoffs, late-after-root, + never-closed, or the user wants the time view). Mermaid `gantt`, one `section` per thread; a bar + per continuation from capture→resolve and per scope from open→close using the `+Δms` offsets. Mark + leaks / never-closed `crit` to the window end; late / wrong-thread `active`; resolved-on-time + `done`; capture-only points as `milestone`. +- **DAG** — when the signal is **structural / ownership** (orphans, double-finish, + activate-after-resolve, or continuation→scope lineage). Mermaid `flowchart LR`: a node per + continuation (`#seq spanName`), its spawned scopes (linked via the nested `scope#` lines), edges + capture→resume→resolve labelled with thread + `+Δms`. Color leaked / double red, late amber, + resolved green. + +If there are no problems, the diagram simply shows the healthy capture→continue→resolve flow (all +green) — that is the expected "regardless of leak" output. If unsure which shape, ask with +`AskUserQuestion`. + +## Step 7 — Revert + +Undo the temporary annotation so the working tree is clean: + +```bash +git checkout -- +``` + +Report: the findings summary, the diagram, and that the annotation was reverted. diff --git a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy index 8f1a6cd6df4..d9f12b8a215 100644 --- a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy +++ b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/InstrumentationSpecification.groovy @@ -34,6 +34,8 @@ import datadog.metrics.impl.DDSketchHistograms import datadog.metrics.impl.MonitoringImpl import datadog.trace.agent.test.asserts.ListWriterAssert import datadog.trace.agent.test.asserts.TagsAssert +import datadog.trace.agent.test.scopediag.ScopeDiagnostics +import datadog.trace.agent.test.scopediag.TrackScopeContinuations import datadog.trace.agent.test.datastreams.MockFeaturesDiscovery import datadog.trace.agent.test.datastreams.RecordingDatastreamsPayloadWriter import datadog.trace.agent.tooling.AgentInstaller @@ -112,6 +114,9 @@ import spock.lang.Shared @SuppressWarnings('UnnecessaryDotClass') @ExtendWith(TestClassShadowingExtension.class) @ExtendWith(TooManyInvocationsErrorHandler.class) +// Track scope continuations for every instrumentation spec (report-only: failOnLeak defaults to +// false). @Inherited, so all specs get the full timeline dumped after each test. +@TrackScopeContinuations abstract class InstrumentationSpecification extends DDSpecification implements AgentBuilder.Listener { private static final long TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(20) @@ -466,6 +471,9 @@ abstract class InstrumentationSpecification extends DDSpecification implements A } TEST_WRITER.start() + if (scopeDiagConfig() != null) { + ScopeDiagnostics.startRecording() + } TEST_DATA_STREAMS_WRITER.clear() TEST_DATA_STREAMS_MONITORING.clear() @@ -499,6 +507,8 @@ abstract class InstrumentationSpecification extends DDSpecification implements A } TEST_TRACER.flush() + reportScopeDiagnostics() + def util = new MockUtil() util.detachMock(STATS_D_CLIENT) @@ -522,6 +532,34 @@ abstract class InstrumentationSpecification extends DDSpecification implements A assert InstrumentationErrors.noErrors(): InstrumentationErrors.describeErrors() } + /** Resolves the {@link TrackScopeContinuations} annotation from the feature method or spec class. */ + private TrackScopeContinuations scopeDiagConfig() { + def method = specificationContext?.currentFeature?.featureMethod?.reflection + def ann = method?.getAnnotation(TrackScopeContinuations) + if (ann == null) { + ann = this.class.getAnnotation(TrackScopeContinuations) + } + return ann + } + + private void reportScopeDiagnostics() { + def config = scopeDiagConfig() + if (config == null) { + return + } + ScopeDiagnostics.stop() + def report = ScopeDiagnostics.report() + // Always dump the full timeline so a graph/report can be built regardless of leaks. + println(report.renderTimeline()) + try { + if (config.failOnLeak()) { + ScopeDiagnostics.assertNoLeaks() + } + } finally { + ScopeDiagnostics.reset() + } + } + private void doCheckRepeatedFinish() { for (Map.Entry> entry: this.spanFinishLocations.entrySet()) { if (entry.value.size() == 1) { diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/AbstractInstrumentationTest.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/AbstractInstrumentationTest.java index 5f724853972..81e6e906f54 100644 --- a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/AbstractInstrumentationTest.java +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/AbstractInstrumentationTest.java @@ -7,6 +7,8 @@ import datadog.instrument.classinject.ClassInjector; import datadog.trace.agent.test.assertions.TraceAssertions; import datadog.trace.agent.test.assertions.TraceMatcher; +import datadog.trace.agent.test.scopediag.ScopeDiagnosticsExtension; +import datadog.trace.agent.test.scopediag.TrackScopeContinuations; import datadog.trace.agent.tooling.AgentInstaller; import datadog.trace.agent.tooling.InstrumenterModule; import datadog.trace.agent.tooling.TracerInstaller; @@ -54,7 +56,14 @@ * */ @WithConfig(key = "detailed.instrumentation.errors", value = "true") -@ExtendWith({TestClassShadowingExtension.class, AllowContextTestingExtension.class}) +@ExtendWith({ + TestClassShadowingExtension.class, + AllowContextTestingExtension.class, + ScopeDiagnosticsExtension.class +}) +// Track scope continuations for every instrumentation test (report-only: failOnLeak defaults to +// false). @Inherited, so all subclasses get the full timeline dumped after each test. +@TrackScopeContinuations public abstract class AbstractInstrumentationTest { static final Instrumentation INSTRUMENTATION = ByteBuddyAgent.getInstrumentation(); diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuableScopeAdvice.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuableScopeAdvice.java new file mode 100644 index 00000000000..3dab0453128 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuableScopeAdvice.java @@ -0,0 +1,39 @@ +package datadog.trace.agent.test.scopediag; + +import net.bytebuddy.asm.Advice; + +/** + * Test-only ByteBuddy advice woven into {@code datadog.trace.core.scopemanager.ContinuableScope} + * (and, by inheritance, {@code ContinuingScope}) to track the scope activation lifecycle. + * + *

The target type is package-private, so {@code this} is typed as {@link Object} and re-cast + * inside {@link ScopeContinuationProbe}. {@code afterActivated} is the open point (first call per + * scope identity), {@code onProperClose} the pop, and {@code close} the wrong-thread check. + */ +public final class ContinuableScopeAdvice { + private ContinuableScopeAdvice() {} + + /** {@code afterActivated()} — the scope became active. */ + public static final class AfterActivated { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit(@Advice.This Object scope) { + ScopeContinuationProbe.onScopeOpen(scope); + } + } + + /** {@code onProperClose()} — the scope was popped from its thread's stack. */ + public static final class OnProperClose { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit(@Advice.This Object scope) { + ScopeContinuationProbe.onScopeClose(scope); + } + } + + /** {@code close()} — check for an out-of-order / wrong-thread close. */ + public static final class Close { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void enter(@Advice.This Object scope) { + ScopeContinuationProbe.onScopeClosing(scope); + } + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationAdvice.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationAdvice.java new file mode 100644 index 00000000000..300650aa4ac --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationAdvice.java @@ -0,0 +1,76 @@ +package datadog.trace.agent.test.scopediag; + +import net.bytebuddy.asm.Advice; + +/** + * Test-only ByteBuddy advice woven into {@code datadog.trace.core.scopemanager.ScopeContinuation}. + * + *

The target type is package-private and cannot be named here, so {@code this} is typed as + * {@link Object} and re-cast to the public {@code AgentScope.Continuation} supertype inside {@link + * ScopeContinuationProbe}. {@link Advice.FieldValue} reads the private {@code count} field — legal + * because the advice is inlined into the field's own class. + */ +public final class ContinuationAdvice { + private ContinuationAdvice() {} + + /** {@code register()} — the continuation was captured. */ + public static final class Register { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit(@Advice.This Object self) { + ScopeContinuationProbe.onCapture(self); + } + } + + /** + * {@code activate()} — a (possibly noop) activation; the probe filters the rollback branch. + * + *

The activation timestamp is captured at method entry, not exit: the same-span reuse + * optimization ({@code ContinuableScopeManager.continueSpan}) cancels the continuation from + * inside {@code activate()} before it returns, so timestamping the resume at exit would + * order it after that internal resolution and spuriously flag {@code ACTIVATE_AFTER_RESOLVE}. + */ + public static final class Activate { + @Advice.OnMethodEnter + public static long enter() { + return System.nanoTime(); + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit( + @Advice.This Object self, @Advice.Enter long ddActivateNanos, @Advice.Return Object scope) { + ScopeContinuationProbe.onActivate(self, scope, ddActivateNanos); + } + } + + /** + * Resolution detected via the {@code count} transition. Applied to both {@code cancel()} and + * {@code cancelFromContinuedScopeClose()} — they need identical before/after observation. The + * originating method name ({@code #m}) distinguishes an explicit cancel from a normal + * finish-on-scope-close. + * + *

The resolve timestamp is captured at method entry (the {@code ddResolveNanos} + * local), not at exit: the body itself may call {@code removeContinuation() -> + * PendingTrace.write()}, which is exactly where the root-written timestamp is taken. Timestamping + * at exit would place the resolution after the root write it triggered, producing a spurious + * late-finish. + */ + public static final class Cancel { + @Advice.OnMethodEnter + public static int enter( + @Advice.FieldValue("count") int count, + @Advice.Local("ddResolveNanos") long ddResolveNanos) { + ddResolveNanos = System.nanoTime(); + return count; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit( + @Advice.This Object self, + @Advice.Origin("#m") String method, + @Advice.Enter int countBefore, + @Advice.Local("ddResolveNanos") long ddResolveNanos, + @Advice.FieldValue("count") int countAfter) { + ScopeContinuationProbe.onResolve(self, method, countBefore, countAfter, ddResolveNanos); + } + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationRecord.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationRecord.java new file mode 100644 index 00000000000..c7e452889e8 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationRecord.java @@ -0,0 +1,216 @@ +package datadog.trace.agent.test.scopediag; + +import datadog.trace.api.DDTraceId; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +/** + * The correlated continuation lifecycle: its capture, every resume (activation), any + * failed activation, and its terminal resolution (finish/cancel). Scope activation lifetimes are + * modelled separately by {@link ScopeRecord}; the scopes a continuation spawned are linked here by + * their seq ids ({@link #scopeRecordSeqs()}). + * + *

Built incrementally as events arrive on different threads, so all mutating access is + * synchronized on the instance. + */ +public final class ContinuationRecord { + public final long seq; + public final DDTraceId traceId; + public final long spanId; + public final String spanName; + public final byte source; + + /** {@code true} when a resume/resolution was seen without a preceding capture in this window. */ + public final boolean orphan; + + private final ScopeEvent capture; + private final List resumes = new ArrayList<>(1); + private final List failedActivations = new ArrayList<>(0); + private ScopeEvent terminal; + private final List extraTerminals = new ArrayList<>(0); + private final List scopeRecordSeqs = new ArrayList<>(1); + + ContinuationRecord( + long seq, + DDTraceId traceId, + long spanId, + String spanName, + byte source, + boolean orphan, + ScopeEvent capture) { + this.seq = seq; + this.traceId = traceId; + this.spanId = spanId; + this.spanName = spanName; + this.source = source; + this.orphan = orphan; + this.capture = capture; + } + + // ---- mutation ------------------------------------------------------------ + + synchronized void addResume(ScopeEvent event) { + resumes.add(event); + } + + synchronized void addFailedActivation(ScopeEvent event) { + failedActivations.add(event); + } + + /** First terminal sets {@link #terminal}; any subsequent terminal is a double-finish signal. */ + synchronized void setTerminalOrExtra(ScopeEvent event) { + if (terminal == null) { + terminal = event; + } else { + extraTerminals.add(event); + } + } + + synchronized void linkScope(long scopeSeq) { + scopeRecordSeqs.add(scopeSeq); + } + + // ---- accessors ----------------------------------------------------------- + + public synchronized ScopeEvent capture() { + return capture; + } + + public synchronized List resumes() { + return new ArrayList<>(resumes); + } + + public synchronized List failedActivations() { + return new ArrayList<>(failedActivations); + } + + public synchronized ScopeEvent terminal() { + return terminal; + } + + public synchronized List extraTerminals() { + return new ArrayList<>(extraTerminals); + } + + public synchronized List scopeRecordSeqs() { + return new ArrayList<>(scopeRecordSeqs); + } + + public synchronized boolean isResumed() { + return !resumes.isEmpty(); + } + + public synchronized boolean isResolved() { + return terminal != null; + } + + // ---- derived ------------------------------------------------------------- + + public synchronized ContinuationStatus status() { + if (terminal != null) { + return terminal.type == ScopeEvent.Type.RESOLVE_CANCEL + ? ContinuationStatus.CANCELLED + : ContinuationStatus.FINISHED; + } + return ContinuationStatus.LEAKED; + } + + /** + * Derives the failure set for this continuation. {@code rootWrittenNanos} may be {@code null}. + */ + public synchronized EnumSet failures(Long rootWrittenNanos) { + EnumSet failures = EnumSet.noneOf(Failure.class); + if (terminal == null) { + failures.add(Failure.LEAKED); + } + if (!extraTerminals.isEmpty()) { + failures.add(Failure.DOUBLE_FINISH); + } + if (!failedActivations.isEmpty() || resumedAfterTerminal()) { + failures.add(Failure.ACTIVATE_AFTER_RESOLVE); + } + if (rootWrittenNanos != null + && (laterThan(terminal, rootWrittenNanos) || laterThan(resumes, rootWrittenNanos))) { + failures.add(Failure.LATE_FINISH); + } + return failures; + } + + private boolean resumedAfterTerminal() { + if (terminal == null) { + return false; + } + for (ScopeEvent r : resumes) { + if (r.nanos > terminal.nanos) { + return true; + } + } + return false; + } + + /** {@code true} when capture and any resume/terminal happened on different threads. */ + public synchronized boolean threadHandoff() { + if (capture == null) { + return false; + } + String captureThread = capture.threadName; + for (ScopeEvent r : resumes) { + if (!captureThread.equals(r.threadName)) { + return true; + } + } + return terminal != null && !captureThread.equals(terminal.threadName); + } + + /** Nanos between capture and the first resume, or {@code null} if not both observed. */ + public synchronized Long captureToFirstResumeNanos() { + if (capture == null || resumes.isEmpty()) { + return null; + } + return resumes.get(0).nanos - capture.nanos; + } + + /** Nanos between capture and the terminal resolution, or {@code null} if not both observed. */ + public synchronized Long ageAtTerminalNanos() { + if (capture == null || terminal == null) { + return null; + } + return terminal.nanos - capture.nanos; + } + + /** Earliest known event time for ordering the timeline. */ + public synchronized long firstNanos() { + long min = capture != null ? capture.nanos : Long.MAX_VALUE; + for (ScopeEvent e : resumes) { + min = Math.min(min, e.nanos); + } + for (ScopeEvent e : failedActivations) { + min = Math.min(min, e.nanos); + } + if (terminal != null) { + min = Math.min(min, terminal.nanos); + } + for (ScopeEvent e : extraTerminals) { + min = Math.min(min, e.nanos); + } + return min; + } + + public String sourceName() { + return ScopeSources.name(source); + } + + private static boolean laterThan(ScopeEvent event, long nanos) { + return event != null && event.nanos > nanos; + } + + private static boolean laterThan(List events, long nanos) { + for (ScopeEvent e : events) { + if (e.nanos > nanos) { + return true; + } + } + return false; + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationStatus.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationStatus.java new file mode 100644 index 00000000000..213a57a62a8 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ContinuationStatus.java @@ -0,0 +1,18 @@ +package datadog.trace.agent.test.scopediag; + +/** + * Derived lifecycle state of a continuation, for rendering. The authoritative bug signal is the + * {@link Failure} set, not this status. + */ +public enum ContinuationStatus { + /** Captured but not yet resumed or resolved. */ + CAPTURED, + /** Resumed at least once but not yet resolved. */ + RESUMED, + /** Resolved normally (all activations closed or a clean cancel with no outstanding work). */ + FINISHED, + /** Resolved via the cancel-with-outstanding-work path. */ + CANCELLED, + /** Captured (and possibly resumed) but never resolved within the recording window. */ + LEAKED +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/Failure.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/Failure.java new file mode 100644 index 00000000000..6632609584d --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/Failure.java @@ -0,0 +1,21 @@ +package datadog.trace.agent.test.scopediag; + +/** + * A derived failure classification. Shared by {@link ContinuationRecord} (continuation-lifetime + * failures) and {@link ScopeRecord} (scope-lifetime failures). See {@link + * ScopeDiagnosticsReport#hasProblems()} for which of these fail a test versus are report-only. + */ +public enum Failure { + /** Continuation captured but never resolved within the window. */ + LEAKED, + /** Continuation resolved/resumed after the root span of its trace was already written. */ + LATE_FINISH, + /** Continuation resolved more than once. */ + DOUBLE_FINISH, + /** Continuation activated after it had already been resolved. */ + ACTIVATE_AFTER_RESOLVE, + /** Scope closed while not on top of its thread's stack (closed on the wrong thread / order). */ + CLOSE_WRONG_THREAD, + /** Scope opened but never closed within the window. */ + NEVER_CLOSED +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/PendingTraceAdvice.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/PendingTraceAdvice.java new file mode 100644 index 00000000000..48264955738 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/PendingTraceAdvice.java @@ -0,0 +1,26 @@ +package datadog.trace.agent.test.scopediag; + +import net.bytebuddy.asm.Advice; + +/** + * Test-only ByteBuddy advice woven into {@code datadog.trace.core.PendingTrace}. Fires the + * root-written signal at the exact site where the old production seam did: inside {@code + * write(boolean)} on the non-partial path, just before {@code rootSpanWritten} is set, gated on it + * not already being set. {@link Advice.FieldValue} reads the private {@code traceId}/{@code + * rootSpanWritten} fields — legal because the advice is inlined into their own class. + */ +public final class PendingTraceAdvice { + private PendingTraceAdvice() {} + + public static final class Write { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void enter( + @Advice.Argument(0) boolean isPartial, + @Advice.FieldValue("rootSpanWritten") boolean alreadyWritten, + @Advice.FieldValue("traceId") Object traceId) { + if (!isPartial && !alreadyWritten) { + ScopeContinuationProbe.onRootWritten(traceId); + } + } + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeContinuationProbe.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeContinuationProbe.java new file mode 100644 index 00000000000..6bf1de9582b --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeContinuationProbe.java @@ -0,0 +1,346 @@ +package datadog.trace.agent.test.scopediag; + +import datadog.trace.api.DDTraceId; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.Set; + +/** + * Recorder hook that the test-only ByteBuddy advice ({@link ContinuationAdvice}, {@link + * PendingTraceAdvice}) funnels scope-continuation lifecycle events into. It replaces the former + * production {@code ContinuationDiagnostics} seam: the advice is woven into {@code + * datadog.trace.core.scopemanager.ScopeContinuation} and {@code datadog.trace.core.PendingTrace} at + * test time only, so production tracer code carries no diagnostic footprint at all. + * + *

Inlined advice runs in the same app classloader as this class at test time, so it can call + * these statics directly. Every entry point first checks the {@link #recording} flag and is fully + * wrapped so a diagnostic failure can never propagate back into the tracer. + */ +public final class ScopeContinuationProbe { + /** + * Mirrors {@code ScopeContinuation.CANCELLED} (see {@code + * dd-trace-core/.../scopemanager/ScopeContinuation.java}). A continuation is resolved exactly + * when its {@code count} field transitions to this sentinel during a cancel call. Kept in sync by + * {@code ScopeContinuationProbeTest}. + */ + static final int CANCELLED = Integer.MIN_VALUE >> 1; + + private static volatile boolean recording = false; + private static volatile boolean installed = false; + + /** Cached reflective handle to the package-private {@code ScopeContinuation.source} field. */ + private static volatile Field sourceField; + + // cached reflective handles for scope-lifecycle reads (set-once, best-effort) + private static volatile Field scopeSourceField; // ContinuableScope.source + private static volatile Field continuationField; // ContinuingScope.continuation + private static volatile Field scopeManagerField; // ContinuableScope.scopeManager + private static volatile Method scopeStackMethod; // ContinuableScopeManager.scopeStack() + private static volatile Method checkTopMethod; // ScopeStack.checkTop(ContinuableScope) + + /** + * Continuations already recorded as resolved, by identity. The clean-resolution path that flips + * {@code count} to {@link #CANCELLED} can be observed by two inlined advice frames (the {@code + * cancelFromContinuedScopeClose} slow path delegates to {@code cancel()}); since {@code + * CANCELLED} is terminal, dedup by identity so each continuation resolves exactly once — matching + * the single {@code notifyResolve} the old production seam emitted. + */ + private static final Set resolved = + Collections.synchronizedSet(Collections.newSetFromMap(new IdentityHashMap<>())); + + private ScopeContinuationProbe() {} + + /** Installs the transformer (once per JVM) and starts recording. */ + static synchronized void enable() { + if (!installed) { + ScopeContinuationTransformer.install(); + installed = true; + } + recording = true; + } + + /** Stops recording. The transformer stays installed (inert while not recording). */ + static void disable() { + recording = false; + } + + /** Clears per-window state. Called from {@link ScopeDiagnostics#reset()}. */ + static void reset() { + resolved.clear(); + } + + // ---- advice entry points (public so inlined advice can reference them) ------------------- + + /** {@code ScopeContinuation.register()} exit: the continuation was captured. */ + public static void onCapture(Object self) { + if (!recording) { + return; + } + try { + AgentScope.Continuation continuation = (AgentScope.Continuation) self; + AgentSpan span = continuation.span(); + if (span != null) { + ScopeDiagnostics.recordCapture( + continuation, span.getTraceId(), span.getSpanId(), spanName(span), sourceOf(self)); + } + } catch (Throwable ignored) { + // diagnostics must never disturb the tracer + } + } + + /** + * {@code ScopeContinuation.activate()} exit: a real activation happened. The rollback branch + * returns the {@link AgentTracer#noopScope() noop scope} singleton, so a returned noop scope is + * skipped — this exactly reproduces the original "success branch only" semantics. + */ + public static void onActivate(Object self, Object returnedScope, long activateNanos) { + if (!recording) { + return; + } + try { + AgentScope.Continuation continuation = (AgentScope.Continuation) self; + if (returnedScope == AgentTracer.noopScope()) { + // activate() returned the noop scope: the continuation was already resolved. This is the + // activate-after-resolve signal — the engine records it only if a terminal was seen. + ScopeDiagnostics.recordActivateFailed(continuation); + return; + } + AgentSpan span = continuation.span(); + if (span != null) { + ScopeDiagnostics.recordActivate( + continuation, + span.getTraceId(), + span.getSpanId(), + spanName(span), + sourceOf(self), + activateNanos); + } + } catch (Throwable ignored) { + } + } + + /** + * {@code ScopeContinuation.cancel()} / {@code cancelFromContinuedScopeClose()} exit. The original + * production seam fired only from inside the clean resolution branch; here we detect that branch + * by observing the {@code count} field transition to {@link #CANCELLED} during this call. A + * cancel with outstanding activations leaves {@code count} unchanged (not a resolution). + */ + public static void onResolve( + Object self, String method, int countBefore, int countAfter, long resolveNanos) { + if (!recording) { + return; + } + if (countAfter != CANCELLED) { + return; // not a resolution + } + // An explicit cancel() is a discard; cancelFromContinuedScopeClose() is a normal finish once + // the continued scope closes. (Caveat: the rare cancelFromContinuedScopeClose slow path + // delegates to cancel(), so a multi-activation finish is recorded as a cancel.) + boolean cancelled = "cancel".equals(method); + try { + AgentScope.Continuation continuation = (AgentScope.Continuation) self; + if (countBefore == CANCELLED) { + // already cancelled before this call: a genuine second finish/cancel (the slow-path + // artifact always transitions 1->CANCELLED, never CANCELLED->CANCELLED). Surface it as a + // double finish, bypassing the first-resolution dedup. + ScopeDiagnostics.recordResolve(continuation, cancelled, resolveNanos); + } else if (resolved.add(self)) { + // first clean transition to CANCELLED; later observations of the SAME transition (the + // cancelFromContinuedScopeClose slow path's nested cancel()) are suppressed + ScopeDiagnostics.recordResolve(continuation, cancelled, resolveNanos); + } + } catch (Throwable ignored) { + } + } + + /** {@code PendingTrace.write()} root-written site. */ + public static void onRootWritten(Object traceId) { + if (!recording) { + return; + } + try { + ScopeDiagnostics.recordRootWritten((DDTraceId) traceId); + } catch (Throwable ignored) { + } + } + + /** + * {@code ContinuableScope.afterActivated()} exit: a scope became active. Re-activations (parent + * restored after a child closes) reach here too; the engine keeps only the first per scope + * identity. Links to the spawning continuation when the scope is a {@code ContinuingScope}. + */ + public static void onScopeOpen(Object scope) { + if (!recording) { + return; + } + try { + AgentSpan span = ((AgentScope) scope).span(); + DDTraceId traceId = span != null ? span.getTraceId() : DDTraceId.ZERO; + long spanId = span != null ? span.getSpanId() : 0L; + String name = span != null ? spanName(span) : null; + ScopeDiagnostics.recordScopeOpen( + scope, traceId, spanId, name, scopeSourceOf(scope), continuationOf(scope)); + } catch (Throwable ignored) { + } + } + + /** + * {@code ContinuableScope.onProperClose()} exit: the scope was popped from its thread's stack. + */ + public static void onScopeClose(Object scope) { + if (!recording) { + return; + } + try { + ScopeDiagnostics.recordScopeClose(scope); + } catch (Throwable ignored) { + } + } + + /** + * {@code ContinuableScope.close()} entry: if the scope is not on top of its thread's stack, this + * is an out-of-order / wrong-thread close. Best-effort — silently does nothing if the internal + * stack check cannot be reached reflectively. + */ + public static void onScopeClosing(Object scope) { + if (!recording) { + return; + } + try { + if (isNotOnTop(scope)) { + ScopeDiagnostics.recordScopeCloseWrongThread(scope); + } + } catch (Throwable ignored) { + } + } + + /** Snapshots the span name as a String (the CharSequence may mutate later), or {@code null}. */ + private static String spanName(AgentSpan span) { + try { + CharSequence name = span.getSpanName(); + return name == null ? null : name.toString(); + } catch (Throwable ignored) { + return null; + } + } + + /** + * Reads the package-private {@code source} byte field, falling back to the {@code -1} sentinel. + */ + private static byte sourceOf(Object self) { + try { + Field field = sourceField; + if (field == null) { + field = self.getClass().getDeclaredField("source"); + field.setAccessible(true); + sourceField = field; + } + return field.getByte(self); + } catch (Throwable ignored) { + return (byte) -1; + } + } + + /** Reads the {@code source} byte of a scope (declared on {@code ContinuableScope}). */ + private static byte scopeSourceOf(Object scope) { + try { + Field field = scopeSourceField; + if (field == null) { + field = findField(scope.getClass(), "source"); + scopeSourceField = field; + } + return field != null ? field.getByte(scope) : (byte) -1; + } catch (Throwable ignored) { + return (byte) -1; + } + } + + /** + * The continuation that spawned a scope, read from {@code ContinuingScope.continuation}; {@code + * null} for a plain (non-continuation) scope. + */ + private static AgentScope.Continuation continuationOf(Object scope) { + try { + Field field = continuationField; + if (field == null) { + field = findField(scope.getClass(), "continuation"); + continuationField = field; + } + if (field == null || !field.getDeclaringClass().isInstance(scope)) { + return null; // not a ContinuingScope + } + Object value = field.get(scope); + return value instanceof AgentScope.Continuation ? (AgentScope.Continuation) value : null; + } catch (Throwable ignored) { + return null; + } + } + + /** Best-effort: {@code true} when the scope is not on top of its thread's scope stack. */ + private static boolean isNotOnTop(Object scope) { + try { + Field managerField = scopeManagerField; + if (managerField == null) { + managerField = findField(scope.getClass(), "scopeManager"); + scopeManagerField = managerField; + } + Object manager = managerField != null ? managerField.get(scope) : null; + if (manager == null) { + return false; + } + Method stackMethod = scopeStackMethod; + if (stackMethod == null) { + stackMethod = findMethod(manager.getClass(), "scopeStack", 0); + scopeStackMethod = stackMethod; + } + Object stack = stackMethod != null ? stackMethod.invoke(manager) : null; + if (stack == null) { + return false; + } + Method check = checkTopMethod; + if (check == null) { + check = findMethod(stack.getClass(), "checkTop", 1); + checkTopMethod = check; + } + if (check == null) { + return false; + } + Object onTop = check.invoke(stack, scope); + return onTop instanceof Boolean && !((Boolean) onTop); + } catch (Throwable ignored) { + return false; + } + } + + /** Finds a named field declared on a class or any superclass, made accessible. */ + private static Field findField(Class cls, String name) { + for (Class c = cls; c != null; c = c.getSuperclass()) { + try { + Field f = c.getDeclaredField(name); + f.setAccessible(true); + return f; + } catch (NoSuchFieldException ignored) { + // keep walking up + } + } + return null; + } + + /** Finds a named method with the given parameter count on a class or superclass, accessible. */ + private static Method findMethod(Class cls, String name, int paramCount) { + for (Class c = cls; c != null; c = c.getSuperclass()) { + for (Method m : c.getDeclaredMethods()) { + if (m.getName().equals(name) && m.getParameterCount() == paramCount) { + m.setAccessible(true); + return m; + } + } + } + return null; + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeContinuationTransformer.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeContinuationTransformer.java new file mode 100644 index 00000000000..9e0babfd969 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeContinuationTransformer.java @@ -0,0 +1,65 @@ +package datadog.trace.agent.test.scopediag; + +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import java.lang.instrument.Instrumentation; +import net.bytebuddy.agent.ByteBuddyAgent; +import net.bytebuddy.agent.builder.AgentBuilder; +import net.bytebuddy.agent.builder.ResettableClassFileTransformer; +import net.bytebuddy.asm.Advice; + +/** + * Installs the test-only diagnostic advice into the tracer's own {@code + * datadog.trace.core.scopemanager.ScopeContinuation} and {@code datadog.trace.core.PendingTrace}. + * + *

These classes sit under {@code datadog.trace.core.*}, which the tracer's own {@code + * AgentBuilder} hard-ignores. We therefore install a separate {@link AgentBuilder} on the + * raw {@link Instrumentation} with no global-ignore filter, using retransformation so the + * already-loaded classes are rewoven on install. The advice is schema-preserving, so {@code + * disableClassFormatChanges()} + {@code REDEFINE} keep it retransform-safe. Installed once per JVM. + */ +final class ScopeContinuationTransformer { + private static volatile ResettableClassFileTransformer transformer; + + private ScopeContinuationTransformer() {} + + static synchronized void install() { + if (transformer != null) { + return; + } + Instrumentation instrumentation = ByteBuddyAgent.getInstrumentation(); + transformer = + new AgentBuilder.Default() + .disableClassFormatChanges() + .with(AgentBuilder.RedefinitionStrategy.RETRANSFORMATION) + .with(AgentBuilder.TypeStrategy.Default.REDEFINE) + .type(named("datadog.trace.core.scopemanager.ScopeContinuation")) + .transform( + (builder, type, classLoader, module, pd) -> + builder + .visit(Advice.to(ContinuationAdvice.Register.class).on(named("register"))) + .visit(Advice.to(ContinuationAdvice.Activate.class).on(named("activate"))) + .visit( + Advice.to(ContinuationAdvice.Cancel.class) + .on(named("cancel").or(named("cancelFromContinuedScopeClose"))))) + .type(named("datadog.trace.core.PendingTrace")) + .transform( + (builder, type, classLoader, module, pd) -> + builder.visit( + Advice.to(PendingTraceAdvice.Write.class) + .on(named("write").and(takesArguments(boolean.class))))) + .type(named("datadog.trace.core.scopemanager.ContinuableScope")) + .transform( + (builder, type, classLoader, module, pd) -> + builder + .visit( + Advice.to(ContinuableScopeAdvice.AfterActivated.class) + .on(named("afterActivated"))) + .visit( + Advice.to(ContinuableScopeAdvice.OnProperClose.class) + .on(named("onProperClose"))) + .visit(Advice.to(ContinuableScopeAdvice.Close.class).on(named("close")))) + .installOn(instrumentation); + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnostics.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnostics.java new file mode 100644 index 00000000000..9b6f6e2fdb0 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnostics.java @@ -0,0 +1,305 @@ +package datadog.trace.agent.test.scopediag; + +import datadog.trace.api.DDTraceId; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import java.util.ArrayList; +import java.util.Collections; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Test-time engine that records scope/continuation lifecycle events and renders leak reports. + * + *

It models two correlated lifecycles separately: continuations ({@link ContinuationRecord} — + * captured/resumed/finished) and scopes ({@link ScopeRecord} — opened/closed). While recording, + * {@link ScopeContinuationProbe} (test-only bytecode advice) feeds it events, which it correlates + * by identity (an {@link IdentityHashMap}, never {@code equals}/{@code hashCode}) — continuations + * by their {@code AgentScope.Continuation} instance, scopes by their scope instance. It assumes a + * single test runs at a time per JVM (true for instrumentation tests); {@link #reset()} isolates + * one test from the next. + * + *

Usage: + * + *

+ *   ScopeDiagnostics.startRecording();
+ *   ... exercise code under test ...
+ *   System.out.println(ScopeDiagnostics.report().renderSummary());
+ *   ScopeDiagnostics.assertNoLeaks();   // optional
+ *   ScopeDiagnostics.stop();
+ * 
+ */ +public final class ScopeDiagnostics { + private static final int DEFAULT_MAX_FRAMES = 6; + + private static final ScopeDiagnostics INSTANCE = new ScopeDiagnostics(); + + private final Map records = + Collections.synchronizedMap( + new IdentityHashMap()); + private final Map scopeRecords = + Collections.synchronizedMap(new IdentityHashMap()); + private final Map rootWrittenNanos = new ConcurrentHashMap<>(); + private final AtomicLong seq = new AtomicLong(); + private final AtomicLong scopeSeq = new AtomicLong(); + private volatile StackFilter stackFilter = new StackFilter(DEFAULT_MAX_FRAMES); + + private final Listener listener = new Listener(); + + private ScopeDiagnostics() {} + + // ---- public static facade ------------------------------------------------ + + /** Clears any prior data and starts recording with the default stack depth. */ + public static void startRecording() { + startRecording(DEFAULT_MAX_FRAMES); + } + + /** Clears any prior data and starts recording, keeping up to {@code maxFrames} per stack. */ + public static void startRecording(int maxFrames) { + INSTANCE.reset(); + INSTANCE.stackFilter = new StackFilter(maxFrames); + ScopeContinuationProbe.enable(); + } + + /** Stops recording (the probe goes inert). Recorded data remains queryable until reset. */ + public static void stop() { + ScopeContinuationProbe.disable(); + } + + /** Discards all recorded data. */ + public static void reset() { + INSTANCE.records.clear(); + INSTANCE.scopeRecords.clear(); + INSTANCE.rootWrittenNanos.clear(); + INSTANCE.seq.set(0); + INSTANCE.scopeSeq.set(0); + ScopeContinuationProbe.reset(); + } + + /** Builds an immutable snapshot report of everything recorded so far. */ + public static ScopeDiagnosticsReport report() { + List continuations; + synchronized (INSTANCE.records) { + continuations = new ArrayList<>(INSTANCE.records.values()); + } + List scopes; + synchronized (INSTANCE.scopeRecords) { + scopes = new ArrayList<>(INSTANCE.scopeRecords.values()); + } + return new ScopeDiagnosticsReport( + continuations, scopes, new ConcurrentHashMap<>(INSTANCE.rootWrittenNanos)); + } + + /** + * Fails with an {@link AssertionError} (carrying the problem summary) if the report flags a + * genuine bug (see {@link ScopeDiagnosticsReport#hasProblems()}). Report-only signals such as + * late-after-root and close-on-wrong-thread do not fail. + */ + public static void assertNoLeaks() { + ScopeDiagnosticsReport report = report(); + if (report.hasProblems()) { + throw new AssertionError("Scope continuation problems detected:\n" + report.renderSummary()); + } + } + + // ---- listener implementation --------------------------------------------- + + private static final StackTraceElement[] NO_STACK = new StackTraceElement[0]; + + private ScopeEvent event(ScopeEvent.Type type) { + return event(type, System.nanoTime()); + } + + /** Builds an event with an explicit timestamp (thread and stack are still captured now). */ + private ScopeEvent event(ScopeEvent.Type type, long nanos) { + // Capturing a stack per event is the dominant cost and perturbs the very timings we record; + // skip it entirely when callsites are disabled (maxFrames <= 0) rather than walking then + // trimming. + StackFilter filter = stackFilter; + StackTraceElement[] stack = + filter.maxFrames() <= 0 ? NO_STACK : filter.filter(new Throwable().getStackTrace()); + return new ScopeEvent(type, Thread.currentThread().getName(), nanos, stack); + } + + // ---- static forwarders called by ScopeContinuationProbe ------------------ + + static void recordCapture( + AgentScope.Continuation id, DDTraceId traceId, long spanId, String spanName, byte source) { + INSTANCE.listener.onCapture(id, traceId, spanId, spanName, source); + } + + static void recordActivate( + AgentScope.Continuation id, + DDTraceId traceId, + long spanId, + String spanName, + byte source, + long nanos) { + INSTANCE.listener.onActivate(id, traceId, spanId, spanName, source, nanos); + } + + static void recordActivateFailed(AgentScope.Continuation id) { + INSTANCE.listener.onActivateFailed(id); + } + + static void recordResolve(AgentScope.Continuation id, boolean cancelled, long resolveNanos) { + INSTANCE.listener.onResolve(id, cancelled, resolveNanos); + } + + static void recordRootWritten(DDTraceId traceId) { + INSTANCE.listener.onRootWritten(traceId); + } + + static void recordScopeOpen( + Object scope, + DDTraceId traceId, + long spanId, + String spanName, + byte source, + AgentScope.Continuation continuation) { + INSTANCE.listener.onScopeOpen(scope, traceId, spanId, spanName, source, continuation); + } + + static void recordScopeClose(Object scope) { + INSTANCE.listener.onScopeClose(scope); + } + + static void recordScopeCloseWrongThread(Object scope) { + INSTANCE.listener.onScopeCloseWrongThread(scope); + } + + private final class Listener { + void onCapture( + AgentScope.Continuation id, DDTraceId traceId, long spanId, String spanName, byte source) { + try { + ContinuationRecord record = + new ContinuationRecord( + seq.getAndIncrement(), + traceId, + spanId, + spanName, + source, + false, + event(ScopeEvent.Type.CAPTURE)); + records.put(id, record); + } catch (Throwable ignored) { + // diagnostics must never disturb the tracer + } + } + + void onActivate( + AgentScope.Continuation id, + DDTraceId traceId, + long spanId, + String spanName, + byte source, + long nanos) { + try { + recordFor(id, traceId, spanId, spanName, source) + .addResume(event(ScopeEvent.Type.ACTIVATE, nanos)); + } catch (Throwable ignored) { + } + } + + void onActivateFailed(AgentScope.Continuation id) { + try { + ContinuationRecord record = records.get(id); + // only an activation of an already-resolved continuation is a real failure; a plain + // rollback (e.g. cancelled before any capture was recorded) is benign and ignored + if (record != null && record.isResolved()) { + record.addFailedActivation(event(ScopeEvent.Type.ACTIVATE_FAILED)); + } + } catch (Throwable ignored) { + } + } + + void onResolve(AgentScope.Continuation id, boolean cancelled, long resolveNanos) { + try { + ScopeEvent.Type type = + cancelled ? ScopeEvent.Type.RESOLVE_CANCEL : ScopeEvent.Type.RESOLVE_FINISH; + recordFor(id, DDTraceId.ZERO, 0, null, (byte) -1) + .setTerminalOrExtra(event(type, resolveNanos)); + } catch (Throwable ignored) { + } + } + + void onRootWritten(DDTraceId traceId) { + try { + rootWrittenNanos.putIfAbsent(traceId, System.nanoTime()); + } catch (Throwable ignored) { + } + } + + void onScopeOpen( + Object scope, + DDTraceId traceId, + long spanId, + String spanName, + byte source, + AgentScope.Continuation continuation) { + try { + synchronized (scopeRecords) { + if (scopeRecords.containsKey(scope)) { + return; // re-activation of an already-open scope, not a new open + } + ContinuationRecord owner = continuation != null ? records.get(continuation) : null; + Long continuationSeq = owner != null ? owner.seq : null; + long s = scopeSeq.getAndIncrement(); + scopeRecords.put( + scope, + new ScopeRecord( + s, + traceId, + spanId, + spanName, + source, + continuationSeq, + event(ScopeEvent.Type.SCOPE_OPEN))); + if (owner != null) { + owner.linkScope(s); + } + } + } catch (Throwable ignored) { + } + } + + void onScopeClose(Object scope) { + try { + ScopeRecord record = scopeRecords.get(scope); + if (record != null) { + record.setClose(event(ScopeEvent.Type.SCOPE_CLOSE)); + } + } catch (Throwable ignored) { + } + } + + void onScopeCloseWrongThread(Object scope) { + try { + ScopeRecord record = scopeRecords.get(scope); + if (record != null) { + record.addWrongThreadClose(event(ScopeEvent.Type.SCOPE_CLOSE_WRONG_THREAD)); + } + } catch (Throwable ignored) { + } + } + + /** Returns the record for an id, creating an orphan record if capture was not observed. */ + private ContinuationRecord recordFor( + AgentScope.Continuation id, DDTraceId traceId, long spanId, String spanName, byte source) { + synchronized (records) { + ContinuationRecord existing = records.get(id); + if (existing != null) { + return existing; + } + ContinuationRecord orphan = + new ContinuationRecord( + seq.getAndIncrement(), traceId, spanId, spanName, source, true, null); + records.put(id, orphan); + return orphan; + } + } + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsExtension.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsExtension.java new file mode 100644 index 00000000000..1351ca02fb9 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsExtension.java @@ -0,0 +1,65 @@ +package datadog.trace.agent.test.scopediag; + +import java.lang.reflect.AnnotatedElement; +import java.util.Optional; +import org.junit.jupiter.api.extension.AfterEachCallback; +import org.junit.jupiter.api.extension.BeforeEachCallback; +import org.junit.jupiter.api.extension.ExtensionContext; +import org.junit.platform.commons.support.AnnotationSupport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * JUnit5 extension that enables {@link ScopeDiagnostics} for tests annotated with {@link + * TrackScopeContinuations}. Registered on {@code AbstractInstrumentationTest} but dormant unless + * the test class or method carries the annotation, so unannotated tests pay nothing. + * + *

Per test: resets and starts recording before; after, logs the full timeline and optionally + * asserts no leaks. + */ +public final class ScopeDiagnosticsExtension implements BeforeEachCallback, AfterEachCallback { + private static final Logger log = LoggerFactory.getLogger(ScopeDiagnosticsExtension.class); + + @Override + public void beforeEach(ExtensionContext context) { + if (resolve(context) != null) { + ScopeDiagnostics.startRecording(); + } + } + + @Override + public void afterEach(ExtensionContext context) { + TrackScopeContinuations config = resolve(context); + if (config == null) { + return; + } + ScopeDiagnostics.stop(); + ScopeDiagnosticsReport report = ScopeDiagnostics.report(); + // Always dump the full timeline so a graph/report can be built regardless of whether anything + // leaked; renderSummary() remains the concise problem-only view used by assertNoLeaks. + log.info("[{}] {}", context.getDisplayName(), report.renderTimeline()); + try { + if (config.failOnLeak()) { + ScopeDiagnostics.assertNoLeaks(); + } + } finally { + ScopeDiagnostics.reset(); + } + } + + /** Method-level annotation wins; otherwise the test class (incl. inherited). */ + private static TrackScopeContinuations resolve(ExtensionContext context) { + Optional element = context.getElement(); + if (element.isPresent()) { + Optional onElement = + AnnotationSupport.findAnnotation(element.get(), TrackScopeContinuations.class); + if (onElement.isPresent()) { + return onElement.get(); + } + } + return context + .getTestClass() + .flatMap(c -> AnnotationSupport.findAnnotation(c, TrackScopeContinuations.class)) + .orElse(null); + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsReport.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsReport.java new file mode 100644 index 00000000000..ac6dab350a3 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsReport.java @@ -0,0 +1,370 @@ +package datadog.trace.agent.test.scopediag; + +import datadog.trace.api.DDTraceId; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * An immutable snapshot of the recorded continuation and scope lifecycles plus the derived failure + * findings. Exposes two text renderings: a problem-only summary ({@link #renderSummary()}) for + * quick triage / assertion messages, and a complete timeline ({@link #renderTimeline()}) that dumps + * every continuation and scope with its full event lineage — emitted on every tracked test + * regardless of whether anything leaked, so a graph or report can always be produced from it. + * + *

The two lifecycles are kept separate: {@link ContinuationRecord} (captured → resumed → + * finished) and {@link ScopeRecord} (opened → closed). A scope spawned by resuming a continuation + * is linked to it ({@link ContinuationRecord#scopeRecordSeqs()} / {@link + * ScopeRecord#continuationSeq}) and rendered nested under it in the timeline. + */ +public final class ScopeDiagnosticsReport { + private final List continuations; + private final List scopes; + private final long t0; + private final Map> continuationFailures; + private final Map> scopeFailures; + private final Map scopeBySeq; + + ScopeDiagnosticsReport( + List continuations, + List scopes, + Map rootWrittenNanos) { + this.continuations = new ArrayList<>(continuations); + this.continuations.sort((a, b) -> Long.compare(a.firstNanos(), b.firstNanos())); + this.scopes = new ArrayList<>(scopes); + this.scopes.sort((a, b) -> Long.compare(a.firstNanos(), b.firstNanos())); + this.t0 = computeT0(this.continuations, this.scopes); + this.continuationFailures = classifyContinuations(this.continuations, rootWrittenNanos); + this.scopeFailures = classifyScopes(this.scopes); + this.scopeBySeq = new LinkedHashMap<>(); + for (ScopeRecord s : this.scopes) { + scopeBySeq.put(s.seq, s); + } + } + + private static long computeT0(List continuations, List scopes) { + long min = Long.MAX_VALUE; + for (ContinuationRecord r : continuations) { + min = Math.min(min, r.firstNanos()); + } + for (ScopeRecord s : scopes) { + min = Math.min(min, s.firstNanos()); + } + return min == Long.MAX_VALUE ? 0 : min; + } + + private static Map> classifyContinuations( + List records, Map rootWrittenNanos) { + Map> result = new LinkedHashMap<>(); + for (ContinuationRecord r : records) { + EnumSet failures = r.failures(rootWrittenNanos.get(r.traceId)); + if (!failures.isEmpty()) { + result.put(r, failures); + } + } + return result; + } + + private static Map> classifyScopes(List scopes) { + Map> result = new LinkedHashMap<>(); + for (ScopeRecord s : scopes) { + EnumSet failures = s.failures(); + if (!failures.isEmpty()) { + result.put(s, failures); + } + } + return result; + } + + // ---- accessors ----------------------------------------------------------- + + public List records() { + return new ArrayList<>(continuations); + } + + public List scopeRecords() { + return new ArrayList<>(scopes); + } + + public Map> findings() { + return new LinkedHashMap<>(continuationFailures); + } + + public Map> scopeFindings() { + return new LinkedHashMap<>(scopeFailures); + } + + public int leakCount() { + return countWith(continuationFailures, Failure.LEAKED); + } + + public int lateCount() { + return countWith(continuationFailures, Failure.LATE_FINISH); + } + + public int doubleCount() { + return countWith(continuationFailures, Failure.DOUBLE_FINISH); + } + + public int activateAfterResolveCount() { + return countWith(continuationFailures, Failure.ACTIVATE_AFTER_RESOLVE); + } + + public int neverClosedScopeCount() { + return countWith(scopeFailures, Failure.NEVER_CLOSED); + } + + public int closeWrongThreadCount() { + return countWith(scopeFailures, Failure.CLOSE_WRONG_THREAD); + } + + private static int countWith(Map> findings, Failure failure) { + int n = 0; + for (EnumSet f : findings.values()) { + if (f.contains(failure)) { + n++; + } + } + return n; + } + + /** + * True when there is a genuine bug to fail on: a never-resolved leak, a double finish, an + * activation after resolve, or a scope that was never closed. {@link Failure#LATE_FINISH} and + * {@link Failure#CLOSE_WRONG_THREAD} are reported but do not fail (frequently legitimate async or + * teardown ordering). + */ + public boolean hasProblems() { + return leakCount() > 0 + || doubleCount() > 0 + || activateAfterResolveCount() > 0 + || neverClosedScopeCount() > 0; + } + + // ---- rendering: text summary --------------------------------------------- + + private void appendHeader(StringBuilder sb, String title) { + sb.append(title) + .append(" (") + .append(continuations.size()) + .append(" continuations, ") + .append(scopes.size()) + .append(" scopes; ") + .append(leakCount()) + .append(" leaked, ") + .append(lateCount()) + .append(" late, ") + .append(doubleCount()) + .append(" double, ") + .append(activateAfterResolveCount()) + .append(" activate-after-resolve | scopes: ") + .append(neverClosedScopeCount()) + .append(" never-closed, ") + .append(closeWrongThreadCount()) + .append(" wrong-thread)\n"); + } + + /** Problem-only summary: just the flagged continuations and scopes with their callsites. */ + public String renderSummary() { + StringBuilder sb = new StringBuilder(); + appendHeader(sb, "Scope/continuation problems"); + if (continuationFailures.isEmpty() && scopeFailures.isEmpty()) { + sb.append(" (none)\n"); + return sb.toString(); + } + for (Map.Entry> e : continuationFailures.entrySet()) { + ContinuationRecord r = e.getKey(); + ScopeEvent capture = r.capture(); + sb.append(" ") + .append(e.getValue()) + .append(" #") + .append(r.seq) + .append(" trace=") + .append(r.traceId) + .append(" src=") + .append(r.sourceName()) + .append(" captured at ") + .append(capture == null || capture.callsite() == null ? "" : capture.callsite()) + .append('\n'); + } + for (Map.Entry> e : scopeFailures.entrySet()) { + ScopeRecord s = e.getKey(); + ScopeEvent open = s.open(); + sb.append(" ") + .append(e.getValue()) + .append(" scope#") + .append(s.seq) + .append(" trace=") + .append(s.traceId) + .append(" src=") + .append(s.sourceName()) + .append(" opened at ") + .append(open == null || open.callsite() == null ? "" : open.callsite()) + .append('\n'); + } + return sb.toString(); + } + + // ---- rendering: complete timeline ---------------------------------------- + + private static final int TIMELINE_FRAMES = 3; + + /** + * Complete cross-thread timeline: one block per continuation (capture → resume(s) → terminal), + * with the scopes it spawned nested under it, followed by any non-continuation scopes. Unlike + * {@link #renderSummary()} this lists all records, not just the flagged ones, so a graph + * (Gantt/DAG) or report can be reconstructed from it whether or not anything leaked. Each event + * carries its relative time ({@code +Δms} from the first recorded event), thread, and callsite. + */ + public String renderTimeline() { + StringBuilder sb = new StringBuilder(); + appendHeader(sb, "Scope/continuation timeline"); + if (continuations.isEmpty() && scopes.isEmpty()) { + sb.append(" (nothing captured)\n"); + return sb.toString(); + } + + for (ContinuationRecord r : continuations) { + EnumSet failures = + continuationFailures.getOrDefault(r, EnumSet.noneOf(Failure.class)); + sb.append("\n#") + .append(r.seq) + .append(' ') + .append(r.status()) + .append(" trace=") + .append(r.traceId) + .append(" span=") + .append(r.spanId); + if (r.spanName != null) { + sb.append(" \"").append(r.spanName).append('"'); + } + sb.append(" src=").append(r.sourceName()); + if (r.orphan) { + sb.append(" [ORPHAN]"); + } + if (r.threadHandoff()) { + sb.append(" [handoff]"); + } + if (!failures.isEmpty()) { + sb.append(' ').append(failures); + } + sb.append(timing(r)).append('\n'); + + appendEvent(sb, "capture ", r.capture()); + for (ScopeEvent a : r.resumes()) { + appendEvent(sb, "resume ", a); + } + for (ScopeEvent f : r.failedActivations()) { + appendEvent(sb, "act-fail", f); + } + ScopeEvent terminal = r.terminal(); + if (terminal != null) { + appendEvent( + sb, + terminal.type == ScopeEvent.Type.RESOLVE_CANCEL ? "cancel " : "finish ", + terminal); + } + for (ScopeEvent extra : r.extraTerminals()) { + appendEvent(sb, "DOUBLE ", extra); + } + for (long scopeSeq : r.scopeRecordSeqs()) { + ScopeRecord scope = scopeBySeq.get(scopeSeq); + if (scope != null) { + appendScopeLine(sb, " ", scope); + } + } + if (terminal == null) { + sb.append(" LEAKED (never finished or cancelled)\n"); + } + } + + List orphanScopes = new ArrayList<>(); + for (ScopeRecord s : scopes) { + if (s.continuationSeq == null) { + orphanScopes.add(s); + } + } + if (!orphanScopes.isEmpty()) { + sb.append("\nNon-continuation scopes:\n"); + for (ScopeRecord s : orphanScopes) { + appendScopeLine(sb, " ", s); + } + } + return sb.toString(); + } + + private String timing(ContinuationRecord r) { + StringBuilder sb = new StringBuilder(); + Long capToResume = r.captureToFirstResumeNanos(); + Long age = r.ageAtTerminalNanos(); + if (capToResume != null) { + sb.append(" cap->resume=").append(millis(capToResume)).append("ms"); + } + if (age != null) { + sb.append(" age=").append(millis(age)).append("ms"); + } + return sb.toString(); + } + + private void appendScopeLine(StringBuilder sb, String indent, ScopeRecord scope) { + EnumSet failures = scopeFailures.getOrDefault(scope, EnumSet.noneOf(Failure.class)); + sb.append(indent).append("scope#").append(scope.seq).append(' ').append(scope.sourceName()); + if (scope.spanName != null) { + sb.append(" \"").append(scope.spanName).append('"'); + } + ScopeEvent open = scope.open(); + ScopeEvent close = scope.close(); + if (open != null) { + sb.append(" open +").append(relMillis(open.nanos)).append("ms @ ").append(open.threadName); + } + if (close != null) { + sb.append(" close +") + .append(relMillis(close.nanos)) + .append("ms @ ") + .append(close.threadName); + Long active = scope.activeDurationNanos(); + if (active != null) { + sb.append(" (active ").append(millis(active)).append("ms)"); + } + } + if (scope.threadHandoff()) { + sb.append(" [handoff]"); + } + if (!failures.isEmpty()) { + sb.append(' ').append(failures); + } + sb.append('\n'); + } + + private void appendEvent(StringBuilder sb, String label, ScopeEvent event) { + if (event == null) { + sb.append(" ").append(label).append(" (not observed)\n"); + return; + } + sb.append(" ") + .append(label) + .append(" +") + .append(relMillis(event.nanos)) + .append("ms @ ") + .append(event.threadName) + .append(" at ") + .append(event.callsite() == null ? "" : event.callsite()) + .append('\n'); + StackTraceElement[] stack = event.stack; + if (stack != null) { + for (int i = 1; i < stack.length && i < TIMELINE_FRAMES; i++) { + sb.append(" from ").append(stack[i]).append('\n'); + } + } + } + + private String relMillis(long nanos) { + return String.format("%.3f", (nanos - t0) / 1_000_000.0); + } + + private static String millis(long nanos) { + return String.format("%.3f", nanos / 1_000_000.0); + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeEvent.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeEvent.java new file mode 100644 index 00000000000..f622f19de9a --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeEvent.java @@ -0,0 +1,40 @@ +package datadog.trace.agent.test.scopediag; + +/** + * A single observed point in a continuation's lifecycle. Time, thread, and stack are captured by + * the recorder on the event's own thread (notifications are synchronous), so they reflect the + * thread that actually captured/activated/resolved the continuation. + */ +public final class ScopeEvent { + public enum Type { + CAPTURE, + ACTIVATE, + /** An {@code activate()} that returned the noop scope after the continuation was resolved. */ + ACTIVATE_FAILED, + RESOLVE_FINISH, + RESOLVE_CANCEL, + /** A scope became active (first activation). */ + SCOPE_OPEN, + /** A scope was popped from its thread's stack. */ + SCOPE_CLOSE, + /** A scope was closed while not on top of its thread's stack. */ + SCOPE_CLOSE_WRONG_THREAD + } + + public final Type type; + public final String threadName; + public final long nanos; + public final StackTraceElement[] stack; + + ScopeEvent(Type type, String threadName, long nanos, StackTraceElement[] stack) { + this.type = type; + this.threadName = threadName; + this.nanos = nanos; + this.stack = stack; + } + + /** The most relevant (top, post-filter) frame, or {@code null} if none survived filtering. */ + public StackTraceElement callsite() { + return stack != null && stack.length > 0 ? stack[0] : null; + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeRecord.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeRecord.java new file mode 100644 index 00000000000..1cfc4eb71f0 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeRecord.java @@ -0,0 +1,117 @@ +package datadog.trace.agent.test.scopediag; + +import datadog.trace.api.DDTraceId; +import java.util.ArrayList; +import java.util.EnumSet; +import java.util.List; + +/** + * The scope activation lifecycle: a scope opened (first activation) → closed (popped from + * its thread's stack). Distinct from the continuation lifecycle ({@link ContinuationRecord}); when + * a scope was spawned by resuming a continuation, {@link #continuationSeq} links back to that + * continuation's {@link ContinuationRecord#seq}. + */ +public final class ScopeRecord { + public final long seq; + public final DDTraceId traceId; + public final long spanId; + public final String spanName; + public final byte source; + + /** + * The seq of the continuation that spawned this scope, or {@code null} for a plain activation. + */ + public final Long continuationSeq; + + private final ScopeEvent open; + private ScopeEvent close; + private final List wrongThreadCloses = new ArrayList<>(0); + + ScopeRecord( + long seq, + DDTraceId traceId, + long spanId, + String spanName, + byte source, + Long continuationSeq, + ScopeEvent open) { + this.seq = seq; + this.traceId = traceId; + this.spanId = spanId; + this.spanName = spanName; + this.source = source; + this.continuationSeq = continuationSeq; + this.open = open; + } + + // ---- mutation ------------------------------------------------------------ + + synchronized void setClose(ScopeEvent event) { + if (close == null) { + close = event; + } + } + + synchronized void addWrongThreadClose(ScopeEvent event) { + wrongThreadCloses.add(event); + } + + // ---- accessors ----------------------------------------------------------- + + public synchronized ScopeEvent open() { + return open; + } + + public synchronized ScopeEvent close() { + return close; + } + + public synchronized List wrongThreadCloses() { + return new ArrayList<>(wrongThreadCloses); + } + + public synchronized boolean closed() { + return close != null; + } + + // ---- derived ------------------------------------------------------------- + + /** {@code true} when the scope was opened and closed on different threads. */ + public synchronized boolean threadHandoff() { + return open != null && close != null && !open.threadName.equals(close.threadName); + } + + /** Nanos the scope was active, or {@code null} if it was never closed. */ + public synchronized Long activeDurationNanos() { + if (open == null || close == null) { + return null; + } + return close.nanos - open.nanos; + } + + public synchronized EnumSet failures() { + EnumSet failures = EnumSet.noneOf(Failure.class); + if (open != null && close == null) { + failures.add(Failure.NEVER_CLOSED); + } + if (!wrongThreadCloses.isEmpty()) { + failures.add(Failure.CLOSE_WRONG_THREAD); + } + return failures; + } + + public synchronized long firstNanos() { + long min = open != null ? open.nanos : Long.MAX_VALUE; + if (close != null) { + min = Math.min(min, close.nanos); + } + for (ScopeEvent e : wrongThreadCloses) { + min = Math.min(min, e.nanos); + } + return min; + } + + public String sourceName() { + return ScopeSources.name(source); + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeSources.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeSources.java new file mode 100644 index 00000000000..7bf7a81fa3b --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/ScopeSources.java @@ -0,0 +1,25 @@ +package datadog.trace.agent.test.scopediag; + +/** + * Maps the {@code byte} scope source used by the tracer to a readable name. Mirrors the constants + * in {@code datadog.trace.core.scopemanager.ContinuableScope} (which are package-private and not + * visible from here). + */ +final class ScopeSources { + private ScopeSources() {} + + static String name(byte source) { + switch (source) { + case 0: + return "INSTRUMENTATION"; + case 1: + return "MANUAL"; + case 2: + return "ITERATION"; + case 3: + return "CONTEXT"; + default: + return "UNKNOWN(" + source + ")"; + } + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/StackFilter.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/StackFilter.java new file mode 100644 index 00000000000..8b360d38fa0 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/StackFilter.java @@ -0,0 +1,75 @@ +package datadog.trace.agent.test.scopediag; + +import java.util.ArrayList; +import java.util.List; + +/** + * Trims a raw stack trace down to the frames that point at where a continuation was + * captured/activated/resolved: it drops the diagnostic plumbing, the scope-manager internals, and + * the executor/reflection scaffolding, keeping the top {@code maxFrames} meaningful frames. + */ +final class StackFilter { + private static final String[] DROP_PREFIXES = { + // diagnostic harness itself + "datadog.trace.agent.test.scopediag.", + // tracer scope/continuation machinery and the capture/activate plumbing it sits behind + "datadog.trace.core.", + "datadog.trace.bootstrap.instrumentation.java.concurrent.", + "datadog.trace.bootstrap.instrumentation.api.", + "datadog.trace.bootstrap.InstrumentationContext", + // JDK executor/reflection scaffolding between the caller and the capture + "java.lang.Thread.getStackTrace", + "java.util.concurrent.ThreadPoolExecutor", + "java.util.concurrent.ScheduledThreadPoolExecutor", + "java.util.concurrent.ForkJoinPool", + "java.util.concurrent.ForkJoinWorkerThread", + "java.util.concurrent.Executors$", + "java.util.concurrent.FutureTask", + "java.util.concurrent.CompletableFuture", + "jdk.internal.reflect.", + "java.lang.reflect.", + "sun.reflect.", + // Spock/Groovy/ByteBuddy mock & dynamic-dispatch scaffolding (test harness, not a callsite) + "org.spockframework.mock.", + "org.codehaus.groovy.", + "groovy.lang.", + "net.bytebuddy.", + }; + + private final int maxFrames; + + StackFilter(int maxFrames) { + this.maxFrames = maxFrames; + } + + int maxFrames() { + return maxFrames; + } + + StackTraceElement[] filter(StackTraceElement[] raw) { + if (raw == null) { + return new StackTraceElement[0]; + } + List kept = new ArrayList<>(maxFrames); + for (StackTraceElement frame : raw) { + if (isDropped(frame)) { + continue; + } + kept.add(frame); + if (kept.size() >= maxFrames) { + break; + } + } + return kept.toArray(new StackTraceElement[0]); + } + + private static boolean isDropped(StackTraceElement frame) { + String fqn = frame.getClassName() + "." + frame.getMethodName(); + for (String prefix : DROP_PREFIXES) { + if (fqn.startsWith(prefix) || frame.getClassName().startsWith(prefix)) { + return true; + } + } + return false; + } +} diff --git a/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/TrackScopeContinuations.java b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/TrackScopeContinuations.java new file mode 100644 index 00000000000..86450ca859f --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/main/java/datadog/trace/agent/test/scopediag/TrackScopeContinuations.java @@ -0,0 +1,22 @@ +package datadog.trace.agent.test.scopediag; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Opt-in marker that enables the scope-continuation leak diagnostic for an instrumentation test + * class or method. When absent the diagnostic stays dormant, so existing tests are unaffected. + * + *

Honored by both the JUnit5 {@link ScopeDiagnosticsExtension} and the Groovy/Spock {@code + * InstrumentationSpecification}. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE, ElementType.METHOD}) +@Inherited +public @interface TrackScopeContinuations { + /** Fail the test if a never-resolved leak or double/invalid resolution is detected. */ + boolean failOnLeak() default false; +} diff --git a/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeContinuationProbeTest.java b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeContinuationProbeTest.java new file mode 100644 index 00000000000..73e34d755d8 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeContinuationProbeTest.java @@ -0,0 +1,99 @@ +package datadog.trace.agent.test.scopediag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import org.junit.jupiter.api.Test; + +/** + * Guards the production names/constants that {@link ScopeContinuationProbe} relies on reflectively + * or mirrors. If any of these are renamed/changed in the tracer, these assertions fail loudly + * instead of the diagnostic silently going dark. + */ +class ScopeContinuationProbeTest { + + @Test + void cancelledSentinelMatchesProduction() throws Exception { + assertEquals(Integer.MIN_VALUE >> 1, ScopeContinuationProbe.CANCELLED); + + Class scopeContinuation = Class.forName("datadog.trace.core.scopemanager.ScopeContinuation"); + Field cancelled = scopeContinuation.getDeclaredField("CANCELLED"); + cancelled.setAccessible(true); + assertEquals( + cancelled.getInt(null), + ScopeContinuationProbe.CANCELLED, + "ScopeContinuationProbe.CANCELLED is out of sync with ScopeContinuation.CANCELLED"); + } + + @Test + void continuationHooksExist() throws Exception { + Class scopeContinuation = Class.forName("datadog.trace.core.scopemanager.ScopeContinuation"); + // methods woven by ScopeContinuationTransformer (matched by name) + assertNotNull(scopeContinuation.getDeclaredMethod("register"), "register() (capture)"); + assertNotNull(scopeContinuation.getDeclaredMethod("activate"), "activate() (resume)"); + assertNotNull(scopeContinuation.getDeclaredMethod("cancel"), "cancel() (resolve)"); + assertNotNull( + scopeContinuation.getDeclaredMethod("cancelFromContinuedScopeClose"), + "cancelFromContinuedScopeClose() (resolve)"); + // fields read by the Cancel advice (@Advice.FieldValue) and the probe (reflection) + assertNotNull(findField(scopeContinuation, "count"), "ScopeContinuation.count"); + assertNotNull(findField(scopeContinuation, "source"), "ScopeContinuation.source"); + } + + @Test + void rootWrittenHookExists() throws Exception { + Class pendingTrace = Class.forName("datadog.trace.core.PendingTrace"); + // PendingTraceAdvice matches write(boolean) and reads these fields via @Advice.FieldValue + assertNotNull( + pendingTrace.getDeclaredMethod("write", boolean.class), "PendingTrace.write(boolean)"); + assertNotNull(findField(pendingTrace, "rootSpanWritten"), "PendingTrace.rootSpanWritten"); + assertNotNull(findField(pendingTrace, "traceId"), "PendingTrace.traceId"); + } + + @Test + void scopeLifecycleHooksExist() throws Exception { + Class scope = Class.forName("datadog.trace.core.scopemanager.ContinuableScope"); + assertNotNull(scope.getDeclaredMethod("afterActivated"), "afterActivated() (scope open)"); + assertNotNull(scope.getDeclaredMethod("onProperClose"), "onProperClose() (scope close)"); + assertNotNull(scope.getDeclaredMethod("close"), "close() (wrong-thread check)"); + // source byte read reflectively in the probe + assertNotNull(findField(scope, "source"), "ContinuableScope.source"); + + Class continuing = Class.forName("datadog.trace.core.scopemanager.ContinuingScope"); + assertNotNull( + continuing.getDeclaredField("continuation"), "ContinuingScope.continuation (scope link)"); + } + + @Test + void wrongThreadCheckChainExists() throws Exception { + Class scope = Class.forName("datadog.trace.core.scopemanager.ContinuableScope"); + assertNotNull(findField(scope, "scopeManager"), "ContinuableScope.scopeManager"); + Class manager = Class.forName("datadog.trace.core.scopemanager.ContinuableScopeManager"); + assertNotNull(manager.getDeclaredMethod("scopeStack"), "ContinuableScopeManager.scopeStack()"); + Class stack = Class.forName("datadog.trace.core.scopemanager.ScopeStack"); + assertTrue(hasMethod(stack, "checkTop", 1), "ScopeStack.checkTop(scope)"); + } + + private static Field findField(Class cls, String name) { + for (Class c = cls; c != null; c = c.getSuperclass()) { + try { + return c.getDeclaredField(name); + } catch (NoSuchFieldException ignored) { + // keep walking + } + } + return null; + } + + private static boolean hasMethod(Class cls, String name, int params) { + for (Method m : cls.getDeclaredMethods()) { + if (m.getName().equals(name) && m.getParameterCount() == params) { + return true; + } + } + return false; + } +} diff --git a/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsIntegrationTest.java b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsIntegrationTest.java new file mode 100644 index 00000000000..2c6a9df8ded --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsIntegrationTest.java @@ -0,0 +1,157 @@ +package datadog.trace.agent.test.scopediag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.common.writer.ListWriter; +import datadog.trace.core.CoreTracer; +import java.util.List; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +/** + * Exercises the full path: a real {@link CoreTracer} capturing continuations via {@code + * captureSpan} drives {@code ScopeContinuation} -> {@code ScopeContinuationProbe} -> {@link + * ScopeDiagnostics}, and the derived report classifies the leak correctly. + */ +class ScopeDiagnosticsIntegrationTest { + + private CoreTracer tracer; + + @AfterEach + void tearDown() { + ScopeDiagnostics.stop(); + ScopeDiagnostics.reset(); + if (tracer != null) { + tracer.close(); + } + } + + @Test + void capturesRealLeakAndResolvedContinuation() { + tracer = CoreTracer.builder().writer(new ListWriter()).strictTraceWrites(false).build(); + + ScopeDiagnostics.startRecording(); + + AgentSpan span = tracer.startSpan("test", "op"); + AgentScope.Continuation leaked = tracer.captureSpan(span); // captured, never resolved + AgentScope.Continuation resolved = tracer.captureSpan(span); + resolved.cancel(); + + ScopeDiagnosticsReport report = ScopeDiagnostics.report(); + + assertEquals(2, report.records().size(), "both captures recorded"); + assertEquals(1, report.leakCount(), "exactly the un-resolved continuation leaks"); + assertTrue(report.hasProblems()); + // keep a reference so the leak isn't reclaimed before the assertion + assertFalse(leaked.toString().isEmpty()); + + span.finish(); + } + + @Test + void sameSpanReactivationIsNotFlaggedActivateAfterResolve() { + tracer = CoreTracer.builder().writer(new ListWriter()).strictTraceWrites(false).build(); + + ScopeDiagnostics.startRecording(); + + AgentSpan span = tracer.startSpan("test", "op"); + AgentScope active = tracer.activateSpan(span); // span becomes the active top scope + // Capturing then immediately activating the already-active span hits the continueSpan reuse + // optimization: it cancels the continuation from inside activate() before activate() returns. + // The resume must be timestamped at activate() entry (not exit) so it does not appear to occur + // after that internal resolution and spuriously trip ACTIVATE_AFTER_RESOLVE. + AgentScope.Continuation continuation = tracer.captureSpan(span); + AgentScope reused = continuation.activate(); + reused.close(); + active.close(); + span.finish(); + + ScopeDiagnosticsReport report = ScopeDiagnostics.report(); + + assertEquals(1, report.records().size()); + assertEquals( + 0, + report.activateAfterResolveCount(), + "a same-span re-activation resolved during activate() is not activate-after-resolve"); + assertEquals(0, report.leakCount()); + assertFalse(report.hasProblems()); + } + + @Test + void resolvedContinuationDoesNotLeak() { + tracer = CoreTracer.builder().writer(new ListWriter()).strictTraceWrites(false).build(); + + ScopeDiagnostics.startRecording(); + + AgentSpan span = tracer.startSpan("test", "op"); + AgentScope.Continuation continuation = tracer.captureSpan(span); + AgentScope scope = continuation.activate(); + scope.close(); + span.finish(); + + ScopeDiagnosticsReport report = ScopeDiagnostics.report(); + + assertEquals(1, report.records().size()); + assertEquals(0, report.leakCount(), "activated then closed continuation is resolved"); + } + + @Test + void scopeLifetimeRecordedAndLinkedToContinuation() { + tracer = CoreTracer.builder().writer(new ListWriter()).strictTraceWrites(false).build(); + + ScopeDiagnostics.startRecording(); + + AgentSpan span = tracer.startSpan("test", "op"); + AgentScope.Continuation continuation = tracer.captureSpan(span); + AgentScope scope = continuation.activate(); + scope.close(); + span.finish(); + + ScopeDiagnosticsReport report = ScopeDiagnostics.report(); + + ScopeRecord linked = continuationScope(report); + assertNotNull(linked, "the resumed scope was recorded"); + assertNotNull(linked.open(), "scope open observed"); + assertTrue(linked.closed(), "scope close observed"); + assertEquals(0, report.neverClosedScopeCount()); + // the scope links back to its continuation record + assertEquals(1, report.records().size()); + assertEquals(Long.valueOf(report.records().get(0).seq), linked.continuationSeq); + } + + @Test + void neverClosedScopeIsFlagged() { + tracer = CoreTracer.builder().writer(new ListWriter()).strictTraceWrites(false).build(); + + ScopeDiagnostics.startRecording(); + + AgentSpan span = tracer.startSpan("test", "op"); + AgentScope.Continuation continuation = tracer.captureSpan(span); + AgentScope scope = continuation.activate(); // opened, never closed + + ScopeDiagnosticsReport report = ScopeDiagnostics.report(); + + assertEquals(1, report.neverClosedScopeCount(), "the open scope never closed"); + assertEquals(1, report.leakCount(), "and the continuation it backs also leaks"); + assertTrue(report.hasProblems()); + + // clean up so the open scope does not pollute this thread's scope stack for later tests + scope.close(); + span.finish(); + } + + private static ScopeRecord continuationScope(ScopeDiagnosticsReport report) { + List scopes = report.scopeRecords(); + for (ScopeRecord s : scopes) { + if (s.continuationSeq != null) { + return s; + } + } + return null; + } +} diff --git a/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsReportTest.java b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsReportTest.java new file mode 100644 index 00000000000..964ba7f521a --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeDiagnosticsReportTest.java @@ -0,0 +1,168 @@ +package datadog.trace.agent.test.scopediag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.api.DDTraceId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.jupiter.api.Test; + +class ScopeDiagnosticsReportTest { + + private static final StackTraceElement[] STACK = { + new StackTraceElement("com.app.Worker", "submit", "Worker.java", 42) + }; + + private static ScopeEvent event(ScopeEvent.Type type, String thread, long nanos) { + return new ScopeEvent(type, thread, nanos, STACK); + } + + private static ContinuationRecord record(long seq, DDTraceId trace) { + return new ContinuationRecord( + seq, trace, 7L, "op", (byte) 0, false, event(ScopeEvent.Type.CAPTURE, "main", 1000)); + } + + @Test + void resolvedContinuationHasNoFailures() { + ContinuationRecord r = record(0, DDTraceId.from(10)); + r.addResume(event(ScopeEvent.Type.ACTIVATE, "pool-1", 2000)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_FINISH, "pool-1", 3000)); + + ScopeDiagnosticsReport report = report(list(r), map()); + + assertEquals(0, report.leakCount()); + assertEquals(0, report.lateCount()); + assertEquals(0, report.doubleCount()); + assertEquals(ContinuationStatus.FINISHED, r.status()); + assertTrue(r.threadHandoff()); // captured on main, resolved on pool-1 + assertFalse(report.hasProblems()); + } + + @Test + void neverResolvedIsFlaggedAsLeak() { + ContinuationRecord r = record(0, DDTraceId.from(11)); + + ScopeDiagnosticsReport report = report(list(r), map()); + + assertEquals(1, report.leakCount()); + assertEquals(ContinuationStatus.LEAKED, r.status()); + assertTrue(report.hasProblems()); + assertTrue(report.renderSummary().contains("LEAKED")); + // the capture callsite is surfaced in the problem summary + assertTrue(report.renderSummary().contains("Worker.java:42")); + } + + @Test + void resolutionAfterRootWriteIsFlaggedLate() { + DDTraceId trace = DDTraceId.from(12); + ContinuationRecord r = record(0, trace); + r.addResume(event(ScopeEvent.Type.ACTIVATE, "pool-1", 5000)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_FINISH, "pool-1", 6000)); + + Map rootWritten = map(); + rootWritten.put(trace, 4000L); // root written before the activation/resolution + + ScopeDiagnosticsReport report = report(list(r), rootWritten); + + assertEquals(1, report.lateCount()); + assertEquals(0, report.leakCount()); // it is resolved, just late + } + + @Test + void lateFinishDoesNotFail() { + DDTraceId trace = DDTraceId.from(120); + ContinuationRecord r = record(0, trace); + r.addResume(event(ScopeEvent.Type.ACTIVATE, "pool-1", 5000)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_FINISH, "pool-1", 6000)); + Map rootWritten = map(); + rootWritten.put(trace, 4000L); + + ScopeDiagnosticsReport report = report(list(r), rootWritten); + + assertEquals(1, report.lateCount()); + assertFalse(report.hasProblems()); // late-finish is report-only + } + + @Test + void multipleResolutionsAreFlaggedDouble() { + ContinuationRecord r = record(0, DDTraceId.from(13)); + r.addResume(event(ScopeEvent.Type.ACTIVATE, "pool-1", 2000)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_FINISH, "pool-1", 3000)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_FINISH, "pool-1", 3500)); + + ScopeDiagnosticsReport report = report(list(r), map()); + + assertEquals(1, report.doubleCount()); + assertTrue(report.hasProblems()); + } + + @Test + void activationAfterResolveIsFailure() { + ContinuationRecord r = record(0, DDTraceId.from(14)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_CANCEL, "pool-1", 2000)); + r.addResume(event(ScopeEvent.Type.ACTIVATE, "pool-2", 3000)); // resume after cancel + + ScopeDiagnosticsReport report = report(list(r), map()); + + assertEquals(1, report.activateAfterResolveCount()); + assertEquals(0, report.doubleCount()); + assertTrue(report.hasProblems()); + } + + @Test + void failedActivationIsActivateAfterResolve() { + ContinuationRecord r = record(0, DDTraceId.from(141)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_CANCEL, "pool-1", 2000)); + r.addFailedActivation(event(ScopeEvent.Type.ACTIVATE_FAILED, "pool-2", 3000)); + + ScopeDiagnosticsReport report = report(list(r), map()); + + assertEquals(1, report.activateAfterResolveCount()); + assertTrue(report.hasProblems()); + } + + @Test + void timelineRendersResolvedContinuationEvenWithoutProblems() { + ContinuationRecord r = record(0, DDTraceId.from(30)); + r.addResume(event(ScopeEvent.Type.ACTIVATE, "pool-1", 2000)); + r.setTerminalOrExtra(event(ScopeEvent.Type.RESOLVE_FINISH, "pool-1", 3000)); + + ScopeDiagnosticsReport report = report(list(r), map()); + + // a clean run: the summary reports no problems ... + assertFalse(report.hasProblems()); + assertTrue(report.renderSummary().contains("(none)")); + + // ... but the timeline still dumps the full lineage so a graph/report can be built + String timeline = report.renderTimeline(); + assertTrue(timeline.contains("#0 FINISHED")); + assertTrue(timeline.contains("capture")); + assertTrue(timeline.contains("resume")); + assertTrue(timeline.contains("finish")); + assertTrue(timeline.contains("Worker.java:42")); // callsite preserved + assertTrue(timeline.contains("@ pool-1")); // resume/finish thread preserved + } + + // ---- helpers ------------------------------------------------------------- + + private static ScopeDiagnosticsReport report( + List records, Map rootWritten) { + return new ScopeDiagnosticsReport(records, new ArrayList<>(), rootWritten); + } + + private static List list(ContinuationRecord... rs) { + List l = new ArrayList<>(); + for (ContinuationRecord r : rs) { + l.add(r); + } + return l; + } + + private static Map map() { + return new HashMap<>(); + } +} diff --git a/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeRecordTest.java b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeRecordTest.java new file mode 100644 index 00000000000..7ad0112a432 --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/ScopeRecordTest.java @@ -0,0 +1,86 @@ +package datadog.trace.agent.test.scopediag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.api.DDTraceId; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import org.junit.jupiter.api.Test; + +class ScopeRecordTest { + + private static final StackTraceElement[] STACK = { + new StackTraceElement("com.app.Worker", "run", "Worker.java", 7) + }; + + private static ScopeEvent event(ScopeEvent.Type type, String thread, long nanos) { + return new ScopeEvent(type, thread, nanos, STACK); + } + + private static ScopeRecord scope(long seq, Long continuationSeq, String openThread, long nanos) { + return new ScopeRecord( + seq, + DDTraceId.from(1), + 9L, + "op", + (byte) 0, + continuationSeq, + event(ScopeEvent.Type.SCOPE_OPEN, openThread, nanos)); + } + + private static ScopeDiagnosticsReport report(ScopeRecord... scopes) { + List list = new ArrayList<>(); + for (ScopeRecord s : scopes) { + list.add(s); + } + return new ScopeDiagnosticsReport(new ArrayList<>(), list, new HashMap<>()); + } + + @Test + void openAndClosedHasNoFailures() { + ScopeRecord s = scope(0, null, "main", 1000); + s.setClose(event(ScopeEvent.Type.SCOPE_CLOSE, "main", 3000)); + + assertTrue(s.closed()); + assertEquals(0, s.failures().size()); + assertFalse(s.threadHandoff()); + assertEquals(Long.valueOf(2000), s.activeDurationNanos()); + assertFalse(report(s).hasProblems()); + } + + @Test + void openWithoutCloseIsNeverClosed() { + ScopeRecord s = scope(0, null, "main", 1000); + + assertFalse(s.closed()); + assertTrue(s.failures().contains(Failure.NEVER_CLOSED)); + + ScopeDiagnosticsReport report = report(s); + assertEquals(1, report.neverClosedScopeCount()); + assertTrue(report.hasProblems()); // never-closed is a genuine bug + } + + @Test + void openAndCloseOnDifferentThreadsIsHandoff() { + ScopeRecord s = scope(0, null, "main", 1000); + s.setClose(event(ScopeEvent.Type.SCOPE_CLOSE, "pool-1", 2000)); + + assertTrue(s.threadHandoff()); + } + + @Test + void wrongThreadCloseIsReportedButDoesNotFail() { + ScopeRecord s = scope(0, null, "main", 1000); + s.setClose(event(ScopeEvent.Type.SCOPE_CLOSE, "main", 2000)); + s.addWrongThreadClose(event(ScopeEvent.Type.SCOPE_CLOSE_WRONG_THREAD, "pool-2", 1500)); + + assertTrue(s.failures().contains(Failure.CLOSE_WRONG_THREAD)); + + ScopeDiagnosticsReport report = report(s); + assertEquals(1, report.closeWrongThreadCount()); + assertFalse(report.hasProblems()); // wrong-thread is report-only + } +} diff --git a/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/StackFilterTest.java b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/StackFilterTest.java new file mode 100644 index 00000000000..3853ab274fd --- /dev/null +++ b/dd-java-agent/instrumentation-testing/src/test/java/datadog/trace/agent/test/scopediag/StackFilterTest.java @@ -0,0 +1,53 @@ +package datadog.trace.agent.test.scopediag; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class StackFilterTest { + + private static StackTraceElement frame(String cls, String method) { + return new StackTraceElement(cls, method, "Src.java", 1); + } + + @Test + void dropsPlumbingAndKeepsAppFrames() { + StackTraceElement[] raw = { + frame("java.lang.Thread", "getStackTrace"), + frame("datadog.trace.agent.test.scopediag.ScopeDiagnostics", "event"), + frame("datadog.trace.core.scopemanager.ScopeContinuation", "register"), + frame("java.util.concurrent.ThreadPoolExecutor", "execute"), + frame("com.app.Service", "doWork"), + frame("com.app.Main", "main"), + }; + + StackTraceElement[] filtered = new StackFilter(6).filter(raw); + + assertEquals(2, filtered.length); + assertEquals("com.app.Service", filtered[0].getClassName()); + assertEquals("com.app.Main", filtered[1].getClassName()); + } + + @Test + void respectsMaxFrames() { + StackTraceElement[] raw = { + frame("com.app.A", "a"), frame("com.app.B", "b"), frame("com.app.C", "c"), + }; + + assertEquals(2, new StackFilter(2).filter(raw).length); + } + + @Test + void handlesNullStack() { + assertEquals(0, new StackFilter(6).filter(null).length); + } + + @Test + void keepsScopeManagerFreeStacks() { + StackTraceElement[] raw = {frame("com.app.Only", "here")}; + StackTraceElement[] filtered = new StackFilter(6).filter(raw); + assertEquals(1, filtered.length); + assertTrue(filtered[0].getClassName().startsWith("com.app")); + } +} diff --git a/docs/superpowers/specs/2026-06-10-scope-continuation-leak-diagnostic-design.md b/docs/superpowers/specs/2026-06-10-scope-continuation-leak-diagnostic-design.md new file mode 100644 index 00000000000..8de35897bfe --- /dev/null +++ b/docs/superpowers/specs/2026-06-10-scope-continuation-leak-diagnostic-design.md @@ -0,0 +1,216 @@ +# Cross-thread scope-continuation leak diagnostic — design + +**Date:** 2026-06-10 +**Status:** Approved (design); implementation pending +**Scope:** Test-time only + +## Problem + +dd-trace-java transfers trace scopes between threads via *continuations*: a scope is +**captured** on one thread (`ScopeContinuation`, registered with the trace collector, +bumping `PendingTrace.pendingReferenceCount`) and later **activated** and/or **cancelled** +on another thread/executor. When a captured continuation is never resolved (neither +activated nor cancelled), or is resolved after the root span has already finished, the +`PendingTraceBuffer` either keeps the trace alive past the root close or discards a span +that arrives too late. In tests this surfaces as the need to disable `strictTraceWrites`, +masking the underlying leak instead of locating it. + +There is no way today to see **where** a continuation was captured, on **which thread** it +was activated/cancelled, and **whether** it leaked. The existing `HealthMetrics` +callbacks (`onCaptureContinuation` / `onFinishContinuation` / `onCancelContinuation`) are +**no-arg** — they carry no continuation identity and no trace id, so capture→activate→cancel +cannot be correlated and "late after root close" cannot be computed. + +## Goal + +A passive, test-time diagnostic that records the lifecycle of every scope continuation with +correlated identity, captures the capture/activate/resolve **callsites** (stack traces), +and renders a Gantt-style timeline plus a leak summary, so a developer can locate the +instrumentation that leaks continuations across threads. + +### Findings the diagnostic must derive +1. **Never-resolved captures** — captured but never activated *and* never cancelled within + the recording window. The classic leak that keeps `pendingReferenceCount > 0` and forces + `strictTraceWrites(false)`. +2. **Cross-thread timeline** — full Gantt of every capture→activate/resolve: capture + thread + callsite, activation thread + callsite, latency, and whether resolution happened + on a different thread/executor (even for non-leaking continuations). +3. **Late activation after root close** — captures that *are* resolved, but activated/closed + after the root span of their trace already finished (`rootSpanWritten`). +4. **Double / invalid resolution** — a continuation activated after cancel, resolved more + than once, or otherwise mishandled in the capture/activate handshake. + +## Non-goals +- Production runtime diagnostic (this is test-time only; production stays inert). +- HTML/visual timeline rendering (text + JSON only). +- Failing tests by default — passive/report-only unless explicitly opted in. +- Parallel in-JVM test execution support (instrumentation tests run one-at-a-time per JVM). + +## Architecture + +Four layers: a minimal production **seam** in `dd-trace-core`, a test-side **recorder +engine**, **renderers**, and **harness integration** (JUnit5 + Spock + static API). + +### 1. Production seam — `dd-trace-core` (`datadog.trace.core.scopemanager`) + +A new `ContinuationDiagnostics` holder, inert in production (listener is `null` unless test +code installs one): + +```java +public final class ContinuationDiagnostics { + public interface Listener { + void onCapture(AgentScope.Continuation id, DDTraceId traceId, long spanId, byte source); + void onActivate(AgentScope.Continuation id, DDTraceId traceId, long spanId, byte source); + void onResolve(AgentScope.Continuation id, boolean cancelled); // finish vs cancel + void onRootWritten(DDTraceId traceId); + } + private static volatile Listener LISTENER; // null in prod + public static void install(Listener l) { LISTENER = l; } + public static void clear() { LISTENER = null; } + static Listener listener() { return LISTENER; } // single volatile read +} +``` + +**Identity** is the `AgentScope.Continuation` instance itself (`ScopeContinuation implements +AgentScope.Continuation`), used only as an identity key downstream (never `equals`/`hashCode`). +`AgentScope.Continuation` lives in `internal-api`, so it is a safe public identity type. + +**Call sites** — each guarded by `Listener l = listener(); if (l != null) { ... }`: + +| Location | Method | Notify | +|---|---|---| +| `ScopeContinuation.register()` | after `traceCollector.registerContinuation(this)` | `onCapture(this, traceId, spanId, source)` | +| `ScopeContinuation.activate()` | successful branch (`COUNT.incrementAndGet(this) > 0`) | `onActivate(this, traceId, spanId, source)` | +| `ScopeContinuation.cancel()` | mirror `onFinishContinuation` (count==0) → `onResolve(this,false)`; mirror `onCancelContinuation` → `onResolve(this,true)` | +| `ScopeContinuation.cancelFromContinuedScopeClose()` | where it calls `onFinishContinuation` | `onResolve(this, false)` | +| `PendingTrace.write()` | non-partial path where `rootSpanWritten = true` | `onRootWritten(traceId)` | + +`traceId`/`spanId` at capture/activate are obtained from the available `context` via +`spanFromContext(context)` (already imported in that package). `source` is the existing +`byte` source field (`INSTRUMENTATION`/`MANUAL`/`ITERATION`/`CONTEXT`). + +**The seam carries only identity + ids + source.** Timestamp, thread, and stack trace are +captured by the recorder *inside* the callback — callbacks run synchronously on the event's +own thread, so `Thread.currentThread()` and `new Throwable().getStackTrace()` are accurate +for the capturing/activating/resolving thread. This keeps the production footprint to a +handful of null-guarded calls with zero behavior change and zero allocation when off. + +### 2. Recorder engine — `:dd-java-agent:instrumentation-testing` (`datadog.trace.agent.test.scopediag`) + +`ScopeDiagnostics` — static facade and the `Listener` implementation that installs itself: + +- API: `startRecording()`, `reset()`, `stop()`, `report() -> ScopeDiagnosticsReport`, + `assertNoLeaks()`. +- Correlation store: `Collections.synchronizedMap(new IdentityHashMap())` (identity-keyed; never calls `equals`/`hashCode` on the continuation). +- `ContinuationRecord`: monotonic seq id, `traceId`, `spanId`, `source`, the capture + `Event`, a list of activation `Event`s, and the resolution `Event`(s). +- `Event`: `{ threadName, nanos (System.nanoTime), filtered StackTraceElement[] }`. +- `Map rootWrittenNanos` for late-after-root detection. +- A global, time-ordered event list backing the timeline. +- **Stack filter**: drops frames in `datadog.trace.core.scopemanager`, the diagnostics + package, and `java.util.concurrent` executor internals; keeps the top N meaningful frames + (default 5, configurable). Goal: surface the integration advice + app callsite. + +**Derived findings** computed at `report()` time from the records + `rootWrittenNanos`: +never-resolved, late-after-root, double/invalid, and the full cross-thread timeline. + +### 3. Renderers + +> **Addendum (2026-06):** The Mermaid renderer (`toMermaidGantt()`) and the **JSON output** were +> removed — visualization moved to the `investigate-continuation-leakage` skill (an LLM reads text +> fine, and the hand-rolled JSON serializer was a liability). Java now emits two text views, both +> logged at the end of each tracked test: +> +> - **`renderTimeline()`** — the complete dump: *every* continuation and scope with its full event +> lineage (capture → resume(s) → terminal, threads, `+Δms` relative timing, callsites, nested +> spawned scopes), emitted **regardless of whether anything leaked**. This is the feed the skill +> reads to render a Gantt or DAG, so a graph/report can always be produced. (This replaces the old +> `renderGantt()`, minus the chart framing.) +> - **`renderSummary()`** — the problem-only view (flagged records + callsites), still used for +> `assertNoLeaks` failure messages. +> +> The `@TrackScopeContinuations` attributes were reduced to just `failOnLeak` (default `false`); +> `gantt()`/`mermaid()`/`json()` were all removed. +> +> **Always-on (2026-06):** `@TrackScopeContinuations` is now placed on the two base classes +> (`AbstractInstrumentationTest` and `InstrumentationSpecification`) and is `@Inherited`, so **every** +> instrumentation test tracks continuations and dumps the full timeline after each test — in +> report-only mode (`failOnLeak=false`; leaks are logged, never fail the test). This trades per-test +> cost (a ByteBuddy retransform of the tracer core classes per test JVM + a filtered stack capture +> per scope/continuation event) for fleet-wide visibility. Individual tests/methods can still carry +> their own `@TrackScopeContinuations(failOnLeak = true)` to opt into enforcement. + +- **Full timeline**: all continuations/scopes with per-event threads, relative timing, and + callsites — the feed the skill consumes to build a graph or report, leak or no leak. +- **Leak-only summary**: just the flagged problems with callsites — the `assertNoLeaks` message. + +### 4. Harness integration (passive by default — no impact on existing tests) + +- **Static API** (works anywhere, incl. Groovy and non-JUnit code): + `ScopeDiagnostics.startRecording()` / `report()` / `assertNoLeaks()`. +- **JUnit5**: `ScopeDiagnosticsExtension` (`BeforeEachCallback`, `AfterEachCallback`) + registered on `AbstractInstrumentationTest` via `@ExtendWith`, but **dormant** unless the + test class or method carries `@TrackScopeContinuations`. When enabled: `reset()` + + `startRecording()` beforeEach; afterEach renders the text Gantt to the log, writes the JSON + file, and calls `assertNoLeaks()` only if `@TrackScopeContinuations(failOnLeak = true)`. +- **Spock**: `InstrumentationSpecification.setup()/cleanup()` honor the same + `@TrackScopeContinuations` annotation (and/or an overridable `trackScopeContinuations()` + returning `false` by default), reusing the same static `ScopeDiagnostics` engine. + +`@TrackScopeContinuations` attributes: `failOnLeak` (default `false`). (The `gantt`/`mermaid`/`json` +attributes were removed — see the Renderers addendum.) + +## Data flow + +``` +capture thread: captureSpan() -> new ScopeContinuation -> register() + -> Listener.onCapture(this, traceId, spanId, source) + -> recorder: create ContinuationRecord{seq, ids, source, captureEvent(thread, t, stack)} + +worker thread: continuation.activate() (COUNT>0) + -> Listener.onActivate(this, ...) -> recorder: append activation Event + +worker thread: continued scope close / cancel + -> Listener.onResolve(this, cancelled) -> recorder: append resolution Event + +any thread: PendingTrace.write() (rootSpanWritten=true) + -> Listener.onRootWritten(traceId) -> recorder: rootWrittenNanos[traceId] = now + +report(): walk records -> classify (never-resolved / late-after-root / double) -> render +``` + +## Error handling +- Recorder callbacks must never throw into tracer code: each `Listener` method body is + wrapped so any diagnostic failure is swallowed/logged, never propagated into + `ScopeContinuation`/`PendingTrace`. +- An `onActivate`/`onResolve` for a continuation with no recorded capture (e.g. captured + before `startRecording()` in the previous test) is recorded as an "orphan" entry rather + than dropped — itself a useful cross-test leak signal. + +## Testing strategy +- Unit tests (JUnit5) for the recorder/classifier: feed synthetic `Listener` event sequences + (capture-only; capture+activate+resolve; capture+late-resolve vs `onRootWritten`; + activate-after-cancel) and assert the derived findings and rendered summary. +- Unit tests for the stack filter (frames in/out). +- Integration test: a small instrumentation test annotated `@TrackScopeContinuations` that + deliberately captures a continuation and never resolves it (e.g. submit to an executor that + drops the task), asserting `report()` flags exactly one never-resolved leak with the + expected capture callsite. Mirror in one Spock spec to verify the Groovy hook. +- Verify production inertness: with no listener installed, the guarded call sites are + exercised by existing tracer-core tests with no behavior change (no new failures). + +## Cost & caveats +- **When off:** one volatile read per scope lifecycle event, no allocation, no behavior change. +- **When on:** a stack-trace capture per event (test-time, acceptable). +- Assumes **one test at a time per JVM** (holds for instrumentation tests). `reset()` isolates + tests; events from a previous test's leaked continuation arriving after `reset()` are + attributed to the new test as orphans — a useful, not harmful, signal. + +## Module placement summary +| Piece | Module | +|---|---| +| `ContinuationDiagnostics` + seam call sites | `dd-trace-core` (`datadog.trace.core.scopemanager`, + `PendingTrace`) | +| `ScopeDiagnostics` engine, records, classifier, renderers | `:dd-java-agent:instrumentation-testing` (`datadog.trace.agent.test.scopediag`) | +| `ScopeDiagnosticsExtension`, `@TrackScopeContinuations` | `:dd-java-agent:instrumentation-testing` | +| `AbstractInstrumentationTest` / `InstrumentationSpecification` wiring | `:dd-java-agent:instrumentation-testing` |