Spaces:
Sleeping
Sleeping
import itertools | |
from sympy.core.add import Add | |
from sympy.core.expr import Expr | |
from sympy.core.function import expand as _expand | |
from sympy.core.mul import Mul | |
from sympy.core.singleton import S | |
from sympy.matrices.exceptions import ShapeError | |
from sympy.matrices.expressions.matexpr import MatrixExpr | |
from sympy.matrices.expressions.matmul import MatMul | |
from sympy.matrices.expressions.special import ZeroMatrix | |
from sympy.stats.rv import RandomSymbol, is_random | |
from sympy.core.sympify import _sympify | |
from sympy.stats.symbolic_probability import Variance, Covariance, Expectation | |
class ExpectationMatrix(Expectation, MatrixExpr): | |
""" | |
Expectation of a random matrix expression. | |
Examples | |
======== | |
>>> from sympy.stats import ExpectationMatrix, Normal | |
>>> from sympy.stats.rv import RandomMatrixSymbol | |
>>> from sympy import symbols, MatrixSymbol, Matrix | |
>>> k = symbols("k") | |
>>> A, B = MatrixSymbol("A", k, k), MatrixSymbol("B", k, k) | |
>>> X, Y = RandomMatrixSymbol("X", k, 1), RandomMatrixSymbol("Y", k, 1) | |
>>> ExpectationMatrix(X) | |
ExpectationMatrix(X) | |
>>> ExpectationMatrix(A*X).shape | |
(k, 1) | |
To expand the expectation in its expression, use ``expand()``: | |
>>> ExpectationMatrix(A*X + B*Y).expand() | |
A*ExpectationMatrix(X) + B*ExpectationMatrix(Y) | |
>>> ExpectationMatrix((X + Y)*(X - Y).T).expand() | |
ExpectationMatrix(X*X.T) - ExpectationMatrix(X*Y.T) + ExpectationMatrix(Y*X.T) - ExpectationMatrix(Y*Y.T) | |
To evaluate the ``ExpectationMatrix``, use ``doit()``: | |
>>> N11, N12 = Normal('N11', 11, 1), Normal('N12', 12, 1) | |
>>> N21, N22 = Normal('N21', 21, 1), Normal('N22', 22, 1) | |
>>> M11, M12 = Normal('M11', 1, 1), Normal('M12', 2, 1) | |
>>> M21, M22 = Normal('M21', 3, 1), Normal('M22', 4, 1) | |
>>> x1 = Matrix([[N11, N12], [N21, N22]]) | |
>>> x2 = Matrix([[M11, M12], [M21, M22]]) | |
>>> ExpectationMatrix(x1 + x2).doit() | |
Matrix([ | |
[12, 14], | |
[24, 26]]) | |
""" | |
def __new__(cls, expr, condition=None): | |
expr = _sympify(expr) | |
if condition is None: | |
if not is_random(expr): | |
return expr | |
obj = Expr.__new__(cls, expr) | |
else: | |
condition = _sympify(condition) | |
obj = Expr.__new__(cls, expr, condition) | |
obj._shape = expr.shape | |
obj._condition = condition | |
return obj | |
def shape(self): | |
return self._shape | |
def expand(self, **hints): | |
expr = self.args[0] | |
condition = self._condition | |
if not is_random(expr): | |
return expr | |
if isinstance(expr, Add): | |
return Add.fromiter(Expectation(a, condition=condition).expand() | |
for a in expr.args) | |
expand_expr = _expand(expr) | |
if isinstance(expand_expr, Add): | |
return Add.fromiter(Expectation(a, condition=condition).expand() | |
for a in expand_expr.args) | |
elif isinstance(expr, (Mul, MatMul)): | |
rv = [] | |
nonrv = [] | |
postnon = [] | |
for a in expr.args: | |
if is_random(a): | |
if rv: | |
rv.extend(postnon) | |
else: | |
nonrv.extend(postnon) | |
postnon = [] | |
rv.append(a) | |
elif a.is_Matrix: | |
postnon.append(a) | |
else: | |
nonrv.append(a) | |
# In order to avoid infinite-looping (MatMul may call .doit() again), | |
# do not rebuild | |
if len(nonrv) == 0: | |
return self | |
return Mul.fromiter(nonrv)*Expectation(Mul.fromiter(rv), | |
condition=condition)*Mul.fromiter(postnon) | |
return self | |
class VarianceMatrix(Variance, MatrixExpr): | |
""" | |
Variance of a random matrix probability expression. Also known as | |
Covariance matrix, auto-covariance matrix, dispersion matrix, | |
or variance-covariance matrix. | |
Examples | |
======== | |
>>> from sympy.stats import VarianceMatrix | |
>>> from sympy.stats.rv import RandomMatrixSymbol | |
>>> from sympy import symbols, MatrixSymbol | |
>>> k = symbols("k") | |
>>> A, B = MatrixSymbol("A", k, k), MatrixSymbol("B", k, k) | |
>>> X, Y = RandomMatrixSymbol("X", k, 1), RandomMatrixSymbol("Y", k, 1) | |
>>> VarianceMatrix(X) | |
VarianceMatrix(X) | |
>>> VarianceMatrix(X).shape | |
(k, k) | |
To expand the variance in its expression, use ``expand()``: | |
>>> VarianceMatrix(A*X).expand() | |
A*VarianceMatrix(X)*A.T | |
>>> VarianceMatrix(A*X + B*Y).expand() | |
2*A*CrossCovarianceMatrix(X, Y)*B.T + A*VarianceMatrix(X)*A.T + B*VarianceMatrix(Y)*B.T | |
""" | |
def __new__(cls, arg, condition=None): | |
arg = _sympify(arg) | |
if 1 not in arg.shape: | |
raise ShapeError("Expression is not a vector") | |
shape = (arg.shape[0], arg.shape[0]) if arg.shape[1] == 1 else (arg.shape[1], arg.shape[1]) | |
if condition: | |
obj = Expr.__new__(cls, arg, condition) | |
else: | |
obj = Expr.__new__(cls, arg) | |
obj._shape = shape | |
obj._condition = condition | |
return obj | |
def shape(self): | |
return self._shape | |
def expand(self, **hints): | |
arg = self.args[0] | |
condition = self._condition | |
if not is_random(arg): | |
return ZeroMatrix(*self.shape) | |
if isinstance(arg, RandomSymbol): | |
return self | |
elif isinstance(arg, Add): | |
rv = [] | |
for a in arg.args: | |
if is_random(a): | |
rv.append(a) | |
variances = Add(*(Variance(xv, condition).expand() for xv in rv)) | |
map_to_covar = lambda x: 2*Covariance(*x, condition=condition).expand() | |
covariances = Add(*map(map_to_covar, itertools.combinations(rv, 2))) | |
return variances + covariances | |
elif isinstance(arg, (Mul, MatMul)): | |
nonrv = [] | |
rv = [] | |
for a in arg.args: | |
if is_random(a): | |
rv.append(a) | |
else: | |
nonrv.append(a) | |
if len(rv) == 0: | |
return ZeroMatrix(*self.shape) | |
# Avoid possible infinite loops with MatMul: | |
if len(nonrv) == 0: | |
return self | |
# Variance of many multiple matrix products is not implemented: | |
if len(rv) > 1: | |
return self | |
return Mul.fromiter(nonrv)*Variance(Mul.fromiter(rv), | |
condition)*(Mul.fromiter(nonrv)).transpose() | |
# this expression contains a RandomSymbol somehow: | |
return self | |
class CrossCovarianceMatrix(Covariance, MatrixExpr): | |
""" | |
Covariance of a random matrix probability expression. | |
Examples | |
======== | |
>>> from sympy.stats import CrossCovarianceMatrix | |
>>> from sympy.stats.rv import RandomMatrixSymbol | |
>>> from sympy import symbols, MatrixSymbol | |
>>> k = symbols("k") | |
>>> A, B = MatrixSymbol("A", k, k), MatrixSymbol("B", k, k) | |
>>> C, D = MatrixSymbol("C", k, k), MatrixSymbol("D", k, k) | |
>>> X, Y = RandomMatrixSymbol("X", k, 1), RandomMatrixSymbol("Y", k, 1) | |
>>> Z, W = RandomMatrixSymbol("Z", k, 1), RandomMatrixSymbol("W", k, 1) | |
>>> CrossCovarianceMatrix(X, Y) | |
CrossCovarianceMatrix(X, Y) | |
>>> CrossCovarianceMatrix(X, Y).shape | |
(k, k) | |
To expand the covariance in its expression, use ``expand()``: | |
>>> CrossCovarianceMatrix(X + Y, Z).expand() | |
CrossCovarianceMatrix(X, Z) + CrossCovarianceMatrix(Y, Z) | |
>>> CrossCovarianceMatrix(A*X, Y).expand() | |
A*CrossCovarianceMatrix(X, Y) | |
>>> CrossCovarianceMatrix(A*X, B.T*Y).expand() | |
A*CrossCovarianceMatrix(X, Y)*B | |
>>> CrossCovarianceMatrix(A*X + B*Y, C.T*Z + D.T*W).expand() | |
A*CrossCovarianceMatrix(X, W)*D + A*CrossCovarianceMatrix(X, Z)*C + B*CrossCovarianceMatrix(Y, W)*D + B*CrossCovarianceMatrix(Y, Z)*C | |
""" | |
def __new__(cls, arg1, arg2, condition=None): | |
arg1 = _sympify(arg1) | |
arg2 = _sympify(arg2) | |
if (1 not in arg1.shape) or (1 not in arg2.shape) or (arg1.shape[1] != arg2.shape[1]): | |
raise ShapeError("Expression is not a vector") | |
shape = (arg1.shape[0], arg2.shape[0]) if arg1.shape[1] == 1 and arg2.shape[1] == 1 \ | |
else (1, 1) | |
if condition: | |
obj = Expr.__new__(cls, arg1, arg2, condition) | |
else: | |
obj = Expr.__new__(cls, arg1, arg2) | |
obj._shape = shape | |
obj._condition = condition | |
return obj | |
def shape(self): | |
return self._shape | |
def expand(self, **hints): | |
arg1 = self.args[0] | |
arg2 = self.args[1] | |
condition = self._condition | |
if arg1 == arg2: | |
return VarianceMatrix(arg1, condition).expand() | |
if not is_random(arg1) or not is_random(arg2): | |
return ZeroMatrix(*self.shape) | |
if isinstance(arg1, RandomSymbol) and isinstance(arg2, RandomSymbol): | |
return CrossCovarianceMatrix(arg1, arg2, condition) | |
coeff_rv_list1 = self._expand_single_argument(arg1.expand()) | |
coeff_rv_list2 = self._expand_single_argument(arg2.expand()) | |
addends = [a*CrossCovarianceMatrix(r1, r2, condition=condition)*b.transpose() | |
for (a, r1) in coeff_rv_list1 for (b, r2) in coeff_rv_list2] | |
return Add.fromiter(addends) | |
def _expand_single_argument(cls, expr): | |
# return (coefficient, random_symbol) pairs: | |
if isinstance(expr, RandomSymbol): | |
return [(S.One, expr)] | |
elif isinstance(expr, Add): | |
outval = [] | |
for a in expr.args: | |
if isinstance(a, (Mul, MatMul)): | |
outval.append(cls._get_mul_nonrv_rv_tuple(a)) | |
elif is_random(a): | |
outval.append((S.One, a)) | |
return outval | |
elif isinstance(expr, (Mul, MatMul)): | |
return [cls._get_mul_nonrv_rv_tuple(expr)] | |
elif is_random(expr): | |
return [(S.One, expr)] | |
def _get_mul_nonrv_rv_tuple(cls, m): | |
rv = [] | |
nonrv = [] | |
for a in m.args: | |
if is_random(a): | |
rv.append(a) | |
else: | |
nonrv.append(a) | |
return (Mul.fromiter(nonrv), Mul.fromiter(rv)) | |