Figure 6.9:

Closed-loop state and control evolution with (x_1(0),x_2(0)) = (3,-3).

Figure 6.9

Code for Figure 6.9

Text of the GNU GPL.

nonlinsys.py


 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# sys = nonlinsys()
#
import numpy as np
from scipy.linalg import solve_discrete_are
from control import dlqr
import casadi

def nonlinsys():
    """
    Build the parameter dict for the unstable 2-state/2-input nonlinear system.

    Returns a dict with keys:
      Nx, Nu   : state and input dimensions (both 2)
      f        : nonlinear discrete-time model f(x, u) -> next state
      l        : quadratic stage cost l(x, u)
      xs, us   : steady state (the origin)
      ulb, uub : elementwise box bounds on the input
      A, B     : Jacobians of f at the steady state
      Q, R     : Hessians of l at the steady state
      K, P     : LQR gain (control law u = K x) and DARE terminal penalty
      Vf       : terminal cost Vf(x) = 0.5 x' P x
    """

    # Initialize system dict
    sys = {}

    # Model dimensions.
    sys['Nx'] = 2
    sys['Nu'] = 2

    def f(x, u):
        # Unstable coupled nonlinear dynamics, cubic in the inputs.
        return np.array([x[0]**2 + x[1] + u[0]**3 + u[1],
                        x[0] + x[1]**2 + u[0] + u[1]**3])
    sys['f'] = f

    def l(x, u):
        # Quadratic stage cost with identity weights.
        return 0.5*(x.T @ x + u.T @ u)
    sys['l'] = l

    # Steady state (the origin) and input bounds.
    sys['xs'] = np.zeros(sys['Nx'])
    sys['us'] = np.zeros(sys['Nu'])
    sys['ulb'] = np.array([-2.5, -2.5])
    sys['uub'] = np.array([2.5, 2.5])

    # Linearize the model symbolically with CasADi.
    x = casadi.SX.sym('x', sys['Nx'])
    u = casadi.SX.sym('u', sys['Nu'])
    f_expr = casadi.vertcat(x[0]**2 + x[1] + u[0]**3 + u[1],
                           x[0] + x[1]**2 + u[0] + u[1]**3)

    # Jacobians of the dynamics, evaluated at the steady state.
    A_func = casadi.Function('A', [x, u], [casadi.jacobian(f_expr, x)])
    B_func = casadi.Function('B', [x, u], [casadi.jacobian(f_expr, u)])
    sys['A'] = np.array(A_func(sys['xs'], sys['us']))
    sys['B'] = np.array(B_func(sys['xs'], sys['us']))

    # Hessians of the stage cost (constant, since l is quadratic).
    l_expr = 0.5*(x.T @ x + u.T @ u)
    Q_func = casadi.Function('Q', [x, u], [casadi.hessian(l_expr, x)[0]])
    R_func = casadi.Function('R', [x, u], [casadi.hessian(l_expr, u)[0]])
    sys['Q'] = np.array(Q_func(sys['xs'], sys['us']))
    sys['R'] = np.array(R_func(sys['xs'], sys['us']))

    # Compute the LQR terminal controller directly from the discrete
    # algebraic Riccati equation (uses the scipy routine imported at the
    # top of the file rather than the third-party control.dlqr; the gain
    # is K_lqr = (R + B'PB)^{-1} B'PA, identical to what dlqr returns).
    A, B, Q, R = sys['A'], sys['B'], sys['Q'], sys['R']
    P = solve_discrete_are(A, B, Q, R)
    K = np.linalg.solve(R + B.T @ P @ B, B.T @ P @ A)
    sys['K'] = -K  # Sign flipped so the control law is u = sys['K'] @ x.
    sys['P'] = P

    def Vf(x):
        # Terminal penalty 0.5 x' P x.
        return 0.5 * x.T @ sys['P'] @ x
    sys['Vf'] = Vf


    return sys

distributedgradient.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# [ustar, uiter] = distributedgradient(V, u0, ulb, uub, ...)
#
# Runs a distributed gradient algorithm for a problem with box constraints.
#
# V should be a function handle defining the objective function. u0 should be
# the starting point. ulb and uub should give the box constraints for u.
#
# Additional arguments are passed as 'key' value pairs. They are as follows:
#
# - 'beta', 'sigma' : parameters for line search (default 0.8 and 0.01)
# - 'alphamax' : maximum step to take in any iteration (default 15)
# - 'Niter' : Maximum number of iterations (default 50)
# - 'steptol', 'gradtol' : Absolute tolerances on stepsize and gradient.
#                          Algorithm terminates if the stepsize is less than
#                          steptol and the gradient is less than gradtol
#                          (defaults 1e-5 and 1e-6). Set either to a negative
#                          value to never stop early.
# - 'Nlinesearch' : maximum number of backtracking steps (default 100)
# - 'I' : cell array defining the variables for each system. Default is
#         each variable is a separate system.
#
# Return values are ustar, the final point, and uiter, a matrix of iteration
# progress.
import numpy as np
from casadi import SX, jacobian, Function

