Conversation

@pavelkomarov pavelkomarov commented Aug 23, 2025

Addressing #150 and #80.

.. autofunction:: pynumdiff.kalman_smooth.constant_velocity
.. autofunction:: pynumdiff.kalman_smooth.constant_acceleration
.. autofunction:: pynumdiff.kalman_smooth.constant_jerk
.. autofunction:: pynumdiff.kalman_smooth.known_dynamics
@pavelkomarov pavelkomarov Aug 23, 2025

I've decided to make my Kalman filter function public, which makes it largely redundant with known_dynamics. I'm pretty sure no one was using known_dynamics, so I've removed it.

:param np.array y: series of measurements, stacked along axis 0. In this case just the raw noisy data (fully observable)
:param np.array A: discrete-time state transition matrix

def kalman_filter(y, _t, xhat0, P0, A, Q, C, R, B=None, u=None, save_P=True):

This was pretty tricky to get right: covering all cases while keeping the code short. The parameters are now grouped by theme: data, initialization, process dynamics, measurement, control, and the return option.
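As a sketch of that grouping (only the signature above is real; the matrix values here are illustrative, not from the library), a constant-velocity setup might be assembled like:

```python
import numpy as np

# data
y = np.zeros((100, 1))            # noisy measurements, stacked along axis 0
_t = 0.01                         # scalar dt => equispaced samples
# initialization
xhat0 = np.zeros(2)               # state guess [x, x']
P0 = 100 * np.eye(2)              # covariance guess
# process dynamics
A = np.array([[1, _t], [0, 1]])   # discrete constant-velocity transition
Q = np.array([[0, 0], [0, 1e-4]]) # process noise enters through x'
# measurement stuff
C = np.array([[1, 0]])            # we observe only x
R = np.array([[0.1]])             # measurement noise variance
# control stuff: B and u stay None; return option: save_P
```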

xhat_pre = []; xhat_post = []; P_pre = []; P_post = []
N = y.shape[0]
m = xhat0.shape[0] # dimension of the state
xhat_post = np.empty((N,m))

Rather than use lists and have to convert to numpy arrays at the end, I'm preallocating and filling a little more carefully.
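As a generic illustration (not the filter code itself), preallocating and filling yields the same array as list-append-then-stack, without building N temporary objects:

```python
import numpy as np

N, m = 5, 3
rows = [np.arange(m) + n for n in range(N)]  # stand-in for per-step results

# old style: accumulate in a list, convert at the end
stacked = np.stack(rows, axis=0)

# new style: preallocate once, fill in place
filled = np.empty((N, m))
for n in range(N):
    filled[n] = rows[n]

assert np.array_equal(stacked, filled)
```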

P_pre = np.empty((N,m,m)) # _pre = a priori predictions; _post = a posteriori, combining all information available at a step
P_post = np.empty((N,m,m))
# determine some things ahead of the loop
equispaced = np.isscalar(_t)

isscalar became one of my favorite new functions.
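It cleanly distinguishes "one dt for every step" from "an array of timestamps". One gotcha worth noting: a 0-d array does not count as a scalar. A quick demo:

```python
import numpy as np

print(np.isscalar(0.01))                   # True: a single step size
print(np.isscalar(np.array([0.0, 0.01])))  # False: an array of sample times
print(np.isscalar(np.array(0.01)))         # False: a 0-d array is not a scalar
```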


xhat = xhat_.copy() # copies very important here. See #122
for n in range(N):
if n == 0: # first iteration is a special case, involving less work

I don't love doing this check on every loop iteration, but without it you either end up with repeated code above the loop, or you have to flip the a priori/a posteriori compute order in the loop, which gives you a special case at the last index anyway. It's hard to do as cleanly as I'd like, but a conditional check is super cheap.

eM = expm(M * dt) # form discrete-time matrices TODO doesn't work at n=0
An = eM[:m,:m] # upper left block
Qn = eM[:m,m:] @ An.T # upper right block
if dt < 0: Qn = np.abs(Qn) # eigenvalues go negative if reverse time, but noise shouldn't shrink
@pavelkomarov pavelkomarov Aug 23, 2025

This is sort of a subtle thing.

In the backward/inverse dynamics case, you can get $A^{-1}$ by the same exponentiation trick just by using negative dt, which naturally shows up here if we reverse the time array. But $Q$ should be unchanged whether time runs forward or backward, because noise covariance shouldn't shrink on us; entropy always increases. If you use the wrong $Q$, you get loopy, oscillatory answers. Took me a second to think through. The $Q$ that comes out in the reversed case is the same as the forward $Q$, but with a criss-cross sign-flip pattern. Try it for yourself:

from scipy.linalg import expm
import numpy as np

_t = 0.01
q = 1000
order = 2

A = np.diag(np.ones(order), 1)
Q = np.zeros(A.shape); Q[-1,-1] = q
M = np.block([[A, Q],[np.zeros(A.shape), -A.T]])

Mn = expm(M * _t) # form discrete-time versions
Mninv = expm(M * -_t)

A = Mn[:order+1,:order+1]
Ainv = Mninv[:order+1,:order+1]
Q = Mn[:order+1,order+1:] @ A.T
Qrev = Mninv[:order+1,order+1:] @ Ainv.T

print(Q)
print(Qrev)
assert np.allclose(np.abs(Qrev), Q) # signs flip in a checkerboard pattern, so abs recovers the forward Q

So to get back the $Q$ we want, we can just abs.

equispaced = np.isscalar(_t)
control = isinstance(B, np.ndarray) and isinstance(u, np.ndarray) # whether there is a control input
if equispaced:
An, Qn, Bn = A, Q, B # in this case only need to assign once
@pavelkomarov pavelkomarov Aug 23, 2025

I want the general filtering function to be able to take discrete matrices if you're doing discrete filtering, because it seems more natural not to have to give them in continuous time when the system is constant-dt and applies the same discrete update on each iteration. But then I have to treat the matrices slightly differently internally, which is an annoying complication.

Users may prefer continuous time sometimes. It depends. It's certainly easier to set up the naive constant-derivative system in continuous time, which is what I now do in rtsdiff, and having to exponentiate right afterward to find the discrete matrices is an awkward step, but it's maybe the most technically correct way, because that's where the discrete matrices really come from.

There may be a cleaner way to organize this, like a continuous_or_discrete parameter, but in this round of changes a truly better solution isn't occurring to me.
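As a tiny check of that exponentiation step (a sketch using the same continuous-time constant-derivative construction as elsewhere in this PR): for a constant-velocity model the continuous A is nilpotent, so expm reproduces the familiar discrete update exactly.

```python
import numpy as np
from scipy.linalg import expm

dt = 0.01
A_cont = np.diag(np.ones(1), 1)  # continuous-time [[0, 1], [0, 0]]: x' drives x
A_disc = expm(A_cont * dt)       # discretize over one step

# nilpotent A truncates the series: expm(A*dt) = I + A*dt
assert np.allclose(A_disc, np.array([[1, dt], [0, 1]]))
```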

for n in range(N):
if n == 0: # first iteration is a special case, involving less work
xhat_ = xhat0
P_ = P0
@pavelkomarov pavelkomarov Aug 23, 2025

I've decided it makes more sense to treat the initial guesses as a priori for the first step, based on some notional past history, rather than as a posteriori from a notional previous step. This is closer to how the code was before my changes, and Chat tells me it's the more standard convention too. You skip passing the initial guesses through the dynamics, which is fine, and maybe more intuitive anyway. Another hint this is the right thing to do: in the variable-step case we get to skip computing expm with an unknown dt and only calculate dt at n > 0, when there is a valid n-1 index.
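A minimal 1-D sketch of that structure (all numbers made up; this is not the library code): the first iteration takes (xhat0, P0) as the a priori and goes straight to the measurement update, and prediction only happens for n > 0.

```python
import numpy as np

# 1-D random-walk state, direct measurement; a, q, c, r are illustrative
a, q, c, r = 1.0, 0.01, 1.0, 0.25
y = np.array([1.1, 0.9, 1.2, 1.0])
xhat0, P0 = 0.0, 100.0

xhat_post = np.empty(len(y))
for n in range(len(y)):
    if n == 0:          # initial guesses ARE the first a priori; no predict step
        xhat_, P_ = xhat0, P0
    else:               # predict from the previous a posteriori
        xhat_, P_ = a * xhat, a * P * a + q
    K = P_ * c / (c * P_ * c + r)          # Kalman gain
    xhat = xhat_ + K * (y[n] - c * xhat_)  # measurement update
    P = (1 - K * c) * P_
    xhat_post[n] = xhat
```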


return xhat_post if not save_P else (xhat_pre, xhat_post, P_pre, P_post)

return np.stack(xhat_pre, axis=0), np.stack(xhat_post, axis=0), np.stack(P_pre, axis=0), np.stack(P_post, axis=0)

stacking begone!

:return: tuple[np.array, np.array] of\n
- **xhat_smooth** -- RTS smoothed xhat
- **P_smooth** -- RTS smoothed P
if :code:`compute_P_smooth` is :code:`True` else only **xhat_smooth**

I checked that this renders okay in Sphinx.

P_smooth.append(P_post[n] + C_RTS @ (P_smooth[-1] - P_pre[n+1]) @ C_RTS.T) # C_RTS is the smoother gain, not to be confused with the measurement matrix

return np.stack(xhat_smooth[::-1], axis=0), np.stack(P_smooth[::-1], axis=0) # reverse lists
if not equispaced: An = expm(A * (_t[n+1] - _t[n])) # state transition from n to n+1, per the paper

It's a little simpler to handle the discrete/continuous divide in this function.


return xhat_smooth if not compute_P_smooth else (xhat_smooth, P_smooth)

def _RTS_smooth(xhat0, P0, y, A, C, Q, R, u=None, B=None):

This function was a little redundant, and I was the one to create it, so I've yoinked it back out of existence.

if order == 1:
A = np.array([[1, dt], [0, 1]]) # states are x, x'. This basically does an Euler update.
C = np.array([[1, 0]]) # we measure only y = noisy x
R = np.array([[r]])

I realized a lot of this was redundant and could have been done outside the cases. It's more consolidated now.

[0, 0, 0, q]]) # uncertainty is around the jerk
P0 = np.array(100*np.eye(4)) # See #110 for why this choice of P0

A = np.diag(np.ones(order), 1) # continuous-time A just has 1s on the first diagonal (where 0th is main diagonal)

There are neat ways to build the matrices flexibly, given the order.
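For instance (a standalone sketch of the pattern, mirroring the snippets above, not the library code verbatim):

```python
import numpy as np

order, q = 2, 1000.0
A = np.diag(np.ones(order), 1)             # 1s on the first superdiagonal, 0s elsewhere
C = np.zeros((1, order + 1)); C[0, 0] = 1  # measure only the 0th state, x
Q = np.zeros((order + 1, order + 1)); Q[-1, -1] = q  # noise enters at the highest derivative
P0 = 100 * np.eye(order + 1)               # broad initial uncertainty
```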

Q = eM[:order+1,order+1:] @ A.T

xhat_pre, xhat_post, P_pre, P_post = kalman_filter(x, _t, xhat0, P0, A, Q, C, R) # noisy x are the "y" in Kalman-land
xhat_smooth = rts_smooth(_t, A, xhat_pre, xhat_post, P_pre, P_post, compute_P_smooth=False)

compute_P_smooth=False now makes this execution just a bit cheaper, because why not.

return rtsdiff(x, dt, 3, q/r, forwardbackward)


def known_dynamics(x, params, u=None, options=None, xhat0=None, P0=None, A=None,

This was essentially an interface to the filtering and smoothing functions, which now are exposed directly.

[(-1, -2), (0, 0), (0, -1), (0, 0)],
[(1, 0), (2, 1), (1, 0), (2, 1)],
[(1, 1), (3, 3), (1, 1), (3, 3)]],
constant_velocity: [[(-25, -25), (-25, -25), (0, -1), (1, 1)],

Mostly these improved.

[(-3, -3), (-1, -1), (0, -1), (1, 1)],
[(-1, -1), (1, 1), (0, -1), (1, 1)],
[(0, 0), (3, 3), (0, 0), (3, 3)]],
rtsdiff: [[(-25, -25), (-25, -25), (0, -1), (1, 1)],

This one is tested in the irregular dt list, and it works great.

@pavelkomarov pavelkomarov merged commit 6161d35 into master Aug 23, 2025
1 check passed
@pavelkomarov pavelkomarov deleted the kalman-variable-dt branch August 24, 2025 00:28