diff --git a/pynumdiff/kalman_smooth/_kalman_smooth.py b/pynumdiff/kalman_smooth/_kalman_smooth.py index 225ca55..ef30c18 100644 --- a/pynumdiff/kalman_smooth/_kalman_smooth.py +++ b/pynumdiff/kalman_smooth/_kalman_smooth.py @@ -123,8 +123,8 @@ def constant_velocity(x, dt, params=None, options=None, r=None, q=None, forwardb - **dxdt_hat** -- estimated derivative of x """ if params != None: # boilerplate backwards compatibility code - warn("""`params` and `options` parameters will be removed in a future version. Use `r`, - `q`, and `forwardbackward` instead.""", DeprecationWarning) + warn("`params` and `options` parameters will be removed in a future version. Use `r`, " + + "`q`, and `forwardbackward` instead.", DeprecationWarning) r, q = params if options != None: if 'forwardbackward' in options: forwardbackward = options['forwardbackward'] @@ -158,8 +158,8 @@ def constant_acceleration(x, dt, params=None, options=None, r=None, q=None, forw - **dxdt_hat** -- estimated derivative of x """ if params != None: # boilerplate backwards compatibility code - warn("""`params` and `options` parameters will be removed in a future version. Use `r`, - `q`, and `forwardbackward` instead.""", DeprecationWarning) + warn("`params` and `options` parameters will be removed in a future version. Use `r`, " + + "`q`, and `forwardbackward` instead.", DeprecationWarning) r, q = params if options != None: if 'forwardbackward' in options: forwardbackward = options['forwardbackward'] @@ -197,8 +197,8 @@ def constant_jerk(x, dt, params=None, options=None, r=None, q=None, forwardbackw - **dxdt_hat** -- estimated derivative of x """ if params != None: # boilerplate backwards compatibility code - warn("""`params` and `options` parameters will be removed in a future version. Use `r`, - `q`, and `forwardbackward` instead.""", DeprecationWarning) + warn("`params` and `options` parameters will be removed in a future version. Use `r`, " + + "`q`, and `forwardbackward` instead.", DeprecationWarning) r, q = params if options != None: if 'forwardbackward' in options: forwardbackward = options['forwardbackward'] @@ -242,8 +242,8 @@ def known_dynamics(x, params, u=None, options=None, xhat0=None, P0=None, A=None, :return: np.array **x_hat** -- estimated (smoothed) x """ # Why not also returning derivative here? if params != None: - warn("""`params` and `options` parameters will be removed in a future version. Use `xhat0`, - `P0`, `A`, `B`, `C`, `Q`, `R`, and `smooth` instead.""", DeprecationWarning) + warn("`params` and `options` parameters will be removed in a future version. Use `xhat0`, " + + "`P0`, `A`, `B`, `C`, `Q`, `R`, and `smooth` instead.", DeprecationWarning) xhat0, P0, A, B, C, R, Q = params if options != None: if 'smooth' in options: smooth = options['smooth'] diff --git a/pynumdiff/linear_model/_linear_model.py b/pynumdiff/linear_model/_linear_model.py index 61ae6f1..eda431f 100644 --- a/pynumdiff/linear_model/_linear_model.py +++ b/pynumdiff/linear_model/_linear_model.py @@ -9,93 +9,14 @@ try: import cvxpy except ImportError: pass -KERNELS = {'friedrichs': utility._friedrichs_kernel, - 'gaussian': utility._gaussian_kernel} - -#################### -# Helper functions # -#################### -def _slide_function(func, x, dt, args, window_size, step_size, kernel_name): - """Slide a smoothing derivative function across a timeseries with specified window size. 
- - :param callable func: name of the function to slide - :param np.array[float] x: data to differentiate - :param float dt: time step - :param dict args: see func for requirements - :param int window_size: size of the sliding window - :param int step_size: step size for slide (e.g. 1 means slide by 1 step) - :param str kernel_name: name of the smoothing kernel (e.g. 'friedrichs' or 'gaussian') - - :return: tuple[np.array, np.array] of\n - - **x_hat** -- estimated (smoothed) x - - **dxdt_hat** -- estimated derivative of x - """ - - # get smoothing kernel - if not window_size % 2: # then make odd - window_size += 1 - ker = KERNELS[kernel_name](window_size) - - x_hat_list = [] - dxdt_hat_list = [] - weight_list = [] - - for p in range(0, len(x), step_size): - # deal with end points - start = p - int((window_size-1)/2) - end = p + int((window_size-1)/2)+1 - - ker_start = 0 - ker_end = window_size - - if start < 0: - ker_start = np.abs(start) - start = 0 - if end > len(x): - ker_end = window_size - (end-len(x)) - end = len(x) - - # weights - w = ker[ker_start:ker_end] - w = w/np.sum(w) - - # run the function on the window - _x = x[start:end] - x_hat, dxdt_hat = func(_x, dt, *args, weights=w) - - # stack results - z_x_hat = np.zeros([len(x)]) - z_x_hat[start:end] = x_hat - x_hat_list.append(z_x_hat) - - z_dxdt_hat = np.zeros([len(x)]) - z_dxdt_hat[start:end] = dxdt_hat - dxdt_hat_list.append(z_dxdt_hat) - - z_weights = np.zeros([len(x)]) - z_weights[start:end] = w - weight_list.append(z_weights) - - # column norm weights - weights = np.vstack(weight_list) - for col in range(weights.shape[1]): - weights[:, col] = weights[:, col] / np.sum(weights[:, col]) - - # stack and weight x_hat and dxdt_hat - x_hat = np.vstack(x_hat_list) - dxdt_hat = np.vstack(dxdt_hat_list) - - x_hat = np.sum(weights*x_hat, axis=0) - dxdt_hat = np.sum(weights*dxdt_hat, axis=0) - - return x_hat, dxdt_hat - ######################### # Savitzky-Golay filter # ######################### def savgoldiff(x, dt, params=None, options=None, polynomial_order=None, window_size=None, smoothing_win=None): - """Use the Savitzky-Golay to smooth the data and calculate the first derivative. It wses scipy.signal.savgol_filter. The Savitzky-Golay is very similar to the sliding polynomial fit, but slightly noisier, and much faster + """Use the Savitzky-Golay to smooth the data and calculate the first derivative. It wses + scipy.signal.savgol_filter. 
The Savitzky-Golay is very similar to the sliding polynomial fit, + but slightly noisier, and much faster :param np.array[float] x: data to differentiate :param float dt: step size @@ -103,7 +24,8 @@ def savgoldiff(x, dt, params=None, options=None, polynomial_order=None, window_s :param dict options: (**deprecated**) :param int polynomial_order: order of the polynomial :param int window_size: size of the sliding window, must be odd (if not, 1 is added) - :param int smoothing_win: size of the window used for gaussian smoothing, a good default is window_size, but smaller for high frequnecy data + :param int smoothing_win: size of the window used for gaussian smoothing, a good default is + window_size, but smaller for high frequnecy data :return: tuple[np.array, np.array] of\n - **x_hat** -- estimated (smoothed) x @@ -116,22 +38,14 @@ def savgoldiff(x, dt, params=None, options=None, polynomial_order=None, window_s elif polynomial_order == None or window_size == None or smoothing_win == None: raise ValueError("`polynomial_order`, `window_size`, and `smoothing_win` must be given.") - if window_size > len(x)-1: - window_size = len(x)-1 - - if smoothing_win > len(x)-1: - smoothing_win = len(x)-1 + window_size = np.clip(window_size, polynomial_order + 1, len(x)-1) + if not window_size % 2: window_size += 1 # window_size needs to be odd + smoothing_win = min(smoothing_win, len(x)-1) - if window_size <= polynomial_order: - window_size = polynomial_order + 1 + dxdt_hat = scipy.signal.savgol_filter(x, window_size, polynomial_order, deriv=1)/dt - if not window_size % 2: # then make odd - window_size += 1 - - dxdt_hat = scipy.signal.savgol_filter(x, window_size, polynomial_order, deriv=1) / dt - - kernel = utility._gaussian_kernel(smoothing_win) - dxdt_hat = utility.convolutional_smoother(dxdt_hat, kernel, 1) + kernel = utility.gaussian_kernel(smoothing_win) + dxdt_hat = utility.convolutional_smoother(dxdt_hat, kernel) x_hat = utility.integrate_dxdt_hat(dxdt_hat, dt) x0 = utility.estimate_initial_condition(x, x_hat) @@ -143,47 +57,19 @@ def savgoldiff(x, dt, params=None, options=None, polynomial_order=None, window_s ###################### # Polynomial fitting # ###################### -def _polydiff(x, dt, polynomial_order, weights=None): - """Fit polynomials to the timeseries, and differentiate the polynomials. - - :param np.array[float] x: data to differentiate - :param float dt: time step - :param int polynomial_order: order of the polynomial - :param np.array[float] weights: weights applied to each point in calculating the polynomial fit. - Defaults to 1s if missing. - - :return: tuple[np.array, np.array] of\n - - **x_hat** -- estimated (smoothed) x - - **dxdt_hat** -- estimated derivative of x - """ - if weights is None: - weights = np.ones(x.shape) - - t = np.arange(1, len(x)+1)*dt - - r = np.polyfit(t, x, polynomial_order, w=weights) # polyfit returns highest order first - dr = np.polyder(r) # power rule already implemented for us - - dxdt_hat = np.polyval(dr, t) # evaluate the derivative and original polynomials at points t - x_hat = np.polyval(r, t) # smoothed x - - return x_hat, dxdt_hat - - -def polydiff(x, dt, params=None, options=None, polynomial_order=None, window_size=None, - sliding=True, step_size=1, kernel='friedrichs'): +def polydiff(x, dt, params=None, options=None, polynomial_order=None, window_size=None, step_size=1, + kernel='friedrichs'): """Fit polynomials to the data, and differentiate the polynomials. 
:param np.array[float] x: data to differentiate :param float dt: step size :param list[int] params: (**deprecated**, prefer :code:`polynomial_order` and :code:`window_size`) - :param dict options: (**deprecated**, prefer :code:`sliding`, :code:`step_size`, and :code:`kernel`) + :param dict options: (**deprecated**, prefer :code:`step_size` and :code:`kernel`) a dictionary consisting of {'sliding': (bool), 'step_size': (int), 'kernel_name': (str)} :param int polynomial_order: order of the polynomial - :param int window_size: size of the sliding window (ignored if not sliding) - :param bool sliding: whether to use sliding approach + :param int window_size: size of the sliding window, if not given no sliding :param int step_size: step size for sliding - :param str kernel: kernel to use for weighting and smoothing windows ('gaussian' or 'friedrichs') + :param str kernel: name of kernel to use for weighting and smoothing windows ('gaussian' or 'friedrichs') :return: tuple[np.array, np.array] of\n - **x_hat** -- estimated (smoothed) x @@ -192,9 +78,10 @@ def polydiff(x, dt, params=None, options=None, polynomial_order=None, window_siz if params != None: warn("`params` and `options` parameters will be removed in a future version. Use `polynomial_order` " + "and `window_size` instead.", DeprecationWarning) - polynomial_order, window_size = params + polynomial_order = params[0] + if len(params) > 1: window_size = params[1] if options != None: - if 'sliding' in options: sliding = options['sliding'] + if 'sliding' in options and not options['sliding']: window_size = None if 'step_size' in options: step_size = options['step_size'] if 'kernel_name' in options: kernel = options['kernel_name'] elif polynomial_order == None or window_size == None: @@ -203,18 +90,28 @@ def polydiff(x, dt, params=None, options=None, polynomial_order=None, window_siz if window_size < polynomial_order*3: window_size = polynomial_order*3+1 - if sliding: - return _slide_function(_polydiff, x, dt, [polynomial_order], window_size, step_size, kernel) + def _polydiff(x, dt, polynomial_order, weights=None): + t = np.arange(len(x))*dt - return _polydiff(x, dt, polynomial_order) + r = np.polyfit(t, x, polynomial_order, w=weights) # polyfit returns highest order first + dr = np.polyder(r) # power rule already implemented for us + + dxdt_hat = np.polyval(dr, t) # evaluate the derivative and original polynomials at points t + x_hat = np.polyval(r, t) # smoothed x + + return x_hat, dxdt_hat + + if not window_size: + return _polydiff(x, dt, polynomial_order) + + kernel = {'gaussian':utility.gaussian_kernel, 'friedrichs':utility.friedrichs_kernel}[kernel](window_size) + return utility.slide_function(_polydiff, x, dt, kernel, polynomial_order, step_size=step_size, pass_weights=True) ############# # Chebychev # # Removed - Not Useful and requires old package ############# - - # def __chebydiff__(x, dt, params, options=None): # """ # Fit the timeseries with chebyshev polynomials, and differentiate this model. @@ -248,7 +145,6 @@ def polydiff(x, dt, params=None, options=None, polynomial_order=None, window_siz # return x_hat, dxdt_hat - # def chebydiff(x, dt, params, options=None): # """ # Slide a smoothing derivative function across a times eries with specified window size. 
@@ -296,11 +192,12 @@ def polydiff(x, dt, params=None, options=None, polynomial_order=None, window_siz # return __chebydiff__(x, dt, params) -def __solve_for_A_and_C_given_X_and_Xdot__(X, Xdot, num_integrations, dt, gammaC=1e-1, gammaA=1e-6, +############### +# Linear diff # +############### +def _solve_for_A_and_C_given_X_and_Xdot(X, Xdot, num_integrations, dt, gammaC=1e-1, gammaA=1e-6, solver=None, A_known=None, epsilon=1e-6, rows_of_interest='all'): - """Given state and the derivative, find the system evolution and measurement matrices. - """ - + """Given state and the derivative, find the system evolution and measurement matrices.""" if rows_of_interest == 'all': rows_of_interest = np.arange(0, X.shape[0]) @@ -341,69 +238,8 @@ def __solve_for_A_and_C_given_X_and_Xdot__(X, Xdot, num_integrations, dt, gammaC A = np.array(A.value) return A, np.array(C.value) - -def __integrate_dxdt_hat_matrix__(dxdt_hat, dt): - """Do integration analogous to integrate_dxdt_hat in the utilities, except on a 2D matrix. - """ - if len(dxdt_hat.shape) == 1: - dxdt_hat = np.reshape(dxdt_hat, [1, len(dxdt_hat)]) - x = np.array(scipy.integrate.cumulative_trapezoid(dxdt_hat, axis=1)) - first_value = x[:, 0:1] - np.mean(dxdt_hat[:, 0:1], axis=1).reshape(dxdt_hat.shape[0], 1) - x = np.hstack((first_value, x))*dt - return x - - -def _lineardiff(x, dt, N, gamma, solver=None, weights=None): - """Estimate the parameters for a system xdot = Ax, and use that to calculate the derivative - - :param np.array[float] x: data to differentiate - :param float dt: time step - :param int > 1 N: order (e.g. 2: velocity; 3: acceleration) - :param float gamma: regularization term - :param np.array[float] weights: Bug? Currently not used here, although `lineardiff` takes a kernel - :param str solver: which cvxpy solver to use - - :return: tuple[np.array, np.array] of\n - - **x_hat** -- estimated (smoothed) x - - **dxdt_hat** -- estimated derivative of x - """ - mean = np.mean(x) - x = x - mean - - # Generate the matrix of integrals of x - X = [x] - for n in range(1, N): - X.append(utility.integrate_dxdt_hat(X[-1], dt)) - X = (np.vstack(X[::-1])) - integral_Xdot = X - integral_X = __integrate_dxdt_hat_matrix__(X, dt) - - # Solve for A and the integration constants - A, C = __solve_for_A_and_C_given_X_and_Xdot__(integral_X, integral_Xdot, N, dt, gamma, solver=solver) - - # Add the integration constants - Csum = 0 - t = np.arange(0, X.shape[1])*dt - for n in range(0, N - 1): - C_subscript = n - t_exponent = N - n - 2 - den = math.factorial(t_exponent) - Cn = np.vstack([1/den*C[i, C_subscript]*t**t_exponent for i in range(X.shape[0])]) - Csum = Csum + Cn - Csum = np.array(Csum) - - # Use A and C to calculate the derivative - Xdot_reconstructed = (A@X + Csum) - dxdt_hat = np.ravel(Xdot_reconstructed[-1, :]) - - x_hat = utility.integrate_dxdt_hat(dxdt_hat, dt) - x_hat = x_hat + utility.estimate_initial_condition(x+mean, x_hat) - - return x_hat, dxdt_hat - - def lineardiff(x, dt, params=None, options=None, order=None, gamma=None, window_size=None, - sliding=True, step_size=10, kernel='friedrichs', solver=None): + step_size=10, kernel='friedrichs', solver=None): """Slide a smoothing derivative function across data, with specified window size. 
:param np.array[float] x: data to differentiate @@ -414,9 +250,8 @@ def lineardiff(x, dt, params=None, options=None, order=None, gamma=None, window_ :param int>1 order: order of the polynomial :param float gamma: regularization term :param int window_size: size of the sliding window (ignored if not sliding) - :param bool sliding: whether to use sliding approach :param int step_size: step size for sliding - :param str kernel_name: kernel to use for weighting and smoothing windows ('gaussian' or 'friedrichs') + :param str kernel: name of kernel to use for weighting and smoothing windows ('gaussian' or 'friedrichs') :param str solver: CVXPY solver to use, one of :code:`cvxpy.installed_solvers()` :return: tuple[np.array, np.array] of\n @@ -426,39 +261,76 @@ def lineardiff(x, dt, params=None, options=None, order=None, gamma=None, window_ if params != None: warn("`params` and `options` parameters will be removed in a future version. Use `order`, " + "`gamma`, and `window_size` instead.", DeprecationWarning) - order, gamma, window_size = params + order, gamma = params[:2] + if len(params) > 2: window_size = params[2] if options != None: - if 'sliding' in options: sliding = options['sliding'] + if 'sliding' in options and not options['sliding']: window_size = None if 'step_size' in options: step_size = options['step_size'] if 'kernel_name' in options: kernel = options['kernel_name'] if 'solver' in options: solver = options['solver'] elif order == None or gamma == None or window_size == None: raise ValueError("`order`, `gamma`, and `window_size` must be given.") - if sliding: - # forward and backward - x_hat_forward, _ = _slide_function(_lineardiff, x, dt, [order, gamma, solver], window_size, step_size, - kernel) - x_hat_backward, _ = _slide_function(_lineardiff, x[::-1], dt, [order, gamma, solver], window_size, step_size, - kernel) + def _lineardiff(x, dt, order, gamma, solver=None): + """Estimate the parameters for a system xdot = Ax, and use that to calculate the derivative""" + mean = np.mean(x) + x = x - mean + + # Generate the matrix of integrals of x + X = [x] + for n in range(1, order): + X.append(utility.integrate_dxdt_hat(X[-1], dt)) + X = np.vstack(X[::-1]) + integral_Xdot = X + integral_X = np.hstack((np.zeros((X.shape[0], 1)), scipy.integrate.cumulative_trapezoid(X, axis=1)))*dt + + # Solve for A and the integration constants + A, C = _solve_for_A_and_C_given_X_and_Xdot(integral_X, integral_Xdot, order, dt, gamma, solver=solver) + + # Add the integration constants + Csum = 0 + t = np.arange(0, X.shape[1])*dt + for n in range(0, order - 1): + C_subscript = n + t_exponent = order - n - 2 + den = math.factorial(t_exponent) + Cn = np.vstack([1/den*C[i, C_subscript]*t**t_exponent for i in range(X.shape[0])]) + Csum = Csum + Cn + Csum = np.array(Csum) + + # Use A and C to calculate the derivative + Xdot_reconstructed = (A@X + Csum) + dxdt_hat = np.ravel(Xdot_reconstructed[-1, :]) + + x_hat = utility.integrate_dxdt_hat(dxdt_hat, dt) + x_hat = x_hat + utility.estimate_initial_condition(x+mean, x_hat) - # weights - w = np.arange(1, len(x_hat_forward)+1,1)[::-1] - w = np.pad(w, [0, len(x)-len(w)], mode='constant') - wfb = np.vstack((w, w[::-1])) - norm = np.sum(wfb, axis=0) + return x_hat, dxdt_hat - # orient and pad - x_hat_forward = np.pad(x_hat_forward, [0, len(x)-len(x_hat_forward)], mode='constant') - x_hat_backward = np.pad(x_hat_backward[::-1], [len(x)-len(x_hat_backward), 0], mode='constant') + if not window_size: + return _lineardiff(x, dt, order, gamma, solver) - # merge - x_hat = 
x_hat_forward*w/norm + x_hat_backward*w[::-1]/norm - x_hat, dxdt_hat = finite_difference(x_hat, dt) + kernel = {'gaussian':utility.gaussian_kernel, 'friedrichs':utility.friedrichs_kernel}[kernel](window_size) - return x_hat, dxdt_hat + x_hat_forward, _ = utility.slide_function(_lineardiff, x, dt, kernel, order, gamma, step_size=step_size, solver=solver) + x_hat_backward, _ = utility.slide_function(_lineardiff, x[::-1], dt, kernel, order, gamma, step_size=step_size, solver=solver) + + # weights + w = np.arange(1, len(x_hat_forward)+1,1)[::-1] + w = np.pad(w, [0, len(x)-len(w)], mode='constant') + wfb = np.vstack((w, w[::-1])) + norm = np.sum(wfb, axis=0) + + # orient and pad + x_hat_forward = np.pad(x_hat_forward, [0, len(x)-len(x_hat_forward)], mode='constant') + x_hat_backward = np.pad(x_hat_backward[::-1], [len(x)-len(x_hat_backward), 0], mode='constant') + + # merge + x_hat = x_hat_forward*w/norm + x_hat_backward*w[::-1]/norm + x_hat, dxdt_hat = finite_difference(x_hat, dt) + + return x_hat, dxdt_hat - return _lineardiff(x, dt, params, options) ############################### # Fourier Spectral derivative # @@ -498,7 +370,7 @@ def spectraldiff(x, dt, params=None, options=None, high_freq_cutoff=None, even_e pre = x[0]*np.ones(padding) post = x[-1]*np.ones(padding) x = np.hstack((pre, x, post)) - x_hat, _ = smooth_finite_difference.meandiff(x, dt, [int(padding/2)], options={'iterate': False}) + x_hat, _ = smooth_finite_difference.meandiff(x, dt, window_size=int(padding/2)) x_hat[padding:-padding] = x[padding:-padding] x = x_hat else: diff --git a/pynumdiff/smooth_finite_difference/_smooth_finite_difference.py b/pynumdiff/smooth_finite_difference/_smooth_finite_difference.py index 88fecd7..0bf252c 100644 --- a/pynumdiff/smooth_finite_difference/_smooth_finite_difference.py +++ b/pynumdiff/smooth_finite_difference/_smooth_finite_difference.py @@ -66,7 +66,7 @@ def meandiff(x, dt, params=None, options={}, window_size=5, num_iterations=1): if 'iterate' in options and options['iterate']: num_iterations = params[1] - kernel = utility._mean_kernel(window_size) + kernel = utility.mean_kernel(window_size) x_hat = utility.convolutional_smoother(x, kernel, num_iterations) x_hat, dxdt_hat = finite_difference(x_hat, dt) @@ -95,7 +95,7 @@ def gaussiandiff(x, dt, params=None, options={}, window_size=5, num_iterations=1 if 'iterate' in options and options['iterate']: num_iterations = params[1] - kernel = utility._gaussian_kernel(window_size) + kernel = utility.gaussian_kernel(window_size) x_hat = utility.convolutional_smoother(x, kernel, num_iterations) x_hat, dxdt_hat = finite_difference(x_hat, dt) @@ -124,7 +124,7 @@ def friedrichsdiff(x, dt, params=None, options={}, window_size=5, num_iterations if 'iterate' in options and options['iterate']: num_iterations = params[1] - kernel = utility._friedrichs_kernel(window_size) + kernel = utility.friedrichs_kernel(window_size) x_hat = utility.convolutional_smoother(x, kernel, num_iterations) x_hat, dxdt_hat = finite_difference(x_hat, dt) diff --git a/pynumdiff/tests/test_diff_methods.py b/pynumdiff/tests/test_diff_methods.py index 7309160..c725602 100644 --- a/pynumdiff/tests/test_diff_methods.py +++ b/pynumdiff/tests/test_diff_methods.py @@ -33,7 +33,7 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs) (first_order, {}), # empty dictionary for the case of no parameters. 
no params -> no diff in new vs old (iterated_first_order, {'num_iterations':5}), (iterated_first_order, [5], {'iterate':True}), (second_order, {}), - (lineardiff, {'order':3, 'gamma':5, 'window_size':10, 'solver':'CLARABEL'}), (lineardiff, [3, 5, 10], {'solver':'CLARABEL'}), + (lineardiff, {'order':3, 'gamma':5, 'window_size':11, 'solver':'CLARABEL'}), (lineardiff, [3, 5, 11], {'solver':'CLARABEL'}), (polydiff, {'polynomial_order':2, 'window_size':3}), (polydiff, [2, 3]), (savgoldiff, {'polynomial_order':2, 'window_size':4, 'smoothing_win':4}), (savgoldiff, [2, 4, 4]), (spectraldiff, {'high_freq_cutoff':0.1}), (spectraldiff, [0.1]), @@ -79,8 +79,8 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs) [(-25, -25), (3, 3), (0, 0), (3, 3)]], lineardiff: [[(-6, -6), (-5, -6), (0, -1), (0, 0)], [(0, 0), (1, 1), (0, 0), (1, 1)], + [(1, 0), (2, 2), (1, 0), (2, 2)], [(1, 0), (2, 1), (1, 0), (2, 1)], - [(1, 0), (1, 1), (1, 0), (1, 1)], [(1, 1), (2, 2), (1, 1), (2, 2)], [(1, 1), (3, 3), (1, 1), (3, 3)]], polydiff: [[(-14, -15), (-14, -14), (0, -1), (1, 1)], diff --git a/pynumdiff/tests/test_optimize.py b/pynumdiff/tests/test_optimize.py index 4bcc374..b6a4f9e 100644 --- a/pynumdiff/tests/test_optimize.py +++ b/pynumdiff/tests/test_optimize.py @@ -145,8 +145,8 @@ def test_spectraldiff(): def test_polydiff(): params_1, val_1 = polydiff(x, dt, params=None, tvgamma=tvgamma, dxdt_truth=dxdt_truth) params_2, val_2 = polydiff(x, dt, params=None, tvgamma=0, dxdt_truth=None) - assert params_1 == [6, 50] - assert params_2 == [4, 10] + assert params_1 == [2, 10] + assert params_2 == [2, 10] # def test_chebydiff(self): # try: diff --git a/pynumdiff/tests/test_utils.py b/pynumdiff/tests/test_utils.py index 882570d..d0f2884 100644 --- a/pynumdiff/tests/test_utils.py +++ b/pynumdiff/tests/test_utils.py @@ -5,23 +5,27 @@ import numpy as np from pynumdiff.utils import utility, simulate, evaluate +np.random.seed(42) # The answer to life, the universe, and everything def test_integrate_dxdt_hat(): + """For known simple functions, make sure integration is as expected""" dt = 0.1 - for dxdt,expected in [(np.ones(10), np.arange(0, 1, 0.1)), # constant derivative + for dxdt,expected in [(np.ones(10), np.arange(0, 1, dt)), # constant derivative (np.linspace(0, 1, 11), [0, 0.005, 0.02, 0.045, 0.08, 0.125, 0.18, 0.245, 0.32, 0.405, 0.5]), # linear derivative (np.array([1.0]), [0])]: # edge case of just one entry x_hat = utility.integrate_dxdt_hat(dxdt, dt) - np.testing.assert_allclose(x_hat, expected) + assert np.allclose(x_hat, expected) assert len(x_hat) == len(dxdt) + def test_estimate_initial_condition(): + """For known simple functions, make sure the initial condition is as expected""" for x,x_hat,c in [([1.0, 2.0, 3.0, 4.0, 5.0], [0.0, 1.0, 2.0, 3.0, 4.0], 1), # Perfect alignment case, xhat shifted by 1 (np.ones(5)*10, np.ones(5)*5, 5), ([0], [1], -1)]: x0 = utility.estimate_initial_condition(x, x_hat) - np.testing.assert_allclose(x0, float(c), rtol=1e-3) + assert np.allclose(x0, float(c), rtol=1e-3) np.random.seed(42) # Noisy case. 
Seed for reproducibility x0 = utility.estimate_initial_condition([1.0, 2.0, 3.0, 4.0, 5.0], @@ -29,6 +33,51 @@ def test_estimate_initial_condition(): assert 0.9 < x0 < 1.1 # The result should be close to 1.0, but not exactly due to noise +def test_hankel_matrix(): + """Ensure Hankel matrix comes back as defined""" + assert np.allclose(utility.hankel_matrix([1, 2, 3, 4, 5], 3), [[1, 2, 3],[2, 3, 4],[3, 4, 5]]) + + +def test_peakdet(request): + """Verify peakdet finds peaks and valleys""" + t = np.arange(0, 10, 0.001) + x = 0.3*np.sin(t) + np.sin(1.3*t) + 0.9*np.sin(4.2*t) + 0.02*np.random.randn(10000) + maxtab, mintab = utility.peakdet(x, 0.5, t) + + if request.config.getoption("--plot"): + from matplotlib import pyplot + pyplot.plot(t, x) + pyplot.plot(mintab[:,0], mintab[:,1], 'g*') + pyplot.plot(maxtab[:,0], maxtab[:,1], 'r*') + pyplot.title('peakdet validataion') + pyplot.show() + + assert np.allclose(maxtab, [[0.473, 1.59843494], # these numbers validated by eye with --plot + [1.795, 1.90920786], + [3.314, -0.04585991], + [4.992, 0.74798665], + [6.345, 1.89597554], + [7.778, 0.57190318], + [9.424, 0.58764606]]) + assert np.allclose(mintab, [[1.096, 0.30361178], + [2.739, -1.12624328], + [4.072, -2.00254655], + [5.582, -0.31529832], + [7.135, -0.58327787], + [8.603, -1.71278265]]) + +def test_slide_function(): + """Verify the slide function's weighting scheme calculates as expected""" + def identity(x, dt): return x, 0 # should come back the same + + x = np.arange(100) + kernel = utility.gaussian_kernel(9) + + x_hat, dxdt_hat = utility.slide_function(identity, x, 0.1, kernel, step_size=2) + + assert np.allclose(x, x_hat) + + def test_simulate(): return diff --git a/pynumdiff/total_variation_regularization/_total_variation_regularization.py b/pynumdiff/total_variation_regularization/_total_variation_regularization.py index 9c29e26..06717a9 100644 --- a/pynumdiff/total_variation_regularization/_total_variation_regularization.py +++ b/pynumdiff/total_variation_regularization/_total_variation_regularization.py @@ -219,7 +219,7 @@ def smooth_acceleration(x, dt, params=None, options=None, gamma=None, window_siz _, dxdt_hat = _total_variation_regularized_derivative(x, dt, 2, gamma, solver=solver) - kernel = utility._gaussian_kernel(window_size) + kernel = utility.gaussian_kernel(window_size) dxdt_hat = utility.convolutional_smoother(dxdt_hat, kernel, 1) x_hat = utility.integrate_dxdt_hat(dxdt_hat, dt) diff --git a/pynumdiff/utils/utility.py b/pynumdiff/utils/utility.py index 115f308..b3908eb 100644 --- a/pynumdiff/utils/utility.py +++ b/pynumdiff/utils/utility.py @@ -2,15 +2,14 @@ import numpy as np -def hankel_matrix(x, num_delays, pad=False): # fixed delay step of 1 - """ - :param x: numpy array or matrix - :param num_delays: int, number of times to 1-step shift data - :param pad: - :return: a Hankel Matrix m +def hankel_matrix(x, num_delays, pad=False): # fixed delay step of 1 + """Unused throughout the repo - e.g. if - x = [a, b, c, d, e] and num_delays = 3 + :param np.array[float] x: data + :param int num_delays: number of times to 1-step shift data + :param bool pad: if True, return width is len(x), else width is len(x) - num_delays + 1 + :return: a Hankel Matrix m + e.g. 
if x = [a, b, c, d, e] and num_delays = 3 then with pad = False: m = [['a', 'b', 'c'], ['b', 'c', 'd'], @@ -20,19 +19,18 @@ def hankel_matrix(x, num_delays, pad=False): # fixed delay step of 1 ['b', 'c', 'd', 'e', 0], ['c', 'd', 'e', 0, 0]] """ - m = copy.copy(x) + m = x.copy() for d in range(1, num_delays): - xi = x[:, d:] - xi = np.pad(xi, ((0, 0), (0, x.shape[1]-xi.shape[1])), 'constant', constant_values=0) + xi = x[d:] + xi = np.pad(xi, (0, len(x)-len(xi)), 'constant', constant_values=0) m = np.vstack((m, xi)) if not pad: - return m[:, 0:-1*num_delays] + return m[:, 0:-1*num_delays+1] return m def matrix_inv(X, max_sigma=1e-16): - """ - Stable (pseudo) matrix inversion using singular value decomposition + """Stable (pseudo) matrix inversion using singular value decomposition. Unused throughout the repo. :param X: matrix to invert :type X: np.matrix or np.array @@ -50,83 +48,70 @@ def matrix_inv(X, max_sigma=1e-16): def total_variation(x): - """ - Calculate the total variation of an array - - :param x: timeseries - :type x: np.array - - :return: total variation - :rtype: float + """Calculate the total variation of an array. Used by optimizer. + :param np.array[float] x: data + :return: float total variation """ if np.isnan(x).any(): return np.nan - x1 = np.ravel(x)[0:-1] + x1 = np.ravel(x)[:-1] x2 = np.ravel(x)[1:] return np.sum(np.abs(x2-x1))/len(x1) # mostly equivalent to cvxpy.tv(x2-x1).value -def peakdet(v, delta, x=None): - """ - Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal value, and was preceded (to the left) by a value lower by delta. - - Converted from MATLAB script at http://billauer.co.il/peakdet.html - % Eli Billauer, 3.4.05 (Explicitly not copyrighted). - % This function is released to the public domain; Any use is allowed. - - :param v: array for which to find peaks and valleys - :typpe v: np.array +def peakdet(x, delta, t=None): + """Find peaks and valleys of 1D array. A point is considered a maximum peak if it has the maximal + value, and was preceded (to the left) by a value lower by delta. Converted from MATLAB script at + http://billauer.co.il/peakdet.html Eli Billauer, 3.4.05 (Explicitly not copyrighted). This function + is released to the public domain; Any use is allowed. - :param delta: threshold for finding peaks and valleys. A point is considered a maximum peak if it has the maximal value, and was preceded (to the left) by a value lower by delta. - :type delta: float - - :return: tuple of min and max locations and values: - - maxtab: array with locations (column 1) and values of maxima (column 2) - - mintab: array with locations (column 1) and values of minima (column 2) - :rtype: tuple -> (np.array, np.array) + :param np.array[float] x: array for which to find peaks and valleys + :param float delta: threshold for finding peaks and valleys. A point is considered a maximum peak + if it has the maximal value, and was preceded (to the left) by a value lower by delta. 
+ :param np.array[float] t: optional domain points where data comes from, to make indices into locations + :return: tuple[np.array, np.array] of\n + - **maxtab** -- indices or locations (column 1) and values (column 2) of maxima + - **mintab** -- indices or locations (column 1) and values (column 2) of minima """ maxtab = [] mintab = [] - if x is None: - x = np.arange(len(v)) - v = np.asarray(v) - if len(v) != len(x): - sys.exit('Input vectors v and x must have same length') - if not np.isscalar(delta): - sys.exit('Input argument delta must be a scalar') - if delta <= 0: - sys.exit('Input argument delta must be positive') - - mn, mx = np.Inf, -1*np.Inf - mnpos, mxpos = np.NaN, np.NaN + if t is None: + t = np.arange(len(x)) + elif len(x) != len(t): + raise ValueError('Input vectors x and t must have same length') + if not (np.isscalar(delta) and delta > 0): + raise ValueError('Input argument delta must be a positive scalar') + + mn, mx = np.inf, -1*np.inf + mnpos, mxpos = np.nan, np.nan lookformax = True - for i in np.arange(len(v)): - this = v[i] + for i in np.arange(len(x)): + this = x[i] if this > mx: mx = this - mxpos = x[i] + mxpos = t[i] if this < mn: mn = this - mnpos = x[i] + mnpos = t[i] if lookformax: if this < mx-delta: maxtab.append((mxpos, mx)) mn = this - mnpos = x[i] - lookformax = False + mnpos = t[i] + lookformax = False # now searching for a min else: if this > mn+delta: mintab.append((mnpos, mn)) mx = this - mxpos = x[i] - lookformax = True + mxpos = t[i] + lookformax = True # now searching for a max return np.array(maxtab), np.array(mintab) -# Trapazoidal integration, with interpolated final point so that the lengths match. +# Trapazoidal integration, with 0 first value so that the lengths match. See #88. def integrate_dxdt_hat(dxdt_hat, dt): """Wrapper for scipy.integrate.cumulative_trapezoid to integrate dxdt_hat that ensures the integral has the same length @@ -154,27 +139,24 @@ def estimate_initial_condition(x, x_hat): # kernels -def _mean_kernel(window_size): +def mean_kernel(window_size): """A uniform boxcar of total integral 1""" return np.ones(window_size)/window_size - -def _gaussian_kernel(window_size): +def gaussian_kernel(window_size): """A truncated gaussian""" sigma = window_size / 6. t = np.linspace(-2.7*sigma, 2.7*sigma, window_size) ker = 1/np.sqrt(2*np.pi*sigma**2) * np.exp(-(t**2)/(2*sigma**2)) # gaussian function itself return ker / np.sum(ker) - -def _friedrichs_kernel(window_size): +def friedrichs_kernel(window_size): """A bump function""" x = np.linspace(-0.999, 0.999, window_size) ker = np.exp(-1/(1-x**2)) return ker / np.sum(ker) - -def convolutional_smoother(x, kernel, iterations): +def convolutional_smoother(x, kernel, iterations=1): """Perform smoothing by convolving x with a kernel. :param np.array[float] x: 1D data @@ -183,7 +165,7 @@ def convolutional_smoother(x, kernel, iterations): :return: **x_hat** (np.array[float]) -- smoothed x """ x_hat = np.hstack((x[::-1], x, x[::-1])) # pad - w = np.arange(len(x_hat)) / (len(x_hat) - 1) # weights + w = np.linspace(0, 1, len(x_hat)) # weights for _ in range(iterations): x_hat_f = np.convolve(x_hat, kernel, 'same') @@ -192,3 +174,47 @@ def convolutional_smoother(x, kernel, iterations): x_hat = x_hat_f*w + x_hat_b*(1-w) return x_hat[len(x):len(x)*2] + + +def slide_function(func, x, dt, kernel, *args, step_size=1, pass_weights=False, **kwargs): + """Slide a smoothing derivative function across a timeseries with specified window size. 
+
+    :param callable func: name of the function to slide
+    :param np.array[float] x: data to differentiate
+    :param float dt: step size
+    :param np.array[float] kernel: values to weight the sliding window
+    :param list args: passed to func
+    :param int step_size: step size for slide (e.g. 1 means slide by 1 step)
+    :param bool pass_weights: whether weights should be passed to func via update to kwargs
+    :param dict kwargs: passed to func
+
+    :return: tuple[np.array, np.array] of\n
+        - **x_hat** -- estimated (smoothed) x
+        - **dxdt_hat** -- estimated derivative of x
+    """
+    if len(kernel) % 2 == 0: raise ValueError("Kernel window size should be odd.")
+    half_window_size = (len(kernel) - 1)//2 # exact because len(kernel) is always odd
+
+    weights = np.zeros((int(np.ceil(len(x)/step_size)), len(x)))
+    x_hats = np.zeros(weights.shape)
+    dxdt_hats = np.zeros(weights.shape)
+
+    for i, midpoint in enumerate(range(0, len(x), step_size)): # iterate window midpoints
+        # find where to index data and kernel, taking care at edges
+        window = slice(max(0, midpoint - half_window_size),
+                       min(len(x), midpoint + half_window_size + 1)) # +1 because slicing works [,)
+        kslice = slice(max(0, half_window_size - midpoint),
+                       min(len(kernel), len(kernel) - (midpoint + half_window_size + 1 - len(x))))
+
+        # weights need to be renormalized if running off an edge
+        weights[i, window] = kernel if kslice.stop - kslice.start == len(kernel) else kernel[kslice]/np.sum(kernel[kslice])
+        if pass_weights: kwargs['weights'] = weights[i, window]
+
+        # run the function on the window and save results
+        x_hats[i, window], dxdt_hats[i, window] = func(x[window], dt, *args, **kwargs)
+
+    weights /= weights.sum(axis=0, keepdims=True) # normalize the weights
+    x_hat = np.sum(weights*x_hats, axis=0)
+    dxdt_hat = np.sum(weights*dxdt_hats, axis=0)
+
+    return x_hat, dxdt_hat
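For reference, the deprecation shims in the kalman_smooth functions expect callers to move from the positional params/options style to the new keyword arguments. A minimal before/after sketch, assuming the usual pynumdiff.kalman_smooth import path; the r and q values here are placeholders, not recommendations:

import numpy as np
from pynumdiff.kalman_smooth import constant_velocity

x, dt = np.random.randn(100).cumsum(), 0.01
# old (deprecated): constant_velocity(x, dt, params=[1e-4, 1e-2], options={'forwardbackward': True})
x_hat, dxdt_hat = constant_velocity(x, dt, r=1e-4, q=1e-2, forwardbackward=True)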
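The reworked savgoldiff body is a short pipeline: a Savitzky-Golay derivative, Gaussian smoothing of that derivative, trapezoidal re-integration, and a constant offset. A rough standalone sketch of the same idea using only numpy/scipy; the window/order values and the mean-offset shortcut are illustrative assumptions, not the package's exact implementation (savgoldiff additionally convolves the derivative with a Gaussian kernel):

import numpy as np
from scipy.signal import savgol_filter
from scipy.integrate import cumulative_trapezoid

dt = 0.01
t = np.arange(0, 4, dt)
x = np.sin(t) + 0.01*np.random.randn(len(t))

dxdt_hat = savgol_filter(x, window_length=21, polyorder=3, deriv=1) / dt  # local-polynomial derivative
x_hat = cumulative_trapezoid(dxdt_hat, dx=dt, initial=0)                  # integrate the derivative back up
x_hat += np.mean(x - x_hat)   # constant offset so x_hat tracks x; a simple stand-in for estimate_initial_condition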
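The forward/backward merge in lineardiff is a linear crossfade: the forward pass is trusted most near the start of the series and the re-reversed backward pass most near the end. A toy numeric illustration with made-up values:

import numpy as np

x_f = np.array([1.0, 2.0, 3.0, 4.0])   # estimate from sliding forward over x
x_b = np.array([1.2, 2.2, 3.2, 4.2])   # estimate from sliding over x[::-1], already re-reversed
w = np.arange(1, len(x_f) + 1)[::-1].astype(float)   # [4, 3, 2, 1]
norm = w + w[::-1]                                    # [5, 5, 5, 5]
x_hat = x_f*w/norm + x_b*w[::-1]/norm                 # crossfades from x_f at the left to x_b at the right
# x_hat -> [1.04, 2.08, 3.12, 4.16]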
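Since polydiff and lineardiff now route their windowing through the shared utility.slide_function (exercised by the new test_slide_function above), here is a usage sketch of that API. The local_line callable is hypothetical; any func(x_window, dt, ...) that accepts a weights keyword and returns (x_hat, dxdt_hat) over the window will work:

import numpy as np
from pynumdiff.utils import utility

x = np.sin(np.linspace(0, 10, 200)) + 0.05*np.random.randn(200)
dt = 0.05

def local_line(xw, dtw, weights=None):
    # weighted straight-line fit over one window; slope doubles as the local derivative
    t = np.arange(len(xw))*dtw
    slope, intercept = np.polyfit(t, xw, 1, w=weights)
    return slope*t + intercept, np.full(len(xw), slope)

kernel = utility.gaussian_kernel(11)   # odd-length, normalized weighting window
x_hat, dxdt_hat = utility.slide_function(local_line, x, dt, kernel,
                                         step_size=2, pass_weights=True)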