Spaces:
Runtime error
Runtime error
import math
import os
import random
import sys
from random import randint

import jprops
import matplotlib.pyplot as plt
import numpy as np
from matumizi.util import *
from matumizi.mlutil import *
| """ | |
| Markov chain classifier | |
| """ | |
class MarkovChainClassifier():
    """
    Two class sequence classifier based on Markov chain state transition
    probabilities.

    One state transition matrix is kept per class label. A sequence is scored
    by adding up, over its consecutive state pairs, the log of the ratio of
    the transition probability under the second configured class label to the
    one under the first. The second label is predicted when that sum exceeds
    the configured log odds threshold, otherwise the first.
    """

    def __init__(self, configFile):
        """
        constructor

        Parameters
            configFile: config file path
        """
        # each entry is a (default value, message) pair handed to Configuration
        # NOTE(review): the message presumably is reported when a mandatory
        # entry is missing - confirm against matumizi's Configuration
        defValues = {}
        defValues["common.model.directory"] = ("model", None)
        defValues["common.model.file"] = (None, None)
        defValues["common.verbose"] = (False, None)
        defValues["common.states"] = (None, "missing state list")
        defValues["train.data.file"] = (None, "missing training data file")
        defValues["train.data.class.labels"] = (["F", "T"], None)
        defValues["train.data.key.len"] = (1, None)
        defValues["train.model.save"] = (False, None)
        defValues["train.score.method"] = ("accuracy", None)
        defValues["predict.data.file"] = (None, None)
        defValues["predict.use.saved.model"] = (True, None)
        defValues["predict.log.odds.threshold"] = (0, None)
        defValues["validate.data.file"] = (None, "missing validation data file")
        defValues["validate.use.saved.model"] = (False, None)
        defValues["valid.accuracy.metric"] = ("acc", None)
        self.config = Configuration(configFile, defValues)

        self.clabels = self.config.getStringListConfig("train.data.class.labels")[0]
        self.states = self.config.getStringListConfig("common.states")[0]
        self.nstates = len(self.states)

        # class label -> (nstates x nstates) state transition matrix
        self.stTranPr = dict()
        self.__resetTranCounts()

    def train(self):
        """
        train model

        Counts the state transitions found in the training data file per
        class label, converts every matrix row to probabilities and, when
        train.model.save is set, writes the matrices to the model file.
        Going by the indexing used, a training record holds
        train.data.key.len leading fields, then the class label, then the
        state sequence.
        """
        # start from fresh counts, otherwise a repeated call would add raw
        # counts on top of the probabilities left by the previous call
        self.__resetTranCounts()

        #state transition matrix
        tdfPath = self.config.getStringConfig("train.data.file")[0]
        klen = self.config.getIntConfig("train.data.key.len")[0]
        for rec in fileRecGen(tdfPath):
            self.__addTransitions(rec, klen)

        #normalize to probability
        self.__normalize()

        #save
        if self.config.getBooleanConfig("train.model.save")[0]:
            self.__saveModel()

    def validate(self):
        """
        validate using model

        Restores the saved model or trains a new one depending on
        validate.use.saved.model, predicts the records of the validation
        data file and prints the score for the configured accuracy metric.
        """
        # the validation specific flag decides here, not the prediction one
        useSavedModel = self.config.getBooleanConfig("validate.use.saved.model")[0]
        if useSavedModel:
            self.__restoreModel()
        else:
            self.train()

        vdfPath = self.config.getStringConfig("validate.data.file")[0]
        accMetric = self.config.getStringConfig("valid.accuracy.metric")[0]

        yac, ypr = self.__getPrediction(vdfPath, True)
        if isinstance(self.clabels[0], str):
            # string labels are replaced by their position in the label list
            yac = self.__toIntClabel(yac)
            ypr = self.__toIntClabel(ypr)
        score = perfMetric(accMetric, yac, ypr)
        print(formatFloat(3, score, "perf score"))

    def predict(self):
        """
        predict using model

        Restores the saved model or trains a new one depending on
        predict.use.saved.model, then predicts the records of the prediction
        data file. Returns the list of predicted class labels.
        """
        useSavedModel = self.config.getBooleanConfig("predict.use.saved.model")[0]
        if useSavedModel:
            self.__restoreModel()
        else:
            self.train()

        #predict
        pdfPath = self.config.getStringConfig("predict.data.file")[0]
        _ , ypr = self.__getPrediction(pdfPath)
        return ypr

    def __resetTranCounts(self):
        """
        sets the transition matrix of every class label to all ones, so each
        transition starts with a count of one before any data is counted
        """
        self.stTranPr = {cl: np.ones((self.nstates, self.nstates)) for cl in self.clabels}

    def __addTransitions(self, rec, klen):
        """
        adds the state transitions of one training record to the counts of
        the record's class

        Parameters
            rec : training record as list of fields
            klen : number of leading key fields
        """
        cl = rec[klen]
        # states start right after the class label
        for i in range(klen + 1, len(rec) - 1):
            fst = self.states.index(rec[i])
            tst = self.states.index(rec[i+1])
            self.stTranPr[cl][fst][tst] += 1

    def __normalize(self):
        """
        divides every row of every transition matrix by its row sum
        """
        for cl in self.clabels:
            # iterating the matrix yields row views, so the in place division
            # updates the matrix itself
            for row in self.stTranPr[cl]:
                row /= row.sum()

    def __modelFilePath(self):
        """
        returns the model file path, after checking that the model directory
        exists
        """
        mdPath = self.config.getStringConfig("common.model.directory")[0]
        assert os.path.exists(mdPath), "model save directory does not exist"
        mfPath = self.config.getStringConfig("common.model.file")[0]
        return os.path.join(mdPath, mfPath)

    def __saveModel(self):
        """
        writes the model file, for each class label a "label:<class>" line
        followed by one comma separated line per transition matrix row
        """
        mfPath = self.__modelFilePath()
        with open(mfPath, "w") as fh:
            for cl in self.clabels:
                fh.write("label:" + cl +"\n")
                for r in self.stTranPr[cl]:
                    fh.write(",".join(toStrList(r, 6)) + "\n")

    def __restoreModel(self):
        """
        restore model

        Reads the transition matrices back from the model file.
        """
        mfPath = self.__modelFilePath()
        stp = None
        cl = None
        for rec in fileRecGen(mfPath):
            if len(rec) == 1:
                # a single field record is a "label:<class>" header, it ends
                # the matrix of the previous class, if any
                if stp is not None:
                    self.stTranPr[cl] = np.array(stp)
                cl = rec[0].split(":")[1]
                stp = list()
            else:
                stp.append(asFloatList(rec))

        # matrix of the last class in the file
        self.stTranPr[cl] = np.array(stp)

    def __logOdds(self, rec, offset):
        """
        returns the sum over the consecutive state pairs of a record of the
        log of second class to first class transition probability ratio,
        0 when the record has no state pair

        Parameters
            rec : record as list of fields
            offset : index of the first state in the record
        """
        ncTran = self.stTranPr[self.clabels[0]]
        pcTran = self.stTranPr[self.clabels[1]]
        lodds = 0
        for i in range(offset, len(rec) - 1):
            fst = self.states.index(rec[i])
            tst = self.states.index(rec[i+1])
            lodds += math.log(pcTran[fst][tst] / ncTran[fst][tst])
        return lodds

    def __getPrediction(self, fpath, validate=False):
        """
        get predictions

        Returns the tuple (actual labels, predicted labels). The actual label
        list is filled only for validation; otherwise every record is printed
        prefixed with its predicted label.

        Parameters
            fpath : data file path
            validate: True if validation
        """
        nc = self.clabels[0]
        pc = self.clabels[1]
        thold = self.config.getFloatConfig("predict.log.odds.threshold")[0]
        klen = self.config.getIntConfig("train.data.key.len")[0]

        # validation records carry the class label right after the key
        # fields, so their states start one field later
        offset = klen+1 if validate else klen

        ypr = list()
        yac = list()
        for rec in fileRecGen(fpath):
            lodds = self.__logOdds(rec, offset)
            prc = pc if lodds > thold else nc
            ypr.append(prc)
            if validate:
                yac.append(rec[klen])
            else:
                print(prc + "\t" + ",".join(rec))

        return (yac, ypr)

    def __toIntClabel(self, labels):
        """
        convert string class label to int, the int being the position of the
        label in the configured class label list

        Parameters
            labels : class label values
        """
        return [self.clabels.index(l) for l in labels]