diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/build.gradle b/dd-java-agent/instrumentation/websocket/java-websocket/build.gradle new file mode 100644 index 00000000000..c43f58d1382 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/build.gradle @@ -0,0 +1,17 @@ +muzzle { + pass { + name = "Java-WebSocket" + group = "org.java-websocket" + module = "Java-WebSocket" + versions = "[1.4.1,)" + } +} + +apply from: "$rootDir/gradle/java.gradle" + + +dependencies { + compileOnly group: 'org.java-websocket', name: 'Java-WebSocket', version: '1.5.3' + + testImplementation group: 'org.java-websocket', name: 'Java-WebSocket', version: '1.5.3' +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/OrgWebsocketModule.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/OrgWebsocketModule.java new file mode 100644 index 00000000000..aa2fcc614b6 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/OrgWebsocketModule.java @@ -0,0 +1,63 @@ +package datadog.trace.instrumentation.websocket.org; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.InstrumenterConfig; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public class OrgWebsocketModule extends InstrumenterModule.Tracing { + + public OrgWebsocketModule() { + this("java-websocket", "websocket"); + } + + protected OrgWebsocketModule(String instrumentationName, String... additionalNames) { + super(instrumentationName, additionalNames); + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.java_websocket.WebSocket", WebsocketAgentSpanContext.class.getName()); + contextStores.put("org.java_websocket.client.WebSocketClient", AgentSpan.class.getName()); + return contextStores; + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".WebSocketDecorator", + packageName + ".WebSocketClientDecorator", + packageName + ".WebSocketServerDecorator", + packageName + ".WebsocketExtractAdapter", + packageName + ".TraceDraft_6455", + packageName + ".WebsocketHeaderInjector", + packageName + ".WebsocketHeaderExtract", + packageName + ".WebsocketAgentSpanContext" + }; + } + + @Override + protected boolean defaultEnabled() { + return InstrumenterConfig.get().isWebsocketTracingEnabled(); + } + + @Override + public String muzzleDirective() { + return "Java-WebSocket"; + } + + @Override + public List typeInstrumentations() { + return Arrays.asList( + new WebsocketClientInstrumentation(), + new WebsocketServerInstrumentation(), + new WebSocketSendInstrumentation()); + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/TraceDraft_6455.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/TraceDraft_6455.java new file mode 100644 index 00000000000..290b76d3789 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/TraceDraft_6455.java @@ -0,0 +1,68 @@ +package datadog.trace.instrumentation.websocket.org; + +import java.util.ArrayList; +import java.util.List; +import org.java_websocket.drafts.Draft; +import org.java_websocket.drafts.Draft_6455; +import org.java_websocket.extensions.IExtension; +import org.java_websocket.protocols.IProtocol; + +public class TraceDraft_6455 extends Draft_6455 { + + public TraceDraft_6455() { + super(); + } + + public TraceDraft_6455(List inputExtensions) { + super(inputExtensions); + } + + public TraceDraft_6455(List inputExtensions, List inputProtocols) { + super(inputExtensions, inputProtocols); + } + + public TraceDraft_6455( + List inputExtensions, List inputProtocols, int maxFrameSize) { + super(inputExtensions, inputProtocols, maxFrameSize); + } + + public static TraceDraft_6455 fromDraft6455(Draft_6455 original) { + List extensions = new ArrayList<>(); + for (IExtension ext : original.getKnownExtensions()) { + extensions.add(ext); + } + + List protocols = new ArrayList<>(); + for (IProtocol proto : original.getKnownProtocols()) { + protocols.add(proto); + } + + return new TraceDraft_6455(extensions, protocols, original.getMaxFrameSize()); + } + + @Override + public Draft copyInstance() { + ArrayList newExtensions = new ArrayList<>(); + for (IExtension knownExtension : this.getKnownExtensions()) { + newExtensions.add(knownExtension.copyInstance()); + } + + ArrayList newProtocols = new ArrayList<>(); + + for (IProtocol knownProtocol : this.getKnownProtocols()) { + newProtocols.add(knownProtocol.copyInstance()); + } + + return new TraceDraft_6455(newExtensions, newProtocols, getMaxFrameSize()); + } + + @Override + public boolean equals(Object o) { + return this == o || super.equals(o); + } + + @Override + public int hashCode() { + return super.hashCode(); + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketClientDecorator.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketClientDecorator.java new file mode 100644 index 00000000000..65991c95e8c --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketClientDecorator.java @@ -0,0 +1,39 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.context.propagation.Propagators.defaultPropagator; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.instrumentation.websocket.org.WebsocketHeaderInjector.SETTER; + +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import org.java_websocket.client.WebSocketClient; + +public class WebSocketClientDecorator extends WebSocketDecorator { + public static final WebSocketClientDecorator CLIENT_DECORATE = new WebSocketClientDecorator(); + + @Override + protected CharSequence spanKind() { + return SPAN_KIND_CLIENT; + } + + private static final String OPERATION_NAME = "websocket.handshake"; + + public AgentScope startHandshakeSpan(WebSocketClient client) { + String uri = client.getURI().toString(); + AgentSpan span = startSpan("websocket.handshake", OPERATION_NAME); + span.setTag(Tags.HTTP_URL, uri); + span.setTag(Tags.HTTP_METHOD, "GET"); + AgentScope scope = activateSpan(span); + defaultPropagator().inject(scope.context(), client, SETTER); + afterStart(span); + return scope; + } + + public void onHandshakeSuccess(AgentSpan span, int httpStatus) { + span.setTag(Tags.HTTP_STATUS, httpStatus); + span.setTag("websocket.handshake.success", true); + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketDecorator.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketDecorator.java new file mode 100644 index 00000000000..1b988a2ad2d --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketDecorator.java @@ -0,0 +1,107 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_BINARY; +import static datadog.trace.bootstrap.instrumentation.websocket.HandlersExtractor.MESSAGE_TYPE_TEXT; +import static datadog.trace.instrumentation.websocket.org.WebsocketExtractAdapter.GETTER; + +import datadog.trace.api.DDSpanTypes; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.bootstrap.instrumentation.decorator.BaseDecorator; +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import org.java_websocket.WebSocket; +import org.java_websocket.handshake.Handshakedata; + +public abstract class WebSocketDecorator extends BaseDecorator { + public static final CharSequence WEBSOCKET = UTF8BytesString.create("org-java-websocket"); + public static final CharSequence WEBSOCKET_OPEN = UTF8BytesString.create("websocket.open"); + public static final CharSequence WEBSOCKET_RECEIVE = UTF8BytesString.create("websocket.receive"); + public static final CharSequence WEBSOCKET_SEND = UTF8BytesString.create("websocket.send"); + public static final CharSequence WEBSOCKET_CLOSE = UTF8BytesString.create("websocket.close"); + private static final String[] INSTRUMENTATION_NAMES = {WEBSOCKET.toString()}; + private static final String REMOTE_ADDRESS = "remote.address"; + private static final String MESSAGE_TYPE = "message.type"; + private static final String MESSAGE_SIZE = "message.size"; + private static final String MESSAGE_REMOTE = "message.remote"; + private static final String MESSAGE_CODE = "message.code"; + private static final String MESSAGE_REASON = "message.reason"; + + @Override + protected String[] instrumentationNames() { + return INSTRUMENTATION_NAMES; + } + + @Override + protected CharSequence spanType() { + return DDSpanTypes.WEBSOCKET; + } + + @Override + protected CharSequence component() { + return WEBSOCKET; + } + + protected abstract CharSequence spanKind(); + + public AgentSpan open(WebSocket conn, Handshakedata handshake) { + AgentSpanContext parentContext = extractContextAndGetSpanContext(handshake, GETTER); + AgentSpan span = startSpan(instrumentationNames()[0], WEBSOCKET_OPEN, parentContext); + InetSocketAddress remoteAddress = conn.getRemoteSocketAddress(); + if (remoteAddress != null) { + span.setTag(REMOTE_ADDRESS, remoteAddress.toString()); + } + afterStart(span); + return span; + } + + public AgentSpan onMessage(Object message, AgentSpanContext spanContext) { + AgentSpan span = startSpan(instrumentationNames()[0], WEBSOCKET_RECEIVE, spanContext); + CharSequence messageType; + int msgSize; + if (message instanceof ByteBuffer) { + messageType = MESSAGE_TYPE_BINARY; + msgSize = ((ByteBuffer) message).remaining(); + } else { + messageType = MESSAGE_TYPE_TEXT; + msgSize = message == null ? 0 : ((CharSequence) message).length(); + } + span.setTag(MESSAGE_SIZE, msgSize); + span.setTag(MESSAGE_TYPE, messageType); + afterStart(span); + return span; + } + + public AgentSpan send(WebsocketAgentSpanContext spanContext) { + AgentSpan span; + if (spanContext == null) { + span = startSpan(instrumentationNames()[0], WEBSOCKET_SEND); + } else { + span = startSpan(instrumentationNames()[0], WEBSOCKET_SEND, spanContext.getSpanContext()); + } + afterStart(span); + if (spanContext != null) { + span.setTag(Tags.SPAN_KIND, spanContext.getSpanKind()); + } + return span; + } + + public AgentSpan onClose(int code, String reason, boolean remote, AgentSpanContext spanContext) { + AgentSpan span = startSpan(instrumentationNames()[0], WEBSOCKET_CLOSE, spanContext); + span.setTag(MESSAGE_REASON, reason); + span.setTag(MESSAGE_CODE, code); + span.setTag(MESSAGE_REMOTE, remote); + afterStart(span); + return span; + } + + @Override + public AgentSpan afterStart(AgentSpan span) { + span.setTag(Tags.SPAN_KIND, spanKind()); + return super.afterStart(span); + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketSendInstrumentation.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketSendInstrumentation.java new file mode 100644 index 00000000000..e024645ee1e --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketSendInstrumentation.java @@ -0,0 +1,81 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopSpan; +import static datadog.trace.instrumentation.websocket.org.WebSocketServerDecorator.SERVER_DECORATOR; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPrivate; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.Collection; +import net.bytebuddy.asm.Advice; +import org.java_websocket.WebSocket; +import org.java_websocket.WebSocketImpl; +import org.java_websocket.framing.CloseFrame; +import org.java_websocket.framing.Framedata; +import org.java_websocket.framing.PingFrame; +import org.java_websocket.framing.PongFrame; + +public class WebSocketSendInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + public WebSocketSendInstrumentation() {} + + @Override + public String instrumentedType() { + return "org.java_websocket.WebSocketImpl"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod() + .and(isPrivate()) + .and(named("send")) + .and(takesArguments(1)) + .and(takesArgument(0, Collection.class)), + getClass().getName() + "$OnSendAdvice"); + } + + public static class OnSendAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.This WebSocketImpl impl, @Advice.Argument(value = 0) Collection frames) { + WebsocketAgentSpanContext context = + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).get(impl); + if (context == null) { + // close after send + final AgentSpan wsSpan = SERVER_DECORATOR.send(null); + return activateSpan(wsSpan); + } + // ignore ping/pong/close + for (Framedata frame : frames) { + if (frame instanceof PingFrame + || frame instanceof PongFrame + || frame instanceof CloseFrame) { + return activateSpan(noopSpan()); + } + } + final AgentSpan wsSpan = SERVER_DECORATOR.send(context); + return activateSpan(wsSpan); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + SERVER_DECORATOR.onError(scope.span(), throwable); + SERVER_DECORATOR.beforeFinish(scope.span()); + scope.close(); + scope.span().finish(); + } + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketServerDecorator.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketServerDecorator.java new file mode 100644 index 00000000000..e58ebb88523 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebSocketServerDecorator.java @@ -0,0 +1,12 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; + +public class WebSocketServerDecorator extends WebSocketDecorator { + public static final WebSocketServerDecorator SERVER_DECORATOR = new WebSocketServerDecorator(); + + @Override + protected CharSequence spanKind() { + return SPAN_KIND_SERVER; + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketAgentSpanContext.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketAgentSpanContext.java new file mode 100644 index 00000000000..c86e20af3ad --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketAgentSpanContext.java @@ -0,0 +1,23 @@ +package datadog.trace.instrumentation.websocket.org; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; + +public class WebsocketAgentSpanContext { + private AgentSpanContext spanContext; + private String spanKind; + + public WebsocketAgentSpanContext(AgentSpanContext spanContext, Object spanKindObj) { + this.spanContext = spanContext; + if (spanKindObj != null) { + this.spanKind = spanKindObj.toString(); + } + } + + public AgentSpanContext getSpanContext() { + return spanContext; + } + + public String getSpanKind() { + return spanKind; + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketClientInstrumentation.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketClientInstrumentation.java new file mode 100644 index 00000000000..4b1d40bf5bf --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketClientInstrumentation.java @@ -0,0 +1,242 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopScope; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopSpan; +import static datadog.trace.instrumentation.websocket.org.WebSocketClientDecorator.CLIENT_DECORATE; +import static datadog.trace.instrumentation.websocket.org.WebSocketDecorator.WEBSOCKET_CLOSE; +import static datadog.trace.instrumentation.websocket.org.WebSocketDecorator.WEBSOCKET_OPEN; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.java_websocket.WebSocket; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.drafts.Draft; +import org.java_websocket.drafts.Draft_6455; +import org.java_websocket.handshake.Handshakedata; +import org.java_websocket.handshake.ServerHandshake; + +public class WebsocketClientInstrumentation + implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { + + public WebsocketClientInstrumentation() {} + + @Override + public String hierarchyMarkerType() { + return "org.java_websocket.client.WebSocketClient"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(isPublic()).and(nameStartsWith("onWebsocketOpen")).and(takesArguments(2)), + getClass().getName() + "$OnOpenAdvice"); + + transformer.applyAdvice( + isConstructor().and(takesArgument(1, named("org.java_websocket.drafts.Draft"))), + getClass().getName() + "$ClientConstructorAdvice"); + + transformer.applyAdvice( + isMethod().and(isPublic()).and(named("connect")).and(takesArguments(0)), + getClass().getName() + "$ConnectAdvice"); + + transformer.applyAdvice( + isMethod().and(isPublic()).and(nameStartsWith("onWebsocketMessage")).and(takesArguments(2)), + getClass().getName() + "$OnMessageAdvice"); + + transformer.applyAdvice( + isMethod().and(isPublic()).and(nameStartsWith("onWebsocketClose")).and(takesArguments(4)), + getClass().getName() + "$OnCloseAdvice"); + + transformer.applyAdvice( + isMethod().and(isPublic()).and(named("onWebsocketError")).and(takesArguments(2)), + getClass().getName() + "$ErrorAdvice"); + } + + public static class ConnectAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter(@Advice.This WebSocketClient client) { + int callDepth = CallDepthThreadLocalMap.incrementCallDepth(WebSocketClient.class); + if (callDepth > 0) { + return null; + } + AgentScope scope = CLIENT_DECORATE.startHandshakeSpan(client); + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).put(client, scope.span()); + return scope; + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter AgentScope scope, + @Advice.Thrown Throwable throwable, + @Advice.This WebSocketClient client) { + + CallDepthThreadLocalMap.decrementCallDepth(WebSocketClient.class); + if (scope == null) { + return; + } + if (throwable != null) { + CLIENT_DECORATE.onError(scope.span(), throwable); + CLIENT_DECORATE.beforeFinish(scope.span()); + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).remove(client); + } + scope.close(); + if (throwable != null) { + scope.span().finish(); + } + } + } + + public static class ClientConstructorAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter(@Advice.Argument(value = 1, readOnly = false) Draft protocolDraft) { + if (protocolDraft == null) { + return; + } + if (protocolDraft instanceof Draft_6455 && !(protocolDraft instanceof TraceDraft_6455)) { + try { + protocolDraft = TraceDraft_6455.fromDraft6455((Draft_6455) protocolDraft); + } catch (Exception e) { + // Keep the original draft if wrapping fails. + } + } + } + } + + public static class OnOpenAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.Argument(value = 0) WebSocket conn, + @Advice.Argument(value = 1) Handshakedata handshake, + @Advice.This WebSocketClient client) { + AgentSpan span = + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).get(client); + if (span == null) { + return noopScope(); + } + span.setOperationName(WEBSOCKET_OPEN); + CLIENT_DECORATE.onHandshakeSuccess(span, ((ServerHandshake) handshake).getHttpStatus()); + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).remove(client); + WebsocketAgentSpanContext spanContext = + new WebsocketAgentSpanContext(span.spanContext(), span.getTag(Tags.SPAN_KIND)); + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class) + .put(conn, spanContext); + return activateSpan(span); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + CLIENT_DECORATE.onError(scope.span(), throwable); + CLIENT_DECORATE.beforeFinish(scope.span()); + + scope.close(); + scope.span().finish(); + } + } + + public static class OnMessageAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.Argument(value = 0) WebSocket engine, @Advice.Argument(value = 1) Object message) { + WebsocketAgentSpanContext context = + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).get(engine); + if (context == null) { + return activateSpan(noopSpan()); + } + final AgentSpan wsSpan = CLIENT_DECORATE.onMessage(message, context.getSpanContext()); + return activateSpan(wsSpan); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + CLIENT_DECORATE.onError(scope.span(), throwable); + CLIENT_DECORATE.beforeFinish(scope.span()); + + scope.close(); + scope.span().finish(); + } + } + + public static class OnCloseAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.This WebSocketClient client, + @Advice.Argument(value = 0) WebSocket conn, + @Advice.Argument(value = 1) int code, + @Advice.Argument(value = 2) String message, + @Advice.Argument(value = 3) boolean remote) { + WebsocketAgentSpanContext context = + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).get(conn); + if (context == null) { + AgentSpan agentSpan = + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).get(client); + if (agentSpan != null) { + agentSpan.setOperationName(WEBSOCKET_CLOSE); + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).remove(client); + return activateSpan(agentSpan); + } + return activateSpan(noopSpan()); + } + + final AgentSpan wsSpan = + CLIENT_DECORATE.onClose(code, message, remote, context.getSpanContext()); + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).remove(conn); + return activateSpan(wsSpan); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + CLIENT_DECORATE.onError(scope.span(), throwable); + CLIENT_DECORATE.beforeFinish(scope.span()); + scope.close(); + scope.span().finish(); + } + } + + public static class ErrorAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This WebSocketClient client, + @Advice.Argument(value = 0) WebSocket conn, + @Advice.Argument(value = 1) Exception ex) { + AgentSpan agentSpan = + InstrumentationContext.get(WebSocketClient.class, AgentSpan.class).get(client); + if (agentSpan == null) { + return; + } + CLIENT_DECORATE.onError(agentSpan, ex); + } + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketExtractAdapter.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketExtractAdapter.java new file mode 100644 index 00000000000..420a5f2c418 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketExtractAdapter.java @@ -0,0 +1,29 @@ +package datadog.trace.instrumentation.websocket.org; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import java.util.Iterator; +import org.java_websocket.handshake.Handshakedata; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WebsocketExtractAdapter implements AgentPropagation.ContextVisitor { + public static final WebsocketExtractAdapter GETTER = new WebsocketExtractAdapter(); + private static final Logger log = LoggerFactory.getLogger(WebsocketExtractAdapter.class); + + @Override + public void forEachKey(Handshakedata carrier, AgentPropagation.KeyClassifier classifier) { + Iterator iterator = carrier.iterateHttpFields(); + while (iterator.hasNext()) { + String key = iterator.next(); + String value = carrier.getFieldValue(key); + if (log.isDebugEnabled()) { + log.info("websocket ==== key:{},value:{}", key, value); + } + if (null != value) { + if (!classifier.accept(key, value)) { + return; + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketHeaderExtract.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketHeaderExtract.java new file mode 100644 index 00000000000..4bfcfdaeb08 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketHeaderExtract.java @@ -0,0 +1,18 @@ +package datadog.trace.instrumentation.websocket.org; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import java.util.Map; + +public class WebsocketHeaderExtract + implements AgentPropagation.ContextVisitor> { + public static final WebsocketHeaderExtract HEADER_GETTER = new WebsocketHeaderExtract(); + + @Override + public void forEachKey(Map carrier, AgentPropagation.KeyClassifier classifier) { + for (Map.Entry entry : carrier.entrySet()) { + if (!classifier.accept(entry.getKey(), entry.getValue())) { + return; + } + } + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketHeaderInjector.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketHeaderInjector.java new file mode 100644 index 00000000000..9098bb6f421 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketHeaderInjector.java @@ -0,0 +1,13 @@ +package datadog.trace.instrumentation.websocket.org; + +import datadog.context.propagation.CarrierSetter; +import org.java_websocket.client.WebSocketClient; + +public class WebsocketHeaderInjector implements CarrierSetter { + public static final WebsocketHeaderInjector SETTER = new WebsocketHeaderInjector(); + + @Override + public void set(WebSocketClient carrier, String key, String value) { + carrier.addHeader(key, value); + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketServerInstrumentation.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketServerInstrumentation.java new file mode 100644 index 00000000000..52bc1227cb6 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/main/java/datadog/trace/instrumentation/websocket/org/WebsocketServerInstrumentation.java @@ -0,0 +1,138 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.nameStartsWith; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.noopSpan; +import static datadog.trace.instrumentation.websocket.org.WebSocketServerDecorator.SERVER_DECORATOR; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.java_websocket.WebSocket; +import org.java_websocket.handshake.Handshakedata; + +public class WebsocketServerInstrumentation + implements Instrumenter.ForTypeHierarchy, Instrumenter.HasMethodAdvice { + + public WebsocketServerInstrumentation() {} + + @Override + public String hierarchyMarkerType() { + return "org.java_websocket.server.WebSocketServer"; + } + + @Override + public ElementMatcher hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(isPublic()).and(nameStartsWith("onWebsocketOpen")).and(takesArguments(2)), + getClass().getName() + "$OnOpenAdvice"); + transformer.applyAdvice( + isMethod().and(isPublic()).and(nameStartsWith("onWebsocketMessage")).and(takesArguments(2)), + getClass().getName() + "$OnMessageAdvice"); + transformer.applyAdvice( + isMethod().and(isPublic()).and(nameStartsWith("onWebsocketClose")).and(takesArguments(4)), + getClass().getName() + "$OnCloseAdvice"); + } + + public static class OnOpenAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.Argument(value = 0) WebSocket conn, + @Advice.Argument(value = 1) Handshakedata handshake) { + AgentSpan span = SERVER_DECORATOR.open(conn, handshake); + WebsocketAgentSpanContext spanContext = + new WebsocketAgentSpanContext(span.spanContext(), span.getTag(Tags.SPAN_KIND)); + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class) + .put(conn, spanContext); + return activateSpan(span); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + SERVER_DECORATOR.onError(scope.span(), throwable); + SERVER_DECORATOR.beforeFinish(scope.span()); + + scope.close(); + scope.span().finish(); + } + } + + public static class OnMessageAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.Argument(value = 0) WebSocket conn, @Advice.Argument(value = 1) Object message) { + WebsocketAgentSpanContext context = + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).get(conn); + if (context == null) { + return activateSpan(noopSpan()); + } + AgentSpanContext spanContext = context.getSpanContext(); + final AgentSpan wsSpan = SERVER_DECORATOR.onMessage(message, spanContext); + return activateSpan(wsSpan); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + SERVER_DECORATOR.onError(scope.span(), throwable); + SERVER_DECORATOR.beforeFinish(scope.span()); + + scope.close(); + scope.span().finish(); + } + } + + public static class OnCloseAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.Argument(value = 0) WebSocket conn, + @Advice.Argument(value = 1) int code, + @Advice.Argument(value = 2) String message, + @Advice.Argument(value = 3) boolean remote) { + WebsocketAgentSpanContext context = + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).get(conn); + if (context == null) { + return activateSpan(noopSpan()); + } + AgentSpanContext spanContext = context.getSpanContext(); + final AgentSpan wsSpan = SERVER_DECORATOR.onClose(code, message, remote, spanContext); + InstrumentationContext.get(WebSocket.class, WebsocketAgentSpanContext.class).remove(conn); + return activateSpan(wsSpan); + } + + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void onExit( + @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + if (scope == null) { + return; + } + SERVER_DECORATOR.onError(scope.span(), throwable); + SERVER_DECORATOR.beforeFinish(scope.span()); + scope.close(); + scope.span().finish(); + } + } +} diff --git a/dd-java-agent/instrumentation/websocket/java-websocket/src/test/java/datadog/trace/instrumentation/websocket/org/JavaWebSocketTest.java b/dd-java-agent/instrumentation/websocket/java-websocket/src/test/java/datadog/trace/instrumentation/websocket/org/JavaWebSocketTest.java new file mode 100644 index 00000000000..76c8f23c5d8 --- /dev/null +++ b/dd-java-agent/instrumentation/websocket/java-websocket/src/test/java/datadog/trace/instrumentation/websocket/org/JavaWebSocketTest.java @@ -0,0 +1,225 @@ +package datadog.trace.instrumentation.websocket.org; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.core.DDSpan; +import java.net.InetSocketAddress; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.java_websocket.WebSocket; +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ClientHandshake; +import org.java_websocket.handshake.ServerHandshake; +import org.java_websocket.server.WebSocketServer; +import org.junit.jupiter.api.Test; + +class JavaWebSocketTest extends AbstractInstrumentationTest { + + @Test + void tracesClientAndServerWebsocketLifecycle() throws Exception { + EchoServer server = new EchoServer(); + server.start(); + assertTrue(server.started.await(5, TimeUnit.SECONDS)); + + TestClient client = new TestClient(new URI("ws://localhost:" + server.getPort())); + client.connect(); + assertNull(activeSpan()); + assertTrue(client.opened.await(5, TimeUnit.SECONDS)); + assertTrue(server.opened.await(5, TimeUnit.SECONDS)); + + client.send("hello"); + assertTrue(server.textReceived.await(5, TimeUnit.SECONDS)); + assertTrue(client.textReceived.await(5, TimeUnit.SECONDS)); + assertEquals("echo:hello", client.textMessage.get()); + + byte[] payload = new byte[] {1, 2, 3}; + client.send(ByteBuffer.wrap(payload)); + assertTrue(server.binaryReceived.await(5, TimeUnit.SECONDS)); + assertTrue(client.binaryReceived.await(5, TimeUnit.SECONDS)); + assertArrayEquals(payload, client.binaryMessage.get()); + + client.close(1000, "done"); + assertTrue(client.closed.await(5, TimeUnit.SECONDS)); + assertTrue(server.closed.await(5, TimeUnit.SECONDS)); + server.stop(1000); + + blockUntilTracesMatch(traces -> countByOperation(flatten(traces), "websocket.open") >= 2); + + List spans = flatten(writer); + assertTrue( + countByOperation(spans, "websocket.receive") >= 4, + () -> "Expected websocket.receive spans. Spans: " + summarize(spans)); + assertTrue( + countByOperation(spans, "websocket.send") >= 4, + () -> "Expected websocket.send spans. Spans: " + summarize(spans)); + assertTrue( + countByOperation(spans, "websocket.close") >= 2, + () -> "Expected websocket.close spans. Spans: " + summarize(spans)); + + DDSpan clientOpen = findSpan(spans, "websocket.open", Tags.SPAN_KIND_CLIENT); + DDSpan serverOpen = findSpan(spans, "websocket.open", Tags.SPAN_KIND_SERVER); + assertNotNull(clientOpen); + assertNotNull(serverOpen); + assertEquals(clientOpen.getSpanId(), serverOpen.getParentId()); + + assertNotNull(findMessageSpan(spans, "text", 5)); + assertNotNull(findMessageSpan(spans, "binary", payload.length)); + + for (DDSpan span : spans) { + if (span.getOperationName().toString().startsWith("websocket.")) { + assertEquals("org-java-websocket", String.valueOf(span.getTag(Tags.COMPONENT))); + assertEquals("websocket", String.valueOf(span.getSpanType())); + } + } + } + + private static int countByOperation(List spans, String operationName) { + int count = 0; + for (DDSpan span : spans) { + if (operationName.equals(span.getOperationName().toString())) { + count++; + } + } + return count; + } + + private static DDSpan findSpan(List spans, String operationName, String spanKind) { + for (DDSpan span : spans) { + if (operationName.equals(span.getOperationName().toString()) + && spanKind.equals(span.getTag(Tags.SPAN_KIND))) { + return span; + } + } + return null; + } + + private static DDSpan findMessageSpan(List spans, String messageType, int messageSize) { + for (DDSpan span : spans) { + if ("websocket.receive".equals(span.getOperationName().toString()) + && messageType.equals(String.valueOf(span.getTag("message.type"))) + && Integer.valueOf(messageSize).equals(span.getTag("message.size"))) { + return span; + } + } + return null; + } + + private static List summarize(List spans) { + List summary = new ArrayList<>(); + for (DDSpan span : spans) { + summary.add( + span.getOperationName() + + "/" + + span.getTag(Tags.SPAN_KIND) + + "/" + + span.getTag("message.type") + + "/" + + span.getTag("message.size")); + } + return summary; + } + + private static List flatten(List> traces) { + List result = new ArrayList<>(); + for (List trace : traces) { + result.addAll(trace); + } + return result; + } + + static class EchoServer extends WebSocketServer { + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch opened = new CountDownLatch(1); + final CountDownLatch textReceived = new CountDownLatch(1); + final CountDownLatch binaryReceived = new CountDownLatch(1); + final CountDownLatch closed = new CountDownLatch(1); + + EchoServer() { + super(new InetSocketAddress("localhost", 0)); + } + + @Override + public void onOpen(WebSocket conn, ClientHandshake handshake) { + opened.countDown(); + } + + @Override + public void onMessage(WebSocket conn, String message) { + textReceived.countDown(); + conn.send("echo:" + message); + } + + @Override + public void onMessage(WebSocket conn, ByteBuffer message) { + byte[] bytes = new byte[message.remaining()]; + message.get(bytes); + binaryReceived.countDown(); + conn.send(ByteBuffer.wrap(bytes)); + } + + @Override + public void onClose(WebSocket conn, int code, String reason, boolean remote) { + closed.countDown(); + } + + @Override + public void onError(WebSocket conn, Exception ex) {} + + @Override + public void onStart() { + started.countDown(); + } + } + + static class TestClient extends WebSocketClient { + final CountDownLatch opened = new CountDownLatch(1); + final CountDownLatch textReceived = new CountDownLatch(1); + final CountDownLatch binaryReceived = new CountDownLatch(1); + final CountDownLatch closed = new CountDownLatch(1); + final AtomicReference textMessage = new AtomicReference<>(); + final AtomicReference binaryMessage = new AtomicReference<>(); + + TestClient(URI serverUri) { + super(serverUri); + } + + @Override + public void onOpen(ServerHandshake handshake) { + opened.countDown(); + } + + @Override + public void onMessage(String message) { + textMessage.set(message); + textReceived.countDown(); + } + + @Override + public void onMessage(ByteBuffer bytes) { + byte[] message = new byte[bytes.remaining()]; + bytes.get(message); + binaryMessage.set(message); + binaryReceived.countDown(); + } + + @Override + public void onClose(int code, String reason, boolean remote) { + closed.countDown(); + } + + @Override + public void onError(Exception ex) {} + } +} diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index ae081138b71..d0143e3475c 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -6841,6 +6841,14 @@ "aliases": ["DD_TRACE_INTEGRATION_JAVAX_WEBSOCKET_ENABLED", "DD_INTEGRATION_JAVAX_WEBSOCKET_ENABLED"] } ], + "DD_TRACE_JAVA_WEBSOCKET_ENABLED": [ + { + "version": "A", + "type": "boolean", + "default": "true", + "aliases": ["DD_TRACE_INTEGRATION_JAVA_WEBSOCKET_ENABLED", "DD_INTEGRATION_JAVA_WEBSOCKET_ENABLED"] + } + ], "DD_TRACE_JAVA_COMPLETABLEFUTURE_ENABLED": [ { "version": "A", diff --git a/settings.gradle.kts b/settings.gradle.kts index 0647382920a..8fed5558778 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -639,6 +639,7 @@ include( ":dd-java-agent:instrumentation:vertx:vertx-web:vertx-web-5.0", ":dd-java-agent:instrumentation:weaver-0.9", ":dd-java-agent:instrumentation:websocket:jakarta-websocket-2.0", + ":dd-java-agent:instrumentation:websocket:java-websocket", ":dd-java-agent:instrumentation:websocket:javax-websocket-1.0", ":dd-java-agent:instrumentation:websocket:jetty-websocket:jetty-websocket-10.0", ":dd-java-agent:instrumentation:websocket:jetty-websocket:jetty-websocket-11.0",