-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRoom_mapping.py
More file actions
81 lines (59 loc) · 2.33 KB
/
Room_mapping.py
File metadata and controls
81 lines (59 loc) · 2.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import pandas as pd
import pymongo
from sklearn.neighbors import KNeighborsClassifier
class RoomMapper:
def __init__(self):
self.knn = KNeighborsClassifier(n_neighbors=2)
self.isTrained = False
self.allNetworks = set()
self.client = pymongo.MongoClient("mongodb://root:example@mongo:27017/")
self.db = self.client["beacon_blink"]
self.data = self.db["rooms"]
self.X = []
self.Y = []
def trained(self):
return self.isTrained
def retrain(self):
if self.isTrained:
self.isTrained = False
self.allNetworks = set()
self.knn = KNeighborsClassifier(n_neighbors=2)
self.X = []
self.Y = []
self.train()
def train(self):
print("MODEL IS TRAINING", flush=True)
if self.data.count_documents({}) == 0:
print("Database is empty. Model cannot be trained", flush=True)
return
number_of_all_scans = 0
for room in self.data.find():
number_of_all_scans += len(room["scan_results"])
for scan in room["scan_results"]:
self.Y.append(str(room["_id"]))
for network in scan:
self.allNetworks.add(network["bssid"])
df = pd.DataFrame(-100, index=range(number_of_all_scans), columns=list(self.allNetworks))
self.Y = pd.DataFrame(self.Y)
index = 0
for room in self.data.find():
for scan in room["scan_results"]:
for network in scan:
df.loc[index, network["bssid"]] = network["rssi"]
# print("DLA BSSID: ", network["bssid"] , " MOC: ", network["rssi"], flush=True)
index += 1
self.Y = self.Y.values.ravel()
self.knn.fit(df, self.Y)
self.isTrained = True
def get_device_location(self, scan_results):
X_predict = pd.DataFrame(-100, index=range(1), columns=list(self.allNetworks))
for scan in scan_results.scan_results:
if scan.bssid in self.allNetworks:
X_predict.loc[0, scan.bssid] = scan.rssi
tmp = self.knn.predict(X_predict)
print("RESULTS: ", tmp, flush=True)
return tmp[0]
if __name__ == "__main__":
RM = RoomMapper()
RM.train()
print(RM.get_device_location(None))