def distributedgradient(V, u0, ulb, uub, **kwargs):
    """
    Distributed gradient projection for a box-constrained problem.

    V must be a CasADi-differentiable objective, u0 the starting point,
    and ulb/uub the elementwise bounds. Options are supplied as keyword
    arguments (beta, sigma, alphamax, Niter, steptol, gradtol,
    Nlinesearch, I); see the header comment for their meanings.

    Returns the final point and the matrix of iterates (one per column).
    """
    # Merge the caller's options over the defaults.
    opts = {**{'beta': 0.8, 'sigma': 0.01, 'alphamax': 15, 'Niter': 50,
               'steptol': 1e-5, 'gradtol': 1e-6, 'Nlinesearch': 100,
               'I': None},
            **kwargs}

    nu = len(u0)
    # Default decomposition: every variable is its own subsystem.
    if opts['I'] is None:
        opts['I'] = [[j] for j in range(nu)]

    # Clip the starting point into the feasible box.
    u0 = np.clip(u0, ulb, uub)

    # Gradient of V via CasADi automatic differentiation.
    usym = SX.sym('u', nu)
    dVcasadi = Function('V', [usym], [jacobian(V(usym), usym)])

    # Iterate history; column k holds iterate k.
    uiter = np.full((nu, opts['Niter'] + 1), np.nan)
    uiter[:, 0] = u0
    g, gred = calcgrad(dVcasadi, u0, ulb, uub)
    step = np.full(nu, np.inf)

    # Iterate until both the reduced gradient and the step are small,
    # or the iteration budget runs out.
    k = 0
    while k < opts['Niter'] and (np.linalg.norm(gred) > opts['gradtol']
                                 or np.linalg.norm(step) > opts['steptol']):
        uiter[:, k + 1], step = coopstep(opts['I'], V, g, uiter[:, k],
                                         ulb, uub, opts)
        k += 1
        g, gred = calcgrad(dVcasadi, uiter[:, k], ulb, uub)

    # Trim unused columns and return the last iterate.
    uiter = uiter[:, :k + 1]
    return uiter[:, -1], uiter

def coopstep(I, V, g, u0, ulb, uub, param):
    """
    Takes one step of cooperative distributed gradient projection.

    Each subsystem j moves only its own variables I[j] along the negative
    gradient (projected onto the box, with a backtracking line search).
    The candidate points are then combined by a convex combination,
    dropping the worst subsystems one at a time until the combination is
    acceptable. Returns the new point u and the actual step v = u - u0.
    """
    Nu = len(u0)
    Ni = len(I)
    us = np.full((Nu, Ni), np.nan)  # Candidate point from each subsystem.
    Vs = np.full(Ni, np.nan)        # Objective value at each candidate.

    # Run each subsystem's optimization independently.
    V0 = float(V(u0))
    for j in range(Ni):
        # Compute step: negative gradient in this subsystem's variables
        # only, then project onto the box.
        v = np.zeros(Nu)
        i = I[j]
        v[i] = -g[i]
        ubar = np.minimum(np.maximum(u0 + v, ulb), uub)
        v = ubar - u0

        # Choose alpha: unit step if the projected point touches a bound;
        # otherwise the largest nonnegative step that keeps the subsystem's
        # variables inside the box, capped at alphamax.
        # NOTE(review): the bound test inspects every component of ubar,
        # not only indices i — confirm that is intended.
        if np.any(ubar == ulb) or np.any(ubar == uub):
            alpha = 1.0
        else:
            with np.errstate(divide='ignore'):
                alpha_lb = (ulb[i] - u0[i]) / v[i]
                alpha_ub = (uub[i] - u0[i]) / v[i]
            alpha = np.concatenate([alpha_lb, alpha_ub])
            alpha = alpha[np.isfinite(alpha) & (alpha >= 0)]
            if len(alpha) == 0:
                alpha = 0.0
            else:
                alpha = min(np.min(alpha), param['alphamax'])

        # Backtracking line search on the Armijo condition; the slope term
        # uses only this subsystem's components since only they move.
        k = 1
        while float(V(u0 + alpha * v)) > V0 + param['sigma'] * alpha * np.dot(g[i].flatten(), v[i].flatten()) \
              and k <= param['Nlinesearch']:
            alpha = param['beta'] * alpha
            k += 1

        us[:, j] = u0 + alpha * v
        Vs[j] = float(V(us[:, j]))

    # Choose the step to take, starting from the uniform convex combination.
    w = np.ones(Ni) / Ni
    for j in range(Ni):
        # Accept the combination if its cost is no worse than the weighted
        # average of the candidates' costs.
        u = us @ w
        if float(V(u)) <= np.dot(Vs, w):
            break
        elif j == Ni - 1:
            # All candidates eliminated: fall back to not moving at all.
            u = u0.copy()
            print('Warning: No descent directions!')
            break

        # Get rid of the worst player: zero its weight and renormalize.
        # Vs[jmax] is set to -max so argmax never picks it again.
        jmax = np.argmax(Vs)
        Vs[jmax] = -np.finfo(float).max
        w[jmax] = 0
        w = w / np.sum(w)

    # Calculate the actual step taken.
    v = u - u0
    return u, v


