From 01ff34699035aa5550ac647efd260af7e1f2e30f Mon Sep 17 00:00:00 2001 From: Alexey Kuznetsov Date: Tue, 30 Jun 2026 13:58:11 -0400 Subject: [PATCH] v1: serialize span links as structured data and honor baggage tag config. --- .../common/writer/ddagent/TraceMapperV1.java | 4 +- .../java/datadog/trace/core/CoreSpan.java | 13 +- .../main/java/datadog/trace/core/DDSpan.java | 6 +- .../datadog/trace/core/DDSpanContext.java | 14 + .../trace/common/metrics/SimpleSpan.groovy | 3 - .../ddagent/TraceMapperV1PayloadTest.groovy | 575 +----------------- .../trace/common/writer/TraceGenerator.java | 6 - .../writer/ddagent/V1PayloadReader.java | 558 +++++++++++++++++ .../trace/core/DDSpanSerializationTest.java | 151 +++++ .../groovy/TraceGenerator.groovy | 5 - 10 files changed, 757 insertions(+), 578 deletions(-) create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/V1PayloadReader.java diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java index 4cb41c597f4..819ff2021be 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV1.java @@ -89,7 +89,7 @@ public void map(List> trace, Writable writable) { } CoreSpan firstSpan = trace.get(0); - firstSpan.processTagsAndBaggage(spanMetadata, false, false); + firstSpan.processTagsAndBaggageWithStructuredLinks(spanMetadata); Metadata firstSpanMeta = spanMetadata.metadata; // encoded fields: 1..7, but skipping #5, as not required by tracers and set by the agent. @@ -128,7 +128,7 @@ private void encodeSpans(Writable writable, int fieldId, List span : spans) { if (meta == null) { - span.processTagsAndBaggage(spanMetadata, false, false); + span.processTagsAndBaggageWithStructuredLinks(spanMetadata); meta = spanMetadata.metadata; } TagMap tags = meta.getTags(); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java index 4d0d8c87f99..b0017afd749 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/CoreSpan.java @@ -87,8 +87,17 @@ public interface CoreSpan> { void processTagsAndBaggage(MetadataConsumer consumer); - void processTagsAndBaggage( - MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags); + /** + * Variant of {@link #processTagsAndBaggage(MetadataConsumer)} for protocols that serialize span + * links as first-class structured data rather than tags. Baggage tag injection still follows the + * tracer configuration. + * + *

