#!/usr/local/bin/python3

# Author: Pranab Ghosh
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

# Package imports
import os
import sys
import numpy as np
import pandas as pd
import sklearn as sk
from sklearn import preprocessing
from sklearn import metrics
import random
from math import *
from decimal import Decimal
import pprint
from statsmodels.graphics import tsaplots
from statsmodels.tsa import stattools as stt
from statsmodels.stats import stattools as sstt
from sklearn.linear_model import LinearRegression
from matplotlib import pyplot as plt
from scipy import stats as sta
from statsmodels.tsa.seasonal import seasonal_decompose
import statsmodels.api as sm
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.svm import OneClassSVM
from sklearn.covariance import EllipticEnvelope
from sklearn.mixture import GaussianMixture
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
import hurst
from .util import *
from .mlutil import *
from .sampler import *
from .stats import *

"""
Load data from a CSV file, data frame, numpy array or list
Each data set (array like) is given a name while loading
Perform various data exploration operations, referring to the data sets by name
Save and restore workspace if needed
"""
class DataSetMetaData:
    """
    data set meta data
    """
    dtypeNum = 1
    dtypeCat = 2
    dtypeBin = 3
    def __init__(self, dtype):
        self.notes = list()
        self.dtype = dtype
    def addNote(self, note):
        """
        adds note
        """
        self.notes.append(note)

class DataExplorer:
    """
    various data exploration functions
    """
    def __init__(self, verbose=True):
        """
        initialize
        Parameters
            verbose : True for verbosity
        """
        self.dataSets = dict()
        self.metaData = dict()
        self.pp = pprint.PrettyPrinter(indent=4)
        self.verbose = verbose
    def setVerbose(self, verbose):
        """
        sets verbose
        Parameters
            verbose : True for verbosity
        """
        self.verbose = verbose
    def save(self, filePath):
        """
        save checkpoint
        Parameters
            filePath : path of file where saved
        """
        self.__printBanner("saving workspace")
        ws = dict()
        ws["data"] = self.dataSets
        ws["metaData"] = self.metaData
        saveObject(ws, filePath)
        self.__printDone()
    def restore(self, filePath):
        """
        restore checkpoint
        Parameters
            filePath : path of file from which to restore
        """
        self.__printBanner("restoring workspace")
        ws = restoreObject(filePath)
        self.dataSets = ws["data"]
        self.metaData = ws["metaData"]
        self.__printDone()
    def queryFileData(self, filePath, *columns):
        """
        query column data type from a data file
        Parameters
            filePath : path of file with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("querying column data type from a data file")
        lcolumns = list(columns)
        noHeader = type(lcolumns[0]) == int
        if noHeader:
            df = pd.read_csv(filePath, header=None)
        else:
            df = pd.read_csv(filePath, header=0)
        return self.queryDataFrameData(df, *columns)
    def queryDataFrameData(self, df, *columns):
        """
        query column data type from a data frame
        Parameters
            df : data frame with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("querying column data type from a data frame")
        columns = list(columns)
        noHeader = type(columns[0]) == int
        dtypes = list()
        if noHeader:
            nCols = int(len(columns) / 2)
            colIndexes = columns[:nCols]
            cnames = columns[nCols:]
            nColsDf = len(df.columns)
            for i in range(nCols):
                ci = colIndexes[i]
                assert ci < nColsDf, "col index {} outside range".format(ci)
                col = df.loc[:, ci]
                dtypes.append(self.getDataType(col))
        else:
            cnames = columns
            for c in columns:
                col = df[c]
                dtypes.append(self.getDataType(col))
        nt = list(zip(cnames, dtypes))
        result = self.__printResult("columns and data types", nt)
        return result
    def getDataType(self, col):
        """
        get data type
        Parameters
            col : array like column data
        """
        if isBinary(col):
            dtype = "binary"
        elif isInteger(col):
            dtype = "integer"
        elif isFloat(col):
            dtype = "float"
        elif isCategorical(col):
            dtype = "categorical"
        else:
            dtype = "mixed"
        return dtype
    def addFileNumericData(self, filePath, *columns):
        """
        add numeric columns from a file
        Parameters
            filePath : path of file with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("adding numeric columns from a file")
        self.addFileData(filePath, True, *columns)
        self.__printDone()
    def addFileBinaryData(self, filePath, *columns):
        """
        add binary columns from a file
        Parameters
            filePath : path of file with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("adding binary columns from a file")
        self.addFileData(filePath, False, *columns)
        self.__printDone()
    def addFileData(self, filePath, numeric, *columns):
        """
        add columns from a file
        Parameters
            filePath : path of file with data
            numeric : True if numeric, False if binary
            columns : indexes followed by column names or column names
        """
        columns = list(columns)
        noHeader = type(columns[0]) == int
        if noHeader:
            df = pd.read_csv(filePath, header=None)
        else:
            df = pd.read_csv(filePath, header=0)
        self.addDataFrameData(df, numeric, *columns)
    def addDataFrameNumericData(self, df, *columns):
        """
        add numeric columns from a data frame
        Parameters
            df : data frame with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("adding numeric columns from a data frame")
        self.addDataFrameData(df, True, *columns)
    def addDataFrameBinaryData(self, df, *columns):
        """
        add binary columns from a data frame
        Parameters
            df : data frame with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("adding binary columns from a data frame")
        self.addDataFrameData(df, False, *columns)
    def addDataFrameData(self, df, numeric, *columns):
        """
        add columns from a data frame
        Parameters
            df : data frame with data
            numeric : True if numeric, False if binary
            columns : indexes followed by column names or column names
        """
        columns = list(columns)
        noHeader = type(columns[0]) == int
        if noHeader:
            nCols = int(len(columns) / 2)
            colIndexes = columns[:nCols]
            nColsDf = len(df.columns)
            for i in range(nCols):
                ci = colIndexes[i]
                assert ci < nColsDf, "col index {} outside range".format(ci)
                col = df.loc[:, ci]
                if numeric:
                    assert isNumeric(col), "data is not numeric"
                else:
                    assert isBinary(col), "data is not binary"
                col = col.to_numpy()
                cn = columns[i + nCols]
                dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin
                self.__addDataSet(cn, col, dtype)
        else:
            for c in columns:
                col = df[c]
                if numeric:
                    assert isNumeric(col), "data is not numeric"
                else:
                    assert isBinary(col), "data is not binary"
                col = col.to_numpy()
                dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin
                self.__addDataSet(c, col, dtype)
    def __addDataSet(self, dsn, data, dtype):
        """
        add data set
        Parameters
            dsn : data set name
            data : numpy array data
            dtype : data type
        """
        self.dataSets[dsn] = data
        self.metaData[dsn] = DataSetMetaData(dtype)
    def addListNumericData(self, ds, name):
        """
        add numeric data from a list
        Parameters
            ds : list with data
            name : name of data set
        """
        self.__printBanner("adding numeric data from a list")
        self.addListData(ds, True, name)
        self.__printDone()
    def addListBinaryData(self, ds, name):
        """
        add binary data from a list
        Parameters
            ds : list with data
            name : name of data set
        """
        self.__printBanner("adding binary data from a list")
        self.addListData(ds, False, name)
        self.__printDone()
    def addListData(self, ds, numeric, name):
        """
        adds list data
        Parameters
            ds : list with data
            numeric : True if numeric, False if binary
            name : name of data set
        """
        assert type(ds) == list, "data not a list"
        if numeric:
            assert isNumeric(ds), "data is not numeric"
        else:
            assert isBinary(ds), "data is not binary"
        dtype = DataSetMetaData.dtypeNum if numeric else DataSetMetaData.dtypeBin
        self.dataSets[name] = np.array(ds)
        self.metaData[name] = DataSetMetaData(dtype)
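    # Example (hypothetical data set name):
    #   exp.addListNumericData([1.2, 3.4, 2.8, 5.1], "mydata")
    #   exp.getStats("mydata")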
    def addFileCatData(self, filePath, *columns):
        """
        add categorical columns from a file
        Parameters
            filePath : path of file with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("adding categorical columns from a file")
        columns = list(columns)
        noHeader = type(columns[0]) == int
        if noHeader:
            df = pd.read_csv(filePath, header=None)
        else:
            df = pd.read_csv(filePath, header=0)
        self.addDataFrameCatData(df, *columns)
        self.__printDone()
    def addDataFrameCatData(self, df, *columns):
        """
        add categorical columns from a data frame
        Parameters
            df : data frame with data
            columns : indexes followed by column names or column names
        """
        self.__printBanner("adding categorical columns from a data frame")
        columns = list(columns)
        noHeader = type(columns[0]) == int
        if noHeader:
            nCols = int(len(columns) / 2)
            colIndexes = columns[:nCols]
            nColsDf = len(df.columns)
            for i in range(nCols):
                ci = colIndexes[i]
                assert ci < nColsDf, "col index {} outside range".format(ci)
                col = df.loc[:, ci]
                assert isCategorical(col), "data is not categorical"
                col = col.tolist()
                cn = columns[i + nCols]
                self.__addDataSet(cn, col, DataSetMetaData.dtypeCat)
        else:
            for c in columns:
                col = df[c].tolist()
                self.__addDataSet(c, col, DataSetMetaData.dtypeCat)
    def addListCatData(self, ds, name):
        """
        add categorical list data
        Parameters
            ds : list with data
            name : name of data set
        """
        self.__printBanner("adding categorical list data")
        assert type(ds) == list, "data not a list"
        assert isCategorical(ds), "data is not categorical"
        self.__addDataSet(name, ds, DataSetMetaData.dtypeCat)
        self.__printDone()
    def remData(self, ds):
        """
        removes data set
        Parameters
            ds : data set name
        """
        self.__printBanner("removing data set", ds)
        assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
        self.dataSets.pop(ds)
        self.metaData.pop(ds)
        names = self.showNames()
        self.__printDone()
        return names
    def addNote(self, ds, note):
        """
        adds note to a data set
        Parameters
            ds : data set name
            note : note text
        """
        self.__printBanner("adding note")
        assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
        mdata = self.metaData[ds]
        mdata.addNote(note)
        self.__printDone()
    def getNotes(self, ds):
        """
        gets notes for a data set
        Parameters
            ds : data set name
        """
        self.__printBanner("getting notes")
        assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
        mdata = self.metaData[ds]
        dnotes = mdata.notes
        if self.verbose:
            for dn in dnotes:
                print(dn)
        return dnotes
    def getNumericData(self, ds):
        """
        get numeric data
        Parameters
            ds : data set name or list or numpy array with data
        """
        if type(ds) == str:
            assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
            assert self.metaData[ds].dtype == DataSetMetaData.dtypeNum, "data set {} is expected to be numerical type for this operation".format(ds)
            data = self.dataSets[ds]
        elif type(ds) == list:
            assert isNumeric(ds), "data is not numeric"
            data = np.array(ds)
        elif type(ds) == np.ndarray:
            data = ds
        else:
            raise ValueError("invalid type, expecting data set name, list or ndarray")
        return data
    def getCatData(self, ds):
        """
        get categorical data
        Parameters
            ds : data set name or list with data
        """
        if type(ds) == str:
            assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
            assert self.metaData[ds].dtype == DataSetMetaData.dtypeCat, "data set {} is expected to be categorical type for this operation".format(ds)
            data = self.dataSets[ds]
        elif type(ds) == list:
            assert isCategorical(ds), "data is not categorical"
            data = ds
        else:
            raise ValueError("invalid type, expecting data set name or list")
        return data
    def getAnyData(self, ds):
        """
        get any data
        Parameters
            ds : data set name or list with data
        """
        if type(ds) == str:
            assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
            data = self.dataSets[ds]
        elif type(ds) == list:
            data = ds
        else:
            raise ValueError("invalid type, expecting data set name or list")
        return data
    def loadCatFloatDataFrame(self, ds1, ds2):
        """
        loads float and cat data into data frame
        Parameters
            ds1 : data set name or list
            ds2 : data set name or list or numpy array
        """
        data1 = self.getCatData(ds1)
        data2 = self.getNumericData(ds2)
        self.ensureSameSize([data1, data2])
        df1 = pd.DataFrame(data=data1)
        df2 = pd.DataFrame(data=data2)
        df = pd.concat([df1, df2], axis=1)
        df.columns = range(df.shape[1])
        return df
    def showNames(self):
        """
        lists data set names
        """
        self.__printBanner("listing data set names")
        names = self.dataSets.keys()
        if self.verbose:
            print("data sets")
            for ds in names:
                print(ds)
        self.__printDone()
        return names
    def plot(self, ds, yscale=None):
        """
        plots data
        Parameters
            ds: data set name or list or numpy array
            yscale: y scale
        """
        self.__printBanner("plotting data", ds)
        data = self.getNumericData(ds)
        drawLine(data, yscale)
    def plotZoomed(self, ds, beg, end, yscale=None):
        """
        plots zoomed data
        Parameters
            ds: data set name or list or numpy array
            beg: begin offset
            end: end offset
            yscale: y scale
        """
        self.__printBanner("plotting zoomed data", ds)
        data = self.getNumericData(ds)
        drawLine(data[beg:end], yscale)
    def scatterPlot(self, ds1, ds2):
        """
        scatter plots data
        Parameters
            ds1: data set name or list or numpy array
            ds2: data set name or list or numpy array
        """
        self.__printBanner("scatter plotting data", ds1, ds2)
        data1 = self.getNumericData(ds1)
        data2 = self.getNumericData(ds2)
        self.ensureSameSize([data1, data2])
        x = np.arange(1, len(data1) + 1, 1)
        plt.scatter(x, data1, color="red")
        plt.scatter(x, data2, color="blue")
        plt.show()
    def print(self, ds):
        """
        prints data
        Parameters
            ds: data set name or list or numpy array
        """
        self.__printBanner("printing data", ds)
        assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds)
        data = self.dataSets[ds]
        if self.verbose:
            print(formatAny(len(data), "size"))
            print("showing first 50 elements")
            print(data[:50])
    def plotHist(self, ds, cumulative, density, nbins=20):
        """
        plots histogram
        Parameters
            ds: data set name or list or numpy array
            cumulative : True if cumulative
            density : True to normalize for probability density
            nbins : num of bins
        """
        self.__printBanner("plotting histogram", ds)
        data = self.getNumericData(ds)
        plt.hist(data, bins=nbins, cumulative=cumulative, density=density)
        plt.show()
    def isMonotonicallyChanging(self, ds):
        """
        checks if monotonically increasing or decreasing
        Parameters
            ds: data set name or list or numpy array
        """
        self.__printBanner("checking monotonic change", ds)
        data = self.getNumericData(ds)
        monoIncreasing = all(list(map(lambda i: data[i] >= data[i - 1], range(1, len(data), 1))))
        monoDecreasing = all(list(map(lambda i: data[i] <= data[i - 1], range(1, len(data), 1))))
        result = self.__printResult("monoIncreasing", monoIncreasing, "monoDecreasing", monoDecreasing)
        return result
    def getFreqDistr(self, ds, nbins=20):
        """
        gets frequency distribution (histogram)
        Parameters
            ds: data set name or list or numpy array
            nbins: num of bins
        """
        self.__printBanner("getting histogram", ds)
        data = self.getNumericData(ds)
        frequency, lowLimit, binsize, extraPoints = sta.relfreq(data, numbins=nbins)
        result = self.__printResult("frequency", frequency, "lowLimit", lowLimit, "binsize", binsize, "extraPoints", extraPoints)
        return result
    def getCumFreqDistr(self, ds, nbins=20):
        """
        gets cumulative freq distribution
        Parameters
            ds: data set name or list or numpy array
            nbins: num of bins
        """
        self.__printBanner("getting cumulative freq distribution", ds)
        data = self.getNumericData(ds)
        cumFrequency, lowLimit, binsize, extraPoints = sta.cumfreq(data, numbins=nbins)
        result = self.__printResult("cumFrequency", cumFrequency, "lowLimit", lowLimit, "binsize", binsize, "extraPoints", extraPoints)
        return result
    def getExtremeValue(self, ds, ensamp, nsamp, polarity, doPlotDistr, nbins=20):
        """
        gets extreme values
        Parameters
            ds: data set name or list or numpy array
            ensamp: num of samples for extreme values
            nsamp: num of samples
            polarity: max or min
            doPlotDistr: plot distr
            nbins: num of bins
        """
        self.__printBanner("getting extreme values", ds)
        data = self.getNumericData(ds)
        evalues = list()
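        # bootstrap: draw nsamp values with replacement, ensamp times, and keep
        # the max or min of each resample as one extreme value sample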
        for _ in range(ensamp):
            values = selectRandomSubListFromListWithRepl(data, nsamp)
            if polarity == "max":
                evalues.append(max(values))
            else:
                evalues.append(min(values))
        if doPlotDistr:
            plt.hist(evalues, bins=nbins, cumulative=False, density=True)
            plt.show()
        result = self.__printResult("extremeValues", evalues)
        return result
    def getEntropy(self, ds, nbins=20):
        """
        get entropy
        Parameters
            ds: data set name or list or numpy array
            nbins: num of bins
        """
        self.__printBanner("getting entropy", ds)
        data = self.getNumericData(ds)
        result = self.getFreqDistr(data, nbins)
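        # Shannon entropy H = -sum(p * log(p)); scipy.stats.entropy normalizes
        # the relative frequencies to probabilities before computing it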
        entropy = sta.entropy(result["frequency"])
        result = self.__printResult("entropy", entropy)
        return result
    def getRelEntropy(self, ds1, ds2, nbins=20):
        """
        get relative entropy or KL divergence with both data sets numeric
        Parameters
            ds1: data set name or list or numpy array
            ds2: data set name or list or numpy array
            nbins: num of bins
        """
        self.__printBanner("getting relative entropy or KL divergence", ds1, ds2)
        data1 = self.getNumericData(ds1)
        data2 = self.getNumericData(ds2)
        result1 = self.getFreqDistr(data1, nbins)
        freq1 = result1["frequency"]
        result2 = self.getFreqDistr(data2, nbins)
        freq2 = result2["frequency"]
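        # KL divergence D(P||Q) = sum(p * log(p / q)), computed here from the
        # binned distributions of the two data sets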
        entropy = sta.entropy(freq1, freq2)
        result = self.__printResult("relEntropy", entropy)
        return result
    def getAnyEntropy(self, ds, dt, nbins=20):
        """
        get entropy of any data type, numeric or categorical
        Parameters
            ds: data set name or list or numpy array
            dt : data type num or cat
            nbins: num of bins
        """
        entropy = self.getEntropy(ds, nbins)["entropy"] if dt == "num" else self.getStatsCat(ds)["entropy"]
        result = self.__printResult("entropy", entropy)
        return result
    def getJointEntropy(self, ds1, ds2, nbins=20):
        """
        get joint entropy with both data sets numeric
        Parameters
            ds1: data set name or list or numpy array
            ds2: data set name or list or numpy array
            nbins: num of bins
        """
        self.__printBanner("getting joint entropy", ds1, ds2)
        data1 = self.getNumericData(ds1)
        data2 = self.getNumericData(ds2)
        self.ensureSameSize([data1, data2])
        hist, xedges, yedges = np.histogram2d(data1, data2, bins=nbins)
        hist = hist.flatten()
        ssize = len(data1)
        hist = hist / ssize
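        # joint entropy H(X,Y) = -sum(p(x,y) * log(p(x,y))) over the 2D histogram cells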
        entropy = sta.entropy(hist)
        result = self.__printResult("jointEntropy", entropy)
        return result
    def getAllNumMutualInfo(self, ds1, ds2, nbins=20):
        """
        get mutual information for both numeric data
        Parameters
            ds1: data set name or list or numpy array
            ds2: data set name or list or numpy array
            nbins: num of bins
        """
        self.__printBanner("getting mutual information", ds1, ds2)
        en1 = self.getEntropy(ds1, nbins)
        en2 = self.getEntropy(ds2, nbins)
        en = self.getJointEntropy(ds1, ds2, nbins)
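        # mutual information I(X;Y) = H(X) + H(Y) - H(X,Y)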
| mutInfo = en1["entropy"] + en2["entropy"] - en["jointEntropy"] | |
| result = self.__printResult("mutInfo", mutInfo) | |
| return result | |
| def getNumCatMutualInfo(self, nds, cds ,nbins=20): | |
| """ | |
| get mutiual information between numeric and categorical data | |
| Parameters | |
| nds: numeric data set name or list or numpy array | |
| cds: categoric data set name or list | |
| nbins: num of bins | |
| """ | |
| self.__printBanner("getting mutual information of numerical and categorical data", nds, cds) | |
| ndata = self.getNumericData(nds) | |
| cds = self.getCatData(cds) | |
| nentr = self.getEntropy(nds)["entropy"] | |
| #conditional entropy | |
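        # I(X;C) = H(X) - H(X|C), where H(X|C) = sum over categories c of p(c) * H(X|c)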
        cdistr = self.getStatsCat(cds)["distr"]
        grdata = self.getGroupByData(nds, cds, True)["groupedData"]
        cnentr = 0
        for gr, data in grdata.items():
            self.addListNumericData(data, "grdata")
            gnentr = self.getEntropy("grdata")["entropy"]
            cnentr += gnentr * cdistr[gr]
        mutInfo = nentr - cnentr
        result = self.__printResult("mutInfo", mutInfo, "entropy", nentr, "condEntropy", cnentr)
        return result
    def getTwoCatMutualInfo(self, cds1, cds2):
        """
        get mutual information between 2 categorical data sets
        Parameters
            cds1 : categoric data set name or list
            cds2 : categoric data set name or list
        """
        self.__printBanner("getting mutual information of two categorical data sets", cds1, cds2)
        cdata1 = self.getCatData(cds1)
        cdata2 = self.getCatData(cds2)
        centr = self.getStatsCat(cds1)["entropy"]
        #conditional entropy
        cdistr = self.getStatsCat(cds2)["distr"]
        grdata = self.getGroupByData(cds1, cds2, True)["groupedData"]
        ccentr = 0
        for gr, data in grdata.items():
            self.addListCatData(data, "grdata")
            gcentr = self.getStatsCat("grdata")["entropy"]
            ccentr += gcentr * cdistr[gr]
        mutInfo = centr - ccentr
        result = self.__printResult("mutInfo", mutInfo, "entropy", centr, "condEntropy", ccentr)
        return result
    def getMutualInfo(self, dst, nbins=20):
        """
        get mutual information between 2 data sets, any combination of numerical and categorical
        Parameters
            dst : data source, data type, data source, data type
            nbins : num of bins
        """
        assertEqual(len(dst), 4, "invalid data source and data type list size")
        dtypes = ["num", "cat"]
        assertInList(dst[1], dtypes, "invalid data type")
        assertInList(dst[3], dtypes, "invalid data type")
        self.__printBanner("getting mutual information of any mix of numerical and categorical data", dst[0], dst[2])
        if dst[1] == "num":
            mutInfo = self.getAllNumMutualInfo(dst[0], dst[2], nbins)["mutInfo"] if dst[3] == "num" \
                else self.getNumCatMutualInfo(dst[0], dst[2], nbins)["mutInfo"]
        else:
            mutInfo = self.getNumCatMutualInfo(dst[2], dst[0], nbins)["mutInfo"] if dst[3] == "num" \
                else self.getTwoCatMutualInfo(dst[2], dst[0])["mutInfo"]
        result = self.__printResult("mutInfo", mutInfo)
        return result
    def getCondMutualInfo(self, dst, nbins=20):
        """
        get conditional mutual information between 2 data sets, any combination of numerical and categorical
        Parameters
            dst : data source, data type, data source, data type, data source, data type
            nbins : num of bins
        """
        assertEqual(len(dst), 6, "invalid data source and data type list size")
        dtypes = ["num", "cat"]
        assertInList(dst[1], dtypes, "invalid data type")
        assertInList(dst[3], dtypes, "invalid data type")
        assertInList(dst[5], dtypes, "invalid data type")
        self.__printBanner("getting conditional mutual information of any mix of numerical and categorical data", dst[0], dst[2])
| if dst[5] == "cat": | |
| cdistr = self.getStatsCat(dst[4])["distr"] | |
| grdata1 = self.getGroupByData(dst[0], dst[4], True)["groupedData"] | |
| grdata2 = self.getGroupByData(dst[2], dst[4], True)["groupedData"] | |
| else: | |
| gdata = self.getNumericData(dst[4]) | |
| hist = Histogram.createWithNumBins(gdata, nbins) | |
| cdistr = hist.distr() | |
| grdata1 = self.getGroupByData(dst[0], dst[4], False)["groupedData"] | |
| grdata2 = self.getGroupByData(dst[2], dst[4], False)["groupedData"] | |
| cminfo = 0 | |
| for gr in grdata1.keys(): | |
| data1 = grdata1[gr] | |
| data2 = grdata2[gr] | |
| if dst[1] == "num": | |
| self.addListNumericData(data1, "grdata1") | |
| else: | |
| self.addListCatData(data1, "grdata1") | |
| if dst[3] == "num": | |
| self.addListNumericData(data2, "grdata2") | |
| else: | |
| self.addListCatData(data2, "grdata2") | |
| gdst = ["grdata1", dst[1], "grdata2", dst[3]] | |
| minfo = self.getMutualInfo(gdst, nbins)["mutInfo"] | |
| cminfo += minfo * cdistr[gr] | |
| result = self.__printResult("condMutInfo", cminfo) | |
| return result | |
| def getPercentile(self, ds, value): | |
| """ | |
| gets percentile | |
| Parameters | |
| ds: data set name or list or numpy array | |
| value: the value | |
| """ | |
| self.__printBanner("getting percentile", ds) | |
| data = self.getNumericData(ds) | |
| percent = sta.percentileofscore(data, value) | |
| result = self.__printResult("value", value, "percentile", percent) | |
| return result | |
| def getValueRangePercentile(self, ds, value1, value2): | |
| """ | |
| gets percentile | |
| Parameters | |
| ds: data set name or list or numpy array | |
| value1: first value | |
| value2: second value | |
| """ | |
| self.__printBanner("getting percentile difference for value range", ds) | |
| if value1 < value2: | |
| v1 = value1 | |
| v2 = value2 | |
| else: | |
| v1 = value2 | |
| v2 = value1 | |
| data = self.getNumericData(ds) | |
| per1 = sta.percentileofscore(data, v1) | |
| per2 = sta.percentileofscore(data, v2) | |
| result = self.__printResult("valueFirst", value1, "valueSecond", value2, "percentileDiff", per2 - per1) | |
| return result | |
| def getValueAtPercentile(self, ds, percent): | |
| """ | |
| gets value at percentile | |
| Parameters | |
| ds: data set name or list or numpy array | |
| percent: percentile | |
| """ | |
| self.__printBanner("getting value at percentile", ds) | |
| data = self.getNumericData(ds) | |
| assert isInRange(percent, 0, 100), "percent should be between 0 and 100" | |
| value = sta.scoreatpercentile(data, percent) | |
| result = self.__printResult("value", value, "percentile", percent) | |
| return result | |
| def getLessThanValues(self, ds, cvalue): | |
| """ | |
| gets values less than given value | |
| Parameters | |
| ds: data set name or list or numpy array | |
| cvalue: condition value | |
| """ | |
| self.__printBanner("getting values less than", ds) | |
| fdata = self.__getCondValues(ds, cvalue, "lt") | |
| result = self.__printResult("count", len(fdata), "lessThanvalues", fdata ) | |
| return result | |
| def getGreaterThanValues(self, ds, cvalue): | |
| """ | |
| gets values greater than given value | |
| Parameters | |
| ds: data set name or list or numpy array | |
| cvalue: condition value | |
| """ | |
| self.__printBanner("getting values greater than", ds) | |
| fdata = self.__getCondValues(ds, cvalue, "gt") | |
| result = self.__printResult("count", len(fdata), "greaterThanvalues", fdata ) | |
| return result | |
| def __getCondValues(self, ds, cvalue, cond): | |
| """ | |
| gets cinditional values | |
| Parameters | |
| ds: data set name or list or numpy array | |
| cvalue: condition value | |
| cond: condition | |
| """ | |
| data = self.getNumericData(ds) | |
| if cond == "lt": | |
| ind = np.where(data < cvalue) | |
| else: | |
| ind = np.where(data > cvalue) | |
| fdata = data[ind] | |
| return fdata | |
| def getUniqueValueCounts(self, ds, maxCnt=10): | |
| """ | |
| gets unique values and counts | |
| Parameters | |
| ds: data set name or list or numpy array | |
| maxCnt; max value count pairs to return | |
| """ | |
| self.__printBanner("getting unique values and counts", ds) | |
| data = self.getNumericData(ds) | |
| values, counts = sta.find_repeats(data) | |
| cardinality = len(values) | |
| vc = list(zip(values, counts)) | |
| vc.sort(key = lambda v : v[1], reverse = True) | |
| result = self.__printResult("cardinality", cardinality, "vunique alues and repeat counts", vc[:maxCnt]) | |
| return result | |
| def getCatUniqueValueCounts(self, ds, maxCnt=10): | |
| """ | |
| gets unique categorical values and counts | |
| Parameters | |
| ds: data set name or list or numpy array | |
| maxCnt: max value count pairs to return | |
| """ | |
| self.__printBanner("getting unique categorical values and counts", ds) | |
| data = self.getCatData(ds) | |
| series = pd.Series(data) | |
| uvalues = series.value_counts() | |
| values = uvalues.index.tolist() | |
| counts = uvalues.tolist() | |
| vc = list(zip(values, counts)) | |
| vc.sort(key = lambda v : v[1], reverse = True) | |
| result = self.__printResult("cardinality", len(values), "unique values and repeat counts", vc[:maxCnt]) | |
| return result | |
| def getCatAlphaValueCounts(self, ds): | |
| """ | |
| gets alphabetic value count | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting alphabetic value counts", ds) | |
| data = self.getCatData(ds) | |
| series = pd.Series(data) | |
| flags = series.str.isalpha().tolist() | |
| count = sum(flags) | |
| result = self.__printResult("alphabeticValueCount", count) | |
| return result | |
| def getCatNumValueCounts(self, ds): | |
| """ | |
| gets numeric value count | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting numeric value counts", ds) | |
| data = self.getCatData(ds) | |
| series = pd.Series(data) | |
| flags = series.str.isnumeric().tolist() | |
| count = sum(flags) | |
| result = self.__printResult("numericValueCount", count) | |
| return result | |
| def getCatAlphaNumValueCounts(self, ds): | |
| """ | |
| gets alpha numeric value count | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting alpha numeric value counts", ds) | |
| data = self.getCatData(ds) | |
| series = pd.Series(data) | |
| flags = series.str.isalnum().tolist() | |
| count = sum(flags) | |
| result = self.__printResult("alphaNumericValueCount", count) | |
| return result | |
| def getCatAllCharCounts(self, ds): | |
| """ | |
| gets alphabetic, numeric and special char count list | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting alphabetic, numeric and special char counts", ds) | |
| data = self.getCatData(ds) | |
| counts = list() | |
| for d in data: | |
| r = getAlphaNumCharCount(d) | |
| counts.append(r) | |
| result = self.__printResult("allTypeCharCounts", counts) | |
| return result | |
| def getCatAlphaCharCounts(self, ds): | |
| """ | |
| gets alphabetic char count list | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting alphabetic char counts", ds) | |
| data = self.getCatData(ds) | |
| counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"] | |
| counts = list(map(lambda r : r[0], counts)) | |
| result = self.__printResult("alphaCharCounts", counts) | |
| return result | |
| def getCatNumCharCounts(self, ds): | |
| """ | |
| gets numeric char count list | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting numeric char counts", ds) | |
| data = self.getCatData(ds) | |
| counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"] | |
| counts = list(map(lambda r : r[1], counts)) | |
| result = self.__printResult("numCharCounts", counts) | |
| return result | |
| def getCatSpecialCharCounts(self, ds): | |
| """ | |
| gets special char count list | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting special char counts", ds) | |
| counts = self.getCatAllCharCounts(ds)["allTypeCharCounts"] | |
| counts = list(map(lambda r : r[2], counts)) | |
| result = self.__printResult("specialCharCounts", counts) | |
| return result | |
| def getCatAlphaCharCountStats(self, ds): | |
| """ | |
| gets alphabetic char count stats | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting alphabetic char count stats", ds) | |
| counts = self.getCatAlphaCharCounts(ds)["alphaCharCounts"] | |
| nz = counts.count(0) | |
| st = self.__getBasicStats(np.array(counts)) | |
| result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) | |
| return result | |
| def getCatNumCharCountStats(self, ds): | |
| """ | |
| gets numeric char count stats | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting numeric char count stats", ds) | |
| counts = self.getCatNumCharCounts(ds)["numCharCounts"] | |
| nz = counts.count(0) | |
| st = self.__getBasicStats(np.array(counts)) | |
| result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) | |
| return result | |
| def getCatSpecialCharCountStats(self, ds): | |
| """ | |
| gets special char count stats | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting special char count stats", ds) | |
| counts = self.getCatSpecialCharCounts(ds)["specialCharCounts"] | |
| nz = counts.count(0) | |
| st = self.__getBasicStats(np.array(counts)) | |
| result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) | |
| return result | |
| def getCatFldLenStats(self, ds): | |
| """ | |
| gets field length stats | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting field length stats", ds) | |
| data = self.getCatData(ds) | |
| le = list(map(lambda d: len(d), data)) | |
| st = self.__getBasicStats(np.array(le)) | |
| result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3]) | |
| return result | |
| def getCatCharCountStats(self, ds, ch): | |
| """ | |
| gets specified char ocuurence count stats | |
| Parameters | |
| ds: data set name or list or numpy array | |
| ch : character | |
| """ | |
| self.__printBanner("getting field length stats", ds) | |
| data = self.getCatData(ds) | |
| counts = list(map(lambda d: d.count(ch), data)) | |
| nz = counts.count(0) | |
| st = self.__getBasicStats(np.array(counts)) | |
| result = self.__printResult("mean", st[0], "std dev", st[1], "max", st[2], "min", st[3], "zeroCount", nz) | |
| return result | |
| def getStats(self, ds, nextreme=5): | |
| """ | |
| gets summary statistics | |
| Parameters | |
| ds: data set name or list or numpy array | |
| nextreme: num of extreme values | |
| """ | |
| self.__printBanner("getting summary statistics", ds) | |
| data = self.getNumericData(ds) | |
| stat = dict() | |
| stat["length"] = len(data) | |
| stat["min"] = data.min() | |
| stat["max"] = data.max() | |
| series = pd.Series(data) | |
| stat["n smallest"] = series.nsmallest(n=nextreme).tolist() | |
| stat["n largest"] = series.nlargest(n=nextreme).tolist() | |
| stat["mean"] = data.mean() | |
| stat["median"] = np.median(data) | |
| mode, modeCnt = sta.mode(data) | |
| stat["mode"] = mode[0] | |
| stat["mode count"] = modeCnt[0] | |
| stat["std"] = np.std(data) | |
| stat["skew"] = sta.skew(data) | |
| stat["kurtosis"] = sta.kurtosis(data) | |
| stat["mad"] = sta.median_absolute_deviation(data) | |
| self.pp.pprint(stat) | |
| return stat | |
| def getStatsCat(self, ds): | |
| """ | |
| gets summary statistics for categorical data | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting summary statistics for categorical data", ds) | |
| data = self.getCatData(ds) | |
| ch = CatHistogram() | |
| for d in data: | |
| ch.add(d) | |
| mode = ch.getMode() | |
| entr = ch.getEntropy() | |
| uvalues = ch.getUniqueValues() | |
| distr = ch.getDistr() | |
| result = self.__printResult("entropy", entr, "mode", mode, "uniqueValues", uvalues, "distr", distr) | |
| return result | |
| def getGroupByData(self, ds, gds, gdtypeCat, numBins=20): | |
| """ | |
| group by | |
| Parameters | |
| ds: data set name or list or numpy array | |
| gds: group by data set name or list or numpy array | |
| gdtpe : group by data type | |
| """ | |
| self.__printBanner("getting group by data", ds) | |
| data = self.getAnyData(ds) | |
| if gdtypeCat: | |
| gdata = self.getCatData(gds) | |
| else: | |
| gdata = self.getNumericData(gds) | |
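            # bin the numeric group by variable so it can serve as a discrete grouping key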
            hist = Histogram.createWithNumBins(gdata, numBins)
            gdata = list(map(lambda d: hist.bin(d), gdata))
        self.ensureSameSize([data, gdata])
        groups = dict()
        for g, d in zip(gdata, data):
            appendKeyedList(groups, g, d)
        ve = self.verbose
        self.verbose = False
        result = self.__printResult("groupedData", groups)
        self.verbose = ve
        return result
    def getDifference(self, ds, order, doPlot=False):
        """
        gets difference of given order
        Parameters
            ds: data set name or list or numpy array
            order: order of difference
            doPlot : True for plot
        """
        self.__printBanner("getting difference of given order", ds)
        data = self.getNumericData(ds)
        diff = difference(data, order)
        if doPlot:
            drawLine(diff)
        return diff
    def getTrend(self, ds, doPlot=False):
        """
        get trend
        Parameters
            ds: data set name or list or numpy array
            doPlot: true if plotting needed
        """
        self.__printBanner("getting trend")
        data = self.getNumericData(ds)
        sz = len(data)
        X = list(range(0, sz))
        X = np.reshape(X, (sz, 1))
        model = LinearRegression()
        model.fit(X, data)
        trend = model.predict(X)
        sc = model.score(X, data)
        coef = model.coef_
        intc = model.intercept_
        result = self.__printResult("coeff", coef, "intercept", intc, "r square error", sc, "trend", trend)
        if doPlot:
            plt.plot(data)
            plt.plot(trend)
            plt.show()
        return result
    def getDiffSdNoisiness(self, ds):
        """
        get noisiness based on std dev of first order difference
        Parameters
            ds: data set name or list or numpy array
        """
        diff = self.getDifference(ds, 1)
        noise = np.std(np.array(diff))
        result = self.__printResult("noisiness", noise)
        return result
    def getMaRmseNoisiness(self, ds, wsize=5):
        """
        gets noisiness based on RMSE with moving average
        Parameters
            ds: data set name or list or numpy array
            wsize : window size
        """
        assert wsize % 2 == 1, "window size must be odd"
        data = self.getNumericData(ds)
        wind = data[:wsize]
        wstat = SlidingWindowStat.initialize(wind.tolist())
        whsize = int(wsize / 2)
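        # compare each point with the mean of a centered moving window and
        # accumulate squared deviations; noisiness is the RMSE of those deviations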
        beg = whsize
        end = len(data) - whsize - 1
        sumSq = 0.0
        mean = wstat.getStat()[0]
        diff = data[beg] - mean
        sumSq += diff * diff
        for i in range(beg + 1, end, 1):
            mean = wstat.addGetStat(data[i + whsize])[0]
            diff = data[i] - mean
            sumSq += (diff * diff)
        noise = sqrt(sumSq / (len(data) - 2 * whsize))
        result = self.__printResult("noisiness", noise)
        return result
    def deTrend(self, ds, trend, doPlot=False):
        """
        de trend
        Parameters
            ds: data set name or list or numpy array
            trend : trend data
            doPlot: true if plotting needed
        """
        self.__printBanner("doing de trend", ds)
        data = self.getNumericData(ds)
        sz = len(data)
        detrended = list(map(lambda i: data[i] - trend[i], range(sz)))
        if doPlot:
            drawLine(detrended)
        return detrended
    def getTimeSeriesComponents(self, ds, model, freq, summaryOnly, doPlot=False):
        """
        extracts trend, cycle and residue components of time series
        Parameters
            ds: data set name or list or numpy array
            model : model type
            freq : seasonality period
            summaryOnly : True if only summary needed in output
            doPlot: true if plotting needed
        """
        self.__printBanner("extracting trend, cycle and residue components of time series", ds)
        assert model == "additive" or model == "multiplicative", "model must be additive or multiplicative"
        data = self.getNumericData(ds)
        res = seasonal_decompose(data, model=model, period=freq)
        if doPlot:
            res.plot()
            plt.show()
        #summary of components
        trend = np.array(removeNan(res.trend))
        trendMean = trend.mean()
        trendSlope = (trend[-1] - trend[0]) / (len(trend) - 1)
        seasonal = np.array(removeNan(res.seasonal))
        seasonalAmp = (seasonal.max() - seasonal.min()) / 2
        resid = np.array(removeNan(res.resid))
        residueMean = resid.mean()
        residueStdDev = np.std(resid)
        if summaryOnly:
            result = self.__printResult("trendMean", trendMean, "trendSlope", trendSlope, "seasonalAmp", seasonalAmp,
                "residueMean", residueMean, "residueStdDev", residueStdDev)
        else:
            result = self.__printResult("trendMean", trendMean, "trendSlope", trendSlope, "seasonalAmp", seasonalAmp,
                "residueMean", residueMean, "residueStdDev", residueStdDev, "trend", res.trend, "seasonal", res.seasonal,
                "residual", res.resid)
        return result
    def getGausianMixture(self, ncomp, cvType, ninit, *dsl):
        """
        finds gaussian mixture parameters
        Parameters
            ncomp : num of gaussian components
            cvType : covariance type
            ninit: num of initializations
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting gaussian mixture parameters", *dsl)
        assertInList(cvType, ["full", "tied", "diag", "spherical"], "invalid covariance type")
        dmat = self.__stackData(*dsl)
        gm = GaussianMixture(n_components=ncomp, covariance_type=cvType, n_init=ninit)
        gm.fit(dmat)
        weights = gm.weights_
        means = gm.means_
        covars = gm.covariances_
        converged = gm.converged_
        niter = gm.n_iter_
        aic = gm.aic(dmat)
        result = self.__printResult("weights", weights, "mean", means, "covariance", covars, "converged", converged, "num iterations", niter, "aic", aic)
        return result
    def getKmeansCluster(self, nclust, ninit, *dsl):
        """
        gets cluster parameters
        Parameters
            nclust : num of clusters
            ninit: num of initializations
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting kmeans cluster parameters", *dsl)
        dmat = self.__stackData(*dsl)
        nsamp = dmat.shape[0]
        km = KMeans(n_clusters=nclust, n_init=ninit)
        km.fit(dmat)
        centers = km.cluster_centers_
        avdist = sqrt(km.inertia_ / nsamp)
        niter = km.n_iter_
        score = km.score(dmat)
        result = self.__printResult("centers", centers, "average distance", avdist, "num iterations", niter, "score", score)
        return result
    def getPrincComp(self, ncomp, *dsl):
        """
        finds principal component parameters
        Parameters
            ncomp : num of principal components
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting principal component parameters", *dsl)
        dmat = self.__stackData(*dsl)
        nfeat = dmat.shape[1]
        assertGreater(nfeat, 1, "requires multiple features")
        assertLesserEqual(ncomp, nfeat, "num of components greater than num of features")
        pca = PCA(n_components=ncomp)
        pca.fit(dmat)
        comps = pca.components_
        var = pca.explained_variance_
        varr = pca.explained_variance_ratio_
        svalues = pca.singular_values_
        result = self.__printResult("components", comps, "variance", var, "variance ratio", varr, "singular values", svalues)
        return result
    def getOutliersWithIsoForest(self, contamination, *dsl):
        """
        finds outliers using isolation forest
        Parameters
            contamination : proportion of outliers in the data set
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting outliers using isolation forest", *dsl)
        assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range"
        dmat = self.__stackData(*dsl)
        isf = IsolationForest(contamination=contamination)
        ypred = isf.fit_predict(dmat)
        mask = ypred == -1
        doul = dmat[mask, :]
        mask = ypred != -1
        dwoul = dmat[mask, :]
        result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
        return result
    def getOutliersWithLocalFactor(self, contamination, *dsl):
        """
        gets outliers using local outlier factor
        Parameters
            contamination : proportion of outliers in the data set
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting outliers using local outlier factor", *dsl)
        assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range"
        dmat = self.__stackData(*dsl)
        lof = LocalOutlierFactor(contamination=contamination)
        ypred = lof.fit_predict(dmat)
        mask = ypred == -1
        doul = dmat[mask, :]
        mask = ypred != -1
        dwoul = dmat[mask, :]
        result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
        return result
    def getOutliersWithSupVecMach(self, nu, *dsl):
        """
        gets outliers using one class svm
        Parameters
            nu : upper bound on the fraction of training errors and a lower bound on the fraction of support vectors
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting outliers using one class svm", *dsl)
        assert nu >= 0 and nu <= 0.5, "error upper bound outside valid range"
        dmat = self.__stackData(*dsl)
        svm = OneClassSVM(nu=nu)
        ypred = svm.fit_predict(dmat)
        mask = ypred == -1
        doul = dmat[mask, :]
        mask = ypred != -1
        dwoul = dmat[mask, :]
        result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
        return result
    def getOutliersWithCovarDeterminant(self, contamination, *dsl):
        """
        gets outliers using covariance determinant
        Parameters
            contamination : proportion of outliers in the data set
            dsl: list of data set name or list or numpy array
        """
        self.__printBanner("getting outliers using covariance determinant", *dsl)
        assert contamination >= 0 and contamination <= 0.5, "contamination outside valid range"
        dmat = self.__stackData(*dsl)
        env = EllipticEnvelope(contamination=contamination)
        ypred = env.fit_predict(dmat)
        mask = ypred == -1
        doul = dmat[mask, :]
        mask = ypred != -1
        dwoul = dmat[mask, :]
        result = self.__printResult("numOutliers", doul.shape[0], "outliers", doul, "dataWithoutOutliers", dwoul)
        return result
    def getOutliersWithZscore(self, ds, zthreshold, stats=None):
        """
        gets outliers using zscore
        Parameters
            ds: data set name or list or numpy array
            zthreshold : z score threshold
            stats : tuple containing mean and std dev
        """
        self.__printBanner("getting outliers using zscore", ds)
        data = self.getNumericData(ds)
        if stats is None:
            mean = data.mean()
            sd = np.std(data)
        else:
            mean = stats[0]
            sd = stats[1]
        zs = list(map(lambda d: abs((d - mean) / sd), data))
        outliers = list(filter(lambda r: r[1] > zthreshold, enumerate(zs)))
        result = self.__printResult("outliers", outliers)
        return result
    def getOutliersWithRobustZscore(self, ds, zthreshold, stats=None):
        """
        gets outliers using robust zscore
        Parameters
            ds: data set name or list or numpy array
            zthreshold : z score threshold
            stats : tuple containing median and median absolute deviation
        """
        self.__printBanner("getting outliers using robust zscore", ds)
        data = self.getNumericData(ds)
        if stats is None:
            med = np.median(data)
            dev = np.array(list(map(lambda d: abs(d - med), data)))
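            # median absolute deviation, scaled by 1.4826 so it estimates the
            # standard deviation for normally distributed data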
            mad = 1.4826 * np.median(dev)
        else:
            med = stats[0]
            mad = stats[1]
        rzs = list(map(lambda d: abs((d - med) / mad), data))
        outliers = list(filter(lambda r: r[1] > zthreshold, enumerate(rzs)))
        result = self.__printResult("outliers", outliers)
        return result
    def getSubsequenceOutliersWithDissimilarity(self, subSeqSize, ds):
        """
        gets subsequence outlier with subsequence pairwise dissimilarity
        Parameters
            subSeqSize : sub sequence size
            ds: data set name or list or numpy array
        """
        self.__printBanner("doing sub sequence anomaly detection with dissimilarity", ds)
        data = self.getNumericData(ds)
        sz = len(data)
        dist = dict()
        minDist = dict()
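        # discord detection: for every window find the distance to its nearest
        # non overlapping window; the window whose nearest neighbor is farthest
        # away is the most anomalous subsequence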
| for i in range(sz - subSeqSize): | |
| #first window | |
| w1 = data[i : i + subSeqSize] | |
| dmin = None | |
| for j in range(sz - subSeqSize): | |
| #second window not overlapping with the first | |
| if j + subSeqSize <=i or j >= i + subSeqSize: | |
| w2 = data[j : j + subSeqSize] | |
| k = (j,i) | |
| if k in dist: | |
| d = dist[k] | |
| else: | |
| d = euclideanDistance(w1,w2) | |
| k = (i,j) | |
| dist[k] = d | |
| if dmin is None: | |
| dmin = d | |
| else: | |
| dmin = d if d < dmin else dmin | |
| minDist[i] = dmin | |
| #find max of min | |
| dmax = None | |
| offset = None | |
| for k in minDist.keys(): | |
| d = minDist[k] | |
| if dmax is None: | |
| dmax = d | |
| offset = k | |
| else: | |
| if d > dmax: | |
| dmax = d | |
| offset = k | |
| result = self.__printResult("subSeqOffset", offset, "outlierScore", dmax) | |
| return result | |
| def getNullCount(self, ds): | |
| """ | |
| get count of null fields | |
| Parameters | |
| ds : data set name or list or numpy array with data | |
| """ | |
| self.__printBanner("getting null value count", ds) | |
| if type(ds) == str: | |
| assert ds in self.dataSets, "data set {} does not exist, please add it first".format(ds) | |
| data = self.dataSets[ds] | |
| ser = pd.Series(data) | |
| elif type(ds) == list or type(ds) == np.ndarray: | |
| ser = pd.Series(ds) | |
| data = ds | |
| else: | |
| raise ValueError("invalid data type") | |
| nv = ser.isnull().tolist() | |
| nullCount = nv.count(True) | |
| nullFraction = nullCount / len(data) | |
| result = self.__printResult("nullFraction", nullFraction, "nullCount", nullCount) | |
| return result | |
| def fitLinearReg(self, dsx, ds, doPlot=False): | |
| """ | |
| fit linear regression | |
| Parameters | |
| dsx: x data set name or None | |
| ds: data set name or list or numpy array | |
| doPlot: true if plotting needed | |
| """ | |
| self.__printBanner("fitting linear regression", ds) | |
| data = self.getNumericData(ds) | |
| if dsx is None: | |
| x = np.arange(len(data)) | |
| else: | |
| x = self.getNumericData(dsx) | |
| slope, intercept, rvalue, pvalue, stderr = sta.linregress(x, data) | |
| result = self.__printResult("slope", slope, "intercept", intercept, "rvalue", rvalue, "pvalue", pvalue, "stderr", stderr) | |
| if doPlot: | |
| self.regFitPlot(x, data, slope, intercept) | |
| return result | |
| def fitSiegelRobustLinearReg(self, ds, doPlot=False): | |
| """ | |
| siegel robust linear regression fit based on median | |
| Parameters | |
| ds: data set name or list or numpy array | |
| doPlot: true if plotting needed | |
| """ | |
| self.__printBanner("fitting siegel robust linear regression based on median", ds) | |
| data = self.getNumericData(ds) | |
| slope , intercept = sta.siegelslopes(data) | |
| result = self.__printResult("slope", slope, "intercept", intercept) | |
| if doPlot: | |
| x = np.arange(len(data)) | |
| self.regFitPlot(x, data, slope, intercept) | |
| return result | |
| def fitTheilSenRobustLinearReg(self, ds, doPlot=False): | |
| """ | |
| thiel sen robust linear fit regression based on median | |
| Parameters | |
| ds: data set name or list or numpy array | |
| doPlot: true if plotting needed | |
| """ | |
| self.__printBanner("fitting thiel sen robust linear regression based on median", ds) | |
| data = self.getNumericData(ds) | |
| slope, intercept, loSlope, upSlope = sta.theilslopes(data) | |
| result = self.__printResult("slope", slope, "intercept", intercept, "lower slope", loSlope, "upper slope", upSlope) | |
| if doPlot: | |
| x = np.arange(len(data)) | |
| self.regFitPlot(x, data, slope, intercept) | |
| return result | |
| def plotRegFit(self, x, y, slope, intercept): | |
| """ | |
| plot linear rgeression fit line | |
| Parameters | |
| x : x values | |
| y : y values | |
| slope : slope | |
| intercept : intercept | |
| """ | |
| self.__printBanner("plotting linear rgeression fit line") | |
| fig = plt.figure() | |
| ax = fig.add_subplot(111) | |
| ax.plot(x, y, "b.") | |
| ax.plot(x, intercept + slope * x, "r-") | |
| plt.show() | |
| def getRegFit(self, xvalues, yvalues, slope, intercept): | |
| """ | |
| gets fitted line and residue | |
| Parameters | |
| x : x values | |
| y : y values | |
| slope : regression slope | |
| intercept : regressiob intercept | |
| """ | |
| yfit = list() | |
| residue = list() | |
| for x,y in zip(xvalues, yvalues): | |
| yf = x * slope + intercept | |
| yfit.append(yf) | |
| r = y - yf | |
| residue.append(r) | |
| result = self.__printResult("fitted line", yfit, "residue", residue) | |
| return result | |
| def getInfluentialPoints(self, dsx, dsy): | |
| """ | |
| gets influential points in regression model with Cook's distance | |
| Parameters | |
| dsx : data set name or list or numpy array for x | |
| dsy : data set name or list or numpy array for y | |
| """ | |
| self.__printBanner("finding influential points for linear regression", dsx, dsy) | |
| y = self.getNumericData(dsy) | |
| x = np.arange(len(y)) if dsx is None else self.getNumericData(dsx) | |
| model = sm.OLS(y, x).fit() | |
| np.set_printoptions(suppress=True) | |
| influence = model.get_influence() | |
| cooks = influence.cooks_distance | |
| result = self.__printResult("Cook distance", cooks) | |
| return result | |
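| # Interpretation note: cooks_distance returns a pair of arrays (distances and | |
| # p values); a common rule of thumb flags points with distance above 4/n as | |
| # influential. Sketch (hypothetical data set names): | |
| # dist, pv = de.getInfluentialPoints("xdata", "ydata")["Cook distance"] | |
| # influential = [i for i, d in enumerate(dist) if d > 4 / len(dist)] | |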
| def getCovar(self, *dsl): | |
| """ | |
| gets covariance | |
| Parameters | |
| dsl: list of data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting covariance", *dsl) | |
| data = list(map(lambda ds : self.getNumericData(ds), dsl)) | |
| self.ensureSameSize(data) | |
| data = np.vstack(data) | |
| cv = np.cov(data) | |
| print(cv) | |
| return cv | |
| def getPearsonCorr(self, ds1, ds2, sigLev=.05): | |
| """ | |
| gets pearson correlation coefficient | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("getting pearson correlation coefficient ", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| self.ensureSameSize([data1, data2]) | |
| stat, pvalue = sta.pearsonr(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) | |
| return result | |
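| # Usage sketch (hypothetical data set names; both series must be the same size): | |
| # res = de.getPearsonCorr("temperature", "sales") | |
| # if res["pvalue"] < .05: | |
| #     print("correlation {:.3f} is significant".format(res["stat"])) | |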
| def getSpearmanRankCorr(self, ds1, ds2, sigLev=.05): | |
| """ | |
| gets spearman correlation coefficient | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("getting spearman correlation coefficient",ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| self.ensureSameSize([data1, data2]) | |
| stat, pvalue = sta.spearmanr(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) | |
| return result | |
| def getKendalRankCorr(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Kendall’s tau, a correlation measure for ordinal data | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("getting kendall’s tau, a correlation measure for ordinal data", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| self.ensureSameSize([data1, data2]) | |
| stat, pvalue = sta.kendalltau(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) | |
| return result | |
| def getPointBiserialCorr(self, ds1, ds2, sigLev=.05): | |
| """ | |
| point biserial correlation between binary and numeric | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("getting point biserial correlation between binary and numeric", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| assert isBinary(data1), "first data set is not binary" | |
| self.ensureSameSize([data1, data2]) | |
| stat, pvalue = sta.pointbiserialr(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) | |
| return result | |
| def getConTab(self, ds1, ds2): | |
| """ | |
| get contingency table for categorical data pair | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting contingency table for categorical data", ds1, ds2) | |
| data1 = self.getCatData(ds1) | |
| data2 = self.getCatData(ds2) | |
| self.ensureSameSize([data1, data2]) | |
| crosstab = pd.crosstab(pd.Series(data1), pd.Series(data2), margins = False) | |
| ctab = crosstab.values | |
| print("contingency table") | |
| print(ctab) | |
| return ctab | |
| def getChiSqCorr(self, ds1, ds2, sigLev=.05): | |
| """ | |
| chi square correlation for categorical data pair | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("getting chi square correlation for two categorical", ds1, ds2) | |
| ctab = self.getConTab(ds1, ds2) | |
| stat, pvalue, dof, expctd = sta.chi2_contingency(ctab) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue, "dof", dof, "expected", expctd) | |
| self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) | |
| return result | |
| def getSizeCorrectChiSqCorr(self, ds1, ds2, chisq): | |
| """ | |
| cramerV size corrected chi square correlation for categorical data pair | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| chisq: chisq stat | |
| """ | |
| self.__printBanner("getting size corrected chi square correlation for two categorical", ds1, ds2) | |
| c1 = self.getCatUniqueValueCounts(ds1)["cardinality"] | |
| c2 = self.getCatUniqueValueCounts(ds2)["cardinality"] | |
| c = min(c1,c2) | |
| assertGreater(c, 1, "min cardinality should be greater than 1") | |
| l = len(self.getCatData(ds1)) | |
| t = l * (c - 1) | |
| stat = math.sqrt(chisq / t) | |
| result = self.__printResult("stat", stat) | |
| return result | |
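| # Usage sketch (hypothetical categorical data set names): the chi square stat | |
| # from getChiSqCorr feeds the Cramer's V size correction above. | |
| # chsq = de.getChiSqCorr("color", "region")["stat"] | |
| # cramerv = de.getSizeCorrectChiSqCorr("color", "region", chsq)["stat"] | |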
| def getAnovaCorr(self, ds1, ds2, grByCol, sigLev=.05): | |
| """ | |
| ANOVA correlation between numerical and categorical data | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| grByCol : group by column | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("anova correlation for numerical categorical", ds1, ds2) | |
| df = self.loadCatFloatDataFrame(ds1, ds2) if grByCol == 0 else self.loadCatFloatDataFrame(ds2, ds1) | |
| grByCol = 0 | |
| dCol = 1 | |
| grouped = df.groupby([grByCol]) | |
| dlist = list(map(lambda v : v[1].loc[:, dCol].values, grouped)) | |
| stat, pvalue = sta.f_oneway(*dlist) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably uncorrelated", "probably correlated", sigLev) | |
| return result | |
| def plotAutoCorr(self, ds, lags, alpha, diffOrder=0): | |
| """ | |
| plots auto correlation | |
| Parameters | |
| ds: data set name or list or numpy array | |
| lags: num of lags | |
| alpha: confidence level | |
| diffOrder: order of differencing | |
| """ | |
| self.__printBanner("plotting auto correlation", ds) | |
| data = self.getNumericData(ds) | |
| ddata = difference(data, diffOrder) if diffOrder > 0 else data | |
| tsaplots.plot_acf(ddata, lags = lags, alpha = alpha) | |
| plt.show() | |
| def getAutoCorr(self, ds, lags, alpha=.05): | |
| """ | |
| gets auto correlation | |
| Parameters | |
| ds: data set name or list or numpy array | |
| lags: num of lags | |
| alpha: confidence level | |
| """ | |
| self.__printBanner("getting auto correlation", ds) | |
| data = self.getNumericData(ds) | |
| autoCorr, confIntv = stt.acf(data, nlags=lags, fft=False, alpha=alpha) | |
| result = self.__printResult("autoCorr", autoCorr, "confIntv", confIntv) | |
| return result | |
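| # Usage sketch (hypothetical series name): autoCorr[k] is the correlation at | |
| # lag k, with autoCorr[0] always 1; confIntv holds the confidence band. | |
| # res = de.getAutoCorr("cpusage", 10) | |
| # print(res["autoCorr"][1])    # lag 1 auto correlation | |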
| def plotParAcf(self, ds, lags, alpha): | |
| """ | |
| partial auto correlation | |
| Parameters | |
| ds: data set name or list or numpy array | |
| lags: num of lags | |
| alpha: confidence level | |
| """ | |
| self.__printBanner("plotting partial auto correlation", ds) | |
| data = self.getNumericData(ds) | |
| tsaplots.plot_pacf(data, lags = lags, alpha = alpha) | |
| plt.show() | |
| def getParAutoCorr(self, ds, lags, alpha=.05): | |
| """ | |
| gets partial auto correlation | |
| Parameters | |
| ds: data set name or list or numpy array | |
| lags: num of lags | |
| alpha: confidence level | |
| """ | |
| self.__printBanner("getting partial auto correlation", ds) | |
| data = self.getNumericData(ds) | |
| partAutoCorr, confIntv = stt.pacf(data, nlags=lags, alpha=alpha) | |
| result = self.__printResult("partAutoCorr", partAutoCorr, "confIntv", confIntv) | |
| return result | |
| def getHurstExp(self, ds, kind, doPlot=True): | |
| """ | |
| gets Hurst exponent of time series | |
| Parameters | |
| ds: data set name or list or numpy array | |
| kind: kind of data change, random_walk, price | |
| doPlot: True for plot | |
| """ | |
| self.__printBanner("getting Hurst exponent", ds) | |
| data = self.getNumericData(ds) | |
| h, c, odata = hurst.compute_Hc(data, kind=kind, simplified=False) | |
| if doPlot: | |
| f, ax = plt.subplots() | |
| ax.plot(odata[0], c * odata[0] ** h, color="deepskyblue") | |
| ax.scatter(odata[0], odata[1], color="purple") | |
| ax.set_xscale("log") | |
| ax.set_yscale("log") | |
| ax.set_xlabel("time interval") | |
| ax.set_ylabel("cum dev range and std dev ratio") | |
| ax.grid(True) | |
| plt.show() | |
| result = self.__printResult("hurstExponent", h, "hurstConstant", c) | |
| return result | |
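| # Interpretation note: H close to 0.5 suggests a random walk, H above 0.5 a | |
| # trending (persistent) series and H below 0.5 a mean reverting one. | |
| # Sketch (hypothetical series name): | |
| # res = de.getHurstExp("price", "price", doPlot=False) | |
| # print(res["hurstExponent"]) | |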
| def approxEntropy(self, ds, m, r): | |
| """ | |
| gets approximate entropy of time series (ref: Wikipedia) | |
| Parameters | |
| ds: data set name or list or numpy array | |
| m: length of compared run of data | |
| r: filtering level | |
| """ | |
| self.__printBanner("getting approximate entropy", ds) | |
| ldata = self.getNumericData(ds) | |
| aent = abs(self.__phi(ldata, m + 1, r) - self.__phi(ldata, m, r)) | |
| result = self.__printResult("approxEntropy", aent) | |
| return result | |
| def __phi(self, ldata, m, r): | |
| """ | |
| phi function for approximate entropy | |
| Parameters | |
| ldata: data array | |
| m: length of compared run of data | |
| r: filtering level | |
| """ | |
| le = len(ldata) | |
| x = [[ldata[j] for j in range(i, i + m - 1 + 1)] for i in range(le - m + 1)] | |
| lex = len(x) | |
| c = list() | |
| for i in range(lex): | |
| cnt = 0 | |
| for j in range(lex): | |
| cnt += (1 if maxListDist(x[i], x[j]) <= r else 0) | |
| cnt /= (le - m + 1.0) | |
| c.append(cnt) | |
| return sum(np.log(c)) / (le - m + 1.0) | |
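| # Usage sketch (hypothetical series name, assuming numpy imported as np): m is | |
| # typically 2 and r a fraction of the series std dev; lower entropy means a | |
| # more regular, predictable series. | |
| # sd = np.std(de.getNumericData("cpusage")) | |
| # res = de.approxEntropy("cpusage", 2, 0.2 * sd) | |
| # print(res["approxEntropy"]) | |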
| def oneSpaceEntropy(self, ds, scaMethod="zscale"): | |
| """ | |
| gets one-space entropy (ref: Estimating Mutual Information, Kraskov et al.) | |
| Parameters | |
| ds: data set name or list or numpy array | |
| scaMethod: data scaling method | |
| """ | |
| self.__printBanner("getting one space entropy", ds) | |
| data = self.getNumericData(ds) | |
| sdata = sorted(data) | |
| sdata = scaleData(sdata, scaMethod) | |
| su = 0 | |
| n = len(sdata) | |
| for i in range(1, n, 1): | |
| t = abs(sdata[i] - sdata[i-1]) | |
| if t > 0: | |
| su += log(t) | |
| su /= (n -1) | |
| #print(su) | |
| ose = digammaFun(n) - digammaFun(1) + su | |
| result = self.__printResult("entropy", ose) | |
| return result | |
| def plotCrossCorr(self, ds1, ds2, normed, lags): | |
| """ | |
| plots cross correlation | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| normed: if True, input vectors are normalised to unit length | |
| lags: num of lags | |
| """ | |
| self.__printBanner("plotting cross correlation between two numeric", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| self.ensureSameSize([data1, data2]) | |
| plt.xcorr(data1, data2, normed=normed, maxlags=lags) | |
| plt.show() | |
| def getCrossCorr(self, ds1, ds2): | |
| """ | |
| gets cross correlation | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting cross correlation", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| self.ensureSameSize([data1, data2]) | |
| crossCorr = stt.ccf(data1, data2) | |
| result = self.__printResult("crossCorr", crossCorr) | |
| return result | |
| def getFourierTransform(self, ds): | |
| """ | |
| gets fast fourier transform | |
| Parameters | |
| ds: data set name or list or numpy array | |
| """ | |
| self.__printBanner("getting fourier transform", ds) | |
| data = self.getNumericData(ds) | |
| ft = np.fft.rfft(data) | |
| result = self.__printResult("fourierTransform", ft) | |
| return result | |
| def testStationaryAdf(self, ds, regression, autolag, sigLev=.05): | |
| """ | |
| ADF stationarity test; null hypothesis: series is not stationary | |
| Parameters | |
| ds: data set name or list or numpy array | |
| regression: constant and trend order to include in regression | |
| autolag: method to use when automatically determining the lag | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing ADF stationary test", ds) | |
| relist = ["c","ct","ctt","nc"] | |
| assert regression in relist, "invalid regression value" | |
| alList = ["AIC", "BIC", "t-stat", None] | |
| assert autolag in alList, "invalid autolag value" | |
| data = self.getNumericData(ds) | |
| re = stt.adfuller(data, regression=regression, autolag=autolag) | |
| result = self.__printResult("stat", re[0], "pvalue", re[1] , "num lags", re[2] , "num observation for regression", re[3], | |
| "critial values", re[4]) | |
| self.__printStat(re[0], re[1], "probably not stationary", "probably stationary", sigLev) | |
| return result | |
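| # Usage sketch (hypothetical series name): the null hypothesis is non | |
| # stationarity, so a pvalue below sigLev suggests a stationary series. | |
| # res = de.testStationaryAdf("cpusage", "c", "AIC") | |
| # print(res["pvalue"]) | |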
| def testStationaryKpss(self, ds, regression, nlags, sigLev=.05): | |
| """ | |
| KPSS stationarity test; null hypothesis: series is stationary | |
| Parameters | |
| ds: data set name or list or numpy array | |
| regression: constant and trend order to include in regression | |
| nlags : no of lags | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing KPSS stationary test", ds) | |
| relist = ["c","ct"] | |
| assert regression in relist, "invalid regression value" | |
| nlList =[None, "auto", "legacy"] | |
| assert nlags in nlList or type(nlags) == int, "invalid nlags value" | |
| data = self.getNumericData(ds) | |
| stat, pvalue, nLags, criticalValues = stt.kpss(data, regression=regression, nlags=nlags) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue, "num lags", nLags, "critial values", criticalValues) | |
| self.__printStat(stat, pvalue, "probably stationary", "probably not stationary", sigLev) | |
| return result | |
| def testNormalJarqBera(self, ds, sigLev=.05): | |
| """ | |
| Jarque-Bera normality test | |
| Parameters | |
| ds: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing ajrque bera normalcy test", ds) | |
| data = self.getNumericData(ds) | |
| jb, jbpv, skew, kurtosis = sstt.jarque_bera(data) | |
| result = self.__printResult("stat", jb, "pvalue", jbpv, "skew", skew, "kurtosis", kurtosis) | |
| self.__printStat(jb, jbpv, "probably gaussian", "probably not gaussian", sigLev) | |
| return result | |
| def testNormalShapWilk(self, ds, sigLev=.05): | |
| """ | |
| Shapiro-Wilk normality test | |
| Parameters | |
| ds: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing shapiro wilks normalcy test", ds) | |
| data = self.getNumericData(ds) | |
| stat, pvalue = sta.shapiro(data) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably gaussian", "probably not gaussian", sigLev) | |
| return result | |
| def testNormalDagast(self, ds, sigLev=.05): | |
| """ | |
| D’Agostino’s K-squared normality test | |
| Parameters | |
| ds: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing D’Agostino’s K square normalcy test", ds) | |
| data = self.getNumericData(ds) | |
| stat, pvalue = sta.normaltest(data) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably gaussian", "probably not gaussian", sigLev) | |
| return result | |
| def testDistrAnderson(self, ds, dist, sigLev=.05): | |
| """ | |
| Anderson test for norm, expon, logistic, gumbel, gumbel_l, gumbel_r or extreme1 distribution | |
| Parameters | |
| ds: data set name or list or numpy array | |
| dist: type of distribution | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Anderson test for for various distributions", ds) | |
| diList = ["norm", "expon", "logistic", "gumbel", "gumbel_l", "gumbel_r", "extreme1"] | |
| assert dist in diList, "invalid distribution" | |
| data = self.getNumericData(ds) | |
| re = sta.anderson(data, dist=dist) | |
| slAlpha = int(100 * sigLev) | |
| msg = "significnt value not found" | |
| for i in range(len(re.critical_values)): | |
| sl, cv = re.significance_level[i], re.critical_values[i] | |
| if int(sl) == slAlpha: | |
| if re.statistic < cv: | |
| msg = "probably {} at the {:.3f} siginificance level".format(dist, sl) | |
| else: | |
| msg = "probably not {} at the {:.3f} siginificance level".format(dist, sl) | |
| result = self.__printResult("stat", re.statistic, "test", msg) | |
| print(msg) | |
| return result | |
| def testSkew(self, ds, sigLev=.05): | |
| """ | |
| test skew wrt normal distr | |
| Parameters | |
| ds: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("testing skew wrt normal distr", ds) | |
| data = self.getNumericData(ds) | |
| stat, pvalue = sta.skewtest(data) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same skew as normal distribution", "probably not same skew as normal distribution", sigLev) | |
| return result | |
| def testTwoSampleStudent(self, ds1, ds2, sigLev=.05): | |
| """ | |
| student t 2 sample test | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing student t 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.ttest_ind(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) | |
| return result | |
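| # Usage sketch (hypothetical data set names), applicable to the other 2 sample | |
| # tests below as well: a pvalue below sigLev suggests different distributions. | |
| # res = de.testTwoSampleStudent("groupA", "groupB") | |
| # print(res["pvalue"]) | |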
| def testTwoSampleKs(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Kolmogorov-Smirnov 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Kolmogorov Sminov 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.ks_2samp(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) | |
| def testTwoSampleMw(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Mann-Whitney 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Mann-Whitney 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.mannwhitneyu(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) | |
| def testTwoSampleWilcox(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Wilcoxon Signed-Rank 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Wilcoxon Signed-Rank 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.wilcoxon(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) | |
| def testTwoSampleKw(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Kruskal-Wallis 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Kruskal-Wallis 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.kruskal(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably snot ame distribution", sigLev) | |
| def testTwoSampleFriedman(self, ds1, ds2, ds3, sigLev=.05): | |
| """ | |
| Friedman statistic for three samples | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| ds3: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Friedman 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| data3 = self.getNumericData(ds3) | |
| stat, pvalue = sta.friedmanchisquare(data1, data2, data3) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) | |
| def testTwoSampleEs(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Epps Singleton 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Epps Singleton 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.epps_singleton_2samp(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same distribution", "probably not same distribution", sigLev) | |
| def testTwoSampleAnderson(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Anderson 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Anderson 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| dseq = (data1, data2) | |
| stat, critValues, sLev = sta.anderson_ksamp(dseq) | |
| slAlpha = 100 * sigLev | |
| if slAlpha == 10: | |
| cv = critValues[1] | |
| elif slAlpha == 5: | |
| cv = critValues[2] | |
| elif slAlpha == 2.5: | |
| cv = critValues[3] | |
| elif slAlpha == 1: | |
| cv = critValues[4] | |
| else: | |
| cv = None | |
| result = self.__printResult("stat", stat, "critValues", critValues, "critValue", cv, "significanceLevel", sLev) | |
| print("stat: {:.3f}".format(stat)) | |
| if cv is None: | |
| msg = "critical values value not found for provided siginificance level" | |
| else: | |
| if stat < cv: | |
| msg = "probably same distribution at the {:.3f} siginificance level".format(sigLev) | |
| else: | |
| msg = "probably not same distribution at the {:.3f} siginificance level".format(sigLev) | |
| print(msg) | |
| return result | |
| def testTwoSampleScaleAb(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Ansari Bradley 2 sample scale statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Ansari Bradley 2 sample scale test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.ansari(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same scale", "probably not same scale", sigLev) | |
| return result | |
| def testTwoSampleScaleMood(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Mood 2 sample scale statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Mood 2 sample scale test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.mood(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same scale", "probably not same scale", sigLev) | |
| return result | |
| def testTwoSampleVarBartlet(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Bartlett 2 sample variance statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Ansari Bradley 2 sample scale test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.bartlett(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev) | |
| return result | |
| def testTwoSampleVarLevene(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Levene 2 sample variance statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Levene 2 sample variance test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.levene(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev) | |
| return result | |
| def testTwoSampleVarFk(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Fligner-Killeen 2 sample variance statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Fligner-Killeen 2 sample variance test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue = sta.fligner(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue) | |
| self.__printStat(stat, pvalue, "probably same variance", "probably not same variance", sigLev) | |
| return result | |
| def testTwoSampleMedMood(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Mood 2 sample median statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Mood 2 sample median test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat, pvalue, median, ctable = sta.median_test(data1, data2) | |
| result = self.__printResult("stat", stat, "pvalue", pvalue, "median", median, "contigencyTable", ctable) | |
| self.__printStat(stat, pvalue, "probably same median", "probably not same median", sigLev) | |
| return result | |
| def testTwoSampleZc(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Zhang-C 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Zhang-C 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| l1 = len(data1) | |
| l2 = len(data2) | |
| l = l1 + l2 | |
| #find ranks | |
| pooled = np.concatenate([data1, data2]) | |
| ranks = findRanks(data1, pooled) | |
| ranks.extend(findRanks(data2, pooled)) | |
| s1 = 0.0 | |
| for i in range(1, l1+1): | |
| s1 += math.log(l1 / (i - 0.5) - 1.0) * math.log(l / (ranks[i-1] - 0.5) - 1.0) | |
| s2 = 0.0 | |
| for i in range(1, l2+1): | |
| s2 += math.log(l2 / (i - 0.5) - 1.0) * math.log(l / (ranks[l1 + i - 1] - 0.5) - 1.0) | |
| stat = (s1 + s2) / l | |
| print(formatFloat(3, stat, "stat:")) | |
| return stat | |
| def testTwoSampleZa(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Zhang-A 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Zhang-A 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| l1 = len(data1) | |
| l2 = len(data2) | |
| l = l1 + l2 | |
| pooled = np.concatenate([data1, data2]) | |
| cd1 = CumDistr(data1) | |
| cd2 = CumDistr(data2) | |
| su = 0.0 | |
| for i in range(1, l+1): | |
| v = pooled[i-1] | |
| f1 = cd1.getDistr(v) | |
| f2 = cd2.getDistr(v) | |
| t1 = 0 if f1 == 0 else f1 * math.log(f1) | |
| t2 = 0 if f1 == 1.0 else (1.0 - f1) * math.log(1.0 - f1) | |
| su += l1 * (t1 + t2) / ((i - 0.5) * (l - i + 0.5)) | |
| t1 = 0 if f2 == 0 else f2 * math.log(f2) | |
| t2 = 0 if f2 == 1.0 else (1.0 - f2) * math.log(1.0 - f2) | |
| su += l2 * (t1 + t2) / ((i - 0.5) * (l - i + 0.5)) | |
| stat = -su | |
| print(formatFloat(3, stat, "stat:")) | |
| return stat | |
| def testTwoSampleZk(self, ds1, ds2, sigLev=.05): | |
| """ | |
| Zhang-K 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing Zhang-K 2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| l1 = len(data1) | |
| l2 = len(data2) | |
| l = l1 + l2 | |
| pooled = np.concatenate([data1, data2]) | |
| cd1 = CumDistr(data1) | |
| cd2 = CumDistr(data2) | |
| cd = CumDistr(pooled) | |
| maxStat = None | |
| for i in range(1, l+1): | |
| v = pooled[i-1] | |
| f1 = cd1.getDistr(v) | |
| f2 = cd2.getDistr(v) | |
| f = cd.getDistr(v) | |
| t1 = 0 if f1 == 0 else f1 * math.log(f1 / f) | |
| t2 = 0 if f1 == 1.0 else (1.0 - f1) * math.log((1.0 - f1) / (1.0 - f)) | |
| stat = l1 * (t1 + t2) | |
| t1 = 0 if f2 == 0 else f2 * math.log(f2 / f) | |
| t2 = 0 if f2 == 1.0 else (1.0 - f2) * math.log((1.0 - f2) / (1.0 - f)) | |
| stat += l2 * (t1 + t2) | |
| if maxStat is None or stat > maxStat: | |
| maxStat = stat | |
| print(formatFloat(3, maxStat, "stat:")) | |
| return maxStat | |
| def testTwoSampleCvm(self, ds1, ds2, sigLev=.05): | |
| """ | |
| 2 sample cramer von mises | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| sigLev: statistical significance level | |
| """ | |
| self.__printBanner("doing 2 sample CVM test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| data = np.concatenate((data1,data2)) | |
| rdata = sta.rankdata(data) | |
| n = len(data1) | |
| m = len(data2) | |
| l = n + m | |
| s1 = 0 | |
| for i in range(n): | |
| t = rdata[i] - (i+1) | |
| s1 += (t * t) | |
| s1 *= n | |
| s2 = 0 | |
| for i in range(m): | |
| t = rdata[i + n] - (i+1) | |
| s2 += (t * t) | |
| s2 *= m | |
| u = s1 + s2 | |
| stat = u / (n * m * l) - (4 * m * n - 1) / (6 * l) | |
| result = self.__printResult("stat", stat) | |
| return result | |
| def ensureSameSize(self, dlist): | |
| """ | |
| ensures all data sets are of same size | |
| Parameters | |
| dlist : data source list | |
| """ | |
| le = None | |
| for d in dlist: | |
| cle = len(d) | |
| if le is None: | |
| le = cle | |
| else: | |
| assert cle == le, "all data sets need to be of same size" | |
| def testTwoSampleWasserstein(self, ds1, ds2): | |
| """ | |
| Wasserstein 2 sample statistic | |
| Parameters | |
| ds1: data set name or list or numpy array | |
| ds2: data set name or list or numpy array | |
| """ | |
| self.__printBanner("doing Wasserstein distance2 sample test", ds1, ds2) | |
| data1 = self.getNumericData(ds1) | |
| data2 = self.getNumericData(ds2) | |
| stat = sta.wasserstein_distance(data1, data2) | |
| sd = np.std(np.concatenate([data1, data2])) | |
| nstat = stat / sd | |
| result = self.__printResult("stat", stat, "normalizedStat", nstat) | |
| return result | |
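| # Usage sketch (hypothetical data set names): dividing by the pooled std dev | |
| # makes the distance comparable across differently scaled data sets. | |
| # res = de.testTwoSampleWasserstein("groupA", "groupB") | |
| # print(res["normalizedStat"]) | |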
| def getMaxRelMinRedFeatures(self, fdst, tdst, nfeatures, nbins=20): | |
| """ | |
| get top n features based on the max relevance min redundancy (MRMR) algorithm | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| nfeatures : desired no of features | |
| nbins : no of bins for numerical data | |
| """ | |
| self.__printBanner("doing max relevance min redundancy feature selection") | |
| return self.getMutInfoFeatures(fdst, tdst, nfeatures, "mrmr", nbins) | |
| def getJointMutInfoFeatures(self, fdst, tdst, nfeatures, nbins=20): | |
| """ | |
| get top n features based on the joint mutual information (JMI) algorithm | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| nfeatures : desired no of features | |
| nbins : no of bins for numerical data | |
| """ | |
| self.__printBanner("doingjoint mutual info feature selection") | |
| return self.getMutInfoFeatures(fdst, tdst, nfeatures, "jmi", nbins) | |
| def getCondMutInfoMaxFeatures(self, fdst, tdst, nfeatures, nbins=20): | |
| """ | |
| get top n features based on the conditional mutual information maximization (CMIM) algorithm | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| nfeatures : desired no of features | |
| nbins : no of bins for numerical data | |
| """ | |
| self.__printBanner("doing conditional mutual info max feature selection") | |
| return self.getMutInfoFeatures(fdst, tdst, nfeatures, "cmim", nbins) | |
| def getInteractCapFeatures(self, fdst, tdst, nfeatures, nbins=20): | |
| """ | |
| get top n features based on interaction capping algorithm | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| nfeatures : desired no of features | |
| nbins : no of bins for numerical data | |
| """ | |
| self.__printBanner("doing interaction capped feature selection") | |
| return self.getMutInfoFeatures(fdst, tdst, nfeatures, "icap", nbins) | |
| def getMutInfoFeatures(self, fdst, tdst, nfeatures, algo, nbins=20): | |
| """ | |
| get top n features based on various mutual information based algorithm | |
| ref: Conditional likelihood maximisation : A unifying framework for information | |
| theoretic feature selection, Gavin Brown | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| nfeatures : desired no of features | |
| algo: mi based feature selection algorithm | |
| nbins : no of bins for numerical data | |
| """ | |
| #verify data source types | |
| le = len(fdst) | |
| nfeatGiven = int(le / 2) | |
| assertGreater(nfeatGiven, nfeatures, "no of available features should be greater than no of features to be selected") | |
| fds = list() | |
| types = ["num", "cat"] | |
| for i in range (0, le, 2): | |
| ds = fdst[i] | |
| dt = fdst[i+1] | |
| assertInList(dt, types, "invalid type for data source " + dt) | |
| data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds) | |
| p = (ds, dt) | |
| fds.append(p) | |
| algos = ["mrmr", "jmi", "cmim", "icap"] | |
| assertInList(algo, algos, "invalid feature selection algo " + algo) | |
| assertInList(tdst[1], types, "invalid type for data source " + tdst[1]) | |
| data = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0]) | |
| #print(fds) | |
| sfds = list() | |
| selected = set() | |
| relevancies = dict() | |
| for i in range(nfeatures): | |
| #print(i) | |
| scorem = None | |
| dsm = None | |
| dsmt = None | |
| for ds, dt in fds: | |
| #print(ds, dt) | |
| if ds not in selected: | |
| #relevancy | |
| if ds in relevancies: | |
| mutInfo = relevancies[ds] | |
| else: | |
| mutInfo = self.getMutualInfo([ds, dt, tdst[0], tdst[1]], nbins)["mutInfo"] | |
| relevancies[ds] = mutInfo | |
| relev = mutInfo | |
| #print("relev", relev) | |
| #redundancy | |
| smi = 0 | |
| reds = list() | |
| for sds, sdt, _ in sfds: | |
| #print(sds, sdt) | |
| mutInfo = self.getMutualInfo([ds, dt, sds, sdt], nbins)["mutInfo"] | |
| mutInfoCnd = self.getCondMutualInfo([ds, dt, sds, sdt, tdst[0], tdst[1]], nbins)["condMutInfo"] \ | |
| if algo != "mrmr" else 0 | |
| red = mutInfo - mutInfoCnd | |
| reds.append(red) | |
| if algo == "mrmr" or algo == "jmi": | |
| redun = sum(reds) / len(sfds) if len(sfds) > 0 else 0 | |
| elif algo == "cmim" or algo == "icap": | |
| redun = max(reds) if len(sfds) > 0 else 0 | |
| if algo == "icap": | |
| redun = max(0, redun) | |
| #print("redun", redun) | |
| score = relev - redun | |
| if scorem is None or score > scorem: | |
| scorem = score | |
| dsm = ds | |
| dsmt = dt | |
| pa = (dsm, dsmt, scorem) | |
| #print(pa) | |
| sfds.append(pa) | |
| selected.add(dsm) | |
| selFeatures = list(map(lambda r : (r[0], r[2]), sfds)) | |
| result = self.__printResult("selFeatures", selFeatures) | |
| return result | |
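| # Usage sketch (hypothetical feature and target names): fdst flattens | |
| # (name, type) pairs and tdst is a (name, type) pair for the target. | |
| # fdst = ["age", "num", "income", "num", "region", "cat"] | |
| # res = de.getMaxRelMinRedFeatures(fdst, ("churn", "cat"), 2) | |
| # print(res["selFeatures"])    # list of (name, score) pairs | |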
| def getFastCorrFeatures(self, fdst, tdst, delta, nbins=20): | |
| """ | |
| get top features based on Fast Correlation Based Filter (FCBF) | |
| ref: Feature Selection for High-Dimensional Data: A Fast Correlation-Based Filter Solution | |
| Lei Yu | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| delta : feature, target correlation threshold | |
| nbins : no of bins for numerical data | |
| """ | |
| le = len(fdst) | |
| nfeatGiven = int(le / 2) | |
| fds = list() | |
| types = ["num", "cat"] | |
| for i in range (0, le, 2): | |
| ds = fdst[i] | |
| dt = fdst[i+1] | |
| assertInList(dt, types, "invalid type for data source " + dt) | |
| data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds) | |
| p = (ds, dt) | |
| fds.append(p) | |
| assertInList(tdst[1], types, "invalid type for data source " + tdst[1]) | |
| data = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0]) | |
| # get features with symmetric uncertainty above threshold | |
| tentr = self.getAnyEntropy(tdst[0], tdst[1], nbins)["entropy"] | |
| rfeatures = list() | |
| fentrs = dict() | |
| for ds, dt in fds: | |
| mutInfo = self.getMutualInfo([ds, dt, tdst[0], tdst[1]], nbins)["mutInfo"] | |
| fentr = self.getAnyEntropy(ds, dt, nbins)["entropy"] | |
| sunc = 2 * mutInfo / (tentr + fentr) | |
| #print("ds {} sunc {:.3f}".format(ds, sunc)) | |
| if sunc >= delta: | |
| f = [ds, dt, sunc, False] | |
| rfeatures.append(f) | |
| fentrs[ds] = fentr | |
| # sort descending by symmetric uncertainty | |
| rfeatures.sort(key=lambda e : e[2], reverse=True) | |
| #discard redundant features | |
| le = len(rfeatures) | |
| for i in range(le): | |
| if rfeatures[i][3]: | |
| continue | |
| for j in range(i+1, le, 1): | |
| if rfeatures[j][3]: | |
| continue | |
| mutInfo = self.getMutualInfo([rfeatures[i][0], rfeatures[i][1], rfeatures[j][0], rfeatures[j][1]], nbins)["mutInfo"] | |
| sunc = 2 * mutInfo / (fentrs[rfeatures[i][0]] + fentrs[rfeatures[j][0]]) | |
| if sunc >= rfeatures[j][2]: | |
| rfeatures[j][3] = True | |
| frfeatures = list(filter(lambda f : not f[3], rfeatures)) | |
| selFeatures = list(map(lambda f : [f[0], f[2]], frfeatures)) | |
| result = self.__printResult("selFeatures", selFeatures) | |
| return result | |
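| # Usage sketch (hypothetical names): delta sets the minimum symmetric | |
| # uncertainty with the target for a feature to be considered at all. | |
| # res = de.getFastCorrFeatures(["age", "num", "region", "cat"], ("churn", "cat"), .01) | |
| # print(res["selFeatures"]) | |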
| def getInfoGainFeatures(self, fdst, tdst, nfeatures, nsplit, nbins=20): | |
| """ | |
| get top n features based on information gain or entropy loss | |
| Parameters | |
| fdst: list of pair of data set name or list or numpy array and data type | |
| tdst: target data set name or list or numpy array and data type (cat for classification num for regression) | |
| nfeatures : desired no of features | |
| nsplit : num of splits | |
| nbins : no of bins for numerical data | |
| """ | |
| le = len(fdst) | |
| nfeatGiven = int(le / 2) | |
| assertGreater(nfeatGiven, nfeatures, "available features should be greater than desired") | |
| fds = list() | |
| types = ["num", "cat"] | |
| for i in range (0, le, 2): | |
| ds = fdst[i] | |
| dt = fdst[i+1] | |
| assertInList(dt, types, "invalid type for data source " + dt) | |
| data = self.getNumericData(ds) if dt == "num" else self.getCatData(ds) | |
| p = (ds, dt) | |
| fds.append(p) | |
| assertInList(tdst[1], types, "invalid type for data source " + tdst[1]) | |
| assertGreater(nsplit, 3, "minimum 4 splits necessary") | |
| tdata = self.getNumericData(tdst[0]) if tdst[1] == "num" else self.getCatData(tdst[0]) | |
| tentr = self.getAnyEntropy(tdst[0], tdst[1], nbins)["entropy"] | |
| sz = len(tdata) | |
| sfds = list() | |
| for ds, dt in fds: | |
| #print(ds, dt) | |
| if dt == "num": | |
| fd = self.getNumericData(ds) | |
| _, _, vmax, vmin = self.__getBasicStats(fd) | |
| intv = (vmax - vmin) / nsplit | |
| maxig = None | |
| spmin = vmin + intv | |
| spmax = vmax - 0.9 * intv | |
| #iterate all splits | |
| for sp in np.arange(spmin, spmax, intv): | |
| ltvals = list() | |
| gevals = list() | |
| for i in range(len(fd)): | |
| if fd[i] < sp: | |
| ltvals.append(tdata[i]) | |
| else: | |
| gevals.append(tdata[i]) | |
| self.addListNumericData(ltvals, "spds") if tdst[1] == "num" else self.addListCatData(ltvals, "spds") | |
| lten = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] | |
| self.addListNumericData(gevals, "spds") if tdst[1] == "num" else self.addListCatData(gevals, "spds") | |
| geen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] | |
| #info gain | |
| ig = tentr - (len(ltvals) * lten / sz + len(gevals) * geen / sz) | |
| if maxig is None or ig > maxig: | |
| maxig = ig | |
| pa = (ds, maxig) | |
| sfds.append(pa) | |
| else: | |
| fd = self.getCatData(ds) | |
| fvals = set(fd) | |
| fdps = genPowerSet(fvals) | |
| maxig = None | |
| #iterate all subsets | |
| for s in fdps: | |
| if len(s) == len(fvals): | |
| continue | |
| invals = list() | |
| exvals = list() | |
| for i in range(len(fd)): | |
| if fd[i] in s: | |
| invals.append(tdata[i]) | |
| else: | |
| exvals.append(tdata[i]) | |
| self.addListNumericData(invals, "spds") if tdst[1] == "num" else self.addListCatData(invals, "spds") | |
| inen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] | |
| self.addListNumericData(exvals, "spds") if tdst[1] == "num" else self.addListCatData(exvals, "spds") | |
| exen = self.getAnyEntropy("spds", tdst[1], nbins)["entropy"] | |
| ig = tentr - (len(invals) * inen / sz + len(exvals) * exen / sz) | |
| if maxig is None or ig > maxig: | |
| maxig = ig | |
| pa = (ds, maxig) | |
| sfds.append(pa) | |
| #sort of info gain | |
| sfds.sort(key = lambda v : v[1], reverse = True) | |
| result = self.__printResult("selFeatures", sfds[:nfeatures]) | |
| return result | |
| def __stackData(self, *dsl): | |
| """ | |
| stacks columns to create matrix | |
| Parameters | |
| dsl: data source list | |
| """ | |
| dlist = tuple(map(lambda ds : self.getNumericData(ds), dsl)) | |
| self.ensureSameSize(dlist) | |
| dmat = np.column_stack(dlist) | |
| return dmat | |
| def __printBanner(self, msg, *dsl): | |
| """ | |
| print banner for any function | |
| Parameters | |
| msg: message | |
| dsl: list of data set name or list or numpy array | |
| """ | |
| tags = list(map(lambda ds : ds if type(ds) == str else "annoynymous", dsl)) | |
| forData = " for data sets " if tags else "" | |
| msg = msg + forData + " ".join(tags) | |
| if self.verbose: | |
| print("\n== " + msg + " ==") | |
| def __printDone(self): | |
| """ | |
| print done message | |
| """ | |
| if self.verbose: | |
| print("done") | |
| def __printStat(self, stat, pvalue, nhMsg, ahMsg, sigLev=.05): | |
| """ | |
| generic stat and pvalue output | |
| Parameters | |
| stat : stat value | |
| pvalue : p value | |
| nhMsg : message when null hypothesis is not rejected | |
| ahMsg : message when null hypothesis is rejected | |
| sigLev : significance level | |
| """ | |
| if self.verbose: | |
| print("\ntest result:") | |
| print("stat: {:.3f}".format(stat)) | |
| print("pvalue: {:.3f}".format(pvalue)) | |
| print("significance level: {:.3f}".format(sigLev)) | |
| print(nhMsg if pvalue > sigLev else ahMsg) | |
| def __printResult(self, *values): | |
| """ | |
| print results | |
| Parameters | |
| values : flattened key and value pairs | |
| """ | |
| result = dict() | |
| assert len(values) % 2 == 0, "key value list should have even number of items" | |
| for i in range(0, len(values), 2): | |
| result[values[i]] = values[i+1] | |
| if self.verbose: | |
| print("result details:") | |
| self.pp.pprint(result) | |
| return result | |
| def __getBasicStats(self, data): | |
| """ | |
| get mean, std dev, max and min | |
| Parameters | |
| data : numpy array | |
| """ | |
| mean = np.average(data) | |
| sd = np.std(data) | |
| r = (mean, sd, np.max(data), np.min(data)) | |
| return r | |