Figure 3.6:

Comparison of 100 realizations of standard and tube-based MPC for the chemical reactor example.

Figure 3.6

Code for Figure 3.6

Text of the GNU GPL.

cstrparam.py


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# Loads default parameters for robust MPC CSTR problem.
# Model Parameters.
import numpy as np

def cstrparam(Nsim=None):
    pars = {}

    # Model Parameters
    pars['theta'] = 20
    pars['k'] = 300
    pars['M'] = 5
    pars['xf'] = 0.3947
    pars['xc'] = 0.3816
    pars['alpha'] = 0.117

    # Simulation
    pars['tfinal'] = 300
    pars['N'] = 40
    pars['Q'] = 1/2
    pars['R'] = 1/2
    pars['Qf'] = 1e5
    pars['Nx'] = 2
    pars['Nu'] = 1
    pars['Np'] = 1
    pars['Delta'] = 3
    pars['xub'] = np.array([[2], [2]])
    pars['xlb'] = np.array([[0], [0]])
    pars['uub'] = 2
    pars['ulb'] = 0
    pars['colloc'] = 5
    pars['plantSteps'] = 2

    # Initial Conditions and Setpoints
    pars['x0'] = np.array([[0.9831], [0.3918]])
    pars['ze'] = np.array([[0.2632], [0.6519]])
    pars['uprev'] = 0.02
    pars['usp'] = 455/600

    # Nominal constraints
    pars['uub_nominal'] = 2
    pars['ulb_nominal'] = 0.02
    pars['A'] = 0
    pars['w'] = 1

    def cstrode(x, u, p, pars):
        # Extract states
        c = x[0]
        T = x[1]
        t = x[2] if len(x) > 2 else 0  # Handle optional time state

        # Calculate reaction rate and disturbance
        rate = pars['k'] * c * np.exp(-pars['M'] / T)
        w = pars['A'] * np.sin(t * pars['w']) * p

        # State derivatives
        dcdt = (1 - c) / pars['theta'] - rate
        dTdt = (pars['xf'] - T) / pars['theta'] + rate - pars['alpha'] * u * (T - pars['xc']) + w
        dtdt = 1

        return np.array([dcdt, dTdt, dtdt])

    # Add ODE function to parameters
    pars['ode'] = cstrode

    # Set random seed
    np.random.seed(0)

    return pars

nominalmpc.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
# reference_trajectory = nominalmpc(pars)
#
# Runs nominal NMPC for CSTR example in Chapter 3 to generate the reference
# trajectory for tube-based MPC.
#
# Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).
import numpy as np
import mpctools as mpc


