Commit f1f26915 authored by reiche's avatar reiche

Initial Commit

parent 1e2bafcb
This diff is collapsed.
import sys
import numpy as np
from subprocess import Popen, run
import tempfile
from os import chdir, listdir
# search path for online model
sys.path.append('/sf/bd/applications/OnlineModel/current')
from OMAppTemplate import ApplicationTemplate
from OMMadxLat import *
import OMFacility
#import OMElegant as Elegant
#import OMDistribution as Distribution
import OMMachineInterface
class DispersionModel:
def __init__(self):
self.Facility = OMFacility.Facility(1, 0) # initialize the lattice
self.Facility.forceEnergyAt('SINLH01.DBAM010', 140e6)
self.MI=OMMachineInterface.MachineInterface()
self.Madx=OMMadXLattice()
self.Facility.writeFacility(self.MI)
def CorrectorResponse(self,didx=0):
cors=['MQUA150','MQSK160','MQSK300','MQSK420','MQUA430']
sv={}
res={}
for cor in cors:
tag='SARCL02.'+cor
val=self.Facility.ElementDB[tag].k1
sv[cor]={'Name':tag,'Val':val}
opt=self.selectBeamline(didx)
sref,dxref,dyref,names=self.track(opt)
for i,cor in enumerate(cors):
for corall in cors:
if corall == cor:
self.Facility.ElementDB[sv[corall]['Name']].k1=sv[corall]['Val']+0.01
else:
self.Facility.ElementDB[sv[corall]['Name']].k1=sv[corall]['Val']
s,dx,dy,names=self.track(opt)
res[cor]={'X':dx-dxref,'Y':dy-dyref}
return sref,res,names
def EnergyResponse(self,didx=0):
opt=self.selectBeamline(didx)
s1,dx1,dy1,names=self.track(opt)
self.scaleMagnets(1)
s,dx2,dy2,names=self.track(opt)
return s,dx2-dx1,dy2-dy1,names
def selectBeamline(self,didx=0):
dest='SARUN20'
start='SARCL01'
opt=[24.58,1.705,10.035,-0.7964]
if didx >0:
dest='SATDI01'
start='S20SY02'
if 'SAT' in dest:
ang = self.Facility.getRegExpElement('S20SY02', 'MKAC', 'design_kick')
self.Facility.setRegExpElement('S20SY02', 'MKAC0.0', 'cory', ang[0])
ang = self.Facility.getRegExpElement('S20SY02', 'MKDC', 'design_kick')
self.Facility.setRegExpElement('S20SY02', 'MKDC0.0', 'cory', ang[0])
self.Facility.setRegExpElement('S20SY02', 'MBND', 'angle', 2)
else:
self.Facility.setRegExpElement('S20SY02', 'MK.C0.0', 'cory', 0)
self.Facility.setRegExpElement('S20SY02', 'MBND', 'angle', 0)
sec = self.Facility.getSection(dest)
path = sec.mapping
self.line = self.Facility.BeamPath(path)
self.line.setRange(start,dest)
self.tempdir=tempfile.TemporaryDirectory()
return opt
# single madx tracking
def track(self,opt):
self.Madx.clear()
self.Madx.write('option,-echo;\n')
self.Madx.write('betax0=%f;\n' % opt[0])
self.Madx.write('betay0=%f;\n' % opt[2])
self.Madx.write('alphax0=%f;\n'% opt[1])
self.Madx.write('alphay0=%f;\n\n'% opt[3])
self.Madx.write('beam, particle=electron,energy=3000,sigt=1e-3,sige=1e-4;\n\n')
# write the lattice
self.line.writeLattice(self.Madx, self.Facility) # write lattice to madx
self.Madx.write('use, sequence=swissfel;\n')
self.Madx.write('Select,flag=Error,pattern="SAT.*";\n')
self.Madx.write('EALIGN,DY=0.01;\n')
self.Madx.write('select, flag=twiss, column=NAME,S,BETX,ALFX,BETY,ALFY,DX,DY;\n')
self.Madx.write('twiss, range=#s/#e, sequence=swissfel,betx=betax0,bety=betay0,alfx=alphax0,alfy=alphay0,file="twiss.dat";\n')
self.Madx.write('plot, haxis = s, vaxis = betx, bety, range = #s/#e,colour=100;\n')
self.Madx.write('plot, haxis = s, vaxis = dx, dy, range = #s/#e,colour=100;\n')
self.Madx.write('exit;')
with open(self.tempdir.name+'/tmp-lattice.madx', 'w') as f:
for line in self.Madx.cc:
f.write(line)
# self.Log.appendPlainText('Tracking with MadX...')
chdir(self.tempdir.name)
self.p = run(["/sf/bd/bin/madx", "tmp-lattice.madx"])
return self.parseTwissFile()
def parseTwissFile(self):
start = False
skipline = False
res = []
with open(self.tempdir.name+'/twiss.dat') as f:
for line in f:
if '* NAME' in line:
names = line.split()[1:]
res.append(names)
start = True
skipline = True
continue
elif start is False:
continue
elif skipline is True:
skipline = False
continue
else:
val = line.split()
res.append(val)
nlen=len(res)-1
s=np.zeros((nlen))
dx=np.zeros((nlen))
dy=np.zeros((nlen))
names=[]
for i in range(1,nlen+1):
line=res[i]
names.append(line[0][1:-1])
s[i-1]=float(line[1])
dx[i-1]=float(line[6])
dy[i-1]=float(line[7])
return s,dx,dy,names
def scaleMagnets(self,val):
scl=1+0.01*val
for ele in self.Facility.ElementDB.keys():
if 'MQUA' in ele and 'SAR' in ele:
self.Facility.ElementDB[ele].k1*=scl
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import sys
import time
import numpy as np
from numpy.linalg import inv,svd
from epics import PV,caput,caget
# Online Model specific, should become obsolete in the future.
sys.path.append('/sf/bd/applications/OnlineModel/current')
from OMMagnetCalibration import *
from OMFacility import *
class OrbitSteering:
def __init__(self, loc='SARCL'):
# get magnet callibration for steerer
self.Facility=Facility(1,0) # initialize the lattice and magnet callibration
self.calMagnet =SwissFELMagnet()
self.Facility.writeFacility(self.calMagnet)
# connection to the online model server for live update
self.PVOM=PV('OPTICSSERVER:STATUS-LIVE',callback=self.newModelAvailable)
self.notConnected=True
# BPMs and Steerer for SARCL02
self.bpm=[]
self.bpmidx=[]
self.PVbpm=[]
self.PVbpmFB=[]
self.PVbpmvalid=[]
self.cor=[]
self.coridx=[]
self.PVcor=[]
if loc is 'SARCL': # ARAMIS ENERGY COLIIMATOR
self.loc='SARCL'
for idx in np.array([220,260,330,470]):
self.bpm.append('SARCL02-DBPM%3.3d' % idx)
self.bpmidx.append(0)
self.PVbpm.append(PV('SARCL02-DBPM%3.3d:X1' % idx))
self.PVbpm.append(PV('SARCL02-DBPM%3.3d:Y1' % idx))
self.PVbpmFB.append(PV('SARCL02-DBPM%3.3d:X-REF-FB' % idx))
self.PVbpmFB.append(PV('SARCL02-DBPM%3.3d:Y-REF-FB' % idx))
self.PVbpmvalid.append(PV('SARCL02-DBPM%3.3d:X1-VALID' % idx))
self.PVbpmvalid.append(PV('SARCL02-DBPM%3.3d:Y1-VALID' % idx))
for idx in np.array([120,230,240,320,340]):
self.cor.append('SARCL02-MCOR%3.3d' % idx)
self.coridx.append(0)
self.PVcor.append(PV('SARCL02-MCRX%3.3d:I-SET' % idx))
self.PVcor.append(PV('SARCL02-MCRY%3.3d:I-SET' % idx))
self.Energy=PV('SARCL02-MBND100:ENERGY-OP')
else: # ATHOS SWITCHYARD
self.loc='SATSY'
print('building up lattice for Athos')
for idx in np.array([60,100,240,290]):
self.bpm.append('SATSY01-DBPM%3.3d' % idx)
self.bpmidx.append(0)
self.PVbpm.append(PV('SATSY01-DBPM%3.3d:X2' % idx))
self.PVbpm.append(PV('SATSY01-DBPM%3.3d:Y2' % idx))
self.PVbpmFB.append(PV('SATSY01-DBPM%3.3d:X-REF-FB' % idx))
self.PVbpmFB.append(PV('SATSY01-DBPM%3.3d:Y-REF-FB' % idx))
self.PVbpmvalid.append(PV('SATSY01-DBPM%3.3d:X2-VALID' % idx))
self.PVbpmvalid.append(PV('SATSY01-DBPM%3.3d:Y2-VALID' % idx))
for idx in np.array([20]):
self.bpm.append('SATSY02-DBPM%3.3d' % idx)
self.bpmidx.append(0)
self.PVbpm.append(PV('SATSY02-DBPM%3.3d:X2' % idx))
self.PVbpm.append(PV('SATSY02-DBPM%3.3d:Y2' % idx))
self.PVbpmFB.append(PV('SATSY02-DBPM%3.3d:X-REF-FB' % idx))
self.PVbpmFB.append(PV('SATSY02-DBPM%3.3d:Y-REF-FB' % idx))
self.PVbpmvalid.append(PV('SATSY02-DBPM%3.3d:X2-VALID' % idx))
self.PVbpmvalid.append(PV('SATSY02-DBPM%3.3d:Y2-VALID' % idx))
for idx in np.array([22,40,70,90,210,230,260,282,300]):
self.cor.append('SATSY01-MCOR%3.3d' % idx)
self.coridx.append(0)
self.PVcor.append(PV('SATSY01-MCRX%3.3d:I-SET' % idx))
self.PVcor.append(PV('SATSY01-MCRY%3.3d:I-SET' % idx))
self.Energy=PV('SATSY01-MBND200:ENERGY-OP')
self.R=np.zeros((4,4,3000))
self.iact=len(self.coridx)
self.isen=len(self.bpmidx)
self.T=np.zeros((2*self.isen,2*self.iact))
self.needsUpdate=True
self.getLocation()
def getLocation(self):
if self.loc is 'SARCL':
locnames=caget('OPTICSSERVER:ARAMIS-ELEMENT-LIVE')
if locnames is None:
print('*** ERROR: Cannot initialize Online Model')
return
names=bytes(caget('OPTICSSERVER:ARAMIS-ELEMENT-LIVE')).decode().split(';')
for i in range(len(names)):
if 'SARCL02' in names[i] and 'DBPM' in names[i] and 'MARK' in names[i]:
for j in range(len(self.bpm)):
tag=self.bpm[j][12:15]
if tag in names[i]:
self.bpmidx[j]=i
if 'SARCL02.M' in names[i] and 'CX' in names[i]:
for j in range(len(self.cor)):
tag=self.cor[j][12:15]
if tag in names[i]:
self.coridx[j]=i
else:
locnames=caget('OPTICSSERVER:ARAMIS-ELEMENT-LIVE')
if locnames is None:
print('*** ERROR: Cannot initialize Online Model')
return
names=bytes(caget('OPTICSSERVER:ATHOS-ELEMENT-LIVE')).decode().split(';')
for i in range(len(names)):
if ('SATSY01' in names[i] or 'SATSY02' in names[i]) and 'DBPM' in names[i] and 'MARK' in names[i]:
for j in range(len(self.bpm)):
tag=self.bpm[j][12:15]
if tag in names[i]:
self.bpmidx[j]=i
if 'SATSY01.M' in names[i] and 'CX' in names[i]:
for j in range(len(self.cor)):
tag=self.cor[j][12:15]
if tag in names[i]:
self.coridx[j]=i
self.notConnected=False
def getRValues(self):
# update machine
branch='ARAMIS'
if self.loc is 'SATSY':
branch='ATHOS'
for i in range(4):
for j in range(4):
self.R[i][j]=np.array(caget('OPTICSSERVER:%s-R%d%d-LIVE' % (branch,i+1,j+1)))
def constructRMatrix(self,s1,s2): # s2 is index of BPM, s1 is index of corrector
if s2 < s1:
return np.zeros((4,4))
R1=np.zeros((4,4))
R2=np.zeros((4,4))
for i in range(4):
for j in range(4):
R1[i][j]=self.R[i][j][s1]
R2[i][j]=self.R[i][j][s2]
return np.dot(R2,inv(R1))
def newModelAvailable(self,pvname=None,**kw):
self.wait=False
def calcResponseMatrix(self,updateOM=False):
if updateOM:
self.wait=True
caput('OPTICSSERVER:UPDATE',1)
icount=20
while self.wait:
time.sleep(0.5)
icount=icount-1
if icount <1:
self.wait=False
self.needsUpdate=True
return
# update machine and track new optics function
self.getRValues()
# construct responce matrix
for j in range(self.iact):
for i in range(self.isen):
R=self.constructRMatrix(self.coridx[j],self.bpmidx[i])
self.T[2*i][2*j]=R[0][1]
self.T[2*i][2*j+1]=R[2][1]
self.T[2*i+1][2*j]=R[0][3]
self.T[2*i+1][2*j+1]=R[2][3]
self.u,self.s,self.vh=svd(self.T) # do the SVD decomposition
self.sinv=[1/s for s in self.s]
self.uinv=np.transpose(self.u)
self.vinv=np.transpose(self.vh)
self.needsUpdate=False
def steer(self,gain,ncut=20):
if self.needsUpdate:
return False
# get BPM readings
X =[p.get() for p in self.PVbpm]
XFB =[p.get() for p in self.PVbpmFB]
VAL =[p.get() for p in self.PVbpmvalid]
if None in X or 0 in VAL or None in XFB:
return False
# calculate the correction
self.X=np.array(X)-np.array(XFB)
S=np.zeros((2*self.iact,2*self.isen))
for i in range(ncut):
if i < 2*self.iact and i < 2*self.isen:
S[i][i]=self.sinv[i]
self.C=np.dot(self.vinv,np.dot(S,np.dot(self.uinv,self.X)))
self.Xres=self.X-np.dot(self.T,self.C)
# apply correction
Cor=-self.C*gain
self.I=self.C
erg=self.Energy.get()
for i in range(self.iact):
name=self.cor[i].replace('-','.')
val=Cor[2*i]
Ix=self.calMagnet.CorrectorKick2I(name,val,erg*1000) # MeV-> eV but mrad -> rad : -> scale energy by 1000
self.I[2*i]=Ix
val=Cor[2*i+1]
Iy=self.calMagnet.CorrectorKick2I(name,val,erg*1000)
self.I[2*i+1]=Iy
Icurrent=[p.get() for p in self.PVcor]
if None in Icurrent:
return False
Iset=np.array(Icurrent)+self.I
for i in range(2*self.iact):
self.PVcor[i].put(Iset[i])
return True
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment