Commit 19fd77b7 by Pamela Osuna

multilabel version

parent 0850d7be
import models as m
import roc_auc as ra
from matplotlib.pyplot \
import figure,plot,title,ylabel,xlabel,legend,savefig,ioff
import numpy as np
from numpy import expand_dims as dims
from numpy import unique
from random import shuffle
from functools import reduce
from prec_recall import create_pr, avg_pr
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import StratifiedKFold
from sklearn.utils import class_weight
from imblearn.over_sampling import SMOTE,ADASYN
from tensorflow.keras.utils import to_categorical
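# --- assumed helpers -------------------------------------------------------
# output_reverse and output_convert are called below but are neither defined
# in this file nor imported, so the sketches here are an assumption about
# their behaviour: with N=4 classes encoding the two binary targets
# (robustness, evolvability), a class index c in 0..3 is assumed to map to
# the binary label vector [c // 2, c % 2].
def output_reverse(y):
    # class indices (0..N-1) -> binary multilabel vectors (assumed mapping)
    return np.array([[c // 2, c % 2] for c in y])
def output_convert(y_bin):
    # binary multilabel vectors -> class indices (assumed inverse mapping)
    return np.array([int(2 * b[0] + b[1]) for b in y_bin])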
def undersample(X, y):
    """
    Balances the input and output data by removing
    samples from the more represented classes.

    Parameters
    ----------
    X : numpy array
        Input data.
    y : numpy array
        Class membership.

    Returns
    -------
    numpy array
        Balanced input.
    numpy array
        Balanced output.
    """
    locs = find_classes(y)
    shuffle_members(locs)
    _, members = min(locs.items(), key=lambda x: len(x[1]))
    # maximum number of samples that keeps the class cardinalities balanced
    n = len(members)
    print('Undersampling to %d samples' % n)
    indices = list(reduce(lambda a, b: a + b, map(lambda v: v[:n], locs.values())))
    shuffle(indices)
    return X[indices], y[indices]
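# Illustrative example (values are placeholders):
#   X = np.arange(12).reshape(6, 2)
#   y = np.array([0, 0, 0, 1, 1, 2])
#   Xb, yb = undersample(X, y)
# The smallest class (2) has one member, so one sample per class is kept
# and Xb, yb contain 3 rows in shuffled order.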
def shuffle_members(cm):
    """
    Shuffle the members of each class in place.

    Parameters
    ----------
    cm : dictionary
        keys are classes, values are lists of the indices of their members.

    Returns
    -------
    None
    """
    for c in cm:
        shuffle(cm[c])
        print("cardinality of class %d is %d" % (c, len(cm[c])))
def find_classes(x):
    """
    Find the indices of the members of each class.

    Parameters
    ----------
    x : iterable
        heterogeneous class memberships.

    Returns
    -------
    locs : dictionary
        maps each class to the locations of its members.
    """
    locs = {}
    for i, c in enumerate(x):
        if c in locs:
            locs[c].append(i)
        else:
            locs[c] = [i]
    return locs
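# Illustrative example:
#   find_classes([0, 1, 0, 2])  ->  {0: [0, 2], 1: [1], 2: [3]}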
def encode(output, N=4):
    """
    One-hot encoding of the input.

    Parameters
    ----------
    output : numpy array
        vector with the target outputs.
    N : int, optional
        number of classes. The default is 4.

    Returns
    -------
    numpy array
        matrix with the one-hot encoding of the outputs.
    """
    return to_categorical(output, N)
def decode(y_onehot):
    """
    Converts each one-hot encoded vector to its corresponding class value.

    Parameters
    ----------
    y_onehot : numpy array
        matrix of one-hot encoded outputs.

    Returns
    -------
    numpy array
        vector with the corresponding class for each row of the input.
    """
    return np.argmax(y_onehot, axis=1)
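# Illustrative round trip: decode inverts encode for class indices, e.g.
#   encode(np.array([1, 3]))          -> [[0., 1., 0., 0.], [0., 0., 0., 1.]]
#   decode(encode(np.array([1, 3])))  -> array([1, 3])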
def balance(X, y, method):
    """
    Balances the training data.

    Parameters
    ----------
    X : numpy array
        inputs.
    y : numpy array
        outputs.
    method : str
        any of 'smote', 'adasyn', 'class_weight', 'undersampling'.

    Returns
    -------
    tuple of numpy arrays
        balanced input and output.
    dictionary
        class weights. None for every method except 'class_weight'.
    """
    if method == 'smote':
        print('SMOTE')
        smote = SMOTE(random_state=0xAAAA)
        return smote.fit_resample(X, y), None
    elif method == 'adasyn':
        print('ADASYN')
        adasyn = ADASYN(random_state=0xAAAA)
        return adasyn.fit_resample(X, y), None
    elif method == 'class_weight':
        print('CLASS WEIGHTS')
        weights = class_weight.compute_class_weight('balanced', unique(y), y)
        # Keras expects a mapping from class index to weight
        return (X, y), dict(enumerate(weights))
    elif method == 'undersampling':
        print('UNDERSAMPLING')
        return undersample(X, y), None
    raise ValueError('unknown balancing method: %s' % method)
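# Usage sketch: weights is None for every method except 'class_weight',
# so callers can unpack the result uniformly, e.g.
#   (X_bal, y_bal), weights = balance(X_train, y_train, 'smote')
# and pass weights straight to model.fit(..., class_weight=weights).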
class CNN_Antifrag:
    """
    Convolutional Neural Network
    for predicting robustness and evolvability
    based on antifragility estimations.
    """
    def __init__(self, name='CNN', K=5, N=4):
        """
        Creates a Convolutional Neural Network
        modeling experiment.

        Parameters
        ----------
        name : str, optional
            prefix for history files.
            The default is 'CNN'.
        K : int, optional
            Number of folds in the cross validation.
            The default is 5.
        N : int, optional
            Number of classes.
            The default is 4.

        Returns
        -------
        None.
        """
        self.name = name
        self.K = K
        self.N = N
        ioff()  # non-interactive plotting; figures are only written to disk
    def save_history_plots(self, history, outer, inner=None, name=None):
        """
        Saves the accuracy and loss curves of a model fit.

        Parameters
        ----------
        history : History
            Model fitting history.
        outer : int
            Index of the test set.
        inner : int, optional
            Index of the validation set. The default is None.
        name : str, optional
            Prefix for the output files. Defaults to self.name.

        Returns
        -------
        None.
        """
        name = name if name else self.name
        figure()
        plot(history.history['acc'])
        plot(history.history['val_acc'])
        s1 = '(inner fold=%d, outer fold=%d)' % (inner, outer) \
            if inner is not None else '(fold=%d)' % outer
        title('Model accuracy %s' % s1)
        ylabel('Accuracy')
        xlabel('Epoch')
        legend(['Training', 'Validation'], loc='upper left')
        s2 = '%d_%d' % (inner, outer) if inner is not None else '%d' % outer
        savefig('out/' + name + '_accuracy_%s.pdf' % s2)
        figure()
        plot(history.history['loss'])
        plot(history.history['val_loss'])
        title('Model loss %s' % s1)
        ylabel('Cross entropy loss')
        xlabel('Epoch')
        legend(['Training', 'Validation'], loc='upper right')
        savefig('out/' + name + '_loss_%s.pdf' % s2)
    def run_nn(self, X, y, params):
        """
        Nested stratified cross validation of the model described
        by params; returns the average validation accuracy and
        area under the ROC curve.
        """
        c, b, e, o = params
        bs, ep = m.choose_batch_epochs(b, e)
        o = m.choose_balancing_method(o)
        K, N = self.K, self.N
        # random states are defined for reproducibility of results
        outer = StratifiedKFold(K, shuffle=True, random_state=0xBBBB)
        inner = StratifiedKFold(K - 1, shuffle=True, random_state=0xCCCC)
        total_acc, total_auc = 0, 0
        # The outer loop splits off the test sets. Test data is never used
        # in training or cross validation, so its indices are ignored with
        # an underscore.
        for (data_idx, _), i in zip(outer.split(X, y), range(K)):
            # balancing the pooled training and validation data
            (X_D, y_D), weights = balance(X[data_idx], y[data_idx], o)
            # the inner loop splits training and validation sets
            for (train_idx, val_idx), j in zip(inner.split(X_D, y_D), range(K - 1)):
                # 3D expansion of the input is required by the convolutional
                # layers; outputs are one-hot encoded
                X_train, y_train = dims(X_D[train_idx], 2), encode(y_D[train_idx])
                X_val, y_val = dims(X_D[val_idx], 2), encode(y_D[val_idx])
                # decoding one-hot back to class indices, then converting
                # to binary label vectors (multilabel)
                y_val = output_reverse(decode(y_val))
                y_train = output_reverse(decode(y_train))
                # creating a new instance of the architecture
                model = m.model_architecture(c)
                # compile the keras model
                model.compile(loss='binary_crossentropy',
                              optimizer='adam',
                              metrics=['acc'])
                # model training
                history = model.fit(X_train,
                                    y_train,
                                    batch_size=bs,
                                    epochs=ep,
                                    verbose=2,
                                    class_weight=weights,
                                    validation_data=(X_val, y_val))
                # save history of accuracy and loss
                self.save_history_plots(history, i, j)
                # calculate accuracy
                _, accuracy = model.evaluate(X_val, y_val, verbose=0)
                total_acc += accuracy
                print("t_set = " + str(i) + " v_set = " + str(j))
                print('Validation accuracy:', accuracy)
                # threshold the multilabel predictions at 0.5
                y_pred = model.predict(X_val, batch_size=bs)
                y_pred[y_pred >= 0.5] = 1
                y_pred[y_pred < 0.5] = 0
                # back to class indices, then one-hot, for the ROC analysis
                y_val = encode(output_convert(y_val))
                y_pred = encode(output_convert(y_pred))
                # calculate area under the curve
                fpr, tpr, auc = ra.roc_auc(N, y_val, y_pred)
                total_auc += auc
                print("Area under the curve:", auc)
        # average over the K*(K-1) inner fits
        total_acc = total_acc / (K * (K - 1))
        total_auc = total_auc / (K * (K - 1))
        print("Average accuracy: ", total_acc)
        print("Average area under the curve: ", total_auc)
        return total_acc, total_auc
    def run_kfold(self, X, y, params):
        """
        Stratified K-fold evaluation of the model described by
        params; returns the average test accuracy, area under the
        ROC curve, confusion matrix and precision-recall curves.
        """
        c, b, e, o = params
        bs, ep = m.choose_batch_epochs(b, e)
        o = m.choose_balancing_method(o)
        K, N = self.K, self.N
        # random state is defined for reproducibility of results
        kfold = StratifiedKFold(K, shuffle=True, random_state=0xBBBB)
        precs_k, recs_k, avgs_k = [], [], []
        total_acc, total_auc = 0, 0
        # each fold is used once as the test set
        for (train_idx, test_idx), i in zip(kfold.split(X, y), range(K)):
            # balancing the training set; the test set is left imbalanced
            (X_train, y_train), weights = balance(X[train_idx], y[train_idx], o)
            # 3D expansion of the input is required by the convolutional
            # layers; outputs are one-hot encoded
            X_train, y_train = dims(X_train, 2), encode(y_train)
            X_test, y_test = dims(X[test_idx], 2), encode(y[test_idx])
            # decoding one-hot back to class indices, then converting
            # to binary label vectors (multilabel)
            y_test = output_reverse(decode(y_test))
            y_train = output_reverse(decode(y_train))
            # creating a new instance of the architecture
            model = m.model_architecture(c)
            # compile the keras model
            model.compile(loss='binary_crossentropy',
                          optimizer='adam',
                          metrics=['acc'])
            # model training
            history = model.fit(X_train,
                                y_train,
                                batch_size=bs,
                                epochs=ep,
                                verbose=2,
                                class_weight=weights,
                                validation_data=(X_test, y_test))
            # save history of accuracy and loss
            self.save_history_plots(history, i, name='Eval_' + self.name)
            # calculate accuracy
            _, accuracy = model.evaluate(X_test, y_test, verbose=0)
            total_acc += accuracy
            print("fold = " + str(i))
            print('Test accuracy:', accuracy)
            # threshold the multilabel predictions at 0.5
            y_pred = model.predict(X_test, batch_size=bs)
            y_pred[y_pred >= 0.5] = 1
            y_pred[y_pred < 0.5] = 0
            # back to class indices, then one-hot, for the ROC analysis
            y_test = encode(output_convert(y_test))
            y_pred = encode(output_convert(y_pred))
            # calculate area under the curve
            fpr, tpr, auc = ra.roc_auc(N, y_test, y_pred)
            total_auc += auc
            print("Area under the curve:", auc)
            # accumulate the confusion matrix over the folds
            if i == 0:
                cm = confusion_matrix(
                    y_test.argmax(axis=1), y_pred.argmax(axis=1))
            else:
                cm += confusion_matrix(
                    y_test.argmax(axis=1), y_pred.argmax(axis=1))
            # PR curves (one for each of the N classes)
            recall, precision, average_prec = create_pr(N, y_test, y_pred)
            recs_k.append(recall)
            precs_k.append(precision)
            avgs_k.append(average_prec)
        # average over the K folds
        total_acc = total_acc / K
        total_auc = total_auc / K
        cm = cm / K
        pr = avg_pr(K, N, recs_k, precs_k, avgs_k)
        print("Average accuracy: ", total_acc)
        print("Average area under the curve: ", total_auc)
        return total_acc, total_auc, cm, pr
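# Usage sketch (params encodes the architecture, batch/epoch and balancing
# choices decoded by models.choose_batch_epochs/choose_balancing_method;
# see models.py for the exact encodings):
#   exp = CNN_Antifrag(name='CNN', K=5, N=4)
#   acc, auc = exp.run_nn(X, y, params)             # nested CV model selection
#   acc, auc, cm, pr = exp.run_kfold(X, y, params)  # final K-fold evaluation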