-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscope_processing.py
More file actions
191 lines (161 loc) · 5.81 KB
/
scope_processing.py
File metadata and controls
191 lines (161 loc) · 5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
#!/usr/bin/env python3
import logging
import numpy as np
from data_reader import get_channels, get_data
def find_state_at_edge(shot,
scope=4,
trigger_channel=4,
trigger=0.2,
xlabel='Time',
ylabel='Ampl',
edge_type='rising'):
"""
Find the start time and index of the start time for each channel.
Find the rising edge on the trigger channel, use that to set the start time
(note that the actuall trigger won't fire until ~1us after it's told to)
"""
trigger_data = get_data(shot, scope, trigger_channel)
if trigger_data is None:
raise FileNotFoundError(
'Failed to find shot '
'{shot}, scope {scope}, channel {channel}'.format(
shot=shot, scope=scope, channel=trigger_channel))
# define the edge finding function based on edge_type
def edge_found(y):
if edge_type == 'both':
return abs(y) >= trigger
elif edge_type == 'rising':
return y >= trigger
elif edge_type == 'falling':
return y <= trigger
# find when the (rising,falling,both) edge happens
start_time = min([
t for t, y in zip(trigger_data[xlabel], trigger_data[ylabel])
if edge_found(y)
])
# get the state of all channels at the tine of the trigger/edge
channels = get_channels(shot, scope)
channels = [x for x in channels if x != trigger_channel]
initial_values = {}
start_indexes = {}
for channel in channels:
data = get_data(shot, scope, channel)
start_indexes[channel] = max(
[i for i, t in enumerate(data[xlabel]) if t < start_time])
initial_values[channel] = np.mean(
data[ylabel][:int(start_indexes[channel] * 0.95)])
# return the state at the edge
return start_time, start_indexes, initial_values
def normalise(x):
xmax = max(x)
xmin = min(x)
xfactor = 1 / (xmax - xmin)
xout = [(i - xmin) * xfactor for i in x]
return xout
def get_normalised_data(shot,
scope,
trigger_channel=4,
trigger=0.2,
xlabel='Time',
ylabel='Ampl'):
"""Get the data for shot/scope number, set t=0 from trigger channel."""
start, indexes, values = find_state_at_edge(
shot,
scope,
trigger_channel=trigger_channel,
trigger=trigger,
xlabel=xlabel,
ylabel=ylabel,
edge_type='rising')
channels = get_channels(shot, scope)
channels = [x for x in channels if x != trigger_channel]
ys, ts = {}, {}
for channel in channels:
data = get_data(shot, scope, channel)
t = data[xlabel][indexes[channel]:]
t0 = t[0]
t = [x - t0 for x in t]
ts[channel] = t
y = data[ylabel][indexes[channel]:]
y = [a - values[channel] for a in y]
ys[channel] = y
return ts, ys
def smooth_simple(array, kernel_size=150):
kernel = np.ones(kernel_size) / kernel_size
smoothed = np.convolve(array, kernel, mode='same')
return smoothed
def smooth(ts, ys, kernel_size=150):
"""Smooth each channel with kernel of given size."""
kernel = np.ones(kernel_size) / kernel_size
convolved_ys = {}
error_ys = {}
for channel in ts:
convolved_ys[channel] = np.convolve(ys[channel], kernel, mode='same')
error_ys[channel] = np.zeros(len(ys[channel]))
for i in range(len(ys[channel])):
start = max(0, int(i - kernel_size / 2))
end = min(len(ys[channel]) - 1, int(i + kernel_size / 2))
error = np.std(ys[channel][start:end])
error_ys[channel][i] = error
return ts, convolved_ys, error_ys
def integrate(ts, ys, I0=0):
Is = {}
for channel in ts:
y = ys[channel]
I = np.zeros(len(y))
for index in range(1, len(y)):
di_dt = y[index]
dt = ts[channel][index] - ts[channel][index - 1]
I[index] = I[index - 1] + dt * di_dt
Is[channel] = I
return ts, Is
def differentiate_single(xs, ys):
dys = list(np.diff(ys))
dys.append(dys[-1])
dxs = list(np.diff(xs))
dxs.append(dxs[-1])
dy_dx = [dy / dx for dy, dx in zip(dys, dxs)]
return xs, dy_dx
def get_error(time, array, start, end):
# find start index
startindex = [i for i, t in enumerate(time) if t > start][-1]
# find end index
endindex = [i for i, t in enumerate(time) if t < end][0]
# return the error in that range
error = np.std(array[startindex:endindex])
return error
def get_range(time,
voltage,
start,
end,
ignore_negative=False,
plot=False,
show=True):
# TODO get trigger start time
# TODO get current pulse start to rise time
# TODO get pinch time
from plotting import plt
# find start index
startindex = [i for i, t in enumerate(time) if t < start][-1]
# find end index
endindex = [i for i, t in enumerate(time) if t > end][0]
# find dt
dt = time[1] - time[0]
# get v*dt, paying attention to ignore_negative as required
logging.debug(startindex, endindex)
tv = [(t, v * dt) for t, v in zip(time[startindex:endindex],
voltage[startindex:endindex])
if v > 0 or not ignore_negative]
v = [v for (t, v) in tv]
t = [t for (t, v) in tv]
if plot:
plt.plot(time, [volt * dt for volt in voltage], label='raw')
plt.plot(t, v, label='area of interest')
if show:
plt.show()
return v
def plot(shot, scope, channel):
from plotting import plt
data = get_data(shot, scope, channel)
plt.plot(data['Time'], data['Ampl'])
plt.show()