-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathSignalFinder.py
More file actions
166 lines (134 loc) · 4.76 KB
/
SignalFinder.py
File metadata and controls
166 lines (134 loc) · 4.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import csv
import os.path as path
import datetime as dt
class DataMayMissingError(Exception):
    """Signals that data may be missing for a requested timestamp.

    Args:
        message: Text returned by ``str()``. Defaults to the historical
            fixed message, so ``DataMayMissingError()`` behaves as before.
    """

    def __init__(self, message='Data may missing for the timestamp.'):
        # Forward the text to Exception so ``args`` / ``repr()`` carry it
        # instead of an empty tuple. Accepting it as a parameter keeps
        # pickling and copying working, because those rebuild the object
        # from ``args``.
        super().__init__(message)
        self.message = message

    def __str__(self):
        return self.message
class PhaseNotExistError(Exception):
    """Signals that the requested phase may not exist.

    Args:
        message: Text returned by ``str()``. Defaults to the historical
            fixed message, so ``PhaseNotExistError()`` behaves as before.
    """

    def __init__(self, message='Phase may not exist.'):
        # Forward the text to Exception so ``args`` / ``repr()`` carry it
        # instead of an empty tuple. Accepting it as a parameter keeps
        # pickling and copying working, because those rebuild the object
        # from ``args``.
        super().__init__(message)
        self.message = message

    def __str__(self):
        return self.message
class CSVNotFoundError(Exception):
    """Signals that no CSV exists for a given signal id and date.

    Args:
        errID: Id of the signal whose file was looked for.
        errorDate: Date (as text) of the file that was looked for.

    Attributes:
        message: Human-readable description, also returned by ``str()``.
        errSignalID: The ``errID`` passed in.
        errorDate: The ``errorDate`` passed in.
    """

    def __init__(self, errID, errorDate):
        super().__init__(errID, errorDate)
        try:
            self.message = "Signal %i @ %s is not in the database." % (errID, errorDate)
        except TypeError:
            # ``%i`` rejects non-numeric ids (e.g. the string '12'). Building
            # the error must not itself fail and hide the real problem, so
            # fall back to the generic conversion.
            self.message = "Signal %s @ %s is not in the database." % (errID, errorDate)
        self.errSignalID = errID
        self.errorDate = errorDate

    def __str__(self):
        return self.message
class DataEntry(object):
    """One record holding a time object, a parameter value and an event id.

    Attributes:
        dtObject: The ``timeObject`` given to the constructor.
        Parameter: The ``para`` given to the constructor.
        EventID: The ``event`` given to the constructor.
    """

    def __init__(self, timeObject, para, event):
        self.dtObject = timeObject
        self.Parameter = para
        self.EventID = event

    def __repr__(self):
        # Make entries readable in debuggers, logs and assertion failures.
        return '%s(%r, %r, %r)' % (
            type(self).__name__, self.dtObject, self.Parameter, self.EventID)
