from ipywidgets import interactive, interact, interact_manual
from IPython.display import display
from IPython.display import HTML
import ipywidgets as widgets
import numpy as np
import matplotlib.pyplot as plt
import DataManager as dm
class interaction_tool:
    """Editable notebook control for one entry of a device configuration.

    The constructor picks a widget from the type of ``value``:

    * ``bool``           -> ``Checkbox``
    * ``str``            -> ``Text`` (committed on submit)
    * float64            -> ``Text`` + ``FloatText`` step size + ``-``/``+`` buttons
    * int64 / int32      -> ``Text`` + ``-1``/``+1`` buttons
    * anything else      -> read-only ``HTML`` note

    The finished control is available as ``self.widget``.  Every edit stores
    the new value in ``config[variable_name]`` and then hands the whole
    ``config`` dict to ``device.set_config``.

    Parameters
    ----------
    variable_name : key of the entry inside ``config``; also used as label.
    value         : current value of the entry; only its type and initial
                    value are used here.
    device        : object providing ``set_config(config)``.
    config        : dict that is mutated in place on every edit.
    """

    def __init__(self, variable_name, value, device, config):
        self.variable_name = variable_name
        self.value = value
        self.device = device
        self.config = config
        layout_bt = widgets.Layout(width='20px')
        layout_text = widgets.Layout(width='180px')

        # NOTE(review): `on_submit` is reported as deprecated in recent
        # ipywidgets releases - confirm against the version in use.
        if isinstance(value, bool):
            # bool has to be tested before the numeric dtypes below.
            self.cb = widgets.Checkbox(value=value,
                                       description=variable_name,
                                       layout=layout_text)
            self.cb.observe(self.change_bool, names='value')
            self.widget = self.cb
        elif isinstance(value, str):
            # The keyword used to be misspelled ("describtion"), so the text
            # field was created without its label.
            self.text = widgets.Text(value=value, description=variable_name)
            self.text.on_submit(self.change_str)
            self.widget = self.text
        else:
            value_dtype = np.dtype(type(value))
            if value_dtype == np.dtype('float64'):
                self.flt = widgets.Text(value=str(np.round(value, 4)),
                                        description=variable_name,
                                        layout=layout_text)
                self.step = widgets.FloatText(value=0.1, description='step',
                                              layout=layout_text)
                self.bp = widgets.Button(description='+', layout=layout_bt)
                self.bm = widgets.Button(description='-', layout=layout_bt)
                self.flt.on_submit(self.change_by_enter_float)
                self.bp.on_click(self.change_p_float)
                self.bm.on_click(self.change_m_float)
                self.widget = widgets.HBox(
                    [self.flt, self.step, self.bm, self.bp])
            elif value_dtype in (np.dtype('int64'), np.dtype('int32')):
                self.flt = widgets.Text(value=str(np.round(value, 4)),
                                        description=variable_name,
                                        layout=layout_text)
                self.bp = widgets.Button(description='+1', layout=layout_bt)
                self.bm = widgets.Button(description='-1', layout=layout_bt)
                self.flt.on_submit(self.change_by_enter_int)
                self.bp.on_click(self.change_p_int)
                self.bm.on_click(self.change_m_int)
                self.widget = widgets.HBox([self.flt, self.bm, self.bp])
            else:
                self.widget = widgets.HTML(
                    'Type '+str(value_dtype)+' not changeable')

    def _commit(self, new_value, shown=None):
        """Store ``new_value`` in the config and send the config to the device.

        When ``shown`` is given, the numeric text field is updated to that
        string before the device is contacted.
        """
        self.config[self.variable_name] = new_value
        if shown is not None:
            self.flt.value = shown
        self.device.set_config(self.config)

    def change_str(self, text):
        """Submit callback of the string field: commit its current text."""
        self._commit(self.text.value)

    def change_bool(self, new):
        """Observer of the checkbox: commit ``new['new']``."""
        self._commit(new['new'])

    def change_by_enter_float(self, text):
        """Submit callback of the float field: commit the text as ``float``."""
        self._commit(float(self.flt.value))

    def change_by_enter_int(self, text):
        """Submit callback of the int field: commit the text as ``int``."""
        self._commit(int(self.flt.value))

    def change_p_float(self, bt):
        """'+' button: add the step size to the value shown in the field."""
        raised = float(self.flt.value) + self.step.value
        # The config receives the unrounded sum; the field shows 4 decimals.
        self._commit(raised, shown=str(np.round(raised, 4)))

    def change_p_int(self, bt):
        """'+1' button: increment the value shown in the field."""
        raised = int(self.flt.value) + 1
        self._commit(raised, shown=str(raised))

    def change_m_int(self, bt):
        """'-1' button: decrement the value shown in the field."""
        lowered = int(self.flt.value) - 1
        self._commit(lowered, shown=str(lowered))

    def change_m_float(self, bt):
        """'-' button: subtract the step size from the value in the field."""
        lowered = float(self.flt.value) - self.step.value
        # The config receives the unrounded difference; the field shows 4 decimals.
        self._commit(lowered, shown=str(np.round(lowered, 4)))
