Spaces:
Runtime error
Runtime error
| #!/usr/local/bin/python3 | |
| # avenir-python: Machine Learning | |
| # Author: Pranab Ghosh | |
| # | |
| # Licensed under the Apache License, Version 2.0 (the "License"); you | |
| # may not use this file except in compliance with the License. You may | |
| # obtain a copy of the License at | |
| # | |
| # http://www.apache.org/licenses/LICENSE-2.0 | |
| # | |
| # Unless required by applicable law or agreed to in writing, software | |
| # distributed under the License is distributed on an "AS IS" BASIS, | |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or | |
| # implied. See the License for the specific language governing | |
| # permissions and limitations under the License. | |
| # Package imports | |
| import os | |
| import sys | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import matplotlib | |
| import random | |
| import jprops | |
| import statistics | |
| from matplotlib import pyplot | |
| from .util import * | |
| from .mlutil import * | |
| from .sampler import * | |
| class MonteCarloSimulator(object): | |
| """ | |
| monte carlo simulator for intergation, various statistic for complex fumctions | |
| """ | |
| def __init__(self, numIter, callback, logFilePath, logLevName): | |
| """ | |
| constructor | |
| Parameters | |
| numIter :num of iterations | |
| callback : call back method | |
| logFilePath : log file path | |
| logLevName : log level | |
| """ | |
| self.samplers = list() | |
| self.numIter = numIter; | |
| self.callback = callback | |
| self.extraArgs = None | |
| self.output = list() | |
| self.sum = None | |
| self.mean = None | |
| self.sd = None | |
| self.replSamplers = dict() | |
| self.prSamples = None | |
| self.logger = None | |
| if logFilePath is not None: | |
| self.logger = createLogger(__name__, logFilePath, logLevName) | |
| self.logger.info("******** stating new session of MonteCarloSimulator") | |
| def registerBernoulliTrialSampler(self, pr): | |
| """ | |
| bernoulli trial sampler | |
| Parameters | |
| pr : probability | |
| """ | |
| self.samplers.append(BernoulliTrialSampler(pr)) | |
| def registerPoissonSampler(self, rateOccur, maxSamp): | |
| """ | |
| poisson sampler | |
| Parameters | |
| rateOccur : rate of occurence | |
| maxSamp : max limit on no of samples | |
| """ | |
| self.samplers.append(PoissonSampler(rateOccur, maxSamp)) | |
| def registerUniformSampler(self, minv, maxv): | |
| """ | |
| uniform sampler | |
| Parameters | |
| minv : min value | |
| maxv : max value | |
| """ | |
| self.samplers.append(UniformNumericSampler(minv, maxv)) | |
| def registerTriangularSampler(self, min, max, vertexValue, vertexPos=None): | |
| """ | |
| triangular sampler | |
| Parameters | |
| xmin : min value | |
| xmax : max value | |
| vertexValue : distr value at vertex | |
| vertexPos : vertex pposition | |
| """ | |
| self.samplers.append(TriangularRejectSampler(min, max, vertexValue, vertexPos)) | |
| def registerGaussianSampler(self, mean, sd): | |
| """ | |
| gaussian sampler | |
| Parameters | |
| mean : mean | |
| sd : std deviation | |
| """ | |
| self.samplers.append(GaussianRejectSampler(mean, sd)) | |
| def registerNormalSampler(self, mean, sd): | |
| """ | |
| gaussian sampler using numpy | |
| Parameters | |
| mean : mean | |
| sd : std deviation | |
| """ | |
| self.samplers.append(NormalSampler(mean, sd)) | |
| def registerLogNormalSampler(self, mean, sd): | |
| """ | |
| log normal sampler using numpy | |
| Parameters | |
| mean : mean | |
| sd : std deviation | |
| """ | |
| self.samplers.append(LogNormalSampler(mean, sd)) | |
| def registerParetoSampler(self, mode, shape): | |
| """ | |
| pareto sampler using numpy | |
| Parameters | |
| mode : mode | |
| shape : shape | |
| """ | |
| self.samplers.append(ParetoSampler(mode, shape)) | |
| def registerGammaSampler(self, shape, scale): | |
| """ | |
| gamma sampler using numpy | |
| Parameters | |
| shape : shape | |
| scale : scale | |
| """ | |
| self.samplers.append(GammaSampler(shape, scale)) | |
| def registerDiscreteRejectSampler(self, xmin, xmax, step, *values): | |
| """ | |
| disccrete int sampler | |
| Parameters | |
| xmin : min value | |
| xmax : max value | |
| step : discrete step | |
| values : distr values | |
| """ | |
| self.samplers.append(DiscreteRejectSampler(xmin, xmax, step, *values)) | |
| def registerNonParametricSampler(self, minv, binWidth, *values): | |
| """ | |
| nonparametric sampler | |
| Parameters | |
| xmin : min value | |
| binWidth : bin width | |
| values : distr values | |
| """ | |
| sampler = NonParamRejectSampler(minv, binWidth, *values) | |
| sampler.sampleAsFloat() | |
| self.samplers.append(sampler) | |
| def registerMultiVarNormalSampler(self, numVar, *values): | |
| """ | |
| multi var gaussian sampler using numpy | |
| Parameters | |
| numVar : no of variables | |
| values : numVar mean values followed by numVar x numVar values for covar matrix | |
| """ | |
| self.samplers.append(MultiVarNormalSampler(numVar, *values)) | |
| def registerJointNonParamRejectSampler(self, xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values): | |
| """ | |
| joint nonparametric sampler | |
| Parameters | |
| xmin : min value for x | |
| xbinWidth : bin width for x | |
| xnbin : no of bins for x | |
| ymin : min value for y | |
| ybinWidth : bin width for y | |
| ynbin : no of bins for y | |
| values : distr values | |
| """ | |
| self.samplers.append(JointNonParamRejectSampler(xmin, xbinWidth, xnbin, ymin, ybinWidth, ynbin, *values)) | |
| def registerRangePermutationSampler(self, minv, maxv, *numShuffles): | |
| """ | |
| permutation sampler with range | |
| Parameters | |
| minv : min of range | |
| maxv : max of range | |
| numShuffles : no of shuffles or range of no of shuffles | |
| """ | |
| self.samplers.append(PermutationSampler.createSamplerWithRange(minv, maxv, *numShuffles)) | |
| def registerValuesPermutationSampler(self, values, *numShuffles): | |
| """ | |
| permutation sampler with values | |
| Parameters | |
| values : list data | |
| numShuffles : no of shuffles or range of no of shuffles | |
| """ | |
| self.samplers.append(PermutationSampler.createSamplerWithValues(values, *numShuffles)) | |
| def registerNormalSamplerWithTrendCycle(self, mean, stdDev, trend, cycle, step=1): | |
| """ | |
| normal sampler with trend and cycle | |
| Parameters | |
| mean : mean | |
| stdDev : std deviation | |
| dmean : trend delta | |
| cycle : cycle values wrt base mean | |
| step : adjustment step for cycle and trend | |
| """ | |
| self.samplers.append(NormalSamplerWithTrendCycle(mean, stdDev, trend, cycle, step)) | |
| def registerCustomSampler(self, sampler): | |
| """ | |
| eventsampler | |
| Parameters | |
| sampler : sampler with sample() method | |
| """ | |
| self.samplers.append(sampler) | |
| def registerEventSampler(self, intvSampler, valSampler=None): | |
| """ | |
| event sampler | |
| Parameters | |
| intvSampler : interval sampler | |
| valSampler : value sampler | |
| """ | |
| self.samplers.append(EventSampler(intvSampler, valSampler)) | |
| def registerMetropolitanSampler(self, propStdDev, minv, binWidth, values): | |
| """ | |
| metropolitan sampler | |
| Parameters | |
| propStdDev : proposal distr std dev | |
| minv : min domain value for target distr | |
| binWidth : bin width | |
| values : target distr values | |
| """ | |
| self.samplers.append(MetropolitanSampler(propStdDev, minv, binWidth, values)) | |
| def setSampler(self, var, iter, sampler): | |
| """ | |
| set sampler for some variable when iteration reaches certain point | |
| Parameters | |
| var : sampler index | |
| iter : iteration count | |
| sampler : new sampler | |
| """ | |
| key = (var, iter) | |
| self.replSamplers[key] = sampler | |
| def registerExtraArgs(self, *args): | |
| """ | |
| extra args | |
| Parameters | |
| args : extra argument list | |
| """ | |
| self.extraArgs = args | |
| def replSampler(self, iter): | |
| """ | |
| replace samper for this iteration | |
| Parameters | |
| iter : iteration number | |
| """ | |
| if len(self.replSamplers) > 0: | |
| for v in range(self.numVars): | |
| key = (v, iter) | |
| if key in self.replSamplers: | |
| sampler = self.replSamplers[key] | |
| self.samplers[v] = sampler | |
| def run(self): | |
| """ | |
| run simulator | |
| """ | |
| self.sum = None | |
| self.mean = None | |
| self.sd = None | |
| self.numVars = len(self.samplers) | |
| vOut = 0 | |
| #print(formatAny(self.numIter, "num iterations")) | |
| for i in range(self.numIter): | |
| self.replSampler(i) | |
| args = list() | |
| for s in self.samplers: | |
| arg = s.sample() | |
| if type(arg) is list: | |
| args.extend(arg) | |
| else: | |
| args.append(arg) | |
| slen = len(args) | |
| if self.extraArgs: | |
| args.extend(self.extraArgs) | |
| args.append(self) | |
| args.append(i) | |
| vOut = self.callback(args) | |
| self.output.append(vOut) | |
| self.prSamples = args[:slen] | |
| def getOutput(self): | |
| """ | |
| get raw output | |
| """ | |
| return self.output | |
| def setOutput(self, values): | |
| """ | |
| set raw output | |
| Parameters | |
| values : output values | |
| """ | |
| self.output = values | |
| self.numIter = len(values) | |
| def drawHist(self, myTitle, myXlabel, myYlabel): | |
| """ | |
| draw histogram | |
| Parameters | |
| myTitle : title | |
| myXlabel : label for x | |
| myYlabel : label for y | |
| """ | |
| pyplot.hist(self.output, density=True) | |
| pyplot.title(myTitle) | |
| pyplot.xlabel(myXlabel) | |
| pyplot.ylabel(myYlabel) | |
| pyplot.show() | |
| def getSum(self): | |
| """ | |
| get sum | |
| """ | |
| if not self.sum: | |
| self.sum = sum(self.output) | |
| return self.sum | |
| def getMean(self): | |
| """ | |
| get average | |
| """ | |
| if self.mean is None: | |
| self.mean = statistics.mean(self.output) | |
| return self.mean | |
| def getStdDev(self): | |
| """ | |
| get std dev | |
| """ | |
| if self.sd is None: | |
| self.sd = statistics.stdev(self.output, xbar=self.mean) if self.mean else statistics.stdev(self.output) | |
| return self.sd | |
| def getMedian(self): | |
| """ | |
| get average | |
| """ | |
| med = statistics.median(self.output) | |
| return med | |
| def getMax(self): | |
| """ | |
| get max | |
| """ | |
| return max(self.output) | |
| def getMin(self): | |
| """ | |
| get min | |
| """ | |
| return min(self.output) | |
| def getIntegral(self, bounds): | |
| """ | |
| integral | |
| Parameters | |
| bounds : bound on sum | |
| """ | |
| if not self.sum: | |
| self.sum = sum(self.output) | |
| return self.sum * bounds / self.numIter | |
| def getLowerTailStat(self, zvalue, numIntPoints=50): | |
| """ | |
| get lower tail stat | |
| Parameters | |
| zvalue : zscore upper bound | |
| numIntPoints : no of interpolation point for cum distribution | |
| """ | |
| mean = self.getMean() | |
| sd = self.getStdDev() | |
| tailStart = self.getMin() | |
| tailEnd = mean - zvalue * sd | |
| cvaCounts = self.cumDistr(tailStart, tailEnd, numIntPoints) | |
| reqConf = floatRange(0.0, 0.150, .01) | |
| msg = "p value outside interpolation range, reduce zvalue and try again {:.5f} {:.5f}".format(reqConf[-1], cvaCounts[-1][1]) | |
| assert reqConf[-1] < cvaCounts[-1][1], msg | |
| critValues = self.interpolateCritValues(reqConf, cvaCounts, True, tailStart, tailEnd) | |
| return critValues | |
| def getPercentile(self, cvalue): | |
| """ | |
| percentile | |
| Parameters | |
| cvalue : value for percentile | |
| """ | |
| count = 0 | |
| for v in self.output: | |
| if v < cvalue: | |
| count += 1 | |
| percent = int(count * 100.0 / self.numIter) | |
| return percent | |
| def getCritValue(self, pvalue): | |
| """ | |
| critical value for probabaility threshold | |
| Parameters | |
| pvalue : pvalue | |
| """ | |
| assertWithinRange(pvalue, 0.0, 1.0, "invalid probabaility value") | |
| svalues = self.output.sorted() | |
| ppval = None | |
| cpval = None | |
| intv = 1.0 / (self.numIter - 1) | |
| for i in range(self.numIter - 1): | |
| cpval = (i + 1) / self.numIter | |
| if cpval > pvalue: | |
| sl = svalues[i] - svalues[i-1] | |
| cval = svalues[i-1] + sl * (pvalue - ppval) | |
| break | |
| ppval = cpval | |
| return cval | |
| def getUpperTailStat(self, zvalue, numIntPoints=50): | |
| """ | |
| upper tail stat | |
| Parameters | |
| zvalue : zscore upper bound | |
| numIntPoints : no of interpolation point for cum distribution | |
| """ | |
| mean = self.getMean() | |
| sd = self.getStdDev() | |
| tailStart = mean + zvalue * sd | |
| tailEnd = self.getMax() | |
| cvaCounts = self.cumDistr(tailStart, tailEnd, numIntPoints) | |
| reqConf = floatRange(0.85, 1.0, .01) | |
| msg = "p value outside interpolation range, reduce zvalue and try again {:.5f} {:.5f}".format(reqConf[0], cvaCounts[0][1]) | |
| assert reqConf[0] > cvaCounts[0][1], msg | |
| critValues = self.interpolateCritValues(reqConf, cvaCounts, False, tailStart, tailEnd) | |
| return critValues | |
| def cumDistr(self, tailStart, tailEnd, numIntPoints): | |
| """ | |
| cumulative distribution at tail | |
| Parameters | |
| tailStart : tail start | |
| tailEnd : tail end | |
| numIntPoints : no of interpolation points | |
| """ | |
| delta = (tailEnd - tailStart) / numIntPoints | |
| cvalues = floatRange(tailStart, tailEnd, delta) | |
| cvaCounts = list() | |
| for cv in cvalues: | |
| count = 0 | |
| for v in self.output: | |
| if v < cv: | |
| count += 1 | |
| p = (cv, count/self.numIter) | |
| if self.logger is not None: | |
| self.logger.info("{:.3f} {:.3f}".format(p[0], p[1])) | |
| cvaCounts.append(p) | |
| return cvaCounts | |
| def interpolateCritValues(self, reqConf, cvaCounts, lowertTail, tailStart, tailEnd): | |
| """ | |
| interpolate for spefici confidence limits | |
| Parameters | |
| reqConf : confidence level values | |
| cvaCounts : cum values | |
| lowertTail : True if lower tail | |
| tailStart ; tail start | |
| tailEnd : tail end | |
| """ | |
| critValues = list() | |
| if self.logger is not None: | |
| self.logger.info("target conf limit " + str(reqConf)) | |
| reqConfSub = reqConf[1:] if lowertTail else reqConf[:-1] | |
| for rc in reqConfSub: | |
| for i in range(len(cvaCounts) -1): | |
| if rc >= cvaCounts[i][1] and rc < cvaCounts[i+1][1]: | |
| #print("interpoltate between " + str(cvaCounts[i]) + " and " + str(cvaCounts[i+1])) | |
| slope = (cvaCounts[i+1][0] - cvaCounts[i][0]) / (cvaCounts[i+1][1] - cvaCounts[i][1]) | |
| cval = cvaCounts[i][0] + slope * (rc - cvaCounts[i][1]) | |
| p = (rc, cval) | |
| if self.logger is not None: | |
| self.logger.debug("interpolated crit values {:.3f} {:.3f}".format(p[0], p[1])) | |
| critValues.append(p) | |
| break | |
| if lowertTail: | |
| p = (0.0, tailStart) | |
| critValues.insert(0, p) | |
| else: | |
| p = (1.0, tailEnd) | |
| critValues.append(p) | |
| return critValues | |