# pars = batchreactor(Nsim)## Returns a struct of batch reactor parameters to avoid duplication across# MHE, EKF, and UKF scripts.importosimportnumpyasnpimportscipy.ioimportmatplotlib.pyplotaspltimportcasadiimportmpctoolsdefbatchreactor(Nsim):""" pars = batchreactor(Nsim) Returns a struct of batch reactor parameters to avoid duplication across MHE, EKF, and UKF scripts. """ifnotisinstance(Nsim,int):raiseTypeError("Nsim must be an integer")# SizesDelta=0.25Nx=3Ny=1Nw=NxNv=Ny# Random variable standard deviationssig_v=0.25# Measurement noisesig_w=0.001# State noisesig_p=0.5# PriorP=sig_p**2*np.eye(Nx)Q=sig_w**2*np.eye(Nw)R=sig_v**2*np.eye(Ny)# Create symbolic variablesx_sym=casadi.SX.sym('x',Nx)# Define ODE systemode={'x':x_sym,'ode':batch_model(x_sym)}# Create integratorplant=casadi.integrator('integrator','cvodes',ode,0,Delta)# Create RK4 discretization with M=4 substeps to match MATLAB (getCasadiFunc M=4)frk4=mpctools.getCasadiFunc(batch_model,[Nx],['x'],funcname='frk4',rk4=True,Delta=Delta,M=4)# Create state transition and measurement functionsw_sym=casadi.SX.sym('w',Nw)f=casadi.Function('f',[x_sym,w_sym],[frk4(x_sym)+w_sym],['x','w'],['f'])h=casadi.Function('h',[x_sym],[measurement(x_sym)],['x'],['h'])# Choose priorxhat0=np.array([1.0,0.0,4.0])x0=np.array([0.5,0.05,0.0])# Simulate system to get datanp.random.seed(0)w=sig_w*np.random.randn(Nw,Nsim)v=sig_v*np.random.randn(Nv,Nsim+1)xsim=np.nan*np.ones((Nx,Nsim+1))xsim[:,0]=x0ysim=np.nan*np.ones((Ny,Nsim+1))yclean=np.nan*np.ones((Ny,Nsim+1))fortinrange(Nsim+1):xsim[:,t]=np.maximum(xsim[:,t],0)yclean[:,t]=np.array(h(xsim[:,t])).flatten()ysim[:,t]=yclean[:,t]+v[:,t]ift<Nsim:xsim[:,t+1]=np.array(plant(x0=xsim[:,t])['xf']).flatten()+w[:,t]t=np.arange(Nsim+1)*Delta# Package everything uppars={'Delta':Delta,'Nsim':Nsim,'Nx':Nx,'Ny':Ny,'Nw':Nw,'Nv':Nv,'sig_v':sig_v,'sig_w':sig_w,'sig_p':sig_p,'P':P,'Q':Q,'R':R,'plant':plant,'frk4':frk4,'f':f,'h':h,'xhat0':xhat0,'x0':x0,'xsim':xsim,'ysim':ysim,'yclean':yclean,'t':t,'reactorplot':reactorplot}returnparsdefbatch_model(x):"""Nonlinear ODE function"""k1=0.5km1=0.05k2=0.2km2=0.01cA=x[0]cB=x[1]cC=x[2]rate1=k1*cA-km1*cB*cCrate2=k2*cB**2-km2*cCreturncasadi.vertcat(-rate1,rate1-2*rate2,rate1+rate2)defmeasurement(x):"""Linear measurement function"""RT=0.0821*400returnRT*casadi.sum1(x)defreactorplot(x,xhat,y,yhat,yclean,Delta):"""Make a plot of actual and estimated states"""ifnotall(isinstance(arg,np.ndarray)forargin[x,xhat,y,yhat,yclean]):raiseTypeError("Arrays must be numpy arrays")ifnotisinstance(Delta,(int,float)):raiseTypeError("Delta must be numeric")ifos.getenv('OMIT_PLOTS')=='true':returnNonefig=plt.figure()Nx=3Nt=x.shape[1]-1t=np.arange(Nt+1)*Deltaplt.subplot(2,1,2)plt.plot(t,yclean,'-k',label='Actual')plt.plot(t,yclean,'--c',label='Measured')plt.plot(t,yhat,'ok',label='Estimated')plt.ylabel('P')plt.xlabel('t')plt.legend(loc='center left',bbox_to_anchor=(1,0.5))plt.ylim([10,35])plt.subplot(2,1,1)colors=['r','b','g']names=['A','B','C']foriinrange(Nx):plt.plot(t,x[i,:]+1e-10,'-'+colors[i],label=f'Actual C_{names[i]}')plt.plot(t,xhat[i,:]+1e-10,'o'+colors[i],label=f'Estimated C_{names[i]}')plt.xlabel('t')plt.ylabel('c_i')plt.legend(loc='center left',bbox_to_anchor=(1,0.5))plt.ylim([-1,1.5])returnfig
# [makes] batchreactor_ukf.dat# [depends] functions/batchreactor.py## This file simulates the Batch Reactor Example (3.2.1) from Haseltine# & Rawlings (2005). UKF is implemented and it is shown that it fails# when a poor prior is used.importsysimportossys.path.insert(0,os.path.join(os.path.dirname(os.path.abspath(__file__)),'functions'))importnumpyasnpfromscipyimportlinalgfrommiscimportgnuplotsavefrombatchreactorimportbatchreactordefget_sigma(Pz,xhat,pars):"""Returns sigma points obtained from scaled sqrtm(Pz)."""sqrtPz=np.real(linalg.sqrtm((pars['Nz']+pars['lambda'])*Pz))zhat=np.concatenate([xhat.flatten(),np.zeros(pars['Nw']+pars['Nv'])])cols=np.hstack([np.zeros((pars['Nz'],1)),sqrtPz,-sqrtPz])z_sigma=cols+zhat[:,np.newaxis]x_sigma=z_sigma[:pars['Nx'],:]w_sigma=z_sigma[pars['Nx']:pars['Nx']+pars['Nw'],:]v_sigma=z_sigma[pars['Nx']+pars['Nw']:,:]returnx_sigma,w_sigma,v_sigmadefukf_scale(x_sigma,pars):""" Rescales sigma points and adjusts weights so each point has no negative components. """x_center=x_sigma[:,0:1]x_outer=x_sigma[:,1:]dx=np.abs(x_outer-x_center)err=np.abs(np.minimum(x_outer,0))err[x_center.ravel()<0,:]=0dx[err==0]=1theta=1-np.max(err/dx,axis=0)x_outer=theta*x_outer+(1-theta)*x_centerx_sigma=np.hstack([x_center,x_outer])rho=(2*pars['lambda']-1)/(pars['Nsigma']-np.sum(theta))a=-rho/(2*(pars['Nz']+pars['lambda']))b=(1+rho)/(2*(pars['Nz']+pars['lambda']))weights=np.concatenate([[0],a*theta])+breturnx_sigma,weightsdefdoukf(Nsim,clipping):"""Simulate the UKF with or without clipping."""pars=batchreactor(Nsim)Nx=pars['Nx']Ny=pars['Ny']Nw=pars['Nw']Nv=pars['Nv']pars['Nz']=Nx+Nw+NvNz=pars['Nz']Nsim=pars['Nsim']x=pars['xsim']y=pars['ysim']Px=pars['P']Qw=pars['Q']Rv=pars['R']Nsigma=1+2*Nzpars['Nsigma']=Nsigmapars['lambda']=1weights=np.concatenate([[2*pars['lambda']],np.ones(2*Nz)]) \
/(2*(Nz+pars['lambda']))pars['weights']=weightsW=np.diag(weights)xhat=np.full((Nx,Nsim+1),np.nan)yhat=np.full((Ny,Nsim+1),np.nan)e=np.full((Ny,Nsim+1),np.nan)xhat[:,0]=pars['xhat0']Pz=linalg.block_diag(Px,Qw,Rv)x_sigma,w_sigma,v_sigma=get_sigma(Pz,xhat[:,0],pars)fortinrange(Nsim+1):# Take measurement.y_sigma=np.full((Ny,Nsigma),np.nan)foriinrange(Nsigma):y_sigma[:,i]=np.array(pars['h'](x_sigma[:,i])).flatten() \
+v_sigma[:,i]yhat[:,t]=y_sigma@pars['weights']e[:,t]=y[:,t]-yhat[:,t]# Apply correction.Ex=x_sigma-xhat[:,t:t+1]Ey=y_sigma-yhat[:,t:t+1]Exy=Ex@W@Ey.TEyy=Ey@W@Ey.TL=Exy@np.linalg.inv(Eyy)Px=Px-L@Exy.Txhat[:,t]=xhat[:,t]+(L@e[:,t:t+1]).flatten()ift>=Nsim:break# New sigma points; clip if requested.Pz=linalg.block_diag(Px,Qw,Rv)x_sigma,w_sigma,v_sigma=get_sigma(Pz,xhat[:,t],pars)ifclipping:x_sigma,pars['weights']=ukf_scale(x_sigma,pars)# Simulate sigma points forward.foriinrange(Nsigma):x_sigma[:,i]=np.array(pars['f'](x_sigma[:,i],w_sigma[:,i])).flatten()xhat[:,t+1]=x_sigma@pars['weights']Ex=x_sigma-xhat[:,t+1:t+2]Px=Ex@W@Ex.T# Form data table: [t, x1, x2, x3, xhat1, xhat2, xhat3, y, yhat]returnnp.column_stack([pars['t'],x.T,xhat.T,y.T,yhat.T])print('Simulating without clipping.')data_noclip=doukf(150,False)print('Simulating with clipping.')data_yesclip=doukf(180,True)gnuplotsave('batchreactor_ukf.dat',{'noclip':data_noclip,'yesclip':data_yesclip})