Commit 92442b70 authored by schuma_w

Copper neural net 0.90 r2

import numpy as np
import torch
import random
from Last_Preprocess_Overall import Make_Features_and_Labels  # Project-specific preprocessing
import torch.nn as nn
from sklearn.model_selection import KFold
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from sklearn.metrics import r2_score
from sklearn.preprocessing import MinMaxScaler,StandardScaler
import difflib
from sklearn.datasets import load_iris #Just to check
import inspect
some_list = ["pH","Leitfahigkeit","L/S kumuliert","Calcium","Arsenic", "Lead (Pb+2)","Copper",
"Manganese","Nickel","Zinc","Cobalt", "Aluminium","Chromium","Antimony","Chloride","Sulfate","Cadmium"] #Missing: Molybdan
some_list_2 = ["Calcium","Arsenic", "Lead (Pb+2)","Copper",
"Manganese","Nickel","Zinc","Cobalt", "Aluminium","Chromium","Antimony","Chloride","Sulfate","Cadmium"]
Features, Labels,_,names = Make_Features_and_Labels(some_list, method="Steady",add_noise=False,std_width = 0.01,factor=4,add_extra = False,Boolnorm = True) #method = Rolling, Steady
#add_noise: adds noisy data. Adding noise inside "Machine_Learning" only augments the training folds of each CV split, so the test folds stay untouched.
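# A minimal, self-contained sketch (illustrative only, not called anywhere in this script) of the
# Gaussian-noise augmentation that Machine_Learning applies to its training folds when add_noise=True:
# every value is replaced by a draw from N(value, std_width), and the noisy copies are appended
# "factor" times to the original data.
def _augment_with_gaussian_noise_sketch(X, y, std_width=0.1, factor=3):
    X_aug, y_aug = list(X), list(y)
    for _ in range(factor):
        X_aug.extend(np.random.normal(row, std_width) for row in X)
        y_aug.extend(np.random.normal(row, std_width) for row in y)
    return np.array(X_aug), np.array(y_aug)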
def Machine_Learning(Features, Labels,names,K_folds,seed,std_width = 0.1, add_noise = True, factor = 5,name = "Copper (Cu)", reduced_input = True,Archit = False):
    '''
    Parameters
    ----------
    Features : Feature matrix for the model.
    Labels : Label matrix for the model.
    names : List of column names in the data (e.g. Calcium, L/S, etc.).
    K_folds : Number of folds for k-fold cross-validation.
    seed : Random seed used for the k-fold split.
    std_width : If add_noise is True, new data are created by drawing Gaussian samples
        around the original values with standard deviation std_width. The default is 0.1.
    add_noise : Boolean. The default is True.
    factor : Integer. By how much the training data are increased: if factor is 3, the training
        folds consist of the original data plus 3 noisy copies, i.e. 4 times as much data.
        The default is 5.
    name : The target label; the model only predicts this label.
    reduced_input : If True, only the first three feature columns plus the target element's
        feature column are used as input.
    Archit : If True, only the model architecture is returned and no training is done.

    Returns
    -------
    avg_r2_score : Average of the best R-squared score reached in each fold.
    MSE_loss : Mean absolute percentage error on the last fold.
    Arch : The architecture description when Archit is True, otherwise a placeholder string.
    '''
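    # Overall flow of this function: k-fold split -> (optionally) augment the training fold with
    # Gaussian noise -> pick the target column via fuzzy name matching -> min-max scale features ->
    # train a small fully connected net per fold -> keep the best R2 seen on the held-out fold.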
print("")
print("Currently fitting for: ",str(name), "Using seed: ",str(seed))
shape_1 = np.shape(Features)
shape_2 = np.shape(Labels)
# print(shape_1,shape_2)
# Define the number of folds
k_folds = K_folds
# Initialize the KFold object
kf = KFold(n_splits=k_folds, shuffle = True, random_state=seed)
# Initialize lists to store the R-squared scores for each fold
r2_scores = []
# Convert feature matrix and labels to numpy array
Feature_Matrix_ = np.array(Features)
Label_Vector_ = np.array(Labels)
# Initialize MinMaxScaler
scaler = MinMaxScaler() #C = (c_i - c_min)/(c_max - c_min) -> Range in [0,1]
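    # Note: the scaler is re-fit on every training fold (fit_transform) and only applied
    # (transform) to the matching test fold, so no scaling information leaks from the test data.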
# Create DataLoader for training and testing
for train_index, test_index in kf.split(Feature_Matrix_):
X_train_fold, X_test_fold = Feature_Matrix_[train_index], Feature_Matrix_[test_index]
y_train_fold, y_test_fold = Label_Vector_[train_index], Label_Vector_[test_index]
#Here we should make more data from noise on the train folds. Perhaps this will give us convergence.
Feat_list = list(X_train_fold)
Lab_list = list(y_train_fold)
        if add_noise: #This adds noisy data to the training folds (not the test folds), to create more data to train on.
Feat_list = list(X_train_fold)
Lab_list = list(y_train_fold)
m,n = np.shape(Feat_list)
noise_Feats = []
noise_Labs = []
for times in range(factor):
for row in Feat_list: #This moves row by row
temp_row = []
for value in row: #This moves along the row
# std_dev = np.abs(value*std_width) #This is how big the fluctuation of the data should be. i.e "how much noise".
# print(std_dev)
std_dev = std_width
noise = np.random.normal(value, std_dev,1)[0] #This creates some noise.
temp_row.append(noise)
noise_Feats.append(temp_row)
for row in Lab_list:
temp_row = []
for value in row:
# std_dev = np.abs(value*std_width) #This is how big the fluctuation of the data should be. i.e how much noise.
std_dev = std_width
noise = np.random.normal(value, std_dev,1)[0] #This creates some noise.
temp_row.append(noise)
noise_Labs.append(temp_row)
Feat_list = Feat_list + noise_Feats
Lab_list = Lab_list + noise_Labs
X_train_fold = np.array(Feat_list)
y_train_fold = np.array(Lab_list)
if len(name)!=0:
close_match = difflib.get_close_matches(name, names, n=1,cutoff = 0.6)
index = (np.where(np.array(names) == close_match)[0][0])-3 #-3 since first three are removed from labels, and names has len(features)
y_train_fold = y_train_fold[:,index]
y_train_fold = y_train_fold.reshape(-1,1)
y_test_fold = y_test_fold[:,index]
y_test_fold = y_test_fold.reshape(-1,1)
shape_2 = np.shape(y_train_fold)
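        # Index bookkeeping example (illustrative, assuming names mirrors some_list at the top):
        # names starts with "pH", "Leitfahigkeit", "L/S kumuliert", so "Copper" sits at position 6.
        # The label matrix drops those first three columns, hence index = 6 - 3 = 3 picks the Copper
        # label column, while index + 3 = 6 recovers the Copper feature column for reduced_input below.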
if reduced_input: #This will make only LS, Leitf, etc plus element of output to input.
X_train_fold = np.concatenate((X_train_fold[:, :3], X_train_fold[:, index+3].reshape(-1, 1)), axis=1)
# X_train_fold = X_train_fold.reshape(-1,1)
X_test_fold = np.concatenate((X_test_fold[:, :3], X_test_fold[:, index+3].reshape(-1, 1)), axis=1)
shape_1 = np.shape(X_train_fold)
# Scale the features and labels
X_train_fold = scaler.fit_transform(X_train_fold)
X_test_fold = scaler.transform(X_test_fold)
# print(np.shape(X_train_fold))
# print(np.shape(X_test_fold))
# y_train_fold = scaler.fit_transform(y_train_fold.reshape(-1, 1)).reshape(-1)
# y_test_fold = scaler.transform(y_test_fold.reshape(-1, 1)).reshape(-1)
# Convert data to torch tensors
X_train_tensor = torch.tensor(X_train_fold, dtype=torch.float32)
y_train_tensor = torch.tensor(y_train_fold, dtype=torch.float32)
X_test_tensor = torch.tensor(X_test_fold, dtype=torch.float32)
y_test_tensor = torch.tensor(y_test_fold, dtype=torch.float32)
# Create DataLoader for training
train_loader = DataLoader(TensorDataset(X_train_tensor, y_train_tensor), batch_size=16, shuffle=True)
test_loader = DataLoader(TensorDataset(X_test_tensor, y_test_tensor), batch_size=16)
# Define the model
class Net(nn.Module):
"""
The model class, which defines our feature extractor used in pretraining.
"""
def __init__(self):
"""
The constructor of the model.
"""
super().__init__()
# Define the architecture of the model.
self.fc = nn.Linear(shape_1[1], 120) #These guys allow for weight training.
                self.fc1 = nn.Linear(120, 50)   #Currently unused in forward().
                self.fc2 = nn.Linear(30, 50)    #Currently unused in forward().
                self.fc3 = nn.Linear(50, 50)    #Currently unused in forward().
self.fcfinal = nn.Linear(120, shape_2[1])
self.weight_layer = nn.Linear(shape_2[1],shape_2[1])
self.dropout = nn.Dropout(p=0.3)
self.leaky = nn.LeakyReLU(0.01)
self.weight2_layer = nn.Linear(shape_2[1],shape_2[1])
self.trainable_constant = nn.Parameter(torch.randn(1))
def forward(self, x):
"""
The forward pass of the model.
input: x: torch.Tensor, the input to the model
output: x: torch.Tensor, the output of the model
"""
# Amplitude = x[:,index+3].unsqueeze(-1) #Unsqueeze makes it compatible with batch dimensions.
# LS = x[:,2].unsqueeze(-1)
                x = self.leaky(self.fc(x)) #Leaky ReLU early, to get the L/S-element relation going.
x = F.elu(-x)
x = self.dropout(x)
x = self.fcfinal(x)
# if torch.isnan(x).any():
# print("x has a nan value: final ",x)
# quit()
                weight2 = self.weight2_layer(x) #Note: currently unused in the output below.
                weight = self.weight_layer(x)
                x = torch.exp(-x)
                x = weight*x #This should create an exponential fit like A*exp(-b*x)
                return x + self.trainable_constant #By extension, this should then look like A*exp(-b*x) + c (see _exp_decay_sketch after Machine_Learning).
                # Alternative returns kept for reference (unreachable):
                # return torch.exp(-x) + self.trainable_constant
                # return x #This with lr 0.001 gives 0.74 on first
def get_architecture(self):
architecture = []
forward_source = inspect.getsource(self.forward)
# Add the source code of the forward method to architecture
architecture.append("Forward Method:")
architecture.extend(forward_source.split('\n')[1:]) # Skip the first line (def forward(self, x):)
return architecture
# return x
# Create an instance of the model
model = Net()
# Define loss function and optimizer
criterion = nn.SmoothL1Loss() #nn.SmoothL1Loss(), nn.MSELoss()
learning_rate = 0.005
optimizer = optim.SGD(model.parameters(), lr=learning_rate) #Setting this high makes "grads*lr" too large, eventually creating infs when using MSELoss
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.25, patience=2500, verbose=False) #Verbose prints.
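        # ReduceLROnPlateau multiplies the learning rate by 0.25 once the validation loss passed
        # to scheduler.step() has failed to improve for 2500 consecutive steps (mode='min').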
# Train the model
num_epochs = 10000
if Archit == True:#Just get the architecture.
avg_r2_score,MSE_loss,Arch = 0,0, model.get_architecture()
Arch.append(f"Number of epochs: {num_epochs}")
Arch.append(f"Learning rate: {learning_rate}")
return avg_r2_score,MSE_loss,Arch
        #Since the model may overshoot towards the end of training, we keep track of the best model as it goes.
best_r2_score = -np.inf
best_model_state = None
epoch = 0
while epoch < (num_epochs): #Changed this to while, since stuff must be dynamically changed.
epoch+=1
model.train() # Set the model to training mode.
running_loss = 0.0
for inputs, labels in train_loader: # Iterate over the training data
optimizer.zero_grad() # Zero the gradients.
outputs = model(inputs)
loss = criterion(outputs, labels) # Define the loss.
loss.backward() # Backward propagation.
optimizer.step() # Update parameters.
running_loss += loss.item() * inputs.size(0)
# print(running_loss)
epoch_loss = running_loss / len(train_loader.dataset)
if epoch % 10 == 0:
# print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {epoch_loss:.4f}")
test_loss = 0.0
y_true = []
y_pred = []
with torch.no_grad(): #The model is NOT training on this. This is on the validation.
for inputs, labels in test_loader:
outputs = model(inputs)
test_loss += criterion(outputs, labels).item() * inputs.size(0)
y_true.extend(labels.numpy())
y_pred.extend(outputs.numpy())
r2 = r2_score(y_true, y_pred)
# to_be_printed = np.round(r2_score(y_true, y_pred),3)
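                # If the model is still improving during the last 5% of the planned epochs, extend the
                # training budget by 10% so we do not stop while the fit is still getting better.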
if r2>best_r2_score and epoch > num_epochs*0.95:
num_epochs = int(1.1*num_epochs)
# print(f"\rConvergence reached at epoch {epoch + 1}. Extending training for {int(num_epochs - epoch)} additional epochs.")
if r2 > best_r2_score:
epoch_loss = running_loss / len(train_loader.dataset)
to_be_printed = np.round(r2_score(y_true, y_pred),3)
print(f"\rEpoch [{epoch + 1}/{num_epochs}], R2-Loss: {to_be_printed}, CritLoss: {epoch_loss:.4f}",end="")
best_r2_score = r2
best_model_state = model.state_dict()
model.eval()
            with torch.no_grad(): #This adjusts the learning rate if the validation loss is plateauing.
val_loss = 0.0
for inputs, labels in test_loader:
outputs = model(inputs)
val_loss += criterion(outputs, labels).item() * inputs.size(0)
val_loss /= len(test_loader.dataset)
scheduler.step(val_loss)
        '''
        TODO: We may need to change this, as we have different weights for each fold and k-fold
        takes the average. This means changing best_model_state above, and also below, etc.
        Either this, or one just accepts that we use the architecture as the model. Since
        training takes about 5 minutes, this isn't the worst offence I can think of.
        '''
if best_model_state is not None:
model.load_state_dict(best_model_state)
best_model = model #This is then the best model.
# print("Best model loaded with r^2 score:", best_r2_score)
best_model.eval()
# model.load_state_dict(best_model_state)
# Evaluate the model
model.eval()
test_loss = 0.0
y_true = []
y_pred = []
with torch.no_grad():
for inputs, labels in test_loader:
                outputs = model(inputs)
test_loss += criterion(outputs, labels).item() * inputs.size(0)
y_true.extend(labels.numpy())
y_pred.extend(outputs.numpy())
# print(y_true)
# print(y_pred)
# Calculate the R-squared score for the fold
# r2 = r2_score(y_true, y_pred)
r2 = best_r2_score #Take the best we got.
print("")
r2_scores.append(r2)
# print(f"Fold R-squared: {r2:.4f}")
from sklearn.metrics import mean_absolute_percentage_error
MSE_loss = mean_absolute_percentage_error(y_true, y_pred)
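        # Note: despite the variable name, this is the mean absolute percentage error,
        # MAPE = mean(|y_true - y_pred| / |y_true|), which is what fills the "_MAPA" columns in the CSV.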
# Calculate the average R-squared score across all folds
avg_r2_score = np.mean(r2_scores)
# print("Average R-squared score:", avg_r2_score)
# Arch = Net.get_architecture()
Arch = "Wrong place" #We dont bother with the architecture if we try to run the code. It has already been extracted.
return avg_r2_score,MSE_loss,Arch
#Calcium,Zinc is very fold dependent.
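# Standalone sketch (illustrative only, never called) of the functional form that Net.forward, defined
# inside Machine_Learning above, is meant to emulate: an exponential decay A*exp(-b*x) + c.
# The constants below are arbitrary example values.
def _exp_decay_sketch(x, A=1.0, b=0.5, c=0.1):
    return A * np.exp(-b * np.asarray(x, dtype=float)) + c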
if __name__ == "__main__":
import warnings
warnings.filterwarnings("ignore")
if __name__ == "__main__":
import csv
import warnings
import os
from tqdm import tqdm
# Suppress warnings
warnings.filterwarnings("ignore")
# Define parameters
reduced_in = False
Ramona_norm = False
meth = "Steady" #hehe.meth.
kfold = 5
seed_list = [42,33,11,5,2]
# seed = 42
# seed = seed_list[0]
name = 'resultsNN_copper.csv'
counter = 0
# Create a directory for results if it doesn't exist
results_folder = "Results_Neural"
if not os.path.exists(results_folder):
os.makedirs(results_folder)
# Define the file path within the results folder
file_path = os.path.join(results_folder, name)
with open(file_path,'a',newline='') as csvfile:
writer = csv.writer(csvfile)
header = ["Reduced input", "Ramona norm", "method", "Kernel", "K fold","Seed"]
for elem in some_list_2:
header.extend([elem + "_MAPA", elem + "_R2"])
writer.writerow(header)
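        # Illustrative header layout: Reduced input, Ramona norm, method, Kernel, K fold, Seed,
        # followed by one "<element>_MAPA" and "<element>_R2" column pair per element in some_list_2.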
for seed in seed_list:
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
print("Seed percentage: "+str(100*counter/len(seed_list)))
counter+=1
# Generate features and labels
Features, Labels, _, names = Make_Features_and_Labels(some_list, method=meth, add_noise=False, std_width=0.01, factor=4, add_extra=False, Boolnorm=Ramona_norm)
# Open CSV file in append mode
# Architecture = Net.get_architecture()
_,_,Architecture = Machine_Learning(Features, Labels, names, kfold,seed, std_width=0.1, add_noise=False, factor=2, name=str("Calcium"), reduced_input=reduced_in,Archit = True)
r2s = []
row_data = [reduced_in, Ramona_norm, meth, Architecture, kfold, seed]
# Loop through elements in some_list_3
for elem in tqdm(some_list_2, desc = "Elements", position=0, leave=True):
if elem == "Copper":#Now we only try to fit Calcium.
#elem == "Calcium"or elem== "Chloride" or elem== "Chromium" or elem== "Copper"
r2, MSE,_ = Machine_Learning(Features, Labels, names, kfold,seed, std_width=0.1, add_noise=False, factor=2, name=str(elem), reduced_input=reduced_in)
r2s.append(r2)
row_data.append(MSE)
row_data.append(r2)
print("The r2 values were:"+str(r2s))
# Append element-specific data to the row
# row_data.extend([elem, MSE, r2])
# Write row to CSV file
writer.writerow(row_data)