Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,6 @@ Domains that are currently modeled out of the box:

* [Car Insurance] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/carinsurance)
* [GPS] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/gps)
sdfsdfs
* [Rental Car Pricing Simulation] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/rental)
* [Transportation (Trucks)] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/transport)
* [Transportation (Trucks)] (https://github.com/pcodding/stream-simulator/tree/master/src/main/java/com/hortonworks/streaming/impl/domain/transport)
48 changes: 25 additions & 23 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -45,49 +45,51 @@
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<finalName>stream-simulator</finalName>
<plugins>
<!--force java 8-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>2.4</version>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.1</version>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<mainClass>com.hortonworks.App</mainClass>
</manifest>
</archive>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<!--package as one fat jar-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<version>2.6</version>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<includeScope>runtime</includeScope>
<outputDirectory>${project.build.directory}</outputDirectory>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<addClasspath>true</addClasspath>

</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>copy-dependencies</id>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>copy-dependencies</goal>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.13</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
</plugins>
</build>
</project>
4 changes: 2 additions & 2 deletions run.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/bin/bash
cd target
java -Xmx1024m -jar stream-simulator.jar "$@"
#cd target
java -Xmx1024m -cp target/stream-simulator-1.0-SNAPSHOT-jar-with-dependencies.jar com.hortonworks.App "$@"
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.hortonworks.streaming.impl.collectors;

import com.hortonworks.streaming.impl.domain.AbstractEventCollector;
import com.hortonworks.streaming.impl.messages.DumpStats;
import com.hortonworks.streaming.results.utils.ConfigurationUtil;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class UnsecuredKafkaEventCollector extends AbstractEventCollector {
private String topicName;
private String bootstrapServer;
private Properties props;
private org.apache.kafka.clients.producer.Producer<String, String> producer;

public UnsecuredKafkaEventCollector() {
super();
try {
bootstrapServer = ConfigurationUtil.getInstance().getProperty(
"kafka.bootstrap.servers");
topicName = ConfigurationUtil.getInstance().getProperty(
"kafka.topicName");
logger.info("Setting up bootstrap servers " + bootstrapServer+ " and producing to topic "+topicName);

props = new Properties();

props.put("bootstrap.servers", bootstrapServer);


props.put("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

props.put("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");

producer = new KafkaProducer<String, String>(props);

} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}

public UnsecuredKafkaEventCollector(int maxEvents) {
super(maxEvents);
}

@Override
public void onReceive(Object message) throws Exception {
logger.debug(message);
if (message instanceof DumpStats) {
logger.info("Processed " + numberOfEventsProcessed + " events");
}
try {
producer.send(new ProducerRecord<String, String>(topicName,message.toString()));
logger.debug(message.toString());
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
numberOfEventsProcessed++;
if (numberOfEventsProcessed != -1
&& numberOfEventsProcessed == maxNumberOfEvents) {
logger.info("Maximum number of messages processed, exiting");
this.getContext().system().shutdown();
System.exit(0);
}
}
}
4 changes: 3 additions & 1 deletion src/main/resources/config.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
jms.host=sandbox
jms.port=61616
jms.queue=stream_data
jms.queue=stream_data
kafka.bootstrap.servers=localhost:9092
kafka.topicName=test