
import com.salesforce.kafka.test.KafkaTestCluster;
import com.salesforce.kafka.test.KafkaTestUtils;
+import com.salesforce.kafka.test.listeners.BrokerListener;
+import com.salesforce.kafka.test.listeners.PlainListener;
+import com.salesforce.kafka.test.listeners.SaslPlainListener;
+import com.salesforce.kafka.test.listeners.SaslSslListener;
+import com.salesforce.kafka.test.listeners.SslListener;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.DefaultParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.net.URL;
import java.util.Collections;
import java.util.Properties;

@@ -49,49 +62,147 @@ public class DevCluster {
     * @param args command line args.
     */
    public static void main(final String[] args) throws Exception {
-        // Right now we accept one parameter, the number of nodes in the cluster.
-        final int clusterSize;
-        if (args.length > 0) {
-            clusterSize = Integer.parseInt(args[0]);
-        } else {
-            clusterSize = 1;
-        }
+        // Parse command line arguments
+        final CommandLine cmd = parseArguments(args);

+        // Determine the number of brokers in the cluster from the required -size argument.
+        final int clusterSize = Integer.parseInt(cmd.getOptionValue("size"));
        logger.info("Starting up kafka cluster with {} brokers", clusterSize);

+        // Default to plaintext listener.
+        BrokerListener listener = new PlainListener();
+
+        final URL trustStore = DevCluster.class.getClassLoader().getResource("kafka.truststore.jks");
+        final URL keyStore = DevCluster.class.getClassLoader().getResource("kafka.keystore.jks");
+
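+        // Broker properties passed to the cluster constructor below; left empty in this example.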
+        final Properties properties = new Properties();
+        if (cmd.hasOption("sasl") && cmd.hasOption("ssl")) {
+            listener = new SaslSslListener()
+                // SSL Options
+                .withClientAuthRequired()
+                .withTrustStoreLocation(trustStore.getFile())
+                .withTrustStorePassword("password")
+                .withKeyStoreLocation(keyStore.getFile())
+                .withKeyStorePassword("password")
+                .withKeyPassword("password")
+                // SASL Options.
+                .withUsername("kafkaclient")
+                .withPassword("client-secret");
+        } else if (cmd.hasOption("sasl")) {
+            listener = new SaslPlainListener()
+                .withUsername("kafkaclient")
+                .withPassword("client-secret");
+        } else if (cmd.hasOption("ssl")) {
+            listener = new SslListener()
+                .withClientAuthRequired()
+                .withTrustStoreLocation(trustStore.getFile())
+                .withTrustStorePassword("password")
+                .withKeyStoreLocation(keyStore.getFile())
+                .withKeyStorePassword("password")
+                .withKeyPassword("password");
+        }
+
        // Create a test cluster
-        final KafkaTestCluster kafkaTestCluster = new KafkaTestCluster(clusterSize);
+        final KafkaTestCluster kafkaTestCluster = new KafkaTestCluster(
+            clusterSize,
+            properties,
+            Collections.singletonList(listener)
+        );

        // Start the cluster.
        kafkaTestCluster.start();

-        // Create a topic
-        final String topicName = "TestTopicA";
-        final int partitionsCount = 5 * clusterSize;
-        final KafkaTestUtils utils = new KafkaTestUtils(kafkaTestCluster);
-        utils.createTopic(topicName, partitionsCount, (short) clusterSize);
+        // Create topics
+        String[] topicNames = null;
+        if (cmd.hasOption("topic")) {
+            topicNames = cmd.getOptionValues("topic");
+
+            for (final String topicName : topicNames) {
+                final KafkaTestUtils utils = new KafkaTestUtils(kafkaTestCluster);
+                utils.createTopic(topicName, clusterSize, (short) clusterSize);

-        // Publish some data into that topic
-        for (int partition = 0; partition < partitionsCount; partition++) {
-            utils.produceRecords(1000, topicName, partition);
+                // Publish some data into that topic
+                for (int partition = 0; partition < clusterSize; partition++) {
+                    utils.produceRecords(1000, topicName, partition);
+                }
+            }
        }

+        // Log topic names created.
+        if (topicNames != null) {
+            logger.info("Created topics: {}", String.join(", ", topicNames));
+        }
+
+        // Log how to connect to cluster brokers.
        kafkaTestCluster
            .getKafkaBrokers()
            .stream()
            .forEach((broker) -> {
                logger.info("Started broker with Id {} at {}", broker.getBrokerId(), broker.getConnectString());
            });

+        // Log cluster connect string.
        logger.info("Cluster started at: {}", kafkaTestCluster.getKafkaConnectString());

-        runEndlessConsumer(topicName, utils);
-        runEndlessProducer(topicName, partitionsCount, utils);
+        // runEndlessConsumer(topicName, utils);
+        // runEndlessProducer(topicName, partitionsCount, utils);

        // Wait forever.
        Thread.currentThread().join();
    }

+    private static CommandLine parseArguments(final String[] args) throws ParseException {
+        // create Options object
+        final Options options = new Options();
+
+        // add number of brokers
+        options.addOption(Option.builder("size")
+            .desc("Number of brokers to start")
+            .required()
+            .hasArg()
+            .type(Integer.class)
+            .build()
+        );
+
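+        // Optionally create one or more test topics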
+        options.addOption(Option.builder("topic")
+            .desc("Create test topic")
+            .required(false)
+            .hasArgs()
+            .build()
+        );
+
+        // Optionally enable SASL
+        options.addOption(Option.builder("sasl")
+            .desc("Enable SASL authentication")
+            .required(false)
+            .hasArg(false)
+            .type(Boolean.class)
+            .build()
+        );
+
+        // Optionally enable SSL
+        options.addOption(Option.builder("ssl")
+            .desc("Enable SSL")
+            .required(false)
+            .hasArg(false)
+            .type(Boolean.class)
+            .build()
+        );
+
+        try {
+            final CommandLineParser parser = new DefaultParser();
+            return parser.parse(options, args);
+        } catch (final Exception exception) {
+            System.out.println("ERROR: " + exception.getMessage() + "\n");
+            final HelpFormatter formatter = new HelpFormatter();
+            formatter.printHelp("DevCluster", options);
+            System.out.println("");
+            System.out.flush();
+
+            throw exception;
+        }
+    }
+
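+    // Illustrative invocations as a rough sketch: the option names come from parseArguments() above,
+    // but the exact launch command / classpath is an assumption, not part of this change.
+    //   DevCluster -size 1
+    //   DevCluster -size 3 -topic TestTopicA -topic TestTopicB -ssl -sasl
+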
    /**
     * Fire up a new thread running an endless producer script into the given topic and partitions.
     * @param topicName Name of the topic to produce records into.