Skip to content

Commit 7b6be6e

Browse files
Merge branch 'master' into bdu/change-preferences-dir-for-tests
2 parents 4945696 + 823cec6 commit 7b6be6e

File tree

6 files changed

+116
-14
lines changed

6 files changed

+116
-14
lines changed

.gitlab/benchmarks/bp-runner.fail-on-breach.yml

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,13 @@ experiments:
3636
- throughput > 1100.0 op/s
3737

3838
# Startup macrobenchmarks
39-
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Atracing%3AGlobalTracer&trendsType=scenario
40-
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Aappsec%3AGlobalTracer&trendsType=scenario
41-
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Aiast%3AGlobalTracer&trendsType=scenario
42-
- name: "startup:petclinic:(tracing|appsec|iast):GlobalTracer"
39+
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Atracing%3AAgent.start&trendsType=scenario
40+
- name: "startup:petclinic:tracing:Agent.start"
4341
thresholds:
44-
- execution_time < 280 ms
45-
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Aprofiling%3AGlobalTracer&trendsType=scenario
46-
- name: "startup:petclinic:profiling:GlobalTracer"
42+
- execution_time < 1120 ms
43+
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Aprofiling%3AAgent.start&trendsType=scenario
44+
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Aappsec%3AAgent.start&trendsType=scenario
45+
# https://benchmarking.us1.prod.dog/trends?projectId=4&branch=master&trendsTab=per_scenario&scenario=startup%3Apetclinic%3Aiast%3AAgent.start&trendsType=scenario
46+
- name: "startup:petclinic:(profiling|appsec|iast):Agent.start"
4747
thresholds:
48-
- execution_time < 420 ms
48+
- execution_time < 1300 ms

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
88
import static datadog.trace.api.datastreams.DataStreamsTags.createWithClusterId;
99
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
10+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
1011
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
1112
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
1213
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -33,6 +34,7 @@
3334
import datadog.trace.bootstrap.InstrumentationContext;
3435
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
3536
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
37+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
3638
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
3739
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
3840
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
@@ -76,6 +78,7 @@ public String[] helperClassNames() {
7678
packageName + ".KafkaDecorator",
7779
packageName + ".TextMapInjectAdapterInterface",
7880
packageName + ".TextMapInjectAdapter",
81+
packageName + ".TextMapExtractAdapter",
7982
packageName + ".NoopTextMapInjectAdapter",
8083
packageName + ".KafkaProducerCallback",
8184
"datadog.trace.instrumentation.kafka_common.StreamingContext",
@@ -125,12 +128,26 @@ public static AgentScope onEnter(
125128
ClusterIdHolder.set(clusterId);
126129
}
127130

128-
final AgentSpan parent = activeSpan();
129-
final AgentSpan span = startSpan(KAFKA_PRODUCE);
131+
// Try to extract existing trace context from record headers
132+
final AgentSpanContext extractedContext =
133+
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
134+
135+
final AgentSpan localActiveSpan = activeSpan();
136+
137+
final AgentSpan span;
138+
final AgentSpan callbackParentSpan;
139+
140+
if (extractedContext != null) {
141+
span = startSpan(KAFKA_PRODUCE, extractedContext);
142+
callbackParentSpan = span;
143+
} else {
144+
span = startSpan(KAFKA_PRODUCE);
145+
callbackParentSpan = localActiveSpan;
146+
}
130147
PRODUCER_DECORATE.afterStart(span);
131148
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
132149

133-
callback = new KafkaProducerCallback(callback, parent, span, clusterId);
150+
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
134151

135152
if (record.value() == null) {
136153
span.setTag(InstrumentationTags.TOMBSTONE, true);

dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
2525
import org.apache.kafka.clients.producer.ProducerRecord
2626
import org.apache.kafka.clients.producer.RecordMetadata
2727
import org.apache.kafka.common.TopicPartition
28+
import org.apache.kafka.common.header.internals.RecordHeader
29+
import org.apache.kafka.common.header.internals.RecordHeaders
2830
import org.apache.kafka.common.serialization.StringSerializer
31+
32+
import java.nio.charset.StandardCharsets
2933
import org.junit.Rule
3034
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
3135
import org.springframework.kafka.core.DefaultKafkaProducerFactory
@@ -1013,6 +1017,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
10131017
"true" | true
10141018
}
10151019

1020+
def "test producer extracts and uses existing trace context from record headers"() {
1021+
setup:
1022+
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
1023+
def producer = new KafkaProducer<>(senderProps)
1024+
1025+
def existingTraceId = 1234567890123456L
1026+
def existingSpanId = 9876543210987654L
1027+
def headers = new RecordHeaders()
1028+
headers.add(new RecordHeader("x-datadog-trace-id",
1029+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
1030+
headers.add(new RecordHeader("x-datadog-parent-id",
1031+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
1032+
1033+
when:
1034+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
1035+
producer.send(record).get()
1036+
1037+
then:
1038+
TEST_WRITER.waitForTraces(1)
1039+
def producedSpan = TEST_WRITER[0][0]
1040+
// Verify the span used the extracted context as parent
1041+
producedSpan.traceId.toLong() == existingTraceId
1042+
producedSpan.parentId == existingSpanId
1043+
// Verify a new span was created (not reusing the extracted span ID)
1044+
producedSpan.spanId != existingSpanId
1045+
1046+
cleanup:
1047+
producer?.close()
1048+
}
1049+
10161050
def containerProperties() {
10171051
try {
10181052
// Different class names for test and latestDepTest.

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public String[] helperClassNames() {
3838
packageName + ".KafkaDecorator",
3939
packageName + ".TextMapInjectAdapterInterface",
4040
packageName + ".TextMapInjectAdapter",
41+
packageName + ".TextMapExtractAdapter",
4142
packageName + ".NoopTextMapInjectAdapter",
4243
packageName + ".KafkaProducerCallback",
4344
"datadog.trace.instrumentation.kafka_common.StreamingContext",

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import static datadog.trace.api.datastreams.DataStreamsTags.Direction.OUTBOUND;
66
import static datadog.trace.api.datastreams.DataStreamsTags.create;
77
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.DSM_CONCERN;
8+
import static datadog.trace.bootstrap.instrumentation.api.AgentPropagation.extractContextAndGetSpanContext;
89
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
910
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
1011
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
@@ -21,6 +22,7 @@
2122
import datadog.trace.bootstrap.InstrumentationContext;
2223
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
2324
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
25+
import datadog.trace.bootstrap.instrumentation.api.AgentSpanContext;
2426
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
2527
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
2628
import net.bytebuddy.asm.Advice;
@@ -46,12 +48,26 @@ public static AgentScope onEnter(
4648
ClusterIdHolder.set(clusterId);
4749
}
4850

49-
final AgentSpan parent = activeSpan();
50-
final AgentSpan span = startSpan(KAFKA_PRODUCE);
51+
// Try to extract existing trace context from record headers
52+
final AgentSpanContext extractedContext =
53+
extractContextAndGetSpanContext(record.headers(), TextMapExtractAdapter.GETTER);
54+
55+
final AgentSpan localActiveSpan = activeSpan();
56+
57+
final AgentSpan span;
58+
final AgentSpan callbackParentSpan;
59+
60+
if (extractedContext != null) {
61+
span = startSpan(KAFKA_PRODUCE, extractedContext);
62+
callbackParentSpan = span;
63+
} else {
64+
span = startSpan(KAFKA_PRODUCE);
65+
callbackParentSpan = localActiveSpan;
66+
}
5167
PRODUCER_DECORATE.afterStart(span);
5268
PRODUCER_DECORATE.onProduce(span, record, producerConfig);
5369

54-
callback = new KafkaProducerCallback(callback, parent, span, clusterId);
70+
callback = new KafkaProducerCallback(callback, callbackParentSpan, span, clusterId);
5571

5672
if (record.value() == null) {
5773
span.setTag(InstrumentationTags.TOMBSTONE, true);

dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,11 @@ import org.apache.kafka.clients.producer.ProducerConfig
1818
import org.apache.kafka.clients.producer.ProducerRecord
1919
import org.apache.kafka.clients.producer.RecordMetadata
2020
import org.apache.kafka.common.TopicPartition
21+
import org.apache.kafka.common.header.internals.RecordHeader
22+
import org.apache.kafka.common.header.internals.RecordHeaders
2123
import org.apache.kafka.common.serialization.StringSerializer
24+
25+
import java.nio.charset.StandardCharsets
2226
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
2327
import org.springframework.kafka.core.DefaultKafkaProducerFactory
2428
import org.springframework.kafka.core.KafkaTemplate
@@ -853,6 +857,36 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
853857
"true" | true
854858
}
855859
860+
def "test producer extracts and uses existing trace context from record headers"() {
861+
setup:
862+
def senderProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString())
863+
def producer = new KafkaProducer<>(senderProps)
864+
865+
def existingTraceId = 1234567890123456L
866+
def existingSpanId = 9876543210987654L
867+
def headers = new RecordHeaders()
868+
headers.add(new RecordHeader("x-datadog-trace-id",
869+
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
870+
headers.add(new RecordHeader("x-datadog-parent-id",
871+
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
872+
873+
when:
874+
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
875+
producer.send(record).get()
876+
877+
then:
878+
TEST_WRITER.waitForTraces(1)
879+
def producedSpan = TEST_WRITER[0][0]
880+
// Verify the span used the extracted context as parent
881+
producedSpan.traceId.toLong() == existingTraceId
882+
producedSpan.parentId == existingSpanId
883+
// Verify a new span was created (not reusing the extracted span ID)
884+
producedSpan.spanId != existingSpanId
885+
886+
cleanup:
887+
producer?.close()
888+
}
889+
856890
def producerSpan(
857891
TraceAssert trace,
858892
Map<String, ?> config,

0 commit comments

Comments
 (0)