Commit 326c5a5f authored by Advanced Instrumentation's avatar Advanced Instrumentation
Browse files

measurement parallelized with multiprocessing - not threading

parent 42c1235a
{
"cells": [],
"metadata": {},
"nbformat": 4,
"nbformat_minor": 2
}
......@@ -74,7 +74,11 @@ def plot_measure(data, devices, plot_config = {}, sleep = 0):
g.plot_hist[device_name][item[0]] = [measure[device_name][item[0]]]
plt.figure('history: '+device_name+' '+item[0])
plt.clf()
plt.plot(g.plot_hist[device_name][item[0]])
xx = range(-len(g.plot_hist[device_name][item[0]])+1,1)
plt.plot(xx,g.plot_hist[device_name][item[0]])
plt.xlabel('last measurements')
plt.ylabel('value')
plt.hlines([min(g.plot_hist[device_name][item[0]]), max(g.plot_hist[device_name][item[0]])],min(xx), max(xx))
plt.pause(sleep)
#important: sort by time and plot sepecific elements
......
......@@ -27,8 +27,8 @@ class FocusCamera:
def __init__(self, config):
self.psize=1.67 #mm
self.mm=int(1./self.psize)
self.psize=1.67 #um
self.um=int(1./self.psize)
self.ep=1.74/self.psize
self.connect_camera()
......@@ -67,7 +67,6 @@ class FocusCamera:
comment = 'beam detected'
else:
comment = 'No beam detected'
print (comment)
FWHMx = None
FWHMy = None
cxmm = None
......@@ -79,9 +78,23 @@ class FocusCamera:
def plot_measure(self, measurement, config = None):
im = measurement['im']
plt.figure('measurement: FocusCamera')
ax = plt.gca()
LA.find_beam(im,axis = ax)
ymin, ymax = plt.gca().get_ylim()
xmin, xmax = plt.gca().get_xlim()
plt.clf()
ax_beam = plt.subplot(111)
max_int = np.max(im)
if max_int > 1000:
maximum_pos, contour = LA.find_beam(im, axis = ax_beam)
ymin, ymax = ax_beam.get_ylim()
xmin, xmax = ax_beam.get_xlim()
else:
ax_beam.set_ylim([ymin, ymax])
ax_beam.set_xlim([xmin, xmax])
LA.showimage(im, axis = ax_beam)
def connect_camera(self):
......
......@@ -308,53 +308,70 @@ def main(devices, io):
devices2measure += [devices[i]]
return devices2measure
def measure(bt, comment = '', name = None):
devices2measure = find_devices2measure()
from multiprocessing import Process, Pipe
#loop measure
for i in range(loop_inttext.value):
import time
t = time.time()
measurement = dm.measure(devices2measure)
if save_check.value:
if name == None:
io.save(measurement,
kind = 'measure', comment = comment)
else:
io.save(measurement,
kind = 'measure', comment = comment, name = name+'_'+str(i))
if output_check.value:
global plot_config
import threading
still_plotting = True
while still_plotting:
still_plotting = False
for thread in threading.enumerate():
if thread.name == 'plotting':
print (thread.name)
still_plotting = True
continue
th = threading.Thread(target = dm.plot_measure, args = (measurement, devices2measure,
plot_config, 0.01), name = 'plotting')
th.start()
#dm.plot_measure(measurement, devices2measure,
# plot_config = plot_config)
plt.draw_all(force = True)
def loop_measure(parallel = False, connec = None):
for i in range(loop_inttext.value):
print ('maximal possible rate: ',np.round(1/(time.time() -t),1))
if (1/rate_text.value - (time.time() -t) < 0):
print ('set rate cannot be reached.')
else:
plt.pause(1/rate_text.value - (time.time() -t))
else:
import time
t = time.time()
measurement = dm.measure(devices2measure)
if save_check.value:
if name == None:
io.save(measurement,
kind = 'measure', comment = comment)
else:
io.save(measurement,
kind = 'measure', comment = comment, name = name+'_'+str(i))
if parallel:
connec.send(measurement)
print ('maximal possible rate: ',np.round(1/(time.time() -t),1))
if (1/rate_text.value - (time.time() -t) < 0):
print ('set rate cannot be reached.')
else:
time.sleep(1/rate_text.value - (time.time() -t))
plt.pause(0.0001)
import IPython
IPython.display.clear_output(wait = True)
import IPython
IPython.display.clear_output(wait = True)
class DataStreamProcess(Process):
def __init__(self, connec, *args, **kwargs):
self.connec = connec
Process.__init__(self, *args, **kwargs)
def run(self):
loop_measure(parallel = True, connec = self.connec)
if output_check.value:
global plot_config
conn1, conn2 = Pipe()
data_stream = DataStreamProcess(conn1)
data_stream.start()
while True:
if not(conn2.poll(0.1)):
if not(data_stream.is_alive()):
break
else:
continue
measurement = conn2.recv()
dm.plot_measure(measurement, devices2measure,plot_config, 0.01)
plt.pause(0.001)
plt.draw_all(force = True)
else:
loop_measure()
print ('Cheese')
def clear_hist(bt):
......
......@@ -22,7 +22,7 @@
},
{
"cell_type": "code",
"execution_count": 5,
"execution_count": 2,
"metadata": {
"collapsed": false,
"deletable": true,
......@@ -33,7 +33,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"Using matplotlib backend: Qt5Agg\n"
"Using matplotlib backend: MacOSX\n"
]
}
],
......@@ -116,7 +116,10 @@
},
{
"cell_type": "markdown",
"metadata": {},
"metadata": {
"deletable": true,
"editable": true
},
"source": [
"## FocusCamera"
]
......@@ -125,7 +128,9 @@
"cell_type": "code",
"execution_count": 3,
"metadata": {
"collapsed": false
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
......@@ -156,7 +161,9 @@
"cell_type": "code",
"execution_count": 7,
"metadata": {
"collapsed": false
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
......@@ -175,7 +182,9 @@
"cell_type": "code",
"execution_count": 10,
"metadata": {
"collapsed": false
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
......@@ -555,7 +564,7 @@
},
{
"cell_type": "code",
"execution_count": 1,
"execution_count": 3,
"metadata": {
"collapsed": false,
"deletable": true,
......@@ -581,20 +590,13 @@
},
{
"cell_type": "code",
"execution_count": 2,
"execution_count": 25,
"metadata": {
"collapsed": false,
"deletable": true,
"editable": true
},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"ERROR:root:Line magic function `%autoreload` not found.\n"
]
},
{
"name": "stdout",
"output_type": "stream",
......@@ -619,7 +621,7 @@
},
{
"cell_type": "code",
"execution_count": 3,
"execution_count": 26,
"metadata": {
"collapsed": false,
"deletable": true,
......@@ -633,7 +635,7 @@
},
{
"cell_type": "code",
"execution_count": 4,
"execution_count": 27,
"metadata": {
"collapsed": false,
"deletable": true,
......@@ -645,54 +647,7 @@
"name": "stdout",
"output_type": "stream",
"text": [
"test_device1 measured\n",
"test_device2 measured\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"plotting\n",
"maximal possible rate: 9.9\n"
"set rate cannot be reached.\n"
]
}
],
......@@ -786,297 +741,6 @@
"dm.get_config(devices)"
]
},
{
"cell_type": "code",
"execution_count": 26,
"metadata": {
"collapsed": true,
"deletable": true,
"editable": true
},
"outputs": [],
"source": [
"import threading"
]
},
{
"cell_type": "code",
"execution_count": 28,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"5"
]
},
"execution_count": 28,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"threading.active_count()"
]
},
{
"cell_type": "code",
"execution_count": 30,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Help on function wait in module threading:\n",
"\n",
"wait(self, timeout=None)\n",
" Wait for the barrier.\n",
" \n",
" When the specified number of threads have started waiting, they are all\n",
" simultaneously awoken. If an 'action' was provided for the barrier, one\n",
" of the threads will have executed that callback prior to returning.\n",
" Returns an individual index number from 0 to 'parties-1'.\n",
"\n"
]
}
],
"source": [
"help(threading.Barrier.wait)"
]
},
{
"cell_type": "code",
"execution_count": 62,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"[<_MainThread(MainThread, started 140735297941504)>,\n",
" <Thread(Thread-2, started daemon 123145308631040)>,\n",
" <Heartbeat(Thread-3, started daemon 123145313886208)>,\n",
" <HistorySavingThread(IPythonHistorySavingThread, started 123145320214528)>,\n",
" <ParentPollerUnix(Thread-1, started daemon 123145325469696)>]"
]
},
"execution_count": 62,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"lth = threading.enumerate()\n",
"lth"
]
},
{
"cell_type": "code",
"execution_count": 57,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"I am doing my stuff\n"
]
}
],
"source": [
"th.start()"
]
},
{
"cell_type": "code",
"execution_count": 56,
"metadata": {
"collapsed": false
},
"outputs": [],
"source": [
"th = lth[1]\n",
"def test():\n",
" import time\n",
" print ('I am doing my stuff')\n",
" time.sleep(5)\n",
"\n",
"th = threading.Thread(target = test, name = 'test')"
]
},
{
"cell_type": "code",
"execution_count": 39,
"metadata": {
"collapsed": false
},
"outputs": [
{
"data": {
"text/plain": [
"'Thread-2'"
]
},
"execution_count": 39,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"th.getName()"
]
},
{
"cell_type": "code",
"execution_count": 40,
"metadata": {
"collapsed": false
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Help on class Thread in module threading:\n",
"\n",
"class Thread(builtins.object)\n",
" | A class that represents a thread of control.\n",
" | \n",
" | This class can be safely subclassed in a limited fashion. There are two ways\n",
" | to specify the activity: by passing a callable object to the constructor, or\n",
" | by overriding the run() method in a subclass.\n",
" | \n",
" | Methods defined here:\n",
" | \n",
" | __init__(self, group=None, target=None, name=None, args=(), kwargs=None, *, daemon=None)\n",
" | This constructor should always be called with keyword arguments. Arguments are:\n",
" | \n",
" | *group* should be None; reserved for future extension when a ThreadGroup\n",
" | class is implemented.\n",
" | \n",
" | *target* is the callable object to be invoked by the run()\n",
" | method. Defaults to None, meaning nothing is called.\n",
" | \n",
" | *name* is the thread name. By default, a unique name is constructed of\n",
" | the form \"Thread-N\" where N is a small decimal number.\n",
" | \n",
" | *args* is the argument tuple for the target invocation. Defaults to ().\n",
" | \n",
" | *kwargs* is a dictionary of keyword arguments for the target\n",
" | invocation. Defaults to {}.\n",
" | \n",
" | If a subclass overrides the constructor, it must make sure to invoke\n",
" | the base class constructor (Thread.__init__()) before doing anything\n",
" | else to the thread.\n",
" | \n",
" | __repr__(self)\n",
" | Return repr(self).\n",
" | \n",
" | getName(self)\n",
" | \n",
" | isAlive = is_alive(self)\n",
" | \n",
" | isDaemon(self)\n",
" | \n",
" | is_alive(self)\n",
" | Return whether the thread is alive.\n",
" | \n",
" | This method returns True just before the run() method starts until just\n",
" | after the run() method terminates. The module function enumerate()\n",
" | returns a list of all alive threads.\n",
" | \n",
" | join(self, timeout=None)\n",
" | Wait until the thread terminates.\n",
" | \n",
" | This blocks the calling thread until the thread whose join() method is\n",
" | called terminates -- either normally or through an unhandled exception\n",
" | or until the optional timeout occurs.\n",
" | \n",
" | When the timeout argument is present and not None, it should be a\n",
" | floating point number specifying a timeout for the operation in seconds\n",
" | (or fractions thereof). As join() always returns None, you must call\n",
" | isAlive() after join() to decide whether a timeout happened -- if the\n",
" | thread is still alive, the join() call timed out.\n",
" | \n",
" | When the timeout argument is not present or None, the operation will\n",
" | block until the thread terminates.\n",
" | \n",
" | A thread can be join()ed many times.\n",
" | \n",
" | join() raises a RuntimeError if an attempt is made to join the current\n",
" | thread as that would cause a deadlock. It is also an error to join() a\n",
" | thread before it has been started and attempts to do so raises the same\n",
" | exception.\n",
" | \n",
" | run(self)\n",
" | Method representing the thread's activity.\n",
" | \n",
" | You may override this method in a subclass. The standard run() method\n",
" | invokes the callable object passed to the object's constructor as the\n",
" | target argument, if any, with sequential and keyword arguments taken\n",
" | from the args and kwargs arguments, respectively.\n",
" | \n",
" | setDaemon(self, daemonic)\n",
" | \n",
" | setName(self, name)\n",
" | \n",
" | start(self)\n",
" | Start the thread's activity.\n",
" | \n",
" | It must be called at most once per thread object. It arranges for the\n",
" | object's run() method to be invoked in a separate thread of control.\n",
" | \n",
" | This method will raise a RuntimeError if called more than once on the\n",
" | same thread object.\n",
" | \n",
" | ----------------------------------------------------------------------\n",
" | Data descriptors defined here:\n",
" | \n",
" | __dict__\n",
" | dictionary for instance variables (if defined)\n",
" | \n",
" | __weakref__\n",
" | list of weak references to the object (if defined)\n",
" | \n",
" | daemon\n",
" | A boolean value indicating whether this thread is a daemon thread.\n",
" | \n",
" | This must be set before start() is called, otherwise RuntimeError is\n",
" | raised. Its initial value is inherited from the creating thread; the\n",
" | main thread is not a daemon thread and therefore all threads created in\n",
" | the main thread default to daemon = False.\n",
" | \n",
" | The entire Python program exits when no alive non-daemon threads are\n",
" | left.\n",
" | \n",
" | ident\n",
" | Thread identifier of this thread or None if it has not been started.\n",
" | \n",
" | This is a nonzero integer. See the thread.get_ident() function. Thread\n",
" | identifiers may be recycled when a thread exits and another thread is\n",
" | created. The identifier is available even after the thread has exited.\n",
" | \n",
" | name\n",
" | A string used for identification purposes only.\n",
" | \n",
" | It has no semantics. Multiple threads may be given the same name. The\n",
" | initial name is set by the constructor.\n",
"\n"
]
}
],
"source": [
"help(threading.Thread)"
]
},
{
"cell_type": "code",
"execution_count": null,
......@@ -1090,7 +754,7 @@
"metadata": {
"anaconda-cloud": {},
"kernelspec": {
"display_name": "Python [default]",
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
......@@ -1104,7 +768,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.2"
"version": "3.6.0"