# Loads default parameters for robust MPC CSTR problem.# Model Parameters.importnumpyasnpdefcstrparam(Nsim=None):pars={}# Model Parameterspars['theta']=20pars['k']=300pars['M']=5pars['xf']=0.3947pars['xc']=0.3816pars['alpha']=0.117# Simulationpars['tfinal']=300pars['N']=40pars['Q']=1/2pars['R']=1/2pars['Qf']=1e5pars['Nx']=2pars['Nu']=1pars['Np']=1pars['Delta']=3pars['xub']=np.array([[2],[2]])pars['xlb']=np.array([[0],[0]])pars['uub']=2pars['ulb']=0pars['colloc']=5pars['plantSteps']=2# Initial Conditions and Setpointspars['x0']=np.array([[0.9831],[0.3918]])pars['ze']=np.array([[0.2632],[0.6519]])pars['uprev']=0.02pars['usp']=455/600# Nominal constraintspars['uub_nominal']=2pars['ulb_nominal']=0.02pars['A']=0pars['w']=1defcstrode(x,u,p,pars):# Extract statesc=x[0]T=x[1]t=x[2]iflen(x)>2else0# Handle optional time state# Calculate reaction rate and disturbancerate=pars['k']*c*np.exp(-pars['M']/T)w=pars['A']*np.sin(t*pars['w'])*p# State derivativesdcdt=(1-c)/pars['theta']-ratedTdt=(pars['xf']-T)/pars['theta']+rate-pars['alpha']*u*(T-pars['xc'])+wdtdt=1returnnp.array([dcdt,dTdt,dtdt])# Add ODE function to parameterspars['ode']=cstrode# Set random seednp.random.seed(0)returnpars
# reference_trajectory = nominalmpc(pars)## Runs nominal NMPC for CSTR example in Chapter 3 to generate the reference# trajectory for tube-based MPC.## Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).importnumpyasnpimportmpctoolsasmpcdefnominalmpc(pars):Nu=pars['Nu']Np=pars['Np']Delta=pars['Delta']tfinal=pars['tfinal']N=pars['N']uprev=pars['uprev']usp=pars['usp']colloc=pars['colloc']Q=pars['Q']R=pars['R']Qf=pars['Qf']# Augmented state includes time as third component.Nx=pars['Nx']+1xub=np.append(pars['xub'].flatten(),np.inf)xlb=np.append(pars['xlb'].flatten(),-np.inf)x0=np.append(pars['x0'].flatten(),0.0)ze=np.append(pars['ze'].flatten(),tfinal)# Tightened constraints for nominal problem.uub=pars['uub_nominal']ulb=pars['ulb_nominal']defode_model(x,u):returnpars['ode'](x,u,0,pars)defode_plant(x,u,p):returnpars['ode'](x,u,p,pars)defstagecost(x,u,x_sp,u_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]uerr=u-u_spreturnQ*(xerr@xerr)+R*(uerr@uerr)deftermcost(x,x_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]returnQf*(xerr@xerr)ode_casadi=mpc.getCasadiFunc(ode_model,[Nx,Nu],['x','u'],'cstr_model')cstrsim=mpc.getCasadiIntegrator(ode_plant,Delta,[Nx,Nu,Np],['x','u','p'],'cstr_plant')l=mpc.getCasadiFunc(stagecost,[Nx,Nu,Nx,Nu],['x','u','x_sp','u_sp'],'l')Pf=mpc.getCasadiFunc(termcost,[Nx,Nx],['x','x_sp'],'Pf')# Controller dimensions and collocation.N_dict={'x':Nx,'u':Nu,'t':N,'c':colloc}# Constant setpoints over the horizon.xsp_init=np.tile(ze,(N+1,1))# (N+1, Nx)usp_init=np.full((N,Nu),usp)# (N, Nu)# Initial guess: linear interpolation to setpoint.guess_x=np.column_stack([np.linspace(x0[0],ze[0],N+1),np.linspace(x0[1],ze[1],N+1),np.linspace(x0[2],x0[2]+N*Delta,N+1),])# (N+1, Nx)guess_u=np.linspace(uprev,usp,N).reshape(N,Nu)# (N, Nu)solver=mpc.nmpc(f=ode_casadi,N=N_dict,Delta=Delta,verbosity=0,l=l,Vf=Pf,x0=x0,guess={'x':guess_x,'u':guess_u},par={'xsp':xsp_init,'usp':usp_init},lb={'x':xlb,'u':np.full(Nu,ulb)},ub={'x':xub,'u':np.full(Nu,uub)},inferargs=True,)# Closed-loop simulation.Nsim=int(tfinal/Delta)+Nx=np.full((Nx,Nsim+1),np.nan)u=np.full((Nu,Nsim),np.nan)x[:,0]=x0foriinrange(Nsim):solver.fixvar('x',0,x[:,i])solver.solve()print(f'{i}: {solver.stats["status"]}')ifsolver.stats['status']!='Solve_Succeeded':print(f'Warning: failed at step {i}!')breaku[:,i]=np.array(solver.var['u',0]).flatten()x[:,i+1]=np.array(cstrsim(x[:,i],u[:,i],np.zeros(Np))).flatten()solver.saveguess()t=Delta*np.arange(Nsim+1)return{'x':x,'u':u,'t':t}
# result = tubempc(pars)## Runs tube-based NMPC for CSTR example in Chapter 3.## Note: In this script, "p" denotes the disturbance ("w" in MPC textbook).importnumpyasnpimportmpctoolsasmpcdeftubempc(pars):Nu=pars['Nu']Np=pars['Np']Delta=pars['Delta']uub=pars['uub']ulb=pars['ulb']tfinal=pars['tfinal']N=pars['N']plantSteps=pars['plantSteps']colloc=pars['colloc']Q=pars['Q']R=pars['R']Qf=pars['Qf']# Augmented state includes time as third component.Nx=pars['Nx']+1xub=np.append(pars['xub'].flatten(),np.inf)xlb=np.append(pars['xlb'].flatten(),-np.inf)x0=np.append(pars['x0'].flatten(),0.0)# Reference trajectories from nominal MPC.xsp=pars['reftraj']['x']# (Nx, Nsim_nom+1)usp=pars['reftraj']['u']# (Nu, Nsim_nom)defode_model(x,u):returnpars['ode'](x,u,0,pars)defode_plant(x,u,p):returnpars['ode'](x,u,p,pars)defstagecost(x,u,x_sp,u_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]uerr=u-u_spreturnQ*(xerr@xerr)+R*(uerr@uerr)deftermcost(x,x_sp):xerr=x[:pars['Nx']]-x_sp[:pars['Nx']]returnQf*(xerr@xerr)ode_casadi=mpc.getCasadiFunc(ode_model,[Nx,Nu],['x','u'],'cstr_model')cstrsim=mpc.getCasadiIntegrator(ode_plant,Delta/plantSteps,[Nx,Nu,Np],['x','u','p'],'cstr_plant')l=mpc.getCasadiFunc(stagecost,[Nx,Nu,Nx,Nu],['x','u','x_sp','u_sp'],'l')Pf=mpc.getCasadiFunc(termcost,[Nx,Nx],['x','x_sp'],'Pf')# Controller dimensions and collocation.N_dict={'x':Nx,'u':Nu,'t':N,'c':colloc}# Initial setpoints from reference trajectory.xsp_init=xsp[:,:N+1].T# (N+1, Nx) — time-firstusp_init=usp[:,:N].T# (N, Nu)# Initial guess from reference trajectory.guess_x=xsp[:,:N+1].T# (N+1, Nx)guess_u=usp[:,:N].T# (N, Nu)solver=mpc.nmpc(f=ode_casadi,N=N_dict,Delta=Delta,verbosity=0,l=l,Vf=Pf,x0=x0,guess={'x':guess_x,'u':guess_u},par={'xsp':xsp_init,'usp':usp_init},lb={'x':xlb,'u':np.full(Nu,ulb)},ub={'x':xub,'u':np.full(Nu,uub)},inferargs=True,)# Closed-loop simulation.Nsim=int(tfinal/Delta)x=np.full((Nx,Nsim+1),np.nan)xplant=np.full((Nx,plantSteps*Nsim+1),np.nan)xplantnow=np.full((Nx,plantSteps+1),np.nan)u=np.full((Nu,Nsim),np.nan)x[:,0]=x0xplant[:,0]=x0xplantnow[:,0]=x0foriinrange(Nsim):solver.fixvar('x',0,x[:,i])# Update time-varying setpoints from reference trajectory (one step at a time).fortinrange(N+1):solver.par['x_sp',t]=xsp[:,i+t]fortinrange(N):solver.par['u_sp',t]=usp[:,i+t]solver.solve()print(f'{i}: {solver.stats["status"]}')ifsolver.stats['status']!='Solve_Succeeded':print(f'Warning: failed at step {i}!')breaku[:,i]=np.array(solver.var['u',0]).flatten()forjinrange(plantSteps):xplantnow[:,j+1]=np.array(cstrsim(xplantnow[:,j],u[:,i],np.ones(Np))).flatten()xplant[:,1+i*plantSteps:1+(i+1)*plantSteps]=xplantnow[:,1:]xplantnow[:,0]=xplantnow[:,-1]x[:,i+1]=xplantnow[:,-1]solver.saveguess()pars_save={k:vfork,vinpars.items()ifk!='ode'}return{'pars':pars_save,'x':x,'u':u,'t':np.arange(0,tfinal+Delta,Delta),'xplant':xplant,'tplant':np.linspace(0,tfinal,plantSteps*Nsim+1),}
# [makes] tubempc_sampletimes.mat## [rule] copymat# [depends] functions/cstrparam.py# [depends] functions/nominalmpc.py# [depends] functions/tubempc.py## Robust control of an exothermic reaction — effect of sample time.# Note: takes ~3 min; the .mat is checked in to data/.# If you change anything, re-run and copy tubempc_sampletimes.mat to data/.importsysimportossys.path.insert(0,os.path.join(os.path.dirname(os.path.abspath(__file__)),'functions'))importnumpyasnpfromscipy.ioimportsavematfromscipy.interpolateimportinterp1dfromcstrparamimportcstrparamfromnominalmpcimportnominalmpcfromtubempcimporttubempcdefresample_nominal(reftraj,Delta):"""Resample the reference (central path) trajectory to a new sample time."""t=reftraj['t']u=reftraj['u']# (Nu, Nsim_nom)x=reftraj['x']# (Nx, Nsim_nom+1)t0,tf=t[0],t[-1]tnew=np.arange(t0,tf+1e-10,Delta)# Interpolate u (defined at t[:-1]) and x (defined at t).u_interp=interp1d(t[:-1],u,kind='cubic',fill_value='extrapolate')x_interp=interp1d(t,x,kind='cubic',fill_value='extrapolate')unew=u_interp(tnew)# (Nu, len(tnew))xnew=x_interp(tnew)# (Nx, len(tnew))return{'t':tnew,'u':unew,'x':xnew}# Base parameters.nompars=cstrparam()nompars['tfinal']=600nompars['colloc']=10nompars['plantSteps']=20nompars['A']=0.002nompars['w']=0.4# Two sample times.Deltas=[8,12]predHorizon=360pars={}forDeltainDeltas:key=f'd{Delta}'pars[key]=nompars.copy()pars[key]['Delta']=Deltapars[key]['N']=round(predHorizon/Delta)pars['d12']['colloc']=24# More collocation points for longer interval.# Run nominal (central path) MPC using Delta=12, then resample for Delta=8.print('Running nominalmpc with Delta=12...')reftraj=nominalmpc(pars['d12'])pars['d12']['reftraj']=reftrajpars['d8']['reftraj']=resample_nominal(reftraj,pars['d8']['Delta'])# Run tube-based MPC for each sample time.results={}forkeyin[f'd{d}'fordinDeltas]:print(f'Running tubempc for {key}...')results[key]=tubempc(pars[key])# Build output data structure.data={}data['ref']={'x1':reftraj['x'][0,:],'x2':reftraj['x'][1,:],'u':reftraj['u'].flatten(),'t':reftraj['t'],}forkeyin[f'd{d}'fordinDeltas]:r=results[key]data[key]={'x1':r['x'][0,:],'x2':r['x'][1,:],'x1plant':r['xplant'][0,:],'x2plant':r['xplant'][1,:],'u':r['u'].flatten(),'t':r['t'],'tplant':r['tplant'],}