def calcgrad(dV, u, ulb, uub):
    """
    Evaluates the gradient of the objective at u and its reduced version.

    The reduced gradient zeroes any component that points out of the
    feasible box at an active bound, so its norm is a valid measure of
    constrained stationarity.
    """
    grad = np.asarray(dV(u)).flatten()
    reduced = np.array(grad)
    # At an active lower bound a positive component would leave the box;
    # symmetrically for a negative component at an active upper bound.
    reduced[(u == ulb) & (grad > 0)] = 0
    reduced[(u == uub) & (grad < 0)] = 0
    return grad, reduced

main.py


  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
# Closed-loop evolution of nonlinear system for N = 2.
# [depends] functions/nonlinsys.py functions/distributedgradient.py
# [makes] nonlinsys_cl.mat
import sys
import os
import matplotlib.pyplot as plt
import numpy as np
import casadi as ca
import scipy.io as sio

sys.path.append('functions/')
from nonlinsys import nonlinsys
from distributedgradient import distributedgradient

# Build the system description (model, costs, bounds, LQR terminal pieces).
# NOTE(review): this rebinds the name `sys`, shadowing the stdlib `sys`
# module imported above; sys.path etc. are unusable from here on.
sys = nonlinsys()

# CasADi-compatible dynamics and costs (sys['f']/sys['Vf'] use numpy internally).
P = sys['P']  # Terminal penalty matrix, read by Vf_ca below.

def f_ca(x, u):
    """CasADi-friendly model: next state as a symbolic 2-vector."""
    x1_next = x[0]**2 + x[1] + u[0]**3 + u[1]
    x2_next = x[0] + x[1]**2 + u[0] + u[1]**3
    return ca.vertcat(x1_next, x2_next)

def l_ca(x, u):
    """Stage cost written componentwise so it works on CasADi symbolics."""
    state_cost = x[0]**2 + x[1]**2
    input_cost = u[0]**2 + u[1]**2
    return 0.5*(state_cost + input_cost)

def Vf_ca(x):
    """Terminal cost 0.5 * x' P x, expanded elementwise for CasADi symbolics."""
    Px = [P[0,0]*x[0] + P[0,1]*x[1],
          P[1,0]*x[0] + P[1,1]*x[1]]
    return 0.5*(x[0]*Px[0] + x[1]*Px[1])

# Define objective function and bounds.
Nt = 2          # Prediction horizon.
Nu = sys['Nu']  # Number of inputs.
Nx = sys['Nx']  # Number of states.

def make_V(Nt, Nu):
    """Build the horizon-Nt objective V(x, u) by backward recursion.

    u is the stacked input sequence of length Nt*Nu, time-major (matching
    flatten(order='F') of a (Nu, Nt) array). The resulting V sums the
    stage costs along the predicted trajectory plus the terminal cost.
    """
    def V(x, u):
        # Base case: terminal cost only.
        return Vf_ca(x)
    for t in range(Nt - 1, -1, -1):
        i = slice(t*Nu, (t + 1)*Nu)  # This stage's slice of the input stack.
        # i and V are bound as defaults to freeze their current values and
        # avoid Python's late-binding closure pitfall inside the loop.
        def V_new(x, u, i=i, V_prev=V):
            return l_ca(x, u[i]) + V_prev(f_ca(x, u[i]), u)
        V = V_new
    return V

# Objective over the stacked input sequence (length Nt*Nu).
V = make_V(Nt, Nu)

# Bounds tiled across the horizon, and the variable partition: subsystem i
# owns input channel i at every time step of the stacked sequence.
ulb = np.tile(sys['ulb'], Nt)
uub = np.tile(sys['uub'], Nt)
I = [list(range(i, Nt*Nu, Nu)) for i in range(Nu)]

