-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathConsumer.scala
More file actions
68 lines (54 loc) · 2 KB
/
Consumer.scala
File metadata and controls
68 lines (54 loc) · 2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package mqtt
import akka.actor._
import akka.event.LoggingAdapter
import org.eclipse.paho.client.mqttv3._
import com.datastax.oss.driver.api.core.CqlSession
import com.datastax.oss.driver.api.querybuilder.QueryBuilder._
import com.datastax.oss.driver.api.querybuilder.update._
import com.datastax.oss.driver.api.querybuilder.relation._
import lib._
import mqtt.Producer.MqttEntry
import scala.concurrent.ExecutionContext
/** Companion of the [[Consumer]] actor: its `Props` factory and its message protocol. */
object Consumer {

  /**
   * Builds the `Props` from which a [[Consumer]] actor can be created.
   *
   * @param mqttClient MQTT client passed to the actor's constructor
   * @param session    Cassandra session passed to the actor's constructor
   * @return `Props` that instantiate [[Consumer]] with the two arguments above
   */
  def props(mqttClient: MqttClient, session: CqlSession): Props = {
    val actorClass = classOf[Consumer]
    Props(actorClass, mqttClient, session)
  }

  /** Envelope around a received MQTT message; the actor sends it to itself from the MQTT callback. */
  final case class Arrived(message: MqttMessage)
}
/**
 * Actor that listens on the configured MQTT topic and writes every decoded
 * `MqttEntry` to the configured Cassandra table.
 *
 * MQTT callbacks are not handled on the callback thread directly: each
 * incoming message is wrapped in `Arrived` and sent to this actor, so decoding
 * and the database write happen inside `receive`.
 *
 * @param mqttClient MQTT client used to subscribe; disconnected in `postStop`
 * @param session    Cassandra session used to execute the update statements
 */
class Consumer(mqttClient: MqttClient, session: CqlSession)
  extends Actor with ActorLogging {
  import Consumer._

  private[this] val conf = Config.get
  implicit val system: ActorSystem = context.system
  implicit val executionContext: ExecutionContext = system.dispatcher
  implicit val logger: LoggingAdapter = log

  /**
   * Registers the MQTT callback and then subscribes to the configured topic.
   *
   * The callback is installed before subscribing so that a message delivered
   * right after the subscription already has a handler.
   */
  override def preStart(): Unit = {
    mqttClient.setCallback(new MqttCallback {
      // Hand the message over to the actor instead of processing it here.
      override def messageArrived(topic: String, message: MqttMessage): Unit =
        self ! Arrived(message)

      // Use the (cause, message) overload so the throwable is actually logged;
      // passing it as a template argument without a "{}" placeholder is not.
      override def connectionLost(cause: Throwable): Unit =
        logger.error(cause, "Connection lost")

      // This actor only consumes, so there is nothing to do on delivery.
      override def deliveryComplete(token: IMqttDeliveryToken): Unit = ()
    })
    mqttClient.subscribe(conf.mqtt.topic)
  }

  /** Disconnects the MQTT client if it is still connected. */
  override def postStop(): Unit = {
    // disconnect() on an already-disconnected client throws, so check first.
    if (mqttClient.isConnected) mqttClient.disconnect()
  }

  /**
   * Handles `Arrived` messages by decoding and storing them.
   *
   * A failure while handling one message is logged and the message is
   * skipped. Letting it escape would restart the actor, and the restart runs
   * `postStop`, which disconnects the client the new instance needs.
   */
  override def receive: Receive = {
    case Arrived(message) =>
      try store(message)
      catch {
        case scala.util.control.NonFatal(e) =>
          log.error(e, "Failed to process MQTT message")
      }
  }

  /** Decodes the payload and, if it is an `MqttEntry`, writes it to Cassandra. */
  private def store(message: MqttMessage): Unit = {
    val serializer = new BinarySerializer()
    val decoded = serializer.fromBinary(
      message.getPayload,
      BinarySerializer.MqttEntryManifest
    )
    decoded match {
      case entry: MqttEntry =>
        // NOTE(review): "ts" is the time of processing here, not a timestamp
        // taken from the entry - confirm this is intended.
        val statement = update(conf.cassandra.table).set(
          Assignment.setColumn("value", literal(entry.value)),
          Assignment.setColumn("anomaly", literal(entry.anomaly))
        ).where(
          Relation.column("sensor").isEqualTo(literal(entry.sensor)),
          Relation.column("ts").isEqualTo(literal(java.time.Instant.now()))
        ).build()
        // NOTE(review): execute is a synchronous call made inside the actor.
        session.execute(statement)
      case other =>
        log.warning("Ignoring MQTT payload that did not decode to an MqttEntry: {}", other)
    }
  }
}