diff --git a/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java b/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java new file mode 100644 index 00000000..3f12d359 --- /dev/null +++ b/api/src/main/java/com/spotify/ffwd/model/v2/BatchMetadata.java @@ -0,0 +1,51 @@ +/*- + * -\-\- + * FastForward HTTP Module + * -- + * Copyright (C) 2016 - 2018 Spotify AB + * -- + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * -/-/- + */ + +package com.spotify.ffwd.model.v2; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import java.util.Optional; +import lombok.Data; +import lombok.EqualsAndHashCode; + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +@EqualsAndHashCode(of = { "commonTags", "commonResource" }) +public class BatchMetadata { + + private final Map commonTags; + private final Map commonResource; + + /** + * JSON creator. + */ + @JsonCreator + public static BatchMetadata create( + @JsonProperty("commonTags") final Optional> commonTags, + @JsonProperty("commonResource") final Optional> commonResource + ) { + return new BatchMetadata(commonTags.orElseGet(ImmutableMap::of), + commonResource.orElseGet(ImmutableMap::of)); + } +} diff --git a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java index b7024521..71b019b4 100644 --- a/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java +++ b/modules/http/src/main/java/com/spotify/ffwd/http/HttpDecoder.java @@ -25,9 +25,9 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.spotify.ffwd.model.v2.Batch; +import com.spotify.ffwd.model.v2.BatchMetadata; import com.spotify.ffwd.model.v2.Metric; import com.spotify.ffwd.model.v2.Value; -import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; @@ -39,9 +39,11 @@ import io.netty.handler.codec.http.HttpVersion; import java.io.IOException; import java.io.InputStream; -import java.nio.charset.StandardCharsets; +import java.util.Collections; import java.util.List; +import java.util.Optional; import java.util.stream.Collectors; +import java.util.stream.IntStream; import javax.inject.Inject; import javax.inject.Named; import org.slf4j.Logger; @@ -49,6 +51,8 @@ @Sharable public class HttpDecoder extends MessageToMessageDecoder { + private final int MAX_BATCH_SIZE = 100_000; + private final int MAX_INPUT_MB = 800; private static final Logger log = LoggerFactory.getLogger(HttpDecoder.class); @@ -94,23 +98,34 @@ private boolean matchContentType(final FullHttpRequest in, final String expected private void postBatch( final ChannelHandlerContext ctx, final FullHttpRequest in, final List out ) { - final Object batch = convertToBatch(in); - out.add(batch); + final List batches = convertToBatches(in); + out.addAll(batches); ctx .channel() .writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK)) .addListener((ChannelFutureListener) future -> future.channel().close()); } - private Object convertToBatch(final FullHttpRequest in) { + private List convertToBatches(final FullHttpRequest in) { final String endPoint = in.uri(); try (final InputStream inputStream = new ByteBufInputStream(in.content())) { + int inputSizeMb = inputStream.available() / 1000000; + if (inputSizeMb > MAX_INPUT_MB) { + BatchMetadata metadata = mapper.readValue(inputStream, BatchMetadata.class); + log.error( + "Input size is {}mb which is over the limit of {}mb. commonTags: {}, commonResource: {}", + inputSizeMb, + MAX_BATCH_SIZE, + metadata.getCommonTags(), + metadata.getCommonResource()); + throw new HttpException(HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE); + } if ("/v2/batch".equals(endPoint)) { - return mapper.readValue(inputStream, Batch.class); + return splitBatch(mapper.readValue(inputStream, Batch.class)); } else { com.spotify.ffwd.model.Batch batch = mapper.readValue(inputStream, com.spotify.ffwd.model.Batch.class); - return convert(batch); + return splitBatch(convert(batch)); } } catch (final IOException e) { log.error( @@ -119,6 +134,24 @@ private Object convertToBatch(final FullHttpRequest in) { } } + private List splitBatch(Batch batch) { + if (batch.getPoints().size() <= MAX_BATCH_SIZE) { + return Collections.singletonList(batch); + } + List points = batch.getPoints(); + int numBatches = points.size() / MAX_BATCH_SIZE + 1; + log.info("Splitting input into {} batches.", numBatches); + return IntStream.range(0, numBatches).mapToObj( + n -> points.subList(n * MAX_BATCH_SIZE, n == numBatches - 1 ? points.size() : (n + 1) * MAX_BATCH_SIZE)) + .map( + batchedPoints -> + Batch.create( + Optional.of(batch.getCommonTags()), + Optional.of(batch.getCommonResource()), + batchedPoints)) + .collect(Collectors.toList()); + } + private Batch convert(final com.spotify.ffwd.model.Batch batch) { List v1Point = batch.getPoints(); final List v2Point = v1Point