From d3a7b420f6943ef5141932d22b639ccb389296b8 Mon Sep 17 00:00:00 2001
From: banecogic
Date: Wed, 25 Jan 2017 10:46:13 +0100
Subject: [PATCH] SAMZA-859: Create a simple join example in hello-samza
 tutorial (docs)

---
 .../versioned/container/state-management.md |  2 +-
 docs/learn/tutorials/versioned/index.md     |  3 +-
 .../versioned/samza-joining-streams.md      | 80 +++++++++++++++++++
 3 files changed, 83 insertions(+), 2 deletions(-)
 create mode 100644 docs/learn/tutorials/versioned/samza-joining-streams.md

diff --git a/docs/learn/documentation/versioned/container/state-management.md b/docs/learn/documentation/versioned/container/state-management.md
index 86b0d44176..d883e5e3ff 100644
--- a/docs/learn/documentation/versioned/container/state-management.md
+++ b/docs/learn/documentation/versioned/container/state-management.md
@@ -263,7 +263,7 @@ Implementation: The job subscribes to the stream of user profile updates and the
 
 If the next stage needs to aggregate by ZIP code, the ZIP code can be used as the partitioning key of the job's output stream. That ensures that all the events for the same ZIP code are sent to the same stream partition.
 
-#### Stream-stream join
+#### Stream-stream join
 
 *Example: Join a stream of ad clicks to a stream of ad impressions (to link the information on when the ad was shown to the information on when it was clicked)*
 
diff --git a/docs/learn/tutorials/versioned/index.md b/docs/learn/tutorials/versioned/index.md
index 6d6295ff7b..be8b0b9193 100644
--- a/docs/learn/tutorials/versioned/index.md
+++ b/docs/learn/tutorials/versioned/index.md
@@ -33,10 +33,11 @@ title: Tutorials
 
 [Samza Async API and Multithreading User Guide](samza-async-user-guide.html)
 
+[Joining streams](samza-joining-streams.html)
+
diff --git a/docs/learn/tutorials/versioned/samza-joining-streams.md b/docs/learn/tutorials/versioned/samza-joining-streams.md
new file mode 100644
--- /dev/null
+++ b/docs/learn/tutorials/versioned/samza-joining-streams.md
@@ -0,0 +1,80 @@
+
+The tutorial assumes you have successfully run [hello-samza](../../../startup/hello-samza/{{site.version}}/).
+This tutorial demonstrates a stream-stream join use case implemented with key-value stores.
+If you are not familiar with Samza's state management or with the term "stream join", take a look at Samza's [State Management](../../documentation/{{site.version}}/container/state-management.html) documentation.
+This tutorial follows the example described [here](../../documentation/{{site.version}}/container/state-management.html#stream-stream-join-example).
+
+### Produce some ad events
+
+Before producing ad impression and click events, we assume that you have already checked out the hello-samza code, started the Samza grid, and built a Samza job package. If not, follow the first three steps of [hello-samza](../../../startup/hello-samza/{{site.version}}/).
+In this example we rely on Kafka as the underlying system. Before running the jobs, the Kafka topics they use need to exist. We provide a script that creates all the required Kafka topics and starts producing raw ad impression and click events. It uses localhost:9092 as the Kafka broker and localhost:2181 as ZooKeeper.
+Raw ad impression and click events look like this:
+
+{% highlight bash %}
+impression-id=1 type=impression advertiser-id=1 ip=111.111.111.* agent=Chrome timestamp=2017-01-01T00:00:00.000
+impression-id=1 type=click advertiser-id=1 ip=111.111.111.* agent=Chrome timestamp=2017-01-01T00:00:14.234
+{% endhighlight %}
+
+Make sure you are in the root hello-samza directory, then run the script:
+
+{% highlight bash %}
+bin/produce-ad-event-data.sh
+{% endhighlight %}
+
+### Run Samza jobs
+
+Now that you are producing raw ad events, you need to partition them by their impression ID. To do so, run the ad-event-feed job.
+
+{% highlight bash %}
+bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/config/deploy/samza/config/ad-event-feed.properties
+{% endhighlight %}
+
+Now you have ad impressions and ad clicks partitioned into 4 partitions each. The second Samza job consumes them and builds joined events that combine the raw events and calculate the time elapsed between the impression and the click.
+
+{% highlight bash %}
+bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/config/deploy/samza/config/ad-event-join.properties
+{% endhighlight %}
+
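+To get a feeling for what the join job does internally, below is a minimal sketch of a stream-stream join task written against Samza's low-level API. The class and store names used here (AdJoinTask, ad-join-store) are illustrative and not necessarily the ones used by the actual hello-samza job, and the sketch leaves out the timestamp parsing needed to calculate the time elapsed between impression and click.
+
+{% highlight java %}
+import org.apache.samza.config.Config;
+import org.apache.samza.storage.kv.KeyValueStore;
+import org.apache.samza.system.IncomingMessageEnvelope;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.InitableTask;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.StreamTask;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+public class AdJoinTask implements StreamTask, InitableTask {
+  // "kafka" and "ad-join" must match the system and output topic configured for the job.
+  private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "ad-join");
+
+  private KeyValueStore<String, String> store;
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void init(Config config, TaskContext context) {
+    // "ad-join-store" must match a store declared in the job's properties file.
+    store = (KeyValueStore<String, String>) context.getStore("ad-join-store");
+  }
+
+  @Override
+  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
+    // Both input streams are keyed by impression id, so matching events land in the same task instance.
+    String impressionId = (String) envelope.getKey();
+    String event = (String) envelope.getMessage();
+
+    String bufferedEvent = store.get(impressionId);
+    if (bufferedEvent == null) {
+      // First event (impression or click) for this id: buffer it until its counterpart arrives.
+      store.put(impressionId, event);
+    } else {
+      // The counterpart already arrived: emit a joined event and clean up the store.
+      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, impressionId, bufferedEvent + " " + event));
+      store.delete(impressionId);
+    }
+  }
+}
+{% endhighlight %}
+
+In a real job the store, its key and message serdes, and its changelog stream are declared in the job's properties file, together with the input streams that carry the partitioned impression and click events.
+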
+### The result
+
+Check out the messages produced by the jobs with:
+
+{% highlight bash %}
+deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ad-imp-metadata
+deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ad-clk-metadata
+deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic ad-join
+{% endhighlight %}
+
+You can also produce events manually. For instance, the following commands will produce one joined event.
+
+{% highlight bash %}
+deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ad-impression --property key.separator=, --property parse.key=true
+11,impression-id=11 type=impression advertiser-id=1 ip=111.111.111.* agent=Chrome timestamp=2017-01-01T12:00:00.000
+{% endhighlight %}
+{% highlight bash %}
+deploy/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic ad-click --property key.separator=, --property parse.key=true
+11,impression-id=11 type=click advertiser-id=1 ip=111.111.111.* agent=Chrome timestamp=2017-01-01T13:13:35.404
+{% endhighlight %}
+
+Congratulations! You have successfully run a stream-stream join example implemented with Samza's key-value stores!
\ No newline at end of file