Commit 05b4ddf8 by Stalin Munoz

k-fold refactoring, cubic interpolation, and adasyn

parent 4b274c88
import models as m
import roc_auc as ra
from matplotlib.pyplot \
    import figure, plot, title, ylabel, xlabel, legend, savefig, ioff
from numpy import expand_dims as dims
from numpy import unique
from prec_recall import create_pr, avg_pr
from sklearn.metrics import confusion_matrix
import numpy as np
from sklearn.model_selection import StratifiedKFold
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE, ADASYN
from tensorflow.keras.utils import to_categorical

def encode(output, N=4):
    """
    One-hot encoding of the targets.

    Parameters
    ----------
    output : numpy array
        vector with the target outputs.

    Returns
    -------
    numpy array
        matrix with the one-hot encoding of the outputs.
    """
    return to_categorical(output, N)

def balance(X, y, method):
    """
    Balances the training data.

    Parameters
    ----------
    X : numpy array
        inputs.
    y : numpy array
        outputs.
    method : str
        any of 'smote', 'adasyn', 'class_weight'

    Returns
    -------
    numpy array
        balanced input.
    numpy array
        balanced output.
    numpy array
        class weights. Only present for the
        'class_weight' method.
    """
    if method == 'smote':
        print('SMOTE')
        smote = SMOTE(random_state=0xAAAA)
        return smote.fit_resample(X, y), None
    elif method == 'adasyn':
        print('ADASYN')
        adasyn = ADASYN(random_state=0xAAAA)
        return adasyn.fit_resample(X, y), None
    elif method == 'class_weight':
        print('CLASS WEIGHTS')
        weights = class_weight.compute_class_weight('balanced', unique(y), y)
        return (X, y), weights
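
# Illustrative usage sketch (not part of the commit); the demo arrays below are
# made up. 'smote' and 'adasyn' return ((X_res, y_res), None), while
# 'class_weight' returns the data unchanged together with per-class weights
# that can be handed to model.fit.
X_demo = np.random.rand(120, 30)
y_demo = np.repeat([0, 1, 2, 3], [60, 30, 20, 10])    # deliberately imbalanced
(X_bal, y_bal), w = balance(X_demo, y_demo, 'smote')  # here w is None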


class CNN_Antifrag:
    """
    Convolutional neural network for predicting robustness and
    evolvability based on antifragility estimations.
    """

    def __init__(self, name='CNN', K=5, N=4):
        """
        Creates a convolutional neural network modeling experiment.

        Parameters
        ----------
        name : str, optional
            prefix for history files.
            The default is 'CNN'.
        K : int, optional
            Number of folds in the cross validation.
            The default is 5.
        N : int, optional
            Number of classes.
            The default is 4.

        Returns
        -------
        None.
        """
        self.name = name
        self.K = K
        self.N = N
        ioff()

    def save_history_plots(self, history, outer, inner=None, name=None):
        """
        Saves the accuracy and loss history plots of a fold.

        Parameters
        ----------
        history : dictionary
            Model fitting history.
        outer : int
            Index of test set.
        inner : int, optional
            Index of validation set.
        name : str, optional
            Prefix for the output files.

        Returns
        -------
        None.
        """
        name = name if name else self.name
        figure()
        plot(history.history['acc'])
        plot(history.history['val_acc'])
        s1 = '(inner fold=%d, outer fold=%d)' % (inner, outer) \
            if inner is not None else '(fold=%d)' % outer
        title('Model accuracy %s' % s1)
        ylabel('Accuracy')
        xlabel('Epoch')
        legend(['Training', 'Validation'], loc='upper left')
        s2 = '%d_%d' % (inner, outer) if inner is not None else '%d' % outer
        savefig('out/' + name + '_accuracy_%s.pdf' % s2)
        figure()
        plot(history.history['loss'])
        plot(history.history['val_loss'])
        title('Model Loss %s' % s1)
        ylabel('Cross entropy loss')
        xlabel('Epoch')
        legend(['Training', 'Validation'], loc='upper right')
        savefig('out/' + name + '_loss_%s.pdf' % s2)

    def run_nn(self, X, y, params):
        c, b, e, o = params
        bs, ep = m.choose_batch_epochs(b, e)
        o = m.choose_balancing_method(o)
        K, N = self.K, self.N
        # random states are defined for reproducibility of results
        outer = StratifiedKFold(K, shuffle=True, random_state=0xBBBB)
        inner = StratifiedKFold(K - 1, shuffle=True, random_state=0xCCCC)
        total_acc, total_auc = 0, 0
        # outer loop splits off the test sets; test data is never used in
        # training or cross validation, so its indices are ignored with _
        for (data_idx, _), i in zip(outer.split(X, y), range(K)):
            # balancing training and validation sets;
            # the test set is left imbalanced
            (X_D, y_D), weights = balance(X[data_idx], y[data_idx], o)
            # inner loop splits training and validation sets
            for (train_idx, val_idx), j in zip(inner.split(X_D, y_D), range(K - 1)):
                # 3D expansion of the input required for convolutional layers,
                # one-hot encoding for the output
                X_train, y_train = dims(X_D[train_idx], 2), encode(y_D[train_idx])
                X_val, y_val = dims(X_D[val_idx], 2), encode(y_D[val_idx])
                # creating a new instance of the architecture
                model = m.model_architecture(c)
                # compile the keras model
                model.compile(loss='categorical_crossentropy',
                              optimizer='adam',
                              metrics=['accuracy'])
                # model training
                history = model.fit(X_train,
                                    y_train,
                                    batch_size=bs,
                                    epochs=ep,
                                    verbose=2,
                                    class_weight=weights,
                                    validation_data=(X_val, y_val))
                # save history of accuracy and loss
                self.save_history_plots(history, i, j)
                # calculate accuracy
                _, accuracy = model.evaluate(X_val, y_val, verbose=0)
                total_acc += accuracy
                print("t_set = " + str(i) + " v_set = " + str(j))
                print('Test accuracy:', accuracy)
                # calculate area under the curve
                y_pred = model.predict(X_val, batch_size=bs)
                fpr, tpr, auc = ra.roc_auc(N, y_val, y_pred)
                total_auc += auc
                print("Area under the curve:", auc)
        total_acc = total_acc / (K * (K - 1))
        total_auc = total_auc / (K * (K - 1))
        print("Average accuracy: ", total_acc)
        print("Average area under the curve: ", total_auc)
        return total_acc, total_auc

    def run_kfold(self, X, y, params):
        c, b, e, o = params
        bs, ep = m.choose_batch_epochs(b, e)
        o = m.choose_balancing_method(o)
        K, N = self.K, self.N
        # random state is defined for reproducibility of results
        kfold = StratifiedKFold(K, shuffle=True, random_state=0xBBBB)
        precs_k, recs_k, avgs_k = [], [], []  # average pr curve for each class
        total_acc, total_auc = 0, 0
        # the loop splits off one test fold per iteration
        for (train_idx, test_idx), i in zip(kfold.split(X, y), range(K)):
            # balancing training set
            (X_train, y_train), weights = balance(X[train_idx], y[train_idx], o)
            # 3D expansion of the input required for convolutional layers,
            # one-hot encoding for the output
            X_train, y_train = dims(X_train, 2), encode(y_train)
            # test set is left imbalanced, one-hot encoding for output
            (X_test, y_test) = dims(X[test_idx], 2), encode(y[test_idx])
            # creating a new instance of the architecture
            model = m.model_architecture(c)
            # compile the keras model
            model.compile(loss='categorical_crossentropy',
                          optimizer='adam',
                          metrics=['accuracy'])
            # model training
            history = model.fit(X_train,
                                y_train,
                                batch_size=bs,
                                epochs=ep,
                                verbose=2,
                                class_weight=weights,
                                validation_data=(X_test, y_test))
            # save history of accuracy and loss
            self.save_history_plots(history, i, name='Eval_' + self.name)
            # calculate accuracy
            _, accuracy = model.evaluate(X_test, y_test, verbose=0)
            total_acc += accuracy
            print("fold = " + str(i))
            print('Test accuracy:', accuracy)
            # calculate area under the curve
            y_pred = model.predict(X_test, batch_size=bs)
            fpr, tpr, auc = ra.roc_auc(N, y_test, y_pred)
            total_auc += auc
            print("Area under the curve:", auc)
            # confusion matrix
            if i == 0:
                cm = confusion_matrix(
                    y_test.argmax(axis=1), y_pred.argmax(axis=1))
            else:
                cm += confusion_matrix(
                    y_test.argmax(axis=1), y_pred.argmax(axis=1))
            # pr curve (contains 4 pr curves: one for each class)
            recall, precision, average_prec = create_pr(N, y_test, y_pred)
            recs_k.append(recall)
            precs_k.append(precision)
            avgs_k.append(average_prec)
        # average of acc, auc, cm, pr
        total_acc = total_acc / K
        total_auc = total_auc / K
        cm = cm / K
        pr = avg_pr(K, N, recs_k, precs_k, avgs_k)
        print("Average accuracy: ", total_acc)
        print("Average area under the curve: ", total_auc)
        return total_acc, total_auc, cm, pr
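
# Illustrative driver sketch (not part of the commit), showing how the class above
# is meant to be called. The dummy data, the architecture index c=0 and the flag
# values are hypothetical; params follows the (c, b, e, o) coding consumed by
# models.choose_batch_epochs and models.choose_balancing_method.
if __name__ == '__main__':
    X_demo = np.random.rand(400, 30)
    y_demo = np.repeat([0, 1, 2, 3], [200, 100, 60, 40])
    experiment = CNN_Antifrag(name='CNN', K=5, N=4)
    params = (0, 0, 0, 1)  # hypothetical: first architecture, smallest batch, ADASYN
    acc, auc = experiment.run_nn(X_demo, y_demo, params)             # inner/outer CV
    acc, auc, cm, pr = experiment.run_kfold(X_demo, y_demo, params)  # k-fold evaluation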
@@ -49,7 +49,8 @@ def plot_confusion_matrix(cm,
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    fig = plt.figure(figsize=(8, 6))
    ax = fig.add_subplot()
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
@@ -88,5 +89,10 @@ def plot_confusion_matrix(cm,
    plt.ylabel('True label')
    plt.xlabel('Predicted label\naccuracy={:0.4f}; misclass={:0.4f}'.format(accuracy, misclass))
    #plt.show()
    # Added labels
    labels = ['']*(2*len(target_names))
    labels[::2] = target_names
    ax.set_xticklabels([''] + labels)
    ax.set_yticklabels([''] + labels)
    plt.savefig("confusion_matrix.pdf")
    plt.close()
@@ -61,14 +61,23 @@ def model_architecture(c):
    model.add(Dropout(0.5))
    model.add(Dense(num_classes, activation='softmax'))
    return model

def choose_batch_epochs(b, e):
    if b == 0 and e == 0:
        return 16, 12
    if b == 0 and e == 1:
        return 16, 512
    if b == 1 and e == 0:
        return 64, 12
    if b == 1 and e == 1:
        return 64, 512

def choose_balancing_method(o):
    if o == 0:
        return 'smote'
    elif o == 1:
        return 'adasyn'
    elif o == 2:
        return 'class_weight'
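
# Quick sanity checks (not part of the commit) of the flag coding used by the
# experiment runner: b and e select batch size and epochs, o selects the
# balancing strategy handed to balance().
assert choose_batch_epochs(0, 1) == (16, 512)
assert choose_batch_epochs(1, 1) == (64, 512)
assert choose_balancing_method(1) == 'adasyn'
assert choose_balancing_method(2) == 'class_weight'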
from itertools import product

B = [0, 1]
converter = {(e, r): [i] for (e, r), i in zip(product(B, B), range(4))}


def output_convert(e, r):
    """
    Encodes outputs as integers.

    Parameters
    ----------
    e : 1 evolvable, 0 not evolvable
    r : 1 robust, 0 not robust

    Returns
    -------
    the encoded output

    output meaning:
        0 : not evolv. and not rob.
        1 : evol. and not rob.
        2 : not evol. and rob.
        3 : evol. and rob.
    """
    return converter[r, e]
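
# Illustrative checks (not part of the commit). Note that the lookup key is
# (r, e), which is what makes the table match the encoding documented above.
assert output_convert(0, 0) == [0]  # not evolvable, not robust
assert output_convert(1, 0) == [1]  # evolvable, not robust
assert output_convert(0, 1) == [2]  # not evolvable, robust
assert output_convert(1, 1) == [3]  # evolvable, robust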
import numpy as np
import pandas as pd
import output_convert as oc
import os
from scipy.interpolate import interp1d as interp


def interpolate(y, K, kind='linear'):
    """
    Interpolates vector y.

    Parameters
    ----------
    y : list of real numbers
        dependent variable values.
    K : integer
        number of samples to interpolate.
    kind : str, optional
        interpolation kind passed to scipy's interp1d
        (e.g. 'linear', 'cubic'). The default is 'linear'.

    Returns
    -------
    the interpolation values
    """
    N = len(y)
    x = np.arange(1/N, 1 + 1/N, 1/N)
    f = interp(x, y, kind=kind, fill_value="extrapolate")
    xintp = np.arange(1/K, 1 + 1/K, 1/K)
    return f(xintp)
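
# Sketch (not part of the commit): resampling a short antifragility curve onto the
# model's fixed-length grid. kind='cubic' is the cubic interpolation mentioned in
# the commit message and needs at least four input points; the values are made up.
curve = [0.02, -0.01, 0.03, 0.00, -0.02]          # hypothetical 5-node curve
resampled = interpolate(curve, 30, kind='cubic')  # ~30 samples on the (0, 1] grid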

def parse_data(n_experiences, folder="data", samples=30, kind='linear'):
    N_DATA = n_experiences  # from 2 to 10001
    input_ = []
    output_ = []
    # table of the names of the bionets
    str_ = [
        'anemia', 'cd4', 'lac-operon', 'spz-drosophila',
        'arabidopsis', 'core-cell-cycle', 'lac-operon-bistability',
        't-cell-differentiation', 'aspergillus-fumigatus', 'cortical',
        'l-arabinose-operon', 't-lgl-survival', 'aurka', 'cycle-cdk',
        'leukemia', 'tol', 'b-cell', 'fgf-drosophila', 'mammalian',
        'trichostrongylus', 'body-drosophila', 'gene-cardiac', 'metabolic',
        'vegf-drosophila', 'bt474', 'gonadal', 'neurotransmitter',
        'wg-drosophila', 'bt474-ErbB', 'hcc1954', 'oxidative-stress',
        'yeast-cycle', 'budding-yeast', 'hcc1954-ErbB', 'skbr3-long',
        'cardiac', 'hh-drosophila', 'skbr3-short'
    ]
    for s in str_:
        data = pd.read_csv(
            os.path.join(folder, s, s + "_metrics.csv"), sep=",", header=0)
        # 30 points describing the relationship between the original values
        # of antifragility in the network before perturbations and X/N
        before = interpolate(
            np.array(data.loc[:, 'Antifragility']).astype(float),
            samples,
            kind=kind)
        for i in range(N_DATA):
            # read the data for each experience
            n = format(i + 1, '09')
            data = pd.read_csv(
                os.path.join(folder, s, s + "_" + n + "_metrics.csv"),
                sep=",", header=0)
            # antifragility of the mutant
            after = interpolate(
                np.array(data.loc[:, 'Antifragility']).astype(float),
                samples,
                kind=kind)
            # computes the difference of the antifragility curves
            input_tmp = after - before
            evolvable_tmp = list(
                np.array(data.loc[:, 'Evolvability']).astype(int))
            robust_tmp = list(
                np.array(data.loc[:, 'Robustness']).astype(int))
            # even though evolvable_tmp and robust_tmp are vectors,
            # all entries are the same, so we take the first
            output_tmp = oc.output_convert(evolvable_tmp[0], robust_tmp[0])
            input_.append(input_tmp)
            output_ += output_tmp
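
# End-to-end sketch (not part of the commit): wiring the parser to the CNN
# experiment. It assumes parse_data ultimately returns the input_ and output_
# lists it builds (its tail is not shown here) and that the folder contains the
# per-network *_metrics.csv files.
#
#   input_, output_ = parse_data(100, folder="data", samples=30, kind='cubic')
#   X, y = np.array(input_), np.array(output_)
#   acc, auc, cm, pr = CNN_Antifrag().run_kfold(X, y, (0, 0, 0, 1))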
@@ -31,7 +31,7 @@ def plot_pr(recall, precision, average_precision):
    plt.ylim([0.0, 1.05])
    plt.xlim([0.0, 1.0])
    plt.title('Average precision score, micro-averaged over all classes: AP={0:0.2f}'.format(average_precision["micro"]))
    plt.savefig("out/precision_recall_curve.pdf")
    #plt.show()
    plt.close()
@@ -67,4 +67,4 @@ def avg_pr(n_splits, num_classes, recs_k, precs_k, avgs_k):
    plt.ylim([0.0, 1.05])
    plt.xlim([0.0, 1.0])
    plt.title('Average precision score, over class {0}: AP={1:0.2f}'.format(i, avg_prec[i]))
    plt.savefig("out/pr_curve_class_" + str(i) + ".pdf")