From 6b6802a3e6e9bca3bff1dfee41474096705aef87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= <134835+joelmarty@users.noreply.github.com> Date: Thu, 21 Apr 2022 10:20:33 +0200 Subject: [PATCH 1/3] Bump statsd client version --- statsd/build.gradle | 2 +- .../com/uber/m3/tally/statsd/StatsdReporterBenchmark.java | 7 ++++++- .../java/com/uber/m3/tally/statsd/StatsdReporterTest.java | 7 ++++++- 3 files changed, 13 insertions(+), 3 deletions(-) diff --git a/statsd/build.gradle b/statsd/build.gradle index 8325516..8ac716b 100644 --- a/statsd/build.gradle +++ b/statsd/build.gradle @@ -21,7 +21,7 @@ description = 'tally StatsD reporter' dependencies { - compile('com.datadoghq:java-dogstatsd-client:2.3') + compile('com.datadoghq:java-dogstatsd-client:4.0.0') compile project(path: ':tally-core', configuration: 'jmhFixturesUsageCompile') } diff --git a/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java b/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java index 07ec0f0..f365a02 100644 --- a/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java +++ b/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java @@ -21,6 +21,7 @@ package com.uber.m3.tally.statsd; import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.NonBlockingStatsDClientBuilder; import com.timgroup.statsd.StatsDClient; import com.uber.m3.tally.AbstractReporterBenchmark; @@ -28,7 +29,11 @@ public class StatsdReporterBenchmark extends AbstractReporterBenchmark Date: Thu, 21 Apr 2022 15:49:47 +0200 Subject: [PATCH 2/3] Add tag support for StatsdReporter --- .../statsd/StatsdAssertingUdpServer.java | 100 ++++++++++++++++-- .../uber/m3/tally/statsd/StatsdReporter.java | 25 +++-- .../m3/tally/statsd/StatsdReporterTest.java | 53 +++++++--- 3 files changed, 142 insertions(+), 36 deletions(-) diff --git a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java b/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java index 74c418f..97b1041 100644 --- a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java +++ b/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java @@ -26,16 +26,25 @@ import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class StatsdAssertingUdpServer implements Runnable { private final int TIMEOUT_MILLIS = 1000; private final int RECEIVE_MAX_SIZE = 1024; private final SocketAddress socketAddress; - private Set expectedStrs; + private final List errored; + private Set expected; - StatsdAssertingUdpServer(String hostname, int port, Set expectedStrs) { - this.expectedStrs = expectedStrs; + StatsdAssertingUdpServer(String hostname, int port, Set expected) { + this.expected = expected; + this.errored = new ArrayList<>(); try { this.socketAddress = new InetSocketAddress(InetAddress.getByName(hostname), port); @@ -49,7 +58,7 @@ public synchronized void run() { try (DatagramSocket serverSocket = new DatagramSocket(socketAddress)) { serverSocket.setSoTimeout(TIMEOUT_MILLIS); - for (int i = 0; i < expectedStrs.size(); i++) { + for (int i = 0; i < expected.size(); i++) { byte[] receiveData = new byte[RECEIVE_MAX_SIZE]; DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); @@ -57,15 +66,14 @@ public synchronized void run() { String receivedStr = new String(receivePacket.getData()); - // Sometimes we get two messages at once - String[] strs = receivedStr.split("\n"); + // Buffer might contain NUL chars at the end so we trim that + // And then we split the lines as sometimes we get two messages at once + String[] strs = receivedStr.trim().split("\n"); for (String str : strs) { - // Clean the received message - str = str.substring(0, str.lastIndexOf('|')); - - if (!expectedStrs.contains(str)) { - throw new IllegalStateException(String.format("Unexpected message: %s", str)); + final ReportedMetric metric = ReportedMetric.valueOf(str); + if (!expected.contains(metric)) { + errored.add(metric); } } } @@ -73,4 +81,74 @@ public synchronized void run() { throw new RuntimeException("Exception while running server for assertions", e); } } + + public List getErrored() { + return errored; + } + + static class ReportedMetric { + + private static Pattern lineRegex = Pattern.compile( + "(?[a-z\\-.0-9]+):(?[a-z0-9.]+)(\\|(?\\w+))(\\|@\\d\\.\\d+){0,1}(\\|#(?\\S+)){0,1}"); + private String scope; + private String value; + private String type; + private Set tags; + + ReportedMetric(String scope, String value, String type, Set tags) { + this.scope = scope; + this.value = value; + this.type = type; + this.tags = tags; + } + + public static ReportedMetric valueOf(String line) { + final Matcher matcher = lineRegex.matcher(line); + if (!matcher.matches()) { + throw new RuntimeException("Input line cannot be handled"); + } + + Set tags = new HashSet<>(); + final String tagString = matcher.group("tags"); + if (tagString != null) { + tags.addAll(Arrays.asList(tagString.split(","))); + } + + return new ReportedMetric( + matcher.group("scope"), + matcher.group("value"), + matcher.group("type"), + tags); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReportedMetric that = (ReportedMetric) o; + return Objects.equals(scope, that.scope) + && Objects.equals(value, that.value) + && Objects.equals(type, that.type) + && Objects.equals(tags, that.tags); + } + + @Override + public int hashCode() { + return Objects.hash(scope, value, type, tags); + } + + @Override + public String toString() { + return "ReportedMetric{" + + "scope='" + scope + '\'' + + ", value='" + value + '\'' + + ", type='" + type + '\'' + + ", tags=" + tags + + '}'; + } + } } diff --git a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java b/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java index 5f76bce..d1e0366 100644 --- a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java +++ b/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java @@ -78,20 +78,18 @@ public void close() { @Override public void reportCounter(String name, Map tags, long value) { - // We don't support tags for StatsD - statsdClient.count(name, value, sampleRate); + statsdClient.count(name, value, sampleRate, adaptTags(tags)); } @Override public void reportGauge(String name, Map tags, double value) { - // We don't support tags for StatsD - statsdClient.gauge(name, value, sampleRate); + statsdClient.gauge(name, value, sampleRate, adaptTags(tags)); } @Override public void reportTimer(String name, Map tags, Duration interval) { // We don't support tags for StatsD - statsdClient.time(name, interval.toMillis(), sampleRate); + statsdClient.time(name, interval.toMillis(), sampleRate, adaptTags(tags)); } @Override @@ -103,7 +101,6 @@ public void reportHistogramValueSamples( double bucketUpperBound, long samples ) { - // We don't support tags for StatsD statsdClient.count( bucketString( name, @@ -111,7 +108,8 @@ public void reportHistogramValueSamples( valueBucketString(bucketUpperBound) ), samples, - sampleRate + sampleRate, + adaptTags(tags) ); } @@ -132,7 +130,8 @@ public void reportHistogramDurationSamples( durationBucketString(bucketUpperBound) ), samples, - sampleRate + sampleRate, + adaptTags(tags) ); } @@ -163,4 +162,14 @@ private String durationBucketString(Duration bucketBound) { return bucketBound.toString(); } + + private String[] adaptTags(Map tags) { + if (tags == null) { + return null; + } + return tags.entrySet() + .stream() + .map(entry -> String.format("%s:%s", entry.getKey(), entry.getValue())) + .toArray(String[]::new); + } } diff --git a/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java index 07597ae..9027d26 100644 --- a/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java +++ b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java @@ -21,18 +21,24 @@ package com.uber.m3.tally.statsd; import com.timgroup.statsd.NoOpStatsDClient; -import com.timgroup.statsd.NonBlockingStatsDClient; import com.timgroup.statsd.NonBlockingStatsDClientBuilder; import com.timgroup.statsd.StatsDClient; import com.uber.m3.tally.CapableOf; import com.uber.m3.tally.DurationBuckets; import com.uber.m3.tally.ValueBuckets; +import com.uber.m3.tally.statsd.StatsdAssertingUdpServer.ReportedMetric; import com.uber.m3.util.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.junit.Test; import java.util.HashSet; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; public class StatsdReporterTest { private final int PORT = 4434; @@ -42,16 +48,21 @@ public class StatsdReporterTest { @Test public void statsdClient() { - HashSet expectedStrs = new HashSet<>(); - expectedStrs.add("statsd-test.statsd-count:4|c"); - expectedStrs.add("statsd-test.statsd-gauge:1.5|g"); - expectedStrs.add("statsd-test.statsd-timer:250|ms"); - expectedStrs.add("statsd-test.statsd-histvalue.2000.000000-3000.000000:510|c"); - expectedStrs.add("statsd-test.statsd-histduration.19ms-20ms:1250|c"); - expectedStrs.add("statsd-test.statsd-histvalue-inf.-infinity-infinity:99|c"); - expectedStrs.add("statsd-test.statsd-histduration-inf.-infinity-infinity:999|c"); + Set expectedTags = new HashSet<>(); + expectedTags.add("key1:val1"); + expectedTags.add("key2:val:with:colons"); - StatsdAssertingUdpServer server = new StatsdAssertingUdpServer("localhost", PORT, expectedStrs); + Set expected = new HashSet<>(); + expected.add(new ReportedMetric("statsd-test.statsd-count", "4", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-count-notags", "4", "c", new HashSet<>())); + expected.add(new ReportedMetric("statsd-test.statsd-gauge", "1.5", "g", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-timer", "250", "ms", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histvalue.2000.000000-3000.000000", "510", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histduration.19ms-20ms", "1250", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histvalue-inf.-infinity-infinity", "99", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histduration-inf.-infinity-infinity", "999", "c", expectedTags)); + + StatsdAssertingUdpServer server = new StatsdAssertingUdpServer("localhost", PORT, expected); Thread serverThread = new Thread(server); serverThread.start(); @@ -60,15 +71,21 @@ public void statsdClient() { .prefix("statsd-test") .hostname("localhost") .port(PORT) + .blocking(true) .build(); reporter = new StatsdReporter(statsd); - reporter.reportCounter("statsd-count", null, 4); - reporter.reportGauge("statsd-gauge", null, 1.5); - reporter.reportTimer("statsd-timer", null, Duration.ofMillis(250)); + Map tags = new HashMap<>(); + tags.put("key1", "val1"); + tags.put("key2", "val:with:colons"); + + reporter.reportCounter("statsd-count", tags, 4); + reporter.reportCounter("statsd-count-notags", null, 4); + reporter.reportGauge("statsd-gauge", tags, 1.5); + reporter.reportTimer("statsd-timer", tags, Duration.ofMillis(250)); reporter.reportHistogramValueSamples( "statsd-histvalue", - null, + tags, ValueBuckets.linear(0, 1000, 6), 2000, 3000, @@ -76,7 +93,7 @@ public void statsdClient() { ); reporter.reportHistogramDurationSamples( "statsd-histduration", - null, + tags, DurationBuckets.linear(Duration.ofSeconds(10), Duration.ofSeconds(1), 11), Duration.ofMillis(19), Duration.ofMillis(20), @@ -84,7 +101,7 @@ public void statsdClient() { ); reporter.reportHistogramValueSamples( "statsd-histvalue-inf", - null, + tags, new ValueBuckets(new Double[]{-Double.MAX_VALUE, Double.MAX_VALUE}), -Double.MAX_VALUE, Double.MAX_VALUE, @@ -92,7 +109,7 @@ public void statsdClient() { ); reporter.reportHistogramDurationSamples( "statsd-histduration-inf", - null, + tags, new DurationBuckets(new Duration[]{Duration.MIN_VALUE, Duration.MAX_VALUE}), Duration.MIN_VALUE, Duration.MAX_VALUE, @@ -101,6 +118,8 @@ public void statsdClient() { statsd.stop(); reporter.close(); + + assertThat(server.getErrored(), is(new ArrayList<>())); } @Test From 68e5fdea8a105000b5c0b9b2dbd9aa12d8bde9e4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=ABl=20Marty?= <134835+joelmarty@users.noreply.github.com> Date: Thu, 21 Apr 2022 15:51:43 +0200 Subject: [PATCH 3/3] Move StatsdAssertingUdpServer to test package It's just used in tests and should probably not be in the main package. --- .../java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename statsd/src/{main => test}/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java (100%) diff --git a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java similarity index 100% rename from statsd/src/main/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java rename to statsd/src/test/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java