class plot_config_tool:
    """Checkbox that selects one measured quantity for the history plot.

    For float64 / int64 / int32 values the entry ``plot_config[variable_name]``
    is initialised to ``False`` and a ``Checkbox`` is created whose state is
    mirrored into that entry.  For every other type a read-only ``HTML`` note
    is created and ``plot_config`` is left untouched.  The resulting widget is
    available as ``self.widget``.

    Parameters
    ----------
    variable_name : key inside ``plot_config``; also the checkbox label.
    value         : sample value; only its type is inspected.
    plot_config   : dict that is mutated in place.
    """

    def __init__(self, variable_name, value, plot_config):
        self.variable_name = variable_name
        self.value = value
        self.plot_config = plot_config

        value_dtype = np.dtype(type(value))
        plottable = (np.dtype('float64'), np.dtype('int64'), np.dtype('int32'))
        if value_dtype in plottable:
            plot_config[variable_name] = False
            # The Layout is created only where it is used; the previous code
            # also built an unused button layout for every instance.
            self.cb = widgets.Checkbox(description=variable_name,
                                       layout=widgets.Layout(width='auto'))
            self.cb.observe(self.change_bool, names='value')
            self.widget = self.cb
        else:
            self.widget = widgets.HTML(
                'Type '+str(value_dtype)+' not history plottalbe')

    def change_bool(self, new):
        """Observer of the checkbox: store ``new['new']`` in ``plot_config``."""
        self.plot_config[self.variable_name] = new['new']
class wrapper:
    """Per-device record remembering whether its 'measure' box is ticked.

    Stores the full ``devices`` sequence, the selected ``devices[i]`` and the
    ``io`` object; ``measure_check`` starts out as ``False``.
    """

    def __init__(self, devices, i, io):
        self.measure_check = False
        self.io = io
        self.devices = devices
        self.device = devices[i]

    def measure(self, check):
        """Widget observer: copy ``check['new']`` into ``measure_check``."""
        self.measure_check = check['new']