def nominalmpc(pars):
    Nu     = pars['Nu']
    Np     = pars['Np']
    Delta  = pars['Delta']
    tfinal = pars['tfinal']
    N      = pars['N']
    uprev  = pars['uprev']
    usp    = pars['usp']
    colloc = pars['colloc']
    Q      = pars['Q']
    R      = pars['R']
    Qf     = pars['Qf']

    # Augmented state includes time as third component.
    Nx  = pars['Nx'] + 1
    xub = np.append(pars['xub'].flatten(), np.inf)
    xlb = np.append(pars['xlb'].flatten(), -np.inf)
    x0  = np.append(pars['x0'].flatten(), 0.0)
    ze  = np.append(pars['ze'].flatten(), tfinal)

    # Tightened constraints for nominal problem.
    uub = pars['uub_nominal']
    ulb = pars['ulb_nominal']

    def ode_model(x, u):
        return pars['ode'](x, u, 0, pars)

    def ode_plant(x, u, p):
        return pars['ode'](x, u, p, pars)

    def stagecost(x, u, x_sp, u_sp):
        xerr = x[:pars['Nx']] - x_sp[:pars['Nx']]
        uerr = u - u_sp
        return Q * (xerr @ xerr) + R * (uerr @ uerr)

    def termcost(x, x_sp):
        xerr = x[:pars['Nx']] - x_sp[:pars['Nx']]
        return Qf * (xerr @ xerr)

    ode_casadi = mpc.getCasadiFunc(ode_model, [Nx, Nu], ['x', 'u'], 'cstr_model')
    cstrsim    = mpc.getCasadiIntegrator(ode_plant, Delta, [Nx, Nu, Np],
                                         ['x', 'u', 'p'], 'cstr_plant')
    l  = mpc.getCasadiFunc(stagecost, [Nx, Nu, Nx, Nu], ['x', 'u', 'x_sp', 'u_sp'], 'l')
    Pf = mpc.getCasadiFunc(termcost,  [Nx, Nx],          ['x', 'x_sp'],               'Pf')

    # Controller dimensions and collocation.
    N_dict = {'x': Nx, 'u': Nu, 't': N, 'c': colloc}

    # Constant setpoints over the horizon.
    xsp_init = np.tile(ze, (N + 1, 1))      # (N+1, Nx)
    usp_init = np.full((N, Nu), usp)        # (N, Nu)

    # Initial guess: linear interpolation to setpoint.
    guess_x = np.column_stack([
        np.linspace(x0[0], ze[0], N + 1),
        np.linspace(x0[1], ze[1], N + 1),
        np.linspace(x0[2], x0[2] + N * Delta, N + 1),
    ])  # (N+1, Nx)
    guess_u = np.linspace(uprev, usp, N).reshape(N, Nu)  # (N, Nu)

    solver = mpc.nmpc(
        f=ode_casadi,
        N=N_dict,
        Delta=Delta,
        verbosity=0,
        l=l,
        Vf=Pf,
        x0=x0,
        guess={'x': guess_x, 'u': guess_u},
        par={'xsp': xsp_init, 'usp': usp_init},
        lb={'x': xlb, 'u': np.full(Nu, ulb)},
        ub={'x': xub, 'u': np.full(Nu, uub)},
        inferargs=True,
    )

    # Closed-loop simulation.
    Nsim = int(tfinal / Delta) + N
    x = np.full((Nx, Nsim + 1), np.nan)
    u = np.full((Nu, Nsim), np.nan)
    x[:, 0] = x0

    for i in range(Nsim):
        solver.fixvar('x', 0, x[:, i])
        solver.solve()
        print(f'{i}: {solver.stats["status"]}')
        if solver.stats['status'] != 'Solve_Succeeded':
            print(f'Warning: failed at step {i}!')
            break

        u[:, i] = np.array(solver.var['u', 0]).flatten()
        x[:, i + 1] = np.array(cstrsim(x[:, i], u[:, i], np.zeros(Np))).flatten()
        solver.saveguess()

    t = Delta * np.arange(Nsim + 1)
    return {'x': x, 'u': u, 't': t}

run_standardvstube.py


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
# [depends] standardmpc.py
# [depends] tubempc.py
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

import numpy as np
import matplotlib.pyplot as plt
from standardmpc import standardmpc
from tubempc import tubempc


def run_standardvstube(standard_pars, tube_pars, Nsamples):
    """
    Runs standard and tube MPC for the CSTR problem and returns trajectories.
    """
    standardMPCdata = {}
    tubeMPCdata = {}

    for i in range(Nsamples):
        print(f'*** Sample {i + 1} of {Nsamples}.')

        # Disturbance for this realization.
        standard_pars['A'] = standard_pars['A_rand'][i]
        standard_pars['w'] = standard_pars['w_rand'][i]
        tube_pars['A']     = tube_pars['A_rand'][i]
        tube_pars['w']     = tube_pars['w_rand'][i]

        # Run standard MPC.
        result = standardmpc(standard_pars)
        if i == 0:
            Nt      = result['x'].shape[1]        # Nsim + 1
            Ntplant = result['xplant'].shape[1]   # plantSteps*Nsim + 1
            Nsim    = result['u'].shape[1]        # Nsim (number of control steps)
            standardMPCdata['x1']     = np.zeros((Nt, Nsamples))
            standardMPCdata['x2']     = np.zeros((Nt, Nsamples))
            standardMPCdata['x1plant']= np.zeros((Ntplant, Nsamples))
            standardMPCdata['x2plant']= np.zeros((Ntplant, Nsamples))
            standardMPCdata['u']      = np.zeros((Nsim, Nsamples))

        standardMPCdata['x1'][:, i]      = result['x'][0, :].flatten()
        standardMPCdata['x2'][:, i]      = result['x'][1, :].flatten()
        standardMPCdata['x1plant'][:, i] = result['xplant'][0, :].flatten()
        standardMPCdata['x2plant'][:, i] = result['xplant'][1, :].flatten()
        standardMPCdata['u'][:, i]       = result['u'].flatten()
        standardMPCdata['pars']          = result['pars']

        # Run tube-based MPC.
        result = tubempc(tube_pars)
        if i == 0:
            Nt_tube      = result['x'].shape[1]
            Ntplant_tube = result['xplant'].shape[1]
            Nsim_tube    = result['u'].shape[1]
            tubeMPCdata['x1']     = np.zeros((Nt_tube, Nsamples))
            tubeMPCdata['x2']     = np.zeros((Nt_tube, Nsamples))
            tubeMPCdata['x1plant']= np.zeros((Ntplant_tube, Nsamples))
            tubeMPCdata['x2plant']= np.zeros((Ntplant_tube, Nsamples))
            tubeMPCdata['u']      = np.zeros((Nsim_tube, Nsamples))

        tubeMPCdata['x1'][:, i]      = result['x'][0, :].flatten()
        tubeMPCdata['x2'][:, i]      = result['x'][1, :].flatten()
        tubeMPCdata['x1plant'][:, i] = result['xplant'][0, :].flatten()
        tubeMPCdata['x2plant'][:, i] = result['xplant'][1, :].flatten()
        tubeMPCdata['u'][:, i]       = result['u'].flatten()
        tubeMPCdata['pars']          = result['pars']

    standardMPCdata['t']      = result['t']
    standardMPCdata['tplant'] = result['tplant']
    tubeMPCdata['t']          = result['t']
    tubeMPCdata['tplant']     = result['tplant']

    return {'standard': standardMPCdata, 'tube': tubeMPCdata}

