# pars = batchreactor(Nsim)## Returns a struct of batch reactor parameters to avoid duplication across# MHE, EKF, and UKF scripts.importosimportnumpyasnpimportscipy.ioimportmatplotlib.pyplotaspltimportcasadiimportmpctoolsdefbatchreactor(Nsim):""" pars = batchreactor(Nsim) Returns a struct of batch reactor parameters to avoid duplication across MHE, EKF, and UKF scripts. """ifnotisinstance(Nsim,int):raiseTypeError("Nsim must be an integer")# SizesDelta=0.25Nx=3Ny=1Nw=NxNv=Ny# Random variable standard deviationssig_v=0.25# Measurement noisesig_w=0.001# State noisesig_p=0.5# PriorP=sig_p**2*np.eye(Nx)Q=sig_w**2*np.eye(Nw)R=sig_v**2*np.eye(Ny)# Create symbolic variablesx_sym=casadi.SX.sym('x',Nx)# Define ODE systemode={'x':x_sym,'ode':batch_model(x_sym)}# Create integratorplant=casadi.integrator('integrator','cvodes',ode,0,Delta)# Create RK4 discretization with M=4 substeps to match MATLAB (getCasadiFunc M=4)frk4=mpctools.getCasadiFunc(batch_model,[Nx],['x'],funcname='frk4',rk4=True,Delta=Delta,M=4)# Create state transition and measurement functionsw_sym=casadi.SX.sym('w',Nw)f=casadi.Function('f',[x_sym,w_sym],[frk4(x_sym)+w_sym],['x','w'],['f'])h=casadi.Function('h',[x_sym],[measurement(x_sym)],['x'],['h'])# Choose priorxhat0=np.array([1.0,0.0,4.0])x0=np.array([0.5,0.05,0.0])# Simulate system to get datanp.random.seed(0)w=sig_w*np.random.randn(Nw,Nsim)v=sig_v*np.random.randn(Nv,Nsim+1)xsim=np.nan*np.ones((Nx,Nsim+1))xsim[:,0]=x0ysim=np.nan*np.ones((Ny,Nsim+1))yclean=np.nan*np.ones((Ny,Nsim+1))fortinrange(Nsim+1):xsim[:,t]=np.maximum(xsim[:,t],0)yclean[:,t]=np.array(h(xsim[:,t])).flatten()ysim[:,t]=yclean[:,t]+v[:,t]ift<Nsim:xsim[:,t+1]=np.array(plant(x0=xsim[:,t])['xf']).flatten()+w[:,t]t=np.arange(Nsim+1)*Delta# Package everything uppars={'Delta':Delta,'Nsim':Nsim,'Nx':Nx,'Ny':Ny,'Nw':Nw,'Nv':Nv,'sig_v':sig_v,'sig_w':sig_w,'sig_p':sig_p,'P':P,'Q':Q,'R':R,'plant':plant,'frk4':frk4,'f':f,'h':h,'xhat0':xhat0,'x0':x0,'xsim':xsim,'ysim':ysim,'yclean':yclean,'t':t,'reactorplot':reactorplot}returnparsdefbatch_model(x):"""Nonlinear ODE function"""k1=0.5km1=0.05k2=0.2km2=0.01cA=x[0]cB=x[1]cC=x[2]rate1=k1*cA-km1*cB*cCrate2=k2*cB**2-km2*cCreturncasadi.vertcat(-rate1,rate1-2*rate2,rate1+rate2)defmeasurement(x):"""Linear measurement function"""RT=0.0821*400returnRT*casadi.sum1(x)defreactorplot(x,xhat,y,yhat,yclean,Delta):"""Make a plot of actual and estimated states"""ifnotall(isinstance(arg,np.ndarray)forargin[x,xhat,y,yhat,yclean]):raiseTypeError("Arrays must be numpy arrays")ifnotisinstance(Delta,(int,float)):raiseTypeError("Delta must be numeric")ifos.getenv('OMIT_PLOTS')=='true':returnNonefig=plt.figure()Nx=3Nt=x.shape[1]-1t=np.arange(Nt+1)*Deltaplt.subplot(2,1,2)plt.plot(t,yclean,'-k',label='Actual')plt.plot(t,yclean,'--c',label='Measured')plt.plot(t,yhat,'ok',label='Estimated')plt.ylabel('P')plt.xlabel('t')plt.legend(loc='center left',bbox_to_anchor=(1,0.5))plt.ylim([10,35])plt.subplot(2,1,1)colors=['r','b','g']names=['A','B','C']foriinrange(Nx):plt.plot(t,x[i,:]+1e-10,'-'+colors[i],label=f'Actual C_{names[i]}')plt.plot(t,xhat[i,:]+1e-10,'o'+colors[i],label=f'Estimated C_{names[i]}')plt.xlabel('t')plt.ylabel('c_i')plt.legend(loc='center left',bbox_to_anchor=(1,0.5))plt.ylim([-1,1.5])returnfig
# [makes] batchreactor_mhe.dat# [depends] functions/batchreactor.py## Simulates the Batch Reactor Example (3.2.1) from Haseltine & Rawlings (2005).# MHE is implemented and it is shown that it performs better than EKF and UKF# when a poor prior is used.importsysimportossys.path.insert(0,os.path.join(os.path.dirname(os.path.abspath(__file__)),'functions'))importnumpyasnpimportscipy.linalgimportcasadiimportmpctoolsasmpcfrommiscimportgnuplotsavefrombatchreactorimportbatchreactorNsim=120pars=batchreactor(Nsim)Nx=pars['Nx']Ny=pars['Ny']Nw=pars['Nw']Nv=pars['Nv']Nt=10xsim=pars['xsim']ysim=pars['ysim']sig_w=pars['sig_w']sig_v=pars['sig_v']# Stage cost: w'w/sig_w^2 + v'v/sig_v^2w_sym=casadi.SX.sym('w',Nw)v_sym=casadi.SX.sym('v',Nv)l=casadi.Function('l',[w_sym,v_sym],[casadi.dot(w_sym,w_sym)/sig_w**2+casadi.dot(v_sym,v_sym)/sig_v**2],['w','v'],['l'])# ── Smoothing prior support ──────────────────────────────────────────────────# Jacobian functions for EKF stepx_jac=casadi.SX.sym('x_jac',Nx)w_jac=casadi.SX.sym('w_jac',Nw)f_jac=pars['f'](x_jac,w_jac)A_fn=casadi.Function('A_jac',[x_jac,w_jac],[casadi.jacobian(f_jac,x_jac)])Gdyn_fn=casadi.Function('Gdyn_jac',[x_jac,w_jac],[casadi.jacobian(f_jac,w_jac)])C_fn=casadi.Function('C_jac',[x_jac],[casadi.jacobian(pars['h'](x_jac),x_jac)])# Shooting function from x[1] over Nt steps (Nt-1 [v,w] pairs, last meas has no v)# W = [v[1];w[1]; v[2];w[2]; ...; v[Nt-1];w[Nt-1]] shape (Nt-1)*(Nv+Nw)x1_sh=casadi.SX.sym('x1',Nx)W_sh=casadi.SX.sym('W',(Nt-1)*(Nv+Nw))x_cur=x1_shy_sh=[]idx=0forkinrange(Nt-1):v_k=W_sh[idx:idx+Nv];idx+=Nvw_k=W_sh[idx:idx+Nw];idx+=Nwy_sh.append(pars['h'](x_cur)+v_k)x_cur=pars['f'](x_cur,w_k)y_sh.append(pars['h'](x_cur))# y[Nt]: no v[Nt] (matches Octave convention)Y_sh=casadi.vertcat(*y_sh)# shape (Nt*Ny,)shoot_fn=casadi.Function('shoot',[x1_sh,W_sh],[Y_sh,casadi.jacobian(Y_sh,x1_sh),casadi.jacobian(Y_sh,W_sh)],['x1','W'],['Y','O','G'])# Noise covariance block for Q_shoot = block_diag([R, Q_proc] * (Nt-1))R_mat=sig_v**2*np.eye(Nv)Q_proc=sig_w**2*np.eye(Nw)Q_shoot=scipy.linalg.block_diag(*([scipy.linalg.block_diag(R_mat,Q_proc)]*(Nt-1)))defsmoothing_update(solver,Pinv):""" Returns (Pinv_new, x0bar_new) using the MHE smoothing prior update (matches Octave mpctools MHESolver.smoothingupdate). """xhat_0=np.array(solver.var['x',0]).flatten()xhat_1=np.array(solver.var['x',1]).flatten()# EKF step: measurement update at x[0], then predict to x[1]P=np.linalg.inv(Pinv)xhatm=np.array(pars['f'](xhat_0,np.zeros(Nw))).flatten()A=np.array(A_fn(xhat_0,np.zeros(Nw))).reshape(Nx,Nx)G_dyn=np.array(Gdyn_fn(xhat_0,np.zeros(Nw))).reshape(Nx,Nw)C=np.array(C_fn(xhat_0)).reshape(Ny,Nx)# H = I (additive measurement noise), so H*R*H' = R_matS_ekf=C@P@C.T+R_matPm=(G_dyn@Q_proc@G_dyn.T+A@P@A.T-(A@P@C.T)@np.linalg.solve(S_ekf,C@P@A.T))Pm=0.5*(Pm+Pm.T)# symmetrize# Shooting from xhatm (zero noise linearization point)W_zeros=np.zeros((Nt-1)*(Nv+Nw))res=shoot_fn(x1=xhatm,W=W_zeros)Y_bar=np.array(res['Y']).flatten()# (Nt*Ny,)O_mat=np.array(res['O']).reshape(Nt*Ny,Nx)# (Nt*Ny, Nx)G_mat=np.array(res['G']).reshape(Nt*Ny,(Nt-1)*(Nv+Nw))# (Nt*Ny, ...)# Actual measurements y[1..Nt] from solver (skip oldest y[0])Y=np.concatenate([np.array(solver.par['y',k]).flatten()forkinrange(1,Nt+1)])E=Y-Y_bar# Innovation covariance from noise only (Pyx = G Q G')Pyx=G_mat@Q_shoot@G_mat.TPyx+=1e-10*np.eye(Nt*Ny)# numerical regularization# Pinv_new = Pm^{-1} (by Woodbury: Pxy^{-1} - O' Pyx^{-1} O = Pm^{-1})Pinv_new=np.linalg.inv(Pm)Pinv_new=0.5*(Pinv_new+Pinv_new.T)# x0bar_new = xhat[1] + Pm O' Pyx^{-1} (O (xhat[1] - xhatm) - E)x0bar_new=xhat_1+Pm@O_mat.T@np.linalg.solve(Pyx,O_mat@(xhat_1-xhatm)-E)returnPinv_new,x0bar_new# ── Prior cost: (x - x0bar)' Pinv (x - x0bar), with Pinv as a solver parameter ─x_lx=casadi.SX.sym('x_lx',Nx)xb_lx=casadi.SX.sym('x0bar_lx',Nx)Pf_lx=casadi.SX.sym('Pinv_flat',Nx*Nx)dx_lx=x_lx-xb_lxlx=casadi.Function('lx',[x_lx,xb_lx,Pf_lx],[casadi.mtimes(dx_lx.T,casadi.mtimes(casadi.reshape(Pf_lx,Nx,Nx),dx_lx))],['x','x0bar','Pinv_flat'],['lx'])Pinv=np.linalg.inv(pars['P'])x0bar=pars['xhat0'].copy()N={'x':Nx,'w':Nw,'y':Ny,'t':Nt}lb={'x':np.zeros((Nt+1,Nx))}ub={'x':10*np.ones((Nt+1,Nx))}# y is TIME-FIRST: shape (Nt+1, Ny). Load first full window of real data.y_init=ysim[:,:Nt+1].T.copy()# (Nt+1, Ny)solver=mpc.nmhe(pars['f'],pars['h'],np.zeros((Nt,0)),y_init,l,N,lx=lx,x0bar=x0bar,lb=lb,ub=ub,funcargs={'f':['x','w'],'h':['x'],'l':['w','v'],'lx':['x','x0bar','Pinv_flat']},par={'Pinv_flat':Pinv.ravel(order='F')},verbosity=0)xhat=np.full((Nx,Nsim+1),np.nan)yhat=np.full((Ny,Nsim+1),np.nan)fortinrange(Nsim+1):ift<=Nt:# Build up horizon: pad early slots with first available measurement.forkinrange(Nt+1):solver.par['y',k]=ysim[:,max(0,t-Nt+k)]else:solver.newmeasurement(ysim[:,t])status=solver.solve()ifsolver.stats['status']!='Solve_Succeeded':print('Warning: Solver failure at time %d!'%t)breaksolver.saveguess(toffset=1)xhat[:,t]=np.array(solver.var['x',Nt]).flatten()yhat[:,t]=np.array(pars['h'](xhat[:,t])).flatten()ift>=Nt:# Smoothing prior update (horizon full).Pinv,x0bar=smoothing_update(solver,Pinv)solver.par['x0bar',0]=x0barsolver.par['Pinv_flat',0]=Pinv.ravel(order='F')else:# Horizon still growing: just advance x0bar, keep initial Pinv.solver.par['x0bar',0]=np.array(solver.var['x',1]).flatten()data_table=np.column_stack([pars['t'],xsim.T,xhat.T,ysim.T,yhat.T])gnuplotsave('batchreactor_mhe.dat',data_table)