Source code for qrmine.mlqrmine

import numpy
from imblearn.over_sampling import RandomOverSampler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from pandas import read_csv
from sklearn.cluster import KMeans
from sklearn.compose import ColumnTransformer
from sklearn.metrics import confusion_matrix
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder, OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from random import randint

from xgboost import XGBClassifier
from mlxtend.frequent_patterns import apriori
from mlxtend.frequent_patterns import association_rules


class MLQRMine(object):

    def __init__(self):
        self._seed = randint(1, 9)
        self._csvfile = ""
        self._titles = None
        self._dataset = None
        self._X = None
        self._y = None
        self._X_original = None
        self._y_original = None
        self._dataset_original = None
        self._model = Sequential()
        self._sc = StandardScaler()
        self._vnum = 0  # Number of variables
        self._classifier = XGBClassifier()
        self._epochs = 10
        self._samplesize = 0
        self._clusters = None

    @property
    def seed(self):
        return self._seed

    @property
    def csvfile(self):
        return self._csvfile

    @property
    def dataset(self):
        return self._dataset

    @property
    def model(self):
        return self._model

    @property
    def epochs(self):
        return self._epochs

    @property
    def X(self):
        return self._X

    @property
    def y(self):
        return self._y

    @property
    def titles(self):
        return self._titles

    @property
    def head(self):
        return self._dataset.head

    # Getters should be defined before setters
    @epochs.setter
    def epochs(self, epochs):
        self._epochs = epochs

    @seed.setter
    def seed(self, seed):
        self._seed = seed

    @csvfile.setter
    def csvfile(self, csvfile):
        self._csvfile = csvfile

    @titles.setter
    def titles(self, titles):
        self._titles = titles

    # Functions
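    """
    A minimal usage sketch for the accessors above; 'interview.csv' and the
    column names are made-up placeholders, not files or fields that ship
    with QRMine:

        ml = MLQRMine()
        ml.csvfile = "interview.csv"                  # path to the data file
        ml.titles = ["index", "age", "bmi", "obese"]  # optional column subset
        ml.epochs = 20                                # override the default of 10
    """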
    def read_csv(self):
        if self._titles is not None:
            self._dataset = read_csv(self._csvfile, usecols=self._titles)
        else:
            self._dataset = read_csv(self._csvfile)

    def mark_missing(self):
        self._dataset_original = self._dataset
        self._dataset = self._dataset.replace('', numpy.nan)
        self._dataset.dropna(inplace=True)

    def restore_mark_missing(self):
        self._dataset = self._dataset_original

    def get_shape(self):
        return self._dataset.shape
""" The actual number of IVs is vnum -2 as first is the title and the last is the DV To seperate DV, use vnum -1 to indicate last column More details on np array splicing here: https://stackoverflow.com/questions/34007632/how-to-remove-a-column-in-a-numpy-array/34008274 """
    def read_xy(self):
        (self._samplesize, vnum) = self._dataset.shape
        # The last column in the csv should be the DV and the first one the
        # title, so subtract both to get the number of independent variables.
        self._vnum = vnum - 2
        # Splice into IVs and DV
        values = self._dataset.values
        # self._X = values[:, 0:self._vnum]
        # First column ignored - (to be used for title)
        self._X = values[:, 1:vnum - 1]
        self._y = values[:, vnum - 1]
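    """
    read_xy() assumes a fixed CSV layout: the first column is a record
    title/identifier and the last column is the DV. A made-up file that fits
    this layout (column names are illustrative only):

        index,age,bmi,obese
        P1,34,22.5,0
        P2,51,31.2,1

    Here vnum is 4, self._X holds the age and bmi columns, and self._y holds
    obese.
    """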
    def oversample(self):
        self._X_original = self._X
        self._y_original = self._y
        ros = RandomOverSampler(random_state=0)
        X, y = ros.fit_resample(self._X, self._y)
        self._X = X
        self._y = y
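    """
    oversample() stashes the pre-sampling arrays in self._X_original and
    self._y_original so they can be restored later. A sketch of the effect,
    with illustrative class counts:

        ml.read_xy()
        ml.oversample()
        # If the DV had 80 zeros and 20 ones, RandomOverSampler duplicates
        # minority rows until both classes have 80 records.
        ml.restore_oversample()   # back to the original rows
    """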
    def restore_oversample(self):
        self._X = self._X_original
        self._y = self._y_original
    def prepare_data(self, oversample=False):
        self.read_csv()
        self.mark_missing()
        self.read_xy()
        if oversample:
            self.oversample()
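    """
    prepare_data() chains the steps above into one call. A minimal
    end-to-end sketch (the file name is a placeholder):

        ml = MLQRMine()
        ml.csvfile = "interview.csv"
        ml.prepare_data(oversample=True)
        print(ml.get_shape())
    """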
    def get_nnet_predictions(self):
        self._model.add(Dense(12, input_dim=self._vnum, kernel_initializer='uniform', activation='relu'))
        self._model.add(Dense(8, kernel_initializer='uniform', activation='relu'))
        self._model.add(Dense(1, kernel_initializer='uniform', activation='sigmoid'))
        # Compile model
        self._model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])
        # Fit the model
        self._model.fit(self._X, self._y, epochs=self._epochs, batch_size=10, verbose=2)
        # Calculate predictions on the original (pre-oversampling) records;
        # fall back to the training data if oversample() was never called,
        # since self._X_original is only set there.
        X_pred = self._X_original if self._X_original is not None else self._X
        predictions = self._model.predict(X_pred)
        # Round the sigmoid outputs to 0/1 class labels
        rounded = [round(x[0]) for x in predictions]
        return rounded
    def get_nnet_scores(self):
        return self._model.evaluate(self._X, self._y)
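    """
    A hedged usage sketch for the network above. It assumes prepare_data()
    has been called and that the DV is binary, since the sigmoid output
    layer and binary cross-entropy loss expect 0/1 labels:

        ml.prepare_data(oversample=True)
        predictions = ml.get_nnet_predictions()
        loss, accuracy = ml.get_nnet_scores()   # evaluate() returns [loss, accuracy]
    """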
    def svm_confusion_matrix(self):
        """Generate the confusion matrix for a linear SVM classifier.

        Returns:
            [list] -- confusion matrix of test-set labels against predictions
        """
        X_train, X_test, y_train, y_test = train_test_split(self._X, self._y, test_size=0.25, random_state=0)
        # Issue #22: cast labels to integers
        y_test = y_test.astype('int')
        y_train = y_train.astype('int')
        X_train = self._sc.fit_transform(X_train)
        X_test = self._sc.transform(X_test)
        classifier = SVC(kernel='linear', random_state=0)
        classifier.fit(X_train, y_train)
        y_pred = classifier.predict(X_test)
        # Issue #22
        y_pred = y_pred.astype('int')
        return confusion_matrix(y_test, y_pred)
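    """
    The returned matrix follows scikit-learn's convention: rows are true
    labels, columns are predictions. For a binary DV:

        cm = ml.svm_confusion_matrix()
        # cm[0][0] true negatives,  cm[0][1] false positives
        # cm[1][0] false negatives, cm[1][1] true positives
    """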
    # https://stackoverflow.com/questions/45419203/python-numpy-extracting-a-row-from-an-array
    def get_kmeans(self, c=5):
        kmeans = KMeans(n_clusters=c, init='k-means++', random_state=42)
        y_kmeans = kmeans.fit_predict(self._X)
        self._clusters = y_kmeans
        self.get_centroids(c)
        return y_kmeans

    def get_centroids(self, c=1):
        for x in range(0, c):
            print("Cluster: ", x)
            ct = 0
            cluster_list = []
            for cluster in self._clusters:
                if cluster == x:
                    cluster_list.append(ct)
                ct += 1
            print("Cluster Length: ", len(cluster_list))
            print("Cluster Members")
            print(self._dataset.iloc[cluster_list, :])
            print("Mean")
            print(self._dataset.iloc[cluster_list, :].mean(axis=0))
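    """
    A usage sketch for the clustering helpers; the cluster count is
    arbitrary:

        ml.prepare_data()
        labels = ml.get_kmeans(c=3)   # also prints members and means per cluster
    """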
""" TODO: This is not working yet. use the ColumnTransformer instead of categorical_features """
[docs] def encode_categorical(self): # labelencoder_X_1 = LabelEncoder() # self._X[:, 1] = labelencoder_X_1.fit_transform(self._X[:, 1]) # labelencoder_X_2 = LabelEncoder() # self._X[:, 2] = labelencoder_X_2.fit_transform(self._X[:, 2]) onehotencoder = OneHotEncoder(categorical_features=[1]) X = onehotencoder.fit_transform(self._X).toarray() X = X[:, 1:] print(X) return X
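    """
    A hedged sketch of the encoder above, assuming the second column of
    self._X (index 1) is categorical. If that column held three categories,
    the ColumnTransformer replaces it with three dummy columns (the first is
    dropped afterwards) while the remaining columns pass through unchanged.
    """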
    def get_association(self):
        """Train the XGBoost classifier and return its confusion matrix."""
        X_train, X_test, y_train, y_test = train_test_split(self._X, self._y, test_size=0.25, random_state=0)
        self._classifier.fit(X_train, y_train)
        # Predicting the test set results
        y_pred = self._classifier.predict(X_test)
        return confusion_matrix(y_test, y_pred)
    def get_apriori(self):
        frequent_itemsets = apriori(self.encode_categorical(), min_support=0.07, use_colnames=True)
        rules = association_rules(frequent_itemsets, metric="lift", min_threshold=1)
        return rules
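    """
    Note that mlxtend's apriori() expects a one-hot encoded pandas DataFrame
    of 0/1 values, so this call relies on encode_categorical() producing a
    suitable matrix. A usage sketch:

        rules = ml.get_apriori()
        print(rules[['antecedents', 'consequents', 'support', 'lift']])
    """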
    def get_pca(self, n=3):
        # https://plot.ly/~notebook_demo/264/about-the-author-some-of-sebastian-rasc/#/
        X_std = StandardScaler().fit_transform(self._X)
        (recs, factors) = X_std.shape
        cov_mat = numpy.cov(X_std.T)
        print('Covariance matrix: \n%s' % cov_mat)
        eig_vals, eig_vecs = numpy.linalg.eig(cov_mat)
        print('Eigenvectors \n%s' % eig_vecs)
        print('\nEigenvalues \n%s' % eig_vals)

        # Make a list of (eigenvalue, eigenvector) tuples
        eig_pairs = [(numpy.abs(eig_vals[i]), eig_vecs[:, i]) for i in range(len(eig_vals))]

        # Sort the (eigenvalue, eigenvector) tuples from high to low.
        # Sorting on the eigenvalue alone avoids comparing the eigenvector
        # arrays when two eigenvalues are equal, which would raise an error.
        eig_pairs.sort(key=lambda pair: pair[0], reverse=True)

        # Visually confirm that the list is correctly sorted by decreasing eigenvalues
        print('Eigenvalues in descending order:')
        for pair in eig_pairs:
            print(pair[0])

        # Variance explained
        tot = sum(eig_vals)
        var_exp = [(i / tot) * 100 for i in sorted(eig_vals, reverse=True)]
        cum_var_exp = numpy.cumsum(var_exp)
        print("Variance explained: ", var_exp)
        print("Cumulative: ", cum_var_exp)

        # Clamp n to the number of available components
        if len(eig_vals) < n:
            n = len(eig_vals)

        # Stack the top n eigenvectors into the projection matrix
        # (replaces the original explicit branches for n = 2..5)
        matrix_w = numpy.hstack([eig_pairs[i][1].reshape(factors, 1) for i in range(n)])
        print('Matrix W:\n', matrix_w)
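    """
    get_pca() prints its report rather than returning the projection matrix.
    A usage sketch:

        ml.prepare_data()
        ml.get_pca(n=3)   # prints eigenvalues, variance explained and Matrix W

    Projecting the standardized records onto the printed Matrix W
    (X_std.dot(matrix_w)) would yield the n principal-component scores.
    """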