# pars = batchreactor(Nsim)## Returns a struct of batch reactor parameters to avoid duplication across# MHE, EKF, and UKF scripts.importosimportnumpyasnpimportscipy.ioimportmatplotlib.pyplotaspltimportcasadiimportmpctoolsdefbatchreactor(Nsim):""" pars = batchreactor(Nsim) Returns a struct of batch reactor parameters to avoid duplication across MHE, EKF, and UKF scripts. """ifnotisinstance(Nsim,int):raiseTypeError("Nsim must be an integer")# SizesDelta=0.25Nx=3Ny=1Nw=NxNv=Ny# Random variable standard deviationssig_v=0.25# Measurement noisesig_w=0.001# State noisesig_p=0.5# PriorP=sig_p**2*np.eye(Nx)Q=sig_w**2*np.eye(Nw)R=sig_v**2*np.eye(Ny)# Create symbolic variablesx_sym=casadi.SX.sym('x',Nx)# Define ODE systemode={'x':x_sym,'ode':batch_model(x_sym)}# Create integratorplant=casadi.integrator('integrator','cvodes',ode,0,Delta)# Create RK4 discretization with M=4 substeps to match MATLAB (getCasadiFunc M=4)frk4=mpctools.getCasadiFunc(batch_model,[Nx],['x'],funcname='frk4',rk4=True,Delta=Delta,M=4)# Create state transition and measurement functionsw_sym=casadi.SX.sym('w',Nw)f=casadi.Function('f',[x_sym,w_sym],[frk4(x_sym)+w_sym],['x','w'],['f'])h=casadi.Function('h',[x_sym],[measurement(x_sym)],['x'],['h'])# Choose priorxhat0=np.array([1.0,0.0,4.0])x0=np.array([0.5,0.05,0.0])# Simulate system to get datanp.random.seed(0)w=sig_w*np.random.randn(Nw,Nsim)v=sig_v*np.random.randn(Nv,Nsim+1)xsim=np.nan*np.ones((Nx,Nsim+1))xsim[:,0]=x0ysim=np.nan*np.ones((Ny,Nsim+1))yclean=np.nan*np.ones((Ny,Nsim+1))fortinrange(Nsim+1):xsim[:,t]=np.maximum(xsim[:,t],0)yclean[:,t]=np.array(h(xsim[:,t])).flatten()ysim[:,t]=yclean[:,t]+v[:,t]ift<Nsim:xsim[:,t+1]=np.array(plant(x0=xsim[:,t])['xf']).flatten()+w[:,t]t=np.arange(Nsim+1)*Delta# Package everything uppars={'Delta':Delta,'Nsim':Nsim,'Nx':Nx,'Ny':Ny,'Nw':Nw,'Nv':Nv,'sig_v':sig_v,'sig_w':sig_w,'sig_p':sig_p,'P':P,'Q':Q,'R':R,'plant':plant,'frk4':frk4,'f':f,'h':h,'xhat0':xhat0,'x0':x0,'xsim':xsim,'ysim':ysim,'yclean':yclean,'t':t,'reactorplot':reactorplot}returnparsdefbatch_model(x):"""Nonlinear ODE function"""k1=0.5km1=0.05k2=0.2km2=0.01cA=x[0]cB=x[1]cC=x[2]rate1=k1*cA-km1*cB*cCrate2=k2*cB**2-km2*cCreturncasadi.vertcat(-rate1,rate1-2*rate2,rate1+rate2)defmeasurement(x):"""Linear measurement function"""RT=0.0821*400returnRT*casadi.sum1(x)defreactorplot(x,xhat,y,yhat,yclean,Delta):"""Make a plot of actual and estimated states"""ifnotall(isinstance(arg,np.ndarray)forargin[x,xhat,y,yhat,yclean]):raiseTypeError("Arrays must be numpy arrays")ifnotisinstance(Delta,(int,float)):raiseTypeError("Delta must be numeric")ifos.getenv('OMIT_PLOTS')=='true':returnNonefig=plt.figure()Nx=3Nt=x.shape[1]-1t=np.arange(Nt+1)*Deltaplt.subplot(2,1,2)plt.plot(t,yclean,'-k',label='Actual')plt.plot(t,yclean,'--c',label='Measured')plt.plot(t,yhat,'ok',label='Estimated')plt.ylabel('P')plt.xlabel('t')plt.legend(loc='center left',bbox_to_anchor=(1,0.5))plt.ylim([10,35])plt.subplot(2,1,1)colors=['r','b','g']names=['A','B','C']foriinrange(Nx):plt.plot(t,x[i,:]+1e-10,'-'+colors[i],label=f'Actual C_{names[i]}')plt.plot(t,xhat[i,:]+1e-10,'o'+colors[i],label=f'Estimated C_{names[i]}')plt.xlabel('t')plt.ylabel('c_i')plt.legend(loc='center left',bbox_to_anchor=(1,0.5))plt.ylim([-1,1.5])returnfig
# [makes] batchreactor_ekf.dat# [depends] functions/batchreactor.py## This file simulates the Batch Reactor Example (3.2.1) from Haseltine# & Rawlings (2005). EKF is implemented and it is shown that it fails# when a poor prior is used.importsysimportossys.path.insert(0,os.path.join(os.path.dirname(os.path.abspath(__file__)),'functions'))importnumpyasnpfrommiscimportgnuplotsavefrombatchreactorimportbatchreactordefdoekf(Nsim,clipping):"""Simulate EKF with or without clipping."""pars=batchreactor(Nsim)Nx=pars['Nx']Ny=pars['Ny']Nw=pars['Nw']# Get linearization functions via CasADi Jacobian factories.f=pars['f']Afunc=f.factory('Afunc',f.name_in(),['jac:f:x'])Gfunc=f.factory('Gfunc',f.name_in(),['jac:f:w'])h=pars['h']Cfunc=h.factory('Cfunc',h.name_in(),['jac:h:x'])Nsim=pars['Nsim']x=pars['xsim']y=pars['ysim']Qw=pars['Q']Rv=pars['R']xhat=np.full((Nx,Nsim+1),np.nan)yhat=np.full((Ny,Nsim+1),np.nan)e=np.full((Ny,Nsim+1),np.nan)xhat[:,0]=pars['xhat0']L=[None]*(Nsim+1)P=[None]*(Nsim+1)P[0]=pars['P']fortinrange(Nsim+1):# Measurement.yhat[:,t]=np.array(h(xhat[:,t])).flatten()e[:,t]=y[:,t]-yhat[:,t]# Correction.C=np.array(Cfunc(xhat[:,t])).reshape(Ny,Nx)L[t]=P[t]@C.T@np.linalg.inv(C@P[t]@C.T+Rv)P[t]=P[t]-L[t]@C@P[t]xhat[:,t]=xhat[:,t]+L[t]@e[:,t]ifclipping:xhat[:,t]=np.maximum(xhat[:,t],0)# Advance forecast.ift<Nsim:w=np.zeros(Nw)A=np.array(Afunc(xhat[:,t],w)).reshape(Nx,Nx)G=np.array(Gfunc(xhat[:,t],w)).reshape(Nx,Nw)xhat[:,t+1]=np.array(f(xhat[:,t],w)).flatten()P[t+1]=A@P[t]@A.T+G@Qw@G.T# Form data table: [t, x1, x2, x3, xhat1, xhat2, xhat3, y, yhat]returnnp.column_stack([pars['t'],x.T,xhat.T,y.T,yhat.T])print('Simulating without clipping.')data_noclip=doekf(120,False)print('Simulating with clipping.')data_yesclip=doekf(600,True)gnuplotsave('batchreactor_ekf.dat',{'noclip':data_noclip,'yesclip':data_yesclip})