-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmongoDBConsumer.py
More file actions
31 lines (21 loc) · 844 Bytes
/
mongoDBConsumer.py
File metadata and controls
31 lines (21 loc) · 844 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import json
import os

from kafka import KafkaConsumer
from pymongo import MongoClient
# MongoDB connection settings.
# The connection string can be supplied through the MONGODB_URI environment
# variable; when it is unset or empty, the original Atlas URI is used so
# existing runs keep working unchanged.
# NOTE(review): the fallback embeds a username/password in source control --
# rotate that credential and drop the default once MONGODB_URI is configured.
MONGODB_URI = os.environ.get("MONGODB_URI") or (
    "mongodb+srv://pydatademo:pydatademo@cluster0.2dcmv.mongodb.net/"
    "?retryWrites=true&w=majority"
)
client = MongoClient(MONGODB_URI)
# Database that receives the consumed log documents.
db = client.websitelogs
# insert into MongoDB
def passToMongoDB(messagejson):
    """Insert one document into the ``kafka_logs`` collection.

    Args:
        messagejson: The document to store; it is passed unchanged to
            ``insert_one`` on ``db["kafka_logs"]``.

    Returns:
        The fixed list ``['#156', '#1768']``, regardless of the input.
    """
    # The insert result is never used, so it is not bound to a name.
    db["kafka_logs"].insert_one(messagejson)
    return ['#156', '#1768']
# Consume events from the 'heartbeatlogs' topic as a member of the
# 'mongoDB_C' consumer group, with auto-commit enabled.
consumer = KafkaConsumer(
    'heartbeatlogs',
    group_id='mongoDB_C',
    bootstrap_servers=['localhost:9092'],
    enable_auto_commit=True,
    # Decode as UTF-8 (a superset of ASCII) so JSON payloads that contain
    # non-ASCII characters do not raise UnicodeDecodeError.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
)

# Loop over the consumed events and send each deserialized value to MongoDB.
for message in consumer:
    passToMongoDB(message.value)