From 1de73c646316f0896389b6f46e64666db573035d Mon Sep 17 00:00:00 2001 From: Samantha Fadrigalan Date: Mon, 1 Mar 2021 16:30:23 -0600 Subject: [PATCH 1/5] Split batch --- .../com/spotify/ffwd/http/HttpDecoder.java | 35 +++++++++++++++---- 1 file changed, 28 insertions(+), 7 deletions(-) diff --git a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java index b7024521..374006a4 100644 --- a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java +++ b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java @@ -27,7 +27,6 @@ import com.spotify.ffwd.model.v2.Batch; import com.spotify.ffwd.model.v2.Metric; import com.spotify.ffwd.model.v2.Value; -import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; @@ -39,8 +38,10 @@ import io.netty.handler.codec.http.HttpVersion; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import javax.inject.Inject; import javax.inject.Named; @@ -49,6 +50,7 @@ @Sharable public class HttpDecoder extends MessageToMessageDecoder { + private final int MAX_BATCH_SIZE = 100_000; private static final Logger log = LoggerFactory.getLogger(HttpDecoder.class); @@ -94,23 +96,23 @@ private boolean matchContentType(final FullHttpRequest in, final String expected private void postBatch( final ChannelHandlerContext ctx, final FullHttpRequest in, final List out ) { - final Object batch = convertToBatch(in); - out.add(batch); + final List batches = convertToBatches(in); + out.addAll(batches); ctx .channel() .writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)) .addListener((ChannelFutureListener) future -> future.channel().close()); } - private Object convertToBatch(final FullHttpRequest in) { + private List convertToBatches(final FullHttpRequest in) { final String endPoint = in.uri(); try (final InputStream inputStream = new ByteBufInputStream(in.content())) { if ("/v2/batch".equals(endPoint)) { - return mapper.readValue(inputStream, Batch.class); + return splitBatch(mapper.readValue(inputStream, Batch.class)); } else { com.spotify.ffwd.model.Batch batch = mapper.readValue(inputStream, com.spotify.ffwd.model.Batch.class); - return convert(batch); + return splitBatch(convert(batch)); } } catch (final IOException e) { log.error( @@ -119,6 +121,25 @@ private Object convertToBatch(final FullHttpRequest in) { } } + private List splitBatch(Batch batch) { + if (batch.getPoints().size() <= MAX_BATCH_SIZE) { + return Collections.singletonList(batch); + } + List points = batch.getPoints(); + AtomicInteger counter = new AtomicInteger(); + return points.stream() + .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / MAX_BATCH_SIZE)) + .values() + .stream() + .map( + batchedPoints -> + Batch.create( + Optional.of(batch.getCommonTags()), + Optional.of(batch.getCommonResource()), + batchedPoints)) + .collect(Collectors.toList()); + } + private Batch convert(final com.spotify.ffwd.model.Batch batch) { List v1Point = batch.getPoints(); final List v2Point = v1Point From 7c37b76c2f87b306a67216f5feda96678712ca8f Mon Sep 17 00:00:00 2001 From: Samantha Fadrigalan Date: Mon, 1 Mar 2021 17:26:50 -0600 Subject: [PATCH 2/5] Add some logging --- .../spotify/ffwd/model/v2/BatchMetadata.java | 31 +++++++++++++++++++ .../com/spotify/ffwd/http/HttpDecoder.java | 13 ++++++++ 2 files changed, 44 insertions(+) create mode 100644 api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java diff --git a/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java b/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java new file mode 100644 index 00000000..f206a117 --- /dev/null +++ b/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java @@ -0,0 +1,31 @@ +package com.spotify.ffwd.model.v2; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.Optional; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@EqualsAndHashCode(of = { "commonTags", "commonResource" }) +public class BatchMetadata { + + private final Map commonTags; + private final Map commonResource; + + /** + * JSON creator. + */ + @JsonCreator + public static BatchMetadata create( + @JsonProperty("commonTags") final Optional> commonTags, + @JsonProperty("commonResource") final Optional> commonResource + ) { + return new BatchMetadata(commonTags.orElseGet(ImmutableMap::of), + commonResource.orElseGet(ImmutableMap::of)); + } +} diff --git a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java index 374006a4..9abb2ced 100644 --- a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java +++ b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java @@ -25,6 +25,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.spotify.ffwd.model.v2.Batch; +import com.spotify.ffwd.model.v2.BatchMetadata; import com.spotify.ffwd.model.v2.Metric; import com.spotify.ffwd.model.v2.Value; import io.netty.buffer.ByteBufInputStream; @@ -51,6 +52,7 @@ @Sharable public class HttpDecoder extends MessageToMessageDecoder { private final int MAX_BATCH_SIZE = 100_000; + private final int MAX_INPUT_MB = 800; private static final Logger log = LoggerFactory.getLogger(HttpDecoder.class); @@ -107,6 +109,17 @@ private void postBatch( private List convertToBatches(final FullHttpRequest in) { final String endPoint = in.uri(); try (final InputStream inputStream = new ByteBufInputStream(in.content())) { + int inputSizeMb = inputStream.available() / 1000000; + if (inputSizeMb > MAX_INPUT_MB) { + BatchMetadata metadata = mapper.readValue(inputStream, BatchMetadata.class); + log.error( + "Input size is {}mb which is over the limit of {}mb. commonTags: {}, commonResource: {}", + inputSizeMb, + MAX_BATCH_SIZE, + metadata.getCommonTags(), + metadata.getCommonResource()); + throw new HttpException(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + } if ("/v2/batch".equals(endPoint)) { return splitBatch(mapper.readValue(inputStream, Batch.class)); } else { From ad8c6a9fa654f74aa3df21b5ef4713b4b2401529 Mon Sep 17 00:00:00 2001 From: Samantha Fadrigalan Date: Mon, 1 Mar 2021 17:32:44 -0600 Subject: [PATCH 3/5] Change batching scheme --- .../http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java index 9abb2ced..be8245a5 100644 --- a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java +++ b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java @@ -139,9 +139,11 @@ private List splitBatch(Batch batch) { return Collections.singletonList(batch); } List points = batch.getPoints(); + int numBatches = points.size() / MAX_BATCH_SIZE + 1; + log.info("Splitting input into {} batches.", numBatches); AtomicInteger counter = new AtomicInteger(); return points.stream() - .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / MAX_BATCH_SIZE)) + .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / numBatches)) .values() .stream() .map( From 7a069b84c7157f0693453ad410317e22f3a38595 Mon Sep 17 00:00:00 2001 From: Samantha Fadrigalan Date: Tue, 2 Mar 2021 14:27:32 -0600 Subject: [PATCH 4/5] Split batch into continuous lists --- .../src/main/java/com/spotify/ffwd/http/HttpDecoder.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java index be8245a5..71b019b4 100644 --- a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java +++ b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java @@ -42,8 +42,8 @@ import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Named; import org.slf4j.Logger; @@ -141,11 +141,8 @@ private List splitBatch(Batch batch) { List points = batch.getPoints(); int numBatches = points.size() / MAX_BATCH_SIZE + 1; log.info("Splitting input into {} batches.", numBatches); - AtomicInteger counter = new AtomicInteger(); - return points.stream() - .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / numBatches)) - .values() - .stream() + return IntStream.range(0, numBatches).mapToObj( + n -> points.subList(n * MAX_BATCH_SIZE, n == numBatches - 1 ? points.size() : (n + 1) * MAX_BATCH_SIZE)) .map( batchedPoints -> Batch.create( From da8c0117418ef0dce40f2939e02e942d06eea88e Mon Sep 17 00:00:00 2001 From: Samantha Fadrigalan Date: Tue, 2 Mar 2021 14:30:21 -0600 Subject: [PATCH 5/5] Add header --- .../spotify/ffwd/model/v2/BatchMetadata.java | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java b/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java index f206a117..3f12d359 100644 --- a/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java +++ b/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java @@ -1,3 +1,23 @@ +/*- + * -\-\- + * FastForward HTTP Module + * -- + * Copyright (C) 2016 - 2018 Spotify AB + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + package com.spotify.ffwd.model.v2; import com.fasterxml.jackson.annotation.JsonCreator;