def warmstart(x, u, model):
    """Fill any NaN columns of u with the saturated LQR law and roll out x.

    x is the current state and u an (Nu, horizon) array whose NaN columns
    are replaced by the terminal control law clipped to the input bounds.
    `model` picks the prediction model: 'nonlinear' uses sys['f'],
    'linear' uses the (A, B) linearization; anything else raises
    ValueError. Returns the state trajectory (horizon + 1 columns) and
    the completed input sequence.
    """
    horizon = u.shape[1]
    u = u.copy()
    traj = np.full((len(x), horizon + 1), np.nan)
    traj[:, 0] = x
    for t in range(horizon):
        if np.any(np.isnan(u[:, t])):
            # Terminal control law, saturated at the input box.
            uK = sys['K'] @ traj[:, t]
            u[:, t] = np.minimum(np.maximum(uK, sys['ulb']), sys['uub'])
        if model == 'nonlinear':
            traj[:, t+1] = np.array(sys['f'](traj[:, t], u[:, t])).flatten()
        elif model == 'linear':
            traj[:, t+1] = sys['A'] @ traj[:, t] + sys['B'] @ u[:, t]
        else:
            raise ValueError('Invalid choice for model!')
    return traj, u

# Simulate closed loop for different numbers of optimizer iterations.
Nsim = 10      # Closed-loop simulation length.
Ps = [3, 10]   # Iteration counts ("p") to compare.
options = {}
# For p = 3, allow a larger maximum step and slower backtracking than the
# optimizer defaults.
options['p3'] = {'alphamax': 50, 'beta': 0.95}

x0 = np.array([3., -3.])  # Initial condition (Figure 6.9).
data = {}                 # Results keyed by 'p{P_val}'.
for P_val in Ps:
    key = f'p{P_val}'
    # Per-case optimizer options (empty if none were given above).
    opts = options.get(key, {})

    # Closed-loop state and input storage.
    x = np.full((Nx, Nsim + 1), np.nan)
    x[:,0] = x0
    u = np.full((Nu, Nsim), np.nan)

    # Generate initial guess using the linear control law.
    _, uguess = warmstart(x0, np.full((Nu, Nt), np.nan), 'linear')

    # Simulate in closed loop.
    for t in range(Nsim):
        # Run P_val iterations of distributed gradient on the horizon
        # objective. xt=x[:,t] is bound as a default argument to freeze
        # the current state in the lambda (late-binding pitfall).
        useq, _ = distributedgradient(lambda u, xt=x[:,t]: V(xt, u),
                                      uguess.flatten(order='F'), ulb, uub,
                                      I=I, Niter=P_val, **opts)
        # Inject only the first input of the optimized sequence.
        u[:,t] = useq[:Nu]

        # Advance the plant with the nonlinear model.
        x[:,t+1] = np.array(sys['f'](x[:,t], u[:,t])).flatten()
        # Warm start the next solve: shift the sequence one step and let
        # warmstart() fill the NaN tail with the terminal control law.
        # NOTE(review): reshape uses C order while useq was flattened with
        # order='F'; equivalent only because Nt = 2 here — revisit if the
        # horizon is ever lengthened.
        uguess = np.hstack([useq[Nu:].reshape(Nu, Nt - 1), np.full((Nu, 1), np.nan)])
        _, uguess = warmstart(x[:,t+1], uguess, 'nonlinear')

        # Abort this case if the closed loop blows up.
        if np.linalg.norm(x[:,t+1]) > 100:
            import warnings
            warnings.warn('x has diverged!')
            break

    tplot = np.arange(Nsim + 1)
    if not os.getenv('OMIT_PLOTS') == 'true':
        plt.figure()

        # States.
        plt.subplot(2, 1, 1)
        plt.plot(tplot, x[0,:], '-or')
        plt.plot(tplot, x[1,:], '-sb')
        plt.title(f'p = {P_val}')

        # Inputs as a zero-order hold, with the box bounds dotted.
        plt.subplot(2, 1, 2)
        plt.step(tplot, np.append(u[0,:], u[0,-1]), '-or', where='post')
        plt.step(tplot, np.append(u[1,:], u[1,-1]), '-sb', where='post')
        plt.plot([0, Nsim], [ulb[0]]*2, ':k', [0, Nsim], [uub[0]]*2, ':k')

    # Record the closed-loop results for this iteration count.
    data[key] = {'t': tplot, 'x': x, 'u': u, 'ulb': ulb, 'uub': uub}