-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathProducer-Notebook.scala
More file actions
98 lines (81 loc) · 3.44 KB
/
Producer-Notebook.scala
File metadata and controls
98 lines (81 loc) · 3.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// Databricks notebook source
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._
import scala.collection.immutable._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
// Event Hubs connection settings -- replace every placeholder with your own values.
val namespaceName = " << fill here >> "
val eventHubName = " << fill here >> "
val sasKeyName = " << fill here >> "
val sasKey = " << fill here >> "

// Collect the settings above in a connection-string builder.
val connStr = {
  val builder = new ConnectionStringBuilder()
  builder.setNamespaceName(namespaceName)
  builder.setEventHubName(eventHubName)
  builder.setSasKeyName(sasKeyName)
  builder.setSasKey(sasKey)
}

// Single-threaded scheduled executor handed to the Event Hubs client.
val pool = Executors.newScheduledThreadPool(1)

// History: this line originally read `EventHubClient.create(connStr.toString(), pool)`;
// it was changed to `createFromConnectionString` by Subhasish in Feb 2020.
// The result is resolved with `.get()` at each use site.
val eventHubClient = EventHubClient.createFromConnectionString(connStr.toString, pool)
/** Blocks the calling thread for `time` milliseconds. */
def sleep(time: Long): Unit = {
  Thread.sleep(time)
}
/**
 * Waits `delay` milliseconds, then publishes `message` to the Event Hub as a UTF-8 payload.
 *
 * The send is synchronous: the "Sent event" line is printed only after the send call has
 * returned, and a failed send surfaces as an exception here instead of being lost inside a
 * future that nobody inspects.
 *
 * @param message text to publish as the event body
 * @param delay   pause before sending, in milliseconds (throttles the producer)
 */
def sendEvent(message: String, delay: Long): Unit = {
  sleep(delay)
  val messageData = EventData.create(message.getBytes("UTF-8"))
  // sendSync blocks until the send completes; the async `send` result was previously discarded.
  eventHubClient.get().sendSync(messageData)
  System.out.println("Sent event: " + message + "\n")
}
// Sample messages used when `dataSource` is "test"; add your own values to the list.
val testSource = List(
  "Azure is the greatest!",
  "Azure isn't working :(",
  "Azure is okay."
)

// Selects where events come from:
//   "twitter" - search the Twitter API and forward matching tweets
//   "test"    - skip the Twitter API and loop over the values in `testSource`
val dataSource = "twitter"
// Pump events into the Event Hub from the source selected by `dataSource`.
dataSource match {
  case "twitter" =>
    import twitter4j._
    import twitter4j.conf.ConfigurationBuilder

    // Twitter configuration!
    // Replace the values below with your own Twitter API credentials.
    val twitterConsumerKey = " << fill here >> "
    val twitterConsumerSecret = " << fill here >> "
    val twitterOauthAccessToken = " << fill here >> "
    val twitterOauthTokenSecret = " << fill here >> "

    val cb = new ConfigurationBuilder()
    cb.setDebugEnabled(true)
      .setOAuthConsumerKey(twitterConsumerKey)
      .setOAuthConsumerSecret(twitterConsumerSecret)
      .setOAuthAccessToken(twitterOauthAccessToken)
      .setOAuthAccessTokenSecret(twitterOauthTokenSecret)

    val twitter = new TwitterFactory(cb.build()).getInstance()

    // Search for tweets matching the keyword below and forward them to the Event Hub.
    // Use whatever keyword suits your own use-case.
    val query = new Query("#Microsoft")
    query.setCount(100)
    query.lang("en")

    // Page through the search results, moving the max-id cursor below the lowest id seen
    // on each page so the next search continues from there.
    var finished = false
    while (!finished) {
      val statuses = twitter.search(query).getTweets().asScala
      if (statuses.isEmpty) {
        // Nothing left to page through. Stop here: resetting the cursor to Long.MaxValue - 1
        // would re-query the API in a tight loop and re-send tweets already forwarded.
        finished = true
      } else {
        // Retweets are skipped; each forwarded tweet is preceded by a 5000 ms delay.
        statuses
          .filterNot(_.isRetweet())
          .foreach(status => sendEvent(status.getText(), 5000))
        // Retweets still count towards the cursor so that they are not fetched again.
        query.setMaxId(statuses.map(_.getId()).min - 1)
      }
    }

  case "test" =>
    // Loop through the list of test input data indefinitely.
    while (true) {
      testSource.foreach(sendEvent(_, 5000))
    }

  case _ =>
    System.out.println("Unsupported Data Source. Set 'dataSource' to \"twitter\" or \"test\"")
}
// Close the connection to the Event Hub and wait for the close to finish
// (`close()` only starts an asynchronous close whose result was never observed).
eventHubClient.get().closeSync()
// Stop the executor created for the client so its worker thread does not outlive this run.
pool.shutdown()