Merged
6 changes: 3 additions & 3 deletions pynumdiff/linear_model/_linear_model.py
@@ -105,7 +105,7 @@ def _polydiff(x, dt, polynomial_order, weights=None):
return _polydiff(x, dt, polynomial_order)

kernel = {'gaussian':utility.gaussian_kernel, 'friedrichs':utility.friedrichs_kernel}[kernel](window_size)
return utility.slide_function(_polydiff, x, dt, kernel, polynomial_order, step_size=step_size, pass_weights=True)
return utility.slide_function(_polydiff, x, dt, kernel, polynomial_order, stride=step_size, pass_weights=True)


#############
@@ -312,8 +312,8 @@ def _lineardiff(x, dt, order, gamma, solver=None):

kernel = {'gaussian':utility.gaussian_kernel, 'friedrichs':utility.friedrichs_kernel}[kernel](window_size)

x_hat_forward, _ = utility.slide_function(_lineardiff, x, dt, kernel, order, gamma, step_size=step_size, solver=solver)
x_hat_backward, _ = utility.slide_function(_lineardiff, x[::-1], dt, kernel, order, gamma, step_size=step_size, solver=solver)
x_hat_forward, _ = utility.slide_function(_lineardiff, x, dt, kernel, order, gamma, stride=step_size, solver=solver)
x_hat_backward, _ = utility.slide_function(_lineardiff, x[::-1], dt, kernel, order, gamma, stride=step_size, solver=solver)

# weights
w = np.arange(1, len(x_hat_forward)+1,1)[::-1]
1 change: 1 addition & 0 deletions pynumdiff/tests/conftest.py
@@ -12,6 +12,7 @@ def store_plots(request):
request.config.plots = defaultdict(lambda: pyplot.subplots(6, 2, figsize=(12,7))) # 6 is len(test_funcs_and_derivs)

def pytest_sessionfinish(session, exitstatus):
if not hasattr(session.config, 'plots'): return
Collaborator author comment: Occasionally I'd get a failure while testing about not being able to plot, when plotting wasn't relevant at all. The cause was always a syntax error in the test code file, but this would obscure it with a different, confusing error. Now we fall through and let the syntax error surface.

for method,(fig,axes) in session.config.plots.items():
axes[-1,-1].legend()
fig.suptitle(method.__name__)
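A minimal sketch (my addition, not part of the diff) of the guard in context: when collection fails, for example due to a syntax error in a test file, pytest still calls pytest_sessionfinish, but the store_plots fixture never ran, so session.config has no plots attribute; returning early lets the real error surface.

# Sketch only; the repo's conftest differs in details, and the savefig call is hypothetical.
def pytest_sessionfinish(session, exitstatus):
    if not hasattr(session.config, 'plots'):
        return  # nothing was plotted; let pytest report the underlying collection error
    for method, (fig, axes) in session.config.plots.items():
        axes[-1, -1].legend()
        fig.suptitle(method.__name__)
        fig.savefig(f"{method.__name__}.png")  # hypothetical save step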
16 changes: 11 additions & 5 deletions pynumdiff/tests/test_diff_methods.py
@@ -4,15 +4,15 @@
from warnings import warn

from ..linear_model import lineardiff, polydiff, savgoldiff, spectraldiff
from ..total_variation_regularization import velocity, acceleration, jerk, iterative_velocity, smooth_acceleration
from ..total_variation_regularization import velocity, acceleration, jerk, iterative_velocity, smooth_acceleration, jerk_sliding
from ..kalman_smooth import constant_velocity, constant_acceleration, constant_jerk
from ..smooth_finite_difference import mediandiff, meandiff, gaussiandiff, friedrichsdiff, butterdiff, splinediff
from ..finite_difference import first_order, second_order
# Function aliases for testing cases where parameters change the behavior in a big way
def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs)

Collaborator author comment: I briefly had some other vectors up here, but switching between them in the tests was annoying, so I've kept everything on this one t and dt.

dt = 0.1
t = np.arange(0, 3+dt, dt) # sample locations, including the endpoint
t = np.linspace(0, 3, 31) # sample locations, including the endpoint
tt = np.linspace(0, 3) # full domain, for visualizing denser plots
np.random.seed(7) # for repeatability of the test, so we don't get random failures
noise = 0.05*np.random.randn(*t.shape)
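A quick check (my own illustration, not part of the change) of why np.linspace is preferable here: it hits the endpoint by construction, whereas np.arange with a float step only happens to.

import numpy as np
dt = 0.1
t_arange = np.arange(0, 3 + dt, dt)   # float steps accumulate rounding error; endpoint behavior can vary
t_linspace = np.linspace(0, 3, 31)    # 31 evenly spaced samples, endpoint guaranteed
print(len(t_arange), len(t_linspace))      # both 31 in this case
print(np.allclose(t_arange, t_linspace))   # True
print(t_linspace[1] - t_linspace[0])       # 0.1, the effective dt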
@@ -51,8 +51,8 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs)
(acceleration, {'gamma':1}), (acceleration, [1]),
(jerk, {'gamma':10}), (jerk, [10]),
(iterative_velocity, {'num_iterations':5, 'gamma':0.05}), (iterative_velocity, [5, 0.05]),
(smooth_acceleration, {'gamma':2, 'window_size':5}), (smooth_acceleration, [2, 5])
# TODO (jerk_sliding), because with the test cases here (len < 1000) it would just be a duplicate of jerk
(smooth_acceleration, {'gamma':2, 'window_size':5}), (smooth_acceleration, [2, 5]),
(jerk_sliding, {'gamma':1, 'window_size':15}), (jerk_sliding, [1], {'window_size':15})
]

# All the testing methodology follows the exact same pattern; the only thing that changes is the
@@ -184,7 +184,13 @@ def iterated_first_order(*args, **kwargs): return first_order(*args, **kwargs)
[(-2, -2), (-1, -1), (-1, -1), (0, -1)],
[(0, 0), (1, 0), (0, -1), (1, 0)],
[(1, 1), (2, 2), (1, 1), (2, 2)],
[(1, 1), (3, 3), (1, 1), (3, 3)]]
[(1, 1), (3, 3), (1, 1), (3, 3)]],
jerk_sliding: [[(-15, -15), (-16, -17), (0, -1), (1, 0)],
[(-14, -14), (-14, -14), (0, -1), (0, 0)],
[(-14, -14), (-14, -14), (0, -1), (0, 0)],
[(-1, -1), (0, 0), (0, -1), (0, 0)],
[(0, 0), (2, 2), (0, 0), (2, 2)],
[(1, 1), (3, 3), (1, 1), (3, 3)]]
}

# Essentially run the cartesian product of [diff methods] x [test functions] through this one test
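The cartesian-product pattern mentioned above looks roughly like the following sketch (my own stand-in; the real test_diff_methods.py uses the full method list and its tabulated log-error expectations rather than this toy finite-difference function).

import numpy as np
import pytest

def first_order_fd(x, dt):
    # hypothetical stand-in for a pynumdiff method: returns (x_hat, dxdt_hat)
    return x, np.gradient(x, dt)

dt = 0.1
t = np.linspace(0, 3, 31)
diff_methods = [first_order_fd]  # the real list holds every (method, params) pair above
test_funcs_and_derivs = [(np.sin, np.cos), (np.cos, lambda s: -np.sin(s))]

@pytest.mark.parametrize("method", diff_methods)
@pytest.mark.parametrize("func,deriv", test_funcs_and_derivs)
def test_method_on_function(method, func, deriv):
    x_hat, dxdt_hat = method(func(t), dt)
    # the real test compares log10 errors against the expectations tabulated above
    assert np.log10(np.abs(dxdt_hat - deriv(t)).max()) < 1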
2 changes: 1 addition & 1 deletion pynumdiff/tests/test_utils.py
@@ -73,7 +73,7 @@ def identity(x, dt): return x, 0 # should come back the same
x = np.arange(100)
kernel = utility.gaussian_kernel(9)

x_hat, dxdt_hat = utility.slide_function(identity, x, 0.1, kernel, step_size=2)
x_hat, dxdt_hat = utility.slide_function(identity, x, 0.1, kernel, stride=2)
Collaborator author comment: I decided to rename this parameter. I like stride better, because dt is already a step size; it's less ambiguous.


assert np.allclose(x, x_hat)

pynumdiff/total_variation_regularization/_total_variation_regularization.py
@@ -97,7 +97,7 @@ def _total_variation_regularized_derivative(x, dt, order, gamma, solver=None):
dxdt_hat = y/dt # y only holds the dx values; to get deriv scale by dt
x_hat = np.cumsum(y) + integration_constants.value[order-1] # smoothed data

dxdt_hat = (dxdt_hat[0:-1] + dxdt_hat[1:])/2 # take first order FD to smooth a touch
Collaborator author comment: No need for that little 0.

dxdt_hat = (dxdt_hat[:-1] + dxdt_hat[1:])/2 # take first order FD to smooth a touch
ddxdt_hat_f = dxdt_hat[-1] - dxdt_hat[-2]
dxdt_hat_f = dxdt_hat[-1] + ddxdt_hat_f # What is this doing? Could we use a 2nd order FD above natively?
dxdt_hat = np.hstack((dxdt_hat, dxdt_hat_f))
@@ -106,7 +106,7 @@ def _total_variation_regularized_derivative(x, dt, order, gamma, solver=None):
d = dxdt_hat[2] - dxdt_hat[1]
dxdt_hat[0] = dxdt_hat[1] - d

return x_hat*std+mean, dxdt_hat*std
return x_hat*std+mean, dxdt_hat*std # derivative is linear, so scale derivative by std
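A small sanity check (my addition) on the new comment above: the data is standardized before the solve, and because differentiation is linear only std, not mean, enters when un-scaling the derivative.

import numpy as np
t = np.linspace(0, 3, 31)
x = 5 + 2*np.sin(t)                 # signal with nonzero mean and spread
mean, std = x.mean(), x.std()
y = (x - mean)/std                  # standardized data, roughly what the solver sees
print(np.allclose(np.gradient(x, t), np.gradient(y, t)*std))   # True: d/dt(y*std + mean) = std * dy/dt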


def velocity(x, dt, params=None, options=None, gamma=None, solver=None):
@@ -229,7 +229,7 @@ def smooth_acceleration(x, dt, params=None, options=None, gamma=None, window_siz
return x_hat, dxdt_hat


def jerk_sliding(x, dt, params=None, options=None, gamma=None, solver=None):
def jerk_sliding(x, dt, params=None, options=None, gamma=None, solver=None, window_size=101):
Collaborator author comment (@pavelkomarov, Jul 1, 2025): I've added the window_size parameter to match the methods from the linear_model module. I have not, however, added a stride/step_size parameter; we could, but I've kept to striding by the ramp width.

"""Use convex optimization (cvxpy) to solve for the jerk total variation regularized derivative.

:param np.array[float] x: data to differentiate
@@ -239,6 +239,7 @@ def jerk_sliding(x, dt, params=None, options=None, gamma=None, solver=None):
:param float gamma: the regularization parameter
:param str solver: the solver CVXPY should use, 'MOSEK', 'CVXOPT', 'CLARABEL', 'ECOS', etc.
In testing, 'MOSEK' was the most robust. If not given, fall back to CVXPY's default.
:param int window_size: how wide to make the kernel

:return: tuple[np.array, np.array] of\n
- **x_hat** -- estimated (smoothed) x
@@ -250,68 +251,16 @@ def jerk_sliding(x, dt, params=None, options=None, gamma=None, solver=None):
gamma = params[0] if isinstance(params, list) else params
if options != None:
if 'solver' in options: solver = options['solver']
if 'window_size' in options: window_size = options['window_size']
Collaborator author comment: I've allowed users to pass this the old way too, just so I can be consistent in my tests.

elif gamma == None:
raise ValueError("`gamma` must be given.")

window_size = 1000
stride = 200
Collaborator author comment: Hard-coding these makes this method less usable.


if len(x) < window_size:
warn("len(x) <= window_size, calling standard jerk() without sliding")
return _total_variation_regularized_derivative(x, dt, 3, gamma, solver=solver)

# slide the jerk
final_xsmooth = []
final_xdot_hat = []
first_idx = 0
final_idx = first_idx + window_size
last_loop = False

final_weighting = []

try:
while not last_loop:
xsmooth, xdot_hat = _total_variation_regularized_derivative(x[first_idx:final_idx], dt, 3,
gamma, solver=solver)

xsmooth = np.hstack(([0]*first_idx, xsmooth, [0]*(len(x)-final_idx)))
final_xsmooth.append(xsmooth)

xdot_hat = np.hstack(([0]*first_idx, xdot_hat, [0]*(len(x)-final_idx)))
final_xdot_hat.append(xdot_hat)

# blending
w = np.hstack(([0]*first_idx,
np.arange(1, 201)/200,
[1]*(final_idx-first_idx-400),
(np.arange(1, 201)/200)[::-1],
[0]*(len(x)-final_idx)))
final_weighting.append(w)

if final_idx >= len(x):
last_loop = True
else:
first_idx += stride
final_idx += stride
if final_idx > len(x):
final_idx = len(x)
if final_idx - first_idx < 200:
first_idx -= (200 - (final_idx - first_idx))

# normalize columns
weights = np.vstack(final_weighting)
for c in range(weights.shape[1]):
weights[:, c] /= np.sum(weights[:, c])

# weighted sums
xsmooth = np.vstack(final_xsmooth)
xsmooth = np.sum(xsmooth*weights, axis=0)

xdot_hat = np.vstack(final_xdot_hat)
xdot_hat = np.sum(xdot_hat*weights, axis=0)

return xsmooth, xdot_hat

except ValueError:
warn('Solver failed, returning finite difference instead')
from pynumdiff.finite_difference import second_order
return second_order(x, dt)
ramp = window_size//5
kernel = np.hstack((np.arange(1, ramp+1)/ramp, np.ones(window_size - 2*ramp), (np.arange(1, ramp+1)/ramp)[::-1]))
Collaborator author comment: This kernel isn't exactly the same as before, but it's basically the same shape, the same in spirit.

return utility.slide_function(_total_variation_regularized_derivative, x, dt, kernel, 3, gamma, stride=ramp, solver=solver)
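For reference, a standalone sketch (my own, with an illustrative window_size) of the trapezoidal kernel the two lines above build; since stride=ramp, the tapered ends of each window overlap neighboring windows, which presumably smooths the seams in the blend.

import numpy as np
window_size = 15                      # illustration only; jerk_sliding defaults to 101
ramp = window_size//5                 # 3
kernel = np.hstack((np.arange(1, ramp+1)/ramp,           # rising ramp: 1/3, 2/3, 1
                    np.ones(window_size - 2*ramp),        # flat top of ones
                    (np.arange(1, ramp+1)/ramp)[::-1]))   # falling ramp: 1, 2/3, 1/3
print(len(kernel))   # 15, odd as slide_function requires
print(kernel)        # approximately [0.33 0.67 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 0.67 0.33]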


8 changes: 4 additions & 4 deletions pynumdiff/utils/utility.py
@@ -176,15 +176,15 @@ def convolutional_smoother(x, kernel, iterations=1):
return x_hat[len(x):len(x)*2]


def slide_function(func, x, dt, kernel, *args, step_size=1, pass_weights=False, **kwargs):
def slide_function(func, x, dt, kernel, *args, stride=1, pass_weights=False, **kwargs):
"""Slide a smoothing derivative function across a timeseries with specified window size.

:param callable func: name of the function to slide
:param np.array[float] x: data to differentiate
:param float dt: step size
:param np.array[float] kernel: values to weight the sliding window
:param list args: passed to func
:param int step_size: step size for slide (e.g. 1 means slide by 1 step)
:param int stride: step size for slide (e.g. 1 means slide by 1 step)
:param bool pass_weights: whether weights should be passed to func via update to kwargs
:param dict kwargs: passed to func

@@ -195,11 +195,11 @@ def slide_function(func, x, dt, kernel, *args, step_size=1, pass_weights=False,
if len(kernel) % 2 == 0: raise ValueError("Kernel window size should be odd.")
half_window_size = (len(kernel) - 1)//2 # int because len(kernel) is always odd

weights = np.zeros((int(np.ceil(len(x)/step_size)), len(x)))
weights = np.zeros((int(np.ceil(len(x)/stride)), len(x)))
x_hats = np.zeros(weights.shape)
dxdt_hats = np.zeros(weights.shape)

for i,midpoint in enumerate(range(0, len(x), step_size)): # iterate window midpoints
for i,midpoint in enumerate(range(0, len(x), stride)): # iterate window midpoints
# find where to index data and kernel, taking care at edges
window = slice(max(0, midpoint - half_window_size),
min(len(x), midpoint + half_window_size + 1)) # +1 because slicing works [,)
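And the renamed keyword in use, mirroring the test above (a sketch; identity is just a toy smoother).

import numpy as np
from pynumdiff.utils import utility

def identity(x, dt): return x, 0     # windowed data comes back unchanged, derivative 0

x = np.arange(100)
kernel = utility.gaussian_kernel(9)  # odd-length window of weights

# evaluate the window only at every 2nd midpoint; overlapping results get blended by weight
x_hat, dxdt_hat = utility.slide_function(identity, x, 0.1, kernel, stride=2)
assert np.allclose(x, x_hat)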