Spaces:
Sleeping
Sleeping
from sympy.concrete.summations import Sum | |
from sympy.core.containers import Tuple | |
from sympy.core.function import Lambda | |
from sympy.core.numbers import (Float, Rational, oo, pi) | |
from sympy.core.relational import (Eq, Ge, Gt, Le, Lt, Ne) | |
from sympy.core.singleton import S | |
from sympy.core.symbol import (Symbol, symbols) | |
from sympy.functions.combinatorial.factorials import factorial | |
from sympy.functions.elementary.exponential import exp | |
from sympy.functions.elementary.integers import ceiling | |
from sympy.functions.elementary.miscellaneous import sqrt | |
from sympy.functions.elementary.piecewise import Piecewise | |
from sympy.functions.special.error_functions import erf | |
from sympy.functions.special.gamma_functions import (gamma, lowergamma) | |
from sympy.logic.boolalg import (And, Not) | |
from sympy.matrices.dense import Matrix | |
from sympy.matrices.expressions.matexpr import MatrixSymbol | |
from sympy.matrices.immutable import ImmutableMatrix | |
from sympy.sets.contains import Contains | |
from sympy.sets.fancysets import Range | |
from sympy.sets.sets import (FiniteSet, Interval) | |
from sympy.stats import (DiscreteMarkovChain, P, TransitionMatrixOf, E, | |
StochasticStateSpaceOf, variance, ContinuousMarkovChain, | |
BernoulliProcess, PoissonProcess, WienerProcess, | |
GammaProcess, sample_stochastic_process) | |
from sympy.stats.joint_rv import JointDistribution | |
from sympy.stats.joint_rv_types import JointDistributionHandmade | |
from sympy.stats.rv import RandomIndexedSymbol | |
from sympy.stats.symbolic_probability import Probability, Expectation | |
from sympy.testing.pytest import (raises, skip, ignore_warnings, | |
warns_deprecated_sympy) | |
from sympy.external import import_module | |
from sympy.stats.frv_types import BernoulliDistribution | |
from sympy.stats.drv_types import PoissonDistribution | |
from sympy.stats.crv_types import NormalDistribution, GammaDistribution | |
from sympy.core.symbol import Str | |
def test_DiscreteMarkovChain(): | |
# pass only the name | |
X = DiscreteMarkovChain("X") | |
assert isinstance(X.state_space, Range) | |
assert X.index_set == S.Naturals0 | |
assert isinstance(X.transition_probabilities, MatrixSymbol) | |
t = symbols('t', positive=True, integer=True) | |
assert isinstance(X[t], RandomIndexedSymbol) | |
assert E(X[0]) == Expectation(X[0]) | |
raises(TypeError, lambda: DiscreteMarkovChain(1)) | |
raises(NotImplementedError, lambda: X(t)) | |
raises(NotImplementedError, lambda: X.communication_classes()) | |
raises(NotImplementedError, lambda: X.canonical_form()) | |
raises(NotImplementedError, lambda: X.decompose()) | |
nz = Symbol('n', integer=True) | |
TZ = MatrixSymbol('M', nz, nz) | |
SZ = Range(nz) | |
YZ = DiscreteMarkovChain('Y', SZ, TZ) | |
assert P(Eq(YZ[2], 1), Eq(YZ[1], 0)) == TZ[0, 1] | |
raises(ValueError, lambda: sample_stochastic_process(t)) | |
raises(ValueError, lambda: next(sample_stochastic_process(X))) | |
# pass name and state_space | |
# any hashable object should be a valid state | |
# states should be valid as a tuple/set/list/Tuple/Range | |
sym, rainy, cloudy, sunny = symbols('a Rainy Cloudy Sunny', real=True) | |
state_spaces = [(1, 2, 3), [Str('Hello'), sym, DiscreteMarkovChain("Y", (1,2,3))], | |
Tuple(S(1), exp(sym), Str('World'), sympify=False), Range(-1, 5, 2), | |
[rainy, cloudy, sunny]] | |
chains = [DiscreteMarkovChain("Y", state_space) for state_space in state_spaces] | |
for i, Y in enumerate(chains): | |
assert isinstance(Y.transition_probabilities, MatrixSymbol) | |
assert Y.state_space == state_spaces[i] or Y.state_space == FiniteSet(*state_spaces[i]) | |
assert Y.number_of_states == 3 | |
with ignore_warnings(UserWarning): # TODO: Restore tests once warnings are removed | |
assert P(Eq(Y[2], 1), Eq(Y[0], 2), evaluate=False) == Probability(Eq(Y[2], 1), Eq(Y[0], 2)) | |
assert E(Y[0]) == Expectation(Y[0]) | |
raises(ValueError, lambda: next(sample_stochastic_process(Y))) | |
raises(TypeError, lambda: DiscreteMarkovChain("Y", {1: 1})) | |
Y = DiscreteMarkovChain("Y", Range(1, t, 2)) | |
assert Y.number_of_states == ceiling((t-1)/2) | |
# pass name and transition_probabilities | |
chains = [DiscreteMarkovChain("Y", trans_probs=Matrix([[]])), | |
DiscreteMarkovChain("Y", trans_probs=Matrix([[0, 1], [1, 0]])), | |
DiscreteMarkovChain("Y", trans_probs=Matrix([[pi, 1-pi], [sym, 1-sym]]))] | |
for Z in chains: | |
assert Z.number_of_states == Z.transition_probabilities.shape[0] | |
assert isinstance(Z.transition_probabilities, ImmutableMatrix) | |
# pass name, state_space and transition_probabilities | |
T = Matrix([[0.5, 0.2, 0.3],[0.2, 0.5, 0.3],[0.2, 0.3, 0.5]]) | |
TS = MatrixSymbol('T', 3, 3) | |
Y = DiscreteMarkovChain("Y", [0, 1, 2], T) | |
YS = DiscreteMarkovChain("Y", ['One', 'Two', 3], TS) | |
assert Y.joint_distribution(1, Y[2], 3) == JointDistribution(Y[1], Y[2], Y[3]) | |
raises(ValueError, lambda: Y.joint_distribution(Y[1].symbol, Y[2].symbol)) | |
assert P(Eq(Y[3], 2), Eq(Y[1], 1)).round(2) == Float(0.36, 2) | |
assert (P(Eq(YS[3], 2), Eq(YS[1], 1)) - | |
(TS[0, 2]*TS[1, 0] + TS[1, 1]*TS[1, 2] + TS[1, 2]*TS[2, 2])).simplify() == 0 | |
assert P(Eq(YS[1], 1), Eq(YS[2], 2)) == Probability(Eq(YS[1], 1)) | |
assert P(Eq(YS[3], 3), Eq(YS[1], 1)) == TS[0, 2]*TS[1, 0] + TS[1, 1]*TS[1, 2] + TS[1, 2]*TS[2, 2] | |
TO = Matrix([[0.25, 0.75, 0],[0, 0.25, 0.75],[0.75, 0, 0.25]]) | |
assert P(Eq(Y[3], 2), Eq(Y[1], 1) & TransitionMatrixOf(Y, TO)).round(3) == Float(0.375, 3) | |
with ignore_warnings(UserWarning): ### TODO: Restore tests once warnings are removed | |
assert E(Y[3], evaluate=False) == Expectation(Y[3]) | |
assert E(Y[3], Eq(Y[2], 1)).round(2) == Float(1.1, 3) | |
TSO = MatrixSymbol('T', 4, 4) | |
raises(ValueError, lambda: str(P(Eq(YS[3], 2), Eq(YS[1], 1) & TransitionMatrixOf(YS, TSO)))) | |
raises(TypeError, lambda: DiscreteMarkovChain("Z", [0, 1, 2], symbols('M'))) | |
raises(ValueError, lambda: DiscreteMarkovChain("Z", [0, 1, 2], MatrixSymbol('T', 3, 4))) | |
raises(ValueError, lambda: E(Y[3], Eq(Y[2], 6))) | |
raises(ValueError, lambda: E(Y[2], Eq(Y[3], 1))) | |
# extended tests for probability queries | |
TO1 = Matrix([[Rational(1, 4), Rational(3, 4), 0],[Rational(1, 3), Rational(1, 3), Rational(1, 3)],[0, Rational(1, 4), Rational(3, 4)]]) | |
assert P(And(Eq(Y[2], 1), Eq(Y[1], 1), Eq(Y[0], 0)), | |
Eq(Probability(Eq(Y[0], 0)), Rational(1, 4)) & TransitionMatrixOf(Y, TO1)) == Rational(1, 16) | |
assert P(And(Eq(Y[2], 1), Eq(Y[1], 1), Eq(Y[0], 0)), TransitionMatrixOf(Y, TO1)) == \ | |
Probability(Eq(Y[0], 0))/4 | |
assert P(Lt(X[1], 2) & Gt(X[1], 0), Eq(X[0], 2) & | |
StochasticStateSpaceOf(X, [0, 1, 2]) & TransitionMatrixOf(X, TO1)) == Rational(1, 4) | |
assert P(Lt(X[1], 2) & Gt(X[1], 0), Eq(X[0], 2) & | |
StochasticStateSpaceOf(X, [S(0), '0', 1]) & TransitionMatrixOf(X, TO1)) == Rational(1, 4) | |
assert P(Ne(X[1], 2) & Ne(X[1], 1), Eq(X[0], 2) & | |
StochasticStateSpaceOf(X, [0, 1, 2]) & TransitionMatrixOf(X, TO1)) is S.Zero | |
assert P(Ne(X[1], 2) & Ne(X[1], 1), Eq(X[0], 2) & | |
StochasticStateSpaceOf(X, [S(0), '0', 1]) & TransitionMatrixOf(X, TO1)) is S.Zero | |
assert P(And(Eq(Y[2], 1), Eq(Y[1], 1), Eq(Y[0], 0)), Eq(Y[1], 1)) == 0.1*Probability(Eq(Y[0], 0)) | |
# testing properties of Markov chain | |
TO2 = Matrix([[S.One, 0, 0],[Rational(1, 3), Rational(1, 3), Rational(1, 3)],[0, Rational(1, 4), Rational(3, 4)]]) | |
TO3 = Matrix([[Rational(1, 4), Rational(3, 4), 0],[Rational(1, 3), Rational(1, 3), Rational(1, 3)], [0, Rational(1, 4), Rational(3, 4)]]) | |
Y2 = DiscreteMarkovChain('Y', trans_probs=TO2) | |
Y3 = DiscreteMarkovChain('Y', trans_probs=TO3) | |
assert Y3.fundamental_matrix() == ImmutableMatrix([[176, 81, -132], [36, 141, -52], [-44, -39, 208]])/125 | |
assert Y2.is_absorbing_chain() == True | |
assert Y3.is_absorbing_chain() == False | |
assert Y2.canonical_form() == ([0, 1, 2], TO2) | |
assert Y3.canonical_form() == ([0, 1, 2], TO3) | |
assert Y2.decompose() == ([0, 1, 2], TO2[0:1, 0:1], TO2[1:3, 0:1], TO2[1:3, 1:3]) | |
assert Y3.decompose() == ([0, 1, 2], TO3, Matrix(0, 3, []), Matrix(0, 0, [])) | |
TO4 = Matrix([[Rational(1, 5), Rational(2, 5), Rational(2, 5)], [Rational(1, 10), S.Half, Rational(2, 5)], [Rational(3, 5), Rational(3, 10), Rational(1, 10)]]) | |
Y4 = DiscreteMarkovChain('Y', trans_probs=TO4) | |
w = ImmutableMatrix([[Rational(11, 39), Rational(16, 39), Rational(4, 13)]]) | |
assert Y4.limiting_distribution == w | |
assert Y4.is_regular() == True | |
assert Y4.is_ergodic() == True | |
TS1 = MatrixSymbol('T', 3, 3) | |
Y5 = DiscreteMarkovChain('Y', trans_probs=TS1) | |
assert Y5.limiting_distribution(w, TO4).doit() == True | |
assert Y5.stationary_distribution(condition_set=True).subs(TS1, TO4).contains(w).doit() == S.true | |
TO6 = Matrix([[S.One, 0, 0, 0, 0],[S.Half, 0, S.Half, 0, 0],[0, S.Half, 0, S.Half, 0], [0, 0, S.Half, 0, S.Half], [0, 0, 0, 0, 1]]) | |
Y6 = DiscreteMarkovChain('Y', trans_probs=TO6) | |
assert Y6.fundamental_matrix() == ImmutableMatrix([[Rational(3, 2), S.One, S.Half], [S.One, S(2), S.One], [S.Half, S.One, Rational(3, 2)]]) | |
assert Y6.absorbing_probabilities() == ImmutableMatrix([[Rational(3, 4), Rational(1, 4)], [S.Half, S.Half], [Rational(1, 4), Rational(3, 4)]]) | |
with warns_deprecated_sympy(): | |
Y6.absorbing_probabilites() | |
TO7 = Matrix([[Rational(1, 2), Rational(1, 4), Rational(1, 4)], [Rational(1, 2), 0, Rational(1, 2)], [Rational(1, 4), Rational(1, 4), Rational(1, 2)]]) | |
Y7 = DiscreteMarkovChain('Y', trans_probs=TO7) | |
assert Y7.is_absorbing_chain() == False | |
assert Y7.fundamental_matrix() == ImmutableMatrix([[Rational(86, 75), Rational(1, 25), Rational(-14, 75)], | |
[Rational(2, 25), Rational(21, 25), Rational(2, 25)], | |
[Rational(-14, 75), Rational(1, 25), Rational(86, 75)]]) | |
# test for zero-sized matrix functionality | |
X = DiscreteMarkovChain('X', trans_probs=Matrix([[]])) | |
assert X.number_of_states == 0 | |
assert X.stationary_distribution() == Matrix([[]]) | |
assert X.communication_classes() == [] | |
assert X.canonical_form() == ([], Matrix([[]])) | |
assert X.decompose() == ([], Matrix([[]]), Matrix([[]]), Matrix([[]])) | |
assert X.is_regular() == False | |
assert X.is_ergodic() == False | |
# test communication_class | |
# see https://drive.google.com/drive/folders/1HbxLlwwn2b3U8Lj7eb_ASIUb5vYaNIjg?usp=sharing | |
# tutorial 2.pdf | |
TO7 = Matrix([[0, 5, 5, 0, 0], | |
[0, 0, 0, 10, 0], | |
[5, 0, 5, 0, 0], | |
[0, 10, 0, 0, 0], | |
[0, 3, 0, 3, 4]])/10 | |
Y7 = DiscreteMarkovChain('Y', trans_probs=TO7) | |
tuples = Y7.communication_classes() | |
classes, recurrence, periods = list(zip(*tuples)) | |
assert classes == ([1, 3], [0, 2], [4]) | |
assert recurrence == (True, False, False) | |
assert periods == (2, 1, 1) | |
TO8 = Matrix([[0, 0, 0, 10, 0, 0], | |
[5, 0, 5, 0, 0, 0], | |
[0, 4, 0, 0, 0, 6], | |
[10, 0, 0, 0, 0, 0], | |
[0, 10, 0, 0, 0, 0], | |
[0, 0, 0, 5, 5, 0]])/10 | |
Y8 = DiscreteMarkovChain('Y', trans_probs=TO8) | |
tuples = Y8.communication_classes() | |
classes, recurrence, periods = list(zip(*tuples)) | |
assert classes == ([0, 3], [1, 2, 5, 4]) | |
assert recurrence == (True, False) | |
assert periods == (2, 2) | |
TO9 = Matrix([[2, 0, 0, 3, 0, 0, 3, 2, 0, 0], | |
[0, 10, 0, 0, 0, 0, 0, 0, 0, 0], | |
[0, 2, 2, 0, 0, 0, 0, 0, 3, 3], | |
[0, 0, 0, 3, 0, 0, 6, 1, 0, 0], | |
[0, 0, 0, 0, 5, 5, 0, 0, 0, 0], | |
[0, 0, 0, 0, 0, 10, 0, 0, 0, 0], | |
[4, 0, 0, 5, 0, 0, 1, 0, 0, 0], | |
[2, 0, 0, 4, 0, 0, 2, 2, 0, 0], | |
[3, 0, 1, 0, 0, 0, 0, 0, 4, 2], | |
[0, 0, 4, 0, 0, 0, 0, 0, 3, 3]])/10 | |
Y9 = DiscreteMarkovChain('Y', trans_probs=TO9) | |
tuples = Y9.communication_classes() | |
classes, recurrence, periods = list(zip(*tuples)) | |
assert classes == ([0, 3, 6, 7], [1], [2, 8, 9], [5], [4]) | |
assert recurrence == (True, True, False, True, False) | |
assert periods == (1, 1, 1, 1, 1) | |
# test canonical form | |
# see https://web.archive.org/web/20201230182007/https://www.dartmouth.edu/~chance/teaching_aids/books_articles/probability_book/Chapter11.pdf | |
# example 11.13 | |
T = Matrix([[1, 0, 0, 0, 0], | |
[S(1) / 2, 0, S(1) / 2, 0, 0], | |
[0, S(1) / 2, 0, S(1) / 2, 0], | |
[0, 0, S(1) / 2, 0, S(1) / 2], | |
[0, 0, 0, 0, S(1)]]) | |
DW = DiscreteMarkovChain('DW', [0, 1, 2, 3, 4], T) | |
states, A, B, C = DW.decompose() | |
assert states == [0, 4, 1, 2, 3] | |
assert A == Matrix([[1, 0], [0, 1]]) | |
assert B == Matrix([[S(1)/2, 0], [0, 0], [0, S(1)/2]]) | |
assert C == Matrix([[0, S(1)/2, 0], [S(1)/2, 0, S(1)/2], [0, S(1)/2, 0]]) | |
states, new_matrix = DW.canonical_form() | |
assert states == [0, 4, 1, 2, 3] | |
assert new_matrix == Matrix([[1, 0, 0, 0, 0], | |
[0, 1, 0, 0, 0], | |
[S(1)/2, 0, 0, S(1)/2, 0], | |
[0, 0, S(1)/2, 0, S(1)/2], | |
[0, S(1)/2, 0, S(1)/2, 0]]) | |
# test regular and ergodic | |
# https://web.archive.org/web/20201230182007/https://www.dartmouth.edu/~chance/teaching_aids/books_articles/probability_book/Chapter11.pdf | |
T = Matrix([[0, 4, 0, 0, 0], | |
[1, 0, 3, 0, 0], | |
[0, 2, 0, 2, 0], | |
[0, 0, 3, 0, 1], | |
[0, 0, 0, 4, 0]])/4 | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert not X.is_regular() | |
assert X.is_ergodic() | |
T = Matrix([[0, 1], [1, 0]]) | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert not X.is_regular() | |
assert X.is_ergodic() | |
# http://www.math.wisc.edu/~valko/courses/331/MC2.pdf | |
T = Matrix([[2, 1, 1], | |
[2, 0, 2], | |
[1, 1, 2]])/4 | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert X.is_regular() | |
assert X.is_ergodic() | |
# https://docs.ufpr.br/~lucambio/CE222/1S2014/Kemeny-Snell1976.pdf | |
T = Matrix([[1, 1], [1, 1]])/2 | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert X.is_regular() | |
assert X.is_ergodic() | |
# test is_absorbing_chain | |
T = Matrix([[0, 1, 0], | |
[1, 0, 0], | |
[0, 0, 1]]) | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert not X.is_absorbing_chain() | |
# https://en.wikipedia.org/wiki/Absorbing_Markov_chain | |
T = Matrix([[1, 1, 0, 0], | |
[0, 1, 1, 0], | |
[1, 0, 0, 1], | |
[0, 0, 0, 2]])/2 | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert X.is_absorbing_chain() | |
T = Matrix([[2, 0, 0, 0, 0], | |
[1, 0, 1, 0, 0], | |
[0, 1, 0, 1, 0], | |
[0, 0, 1, 0, 1], | |
[0, 0, 0, 0, 2]])/2 | |
X = DiscreteMarkovChain('X', trans_probs=T) | |
assert X.is_absorbing_chain() | |
# test custom state space | |
Y10 = DiscreteMarkovChain('Y', [1, 2, 3], TO2) | |
tuples = Y10.communication_classes() | |
classes, recurrence, periods = list(zip(*tuples)) | |
assert classes == ([1], [2, 3]) | |
assert recurrence == (True, False) | |
assert periods == (1, 1) | |
assert Y10.canonical_form() == ([1, 2, 3], TO2) | |
assert Y10.decompose() == ([1, 2, 3], TO2[0:1, 0:1], TO2[1:3, 0:1], TO2[1:3, 1:3]) | |
# testing miscellaneous queries | |
T = Matrix([[S.Half, Rational(1, 4), Rational(1, 4)], | |
[Rational(1, 3), 0, Rational(2, 3)], | |
[S.Half, S.Half, 0]]) | |
X = DiscreteMarkovChain('X', [0, 1, 2], T) | |
assert P(Eq(X[1], 2) & Eq(X[2], 1) & Eq(X[3], 0), | |
Eq(P(Eq(X[1], 0)), Rational(1, 4)) & Eq(P(Eq(X[1], 1)), Rational(1, 4))) == Rational(1, 12) | |
assert P(Eq(X[2], 1) | Eq(X[2], 2), Eq(X[1], 1)) == Rational(2, 3) | |
assert P(Eq(X[2], 1) & Eq(X[2], 2), Eq(X[1], 1)) is S.Zero | |
assert P(Ne(X[2], 2), Eq(X[1], 1)) == Rational(1, 3) | |
assert E(X[1]**2, Eq(X[0], 1)) == Rational(8, 3) | |
assert variance(X[1], Eq(X[0], 1)) == Rational(8, 9) | |
raises(ValueError, lambda: E(X[1], Eq(X[2], 1))) | |
raises(ValueError, lambda: DiscreteMarkovChain('X', [0, 1], T)) | |
# testing miscellaneous queries with different state space | |
X = DiscreteMarkovChain('X', ['A', 'B', 'C'], T) | |
assert P(Eq(X[1], 2) & Eq(X[2], 1) & Eq(X[3], 0), | |
Eq(P(Eq(X[1], 0)), Rational(1, 4)) & Eq(P(Eq(X[1], 1)), Rational(1, 4))) == Rational(1, 12) | |
assert P(Eq(X[2], 1) | Eq(X[2], 2), Eq(X[1], 1)) == Rational(2, 3) | |
assert P(Eq(X[2], 1) & Eq(X[2], 2), Eq(X[1], 1)) is S.Zero | |
assert P(Ne(X[2], 2), Eq(X[1], 1)) == Rational(1, 3) | |
a = X.state_space.args[0] | |
c = X.state_space.args[2] | |
assert (E(X[1] ** 2, Eq(X[0], 1)) - (a**2/3 + 2*c**2/3)).simplify() == 0 | |
assert (variance(X[1], Eq(X[0], 1)) - (2*(-a/3 + c/3)**2/3 + (2*a/3 - 2*c/3)**2/3)).simplify() == 0 | |
raises(ValueError, lambda: E(X[1], Eq(X[2], 1))) | |
#testing queries with multiple RandomIndexedSymbols | |
T = Matrix([[Rational(5, 10), Rational(3, 10), Rational(2, 10)], [Rational(2, 10), Rational(7, 10), Rational(1, 10)], [Rational(3, 10), Rational(3, 10), Rational(4, 10)]]) | |
Y = DiscreteMarkovChain("Y", [0, 1, 2], T) | |
assert P(Eq(Y[7], Y[5]), Eq(Y[2], 0)).round(5) == Float(0.44428, 5) | |
assert P(Gt(Y[3], Y[1]), Eq(Y[0], 0)).round(2) == Float(0.36, 2) | |
assert P(Le(Y[5], Y[10]), Eq(Y[4], 2)).round(6) == Float(0.583120, 6) | |
assert Float(P(Eq(Y[10], Y[5]), Eq(Y[4], 1)), 14) == Float(1 - P(Ne(Y[10], Y[5]), Eq(Y[4], 1)), 14) | |
assert Float(P(Gt(Y[8], Y[9]), Eq(Y[3], 2)), 14) == Float(1 - P(Le(Y[8], Y[9]), Eq(Y[3], 2)), 14) | |
assert Float(P(Lt(Y[1], Y[4]), Eq(Y[0], 0)), 14) == Float(1 - P(Ge(Y[1], Y[4]), Eq(Y[0], 0)), 14) | |
assert P(Eq(Y[5], Y[10]), Eq(Y[2], 1)) == P(Eq(Y[10], Y[5]), Eq(Y[2], 1)) | |
assert P(Gt(Y[1], Y[2]), Eq(Y[0], 1)) == P(Lt(Y[2], Y[1]), Eq(Y[0], 1)) | |
assert P(Ge(Y[7], Y[6]), Eq(Y[4], 1)) == P(Le(Y[6], Y[7]), Eq(Y[4], 1)) | |
#test symbolic queries | |
a, b, c, d = symbols('a b c d') | |
T = Matrix([[Rational(1, 10), Rational(4, 10), Rational(5, 10)], [Rational(3, 10), Rational(4, 10), Rational(3, 10)], [Rational(7, 10), Rational(2, 10), Rational(1, 10)]]) | |
Y = DiscreteMarkovChain("Y", [0, 1, 2], T) | |
query = P(Eq(Y[a], b), Eq(Y[c], d)) | |
assert query.subs({a:10, b:2, c:5, d:1}).evalf().round(4) == P(Eq(Y[10], 2), Eq(Y[5], 1)).round(4) | |
assert query.subs({a:15, b:0, c:10, d:1}).evalf().round(4) == P(Eq(Y[15], 0), Eq(Y[10], 1)).round(4) | |
query_gt = P(Gt(Y[a], b), Eq(Y[c], d)) | |
query_le = P(Le(Y[a], b), Eq(Y[c], d)) | |
assert query_gt.subs({a:5, b:2, c:1, d:0}).evalf() + query_le.subs({a:5, b:2, c:1, d:0}).evalf() == 1.0 | |
query_ge = P(Ge(Y[a], b), Eq(Y[c], d)) | |
query_lt = P(Lt(Y[a], b), Eq(Y[c], d)) | |
assert query_ge.subs({a:4, b:1, c:0, d:2}).evalf() + query_lt.subs({a:4, b:1, c:0, d:2}).evalf() == 1.0 | |
#test issue 20078 | |
assert (2*Y[1] + 3*Y[1]).simplify() == 5*Y[1] | |
assert (2*Y[1] - 3*Y[1]).simplify() == -Y[1] | |
assert (2*(0.25*Y[1])).simplify() == 0.5*Y[1] | |
assert ((2*Y[1]) * (0.25*Y[1])).simplify() == 0.5*Y[1]**2 | |
assert (Y[1]**2 + Y[1]**3).simplify() == (Y[1] + 1)*Y[1]**2 | |
def test_sample_stochastic_process(): | |
if not import_module('scipy'): | |
skip('SciPy Not installed. Skip sampling tests') | |
import random | |
random.seed(0) | |
numpy = import_module('numpy') | |
if numpy: | |
numpy.random.seed(0) # scipy uses numpy to sample so to set its seed | |
T = Matrix([[0.5, 0.2, 0.3],[0.2, 0.5, 0.3],[0.2, 0.3, 0.5]]) | |
Y = DiscreteMarkovChain("Y", [0, 1, 2], T) | |
for samps in range(10): | |
assert next(sample_stochastic_process(Y)) in Y.state_space | |
Z = DiscreteMarkovChain("Z", ['1', 1, 0], T) | |
for samps in range(10): | |
assert next(sample_stochastic_process(Z)) in Z.state_space | |
T = Matrix([[S.Half, Rational(1, 4), Rational(1, 4)], | |
[Rational(1, 3), 0, Rational(2, 3)], | |
[S.Half, S.Half, 0]]) | |
X = DiscreteMarkovChain('X', [0, 1, 2], T) | |
for samps in range(10): | |
assert next(sample_stochastic_process(X)) in X.state_space | |
W = DiscreteMarkovChain('W', [1, pi, oo], T) | |
for samps in range(10): | |
assert next(sample_stochastic_process(W)) in W.state_space | |
def test_ContinuousMarkovChain(): | |
T1 = Matrix([[S(-2), S(2), S.Zero], | |
[S.Zero, S.NegativeOne, S.One], | |
[Rational(3, 2), Rational(3, 2), S(-3)]]) | |
C1 = ContinuousMarkovChain('C', [0, 1, 2], T1) | |
assert C1.limiting_distribution() == ImmutableMatrix([[Rational(3, 19), Rational(12, 19), Rational(4, 19)]]) | |
T2 = Matrix([[-S.One, S.One, S.Zero], [S.One, -S.One, S.Zero], [S.Zero, S.One, -S.One]]) | |
C2 = ContinuousMarkovChain('C', [0, 1, 2], T2) | |
A, t = C2.generator_matrix, symbols('t', positive=True) | |
assert C2.transition_probabilities(A)(t) == Matrix([[S.Half + exp(-2*t)/2, S.Half - exp(-2*t)/2, 0], | |
[S.Half - exp(-2*t)/2, S.Half + exp(-2*t)/2, 0], | |
[S.Half - exp(-t) + exp(-2*t)/2, S.Half - exp(-2*t)/2, exp(-t)]]) | |
with ignore_warnings(UserWarning): ### TODO: Restore tests once warnings are removed | |
assert P(Eq(C2(1), 1), Eq(C2(0), 1), evaluate=False) == Probability(Eq(C2(1), 1), Eq(C2(0), 1)) | |
assert P(Eq(C2(1), 1), Eq(C2(0), 1)) == exp(-2)/2 + S.Half | |
assert P(Eq(C2(1), 0) & Eq(C2(2), 1) & Eq(C2(3), 1), | |
Eq(P(Eq(C2(1), 0)), S.Half)) == (Rational(1, 4) - exp(-2)/4)*(exp(-2)/2 + S.Half) | |
assert P(Not(Eq(C2(1), 0) & Eq(C2(2), 1) & Eq(C2(3), 2)) | | |
(Eq(C2(1), 0) & Eq(C2(2), 1) & Eq(C2(3), 2)), | |
Eq(P(Eq(C2(1), 0)), Rational(1, 4)) & Eq(P(Eq(C2(1), 1)), Rational(1, 4))) is S.One | |
assert E(C2(Rational(3, 2)), Eq(C2(0), 2)) == -exp(-3)/2 + 2*exp(Rational(-3, 2)) + S.Half | |
assert variance(C2(Rational(3, 2)), Eq(C2(0), 1)) == ((S.Half - exp(-3)/2)**2*(exp(-3)/2 + S.Half) | |
+ (Rational(-1, 2) - exp(-3)/2)**2*(S.Half - exp(-3)/2)) | |
raises(KeyError, lambda: P(Eq(C2(1), 0), Eq(P(Eq(C2(1), 1)), S.Half))) | |
assert P(Eq(C2(1), 0), Eq(P(Eq(C2(5), 1)), S.Half)) == Probability(Eq(C2(1), 0)) | |
TS1 = MatrixSymbol('G', 3, 3) | |
CS1 = ContinuousMarkovChain('C', [0, 1, 2], TS1) | |
A = CS1.generator_matrix | |
assert CS1.transition_probabilities(A)(t) == exp(t*A) | |
C3 = ContinuousMarkovChain('C', [Symbol('0'), Symbol('1'), Symbol('2')], T2) | |
assert P(Eq(C3(1), 1), Eq(C3(0), 1)) == exp(-2)/2 + S.Half | |
assert P(Eq(C3(1), Symbol('1')), Eq(C3(0), Symbol('1'))) == exp(-2)/2 + S.Half | |
#test probability queries | |
G = Matrix([[-S(1), Rational(1, 10), Rational(9, 10)], [Rational(2, 5), -S(1), Rational(3, 5)], [Rational(1, 2), Rational(1, 2), -S(1)]]) | |
C = ContinuousMarkovChain('C', state_space=[0, 1, 2], gen_mat=G) | |
assert P(Eq(C(7.385), C(3.19)), Eq(C(0.862), 0)).round(5) == Float(0.35469, 5) | |
assert P(Gt(C(98.715), C(19.807)), Eq(C(11.314), 2)).round(5) == Float(0.32452, 5) | |
assert P(Le(C(5.9), C(10.112)), Eq(C(4), 1)).round(6) == Float(0.675214, 6) | |
assert Float(P(Eq(C(7.32), C(2.91)), Eq(C(2.63), 1)), 14) == Float(1 - P(Ne(C(7.32), C(2.91)), Eq(C(2.63), 1)), 14) | |
assert Float(P(Gt(C(3.36), C(1.101)), Eq(C(0.8), 2)), 14) == Float(1 - P(Le(C(3.36), C(1.101)), Eq(C(0.8), 2)), 14) | |
assert Float(P(Lt(C(4.9), C(2.79)), Eq(C(1.61), 0)), 14) == Float(1 - P(Ge(C(4.9), C(2.79)), Eq(C(1.61), 0)), 14) | |
assert P(Eq(C(5.243), C(10.912)), Eq(C(2.174), 1)) == P(Eq(C(10.912), C(5.243)), Eq(C(2.174), 1)) | |
assert P(Gt(C(2.344), C(9.9)), Eq(C(1.102), 1)) == P(Lt(C(9.9), C(2.344)), Eq(C(1.102), 1)) | |
assert P(Ge(C(7.87), C(1.008)), Eq(C(0.153), 1)) == P(Le(C(1.008), C(7.87)), Eq(C(0.153), 1)) | |
#test symbolic queries | |
a, b, c, d = symbols('a b c d') | |
query = P(Eq(C(a), b), Eq(C(c), d)) | |
assert query.subs({a:3.65, b:2, c:1.78, d:1}).evalf().round(10) == P(Eq(C(3.65), 2), Eq(C(1.78), 1)).round(10) | |
query_gt = P(Gt(C(a), b), Eq(C(c), d)) | |
query_le = P(Le(C(a), b), Eq(C(c), d)) | |
assert query_gt.subs({a:13.2, b:0, c:3.29, d:2}).evalf() + query_le.subs({a:13.2, b:0, c:3.29, d:2}).evalf() == 1.0 | |
query_ge = P(Ge(C(a), b), Eq(C(c), d)) | |
query_lt = P(Lt(C(a), b), Eq(C(c), d)) | |
assert query_ge.subs({a:7.43, b:1, c:1.45, d:0}).evalf() + query_lt.subs({a:7.43, b:1, c:1.45, d:0}).evalf() == 1.0 | |
#test issue 20078 | |
assert (2*C(1) + 3*C(1)).simplify() == 5*C(1) | |
assert (2*C(1) - 3*C(1)).simplify() == -C(1) | |
assert (2*(0.25*C(1))).simplify() == 0.5*C(1) | |
assert (2*C(1) * 0.25*C(1)).simplify() == 0.5*C(1)**2 | |
assert (C(1)**2 + C(1)**3).simplify() == (C(1) + 1)*C(1)**2 | |
def test_BernoulliProcess(): | |
B = BernoulliProcess("B", p=0.6, success=1, failure=0) | |
assert B.state_space == FiniteSet(0, 1) | |
assert B.index_set == S.Naturals0 | |
assert B.success == 1 | |
assert B.failure == 0 | |
X = BernoulliProcess("X", p=Rational(1,3), success='H', failure='T') | |
assert X.state_space == FiniteSet('H', 'T') | |
H, T = symbols("H,T") | |
assert E(X[1]+X[2]*X[3]) == H**2/9 + 4*H*T/9 + H/3 + 4*T**2/9 + 2*T/3 | |
t, x = symbols('t, x', positive=True, integer=True) | |
assert isinstance(B[t], RandomIndexedSymbol) | |
raises(ValueError, lambda: BernoulliProcess("X", p=1.1, success=1, failure=0)) | |
raises(NotImplementedError, lambda: B(t)) | |
raises(IndexError, lambda: B[-3]) | |
assert B.joint_distribution(B[3], B[9]) == JointDistributionHandmade(Lambda((B[3], B[9]), | |
Piecewise((0.6, Eq(B[3], 1)), (0.4, Eq(B[3], 0)), (0, True)) | |
*Piecewise((0.6, Eq(B[9], 1)), (0.4, Eq(B[9], 0)), (0, True)))) | |
assert B.joint_distribution(2, B[4]) == JointDistributionHandmade(Lambda((B[2], B[4]), | |
Piecewise((0.6, Eq(B[2], 1)), (0.4, Eq(B[2], 0)), (0, True)) | |
*Piecewise((0.6, Eq(B[4], 1)), (0.4, Eq(B[4], 0)), (0, True)))) | |
# Test for the sum distribution of Bernoulli Process RVs | |
Y = B[1] + B[2] + B[3] | |
assert P(Eq(Y, 0)).round(2) == Float(0.06, 1) | |
assert P(Eq(Y, 2)).round(2) == Float(0.43, 2) | |
assert P(Eq(Y, 4)).round(2) == 0 | |
assert P(Gt(Y, 1)).round(2) == Float(0.65, 2) | |
# Test for independency of each Random Indexed variable | |
assert P(Eq(B[1], 0) & Eq(B[2], 1) & Eq(B[3], 0) & Eq(B[4], 1)).round(2) == Float(0.06, 1) | |
assert E(2 * B[1] + B[2]).round(2) == Float(1.80, 3) | |
assert E(2 * B[1] + B[2] + 5).round(2) == Float(6.80, 3) | |
assert E(B[2] * B[4] + B[10]).round(2) == Float(0.96, 2) | |
assert E(B[2] > 0, Eq(B[1],1) & Eq(B[2],1)).round(2) == Float(0.60,2) | |
assert E(B[1]) == 0.6 | |
assert P(B[1] > 0).round(2) == Float(0.60, 2) | |
assert P(B[1] < 1).round(2) == Float(0.40, 2) | |
assert P(B[1] > 0, B[2] <= 1).round(2) == Float(0.60, 2) | |
assert P(B[12] * B[5] > 0).round(2) == Float(0.36, 2) | |
assert P(B[12] * B[5] > 0, B[4] < 1).round(2) == Float(0.36, 2) | |
assert P(Eq(B[2], 1), B[2] > 0) == 1.0 | |
assert P(Eq(B[5], 3)) == 0 | |
assert P(Eq(B[1], 1), B[1] < 0) == 0 | |
assert P(B[2] > 0, Eq(B[2], 1)) == 1 | |
assert P(B[2] < 0, Eq(B[2], 1)) == 0 | |
assert P(B[2] > 0, B[2]==7) == 0 | |
assert P(B[5] > 0, B[5]) == BernoulliDistribution(0.6, 0, 1) | |
raises(ValueError, lambda: P(3)) | |
raises(ValueError, lambda: P(B[3] > 0, 3)) | |
# test issue 19456 | |
expr = Sum(B[t], (t, 0, 4)) | |
expr2 = Sum(B[t], (t, 1, 3)) | |
expr3 = Sum(B[t]**2, (t, 1, 3)) | |
assert expr.doit() == B[0] + B[1] + B[2] + B[3] + B[4] | |
assert expr2.doit() == Y | |
assert expr3.doit() == B[1]**2 + B[2]**2 + B[3]**2 | |
assert B[2*t].free_symbols == {B[2*t], t} | |
assert B[4].free_symbols == {B[4]} | |
assert B[x*t].free_symbols == {B[x*t], x, t} | |
#test issue 20078 | |
assert (2*B[t] + 3*B[t]).simplify() == 5*B[t] | |
assert (2*B[t] - 3*B[t]).simplify() == -B[t] | |
assert (2*(0.25*B[t])).simplify() == 0.5*B[t] | |
assert (2*B[t] * 0.25*B[t]).simplify() == 0.5*B[t]**2 | |
assert (B[t]**2 + B[t]**3).simplify() == (B[t] + 1)*B[t]**2 | |
def test_PoissonProcess(): | |
X = PoissonProcess("X", 3) | |
assert X.state_space == S.Naturals0 | |
assert X.index_set == Interval(0, oo) | |
assert X.lamda == 3 | |
t, d, x, y = symbols('t d x y', positive=True) | |
assert isinstance(X(t), RandomIndexedSymbol) | |
assert X.distribution(t) == PoissonDistribution(3*t) | |
with warns_deprecated_sympy(): | |
X.distribution(X(t)) | |
raises(ValueError, lambda: PoissonProcess("X", -1)) | |
raises(NotImplementedError, lambda: X[t]) | |
raises(IndexError, lambda: X(-5)) | |
assert X.joint_distribution(X(2), X(3)) == JointDistributionHandmade(Lambda((X(2), X(3)), | |
6**X(2)*9**X(3)*exp(-15)/(factorial(X(2))*factorial(X(3))))) | |
assert X.joint_distribution(4, 6) == JointDistributionHandmade(Lambda((X(4), X(6)), | |
12**X(4)*18**X(6)*exp(-30)/(factorial(X(4))*factorial(X(6))))) | |
assert P(X(t) < 1) == exp(-3*t) | |
assert P(Eq(X(t), 0), Contains(t, Interval.Lopen(3, 5))) == exp(-6) # exp(-2*lamda) | |
res = P(Eq(X(t), 1), Contains(t, Interval.Lopen(3, 4))) | |
assert res == 3*exp(-3) | |
# Equivalent to P(Eq(X(t), 1))**4 because of non-overlapping intervals | |
assert P(Eq(X(t), 1) & Eq(X(d), 1) & Eq(X(x), 1) & Eq(X(y), 1), Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Lopen(1, 2)) & Contains(x, Interval.Lopen(2, 3)) | |
& Contains(y, Interval.Lopen(3, 4))) == res**4 | |
# Return Probability because of overlapping intervals | |
assert P(Eq(X(t), 2) & Eq(X(d), 3), Contains(t, Interval.Lopen(0, 2)) | |
& Contains(d, Interval.Ropen(2, 4))) == \ | |
Probability(Eq(X(d), 3) & Eq(X(t), 2), Contains(t, Interval.Lopen(0, 2)) | |
& Contains(d, Interval.Ropen(2, 4))) | |
raises(ValueError, lambda: P(Eq(X(t), 2) & Eq(X(d), 3), | |
Contains(t, Interval.Lopen(0, 4)) & Contains(d, Interval.Lopen(3, oo)))) # no bound on d | |
assert P(Eq(X(3), 2)) == 81*exp(-9)/2 | |
assert P(Eq(X(t), 2), Contains(t, Interval.Lopen(0, 5))) == 225*exp(-15)/2 | |
# Check that probability works correctly by adding it to 1 | |
res1 = P(X(t) <= 3, Contains(t, Interval.Lopen(0, 5))) | |
res2 = P(X(t) > 3, Contains(t, Interval.Lopen(0, 5))) | |
assert res1 == 691*exp(-15) | |
assert (res1 + res2).simplify() == 1 | |
# Check Not and Or | |
assert P(Not(Eq(X(t), 2) & (X(d) > 3)), Contains(t, Interval.Ropen(2, 4)) & \ | |
Contains(d, Interval.Lopen(7, 8))).simplify() == -18*exp(-6) + 234*exp(-9) + 1 | |
assert P(Eq(X(t), 2) | Ne(X(t), 4), Contains(t, Interval.Ropen(2, 4))) == 1 - 36*exp(-6) | |
raises(ValueError, lambda: P(X(t) > 2, X(t) + X(d))) | |
assert E(X(t)) == 3*t # property of the distribution at a given timestamp | |
assert E(X(t)**2 + X(d)*2 + X(y)**3, Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Lopen(1, 2)) & Contains(y, Interval.Ropen(3, 4))) == 75 | |
assert E(X(t)**2, Contains(t, Interval.Lopen(0, 1))) == 12 | |
assert E(x*(X(t) + X(d))*(X(t)**2+X(d)**2), Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Ropen(1, 2))) == \ | |
Expectation(x*(X(d) + X(t))*(X(d)**2 + X(t)**2), Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Ropen(1, 2))) | |
# Value Error because of infinite time bound | |
raises(ValueError, lambda: E(X(t)**3, Contains(t, Interval.Lopen(1, oo)))) | |
# Equivalent to E(X(t)**2) - E(X(d)**2) == E(X(1)**2) - E(X(1)**2) == 0 | |
assert E((X(t) + X(d))*(X(t) - X(d)), Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Lopen(1, 2))) == 0 | |
assert E(X(2) + x*E(X(5))) == 15*x + 6 | |
assert E(x*X(1) + y) == 3*x + y | |
assert P(Eq(X(1), 2) & Eq(X(t), 3), Contains(t, Interval.Lopen(1, 2))) == 81*exp(-6)/4 | |
Y = PoissonProcess("Y", 6) | |
Z = X + Y | |
assert Z.lamda == X.lamda + Y.lamda == 9 | |
raises(ValueError, lambda: X + 5) # should be added be only PoissonProcess instance | |
N, M = Z.split(4, 5) | |
assert N.lamda == 4 | |
assert M.lamda == 5 | |
raises(ValueError, lambda: Z.split(3, 2)) # 2+3 != 9 | |
raises(ValueError, lambda :P(Eq(X(t), 0), Contains(t, Interval.Lopen(1, 3)) & Eq(X(1), 0))) | |
# check if it handles queries with two random variables in one args | |
res1 = P(Eq(N(3), N(5))) | |
assert res1 == P(Eq(N(t), 0), Contains(t, Interval(3, 5))) | |
res2 = P(N(3) > N(1)) | |
assert res2 == P((N(t) > 0), Contains(t, Interval(1, 3))) | |
assert P(N(3) < N(1)) == 0 # condition is not possible | |
res3 = P(N(3) <= N(1)) # holds only for Eq(N(3), N(1)) | |
assert res3 == P(Eq(N(t), 0), Contains(t, Interval(1, 3))) | |
# tests from https://www.probabilitycourse.com/chapter11/11_1_2_basic_concepts_of_the_poisson_process.php | |
X = PoissonProcess('X', 10) # 11.1 | |
assert P(Eq(X(S(1)/3), 3) & Eq(X(1), 10)) == exp(-10)*Rational(8000000000, 11160261) | |
assert P(Eq(X(1), 1), Eq(X(S(1)/3), 3)) == 0 | |
assert P(Eq(X(1), 10), Eq(X(S(1)/3), 3)) == P(Eq(X(S(2)/3), 7)) | |
X = PoissonProcess('X', 2) # 11.2 | |
assert P(X(S(1)/2) < 1) == exp(-1) | |
assert P(X(3) < 1, Eq(X(1), 0)) == exp(-4) | |
assert P(Eq(X(4), 3), Eq(X(2), 3)) == exp(-4) | |
X = PoissonProcess('X', 3) | |
assert P(Eq(X(2), 5) & Eq(X(1), 2)) == Rational(81, 4)*exp(-6) | |
# check few properties | |
assert P(X(2) <= 3, X(1)>=1) == 3*P(Eq(X(1), 0)) + 2*P(Eq(X(1), 1)) + P(Eq(X(1), 2)) | |
assert P(X(2) <= 3, X(1) > 1) == 2*P(Eq(X(1), 0)) + 1*P(Eq(X(1), 1)) | |
assert P(Eq(X(2), 5) & Eq(X(1), 2)) == P(Eq(X(1), 3))*P(Eq(X(1), 2)) | |
assert P(Eq(X(3), 4), Eq(X(1), 3)) == P(Eq(X(2), 1)) | |
#test issue 20078 | |
assert (2*X(t) + 3*X(t)).simplify() == 5*X(t) | |
assert (2*X(t) - 3*X(t)).simplify() == -X(t) | |
assert (2*(0.25*X(t))).simplify() == 0.5*X(t) | |
assert (2*X(t) * 0.25*X(t)).simplify() == 0.5*X(t)**2 | |
assert (X(t)**2 + X(t)**3).simplify() == (X(t) + 1)*X(t)**2 | |
def test_WienerProcess(): | |
X = WienerProcess("X") | |
assert X.state_space == S.Reals | |
assert X.index_set == Interval(0, oo) | |
t, d, x, y = symbols('t d x y', positive=True) | |
assert isinstance(X(t), RandomIndexedSymbol) | |
assert X.distribution(t) == NormalDistribution(0, sqrt(t)) | |
with warns_deprecated_sympy(): | |
X.distribution(X(t)) | |
raises(ValueError, lambda: PoissonProcess("X", -1)) | |
raises(NotImplementedError, lambda: X[t]) | |
raises(IndexError, lambda: X(-2)) | |
assert X.joint_distribution(X(2), X(3)) == JointDistributionHandmade( | |
Lambda((X(2), X(3)), sqrt(6)*exp(-X(2)**2/4)*exp(-X(3)**2/6)/(12*pi))) | |
assert X.joint_distribution(4, 6) == JointDistributionHandmade( | |
Lambda((X(4), X(6)), sqrt(6)*exp(-X(4)**2/8)*exp(-X(6)**2/12)/(24*pi))) | |
assert P(X(t) < 3).simplify() == erf(3*sqrt(2)/(2*sqrt(t)))/2 + S(1)/2 | |
assert P(X(t) > 2, Contains(t, Interval.Lopen(3, 7))).simplify() == S(1)/2 -\ | |
erf(sqrt(2)/2)/2 | |
# Equivalent to P(X(1)>1)**4 | |
assert P((X(t) > 4) & (X(d) > 3) & (X(x) > 2) & (X(y) > 1), | |
Contains(t, Interval.Lopen(0, 1)) & Contains(d, Interval.Lopen(1, 2)) | |
& Contains(x, Interval.Lopen(2, 3)) & Contains(y, Interval.Lopen(3, 4))).simplify() ==\ | |
(1 - erf(sqrt(2)/2))*(1 - erf(sqrt(2)))*(1 - erf(3*sqrt(2)/2))*(1 - erf(2*sqrt(2)))/16 | |
# Contains an overlapping interval so, return Probability | |
assert P((X(t)< 2) & (X(d)> 3), Contains(t, Interval.Lopen(0, 2)) | |
& Contains(d, Interval.Ropen(2, 4))) == Probability((X(d) > 3) & (X(t) < 2), | |
Contains(d, Interval.Ropen(2, 4)) & Contains(t, Interval.Lopen(0, 2))) | |
assert str(P(Not((X(t) < 5) & (X(d) > 3)), Contains(t, Interval.Ropen(2, 4)) & | |
Contains(d, Interval.Lopen(7, 8))).simplify()) == \ | |
'-(1 - erf(3*sqrt(2)/2))*(2 - erfc(5/2))/4 + 1' | |
# Distribution has mean 0 at each timestamp | |
assert E(X(t)) == 0 | |
assert E(x*(X(t) + X(d))*(X(t)**2+X(d)**2), Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Ropen(1, 2))) == Expectation(x*(X(d) + X(t))*(X(d)**2 + X(t)**2), | |
Contains(d, Interval.Ropen(1, 2)) & Contains(t, Interval.Lopen(0, 1))) | |
assert E(X(t) + x*E(X(3))) == 0 | |
#test issue 20078 | |
assert (2*X(t) + 3*X(t)).simplify() == 5*X(t) | |
assert (2*X(t) - 3*X(t)).simplify() == -X(t) | |
assert (2*(0.25*X(t))).simplify() == 0.5*X(t) | |
assert (2*X(t) * 0.25*X(t)).simplify() == 0.5*X(t)**2 | |
assert (X(t)**2 + X(t)**3).simplify() == (X(t) + 1)*X(t)**2 | |
def test_GammaProcess_symbolic(): | |
t, d, x, y, g, l = symbols('t d x y g l', positive=True) | |
X = GammaProcess("X", l, g) | |
raises(NotImplementedError, lambda: X[t]) | |
raises(IndexError, lambda: X(-1)) | |
assert isinstance(X(t), RandomIndexedSymbol) | |
assert X.state_space == Interval(0, oo) | |
assert X.distribution(t) == GammaDistribution(g*t, 1/l) | |
with warns_deprecated_sympy(): | |
X.distribution(X(t)) | |
assert X.joint_distribution(5, X(3)) == JointDistributionHandmade(Lambda( | |
(X(5), X(3)), l**(8*g)*exp(-l*X(3))*exp(-l*X(5))*X(3)**(3*g - 1)*X(5)**(5*g | |
- 1)/(gamma(3*g)*gamma(5*g)))) | |
# property of the gamma process at any given timestamp | |
assert E(X(t)) == g*t/l | |
assert variance(X(t)).simplify() == g*t/l**2 | |
# Equivalent to E(2*X(1)) + E(X(1)**2) + E(X(1)**3), where E(X(1)) == g/l | |
assert E(X(t)**2 + X(d)*2 + X(y)**3, Contains(t, Interval.Lopen(0, 1)) | |
& Contains(d, Interval.Lopen(1, 2)) & Contains(y, Interval.Ropen(3, 4))) == \ | |
2*g/l + (g**2 + g)/l**2 + (g**3 + 3*g**2 + 2*g)/l**3 | |
assert P(X(t) > 3, Contains(t, Interval.Lopen(3, 4))).simplify() == \ | |
1 - lowergamma(g, 3*l)/gamma(g) # equivalent to P(X(1)>3) | |
#test issue 20078 | |
assert (2*X(t) + 3*X(t)).simplify() == 5*X(t) | |
assert (2*X(t) - 3*X(t)).simplify() == -X(t) | |
assert (2*(0.25*X(t))).simplify() == 0.5*X(t) | |
assert (2*X(t) * 0.25*X(t)).simplify() == 0.5*X(t)**2 | |
assert (X(t)**2 + X(t)**3).simplify() == (X(t) + 1)*X(t)**2 | |
def test_GammaProcess_numeric(): | |
t, d, x, y = symbols('t d x y', positive=True) | |
X = GammaProcess("X", 1, 2) | |
assert X.state_space == Interval(0, oo) | |
assert X.index_set == Interval(0, oo) | |
assert X.lamda == 1 | |
assert X.gamma == 2 | |
raises(ValueError, lambda: GammaProcess("X", -1, 2)) | |
raises(ValueError, lambda: GammaProcess("X", 0, -2)) | |
raises(ValueError, lambda: GammaProcess("X", -1, -2)) | |
# all are independent because of non-overlapping intervals | |
assert P((X(t) > 4) & (X(d) > 3) & (X(x) > 2) & (X(y) > 1), Contains(t, | |
Interval.Lopen(0, 1)) & Contains(d, Interval.Lopen(1, 2)) & Contains(x, | |
Interval.Lopen(2, 3)) & Contains(y, Interval.Lopen(3, 4))).simplify() == \ | |
120*exp(-10) | |
# Check working with Not and Or | |
assert P(Not((X(t) < 5) & (X(d) > 3)), Contains(t, Interval.Ropen(2, 4)) & | |
Contains(d, Interval.Lopen(7, 8))).simplify() == -4*exp(-3) + 472*exp(-8)/3 + 1 | |
assert P((X(t) > 2) | (X(t) < 4), Contains(t, Interval.Ropen(1, 4))).simplify() == \ | |
-643*exp(-4)/15 + 109*exp(-2)/15 + 1 | |
assert E(X(t)) == 2*t # E(X(t)) == gamma*t/l | |
assert E(X(2) + x*E(X(5))) == 10*x + 4 | |