Spaces:
Sleeping
Sleeping
from sympy.concrete.summations import (Sum, summation) | |
from sympy.core.basic import Basic | |
from sympy.core.cache import cacheit | |
from sympy.core.function import Lambda | |
from sympy.core.numbers import I | |
from sympy.core.relational import (Eq, Ne) | |
from sympy.core.singleton import S | |
from sympy.core.symbol import (Dummy, symbols) | |
from sympy.core.sympify import sympify | |
from sympy.functions.combinatorial.factorials import factorial | |
from sympy.functions.elementary.exponential import exp | |
from sympy.functions.elementary.integers import floor | |
from sympy.functions.elementary.piecewise import Piecewise | |
from sympy.logic.boolalg import And | |
from sympy.polys.polytools import poly | |
from sympy.series.series import series | |
from sympy.polys.polyerrors import PolynomialError | |
from sympy.stats.crv import reduce_rational_inequalities_wrap | |
from sympy.stats.rv import (NamedArgsMixin, SinglePSpace, SingleDomain, | |
random_symbols, PSpace, ConditionalDomain, RandomDomain, | |
ProductDomain, Distribution) | |
from sympy.stats.symbolic_probability import Probability | |
from sympy.sets.fancysets import Range, FiniteSet | |
from sympy.sets.sets import Union | |
from sympy.sets.contains import Contains | |
from sympy.utilities import filldedent | |
from sympy.core.sympify import _sympify | |
class DiscreteDistribution(Distribution): | |
def __call__(self, *args): | |
return self.pdf(*args) | |
class SingleDiscreteDistribution(DiscreteDistribution, NamedArgsMixin): | |
""" Discrete distribution of a single variable. | |
Serves as superclass for PoissonDistribution etc.... | |
Provides methods for pdf, cdf, and sampling | |
See Also: | |
sympy.stats.crv_types.* | |
""" | |
set = S.Integers | |
def __new__(cls, *args): | |
args = list(map(sympify, args)) | |
return Basic.__new__(cls, *args) | |
def check(*args): | |
pass | |
def compute_cdf(self, **kwargs): | |
""" Compute the CDF from the PDF. | |
Returns a Lambda. | |
""" | |
x = symbols('x', integer=True, cls=Dummy) | |
z = symbols('z', real=True, cls=Dummy) | |
left_bound = self.set.inf | |
# CDF is integral of PDF from left bound to z | |
pdf = self.pdf(x) | |
cdf = summation(pdf, (x, left_bound, floor(z)), **kwargs) | |
# CDF Ensure that CDF left of left_bound is zero | |
cdf = Piecewise((cdf, z >= left_bound), (0, True)) | |
return Lambda(z, cdf) | |
def _cdf(self, x): | |
return None | |
def cdf(self, x, **kwargs): | |
""" Cumulative density function """ | |
if not kwargs: | |
cdf = self._cdf(x) | |
if cdf is not None: | |
return cdf | |
return self.compute_cdf(**kwargs)(x) | |
def compute_characteristic_function(self, **kwargs): | |
""" Compute the characteristic function from the PDF. | |
Returns a Lambda. | |
""" | |
x, t = symbols('x, t', real=True, cls=Dummy) | |
pdf = self.pdf(x) | |
cf = summation(exp(I*t*x)*pdf, (x, self.set.inf, self.set.sup)) | |
return Lambda(t, cf) | |
def _characteristic_function(self, t): | |
return None | |
def characteristic_function(self, t, **kwargs): | |
""" Characteristic function """ | |
if not kwargs: | |
cf = self._characteristic_function(t) | |
if cf is not None: | |
return cf | |
return self.compute_characteristic_function(**kwargs)(t) | |
def compute_moment_generating_function(self, **kwargs): | |
t = Dummy('t', real=True) | |
x = Dummy('x', integer=True) | |
pdf = self.pdf(x) | |
mgf = summation(exp(t*x)*pdf, (x, self.set.inf, self.set.sup)) | |
return Lambda(t, mgf) | |
def _moment_generating_function(self, t): | |
return None | |
def moment_generating_function(self, t, **kwargs): | |
if not kwargs: | |
mgf = self._moment_generating_function(t) | |
if mgf is not None: | |
return mgf | |
return self.compute_moment_generating_function(**kwargs)(t) | |
def compute_quantile(self, **kwargs): | |
""" Compute the Quantile from the PDF. | |
Returns a Lambda. | |
""" | |
x = Dummy('x', integer=True) | |
p = Dummy('p', real=True) | |
left_bound = self.set.inf | |
pdf = self.pdf(x) | |
cdf = summation(pdf, (x, left_bound, x), **kwargs) | |
set = ((x, p <= cdf), ) | |
return Lambda(p, Piecewise(*set)) | |
def _quantile(self, x): | |
return None | |
def quantile(self, x, **kwargs): | |
""" Cumulative density function """ | |
if not kwargs: | |
quantile = self._quantile(x) | |
if quantile is not None: | |
return quantile | |
return self.compute_quantile(**kwargs)(x) | |
def expectation(self, expr, var, evaluate=True, **kwargs): | |
""" Expectation of expression over distribution """ | |
# TODO: support discrete sets with non integer stepsizes | |
if evaluate: | |
try: | |
p = poly(expr, var) | |
t = Dummy('t', real=True) | |
mgf = self.moment_generating_function(t) | |
deg = p.degree() | |
taylor = poly(series(mgf, t, 0, deg + 1).removeO(), t) | |
result = 0 | |
for k in range(deg+1): | |
result += p.coeff_monomial(var ** k) * taylor.coeff_monomial(t ** k) * factorial(k) | |
return result | |
except PolynomialError: | |
return summation(expr * self.pdf(var), | |
(var, self.set.inf, self.set.sup), **kwargs) | |
else: | |
return Sum(expr * self.pdf(var), | |
(var, self.set.inf, self.set.sup), **kwargs) | |
def __call__(self, *args): | |
return self.pdf(*args) | |
class DiscreteDomain(RandomDomain): | |
""" | |
A domain with discrete support with step size one. | |
Represented using symbols and Range. | |
""" | |
is_Discrete = True | |
class SingleDiscreteDomain(DiscreteDomain, SingleDomain): | |
def as_boolean(self): | |
return Contains(self.symbol, self.set) | |
class ConditionalDiscreteDomain(DiscreteDomain, ConditionalDomain): | |
""" | |
Domain with discrete support of step size one, that is restricted by | |
some condition. | |
""" | |
def set(self): | |
rv = self.symbols | |
if len(self.symbols) > 1: | |
raise NotImplementedError(filldedent(''' | |
Multivariate conditional domains are not yet implemented.''')) | |
rv = list(rv)[0] | |
return reduce_rational_inequalities_wrap(self.condition, | |
rv).intersect(self.fulldomain.set) | |
class DiscretePSpace(PSpace): | |
is_real = True | |
is_Discrete = True | |
def pdf(self): | |
return self.density(*self.symbols) | |
def where(self, condition): | |
rvs = random_symbols(condition) | |
assert all(r.symbol in self.symbols for r in rvs) | |
if len(rvs) > 1: | |
raise NotImplementedError(filldedent('''Multivariate discrete | |
random variables are not yet supported.''')) | |
conditional_domain = reduce_rational_inequalities_wrap(condition, | |
rvs[0]) | |
conditional_domain = conditional_domain.intersect(self.domain.set) | |
return SingleDiscreteDomain(rvs[0].symbol, conditional_domain) | |
def probability(self, condition): | |
complement = isinstance(condition, Ne) | |
if complement: | |
condition = Eq(condition.args[0], condition.args[1]) | |
try: | |
_domain = self.where(condition).set | |
if condition == False or _domain is S.EmptySet: | |
return S.Zero | |
if condition == True or _domain == self.domain.set: | |
return S.One | |
prob = self.eval_prob(_domain) | |
except NotImplementedError: | |
from sympy.stats.rv import density | |
expr = condition.lhs - condition.rhs | |
dens = density(expr) | |
if not isinstance(dens, DiscreteDistribution): | |
from sympy.stats.drv_types import DiscreteDistributionHandmade | |
dens = DiscreteDistributionHandmade(dens) | |
z = Dummy('z', real=True) | |
space = SingleDiscretePSpace(z, dens) | |
prob = space.probability(condition.__class__(space.value, 0)) | |
if prob is None: | |
prob = Probability(condition) | |
return prob if not complement else S.One - prob | |
def eval_prob(self, _domain): | |
sym = list(self.symbols)[0] | |
if isinstance(_domain, Range): | |
n = symbols('n', integer=True) | |
inf, sup, step = (r for r in _domain.args) | |
summand = ((self.pdf).replace( | |
sym, n*step)) | |
rv = summation(summand, | |
(n, inf/step, (sup)/step - 1)).doit() | |
return rv | |
elif isinstance(_domain, FiniteSet): | |
pdf = Lambda(sym, self.pdf) | |
rv = sum(pdf(x) for x in _domain) | |
return rv | |
elif isinstance(_domain, Union): | |
rv = sum(self.eval_prob(x) for x in _domain.args) | |
return rv | |
def conditional_space(self, condition): | |
# XXX: Converting from set to tuple. The order matters to Lambda | |
# though so we should be starting with a set... | |
density = Lambda(tuple(self.symbols), self.pdf/self.probability(condition)) | |
condition = condition.xreplace({rv: rv.symbol for rv in self.values}) | |
domain = ConditionalDiscreteDomain(self.domain, condition) | |
return DiscretePSpace(domain, density) | |
class ProductDiscreteDomain(ProductDomain, DiscreteDomain): | |
def as_boolean(self): | |
return And(*[domain.as_boolean for domain in self.domains]) | |
class SingleDiscretePSpace(DiscretePSpace, SinglePSpace): | |
""" Discrete probability space over a single univariate variable """ | |
is_real = True | |
def set(self): | |
return self.distribution.set | |
def domain(self): | |
return SingleDiscreteDomain(self.symbol, self.set) | |
def sample(self, size=(), library='scipy', seed=None): | |
""" | |
Internal sample method. | |
Returns dictionary mapping RandomSymbol to realization value. | |
""" | |
return {self.value: self.distribution.sample(size, library=library, seed=seed)} | |
def compute_expectation(self, expr, rvs=None, evaluate=True, **kwargs): | |
rvs = rvs or (self.value,) | |
if self.value not in rvs: | |
return expr | |
expr = _sympify(expr) | |
expr = expr.xreplace({rv: rv.symbol for rv in rvs}) | |
x = self.value.symbol | |
try: | |
return self.distribution.expectation(expr, x, evaluate=evaluate, | |
**kwargs) | |
except NotImplementedError: | |
return Sum(expr * self.pdf, (x, self.set.inf, self.set.sup), | |
**kwargs) | |
def compute_cdf(self, expr, **kwargs): | |
if expr == self.value: | |
x = Dummy("x", real=True) | |
return Lambda(x, self.distribution.cdf(x, **kwargs)) | |
else: | |
raise NotImplementedError() | |
def compute_density(self, expr, **kwargs): | |
if expr == self.value: | |
return self.distribution | |
raise NotImplementedError() | |
def compute_characteristic_function(self, expr, **kwargs): | |
if expr == self.value: | |
t = Dummy("t", real=True) | |
return Lambda(t, self.distribution.characteristic_function(t, **kwargs)) | |
else: | |
raise NotImplementedError() | |
def compute_moment_generating_function(self, expr, **kwargs): | |
if expr == self.value: | |
t = Dummy("t", real=True) | |
return Lambda(t, self.distribution.moment_generating_function(t, **kwargs)) | |
else: | |
raise NotImplementedError() | |
def compute_quantile(self, expr, **kwargs): | |
if expr == self.value: | |
p = Dummy("p", real=True) | |
return Lambda(p, self.distribution.quantile(p, **kwargs)) | |
else: | |
raise NotImplementedError() | |