standardmpc.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
# result = standardmpc(pars)
#
# Runs standard NMPC for CSTR example in Chapter 3.
#
# Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).
import numpy as np
import mpctools as mpc


def standardmpc(pars):
    Nu         = pars['Nu']
    Np         = pars['Np']
    Delta      = pars['Delta']
    uub        = pars['uub']
    ulb        = pars['ulb']
    tfinal     = pars['tfinal']
    N          = pars['N']
    plantSteps = pars['plantSteps']
    uprev      = pars['uprev']
    usp        = pars['usp']
    colloc     = pars['colloc']
    Q          = pars['Q']
    R          = pars['R']
    Qf         = pars['Qf']

    # Augmented state includes time as third component.
    Nx  = pars['Nx'] + 1
    xub = np.append(pars['xub'].flatten(), np.inf)
    xlb = np.append(pars['xlb'].flatten(), -np.inf)
    x0  = np.append(pars['x0'].flatten(), 0.0)
    ze  = np.append(pars['ze'].flatten(), tfinal)

    def ode_model(x, u):
        return pars['ode'](x, u, 0, pars)

    def ode_plant(x, u, p):
        return pars['ode'](x, u, p, pars)

    def stagecost(x, u, x_sp, u_sp):
        xerr = x[:pars['Nx']] - x_sp[:pars['Nx']]
        uerr = u - u_sp
        return Q * (xerr @ xerr) + R * (uerr @ uerr)

    def termcost(x, x_sp):
        xerr = x[:pars['Nx']] - x_sp[:pars['Nx']]
        return Qf * (xerr @ xerr)

    ode_casadi = mpc.getCasadiFunc(ode_model, [Nx, Nu], ['x', 'u'], 'cstr_model')
    cstrsim    = mpc.getCasadiIntegrator(ode_plant, Delta / plantSteps, [Nx, Nu, Np],
                                         ['x', 'u', 'p'], 'cstr_plant')
    l  = mpc.getCasadiFunc(stagecost, [Nx, Nu, Nx, Nu], ['x', 'u', 'x_sp', 'u_sp'], 'l')
    Pf = mpc.getCasadiFunc(termcost,  [Nx, Nx],          ['x', 'x_sp'],               'Pf')

    # Controller dimensions and collocation.
    N_dict = {'x': Nx, 'u': Nu, 't': N, 'c': colloc}

    # Constant setpoints over the horizon.
    xsp_init = np.tile(ze, (N + 1, 1))      # (N+1, Nx)
    usp_init = np.full((N, Nu), usp)        # (N, Nu)

    # Initial guess: linear interpolation to setpoint.
    guess_x = np.column_stack([
        np.linspace(x0[0], ze[0], N + 1),
        np.linspace(x0[1], ze[1], N + 1),
        np.linspace(x0[2], x0[2] + N * Delta, N + 1),
    ])  # (N+1, Nx)
    guess_u = np.linspace(uprev, usp, N).reshape(N, Nu)  # (N, Nu)

    solver = mpc.nmpc(
        f=ode_casadi,
        N=N_dict,
        Delta=Delta,
        verbosity=0,
        l=l,
        Vf=Pf,
        x0=x0,
        guess={'x': guess_x, 'u': guess_u},
        par={'xsp': xsp_init, 'usp': usp_init},
        lb={'x': xlb, 'u': np.full(Nu, ulb)},
        ub={'x': xub, 'u': np.full(Nu, uub)},
        inferargs=True,
    )

    # Closed-loop simulation.
    Nsim       = int(tfinal / Delta)
    x          = np.full((Nx, Nsim + 1), np.nan)
    xplant     = np.full((Nx, plantSteps * Nsim + 1), np.nan)
    xplantnow  = np.full((Nx, plantSteps + 1), np.nan)
    u          = np.full((Nu, Nsim), np.nan)
    x[:, 0]         = x0
    xplant[:, 0]    = x0
    xplantnow[:, 0] = x0

    for i in range(Nsim):
        solver.fixvar('x', 0, x[:, i])
        solver.solve()
        print(f'{i}: {solver.stats["status"]}')
        if solver.stats['status'] != 'Solve_Succeeded':
            print(f'Warning: failed at step {i}!')
            break

        u[:, i] = np.array(solver.var['u', 0]).flatten()
        for j in range(plantSteps):
            xplantnow[:, j + 1] = np.array(
                cstrsim(xplantnow[:, j], u[:, i], np.ones(Np))
            ).flatten()

        xplant[:, 1 + i * plantSteps : 1 + (i + 1) * plantSteps] = xplantnow[:, 1:]
        xplantnow[:, 0] = xplantnow[:, -1]
        x[:, i + 1]     = xplantnow[:, -1]
        solver.saveguess()

    pars_save = {k: v for k, v in pars.items() if k != 'ode'}
    return {
        'pars':   pars_save,
        'x':      x,
        'u':      u,
        't':      np.arange(0, tfinal + Delta, Delta),
        'xplant': xplant,
        'tplant': np.linspace(0, tfinal, plantSteps * Nsim + 1),
    }

