Figure 4.4:

Evolution of the state (solid line) and UKF state estimate (dashed line).

Figure 4.4

Code for Figure 4.4

Text of the GNU GPL.

batchreactor.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
# pars = batchreactor(Nsim)
#
# Returns a struct of batch reactor parameters to avoid duplication across
# MHE, EKF, and UKF scripts.
import os
import numpy as np
import scipy.io
import matplotlib.pyplot as plt
import casadi
import mpctools

def batchreactor(Nsim):
    """
    pars = batchreactor(Nsim)

    Returns a struct of batch reactor parameters to avoid duplication across
    MHE, EKF, and UKF scripts.
    """
    if not isinstance(Nsim, int):
        raise TypeError("Nsim must be an integer")

    # Sizes
    Delta = 0.25
    Nx = 3
    Ny = 1
    Nw = Nx
    Nv = Ny

    # Random variable standard deviations
    sig_v = 0.25  # Measurement noise
    sig_w = 0.001 # State noise
    sig_p = 0.5   # Prior

    P = sig_p**2 * np.eye(Nx)
    Q = sig_w**2 * np.eye(Nw)
    R = sig_v**2 * np.eye(Ny)

    # Create symbolic variables
    x_sym = casadi.SX.sym('x', Nx)

    # Define ODE system
    ode = {'x': x_sym, 'ode': batch_model(x_sym)}

    # Create integrator
    plant = casadi.integrator('integrator', 'cvodes', ode, 0, Delta)

    # Create RK4 discretization with M=4 substeps to match MATLAB (getCasadiFunc M=4)
    frk4 = mpctools.getCasadiFunc(batch_model, [Nx], ['x'],
                                   funcname='frk4', rk4=True, Delta=Delta, M=4)

    # Create state transition and measurement functions
    w_sym = casadi.SX.sym('w', Nw)
    f = casadi.Function('f', [x_sym, w_sym], [frk4(x_sym) + w_sym], ['x', 'w'], ['f'])
    h = casadi.Function('h', [x_sym], [measurement(x_sym)], ['x'], ['h'])

    # Choose prior
    xhat0 = np.array([1.0, 0.0, 4.0])
    x0 = np.array([0.5, 0.05, 0.0])

    # Simulate system to get data
    np.random.seed(0)
    w = sig_w * np.random.randn(Nw, Nsim)
    v = sig_v * np.random.randn(Nv, Nsim + 1)

    xsim = np.nan * np.ones((Nx, Nsim + 1))
    xsim[:,0] = x0

    ysim = np.nan * np.ones((Ny, Nsim + 1))
    yclean = np.nan * np.ones((Ny, Nsim + 1))

    for t in range(Nsim + 1):
        xsim[:,t] = np.maximum(xsim[:,t], 0)
        yclean[:,t] = np.array(h(xsim[:,t])).flatten()
        ysim[:,t] = yclean[:,t] + v[:,t]

        if t < Nsim:
            xsim[:,t+1] = np.array(plant(x0=xsim[:,t])['xf']).flatten() + w[:,t]

    t = np.arange(Nsim + 1) * Delta

    # Package everything up
    pars = {
        'Delta': Delta,
        'Nsim': Nsim,
        'Nx': Nx,
        'Ny': Ny,
        'Nw': Nw,
        'Nv': Nv,
        'sig_v': sig_v,
        'sig_w': sig_w,
        'sig_p': sig_p,
        'P': P,
        'Q': Q,
        'R': R,
        'plant': plant,
        'frk4': frk4,
        'f': f,
        'h': h,
        'xhat0': xhat0,
        'x0': x0,
        'xsim': xsim,
        'ysim': ysim,
        'yclean': yclean,
        't': t,
        'reactorplot': reactorplot
    }

    return pars

def batch_model(x):
    """Nonlinear ODE function"""
    k1 = 0.5
    km1 = 0.05
    k2 = 0.2
    km2 = 0.01

    cA = x[0]
    cB = x[1]
    cC = x[2]

    rate1 = k1*cA - km1*cB*cC
    rate2 = k2*cB**2 - km2*cC

    return casadi.vertcat(
        -rate1,
        rate1 - 2*rate2,
        rate1 + rate2
    )

def measurement(x):
    """Linear measurement function"""
    RT = 0.0821 * 400
    return RT * casadi.sum1(x)

def reactorplot(x, xhat, y, yhat, yclean, Delta):
    """Make a plot of actual and estimated states"""
    if not all(isinstance(arg, np.ndarray) for arg in [x, xhat, y, yhat, yclean]):
        raise TypeError("Arrays must be numpy arrays")
    if not isinstance(Delta, (int, float)):
        raise TypeError("Delta must be numeric")
    if os.getenv('OMIT_PLOTS') == 'true':
        return None

    fig = plt.figure()
    Nx = 3
    Nt = x.shape[1] - 1
    t = np.arange(Nt + 1) * Delta

    plt.subplot(2, 1, 2)
    plt.plot(t, yclean, '-k', label='Actual')
    plt.plot(t, yclean, '--c', label='Measured')
    plt.plot(t, yhat, 'ok', label='Estimated')
    plt.ylabel('P')
    plt.xlabel('t')
    plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    plt.ylim([10, 35])

    plt.subplot(2, 1, 1)
    colors = ['r', 'b', 'g']
    names = ['A', 'B', 'C']
    for i in range(Nx):
        plt.plot(t, x[i,:] + 1e-10, '-'+colors[i],
                label=f'Actual C_{names[i]}')
        plt.plot(t, xhat[i,:] + 1e-10, 'o'+colors[i],
                label=f'Estimated C_{names[i]}')
    plt.xlabel('t')
    plt.ylabel('c_i')
    plt.legend(loc='center left', bbox_to_anchor=(1, 0.5))
    plt.ylim([-1, 1.5])

    return fig

