Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion retail/retail-java-applications/README.MD
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ The application processes the following types of data:
------
# To run the pipeline locally you can make use of the smoke test
```
gradlew :data-engineering-dept:pipelines:test --tests com.google.dataflow.sample.retail.pipeine.test.RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
```
# Task patterns
The application contains a number of task patterns that show the best way to accomplish Java programming tasks that are commonly needed to create this type of application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ tasks.matching {task -> task.name.startsWith('spotbugs')}.forEach {
}
}

// Configure console logging for the Gradle test task.
test {
testLogging {
// Log an event for each passed, skipped, and failed test.
events "passed", "skipped", "failed" //, "standardOut", "standardError"

// Report exceptions in full, including their causes and stack traces.
showExceptions true
exceptionFormat "full"
showCauses true
showStackTraces true

// Do not echo the tests' standard output / standard error streams.
showStandardStreams = false
}
}

dependencies {
compile project(':data-engineering-dept:data-objects')

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@

/**
* This transform creates sessions from the incoming clickstream using the clientId and
* SessionWindows. The output is a ROW with schema:
* SessionWindows. The output is a
*
* <pre>{@code
* Field Name Field Type
* key ROW{clientID:STRING}
* values ITERABLE[ROW[{@link com.google.dataflow.sample.retail.dataobjects.ClickStream.ClickStreamEvent}]]
* Field Name Field Type
* key ROW{clientID:STRING}
* value ITERABLE[ROW[ClickstreamEvent]]
* }</pre>
*/
@Experimental
Expand Down Expand Up @@ -71,17 +71,19 @@ public ClickStreamSessions withSessionWindowGapDuration(Duration sessionWindowGa
}

@Override
/**
* Returns a Row object in the format:
*
* <pre>{@code
* Field Name Field Type
* key ROW{clientID:STRING}
* values ITERABLE[ROW[ClickstreamEvent]]
* }</pre>
*/
public PCollection<Row> expand(PCollection<Row> input) {
Preconditions.checkNotNull(this.getSessionWindowGapDuration(), "Must set a session value.");

Preconditions.checkNotNull(
this.getSessionWindowGapDuration(), "Must set a session gap duration.");
/*
* Group.byFieldNames returns a Row object in the format:
*
* <pre>{@code
* Field Name Field Type
* key ROW{clientID:STRING}
* value ITERABLE[ROW[ClickstreamEvent]]
* }</pre>
*/
return input
.apply(Window.into(Sessions.withGapDuration(getSessionWindowGapDuration())))
.apply(Group.byFieldNames("client_id"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,7 @@ public PCollection<PageViewAggregator> expand(PCollection<Row> input) {
.apply(Filter.<Row>create().whereFieldName("event", c -> c.equals("browse")))
// Group By pageRef and count the results.
.apply(Window.into(FixedWindows.of(pageViewCountWindowDuration)))
.apply(
Group.<Row>byFieldNames("pageRef")
.aggregateField("pageRef", Count.combineFn(), "count"))
.apply(Group.<Row>byFieldNames("page").aggregateField("page", Count.combineFn(), "count"))
.apply(CreatePageViewAggregatorMetadata.create(pageViewCountWindowDuration.getMillis()));
}

Expand Down Expand Up @@ -97,14 +95,14 @@ public PCollection<PageViewAggregator> expand(PCollection<Row> input) {
// TODO the schema registry for PageViewAggregator throws a class cast issue
Schema schema =
Schema.of(
Field.of("pageRef", FieldType.STRING),
Field.of("page", FieldType.STRING),
Field.of("count", FieldType.INT64),
Field.of("startTime", FieldType.INT64),
Field.of("durationMS", FieldType.INT64));

return input
// Note key and value are results of Group + Count operation in the previous transform.
.apply(Select.fieldNames("key.pageRef", "value.count"))
.apply(Select.fieldNames("key.page", "value.count"))
// We need to add these fields to the ROW object before we convert it to the POJO
.apply(
AddFields.<Row>create()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,12 @@ public void process(@Element PageViewAggregator input, OutputReceiver<Mutation>
throws UnsupportedEncodingException {

Put put =
new Put(
String.format("%s-%s", input.getPageRef(), input.getStartTime()).getBytes("UTF-8"));
new Put(String.format("%s-%s", input.getPage(), input.getStartTime()).getBytes("UTF-8"));

String charset = "UTF-8";

// TODO: This should never be null; eliminate the bug.
String pageRef = Optional.ofNullable(input.getPageRef()).orElse("");
String pageRef = Optional.ofNullable(input.getPage()).orElse("");
Long count = Optional.ofNullable(input.getCount()).orElse(0L);

put.addColumn(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package com.google.dataflow.sample.retail.businesslogic.core.transforms.clickstream.validation;

import com.google.dataflow.sample.retail.businesslogic.externalservices.RetailCompanyServices;
import com.google.dataflow.sample.retail.dataobjects.ClickStream.Item;
import com.google.dataflow.sample.retail.dataobjects.Item;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand Down Expand Up @@ -73,7 +73,7 @@ public void finishBundle(FinishBundleContext fbc) {
for (_WindowWrappedEvent event : cache) {

Row row = event.eventData.getRow("data");
Collection<Row> items = row.getArray("items");
Collection<Row> items = row.getRow("ecommerce").getArray("items");
List<Row> updatedItems = new ArrayList<>();

for (Row item : items) {
Expand All @@ -88,7 +88,9 @@ public void finishBundle(FinishBundleContext fbc) {
.build());
}

Row newDataRow = Row.fromRow(row).withFieldValue("items", updatedItems).build();
Row itemsRow =
Row.fromRow(row.getRow("ecommerce")).withFieldValue("items", updatedItems).build();
Row newDataRow = Row.fromRow(row).withFieldValue("ecommerce", itemsRow).build();

fbc.output(
Row.fromRow(event.eventData).withFieldValue("data", newDataRow).build(),
Expand All @@ -108,6 +110,7 @@ private List<String> populateIds(List<_WindowWrappedEvent> events) {
x ->
x.eventData
.getRow("data")
.getRow("ecommerce")
.getArray("items")
.forEach(y -> ids.add(((Row) y).getString("item_id"))));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void process(@Element Row input, @Timestamp Instant timestamp, MultiOutpu
// If the event is of a type that needs Item to be present, do checks
if (chkItemRequired(data)) {

Collection<Row> items = data.getArray("items");
Collection<Row> items = data.getRow("ecommerce").getArray("items");
// If no items this is not recoverable, send to dead letter.
if (items == null || items.size() == 0) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
*/
package com.google.dataflow.sample.retail.businesslogic.externalservices;

import com.google.dataflow.sample.retail.dataobjects.ClickStream.Item;
import com.google.dataflow.sample.retail.dataobjects.Item;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down
Loading