tubempc.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# result = tubempc(pars)
#
# Runs tube-based NMPC for CSTR example in Chapter 3.
#
# Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).
import numpy as np
import mpctools as mpc


def tubempc(pars):
    Nu         = pars['Nu']
    Np         = pars['Np']
    Delta      = pars['Delta']
    uub        = pars['uub']
    ulb        = pars['ulb']
    tfinal     = pars['tfinal']
    N          = pars['N']
    plantSteps = pars['plantSteps']
    colloc     = pars['colloc']
    Q          = pars['Q']
    R          = pars['R']
    Qf         = pars['Qf']

    # Augmented state includes time as third component.
    Nx  = pars['Nx'] + 1
    xub = np.append(pars['xub'].flatten(), np.inf)
    xlb = np.append(pars['xlb'].flatten(), -np.inf)
    x0  = np.append(pars['x0'].flatten(), 0.0)

    # Reference trajectories from nominal MPC.
    xsp = pars['reftraj']['x']   # (Nx, Nsim_nom+1)
    usp = pars['reftraj']['u']   # (Nu, Nsim_nom)

    def ode_model(x, u):
        return pars['ode'](x, u, 0, pars)

    def ode_plant(x, u, p):
        return pars['ode'](x, u, p, pars)

    def stagecost(x, u, x_sp, u_sp):
        xerr = x[:pars['Nx']] - x_sp[:pars['Nx']]
        uerr = u - u_sp
        return Q * (xerr @ xerr) + R * (uerr @ uerr)

    def termcost(x, x_sp):
        xerr = x[:pars['Nx']] - x_sp[:pars['Nx']]
        return Qf * (xerr @ xerr)

    ode_casadi = mpc.getCasadiFunc(ode_model, [Nx, Nu], ['x', 'u'], 'cstr_model')
    cstrsim    = mpc.getCasadiIntegrator(ode_plant, Delta / plantSteps, [Nx, Nu, Np],
                                         ['x', 'u', 'p'], 'cstr_plant')
    l  = mpc.getCasadiFunc(stagecost, [Nx, Nu, Nx, Nu], ['x', 'u', 'x_sp', 'u_sp'], 'l')
    Pf = mpc.getCasadiFunc(termcost,  [Nx, Nx],          ['x', 'x_sp'],               'Pf')

    # Controller dimensions and collocation.
    N_dict = {'x': Nx, 'u': Nu, 't': N, 'c': colloc}

    # Initial setpoints from reference trajectory.
    xsp_init = xsp[:, :N + 1].T   # (N+1, Nx) — time-first
    usp_init = usp[:, :N].T       # (N, Nu)

    # Initial guess from reference trajectory.
    guess_x = xsp[:, :N + 1].T    # (N+1, Nx)
    guess_u = usp[:, :N].T        # (N, Nu)

    solver = mpc.nmpc(
        f=ode_casadi,
        N=N_dict,
        Delta=Delta,
        verbosity=0,
        l=l,
        Vf=Pf,
        x0=x0,
        guess={'x': guess_x, 'u': guess_u},
        par={'xsp': xsp_init, 'usp': usp_init},
        lb={'x': xlb, 'u': np.full(Nu, ulb)},
        ub={'x': xub, 'u': np.full(Nu, uub)},
        inferargs=True,
    )

    # Closed-loop simulation.
    Nsim       = int(tfinal / Delta)
    x          = np.full((Nx, Nsim + 1), np.nan)
    xplant     = np.full((Nx, plantSteps * Nsim + 1), np.nan)
    xplantnow  = np.full((Nx, plantSteps + 1), np.nan)
    u          = np.full((Nu, Nsim), np.nan)
    x[:, 0]         = x0
    xplant[:, 0]    = x0
    xplantnow[:, 0] = x0

    for i in range(Nsim):
        solver.fixvar('x', 0, x[:, i])
        # Update time-varying setpoints from reference trajectory (one step at a time).
        for t in range(N + 1):
            solver.par['x_sp', t] = xsp[:, i + t]
        for t in range(N):
            solver.par['u_sp', t] = usp[:, i + t]
        solver.solve()
        print(f'{i}: {solver.stats["status"]}')
        if solver.stats['status'] != 'Solve_Succeeded':
            print(f'Warning: failed at step {i}!')
            break

        u[:, i] = np.array(solver.var['u', 0]).flatten()
        for j in range(plantSteps):
            xplantnow[:, j + 1] = np.array(
                cstrsim(xplantnow[:, j], u[:, i], np.ones(Np))
            ).flatten()

        xplant[:, 1 + i * plantSteps : 1 + (i + 1) * plantSteps] = xplantnow[:, 1:]
        xplantnow[:, 0] = xplantnow[:, -1]
        x[:, i + 1]     = xplantnow[:, -1]
        solver.saveguess()

    pars_save = {k: v for k, v in pars.items() if k != 'ode'}
    return {
        'pars':   pars_save,
        'x':      x,
        'u':      u,
        't':      np.arange(0, tfinal + Delta, Delta),
        'xplant': xplant,
        'tplant': np.linspace(0, tfinal, plantSteps * Nsim + 1),
    }

main.py


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# [makes] standardvstube.mat
## [rule] copymat
# [depends] functions/cstrparam.py
# [depends] functions/nominalmpc.py
# [depends] functions/run_standardvstube.py
#
# Robust control of an exothermic reaction.
# Note: 100 samples takes ~30 min; the .mat is checked in to data/.
# If you change anything, re-run and copy standardvstube.mat to data/.
import sys
import os
sys.path.insert(0, os.path.join(os.path.dirname(os.path.abspath(__file__)), 'functions'))

import numpy as np
from scipy.io import savemat
from cstrparam import cstrparam
from nominalmpc import nominalmpc
from run_standardvstube import run_standardvstube

# Load parameters.
pars = cstrparam()

# Run nominal MPC to generate reference trajectory for tube-based MPC.
pars['reftraj'] = nominalmpc(pars)

# Create random disturbance scenarios.
Nsamples = 10   # Use 100 for the text figure (takes ~30 min).
pars['A_rand'] = 0.001 * np.random.rand(Nsamples)
pars['w_rand'] = np.random.rand(Nsamples)

# Run both standard and tube-based MPC for all samples.
data = run_standardvstube(pars, pars, Nsamples)