diff --git a/divolte-kafka-streams/.gitignore b/divolte-kafka-streams/.gitignore
new file mode 100755
index 0000000..e7f58e1
--- /dev/null
+++ b/divolte-kafka-streams/.gitignore
@@ -0,0 +1,25 @@
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+target/
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+#IDE
+idea/
+*.iml
diff --git a/divolte-kafka-streams/README.md b/divolte-kafka-streams/README.md
new file mode 100755
index 0000000..4fc9329
--- /dev/null
+++ b/divolte-kafka-streams/README.md
@@ -0,0 +1,11 @@
+### Avro Data convert to Json as a new topic
+
+```
+mvn clean install
+mvn clean package
+```
+
+#### Run Consumer
+```
+java -jar target/divolte-kafka-streams-1.0-SNAPSHOT-jar-with-dependencies.jar
+```
diff --git a/divolte-kafka-streams/pom.xml b/divolte-kafka-streams/pom.xml
new file mode 100755
index 0000000..e714ab6
--- /dev/null
+++ b/divolte-kafka-streams/pom.xml
@@ -0,0 +1,133 @@
+
+
+ 4.0.0
+
+ com.divolte.kafka.streams
+ divolte-kafka-streams
+ 1.0-SNAPSHOT
+
+
+
+
+ confluent
+ http://packages.confluent.io/maven/
+
+
+
+
+ 1.8
+ 0.11.0.0-cp1
+ 2.11
+ ${kafka.scala.version}.8
+ 3.3.0
+ 2.2.6
+ 0.13.0
+ 1.8.2
+ 0.9.2
+ 9.2.12.v20150709
+ 2.8.8
+ 2.25
+ UTF-8
+
+
+
+
+ com.twitter
+ bijection-avro_2.10
+ 0.9.2
+
+
+
+ io.confluent
+ kafka-streams-avro-serde
+ ${confluent.version}
+
+
+ io.confluent
+ kafka-avro-serializer
+ ${confluent.version}
+
+
+ io.confluent
+ kafka-schema-registry-client
+ ${confluent.version}
+
+
+
+
+ org.apache.kafka
+ kafka-streams
+ 0.11.0.0
+
+
+
+
+ org.slf4j
+ slf4j-api
+ 1.7.25
+
+
+
+
+ org.slf4j
+ slf4j-log4j12
+ 1.7.25
+
+
+
+
+ junit
+ junit
+ 4.12
+ test
+
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 3.6.1
+
+ 1.8
+ 1.8
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ 2.5.2
+
+
+ jar-with-dependencies
+
+
+
+ true
+ com.divolte.kafka.streams.StreamsDivolteApp
+
+
+
+
+
+ assemble-all
+ package
+
+ single
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java
new file mode 100755
index 0000000..cbd3fa2
--- /dev/null
+++ b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/AvroUtil.java
@@ -0,0 +1,39 @@
+package com.divolte.kafka.streams;
+
+import java.io.IOException;
+import java.io.EOFException;
+import java.io.InputStream;
+import java.io.*;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.*;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.Schema;
+import com.twitter.bijection.Injection;
+import com.twitter.bijection.avro.GenericAvroCodecs;
+
+public class AvroUtil {
+ public static String transform(byte[] value) {
+ String returnVal = "";
+ try {
+ Schema.Parser parser = new Schema.Parser();
+ Schema schema = parser.parse(new File("src/main/resources/MyEventRecord.avsc"));
+ GenericRecord avroRecord = new GenericData.Record(schema);
+ returnVal = avroRecord.toString();
+
+ Injection recordInjection = GenericAvroCodecs.toBinary(schema);
+ GenericRecord record = recordInjection.invert(value).get();
+
+ returnVal = record.toString();
+ } catch (Exception e) {
+ String ex = e.toString();
+ }
+
+ return returnVal;
+ }
+
+}
diff --git a/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java
new file mode 100755
index 0000000..77a7fbc
--- /dev/null
+++ b/divolte-kafka-streams/src/main/java/com/divolte/kafka/streams/StreamsDivolteApp.java
@@ -0,0 +1,40 @@
+package com.divolte.kafka.streams;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import java.util.Properties;
+import org.apache.kafka.common.serialization.Serde;
+
+public class StreamsDivolteApp {
+
+ public static void main(String[] args) {
+
+ Properties config = new Properties();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-divolte-app");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.102:9092");
+ config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+ config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
+ KStreamBuilder builder = new KStreamBuilder();
+ final Serde stringSerde = Serdes.String();
+ final Serde byteArraySerde = Serdes.ByteArray();
+
+ builder.stream(stringSerde, byteArraySerde, "divolte-data")
+ .mapValues(value -> AvroUtil.transform(value))
+ .to("divolte-json");
+
+ KafkaStreams streams = new KafkaStreams(builder, config);
+ streams.cleanUp(); // only do this in dev - not in prod
+ streams.start();
+
+ // print the topology
+ System.out.println(streams.toString());
+
+ // shutdown hook to correctly close the streams application
+ Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
+ }
+}
diff --git a/divolte-kafka-streams/src/main/resources/MyEventRecord.avsc b/divolte-kafka-streams/src/main/resources/MyEventRecord.avsc
new file mode 100755
index 0000000..1d16802
--- /dev/null
+++ b/divolte-kafka-streams/src/main/resources/MyEventRecord.avsc
@@ -0,0 +1,41 @@
+{
+ "namespace": "io.divolte.record",
+ "type": "record",
+ "name": "MyEventRecord",
+ "fields": [
+ { "name": "userId", "type": ["null", "int"], "default": null },
+ { "name": "q", "type": ["null", "string"], "default": null },
+ { "name": "page", "type": ["null", "string"], "default": null },
+ { "name": "n", "type": ["null", "int"], "default": null },
+ { "name": "cookieCustom", "type": ["null", "string"], "default": null },
+ { "name": "detectedDuplicate", "type": ["null", "boolean"], "default": null },
+ { "name": "detectedCorruption", "type": ["null", "boolean"], "default": null },
+ { "name": "firstInSession", "type": ["null", "boolean"], "default": null },
+ { "name": "timestamp", "type": ["null", "long"], "default": null },
+ { "name": "clientTimestamp", "type": ["null", "long"], "default": null },
+ { "name": "remoteHost", "type": "string", "default": null },
+ { "name": "referer", "type": ["null", "string"], "default": null },
+ { "name": "location", "type": ["null", "string"], "default": null },
+ { "name": "viewportPixelWidth", "type": ["null", "int"], "default": null },
+ { "name": "viewportPixelHeight", "type": ["null", "int"], "default": null },
+ { "name": "screenPixelWidth", "type": ["null", "int"], "default": null },
+ { "name": "screenPixelHeight", "type": ["null", "int"], "default": null },
+ { "name": "devicePixelRatio", "type": ["null", "int"], "default": null },
+ { "name": "partyId", "type": ["null", "string"], "default": null },
+ { "name": "sessionId", "type": ["null", "string"], "default": null },
+ { "name": "pageViewId", "type": ["null", "string"], "default": null },
+ { "name": "eventType", "type": "string", "default": "unknown" },
+ { "name": "eventId", "type": "string", "default": "unknown" },
+ { "name": "localPath", "type": ["null", "string"], "default": null },
+ { "name": "userAgentString", "type": ["null", "string"], "default": null },
+ { "name": "userAgentName", "type": ["null", "string"], "default": null },
+ { "name": "userAgentFamily", "type": ["null", "string"], "default": null },
+ { "name": "userAgentVendor", "type": ["null", "string"], "default": null },
+ { "name": "userAgentType", "type": ["null", "string"], "default": null },
+ { "name": "userAgentVersion", "type": ["null", "string"], "default": null },
+ { "name": "userAgentDeviceCategory", "type": ["null", "string"], "default": null },
+ { "name": "userAgentOsFamily", "type": ["null", "string"], "default": null },
+ { "name": "userAgentOsVersion", "type": ["null", "string"], "default": null },
+ { "name": "userAgentOsVendor", "type": ["null", "string"], "default": null }
+ ]
+}
\ No newline at end of file
diff --git a/divolte-kafka-streams/src/main/resources/log4j.properties b/divolte-kafka-streams/src/main/resources/log4j.properties
new file mode 100755
index 0000000..d511cbd
--- /dev/null
+++ b/divolte-kafka-streams/src/main/resources/log4j.properties
@@ -0,0 +1,5 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%p %m (%c:%L) %n
\ No newline at end of file
diff --git a/divolte-kafka-streams/src/main/resources/mapping.groovy b/divolte-kafka-streams/src/main/resources/mapping.groovy
new file mode 100755
index 0000000..c24abba
--- /dev/null
+++ b/divolte-kafka-streams/src/main/resources/mapping.groovy
@@ -0,0 +1,45 @@
+mapping {
+ map {parse eventParameters().value('userId') to int32 } onto 'userId'
+
+ map eventType() onto 'eventType'
+ map firstInSession() onto 'firstInSession'
+ map timestamp() onto 'timestamp'
+ map remoteHost() onto 'remoteHost'
+ map duplicate() onto 'detectedDuplicate'
+ map corrupt() onto 'detectedCorruption'
+ map clientTimestamp() onto 'clientTimestamp'
+ map eventId() onto 'eventId'
+ map cookie('cookieCustom') onto 'cookieCustom'
+
+ map referer() onto 'referer'
+ map location() onto 'location'
+ map viewportPixelWidth() onto 'viewportPixelWidth'
+ map viewportPixelHeight() onto 'viewportPixelHeight'
+ map screenPixelWidth() onto 'screenPixelWidth'
+ map screenPixelHeight() onto 'screenPixelHeight'
+ map devicePixelRatio() onto 'devicePixelRatio'
+ map partyId() onto 'partyId'
+ map sessionId() onto 'sessionId'
+ map pageViewId() onto 'pageViewId'
+
+ map userAgentString() onto 'userAgentString'
+ def ua = userAgent()
+ map ua.name() onto 'userAgentName'
+ map ua.family() onto 'userAgentFamily'
+ map ua.vendor() onto 'userAgentVendor'
+ map ua.type() onto 'userAgentType'
+ map ua.version() onto 'userAgentVersion'
+ map ua.deviceCategory() onto 'userAgentDeviceCategory'
+ map ua.osFamily() onto 'userAgentOsFamily'
+ map ua.osVersion() onto 'userAgentOsVersion'
+ map ua.osVendor() onto 'userAgentOsVendor'
+
+ def locationUri = parse location() to uri
+ def localUri = parse locationUri.rawFragment() to uri
+ map localUri.path() onto 'localPath'
+
+ def localQuery = localUri.query()
+ map localQuery.value('q') onto 'q'
+ map localQuery.value('page') onto 'page'
+ map { parse localQuery.value('n') to int32 } onto 'n'
+}
\ No newline at end of file