To simplify tests, by default delegating to {@link + * #processTagsAndBaggage(MetadataConsumer)}. + */ + default void processTagsAndBaggageWithStructuredLinks(MetadataConsumer consumer) { + processTagsAndBaggage(consumer); + } T setSamplingPriority(int samplingPriority, int samplingMechanism); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java index 33206b491b8..cbf503c8d0c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpan.java @@ -781,10 +781,8 @@ public void processTagsAndBaggage(final MetadataConsumer consumer) { } @Override - public void processTagsAndBaggage( - final MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - context.processTagsAndBaggage( - consumer, longRunningVersion, this, injectLinksAsTags, injectBaggageAsTags); + public void processTagsAndBaggageWithStructuredLinks(final MetadataConsumer consumer) { + context.processTagsAndBaggageWithStructuredLinks(consumer, longRunningVersion, this); } @Override diff --git a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java index 520311a20c1..6120502ec09 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/DDSpanContext.java @@ -1199,6 +1199,20 @@ void processTagsAndBaggage( consumer, longRunningVersion, restrictedSpan, injectLinksAsTags, injectBaggageAsTags); } + /** + * Serialize span links as first-class structured data rather than tags. While baggage tag + * injection keeps following the tracer configuration. + */ + void processTagsAndBaggageWithStructuredLinks( + final MetadataConsumer consumer, int longRunningVersion, DDSpan restrictedSpan) { + processTagsAndBaggage( + consumer, + longRunningVersion, + restrictedSpan, + false, // injectLinksAsTags + injectBaggageAsTags); + } + void processTagsAndBaggage( final MetadataConsumer consumer, int longRunningVersion, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy index f23045856ef..8cb37243790 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy @@ -245,9 +245,6 @@ class SimpleSpan implements CoreSpan { @Override void processTagsAndBaggage(MetadataConsumer consumer) {} - @Override - void processTagsAndBaggage(MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) {} - @Override SimpleSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { return this diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy index a1350a7538c..b9975dd3038 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV1PayloadTest.groovy @@ -1,14 +1,17 @@ package datadog.trace.common.writer.ddagent import static datadog.trace.common.writer.TraceGenerator.generateRandomTraces +import static datadog.trace.common.writer.ddagent.V1PayloadReader.readAttributes +import static datadog.trace.common.writer.ddagent.V1PayloadReader.readFirstSpan +import static datadog.trace.common.writer.ddagent.V1PayloadReader.readStreamingString +import static datadog.trace.common.writer.ddagent.V1PayloadReader.skipChunkField +import static datadog.trace.common.writer.ddagent.V1PayloadReader.skipPayloadField +import static datadog.trace.common.writer.ddagent.V1PayloadReader.skipSpanField +import static datadog.trace.common.writer.ddagent.V1PayloadReader.unpackUnsignedLong import static org.junit.jupiter.api.Assertions.assertArrayEquals import static org.junit.jupiter.api.Assertions.assertEquals import static org.junit.jupiter.api.Assertions.assertNotNull import static org.junit.jupiter.api.Assertions.assertTrue -import static org.msgpack.core.MessageFormat.FIXSTR -import static org.msgpack.core.MessageFormat.STR16 -import static org.msgpack.core.MessageFormat.STR32 -import static org.msgpack.core.MessageFormat.STR8 import datadog.communication.serialization.ByteBufferConsumer import datadog.communication.serialization.FlushingBuffer @@ -345,20 +348,20 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - List> links = readFirstSpanLinks(unpacker, stringTable) + List> links = readFirstSpan(unpacker, stringTable).links then: assertEquals(2, links.size()) assertArrayEquals(traceIdBytes(DDTraceId.fromHex("11223344556677889900aabbccddeeff")), links[0].traceId as byte[]) assertEquals(DDSpanId.fromHex("000000000000002a"), links[0].spanId) - assertEquals("dd=s:1", links[0].tracestate) - assertEquals(1L, links[0].flags) + assertEquals("dd=s:1", links[0].traceState) + assertEquals(1L, links[0].traceFlags) assertEquals(["link.kind": "follows_from", "context_headers": "tracecontext"], links[0].attributes) assertArrayEquals(traceIdBytes(DDTraceId.fromHex("00000000000000000000000000000001")), links[1].traceId as byte[]) assertEquals(DDSpanId.fromHex("0000000000000002"), links[1].spanId) - assertEquals("", links[1].tracestate) - assertEquals(0L, links[1].flags) + assertEquals("", links[1].traceState) + assertEquals(0L, links[1].traceFlags) assertEquals([:], links[1].attributes) } @@ -437,7 +440,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - List> links = readFirstSpanLinks(unpacker, stringTable) + List> links = readFirstSpan(unpacker, stringTable).links then: assertTrue(links.isEmpty()) @@ -487,7 +490,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - List> events = readFirstSpanEvents(unpacker, stringTable) + List> events = readFirstSpan(unpacker, stringTable).events then: assertEquals(2, events.size()) @@ -531,7 +534,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - List> events = readFirstSpanEvents(unpacker, stringTable) + List> events = readFirstSpan(unpacker, stringTable).events then: assertTrue(events.isEmpty()) @@ -565,7 +568,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) + Map attributes = readFirstSpan(unpacker, stringTable).attributes byte[] metaStructBytes = attributes["meta_key"] as byte[] MessageUnpacker metaStructUnpacker = MessagePack.newDefaultUnpacker(metaStructBytes) int metaStructFieldCount = metaStructUnpacker.unpackMapHeader() @@ -635,7 +638,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) + Map attributes = readFirstSpan(unpacker, stringTable).attributes then: assertTrue(attributes.containsKey("usr.id")) @@ -691,7 +694,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) + Map attributes = readFirstSpan(unpacker, stringTable).attributes then: assertEquals(true, attributes.get("tag.bool")) @@ -728,7 +731,7 @@ class TraceMapperV1PayloadTest extends DDSpecification { stringTable.add("") when: - Map attributes = readFirstSpanAttributes(unpacker, stringTable) + Map attributes = readFirstSpan(unpacker, stringTable).attributes then: assertAttributeValueEquals(span.getTag(DDTags.THREAD_ID), attributes.get(DDTags.THREAD_ID), DDTags.THREAD_ID) @@ -1031,40 +1034,6 @@ class TraceMapperV1PayloadTest extends DDSpecification { } } - private static Map readAttributes(MessageUnpacker unpacker, List stringTable) { - int attrArraySize = unpacker.unpackArrayHeader() - assertEquals(0, attrArraySize % 3) - int attrCount = attrArraySize / 3 - - Map attributes = new HashMap<>() - for (int i = 0; i < attrCount; i++) { - String key = readStreamingString(unpacker, stringTable) - int attrType = unpacker.unpackInt() - Object value - switch (attrType) { - case TraceMapperV1.VALUE_TYPE_STRING: - value = readStreamingString(unpacker, stringTable) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - value = unpacker.unpackBoolean() - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - value = unpacker.unpackDouble() - break - case TraceMapperV1.VALUE_TYPE_BYTES: - int len = unpacker.unpackBinaryHeader() - byte[] data = new byte[len] - unpacker.readPayload(data) - value = data - break - default: - Assertions.fail("Unknown attribute value type: " + attrType) - } - attributes.put(key, value) - } - return attributes - } - private static void assertAttributeValueEquals(Object expected, Object actual, String key) { if (expected instanceof Number) { assertTrue(actual instanceof Number, "Attribute $key should be numeric") @@ -1079,14 +1048,6 @@ class TraceMapperV1PayloadTest extends DDSpecification { } } - private static long unpackUnsignedLong(MessageUnpacker unpacker) { - MessageFormat format = unpacker.nextFormat - if (format == MessageFormat.UINT64) { - return DDSpanId.from("${unpacker.unpackBigInteger()}") - } - return unpacker.unpackLong() - } - private static void addFlattenedExpectedAttribute( Map expectedAttributes, String key, @@ -1126,498 +1087,6 @@ class TraceMapperV1PayloadTest extends DDSpecification { } } - private static String readStreamingString(MessageUnpacker unpacker, List stringTable) { - MessageFormat format = unpacker.getNextFormat() - if (format == FIXSTR || format == STR8 || format == STR16 || format == STR32) { - String value = unpacker.unpackString() - if (!stringTable.contains(value)) { - stringTable.add(value) - } - return value - } - - int index = unpacker.unpackInt() - assertTrue(index >= 0 && index < stringTable.size(), "Invalid string-table index: " + index) - return stringTable.get(index) - } - - private static void skipPayloadField(MessageUnpacker unpacker, int fieldId, List stringTable) { - switch (fieldId) { - case 2: - case 3: - case 4: - case 5: - case 6: - case 7: - case 8: - case 9: - readStreamingString(unpacker, stringTable) - break - case 10: - readAttributes(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected payload field id while skipping: " + fieldId) - } - } - - private static void skipChunkField(MessageUnpacker unpacker, int fieldId, List stringTable) { - switch (fieldId) { - case 1: - unpacker.unpackInt() - break - case 2: - readStreamingString(unpacker, stringTable) - break - case 3: - readAttributes(unpacker, stringTable) - break - case 4: - int spanCount = unpacker.unpackArrayHeader() - for (int i = 0; i < spanCount; i++) { - skipSpan(unpacker, stringTable) - } - break - case 5: - unpacker.unpackBoolean() - break - case 6: - int len = unpacker.unpackBinaryHeader() - byte[] ignored = new byte[len] - unpacker.readPayload(ignored) - break - case 7: - unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected chunk field id while skipping: " + fieldId) - } - } - - private static void skipSpan(MessageUnpacker unpacker, List stringTable) { - int fieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < fieldCount; i++) { - int fieldId = unpacker.unpackInt() - switch (fieldId) { - case 1: - case 2: - case 3: - case 10: - case 13: - case 14: - case 15: - readStreamingString(unpacker, stringTable) - break - case 4: - case 5: - unpacker.unpackValue().asNumberValue().toLong() - break - case 6: - case 7: - unpacker.unpackLong() - break - case 8: - unpacker.unpackBoolean() - break - case 9: - int attrArraySize = unpacker.unpackArrayHeader() - int attrCount = attrArraySize / 3 - for (int j = 0; j < attrCount; j++) { - readStreamingString(unpacker, stringTable) - int type = unpacker.unpackInt() - switch (type) { - case TraceMapperV1.VALUE_TYPE_STRING: - readStreamingString(unpacker, stringTable) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - unpacker.unpackBoolean() - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - unpacker.unpackDouble() - break - case TraceMapperV1.VALUE_TYPE_BYTES: - int len = unpacker.unpackBinaryHeader() - byte[] ignored = new byte[len] - unpacker.readPayload(ignored) - break - default: - Assertions.fail("Unexpected attribute type while skipping: " + type) - } - } - break - case 11: - case 12: - unpacker.unpackArrayHeader() - break - case 16: - unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected span field id while skipping: " + fieldId) - } - } - } - - private static Map readFirstSpanAttributes( - MessageUnpacker unpacker, - List stringTable) { - int payloadFieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < payloadFieldCount; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId != 11) { - skipPayloadField(unpacker, payloadFieldId, stringTable) - continue - } - - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - - int chunkFieldCount = unpacker.unpackMapHeader() - for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId != 4) { - skipChunkField(unpacker, chunkFieldId, stringTable) - continue - } - - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - - int spanFieldCount = unpacker.unpackMapHeader() - for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { - int spanFieldId = unpacker.unpackInt() - if (spanFieldId == 9) { - return readAttributes(unpacker, stringTable) - } - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } - Assertions.fail("Could not find span attributes field in first span") - return [:] - } - - private static List> readFirstSpanLinks( - MessageUnpacker unpacker, - List stringTable) { - int payloadFieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < payloadFieldCount; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId != 11) { - skipPayloadField(unpacker, payloadFieldId, stringTable) - continue - } - - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - - int chunkFieldCount = unpacker.unpackMapHeader() - for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId != 4) { - skipChunkField(unpacker, chunkFieldId, stringTable) - continue - } - - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - - int spanFieldCount = unpacker.unpackMapHeader() - for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { - int spanFieldId = unpacker.unpackInt() - if (spanFieldId == 11) { - return readSpanLinks(unpacker, stringTable) - } - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } - Assertions.fail("Could not find span links field in first span") - return [] - } - - private static void skipSpanField(MessageUnpacker unpacker, int fieldId, List stringTable) { - switch (fieldId) { - case 1: - case 2: - case 3: - case 10: - case 13: - case 14: - case 15: - readStreamingString(unpacker, stringTable) - break - case 4: - case 5: - unpacker.unpackValue().asNumberValue().toLong() - break - case 6: - case 7: - unpacker.unpackLong() - break - case 8: - unpacker.unpackBoolean() - break - case 9: - readAttributes(unpacker, stringTable) - break - case 12: - int eventsCount = unpacker.unpackArrayHeader() - for (int j = 0; j < eventsCount; j++) { - skipSpanEvent(unpacker, stringTable) - } - break - case 11: - int linksCount = unpacker.unpackArrayHeader() - for (int j = 0; j < linksCount; j++) { - int linkFieldCount = unpacker.unpackMapHeader() - for (int k = 0; k < linkFieldCount; k++) { - int linkFieldId = unpacker.unpackInt() - switch (linkFieldId) { - case 1: - int traceIdLen = unpacker.unpackBinaryHeader() - byte[] ignored = new byte[traceIdLen] - unpacker.readPayload(ignored) - break - case 2: - case 5: - unpacker.unpackValue().asNumberValue().toLong() - break - case 3: - readAttributes(unpacker, stringTable) - break - case 4: - readStreamingString(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected span link field id while skipping: " + linkFieldId) - } - } - } - break - case 16: - unpacker.unpackInt() - break - default: - Assertions.fail("Unexpected span field id while skipping: " + fieldId) - } - } - - private static List> readSpanLinks( - MessageUnpacker unpacker, - List stringTable) { - int linksCount = unpacker.unpackArrayHeader() - List> links = [] - - for (int i = 0; i < linksCount; i++) { - int linkFieldCount = unpacker.unpackMapHeader() - assertEquals(5, linkFieldCount) - - byte[] traceId = null - Long spanId = null - Map attributes = null - String tracestate = null - Long flags = null - - for (int j = 0; j < linkFieldCount; j++) { - int linkFieldId = unpacker.unpackInt() - switch (linkFieldId) { - case 1: - int traceIdLen = unpacker.unpackBinaryHeader() - traceId = new byte[traceIdLen] - unpacker.readPayload(traceId) - break - case 2: - spanId = unpacker.unpackValue().asNumberValue().toLong() - break - case 3: - attributes = readAttributes(unpacker, stringTable) - break - case 4: - tracestate = readStreamingString(unpacker, stringTable) - break - case 5: - flags = unpacker.unpackValue().asNumberValue().toLong() - break - default: - Assertions.fail("Unexpected span link field id: " + linkFieldId) - } - } - - links.add([ - traceId : traceId, - spanId : spanId, - attributes: attributes, - tracestate: tracestate, - flags : flags - ]) - } - - return links - } - - private static List> readFirstSpanEvents( - MessageUnpacker unpacker, - List stringTable) { - int payloadFieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < payloadFieldCount; i++) { - int payloadFieldId = unpacker.unpackInt() - if (payloadFieldId != 11) { - skipPayloadField(unpacker, payloadFieldId, stringTable) - continue - } - - int chunkCount = unpacker.unpackArrayHeader() - assertEquals(1, chunkCount) - - int chunkFieldCount = unpacker.unpackMapHeader() - for (int chunkFieldIndex = 0; chunkFieldIndex < chunkFieldCount; chunkFieldIndex++) { - int chunkFieldId = unpacker.unpackInt() - if (chunkFieldId != 4) { - skipChunkField(unpacker, chunkFieldId, stringTable) - continue - } - - int spanCount = unpacker.unpackArrayHeader() - assertEquals(1, spanCount) - - int spanFieldCount = unpacker.unpackMapHeader() - for (int spanFieldIndex = 0; spanFieldIndex < spanFieldCount; spanFieldIndex++) { - int spanFieldId = unpacker.unpackInt() - if (spanFieldId == 12) { - return readSpanEvents(unpacker, stringTable) - } - skipSpanField(unpacker, spanFieldId, stringTable) - } - } - } - Assertions.fail("Could not find span events field in first span") - return [] - } - - private static List> readSpanEvents( - MessageUnpacker unpacker, - List stringTable) { - int eventsCount = unpacker.unpackArrayHeader() - List> events = [] - - for (int i = 0; i < eventsCount; i++) { - int eventFieldCount = unpacker.unpackMapHeader() - assertEquals(3, eventFieldCount) - - Long timeUnixNano = null - String name = null - Map attributes = null - - for (int j = 0; j < eventFieldCount; j++) { - int eventFieldId = unpacker.unpackInt() - switch (eventFieldId) { - case 1: - timeUnixNano = unpacker.unpackLong() - break - case 2: - name = readStreamingString(unpacker, stringTable) - break - case 3: - attributes = readEventAttributes(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected span event field id: " + eventFieldId) - } - } - - events.add([ - timeUnixNano: timeUnixNano, - name : name, - attributes : attributes - ]) - } - return events - } - - private static Map readEventAttributes( - MessageUnpacker unpacker, - List stringTable) { - int attrArraySize = unpacker.unpackArrayHeader() - assertEquals(0, attrArraySize % 3) - int attrCount = attrArraySize / 3 - Map attributes = new HashMap<>() - - for (int i = 0; i < attrCount; i++) { - String key = readStreamingString(unpacker, stringTable) - int attrType = unpacker.unpackInt() - Object value - switch (attrType) { - case TraceMapperV1.VALUE_TYPE_STRING: - value = readStreamingString(unpacker, stringTable) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - value = unpacker.unpackBoolean() - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - value = unpacker.unpackDouble() - break - case TraceMapperV1.VALUE_TYPE_INT: - value = unpacker.unpackLong() - break - case TraceMapperV1.VALUE_TYPE_ARRAY: - value = readEventArrayValue(unpacker, stringTable) - break - default: - Assertions.fail("Unknown event attribute value type: " + attrType) - } - attributes.put(key, value) - } - return attributes - } - - private static List readEventArrayValue(MessageUnpacker unpacker, List stringTable) { - int itemArraySize = unpacker.unpackArrayHeader() - assertEquals(0, itemArraySize % 2) - int itemCount = itemArraySize / 2 - List values = [] - for (int i = 0; i < itemCount; i++) { - int itemType = unpacker.unpackInt() - switch (itemType) { - case TraceMapperV1.VALUE_TYPE_STRING: - values.add(readStreamingString(unpacker, stringTable)) - break - case TraceMapperV1.VALUE_TYPE_BOOLEAN: - values.add(unpacker.unpackBoolean()) - break - case TraceMapperV1.VALUE_TYPE_FLOAT: - values.add(unpacker.unpackDouble()) - break - case TraceMapperV1.VALUE_TYPE_INT: - values.add(unpacker.unpackLong()) - break - default: - Assertions.fail("Unknown event array item type: " + itemType) - } - } - return values - } - - private static void skipSpanEvent(MessageUnpacker unpacker, List stringTable) { - int fieldCount = unpacker.unpackMapHeader() - for (int i = 0; i < fieldCount; i++) { - int fieldId = unpacker.unpackInt() - switch (fieldId) { - case 1: - unpacker.unpackLong() - break - case 2: - readStreamingString(unpacker, stringTable) - break - case 3: - readEventAttributes(unpacker, stringTable) - break - default: - Assertions.fail("Unexpected event field id while skipping: " + fieldId) - } - } - } - private static byte[] serializeMappedPayload( TraceMapperV1 mapper, List> traces) { @@ -1699,12 +1168,6 @@ class TraceMapperV1PayloadTest extends DDSpecification { processTagsAndBaggageCount++ super.processTagsAndBaggage(consumer) } - - @Override - void processTagsAndBaggage(MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - processTagsAndBaggageCount++ - super.processTagsAndBaggage(consumer, injectLinksAsTags, injectBaggageAsTags) - } } private static class ByteArrayChannel implements WritableByteChannel { diff --git a/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java b/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java index 00610ddbb61..c8fad4ab0ee 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/writer/TraceGenerator.java @@ -402,12 +402,6 @@ public void processTagsAndBaggage(MetadataConsumer consumer) { consumer.accept(metadata); } - @Override - public void processTagsAndBaggage( - MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - consumer.accept(metadata); - } - @Override public PojoSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { return this; diff --git a/dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/V1PayloadReader.java b/dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/V1PayloadReader.java new file mode 100644 index 00000000000..4d1af7bbfca --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/writer/ddagent/V1PayloadReader.java @@ -0,0 +1,558 @@ +package datadog.trace.common.writer.ddagent; + +import static datadog.trace.common.writer.ddagent.TraceMapperV1.VALUE_TYPE_ARRAY; +import static datadog.trace.common.writer.ddagent.TraceMapperV1.VALUE_TYPE_BOOLEAN; +import static datadog.trace.common.writer.ddagent.TraceMapperV1.VALUE_TYPE_BYTES; +import static datadog.trace.common.writer.ddagent.TraceMapperV1.VALUE_TYPE_FLOAT; +import static datadog.trace.common.writer.ddagent.TraceMapperV1.VALUE_TYPE_INT; +import static datadog.trace.common.writer.ddagent.TraceMapperV1.VALUE_TYPE_STRING; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import datadog.trace.api.DDSpanId; +import datadog.trace.api.DDTraceId; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.msgpack.core.MessageFormat; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; +import org.msgpack.core.buffer.ArrayBufferInput; +import org.msgpack.value.ValueType; + +/** + * Shared decoder for the msgpack V1 trace payload wire format produced by {@link TraceMapperV1}. + * + *

This is the single source of truth for the low-level parse primitives used by the V1 payload + * tests (both {@code TraceMapperV1PayloadTest} and {@code DDSpanSerializationTest}). Methods take + * an explicit {@link MessageUnpacker} and {@code stringTable} so callers that need to assert on + * wire-level details can interleave their own reads while keeping the streaming-string table in + * sync. The string table must be seeded with the empty string at index 0, mirroring {@code + * TraceMapperV1}. + */ +public final class V1PayloadReader { + + /** msgpack field ids of the top-level payload (header) map, mirroring {@code buildHeader}. */ + private static final class PayloadField { + static final int CONTAINER_ID = 2; + static final int LANGUAGE_NAME = 3; + static final int LANGUAGE_VERSION = 4; + static final int TRACER_VERSION = 5; + static final int RUNTIME_ID = 6; + static final int ENV = 7; + static final int HOSTNAME = 8; + static final int APP_VERSION = 9; + static final int ATTRIBUTES = 10; + static final int CHUNKS = 11; + + private PayloadField() {} + } + + /** msgpack field ids of a trace chunk map, mirroring {@code TraceMapperV1.map} (no field 5). */ + private static final class ChunkField { + static final int PRIORITY = 1; + static final int ORIGIN = 2; + static final int ATTRIBUTES = 3; + static final int SPANS = 4; + static final int TRACE_ID = 6; + static final int SAMPLING_MECHANISM = 7; + + private ChunkField() {} + } + + /** msgpack field ids of a span map, mirroring {@code encodeSpans} (16 fields). */ + private static final class SpanField { + static final int SERVICE = 1; + static final int NAME = 2; + static final int RESOURCE = 3; + static final int SPAN_ID = 4; + static final int PARENT_ID = 5; + static final int START = 6; + static final int DURATION = 7; + static final int ERROR = 8; + static final int ATTRIBUTES = 9; + static final int TYPE = 10; + static final int LINKS = 11; + static final int EVENTS = 12; + static final int ENV = 13; + static final int VERSION = 14; + static final int COMPONENT = 15; + static final int KIND = 16; + + private SpanField() {} + } + + /** msgpack field ids of a span link map, mirroring {@code encodeSpanLinks} (5 fields). */ + private static final class LinkField { + static final int TRACE_ID = 1; + static final int SPAN_ID = 2; + static final int ATTRIBUTES = 3; + static final int TRACE_STATE = 4; + static final int TRACE_FLAGS = 5; + + private LinkField() {} + } + + /** msgpack field ids of a span event map, mirroring {@code encodeSpanEvents} (3 fields). */ + private static final class EventField { + static final int TIME_UNIX_NANO = 1; + static final int NAME = 2; + static final int ATTRIBUTES = 3; + + private EventField() {} + } + + private V1PayloadReader() {} + + /** Decodes the first span of the first chunk of an encoded V1 payload. */ + public static V1Span readFirstSpan(byte[] encoded) throws IOException { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(new ArrayBufferInput(encoded)); + List stringTable = newStringTable(); + return readFirstSpan(unpacker, stringTable); + } + + /** Creates a string table seeded with the empty string at index 0, as the writer expects. */ + public static List newStringTable() { + List stringTable = new ArrayList<>(); + stringTable.add(""); + return stringTable; + } + + public static V1Span readFirstSpan(MessageUnpacker unpacker, List stringTable) + throws IOException { + int payloadFieldCount = unpacker.unpackMapHeader(); + for (int i = 0; i < payloadFieldCount; i++) { + int payloadFieldId = unpacker.unpackInt(); + if (payloadFieldId != PayloadField.CHUNKS) { + skipPayloadField(unpacker, payloadFieldId, stringTable); + continue; + } + int chunkCount = unpacker.unpackArrayHeader(); + assertEquals(1, chunkCount); + int chunkFieldCount = unpacker.unpackMapHeader(); + for (int j = 0; j < chunkFieldCount; j++) { + int chunkFieldId = unpacker.unpackInt(); + if (chunkFieldId != ChunkField.SPANS) { + skipChunkField(unpacker, chunkFieldId, stringTable); + continue; + } + int spanCount = unpacker.unpackArrayHeader(); + assertEquals(1, spanCount); + return decodeSpan(unpacker, stringTable); + } + } + throw new AssertionError("Could not find first span in v1 payload"); + } + + private static V1Span decodeSpan(MessageUnpacker unpacker, List stringTable) + throws IOException { + int spanFieldCount = unpacker.unpackMapHeader(); + Map attributes = Collections.emptyMap(); + List links = Collections.emptyList(); + List events = Collections.emptyList(); + for (int i = 0; i < spanFieldCount; i++) { + int spanFieldId = unpacker.unpackInt(); + switch (spanFieldId) { + case SpanField.ATTRIBUTES: + attributes = readAttributes(unpacker, stringTable); + break; + case SpanField.LINKS: + links = readSpanLinks(unpacker, stringTable); + break; + case SpanField.EVENTS: + events = readSpanEvents(unpacker, stringTable); + break; + default: + skipSpanField(unpacker, spanFieldId, stringTable); + } + } + return new V1Span(attributes, links, events); + } + + public static Map readAttributes( + MessageUnpacker unpacker, List stringTable) throws IOException { + int arraySize = unpacker.unpackArrayHeader(); + assertEquals(0, arraySize % 3); + int attributeCount = arraySize / 3; + Map attributes = new HashMap<>(); + for (int i = 0; i < attributeCount; i++) { + String key = readStreamingString(unpacker, stringTable); + int valueType = unpacker.unpackInt(); + attributes.put(key, readAttributeValue(unpacker, stringTable, valueType)); + } + return attributes; + } + + private static Object readAttributeValue( + MessageUnpacker unpacker, List stringTable, int valueType) throws IOException { + switch (valueType) { + case VALUE_TYPE_STRING: + return readStreamingString(unpacker, stringTable); + case VALUE_TYPE_BOOLEAN: + return unpacker.unpackBoolean(); + case VALUE_TYPE_FLOAT: + return unpacker.unpackDouble(); + case VALUE_TYPE_BYTES: + return readBinary(unpacker); + default: + throw new IllegalArgumentException("Unknown v1 attribute value type: " + valueType); + } + } + + public static List readSpanLinks(MessageUnpacker unpacker, List stringTable) + throws IOException { + int linkCount = unpacker.unpackArrayHeader(); + List links = new ArrayList<>(linkCount); + for (int i = 0; i < linkCount; i++) { + int linkFieldCount = unpacker.unpackMapHeader(); + byte[] traceId = null; + long spanId = 0; + Map attributes = Collections.emptyMap(); + String traceState = ""; + long traceFlags = 0; + for (int j = 0; j < linkFieldCount; j++) { + int linkFieldId = unpacker.unpackInt(); + switch (linkFieldId) { + case LinkField.TRACE_ID: + traceId = readBinary(unpacker); + break; + case LinkField.SPAN_ID: + spanId = unpackUnsignedLong(unpacker); + break; + case LinkField.ATTRIBUTES: + attributes = readAttributes(unpacker, stringTable); + break; + case LinkField.TRACE_STATE: + traceState = readStreamingString(unpacker, stringTable); + break; + case LinkField.TRACE_FLAGS: + traceFlags = unpackUnsignedLong(unpacker); + break; + default: + throw new IllegalArgumentException("Unexpected v1 span link field id: " + linkFieldId); + } + } + links.add(new V1SpanLink(traceId, spanId, attributes, traceState, traceFlags)); + } + return links; + } + + public static List readSpanEvents(MessageUnpacker unpacker, List stringTable) + throws IOException { + int eventCount = unpacker.unpackArrayHeader(); + List events = new ArrayList<>(eventCount); + for (int i = 0; i < eventCount; i++) { + int eventFieldCount = unpacker.unpackMapHeader(); + long timeUnixNano = 0; + String name = null; + Map attributes = Collections.emptyMap(); + for (int j = 0; j < eventFieldCount; j++) { + int eventFieldId = unpacker.unpackInt(); + switch (eventFieldId) { + case EventField.TIME_UNIX_NANO: + timeUnixNano = unpacker.unpackLong(); + break; + case EventField.NAME: + name = readStreamingString(unpacker, stringTable); + break; + case EventField.ATTRIBUTES: + attributes = readEventAttributes(unpacker, stringTable); + break; + default: + throw new IllegalArgumentException( + "Unexpected v1 span event field id: " + eventFieldId); + } + } + events.add(new V1SpanEvent(timeUnixNano, name, attributes)); + } + return events; + } + + private static Map readEventAttributes( + MessageUnpacker unpacker, List stringTable) throws IOException { + int arraySize = unpacker.unpackArrayHeader(); + assertEquals(0, arraySize % 3); + int attributeCount = arraySize / 3; + Map attributes = new HashMap<>(); + for (int i = 0; i < attributeCount; i++) { + String key = readStreamingString(unpacker, stringTable); + int valueType = unpacker.unpackInt(); + Object value; + switch (valueType) { + case VALUE_TYPE_STRING: + value = readStreamingString(unpacker, stringTable); + break; + case VALUE_TYPE_BOOLEAN: + value = unpacker.unpackBoolean(); + break; + case VALUE_TYPE_FLOAT: + value = unpacker.unpackDouble(); + break; + case VALUE_TYPE_INT: + value = unpacker.unpackLong(); + break; + case VALUE_TYPE_ARRAY: + value = readEventArrayValue(unpacker, stringTable); + break; + default: + throw new IllegalArgumentException("Unknown v1 event attribute value type: " + valueType); + } + attributes.put(key, value); + } + return attributes; + } + + private static List readEventArrayValue( + MessageUnpacker unpacker, List stringTable) throws IOException { + int arraySize = unpacker.unpackArrayHeader(); + assertEquals(0, arraySize % 2); + int itemCount = arraySize / 2; + List values = new ArrayList<>(itemCount); + for (int i = 0; i < itemCount; i++) { + int itemType = unpacker.unpackInt(); + switch (itemType) { + case VALUE_TYPE_STRING: + values.add(readStreamingString(unpacker, stringTable)); + break; + case VALUE_TYPE_BOOLEAN: + values.add(unpacker.unpackBoolean()); + break; + case VALUE_TYPE_FLOAT: + values.add(unpacker.unpackDouble()); + break; + case VALUE_TYPE_INT: + values.add(unpacker.unpackLong()); + break; + default: + throw new IllegalArgumentException("Unknown v1 event array item type: " + itemType); + } + } + return values; + } + + public static void skipPayloadField( + MessageUnpacker unpacker, int fieldId, List stringTable) throws IOException { + switch (fieldId) { + case PayloadField.CONTAINER_ID: + case PayloadField.LANGUAGE_NAME: + case PayloadField.LANGUAGE_VERSION: + case PayloadField.TRACER_VERSION: + case PayloadField.RUNTIME_ID: + case PayloadField.ENV: + case PayloadField.HOSTNAME: + case PayloadField.APP_VERSION: + readStreamingString(unpacker, stringTable); + break; + case PayloadField.ATTRIBUTES: + readAttributes(unpacker, stringTable); + break; + default: + throw new IllegalArgumentException("Unexpected v1 payload field id: " + fieldId); + } + } + + public static void skipChunkField(MessageUnpacker unpacker, int fieldId, List stringTable) + throws IOException { + switch (fieldId) { + case ChunkField.PRIORITY: + case ChunkField.SAMPLING_MECHANISM: + unpacker.unpackInt(); + break; + case ChunkField.ORIGIN: + readStreamingString(unpacker, stringTable); + break; + case ChunkField.ATTRIBUTES: + readAttributes(unpacker, stringTable); + break; + case ChunkField.SPANS: + int spanCount = unpacker.unpackArrayHeader(); + for (int i = 0; i < spanCount; i++) { + decodeSpan(unpacker, stringTable); + } + break; + case ChunkField.TRACE_ID: + readBinary(unpacker); + break; + default: + throw new IllegalArgumentException("Unexpected v1 chunk field id: " + fieldId); + } + } + + public static void skipSpanField(MessageUnpacker unpacker, int fieldId, List stringTable) + throws IOException { + switch (fieldId) { + case SpanField.SERVICE: + case SpanField.NAME: + case SpanField.RESOURCE: + case SpanField.TYPE: + case SpanField.ENV: + case SpanField.VERSION: + case SpanField.COMPONENT: + readStreamingString(unpacker, stringTable); + break; + case SpanField.SPAN_ID: + case SpanField.PARENT_ID: + unpackUnsignedLong(unpacker); + break; + case SpanField.START: + case SpanField.DURATION: + unpacker.unpackLong(); + break; + case SpanField.ERROR: + unpacker.unpackBoolean(); + break; + case SpanField.ATTRIBUTES: + readAttributes(unpacker, stringTable); + break; + case SpanField.LINKS: + readSpanLinks(unpacker, stringTable); + break; + case SpanField.EVENTS: + readSpanEvents(unpacker, stringTable); + break; + case SpanField.KIND: + unpacker.unpackInt(); + break; + default: + throw new IllegalArgumentException("Unexpected v1 span field id: " + fieldId); + } + } + + /** + * Reads a streaming string: either an inline literal (which is appended to the table) or an + * integer index into the table populated by earlier literals. + */ + public static String readStreamingString(MessageUnpacker unpacker, List stringTable) + throws IOException { + ValueType valueType = unpacker.getNextFormat().getValueType(); + if (valueType == ValueType.STRING) { + String value = unpacker.unpackString(); + stringTable.add(value); + return value; + } + if (valueType == ValueType.INTEGER) { + int index = unpacker.unpackInt(); + assertTrue(index >= 0 && index < stringTable.size(), "Invalid string-table index: " + index); + return stringTable.get(index); + } + throw new IllegalArgumentException("Expected v1 streaming string, got: " + valueType); + } + + public static long unpackUnsignedLong(MessageUnpacker unpacker) throws IOException { + if (unpacker.getNextFormat() == MessageFormat.UINT64) { + return DDSpanId.from(unpacker.unpackBigInteger().toString()); + } + return unpacker.unpackLong(); + } + + public static byte[] readBinary(MessageUnpacker unpacker) throws IOException { + byte[] bytes = new byte[unpacker.unpackBinaryHeader()]; + unpacker.readPayload(bytes); + return bytes; + } + + /** Encodes a trace id the same way {@link TraceMapperV1} serializes it (16 big-endian bytes). */ + public static byte[] traceIdBytes(DDTraceId traceId) { + return ByteBuffer.allocate(16) + .putLong(traceId.toHighOrderLong()) + .putLong(traceId.toLong()) + .array(); + } + + /** A decoded V1 span, exposing only the fields the tests assert on. */ + public static final class V1Span { + private final Map attributes; + private final List links; + private final List events; + + private V1Span( + Map attributes, List links, List events) { + this.attributes = attributes; + this.links = links; + this.events = events; + } + + public Map getAttributes() { + return attributes; + } + + public List getLinks() { + return links; + } + + public List getEvents() { + return events; + } + } + + /** A decoded V1 structured span link. */ + public static final class V1SpanLink { + private final byte[] traceId; + private final long spanId; + private final Map attributes; + private final String traceState; + private final long traceFlags; + + private V1SpanLink( + byte[] traceId, + long spanId, + Map attributes, + String traceState, + long traceFlags) { + this.traceId = traceId; + this.spanId = spanId; + this.attributes = attributes; + this.traceState = traceState; + this.traceFlags = traceFlags; + } + + public byte[] getTraceId() { + return traceId; + } + + public long getSpanId() { + return spanId; + } + + public Map getAttributes() { + return attributes; + } + + public String getTraceState() { + return traceState; + } + + public long getTraceFlags() { + return traceFlags; + } + } + + /** A decoded V1 structured span event. */ + public static final class V1SpanEvent { + private final long timeUnixNano; + private final String name; + private final Map attributes; + + private V1SpanEvent(long timeUnixNano, String name, Map attributes) { + this.timeUnixNano = timeUnixNano; + this.name = name; + this.attributes = attributes; + } + + public long getTimeUnixNano() { + return timeUnixNano; + } + + public String getName() { + return name; + } + + public Map getAttributes() { + return attributes; + } + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/DDSpanSerializationTest.java b/dd-trace-core/src/test/java/datadog/trace/core/DDSpanSerializationTest.java index cc27763caa3..f6f8f774ecf 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/DDSpanSerializationTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/DDSpanSerializationTest.java @@ -1,7 +1,12 @@ package datadog.trace.core; +import static datadog.trace.api.DDTags.SPAN_LINKS; import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED; +import static datadog.trace.api.config.TracerConfig.TRACE_BAGGAGE_TAG_KEYS; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; import datadog.communication.serialization.ByteBufferConsumer; import datadog.communication.serialization.FlushingBuffer; @@ -13,12 +18,21 @@ import datadog.trace.api.ProcessTags; import datadog.trace.api.datastreams.NoopPathwayContext; import datadog.trace.api.sampling.PrioritySampling; +import datadog.trace.bootstrap.instrumentation.api.Baggage; +import datadog.trace.bootstrap.instrumentation.api.ProfilingContextIntegration; +import datadog.trace.bootstrap.instrumentation.api.SpanAttributes; import datadog.trace.common.writer.ListWriter; +import datadog.trace.common.writer.Payload; import datadog.trace.common.writer.ddagent.TraceMapperTestBridge; import datadog.trace.common.writer.ddagent.TraceMapperV0_4; import datadog.trace.common.writer.ddagent.TraceMapperV0_5; +import datadog.trace.common.writer.ddagent.TraceMapperV1; +import datadog.trace.common.writer.ddagent.V1PayloadReader; import datadog.trace.junit.utils.config.WithConfig; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.channels.Channels; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -310,6 +324,75 @@ void serializeTraceWithBaggageAndTagsCorrectlyV05( tracer.close(); } + @TableTest({ + "scenario | injectBaggage", + "baggage enabled | true ", + "baggage disabled | false " + }) + @WithConfig(key = TRACE_BAGGAGE_TAG_KEYS, value = "user.id,custom") + void serializeTraceWithConfiguredBaggageAndTagsCorrectlyV1(String scenario, boolean injectBaggage) + throws Exception { + CoreTracer tracer = tracerBuilder().writer(new ListWriter()).build(); + Map baggage = new HashMap<>(); + baggage.put("legacy-baggage", "legacy-value"); + + Map w3cBaggageItems = new HashMap<>(); + w3cBaggageItems.put("user.id", "user-1"); + w3cBaggageItems.put("custom", "custom-value"); + w3cBaggageItems.put("ignored", "ignored-value"); + + DDSpanContext context = + createSpanContext(tracer, baggage, Baggage.create(w3cBaggageItems), injectBaggage, true, 1); + context.setTag("span-tag", "span-value"); + DDSpan span = DDSpan.create("test", 0, context, null); + + V1PayloadReader.V1Span payload = V1PayloadReader.readFirstSpan(serializeV1Payload(span)); + Map attributes = payload.getAttributes(); + + assertEquals("span-value", attributes.get("span-tag")); + if (injectBaggage) { + assertEquals("legacy-value", attributes.get("legacy-baggage")); + assertEquals("user-1", attributes.get("baggage.user.id")); + assertEquals("custom-value", attributes.get("baggage.custom")); + } else { + assertFalse(attributes.containsKey("legacy-baggage")); + assertFalse(attributes.containsKey("baggage.user.id")); + assertFalse(attributes.containsKey("baggage.custom")); + } + assertFalse(attributes.containsKey("baggage.ignored")); + tracer.close(); + } + + @Test + void serializeTraceWithSpanLinksAsStructuredLinksOnlyV1() throws Exception { + CoreTracer tracer = tracerBuilder().writer(new ListWriter()).build(); + DDSpanContext context = createSpanContext(tracer, Collections.emptyMap(), null, true, true, 0); + DDSpan span = DDSpan.create("test", 0, context, null); + + Map linkAttributes = new HashMap<>(); + linkAttributes.put("link.source", "unit-test"); + DDSpanLink link = + new DDSpanLink( + DDTraceId.fromHex("11223344556677889900aabbccddeeff"), + DDSpanId.fromHex("123456789abcdef0"), + (byte) 1, + "dd=s:1", + SpanAttributes.fromMap(linkAttributes)); + span.addLink(link); + + V1PayloadReader.V1Span payload = V1PayloadReader.readFirstSpan(serializeV1Payload(span)); + + assertFalse(payload.getAttributes().containsKey(SPAN_LINKS)); + assertEquals(1, payload.getLinks().size()); + V1PayloadReader.V1SpanLink actualLink = payload.getLinks().get(0); + assertArrayEquals(V1PayloadReader.traceIdBytes(link.traceId()), actualLink.getTraceId()); + assertEquals(link.spanId(), actualLink.getSpanId()); + assertEquals(link.traceState(), actualLink.getTraceState()); + assertEquals((long) (link.traceFlags() & 0xFF), actualLink.getTraceFlags()); + assertEquals("unit-test", actualLink.getAttributes().get("link.source")); + tracer.close(); + } + @Test void serializeTraceWithFlatMapTagV04() throws Exception { CoreTracer tracer = tracerBuilder().writer(new ListWriter()).build(); @@ -486,6 +569,51 @@ private DDSpanContext createSpanContext( return ctx; } + private DDSpanContext createSpanContext( + CoreTracer tracer, + Map baggage, + Baggage w3cBaggage, + boolean injectBaggage, + boolean injectLinks, + int tagsSize) { + return new DDSpanContext( + DDTraceId.ONE, + 1, + DDSpanId.ZERO, + null, + null, + "fakeService", + "fakeOperation", + "fakeResource", + PrioritySampling.UNSET, + null, + baggage, + w3cBaggage, + false, + "fakeType", + tagsSize, + tracer.createTraceCollector(DDTraceId.ONE), + null, + null, + null, + NoopPathwayContext.INSTANCE, + false, + null, + ProfilingContextIntegration.NoOp.INSTANCE, + injectBaggage, + injectLinks); + } + + private static byte[] serializeV1Payload(DDSpan span) throws Exception { + TraceMapperV1 mapper = new TraceMapperV1(); + CapturePayloadBuffer capture = new CapturePayloadBuffer(mapper); + MsgPackWriter packer = new MsgPackWriter(new FlushingBuffer(1024, capture)); + packer.format(Collections.singletonList(span), mapper); + packer.flush(); + assertNotNull(capture.bytes); + return capture.bytes; + } + private static String[] buildDictionary(TraceMapperV0_5 mapper) throws Exception { GrowableBuffer dictionaryBuffer = TraceMapperTestBridge.getDictionary(mapper); Map encoding = TraceMapperTestBridge.getEncoding(mapper); @@ -497,4 +625,27 @@ private static String[] buildDictionary(TraceMapperV0_5 mapper) throws Exception } return dictionary; } + + private static final class CapturePayloadBuffer implements ByteBufferConsumer { + private final TraceMapperV1 mapper; + private byte[] bytes; + + private CapturePayloadBuffer(TraceMapperV1 mapper) { + this.mapper = mapper; + } + + @Override + public void accept(int messageCount, ByteBuffer buffer) { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + try { + Payload payload = mapper.newPayload().withBody(messageCount, buffer); + payload.writeTo(Channels.newChannel(out)); + bytes = out.toByteArray(); + } catch (IOException e) { + throw new IllegalStateException(e); + } finally { + mapper.reset(); + } + } + } } diff --git a/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy b/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy index 8477ad00df3..a0c81cbd0cb 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy @@ -332,11 +332,6 @@ class TraceGenerator { consumer.accept(metadata) } - @Override - void processTagsAndBaggage(MetadataConsumer consumer, boolean injectLinksAsTags, boolean injectBaggageAsTags) { - consumer.accept(metadata) - } - @Override PojoSpan setSamplingPriority(int samplingPriority, int samplingMechanism) { return this