From 25c98e82f80e3a4653315694840bc78b62d76a99 Mon Sep 17 00:00:00 2001 From: rarokni Date: Thu, 22 Apr 2021 14:32:11 +0800 Subject: [PATCH 1/4] [Retail] Remove ClickstreamAvro from Tests. Match Click Stream even to TG schema. Simplify session window for cleaner sample --- retail/retail-java-applications/README.MD | 2 +- .../clickstream/BackFillSessionData.java | 165 ------------- .../clickstream/ClickStreamSessions.java | 30 +-- .../clickstream/CountViewsPerProduct.java | 8 +- .../WriteAggregatesToBigTable.java | 5 +- .../EventItemCorrectionService.java | 9 +- .../validation/ValidateEventItems.java | 2 +- .../RetailCompanyServices.java | 2 +- .../core/utils/test/JSONUtilsTest.java | 49 +--- .../avrotestobjects/ClickStreamEventAVRO.java | 44 ---- .../test/avrotestobjects/InventoryAVRO.java | 4 + .../avrotestobjects/TransactionsAVRO.java | 5 +- .../clickstream/BackFillSessionDataTest.java | 46 +--- .../ClickStreamSessionTestUtil.java | 5 +- .../clickstream/CountViewsPerProductTest.java | 7 +- ...idateAndCorrectClickStreamEventsTests.java | 85 +++---- .../retail/dataobjects/ClickStream.java | 220 ++++++++++-------- .../sample/retail/dataobjects/Ecommerce.java | 50 ++++ .../sample/retail/dataobjects/Item.java | 130 +++++++++++ .../sample/retail/dataobjects/Purchase.java | 118 ++++++++++ .../RetailDataProcessingPipeline.java | 6 +- ...DataProcessingPipelineSimpleSmokeTest.java | 2 +- .../test/TestStreamGenerator.java | 197 +++++++--------- 23 files changed, 616 insertions(+), 575 deletions(-) delete mode 100644 retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java delete mode 100644 retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/ClickStreamEventAVRO.java create mode 100644 
retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Ecommerce.java create mode 100644 retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Item.java create mode 100644 retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java rename retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/{pipeine => pipeline}/test/RetailDataProcessingPipelineSimpleSmokeTest.java (97%) rename retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/{pipeine => pipeline}/test/TestStreamGenerator.java (57%) diff --git a/retail/retail-java-applications/README.MD b/retail/retail-java-applications/README.MD index e860c9d1..6a4ebe6f 100644 --- a/retail/retail-java-applications/README.MD +++ b/retail/retail-java-applications/README.MD @@ -65,7 +65,7 @@ The application processes the following types of data: ------ # To run the pipeline locally you can make use of the smoke test ``` -gradlew :data-engineering-dept:pipelines:test --tests com.google.dataflow.sample.retail.pipeine.test.RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks +gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks ``` # Task patterns The application contains a number of task patterns that show the best way to accomplish Java programming tasks that are commonly needed to create this type of application. 
diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java deleted file mode 100644 index 192dfcb9..00000000 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream; - -import com.google.auto.value.AutoValue; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.StreamSupport; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; - -@AutoValue -/** - * Takes a ROW generated with the {@link ClickStreamSessions} transform and back propagates fields - * throughout the session. Given a list of {@link - * com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent} the fields provided - * in {@link BackFillSessionData#getBackPropogateFields()} will be back propagated to all events - * ordered by event time. - * - *

For example if foo is the field that is passed to {@link - * BackFillSessionData#getBackPropogateFields()} then the first occurrence of foo will be propagated - * to any events that have foo == null * - * - *

{@code
- * Given the following input
- * { key : sess_1
- *  [
- *  {event_1 : { timestamp : 1,  foo : null, bar : cup}}
- *  {event_2 : { timestamp : 2,  foo : null, bar : mug}}
- *  {event_3 : { timestamp : 3,  foo : 765   , bar : bottle }}
- *  {event_4 : { timestamp : 3,  foo : null   , bar : bottle }}
- *  ]
- * }
- *
- * The output will become
- * { key : sess_1
- *  [
- *  {event_1 : { timestamp : 1,  foo : 765, bar : cup}}
- *  {event_2 : { timestamp : 2,  foo : 765, bar : mug}}
- *  {event_3 : { timestamp : 3,  foo : 765, bar : bottle }}
- *  {event_4 : { timestamp : 4,  foo : null   , bar : bottle }}
- *  ]
- * }
- * }
- * - * If timestamp is missing from any of the elements, no correction is made as values can not be - * sorted by time order. Only values before the first non-null value are filled, values after the - * first non-null are assumed to be correct. - */ -public abstract class BackFillSessionData extends PTransform, PCollection> { - - public abstract List getBackPropogateFields(); - - public static BackFillSessionData create(List newBackPropogateFields) { - return builder().setBackPropogateFields(newBackPropogateFields).build(); - } - - public static Builder builder() { - return new AutoValue_BackFillSessionData.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setBackPropogateFields(List backPropogateFields); - - public abstract BackFillSessionData build(); - } - - @Override - public PCollection expand(PCollection input) { - return input - .apply( - MapElements.into(TypeDescriptors.rows()) - .via(x -> backPropogate(x, getBackPropogateFields()))) - .setRowSchema(input.getSchema()); - } - - public static Row backPropogate(Row input, List fieldNames) { - - Iterable rows = input.getIterable("value"); - - List sortedRows = - StreamSupport.stream(rows.spliterator(), false).collect(Collectors.toList()); - - try { - sortedRows.sort( - new Comparator() { - - @Override - public int compare(Row o1, Row o2) { - Long timestamp1 = o1.getValue("timestamp"); - Long timestamp2 = o2.getValue("timestamp"); - Preconditions.checkNotNull(timestamp1); - Preconditions.checkNotNull(timestamp2); - return timestamp1.compareTo(timestamp2); - } - }); - } catch (NullPointerException npe) { - // If any of the timestamps are null we output without back propagation. 
- return input; - } - - // Loop through for all requested fieldNames - for (String fieldName : fieldNames) { - - // Find the first value which has a non null value - List correctedRows = new ArrayList<>(); - - int pos = - IntStream.range(0, sortedRows.size()) - .filter(i -> sortedRows.get(i).getValue(fieldName) != null) - .findFirst() // first occurrence - .orElse(-1); // No element found - - System.out.println(sortedRows); - - // Only correct values if there is at least one non-null values for the field and the first - // non-null value is not the first value seen - if (pos > 0) { - Row value = sortedRows.get(pos); - Object changeField = value.getValue(fieldName); - - // Update any fields that are empty in the list, stop when getting to first known non null - // pos - for (int i = 0; i < pos; i++) { - if (sortedRows.get(i).getValue(fieldName) == null) { - correctedRows.add(Row.fromRow(value).withFieldValue(fieldName, changeField).build()); - } else { - correctedRows.add(sortedRows.get(i)); - } - } - // Add the rest of the fields from pos onwards - IntStream.range(pos, sortedRows.size()).forEach(x -> correctedRows.add(sortedRows.get(x))); - } - if (correctedRows.size() > 0) { - sortedRows.clear(); - sortedRows.addAll(correctedRows); - correctedRows.clear(); - } - } - return Row.fromRow(input).withFieldValue("value", sortedRows).build(); - } -} diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/ClickStreamSessions.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/ClickStreamSessions.java index c1846289..5a54f26e 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/ClickStreamSessions.java +++ 
b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/ClickStreamSessions.java @@ -33,12 +33,12 @@ /** * This transform creates sessions from the incoming clickstream using the cliendId and - * SessionWindows. The output is a ROW with schema: + * SessionWindows. The output is a Row with the schema: * *
{@code
- * Field Name        Field Type
- * key               ROW{clientID:STRING}
- * values	         ITERABLE[ROW[{@link com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent}]]
+ * Field Name	    Field Type
+ * key	            ROW{clientID:STRING}
+ * value	        ITERABLE[ROW[ClickstreamEvent]]
  * }
*/ @Experimental @@ -71,17 +71,19 @@ public ClickStreamSessions withSessionWindowGapDuration(Duration sessionWindowGa } @Override - /** - * Returns a Row object in the format: - * - *
{@code
-   * Field Name	    Field Type
-   * key	        ROW{clientID:STRING}
-   * values	        ITERABLE[ROW[ClickstreamEvent]]
-   * }
- */ public PCollection expand(PCollection input) { - Preconditions.checkNotNull(this.getSessionWindowGapDuration(), "Must set a session value."); + + Preconditions.checkNotNull( + this.getSessionWindowGapDuration(), "Must set a session gap duration."); + /* + * Group.byFieldNames returns a Row object in the format: + * + *
{@code
+     * Field Name	    Field Type
+     * key	            ROW{clientID:STRING}
+     * value	        ITERABLE[ROW[ClickstreamEvent]]
+     * }
+ */ return input .apply(Window.into(Sessions.withGapDuration(getSessionWindowGapDuration()))) .apply(Group.byFieldNames("client_id")); diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java index 5452e332..7d723fa7 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java @@ -67,9 +67,7 @@ public PCollection expand(PCollection input) { .apply(Filter.create().whereFieldName("event", c -> c.equals("browse"))) // Group By pageRef and count the results. .apply(Window.into(FixedWindows.of(pageViewCountWindowDuration))) - .apply( - Group.byFieldNames("pageRef") - .aggregateField("pageRef", Count.combineFn(), "count")) + .apply(Group.byFieldNames("page").aggregateField("page", Count.combineFn(), "count")) .apply(CreatePageViewAggregatorMetadata.create(pageViewCountWindowDuration.getMillis())); } @@ -97,14 +95,14 @@ public PCollection expand(PCollection input) { // TODO the schema registry for PageViewAggregator throws a class cast issue Schema schema = Schema.of( - Field.of("pageRef", FieldType.STRING), + Field.of("page", FieldType.STRING), Field.of("count", FieldType.INT64), Field.of("startTime", FieldType.INT64), Field.of("durationMS", FieldType.INT64)); return input // Note key and value are results of Group + Count operation in the previous transform. 
- .apply(Select.fieldNames("key.pageRef", "value.count")) + .apply(Select.fieldNames("key.page", "value.count")) // We need to add these fields to the ROW object before we convert the POJO .apply( AddFields.create() diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/WriteAggregatesToBigTable.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/WriteAggregatesToBigTable.java index 1d757676..fb9b9d90 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/WriteAggregatesToBigTable.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/WriteAggregatesToBigTable.java @@ -92,13 +92,12 @@ public void process(@Element PageViewAggregator input, OutputReceiver throws UnsupportedEncodingException { Put put = - new Put( - String.format("%s-%s", input.getPageRef(), input.getStartTime()).getBytes("UTF-8")); + new Put(String.format("%s-%s", input.getPage(), input.getStartTime()).getBytes("UTF-8")); String charset = "UTF-8"; // TODO This should never be Null eliminate bug. 
- String pageRef = Optional.ofNullable(input.getPageRef()).orElse(""); + String pageRef = Optional.ofNullable(input.getPage()).orElse(""); Long count = Optional.ofNullable(input.getCount()).orElse(0L); put.addColumn( diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/EventItemCorrectionService.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/EventItemCorrectionService.java index 492d8564..8051de41 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/EventItemCorrectionService.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/EventItemCorrectionService.java @@ -18,7 +18,7 @@ package com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream.validation; import com.google.dataflow.sample.retail.businesslogic.externalservices.RetailCompanyServices; -import com.google.dataflow.sample.retail.dataobjects.ClickStream.Item; +import com.google.dataflow.sample.retail.dataobjects.Item; import java.util.ArrayList; import java.util.Collection; import java.util.List; @@ -73,7 +73,7 @@ public void finishBundle(FinishBundleContext fbc) { for (_WindowWrappedEvent event : cache) { Row row = event.eventData.getRow("data"); - Collection items = row.getArray("items"); + Collection items = row.getRow("ecommerce").getArray("items"); List updatedItems = new ArrayList<>(); for (Row item : items) { @@ -88,7 +88,9 @@ public void finishBundle(FinishBundleContext fbc) { .build()); } - Row newDataRow = Row.fromRow(row).withFieldValue("items", updatedItems).build(); + 
Row itemsRow = + Row.fromRow(row.getRow("ecommerce")).withFieldValue("items", updatedItems).build(); + Row newDataRow = Row.fromRow(row).withFieldValue("ecommerce", itemsRow).build(); fbc.output( Row.fromRow(event.eventData).withFieldValue("data", newDataRow).build(), @@ -108,6 +110,7 @@ private List populateIds(List<_WindowWrappedEvent> events) { x -> x.eventData .getRow("data") + .getRow("ecommerce") .getArray("items") .forEach(y -> ids.add(((Row) y).getString("item_id")))); diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/ValidateEventItems.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/ValidateEventItems.java index ed465936..aefc6d6b 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/ValidateEventItems.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/validation/ValidateEventItems.java @@ -51,7 +51,7 @@ public void process(@Element Row input, @Timestamp Instant timestamp, MultiOutpu // If the event is of a type that needs Item to be present, do checks if (chkItemRequired(data)) { - Collection items = data.getArray("items"); + Collection items = data.getRow("ecommerce").getArray("items"); // If no items this is not recoverable, send to dead letter. 
if (items == null || items.size() == 0) { diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/externalservices/RetailCompanyServices.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/externalservices/RetailCompanyServices.java index 68d11d25..527e7beb 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/externalservices/RetailCompanyServices.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/externalservices/RetailCompanyServices.java @@ -17,7 +17,7 @@ */ package com.google.dataflow.sample.retail.businesslogic.externalservices; -import com.google.dataflow.sample.retail.dataobjects.ClickStream.Item; +import com.google.dataflow.sample.retail.dataobjects.Item; import java.util.HashMap; import java.util.List; import java.util.Map; diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java index d1a517c3..aacea50d 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java @@ -18,17 +18,13 @@ package com.google.dataflow.sample.retail.businesslogic.core.utils.test; import com.google.dataflow.sample.retail.businesslogic.core.utils.JSONUtils; -import 
com.google.dataflow.sample.retail.businesslogic.core.utils.test.avrotestobjects.ClickStreamEventAVRO; import com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.JsonToRow; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.GsonBuilder; import org.joda.time.Instant; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; @@ -41,40 +37,25 @@ public class JSONUtilsTest { private static final ClickStreamEvent AUTO_VALUE_EVENT = ClickStreamEvent.builder() - .setUid(1L) + .setUid(999L) .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("event") - .setUid(1L) .setTimestamp(TIME) .build(); - private ClickStreamEventAVRO getEventAVRO() { - ClickStreamEventAVRO event = new ClickStreamEventAVRO(); - event.clientId = "1"; - event.uid = 1L; - event.agent = "A"; - event.pageRef = "pageRef"; - event.pageTarget = "pageTarget"; - event.event = "event"; - event.timestamp = TIME; - - return event; - } + private static final String JSON = + "{\"eventTime\":null,\"event\":\"event\",\"timestamp\":946656000000,\"user_id\":999,\"client_id\":\"1\",\"page\":\"pageRef\",\"page_previous\":\"pageTarget\",\"ecommerce\":null}\n"; @Rule public transient TestPipeline pipeline = TestPipeline.create(); @Test public void testParseCleanClickstream() { - Gson gson = new Gson(); - String jsonString = gson.toJson(getEventAVRO()); - PCollection events = pipeline - .apply(Create.of(jsonString)) + .apply(Create.of(JSON)) .apply(JSONUtils.ConvertJSONtoPOJO.create(ClickStreamEvent.class)); 
PAssert.that(events).containsInAnyOrder(AUTO_VALUE_EVENT); @@ -85,12 +66,7 @@ public void testParseCleanClickstream() { @Test public void testParseWithStrictNullsClickstream() { - ClickStreamEventAVRO withNull = getEventAVRO(); - withNull.uid = null; - - Gson gson = new GsonBuilder().serializeNulls().create(); - - String jsonString = gson.toJson(withNull); + String jsonString = JSON.replace("999", "null"); PCollection events = pipeline @@ -105,12 +81,7 @@ public void testParseWithStrictNullsClickstream() { @Test public void testParseWithNonStrictNullsClickstream() { - ClickStreamEventAVRO withNull = getEventAVRO(); - withNull.uid = null; - - Gson gson = new GsonBuilder().create(); - - String jsonString = gson.toJson(withNull); + String jsonString = JSON.replace("999", "null"); PCollection events = pipeline @@ -125,9 +96,7 @@ public void testParseWithNonStrictNullsClickstream() { @Test public void testParseWithTypeErrorClickstream() { - Gson gson = new Gson(); - String jsonString = gson.toJson(getEventAVRO()); - jsonString = jsonString.replace("\"A\"", "1.0"); + String jsonString = JSON.replace("\"event\"", "1.0"); PCollection events = pipeline diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/ClickStreamEventAVRO.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/ClickStreamEventAVRO.java deleted file mode 100644 index a933bcaa..00000000 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/ClickStreamEventAVRO.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.dataflow.sample.retail.businesslogic.core.utils.test.avrotestobjects; - -import org.apache.avro.reflect.Nullable; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.annotations.SerializedName; - -@DefaultCoder(AvroCoder.class) -/** Used as part of utility for creation of JSON with {@link Gson}. 
*/ -public class ClickStreamEventAVRO { - public @Nullable long timestamp; - - @SerializedName(value = "user_id") - public @Nullable Long uid; - - @SerializedName(value = "client_id") - public @Nullable String clientId; - - @SerializedName(value = "event_datetime") - public @Nullable String eventDateTime; - - public @Nullable String pageRef; - public @Nullable String pageTarget; - public @Nullable String agent; - public @Nullable String event; - public @Nullable boolean transaction; -} diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/InventoryAVRO.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/InventoryAVRO.java index 38320eba..5da5043a 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/InventoryAVRO.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/InventoryAVRO.java @@ -22,6 +22,10 @@ import org.apache.beam.sdk.coders.DefaultCoder; @DefaultCoder(AvroCoder.class) +/** + * Used as part of utility for creation of JSON with {@link Gson}. TODO Remove in favour of raw + * String for the JSON. 
+ */ public class InventoryAVRO { public @Nullable long timestamp; diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/TransactionsAVRO.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/TransactionsAVRO.java index 79aa14d9..8404c79c 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/TransactionsAVRO.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/avrotestobjects/TransactionsAVRO.java @@ -22,7 +22,10 @@ import org.apache.beam.sdk.coders.DefaultCoder; import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson; -/** Used as part of utility for creation of JSON with {@link Gson}. */ +/** + * Used as part of utility for creation of JSON with {@link Gson}. TODO Remove in favour of raw + * String for the JSON. 
+ */ @DefaultCoder(AvroCoder.class) public class TransactionsAVRO { public @Nullable long timestamp; diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java index fb905074..2b873f28 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java @@ -17,11 +17,8 @@ */ package com.google.dataflow.sample.retail.businesslogic.core.utils.test.clickstream; -import com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream.BackFillSessionData; import com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream.ClickStreamSessions; -import java.util.Objects; import org.apache.beam.sdk.schemas.transforms.Convert; -import org.apache.beam.sdk.schemas.transforms.Select; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -29,8 +26,6 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.junit.Rule; import org.junit.Test; @@ -44,7 +39,7 @@ public class BackFillSessionDataTest { * This test provides values from time 0 to time 2 with no agent, time 3 
has a agent and time 4 is * again null */ - public void testBackFillSessionization() throws Exception { + public void testSessionization() throws Exception { Duration windowDuration = Duration.standardMinutes(5); @@ -54,43 +49,17 @@ public void testBackFillSessionization() throws Exception { pipeline .apply( Create.timestamped( - TimestampedValue.of( - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_0_MINS - .getValue() - .toBuilder() - .setAgent(null) - .build(), - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_0_MINS.getTimestamp()), - TimestampedValue.of( - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_1_MINS - .getValue() - .toBuilder() - .setAgent(null) - .build(), - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_1_MINS.getTimestamp()), - TimestampedValue.of( - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_2_MINS - .getValue() - .toBuilder() - .setAgent(null) - .build(), - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_2_MINS.getTimestamp()), + ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_0_MINS, + ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_1_MINS, + ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_2_MINS, ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_3_MINS, - TimestampedValue.of( - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_4_MINS - .getValue() - .toBuilder() - .setAgent(null) - .build(), - ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_4_MINS.getTimestamp()), + ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_4_MINS, ClickStreamSessionTestUtil.CLICK_STREAM_EVENT_10_MINS)) .apply(Convert.toRows()) .apply(ClickStreamSessions.create(windowDuration)) - .apply(BackFillSessionData.create(ImmutableList.of("agent"))) - .apply(Select.fieldNames("value.agent")) .apply(ParDo.of(new ExtractUserIDCountFromRow())); - PAssert.that(sessions).containsInAnyOrder(4L, 1L); + PAssert.that(sessions).containsInAnyOrder(5L, 1L); pipeline.run(); } @@ -98,8 +67,7 @@ public void testBackFillSessionization() throws Exception { static class ExtractUserIDCountFromRow extends DoFn { 
@ProcessElement public void process(ProcessContext pc) { - long count = pc.element().getArray("agent").stream().filter(Objects::nonNull).count(); - System.out.println(count); + long count = pc.element().getArray("value").size(); pc.output(count); } } diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTestUtil.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTestUtil.java index b37ff719..2a06b54b 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTestUtil.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTestUtil.java @@ -30,9 +30,8 @@ public class ClickStreamSessionTestUtil { ClickStreamEvent.builder() .setUid(1L) .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("browse") .setUid(1L) .setTimestamp(TIME) diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/CountViewsPerProductTest.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/CountViewsPerProductTest.java index f805eb1b..b2093d02 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/CountViewsPerProductTest.java +++ 
b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/CountViewsPerProductTest.java @@ -43,9 +43,8 @@ public class CountViewsPerProductTest { ClickStreamEvent.builder() .setUid(1L) .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("browse") .setTimestamp(TIME) .build(); @@ -78,7 +77,7 @@ public void testCountViews() { PageViewAggregator pageViewAggregator = PageViewAggregator.builder() .setCount(2L) - .setPageRef("pageRef") + .setPage("pageRef") .setDurationMS(windowDuration.getMillis()) .setStartTime(TIME) .build(); diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ValidateAndCorrectClickStreamEventsTests.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ValidateAndCorrectClickStreamEventsTests.java index 01a699c1..4ce9fb0f 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ValidateAndCorrectClickStreamEventsTests.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ValidateAndCorrectClickStreamEventsTests.java @@ -21,7 +21,8 @@ import com.google.dataflow.sample.retail.businesslogic.core.transforms.DeadLetterSink.SinkType; import com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream.ValidateAndCorrectCSEvt; import com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent; -import com.google.dataflow.sample.retail.dataobjects.ClickStream.Item; +import 
com.google.dataflow.sample.retail.dataobjects.Ecommerce; +import com.google.dataflow.sample.retail.dataobjects.Item; import org.apache.beam.sdk.schemas.NoSuchSchemaException; import org.apache.beam.sdk.schemas.SchemaCoder; import org.apache.beam.sdk.schemas.transforms.Convert; @@ -48,9 +49,8 @@ public class ValidateAndCorrectClickStreamEventsTests { .setEventTime("2000-01-01 00:00:00") .setUid(1L) .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("event") .build(), Instant.ofEpochMilli(TIME)); @@ -59,9 +59,8 @@ public class ValidateAndCorrectClickStreamEventsTests { ClickStreamEvent.builder() .setEventTime("2000-XX-01 00:00:00") .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("event") .setUid(1L) .build(); @@ -71,9 +70,8 @@ public class ValidateAndCorrectClickStreamEventsTests { .setEventTime("2000-01-02 00:00:00") .setUid(1L) .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("event") .setUid(1L) .build(); @@ -82,11 +80,13 @@ public class ValidateAndCorrectClickStreamEventsTests { ClickStreamEvent.builder() .setEventTime("2000-01-01 00:00:00") .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("add_to_cart") - .setItems(ImmutableList.of(Item.builder().setPrice("1").setItemId("1").build())) + .setEcommerce( + Ecommerce.builder() + .setItems(ImmutableList.of(Item.builder().setPrice(1F).setItemId("1").build())) + .build()) .build(); private static final ClickStreamEvent CLEAN_DATE_TIMESTAMP = @@ -94,9 +94,8 @@ public class ValidateAndCorrectClickStreamEventsTests { .setEventTime("2000-XX-01 00:00:00") .setUid(1L) .setClientId("1") - .setAgent("A") - 
.setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("event") .build(); @@ -104,11 +103,13 @@ public class ValidateAndCorrectClickStreamEventsTests { ClickStreamEvent.builder() .setEventTime("2000-0X-01 00:00:00") .setClientId("1") - .setAgent("A") - .setPageRef("pageRef") - .setPageTarget("pageTarget") + .setPage("pageRef") + .setPagePrevious("pageTarget") .setEvent("add_to_cart") - .setItems(ImmutableList.of(Item.builder().setPrice("1").setItemId("1").build())) + .setEcommerce( + Ecommerce.builder() + .setItems(ImmutableList.of(Item.builder().setPrice(1F).setItemId("1").build())) + .build()) .build(); @Rule public transient TestPipeline pipeline = TestPipeline.create(); @@ -199,15 +200,18 @@ public void testMissingItems() throws NoSuchSchemaException { MISSING_ITEM_INFO .toBuilder() .setTimestamp(TIME) - .setItems( - ImmutableList.of( - Item.builder() - .setPrice("1") - .setItemId("1") - .setItemName("foo_name") - .setItemBrand("item_brand") - .setItemCat01("foo_category") - .build())) + .setEcommerce( + Ecommerce.builder() + .setItems( + ImmutableList.of( + Item.builder() + .setPrice(1F) + .setItemId("1") + .setItemName("foo_name") + .setItemBrand("item_brand") + .setItemCat01("foo_category") + .build())) + .build()) .build()); pipeline.run(); } @@ -235,15 +239,18 @@ public void testBadDateFormatAndMissingItems() throws NoSuchSchemaException { .containsInAnyOrder( MISSING_ITEM_INFO_BAD_DATE .toBuilder() - .setItems( - ImmutableList.of( - Item.builder() - .setPrice("1") - .setItemId("1") - .setItemName("foo_name") - .setItemBrand("item_brand") - .setItemCat01("foo_category") - .build())) + .setEcommerce( + Ecommerce.builder() + .setItems( + ImmutableList.of( + Item.builder() + .setPrice(1F) + .setItemId("1") + .setItemName("foo_name") + .setItemBrand("item_brand") + .setItemCat01("foo_category") + .build())) + .build()) .setTimestamp(TIME) .build()); pipeline.run(); diff --git 
a/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/ClickStream.java b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/ClickStream.java index 08fdc304..f2d2cdb9 100644 --- a/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/ClickStream.java +++ b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/ClickStream.java @@ -18,7 +18,6 @@ package com.google.dataflow.sample.retail.dataobjects; import com.google.auto.value.AutoValue; -import java.util.List; import javax.annotation.Nullable; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.schemas.AutoValueSchema; @@ -27,7 +26,113 @@ /** * Objects used for dealing with clickstream within the pipeline and schemas for I/O of clickstream - * events. + * events. Example events: + * + *

view_item + * + *

{@code
+ * {
+ *   "event_datetime":"2020-11-16 22:59:59",
+ *   "event": "view_item",
+ *   "user_id": "UID00003",
+ *   "client_id": "CID00003",
+ *   "page":"/product-67890",
+ *   "page_previous": "/category-tshirts",
+ *   "ecommerce": {
+ *     "items": [{
+ *       "item_name": "Donut Friday Scented T-Shirt",
+ *       "item_id": "67890",
+ *       "price": 33.75,
+ *       "item_brand": "Google",
+ *       "item_category": "Apparel",
+ *       "item_category_2": "Mens",
+ *       "item_category_3": "Shirts",
+ *       "item_category_4": "Tshirts",
+ *       "item_variant": "Black",
+ *       "item_list_name": "Search Results",
+ *       "item_list_id": "SR123",
+ *       "index": 1,
+ *       "quantity": 1
+ *     }]
+ *   }
+ * }
+ *
+ *
+ * }
+ * + * add_to_cart + * + *
{@code
+ * {
+ *   "event_datetime":"2020-11-16 20:59:59",
+ *   "event": "add_to_cart",
+ *   "user_id": "UID00003",
+ *   "client_id": "CID00003",
+ *   "page":"/product-67890",
+ *   "page_previous": "/category-tshirts",
+ *   "ecommerce": {
+ *     "items": [{
+ *       "item_name": "Donut Friday Scented T-Shirt",
+ *       "item_id": "67890",
+ *       "price": 33.75,
+ *       "item_brand": "Google",
+ *       "item_category": "Apparel",
+ *       "item_category_2": "Mens",
+ *       "item_category_3": "Shirts",
+ *       "item_category_4": "Tshirts",
+ *       "item_variant": "Black",
+ *       "item_list_name": "Search Results",
+ *       "item_list_id": "SR123",
+ *       "index": 1,
+ *       "quantity": 2
+ *     }]
+ *   }
+ * }
+ *
+ * }
+ * + * purchase + * + *
{@code
+ * {
+ *   "event_datetime":"2020-11-16 20:59:59",
+ *   "event": "purchase",
+ *   "user_id": "UID00001",
+ *   "client_id": "CID00003",
+ *   "page":"/checkout",
+ *   "page_previous": "/order-confirmation",
+ *   "ecommerce": {
+ *     "purchase": {
+ *       "transaction_id": "T12345",
+ *       "affiliation": "Online Store",
+ *       "value": 35.43,
+ *       "tax": 4.90,
+ *       "shipping": 5.99,
+ *       "currency": "EUR",
+ *       "coupon": "SUMMER_SALE",
+ *       "items": [{
+ *         "item_name": "Triblend Android T-Shirt",
+ *         "item_id": "12345",
+ *         "item_price": 15.25,
+ *         "item_brand": "Google",
+ *         "item_category": "Apparel",
+ *         "item_variant": "Gray",
+ *         "quantity": 1,
+ *         "item_coupon": ""
+ *       }, {
+ *         "item_name": "Donut Friday Scented T-Shirt",
+ *         "item_id": "67890",
+ *         "item_price": 33.75,
+ *         "item_brand": "Google",
+ *         "item_category": "Apparel",
+ *         "item_variant": "Black",
+ *         "quantity": 1
+ *       }]
+ *     }
+ *   }
+ * }
+ *
+ * }
*/ @Experimental public class ClickStream { @@ -55,17 +160,14 @@ public abstract static class ClickStreamEvent { @SchemaFieldName("client_id") public @Nullable abstract String getClientId(); - @SchemaFieldName("pageRef") - public @Nullable abstract String getPageRef(); + @SchemaFieldName("page") + public @Nullable abstract String getPage(); - @SchemaFieldName("pageTarget") - public @Nullable abstract String getPageTarget(); + @SchemaFieldName("page_previous") + public @Nullable abstract String getPagePrevious(); - @SchemaFieldName("agent") - public @Nullable abstract String getAgent(); - - @SchemaFieldName("items") - public @Nullable abstract List getItems(); + @SchemaFieldName("ecommerce") + public @Nullable abstract Ecommerce getEcommerce(); public abstract Builder toBuilder(); @@ -86,13 +188,11 @@ public abstract static class Builder { public abstract Builder setClientId(String value); - public abstract Builder setPageRef(String value); - - public abstract Builder setPageTarget(String value); + public abstract Builder setPage(String value); - public abstract Builder setAgent(String value); + public abstract Builder setPagePrevious(String value); - public abstract Builder setItems(List value); + public abstract Builder setEcommerce(Ecommerce ecommerce); public abstract ClickStreamEvent build(); } @@ -109,90 +209,6 @@ public static class ClickStreamBigTableSchema { public static final String PAGE_VIEW_AGGREGATION_COL_PAGE_VIEW_COUNT = "pageViewCount"; } - @AutoValue - @DefaultSchema(AutoValueSchema.class) - public abstract static class Item { - - @SchemaFieldName("item_name") - public @Nullable abstract String getItemName(); - - @SchemaFieldName("item_id") - public @Nullable abstract String getItemId(); - - @SchemaFieldName("price") - public @Nullable abstract String getPrice(); - - @SchemaFieldName("item_brand") - public @Nullable abstract String getItemBrand(); - - @SchemaFieldName("item_category") - public @Nullable abstract String getItemCat01(); - - 
@SchemaFieldName("item_category_2") - public @Nullable abstract String getItemCat02(); - - @SchemaFieldName("item_category_3") - public @Nullable abstract String getItemCat03(); - - @SchemaFieldName("item_category_4") - public @Nullable abstract String getItemCat04(); - - @SchemaFieldName("item_category_5") - public @Nullable abstract String getItemCat05(); - - @SchemaFieldName("item_variant") - public @Nullable abstract String getItemVariant(); - - @SchemaFieldName("item_list_name") - public @Nullable abstract String getItemListName(); - - @SchemaFieldName("item_list_id") - public @Nullable abstract String getItemListId(); - - @SchemaFieldName("index") - public @Nullable abstract String getIndex(); - - @SchemaFieldName("quantity") - public @Nullable abstract String getQuantity(); - - public static Builder builder() { - return new AutoValue_ClickStream_Item.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - public abstract Builder setItemName(String newItemName); - - public abstract Builder setItemId(String newItemId); - - public abstract Builder setPrice(String newPrice); - - public abstract Builder setItemBrand(String newItemBrand); - - public abstract Builder setItemCat01(String newItemCat01); - - public abstract Builder setItemCat02(String newItemCat02); - - public abstract Builder setItemCat03(String newItemCat03); - - public abstract Builder setItemCat04(String newItemCat04); - - public abstract Builder setItemCat05(String newItemCat05); - - public abstract Builder setItemVariant(String newItemVariant); - - public abstract Builder setItemListName(String newItemListName); - - public abstract Builder setItemListId(String newItemListId); - - public abstract Builder setIndex(String newIndex); - - public abstract Builder setQuantity(String newQuantity); - - public abstract Item build(); - } - } - @AutoValue @DefaultSchema(AutoValueSchema.class) public abstract static class PageViewAggregator { @@ -200,7 +216,7 @@ public abstract 
static class PageViewAggregator { public @Nullable abstract Long getStartTime(); - public @Nullable abstract String getPageRef(); + public @Nullable abstract String getPage(); public @Nullable abstract Long getCount(); @@ -217,7 +233,7 @@ public abstract static class Builder { public abstract Builder setStartTime(Long value); - public abstract Builder setPageRef(String value); + public abstract Builder setPage(String value); public abstract Builder setCount(Long value); diff --git a/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Ecommerce.java b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Ecommerce.java new file mode 100644 index 00000000..814e13f0 --- /dev/null +++ b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Ecommerce.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.dataflow.sample.retail.dataobjects; + +import com.google.auto.value.AutoValue; +import java.util.List; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; + +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class Ecommerce { + + @SchemaFieldName("items") + public @Nullable abstract List getItems(); + + @SchemaFieldName("purchase") + public @Nullable abstract Purchase getPurchase(); + + public static Builder builder() { + + return new AutoValue_Ecommerce.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setItems(List value); + + public abstract Builder setPurchase(Purchase purchase); + + public abstract Ecommerce build(); + } +} diff --git a/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Item.java b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Item.java new file mode 100644 index 00000000..146f73f3 --- /dev/null +++ b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Item.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.dataflow.sample.retail.dataobjects; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; + +/** + * + * + *
{@code
+ * {
+ *       "item_name": "Donut Friday Scented T-Shirt",
+ *       "item_id": "67890",
+ *       "price": 33.75,
+ *       "item_brand": "Google",
+ *       "item_category": "Apparel",
+ *       "item_category_2": "Mens",
+ *       "item_category_3": "Shirts",
+ *       "item_category_4": "Tshirts",
+ *       "item_variant": "Black",
+ *       "item_list_name": "Search Results",
+ *       "item_list_id": "SR123",
+ *       "index": 1,
+ *       "quantity": 2
+ * }
+ *
+ * }
+ */ +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class Item { + + @SchemaFieldName("item_name") + public @Nullable abstract String getItemName(); + + @SchemaFieldName("item_id") + public @Nullable abstract String getItemId(); + + @SchemaFieldName("price") + public @Nullable abstract Float getPrice(); + + @SchemaFieldName("item_brand") + public @Nullable abstract String getItemBrand(); + + @SchemaFieldName("item_category") + public @Nullable abstract String getItemCat01(); + + @SchemaFieldName("item_category_2") + public @Nullable abstract String getItemCat02(); + + @SchemaFieldName("item_category_3") + public @Nullable abstract String getItemCat03(); + + @SchemaFieldName("item_category_4") + public @Nullable abstract String getItemCat04(); + + @SchemaFieldName("item_category_5") + public @Nullable abstract String getItemCat05(); + + @SchemaFieldName("item_variant") + public @Nullable abstract String getItemVariant(); + + @SchemaFieldName("item_list_name") + public @Nullable abstract String getItemListName(); + + @SchemaFieldName("item_list_id") + public @Nullable abstract String getItemListId(); + + @SchemaFieldName("index") + public @Nullable abstract Integer getIndex(); + + @SchemaFieldName("quantity") + public @Nullable abstract Integer getQuantity(); + + public static Builder builder() { + return new AutoValue_Item.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setItemName(String newItemName); + + public abstract Builder setItemId(String newItemId); + + public abstract Builder setPrice(Float newPrice); + + public abstract Builder setItemBrand(String newItemBrand); + + public abstract Builder setItemCat01(String newItemCat01); + + public abstract Builder setItemCat02(String newItemCat02); + + public abstract Builder setItemCat03(String newItemCat03); + + public abstract Builder setItemCat04(String newItemCat04); + + public abstract Builder setItemCat05(String newItemCat05); + + 
public abstract Builder setItemVariant(String newItemVariant); + + public abstract Builder setItemListName(String newItemListName); + + public abstract Builder setItemListId(String newItemListId); + + public abstract Builder setIndex(Integer newIndex); + + public abstract Builder setQuantity(Integer newQuantity); + + public abstract Item build(); + } +} diff --git a/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java new file mode 100644 index 00000000..f536b82e --- /dev/null +++ b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.google.dataflow.sample.retail.dataobjects; + +import com.google.auto.value.AutoValue; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldName; + +/** + * + * + *
{@code
+ * {
+ *   "event_datetime":"2020-11-16 20:59:59",
+ *   "event": "purchase",
+ *   "user_id": "UID00001",
+ *   "client_id": "CID00003",
+ *   "page":"/checkout",
+ *   "page_previous": "/order-confirmation",
+ *   "ecommerce": {
+ *     "purchase": {
+ *       "transaction_id": "T12345",
+ *       "affiliation": "Online Store",
+ *       "value": 35.43,
+ *       "tax": 4.90,
+ *       "shipping": 5.99,
+ *       "currency": "EUR",
+ *       "coupon": "SUMMER_SALE",
+ *       "items": [{
+ *         "item_name": "Triblend Android T-Shirt",
+ *         "item_id": "12345",
+ *         "item_price": 15.25,
+ *         "item_brand": "Google",
+ *         "item_category": "Apparel",
+ *         "item_variant": "Gray",
+ *         "quantity": 1,
+ *         "item_coupon": ""
+ *       }, {
+ *         "item_name": "Donut Friday Scented T-Shirt",
+ *         "item_id": "67890",
+ *         "item_price": 33.75,
+ *         "item_brand": "Google",
+ *         "item_category": "Apparel",
+ *         "item_variant": "Black",
+ *         "quantity": 1
+ *       }]
+ *     }
+ *   }
+ * }
+ *
+ * }
+ */ +@AutoValue +@DefaultSchema(AutoValueSchema.class) +public abstract class Purchase { + + @SchemaFieldName("transaction_id") + public @Nullable abstract String getItemName(); + + @SchemaFieldName("affiliation") + public @Nullable abstract String getItemId(); + + @SchemaFieldName("value") + public @Nullable abstract String getPrice(); + + @SchemaFieldName("tax") + public @Nullable abstract String getItemBrand(); + + @SchemaFieldName("shipping") + public @Nullable abstract String getItemCat01(); + + @SchemaFieldName("currency") + public @Nullable abstract String getItemCat02(); + + @SchemaFieldName("coupon") + public @Nullable abstract String getItemCat03(); + + public static Builder builder() { + return new AutoValue_Purchase.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setItemName(String newItemName); + + public abstract Builder setItemId(String newItemId); + + public abstract Builder setPrice(String newPrice); + + public abstract Builder setItemBrand(String newItemBrand); + + public abstract Builder setItemCat01(String newItemCat01); + + public abstract Builder setItemCat02(String newItemCat02); + + public abstract Builder setItemCat03(String newItemCat03); + + public abstract Purchase build(); + } +} diff --git a/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java b/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java index d40aa3a7..c7e901c4 100644 --- a/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java +++ b/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java @@ -30,6 +30,7 @@ import 
com.google.dataflow.sample.retail.businesslogic.core.transforms.transaction.TransactionProcessing; import com.google.dataflow.sample.retail.businesslogic.core.utils.Print; import com.google.dataflow.sample.retail.businesslogic.core.utils.ReadPubSubMsgPayLoadAsString; +import com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent; import com.google.dataflow.sample.retail.dataobjects.Stock.StockEvent; import com.google.dataflow.sample.retail.dataobjects.StockAggregation; import com.google.dataflow.sample.retail.dataobjects.Transaction.TransactionEvent; @@ -40,6 +41,7 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.MapElements; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.ToJson; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.TypeDescriptors; @@ -53,7 +55,7 @@ @Experimental public class RetailDataProcessingPipeline { - @VisibleForTesting public PCollection testClickstreamEvents = null; + @VisibleForTesting public PCollection testClickstreamEvents = null; @VisibleForTesting public PCollection testTransactionEvents = null; @@ -81,7 +83,7 @@ public void startRetailPipeline(Pipeline p) throws Exception { .withTimestampAttribute("TIMESTAMP")); } else { checkNotNull(testClickstreamEvents, "In TestMode you must set testClickstreamEvents"); - clickStreamJSONMessages = testClickstreamEvents; + clickStreamJSONMessages = testClickstreamEvents.apply(ToJson.of()); } clickStreamJSONMessages.apply(new ClickstreamProcessing()); diff --git a/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeine/test/RetailDataProcessingPipelineSimpleSmokeTest.java b/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeline/test/RetailDataProcessingPipelineSimpleSmokeTest.java similarity index 
97% rename from retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeine/test/RetailDataProcessingPipelineSimpleSmokeTest.java rename to retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeline/test/RetailDataProcessingPipelineSimpleSmokeTest.java index c4717c5c..4d31497a 100644 --- a/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeine/test/RetailDataProcessingPipelineSimpleSmokeTest.java +++ b/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeline/test/RetailDataProcessingPipelineSimpleSmokeTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.dataflow.sample.retail.pipeine.test; +package com.google.dataflow.sample.retail.pipeline.test; import com.google.dataflow.sample.retail.businesslogic.core.options.RetailPipelineOptions; import com.google.dataflow.sample.retail.pipeline.RetailDataProcessingPipeline; diff --git a/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeine/test/TestStreamGenerator.java b/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeline/test/TestStreamGenerator.java similarity index 57% rename from retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeine/test/TestStreamGenerator.java rename to retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeline/test/TestStreamGenerator.java index 6aed0883..f74b20f1 100644 --- 
a/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeine/test/TestStreamGenerator.java +++ b/retail/retail-java-applications/data-engineering-dept/pipelines/src/test/java/com/google/dataflow/sample/retail/pipeline/test/TestStreamGenerator.java @@ -15,11 +15,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.google.dataflow.sample.retail.pipeine.test; +package com.google.dataflow.sample.retail.pipeline.test; -import com.google.dataflow.sample.retail.businesslogic.core.utils.test.avrotestobjects.ClickStreamEventAVRO; import com.google.dataflow.sample.retail.businesslogic.core.utils.test.avrotestobjects.InventoryAVRO; import com.google.dataflow.sample.retail.businesslogic.core.utils.test.avrotestobjects.TransactionsAVRO; +import com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent; +import com.google.dataflow.sample.retail.dataobjects.Ecommerce; +import com.google.dataflow.sample.retail.dataobjects.Item; import java.util.ArrayList; import java.util.List; import java.util.UUID; @@ -32,6 +34,7 @@ import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.apache.beam.vendor.grpc.v1p26p0.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.Gson; import org.apache.beam.vendor.grpc.v1p26p0.com.google.gson.GsonBuilder; import org.joda.time.Duration; @@ -46,7 +49,7 @@ public class TestStreamGenerator extends PTransform { private static final Logger LOG = LoggerFactory.getLogger(TestStreamGenerator.class); - public static final TupleTag CLICKSTREAM = new TupleTag() {}; + public static final TupleTag CLICKSTREAM = new TupleTag() {}; public static final TupleTag TRANSACTION = new TupleTag() {}; public static final TupleTag STOCK = new TupleTag() {}; @@ -60,7 +63,7 @@ public 
PCollectionTuple expand(PBegin input) { .withOutputTags(CLICKSTREAM, TupleTagList.of(TRANSACTION).and(STOCK))); } - private static class CreateClickStream extends DoFn { + private static class CreateClickStream extends DoFn { Gson gson = null; DateTimeFormatter fm; @@ -86,74 +89,65 @@ public void process(ProcessContext pc, @Timestamp Instant time) { String sessionId = UUID.randomUUID().toString(); - ClickStreamEventAVRO click = new ClickStreamEventAVRO(); - click.clientId = sessionId; - click.eventDateTime = time.toString(fm); - click.clientId = UUID.randomUUID().toString(); - click.pageRef = String.format("P%s", pageReferrer); - click.pageTarget = String.format("P%s", currentPage); - click.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - click.transaction = false; - click.uid = 1L; - click.event = "browse"; - - pc.output(gson.toJson(click)); - LOG.debug(String.format("Genrating Msg: %s", gson.toJson(click))); + ClickStreamEvent click = + ClickStreamEvent.builder() + .setClientId(sessionId) + .setEventTime(time.toString(fm)) + .setClientId(UUID.randomUUID().toString()) + .setPage(String.format("P%s", pageReferrer)) + .setPagePrevious(String.format("P%s", currentPage)) + .setUid(1L) + .setEvent("browse") + .build(); + + pc.output(click); + LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); pageReferrer = currentPage; currentPage = ThreadLocalRandom.current().nextInt(10); } - ClickStreamEventAVRO click = new ClickStreamEventAVRO(); - click.clientId = UUID.randomUUID().toString(); - click.eventDateTime = time.toString(fm); - click.pageRef = String.format("P%s", pageReferrer); - click.pageTarget = String.format("P%s", currentPage); - click.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - click.transaction = false; - click.uid = 1L; - click.event = "ERROR-CODE-B87769A"; + ClickStreamEvent.Builder click = 
+ ClickStreamEvent.builder() + .setClientId(UUID.randomUUID().toString()) + .setEventTime(time.toString(fm)) + .setPage(String.format("P%s", pageReferrer)) + .setPagePrevious(String.format("P%s", currentPage)) + .setUid(1L) + .setEvent("ERROR-CODE-B87769A"); /** * ********************************************************************************************** * Generate and error event. * ********************************************************************************************** */ - pc.output(gson.toJson(click)); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); + pc.output(click.build()); /** * ********************************************************************************************** * Generate missing uid event. * ********************************************************************************************** */ - click.event = "browse"; - click.uid = null; - click.clientId = UUID.randomUUID().toString(); - pc.output(gson.toJson(click)); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); + click.setEvent("browse").setUid(null).setClientId(UUID.randomUUID().toString()); + pc.output(click.build()); /** * ********************************************************************************************** * Generate bad date format events. 
* ********************************************************************************************** */ - click.clientId = UUID.randomUUID().toString(); - click.eventDateTime = "BROKEN DATE FORMAT!!!"; - pc.output(gson.toJson(click)); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); + click.setEventTime("BROKEN DATE FORMAT!!!").setClientId(UUID.randomUUID().toString()); + pc.output(click.build()); /** * ********************************************************************************************** * Generate events with date in the future * ********************************************************************************************** */ - click.clientId = UUID.randomUUID().toString(); - click.eventDateTime = time.plus(Duration.standardDays(30)).toString(fm); - pc.output(gson.toJson(click)); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); + click + .setEventTime(time.plus(Duration.standardDays(30)).toString(fm)) + .setClientId(UUID.randomUUID().toString()); + pc.output(click.build()); /** * ********************************************************************************************** @@ -165,24 +159,22 @@ public void process(ProcessContext pc, @Timestamp Instant time) { String sessionId = UUID.randomUUID().toString(); - List clickstream = new ArrayList<>(); + List clickstream = new ArrayList<>(); Instant clickTime = time; for (int i = 0; i < 3; i++) { - click = new ClickStreamEventAVRO(); - click.clientId = sessionId; - click.eventDateTime = clickTime.toString(fm); - click.pageRef = String.format("P%s", pageReferrer); - click.pageTarget = String.format("P%s", currentPage); - click.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - click.transaction = false; - click.uid = 1L; - click.event = "browse"; - - pc.outputWithTimestamp(CLICKSTREAM, gson.toJson(click), clickTime); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); + click = + 
ClickStreamEvent.builder() + .setClientId(sessionId) + .setEventTime(time.toString(fm)) + .setClientId(UUID.randomUUID().toString()) + .setPage(String.format("P%s", pageReferrer)) + .setPagePrevious(String.format("P%s", currentPage)) + .setUid(1L) + .setEvent("browse"); + + pc.outputWithTimestamp(CLICKSTREAM, click.build(), clickTime); clickTime = clickTime.plus(Duration.standardSeconds(i + 2)); pageReferrer = currentPage; @@ -191,35 +183,29 @@ public void process(ProcessContext pc, @Timestamp Instant time) { clickTime = clickTime.plus(Duration.standardSeconds(1)); - ClickStreamEventAVRO addToCart = new ClickStreamEventAVRO(); - addToCart.clientId = sessionId; - addToCart.eventDateTime = time.toString(fm); - addToCart.pageRef = String.format("P%s", pageReferrer); - addToCart.pageTarget = String.format("P%s", currentPage); - addToCart.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - addToCart.transaction = false; - addToCart.uid = 1L; - addToCart.event = "add-to-cart"; + ClickStreamEvent.Builder addToCart = click.setEvent("add-to-cart"); - clickstream.add(gson.toJson(addToCart)); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(addToCart))); + clickstream.add(addToCart.build()); clickTime = clickTime.plus(Duration.standardSeconds(1)); - ClickStreamEventAVRO purchase = new ClickStreamEventAVRO(); - purchase.eventDateTime = time.toString(fm); - purchase.clientId = sessionId; - purchase.pageRef = String.format("P%s", pageReferrer); - purchase.pageTarget = String.format("P%s", currentPage); - purchase.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - purchase.transaction = false; - purchase.uid = 1L; - purchase.event = "purchase"; - - clickstream.add(gson.toJson(purchase)); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(purchase))); + ClickStreamEvent.Builder purchase = + click + 
.setEvent("purchase") + .setEcommerce( + Ecommerce.builder() + .setItems( + ImmutableList.of( + Item.builder() + .setIndex(0) + .setItemCat01("cat01") + .setItemListId("1") + .setPrice(1F) + .setQuantity(1) + .build())) + .build()); + + clickstream.add(purchase.build()); clickTime = clickTime.plus(Duration.standardSeconds(1)); @@ -256,19 +242,17 @@ public void process(ProcessContext pc, @Timestamp Instant time) { for (int i = 0; i < 3; i++) { - click = new ClickStreamEventAVRO(); - click.clientId = sessionId; - click.eventDateTime = clickTime.toString(fm); - click.pageRef = String.format("P%s", pageReferrer); - click.pageTarget = String.format("P%s", currentPage); - click.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - click.transaction = false; - click.uid = 1L; - click.event = "browse"; - - pc.outputWithTimestamp(CLICKSTREAM, gson.toJson(click), clickTime); - LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); + click = + ClickStreamEvent.builder() + .setClientId(sessionId) + .setEventTime(time.toString(fm)) + .setClientId(UUID.randomUUID().toString()) + .setPage(String.format("P%s", pageReferrer)) + .setPagePrevious(String.format("P%s", currentPage)) + .setUid(1L) + .setEvent("browse"); + + pc.outputWithTimestamp(CLICKSTREAM, click.build(), clickTime); clickTime = clickTime.plus(Duration.standardSeconds(i + 2)); pageReferrer = currentPage; @@ -279,18 +263,17 @@ public void process(ProcessContext pc, @Timestamp Instant time) { for (int i = 0; i < 3; i++) { - click = new ClickStreamEventAVRO(); - click.clientId = sessionId; - click.eventDateTime = clickTime.toString(fm); - click.pageRef = String.format("P%s", pageReferrer); - click.pageTarget = String.format("P%s", currentPage); - click.agent = - "Mozilla/5.0 (iPhone; CPU iPhone OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148"; - click.transaction = false; - click.uid = 1L; - click.event = 
"browse"; - - pc.outputWithTimestamp(CLICKSTREAM, gson.toJson(click), clickTime); + click = + ClickStreamEvent.builder() + .setClientId(sessionId) + .setEventTime(time.toString(fm)) + .setClientId(UUID.randomUUID().toString()) + .setPage(String.format("P%s", pageReferrer)) + .setPagePrevious(String.format("P%s", currentPage)) + .setUid(1L) + .setEvent("browse"); + + pc.outputWithTimestamp(CLICKSTREAM, click.build(), clickTime); LOG.debug(String.format("Generating Msg: %s", gson.toJson(click))); clickTime = clickTime.plus(Duration.standardSeconds(i + 2)); From 9a815b0a4ed636ca51667f47fa4cef5435a0f22f Mon Sep 17 00:00:00 2001 From: rarokni Date: Thu, 20 May 2021 20:12:41 +0800 Subject: [PATCH 2/4] [Retail] Error output for gradle --- .../business-logic/build.gradle | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle b/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle index 46b770f3..1d5c0e28 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle @@ -34,6 +34,19 @@ tasks.matching {task -> task.name.startsWith('spotbugs')}.forEach { } } +test { + testLogging { + events "passed", "skipped", "failed" //, "standardOut", "standardError" + + showExceptions true + exceptionFormat "full" + showCauses true + showStackTraces true + + showStandardStreams = false + } +} + dependencies { compile project(':data-engineering-dept:data-objects') From 03123c448880bc9db932afb07eefbe64088a7b85 Mon Sep 17 00:00:00 2001 From: rarokni Date: Thu, 20 May 2021 20:21:56 +0800 Subject: [PATCH 3/4] [Retail] Fix unittest timezone issue --- .../retail/businesslogic/core/utils/test/JSONUtilsTest.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java index aacea50d..90fc0038 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/JSONUtilsTest.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.JsonToRow; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -33,7 +34,7 @@ /** Unit tests for {@link JsonToRow}. 
*/ @RunWith(JUnit4.class) public class JSONUtilsTest { - private static final Long TIME = Instant.parse("2000-01-01T00:00:00").getMillis(); + private static final Long TIME = Instant.parse("2000-01-01T00:00:00Z").getMillis(); private static final ClickStreamEvent AUTO_VALUE_EVENT = ClickStreamEvent.builder() @@ -46,7 +47,7 @@ public class JSONUtilsTest { .build(); private static final String JSON = - "{\"eventTime\":null,\"event\":\"event\",\"timestamp\":946656000000,\"user_id\":999,\"client_id\":\"1\",\"page\":\"pageRef\",\"page_previous\":\"pageTarget\",\"ecommerce\":null}\n"; + "{\"eventTime\":null,\"event\":\"event\",\"timestamp\":946684800000,\"user_id\":999,\"client_id\":\"1\",\"page\":\"pageRef\",\"page_previous\":\"pageTarget\",\"ecommerce\":null}\n"; @Rule public transient TestPipeline pipeline = TestPipeline.create(); From 95102dc818e98a7ff04e6a3ebf682c406bd31f9c Mon Sep 17 00:00:00 2001 From: rarokni Date: Tue, 25 May 2021 10:13:01 +0800 Subject: [PATCH 4/4] [Retail] Rename clickstream session test --- ...BackFillSessionDataTest.java => ClickStreamSessionTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/{BackFillSessionDataTest.java => ClickStreamSessionTest.java} (98%) diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTest.java similarity index 98% rename from retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java rename 
to retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTest.java index 2b873f28..307ed706 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/BackFillSessionDataTest.java +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/test/java/com/google/dataflow/sample/retail/businesslogic/core/utils/test/clickstream/ClickStreamSessionTest.java @@ -30,7 +30,7 @@ import org.junit.Rule; import org.junit.Test; -public class BackFillSessionDataTest { +public class ClickStreamSessionTest { @Rule public transient TestPipeline pipeline = TestPipeline.create();