diff --git a/retail/retail-java-applications/README.MD b/retail/retail-java-applications/README.MD index e860c9d1..6a4ebe6f 100644 --- a/retail/retail-java-applications/README.MD +++ b/retail/retail-java-applications/README.MD @@ -65,7 +65,7 @@ The application processes the following types of data: ------ # To run the pipeline locally you can make use of the smoke test ``` -gradlew :data-engineering-dept:pipelines:test --tests com.google.dataflow.sample.retail.pipeine.test.RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks +gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks ``` # Task patterns The application contains a number of task patterns that show the best way to accomplish Java programming tasks that are commonly needed to create this type of application. diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle b/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle index 46b770f3..1d5c0e28 100644 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle +++ b/retail/retail-java-applications/data-engineering-dept/business-logic/build.gradle @@ -34,6 +34,19 @@ tasks.matching {task -> task.name.startsWith('spotbugs')}.forEach { } } +test { + testLogging { + events "passed", "skipped", "failed" //, "standardOut", "standardError" + + showExceptions true + exceptionFormat "full" + showCauses true + showStackTraces true + + showStandardStreams = false + } +} + dependencies { compile project(':data-engineering-dept:data-objects') diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java 
b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java deleted file mode 100644 index 192dfcb9..00000000 --- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/BackFillSessionData.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream; - -import com.google.auto.value.AutoValue; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.IntStream; -import java.util.stream.StreamSupport; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.Row; -import org.apache.beam.sdk.values.TypeDescriptors; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions; - -@AutoValue -/** - * Takes a ROW generated with the {@link ClickStreamSessions} transform and back propagates fields - * throughout the session. Given a list of {@link - * com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent} the fields provided - * in {@link BackFillSessionData#getBackPropogateFields()} will be back propagated to all events - * ordered by event time. - * - *
For example if foo is the field that is passed to {@link - * BackFillSessionData#getBackPropogateFields()} then the first occurrence of foo will be propagated - * to any events that have foo == null * - * - *
{@code
- * Given the following input
- * { key : sess_1
- * [
- * {event_1 : { timestamp : 1, foo : null, bar : cup}}
- * {event_2 : { timestamp : 2, foo : null, bar : mug}}
- * {event_3 : { timestamp : 3, foo : 765 , bar : bottle }}
- * {event_4 : { timestamp : 3, foo : null , bar : bottle }}
- * ]
- * }
- *
- * The output will become
- * { key : sess_1
- * [
- * {event_1 : { timestamp : 1, foo : 765, bar : cup}}
- * {event_2 : { timestamp : 2, foo : 765, bar : mug}}
- * {event_3 : { timestamp : 3, foo : 765, bar : bottle }}
- * {event_4 : { timestamp : 4, foo : null , bar : bottle }}
- * ]
- * }
- * }
- *
- * If timestamp is missing from any of the elements, no correction is made as values can not be
- * sorted by time order. Only values before the first non-null value are filled, values after the
- * first non-null are assumed to be correct.
- */
-public abstract class BackFillSessionData extends PTransform{@code
- * Field Name Field Type
- * key ROW{clientID:STRING}
- * values ITERABLE[ROW[{@link com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent}]]
+ * Field Name Field Type
+ * key ROW{clientID:STRING}
+ * value ITERABLE[ROW[ClickStreamEvent]]
* }
*/
@Experimental
@@ -71,17 +71,19 @@ public ClickStreamSessions withSessionWindowGapDuration(Duration sessionWindowGa
}
@Override
- /**
- * Returns a Row object in the format:
- *
- * {@code
- * Field Name Field Type
- * key ROW{clientID:STRING}
- * values ITERABLE[ROW[ClickstreamEvent]]
- * }
- */
public PCollection{@code
+ * Field Name Field Type
+ * key ROW{clientID:STRING}
+ * value ITERABLE[ROW[ClickStreamEvent]]
+ * }
+ */
return input
.apply(Window.into(Sessions.withGapDuration(getSessionWindowGapDuration())))
.apply(Group.byFieldNames("client_id"));
diff --git a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java
index 5452e332..7d723fa7 100644
--- a/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java
+++ b/retail/retail-java-applications/data-engineering-dept/business-logic/src/main/java/com/google/dataflow/sample/retail/businesslogic/core/transforms/clickstream/CountViewsPerProduct.java
@@ -67,9 +67,7 @@ public PCollectionView Item + * + *
{@code
+ * {
+ * "event_datetime":"2020-11-16 22:59:59",
+ * "event": "view_item",
+ * "user_id": "UID00003",
+ * "client_id": "CID00003",
+ * "page":"/product-67890",
+ * "page_previous": "/category-tshirts",
+ * "ecommerce": {
+ * "items": [{
+ * "item_name": "Donut Friday Scented T-Shirt",
+ * "item_id": "67890",
+ * "price": 33.75,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_category_2": "Mens",
+ * "item_category_3": "Shirts",
+ * "item_category_4": "Tshirts",
+ * "item_variant": "Black",
+ * "item_list_name": "Search Results",
+ * "item_list_id": "SR123",
+ * "index": 1,
+ * "quantity": 1
+ * }]
+ * }
+ * }
+ *
+ *
+ * }
+ *
+ * add_to_cart
+ *
+ * {@code
+ * {
+ * "event_datetime":"2020-11-16 20:59:59",
+ * "event": "add_to_cart",
+ * "user_id": "UID00003",
+ * "client_id": "CID00003",
+ * "page":"/product-67890",
+ * "page_previous": "/category-tshirts",
+ * "ecommerce": {
+ * "items": [{
+ * "item_name": "Donut Friday Scented T-Shirt",
+ * "item_id": "67890",
+ * "price": 33.75,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_category_2": "Mens",
+ * "item_category_3": "Shirts",
+ * "item_category_4": "Tshirts",
+ * "item_variant": "Black",
+ * "item_list_name": "Search Results",
+ * "item_list_id": "SR123",
+ * "index": 1,
+ * "quantity": 2
+ * }]
+ * }
+ * }
+ *
+ * }
+ *
+ * purchase
+ *
+ * {@code
+ * {
+ * "event_datetime":"2020-11-16 20:59:59",
+ * "event": "purchase",
+ * "user_id": "UID00001",
+ * "client_id": "CID00003",
+ * "page":"/checkout",
+ * "page_previous": "/order-confirmation",
+ * "ecommerce": {
+ * "purchase": {
+ * "transaction_id": "T12345",
+ * "affiliation": "Online Store",
+ * "value": 35.43,
+ * "tax": 4.90,
+ * "shipping": 5.99,
+ * "currency": "EUR",
+ * "coupon": "SUMMER_SALE",
+ * "items": [{
+ * "item_name": "Triblend Android T-Shirt",
+ * "item_id": "12345",
+ * "item_price": 15.25,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_variant": "Gray",
+ * "quantity": 1,
+ * "item_coupon": ""
+ * }, {
+ * "item_name": "Donut Friday Scented T-Shirt",
+ * "item_id": "67890",
+ * "item_price": 33.75,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_variant": "Black",
+ * "quantity": 1
+ * }]
+ * }
+ * }
+ * }
+ *
+ * }
*/
@Experimental
public class ClickStream {
@@ -55,17 +160,14 @@ public abstract static class ClickStreamEvent {
@SchemaFieldName("client_id")
public @Nullable abstract String getClientId();
- @SchemaFieldName("pageRef")
- public @Nullable abstract String getPageRef();
+ @SchemaFieldName("page")
+ public @Nullable abstract String getPage();
- @SchemaFieldName("pageTarget")
- public @Nullable abstract String getPageTarget();
+ @SchemaFieldName("page_previous")
+ public @Nullable abstract String getPagePrevious();
- @SchemaFieldName("agent")
- public @Nullable abstract String getAgent();
-
- @SchemaFieldName("items")
- public @Nullable abstract List{@code
+ * {
+ * "item_name": "Donut Friday Scented T-Shirt",
+ * "item_id": "67890",
+ * "price": 33.75,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_category_2": "Mens",
+ * "item_category_3": "Shirts",
+ * "item_category_4": "Tshirts",
+ * "item_variant": "Black",
+ * "item_list_name": "Search Results",
+ * "item_list_id": "SR123",
+ * "index": 1,
+ * "quantity": 2
+ * }
+ *
+ * }
+ */
+@AutoValue
+@DefaultSchema(AutoValueSchema.class)
+public abstract class Item { // Schema-mapped AutoValue type describing a single item; every property is @Nullable.
+
+ @SchemaFieldName("item_name")
+ public @Nullable abstract String getItemName();
+
+ @SchemaFieldName("item_id")
+ public @Nullable abstract String getItemId();
+
+ @SchemaFieldName("price") // NOTE(review): the purchase examples in this change use "item_price" (and "item_coupon", which has no property here) - confirm the expected keys
+ public @Nullable abstract Float getPrice();
+
+ @SchemaFieldName("item_brand")
+ public @Nullable abstract String getItemBrand();
+
+ @SchemaFieldName("item_category") // top-level category; the accessor name abbreviates it to Cat01
+ public @Nullable abstract String getItemCat01();
+
+ @SchemaFieldName("item_category_2")
+ public @Nullable abstract String getItemCat02();
+
+ @SchemaFieldName("item_category_3")
+ public @Nullable abstract String getItemCat03();
+
+ @SchemaFieldName("item_category_4")
+ public @Nullable abstract String getItemCat04();
+
+ @SchemaFieldName("item_category_5") // the sample JSON documented for this class stops at item_category_4
+ public @Nullable abstract String getItemCat05();
+
+ @SchemaFieldName("item_variant")
+ public @Nullable abstract String getItemVariant();
+
+ @SchemaFieldName("item_list_name")
+ public @Nullable abstract String getItemListName();
+
+ @SchemaFieldName("item_list_id")
+ public @Nullable abstract String getItemListId();
+
+ @SchemaFieldName("index")
+ public @Nullable abstract Integer getIndex();
+
+ @SchemaFieldName("quantity")
+ public @Nullable abstract Integer getQuantity();
+
+ public static Builder builder() { // entry point for constructing an Item
+ return new AutoValue_Item.Builder(); // AutoValue_Item is not declared in this file - presumably generated by the AutoValue processor
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder { // one setter per property above, each returning the builder for chaining
+ public abstract Builder setItemName(String newItemName);
+
+ public abstract Builder setItemId(String newItemId);
+
+ public abstract Builder setPrice(Float newPrice);
+
+ public abstract Builder setItemBrand(String newItemBrand);
+
+ public abstract Builder setItemCat01(String newItemCat01);
+
+ public abstract Builder setItemCat02(String newItemCat02);
+
+ public abstract Builder setItemCat03(String newItemCat03);
+
+ public abstract Builder setItemCat04(String newItemCat04);
+
+ public abstract Builder setItemCat05(String newItemCat05);
+
+ public abstract Builder setItemVariant(String newItemVariant);
+
+ public abstract Builder setItemListName(String newItemListName);
+
+ public abstract Builder setItemListId(String newItemListId);
+
+ public abstract Builder setIndex(Integer newIndex);
+
+ public abstract Builder setQuantity(Integer newQuantity);
+
+ public abstract Item build();
+ }
+}
diff --git a/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java
new file mode 100644
index 00000000..f536b82e
--- /dev/null
+++ b/retail/retail-java-applications/data-engineering-dept/data-objects/src/main/java/com/google/dataflow/sample/retail/dataobjects/Purchase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.dataflow.sample.retail.dataobjects;
+
+import com.google.auto.value.AutoValue;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
+
+/**
+ * Transaction-level fields of a purchase event, mapped to a Beam schema through AutoValue.
+ * Example event (its nested {@code items} array has no corresponding property on this class):
+ * {@code
+ * {
+ * "event_datetime":"2020-11-16 20:59:59",
+ * "event": "purchase",
+ * "user_id": "UID00001",
+ * "client_id": "CID00003",
+ * "page":"/checkout",
+ * "page_previous": "/order-confirmation",
+ * "ecommerce": {
+ * "purchase": {
+ * "transaction_id": "T12345",
+ * "affiliation": "Online Store",
+ * "value": 35.43,
+ * "tax": 4.90,
+ * "shipping": 5.99,
+ * "currency": "EUR",
+ * "coupon": "SUMMER_SALE",
+ * "items": [{
+ * "item_name": "Triblend Android T-Shirt",
+ * "item_id": "12345",
+ * "item_price": 15.25,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_variant": "Gray",
+ * "quantity": 1,
+ * "item_coupon": ""
+ * }, {
+ * "item_name": "Donut Friday Scented T-Shirt",
+ * "item_id": "67890",
+ * "item_price": 33.75,
+ * "item_brand": "Google",
+ * "item_category": "Apparel",
+ * "item_variant": "Black",
+ * "quantity": 1
+ * }]
+ * }
+ * }
+ * }
+ *
+ * }
+ */
+@AutoValue
+@DefaultSchema(AutoValueSchema.class)
+public abstract class Purchase {
+
+ @SchemaFieldName("transaction_id")
+ public @Nullable abstract String getItemName(); // NOTE(review): bound to "transaction_id"; accessor name looks copied from Item - confirm before renaming
+
+ @SchemaFieldName("affiliation")
+ public @Nullable abstract String getItemId(); // NOTE(review): bound to "affiliation"; accessor name looks copied from Item
+
+ @SchemaFieldName("value")
+ public @Nullable abstract String getPrice(); // NOTE(review): bound to "value"; declared String although the example shows a JSON number (35.43) - confirm type
+
+ @SchemaFieldName("tax")
+ public @Nullable abstract String getItemBrand(); // NOTE(review): bound to "tax"; declared String although the example shows a JSON number (4.90) - confirm type
+
+ @SchemaFieldName("shipping")
+ public @Nullable abstract String getItemCat01(); // NOTE(review): bound to "shipping"; declared String although the example shows a JSON number (5.99) - confirm type
+
+ @SchemaFieldName("currency")
+ public @Nullable abstract String getItemCat02(); // NOTE(review): bound to "currency"; accessor name looks copied from Item
+
+ @SchemaFieldName("coupon")
+ public @Nullable abstract String getItemCat03(); // NOTE(review): bound to "coupon"; accessor name looks copied from Item
+
+ public static Builder builder() { // entry point for constructing a Purchase
+ return new AutoValue_Purchase.Builder(); // AutoValue_Purchase is not declared in this file - presumably generated by the AutoValue processor
+ }
+
+ @AutoValue.Builder
+ public abstract static class Builder { // setters mirror the accessor names above, so they carry the same Item-derived naming
+
+ public abstract Builder setItemName(String newItemName);
+
+ public abstract Builder setItemId(String newItemId);
+
+ public abstract Builder setPrice(String newPrice);
+
+ public abstract Builder setItemBrand(String newItemBrand);
+
+ public abstract Builder setItemCat01(String newItemCat01);
+
+ public abstract Builder setItemCat02(String newItemCat02);
+
+ public abstract Builder setItemCat03(String newItemCat03);
+
+ public abstract Purchase build();
+ }
+}
diff --git a/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java b/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java
index d40aa3a7..c7e901c4 100644
--- a/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java
+++ b/retail/retail-java-applications/data-engineering-dept/pipelines/src/main/java/com/google/dataflow/sample/retail/pipeline/RetailDataProcessingPipeline.java
@@ -30,6 +30,7 @@
import com.google.dataflow.sample.retail.businesslogic.core.transforms.transaction.TransactionProcessing;
import com.google.dataflow.sample.retail.businesslogic.core.utils.Print;
import com.google.dataflow.sample.retail.businesslogic.core.utils.ReadPubSubMsgPayLoadAsString;
+import com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent;
import com.google.dataflow.sample.retail.dataobjects.Stock.StockEvent;
import com.google.dataflow.sample.retail.dataobjects.StockAggregation;
import com.google.dataflow.sample.retail.dataobjects.Transaction.TransactionEvent;
@@ -40,6 +41,7 @@
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ToJson;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptors;
@@ -53,7 +55,7 @@
@Experimental
public class RetailDataProcessingPipeline {
- @VisibleForTesting public PCollection