From 5d3a2a4b39236cb6644d501bac4ecf4386183895 Mon Sep 17 00:00:00 2001 From: rarokni Date: Mon, 13 Jun 2022 20:03:31 -0700 Subject: [PATCH] [Timeseries] Add Noop LAST Type 1 Metric --- .../metrics/core/typeone/Last.java | 41 ++++++ .../timeseriesflow/metrics/LastTest.java | 117 ++++++++++++++++++ .../src/test/resources/LastTest.json | 47 +++++++ 3 files changed, 205 insertions(+) create mode 100644 timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/main/java/com/google/dataflow/sample/timeseriesflow/metrics/core/typeone/Last.java create mode 100644 timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/java/com/google/dataflow/sample/timeseriesflow/metrics/LastTest.java create mode 100644 timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/resources/LastTest.json diff --git a/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/main/java/com/google/dataflow/sample/timeseriesflow/metrics/core/typeone/Last.java b/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/main/java/com/google/dataflow/sample/timeseriesflow/metrics/core/typeone/Last.java new file mode 100644 index 00000000..903303d6 --- /dev/null +++ b/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/main/java/com/google/dataflow/sample/timeseriesflow/metrics/core/typeone/Last.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.dataflow.sample.timeseriesflow.metrics.core.typeone; + +import com.google.dataflow.sample.timeseriesflow.TimeSeriesData.TSAccum; +import com.google.dataflow.sample.timeseriesflow.TimeSeriesData.TSDataPoint; +import com.google.dataflow.sample.timeseriesflow.combiners.BTypeOne; +import com.google.dataflow.sample.timeseriesflow.datamap.AccumCoreNumericBuilder; +import org.apache.beam.sdk.options.PipelineOptions; + +/** A NOOP class used when a user only wants the Last Value */ +public class Last extends BTypeOne { + + public interface MaxOptions extends PipelineOptions {}; + + @Override + public TSAccum addInput(TSAccum accumulator, TSDataPoint dataPoint) { + AccumCoreNumericBuilder coreNumeric = new AccumCoreNumericBuilder(accumulator); + return coreNumeric.build(); + } + + @Override + public TSAccum mergeDataAccums(TSAccum a, TSAccum b) { + return a; + } +} diff --git a/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/java/com/google/dataflow/sample/timeseriesflow/metrics/LastTest.java b/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/java/com/google/dataflow/sample/timeseriesflow/metrics/LastTest.java new file mode 100644 index 00000000..9aa873e6 --- /dev/null +++ b/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/java/com/google/dataflow/sample/timeseriesflow/metrics/LastTest.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.dataflow.sample.timeseriesflow.metrics; + +import com.google.dataflow.sample.timeseriesflow.DerivedAggregations.Indicators; +import com.google.dataflow.sample.timeseriesflow.TimeSeriesData.TSDataPoint; +import com.google.dataflow.sample.timeseriesflow.common.CommonUtils; +import com.google.dataflow.sample.timeseriesflow.graph.GenerateComputations; +import com.google.dataflow.sample.timeseriesflow.metrics.core.typeone.Last; +import com.google.dataflow.sample.timeseriesflow.options.TSFlowOptions; +import com.google.gson.stream.JsonReader; +import common.TSTestData; +import java.io.File; +import java.io.FileReader; +import java.util.Optional; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.TestStream; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.Values; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class LastTest { + + @Rule public final transient TestPipeline p = TestPipeline.create(); + + static final Instant NOW = Instant.parse("2000-01-01T00:00:00Z"); + + private static final String PRICE = "PRICE"; + + @Test + /** + * This is an integration test which will simulate a real computation type Volume Weighted Average + * Price + */ + public void testLast() throws Exception { + + String resourceName = "LastTest.json"; + ClassLoader classLoader = getClass().getClassLoader(); + File file = new File(classLoader.getResource(resourceName).getFile()); + String absolutePath = file.getAbsolutePath(); + + TSFlowOptions options = p.getOptions().as(TSFlowOptions.class); + options.setTypeOneComputationsLengthInSecs(5); + options.setTypeTwoComputationsLengthInSecs(5); + + TSTestData tsTestData = + TSTestData.toBuilder() + .setInputTSDataFromJSON( + new JsonReader(new FileReader(absolutePath)), + Duration.standardSeconds(5), + Duration.standardSeconds(5)) + .build(); + + TestStream stream = tsTestData.inputTSData(); + + GenerateComputations generateComputations = + GenerateComputations.fromPiplineOptions(options) + .setBasicType1Metrics(ImmutableList.of(Last.class)) + .build(); + + PCollection testStream = p.apply(stream); + + PCollection result = + testStream + .apply(generateComputations) + .apply(Values.create()) + .apply( + MapElements.into(TypeDescriptors.strings()) + .via( + x -> + x.getKey().getMajorKey() + + "::" + + Optional.ofNullable( + x.getDataStoreMap().get(Indicators.LAST.name())) + .orElse(CommonUtils.createNumData(0D)) + .getDoubleVal())); + + PAssert.that(result) + .inWindow( + new IntervalWindow(new Instant("2000-01-01T00:00:00Z"), Duration.standardSeconds(5))) + .containsInAnyOrder("Key-A::40.0") + .inWindow( + new IntervalWindow(new Instant("2000-01-01T00:00:05Z"), Duration.standardSeconds(5))) + .containsInAnyOrder("Key-A::10.0") + .inWindow( + new IntervalWindow(new Instant("2000-01-01T00:00:10Z"), Duration.standardSeconds(5))) + .empty(); + + p.run(); + } +} diff --git a/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/resources/LastTest.json b/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/resources/LastTest.json new file mode 100644 index 00000000..1c9e40e2 --- /dev/null +++ b/timeseries-streaming/timeseries-java-applications/TimeSeriesMetricsLibrary/src/test/resources/LastTest.json @@ -0,0 +1,47 @@ +[ + { + "key": { + "major_key": "Key-A", + "minor_key_string": "PRICE" + }, + "data": {"double_val": 10.0 + }, + "timestamp": "2000-01-01T00:00:00Z" + }, + + { + "key": { + "major_key": "Key-A", + "minor_key_string": "PRICE" + }, + "data": {"double_val": 20.0 + }, + "timestamp": "2000-01-01T00:00:01Z" + }, + + { + "key": { + "major_key": "Key-A", + "minor_key_string": "PRICE" + }, + "data": {"double_val": 40.0 + }, + "timestamp": "2000-01-01T00:00:02Z", + "time": { + "advance_watermark_seconds": 5 + } + }, + + { + "key": { + "major_key": "Key-A", + "minor_key_string": "PRICE" + }, + "data": {"double_val": 10.0 + }, + "timestamp": "2000-01-01T00:00:05Z", + "time": { + "advance_watermark_expression": "INFINITY" + } + } +] \ No newline at end of file