From 07df666166d17291c0b164be2461d2c96f8a41ab Mon Sep 17 00:00:00 2001 From: Sam Maya Date: Tue, 12 May 2026 13:36:15 -0400 Subject: [PATCH 1/4] implement mvp for span derived primary tags --- .../trace/api/config/GeneralConfig.java | 1 + .../ConflatingMetricsAggregatorBenchmark.java | 1 + .../metrics/ConflatingMetricsAggregator.java | 44 +++++- .../trace/common/metrics/MetricKey.java | 13 +- .../metrics/SerializingMetricWriter.java | 10 +- .../common/metrics/AggregateMetricTest.groovy | 4 +- .../ConflatingMetricAggregatorTest.groovy | 102 +++++++++----- .../common/metrics/FootprintForkedTest.groovy | 1 + .../SerializingMetricWriterTest.groovy | 50 ++++--- .../metrics/MetricKeyAdditionalTagsTest.java | 77 ++++++++++ ...alizingMetricWriterAdditionalTagsTest.java | 131 ++++++++++++++++++ .../groovy/MetricsIntegrationTest.groovy | 4 +- .../main/java/datadog/trace/api/Config.java | 5 + metadata/supported-configurations.json | 8 ++ 14 files changed, 384 insertions(+), 67 deletions(-) create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricKeyAdditionalTagsTest.java create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 60af53815fc..945c34764c5 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -76,6 +76,7 @@ public final class GeneralConfig { public static final String TRACER_METRICS_MAX_PENDING = "trace.tracer.metrics.max.pending"; public static final String TRACER_METRICS_IGNORED_RESOURCES = "trace.tracer.metrics.ignored.resources"; + public static final String TRACE_STATS_ADDITIONAL_TAGS = "trace.stats.additional.tags"; public static final String AZURE_APP_SERVICES = "azure.app.services"; public static final String INTERNAL_EXIT_ON_FAILURE = "trace.internal.exit.on.failure"; diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index 971ee5cf6e4..bf868c4793e 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -40,6 +40,7 @@ public class ConflatingMetricsAggregatorBenchmark { new ConflatingMetricsAggregator( new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), + Collections.emptyList(), featuresDiscovery, HealthMetrics.NO_OP, new NullSink(), diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index f60edf1d700..783097a58cf 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -80,6 +80,16 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve Pair.of( DDCaches.newFixedSizeCache(512), value -> UTF8BytesString.create(key + ":" + value)); + private static final DDCache< + String, Pair, Function>> + ADDITIONAL_TAG_VALUES_CACHE = DDCaches.newFixedSizeCache(64); + private static final Function< + String, Pair, Function>> + ADDITIONAL_TAG_VALUES_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final Set ELIGIBLE_SPAN_KINDS_FOR_METRICS = @@ -93,6 +103,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); private final Set ignoredResources; + private final List additionalTagKeys; private final MessagePassingQueue batchPool; private final ConcurrentHashMap pending; private final ConcurrentHashMap keys; @@ -115,6 +126,7 @@ public ConflatingMetricsAggregator( this( config.getWellKnownTags(), config.getMetricsIgnoredResources(), + config.getTraceStatsAdditionalTags(), sharedCommunicationObjects.featuresDiscovery(config), healthMetrics, new OkHttpSink( @@ -132,6 +144,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, + List additionalTagKeys, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -141,6 +154,7 @@ public ConflatingMetricsAggregator( this( wellKnownTags, ignoredResources, + additionalTagKeys, features, healthMetric, sink, @@ -154,6 +168,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, + List additionalTagKeys, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -164,6 +179,7 @@ public ConflatingMetricsAggregator( boolean includeEndpointInMetrics) { this( ignoredResources, + additionalTagKeys, features, healthMetric, sink, @@ -177,6 +193,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( Set ignoredResources, + List additionalTagKeys, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -187,6 +204,8 @@ public ConflatingMetricsAggregator( TimeUnit timeUnit, boolean includeEndpointInMetrics) { this.ignoredResources = ignoredResources; + this.additionalTagKeys = + additionalTagKeys == null ? Collections.emptyList() : additionalTagKeys; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); this.batchPool = Queues.spmcArrayQueue(maxAggregates); @@ -350,7 +369,8 @@ private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanK getPeerTags(span, spanKind.toString()), httpMethod, httpEndpoint, - grpcStatusCode); + grpcStatusCode, + getAdditionalTags(span)); MetricKey key = keys.putIfAbsent(newKey, newKey); if (null == key) { key = newKey; @@ -413,6 +433,28 @@ private List getPeerTags(CoreSpan span, String spanKind) { return Collections.emptyList(); } + private List getAdditionalTags(CoreSpan span) { + if (additionalTagKeys.isEmpty()) { + return Collections.emptyList(); + } + List result = null; + for (String tagKey : additionalTagKeys) { + Object value = span.unsafeGetTag(tagKey); + if (value == null) { + continue; + } + Pair, Function> cacheAndCreator = + ADDITIONAL_TAG_VALUES_CACHE.computeIfAbsent(tagKey, ADDITIONAL_TAG_VALUES_CACHE_ADDER); + UTF8BytesString formatted = + cacheAndCreator.getLeft().computeIfAbsent(value.toString(), cacheAndCreator.getRight()); + if (result == null) { + result = new ArrayList<>(additionalTagKeys.size()); + } + result.add(formatted); + } + return result == null ? Collections.emptyList() : result; + } + private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java index 9e2e2098d1f..8ae11341d51 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/MetricKey.java @@ -39,6 +39,7 @@ public final class MetricKey { private final UTF8BytesString httpMethod; private final UTF8BytesString httpEndpoint; private final UTF8BytesString grpcStatusCode; + private final List additionalTags; public MetricKey( CharSequence resource, @@ -53,7 +54,8 @@ public MetricKey( List peerTags, CharSequence httpMethod, CharSequence httpEndpoint, - CharSequence grpcStatusCode) { + CharSequence grpcStatusCode, + List additionalTags) { this.resource = null == resource ? EMPTY : utf8(RESOURCE_CACHE, resource); this.service = null == service ? EMPTY : utf8(SERVICE_CACHE, service); this.serviceSource = null == serviceSource ? null : utf8(SERVICE_SOURCE_CACHE, serviceSource); @@ -68,6 +70,7 @@ public MetricKey( this.httpEndpoint = httpEndpoint == null ? null : utf8(HTTP_ENDPOINT_CACHE, httpEndpoint); this.grpcStatusCode = grpcStatusCode == null ? null : utf8(GRPC_STATUS_CODE_CACHE, grpcStatusCode); + this.additionalTags = additionalTags == null ? Collections.emptyList() : additionalTags; int tmpHash = 0; tmpHash = HashingUtils.addToHash(tmpHash, this.isTraceRoot); @@ -83,6 +86,7 @@ public MetricKey( tmpHash = HashingUtils.addToHash(tmpHash, this.httpEndpoint); tmpHash = HashingUtils.addToHash(tmpHash, this.httpMethod); tmpHash = HashingUtils.addToHash(tmpHash, this.grpcStatusCode); + tmpHash = HashingUtils.addToHash(tmpHash, this.additionalTags); this.hash = tmpHash; } @@ -146,6 +150,10 @@ public UTF8BytesString getGrpcStatusCode() { return grpcStatusCode; } + public List getAdditionalTags() { + return additionalTags; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -166,7 +174,8 @@ public boolean equals(Object o) { && Objects.equals(serviceSource, metricKey.serviceSource) && Objects.equals(httpMethod, metricKey.httpMethod) && Objects.equals(httpEndpoint, metricKey.httpEndpoint) - && Objects.equals(grpcStatusCode, metricKey.grpcStatusCode); + && Objects.equals(grpcStatusCode, metricKey.grpcStatusCode) + && additionalTags.equals(metricKey.additionalTags); } return false; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index 0f84964e9db..ef482dee546 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -41,6 +41,7 @@ public final class SerializingMetricWriter implements MetricWriter { private static final byte[] IS_TRACE_ROOT = "IsTraceRoot".getBytes(ISO_8859_1); private static final byte[] SPAN_KIND = "SpanKind".getBytes(ISO_8859_1); private static final byte[] PEER_TAGS = "PeerTags".getBytes(ISO_8859_1); + private static final byte[] ADDITIONAL_METRIC_TAGS = "AdditionalMetricTags".getBytes(ISO_8859_1); private static final byte[] HTTP_METHOD = "HTTPMethod".getBytes(ISO_8859_1); private static final byte[] HTTP_ENDPOINT = "HTTPEndpoint".getBytes(ISO_8859_1); private static final byte[] GRPC_STATUS_CODE = "GRPCStatusCode".getBytes(ISO_8859_1); @@ -149,7 +150,7 @@ public void add(MetricKey key, AggregateMetric aggregate) { final boolean hasServiceSource = key.getServiceSource() != null; final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null; final int mapSize = - 15 + 16 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) @@ -189,6 +190,13 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(peerTag); } + writer.writeUTF8(ADDITIONAL_METRIC_TAGS); + final List additionalTags = key.getAdditionalTags(); + writer.startArray(additionalTags.size()); + for (UTF8BytesString tag : additionalTags) { + writer.writeUTF8(tag); + } + if (hasServiceSource) { writer.writeUTF8(SERVICE_SOURCE); writer.writeUTF8(key.getServiceSource()); diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy index 0b245552db3..b5c4d2717e5 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/AggregateMetricTest.groovy @@ -65,7 +65,7 @@ class AggregateMetricTest extends DDSpecification { given: AggregateMetric aggregate = new AggregateMetric().recordDurations(3, new AtomicLongArray(0L, 0L, 0L | ERROR_TAG | TOP_LEVEL_TAG)) - Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null)) + Batch batch = new Batch().reset(new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null, null)) batch.add(0L, 10) batch.add(0L, 10) batch.add(0L, 10) @@ -140,7 +140,7 @@ class AggregateMetricTest extends DDSpecification { def "consistent under concurrent attempts to read and write"() { given: AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null) + MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null, null) BlockingDeque queue = new LinkedBlockingDeque<>(1000) ExecutorService reader = Executors.newSingleThreadExecutor() int writerCount = 10 diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 962ad2ce892..bbc94103534 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -38,6 +38,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, empty, + [] as List, features, HealthMetrics.NO_OP, sink, @@ -68,6 +69,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, [ignoredResourceName].toSet(), + [] as List, features, HealthMetrics.NO_OP, sink, @@ -104,6 +106,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -133,7 +136,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> + , null), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -150,6 +153,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -179,7 +183,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> + , null), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == 1 && value.getDuration() == 100 } 1 * writer.finishBucket() >> { latch.countDown() } @@ -196,6 +200,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -231,7 +236,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { httpMethod, httpEndpoint, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) (statsComputed ? 1 : 0) * writer.finishBucket() >> { latch.countDown() } @@ -261,6 +266,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -293,7 +299,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) 1 * writer.add( @@ -311,7 +317,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -328,6 +334,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -358,7 +365,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -380,8 +387,8 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, - sink, writer, 10, queueSize, reportingInterval, SECONDS, false) + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as List, features, HealthMetrics.NO_OP, + sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() when: @@ -410,7 +417,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getTopLevelCount() == topLevelCount && value.getDuration() == 100 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -433,6 +440,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) long duration = 100 List trace = [ @@ -469,7 +477,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration }) 1 * writer.add(new MetricKey( @@ -486,7 +494,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration * 2 }) @@ -505,6 +513,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -540,7 +549,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == count && value.getDuration() == count * duration }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -581,7 +590,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) 1 * writer.add(new MetricKey( @@ -598,7 +607,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/orders/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration * 2 }) 1 * writer.add(new MetricKey( @@ -615,7 +624,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration * 3 }) 1 * writer.finishBucket() >> { latch2.countDown() } @@ -632,6 +641,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -679,7 +689,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) 1 * writer.add(new MetricKey( @@ -696,7 +706,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration * 2 }) 1 * writer.add(new MetricKey( @@ -713,7 +723,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration * 3 }) 1 * writer.add(new MetricKey( @@ -730,7 +740,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/orders/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration * 4 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -747,6 +757,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -783,7 +794,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) 1 * writer.add(new MetricKey( @@ -800,7 +811,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration * 2 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -817,6 +828,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -851,7 +863,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 2 && value.getDuration() == 2 * duration }) 1 * writer.add(new MetricKey( @@ -868,7 +880,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -886,6 +898,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -919,7 +932,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) >> { MetricKey key, AggregateMetric value -> + , null), _) >> { MetricKey key, AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration } } @@ -937,7 +950,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + , null), _) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -954,6 +967,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -988,6 +1002,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1033,6 +1048,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1066,7 +1082,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) } @@ -1101,7 +1117,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) } @@ -1119,7 +1135,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + , null), _) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: @@ -1135,6 +1151,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1168,7 +1185,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) } @@ -1195,6 +1212,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1227,7 +1245,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric value -> + , null), { AggregateMetric value -> value.getHitCount() == 1 && value.getDuration() == duration }) } @@ -1246,6 +1264,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1277,6 +1296,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) aggregator.start() @@ -1299,6 +1319,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> false features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) @@ -1331,6 +1352,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) when: @@ -1364,6 +1386,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1394,7 +1417,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -1411,6 +1434,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1449,7 +1473,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 3 && aggregateMetric.getTopLevelCount() == 3 && aggregateMetric.getDuration() == 450 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -1466,6 +1490,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -1504,7 +1529,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "GET", "/api/users/:id", null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 100 }) 1 * writer.add( @@ -1522,7 +1547,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { "POST", "/api/orders", null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 200 }) 1 * writer.add( @@ -1540,7 +1565,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), { AggregateMetric aggregateMetric -> + , null), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 1 && aggregateMetric.getDuration() == 150 }) 1 * writer.finishBucket() >> { latch.countDown() } @@ -1557,6 +1582,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, + [] as List, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1592,7 +1618,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, "0" - ), _) + , null), _) 1 * writer.add(new MetricKey( "grpc.service/Method", "service", @@ -1607,7 +1633,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, "5" - ), _) + , null), _) 1 * writer.add(new MetricKey( "GET /api", "service", @@ -1622,7 +1648,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { null, null, null - ), _) + , null), _) 1 * writer.finishBucket() >> { latch.countDown() } cleanup: diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index eceedeb1935..adf8754dbe9 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -40,6 +40,7 @@ class FootprintForkedTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, + [] as List, features, HealthMetrics.NO_OP, sink, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index 3ff81de9851..ecc76d74c3f 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -74,7 +74,7 @@ class SerializingMetricWriterTest extends DDSpecification { null, null, null - ), + , null), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ), Pair.of( @@ -96,7 +96,7 @@ class SerializingMetricWriterTest extends DDSpecification { null, null, null - ), + , null), new AggregateMetric().recordDurations(9, new AtomicLongArray(1L)) ), Pair.of( @@ -114,7 +114,7 @@ class SerializingMetricWriterTest extends DDSpecification { "GET", "/api/users/:id", null - ), + , null), new AggregateMetric().recordDurations(5, new AtomicLongArray(1L)) ) ], @@ -134,7 +134,7 @@ class SerializingMetricWriterTest extends DDSpecification { null, null, null - ), + , null), new AggregateMetric().recordDurations(10, new AtomicLongArray(1L)) ) }) @@ -149,8 +149,8 @@ class SerializingMetricWriterTest extends DDSpecification { WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") // Create keys with different combinations of HTTP fields - def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null) + def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, null) + def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null, null) def content = [ Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), @@ -178,10 +178,10 @@ class SerializingMetricWriterTest extends DDSpecification { WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null,null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders",null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) + def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, null) + def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null,null, null) + def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders",null, null) + def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null, null) def content = [ Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), @@ -217,7 +217,7 @@ class SerializingMetricWriterTest extends DDSpecification { WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") // Create keys with different combinations of HTTP fields - def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) + def key = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, null) def content = [Pair.of(key, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))),] @@ -307,7 +307,7 @@ class SerializingMetricWriterTest extends DDSpecification { boolean hasHttpEndpoint = key.getHttpEndpoint() != null boolean hasServiceSource = key.getServiceSource() != null boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null - int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) + int expectedMapSize = 16 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) assert metricMapSize == expectedMapSize int elementCount = 0 assert unpacker.unpackString() == "Name" @@ -342,6 +342,14 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpackedPeerTag == key.getPeerTags()[i].toString() } ++elementCount + assert unpacker.unpackString() == "AdditionalMetricTags" + int additionalTagsLength = unpacker.unpackArrayHeader() + assert additionalTagsLength == key.getAdditionalTags().size() + for (int i = 0; i < additionalTagsLength; i++) { + def unpackedTag = unpacker.unpackString() + assert unpackedTag == key.getAdditionalTags()[i].toString() + } + ++elementCount // Service source is only present when the service name has been overridden by the tracer if (hasServiceSource) { assert unpacker.unpackString() == "srv_src" @@ -405,8 +413,8 @@ class SerializingMetricWriterTest extends DDSpecification { WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") // Create keys with different combinations of HTTP fields - def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null) + def keyWithNoSource = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, null) + def keyWithSource = new MetricKey("resource", "service", "operation", "source", "type", 200, false, false, "server", [], "POST", null, null, null) def content = [ Pair.of(keyWithNoSource, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), @@ -433,9 +441,9 @@ class SerializingMetricWriterTest extends DDSpecification { long duration = SECONDS.toNanos(10) WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") - def keyWithGrpc = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK") - def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND") - def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null) + def keyWithGrpc = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "server", [], null, null, "OK", null) + def keyWithGrpcError = new MetricKey("grpc.service/Method", "grpc-service", "grpc.server", null, "rpc", 0, false, false, "client", [], null, null, "NOT_FOUND", null) + def keyWithoutGrpc = new MetricKey("resource", "service", "operation", null, "web", 200, false, false, "server", [], null, null, null, null) def content = [ Pair.of(keyWithGrpc, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), @@ -464,10 +472,10 @@ class SerializingMetricWriterTest extends DDSpecification { WellKnownTags wellKnownTags = new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language") // Create keys with different combinations of HTTP fields - def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null) - def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null) - def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null) - def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null) + def keyWithBoth = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "GET", "/api/users", null, null) + def keyWithMethodOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], "POST", null, null, null) + def keyWithEndpointOnly = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "server", [], null, "/api/orders", null, null) + def keyWithNeither = new MetricKey("resource", "service", "operation", null, "type", 200, false, false, "client", [], null, null, null, null) def content = [ Pair.of(keyWithBoth, new AggregateMetric().recordDurations(1, new AtomicLongArray(1L))), diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricKeyAdditionalTagsTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricKeyAdditionalTagsTest.java new file mode 100644 index 00000000000..1cdd35f02bc --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/MetricKeyAdditionalTagsTest.java @@ -0,0 +1,77 @@ +package datadog.trace.common.metrics; + +import static java.util.Collections.emptyList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class MetricKeyAdditionalTagsTest { + + @Test + void emptyAndNullAdditionalTagsAreEquivalent() { + MetricKey a = key(null); + MetricKey b = key(emptyList()); + assertEquals(a, b); + assertEquals(a.hashCode(), b.hashCode()); + assertEquals(emptyList(), a.getAdditionalTags()); + } + + @Test + void sameOrderProducesEqualKeys() { + MetricKey a = key(tags("region:us-east-1", "tenant_id:acme")); + MetricKey b = key(tags("region:us-east-1", "tenant_id:acme")); + assertEquals(a, b); + assertEquals(a.hashCode(), b.hashCode()); + } + + @Test + void differentValuesProduceDifferentKeys() { + MetricKey a = key(tags("region:us-east-1")); + MetricKey b = key(tags("region:eu-west-1")); + assertNotEquals(a, b); + } + + @Test + void differentTagSetsProduceDifferentKeys() { + MetricKey a = key(tags("region:us-east-1")); + MetricKey b = key(tags("region:us-east-1", "tenant_id:acme")); + assertNotEquals(a, b); + } + + @Test + void keyWithAdditionalTagsDiffersFromKeyWithout() { + MetricKey a = key(emptyList()); + MetricKey b = key(tags("region:us-east-1")); + assertNotEquals(a, b); + } + + private static List tags(String... entries) { + List list = new ArrayList<>(entries.length); + for (String e : entries) { + list.add(UTF8BytesString.create(e)); + } + return list; + } + + private static MetricKey key(List additionalTags) { + return new MetricKey( + "resource", + "service", + "operation", + null, + "web", + 200, + false, + false, + "server", + emptyList(), + null, + null, + null, + additionalTags); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java new file mode 100644 index 00000000000..1b7187df5ab --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java @@ -0,0 +1,131 @@ +package datadog.trace.common.metrics; + +import static java.util.Collections.emptyList; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import datadog.metrics.api.Histograms; +import datadog.metrics.impl.DDSketchHistograms; +import datadog.trace.api.WellKnownTags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLongArray; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.msgpack.core.MessagePack; +import org.msgpack.core.MessageUnpacker; + +class SerializingMetricWriterAdditionalTagsTest { + + @BeforeAll + static void registerHistograms() { + Histograms.register(DDSketchHistograms.FACTORY); + } + + @Test + void emptyAdditionalTagsAreEmittedAsEmptyArray() throws Exception { + List emitted = roundTripAdditionalTags(emptyList()); + assertEquals(0, emitted.size()); + } + + @Test + void populatedAdditionalTagsAreEmittedInOrder() throws Exception { + List tags = + Arrays.asList( + UTF8BytesString.create("region:us-east-1"), UTF8BytesString.create("tenant_id:acme")); + List emitted = roundTripAdditionalTags(tags); + assertEquals(Arrays.asList("region:us-east-1", "tenant_id:acme"), emitted); + } + + private List roundTripAdditionalTags(List additionalTags) + throws Exception { + WellKnownTags wellKnownTags = + new WellKnownTags("runtimeid", "hostname", "env", "service", "version", "language"); + CapturingSink sink = new CapturingSink(); + SerializingMetricWriter writer = new SerializingMetricWriter(wellKnownTags, sink, 128); + MetricKey key = + new MetricKey( + "resource", + "service", + "operation", + null, + "web", + 200, + false, + false, + "server", + emptyList(), + null, + null, + null, + additionalTags); + AggregateMetric aggregate = + new AggregateMetric().recordDurations(1, new AtomicLongArray(new long[] {1L})); + long start = MILLISECONDS.toNanos(System.currentTimeMillis()); + long duration = SECONDS.toNanos(10); + writer.startBucket(1, start, duration); + writer.add(key, aggregate); + writer.finishBucket(); + assertNotNull(sink.captured); + return readAdditionalTagsFromPayload(sink.captured); + } + + private static List readAdditionalTagsFromPayload(ByteBuffer buffer) throws Exception { + MessageUnpacker unpacker = MessagePack.newDefaultUnpacker(buffer); + int envelopeSize = unpacker.unpackMapHeader(); + for (int i = 0; i < envelopeSize; i++) { + String envelopeKey = unpacker.unpackString(); + if ("Stats".equals(envelopeKey)) { + int bucketCount = unpacker.unpackArrayHeader(); + assertEquals(1, bucketCount); + int bucketMapSize = unpacker.unpackMapHeader(); + for (int b = 0; b < bucketMapSize; b++) { + String bk = unpacker.unpackString(); + if ("Stats".equals(bk)) { + int statCount = unpacker.unpackArrayHeader(); + assertEquals(1, statCount); + int metricMapSize = unpacker.unpackMapHeader(); + for (int m = 0; m < metricMapSize; m++) { + String mk = unpacker.unpackString(); + if ("AdditionalMetricTags".equals(mk)) { + int tagCount = unpacker.unpackArrayHeader(); + List result = new ArrayList<>(tagCount); + for (int t = 0; t < tagCount; t++) { + result.add(unpacker.unpackString()); + } + return result; + } else { + unpacker.skipValue(); + } + } + } else { + unpacker.skipValue(); + } + } + } else { + unpacker.skipValue(); + } + } + throw new AssertionError("AdditionalMetricTags field not found in payload"); + } + + private static final class CapturingSink implements Sink { + ByteBuffer captured; + final List listeners = new ArrayList<>(); + + @Override + public void register(EventListener listener) { + listeners.add(listener); + } + + @Override + public void accept(int messageCount, ByteBuffer buffer) { + this.captured = buffer; + } + } +} diff --git a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy index 2972ffa2c18..ae6ec6b82eb 100644 --- a/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy +++ b/dd-trace-core/src/traceAgentTest/groovy/MetricsIntegrationTest.groovy @@ -40,11 +40,11 @@ class MetricsIntegrationTest extends AbstractTraceAgentTest { ) writer.startBucket(2, System.nanoTime(), SECONDS.toNanos(10)) writer.add( - new MetricKey("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null), + new MetricKey("resource1", "service1", "operation1", null, "sql", 0, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null, null), new AggregateMetric().recordDurations(5, new AtomicLongArray(2, 1, 2, 250, 4, 5)) ) writer.add( - new MetricKey("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null), + new MetricKey("resource2", "service2", "operation2", null, "web", 200, false, true, "xyzzy", [UTF8BytesString.create("grault:quux")], null, null, null, null), new AggregateMetric().recordDurations(10, new AtomicLongArray(1, 1, 200, 2, 3, 4, 5, 6, 7, 8, 9)) ) writer.finishBucket() diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index a463887f61a..1cd27fe4595 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -417,6 +417,7 @@ import static datadog.trace.api.config.GeneralConfig.TRACER_METRICS_MAX_PENDING; import static datadog.trace.api.config.GeneralConfig.TRACE_DEBUG; import static datadog.trace.api.config.GeneralConfig.TRACE_LOG_LEVEL; +import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_ADDITIONAL_TAGS; import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_COMPUTATION_ENABLED; import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_COMPUTATION_IGNORE_AGENT_VERSION; import static datadog.trace.api.config.GeneralConfig.TRACE_TAGS; @@ -5108,6 +5109,10 @@ public Set getMetricsIgnoredResources() { return tryMakeImmutableSet(configProvider.getList(TRACER_METRICS_IGNORED_RESOURCES)); } + public List getTraceStatsAdditionalTags() { + return tryMakeImmutableList(configProvider.getList(TRACE_STATS_ADDITIONAL_TAGS)); + } + public String getEnv() { // intentionally not thread safe if (env == null) { diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index 8db93e05399..d5b2ab3b74c 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -10577,6 +10577,14 @@ "aliases": [] } ], + "DD_TRACE_STATS_ADDITIONAL_TAGS": [ + { + "version": "B", + "type": "array", + "default": null, + "aliases": [] + } + ], "DD_TRACE_STATS_COMPUTATION_ENABLED": [ { "version": "B", From 4597764eec114d93993fa2c2b19d4cf327115d58 Mon Sep 17 00:00:00 2001 From: Sam Maya Date: Wed, 13 May 2026 11:17:08 -0400 Subject: [PATCH 2/4] add max keys and switch from list to set --- .../ConflatingMetricsAggregatorBenchmark.java | 2 +- .../metrics/ConflatingMetricsAggregator.java | 32 ++++++++-- .../metrics/SerializingMetricWriter.java | 18 +++--- .../ConflatingMetricAggregatorTest.groovy | 54 ++++++++--------- .../common/metrics/FootprintForkedTest.groovy | 2 +- .../SerializingMetricWriterTest.groovy | 19 +++--- ...ingMetricsAggregatorNormalizationTest.java | 58 +++++++++++++++++++ ...alizingMetricWriterAdditionalTagsTest.java | 7 ++- .../main/java/datadog/trace/api/Config.java | 4 +- 9 files changed, 142 insertions(+), 54 deletions(-) create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorNormalizationTest.java diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index bf868c4793e..f57cea255fa 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -40,7 +40,7 @@ public class ConflatingMetricsAggregatorBenchmark { new ConflatingMetricsAggregator( new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), - Collections.emptyList(), + Collections.emptySet(), featuresDiscovery, HealthMetrics.NO_OP, new NullSink(), diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 783097a58cf..4385c4e4ce0 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -102,6 +102,11 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve unmodifiableSet( new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); + // Cap on configured additional metric tag keys. By default only 4 primary tag dimensions are supported. + // We sometimes increase this limit for users so a value of 10 allows us to protect against extreme misconfiguration + // while still allowing some additional tags to be used. + static final int MAX_ADDITIONAL_TAG_KEYS = 10; + private final Set ignoredResources; private final List additionalTagKeys; private final MessagePassingQueue batchPool; @@ -144,7 +149,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, - List additionalTagKeys, + Set additionalTagKeys, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -168,7 +173,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, - List additionalTagKeys, + Set additionalTagKeys, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -193,7 +198,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( Set ignoredResources, - List additionalTagKeys, + Set additionalTagKeys, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -204,8 +209,7 @@ public ConflatingMetricsAggregator( TimeUnit timeUnit, boolean includeEndpointInMetrics) { this.ignoredResources = ignoredResources; - this.additionalTagKeys = - additionalTagKeys == null ? Collections.emptyList() : additionalTagKeys; + this.additionalTagKeys = normalizeAdditionalTagKeys(additionalTagKeys); this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); this.batchPool = Queues.spmcArrayQueue(maxAggregates); @@ -433,6 +437,24 @@ private List getPeerTags(CoreSpan span, String spanKind) { return Collections.emptyList(); } + static List normalizeAdditionalTagKeys(Set configured) { + if (configured == null || configured.isEmpty()) { + return Collections.emptyList(); + } + List sorted = new ArrayList<>(configured); + Collections.sort(sorted); + if (sorted.size() > MAX_ADDITIONAL_TAG_KEYS) { + log.warn( + "Configured additional metric tag keys ({}) exceeds the supported limit of {}; " + + "dropping extra keys: {}", + sorted.size(), + MAX_ADDITIONAL_TAG_KEYS, + sorted.subList(MAX_ADDITIONAL_TAG_KEYS, sorted.size())); + sorted = sorted.subList(0, MAX_ADDITIONAL_TAG_KEYS); + } + return Collections.unmodifiableList(new ArrayList<>(sorted)); + } + private List getAdditionalTags(CoreSpan span) { if (additionalTagKeys.isEmpty()) { return Collections.emptyList(); diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java index ef482dee546..b59f197a5e7 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SerializingMetricWriter.java @@ -149,12 +149,14 @@ public void add(MetricKey key, AggregateMetric aggregate) { final boolean hasHttpEndpoint = key.getHttpEndpoint() != null; final boolean hasServiceSource = key.getServiceSource() != null; final boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null; + final boolean hasAdditionalTags = !key.getAdditionalTags().isEmpty(); final int mapSize = - 16 + 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) - + (hasGrpcStatusCode ? 1 : 0); + + (hasGrpcStatusCode ? 1 : 0) + + (hasAdditionalTags ? 1 : 0); writer.startMap(mapSize); @@ -190,11 +192,13 @@ public void add(MetricKey key, AggregateMetric aggregate) { writer.writeUTF8(peerTag); } - writer.writeUTF8(ADDITIONAL_METRIC_TAGS); - final List additionalTags = key.getAdditionalTags(); - writer.startArray(additionalTags.size()); - for (UTF8BytesString tag : additionalTags) { - writer.writeUTF8(tag); + if (hasAdditionalTags) { + writer.writeUTF8(ADDITIONAL_METRIC_TAGS); + final List additionalTags = key.getAdditionalTags(); + writer.startArray(additionalTags.size()); + for (UTF8BytesString tag : additionalTags) { + writer.writeUTF8(tag); + } } if (hasServiceSource) { diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index bbc94103534..b5942084458 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -38,7 +38,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, @@ -69,7 +69,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( wellKnownTags, [ignoredResourceName].toSet(), - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, @@ -106,7 +106,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -153,7 +153,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -200,7 +200,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -266,7 +266,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -334,7 +334,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -387,7 +387,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as List, features, HealthMetrics.NO_OP, + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -440,7 +440,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) long duration = 100 List trace = [ @@ -513,7 +513,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -641,7 +641,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -757,7 +757,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -828,7 +828,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -898,7 +898,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -967,7 +967,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1002,7 +1002,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] HealthMetrics healthMetrics = Mock(HealthMetrics) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1048,7 +1048,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1151,7 +1151,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1212,7 +1212,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1264,7 +1264,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1296,7 +1296,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) aggregator.start() @@ -1319,7 +1319,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> false features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) @@ -1352,7 +1352,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) when: @@ -1386,7 +1386,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1434,7 +1434,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1490,7 +1490,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -1582,7 +1582,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index adf8754dbe9..c65662660ee 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -40,7 +40,7 @@ class FootprintForkedTest extends DDSpecification { ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator( new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, - [] as List, + [] as Set, features, HealthMetrics.NO_OP, sink, diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy index ecc76d74c3f..bda4aa9b654 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SerializingMetricWriterTest.groovy @@ -307,7 +307,8 @@ class SerializingMetricWriterTest extends DDSpecification { boolean hasHttpEndpoint = key.getHttpEndpoint() != null boolean hasServiceSource = key.getServiceSource() != null boolean hasGrpcStatusCode = key.getGrpcStatusCode() != null - int expectedMapSize = 16 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) + boolean hasAdditionalTags = key.getAdditionalTags().size() > 0 + int expectedMapSize = 15 + (hasServiceSource ? 1 : 0) + (hasHttpMethod ? 1 : 0) + (hasHttpEndpoint ? 1 : 0) + (hasGrpcStatusCode ? 1 : 0) + (hasAdditionalTags ? 1 : 0) assert metricMapSize == expectedMapSize int elementCount = 0 assert unpacker.unpackString() == "Name" @@ -342,14 +343,16 @@ class SerializingMetricWriterTest extends DDSpecification { assert unpackedPeerTag == key.getPeerTags()[i].toString() } ++elementCount - assert unpacker.unpackString() == "AdditionalMetricTags" - int additionalTagsLength = unpacker.unpackArrayHeader() - assert additionalTagsLength == key.getAdditionalTags().size() - for (int i = 0; i < additionalTagsLength; i++) { - def unpackedTag = unpacker.unpackString() - assert unpackedTag == key.getAdditionalTags()[i].toString() + if (hasAdditionalTags) { + assert unpacker.unpackString() == "AdditionalMetricTags" + int additionalTagsLength = unpacker.unpackArrayHeader() + assert additionalTagsLength == key.getAdditionalTags().size() + for (int i = 0; i < additionalTagsLength; i++) { + def unpackedTag = unpacker.unpackString() + assert unpackedTag == key.getAdditionalTags()[i].toString() + } + ++elementCount } - ++elementCount // Service source is only present when the service name has been overridden by the tracer if (hasServiceSource) { assert unpacker.unpackString() == "srv_src" diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorNormalizationTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorNormalizationTest.java new file mode 100644 index 00000000000..8453316a65a --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorNormalizationTest.java @@ -0,0 +1,58 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.MAX_ADDITIONAL_TAG_KEYS; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.normalizeAdditionalTagKeys; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import org.junit.jupiter.api.Test; + +class ConflatingMetricsAggregatorNormalizationTest { + + @Test + void nullOrEmptyProducesEmptyList() { + assertEquals(Collections.emptyList(), normalizeAdditionalTagKeys(null)); + assertEquals( + Collections.emptyList(), normalizeAdditionalTagKeys(Collections.emptySet())); + } + + @Test + void resultIsSortedAlphabetically() { + Set configured = new LinkedHashSet<>(Arrays.asList("region", "tenant_id", "az")); + assertEquals( + Arrays.asList("az", "region", "tenant_id"), normalizeAdditionalTagKeys(configured)); + } + + @Test + void resultIsImmutable() { + Set configured = new LinkedHashSet<>(Arrays.asList("region", "tenant_id")); + List normalized = normalizeAdditionalTagKeys(configured); + assertThrows(UnsupportedOperationException.class, () -> normalized.add("oops")); + } + + @Test + void inputOrderDoesNotAffectResult() { + Set a = new LinkedHashSet<>(Arrays.asList("region", "tenant_id")); + Set b = new LinkedHashSet<>(Arrays.asList("tenant_id", "region")); + assertEquals(normalizeAdditionalTagKeys(a), normalizeAdditionalTagKeys(b)); + } + + @Test + void exceedingMaxKeysTruncatesAfterSort() { + Set configured = new TreeSet<>(); + for (int i = 0; i < MAX_ADDITIONAL_TAG_KEYS + 5; i++) { + configured.add(String.format("tag_%02d", i)); + } + List normalized = normalizeAdditionalTagKeys(configured); + assertEquals(MAX_ADDITIONAL_TAG_KEYS, normalized.size()); + assertTrue(normalized.contains("tag_00")); + assertTrue(normalized.contains(String.format("tag_%02d", MAX_ADDITIONAL_TAG_KEYS - 1))); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java index 1b7187df5ab..2733b986b33 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/SerializingMetricWriterAdditionalTagsTest.java @@ -5,6 +5,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; import datadog.metrics.api.Histograms; import datadog.metrics.impl.DDSketchHistograms; @@ -28,9 +29,9 @@ static void registerHistograms() { } @Test - void emptyAdditionalTagsAreEmittedAsEmptyArray() throws Exception { + void emptyAdditionalTagsOmitTheField() throws Exception { List emitted = roundTripAdditionalTags(emptyList()); - assertEquals(0, emitted.size()); + assertNull(emitted); } @Test @@ -111,7 +112,7 @@ private static List readAdditionalTagsFromPayload(ByteBuffer buffer) thr unpacker.skipValue(); } } - throw new AssertionError("AdditionalMetricTags field not found in payload"); + return null; } private static final class CapturingSink implements Sink { diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index 1cd27fe4595..d19ee39b4e1 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -5109,8 +5109,8 @@ public Set getMetricsIgnoredResources() { return tryMakeImmutableSet(configProvider.getList(TRACER_METRICS_IGNORED_RESOURCES)); } - public List getTraceStatsAdditionalTags() { - return tryMakeImmutableList(configProvider.getList(TRACE_STATS_ADDITIONAL_TAGS)); + public Set getTraceStatsAdditionalTags() { + return tryMakeImmutableSet(configProvider.getList(TRACE_STATS_ADDITIONAL_TAGS)); } public String getEnv() { From 983ae32b7d257b51560bd53cad77af3d96ce797c Mon Sep 17 00:00:00 2001 From: Sam Maya Date: Wed, 13 May 2026 11:56:38 -0400 Subject: [PATCH 3/4] add cardinality control --- .../trace/api/config/GeneralConfig.java | 2 + .../ConflatingMetricsAggregatorBenchmark.java | 1 + .../AdditionalTagsCardinalityLimiter.java | 70 ++++++++++++++ .../metrics/ConflatingMetricsAggregator.java | 42 ++++++++- .../trace/core/monitor/HealthMetrics.java | 2 + .../core/monitor/TracerHealthMetrics.java | 15 ++- .../ConflatingMetricAggregatorTest.groovy | 28 +++++- .../common/metrics/FootprintForkedTest.groovy | 1 + .../AdditionalTagsCardinalityLimiterTest.java | 93 +++++++++++++++++++ .../main/java/datadog/trace/api/Config.java | 13 +++ metadata/supported-configurations.json | 8 ++ 11 files changed, 270 insertions(+), 5 deletions(-) create mode 100644 dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java create mode 100644 dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java diff --git a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java index 945c34764c5..9f1a458ad3d 100644 --- a/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java +++ b/dd-trace-api/src/main/java/datadog/trace/api/config/GeneralConfig.java @@ -77,6 +77,8 @@ public final class GeneralConfig { public static final String TRACER_METRICS_IGNORED_RESOURCES = "trace.tracer.metrics.ignored.resources"; public static final String TRACE_STATS_ADDITIONAL_TAGS = "trace.stats.additional.tags"; + public static final String TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT = + "trace.stats.additional.tags.cardinality.limit"; public static final String AZURE_APP_SERVICES = "azure.app.services"; public static final String INTERNAL_EXIT_ON_FAILURE = "trace.internal.exit.on.failure"; diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java index f57cea255fa..b66e2cfc266 100644 --- a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBenchmark.java @@ -41,6 +41,7 @@ public class ConflatingMetricsAggregatorBenchmark { new WellKnownTags("", "", "", "", "", ""), Collections.emptySet(), Collections.emptySet(), + 100, featuresDiscovery, HealthMetrics.NO_OP, new NullSink(), diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java new file mode 100644 index 00000000000..3d6b3bed265 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java @@ -0,0 +1,70 @@ +package datadog.trace.common.metrics; + +import datadog.trace.core.monitor.HealthMetrics; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Bounded per-tag cardinality protection for `additional_metric_tags`. + * + *