class SignalFinder(object):
    """Report the colour of a signal phase at a given moment from daily CSV logs.

    Logs are read from ``<dataPath>/ID<signalNumber>/<YYYY-MM-DD>.csv``. The
    most recently used day is cached on the instance, and so is the day before
    it once a query has had to fall back to it, so repeated queries for the
    same signal and date do not re-read the files.
    """

    def __init__(self, dataPath):
        """Remember the data root; nothing is loaded until the first query."""
        self.currList = None
        self.currID = None
        self.currDate = None
        self.currPath = None
        self.prevList = None
        self.prevDate = None
        self.prevPath = None
        self.dataPath = dataPath
        # Event ids that denote a colour change, and the colour they start.
        self.translateMap = {1: "Green", 8: "Yellow", 10: "Red"}

    def findSignalStatus(self, phaseInterested, signalNumber, checkDtString=None, checkDtObject=None):
        """Return the colour of ``phaseInterested`` at the requested time.

        The newest row at or before the timestamp whose Parameter equals
        ``phaseInterested`` and whose EventID is a key of ``translateMap``
        decides the answer. Only rows from the 30 minutes before the
        timestamp are accepted; the previous day's file is consulted when the
        current day's rows run out inside that window.

        Args:
            phaseInterested: Phase to look for (compared with ``==`` against
                each row's Parameter, which ``loadCSV`` stores as a float).
            signalNumber: Signal id; selects the ``ID<signalNumber>`` folder.
            checkDtString: Timestamp as ``'YYYY-MM-DD HH:MM:SS'``. Takes
                precedence over ``checkDtObject`` when both are given.
            checkDtObject: Timestamp as a ``datetime``.

        Returns:
            ``"Green"``, ``"Yellow"`` or ``"Red"``.

        Raises:
            ValueError: Neither timestamp argument was given.
            CSVNotFoundError: A required day file does not exist.
            DataMayMissingError: The phase appears in the log but has no
                colour event inside the 30-minute window.
            PhaseNotExistError: The phase was not found at all.
        """
        if checkDtString is None and checkDtObject is None:
            raise ValueError("Must enter a timeStamp to check!")
        if checkDtString is not None:
            checkDtObject = dt.datetime.strptime(checkDtString, '%Y-%m-%d %H:%M:%S')

        thisDate = checkDtObject.strftime('%Y-%m-%d')
        if self.currID != signalNumber or self.currDate != thisDate:
            thisPath = path.join(self.dataPath, 'ID' + str(signalNumber), thisDate + '.csv')
            try:
                loaded = self.loadCSV(thisPath)
            except FileNotFoundError as err:
                raise CSVNotFoundError(signalNumber, thisDate) from err
            if self.currID != signalNumber:
                # The previous-day cache is keyed by date only, so it must be
                # dropped when the signal changes or it would serve rows that
                # belong to another signal.
                self.prevList = None
                self.prevDate = None
                self.prevPath = None
            self.currList = loaded
            self.currDate = thisDate
            self.currID = signalNumber
            self.currPath = thisPath

        # Binary search for the number of rows at or before the timestamp
        # (rows are assumed to be in chronological order, as in the file).
        low, high = 0, len(self.currList)
        while low < high:
            mid = (low + high) // 2
            if self.currList[mid].dtObject <= checkDtObject:
                low = mid + 1
            else:
                high = mid

        # low - 1 is the newest row not later than the timestamp; it is -1
        # when the whole day lies after it, which sends the search straight
        # to the previous day.
        entry, seenPhase = self._scanBackward(
            self.currList, low - 1, phaseInterested, checkDtObject, False)
        if entry is not None:
            return self.translateMap[entry.EventID]
        return self.checkForPrevDay(phaseInterested, checkDtObject, seenPhase)

    def checkForPrevDay(self, phaseInterested, checkDtObject, seenPhase=False):
        """Continue the backward search in the day before ``self.currDate``.

        Args:
            phaseInterested: Phase to look for.
            checkDtObject: The timestamp being queried (``datetime``).
            seenPhase: Whether the phase was already met while scanning the
                current day; only influences which error is raised.

        Returns:
            The colour name of the newest matching row of the previous day.

        Raises:
            CSVNotFoundError: The previous day's file does not exist.
            DataMayMissingError: The phase was seen but no colour event lies
                inside the 30-minute window.
            PhaseNotExistError: The phase was not found.
        """
        thisDate = (dt.datetime.strptime(self.currDate, "%Y-%m-%d")
                    - dt.timedelta(days=1)).strftime('%Y-%m-%d')
        if self.prevDate != thisDate or self.prevList is None:
            thisPath = path.join(self.dataPath, 'ID' + str(self.currID), thisDate + '.csv')
            try:
                self.prevList = self.loadCSV(thisPath)
            except FileNotFoundError as err:
                raise CSVNotFoundError(self.currID, thisDate) from err
            self.prevDate = thisDate
            self.prevPath = thisPath

        entry, seenPhase = self._scanBackward(
            self.prevList, len(self.prevList) - 1, phaseInterested, checkDtObject, seenPhase)
        if entry is None:
            # The previous day ran out of rows before leaving the window.
            if seenPhase:
                raise DataMayMissingError()
            raise PhaseNotExistError()
        return self.translateMap[entry.EventID]

    def _scanBackward(self, entries, startIndex, phaseInterested, checkDtObject, seenPhase):
        """Walk ``entries`` from ``startIndex`` towards index 0.

        Returns ``(entry, seenPhase)`` where ``entry`` is the first row met
        whose Parameter equals ``phaseInterested`` and whose EventID is in
        ``translateMap``, or ``None`` when index 0 is passed without a match.
        Raises ``DataMayMissingError`` / ``PhaseNotExistError`` once a row
        older than 30 minutes before ``checkDtObject`` is reached.
        """
        cutoff = checkDtObject - dt.timedelta(minutes=30)
        index = startIndex
        # Stop at 0 explicitly: a negative index would silently wrap to the
        # end of the list and return rows from after the timestamp.
        while index >= 0:
            entry = entries[index]
            isPhase = entry.Parameter == phaseInterested
            if isPhase:
                seenPhase = True
            if entry.dtObject < cutoff:
                if seenPhase:
                    raise DataMayMissingError()
                # Phase not met inside the window: inspect up to 1000 older
                # rows of the same list to tell "stale data" apart from
                # "no such phase".
                oldest = max(index - 1000, 0)
                for older in range(index - 1, oldest - 1, -1):
                    if entries[older].Parameter == phaseInterested:
                        raise DataMayMissingError()
                raise PhaseNotExistError()
            if isPhase and entry.EventID in self.translateMap:
                return entry, seenPhase
            index -= 1
        return None, seenPhase

    @staticmethod
    def _parseTimestamp(text):
        """Parse ``'YYYY-MM-DD HH:MM:SS'`` with an optional fractional part."""
        whole, dot, fraction = text.partition('.')
        if dot:
            # The files carry 9 fractional digits
            # (e.g. 2019-05-06 00:00:08.700000000) but %f accepts at most 6.
            return dt.datetime.strptime(whole + '.' + fraction[:6], '%Y-%m-%d %H:%M:%S.%f')
        return dt.datetime.strptime(text, '%Y-%m-%d %H:%M:%S')

    def loadCSV(self, csvPath):
        """Read one day file into a list of ``DataEntry`` in file order.

        The first row is treated as a header and skipped. For every other
        row, column 0 is the timestamp, column 2 the event id (int) and
        column 3 the parameter (float). Blank rows are ignored.

        Raises:
            FileNotFoundError: ``csvPath`` does not exist.
        """
        currList = []
        # newline='' is what the csv module asks for when given a file object.
        with open(csvPath, newline='') as csvfile:
            reader = csv.reader(csvfile, delimiter=',')
            next(reader, None)  # skip the header; tolerate an empty file
            for row in reader:
                if not row:
                    continue
                dtObj = self._parseTimestamp(row[0])
                currList.append(DataEntry(dtObj, float(row[3]), int(row[2])))
        return currList