Source code for biokit.stats.mixture

# source
# http://nbviewer.ipython.org/github/tritemio/notebooks/blob/master/Mixture_Model_Fitting.ipynb

from easydev import DevTools
devtools = DevTools()
from scipy.optimize import minimize, show_options
import scipy.stats as ss
import numpy as np
import pylab
from easydev import AttrDict

from . import criteria


half_log_two_pi = 0.5*np.log(2*np.pi)


class Model(object):
    """New base model"""
    def __init__(self):
        pass

    def log_density(self, data):
        raise NotImplementedError

    def estimate(self, data, weights):
        raise NotImplementedError

    def generate(self):
        raise NotImplementedError

    def pdf(self):
        raise NotImplementedError

    def __repr__(self):
        raise NotImplementedError

class GaussianModel(Model):
    """New gaussian model (not used yet)"""
    def __init__(self, mu=1, sigma=1):
        super(GaussianModel, self).__init__()
        self.mu = mu
        self.sigma = sigma

    def log_density(self, data):
        # log of the gaussian probability density function
        res = -(data - self.mu) ** 2 / (2 * self.sigma ** 2)
        res += -np.log(self.sigma) - half_log_two_pi
        return res

    def estimate(self, data, weights=None):
        assert len(data.shape) == 1, "Expect 1D data!"
        # solve dL/dmu == 0 and dL/dsigma == 0
        if weights is not None:
            wsum = np.sum(weights)
            self.mu = np.sum(weights * data) / wsum
            self.sigma = np.sqrt(np.sum(weights * (data - self.mu) ** 2) / wsum)
        else:
            self.mu = np.mean(data)
            self.sigma = np.sqrt(np.mean((data - self.mu) ** 2))

    def generate(self, N):
        return ss.norm.rvs(self.mu, self.sigma, size=N)

    def pdf(self, data):
        pass

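# Usage sketch (illustrative; the variable names below are hypothetical):
# GaussianModel.estimate() sets mu/sigma to the (weighted) maximum-likelihood
# estimates for a 1-D numpy array of samples.
#
#     model = GaussianModel()
#     samples = model.generate(1000)              # draws from N(mu=1, sigma=1)
#     model.estimate(samples)                     # mu ~ sample mean, sigma ~ sample std
#     total_logp = model.log_density(samples).sum()
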
class PoissonModel(Model):
    """New poisson model (not used yet)"""
    def __init__(self, lmbda=10):
        super(PoissonModel, self).__init__()
        self.lmbda = lmbda

    def log_density(self, data):
        # log of the poisson mass function, dropping the log(x!) term,
        # which is constant with respect to lambda
        return -self.lmbda + data * np.log(self.lmbda)

    def estimate(self, data, weights=None):
        assert len(data.shape) == 1, "Expect 1D data!"
        # solve dL/dlambda == 0
        if weights is not None:
            wsum = np.sum(weights)
            self.lmbda = np.sum(weights * data) / wsum
        else:
            self.lmbda = np.mean(data)

    # The likelihood of N iid Poisson samples is L = prod_i lambda^{x_i}
    # e^{-lambda} / x_i!, so the log-likelihood is
    # l = sum_i x_i log(lambda) - N lambda, up to terms constant in lambda.

    def generate(self, N):
        return ss.poisson.rvs(self.lmbda, size=N)

    def __repr__(self):
        return "Poisson[lmbda={lmbda:.4g}]".format(lmbda=self.lmbda)

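# Usage sketch (illustrative; variable names are hypothetical): the Poisson
# maximum-likelihood estimate computed by estimate() is the (weighted) sample
# mean, which can be checked numerically.
#
#     model = PoissonModel(lmbda=4)
#     samples = model.generate(10000)
#     model.estimate(samples)                     # model.lmbda is close to 4
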
class GaussianMixture(object):
    """Creates a mix of Gaussian distributions

    .. plot::
        :width: 80%
        :include-source:

        from biokit.stats.mixture import GaussianMixture
        m = GaussianMixture(mu=[-1,1], sigma=[0.5, 0.5], mixture=[.2, .8], N=1000)
        m.plot()

    """
    def __init__(self, mu=[-1, 1], sigma=[1, 1], mixture=[0.5, 0.5], N=1000):
        """.. rubric:: constructor

        :param list mu: list of means, one per model
        :param list sigma: list of standard deviations, one per model
        :param list mixture: list of amplitudes, one per model
        :param int N: total number of samples to generate

        """
        assert len(mu) == len(sigma)
        assert len(mu) == len(mixture)
        self.mu = mu
        self.sigma = sigma
        self.mixture = mixture
        self.data = []
        self.N = N
        self.Ns = [int(x * N) for x in mixture]
        self.k = len(self.mu)
        if sum(self.Ns) != N:
            print('Warning: rounding mixture ratio. total N will be %s' % sum(self.Ns))
        for m, s, n in zip(self.mu, self.sigma, self.Ns):
            data = pylab.normal(m, s, size=n)
            self.data.extend(data)

    def plot(self, bins=30, normed=True):
        self.hist(bins=bins, normed=normed)

    def hist(self, bins=30, normed=True):
        pylab.hist(self.data, bins=bins, density=normed)

class GaussianMixtureModel(object):
    """Gaussian Mixture Model

    .. plot::

        from biokit.stats import mixture
        m = mixture.GaussianMixtureModel()
        m.pdf(1, params=[1, 0.5, 0.2, 1, 0.5, 0.8])

    """
    def __init__(self, k=2):
        self.k = k

    def pdf(self, x, params, normalise=True):
        """Expected parameters:

        params is a flat list of gaussian parameters ordered as
        mu, sigma, pi, mu2, sigma2, pi2, ...

        """
        assert divmod(len(params), 3)[1] == 0
        assert len(params) >= 3 * self.k
        k = len(params) // 3
        self.k = k

        pis = np.array(params[2::3])
        if any(pis < 0):
            return 0
        if normalise is True:
            # !!! the pis must sum to 1, otherwise the fit may diverge badly
            pis /= pis.sum()

        data = 0
        for i in range(k):
            mu, sigma, pi_ = params[i * 3: (i + 1) * 3]
            pi_ = pis[i]
            if sigma != 0:
                data += pi_ * ss.norm.pdf(x, mu, sigma)
        return data

    def log_likelihood(self, params, sample):
        # negative log-likelihood, to be minimised
        res = -1 * pylab.log(self.pdf(sample, params)).sum()
        return res

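# Usage sketch (illustrative; `data` is assumed to be a 1-D numpy array of
# samples): the parameter vector used by pdf() and log_likelihood()
# interleaves (mu, sigma, pi) for each component.
#
#     gmm = GaussianMixtureModel(k=2)
#     params = [-1, 0.5, 0.2,                     # mu1, sigma1, pi1
#                1, 0.5, 0.8]                     # mu2, sigma2, pi2
#     y = gmm.pdf(0.0, params)                    # mixture density at x=0
#     nll = gmm.log_likelihood(params, data)      # negative log-likelihood
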
class Fitting(object):
    """Base class for :class:`EM` and :class:`GaussianMixtureFitting`"""
    def __init__(self, data, k=2, method='Nelder-Mead'):
        """.. rubric:: constructor

        :param list data:
        :param int k: number of Gaussian models in the mixture
        :param str method: minimization method to be used (one of the
            scipy optimize methods)

        """
        self.data = np.array(data)
        self.size = float(len(self.data))
        self._k = k
        self._model = None
        # initialise the model
        self.k = k
        self.verbose = True

    def _get_k(self):
        return self._k
    def _set_k(self, k):
        assert k > 0
        gmm = GaussianMixtureModel(k=k)
        self._k = k
        self._model = gmm
    k = property(_get_k, _set_k)

    def _get_model(self):
        return self._model
    model = property(_get_model)

    def get_guess(self):
        """Random guess to initialise the optimisation"""
        params = {}
        m = self.data.min()
        M = self.data.max()
        range_ = M - m
        mus = [m + range_ / (self.k + 1.) * i for i in range(1, self.k + 1)]
        params['mus'] = mus
        sigma = range_ / float(self.k + 1) / 2.
        params['sigmas'] = [sigma] * self.k
        params['pis'] = [1. / self.k] * self.k
        params = [[mu, sigma, pi] for mu, sigma, pi in
                  zip(params['mus'], params['sigmas'], params['pis'])]
        params = list(pylab.flatten(params))
        return params

    def plot(self, normed=True, N=1000, Xmin=None, Xmax=None, bins=50,
             color='red', lw=2,
             hist_kw={'color': '#5F9EA0', "edgecolor": "k"}, ax=None):

        if ax:
            ax.hist(self.data, density=normed, bins=bins, **hist_kw)
        else:
            pylab.hist(self.data, density=normed, bins=bins, **hist_kw)

        if Xmin is None:
            Xmin = self.data.min()
        if Xmax is None:
            Xmax = self.data.max()
        X = pylab.linspace(Xmin, Xmax, N)

        if ax:
            ax.plot(X, [self.model.pdf(x, self.results.x) for x in X],
                    color=color, lw=lw)
        else:
            pylab.plot(X, [self.model.pdf(x, self.results.x) for x in X],
                       color=color, lw=lw)

        # the pis must be normalised
        for i in range(self.k):
            mu, sigma, pi_ = (self.results.mus[i], self.results.sigmas[i],
                              self.results.pis[i])
            if ax:
                ax.plot(X, [pi_ * ss.norm.pdf(x, mu, sigma) for x in X],
                        'k--', alpha=0.7, lw=2)
            else:
                pylab.plot(X, [pi_ * ss.norm.pdf(x, mu, sigma) for x in X],
                           'k--', alpha=0.7, lw=2)

class GaussianMixtureFitting(Fitting):
    """GaussianMixtureFitting using scipy minimization

    .. plot::
        :width: 80%
        :include-source:

        from biokit.stats.mixture import GaussianMixture, GaussianMixtureFitting
        m = GaussianMixture(mu=[-1,1], sigma=[0.5,0.5], mixture=[0.2,0.8])
        mf = GaussianMixtureFitting(m.data)
        mf.estimate(k=2)
        mf.plot()

    """
    def __init__(self, data, k=2, method='Nelder-Mead'):
        """Here we use the function minimize() from scipy.optimize.

        The list of (currently) available minimization methods is
        'Nelder-Mead' (simplex), 'Powell', 'CG', 'BFGS', 'Newton-CG',
        'Anneal', 'L-BFGS-B' (like BFGS but bounded), 'TNC', 'COBYLA',
        'SLSQP'.
        """
        super(GaussianMixtureFitting, self).__init__(data, k=k, method=method)
        self._method = method

    def _get_method(self):
        return self._method
    def _set_method(self, method):
        devtools.check_param_in_list(method, ['Nelder-Mead', 'Powell', 'CG',
            'BFGS', 'Newton-CG', 'Anneal', 'L-BFGS-B'])
        self._method = method
    method = property(_get_method, _set_method)

    def estimate(self, guess=None, k=None, maxfev=2e4, maxiter=1e3,
                 bounds=None):
        """guess is a flat list of parameters as expected by the model,
        ordered as mu1, sigma1, pi1, mu2, sigma2, pi2, ...

        For instance: guess = [1, 0.5, 0.3, 2, 0.5, 0.7]

        """
        if k is not None:
            self.k = k

        if guess is None:
            # estimate the mus/sigmas/pis from the data
            guess = self.get_guess()

        res = minimize(self.model.log_likelihood, x0=guess, args=(self.data,),
            method=self.method,
            options=dict(maxiter=maxiter, maxfev=maxfev),
            bounds=bounds)
        self.results = res

        pis = np.array(self.results.x[2::3])
        self.results.pis_raw = pis.copy()
        # The ratios may be negative, in which case they need to be
        # normalised. For instance -0.35, -0.15 would be normalised
        # to 0.7, 0.3 as expected.
        unstable = False

        k = len(self.results.x) // 3
        params = []
        for i in range(0, int(k)):
            params.append(self.results.x[i * 3])
            params.append(self.results.x[i * 3 + 1])
            params.append(pis[i])
        self.results.x = params

        # FIXME shall we multiply by -1 ??
        self.results.log_likelihood = self.model.log_likelihood(params, self.data)

        if self.results.log_likelihood and unstable is False:
            self.results.AIC = criteria.AIC(self.results.log_likelihood,
                self.k, logL=True)
            self.results.AICc = criteria.AICc(self.results.log_likelihood,
                self.k, self.data.size, logL=True)
            self.results.BIC = criteria.BIC(self.results.log_likelihood,
                self.k, self.data.size, logL=True)
        else:
            self.results.AIC = 1000
            self.results.AICc = 1000
            self.results.BIC = 1000

        pis = np.array(self.results.x[2::3])
        self.results.pis = list(pis / pis.sum())
        self.results.sigmas = self.results.x[1::3]
        self.results.mus = self.results.x[0::3]

        return res

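# Usage sketch (illustrative; variable names are hypothetical): after
# estimate(), the fitted parameters and information criteria are stored in
# the `results` attribute.
#
#     m = GaussianMixture(mu=[-1, 1], sigma=[0.5, 0.5], mixture=[0.2, 0.8])
#     mf = GaussianMixtureFitting(m.data)
#     mf.estimate(k=2)
#     mf.results.mus, mf.results.sigmas, mf.results.pis    # fitted parameters
#     mf.results.AIC, mf.results.AICc, mf.results.BIC      # selection criteria
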
class EM(Fitting):
    """Expectation maximization class to estimate parameters of GMM

    .. plot::
        :width: 50%
        :include-source:

        from biokit.stats.mixture import GaussianMixture, EM
        m = GaussianMixture(mu=[-1,1], sigma=[0.5,0.5], mixture=[0.2,0.8])
        em = EM(m.data)
        em.estimate(k=2)
        em.plot()

    """
    def __init__(self, data, model=None, max_iter=100):
        """.. rubric:: constructor

        :param data:
        :param model: not used. The model is the :class:`GaussianMixtureModel`
            but could be another model.
        :param int max_iter: maximum number of iterations for the minimization

        """
        super(EM, self).__init__(data, k=2)  # default is k=2
        self.max_iter = max_iter

    #@do_profile()
    def estimate(self, guess=None, k=2):
        """
        :param list guess: a list to provide the initial guess. Order is
            mu1, sigma1, pi1, mu2, ...
        :param int k: number of models to be used.
        """
        #print("EM estimation")
        self.k = k
        # Initial guess of parameters and initializations
        if guess is None:
            # estimate the mus/sigmas/pis from the data
            guess = self.get_guess()

        mu = np.array(guess[0::3])
        sig = np.array(guess[1::3])
        pi_ = np.array(guess[2::3])

        N_ = len(pi_)
        gamma = np.zeros((N_, int(self.size)))
        N_ = np.zeros(N_)
        p_new = guess

        # EM loop
        counter = 0
        converged = False
        self.mus = []

        while not converged:
            # Compute the responsibility function and the new parameters
            for k in range(0, self.k):
                # unstable if self.model.pdf is made of zeros
                gamma[k, :] = pi_[k] * ss.norm.pdf(self.data, mu[k], sig[k])
                gamma[k, :] /= self.model.pdf(self.data, p_new, normalise=False)
                N_[k] = gamma[k].sum()
                mu[k] = np.sum(gamma[k] * self.data) / N_[k]
                sig[k] = pylab.sqrt(np.sum(gamma[k] * (self.data - mu[k]) ** 2) / N_[k])
                pi_[k] = N_[k] / self.size

            self.results = {'x': p_new, 'nfev': counter, 'success': converged}

            p_new = []
            for this in range(self.k):
                p_new.extend([mu[this], sig[this], pi_[this]])

            self.status = True
            try:
                assert abs(N_.sum() - self.size) / self.size < 1e-6
                assert abs(pi_.sum() - 1) < 1e-6
            except AssertionError:
                print("issue arose at iteration %s" % counter)
                self.debug = {'N': N_, 'pis': pi_}
                self.status = False
                break

            self.mus.append(mu)

            # Convergence check
            counter += 1
            converged = counter >= self.max_iter

        self.gamma = gamma

        if self.status is True:
            self.results = {'x': p_new, 'nfev': counter, 'success': converged}
            self.results = AttrDict(**self.results)
            self.results.mus = self.results.x[0::3]
            self.results.sigmas = self.results.x[1::3]
            self.results.pis = self.results.x[2::3]

            log_likelihood = self.model.log_likelihood(self.results.x, self.data)
            self.results.log_likelihood = log_likelihood
            self.results.AIC = criteria.AIC(log_likelihood, self.k, logL=True)
            self.results.AICc = criteria.AICc(log_likelihood, self.k,
                self.data.size, logL=True)
            self.results.BIC = criteria.BIC(log_likelihood, self.k,
                self.data.size, logL=True)

    def plot(self, model_parameters=None, **kwargs):
        """Take a list of dictionaries with model parameters to plot
        predicted models. If the user does not provide parameters, the
        standard plot function from :class:`Fitting` is used.

        Example: model_parameters=[{"mu": 5, "sigma": 0.5, "pi": 1}]
        """
        if not model_parameters:
            return super(EM, self).plot(**kwargs)

        # Set parameters from the dictionaries
        self.k = len(model_parameters)
        self.results = AttrDict()
        self.results.mus = [model["mu"] for model in model_parameters]
        self.results.sigmas = [model["sigma"] for model in model_parameters]
        self.results.pis = [model["pi"] for model in model_parameters]
        parms_keys = ("mu", "sigma", "pi")
        self.results.x = [model[key] for model in model_parameters
                          for key in parms_keys]
        return super(EM, self).plot(**kwargs)

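# Usage sketch (illustrative; `m` is a GaussianMixture as in the class
# docstring): estimate() accepts an optional initial guess ordered as
# mu1, sigma1, pi1, mu2, sigma2, pi2, ...
#
#     em = EM(m.data)
#     em.estimate(k=2, guess=[-1, 0.5, 0.5, 1, 0.5, 0.5])
#     em.results.mus, em.results.sigmas, em.results.pis
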
class AdaptativeMixtureFitting(object):
    """Automatic Adaptative Mixture Fitting

    .. plot::
        :width: 80%
        :include-source:

        from biokit.stats.mixture import AdaptativeMixtureFitting, GaussianMixture
        m = GaussianMixture(mu=[-1,1], sigma=[0.5,0.5], mixture=[0.2,0.8])
        amf = AdaptativeMixtureFitting(m.data)
        amf.run(kmin=1, kmax=6)
        amf.diagnostic(k=amf.best_k)

    """
    # TODO: propose to use EM or Minimization
    def __init__(self, data, method='Nelder-Mead'):
        self.fitting = GaussianMixtureFitting(data, method=method)
        self.verbose = True

    def run(self, kmin=1, kmax=6, criteria='AICc'):
        self.all_results = {}
        self.x = []
        for k in range(kmin, kmax + 1):
            self.fitting.estimate(k=k)
            # here the criterion does not matter: if one fails, all fail
            if self.fitting.results['AIC'] != 1000:
                self.x.append(k)
                self.all_results[k] = self.fitting.results.copy()

        m = np.array([self.all_results[x][criteria] for x in self.x]).min()
        index = np.array([self.all_results[x][criteria] for x in self.x]).argmin()
        if self.verbose:
            print('Found min ', m, 'for k ', self.x[index])

        self.best_k = self.x[index]
        self.min_value = m

    def plot(self, criteria='AICc'):
        pylab.plot(self.x, [self.all_results[x]['BIC'] for x in self.x],
            'o-', label='BIC')
        pylab.plot(self.x, [self.all_results[x]['AIC'] for x in self.x],
            'o-', label='AIC')
        pylab.plot(self.x, [self.all_results[x]['AICc'] for x in self.x],
            'o-', label='AICc')

        m = np.array([self.all_results[x][criteria] for x in self.x]).min()
        # relative likelihood: exp((m - this[criteria]) / 2) for each result
        pylab.axhline(m - pylab.log(0.9) * 2, color='k',
            label='90% equi-probability', alpha=0.9)
        pylab.axhline(m - pylab.log(0.5) * 2, color='k',
            label='50% equi-probability', alpha=0.5)
        pylab.axhline(m - pylab.log(0.3) * 2, color='k',
            label='30% equi-probability', alpha=0.3)
        pylab.legend()

    def diagnostic(self, kmin=1, kmax=8, k=None, ymax=None):
        self.run(kmin=kmin, kmax=kmax)

        pylab.clf()
        pylab.subplot(3, 1, 2)
        self.plot()

        mf = GaussianMixtureFitting(self.fitting.data)
        if k is None:
            mf.estimate(k=self.best_k)
        else:
            mf.estimate(k=k)
        pylab.subplot(3, 1, 1)
        mf.plot()
        if ymax is not None:
            pylab.ylim([0, ymax])

        pylab.subplot(3, 1, 3)
        min_value = np.array([self.all_results[x]['AICc'] for x in self.x]).min()
        pylab.plot(self.x,
            [pylab.exp((min_value - self.all_results[k]['AICc']) / 2) for k in self.x],
            'o-', label='AICc')
        min_value = np.array([self.all_results[x]['AIC'] for x in self.x]).min()
        pylab.plot(self.x,
            [pylab.exp((min_value - self.all_results[k]['AIC']) / 2) for k in self.x],
            'o-', label='AIC')
        pylab.xlabel('probability of information loss (based on AICc)')
        pylab.legend()

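# Usage sketch (illustrative; `m` is a GaussianMixture as in the class
# docstring): run() stores one result per k, and plot()/diagnostic() compare
# them through the relative likelihood exp((AICc_min - AICc_k) / 2).
#
#     amf = AdaptativeMixtureFitting(m.data)
#     amf.run(kmin=1, kmax=6, criteria='AICc')
#     amf.best_k                                  # k minimising the chosen criterion
#     amf.diagnostic(k=amf.best_k)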