diff --git a/README.md b/README.md index d0dca67..d4cae7d 100644 --- a/README.md +++ b/README.md @@ -39,5 +39,6 @@ Domains that are currently modeled out of the box: * [Car Insurance] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/carinsurance) * [GPS] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/gps) +sdfsdfs * [Rental Car Pricing Simulation] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/rental) -* [Transportation (Trucks)] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/transport) \ No newline at end of file +* [Transportation (Trucks)] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/transport) diff --git a/pom.xml b/pom.xml index 975fd5a..9e89bd9 100644 --- a/pom.xml +++ b/pom.xml @@ -45,49 +45,51 @@ activemq-core 5.7.0 + + org.apache.kafka + kafka-clients + 1.0.0 + - stream-simulator + org.apache.maven.plugins - maven-jar-plugin - 2.4 + maven-compiler-plugin + 3.6.1 - - - true - com.hortonworks.App - - + 1.8 + 1.8 + + org.apache.maven.plugins - maven-dependency-plugin - 2.6 + maven-assembly-plugin + 3.1.0 - runtime - ${project.build.directory} + + jar-with-dependencies + + + + true + + + - copy-dependencies + assemble-all package - copy-dependencies + single - - org.apache.maven.plugins - maven-surefire-plugin - 2.13 - - true - - \ No newline at end of file diff --git a/run.sh b/run.sh index 4f7c8b1..1235390 100755 --- a/run.sh +++ b/run.sh @@ -1,3 +1,3 @@ #!/bin/bash -cd target -java -Xmx1024m -jar stream-simulator.jar "$@" +#cd target +java -Xmx1024m -cp target/stream-simulator-1.0-SNAPSHOT-jar-with-dependencies.jar com.hortonworks.App "$@" diff --git a/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java b/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java new file mode 100644 index 0000000..ccde3c0 --- /dev/null +++ b/src/main/java/com/hortonworks/streaming/impl/collectors/UnsecuredKafkaEventCollector.java @@ -0,0 +1,69 @@ +package com.hortonworks.streaming.impl.collectors; + +import com.hortonworks.streaming.impl.domain.AbstractEventCollector; +import com.hortonworks.streaming.impl.messages.DumpStats; +import com.hortonworks.streaming.results.utils.ConfigurationUtil; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; + +import java.util.Properties; + +public class UnsecuredKafkaEventCollector extends AbstractEventCollector { + private String topicName; + private String bootstrapServer; + private Properties props; + private org.apache.kafka.clients.producer.Producer producer; + + public UnsecuredKafkaEventCollector() { + super(); + try { + bootstrapServer = ConfigurationUtil.getInstance().getProperty( + "kafka.bootstrap.servers"); + topicName = ConfigurationUtil.getInstance().getProperty( + "kafka.topicName"); + logger.info("Setting up bootstrap servers " + bootstrapServer+ " and producing to topic "+topicName); + + props = new Properties(); + + props.put("bootstrap.servers", bootstrapServer); + + + props.put("key.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + + props.put("value.serializer", + "org.apache.kafka.common.serialization.StringSerializer"); + + producer = new KafkaProducer(props); + + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + + public UnsecuredKafkaEventCollector(int maxEvents) { + super(maxEvents); + } + + @Override + public void onReceive(Object message) throws Exception { + logger.debug(message); + if (message instanceof DumpStats) { + logger.info("Processed " + numberOfEventsProcessed + " events"); + } + try { + producer.send(new ProducerRecord(topicName,message.toString())); + logger.debug(message.toString()); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + numberOfEventsProcessed++; + if (numberOfEventsProcessed != -1 + && numberOfEventsProcessed == maxNumberOfEvents) { + logger.info("Maximum number of messages processed, exiting"); + this.getContext().system().shutdown(); + System.exit(0); + } + } +} diff --git a/src/main/resources/config.properties b/src/main/resources/config.properties index 8cf5e2b..eef2e22 100644 --- a/src/main/resources/config.properties +++ b/src/main/resources/config.properties @@ -1,3 +1,5 @@ jms.host=sandbox jms.port=61616 -jms.queue=stream_data \ No newline at end of file +jms.queue=stream_data +kafka.bootstrap.servers=localhost:9092 +kafka.topicName=test \ No newline at end of file