For each configured tag key, admits at most {@code limitPerTag} distinct values within a + * rolling window. Excess values are replaced with {@link #BLOCKED_VALUE} so the span's base stats + * still flow through but the extra dimension is suppressed. + * + *

The rolling window is implemented as a hard reset: callers schedule {@link #reset()} on a + * fixed interval (10 minutes by default). After a reset, previously blocked values get a fresh + * chance to be admitted. + */ +final class AdditionalTagsCardinalityLimiter { + + static final String BLOCKED_VALUE = "blocked_by_tracer"; + + private static final Logger log = LoggerFactory.getLogger(AdditionalTagsCardinalityLimiter.class); + + private final int limitPerTag; + private final HealthMetrics healthMetrics; + private final ConcurrentHashMap> seenValuesPerTag = new ConcurrentHashMap<>(); + private final Set warnedThisWindow = Collections.newSetFromMap(new ConcurrentHashMap<>()); + + AdditionalTagsCardinalityLimiter(int limitPerTag, HealthMetrics healthMetrics) { + this.limitPerTag = limitPerTag; + this.healthMetrics = healthMetrics; + } + + /** + * @return {@code value} if admitted under the cap, otherwise {@link #BLOCKED_VALUE}. + */ + String admitOrBlock(String tagKey, String value) { + Set seen = + seenValuesPerTag.computeIfAbsent( + tagKey, k -> Collections.newSetFromMap(new ConcurrentHashMap<>())); + if (seen.contains(value)) { + return value; + } + if (seen.size() >= limitPerTag) { + healthMetrics.onAdditionalTagValueCardinalityBlocked(tagKey); + if (warnedThisWindow.add(tagKey)) { + log.warn( + "Additional metric tag '{}' exceeded the per-tag cardinality limit of {}; " + + "replacing values with '{}' for the rest of the current window", + tagKey, + limitPerTag, + BLOCKED_VALUE); + } + return BLOCKED_VALUE; + } + seen.add(value); + return value; + } + + /** Clears per-tag value sets and rearms the per-key log line. Invoked by the periodic task. */ + void reset() { + for (Set seen : seenValuesPerTag.values()) { + seen.clear(); + } + warnedThisWindow.clear(); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 4385c4e4ce0..be7d2efe76d 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -102,13 +102,16 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve unmodifiableSet( new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); - // Cap on configured additional metric tag keys. By default only 4 primary tag dimensions are supported. - // We sometimes increase this limit for users so a value of 10 allows us to protect against extreme misconfiguration + // Cap on configured additional metric tag keys. By default only 4 primary tag dimensions are + // supported. + // We sometimes increase this limit for users so a value of 10 allows us to protect against + // extreme misconfiguration // while still allowing some additional tags to be used. static final int MAX_ADDITIONAL_TAG_KEYS = 10; private final Set ignoredResources; private final List additionalTagKeys; + private final AdditionalTagsCardinalityLimiter cardinalityLimiter; private final MessagePassingQueue batchPool; private final ConcurrentHashMap pending; private final ConcurrentHashMap keys; @@ -123,6 +126,10 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final boolean includeEndpointInMetrics; private volatile AgentTaskScheduler.Scheduled cancellation; + private volatile AgentTaskScheduler.Scheduled cardinalityResetCancellation; + + // Hard-reset window for per-tag value cardinality tracking. + static final long CARDINALITY_RESET_INTERVAL_MINUTES = 10; public ConflatingMetricsAggregator( Config config, @@ -132,6 +139,7 @@ public ConflatingMetricsAggregator( config.getWellKnownTags(), config.getMetricsIgnoredResources(), config.getTraceStatsAdditionalTags(), + config.getTraceStatsAdditionalTagsCardinalityLimit(), sharedCommunicationObjects.featuresDiscovery(config), healthMetrics, new OkHttpSink( @@ -150,6 +158,7 @@ public ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, Set additionalTagKeys, + int additionalTagsCardinalityLimit, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -160,6 +169,7 @@ public ConflatingMetricsAggregator( wellKnownTags, ignoredResources, additionalTagKeys, + additionalTagsCardinalityLimit, features, healthMetric, sink, @@ -174,6 +184,7 @@ public ConflatingMetricsAggregator( WellKnownTags wellKnownTags, Set ignoredResources, Set additionalTagKeys, + int additionalTagsCardinalityLimit, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -185,6 +196,7 @@ public ConflatingMetricsAggregator( this( ignoredResources, additionalTagKeys, + additionalTagsCardinalityLimit, features, healthMetric, sink, @@ -199,6 +211,7 @@ public ConflatingMetricsAggregator( ConflatingMetricsAggregator( Set ignoredResources, Set additionalTagKeys, + int additionalTagsCardinalityLimit, DDAgentFeaturesDiscovery features, HealthMetrics healthMetric, Sink sink, @@ -210,6 +223,8 @@ public ConflatingMetricsAggregator( boolean includeEndpointInMetrics) { this.ignoredResources = ignoredResources; this.additionalTagKeys = normalizeAdditionalTagKeys(additionalTagKeys); + this.cardinalityLimiter = + new AdditionalTagsCardinalityLimiter(additionalTagsCardinalityLimit, healthMetric); this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); this.batchPool = Queues.spmcArrayQueue(maxAggregates); @@ -246,6 +261,14 @@ public void start() { reportingInterval, reportingInterval, reportingIntervalTimeUnit); + cardinalityResetCancellation = + AgentTaskScheduler.get() + .scheduleAtFixedRate( + new CardinalityResetTask(), + this, + CARDINALITY_RESET_INTERVAL_MINUTES, + CARDINALITY_RESET_INTERVAL_MINUTES, + TimeUnit.MINUTES); log.debug("started metrics aggregator"); } @@ -465,10 +488,11 @@ private List getAdditionalTags(CoreSpan span) { if (value == null) { continue; } + String admittedValue = cardinalityLimiter.admitOrBlock(tagKey, value.toString()); Pair, Function> cacheAndCreator = ADDITIONAL_TAG_VALUES_CACHE.computeIfAbsent(tagKey, ADDITIONAL_TAG_VALUES_CACHE_ADDER); UTF8BytesString formatted = - cacheAndCreator.getLeft().computeIfAbsent(value.toString(), cacheAndCreator.getRight()); + cacheAndCreator.getLeft().computeIfAbsent(admittedValue, cacheAndCreator.getRight()); if (result == null) { result = new ArrayList<>(additionalTagKeys.size()); } @@ -493,6 +517,9 @@ public void stop() { if (null != cancellation) { cancellation.cancel(); } + if (null != cardinalityResetCancellation) { + cardinalityResetCancellation.cancel(); + } inbox.offer(STOP); } @@ -546,4 +573,13 @@ public void run(ConflatingMetricsAggregator target) { target.report(); } } + + private static final class CardinalityResetTask + implements AgentTaskScheduler.Task { + + @Override + public void run(ConflatingMetricsAggregator target) { + target.cardinalityLimiter.reset(); + } + } } diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 257d887029b..c7dadc17eff 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -93,6 +93,8 @@ public void onClientStatDowngraded() {} public void onStatsAggregateDropped() {} + public void onAdditionalTagValueCardinalityBlocked(String tagKey) {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 2df54241e56..7ecc9c316aa 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -98,6 +98,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder clientStatsDowngrades = new LongAdder(); private final LongAdder statsAggregateDropped = new LongAdder(); + private final LongAdder additionalTagValueCardinalityBlocked = new LongAdder(); private final StatsDClient statsd; private final long interval; @@ -352,6 +353,11 @@ public void onClientStatErrorReceived() { clientStatsErrors.increment(); } + @Override + public void onAdditionalTagValueCardinalityBlocked(String tagKey) { + additionalTagValueCardinalityBlocked.increment(); + } + @Override public void onStatsAggregateDropped() { statsAggregateDropped.increment(); @@ -504,6 +510,11 @@ public void run(TracerHealthMetrics target) { "stats.dropped_aggregates", target.statsAggregateDropped, REASON_LRU_EVICTION_TAG); + reportIfChanged( + target.statsd, + "stats.additional_tag.cardinality_blocked", + target.additionalTagValueCardinalityBlocked, + NO_TAGS); } catch (ArrayIndexOutOfBoundsException e) { log.warn( @@ -637,6 +648,8 @@ public String summary() { + "\nclientStatsProcessedTraces=" + clientStatsProcessedTraces.sum() + "\nstatsAggregateDropped=" - + statsAggregateDropped.sum(); + + statsAggregateDropped.sum() + + "\nadditionalTagValueCardinalityBlocked=" + + additionalTagValueCardinalityBlocked.sum(); } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index b5942084458..4fb752d2faa 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -39,6 +39,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { wellKnownTags, empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, @@ -70,6 +71,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { wellKnownTags, [ignoredResourceName].toSet(), [] as Set, + 100, features, HealthMetrics.NO_OP, sink, @@ -107,6 +109,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -154,6 +157,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -201,6 +205,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -267,6 +272,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >>> [["country"], ["country", "georegion"],] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -335,6 +341,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> ["peer.hostname", "_dd.base_service"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -387,7 +394,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true features.peerTags() >> [] - ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, features, HealthMetrics.NO_OP, + ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -441,6 +448,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) long duration = 100 List trace = [ @@ -514,6 +522,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -642,6 +651,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -758,6 +768,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -829,6 +840,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -899,6 +911,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -968,6 +981,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { HealthMetrics healthMetrics = Mock(HealthMetrics) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1003,6 +1017,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { HealthMetrics healthMetrics = Mock(HealthMetrics) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, healthMetrics, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1049,6 +1064,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1152,6 +1168,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, reportingInterval, SECONDS, false) long duration = 100 aggregator.start() @@ -1213,6 +1230,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1265,6 +1283,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) long duration = 100 aggregator.start() @@ -1297,6 +1316,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) aggregator.start() @@ -1320,6 +1340,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, 200, MILLISECONDS, false) final spans = [ new SimpleSpan("service", "operation", "resource", "type", false, true, false, 0, 10, HTTP_OK) @@ -1353,6 +1374,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, maxAggregates, queueSize, 1, SECONDS, false) when: @@ -1387,6 +1409,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.supportsMetrics() >> true ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1435,6 +1458,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() @@ -1491,6 +1515,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, true) aggregator.start() @@ -1583,6 +1608,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { features.peerTags() >> [] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy index c65662660ee..07f18997f0b 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/FootprintForkedTest.groovy @@ -41,6 +41,7 @@ class FootprintForkedTest extends DDSpecification { new WellKnownTags("runtimeid","hostname", "env", "service", "version","language"), [].toSet() as Set, [] as Set, + 100, features, HealthMetrics.NO_OP, sink, diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java new file mode 100644 index 00000000000..92fe9bb69b0 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java @@ -0,0 +1,93 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.common.metrics.AdditionalTagsCardinalityLimiter.BLOCKED_VALUE; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotEquals; + +import datadog.trace.core.monitor.HealthMetrics; +import java.util.ArrayList; +import java.util.List; +import org.junit.jupiter.api.Test; + +class AdditionalTagsCardinalityLimiterTest { + + @Test + void belowLimitAdmitsAllValues() { + AdditionalTagsCardinalityLimiter limiter = + new AdditionalTagsCardinalityLimiter(100, HealthMetrics.NO_OP); + for (int i = 0; i < 99; i++) { + String v = "v" + i; + assertEquals(v, limiter.admitOrBlock("region", v)); + } + } + + @Test + void atLimitNextNewValueIsBlocked() { + AdditionalTagsCardinalityLimiter limiter = + new AdditionalTagsCardinalityLimiter(3, HealthMetrics.NO_OP); + assertEquals("a", limiter.admitOrBlock("region", "a")); + assertEquals("b", limiter.admitOrBlock("region", "b")); + assertEquals("c", limiter.admitOrBlock("region", "c")); + assertEquals(BLOCKED_VALUE, limiter.admitOrBlock("region", "d")); + } + + @Test + void alreadyAdmittedValueStaysAdmittedAfterCapHit() { + AdditionalTagsCardinalityLimiter limiter = + new AdditionalTagsCardinalityLimiter(3, HealthMetrics.NO_OP); + limiter.admitOrBlock("region", "a"); + limiter.admitOrBlock("region", "b"); + limiter.admitOrBlock("region", "c"); + limiter.admitOrBlock("region", "d"); // blocked + assertEquals("a", limiter.admitOrBlock("region", "a")); + assertEquals("b", limiter.admitOrBlock("region", "b")); + assertNotEquals(BLOCKED_VALUE, limiter.admitOrBlock("region", "c")); + } + + @Test + void differentTagsAreIndependent() { + AdditionalTagsCardinalityLimiter limiter = + new AdditionalTagsCardinalityLimiter(2, HealthMetrics.NO_OP); + limiter.admitOrBlock("customer_id", "x"); + limiter.admitOrBlock("customer_id", "y"); + assertEquals(BLOCKED_VALUE, limiter.admitOrBlock("customer_id", "z")); + // region should be completely unaffected + assertEquals("us-east-1", limiter.admitOrBlock("region", "us-east-1")); + assertEquals("eu-west-1", limiter.admitOrBlock("region", "eu-west-1")); + assertEquals(BLOCKED_VALUE, limiter.admitOrBlock("region", "ap-south-1")); + } + + @Test + void resetReadmitsPreviouslyBlockedValues() { + AdditionalTagsCardinalityLimiter limiter = + new AdditionalTagsCardinalityLimiter(2, HealthMetrics.NO_OP); + limiter.admitOrBlock("region", "a"); + limiter.admitOrBlock("region", "b"); + assertEquals(BLOCKED_VALUE, limiter.admitOrBlock("region", "c")); + limiter.reset(); + assertEquals("c", limiter.admitOrBlock("region", "c")); + } + + @Test + void healthMetricFiresOnBlock() { + RecordingHealthMetrics health = new RecordingHealthMetrics(); + AdditionalTagsCardinalityLimiter limiter = new AdditionalTagsCardinalityLimiter(2, health); + limiter.admitOrBlock("region", "a"); + limiter.admitOrBlock("region", "b"); + assertEquals(0, health.blocked.size()); + limiter.admitOrBlock("region", "c"); // blocked + limiter.admitOrBlock("region", "d"); // blocked + assertEquals(2, health.blocked.size()); + assertEquals("region", health.blocked.get(0)); + assertEquals("region", health.blocked.get(1)); + } + + private static final class RecordingHealthMetrics extends HealthMetrics { + final List blocked = new ArrayList<>(); + + @Override + public void onAdditionalTagValueCardinalityBlocked(String tagKey) { + blocked.add(tagKey); + } + } +} diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index d19ee39b4e1..cc3cace68cf 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -418,6 +418,7 @@ import static datadog.trace.api.config.GeneralConfig.TRACE_DEBUG; import static datadog.trace.api.config.GeneralConfig.TRACE_LOG_LEVEL; import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_ADDITIONAL_TAGS; +import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT; import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_COMPUTATION_ENABLED; import static datadog.trace.api.config.GeneralConfig.TRACE_STATS_COMPUTATION_IGNORE_AGENT_VERSION; import static datadog.trace.api.config.GeneralConfig.TRACE_TAGS; @@ -5113,6 +5114,18 @@ public Set getTraceStatsAdditionalTags() { return tryMakeImmutableSet(configProvider.getList(TRACE_STATS_ADDITIONAL_TAGS)); } + public int getTraceStatsAdditionalTagsCardinalityLimit() { + int configured = configProvider.getInteger(TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT, 100); + if (configured <= 0) { + log.warn( + "Invalid {} value: {}; falling back to default of 100", + TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT, + configured); + return 100; + } + return configured; + } + public String getEnv() { // intentionally not thread safe if (env == null) { diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index d5b2ab3b74c..b1c9d4288a9 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -10585,6 +10585,14 @@ "aliases": [] } ], + "DD_TRACE_STATS_ADDITIONAL_TAGS_CARDINALITY_LIMIT": [ + { + "version": "B", + "type": "number", + "default": "100", + "aliases": [] + } + ], "DD_TRACE_STATS_COMPUTATION_ENABLED": [ { "version": "B", From 4b981d77cb8758d81afd17346cca19cb9b73b8be Mon Sep 17 00:00:00 2001 From: Sam Maya Date: Wed, 13 May 2026 15:45:47 -0400 Subject: [PATCH 4/4] add protection against long chars --- .../AdditionalTagsCardinalityLimiter.java | 30 ++++++++++++++++--- .../metrics/ConflatingMetricsAggregator.java | 15 +++++++++- .../AdditionalTagsCardinalityLimiterTest.java | 29 ++++++++++++++++++ 3 files changed, 69 insertions(+), 5 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java index 3d6b3bed265..6c35134aad9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiter.java @@ -27,7 +27,10 @@ final class AdditionalTagsCardinalityLimiter { private final int limitPerTag; private final HealthMetrics healthMetrics; private final ConcurrentHashMap> seenValuesPerTag = new ConcurrentHashMap<>(); - private final Set warnedThisWindow = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set warnedAboutCardinality = + Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Set warnedAboutLength = + Collections.newSetFromMap(new ConcurrentHashMap<>()); AdditionalTagsCardinalityLimiter(int limitPerTag, HealthMetrics healthMetrics) { this.limitPerTag = limitPerTag; @@ -46,7 +49,7 @@ String admitOrBlock(String tagKey, String value) { } if (seen.size() >= limitPerTag) { healthMetrics.onAdditionalTagValueCardinalityBlocked(tagKey); - if (warnedThisWindow.add(tagKey)) { + if (warnedAboutCardinality.add(tagKey)) { log.warn( "Additional metric tag '{}' exceeded the per-tag cardinality limit of {}; " + "replacing values with '{}' for the rest of the current window", @@ -60,11 +63,30 @@ String admitOrBlock(String tagKey, String value) { return value; } - /** Clears per-tag value sets and rearms the per-key log line. Invoked by the periodic task. */ + /** + * Records that a value for {@code tagKey} was blocked due to exceeding the per-value length cap. + * Fires the same health metric as a cardinality block and emits a distinct warn log line once per + * tag key per window. + */ + void noteBlockedDueToLength(String tagKey, int valueLength, int maxLength) { + healthMetrics.onAdditionalTagValueCardinalityBlocked(tagKey); + if (warnedAboutLength.add(tagKey)) { + log.warn( + "Additional metric tag '{}' had a value of length {} exceeding the max length of {}; " + + "replacing with '{}' for the rest of the current window", + tagKey, + valueLength, + maxLength, + BLOCKED_VALUE); + } + } + + /** Clears per-tag value sets and rearms the per-key log lines. Invoked by the periodic task. */ void reset() { for (Set seen : seenValuesPerTag.values()) { seen.clear(); } - warnedThisWindow.clear(); + warnedAboutCardinality.clear(); + warnedAboutLength.clear(); } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index be7d2efe76d..1a9b5117f17 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -109,6 +109,11 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve // while still allowing some additional tags to be used. static final int MAX_ADDITIONAL_TAG_KEYS = 10; + // Maximum length of an additional metric tag *value*. Caps cache footprint and wire payload + // size from stack-trace / JSON / SQL stuffed into a tag by misconfigured app code. Values + // exceeding this are emitted as `:blocked_by_tracer`. + static final int MAX_ADDITIONAL_TAG_VALUE_LENGTH = 250; + private final Set ignoredResources; private final List additionalTagKeys; private final AdditionalTagsCardinalityLimiter cardinalityLimiter; @@ -488,7 +493,15 @@ private List getAdditionalTags(CoreSpan span) { if (value == null) { continue; } - String admittedValue = cardinalityLimiter.admitOrBlock(tagKey, value.toString()); + String rawValue = value.toString(); + String admittedValue; + if (rawValue.length() > MAX_ADDITIONAL_TAG_VALUE_LENGTH) { + cardinalityLimiter.noteBlockedDueToLength( + tagKey, rawValue.length(), MAX_ADDITIONAL_TAG_VALUE_LENGTH); + admittedValue = AdditionalTagsCardinalityLimiter.BLOCKED_VALUE; + } else { + admittedValue = cardinalityLimiter.admitOrBlock(tagKey, rawValue); + } Pair, Function> cacheAndCreator = ADDITIONAL_TAG_VALUES_CACHE.computeIfAbsent(tagKey, ADDITIONAL_TAG_VALUES_CACHE_ADDER); UTF8BytesString formatted = diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java index 92fe9bb69b0..f15596fca2e 100644 --- a/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/AdditionalTagsCardinalityLimiterTest.java @@ -82,6 +82,35 @@ void healthMetricFiresOnBlock() { assertEquals("region", health.blocked.get(1)); } + @Test + void noteBlockedDueToLengthFiresHealthMetric() { + RecordingHealthMetrics health = new RecordingHealthMetrics(); + AdditionalTagsCardinalityLimiter limiter = new AdditionalTagsCardinalityLimiter(100, health); + limiter.noteBlockedDueToLength("region", 500, 250); + assertEquals(1, health.blocked.size()); + assertEquals("region", health.blocked.get(0)); + } + + @Test + void lengthAndCardinalityBlocksAreCountedSeparatelyInHealth() { + RecordingHealthMetrics health = new RecordingHealthMetrics(); + AdditionalTagsCardinalityLimiter limiter = new AdditionalTagsCardinalityLimiter(2, health); + // exhaust cardinality + limiter.admitOrBlock("region", "a"); + limiter.admitOrBlock("region", "b"); + limiter.admitOrBlock("region", "c"); // cardinality block -> 1 health event + // length block on same tag -> 2 health events total + limiter.noteBlockedDueToLength("region", 500, 250); + assertEquals(2, health.blocked.size()); + // reset rearms both branches + limiter.reset(); + limiter.admitOrBlock("region", "x"); + limiter.admitOrBlock("region", "y"); + limiter.admitOrBlock("region", "z"); // cardinality block again -> 3 + limiter.noteBlockedDueToLength("region", 500, 250); // length block again -> 4 + assertEquals(4, health.blocked.size()); + } + private static final class RecordingHealthMetrics extends HealthMetrics { final List blocked = new ArrayList<>();