-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathEKF_MAT2PY.py
More file actions
66 lines (57 loc) · 2.67 KB
/
EKF_MAT2PY.py
File metadata and controls
66 lines (57 loc) · 2.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import numpy as np
import socket
import time
def recvall(soc,n):
d=soc.recv(n)
while len(d)<n:
d+=soc.recv(n-len(d))
return d
def mat2py(u1, u2, n_acts,n_pix):
while True:
try:
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.connect(("127.0.0.1", 4651))
# soc.send(u_tot[:n_acts].astype(dtype=">d").tobytes()) #send example
# soc.send(u_tot[n_acts:].astype(dtype=">d").tobytes()) #send example
soc.send(u1.astype(dtype=">d").tobytes()) #send example
soc.send(u2.astype(dtype=">d").tobytes()) #send example
# [len(DM1command):]
#more code...
# E_real = np.frombuffer(recvall(soc,n_pix*8), dtype=">d").reshape(n_pix) #receive example
# E_imag = np.frombuffer(recvall(soc,n_pix*8), dtype=">d").reshape(n_pix) #receive example
I = np.frombuffer(recvall(soc,n_pix*8), dtype=">d").reshape(n_pix) #receive example
E_real = np.frombuffer(recvall(soc,n_pix*8), dtype=">d").reshape(n_pix)
E_imag = np.frombuffer(recvall(soc,n_pix*8), dtype=">d").reshape(n_pix)
soc.close()
return I, E_real+E_imag*complex(0,1)
except socket.error:
print("Socket Error!")
time.sleep(1)
continue #retry if something went wrong
# class Filter:
# def __init__(self, index, x0 = np.zeros(2)):
# self.index = index #pixel index
# self.x_hat = np.array(x0) #electrif field (state) estimate (real 2x1 vector with real and imag parts)
#
# self.Q = drift_stats["covs"][self.index] #process noise covariance
# self.P = 16*self.Q #initial state covariance
#
# def advance(self, Gu, I):
# ## "predic" step
# E_hat = complex(self.x_hat[0] + Gu[self.index], self.x_hat[1] + Gu[self.index+G.shape[0]//2]) #closed loop electric field estimate (open loop field + Jacobian * controls)
# z_hat = abs(E_hat)**2 + dark_curr*scale # intensity estimate
# z = I[self.index] # intensity measurement
#
# H = [2*Gu[self.index] + 2*self.x_hat[0], 2*Gu[self.index+G.shape[0]//2] + 2*self.x_hat[1]] #observation equation linearization
#
# ## Kalman gain:
# S = self.P.dot(H).dot(H) + (dark_curr*scale)**2
# K = self.P.dot(H)/S
#
# ## "update" step
# self.x_hat = self.x_hat + K.dot(z - z_hat) #closed loop electric field estimate
# self.P = self.P - np.outer(K,H).dot(self.P) #electric field covariance
#
# ## "advance" step
# self.P = self.P + self.Q #covariance increases due to druft
# self.x_hat = self.x_hat*1.0 #mean remains the same (the drift is zero mean)