Figure 4.2:

Comparison of filtering and smoothing updates for the batch reactor system. Second column shows absolute estimate error.

Figure 4.2

Code for Figure 4.2

Text of the GNU GPL.

main.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# [makes] priorupdates.mat
#
# Comparison of different prior update strategies for a nonlinear system.
# Also compares EKF.
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'functions'))

import numpy as np
import casadi
import mpctools as mpc
import scipy.io as sio

Nx = 2
Ny = 1
Nt = 10  # MHE horizon

k1 = 0.16
k2 = 0.0064
Delta = 0.1

def ode(x):
    return np.array([-2*k1*x[0]**2 + 2*k2*x[1],
                      k1*x[0]**2 - k2*x[1]])

def measurement(x):
    return x[0] + x[1]

sim = mpc.getCasadiIntegrator(ode, Delta, [Nx], ['x'], funcname='sim', verbosity=0)
f   = mpc.getCasadiFunc(ode, [Nx], ['x'], funcname='f', rk4=True, Delta=Delta)
h   = mpc.getCasadiFunc(measurement, [Nx], ['x'], funcname='h')

# Simulate system (no noise).
Nsim = 200
xsim = np.full((Nx, Nsim + 1), np.nan)
xsim[:, 0] = [3.0, 1.0]
ysim = np.full((Ny, Nsim + 1), np.nan)
for i in range(Nsim + 1):
    ysim[:, i] = [measurement(xsim[:, i])]
    if i < Nsim:
        xsim[:, i + 1] = np.array(sim(xsim[:, i])).flatten()

t = np.arange(Nsim + 1) * Delta

# MHE costs.
Q     = np.diag([1e-4, 1e-2])
R     = np.diag([1e-2])
P0    = np.diag([1.0, 1.0])
Qinv  = np.linalg.inv(Q)
Rinv  = np.linalg.inv(R)
P0inv = np.linalg.inv(P0)

w_sym = casadi.SX.sym('w', Nx)
v_sym = casadi.SX.sym('v', Ny)
l = casadi.Function('l', [w_sym, v_sym],
                    [casadi.mtimes(w_sym.T, casadi.mtimes(Qinv, w_sym))
                     + casadi.mtimes(v_sym.T, casadi.mtimes(Rinv, v_sym))],
                    ['w', 'v'], ['l'])

x0bar_init = np.array([0.1, 4.5])

N  = {'x': Nx, 'y': Ny, 't': Nt}
lb = {'x': np.zeros(Nx)}

xs = {'actual': xsim}
ys = {'actual': ysim}

# Growing-horizon startup: at step i the MHE has horizon i with measurements
# y(0)..y(i). At i=Nt the window first fills and the persistent full-horizon
# solver takes over. Filtering and smoothing solve identical NLPs while the
# window is filling, so they agree exactly through k=Nt; they diverge once
# newmeasurement begins shifting the window and the prior update fires.
def make_solver(N_t, y_window, priorupdate):
    Nd = {'x': Nx, 'y': Ny, 't': N_t}
    return mpc.nmhe(f, h, np.zeros((N_t, 0)), y_window,
                    l, Nd, lb=lb,
                    par={'x0bar': x0bar_init.copy(),
                         'Pinv':   P0inv.copy().flatten()},
                    funcargs={'f': ['x'], 'h': ['x'], 'l': ['w', 'v']},
                    wAdditive=True, verbosity=0, priorupdate=priorupdate)

for update_type in ['filtering', 'smoothing']:
    print('Simulating MHE with %s update.' % update_type)

    xmhe = np.full((Nx, Nsim + 1), np.nan)
    ymhe = np.full((Ny, Nsim + 1), np.nan)

    full_solver = None

    for i in range(Nsim + 1):
        if i < Nt:
            # Growing-horizon startup: horizon i, measurements y(0)..y(i).
            # Pass priorupdate=update_type so lx (arrival cost) is auto-built
            # from par['x0bar'] and par['Pinv']; saveestimate is never called
            # on these short-window solvers, so the prior-update logic itself
            # is inert and filtering/smoothing solve identical NLPs here.
            y_win = ysim[:, :i + 1].T
            solver = make_solver(i, y_win, priorupdate=update_type)
            solver.solve()
            if solver.stats['status'] != 'Solve_Succeeded':
                print('Warning: startup solver failure at step %d!' % i)
                break
            xmhe[:, i] = np.array(solver.var['x', i]).flatten()
        else:
            # Window full: build the persistent full-horizon solver once,
            # then advance with newmeasurement.
            if full_solver is None:
                y_win = ysim[:, i - Nt:i + 1].T  # y(0..Nt) when i == Nt
                full_solver = make_solver(Nt, y_win, priorupdate=update_type)
            else:
                full_solver.newmeasurement(ysim[:, i])

            full_solver.solve()
            if full_solver.stats['status'] != 'Solve_Succeeded':
                print('Warning: solver failure at time %d!' % i)
                break

            full_solver.saveestimate()
            xmhe[:, i] = np.array(full_solver.var['x', Nt]).flatten()

        ymhe[:, i] = [measurement(xmhe[:, i])]

    xs[update_type] = xmhe
    ys[update_type] = ymhe

# EKF (G = I: process noise enters all states).
print('Simulating EKF.')
Afunc = f.factory('Afunc', f.name_in(), ['jac:f:x'])
Cfunc = h.factory('Cfunc', h.name_in(), ['jac:h:x'])

xekf = np.full((Nx, Nsim + 1), np.nan)
xekf[:, 0] = x0bar_init.copy()
P    = P0.copy()
yekf = np.full((Ny, Nsim + 1), np.nan)

for i in range(Nsim + 1):
    e    = ysim[:, i] - np.array(h(xekf[:, i])).flatten()
    C    = np.array(Cfunc(xekf[:, i])).reshape(Ny, Nx)
    L    = P @ C.T @ np.linalg.inv(C @ P @ C.T + R)
    P    = P - L @ C @ P
    xekf[:, i]  = xekf[:, i] + L @ e
    yekf[:, i]  = [measurement(xekf[:, i])]

    if i < Nsim:
        A              = np.array(Afunc(xekf[:, i])).reshape(Nx, Nx)
        xekf[:, i + 1] = np.array(f(xekf[:, i])).flatten()
        P              = A @ P @ A.T + Q

xs['ekf'] = xekf
ys['ekf'] = yekf

sio.savemat('priorupdates.mat', {'x': xs, 'y': ys, 't': t})