From 5bab043345addde4e93e274ac7944aeba36700cd Mon Sep 17 00:00:00 2001 From: Tijo Thomas Date: Tue, 18 Jun 2019 10:46:27 +0530 Subject: [PATCH 1/5] Support for pushing data to kafka --- .../impl/collectors/UnsecuredKafkaEventCollector.java | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java diff --git a/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java b/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java new file mode 100644 index 0000000..6c2a0d1 --- /dev/null +++ b/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java @@ -0,0 +1,4 @@ +package com.hortonworks.streaming.impl.collectors; + +public class UnsecuredKafkaEventCollector { +} From 55ee3ae77388da2722039f0b753f7f53f9adf01f Mon Sep 17 00:00:00 2001 From: Tijo Thomas Date: Tue, 18 Jun 2019 10:50:54 +0530 Subject: [PATCH 2/5] kafka client dependency --- pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/pom.xml b/pom.xml index 975fd5a..0ab1f1b 100644 --- a/pom.xml +++ b/pom.xml @@ -45,6 +45,11 @@ activemq-core 5.7.0 + + org.apache.kafka + kafka-clients + 1.0.0 + stream-simulator From 146cfab35fbfa266c618919f6a960f113eba4926 Mon Sep 17 00:00:00 2001 From: Tijo Thomas Date: Tue, 18 Jun 2019 10:51:54 +0530 Subject: [PATCH 3/5] Kafka config details --- src/main/resources/config.properties | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties index 8cf5e2b..cca3402 100644 --- a/src/main/resources/config.properties +++ b/src/main/resources/config.properties @@ -1,3 +1,5 @@ jms.host=sandbox jms.port=61616 -jms.queue=stream_data \ No newline at end of file +jms.queue=stream_data +kafka.bootstrap.servers=c449-node2.squadron-labs.com:6667,c449-node3.squadron-labs.com:6667,c449-node4.squadron-labs.com:6667 +kafka.topicName=test \ No newline at end of file From 4b8b4bb97041a10cf7e8f519f07a5b5caf6fa98a Mon Sep 17 00:00:00 2001 From: tijoparacka Date: Mon, 23 Sep 2019 16:27:36 +0530 Subject: [PATCH 4/5] test --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index d0dca67..d4cae7d 100644 --- a/README.md +++ b/README.md @@ -39,5 +39,6 @@ Domains that are currently modeled out of the box: * [Car Insurance] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/carinsurance) * [GPS] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/gps) +sdfsdfs * [Rental Car Pricing Simulation] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/rental) -* [Transportation (Trucks)] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/transport) \ No newline at end of file +* [Transportation (Trucks)] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/transport) From 78ff16ff44be0099d72aee8ec89d257b5a3ca954 Mon Sep 17 00:00:00 2001 From: tijoparacka Date: Mon, 23 Sep 2019 16:40:52 +0530 Subject: [PATCH 5/5] kafka support --- pom.xml | 43 ++++++------ run.sh | 4 +- .../UnsecuredKafkaEventCollector.java | 67 ++++++++++++++++++- src/main/resources/config.properties | 2 +- 4 files changed, 89 insertions(+), 27 deletions(-) diff --git a/pom.xml b/pom.xml index 0ab1f1b..9e89bd9 100644 --- a/pom.xml +++ b/pom.xml @@ -52,47 +52,44 @@ - stream-simulator + org.apache.maven.plugins - maven-jar-plugin - 2.4 + maven-compiler-plugin + 3.6.1 - - - true - com.hortonworks.App - - + 1.8 + 1.8 + + org.apache.maven.plugins - maven-dependency-plugin - 2.6 + maven-assembly-plugin + 3.1.0 - runtime - ${project.build.directory} + + jar-with-dependencies + + + + true + + + - copy-dependencies + assemble-all package - copy-dependencies + single - - org.apache.maven.plugins - maven-surefire-plugin - 2.13 - - true - - \ No newline at end of file diff --git a/run.sh b/run.sh index 4f7c8b1..1235390 100755 --- a/run.sh +++ b/run.sh @@ -1,3 +1,3 @@ #!/bin/bash -cd target -java -Xmx1024m -jar stream-simulator.jar "$@" +#cd target +java -Xmx1024m -cp target/stream-simulator-1.0-SNAPSHOT-jar-with-dependencies.jar com.hortonworks.App "$@" diff --git a/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java b/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java index 6c2a0d1..ccde3c0 100644 --- a/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java +++ b/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java @@ -1,4 +1,69 @@ package com.hortonworks.streaming.impl.collectors; -public class UnsecuredKafkaEventCollector { +import com.hortonworks.streaming.impl.domain.AbstractEventCollector; +import com.hortonworks.streaming.impl.messages.DumpStats; +import com.hortonworks.streaming.results.utils.ConfigurationUtil; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +public class UnsecuredKafkaEventCollector extends AbstractEventCollector { + private String topicName; + private String bootstrapServer; + private Properties props; + private org.apache.kafka.clients.producer.Producer producer; + + public UnsecuredKafkaEventCollector() { + super(); + try { + bootstrapServer = ConfigurationUtil.getInstance().getProperty( + "kafka.bootstrap.servers"); + topicName = ConfigurationUtil.getInstance().getProperty( + "kafka.topicName"); + logger.info("Setting up bootstrap servers " + bootstrapServer+ " and producing to topic "+topicName); + + props = new Properties(); + + props.put("bootstrap.servers", bootstrapServer); + + + props.put("key.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + + props.put("value.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + + producer = new KafkaProducer(props); + + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + public UnsecuredKafkaEventCollector(int maxEvents) { + super(maxEvents); + } + + @Override + public void onReceive(Object message) throws Exception { + logger.debug(message); + if (message instanceof DumpStats) { + logger.info("Processed " + numberOfEventsProcessed + " events"); + } + try { + producer.send(new ProducerRecord(topicName,message.toString())); + logger.debug(message.toString()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + numberOfEventsProcessed++; + if (numberOfEventsProcessed != -1 + && numberOfEventsProcessed == maxNumberOfEvents) { + logger.info("Maximum number of messages processed, exiting"); + this.getContext().system().shutdown(); + System.exit(0); + } + } } diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties index cca3402..eef2e22 100644 --- a/src/main/resources/config.properties +++ b/src/main/resources/config.properties @@ -1,5 +1,5 @@ jms.host=sandbox jms.port=61616 jms.queue=stream_data -kafka.bootstrap.servers=c449-node2.squadron-labs.com:6667,c449-node3.squadron-labs.com:6667,c449-node4.squadron-labs.com:6667 +kafka.bootstrap.servers=localhost:9092 kafka.topicName=test \ No newline at end of file