Skip to content
4 changes: 2 additions & 2 deletions pynumdiff/optimize/_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@
'lmbd': (1e-3, 0.5)}),
tvrdiff: ({'gamma': [1e-2, 1e-1, 1, 10, 100, 1000],
'order': {1, 2, 3}, # warning: order 1 hacks the loss function when tvgamma is used, tends to win but is usually suboptimal choice in terms of true RMSE
'huberM': [0., 1, 2, 6]}, # comb lower values more finely, because the scale of sigma is mad(x), bigger than mad(y-x) residuals
'huberM': [2., 6]}, # the scale of sigma is mad(x), which is bigger than mad(y-x) residuals, so outliers likely come at lower M values
{'gamma': (1e-4, 1e7),
'huberM': (0, 6)}),
'huberM': (2, 6)}), # huberM too low seeks sparse solutions, which hack the tvgamma loss function
velocity: ({'gamma': [1e-2, 1e-1, 1, 10, 100, 1000]}, # Deprecated method
{'gamma': (1e-4, 1e7)}),
iterative_velocity: ({'scale': 'small', # Rare to optimize this one, because it's longer-running than convex version
Expand Down
5 changes: 0 additions & 5 deletions pynumdiff/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,6 @@ def test_convolutional_smoother():
assert np.allclose(utility.convolutional_smoother(x, kernel_even, num_iterations=3), np.ones(len(x)))


def test_hankel_matrix():
"""Ensure Hankel matrix comes back as defined"""
assert np.allclose(utility.hankel_matrix([1, 2, 3, 4, 5], 3), [[1, 2, 3],[2, 3, 4],[3, 4, 5]])


def test_peakdet(request):
"""Verify peakdet finds peaks and valleys"""
t = np.arange(0, 10, 0.001)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def tvrdiff(x, dt, order, gamma, huberM=float('inf'), solver=None):
:param float gamma: regularization parameter
:param float huberM: Huber loss parameter, in units of scaled median absolute deviation of input data, :code:`x`.
:math:`M = \\infty` reduces to :math:`\\ell_2` loss squared on first, fidelity cost term, and
:math:`M = 0` reduces to :math:`\\ell_1` loss.
:math:`M = 0` reduces to :math:`\\ell_1` loss, which seeks sparse residuals.
:param str solver: Solver to use. Solver options include: 'MOSEK', 'CVXOPT', 'CLARABEL', 'ECOS'.
If not given, fall back to CVXPY's default.

Expand Down Expand Up @@ -92,7 +92,7 @@ def tvrdiff(x, dt, order, gamma, huberM=float('inf'), solver=None):
# so cvxpy's doubled Huber is already the right scale, and \ell_1 should be scaled by 2\sqrt{2} to match.
fidelity_cost = cvxpy.sum_squares(y - x) if huberM == float('inf') \
else np.sqrt(8)*cvxpy.norm(y - x, 1) if huberM == 0 \
else utility.huber_const(huberM)*cvxpy.sum(cvxpy.huber(y - x, huberM*sigma))
else utility.huber_const(huberM)*cvxpy.sum(cvxpy.huber(y - x, huberM)) # data is already scaled, so M rather than M*sigma
# Set up and solve the optimization problem
prob = cvxpy.Problem(cvxpy.Minimize(fidelity_cost + gamma*cvxpy.sum(cvxpy.tv(deriv_values)) ))
prob.solve(solver=solver)
Expand Down
143 changes: 51 additions & 92 deletions pynumdiff/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,98 +5,6 @@
from scipy.stats import median_abs_deviation, norm


def hankel_matrix(x, num_delays, pad=False): # fixed delay step of 1
"""Unused throughout the repo

:param np.array[float] x: data
:param int num_delays: number of times to 1-step shift data
:param bool pad: if True, return width is len(x), else width is len(x) - num_delays + 1

:return: a Hankel Matrix `m`. For example, if `x = [a, b, c, d, e]` and `num_delays = 3`:\n
With `pad = False`::\n
m = [[a, b, c],
[b, c, d],
[c, d, e]]\n
With `pad = True`::\n
m = [[a, b, c, d, e],
[b, c, d, e, 0],
[c, d, e, 0, 0]]
"""
m = x.copy()
for d in range(1, num_delays):
xi = x[d:]
xi = np.pad(xi, (0, len(x)-len(xi)), 'constant', constant_values=0)
m = np.vstack((m, xi))
if not pad:
return m[:, 0:-1*num_delays+1]
return m


def matrix_inv(X, max_sigma=1e-16):
"""Stable (pseudo) matrix inversion using singular value decomposition. Unused throughout the repo.

:param np.array X: matrix to invert
:param float max_sigma: smallest singular values to take into account. matrix will be truncated prior to inversion based on this value.

:return: (np.array) -- pseudo inverse
"""
U, Sigma, V = np.linalg.svd(X, full_matrices=False)
Sigma_inv = Sigma**-1
Sigma_inv[np.where(Sigma < max_sigma)[0]] = 0 # helps reduce instabilities
return V.T.dot(np.diag(Sigma_inv)).dot(U.T)


def peakdet(x, delta, t=None):
"""Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal
value, and was preceded (to the left) by a value lower by delta. Converted from MATLAB script at
http://billauer.co.il/peakdet.html Eli Billauer, 3.4.05 (Explicitly not copyrighted). This function
is released to the public domain; Any use is allowed.

:param np.array[float] x: array for which to find peaks and valleys
:param float delta: threshold for finding peaks and valleys. A point is considered a maximum peak
if it has the maximal value, and was preceded (to the left) by a value lower by delta.
:param np.array[float] t: optional domain points where data comes from, to make indices into locations

:return: tuple[np.array, np.array] of\n
- **maxtab** -- indices or locations (column 1) and values (column 2) of maxima
- **mintab** -- indices or locations (column 1) and values (column 2) of minima
"""
maxtab = []
mintab = []
if t is None:
t = np.arange(len(x))
elif len(x) != len(t):
raise ValueError('Input vectors x and t must have same length')
if not (np.isscalar(delta) and delta > 0):
raise ValueError('Input argument delta must be a positive scalar')

mn, mx = np.inf, -1*np.inf
mnpos, mxpos = np.nan, np.nan
lookformax = True
for i in np.arange(len(x)):
this = x[i]
if this > mx:
mx = this
mxpos = t[i]
if this < mn:
mn = this
mnpos = t[i]
if lookformax:
if this < mx-delta:
maxtab.append((mxpos, mx))
mn = this
mnpos = t[i]
lookformax = False # now searching for a min
else:
if this > mn+delta:
mintab.append((mnpos, mn))
mx = this
mxpos = t[i]
lookformax = True # now searching for a max

return np.array(maxtab), np.array(mintab)


def huber(x, M):
"""Huber loss function, for outlier-robust applications, `see here <https://www.cvxpy.org/api_reference/cvxpy.atoms.elementwise.html#huber>`_"""
absx = np.abs(x)
Expand Down Expand Up @@ -228,3 +136,54 @@ def slide_function(func, x, dt, kernel, *args, stride=1, pass_weights=False, **k
weight_sum[window] += w # save sum of weights for normalization at the end

return x_hat/weight_sum, dxdt_hat/weight_sum


def peakdet(x, delta, t=None):
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moved this guy to the bottom, because it's only used by the Triangles simulations.

"""Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal
value, and was preceded (to the left) by a value lower by delta. Converted from MATLAB script at
http://billauer.co.il/peakdet.html Eli Billauer, 3.4.05 (Explicitly not copyrighted). This function
is released to the public domain; Any use is allowed.

:param np.array[float] x: array for which to find peaks and valleys
:param float delta: threshold for finding peaks and valleys. A point is considered a maximum peak
if it has the maximal value, and was preceded (to the left) by a value lower by delta.
:param np.array[float] t: optional domain points where data comes from, to make indices into locations

:return: tuple[np.array, np.array] of\n
- **maxtab** -- indices or locations (column 1) and values (column 2) of maxima
- **mintab** -- indices or locations (column 1) and values (column 2) of minima
"""
maxtab = []
mintab = []
if t is None:
t = np.arange(len(x))
elif len(x) != len(t):
raise ValueError('Input vectors x and t must have same length')
if not (np.isscalar(delta) and delta > 0):
raise ValueError('Input argument delta must be a positive scalar')

mn, mx = np.inf, -1*np.inf
mnpos, mxpos = np.nan, np.nan
lookformax = True
for i in np.arange(len(x)):
this = x[i]
if this > mx:
mx = this
mxpos = t[i]
if this < mn:
mn = this
mnpos = t[i]
if lookformax:
if this < mx-delta:
maxtab.append((mxpos, mx))
mn = this
mnpos = t[i]
lookformax = False # now searching for a min
else:
if this > mn+delta:
mintab.append((mnpos, mn))
mx = this
mxpos = t[i]
lookformax = True # now searching for a max

return np.array(maxtab), np.array(mintab)