diff --git a/pynumdiff/optimize/_optimize.py b/pynumdiff/optimize/_optimize.py index eacef4a..b6150c7 100644 --- a/pynumdiff/optimize/_optimize.py +++ b/pynumdiff/optimize/_optimize.py @@ -66,9 +66,9 @@ 'lmbd': (1e-3, 0.5)}), tvrdiff: ({'gamma': [1e-2, 1e-1, 1, 10, 100, 1000], 'order': {1, 2, 3}, # warning: order 1 hacks the loss function when tvgamma is used, tends to win but is usually suboptimal choice in terms of true RMSE - 'huberM': [0., 1, 2, 6]}, # comb lower values more finely, because the scale of sigma is mad(x), bigger than mad(y-x) residuals + 'huberM': [2., 6]}, # the scale of sigma is mad(x), which is bigger than mad(y-x) residuals, so outliers likely come at lower M values {'gamma': (1e-4, 1e7), - 'huberM': (0, 6)}), + 'huberM': (2, 6)}), # huberM too low seeks sparse solutions, which hack the tvgamma loss function velocity: ({'gamma': [1e-2, 1e-1, 1, 10, 100, 1000]}, # Deprecated method {'gamma': (1e-4, 1e7)}), iterative_velocity: ({'scale': 'small', # Rare to optimize this one, because it's longer-running than convex version diff --git a/pynumdiff/tests/test_utils.py b/pynumdiff/tests/test_utils.py index 4580975..dc6e2b1 100644 --- a/pynumdiff/tests/test_utils.py +++ b/pynumdiff/tests/test_utils.py @@ -47,11 +47,6 @@ def test_convolutional_smoother(): assert np.allclose(utility.convolutional_smoother(x, kernel_even, num_iterations=3), np.ones(len(x))) -def test_hankel_matrix(): - """Ensure Hankel matrix comes back as defined""" - assert np.allclose(utility.hankel_matrix([1, 2, 3, 4, 5], 3), [[1, 2, 3],[2, 3, 4],[3, 4, 5]]) - - def test_peakdet(request): """Verify peakdet finds peaks and valleys""" t = np.arange(0, 10, 0.001) diff --git a/pynumdiff/total_variation_regularization/_total_variation_regularization.py b/pynumdiff/total_variation_regularization/_total_variation_regularization.py index 384b1af..e98d0e4 100644 --- a/pynumdiff/total_variation_regularization/_total_variation_regularization.py +++ b/pynumdiff/total_variation_regularization/_total_variation_regularization.py @@ -64,7 +64,7 @@ def tvrdiff(x, dt, order, gamma, huberM=float('inf'), solver=None): :param float gamma: regularization parameter :param float huberM: Huber loss parameter, in units of scaled median absolute deviation of input data, :code:`x`. :math:`M = \\infty` reduces to :math:`\\ell_2` loss squared on first, fidelity cost term, and - :math:`M = 0` reduces to :math:`\\ell_1` loss. + :math:`M = 0` reduces to :math:`\\ell_1` loss, which seeks sparse residuals. :param str solver: Solver to use. Solver options include: 'MOSEK', 'CVXOPT', 'CLARABEL', 'ECOS'. If not given, fall back to CVXPY's default. @@ -92,7 +92,7 @@ def tvrdiff(x, dt, order, gamma, huberM=float('inf'), solver=None): # so cvxpy's doubled Huber is already the right scale, and \ell_1 should be scaled by 2\sqrt{2} to match. fidelity_cost = cvxpy.sum_squares(y - x) if huberM == float('inf') \ else np.sqrt(8)*cvxpy.norm(y - x, 1) if huberM == 0 \ - else utility.huber_const(huberM)*cvxpy.sum(cvxpy.huber(y - x, huberM*sigma)) + else utility.huber_const(huberM)*cvxpy.sum(cvxpy.huber(y - x, huberM)) # data is already scaled, so M rather than M*sigma # Set up and solve the optimization problem prob = cvxpy.Problem(cvxpy.Minimize(fidelity_cost + gamma*cvxpy.sum(cvxpy.tv(deriv_values)) )) prob.solve(solver=solver) diff --git a/pynumdiff/utils/utility.py b/pynumdiff/utils/utility.py index c0c4af4..52b4035 100644 --- a/pynumdiff/utils/utility.py +++ b/pynumdiff/utils/utility.py @@ -5,98 +5,6 @@ from scipy.stats import median_abs_deviation, norm -def hankel_matrix(x, num_delays, pad=False): # fixed delay step of 1 - """Unused throughout the repo - - :param np.array[float] x: data - :param int num_delays: number of times to 1-step shift data - :param bool pad: if True, return width is len(x), else width is len(x) - num_delays + 1 - - :return: a Hankel Matrix `m`. For example, if `x = [a, b, c, d, e]` and `num_delays = 3`:\n - With `pad = False`::\n - m = [[a, b, c], - [b, c, d], - [c, d, e]]\n - With `pad = True`::\n - m = [[a, b, c, d, e], - [b, c, d, e, 0], - [c, d, e, 0, 0]] - """ - m = x.copy() - for d in range(1, num_delays): - xi = x[d:] - xi = np.pad(xi, (0, len(x)-len(xi)), 'constant', constant_values=0) - m = np.vstack((m, xi)) - if not pad: - return m[:, 0:-1*num_delays+1] - return m - - -def matrix_inv(X, max_sigma=1e-16): - """Stable (pseudo) matrix inversion using singular value decomposition. Unused throughout the repo. - - :param np.array X: matrix to invert - :param float max_sigma: smallest singular values to take into account. matrix will be truncated prior to inversion based on this value. - - :return: (np.array) -- pseudo inverse - """ - U, Sigma, V = np.linalg.svd(X, full_matrices=False) - Sigma_inv = Sigma**-1 - Sigma_inv[np.where(Sigma < max_sigma)[0]] = 0 # helps reduce instabilities - return V.T.dot(np.diag(Sigma_inv)).dot(U.T) - - -def peakdet(x, delta, t=None): - """Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal - value, and was preceded (to the left) by a value lower by delta. Converted from MATLAB script at - http://billauer.co.il/peakdet.html Eli Billauer, 3.4.05 (Explicitly not copyrighted). This function - is released to the public domain; Any use is allowed. - - :param np.array[float] x: array for which to find peaks and valleys - :param float delta: threshold for finding peaks and valleys. A point is considered a maximum peak - if it has the maximal value, and was preceded (to the left) by a value lower by delta. - :param np.array[float] t: optional domain points where data comes from, to make indices into locations - - :return: tuple[np.array, np.array] of\n - - **maxtab** -- indices or locations (column 1) and values (column 2) of maxima - - **mintab** -- indices or locations (column 1) and values (column 2) of minima - """ - maxtab = [] - mintab = [] - if t is None: - t = np.arange(len(x)) - elif len(x) != len(t): - raise ValueError('Input vectors x and t must have same length') - if not (np.isscalar(delta) and delta > 0): - raise ValueError('Input argument delta must be a positive scalar') - - mn, mx = np.inf, -1*np.inf - mnpos, mxpos = np.nan, np.nan - lookformax = True - for i in np.arange(len(x)): - this = x[i] - if this > mx: - mx = this - mxpos = t[i] - if this < mn: - mn = this - mnpos = t[i] - if lookformax: - if this < mx-delta: - maxtab.append((mxpos, mx)) - mn = this - mnpos = t[i] - lookformax = False # now searching for a min - else: - if this > mn+delta: - mintab.append((mnpos, mn)) - mx = this - mxpos = t[i] - lookformax = True # now searching for a max - - return np.array(maxtab), np.array(mintab) - - def huber(x, M): """Huber loss function, for outlier-robust applications, `see here `_""" absx = np.abs(x) @@ -228,3 +136,54 @@ def slide_function(func, x, dt, kernel, *args, stride=1, pass_weights=False, **k weight_sum[window] += w # save sum of weights for normalization at the end return x_hat/weight_sum, dxdt_hat/weight_sum + + +def peakdet(x, delta, t=None): + """Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal + value, and was preceded (to the left) by a value lower by delta. Converted from MATLAB script at + http://billauer.co.il/peakdet.html Eli Billauer, 3.4.05 (Explicitly not copyrighted). This function + is released to the public domain; Any use is allowed. + + :param np.array[float] x: array for which to find peaks and valleys + :param float delta: threshold for finding peaks and valleys. A point is considered a maximum peak + if it has the maximal value, and was preceded (to the left) by a value lower by delta. + :param np.array[float] t: optional domain points where data comes from, to make indices into locations + + :return: tuple[np.array, np.array] of\n + - **maxtab** -- indices or locations (column 1) and values (column 2) of maxima + - **mintab** -- indices or locations (column 1) and values (column 2) of minima + """ + maxtab = [] + mintab = [] + if t is None: + t = np.arange(len(x)) + elif len(x) != len(t): + raise ValueError('Input vectors x and t must have same length') + if not (np.isscalar(delta) and delta > 0): + raise ValueError('Input argument delta must be a positive scalar') + + mn, mx = np.inf, -1*np.inf + mnpos, mxpos = np.nan, np.nan + lookformax = True + for i in np.arange(len(x)): + this = x[i] + if this > mx: + mx = this + mxpos = t[i] + if this < mn: + mn = this + mnpos = t[i] + if lookformax: + if this < mx-delta: + maxtab.append((mxpos, mx)) + mn = this + mnpos = t[i] + lookformax = False # now searching for a min + else: + if this > mn+delta: + mintab.append((mnpos, mn)) + mx = this + mxpos = t[i] + lookformax = True # now searching for a max + + return np.array(maxtab), np.array(mintab)