from ipywidgets import interactive, interact, interact_manual from IPython.display import display from IPython.display import HTML import ipywidgets as widgets import numpy as np import matplotlib.pyplot as plt import DataManager as dm class interaction_tool: def __init__ (self, variable_name, value, device, config): self.variable_name = variable_name self.value = value self.device = device self.config = config layout_bt = widgets.Layout(width = '20px') layout_text = widgets.Layout(width = '180px') if type(value) == bool: self.cb = widgets.Checkbox(value=value, description=variable_name, layout = layout_text) self.cb.observe(self.change_bool, names = 'value') self.widget =self.cb elif type(value) == str: self.text = widgets.Text(value = value, describtion = variable_name) self.text.on_submit(self.change_str) self.widget = self.text elif np.dtype(type(value)) == np.dtype('float64'): self.flt = widgets.Text(value=str(np.round(value,4)),description=variable_name, layout = layout_text) self.step = widgets.FloatText(value=0.1,description='step', layout = layout_text) self.bp = widgets.Button(description='+', layout = layout_bt) self.bm = widgets.Button(description='-', layout = layout_bt) self.flt.on_submit(self.change_by_enter_float) self.bp.on_click(self.change_p_float) self.bm.on_click(self.change_m_float) self.widget = widgets.HBox([self.flt, self.step, self.bm, self.bp]) elif np.dtype(type(value)) == np.dtype('int64') or np.dtype(type(value)) == np.dtype('int32'): self.flt = widgets.Text(value=str(np.round(value,4)),description=variable_name, layout = layout_text) self.bp = widgets.Button(description='+1', layout = layout_bt) self.bm = widgets.Button(description='-1', layout = layout_bt) self.flt.on_submit(self.change_by_enter_int) self.bp.on_click(self.change_p_int) self.bm.on_click(self.change_m_int) self.widget = widgets.HBox([self.flt, self.bm, self.bp]) else: self.widget = widgets.HTML('Type '+str(np.dtype(type(value)))+' not changeable') def change_str(self,text): self.config[self.variable_name] = self.text.value self.device.set_config(self.config) def change_bool(self, new): self.config[self.variable_name] = new['new'] self.device.set_config(self.config) def change_by_enter_float(self, text): self.config[self.variable_name] = float(self.flt.value) self.device.set_config(self.config) def change_by_enter_int(self, text): self.config[self.variable_name] = int(self.flt.value) self.device.set_config(self.config) def change_p_float(self, bt): self.config[self.variable_name] = float(self.flt.value) + self.step.value self.flt.value = str(np.round(float(self.flt.value) + self.step.value,4)) self.device.set_config(self.config) def change_p_int(self, bt): self.config[self.variable_name] = int(self.flt.value) + 1 self.flt.value = str(int(self.flt.value) + 1) self.device.set_config(self.config) def change_m_int(self, bt): self.config[self.variable_name] = int(self.flt.value) - 1 self.flt.value = str(int(self.flt.value) - 1) self.device.set_config(self.config) def change_m_float(self, bt): self.config[self.variable_name] = float(self.flt.value) - self.step.value self.flt.value = str(np.round(float(self.flt.value) - self.step.value,4)) self.device.set_config(self.config) class plot_config_tool: def __init__ (self, variable_name, value, plot_config): self.variable_name = variable_name self.value = value self.plot_config = plot_config layout_bt = widgets.Layout(width = '20px') layout_text = widgets.Layout(width = 'auto') if np.dtype(type(value)) == np.dtype('float64') or np.dtype(type(value)) == np.dtype('int64') or np.dtype(type(value)) == np.dtype('int32'): plot_config[variable_name] = False self.cb = widgets.Checkbox(description=variable_name, layout = layout_text) self.cb.observe(self.change_bool, names = 'value') self.widget = self.cb else: self.widget = widgets.HTML('Type '+str(np.dtype(type(value)))+' not history plottalbe') def change_bool(self, new): self.plot_config[self.variable_name] = new['new'] class wrapper: def __init__(self, devices, i, io): self.device = devices[i]# self.devices = devices self.io = io self.measure_check = False def measure(self, check): self.measure_check = check['new'] def main(devices, io, parallel = False): config = dm.get_config(devices) config['comment'] = 'start of GUI' #io.save(config, comment = 'start of GUI') #main display(widgets.HTML('

LWFA GUI

