diff --git a/gemact/config.py b/gemact/config.py index 08e7c9a..050761c 100644 --- a/gemact/config.py +++ b/gemact/config.py @@ -58,5 +58,7 @@ 'pareto2': 'distributions.Pareto2', 'pareto1': 'distributions.Pareto1', 'uniform': 'distributions.Uniform', - 'multinomial': 'distributions.Multinomial' + 'multinomial': 'distributions.Multinomial', + 'dirichlet multinomial' : 'distributions.Dirichlet_Multinomial', + 'negative multinomial' : 'distributions.NegMultinom' } diff --git a/gemact/distributions.py b/gemact/distributions.py index 6cbb75c..a3226b1 100644 --- a/gemact/distributions.py +++ b/gemact/distributions.py @@ -6261,7 +6261,7 @@ def name(): @staticmethod def category(): - return {'frequency'} + return {''} def cov(self): @@ -6326,4 +6326,345 @@ def logpmf(self, x): return stats.multinomial.logpmf(n=self.n, p=self.p, x=x) + +class Dirichlet_Multinomial(_MultDiscreteDistribution): + """ + Dirichlet Multinomial distribution. + Wrapper to scipy dirichlet multinomial distribution (``scipy.stats.dirichlet_multinomial``). + Refer to :py:class:'~__MultDiscreteDistribution' for additional details. + + + :param seed: Used to set a specific seed (default=np.random.RandomState). + :type seed: int + :param \\**kwargs: + See below + + :Keyword Arguments: + * *alpha* ( ``int`` or ``numpy.ndarray``) -- + Concentration parameters. + * *n* (``int``) -- + Number of trials. + """ + + def __init__(self, seed=None, **kwargs): + _DiscreteDistribution.__init__(self) + self.n = kwargs['n'] + self.alpha = kwargs['alpha'] + self.seed = seed + + @property + def seed(self): + return self.__seed + + @seed.setter + def seed(self, value): + if value is None: + value = np.random.randint(1, 1001) + + hf.assert_type_value(value, 'seed', logger, (float,int)) + value = int(value) + self.__seed = value + + @property + def n(self): + return self.__n + + @n.setter + def n(self, value): + hf.assert_type_value(value, 'n', logger, (float, int), lower_bound=1, lower_close=True) + value = int(value) + self.__n = value + + @property + def alpha(self): + return self.__alpha + + @alpha.setter + def alpha(self, value): + for element in value: + hf.assert_type_value(element, 'alpha', logger, (float, int)) + value = np.array(value) + self.__alpha = value + + @property + def _dist(self): + return stats.dirichlet_multinomial(n=self.n, alpha=self.alpha, seed=self.seed) + + @staticmethod + def name(): + return 'dirichlet multinomial' + + @staticmethod + def category(): + return {''} + + def cov(self): + """ + Covariance Matrix of a Dirichlet Multinomial Distribution. + + :return: Covariance Matrix. + :rtype: ``float`` + """ + + return stats.dirichlet_multinomial.cov(n=self.n, alpha=self.alpha) + + + def var(self): + """ + Variances of a Dirichlet Multinomial Distribution. + + :return: Array of Variances. + :rtype: numpy.ndarray + """ + + return np.diag(stats.dirichlet_multinomial.cov(n=self.n, alpha=self.alpha)) + + def pmf(self, x): + """ + Probability mass function of the Dirichlet Multinomial Distribution. + + :param x: quantile where probability mass function is evaluated. + :type x: ``int`` + + :return: probability mass function. + :rtype: ``numpy.float64`` or ``numpy.ndarray`` + """ + + if sum(x) != self.n: + raise ValueError("n != sum(x), i.e. one is wrong") + return stats.dirichlet_multinomial.pmf(n=self.n, alpha=self.alpha, x=x) + + def logpmf(self, x): + """ + Natural logarithm of the probability mass function of the Dirichlet Multinomial Distribution. + + :param x: quantile where the (natural) probability mass function logarithm is evaluated. + :type x: ``int`` + :return: natural logarithm of the probability mass function + :rtype: ``numpy.float64`` or ``numpy.ndarray`` + """ + + if sum(x) != self.n: + raise ValueError("n != sum(x), i.e. one is wrong") + return stats.dirichlet_multinomial.logpmf(n=self.n, alpha=self.alpha, x=x) + + + def mean(self): + """ + Mean of the Dirichlet Multinomial Distribution. + + :param x: quantile where the (natural) probability mass function logarithm is evaluated. + :type x: ``int`` + :return: natural logarithm of the probability mass function + :rtype: ``numpy.float64`` or ``numpy.ndarray`` + """ + return stats.dirichlet_multinomial.mean(n=self.n, alpha=self.alpha) + + + def rvs(self, size=1, random_state=None): + """ + Random variates generator function. + + :param size: random variates sample size (default is 1). + :type size: ``int``, optional + :param random_state: random state for the random number generator. + :type random_state: ``int``, optional + :return: random variates. + :rtype: ``numpy.int`` or ``numpy.ndarray`` + + """ + + random_state = hf.handle_random_state(random_state, logger) + np.random.seed(random_state) + hf.assert_type_value(size, 'size', logger, (float, int), lower_bound=1) + size = int(size) + alpha = self.alpha + n = self.n + if isinstance(alpha, np.ndarray) and len(alpha.shape) == 1: + alpha = np.tile(alpha, (size, 1)) + n = np.full(size, n) + G = np.random.gamma(shape=alpha, scale=1.0) + prob = G / np.sum(G, axis=1, keepdims=True) + ridx = np.sum(G, axis=1) == 0 + if np.any(ridx): + for i in np.where(ridx)[0]: + prob[i, :] = np.random.multinomial(1, alpha[i, :] / np.sum(alpha[i, :]), n=1).flatten() + rdm = np.array([np.random.multinomial(n[i], prob[i, :]) for i in range(size)]) + return rdm + + +class NegMultinom(_MultDiscreteDistribution): + + """ + Negative Multinomial distribution. + + :param loc: Location parameter to shift the support (default=0). + :type loc: ``int``, optional + + :param \\**kwargs: + See below + + :Keyword Arguments: + * *x0* (``int``) -- + Size parameter of the negative multinomial distribution. + * *p* (``float``) -- + Probability parameter of the negative multinomial distribution. + + """ + + def __init__(self, x0, p, loc=0): + _DiscreteDistribution.__init__(self) + self.x0 = x0 + self.p = p + self.loc = loc + + @property + def x0(self): + return self.__x0 + + @x0.setter + def x0(self, value): + hf.assert_type_value(value, 'x0', logger, (float, int), lower_bound=0, lower_close=False) + self.__x0 = value + + @property + def p(self): + return self.__p + + @p.setter + def p(self, value): + value = np.array(value) + for element in value: + hf.assert_type_value( + element, 'p', logger, (float, np.floating), + lower_bound=0, upper_bound=1, lower_close=True, upper_close=True + ) + + if np.sum(value) >= 1: + raise ValueError("Sum of success probabilities must be less than 1") + + self.__p = value + self.__p0 = 1 - np.sum(value) # Failure probability + + @property + def p0(self): + """Probability of failure (computed as 1 - sum(p))""" + return self.__p0 + + @staticmethod + def name(): + return 'negative multinomial' + + @staticmethod + def category(): + return {''} + + def pmf(self, x): + """ + Probability mass function. + + PMF formula from reference: + Γ(∑x_i) * p0^x0 / Γ(x0) * ∏(p_i^x_i / x_i!) + + :param x: Quantile where PMF is evaluated. + :type x: ``numpy.ndarray`` + :return: Probability mass function evaluated at x. + :rtype: ``numpy.float64`` + """ + x = np.array(x) + x0 = self.x0 + + gamma_term = (special.gamma(np.sum(x)+x0) / special.gamma(x0)) + prob_term = (self.p0 ** x0) * np.prod((self.p ** x) / special.factorial(x)) + + return gamma_term * prob_term + + + def logpmf(self, x): + """ + Natural logarithm of the probability mass function. + + :param x: Quantile where log-PMF is evaluated. + :type x: ``numpy.ndarray`` + :return: Log of probability mass function evaluated at x. + :rtype: ``numpy.float64`` + """ + return np.log(self.pmf(x)) + + def mean(self): + """ + Mean vector of the distribution. + + :return: Mean vector. + :rtype: ``numpy.ndarray`` + """ + return (self.x0 / self.p0) * self.p + + def var(self): + """ + Variances of a Negative Multinomial Distribution. + + :return: Array of Variances. + :rtype: ``numpy.ndarray`` + """ + return (self.x0 / self.p0**2) * self.p**2 + (self.x0 / self.p0) * self.p + + def cov(self): + """ + Covariance matrix of a Negative Multinomial Distribution. + + :return: Covariance matrix. + :rtype: ``numpy.ndarray`` + """ + p = self.p + x0 = self.x0 + p0 = self.p0 + + # Diagonal terms + diag = (x0 / p0**2) * p**2 + (x0 / p0) * p + + # Off-diagonal terms + off_diag = (x0 / p0**2) * np.outer(p, p) + + # Create full covariance matrix + cov_matrix = np.diag(diag) + off_diag - np.diag(np.diag(off_diag)) + + return cov_matrix + + def rvs(self, size=1, random_state=None): + """ + Random variates generator function. + + :param size: Number of random variates to generate (default=1). + :type size: ``int`` + :param random_state: Random state for reproducibility. + :type random_state: ``int``, optional + :return: Random variates. + :rtype: ``numpy.ndarray`` + """ + random_state = hf.handle_random_state(random_state, logger) + np.random.seed(random_state) + + samples = [] + for _ in range(size): + total = np.random.negative_binomial(self.x0, self.p0) + if total > 0: + counts = np.random.multinomial(total, self.p / (1 - self.p0)) + else: + counts = np.zeros_like(self.p) + samples.append(counts) + + return np.array(samples) + + def mgf(self, t): + """ + Moment generating function. + + :param t: Vector where MGF is evaluated. + :type t: ``numpy.ndarray`` + :return: Moment generating function evaluated at t. + :rtype: ``numpy.float64`` + """ + exponent = np.sum(self.p * np.exp(t)) + return (self.p0 / (1 - exponent)) ** self.x0 diff --git a/gemact/libraries.py b/gemact/libraries.py index 8135e8e..cd435b1 100644 --- a/gemact/libraries.py +++ b/gemact/libraries.py @@ -13,3 +13,5 @@ import timeit from twiggy import quick_setup, log from itertools import groupby +from scipy.special import gammaln +import math as mt