Code indexing in gitaly is broken and leads to code not being visible to the user. We work on the issue with highest priority.

Skip to content
Snippets Groups Projects
Unverified Commit dbc643ab authored by JakHolzer's avatar JakHolzer Committed by GitHub
Browse files

Update param_study_moduls.py

Updated the create_dataframe and added function called variables, which tries to decide which variables to plot in parametric study and q scans. Works good for primary variable (usually om), and reduces the secondary (slice variable, temperature, mag.field,...) variables to a few candidates from which one has to be picked. In one set for param study, it identified all parameters correctly, in q scan, the temperature varied as well as H index, so technically both could be used, but only one makes sense and that will have to be picked by user.
parent 08567050
No related branches found
No related tags found
No related merge requests found
......@@ -7,10 +7,10 @@ import pandas as pd
import scipy.io as sio
import uncertainties as u
from mpl_toolkits.mplot3d import Axes3D # dont delete, otherwise waterfall wont work
import collections
from .ccl_io import load_1D
def create_tuples(x, y, y_err):
"""creates tuples for sorting and merginng of the data
Counts need to be normalized to monitor before"""
......@@ -49,45 +49,45 @@ def load_dats(filepath):
if data_type == "txt":
dict1 = add_dict(dict1, load_1D(file_list[i][0]))
else:
dict1 = add_dict(dict1, load_1D(file_list[i]))
dict1["scan"][i + 1]["params"] = {}
if data_type == "txt":
for x in range(len(col_names) - 1):
dict1["scan"][i + 1]["params"][col_names[x + 1]] = file_list[i][x + 1]
dict1["scan"][i + 1]["params"][col_names[x + 1]] = float(file_list[i][x + 1])
return dict1
def create_dataframe(dict1):
def create_dataframe(dict1, variables):
"""Creates pandas dataframe from the dictionary
:arg ccl like dictionary
:return pandas dataframe"""
# create dictionary to which we pull only wanted items before transforming it to pd.dataframe
pull_dict = {}
pull_dict["filenames"] = list()
for key in dict1["scan"][1]["params"]:
pull_dict[key] = list()
pull_dict["temperature"] = list()
pull_dict["mag_field"] = list()
for keys in variables:
for item in variables[keys]:
pull_dict[item] = list()
pull_dict["fit_area"] = list()
pull_dict["int_area"] = list()
pull_dict["om"] = list()
pull_dict["Counts"] = list()
for keys in pull_dict:
print(keys)
# populate the dict
for keys in dict1["scan"]:
if "file_of_origin" in dict1["scan"][keys]:
pull_dict["filenames"].append(dict1["scan"][keys]["file_of_origin"].split("/")[-1])
else:
pull_dict["filenames"].append(dict1["meta"]["original_filename"].split("/")[-1])
for key in dict1["scan"][keys]["params"]:
pull_dict[str(key)].append(float(dict1["scan"][keys]["params"][key]))
pull_dict["temperature"].append(dict1["scan"][keys]["temperature"])
pull_dict["mag_field"].append(dict1["scan"][keys]["mag_field"])
pull_dict["fit_area"].append(dict1["scan"][keys]["fit"]["fit_area"])
pull_dict["int_area"].append(dict1["scan"][keys]["fit"]["int_area"])
pull_dict["om"].append(dict1["scan"][keys]["om"])
pull_dict["Counts"].append(dict1["scan"][keys]["Counts"])
for key in variables:
for i in variables[key]:
pull_dict[i].append(_finditem(dict1["scan"][keys], i))
return pd.DataFrame(data=pull_dict)
......@@ -213,7 +213,8 @@ def save_table(data, filetype, name, path=None):
if filetype == "json":
data.to_json((path + name + ".json"))
def normalize(dict, key, monitor):
def normalize(scan, monitor):
"""Normalizes the measurement to monitor, checks if sigma exists, otherwise creates it
:arg dict : dictionary to from which to tkae the scan
:arg key : which scan to normalize from dict1
......@@ -221,15 +222,16 @@ def normalize(dict, key, monitor):
:return counts - normalized counts
:return sigma - normalized sigma"""
counts = np.array(dict["scan"][key]["Counts"])
sigma = np.sqrt(counts) if "sigma" not in dict["scan"][key] else dict["scan"][key]["sigma"]
monitor_ratio = monitor / dict["scan"][key]["monitor"]
counts = np.array(scan["Counts"])
sigma = np.sqrt(counts) if "sigma" not in scan else scan["sigma"]
monitor_ratio = monitor / scan["monitor"]
scaled_counts = counts * monitor_ratio
scaled_sigma = np.array(sigma) * monitor_ratio
return scaled_counts, scaled_sigma
def merge(dict1, dict2, scand_dict_result, keep=True, monitor=100000):
def merge(scan1, scan2, keep=True, monitor=100000):
"""merges the two tuples and sorts them, if om value is same, Counts value is average
averaging is propagated into sigma if dict1 == dict2, key[1] is deleted after merging
:arg dict1 : dictionary to which measurement will be merged
......@@ -240,62 +242,45 @@ def merge(dict1, dict2, scand_dict_result, keep=True, monitor=100000):
:arg monitor : final monitor after merging
note: dict1 and dict2 can be same dict
:return dict1 with merged scan"""
for keys in scand_dict_result:
for j in range(len(scand_dict_result[keys])):
first, second = scand_dict_result[keys][j][0], scand_dict_result[keys][j][1]
print(first, second)
if keep:
if dict1["scan"][first]["monitor"] == dict2["scan"][second]["monitor"]:
monitor = dict1["scan"][first]["monitor"]
# load om and Counts
x1, x2 = dict1["scan"][first]["om"], dict2["scan"][second]["om"]
cor_y1, y_err1 = normalize(dict1, first, monitor=monitor)
cor_y2, y_err2 = normalize(dict2, second, monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
if dict1 == dict2:
del dict1["scan"][second]
note = (
f"This measurement was merged with measurement {second} from "
f'file {dict2["meta"]["original_filename"]} \n'
)
if "notes" not in dict1["scan"][first]:
dict1["scan"][first]["notes"] = note
if keep:
if scan1["monitor"] == scan2["monitor"]:
monitor = scan1["monitor"]
# load om and Counts
x1, x2 = scan1["om"], scan2["om"]
cor_y1, y_err1 = normalize(scan1, monitor=monitor)
cor_y2, y_err2 = normalize(scan2, monitor=monitor)
# creates touples (om, Counts, sigma) for sorting and further processing
tuple_list = create_tuples(x1, cor_y1, y_err1) + create_tuples(x2, cor_y2, y_err2)
# Sort the list on om and add 0 0 0 tuple to the last position
sorted_t = sorted(tuple_list, key=lambda tup: tup[0])
sorted_t.append((0, 0, 0))
om, Counts, sigma = [], [], []
seen = list()
for i in range(len(sorted_t) - 1):
if sorted_t[i][0] not in seen:
if sorted_t[i][0] != sorted_t[i + 1][0]:
om = np.append(om, sorted_t[i][0])
Counts = np.append(Counts, sorted_t[i][1])
sigma = np.append(sigma, sorted_t[i][2])
else:
dict1["scan"][first]["notes"] += note
dict1["scan"][first]["om"] = om
dict1["scan"][first]["Counts"] = Counts
dict1["scan"][first]["sigma"] = sigma
dict1["scan"][first]["monitor"] = monitor
print("merging done")
return dict1
om = np.append(om, sorted_t[i][0])
counts1, counts2 = sorted_t[i][1], sorted_t[i + 1][1]
sigma1, sigma2 = sorted_t[i][2], sorted_t[i + 1][2]
count_err1 = u.ufloat(counts1, sigma1)
count_err2 = u.ufloat(counts2, sigma2)
avg = (count_err1 + count_err2) / 2
Counts = np.append(Counts, avg.n)
sigma = np.append(sigma, avg.s)
seen.append(sorted_t[i][0])
else:
continue
scan1["om"] = om
scan1["Counts"] = Counts
scan1["sigma"] = sigma
scan1["monitor"] = monitor
print("merging done")
def add_dict(dict1, dict2):
......@@ -306,9 +291,13 @@ def add_dict(dict1, dict2):
:return dict1 : combined dictionary
Note: dict1 must be made from ccl, otherwise we would have to change the structure of loaded
dat file"""
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
try:
if dict1["meta"]["zebra_mode"] != dict2["meta"]["zebra_mode"]:
print("You are trying to add scans measured with different zebra modes")
return
# this is for the qscan case
except KeyError:
print("Zebra mode not specified")
max_measurement_dict1 = max([keys for keys in dict1["scan"]])
new_filenames = np.arange(
max_measurement_dict1 + 1, max_measurement_dict1 + 1 + len(dict2["scan"])
......@@ -371,6 +360,9 @@ def scan_dict(dict, precision=0.5):
itup.append(abs(abs(dict["scan"][i][k]) - abs(dict["scan"][j][k])))
if all(i <= precision for i in itup):
print(itup)
print([dict["scan"][i][k] for k in angles])
print([dict["scan"][j][k] for k in angles])
if str([np.around(dict["scan"][i][k], 0) for k in angles]) not in d:
d[str([np.around(dict["scan"][i][k], 0) for k in angles])] = list()
d[str([np.around(dict["scan"][i][k], 0) for k in angles])].append((i, j))
......@@ -382,4 +374,115 @@ def scan_dict(dict, precision=0.5):
else:
continue
return d
def _finditem(obj, key):
if key in obj:
return obj[key]
for k, v in obj.items():
if isinstance(v, dict):
item = _finditem(v, key)
if item is not None:
return item
def most_common(lst):
return max(set(lst), key=lst.count)
def variables(dictionary):
"""Funcrion to guess what variables will be used in the param study
i call pripary variable the one the array like variable, usually omega
and secondary the slicing variable, different for each scan,for example temperature"""
# find all variables that are in all scans
stdev_precision = 0.05
all_vars = list()
for keys in dictionary["scan"]:
all_vars.append([key for key in dictionary["scan"][keys] if key != "params"])
if dictionary["scan"][keys]["params"]:
all_vars.append(key for key in dictionary["scan"][keys]["params"])
all_vars = [i for sublist in all_vars for i in sublist]
# get the ones that are in all scans
b = collections.Counter(all_vars)
inall = [key for key in b if b[key] == len(dictionary["scan"])]
# delete those that are obviously wrong
wrong = [
"NP",
"Counts",
"Monitor1",
"Monitor2",
"Monitor3",
"h_index",
"l_index",
"k_index",
"number_of_measurements",
"monitor",
"Time",
"omega_angle",
"twotheta_angle",
"chi_angle",
"phi_angle",
"nu_angle",
]
inall_red = [i for i in inall if i not in wrong]
# check for primary variable, needs to be list, we dont suspect the
# primary variable be as a parameter (be in scan[params])
primary_candidates = list()
for key in dictionary["scan"]:
for i in inall_red:
if isinstance(_finditem(dictionary["scan"][key], i), list):
if np.std(_finditem(dictionary["scan"][key], i)) > stdev_precision:
primary_candidates.append(i)
# check which of the primary are in every scan
primary_candidates = collections.Counter(primary_candidates)
second_round_primary_candidates = [
key for key in primary_candidates if primary_candidates[key] == len(dictionary["scan"])
]
if len(second_round_primary_candidates) == 1:
print("We've got a primary winner!", second_round_primary_candidates)
else:
print("Still not sure with primary:(", second_round_primary_candidates)
# check for secondary variable, we suspect a float\int or not changing array
# we dont need to check for primary ones
secondary_candidates = [i for i in inall_red if i not in second_round_primary_candidates]
# print("secondary candidates", secondary_candidates)
# select arrays and floats and ints
second_round_secondary_candidates = list()
for key in dictionary["scan"]:
for i in secondary_candidates:
if isinstance(_finditem(dictionary["scan"][key], i), float):
second_round_secondary_candidates.append(i)
elif isinstance(_finditem(dictionary["scan"][key], i), int):
second_round_secondary_candidates.append(i)
elif isinstance(_finditem(dictionary["scan"][key], i), list):
if np.std(_finditem(dictionary["scan"][key], i)) < stdev_precision:
second_round_secondary_candidates.append(i)
second_round_secondary_candidates = collections.Counter(second_round_secondary_candidates)
second_round_secondary_candidates = [
key
for key in second_round_secondary_candidates
if second_round_secondary_candidates[key] == len(dictionary["scan"])
]
# print("secondary candidates after second round", second_round_secondary_candidates)
# now we check if they vary between the scans
third_round_sec_candidates = list()
for i in second_round_secondary_candidates:
check_array = list()
for keys in dictionary["scan"]:
check_array.append(np.average(_finditem(dictionary["scan"][keys], i)))
# print(i, check_array, np.std(check_array))
if np.std(check_array) > stdev_precision:
third_round_sec_candidates.append(i)
if len(third_round_sec_candidates) == 1:
print("We've got a secondary winner!", third_round_sec_candidates)
else:
print("Still not sure with secondary :(", third_round_sec_candidates)
return {"primary": second_round_primary_candidates, "secondary": third_round_sec_candidates}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment