diff --git a/statsd/build.gradle b/statsd/build.gradle index 8325516..8ac716b 100644 --- a/statsd/build.gradle +++ b/statsd/build.gradle @@ -21,7 +21,7 @@ description = 'tally StatsD reporter' dependencies { - compile('com.datadoghq:java-dogstatsd-client:2.3') + compile('com.datadoghq:java-dogstatsd-client:4.0.0') compile project(path: ':tally-core', configuration: 'jmhFixturesUsageCompile') } diff --git a/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java b/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java index 07ec0f0..f365a02 100644 --- a/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java +++ b/statsd/src/jmh/java/com/uber/m3/tally/statsd/StatsdReporterBenchmark.java @@ -21,6 +21,7 @@ package com.uber.m3.tally.statsd; import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.NonBlockingStatsDClientBuilder; import com.timgroup.statsd.StatsDClient; import com.uber.m3.tally.AbstractReporterBenchmark; @@ -28,7 +29,11 @@ public class StatsdReporterBenchmark extends AbstractReporterBenchmark expectedStrs; - - StatsdAssertingUdpServer(String hostname, int port, Set expectedStrs) { - this.expectedStrs = expectedStrs; - - try { - this.socketAddress = new InetSocketAddress(InetAddress.getByName(hostname), port); - } catch (UnknownHostException e) { - throw new RuntimeException("Unable to open server", e); - } - } - - @Override - public synchronized void run() { - try (DatagramSocket serverSocket = new DatagramSocket(socketAddress)) { - serverSocket.setSoTimeout(TIMEOUT_MILLIS); - - for (int i = 0; i < expectedStrs.size(); i++) { - byte[] receiveData = new byte[RECEIVE_MAX_SIZE]; - - DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); - serverSocket.receive(receivePacket); - - String receivedStr = new String(receivePacket.getData()); - - // Sometimes we get two messages at once - String[] strs = receivedStr.split("\n"); - - for (String str : strs) { - // Clean the received message - str = str.substring(0, str.lastIndexOf('|')); - - if (!expectedStrs.contains(str)) { - throw new IllegalStateException(String.format("Unexpected message: %s", str)); - } - } - } - } catch (Exception e) { - throw new RuntimeException("Exception while running server for assertions", e); - } - } -} diff --git a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java b/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java index 5f76bce..d1e0366 100644 --- a/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java +++ b/statsd/src/main/java/com/uber/m3/tally/statsd/StatsdReporter.java @@ -78,20 +78,18 @@ public void close() { @Override public void reportCounter(String name, Map tags, long value) { - // We don't support tags for StatsD - statsdClient.count(name, value, sampleRate); + statsdClient.count(name, value, sampleRate, adaptTags(tags)); } @Override public void reportGauge(String name, Map tags, double value) { - // We don't support tags for StatsD - statsdClient.gauge(name, value, sampleRate); + statsdClient.gauge(name, value, sampleRate, adaptTags(tags)); } @Override public void reportTimer(String name, Map tags, Duration interval) { // We don't support tags for StatsD - statsdClient.time(name, interval.toMillis(), sampleRate); + statsdClient.time(name, interval.toMillis(), sampleRate, adaptTags(tags)); } @Override @@ -103,7 +101,6 @@ public void reportHistogramValueSamples( double bucketUpperBound, long samples ) { - // We don't support tags for StatsD statsdClient.count( bucketString( name, @@ -111,7 +108,8 @@ public void reportHistogramValueSamples( valueBucketString(bucketUpperBound) ), samples, - sampleRate + sampleRate, + adaptTags(tags) ); } @@ -132,7 +130,8 @@ public void reportHistogramDurationSamples( durationBucketString(bucketUpperBound) ), samples, - sampleRate + sampleRate, + adaptTags(tags) ); } @@ -163,4 +162,14 @@ private String durationBucketString(Duration bucketBound) { return bucketBound.toString(); } + + private String[] adaptTags(Map tags) { + if (tags == null) { + return null; + } + return tags.entrySet() + .stream() + .map(entry -> String.format("%s:%s", entry.getKey(), entry.getValue())) + .toArray(String[]::new); + } } diff --git a/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java new file mode 100644 index 0000000..97b1041 --- /dev/null +++ b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdAssertingUdpServer.java @@ -0,0 +1,154 @@ +// Copyright (c) 2021 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package com.uber.m3.tally.statsd; + +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class StatsdAssertingUdpServer implements Runnable { + private final int TIMEOUT_MILLIS = 1000; + private final int RECEIVE_MAX_SIZE = 1024; + private final SocketAddress socketAddress; + private final List errored; + private Set expected; + + StatsdAssertingUdpServer(String hostname, int port, Set expected) { + this.expected = expected; + this.errored = new ArrayList<>(); + + try { + this.socketAddress = new InetSocketAddress(InetAddress.getByName(hostname), port); + } catch (UnknownHostException e) { + throw new RuntimeException("Unable to open server", e); + } + } + + @Override + public synchronized void run() { + try (DatagramSocket serverSocket = new DatagramSocket(socketAddress)) { + serverSocket.setSoTimeout(TIMEOUT_MILLIS); + + for (int i = 0; i < expected.size(); i++) { + byte[] receiveData = new byte[RECEIVE_MAX_SIZE]; + + DatagramPacket receivePacket = new DatagramPacket(receiveData, receiveData.length); + serverSocket.receive(receivePacket); + + String receivedStr = new String(receivePacket.getData()); + + // Buffer might contain NUL chars at the end so we trim that + // And then we split the lines as sometimes we get two messages at once + String[] strs = receivedStr.trim().split("\n"); + + for (String str : strs) { + final ReportedMetric metric = ReportedMetric.valueOf(str); + if (!expected.contains(metric)) { + errored.add(metric); + } + } + } + } catch (Exception e) { + throw new RuntimeException("Exception while running server for assertions", e); + } + } + + public List getErrored() { + return errored; + } + + static class ReportedMetric { + + private static Pattern lineRegex = Pattern.compile( + "(?[a-z\\-.0-9]+):(?[a-z0-9.]+)(\\|(?\\w+))(\\|@\\d\\.\\d+){0,1}(\\|#(?\\S+)){0,1}"); + private String scope; + private String value; + private String type; + private Set tags; + + ReportedMetric(String scope, String value, String type, Set tags) { + this.scope = scope; + this.value = value; + this.type = type; + this.tags = tags; + } + + public static ReportedMetric valueOf(String line) { + final Matcher matcher = lineRegex.matcher(line); + if (!matcher.matches()) { + throw new RuntimeException("Input line cannot be handled"); + } + + Set tags = new HashSet<>(); + final String tagString = matcher.group("tags"); + if (tagString != null) { + tags.addAll(Arrays.asList(tagString.split(","))); + } + + return new ReportedMetric( + matcher.group("scope"), + matcher.group("value"), + matcher.group("type"), + tags); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ReportedMetric that = (ReportedMetric) o; + return Objects.equals(scope, that.scope) + && Objects.equals(value, that.value) + && Objects.equals(type, that.type) + && Objects.equals(tags, that.tags); + } + + @Override + public int hashCode() { + return Objects.hash(scope, value, type, tags); + } + + @Override + public String toString() { + return "ReportedMetric{" + + "scope='" + scope + '\'' + + ", value='" + value + '\'' + + ", type='" + type + '\'' + + ", tags=" + tags + + '}'; + } + } +} diff --git a/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java index 9a1891e..9027d26 100644 --- a/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java +++ b/statsd/src/test/java/com/uber/m3/tally/statsd/StatsdReporterTest.java @@ -21,17 +21,24 @@ package com.uber.m3.tally.statsd; import com.timgroup.statsd.NoOpStatsDClient; -import com.timgroup.statsd.NonBlockingStatsDClient; +import com.timgroup.statsd.NonBlockingStatsDClientBuilder; import com.timgroup.statsd.StatsDClient; import com.uber.m3.tally.CapableOf; import com.uber.m3.tally.DurationBuckets; import com.uber.m3.tally.ValueBuckets; +import com.uber.m3.tally.statsd.StatsdAssertingUdpServer.ReportedMetric; import com.uber.m3.util.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; import org.junit.Test; import java.util.HashSet; +import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; public class StatsdReporterTest { private final int PORT = 4434; @@ -41,29 +48,44 @@ public class StatsdReporterTest { @Test public void statsdClient() { - HashSet expectedStrs = new HashSet<>(); - expectedStrs.add("statsd-test.statsd-count:4|c"); - expectedStrs.add("statsd-test.statsd-gauge:1.5|g"); - expectedStrs.add("statsd-test.statsd-timer:250|ms"); - expectedStrs.add("statsd-test.statsd-histvalue.2000.000000-3000.000000:510|c"); - expectedStrs.add("statsd-test.statsd-histduration.19ms-20ms:1250|c"); - expectedStrs.add("statsd-test.statsd-histvalue-inf.-infinity-infinity:99|c"); - expectedStrs.add("statsd-test.statsd-histduration-inf.-infinity-infinity:999|c"); + Set expectedTags = new HashSet<>(); + expectedTags.add("key1:val1"); + expectedTags.add("key2:val:with:colons"); - StatsdAssertingUdpServer server = new StatsdAssertingUdpServer("localhost", PORT, expectedStrs); + Set expected = new HashSet<>(); + expected.add(new ReportedMetric("statsd-test.statsd-count", "4", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-count-notags", "4", "c", new HashSet<>())); + expected.add(new ReportedMetric("statsd-test.statsd-gauge", "1.5", "g", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-timer", "250", "ms", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histvalue.2000.000000-3000.000000", "510", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histduration.19ms-20ms", "1250", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histvalue-inf.-infinity-infinity", "99", "c", expectedTags)); + expected.add(new ReportedMetric("statsd-test.statsd-histduration-inf.-infinity-infinity", "999", "c", expectedTags)); + + StatsdAssertingUdpServer server = new StatsdAssertingUdpServer("localhost", PORT, expected); Thread serverThread = new Thread(server); serverThread.start(); - statsd = new NonBlockingStatsDClient("statsd-test", "localhost", PORT); + statsd = new NonBlockingStatsDClientBuilder() + .prefix("statsd-test") + .hostname("localhost") + .port(PORT) + .blocking(true) + .build(); reporter = new StatsdReporter(statsd); - reporter.reportCounter("statsd-count", null, 4); - reporter.reportGauge("statsd-gauge", null, 1.5); - reporter.reportTimer("statsd-timer", null, Duration.ofMillis(250)); + Map tags = new HashMap<>(); + tags.put("key1", "val1"); + tags.put("key2", "val:with:colons"); + + reporter.reportCounter("statsd-count", tags, 4); + reporter.reportCounter("statsd-count-notags", null, 4); + reporter.reportGauge("statsd-gauge", tags, 1.5); + reporter.reportTimer("statsd-timer", tags, Duration.ofMillis(250)); reporter.reportHistogramValueSamples( "statsd-histvalue", - null, + tags, ValueBuckets.linear(0, 1000, 6), 2000, 3000, @@ -71,7 +93,7 @@ public void statsdClient() { ); reporter.reportHistogramDurationSamples( "statsd-histduration", - null, + tags, DurationBuckets.linear(Duration.ofSeconds(10), Duration.ofSeconds(1), 11), Duration.ofMillis(19), Duration.ofMillis(20), @@ -79,7 +101,7 @@ public void statsdClient() { ); reporter.reportHistogramValueSamples( "statsd-histvalue-inf", - null, + tags, new ValueBuckets(new Double[]{-Double.MAX_VALUE, Double.MAX_VALUE}), -Double.MAX_VALUE, Double.MAX_VALUE, @@ -87,7 +109,7 @@ public void statsdClient() { ); reporter.reportHistogramDurationSamples( "statsd-histduration-inf", - null, + tags, new DurationBuckets(new Duration[]{Duration.MIN_VALUE, Duration.MAX_VALUE}), Duration.MIN_VALUE, Duration.MAX_VALUE, @@ -96,6 +118,8 @@ public void statsdClient() { statsd.stop(); reporter.close(); + + assertThat(server.getErrored(), is(new ArrayList<>())); } @Test