# Loads default parameters for robust MPC CSTR problem.# Model Parameters.importnumpyasnpdefcstrparam(Nsim=None):pars={}# Model Parameterspars['theta']=20pars['k']=300pars['M']=5pars['xf']=0.3947pars['xc']=0.3816pars['alpha']=0.117# Simulationpars['tfinal']=300pars['N']=40pars['Q']=1/2pars['R']=1/2pars['Qf']=1e5pars['Nx']=2pars['Nu']=1pars['Np']=1pars['Delta']=3pars['xub']=np.array([[2],[2]])pars['xlb']=np.array([[0],[0]])pars['uub']=2pars['ulb']=0pars['colloc']=5pars['plantSteps']=2# Initial Conditions and Setpointspars['x0']=np.array([[0.9831],[0.3918]])pars['ze']=np.array([[0.2632],[0.6519]])pars['uprev']=0.02pars['usp']=455/600# Nominal constraintspars['uub_nominal']=2pars['ulb_nominal']=0.02pars['A']=0pars['w']=1defcstrode(x,u,p,pars):# Extract statesc=x[0]T=x[1]t=x[2]iflen(x)>2else0# Handle optional time state# Calculate reaction rate and disturbancerate=pars['k']*c*np.exp(-pars['M']/T)w=pars['A']*np.sin(t*pars['w'])*p# State derivativesdcdt=(1-c)/pars['theta']-ratedTdt=(pars['xf']-T)/pars['theta']+rate-pars['alpha']*u*(T-pars['xc'])+wdtdt=1returnnp.array([dcdt,dTdt,dtdt])# Add ODE function to parameterspars['ode']=cstrode# Set random seednp.random.seed(0)returnpars
# reference_trajectory = nominalmpc(pars)## Runs nominal NMPC for CSTR example in Chapter 3 to generate the reference# trajectory for tube-based MPC.## Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).importnumpyasnpimportmpctoolsasmpcdefnominalmpc(pars):Nu=pars['Nu']Np=pars['Np']Delta=pars['Delta']tfinal=pars['tfinal']N=pars['N']uprev=pars['uprev']usp=pars['usp']colloc=pars['colloc']Q=pars['Q']R=pars['R']Qf=pars['Qf']# Augmented state includes time as third component.Nx=pars['Nx']+1xub=np.append(pars['xub'].flatten(),np.inf)xlb=np.append(pars['xlb'].flatten(),-np.inf)x0=np.append(pars['x0'].flatten(),0.0)ze=np.append(pars['ze'].flatten(),tfinal)# Tightened constraints for nominal problem.uub=pars['uub_nominal']ulb=pars['ulb_nominal']defode_model(x,u):returnpars['ode'](x,u,0,pars)defode_plant(x,u,p):returnpars['ode'](x,u,p,pars)defstagecost(x,u,x_sp,u_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]uerr=u-u_spreturnQ*(xerr@xerr)+R*(uerr@uerr)deftermcost(x,x_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]returnQf*(xerr@xerr)ode_casadi=mpc.getCasadiFunc(ode_model,[Nx,Nu],['x','u'],'cstr_model')cstrsim=mpc.getCasadiIntegrator(ode_plant,Delta,[Nx,Nu,Np],['x','u','p'],'cstr_plant')l=mpc.getCasadiFunc(stagecost,[Nx,Nu,Nx,Nu],['x','u','x_sp','u_sp'],'l')Pf=mpc.getCasadiFunc(termcost,[Nx,Nx],['x','x_sp'],'Pf')# Controller dimensions and collocation.N_dict={'x':Nx,'u':Nu,'t':N,'c':colloc}# Constant setpoints over the horizon.xsp_init=np.tile(ze,(N+1,1))# (N+1, Nx)usp_init=np.full((N,Nu),usp)# (N, Nu)# Initial guess: linear interpolation to setpoint.guess_x=np.column_stack([np.linspace(x0[0],ze[0],N+1),np.linspace(x0[1],ze[1],N+1),np.linspace(x0[2],x0[2]+N*Delta,N+1),])# (N+1, Nx)guess_u=np.linspace(uprev,usp,N).reshape(N,Nu)# (N, Nu)solver=mpc.nmpc(f=ode_casadi,N=N_dict,Delta=Delta,verbosity=0,l=l,Vf=Pf,x0=x0,guess={'x':guess_x,'u':guess_u},par={'xsp':xsp_init,'usp':usp_init},lb={'x':xlb,'u':np.full(Nu,ulb)},ub={'x':xub,'u':np.full(Nu,uub)},inferargs=True,)# Closed-loop simulation.Nsim=int(tfinal/Delta)+Nx=np.full((Nx,Nsim+1),np.nan)u=np.full((Nu,Nsim),np.nan)x[:,0]=x0foriinrange(Nsim):solver.fixvar('x',0,x[:,i])solver.solve()print(f'{i}: {solver.stats["status"]}')ifsolver.stats['status']!='Solve_Succeeded':print(f'Warning: failed at step {i}!')breaku[:,i]=np.array(solver.var['u',0]).flatten()x[:,i+1]=np.array(cstrsim(x[:,i],u[:,i],np.zeros(Np))).flatten()solver.saveguess()t=Delta*np.arange(Nsim+1)return{'x':x,'u':u,'t':t}
# [depends] standardmpc.py# [depends] tubempc.pyimportsysimportossys.path.insert(0,os.path.dirname(os.path.abspath(__file__)))importnumpyasnpimportmatplotlib.pyplotaspltfromstandardmpcimportstandardmpcfromtubempcimporttubempcdefrun_standardvstube(standard_pars,tube_pars,Nsamples):""" Runs standard and tube MPC for the CSTR problem and returns trajectories. """standardMPCdata={}tubeMPCdata={}foriinrange(Nsamples):print(f'*** Sample {i+1} of {Nsamples}.')# Disturbance for this realization.standard_pars['A']=standard_pars['A_rand'][i]standard_pars['w']=standard_pars['w_rand'][i]tube_pars['A']=tube_pars['A_rand'][i]tube_pars['w']=tube_pars['w_rand'][i]# Run standard MPC.result=standardmpc(standard_pars)ifi==0:Nt=result['x'].shape[1]# Nsim + 1Ntplant=result['xplant'].shape[1]# plantSteps*Nsim + 1Nsim=result['u'].shape[1]# Nsim (number of control steps)standardMPCdata['x1']=np.zeros((Nt,Nsamples))standardMPCdata['x2']=np.zeros((Nt,Nsamples))standardMPCdata['x1plant']=np.zeros((Ntplant,Nsamples))standardMPCdata['x2plant']=np.zeros((Ntplant,Nsamples))standardMPCdata['u']=np.zeros((Nsim,Nsamples))standardMPCdata['x1'][:,i]=result['x'][0,:].flatten()standardMPCdata['x2'][:,i]=result['x'][1,:].flatten()standardMPCdata['x1plant'][:,i]=result['xplant'][0,:].flatten()standardMPCdata['x2plant'][:,i]=result['xplant'][1,:].flatten()standardMPCdata['u'][:,i]=result['u'].flatten()standardMPCdata['pars']=result['pars']# Run tube-based MPC.result=tubempc(tube_pars)ifi==0:Nt_tube=result['x'].shape[1]Ntplant_tube=result['xplant'].shape[1]Nsim_tube=result['u'].shape[1]tubeMPCdata['x1']=np.zeros((Nt_tube,Nsamples))tubeMPCdata['x2']=np.zeros((Nt_tube,Nsamples))tubeMPCdata['x1plant']=np.zeros((Ntplant_tube,Nsamples))tubeMPCdata['x2plant']=np.zeros((Ntplant_tube,Nsamples))tubeMPCdata['u']=np.zeros((Nsim_tube,Nsamples))tubeMPCdata['x1'][:,i]=result['x'][0,:].flatten()tubeMPCdata['x2'][:,i]=result['x'][1,:].flatten()tubeMPCdata['x1plant'][:,i]=result['xplant'][0,:].flatten()tubeMPCdata['x2plant'][:,i]=result['xplant'][1,:].flatten()tubeMPCdata['u'][:,i]=result['u'].flatten()tubeMPCdata['pars']=result['pars']standardMPCdata['t']=result['t']standardMPCdata['tplant']=result['tplant']tubeMPCdata['t']=result['t']tubeMPCdata['tplant']=result['tplant']return{'standard':standardMPCdata,'tube':tubeMPCdata}
# result = standardmpc(pars)## Runs standard NMPC for CSTR example in Chapter 3.## Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).importnumpyasnpimportmpctoolsasmpcdefstandardmpc(pars):Nu=pars['Nu']Np=pars['Np']Delta=pars['Delta']uub=pars['uub']ulb=pars['ulb']tfinal=pars['tfinal']N=pars['N']plantSteps=pars['plantSteps']uprev=pars['uprev']usp=pars['usp']colloc=pars['colloc']Q=pars['Q']R=pars['R']Qf=pars['Qf']# Augmented state includes time as third component.Nx=pars['Nx']+1xub=np.append(pars['xub'].flatten(),np.inf)xlb=np.append(pars['xlb'].flatten(),-np.inf)x0=np.append(pars['x0'].flatten(),0.0)ze=np.append(pars['ze'].flatten(),tfinal)defode_model(x,u):returnpars['ode'](x,u,0,pars)defode_plant(x,u,p):returnpars['ode'](x,u,p,pars)defstagecost(x,u,x_sp,u_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]uerr=u-u_spreturnQ*(xerr@xerr)+R*(uerr@uerr)deftermcost(x,x_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]returnQf*(xerr@xerr)ode_casadi=mpc.getCasadiFunc(ode_model,[Nx,Nu],['x','u'],'cstr_model')cstrsim=mpc.getCasadiIntegrator(ode_plant,Delta/plantSteps,[Nx,Nu,Np],['x','u','p'],'cstr_plant')l=mpc.getCasadiFunc(stagecost,[Nx,Nu,Nx,Nu],['x','u','x_sp','u_sp'],'l')Pf=mpc.getCasadiFunc(termcost,[Nx,Nx],['x','x_sp'],'Pf')# Controller dimensions and collocation.N_dict={'x':Nx,'u':Nu,'t':N,'c':colloc}# Constant setpoints over the horizon.xsp_init=np.tile(ze,(N+1,1))# (N+1, Nx)usp_init=np.full((N,Nu),usp)# (N, Nu)# Initial guess: linear interpolation to setpoint.guess_x=np.column_stack([np.linspace(x0[0],ze[0],N+1),np.linspace(x0[1],ze[1],N+1),np.linspace(x0[2],x0[2]+N*Delta,N+1),])# (N+1, Nx)guess_u=np.linspace(uprev,usp,N).reshape(N,Nu)# (N, Nu)solver=mpc.nmpc(f=ode_casadi,N=N_dict,Delta=Delta,verbosity=0,l=l,Vf=Pf,x0=x0,guess={'x':guess_x,'u':guess_u},par={'xsp':xsp_init,'usp':usp_init},lb={'x':xlb,'u':np.full(Nu,ulb)},ub={'x':xub,'u':np.full(Nu,uub)},inferargs=True,)# Closed-loop simulation.Nsim=int(tfinal/Delta)x=np.full((Nx,Nsim+1),np.nan)xplant=np.full((Nx,plantSteps*Nsim+1),np.nan)xplantnow=np.full((Nx,plantSteps+1),np.nan)u=np.full((Nu,Nsim),np.nan)x[:,0]=x0xplant[:,0]=x0xplantnow[:,0]=x0foriinrange(Nsim):solver.fixvar('x',0,x[:,i])solver.solve()print(f'{i}: {solver.stats["status"]}')ifsolver.stats['status']!='Solve_Succeeded':print(f'Warning: failed at step {i}!')breaku[:,i]=np.array(solver.var['u',0]).flatten()forjinrange(plantSteps):xplantnow[:,j+1]=np.array(cstrsim(xplantnow[:,j],u[:,i],np.ones(Np))).flatten()xplant[:,1+i*plantSteps:1+(i+1)*plantSteps]=xplantnow[:,1:]xplantnow[:,0]=xplantnow[:,-1]x[:,i+1]=xplantnow[:,-1]solver.saveguess()pars_save={k:vfork,vinpars.items()ifk!='ode'}return{'pars':pars_save,'x':x,'u':u,'t':np.arange(0,tfinal+Delta,Delta),'xplant':xplant,'tplant':np.linspace(0,tfinal,plantSteps*Nsim+1),}
# result = tubempc(pars)## Runs tube-based NMPC for CSTR example in Chapter 3.## Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).importnumpyasnpimportmpctoolsasmpcdeftubempc(pars):Nu=pars['Nu']Np=pars['Np']Delta=pars['Delta']uub=pars['uub']ulb=pars['ulb']tfinal=pars['tfinal']N=pars['N']plantSteps=pars['plantSteps']colloc=pars['colloc']Q=pars['Q']R=pars['R']Qf=pars['Qf']# Augmented state includes time as third component.Nx=pars['Nx']+1xub=np.append(pars['xub'].flatten(),np.inf)xlb=np.append(pars['xlb'].flatten(),-np.inf)x0=np.append(pars['x0'].flatten(),0.0)# Reference trajectories from nominal MPC.xsp=pars['reftraj']['x']# (Nx, Nsim_nom+1)usp=pars['reftraj']['u']# (Nu, Nsim_nom)defode_model(x,u):returnpars['ode'](x,u,0,pars)defode_plant(x,u,p):returnpars['ode'](x,u,p,pars)defstagecost(x,u,x_sp,u_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]uerr=u-u_spreturnQ*(xerr@xerr)+R*(uerr@uerr)deftermcost(x,x_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]returnQf*(xerr@xerr)ode_casadi=mpc.getCasadiFunc(ode_model,[Nx,Nu],['x','u'],'cstr_model')cstrsim=mpc.getCasadiIntegrator(ode_plant,Delta/plantSteps,[Nx,Nu,Np],['x','u','p'],'cstr_plant')l=mpc.getCasadiFunc(stagecost,[Nx,Nu,Nx,Nu],['x','u','x_sp','u_sp'],'l')Pf=mpc.getCasadiFunc(termcost,[Nx,Nx],['x','x_sp'],'Pf')# Controller dimensions and collocation.N_dict={'x':Nx,'u':Nu,'t':N,'c':colloc}# Initial setpoints from reference trajectory.xsp_init=xsp[:,:N+1].T# (N+1, Nx) — time-firstusp_init=usp[:,:N].T# (N, Nu)# Initial guess from reference trajectory.guess_x=xsp[:,:N+1].T# (N+1, Nx)guess_u=usp[:,:N].T# (N, Nu)solver=mpc.nmpc(f=ode_casadi,N=N_dict,Delta=Delta,verbosity=0,l=l,Vf=Pf,x0=x0,guess={'x':guess_x,'u':guess_u},par={'xsp':xsp_init,'usp':usp_init},lb={'x':xlb,'u':np.full(Nu,ulb)},ub={'x':xub,'u':np.full(Nu,uub)},inferargs=True,)# Closed-loop simulation.Nsim=int(tfinal/Delta)x=np.full((Nx,Nsim+1),np.nan)xplant=np.full((Nx,plantSteps*Nsim+1),np.nan)xplantnow=np.full((Nx,plantSteps+1),np.nan)u=np.full((Nu,Nsim),np.nan)x[:,0]=x0xplant[:,0]=x0xplantnow[:,0]=x0foriinrange(Nsim):solver.fixvar('x',0,x[:,i])# Update time-varying setpoints from reference trajectory (one step at a time).fortinrange(N+1):solver.par['x_sp',t]=xsp[:,i+t]fortinrange(N):solver.par['u_sp',t]=usp[:,i+t]solver.solve()print(f'{i}: {solver.stats["status"]}')ifsolver.stats['status']!='Solve_Succeeded':print(f'Warning: failed at step {i}!')breaku[:,i]=np.array(solver.var['u',0]).flatten()forjinrange(plantSteps):xplantnow[:,j+1]=np.array(cstrsim(xplantnow[:,j],u[:,i],np.ones(Np))).flatten()xplant[:,1+i*plantSteps:1+(i+1)*plantSteps]=xplantnow[:,1:]xplantnow[:,0]=xplantnow[:,-1]x[:,i+1]=xplantnow[:,-1]solver.saveguess()pars_save={k:vfork,vinpars.items()ifk!='ode'}return{'pars':pars_save,'x':x,'u':u,'t':np.arange(0,tfinal+Delta,Delta),'xplant':xplant,'tplant':np.linspace(0,tfinal,plantSteps*Nsim+1),}
# [makes] standardvstube.mat## [rule] copymat# [depends] functions/cstrparam.py# [depends] functions/nominalmpc.py# [depends] functions/run_standardvstube.py## Robust control of an exothermic reaction.# Note: 100 samples takes ~30 min; the .mat is checked in to data/.# If you change anything, re-run and copy standardvstube.mat to data/.importsysimportossys.path.insert(0,os.path.join(os.path.dirname(os.path.abspath(__file__)),'functions'))importnumpyasnpfromscipy.ioimportsavematfromcstrparamimportcstrparamfromnominalmpcimportnominalmpcfromrun_standardvstubeimportrun_standardvstube# Load parameters.pars=cstrparam()# Run nominal MPC to generate reference trajectory for tube-based MPC.pars['reftraj']=nominalmpc(pars)# Create random disturbance scenarios.Nsamples=10# Use 100 for the text figure (takes ~30 min).pars['A_rand']=0.001*np.random.rand(Nsamples)pars['w_rand']=np.random.rand(Nsamples)# Run both standard and tube-based MPC for all samples.data=run_standardvstube(pars,pars,Nsamples)