Spaces:
Sleeping
Sleeping
from sympy.concrete.summations import Sum | |
from sympy.core.basic import Basic | |
from sympy.core.function import Lambda | |
from sympy.core.symbol import Dummy | |
from sympy.integrals.integrals import Integral | |
from sympy.stats.rv import (NamedArgsMixin, random_symbols, _symbol_converter, | |
PSpace, RandomSymbol, is_random, Distribution) | |
from sympy.stats.crv import ContinuousDistribution, SingleContinuousPSpace | |
from sympy.stats.drv import DiscreteDistribution, SingleDiscretePSpace | |
from sympy.stats.frv import SingleFiniteDistribution, SingleFinitePSpace | |
from sympy.stats.crv_types import ContinuousDistributionHandmade | |
from sympy.stats.drv_types import DiscreteDistributionHandmade | |
from sympy.stats.frv_types import FiniteDistributionHandmade | |
class CompoundPSpace(PSpace): | |
""" | |
A temporary Probability Space for the Compound Distribution. After | |
Marginalization, this returns the corresponding Probability Space of the | |
parent distribution. | |
""" | |
def __new__(cls, s, distribution): | |
s = _symbol_converter(s) | |
if isinstance(distribution, ContinuousDistribution): | |
return SingleContinuousPSpace(s, distribution) | |
if isinstance(distribution, DiscreteDistribution): | |
return SingleDiscretePSpace(s, distribution) | |
if isinstance(distribution, SingleFiniteDistribution): | |
return SingleFinitePSpace(s, distribution) | |
if not isinstance(distribution, CompoundDistribution): | |
raise ValueError("%s should be an isinstance of " | |
"CompoundDistribution"%(distribution)) | |
return Basic.__new__(cls, s, distribution) | |
def value(self): | |
return RandomSymbol(self.symbol, self) | |
def symbol(self): | |
return self.args[0] | |
def is_Continuous(self): | |
return self.distribution.is_Continuous | |
def is_Finite(self): | |
return self.distribution.is_Finite | |
def is_Discrete(self): | |
return self.distribution.is_Discrete | |
def distribution(self): | |
return self.args[1] | |
def pdf(self): | |
return self.distribution.pdf(self.symbol) | |
def set(self): | |
return self.distribution.set | |
def domain(self): | |
return self._get_newpspace().domain | |
def _get_newpspace(self, evaluate=False): | |
x = Dummy('x') | |
parent_dist = self.distribution.args[0] | |
func = Lambda(x, self.distribution.pdf(x, evaluate)) | |
new_pspace = self._transform_pspace(self.symbol, parent_dist, func) | |
if new_pspace is not None: | |
return new_pspace | |
message = ("Compound Distribution for %s is not implemented yet" % str(parent_dist)) | |
raise NotImplementedError(message) | |
def _transform_pspace(self, sym, dist, pdf): | |
""" | |
This function returns the new pspace of the distribution using handmade | |
Distributions and their corresponding pspace. | |
""" | |
pdf = Lambda(sym, pdf(sym)) | |
_set = dist.set | |
if isinstance(dist, ContinuousDistribution): | |
return SingleContinuousPSpace(sym, ContinuousDistributionHandmade(pdf, _set)) | |
elif isinstance(dist, DiscreteDistribution): | |
return SingleDiscretePSpace(sym, DiscreteDistributionHandmade(pdf, _set)) | |
elif isinstance(dist, SingleFiniteDistribution): | |
dens = {k: pdf(k) for k in _set} | |
return SingleFinitePSpace(sym, FiniteDistributionHandmade(dens)) | |
def compute_density(self, expr, *, compound_evaluate=True, **kwargs): | |
new_pspace = self._get_newpspace(compound_evaluate) | |
expr = expr.subs({self.value: new_pspace.value}) | |
return new_pspace.compute_density(expr, **kwargs) | |
def compute_cdf(self, expr, *, compound_evaluate=True, **kwargs): | |
new_pspace = self._get_newpspace(compound_evaluate) | |
expr = expr.subs({self.value: new_pspace.value}) | |
return new_pspace.compute_cdf(expr, **kwargs) | |
def compute_expectation(self, expr, rvs=None, evaluate=False, **kwargs): | |
new_pspace = self._get_newpspace(evaluate) | |
expr = expr.subs({self.value: new_pspace.value}) | |
if rvs: | |
rvs = rvs.subs({self.value: new_pspace.value}) | |
if isinstance(new_pspace, SingleFinitePSpace): | |
return new_pspace.compute_expectation(expr, rvs, **kwargs) | |
return new_pspace.compute_expectation(expr, rvs, evaluate, **kwargs) | |
def probability(self, condition, *, compound_evaluate=True, **kwargs): | |
new_pspace = self._get_newpspace(compound_evaluate) | |
condition = condition.subs({self.value: new_pspace.value}) | |
return new_pspace.probability(condition) | |
def conditional_space(self, condition, *, compound_evaluate=True, **kwargs): | |
new_pspace = self._get_newpspace(compound_evaluate) | |
condition = condition.subs({self.value: new_pspace.value}) | |
return new_pspace.conditional_space(condition) | |
class CompoundDistribution(Distribution, NamedArgsMixin): | |
""" | |
Class for Compound Distributions. | |
Parameters | |
========== | |
dist : Distribution | |
Distribution must contain a random parameter | |
Examples | |
======== | |
>>> from sympy.stats.compound_rv import CompoundDistribution | |
>>> from sympy.stats.crv_types import NormalDistribution | |
>>> from sympy.stats import Normal | |
>>> from sympy.abc import x | |
>>> X = Normal('X', 2, 4) | |
>>> N = NormalDistribution(X, 4) | |
>>> C = CompoundDistribution(N) | |
>>> C.set | |
Interval(-oo, oo) | |
>>> C.pdf(x, evaluate=True).simplify() | |
exp(-x**2/64 + x/16 - 1/16)/(8*sqrt(pi)) | |
References | |
========== | |
.. [1] https://en.wikipedia.org/wiki/Compound_probability_distribution | |
""" | |
def __new__(cls, dist): | |
if not isinstance(dist, (ContinuousDistribution, | |
SingleFiniteDistribution, DiscreteDistribution)): | |
message = "Compound Distribution for %s is not implemented yet" % str(dist) | |
raise NotImplementedError(message) | |
if not cls._compound_check(dist): | |
return dist | |
return Basic.__new__(cls, dist) | |
def set(self): | |
return self.args[0].set | |
def is_Continuous(self): | |
return isinstance(self.args[0], ContinuousDistribution) | |
def is_Finite(self): | |
return isinstance(self.args[0], SingleFiniteDistribution) | |
def is_Discrete(self): | |
return isinstance(self.args[0], DiscreteDistribution) | |
def pdf(self, x, evaluate=False): | |
dist = self.args[0] | |
randoms = [rv for rv in dist.args if is_random(rv)] | |
if isinstance(dist, SingleFiniteDistribution): | |
y = Dummy('y', integer=True, negative=False) | |
expr = dist.pmf(y) | |
else: | |
y = Dummy('y') | |
expr = dist.pdf(y) | |
for rv in randoms: | |
expr = self._marginalise(expr, rv, evaluate) | |
return Lambda(y, expr)(x) | |
def _marginalise(self, expr, rv, evaluate): | |
if isinstance(rv.pspace.distribution, SingleFiniteDistribution): | |
rv_dens = rv.pspace.distribution.pmf(rv) | |
else: | |
rv_dens = rv.pspace.distribution.pdf(rv) | |
rv_dom = rv.pspace.domain.set | |
if rv.pspace.is_Discrete or rv.pspace.is_Finite: | |
expr = Sum(expr*rv_dens, (rv, rv_dom._inf, | |
rv_dom._sup)) | |
else: | |
expr = Integral(expr*rv_dens, (rv, rv_dom._inf, | |
rv_dom._sup)) | |
if evaluate: | |
return expr.doit() | |
return expr | |
def _compound_check(self, dist): | |
""" | |
Checks if the given distribution contains random parameters. | |
""" | |
randoms = [] | |
for arg in dist.args: | |
randoms.extend(random_symbols(arg)) | |
if len(randoms) == 0: | |
return False | |
return True | |