#!/usr/local/bin/python3

# Author: Pranab Ghosh
#
# Licensed under the Apache License, Version 2.0 (the "License"); you
# may not use this file except in compliance with the License. You may
# obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

import os
import sys
from random import randint
import random
import time
import uuid
from datetime import datetime
import math
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import logging
import logging.handlers
import pickle
from contextlib import contextmanager

tokens = ["0","1","2","3","4","5","6","7","8","9","A","B","C","D","E","F","G","H","I","J","K","L","M",
	"N","O","P","Q","R","S","T","U","V","W","X","Y","Z","0","1","2","3","4","5","6","7","8","9"]
numTokens = tokens[:10]
alphaTokens = tokens[10:36]
loCaseChars = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k","l","m","n","o",
	"p","q","r","s","t","u","v","w","x","y","z"]

typeInt = "int"
typeFloat = "float"
typeString = "string"

secInMinute = 60
secInHour = 60 * 60
secInDay = 24 * secInHour
secInWeek = 7 * secInDay
secInYear = 365 * secInDay
secInMonth = secInYear / 12
minInHour = 60
minInDay = 24 * minInHour
ftPerYard = 3
ftPerMile = ftPerYard * 1760
| def genID(size): | |
| """ | |
| generates ID | |
| Parameters | |
| size : size of ID | |
| """ | |
| id = "" | |
| for i in range(size): | |
| id = id + selectRandomFromList(tokens) | |
| return id | |
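# Added usage note (not in the original source): genID draws uniformly from the
# tokens list above; since digits appear twice in that list, an individual digit is
# roughly twice as likely as an individual letter.
#   genID(8)   # e.g. "K3TZ91BQ", a random alphanumeric string of length 8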
| def genIdList(numId, idSize): | |
| """ | |
| generate list of IDs | |
| Parameters: | |
| numId: number of Ids | |
| idSize: ID size | |
| """ | |
| iDs = [] | |
| for i in range(numId): | |
| iDs.append(genID(idSize)) | |
| return iDs | |
| def genNumID(size): | |
| """ | |
	generates ID consisting of digits only
| Parameters | |
| size : size of ID | |
| """ | |
| id = "" | |
| for i in range(size): | |
| id = id + selectRandomFromList(numTokens) | |
| return id | |
| def genLowCaseID(size): | |
| """ | |
| generates ID consisting of lower case chars | |
| Parameters | |
| size : size of ID | |
| """ | |
| id = "" | |
| for i in range(size): | |
| id = id + selectRandomFromList(loCaseChars) | |
| return id | |
| def genNumIdList(numId, idSize): | |
| """ | |
| generate list of numeric IDs | |
| Parameters: | |
| numId: number of Ids | |
| idSize: ID size | |
| """ | |
| iDs = [] | |
| for i in range(numId): | |
| iDs.append(genNumID(idSize)) | |
| return iDs | |
| def genNameInitial(): | |
| """ | |
| generate name initial | |
| """ | |
| return selectRandomFromList(alphaTokens) + selectRandomFromList(alphaTokens) | |
| def genPhoneNum(arCode): | |
| """ | |
| generates phone number | |
| Parameters | |
| arCode: area code | |
| """ | |
| phNum = genNumID(7) | |
| return arCode + str(phNum) | |
| def selectRandomFromList(ldata): | |
| """ | |
	select an element randomly from a list
| Parameters | |
| ldata : list data | |
| """ | |
| return ldata[randint(0, len(ldata)-1)] | |
| def selectOtherRandomFromList(ldata, cval): | |
| """ | |
| select an element randomly from a list excluding the given one | |
| Parameters | |
| ldata : list data | |
| cval : value to be excluded | |
| """ | |
| nval = selectRandomFromList(ldata) | |
| while nval == cval: | |
| nval = selectRandomFromList(ldata) | |
| return nval | |
| def selectRandomSubListFromList(ldata, num): | |
| """ | |
	generates random sublist from a list without replacement
| Parameters | |
| ldata : list data | |
| num : output list size | |
| """ | |
| assertLesser(num, len(ldata), "size of sublist to be sampled greater than or equal to main list") | |
| i = randint(0, len(ldata)-1) | |
| sel = ldata[i] | |
| selSet = {i} | |
| selList = [sel] | |
| while (len(selSet) < num): | |
| i = randint(0, len(ldata)-1) | |
| if (i not in selSet): | |
| sel = ldata[i] | |
| selSet.add(i) | |
| selList.append(sel) | |
| return selList | |
| def selectRandomSubListFromListWithRepl(ldata, num): | |
| """ | |
	generates random sublist from a list with replacement
| Parameters | |
| ldata : list data | |
| num : output list size | |
| """ | |
| return list(map(lambda i : selectRandomFromList(ldata), range(num))) | |
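# Added illustration (hypothetical values): sampling without replacement never
# repeats an element, sampling with replacement may.
#   selectRandomSubListFromList([1, 2, 3, 4, 5], 3)       # e.g. [4, 1, 5]
#   selectRandomSubListFromListWithRepl([1, 2, 3], 5)     # e.g. [2, 2, 1, 3, 2]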
| def selectRandomFromDict(ddata): | |
| """ | |
| select an element randomly from a dictionary | |
| Parameters | |
| ddata : dictionary data | |
| """ | |
| dkeys = list(ddata.keys()) | |
| dk = selectRandomFromList(dkeys) | |
| el = (dk, ddata[dk]) | |
| return el | |
| def setListRandomFromList(ldata, ldataRepl): | |
| """ | |
	sets some elements in the first list randomly with elements from the second list
| Parameters | |
| ldata : list data | |
| ldataRepl : list with replacement data | |
| """ | |
| l = len(ldata) | |
| selSet = set() | |
| for d in ldataRepl: | |
| i = randint(0, l-1) | |
| while i in selSet: | |
| i = randint(0, l-1) | |
| ldata[i] = d | |
| selSet.add(i) | |
def genIpAddress():
	"""
	generates IP address
	"""
	i1 = randint(0,255)
	i2 = randint(0,255)
	i3 = randint(0,255)
	i4 = randint(0,255)
	ip = "%d.%d.%d.%d" %(i1,i2,i3,i4)
	return ip
| def curTimeMs(): | |
| """ | |
| current time in ms | |
| """ | |
| return int((datetime.utcnow() - datetime(1970,1,1)).total_seconds() * 1000) | |
| def secDegPolyFit(x1, y1, x2, y2, x3, y3): | |
| """ | |
| second deg polynomial | |
| Parameters | |
| x1 : 1st point x | |
| y1 : 1st point y | |
| x2 : 2nd point x | |
| y2 : 2nd point y | |
| x3 : 3rd point x | |
| y3 : 3rd point y | |
| """ | |
| t = (y1 - y2) / (x1 - x2) | |
| a = t - (y2 - y3) / (x2 - x3) | |
| a = a / (x1 - x3) | |
| b = t - a * (x1 + x2) | |
| c = y1 - a * x1 * x1 - b * x1 | |
| return (a, b, c) | |
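# Added worked example: secDegPolyFit returns (a, b, c) for y = a*x*x + b*x + c
# passing through the three given points.
#   secDegPolyFit(0, 1, 1, 2, 2, 5)   # (1.0, 0.0, 1.0), i.e. y = x*x + 1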
| def range_limit(val, minv, maxv): | |
| """ | |
| range limit a value | |
| Parameters | |
| val : data value | |
| minv : minimum | |
| maxv : maximum | |
| """ | |
| if (val < minv): | |
| val = minv | |
| elif (val > maxv): | |
| val = maxv | |
| return val | |
| def rangeLimit(val, minv, maxv): | |
| """ | |
| range limit a value | |
| Parameters | |
| val : data value | |
| minv : minimum | |
| maxv : maximum | |
| """ | |
| return range_limit(val, minv, maxv) | |
| def isInRange(val, minv, maxv): | |
| """ | |
| checks if within range | |
| Parameters | |
| val : data value | |
| minv : minimum | |
| maxv : maximum | |
| """ | |
| return val >= minv and val <= maxv | |
| def stripFileLines(filePath, offset): | |
| """ | |
| strips number of chars from both ends | |
| Parameters | |
| filePath : file path | |
| offset : offset from both ends of line | |
| """ | |
| fp = open(filePath, "r") | |
| for line in fp: | |
| stripped = line[offset:len(line) - 1 - offset] | |
| print (stripped) | |
| fp.close() | |
| def genLatLong(lat1, long1, lat2, long2): | |
| """ | |
	generate lat long within limits
| Parameters | |
| lat1 : lat of 1st point | |
| long1 : long of 1st point | |
| lat2 : lat of 2nd point | |
| long2 : long of 2nd point | |
| """ | |
| lat = lat1 + (lat2 - lat1) * random.random() | |
| longg = long1 + (long2 - long1) * random.random() | |
| return (lat, longg) | |
| def geoDistance(lat1, long1, lat2, long2): | |
| """ | |
| find geo distance in ft | |
| Parameters | |
| lat1 : lat of 1st point | |
| long1 : long of 1st point | |
| lat2 : lat of 2nd point | |
| long2 : long of 2nd point | |
| """ | |
| latDiff = math.radians(lat1 - lat2) | |
| longDiff = math.radians(long1 - long2) | |
| l1 = math.sin(latDiff/2.0) | |
| l2 = math.sin(longDiff/2.0) | |
| l3 = math.cos(math.radians(lat1)) | |
| l4 = math.cos(math.radians(lat2)) | |
| a = l1 * l1 + l3 * l4 * l2 * l2 | |
| l5 = math.sqrt(a) | |
| l6 = math.sqrt(1.0 - a) | |
| c = 2.0 * math.atan2(l5, l6) | |
| r = 6371008.8 * 3.280840 | |
| return c * r | |
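# Added note: geoDistance applies the haversine formula with mean earth radius
# 6371008.8 m scaled by 3.280840 ft/m, so the result is in feet.
#   geoDistance(40.7128, -74.0060, 34.0522, -118.2437)   # NYC to LA, very roughly 1.3e7 ft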
def minLimit(val, limit):
	"""
	lower limits a value
	Parameters
		val : data value
		limit : minimum limit
	"""
	if (val < limit):
		val = limit
	return val
def maxLimit(val, limit):
	"""
	upper limits a value
	Parameters
		val : data value
		limit : maximum limit
	"""
	if (val > limit):
		val = limit
	return val
| def rangeSample(val, minLim, maxLim): | |
| """ | |
	if outside the range, sample within the range
| Parameters | |
| val : value | |
| minLim : minimum | |
| maxLim : maximum | |
| """ | |
| if val < minLim or val > maxLim: | |
| val = randint(minLim, maxLim) | |
| return val | |
def genRandomIntListWithinRange(size, minLim, maxLim):
	"""
	random unique list of integers within range
	Parameters
		size : size of returned list
		minLim : minimum
		maxLim : maximum
	"""
	values = set()
	while len(values) < size:
		val = randint(minLim, maxLim)
		values.add(val)
	return list(values)
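# Added usage note: the returned integers are distinct, so size must not exceed
# maxLim - minLim + 1 or the sampling loop cannot terminate.
#   genRandomIntListWithinRange(5, 1, 10)   # e.g. [2, 4, 6, 7, 9] (order not guaranteed)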
def preturbScalar(value, vrange, distr="uniform"):
	"""
	perturbs a multiplicative value within range
	Parameters
		value : data value
		vrange : value delta fraction
		distr : noise distribution type
	"""
	if distr == "uniform":
		scale = 1.0 - vrange + 2 * vrange * random.random()
	elif distr == "normal":
		scale = 1.0 + np.random.normal(0, vrange)
	else:
		exitWithMsg("unknown noise distr " + distr)
	return value * scale
| def preturbScalarAbs(value, vrange): | |
| """ | |
| preturbs an absolute value within range | |
| Parameters | |
| value : data value | |
| vrange : value delta absolute | |
| """ | |
| delta = - vrange + 2.0 * vrange * random.random() | |
| return value + delta | |
| def preturbVector(values, vrange): | |
| """ | |
| preturbs a list within range | |
| Parameters | |
| values : list data | |
| vrange : value delta fraction | |
| """ | |
| nValues = list(map(lambda va: preturbScalar(va, vrange), values)) | |
| return nValues | |
| def randomShiftVector(values, smin, smax): | |
| """ | |
	shifts a list by a random quantity within a range
| Parameters | |
| values : list data | |
		smin : sampling minimum
| smax : sampling maximum | |
| """ | |
| shift = np.random.uniform(smin, smax) | |
| return list(map(lambda va: va + shift, values)) | |
| def floatRange(beg, end, incr): | |
| """ | |
| generates float range | |
| Parameters | |
| beg :range begin | |
| end: range end | |
| incr : range increment | |
| """ | |
| return list(np.arange(beg, end, incr)) | |
| def shuffle(values, *numShuffles): | |
| """ | |
| in place shuffling with swap of pairs | |
| Parameters | |
| values : list data | |
| numShuffles : parameter list for number of shuffles | |
| """ | |
| size = len(values) | |
| if len(numShuffles) == 0: | |
| numShuffle = int(size / 2) | |
| elif len(numShuffles) == 1: | |
| numShuffle = numShuffles[0] | |
| else: | |
| numShuffle = randint(numShuffles[0], numShuffles[1]) | |
| print("numShuffle {}".format(numShuffle)) | |
| for i in range(numShuffle): | |
| first = random.randint(0, size - 1) | |
| second = random.randint(0, size - 1) | |
| while first == second: | |
| second = random.randint(0, size - 1) | |
| tmp = values[first] | |
| values[first] = values[second] | |
| values[second] = tmp | |
| def splitList(itms, numGr): | |
| """ | |
	splits a list into sub lists of approximately equal size, with items in sublists randomly chosen
| Parameters | |
		itms : list of values
| numGr : no of groups | |
| """ | |
| tcount = len(itms) | |
| cItems = list(itms) | |
| sz = int(len(cItems) / numGr) | |
| groups = list() | |
| count = 0 | |
| for i in range(numGr): | |
| if (i == numGr - 1): | |
| csz = tcount - count | |
| else: | |
| csz = sz + randint(-2, 2) | |
| count += csz | |
| gr = list() | |
| for j in range(csz): | |
| it = selectRandomFromList(cItems) | |
| gr.append(it) | |
| cItems.remove(it) | |
| groups.append(gr) | |
| return groups | |
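# Added illustration (hypothetical output): splitting 10 items into 3 random groups
# of approximately equal size.
#   splitList(list(range(10)), 3)   # e.g. [[3, 7, 0], [9, 1, 4, 8], [2, 5, 6]]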
| def multVector(values, vrange): | |
| """ | |
| multiplies a list within value range | |
| Parameters | |
| values : list of values | |
		vrange : fraction of value to be used to update
| """ | |
| scale = 1.0 - vrange + 2 * vrange * random.random() | |
| nValues = list(map(lambda va: va * scale, values)) | |
| return nValues | |
| def weightedAverage(values, weights): | |
| """ | |
| calculates weighted average | |
| Parameters | |
| values : list of values | |
| weights : list of weights | |
| """ | |
| assert len(values) == len(weights), "values and weights should be same size" | |
| vw = zip(values, weights) | |
| wva = list(map(lambda e : e[0] * e[1], vw)) | |
| #wa = sum(x * y for x, y in vw) / sum(weights) | |
| wav = sum(wva) / sum(weights) | |
| return wav | |
def extractFields(line, delim, keepIndices):
	"""
	breaks a line into fields, keeps only the specified fields and returns a new line
	Parameters
		line : delim separated string
		delim : delimiter
		keepIndices : list of indexes of fields to be retained
	"""
	items = line.split(delim)
	newLine = []
	for i in keepIndices:
		newLine.append(items[i])
	return delim.join(newLine)
def remFields(line, delim, remIndices):
	"""
	removes fields from delim separated string
	Parameters
		line : delim separated string
		delim : delimiter
		remIndices : list of indexes of fields to be removed
	"""
	items = line.split(delim)
	newLine = []
	for i in range(len(items)):
		if not arrayContains(remIndices, i):
			newLine.append(items[i])
	return delim.join(newLine)
| def extractList(data, indices): | |
| """ | |
| extracts list from another list, given indices | |
| Parameters | |
		data : list data
| indices : list of indexes to fields to be retained | |
| """ | |
| if areAllFieldsIncluded(data, indices): | |
| exList = data.copy() | |
| #print("all indices") | |
| else: | |
| exList = list() | |
| le = len(data) | |
| for i in indices: | |
| assert i < le , "index {} out of bound {}".format(i, le) | |
| exList.append(data[i]) | |
| return exList | |
| def arrayContains(arr, item): | |
| """ | |
| checks if array contains an item | |
| Parameters | |
| arr : list data | |
| item : item to search | |
| """ | |
| contains = True | |
| try: | |
| arr.index(item) | |
| except ValueError: | |
| contains = False | |
| return contains | |
| def strToIntArray(line, delim=","): | |
| """ | |
| int array from delim separated string | |
| Parameters | |
| line ; delemeter separated string | |
| """ | |
| arr = line.split(delim) | |
| return [int(a) for a in arr] | |
| def strToFloatArray(line, delim=","): | |
| """ | |
| float array from delim separated string | |
| Parameters | |
| line ; delemeter separated string | |
| """ | |
| arr = line.split(delim) | |
| return [float(a) for a in arr] | |
| def strListOrRangeToIntArray(line): | |
| """ | |
| int array from delim separated string or range | |
| Parameters | |
| line ; delemeter separated string | |
| """ | |
| varr = line.split(",") | |
| if (len(varr) > 1): | |
| iarr = list(map(lambda v: int(v), varr)) | |
| else: | |
| vrange = line.split(":") | |
| if (len(vrange) == 2): | |
| lo = int(vrange[0]) | |
| hi = int(vrange[1]) | |
| iarr = list(range(lo, hi+1)) | |
| else: | |
| iarr = [int(line)] | |
| return iarr | |
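# Added illustration: the argument may be a comma separated list, a lo:hi range or a single value.
#   strListOrRangeToIntArray("2,5,9")   # [2, 5, 9]
#   strListOrRangeToIntArray("3:6")     # [3, 4, 5, 6]
#   strListOrRangeToIntArray("7")       # [7]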
| def toStr(val, precision): | |
| """ | |
| converts any type to string | |
| Parameters | |
| val : value | |
| precision ; precision for float value | |
| """ | |
	if (type(val) == float or type(val) == np.float64 or type(val) == np.float32) and precision is not None:
		format = "%" + ".%df" %(precision)
		sVal = format %(val)
	else:
		sVal = str(val)
| return sVal | |
| def toStrFromList(values, precision, delim=","): | |
| """ | |
| converts list of any type to delim separated string | |
| Parameters | |
| values : list data | |
| precision ; precision for float value | |
| delim : delemeter | |
| """ | |
| sValues = list(map(lambda v: toStr(v, precision), values)) | |
| return delim.join(sValues) | |
| def toIntList(values): | |
| """ | |
| convert to int list | |
| Parameters | |
| values : list data | |
| """ | |
| return list(map(lambda va: int(va), values)) | |
| def toFloatList(values): | |
| """ | |
| convert to float list | |
| Parameters | |
| values : list data | |
| """ | |
| return list(map(lambda va: float(va), values)) | |
| def toStrList(values, precision=None): | |
| """ | |
| convert to string list | |
| Parameters | |
| values : list data | |
| precision ; precision for float value | |
| """ | |
| return list(map(lambda va: toStr(va, precision), values)) | |
| def toIntFromBoolean(value): | |
| """ | |
| convert to int | |
| Parameters | |
| value : boolean value | |
| """ | |
| ival = 1 if value else 0 | |
| return ival | |
| def scaleBySum(ldata): | |
| """ | |
| scales so that sum is 1 | |
| Parameters | |
| ldata : list data | |
| """ | |
| s = sum(ldata) | |
| return list(map(lambda e : e/s, ldata)) | |
| def scaleByMax(ldata): | |
| """ | |
| scales so that max value is 1 | |
| Parameters | |
| ldata : list data | |
| """ | |
| m = max(ldata) | |
| return list(map(lambda e : e/m, ldata)) | |
| def typedValue(val, dtype=None): | |
| """ | |
| return typed value given string, discovers data type if not specified | |
| Parameters | |
| val : value | |
| dtype : data type | |
| """ | |
| tVal = None | |
| if dtype is not None: | |
| if dtype == "num": | |
| dtype = "int" if dtype.find(".") == -1 else "float" | |
| if dtype == "int": | |
| tVal = int(val) | |
| elif dtype == "float": | |
| tVal = float(val) | |
| elif dtype == "bool": | |
| tVal = bool(val) | |
| else: | |
| tVal = val | |
| else: | |
| if type(val) == str: | |
| lVal = val.lower() | |
| #int | |
| done = True | |
| try: | |
| tVal = int(val) | |
| except ValueError: | |
| done = False | |
| #float | |
| if not done: | |
| done = True | |
| try: | |
| tVal = float(val) | |
| except ValueError: | |
| done = False | |
| #boolean | |
| if not done: | |
| done = True | |
| if lVal == "true": | |
| tVal = True | |
| elif lVal == "false": | |
| tVal = False | |
| else: | |
| done = False | |
| #None | |
| if not done: | |
| if lVal == "none": | |
| tVal = None | |
| else: | |
| tVal = val | |
| else: | |
| tVal = val | |
| return tVal | |
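# Added illustration: with dtype given the value is coerced, otherwise the type is
# discovered from the string content.
#   typedValue("42")            # 42 (int)
#   typedValue("3.14")          # 3.14 (float)
#   typedValue("true")          # True
#   typedValue("42", "float")   # 42.0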
| def isInt(val): | |
| """ | |
| return true if string is int and the typed value | |
| Parameters | |
| val : value | |
| """ | |
| valInt = True | |
| try: | |
| tVal = int(val) | |
| except ValueError: | |
| valInt = False | |
| tVal = None | |
| r = (valInt, tVal) | |
| return r | |
| def isFloat(val): | |
| """ | |
| return true if string is float | |
| Parameters | |
| val : value | |
| """ | |
| valFloat = True | |
| try: | |
| tVal = float(val) | |
| except ValueError: | |
| valFloat = False | |
| tVal = None | |
| r = (valFloat, tVal) | |
| return r | |
| def getAllFiles(dirPath): | |
| """ | |
| get all files recursively | |
| Parameters | |
| dirPath : directory path | |
| """ | |
| filePaths = [] | |
| for (thisDir, subDirs, fileNames) in os.walk(dirPath): | |
| for fileName in fileNames: | |
| filePaths.append(os.path.join(thisDir, fileName)) | |
| filePaths.sort() | |
| return filePaths | |
| def getFileContent(fpath, verbose=False): | |
| """ | |
| get file contents in directory | |
| Parameters | |
| fpath ; directory path | |
| verbose : verbosity flag | |
| """ | |
	# document list
| docComplete = [] | |
| filePaths = getAllFiles(fpath) | |
| # read files | |
| for filePath in filePaths: | |
| if verbose: | |
| print("next file " + filePath) | |
| with open(filePath, 'r') as contentFile: | |
| content = contentFile.read() | |
| docComplete.append(content) | |
| return (docComplete, filePaths) | |
| def getOneFileContent(fpath): | |
| """ | |
| get one file contents | |
| Parameters | |
| fpath : file path | |
| """ | |
| with open(fpath, 'r') as contentFile: | |
| docStr = contentFile.read() | |
| return docStr | |
| def getFileLines(dirPath, delim=","): | |
| """ | |
| get lines from a file | |
| Parameters | |
| dirPath : file path | |
| delim : delemeter | |
| """ | |
| lines = list() | |
| for li in fileRecGen(dirPath, delim): | |
| lines.append(li) | |
| return lines | |
| def getFileSampleLines(dirPath, percen, delim=","): | |
| """ | |
| get sampled lines from a file | |
| Parameters | |
| dirPath : file path | |
| percen : sampling percentage | |
| delim : delemeter | |
| """ | |
| lines = list() | |
| for li in fileRecGen(dirPath, delim): | |
| if randint(0, 100) < percen: | |
| lines.append(li) | |
| return lines | |
| def getFileColumnAsString(dirPath, index, delim=","): | |
| """ | |
| get string column from a file | |
| Parameters | |
| dirPath : file path | |
| index : index | |
| delim : delemeter | |
| """ | |
| fields = list() | |
| for rec in fileRecGen(dirPath, delim): | |
| fields.append(rec[index]) | |
| #print(fields) | |
| return fields | |
| def getFileColumnsAsString(dirPath, indexes, delim=","): | |
| """ | |
| get multiple string columns from a file | |
| Parameters | |
| dirPath : file path | |
| indexes : indexes of columns | |
| delim : delemeter | |
| """ | |
| nindex = len(indexes) | |
| columns = list(map(lambda i : list(), range(nindex))) | |
| for rec in fileRecGen(dirPath, delim): | |
| for i in range(nindex): | |
| columns[i].append(rec[indexes[i]]) | |
| return columns | |
| def getFileColumnAsFloat(dirPath, index, delim=","): | |
| """ | |
	get float fields from a file
| Parameters | |
| dirPath : file path | |
| index : index | |
| delim : delemeter | |
| """ | |
| #print("{} {}".format(dirPath, index)) | |
| fields = getFileColumnAsString(dirPath, index, delim) | |
| return list(map(lambda v:float(v), fields)) | |
| def getFileColumnAsInt(dirPath, index, delim=","): | |
| """ | |
	get int fields from a file
| Parameters | |
| dirPath : file path | |
| index : index | |
| delim : delemeter | |
| """ | |
| fields = getFileColumnAsString(dirPath, index, delim) | |
| return list(map(lambda v:int(v), fields)) | |
| def getFileAsIntMatrix(dirPath, columns, delim=","): | |
| """ | |
| extracts int matrix from csv file given column indices with each row being concatenation of | |
| extracted column values row size = num of columns | |
| Parameters | |
| dirPath : file path | |
| columns : indexes of columns | |
| delim : delemeter | |
| """ | |
| mat = list() | |
| for rec in fileSelFieldsRecGen(dirPath, columns, delim): | |
| mat.append(asIntList(rec)) | |
| return mat | |
| def getFileAsFloatMatrix(dirPath, columns, delim=","): | |
| """ | |
| extracts float matrix from csv file given column indices with each row being concatenation of | |
| extracted column values row size = num of columns | |
| Parameters | |
| dirPath : file path | |
| columns : indexes of columns | |
| delim : delemeter | |
| """ | |
| mat = list() | |
| for rec in fileSelFieldsRecGen(dirPath, columns, delim): | |
| mat.append(asFloatList(rec)) | |
| return mat | |
| def getFileAsFloatColumn(dirPath): | |
| """ | |
	get float list from a file with one float per row
| Parameters | |
| dirPath : file path | |
| """ | |
| flist = list() | |
| for rec in fileRecGen(dirPath, None): | |
| flist.append(float(rec)) | |
| return flist | |
| def getFileAsFiltFloatMatrix(dirPath, filt, columns, delim=","): | |
| """ | |
| extracts float matrix from csv file given row filter and column indices with each row being | |
| concatenation of extracted column values row size = num of columns | |
| Parameters | |
| dirPath : file path | |
| columns : indexes of columns | |
| filt : row filter lambda | |
| delim : delemeter | |
| """ | |
| mat = list() | |
| for rec in fileFiltSelFieldsRecGen(dirPath, filt, columns, delim): | |
| mat.append(asFloatList(rec)) | |
| return mat | |
| def getFileAsTypedRecords(dirPath, types, delim=","): | |
| """ | |
| extracts typed records from csv file with each row being concatenation of | |
| extracted column values | |
| Parameters | |
| dirPath : file path | |
| types : data types | |
| delim : delemeter | |
| """ | |
| (dtypes, cvalues) = extractTypesFromString(types) | |
| tdata = list() | |
| for rec in fileRecGen(dirPath, delim): | |
| trec = list() | |
| for index, value in enumerate(rec): | |
| value = __convToTyped(index, value, dtypes) | |
| trec.append(value) | |
| tdata.append(trec) | |
| return tdata | |
| def getFileColsAsTypedRecords(dirPath, columns, types, delim=","): | |
| """ | |
| extracts typed records from csv file given column indices with each row being concatenation of | |
| extracted column values | |
| Parameters | |
| Parameters | |
| dirPath : file path | |
| columns : column indexes | |
| types : data types | |
| delim : delemeter | |
| """ | |
| (dtypes, cvalues) = extractTypesFromString(types) | |
| tdata = list() | |
| for rec in fileSelFieldsRecGen(dirPath, columns, delim): | |
| trec = list() | |
| for indx, value in enumerate(rec): | |
| tindx = columns[indx] | |
| value = __convToTyped(tindx, value, dtypes) | |
| trec.append(value) | |
| tdata.append(trec) | |
| return tdata | |
| def getFileColumnsMinMax(dirPath, columns, dtype, delim=","): | |
| """ | |
| extracts numeric matrix from csv file given column indices. For each column return min and max | |
| Parameters | |
| dirPath : file path | |
| columns : column indexes | |
| dtype : data type | |
| delim : delemeter | |
| """ | |
| dtypes = list(map(lambda c : str(c) + ":" + dtype, columns)) | |
| dtypes = ",".join(dtypes) | |
| #print(dtypes) | |
| tdata = getFileColsAsTypedRecords(dirPath, columns, dtypes, delim) | |
| minMax = list() | |
| ncola = len(tdata[0]) | |
| ncole = len(columns) | |
| assertEqual(ncola, ncole, "actual no of columns different from expected") | |
| for ci in range(ncole): | |
| vmin = sys.float_info.max | |
		vmax = -sys.float_info.max
| for r in tdata: | |
| cv = r[ci] | |
| vmin = cv if cv < vmin else vmin | |
| vmax = cv if cv > vmax else vmax | |
| mm = (vmin, vmax, vmax - vmin) | |
| minMax.append(mm) | |
| return minMax | |
| def getRecAsTypedRecord(rec, types, delim=None): | |
| """ | |
| converts record to typed records | |
| Parameters | |
| rec : delemeter separate string or list of string | |
| types : field data types | |
| delim : delemeter | |
| """ | |
| if delim is not None: | |
| rec = rec.split(delim) | |
| (dtypes, cvalues) = extractTypesFromString(types) | |
| #print(types) | |
| #print(dtypes) | |
| trec = list() | |
| for ind, value in enumerate(rec): | |
| tvalue = __convToTyped(ind, value, dtypes) | |
| trec.append(tvalue) | |
| return trec | |
| def __convToTyped(index, value, dtypes): | |
| """ | |
| convert to typed value | |
| Parameters | |
| index : index in type list | |
| value : data value | |
| dtypes : data type list | |
| """ | |
| #print(index, value) | |
| dtype = dtypes[index] | |
| tvalue = value | |
| if dtype == "int": | |
| tvalue = int(value) | |
| elif dtype == "float": | |
| tvalue = float(value) | |
| return tvalue | |
| def extractTypesFromString(types): | |
| """ | |
| extracts column data types and set values for categorical variables | |
| Parameters | |
| types : encoded type information | |
| """ | |
| ftypes = types.split(",") | |
| dtypes = dict() | |
| cvalues = dict() | |
| for ftype in ftypes: | |
| items = ftype.split(":") | |
| cindex = int(items[0]) | |
| dtype = items[1] | |
| dtypes[cindex] = dtype | |
| if len(items) == 3: | |
| sitems = items[2].split() | |
| cvalues[cindex] = sitems | |
| return (dtypes, cvalues) | |
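# Added illustration: the encoded type string maps column index to data type, with an
# optional third field holding space separated categorical values.
#   extractTypesFromString("0:int,1:float,2:cat:red green blue")
#   # returns ({0: "int", 1: "float", 2: "cat"}, {2: ["red", "green", "blue"]})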
| def getMultipleFileAsInttMatrix(dirPathWithCol, delim=","): | |
| """ | |
	extracts int matrix from csv files given column index for each file.
| num of columns = number of rows in each file and num of rows = number of files | |
| Parameters | |
| dirPathWithCol: list of file path and collumn index pair | |
| delim : delemeter | |
| """ | |
| mat = list() | |
| minLen = -1 | |
| for path, col in dirPathWithCol: | |
| colVals = getFileColumnAsInt(path, col, delim) | |
| if minLen < 0 or len(colVals) < minLen: | |
| minLen = len(colVals) | |
| mat.append(colVals) | |
| #make all same length | |
| mat = list(map(lambda li:li[:minLen], mat)) | |
| return mat | |
| def getMultipleFileAsFloatMatrix(dirPathWithCol, delim=","): | |
| """ | |
	extracts float matrix from csv files given column index for each file.
| num of columns = number of rows in each file and num of rows = number of files | |
| Parameters | |
| dirPathWithCol: list of file path and collumn index pair | |
| delim : delemeter | |
| """ | |
| mat = list() | |
| minLen = -1 | |
| for path, col in dirPathWithCol: | |
| colVals = getFileColumnAsFloat(path, col, delim) | |
| if minLen < 0 or len(colVals) < minLen: | |
| minLen = len(colVals) | |
| mat.append(colVals) | |
| #make all same length | |
| mat = list(map(lambda li:li[:minLen], mat)) | |
| return mat | |
| def writeStrListToFile(ldata, filePath, delem=","): | |
| """ | |
	writes a list of delim separated strings, or a list of lists of strings, to a file
| Parameters | |
| ldata : list data | |
| filePath : file path | |
| delim : delemeter | |
| """ | |
| with open(filePath, "w") as fh: | |
| for r in ldata: | |
| if type(r) == list: | |
| r = delem.join(r) | |
| fh.write(r + "\n") | |
| def writeFloatListToFile(ldata, prec, filePath): | |
| """ | |
| writes float list to file, one value per line | |
| Parameters | |
| ldata : list data | |
| prec : precision | |
| filePath : file path | |
| """ | |
| with open(filePath, "w") as fh: | |
| for d in ldata: | |
| fh.write(formatFloat(prec, d) + "\n") | |
| def mutateFileLines(dirPath, mutator, marg, delim=","): | |
| """ | |
| mutates lines from a file | |
| Parameters | |
| dirPath : file path | |
| mutator : mutation callback | |
| marg : argument for mutation call back | |
| delim : delemeter | |
| """ | |
| lines = list() | |
| for li in fileRecGen(dirPath, delim): | |
| li = mutator(li) if marg is None else mutator(li, marg) | |
| lines.append(li) | |
| return lines | |
| def takeFirst(elems): | |
| """ | |
	return first item
| Parameters | |
| elems : list of data | |
| """ | |
| return elems[0] | |
| def takeSecond(elems): | |
| """ | |
| return 2nd element | |
| Parameters | |
| elems : list of data | |
| """ | |
| return elems[1] | |
| def takeThird(elems): | |
| """ | |
| returns 3rd element | |
| Parameters | |
| elems : list of data | |
| """ | |
| return elems[2] | |
| def addToKeyedCounter(dCounter, key, count=1): | |
| """ | |
| add to to keyed counter | |
| Parameters | |
| dCounter : dictionary of counters | |
| key : dictionary key | |
| count : count to add | |
| """ | |
| curCount = dCounter.get(key, 0) | |
| dCounter[key] = curCount + count | |
| def incrKeyedCounter(dCounter, key): | |
| """ | |
| increment keyed counter | |
| Parameters | |
| dCounter : dictionary of counters | |
| key : dictionary key | |
| """ | |
| addToKeyedCounter(dCounter, key, 1) | |
| def appendKeyedList(dList, key, elem): | |
| """ | |
| keyed list | |
| Parameters | |
| dList : dictionary of lists | |
| key : dictionary key | |
| elem : value to append | |
| """ | |
| curList = dList.get(key, []) | |
| curList.append(elem) | |
| dList[key] = curList | |
| def isNumber(st): | |
| """ | |
	Returns True if string is a number
| Parameters | |
| st : string value | |
| """ | |
| return st.replace('.','',1).isdigit() | |
| def removeNan(values): | |
| """ | |
| removes nan from list | |
| Parameters | |
| values : list data | |
| """ | |
| return list(filter(lambda v: not math.isnan(v), values)) | |
| def fileRecGen(filePath, delim = ","): | |
| """ | |
| file record generator | |
| Parameters | |
| filePath ; file path | |
| delim : delemeter | |
| """ | |
| with open(filePath, "r") as fp: | |
| for line in fp: | |
| line = line[:-1] | |
| if delim is not None: | |
| line = line.split(delim) | |
| yield line | |
| def fileSelFieldsRecGen(dirPath, columns, delim=","): | |
| """ | |
| file record generator given column indices | |
| Parameters | |
| filePath ; file path | |
| columns : column indexes as int array or coma separated string | |
| delim : delemeter | |
| """ | |
| if type(columns) == str: | |
| columns = strToIntArray(columns, delim) | |
| for rec in fileRecGen(dirPath, delim): | |
| extracted = extractList(rec, columns) | |
| yield extracted | |
| def fileSelFieldValueGen(dirPath, column, delim=","): | |
| """ | |
| file record generator for a given column | |
| Parameters | |
| filePath ; file path | |
| column : column index | |
| delim : delemeter | |
| """ | |
| for rec in fileRecGen(dirPath, delim): | |
| yield rec[column] | |
| def fileFiltRecGen(filePath, filt, delim = ","): | |
| """ | |
| file record generator with row filter applied | |
| Parameters | |
| filePath ; file path | |
| filt : row filter | |
| delim : delemeter | |
| """ | |
| with open(filePath, "r") as fp: | |
| for line in fp: | |
| line = line[:-1] | |
| if delim is not None: | |
| line = line.split(delim) | |
| if filt(line): | |
| yield line | |
| def fileFiltSelFieldsRecGen(filePath, filt, columns, delim = ","): | |
| """ | |
| file record generator with row and column filter applied | |
| Parameters | |
| filePath ; file path | |
| filt : row filter | |
| columns : column indexes as int array or coma separated string | |
| delim : delemeter | |
| """ | |
| columns = strToIntArray(columns, delim) | |
| with open(filePath, "r") as fp: | |
| for line in fp: | |
| line = line[:-1] | |
| if delim is not None: | |
| line = line.split(delim) | |
| if filt(line): | |
| selected = extractList(line, columns) | |
| yield selected | |
| def fileTypedRecGen(filePath, ftypes, delim = ","): | |
| """ | |
| file typed record generator | |
| Parameters | |
| filePath ; file path | |
| ftypes : list of field types | |
| delim : delemeter | |
| """ | |
| with open(filePath, "r") as fp: | |
| for line in fp: | |
| line = line[:-1] | |
| line = line.split(delim) | |
| for i in range(0, len(ftypes), 2): | |
| ci = ftypes[i] | |
| dtype = ftypes[i+1] | |
| assertLesser(ci, len(line), "index out of bound") | |
| if dtype == "int": | |
| line[ci] = int(line[ci]) | |
| elif dtype == "float": | |
| line[ci] = float(line[ci]) | |
| else: | |
| exitWithMsg("invalid data type") | |
| yield line | |
| def fileMutatedFieldsRecGen(dirPath, mutator, delim=","): | |
| """ | |
| file record generator with some columns mutated | |
| Parameters | |
| dirPath ; file path | |
| mutator : row field mutator | |
| delim : delemeter | |
| """ | |
| for rec in fileRecGen(dirPath, delim): | |
| mutated = mutator(rec) | |
| yield mutated | |
| def tableSelFieldsFilter(tdata, columns): | |
| """ | |
| gets tabular data for selected columns | |
| Parameters | |
| tdata : tabular data | |
| columns : column indexes | |
| """ | |
| if areAllFieldsIncluded(tdata[0], columns): | |
| ntdata = tdata | |
| else: | |
| ntdata = list() | |
| for rec in tdata: | |
| #print(rec) | |
| #print(columns) | |
| nrec = extractList(rec, columns) | |
| ntdata.append(nrec) | |
| return ntdata | |
| def areAllFieldsIncluded(ldata, columns): | |
| """ | |
	return True if all indexes are in the columns
| Parameters | |
| ldata : list data | |
| columns : column indexes | |
| """ | |
| return list(range(len(ldata))) == columns | |
| def asIntList(items): | |
| """ | |
| returns int list | |
| Parameters | |
| items : list data | |
| """ | |
| return [int(i) for i in items] | |
| def asFloatList(items): | |
| """ | |
| returns float list | |
| Parameters | |
| items : list data | |
| """ | |
| return [float(i) for i in items] | |
| def pastTime(interval, unit): | |
| """ | |
| current and past time | |
| Parameters | |
| interval : time interval | |
| unit: time unit | |
| """ | |
| curTime = int(time.time()) | |
| if unit == "d": | |
| pastTime = curTime - interval * secInDay | |
| elif unit == "h": | |
| pastTime = curTime - interval * secInHour | |
| elif unit == "m": | |
| pastTime = curTime - interval * secInMinute | |
| else: | |
| raise ValueError("invalid time unit " + unit) | |
| return (curTime, pastTime) | |
| def minuteAlign(ts): | |
| """ | |
| minute aligned time | |
| Parameters | |
| ts : time stamp in sec | |
| """ | |
| return int((ts / secInMinute)) * secInMinute | |
| def multMinuteAlign(ts, min): | |
| """ | |
| multi minute aligned time | |
| Parameters | |
| ts : time stamp in sec | |
| min : minute value | |
| """ | |
| intv = secInMinute * min | |
| return int((ts / intv)) * intv | |
| def hourAlign(ts): | |
| """ | |
| hour aligned time | |
| Parameters | |
| ts : time stamp in sec | |
| """ | |
| return int((ts / secInHour)) * secInHour | |
| def hourOfDayAlign(ts, hour): | |
| """ | |
| hour of day aligned time | |
| Parameters | |
| ts : time stamp in sec | |
| hour : hour of day | |
| """ | |
| day = int(ts / secInDay) | |
| return (24 * day + hour) * secInHour | |
| def dayAlign(ts): | |
| """ | |
| day aligned time | |
| Parameters | |
| ts : time stamp in sec | |
| """ | |
| return int(ts / secInDay) * secInDay | |
| def timeAlign(ts, unit): | |
| """ | |
| boundary alignment of time | |
| Parameters | |
| ts : time stamp in sec | |
| unit : unit of time | |
| """ | |
| alignedTs = 0 | |
| if unit == "s": | |
| alignedTs = ts | |
| elif unit == "m": | |
| alignedTs = minuteAlign(ts) | |
| elif unit == "h": | |
| alignedTs = hourAlign(ts) | |
| elif unit == "d": | |
| alignedTs = dayAlign(ts) | |
| else: | |
| raise ValueError("invalid time unit") | |
| return alignedTs | |
def monthOfYear(ts):
	"""
	month of year
	Parameters
		ts : time stamp in sec
	"""
	rem = ts % secInYear
	mon = int(rem / secInMonth)
	return mon
| def dayOfWeek(ts): | |
| """ | |
| day of week | |
| Parameters | |
| ts : time stamp in sec | |
| """ | |
| rem = ts % secInWeek | |
| dow = int(rem / secInDay) | |
| return dow | |
| def hourOfDay(ts): | |
| """ | |
| hour of day | |
| Parameters | |
| ts : time stamp in sec | |
| """ | |
| rem = ts % secInDay | |
| hod = int(rem / secInHour) | |
| return hod | |
| def processCmdLineArgs(expectedTypes, usage): | |
| """ | |
| process command line args and returns args as typed values | |
| Parameters | |
| expectedTypes : expected data types of arguments | |
| usage : usage message string | |
| """ | |
| args = [] | |
| numComLineArgs = len(sys.argv) | |
| numExpected = len(expectedTypes) | |
| if (numComLineArgs - 1 == len(expectedTypes)): | |
| try: | |
| for i in range(0, numExpected): | |
| if (expectedTypes[i] == typeInt): | |
| args.append(int(sys.argv[i+1])) | |
| elif (expectedTypes[i] == typeFloat): | |
| args.append(float(sys.argv[i+1])) | |
| elif (expectedTypes[i] == typeString): | |
| args.append(sys.argv[i+1]) | |
| except ValueError: | |
| print ("expected number of command line arguments found but there is type mis match") | |
| sys.exit(1) | |
| else: | |
| print ("expected number of command line arguments not found") | |
| print (usage) | |
| sys.exit(1) | |
| return args | |
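# Added usage sketch (hypothetical script name): for a script invoked as
# "python3 myscript.py data.csv 100 0.8", the call below returns ["data.csv", 100, 0.8]
# with each argument coerced to its expected type.
#   processCmdLineArgs([typeString, typeInt, typeFloat], "usage: myscript.py <file> <count> <fraction>")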
| def mutateString(val, numMutate, ctype): | |
| """ | |
| mutate string multiple times | |
| Parameters | |
| val : string value | |
| numMutate : num of mutations | |
| ctype : type of character to mutate with | |
| """ | |
| mutations = set() | |
| count = 0 | |
| while count < numMutate: | |
| j = randint(0, len(val)-1) | |
| if j not in mutations: | |
| if ctype == "alpha": | |
| ch = selectRandomFromList(alphaTokens) | |
| elif ctype == "num": | |
| ch = selectRandomFromList(numTokens) | |
| elif ctype == "any": | |
| ch = selectRandomFromList(tokens) | |
| val = val[:j] + ch + val[j+1:] | |
| mutations.add(j) | |
| count += 1 | |
| return val | |
| def mutateList(values, numMutate, vmin, vmax, rabs=True): | |
| """ | |
| mutate list multiple times | |
| Parameters | |
| values : list value | |
| numMutate : num of mutations | |
| vmin : minimum of value range | |
| vmax : maximum of value range | |
| rabs : True if mim max range is absolute otherwise relative | |
| """ | |
| mutations = set() | |
| count = 0 | |
| while count < numMutate: | |
| j = randint(0, len(values)-1) | |
| if j not in mutations: | |
| s = np.random.uniform(vmin, vmax) | |
| values[j] = s if rabs else values[j] * s | |
| count += 1 | |
| mutations.add(j) | |
| return values | |
| def swap(values, first, second): | |
| """ | |
| swap two elements | |
| Parameters | |
| values : list value | |
| first : first swap position | |
| second : second swap position | |
| """ | |
| t = values[first] | |
| values[first] = values[second] | |
| values[second] = t | |
| def swapBetweenLists(values1, values2): | |
| """ | |
| swap two elements between 2 lists | |
| Parameters | |
| values1 : first list of values | |
| values2 : second list of values | |
| """ | |
| p1 = randint(0, len(values1)-1) | |
| p2 = randint(0, len(values2)-1) | |
| tmp = values1[p1] | |
| values1[p1] = values2[p2] | |
| values2[p2] = tmp | |
| def safeAppend(values, value): | |
| """ | |
| append only if not None | |
| Parameters | |
| values : list value | |
| value : value to append | |
| """ | |
| if value is not None: | |
| values.append(value) | |
| def getAllIndex(ldata, fldata): | |
| """ | |
| get ALL indexes of list elements | |
| Parameters | |
| ldata : list data to find index in | |
| fldata : list data for values for index look up | |
| """ | |
| return list(map(lambda e : fldata.index(e), ldata)) | |
| def findIntersection(lOne, lTwo): | |
| """ | |
| find intersection elements between 2 lists | |
| Parameters | |
| lOne : first list of data | |
| lTwo : second list of data | |
| """ | |
| sOne = set(lOne) | |
| sTwo = set(lTwo) | |
| sInt = sOne.intersection(sTwo) | |
| return list(sInt) | |
| def isIntvOverlapped(rOne, rTwo): | |
| """ | |
| checks overlap between 2 intervals | |
| Parameters | |
| rOne : first interval boundaries | |
| rTwo : second interval boundaries | |
| """ | |
| clear = rOne[1] <= rTwo[0] or rOne[0] >= rTwo[1] | |
| return not clear | |
| def isIntvLess(rOne, rTwo): | |
| """ | |
| checks if first iterval is less than second | |
| Parameters | |
| rOne : first interval boundaries | |
| rTwo : second interval boundaries | |
| """ | |
| less = rOne[1] <= rTwo[0] | |
| return less | |
| def findRank(e, values): | |
| """ | |
| find rank of value in a list | |
| Parameters | |
| e : value to compare with | |
| values : list data | |
| """ | |
| count = 1 | |
| for ve in values: | |
| if ve < e: | |
| count += 1 | |
| return count | |
| def findRanks(toBeRanked, values): | |
| """ | |
| find ranks of values in one list in another list | |
| Parameters | |
| toBeRanked : list of values for which ranks are found | |
| values : list in which rank is found : | |
| """ | |
| return list(map(lambda e: findRank(e, values), toBeRanked)) | |
| def formatFloat(prec, value, label = None): | |
| """ | |
| formats a float with optional label | |
| Parameters | |
| prec : precision | |
| value : data value | |
| label : label for data | |
| """ | |
| st = (label + " ") if label else "" | |
| formatter = "{:." + str(prec) + "f}" | |
| return st + formatter.format(value) | |
| def formatAny(value, label = None): | |
| """ | |
	formats any object with optional label
| Parameters | |
| value : data value | |
| label : label for data | |
| """ | |
| st = (label + " ") if label else "" | |
| return st + str(value) | |
| def printList(values): | |
| """ | |
| pretty print list | |
| Parameters | |
| values : list of values | |
| """ | |
| for v in values: | |
| print(v) | |
| def printMap(values, klab, vlab, precision, offset=16): | |
| """ | |
| pretty print hash map | |
| Parameters | |
| values : dictionary of values | |
| klab : label for key | |
| vlab : label for value | |
| precision : precision | |
| offset : left justify offset | |
| """ | |
| print(klab.ljust(offset, " ") + vlab) | |
| for k in values.keys(): | |
| v = values[k] | |
| ks = toStr(k, precision).ljust(offset, " ") | |
| vs = toStr(v, precision) | |
| print(ks + vs) | |
| def printPairList(values, lab1, lab2, precision, offset=16): | |
| """ | |
| pretty print list of pairs | |
| Parameters | |
| values : dictionary of values | |
| lab1 : first label | |
| lab2 : second label | |
| precision : precision | |
| offset : left justify offset | |
| """ | |
| print(lab1.ljust(offset, " ") + lab2) | |
| for (v1, v2) in values: | |
| sv1 = toStr(v1, precision).ljust(offset, " ") | |
| sv2 = toStr(v2, precision) | |
| print(sv1 + sv2) | |
| def createMap(*values): | |
| """ | |
	create dictionary from key value pairs
| Parameters | |
| values : sequence of key value pairs | |
| """ | |
| result = dict() | |
| for i in range(0, len(values), 2): | |
| result[values[i]] = values[i+1] | |
| return result | |
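# Added illustration: arguments are interpreted as alternating key, value pairs.
#   createMap("mean", 2.5, "count", 10)   # {"mean": 2.5, "count": 10}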
| def getColMinMax(table, col): | |
| """ | |
| return min, max values of a column | |
| Parameters | |
| table : tabular data | |
| col : column index | |
| """ | |
| vmin = None | |
| vmax = None | |
| for rec in table: | |
| value = rec[col] | |
| if vmin is None: | |
| vmin = value | |
| vmax = value | |
| else: | |
| if value < vmin: | |
| vmin = value | |
| elif value > vmax: | |
| vmax = value | |
| return (vmin, vmax, vmax - vmin) | |
| def createLogger(name, logFilePath, logLevName): | |
| """ | |
| creates logger | |
| Parameters | |
| name : logger name | |
| logFilePath : log file path | |
| logLevName : log level | |
| """ | |
| logger = logging.getLogger(name) | |
| fHandler = logging.handlers.RotatingFileHandler(logFilePath, maxBytes=1048576, backupCount=4) | |
| logLev = logLevName.lower() | |
| if logLev == "debug": | |
| logLevel = logging.DEBUG | |
| elif logLev == "info": | |
| logLevel = logging.INFO | |
| elif logLev == "warning": | |
| logLevel = logging.WARNING | |
| elif logLev == "error": | |
| logLevel = logging.ERROR | |
| elif logLev == "critical": | |
| logLevel = logging.CRITICAL | |
| else: | |
| raise ValueError("invalid log level name " + logLevelName) | |
| fHandler.setLevel(logLevel) | |
| fFormat = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") | |
| fHandler.setFormatter(fFormat) | |
| logger.addHandler(fHandler) | |
| logger.setLevel(logLevel) | |
| return logger | |
@contextmanager
def suppressStdout():
| """ | |
| suppress stdout | |
| Parameters | |
| """ | |
| with open(os.devnull, "w") as devnull: | |
| oldStdout = sys.stdout | |
| sys.stdout = devnull | |
| try: | |
| yield | |
| finally: | |
| sys.stdout = oldStdout | |
| def exitWithMsg(msg): | |
| """ | |
| print message and exit | |
| Parameters | |
| msg : message | |
| """ | |
| print(msg + " -- quitting") | |
| sys.exit(0) | |
| def drawLine(data, yscale=None): | |
| """ | |
| line plot | |
| Parameters | |
| data : list data | |
| yscale : y axis scale | |
| """ | |
| plt.plot(data) | |
| if yscale: | |
| step = int(yscale / 10) | |
| step = int(step / 10) * 10 | |
| plt.yticks(range(0, yscale, step)) | |
| plt.show() | |
| def drawPlot(x, y, xlabel, ylabel): | |
| """ | |
| line plot | |
| Parameters | |
| x : x values | |
| y : y values | |
| xlabel : x axis label | |
| ylabel : y axis label | |
| """ | |
| if x is None: | |
| x = list(range(len(y))) | |
| plt.plot(x,y) | |
| plt.xlabel(xlabel) | |
| plt.ylabel(ylabel) | |
| plt.show() | |
| def drawPairPlot(x, y1, y2, xlabel,ylabel, y1label, y2label): | |
| """ | |
| line plot of 2 lines | |
| Parameters | |
| x : x values | |
| y1 : first y values | |
| y2 : second y values | |
| xlabel : x labbel | |
| ylabel : y label | |
| y1label : first plot label | |
| y2label : second plot label | |
| """ | |
| plt.plot(x, y1, label = y1label) | |
| plt.plot(x, y2, label = y2label) | |
| plt.xlabel(xlabel) | |
| plt.ylabel(ylabel) | |
| plt.legend() | |
| plt.show() | |
| def drawHist(ldata, myTitle, myXlabel, myYlabel, nbins=10): | |
| """ | |
| draw histogram | |
| Parameters | |
| ldata : list data | |
| myTitle : title | |
| myXlabel : x label | |
| myYlabel : y label | |
| nbins : num of bins | |
| """ | |
| plt.hist(ldata, bins=nbins, density=True) | |
| plt.title(myTitle) | |
| plt.xlabel(myXlabel) | |
| plt.ylabel(myYlabel) | |
| plt.show() | |
| def saveObject(obj, filePath): | |
| """ | |
| saves an object | |
| Parameters | |
| obj : object | |
| filePath : file path for saved object | |
| """ | |
| with open(filePath, "wb") as outfile: | |
| pickle.dump(obj,outfile) | |
| def restoreObject(filePath): | |
| """ | |
| restores an object | |
| Parameters | |
| filePath : file path to restore object from | |
| """ | |
| with open(filePath, "rb") as infile: | |
| obj = pickle.load(infile) | |
| return obj | |
| def isNumeric(data): | |
| """ | |
| true if all elements int or float | |
| Parameters | |
| data : numeric data list | |
| """ | |
| if type(data) == list or type(data) == np.ndarray: | |
| col = pd.Series(data) | |
| else: | |
| col = data | |
| return col.dtype == np.int32 or col.dtype == np.int64 or col.dtype == np.float32 or col.dtype == np.float64 | |
| def isInteger(data): | |
| """ | |
| true if all elements int | |
| Parameters | |
| data : numeric data list | |
| """ | |
| if type(data) == list or type(data) == np.ndarray: | |
| col = pd.Series(data) | |
| else: | |
| col = data | |
| return col.dtype == np.int32 or col.dtype == np.int64 | |
| def isFloat(data): | |
| """ | |
| true if all elements float | |
| Parameters | |
| data : numeric data list | |
| """ | |
| if type(data) == list or type(data) == np.ndarray: | |
| col = pd.Series(data) | |
| else: | |
| col = data | |
| return col.dtype == np.float32 or col.dtype == np.float64 | |
| def isBinary(data): | |
| """ | |
| true if all elements either 0 or 1 | |
| Parameters | |
| data : binary data | |
| """ | |
| re = next((d for d in data if not (type(d) == int and (d == 0 or d == 1))), None) | |
| return (re is None) | |
| def isCategorical(data): | |
| """ | |
| true if all elements int or string | |
| Parameters | |
| data : data value | |
| """ | |
| re = next((d for d in data if not (type(d) == int or type(d) == str)), None) | |
| return (re is None) | |
| def assertEqual(value, veq, msg): | |
| """ | |
| assert equal to | |
| Parameters | |
| value : value | |
| veq : value to be equated with | |
| msg : error msg | |
| """ | |
| assert value == veq , msg | |
| def assertGreater(value, vmin, msg): | |
| """ | |
| assert greater than | |
| Parameters | |
| value : value | |
| vmin : minimum value | |
| msg : error msg | |
| """ | |
| assert value > vmin , msg | |
| def assertGreaterEqual(value, vmin, msg): | |
| """ | |
| assert greater than | |
| Parameters | |
| value : value | |
| vmin : minimum value | |
| msg : error msg | |
| """ | |
| assert value >= vmin , msg | |
| def assertLesser(value, vmax, msg): | |
| """ | |
| assert less than | |
| Parameters | |
| value : value | |
| vmax : maximum value | |
| msg : error msg | |
| """ | |
| assert value < vmax , msg | |
| def assertLesserEqual(value, vmax, msg): | |
| """ | |
| assert less than | |
| Parameters | |
| value : value | |
| vmax : maximum value | |
| msg : error msg | |
| """ | |
| assert value <= vmax , msg | |
| def assertWithinRange(value, vmin, vmax, msg): | |
| """ | |
| assert within range | |
| Parameters | |
| value : value | |
| vmin : minimum value | |
| vmax : maximum value | |
| msg : error msg | |
| """ | |
| assert value >= vmin and value <= vmax, msg | |
| def assertInList(value, values, msg): | |
| """ | |
| assert contains in a list | |
| Parameters | |
| value ; balue to check for inclusion | |
| values : list data | |
| msg : error msg | |
| """ | |
| assert value in values, msg | |
| def maxListDist(l1, l2): | |
| """ | |
| maximum list element difference between 2 lists | |
| Parameters | |
| l1 : first list data | |
| l2 : second list data | |
| """ | |
| dist = max(list(map(lambda v : abs(v[0] - v[1]), zip(l1, l2)))) | |
| return dist | |
def fileLineCount(fPath):
	"""
	number of lines in a file
	Parameters
		fPath : file path
	"""
	i = -1
	with open(fPath) as f:
		for i, li in enumerate(f):
			pass
	return (i + 1)
| def getAlphaNumCharCount(sdata): | |
| """ | |
	number of alphabetic and numeric characters in a string
| Parameters | |
| sdata : string data | |
| """ | |
| acount = 0 | |
| ncount = 0 | |
| scount = 0 | |
| ocount = 0 | |
| assertEqual(type(sdata), str, "input must be string") | |
| for c in sdata: | |
| if c.isnumeric(): | |
| ncount += 1 | |
| elif c.isalpha(): | |
| acount += 1 | |
| elif c.isspace(): | |
| scount += 1 | |
| else: | |
| ocount += 1 | |
| r = (acount, ncount, ocount) | |
| return r | |
| def genPowerSet(cvalues, incEmpty=False): | |
| """ | |
| generates power set i.e all possible subsets | |
| Parameters | |
| cvalues : list of categorical values | |
| incEmpty : include empty set if True | |
| """ | |
| ps = list() | |
| for cv in cvalues: | |
| pse = list() | |
| for s in ps: | |
| sc = s.copy() | |
| sc.add(cv) | |
| #print(sc) | |
| pse.append(sc) | |
| ps.extend(pse) | |
| es = set() | |
| es.add(cv) | |
| ps.append(es) | |
| #print(es) | |
| if incEmpty: | |
		ps.append(set())
| return ps | |
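# Added illustration: all non empty subsets of a small categorical value list (7 subsets for 3 values).
#   genPowerSet(["a", "b", "c"])
#   # [{"a"}, {"a", "b"}, {"b"}, {"a", "c"}, {"a", "b", "c"}, {"b", "c"}, {"c"}]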
| class StepFunction: | |
| """ | |
| step function | |
| Parameters | |
| """ | |
| def __init__(self, *values): | |
| """ | |
		initializer
		Parameters
			values : list of tuples, with each tuple containing two x values and the corresponding y value
| """ | |
| self.points = values | |
| def find(self, x): | |
| """ | |
| finds step function value | |
| Parameters | |
| x : x value | |
| """ | |
| found = False | |
| y = 0 | |
| for p in self.points: | |
| if (x >= p[0] and x < p[1]): | |
| y = p[2] | |
| found = True | |
| break | |
| if not found: | |
| l = len(self.points) | |
| if (x < self.points[0][0]): | |
| y = self.points[0][2] | |
| elif (x > self.points[l-1][1]): | |
| y = self.points[l-1][2] | |
| return y | |
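# Added illustration: a step function with value 1 on [0, 5) and 4 on [5, 10);
# arguments outside the covered intervals fall back to the nearest end value.
#   sf = StepFunction((0, 5, 1), (5, 10, 4))
#   sf.find(3)    # 1
#   sf.find(7)    # 4
#   sf.find(-2)   # 1
#   sf.find(12)   # 4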
| class DummyVarGenerator: | |
| """ | |
| dummy variable generator for categorical variable | |
| """ | |
| def __init__(self, rowSize, catValues, trueVal, falseVal, delim=None): | |
| """ | |
		initializer
		Parameters
			rowSize : row size
			catValues : dictionary with field index as key and list of categorical values as value
			trueVal : true value, typically "1"
			falseVal : false value, typically "0"
			delim : field delimiter
| """ | |
| self.rowSize = rowSize | |
| self.catValues = catValues | |
| numCatVar = len(catValues) | |
| colCount = 0 | |
| for v in self.catValues.values(): | |
| colCount += len(v) | |
| self.newRowSize = rowSize - numCatVar + colCount | |
| #print ("new row size {}".format(self.newRowSize)) | |
| self.trueVal = trueVal | |
| self.falseVal = falseVal | |
| self.delim = delim | |
| def processRow(self, row): | |
| """ | |
		encodes categorical variables, returning a delimiter separated string or a list
| Parameters | |
| row : row either delemeter separated string or list | |
| """ | |
| if self.delim is not None: | |
| rowArr = row.split(self.delim) | |
| msg = "row does not have expected number of columns found " + str(len(rowArr)) + " expected " + str(self.rowSize) | |
| assert len(rowArr) == self.rowSize, msg | |
| else: | |
| rowArr = row | |
| newRowArr = [] | |
| for i in range(len(rowArr)): | |
| curVal = rowArr[i] | |
| if (i in self.catValues): | |
| values = self.catValues[i] | |
| for val in values: | |
| if val == curVal: | |
| newVal = self.trueVal | |
| else: | |
| newVal = self.falseVal | |
| newRowArr.append(newVal) | |
| else: | |
| newRowArr.append(curVal) | |
| assert len(newRowArr) == self.newRowSize, "invalid new row size " + str(len(newRowArr)) + " expected " + str(self.newRowSize) | |
| encRow = self.delim.join(newRowArr) if self.delim is not None else newRowArr | |
| return encRow | |
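# Added illustration: one hot encoding of column 1 in a 3 column record; the categorical
# column expands into one field per category value.
#   dvg = DummyVarGenerator(3, {1 : ["red", "green", "blue"]}, "1", "0", ",")
#   dvg.processRow("7,green,2.5")   # "7,0,1,0,2.5"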