-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathKafkaServer.scala
More file actions
104 lines (93 loc) · 3.7 KB
/
KafkaServer.scala
File metadata and controls
104 lines (93 loc) · 3.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
package com.cloudwick.spark.embedded
import java.nio.file.Files
import java.util.Properties
import com.cloudwick.logging.LazyLogging
import kafka.admin.AdminUtils
import kafka.server.{KafkaConfig, KafkaServerStartable}
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient
import scala.concurrent.duration._
/**
 * Runs an in-memory, "embedded" instance of a Kafka broker, which listens at `127.0.0.1:9092`
 * by default.
 *
 * Requires a running ZooKeeper instance to connect to. By default, it expects a ZooKeeper instance
 * running at `127.0.0.1:2181`.
 *
 * Broker settings are assembled from the classpath resource `/broker-defaults.properties`,
 * overridden by `config`; `log.dirs` is always pointed at a freshly created temporary directory,
 * which is removed again by [[stop]].
 *
 * @param config Broker configuration settings. Used to modify, for example, on which port the
 *               broker should listen to.
 */
class KafkaServer(config: Properties = new Properties) extends LazyLogging {

  // Imported locally so that the recursive clean-up helper below does not depend on the
  // file-level import list.
  import java.io.IOException
  import java.nio.file.{FileVisitResult, Path, SimpleFileVisitor}
  import java.nio.file.attribute.BasicFileAttributes

  private val defaultZkConnect = "127.0.0.1:2181"

  // Temporary directory handed to the broker as `log.dirs`; deleted in `stop()`.
  private val logDir: Path = Files.createTempDirectory(this.getClass.getSimpleName)

  // Defaults from the classpath, then caller overrides, then the forced `log.dirs` setting.
  private val effectiveConfig: Properties = {
    val defaultsResource = "/broker-defaults.properties"
    val merged = new Properties
    // A missing resource still surfaces as a NullPointerException, but with a useful message.
    val defaults = java.util.Objects.requireNonNull(
      this.getClass.getResourceAsStream(defaultsResource),
      s"Broker defaults resource $defaultsResource not found on the classpath")
    try merged.load(defaults)
    finally defaults.close()
    merged.putAll(config)
    merged.setProperty("log.dirs", logDir.toString)
    merged
  }

  private val kafkaConfig = new KafkaConfig(effectiveConfig)
  private val kafka = new KafkaServerStartable(kafkaConfig)

  /** This is broker's `metadata.broker.list` value. Example: `127.0.0.1:9092`. */
  val brokerList: String = s"${kafka.serverConfig.hostName}:${kafka.serverConfig.port}"

  /**
   * The ZooKeeper connection string aka `zookeeper.connect`. Falls back to `127.0.0.1:2181`
   * (and logs a warning) when the effective configuration does not define it.
   */
  val zookeeperConnect: String =
    Option(effectiveConfig.getProperty("zookeeper.connect")).getOrElse {
      logger.warn(s"zookeeper.connect is not configured -- falling back to default " +
        s"setting $defaultZkConnect")
      defaultZkConnect
    }

  /**
   * Start the broker
   */
  def start(): Unit = {
    logger.debug(s"Starting embedded Kafka broker at $brokerList (with ZK server " +
      s"at $zookeeperConnect) ...")
    kafka.startup()
    logger.debug(s"Startup of embedded Kafka broker at $brokerList completed (with ZK server " +
      s"at $zookeeperConnect)")
  }

  /**
   * Stop the broker and remove its temporary log directory, including everything the broker
   * wrote into it.
   */
  def stop(): Unit = {
    logger.debug(s"Shutting down embedded Kafka broker at $brokerList (with ZK server " +
      s"at $zookeeperConnect)...")
    kafka.shutdown()
    // `Files.deleteIfExists` refuses to remove a non-empty directory, so the tree has to be
    // deleted bottom-up.
    deleteRecursively(logDir)
    logger.debug(s"Shutdown of embedded Kafka broker at $brokerList completed (with ZK server " +
      s"at $zookeeperConnect)")
  }

  /**
   * Creates a topic with specified name
   *
   * @param topic name of the topic to create
   * @param partitions number of partitions for the topic
   * @param replicationFactor replication factor for the topic
   * @param config kafka configuration properties
   */
  def createTopic(topic: String,
                  partitions: Int = 1,
                  replicationFactor: Int = 1,
                  config: Properties = new Properties): Unit = {
    logger.debug(s"Creating topic { name: $topic, partitions: $partitions, " +
      s"replicationFactor: $replicationFactor, config: $config }")
    val sessionTimeout = 10.seconds
    val connectionTimeout = 8.seconds
    // Note: You must initialize the ZkClient with ZKStringSerializer. If you don't, then
    // createTopic() will only seem to work (it will return without error). Topic will exist in
    // only ZooKeeper, and will be returned when listing topics, but Kafka itself does not create
    // the topic.
    val zkClient = new ZkClient(zookeeperConnect,
      sessionTimeout.toMillis.toInt,
      connectionTimeout.toMillis.toInt,
      ZKStringSerializer
    )
    // Release the ZooKeeper connection even when topic creation fails.
    try AdminUtils.createTopic(zkClient, topic, partitions, replicationFactor, config)
    finally zkClient.close()
  }

  /** Deletes `root` and everything below it; does nothing when `root` does not exist. */
  private def deleteRecursively(root: Path): Unit = {
    if (Files.exists(root)) {
      Files.walkFileTree(root, new SimpleFileVisitor[Path] {
        override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
          Files.delete(file)
          FileVisitResult.CONTINUE
        }

        override def postVisitDirectory(dir: Path, exc: IOException): FileVisitResult = {
          // A non-null `exc` means listing the directory failed; propagate instead of hiding it.
          Option(exc).foreach(e => throw e)
          Files.delete(dir)
          FileVisitResult.CONTINUE
        }
      })
    }
  }
}