|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from sklearn.decomposition import FastICA, PCA, IncrementalPCA, MiniBatchSparsePCA, SparsePCA, KernelPCA |
|
import fbpca |
|
import numpy as np |
|
import itertools |
|
from types import SimpleNamespace |
|
|
|
|
|
class ICAEstimator(): |
|
def __init__(self, n_components): |
|
self.n_components = n_components |
|
self.maxiter = 10000 |
|
self.whiten = True |
|
self.transformer = FastICA(n_components, random_state=0, whiten=self.whiten, max_iter=self.maxiter) |
|
self.batch_support = False |
|
self.stdev = np.zeros((n_components,)) |
|
self.total_var = 0.0 |
|
|
|
def get_param_str(self): |
|
return "ica_c{}{}".format(self.n_components, '_w' if self.whiten else '') |
|
|
|
def fit(self, X): |
|
self.transformer.fit(X) |
|
if self.transformer.n_iter_ >= self.maxiter: |
|
raise RuntimeError(f'FastICA did not converge (N={X.shape[0]}, it={self.maxiter})') |
|
|
|
|
|
self.transformer.components_ /= np.sqrt(np.sum(self.transformer.components_**2, axis=-1, keepdims=True)) |
|
|
|
|
|
self.total_var = X.var(axis=0).sum() |
|
|
|
|
|
self.stdev = np.dot(self.transformer.components_, X.T).std(axis=1) |
|
|
|
|
|
idx = np.argsort(self.stdev)[::-1] |
|
self.stdev = self.stdev[idx] |
|
self.transformer.components_[:] = self.transformer.components_[idx] |
|
|
|
def get_components(self): |
|
var_ratio = self.stdev**2 / self.total_var |
|
return self.transformer.components_, self.stdev, var_ratio |
|
|
|
|
|
class IPCAEstimator(): |
|
def __init__(self, n_components): |
|
self.n_components = n_components |
|
self.whiten = False |
|
self.transformer = IncrementalPCA(n_components, whiten=self.whiten, batch_size=max(100, 2*n_components)) |
|
self.batch_support = True |
|
|
|
def get_param_str(self): |
|
return "ipca_c{}{}".format(self.n_components, '_w' if self.whiten else '') |
|
|
|
def fit(self, X): |
|
self.transformer.fit(X) |
|
|
|
def fit_partial(self, X): |
|
try: |
|
self.transformer.partial_fit(X) |
|
self.transformer.n_samples_seen_ = \ |
|
self.transformer.n_samples_seen_.astype(np.int64) |
|
return True |
|
except ValueError as e: |
|
print(f'\nIPCA error:', e) |
|
return False |
|
|
|
def get_components(self): |
|
stdev = np.sqrt(self.transformer.explained_variance_) |
|
var_ratio = self.transformer.explained_variance_ratio_ |
|
return self.transformer.components_, stdev, var_ratio |
|
|
|
|
|
class PCAEstimator(): |
|
def __init__(self, n_components): |
|
self.n_components = n_components |
|
self.solver = 'full' |
|
self.transformer = PCA(n_components, svd_solver=self.solver) |
|
self.batch_support = False |
|
|
|
def get_param_str(self): |
|
return f"pca-{self.solver}_c{self.n_components}" |
|
|
|
def fit(self, X): |
|
self.transformer.fit(X) |
|
|
|
|
|
self.total_var = X.var(axis=0).sum() |
|
|
|
|
|
self.stdev = np.dot(self.transformer.components_, X.T).std(axis=1) |
|
|
|
|
|
idx = np.argsort(self.stdev)[::-1] |
|
self.stdev = self.stdev[idx] |
|
self.transformer.components_[:] = self.transformer.components_[idx] |
|
|
|
|
|
dotps = [np.dot(*self.transformer.components_[[i, j]]) |
|
for (i, j) in itertools.combinations(range(self.n_components), 2)] |
|
if not np.allclose(dotps, 0, atol=1e-4): |
|
print('IPCA components not orghogonal, max dot', np.abs(dotps).max()) |
|
|
|
self.transformer.mean_ = X.mean(axis=0, keepdims=True) |
|
|
|
def get_components(self): |
|
var_ratio = self.stdev**2 / self.total_var |
|
return self.transformer.components_, self.stdev, var_ratio |
|
|
|
|
|
|
|
|
|
|
|
class FacebookPCAEstimator(): |
|
def __init__(self, n_components): |
|
self.n_components = n_components |
|
self.transformer = SimpleNamespace() |
|
self.batch_support = False |
|
self.n_iter = 2 |
|
self.l = 2*self.n_components |
|
|
|
def get_param_str(self): |
|
return "fbpca_c{}_it{}_l{}".format(self.n_components, self.n_iter, self.l) |
|
|
|
def fit(self, X): |
|
U, s, Va = fbpca.pca(X, k=self.n_components, n_iter=self.n_iter, raw=True, l=self.l) |
|
self.transformer.components_ = Va |
|
|
|
|
|
self.total_var = X.var(axis=0).sum() |
|
|
|
|
|
self.stdev = np.dot(self.transformer.components_, X.T).std(axis=1) |
|
|
|
|
|
idx = np.argsort(self.stdev)[::-1] |
|
self.stdev = self.stdev[idx] |
|
self.transformer.components_[:] = self.transformer.components_[idx] |
|
|
|
|
|
dotps = [np.dot(*self.transformer.components_[[i, j]]) |
|
for (i, j) in itertools.combinations(range(self.n_components), 2)] |
|
if not np.allclose(dotps, 0, atol=1e-4): |
|
print('FBPCA components not orghogonal, max dot', np.abs(dotps).max()) |
|
|
|
self.transformer.mean_ = X.mean(axis=0, keepdims=True) |
|
|
|
def get_components(self): |
|
var_ratio = self.stdev**2 / self.total_var |
|
return self.transformer.components_, self.stdev, var_ratio |
|
|
|
|
|
|
|
|
|
class SPCAEstimator(): |
|
def __init__(self, n_components, alpha=10.0): |
|
self.n_components = n_components |
|
self.whiten = False |
|
self.alpha = alpha |
|
|
|
|
|
self.transformer = SparsePCA(n_components, alpha=alpha, ridge_alpha=0.01, |
|
max_iter=100, random_state=0, n_jobs=-1, normalize_components=True) |
|
self.batch_support = False |
|
self.stdev = np.zeros((n_components,)) |
|
self.total_var = 0.0 |
|
|
|
def get_param_str(self): |
|
return "spca_c{}_a{}{}".format(self.n_components, self.alpha, '_w' if self.whiten else '') |
|
|
|
def fit(self, X): |
|
self.transformer.fit(X) |
|
|
|
|
|
self.total_var = X.var(axis=0).sum() |
|
|
|
|
|
|
|
self.stdev = self.transformer.transform(X).std(axis=0) |
|
|
|
|
|
idx = np.argsort(self.stdev)[::-1] |
|
self.stdev = self.stdev[idx] |
|
self.transformer.components_[:] = self.transformer.components_[idx] |
|
|
|
|
|
dotps = [np.dot(*self.transformer.components_[[i, j]]) |
|
for (i, j) in itertools.combinations(range(self.n_components), 2)] |
|
if not np.allclose(dotps, 0, atol=1e-4): |
|
print('SPCA components not orghogonal, max dot', np.abs(dotps).max()) |
|
|
|
def get_components(self): |
|
var_ratio = self.stdev**2 / self.total_var |
|
return self.transformer.components_, self.stdev, var_ratio |
|
|
|
def get_estimator(name, n_components, alpha): |
|
if name == 'pca': |
|
return PCAEstimator(n_components) |
|
if name == 'ipca': |
|
return IPCAEstimator(n_components) |
|
elif name == 'fbpca': |
|
return FacebookPCAEstimator(n_components) |
|
elif name == 'ica': |
|
return ICAEstimator(n_components) |
|
elif name == 'spca': |
|
return SPCAEstimator(n_components, alpha) |
|
else: |
|
raise RuntimeError('Unknown estimator') |