-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPeriodStats.py
More file actions
121 lines (93 loc) · 3.75 KB
/
PeriodStats.py
File metadata and controls
121 lines (93 loc) · 3.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
from saleae.range_measurements import DigitalMeasurer
from math import sqrt
# 'User' parameters
class RunningSD:
    """Running sample standard deviation using Welford's online algorithm.

    Values are fed in one at a time through add(). StdDev() may be called at
    any point to obtain the sample standard deviation (n - 1 denominator) of
    the values seen so far, without storing the individual values.
    """

    def __init__(self):
        """Create an empty accumulator with no samples."""
        self.n = 0
        self.oldMean = 0
        self.oldSum = 0
        # Defined up front so a fresh instance always carries every attribute
        # instead of creating them lazily inside add().
        self.newMean = 0
        self.newSum = 0

    def add(self, value):
        """Fold one more sample into the running mean and squared-deviation sum.

        Args:
            value: The next numeric sample.
        """
        self.n += 1
        if self.n < 2:
            # First sample: the mean is the sample itself and there is no
            # spread yet.
            self.oldMean = value
            self.newMean = value
            self.oldSum = 0.0
            self.newSum = 0.0
            return

        delta = value - self.oldMean
        self.newMean = self.oldMean + delta / self.n
        # Uses the deviation from both the previous and the updated mean.
        self.newSum = self.oldSum + delta * (value - self.newMean)

        # Roll the state forward for the next sample.
        self.oldMean = self.newMean
        self.oldSum = self.newSum

    def StdDev(self):
        """Return the sample standard deviation of the values added so far.

        Returns:
            0.0 when fewer than two samples have been added, otherwise
            sqrt(sum of squared deviations / (n - 1)).
        """
        if self.n > 1:
            return sqrt(self.newSum / (self.n - 1))
        return 0.0
class PeriodStatsMeasurer(DigitalMeasurer):
    """Accumulate statistics on the period between same-direction edges.

    An instance is created for each measurement session, so __init__ runs once
    at the start of each measurement. process_data(...) is then called
    multiple times to process data in time order. After all data has been
    processed, measure(...) is called to complete the analysis and return a
    dictionary of results.
    """

    # NOTE(review): measure() also returns 'pMean', 'pCount' and 'pFreq',
    # which are not listed here - confirm against the extension manifest.
    supported_measurements = ['pMin', 'pMax', 'pSDev']

    def __init__(self, requested_measurements):
        """Initialise the accumulators for a new measurement session.

        Args:
            requested_measurements: Passed through unchanged to the
                DigitalMeasurer base class.
        """
        super().__init__(requested_measurements)
        self.pMin = None
        # Despite the name, this holds the running SUM of all periods;
        # measure() divides it by periodCount to obtain the mean.
        self.pMean = 0.0
        self.pMax = None
        self.periodCount = 0.0
        self.lastTime = None
        # Only used as a "first edge of interest already seen" marker.
        self.lastState = None
        self.SDev = RunningSD()
        # mWantRisingEdge is set False for falling edge driven measurements or
        # True for rising edge driven measurements, based on the direction of
        # the first edge seen.
        self.mWantRisingEdge = None

    def process_data(self, data):
        """Consume one batch of transitions and update the period statistics.

        process_data() will be called one or more times per measurement with
        batches of data. `data` has the following interface:

        * Iterate over data to get transitions in the form of pairs of
          `Time`, Bitstate (`True` for high, `False` for low).
        * The first datum is at the first transition.

        `Time` currently only allows taking a difference with another `Time`,
        to produce a `float` number of seconds.

        Args:
            data: Iterable of (time, bitstate) transition pairs.
        """
        for t, bitstate in data:
            if self.mWantRisingEdge is None:
                # Lock onto the direction of the very first edge seen.
                self.mWantRisingEdge = bitstate

            if bitstate != self.mWantRisingEdge:
                # Opposite-direction edge: not a period boundary.
                continue

            if self.lastState is None:
                self.lastState = bitstate
                self.lastTime = t
                # Can't generate stats for the first edge
                continue

            timeDelta = float(t - self.lastTime)
            self.lastTime = t

            # Interesting edge - rising edge if bitstate is true
            self.periodCount += 1
            self.pMean += timeDelta
            self.SDev.add(timeDelta)

            if self.pMin is None or timeDelta < self.pMin:
                self.pMin = timeDelta

            if self.pMax is None or timeDelta > self.pMax:
                self.pMax = timeDelta

    def measure(self):
        """Return the measurement results gathered by process_data(...).

        measure(...) is called after all the relevant data has been processed
        by process_data(...).

        Returns:
            A dictionary of results. It is empty when no complete period was
            observed; otherwise it holds 'pMin', 'pMean', 'pMax', 'pSDev' and
            'pCount', plus 'pFreq' when the summed period time is positive.
        """
        values = {}

        # pMin is only set once at least one period has been recorded, which
        # also guarantees periodCount is non-zero for the division below.
        if self.pMin is not None:
            values['pMin'] = self.pMin
            values['pMean'] = self.pMean / self.periodCount
            values['pMax'] = self.pMax
            values['pSDev'] = self.SDev.StdDev()
            values['pCount'] = self.periodCount

        # self.pMean is the total of all periods, so count / total is the
        # average frequency; the guard avoids dividing by zero.
        if self.pMean > 0.0:
            values['pFreq'] = self.periodCount / self.pMean

        return values