')) display(widgets.HTML('Save and Restore configurations')) bt_save = widgets.Button(description= 'Save config') def save(bt): comment = text_comment.value if comment == '': ht_save.value = 'Enter comment to save config!' return config = dm.get_config(devices) io.save(config, comment = comment, elog = check_elog.value) text_comment.value = '' ht_save.value = 'Config saved!' select_restore.options = get_all() bt_save.on_click(save) ht_save = widgets.HTML('') def restore(bt): restore = select_restore.value config = io.load(restore) dm.set_config(devices,config) refresh('') ht_restore.value = 'restored config: '+config['comment']+'' select_restore.options = get_all() def restore_view(inst): restore = inst.new try: config = io.load(restore) except: ht_restore.value = 'Error: This config doesnot exist' return ht_restore.value = 'selected config: '+config['comment']+'' def get_all(): configs = io.search(output = False) c = {} for i in range(len(configs)): c[configs[i]['comment']] = i return c select_restore = widgets.Select(description='restore:', options = get_all()) bt_restore = widgets.Button(description='restore now!') bt_restore.on_click(restore) ht_restore = widgets.HTML('selected config: '+config['comment']+'') display(widgets.HBox([select_restore,bt_restore, ht_restore])) #browse configs def find_config(bt): config = io.load(gui = True) dm.set_config(devices,config) refresh('') print ('restored config: '+config['comment']) bt_browse = widgets.Button(description='Browse configs') bt_browse.on_click(find_config) #display(bt_browse) text_comment = widgets.Text(description= 'Comment:') check_elog = widgets.Checkbox(value=False, description='elog') display(widgets.HBox([bt_save,text_comment,check_elog,ht_save] )) def make_controller(): #devices vbs = [] names = {} wps = [] meas_check = {} for i in range(len(devices)): config = devices[i].get_config() wps += [wrapper(devices, i, io)] wgs = [] for item in sorted(config.items()): wgs += [interaction_tool(item[0], item[1], devices[i], config).widget] if hasattr(devices[i], 'measure'): meas_check[i] = widgets.Checkbox(description='measure') meas_check[i].observe(wps[i].measure, names = 'value') wgs += [meas_check[i]] vbs += [widgets.VBox(wgs)] names[i] = [type(devices[i]).__name__] tab = widgets.Tab(children = vbs, _titles= names) return tab, wps display(widgets.HTML('
Interact with devices
')) bt_refresh = widgets.Button(description='Refresh configs') global tab global wps tab, wps = make_controller() vb_config = widgets.VBox([bt_refresh, tab]) display(vb_config) def refresh(bt): global tab global wps tab.close() tab, wps = make_controller() vb_config.children = [bt_refresh, tab] bt_refresh.on_click(refresh) display(widgets.HTML('
Plot measure configuration
')) bt_plot_config_init = widgets.Button(description='Measure to initialize') global plot_config plot_config = {} def plot_configurator(bt): #devices vbs = [] names = {} wps = [] plot_meas_check = {} devices2measure = find_devices2measure() print (devices2measure) measurement = dm.measure(devices2measure) global plot_config plot_config = {} for i in range(len(devices2measure)): device_name = type(devices2measure[i]).__name__ plot_config[device_name] = {} wgs = [] for item in sorted(measurement['measure'][device_name].items()): wgs += [plot_config_tool(item[0], item[1], plot_config[device_name]).widget] if hasattr(devices2measure[i], 'plot_measure'): wgs += [plot_config_tool('plot_measure', 0, plot_config[device_name]).widget] vbs += [widgets.VBox(wgs)] names[i] = [type(devices2measure[i]).__name__] measure_tab = widgets.Tab(children = vbs, _titles= names) vb_plot_config.children = [bt_plot_config_init, measure_tab] bt_plot_config_init.on_click(plot_configurator) vb_plot_config = widgets.VBox([bt_plot_config_init]) display(vb_plot_config) display(widgets.HTML('
Make measurements
')) loop_inttext = widgets.IntText(value = 10, description = 'loop') output_check = widgets.Checkbox(description = 'output') save_check = widgets.Checkbox(description = 'save') display(widgets.HBox([loop_inttext, output_check, save_check])) rate_text = widgets.FloatText(value = 5, description = 'rate') display(rate_text) meas_bt = widgets.Button(description = 'measure') clear_hist_bt = widgets.Button(description = 'clear history') display(widgets.HBox([meas_bt, clear_hist_bt])) def find_devices2measure(): #load_devices to measure devices2measure = [] for i in range(len(devices)): if wps[i].measure_check: devices2measure += [devices[i]] return devices2measure def measure(bt, comment = '', name = None): devices2measure = find_devices2measure() #initialize parallelization if parallel: from multiprocessing import Process, Pipe class DataStreamProcess(Process): def __init__(self, connec, *args, **kwargs): self.connec = connec Process.__init__(self, *args, **kwargs) def run(self): loop_measure(parallel = True, connec = self.connec) #loop measure def loop_measure(parallel = False, connec = None): for i in range(loop_inttext.value): import IPython IPython.display.clear_output(wait = True) import time t = time.time() measurement = dm.measure(devices2measure) if save_check.value: if name == None: io.save(measurement, kind = 'measure', comment = comment) else: io.save(measurement, kind = 'measure', comment = comment, name = name+'_'+str(i)) if parallel: connec.send(measurement) elif output_check.value: dm.plot_measure(measurement, devices2measure,plot_config, 0.01) plt.pause(0.001) plt.draw_all(force = True) print ('maximal possible rate: ',np.round(1/(time.time() -t),1)) if (1/rate_text.value - (time.time() -t) < 0): print ('set rate cannot be reached.') else: time.sleep(1/rate_text.value - (time.time() -t)) if output_check.value and parallel: global plot_config conn1, conn2 = Pipe() data_stream = DataStreamProcess(conn1) data_stream.start() while True: if not(conn2.poll(0.1)): if not(data_stream.is_alive()): break else: continue measurement = conn2.recv() dm.plot_measure(measurement, devices2measure,plot_config, 0.01) plt.pause(0.001) plt.draw_all(force = True) else: loop_measure() def clear_hist(bt): import Globals Globals.plot_hist = {} meas_bt.on_click(measure) clear_hist_bt.on_click(clear_hist) display(widgets.HTML('Scanning Tool')) bt_scan = widgets.Button(description = 'start Scanning Tool') display(bt_scan) scanning_tool_box = widgets.VBox([]) display(scanning_tool_box) def scanning_tool(bt): def choose_device(bt): device = select_device.value def choose_parameter(bt): par_to_scan = select_scannable.value config = dm.get_config(devices) current_value = config[type(device).__name__][par_to_scan] def scan(bt): comment = 'scan of '+par_to_scan+' over '+str((minv.value, maxv.value, nsteps.value)) if save_check.value: import os import datetime folder_name = 'scan_'+par_to_scan+'_'+str(minv.value)+'-'+str(maxv.value)+'_'+str(nsteps.value)+'_'+str(datetime.datetime.now()).replace(':','-').replace(' ','_') os.mkdir(io.path + folder_name) conifg = dm.get_config(devices) io.save(config, name = folder_name+'/config_at_start',comment = 'before '+comment) else: folder_name = '' scan_values = np.linspace(minv.value, maxv.value, nsteps.value) for scan_value in scan_values: config[type(device).__name__][par_to_scan] = scan_value dm.set_config(devices, config) measure('', comment = comment+', current value: '+str(scan_value), name = folder_name+'/'+par_to_scan+'='+str(scan_value)) minv = widgets.FloatText(description='minv:',value = current_value) maxv = widgets.FloatText(description='maxv:',value = current_value) nsteps = widgets.IntText(description='nsteps:', value = 1) bt = widgets.Button(description='Run scan') bt.on_click(scan) menue = widgets.VBox([widgets.HTML('current value: '+str(current_value)), minv, maxv, nsteps, bt]) scanning_tool_box.children = list(scanning_tool_box.children) + [menue] scannable = [] for item in config[type(device).__name__].items(): if np.dtype(type(item[1])) == np.dtype('float64') or np.dtype(type(item[1])) == np.dtype(int): scannable += [item[0]] select_scannable = widgets.Select(description = 'parameter to scan: ',options = scannable) bt_scannable = widgets.Button(description = 'choose parameter') bt_scannable.on_click(choose_parameter) hb_scannable = widgets.HBox([select_scannable,bt_scannable]) scanning_tool_box.children = list(scanning_tool_box.children)[:1] + [hb_scannable] scanning_tool_box.children = [] device_dict = {} for device in devices: device_dict[type(device).__name__] = device select_device = widgets.Select(description = 'device to scan: ',options = device_dict) bt_device = widgets.Button(description = 'choose device') bt_device.on_click(choose_device) hb_device = widgets.HBox([select_device,bt_device]) scanning_tool_box.children = [hb_device] bt_scan.on_click(scanning_tool) def get_path(): import tkinter as tk from tkinter import filedialog root = tk.Tk() root.withdraw() return filedialog.askopenfilename()