def main(devices, io, parallel = False):
    """Build and display the notebook GUI for a set of devices.

    The GUI consists of five sections, displayed in this order: saving and
    restoring configurations, one tab of editable controls per device, the
    plot configuration, the measurement loop and the scanning tool.

    Parameters
    ----------
    devices  : sequence of device objects.  Each one is used through
               ``get_config()`` / ``set_config()``; the optional attributes
               ``measure`` and ``plot_measure`` are detected with ``hasattr``.
    io       : object used through ``save``, ``load``, ``search`` and ``path``.
    parallel : when true *and* the 'output' box is ticked, the measurement
               loop runs in a ``multiprocessing.Process`` and sends every
               measurement through a ``Pipe`` to this process for plotting.

    Side effects: rebinds the module-level names ``tab``, ``wps`` and
    ``plot_config``.
    """
    # Shared with the nested callbacks through the module namespace.
    global tab, wps, plot_config

    config = dm.get_config(devices)
    config['comment'] = 'start of GUI'
    #io.save(config, comment = 'start of GUI')

    # The heading literals below were split across physical lines, which is
    # not valid inside a single-quoted string; the line breaks are kept as
    # '\n'.  NOTE(review): surrounding HTML markup may have been lost from
    # these headings - confirm.
    display(widgets.HTML('\nLWFA GUI\n'))

    # ------------------------------------------------ save / restore configs
    display(widgets.HTML('Save and Restore configurations'))
    bt_save = widgets.Button(description='Save config')

    def save(bt):
        """Save the current device configuration under the entered comment."""
        comment = text_comment.value
        if comment == '':
            ht_save.value = 'Enter comment to save config!'
            return
        current = dm.get_config(devices)
        io.save(current, comment=comment, elog=check_elog.value)
        text_comment.value = ''
        ht_save.value = 'Config saved!'
        select_restore.options = get_all()

    bt_save.on_click(save)
    ht_save = widgets.HTML('')

    def restore(bt):
        """Load the selected configuration and apply it to all devices."""
        selected = select_restore.value
        loaded = io.load(selected)
        dm.set_config(devices, loaded)
        refresh('')
        ht_restore.value = 'restored config: ' + loaded['comment']
        select_restore.options = get_all()

    def restore_view(inst):
        """Show the comment of the configuration referenced by ``inst.new``.

        Not registered on any widget within this function.
        """
        selected = inst.new
        try:
            loaded = io.load(selected)
        except Exception:
            # Narrowed from a bare ``except`` so KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            ht_restore.value = 'Error: This config doesnot exist'
            return
        ht_restore.value = 'selected config: ' + loaded['comment']

    def get_all():
        """Return ``{comment: index}`` for every config found by ``io.search``."""
        configs = io.search(output=False)
        by_comment = {}
        # Index-based access is kept on purpose: the container type returned
        # by io.search is not visible here.
        for i in range(len(configs)):
            by_comment[configs[i]['comment']] = i
        return by_comment

    select_restore = widgets.Select(description='restore:', options=get_all())
    bt_restore = widgets.Button(description='restore now!')
    bt_restore.on_click(restore)
    ht_restore = widgets.HTML('selected config: ' + config['comment'])
    display(widgets.HBox([select_restore, bt_restore, ht_restore]))

    #browse configs
    def find_config(bt):
        """Let ``io.load`` pick a configuration via its GUI and apply it."""
        loaded = io.load(gui=True)
        dm.set_config(devices, loaded)
        refresh('')
        print('restored config: ' + loaded['comment'])

    bt_browse = widgets.Button(description='Browse configs')
    bt_browse.on_click(find_config)
    #display(bt_browse)

    text_comment = widgets.Text(description='Comment:')
    check_elog = widgets.Checkbox(value=False, description='elog')
    display(widgets.HBox([bt_save, text_comment, check_elog, ht_save]))

    # ------------------------------------------------------ device controls
    def make_controller():
        """Build one tab page of controls per device.

        Returns the ``Tab`` widget and the list of ``wrapper`` objects (one
        per device, same order as ``devices``).
        """
        pages = []
        names = {}
        wrappers = []
        for i, device in enumerate(devices):
            device_config = device.get_config()
            wrappers.append(wrapper(devices, i, io))
            controls = [
                interaction_tool(key, val, device, device_config).widget
                for key, val in sorted(device_config.items())
            ]
            if hasattr(device, 'measure'):
                measure_box = widgets.Checkbox(description='measure')
                measure_box.observe(wrappers[i].measure, names='value')
                controls.append(measure_box)
            pages.append(widgets.VBox(controls))
            names[i] = [type(device).__name__]
        return widgets.Tab(children=pages, _titles=names), wrappers

    display(widgets.HTML('\nInteract with devices'))
    bt_refresh = widgets.Button(description='Refresh configs')
    tab, wps = make_controller()
    vb_config = widgets.VBox([bt_refresh, tab])
    display(vb_config)

    def refresh(bt):
        """Rebuild the device tab from the devices' current configuration."""
        global tab, wps
        tab.close()
        tab, wps = make_controller()
        vb_config.children = [bt_refresh, tab]

    bt_refresh.on_click(refresh)

    # --------------------------------------------------- plot configuration
    display(widgets.HTML('\nPlot measure configuration'))
    bt_plot_config_init = widgets.Button(description='Measure to initialize')
    plot_config = {}

    def plot_configurator(bt):
        """Take one measurement and offer a checkbox per measured quantity."""
        global plot_config
        devices2measure = find_devices2measure()
        print(devices2measure)
        measurement = dm.measure(devices2measure)
        plot_config = {}
        pages = []
        names = {}
        for i, device in enumerate(devices2measure):
            device_name = type(device).__name__
            plot_config[device_name] = {}
            boxes = [
                plot_config_tool(key, val, plot_config[device_name]).widget
                for key, val in sorted(measurement['measure'][device_name].items())
            ]
            if hasattr(device, 'plot_measure'):
                boxes.append(plot_config_tool('plot_measure',
                                              0,
                                              plot_config[device_name]).widget)
            pages.append(widgets.VBox(boxes))
            names[i] = [device_name]
        measure_tab = widgets.Tab(children=pages, _titles=names)
        vb_plot_config.children = [bt_plot_config_init, measure_tab]

    bt_plot_config_init.on_click(plot_configurator)
    vb_plot_config = widgets.VBox([bt_plot_config_init])
    display(vb_plot_config)

    # ---------------------------------------------------------- measurement
    display(widgets.HTML('\nMake measurements'))
    loop_inttext = widgets.IntText(value=10, description='loop')
    output_check = widgets.Checkbox(description='output')
    save_check = widgets.Checkbox(description='save')
    display(widgets.HBox([loop_inttext, output_check, save_check]))
    rate_text = widgets.FloatText(value=5, description='rate')
    display(rate_text)
    meas_bt = widgets.Button(description='measure')
    clear_hist_bt = widgets.Button(description='clear history')
    display(widgets.HBox([meas_bt, clear_hist_bt]))

    def find_devices2measure():
        """Return the devices whose 'measure' checkbox is ticked."""
        return [device for device, wp in zip(devices, wps) if wp.measure_check]

    def measure(bt, comment = '', name = None):
        """Run the measurement loop ``loop_inttext.value`` times.

        ``comment`` and ``name`` are forwarded to ``io.save`` when the 'save'
        box is ticked; with a ``name`` the loop index is appended to it.
        """
        global plot_config
        devices2measure = find_devices2measure()

        #initialize parallelization
        if parallel:
            from multiprocessing import Process, Pipe

            # NOTE(review): a Process subclass defined inside a function
            # presumably needs the 'fork' start method - confirm on the
            # target platform.
            class DataStreamProcess(Process):
                def __init__(self, connec, *args, **kwargs):
                    self.connec = connec
                    Process.__init__(self, *args, **kwargs)

                def run(self):
                    loop_measure(parallel=True, connec=self.connec)

        #loop measure
        def loop_measure(parallel = False, connec = None):
            # Imported once here instead of on every loop iteration.
            import time
            from IPython.display import clear_output

            for i in range(loop_inttext.value):
                clear_output(wait=True)
                t = time.time()
                measurement = dm.measure(devices2measure)
                if save_check.value:
                    if name is None:
                        io.save(measurement,
                                kind='measure', comment=comment)
                    else:
                        io.save(measurement,
                                kind='measure', comment=comment,
                                name=name+'_'+str(i))
                if parallel:
                    connec.send(measurement)
                elif output_check.value:
                    dm.plot_measure(measurement, devices2measure, plot_config, 0.01)
                    plt.pause(0.001)
                    plt.draw_all(force=True)
                elapsed = time.time() - t
                # A zero reading of the clock would otherwise divide by zero.
                max_rate = 1/elapsed if elapsed > 0 else float('inf')
                print('maximal possible rate: ', np.round(max_rate, 1))
                # Computed once: evaluating the expression a second time for
                # the sleep call could yield a negative value, which
                # time.sleep rejects.
                remaining = 1/rate_text.value - (time.time() - t)
                if remaining < 0:
                    print('set rate cannot be reached.')
                else:
                    time.sleep(remaining)

        if output_check.value and parallel:
            conn1, conn2 = Pipe()
            data_stream = DataStreamProcess(conn1)
            data_stream.start()
            while True:
                if not conn2.poll(0.1):
                    if not data_stream.is_alive():
                        break
                    continue
                measurement = conn2.recv()
                dm.plot_measure(measurement, devices2measure, plot_config, 0.01)
                plt.pause(0.001)
                plt.draw_all(force=True)
            # The worker has already exited here; join releases its resources.
            data_stream.join()
        else:
            loop_measure()

    def clear_hist(bt):
        """Reset ``Globals.plot_hist`` to an empty dict."""
        import Globals
        Globals.plot_hist = {}

    meas_bt.on_click(measure)
    clear_hist_bt.on_click(clear_hist)

    # -------------------------------------------------------- scanning tool
    display(widgets.HTML('Scanning Tool'))
    bt_scan = widgets.Button(description='start Scanning Tool')
    display(bt_scan)
    scanning_tool_box = widgets.VBox([])
    display(scanning_tool_box)

    def scanning_tool(bt):
        """Show the device selector of the scanning tool."""

        def choose_device(bt):
            """Show the selector for the scannable parameters of the device."""
            device = select_device.value

            def choose_parameter(bt):
                """Show range inputs and the 'Run scan' button."""
                par_to_scan = select_scannable.value
                chosen_config = dm.get_config(devices)
                current_value = chosen_config[type(device).__name__][par_to_scan]

                def scan(bt):
                    """Step the parameter through the range, measuring each time."""
                    comment = ('scan of '+par_to_scan+' over '
                               + str((minv.value, maxv.value, nsteps.value)))
                    # Read the live configuration when the scan starts.  The
                    # previous code assigned it to a misspelled name, so the
                    # config captured when the parameter was chosen was saved
                    # and written back to the devices instead.
                    scan_config = dm.get_config(devices)
                    if save_check.value:
                        import os
                        import datetime
                        stamp = str(datetime.datetime.now()).replace(':', '-').replace(' ', '_')
                        folder_name = ('scan_'+par_to_scan+'_'+str(minv.value)
                                       + '-'+str(maxv.value)+'_'+str(nsteps.value)
                                       + '_'+stamp)
                        os.mkdir(io.path + folder_name)
                        io.save(scan_config,
                                name=folder_name+'/config_at_start',
                                comment='before '+comment)
                    else:
                        folder_name = ''
                    for scan_value in np.linspace(minv.value, maxv.value, nsteps.value):
                        scan_config[type(device).__name__][par_to_scan] = scan_value
                        dm.set_config(devices, scan_config)
                        measure('',
                                comment=comment+', current value: '+str(scan_value),
                                name=folder_name+'/'+par_to_scan+'='+str(scan_value))

                minv = widgets.FloatText(description='minv:', value=current_value)
                maxv = widgets.FloatText(description='maxv:', value=current_value)
                nsteps = widgets.IntText(description='nsteps:', value=1)
                bt_run = widgets.Button(description='Run scan')
                bt_run.on_click(scan)
                menu = widgets.VBox([
                    widgets.HTML('current value: '+str(current_value)),
                    minv, maxv, nsteps, bt_run])
                scanning_tool_box.children = list(scanning_tool_box.children) + [menu]

            # The parameter list is taken from the config read at GUI start.
            scannable = []
            for key, val in config[type(device).__name__].items():
                val_dtype = np.dtype(type(val))
                if val_dtype == np.dtype('float64') or val_dtype == np.dtype(int):
                    scannable.append(key)
            select_scannable = widgets.Select(description='parameter to scan: ',
                                              options=scannable)
            bt_scannable = widgets.Button(description='choose parameter')
            bt_scannable.on_click(choose_parameter)
            hb_scannable = widgets.HBox([select_scannable, bt_scannable])
            # Keep only the device selector and replace everything after it.
            scanning_tool_box.children = (
                list(scanning_tool_box.children)[:1] + [hb_scannable])

        scanning_tool_box.children = []
        device_dict = {}
        for device in devices:
            device_dict[type(device).__name__] = device
        select_device = widgets.Select(description='device to scan: ',
                                       options=device_dict)
        bt_device = widgets.Button(description='choose device')
        bt_device.on_click(choose_device)
        hb_device = widgets.HBox([select_device, bt_device])
        scanning_tool_box.children = [hb_device]

    bt_scan.on_click(scanning_tool)
def get_path():
    """Ask for a file with a Tk open-file dialog and return the dialog result.

    A hidden Tk root window is created for the dialog and destroyed again
    afterwards, so repeated calls do not accumulate root windows.
    """
    import tkinter as tk
    from tkinter import filedialog

    root = tk.Tk()
    root.withdraw()  # only the dialog should be visible, not the root window
    try:
        return filedialog.askopenfilename()
    finally:
        root.destroy()