main.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# [makes] batchreactor_ukf.dat
# [depends] functions/batchreactor.py
#
# This file simulates the Batch Reactor Example (3.2.1) from Haseltine
# & Rawlings (2005). UKF is implemented and it is shown that it fails
# when a poor prior is used.
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'functions'))

import numpy as np
from scipy import linalg
from misc import gnuplotsave
from batchreactor import batchreactor


def get_sigma(Pz, xhat, pars):
    """Returns sigma points obtained from scaled sqrtm(Pz)."""
    sqrtPz = np.real(linalg.sqrtm((pars['Nz'] + pars['lambda']) * Pz))
    zhat   = np.concatenate([xhat.flatten(), np.zeros(pars['Nw'] + pars['Nv'])])
    cols   = np.hstack([np.zeros((pars['Nz'], 1)), sqrtPz, -sqrtPz])
    z_sigma = cols + zhat[:, np.newaxis]

    x_sigma = z_sigma[:pars['Nx'], :]
    w_sigma = z_sigma[pars['Nx']:pars['Nx'] + pars['Nw'], :]
    v_sigma = z_sigma[pars['Nx'] + pars['Nw']:, :]
    return x_sigma, w_sigma, v_sigma


def ukf_scale(x_sigma, pars):
    """
    Rescales sigma points and adjusts weights so each point has no
    negative components.
    """
    x_center = x_sigma[:, 0:1]
    x_outer  = x_sigma[:, 1:]

    dx    = np.abs(x_outer - x_center)
    err   = np.abs(np.minimum(x_outer, 0))
    err[x_center.ravel() < 0, :] = 0
    dx[err == 0] = 1
    theta = 1 - np.max(err / dx, axis=0)

    x_outer = theta * x_outer + (1 - theta) * x_center
    x_sigma = np.hstack([x_center, x_outer])

    rho     = (2 * pars['lambda'] - 1) / (pars['Nsigma'] - np.sum(theta))
    a       = -rho / (2 * (pars['Nz'] + pars['lambda']))
    b       = (1 + rho) / (2 * (pars['Nz'] + pars['lambda']))
    weights = np.concatenate([[0], a * theta]) + b
    return x_sigma, weights


def doukf(Nsim, clipping):
    """Simulate the UKF with or without clipping."""
    pars = batchreactor(Nsim)
    Nx   = pars['Nx']
    Ny   = pars['Ny']
    Nw   = pars['Nw']
    Nv   = pars['Nv']
    pars['Nz'] = Nx + Nw + Nv
    Nz   = pars['Nz']

    Nsim = pars['Nsim']
    x    = pars['xsim']
    y    = pars['ysim']
    Px   = pars['P']
    Qw   = pars['Q']
    Rv   = pars['R']

    Nsigma          = 1 + 2 * Nz
    pars['Nsigma']  = Nsigma
    pars['lambda']  = 1
    weights         = np.concatenate([[2 * pars['lambda']], np.ones(2 * Nz)]) \
                      / (2 * (Nz + pars['lambda']))
    pars['weights'] = weights
    W = np.diag(weights)

    xhat = np.full((Nx, Nsim + 1), np.nan)
    yhat = np.full((Ny, Nsim + 1), np.nan)
    e    = np.full((Ny, Nsim + 1), np.nan)

    xhat[:, 0] = pars['xhat0']
    Pz = linalg.block_diag(Px, Qw, Rv)
    x_sigma, w_sigma, v_sigma = get_sigma(Pz, xhat[:, 0], pars)

    for t in range(Nsim + 1):
        # Take measurement.
        y_sigma = np.full((Ny, Nsigma), np.nan)
        for i in range(Nsigma):
            y_sigma[:, i] = np.array(pars['h'](x_sigma[:, i])).flatten() \
                            + v_sigma[:, i]
        yhat[:, t] = y_sigma @ pars['weights']
        e[:, t]    = y[:, t] - yhat[:, t]

        # Apply correction.
        Ex  = x_sigma - xhat[:, t:t + 1]
        Ey  = y_sigma - yhat[:, t:t + 1]
        Exy = Ex @ W @ Ey.T
        Eyy = Ey @ W @ Ey.T
        L   = Exy @ np.linalg.inv(Eyy)
        Px  = Px - L @ Exy.T
        xhat[:, t] = xhat[:, t] + (L @ e[:, t:t + 1]).flatten()

        if t >= Nsim:
            break

        # New sigma points; clip if requested.
        Pz = linalg.block_diag(Px, Qw, Rv)
        x_sigma, w_sigma, v_sigma = get_sigma(Pz, xhat[:, t], pars)
        if clipping:
            x_sigma, pars['weights'] = ukf_scale(x_sigma, pars)

        # Simulate sigma points forward.
        for i in range(Nsigma):
            x_sigma[:, i] = np.array(
                pars['f'](x_sigma[:, i], w_sigma[:, i])
            ).flatten()
        xhat[:, t + 1] = x_sigma @ pars['weights']
        Ex = x_sigma - xhat[:, t + 1:t + 2]
        Px = Ex @ W @ Ex.T

    # Form data table: [t, x1, x2, x3, xhat1, xhat2, xhat3, y, yhat]
    return np.column_stack([pars['t'], x.T, xhat.T, y.T, yhat.T])


print('Simulating without clipping.')
data_noclip  = doukf(150, False)
print('Simulating with clipping.')
data_yesclip = doukf(180, True)

gnuplotsave('batchreactor_ukf.dat', {'noclip': data_noclip, 'yesclip': data_yesclip})