Commit 4c6101a1 authored by augustin_s's avatar augustin_s 🐍
Browse files

prototype for a camera server client plus CLI control script

parent 41ea3fa0
from types import SimpleNamespace
from .waiting_epics import PV, caput, caget
from .utils import intify
class CameraClient:
def __init__(self, name):
self.name = name
pv_roi_xmin = PV(name + ":REGIONX_START")
pv_roi_xmax = PV(name + ":REGIONX_END")
pv_roi_ymin = PV(name + ":REGIONY_START")
pv_roi_ymax = PV(name + ":REGIONY_END")
self.pvs = SimpleNamespace(
xmin=pv_roi_xmin,
xmax=pv_roi_xmax,
ymin=pv_roi_ymin,
ymax=pv_roi_ymax
)
self.max_roi = self.get_max_roi()
def __repr__(self):
tn = type(self).__name__
fn = self.name
return f"{tn}(\"{fn}\")"
@property
def status(self):
head = repr(self)
print(head)
print("-" * len(head))
channels = ("WARNCODE", "ERRCODE", "STATUSCODE", "BUSY", "CAMRATE", "FILERATE")
maxlen = max(len(ch) for ch in channels)
for ch in channels:
fch = self.name +":" + ch
val = caget(fch)
line = ch + ": "
line = line.ljust(maxlen + 2)
line += str(val)
print(line)
print("-" * len(head))
roi = self.get_roi()
full = self.get_max_roi()
print(f"roi: {roi}")
print(f"full: {full}")
def restart(self):
self.off()
self.offline()
self.init()
sleep(0.01)
self.running()
def offline(self):
caput(self.name + ":INIT", 0) # OFFLINE
def init(self):
caput(self.name + ":INIT", 1) # INIT
def off(self):
caput(self.name + ":CAMERA", 0) # OFF
def running(self):
caput(self.name + ":CAMERA", 1) # RUNNING
def reset_roi(self):
caput(self.name + ":RESETROI.PROC", 1) # Reset ROI
def set_parameters(self):
caput(self.name + ":SET_PARAM", 1) # Set Parameters
def clear_buffer(self):
caput(self.name + ":CLEARMEM", 1) # Clear Buffer
def get_max_roi(self):
current_roi = self.get_roi()
self.reset_roi()
max_roi = self.get_roi()
self._set_roi(*current_roi)
return max_roi
def get_roi(self):
xmin = self.pvs.xmin.get()
xmax = self.pvs.xmax.get()
ymin = self.pvs.ymin.get()
ymax = self.pvs.ymax.get()
return intify(xmin, xmax, ymin, ymax)
def set_roi(self, xmin, xmax, ymin, ymax, debug=False):
if debug:
asked = (xmin, xmax, ymin, ymax)
xminmin, xmaxmax, yminmin, ymaxmax = self.max_roi
xmin = max(xmin, xminmin)
xmax = min(xmax, xmaxmax)
ymin = max(ymin, yminmin)
ymax = min(ymax, ymaxmax)
ymindelta = ymin - yminmin
ymaxdelta = ymaxmax - ymax
ydelta = min(ymindelta, ymaxdelta)
ymin = yminmin + ydelta
ymax = ymaxmax - ydelta
if debug:
adjusted = (xmin, xmax, ymin, ymax)
print(" ", asked, "\n-> ", adjusted, ":", ydelta, ymindelta, ymaxdelta)
self._set_roi(xmin, xmax, ymin, ymax, debug=debug)
def _set_roi(self, xmin, xmax, ymin, ymax, debug=False):
xmin, xmax, ymin, ymax = intify(xmin, xmax, ymin, ymax)
self.off()
self.pvs.xmin.put(xmin)
self.pvs.xmax.put(xmax)
self.pvs.ymin.put(ymin)
self.pvs.ymax.put(ymax)
self.set_parameters()
self.clear_buffer()
self.running()
if debug:
print("-->", self.get_roi(), "\n")
def intify(*args):
return [int(round(i)) for i in args]
import epics
from epics import caget
def caput(*args, wait=True, **kwargs):
return epics.caput(*args, wait=wait, **kwargs)
class PV(epics.PV):
def put(self, *args, wait=True, **kwargs):
super().put(*args, wait=wait, **kwargs)
#!/usr/bin/env python
import argparse
from time import sleep
from cameras.cameraclient import CameraClient
def main():
commands = ["restart", "roi", "status", "test"]
commands.sort()
printable_commands = ", ".join(commands)
parser = argparse.ArgumentParser(description="CLI Cam (esh)")
parser.add_argument("-c", "--camera", help="camera name", default="SATES24-CAMS161-M1")
parser.add_argument("-v", "--verbose", help="verbose output", action="store_true")
subparsers = parser.add_subparsers(title="commands", dest="command", help=printable_commands)
subparser_restart = subparsers.add_parser("restart", description="Restart camera server")
subparser_roi = subparsers.add_parser("roi", description="Set ROI")
subparser_status = subparsers.add_parser("status", description="Print status")
subparser_test = subparsers.add_parser("test", description="Test setting ROIs")
for sp in subparsers.choices.values():
sp.add_argument("-v", "--verbose", help="verbose output", action="store_true")
subparser_restart.add_argument("-r", "--reset", help="reset ROI", action="store_true")
subparser_roi.add_argument("xmin", type=int, help="x min")
subparser_roi.add_argument("xmax", type=int, help="x max")
subparser_roi.add_argument("ymin", type=int, help="y min")
subparser_roi.add_argument("ymax", type=int, help="y max")
clargs = parser.parse_args()
command = clargs.command
if command is None:
parser.print_help()
raise SystemExit(1)
camera = CameraClient(clargs.camera)
if clargs.verbose:
print(camera)
print(f"command: {command}")
# print(clargs)
if command == "restart":
do_restart(camera, clargs)
elif command == "roi":
do_roi(camera, clargs)
elif command == "test":
do_test(camera, clargs)
elif command == "status":
do_status(camera, clargs)
else:
print(f"nothing assigned to command: {command}")
parser.print_help()
raise SystemExit(2)
def do_restart(camera, clargs):
if not clargs.reset:
roi = camera.get_roi()
camera.restart()
if not clargs.reset:
camera.set_roi(*roi, debug=clargs.verbose) #TODO: workaround! why does the first try always set to: [101, 2400, 100, 2061]?
camera.set_roi(*roi, debug=clargs.verbose)
def do_roi(camera, clargs):
camera.set_roi(clargs.xmin, clargs.xmax, clargs.ymin, clargs.ymax, debug=clargs.verbose)
def do_status(camera, clargs):
camera.status
def do_test(camera, _clargs):
camera.reset_roi() # [1, 2560, 1, 2160]
print(camera.get_roi())
for i in (1, 10, 100, 200, 300, 999, 1000, 1001):
camera.set_roi(i, 1500, i, 1900, debug=True)
if __name__ == "__main__":
main()
#CA client library tcp receive thread terminating due to a non-standard C++ exception
#FATAL: exception not rethrown
#Aborted (core dumped)
# "solved" by
sleep(0.0001)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment