From 49e2c036ed362614c03ff9fe12a2b6417ffbd45a Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 10 Feb 2025 22:49:49 +0100 Subject: [PATCH 1/7] Negative Multinomial and Dirichlet Multinomial --- gemact/distributions.py | 299 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 299 insertions(+) diff --git a/gemact/distributions.py b/gemact/distributions.py index 6cbb75c..2920d31 100644 --- a/gemact/distributions.py +++ b/gemact/distributions.py @@ -6326,4 +6326,303 @@ def logpmf(self, x): return stats.multinomial.logpmf(n=self.n, p=self.p, x=x) + +class Dirichlet_Multinomial(_MultDiscreteDistribution): + """ + Dirichlet Multinomial distribution. + Wrapper to scipy dirichlet multinomial distribution (``scipy.stats.dirichlet_multinomial``). + Refer to :py:class:'~__MultDiscreteDistribution' for additional details. + + + :param seed: Used to set a specific seed (default=np.random.RandomState). + :type seed: int + :param \\**kwargs: + See below + + :Keyword Arguments: + * *alpha* ( ``int`` or ``numpy.ndarray``) -- + Concentration parameters. + * *n* (``int``) -- + Number of trials. + """ + + def __init__(self, seed=None, **kwargs): + _DiscreteDistribution.__init__(self) + self.n = kwargs['n'] + self.alpha = kwargs['alpha'] + self.seed = seed + + @property + def seed(self): + return self.__seed + + @seed.setter + def seed(self, value): + if value is None: + value = np.random.randint(1, 1001) + + hf.assert_type_value(value, 'seed', logger, (float,int)) + value = int(value) + self.__seed = value + + @property + def n(self): + return self.__n + + @n.setter + def n(self, value): + hf.assert_type_value(value, 'n', logger, (float, int), lower_bound=1, lower_close=True) + value = int(value) + self.__n = value + + @property + def alpha(self): + return self.__alpha + + @alpha.setter + def alpha(self, value): + for element in value: + hf.assert_type_value(element, 'alpha', logger, (float, int)) + value = np.array(value) + self.__alpha = value + + @property + def _dist(self): + return stats.dirichlet_multinomial(n=self.n, alpha=self.alpha, seed=self.seed) + + @staticmethod + def name(): + return 'dirichlet multinomial' + + @staticmethod + def category(): + return {'frequency'} + + def cov(self): + """ + Covariance Matrix of a Dirichlet Multinomial Distribution. + + :return: Covariance Matrix. + :rtype: ``float`` + """ + + return stats.dirichlet_multinomial.cov(n=self.n, alpha=self.alpha) + + + def var(self): + """ + Variances of a Dirichlet Multinomial Distribution. + + :return: Array of Variances. + :rtype: numpy.ndarray + """ + + return np.diag(stats.dirichlet_multinomial.cov(n=self.n, alpha=self.alpha)) + + def pmf(self, x): + """ + Probability mass function of the Dirichlet Multinomial Distribution. + + :param x: quantile where probability mass function is evaluated. + :type x: ``int`` + + :return: probability mass function. + :rtype: ``numpy.float64`` or ``numpy.ndarray`` + """ + + if sum(x) != self.n: + raise ValueError("n != sum(x), i.e. one is wrong") + return stats.dirichlet_multinomial.pmf(n=self.n, alpha=self.alpha, x=x) + + def logpmf(self, x): + """ + Natural logarithm of the probability mass function of the Dirichlet Multinomial Distribution. + + :param x: quantile where the (natural) probability mass function logarithm is evaluated. + :type x: ``int`` + :return: natural logarithm of the probability mass function + :rtype: ``numpy.float64`` or ``numpy.ndarray`` + """ + + if sum(x) != self.n: + raise ValueError("n != sum(x), i.e. one is wrong") + return stats.dirichlet_multinomial.logpmf(n=self.n, alpha=self.alpha, x=x) + + + def mean(self): + """ + Mean of the Dirichlet Multinomial Distribution. + + :param x: quantile where the (natural) probability mass function logarithm is evaluated. + :type x: ``int`` + :return: natural logarithm of the probability mass function + :rtype: ``numpy.float64`` or ``numpy.ndarray`` + """ + return stats.dirichlet_multinomial.mean(n=self.n, alpha=self.alpha) + + + def rvs(self, size=1, random_state=None): + """ + Random variates generator function. + + :param size: random variates sample size (default is 1). + :type size: ``int``, optional + :param random_state: random state for the random number generator. + :type random_state: ``int``, optional + :return: random variates. + :rtype: ``numpy.int`` or ``numpy.ndarray`` + + """ + + random_state = hf.handle_random_state(random_state, logger) + np.random.seed(random_state) + hf.assert_type_value(size, 'size', logger, (float, int), lower_bound=1) + size = int(size) + alpha = self.alpha + n = self.n + if isinstance(alpha, np.ndarray) and len(alpha.shape) == 1: + alpha = np.tile(alpha, (size, 1)) + n = np.full(size, n) + G = np.random.gamma(shape=alpha, scale=1.0) + prob = G / np.sum(G, axis=1, keepdims=True) + ridx = np.sum(G, axis=1) == 0 + if np.any(ridx): + for i in np.where(ridx)[0]: + prob[i, :] = np.random.multinomial(1, alpha[i, :] / np.sum(alpha[i, :]), n=1).flatten() + rdm = np.array([np.random.multinomial(n[i], prob[i, :]) for i in range(size)]) + return rdm + + +class NegMultinom(_MultDiscreteDistribution): + """ + Negative Multinomial distribution. + + :param loc: location parameter (default=0), to shift the support of the distribution. + :type loc: ``int``, optional + :param \\**kwargs: + See below + + :Keyword Arguments: + * *b* (``int``) -- + dispersion parameter of the negative binomial distribution. + * *p* (``float``) -- + Probability parameter of the negative binomial distribution. + + """ + + def __init__(self, loc=0, **kwargs): + _DiscreteDistribution.__init__(self) + self.beta = kwargs['beta'] + self.p = kwargs['p'] + self.loc = loc + + @property + def beta(self): + return self.__beta + + @beta.setter + def beta(self, value): + hf.assert_type_value(value, 'beta', logger, (float, int)) + self.__beta = value + + @property + def p(self): + return self.__p + + @p.setter + def p(self, value): + + for element in value: + hf.assert_type_value(element, 'p', logger, (float, np.floating), lower_bound=0, upper_bound=1, lower_close=True, upper_close=True) + value = np.array(value) + + if sum(value) >= 1: + raise ValueError("success probabilities must not be greater than 1.") + + self.__p = value + + @staticmethod + def name(): + return 'negative multinomial' + + @staticmethod + def category(): + return {'frequency'} + + + def pmf(self, x): + + return mt.exp(self.logpmf(x)) + + def logpmf(self, x): + m = np.sum(x, axis=1) + d = x.shape[1] + logl = gammaln(self.beta + m) - gammaln(self.beta) - np.sum(gammaln(x + 1), axis=1) + logl += np.sum(x * np.log(self.p)) + self.beta * np.log(1-np.sum(self.p)) + + return logl + + def cov(self, random_state=None): + """ + Covariance Matrix of a Negative Multinomial Distribution. + :param random_state: random state for the random number generator. + :type random_state: ``int``, optional + :return: Covariance Matrix. + :rtype: ``float`` + """ + + random_state = hf.handle_random_state(random_state, logger) + + return np.cov(self.rvs(size=10000, random_state=random_state).T) + + + def var(self, random_state=None): + """ + Variances of a Negative Multinomial Distribution. + :param random_state: random state for the random number generator. + :type random_state: ``int``, optional + :return: Array of Variances. + :rtype: numpy.ndarray + """ + random_state = hf.handle_random_state(random_state, logger) + + return np.diag(self.cov(random_state=random_state)) + + + def rvs(self, size=1, random_state=None): + """ + Random variates generator function. + + :param size: random variates sample size (default is 1). + :type size: ``int``, optional + :param random_state: random state for the random number generator. + :type random_state: ``int``, optional + :return: random variates. + :rtype: ``numpy.int`` or ``numpy.ndarray`` + + """ + + prob = self.p + beta = self.beta + #n = size + + random_state = hf.handle_random_state(random_state, logger) + np.random.seed(random_state) + + if isinstance(prob, np.ndarray) and len(prob.shape) == 1: + prob = np.tile(prob, (size, 1)) + if np.isscalar(beta): + beta = np.full(size, beta) + else: + raise ValueError("The length of beta doesn't match with the size of prob.") + k = prob.shape[1] + + probbeta = 1 - np.sum(prob, axis=1) + prob = np.hstack((prob, probbeta[:, np.newaxis])) + scale = 1 / probbeta - 1 + + G = np.array([stats.gamma.rvs(a=b, scale=p, size=1)[0] for b, p in zip(beta, scale)]) + lambda_ = prob[:, :k] * (G / (1 - probbeta))[:, np.newaxis] + + return np.array([stats.poisson.rvs(mu=l) for l in lambda_]) From 31296a6d8eae5bdf95a57329a94ebd9a8797bb8e Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 10 Feb 2025 22:51:24 +0100 Subject: [PATCH 2/7] Update libraries.py --- gemact/libraries.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gemact/libraries.py b/gemact/libraries.py index 8135e8e..e77f792 100644 --- a/gemact/libraries.py +++ b/gemact/libraries.py @@ -13,3 +13,4 @@ import timeit from twiggy import quick_setup, log from itertools import groupby +from scipy.special import gammaln From 432ce8d86e9b9e31bed5874dfce491bf3da89b28 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 10 Feb 2025 22:52:45 +0100 Subject: [PATCH 3/7] Update config.py --- gemact/config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gemact/config.py b/gemact/config.py index 08e7c9a..050761c 100644 --- a/gemact/config.py +++ b/gemact/config.py @@ -58,5 +58,7 @@ 'pareto2': 'distributions.Pareto2', 'pareto1': 'distributions.Pareto1', 'uniform': 'distributions.Uniform', - 'multinomial': 'distributions.Multinomial' + 'multinomial': 'distributions.Multinomial', + 'dirichlet multinomial' : 'distributions.Dirichlet_Multinomial', + 'negative multinomial' : 'distributions.NegMultinom' } From 706abbf621ad881f7fd1e2161ab43e6b95314775 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Wed, 12 Feb 2025 23:24:23 +0100 Subject: [PATCH 4/7] Update libraries.py math import added --- gemact/libraries.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gemact/libraries.py b/gemact/libraries.py index e77f792..cd435b1 100644 --- a/gemact/libraries.py +++ b/gemact/libraries.py @@ -14,3 +14,4 @@ from twiggy import quick_setup, log from itertools import groupby from scipy.special import gammaln +import math as mt From a016bddf36a7e92180b85f0eba500fc434ac7829 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Thu, 13 Feb 2025 22:27:45 +0100 Subject: [PATCH 5/7] Negative and Dirichlet Multinomial Added --- gemact/distributions.py | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/gemact/distributions.py b/gemact/distributions.py index 2920d31..6e3bb5e 100644 --- a/gemact/distributions.py +++ b/gemact/distributions.py @@ -6600,29 +6600,20 @@ def rvs(self, size=1, random_state=None): :return: random variates. :rtype: ``numpy.int`` or ``numpy.ndarray`` - """ - + """ prob = self.p beta = self.beta - #n = size random_state = hf.handle_random_state(random_state, logger) np.random.seed(random_state) - if isinstance(prob, np.ndarray) and len(prob.shape) == 1: - prob = np.tile(prob, (size, 1)) - if np.isscalar(beta): - beta = np.full(size, beta) - else: - raise ValueError("The length of beta doesn't match with the size of prob.") + prob = np.tile(prob, (size, 1)) k = prob.shape[1] - - probbeta = 1 - np.sum(prob, axis=1) + probbeta = 1 - np.sum(prob, axis=1) prob = np.hstack((prob, probbeta[:, np.newaxis])) scale = 1 / probbeta - 1 - G = np.array([stats.gamma.rvs(a=b, scale=p, size=1)[0] for b, p in zip(beta, scale)]) + G = stats.gamma.rvs(a=beta, scale=scale, size=size) lambda_ = prob[:, :k] * (G / (1 - probbeta))[:, np.newaxis] return np.array([stats.poisson.rvs(mu=l) for l in lambda_]) - From be94e1d01cf2b371e391238fbe3514d06762685a Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Sun, 4 May 2025 11:49:15 +0200 Subject: [PATCH 6/7] Update Negative Multinomial --- gemact/distributions.py | 201 +++++++++++++++++++++++++--------------- 1 file changed, 126 insertions(+), 75 deletions(-) diff --git a/gemact/distributions.py b/gemact/distributions.py index 6e3bb5e..9b3bf9c 100644 --- a/gemact/distributions.py +++ b/gemact/distributions.py @@ -6494,126 +6494,177 @@ def rvs(self, size=1, random_state=None): class NegMultinom(_MultDiscreteDistribution): + """ Negative Multinomial distribution. - - :param loc: location parameter (default=0), to shift the support of the distribution. + + :param loc: Location parameter to shift the support (default=0). :type loc: ``int``, optional + :param \\**kwargs: See below - + :Keyword Arguments: - * *b* (``int``) -- - dispersion parameter of the negative binomial distribution. + * *x0* (``int``) -- + Size parameter of the negative multinomial distribution. * *p* (``float``) -- - Probability parameter of the negative binomial distribution. + Probability parameter of the negative multinomial distribution. """ - - def __init__(self, loc=0, **kwargs): + + def __init__(self, x0, p, loc=0): _DiscreteDistribution.__init__(self) - self.beta = kwargs['beta'] - self.p = kwargs['p'] + self.x0 = x0 + self.p = p self.loc = loc @property - def beta(self): - return self.__beta - - @beta.setter - def beta(self, value): - hf.assert_type_value(value, 'beta', logger, (float, int)) - self.__beta = value - + def x0(self): + return self.__x0 + + @x0.setter + def x0(self, value): + hf.assert_type_value(value, 'x0', logger, (float, int), lower_bound=0, lower_close=False) + self.__x0 = value + @property def p(self): return self.__p - + @p.setter def p(self, value): - - for element in value: - hf.assert_type_value(element, 'p', logger, (float, np.floating), lower_bound=0, upper_bound=1, lower_close=True, upper_close=True) value = np.array(value) + for element in value: + hf.assert_type_value( + element, 'p', logger, (float, np.floating), + lower_bound=0, upper_bound=1, lower_close=True, upper_close=True + ) - if sum(value) >= 1: - raise ValueError("success probabilities must not be greater than 1.") - + if np.sum(value) >= 1: + raise ValueError("Sum of success probabilities must be less than 1") + self.__p = value - + self.__p0 = 1 - np.sum(value) # Failure probability + + @property + def p0(self): + """Probability of failure (computed as 1 - sum(p))""" + return self.__p0 + @staticmethod def name(): return 'negative multinomial' - + @staticmethod def category(): return {'frequency'} - def pmf(self, x): + """ + Probability mass function. + + PMF formula from reference: + Γ(∑x_i) * p0^x0 / Γ(x0) * ∏(p_i^x_i / x_i!) + + :param x: Quantile where PMF is evaluated. + :type x: ``numpy.ndarray`` + :return: Probability mass function evaluated at x. + :rtype: ``numpy.float64`` + """ + x = np.array(x) + x0 = self.x0 + + gamma_term = (special.gamma(np.sum(x)+x0) / special.gamma(x0)) + prob_term = (self.p0 ** x0) * np.prod((self.p ** x) / special.factorial(x)) + + return gamma_term * prob_term - return mt.exp(self.logpmf(x)) - - def logpmf(self, x): - m = np.sum(x, axis=1) - d = x.shape[1] - logl = gammaln(self.beta + m) - gammaln(self.beta) - np.sum(gammaln(x + 1), axis=1) - logl += np.sum(x * np.log(self.p)) + self.beta * np.log(1-np.sum(self.p)) - return logl - - def cov(self, random_state=None): - """ - Covariance Matrix of a Negative Multinomial Distribution. - :param random_state: random state for the random number generator. - :type random_state: ``int``, optional - :return: Covariance Matrix. - :rtype: ``float`` + def logpmf(self, x): """ + Natural logarithm of the probability mass function. - random_state = hf.handle_random_state(random_state, logger) + :param x: Quantile where log-PMF is evaluated. + :type x: ``numpy.ndarray`` + :return: Log of probability mass function evaluated at x. + :rtype: ``numpy.float64`` + """ + return np.log(self.pmf(x)) - return np.cov(self.rvs(size=10000, random_state=random_state).T) - + def mean(self): + """ + Mean vector of the distribution. + + :return: Mean vector. + :rtype: ``numpy.ndarray`` + """ + return (self.x0 / self.p0) * self.p - def var(self, random_state=None): - """ + def var(self): + """ Variances of a Negative Multinomial Distribution. - :param random_state: random state for the random number generator. - :type random_state: ``int``, optional + :return: Array of Variances. - :rtype: numpy.ndarray + :rtype: ``numpy.ndarray`` """ - random_state = hf.handle_random_state(random_state, logger) + return (self.x0 / self.p0**2) * self.p**2 + (self.x0 / self.p0) * self.p + + def cov(self): + """ + Covariance matrix of a Negative Multinomial Distribution. - return np.diag(self.cov(random_state=random_state)) - + :return: Covariance matrix. + :rtype: ``numpy.ndarray`` + """ + p = self.p + x0 = self.x0 + p0 = self.p0 + + # Diagonal terms + diag = (x0 / p0**2) * p**2 + (x0 / p0) * p + + # Off-diagonal terms + off_diag = (x0 / p0**2) * np.outer(p, p) + + # Create full covariance matrix + cov_matrix = np.diag(diag) + off_diag - np.diag(np.diag(off_diag)) + + return cov_matrix def rvs(self, size=1, random_state=None): """ Random variates generator function. - - :param size: random variates sample size (default is 1). - :type size: ``int``, optional - :param random_state: random state for the random number generator. - :type random_state: ``int``, optional - :return: random variates. - :rtype: ``numpy.int`` or ``numpy.ndarray`` - - """ - prob = self.p - beta = self.beta + :param size: Number of random variates to generate (default=1). + :type size: ``int`` + :param random_state: Random state for reproducibility. + :type random_state: ``int``, optional + :return: Random variates. + :rtype: ``numpy.ndarray`` + """ random_state = hf.handle_random_state(random_state, logger) np.random.seed(random_state) - - prob = np.tile(prob, (size, 1)) - k = prob.shape[1] - probbeta = 1 - np.sum(prob, axis=1) - prob = np.hstack((prob, probbeta[:, np.newaxis])) - scale = 1 / probbeta - 1 - G = stats.gamma.rvs(a=beta, scale=scale, size=size) - lambda_ = prob[:, :k] * (G / (1 - probbeta))[:, np.newaxis] + samples = [] + for _ in range(size): + total = np.random.negative_binomial(self.x0, self.p0) + if total > 0: + counts = np.random.multinomial(total, self.p / (1 - self.p0)) + else: + counts = np.zeros_like(self.p) + samples.append(counts) + + return np.array(samples) + + def mgf(self, t): + """ + Moment generating function. - return np.array([stats.poisson.rvs(mu=l) for l in lambda_]) + :param t: Vector where MGF is evaluated. + :type t: ``numpy.ndarray`` + :return: Moment generating function evaluated at t. + :rtype: ``numpy.float64`` + """ + exponent = np.sum(self.p * np.exp(t)) + return (self.p0 / (1 - exponent)) ** self.x0 + From ce43f4727b9982f01a05ea88523ec1d82e458050 Mon Sep 17 00:00:00 2001 From: Davide Ruffini <90931636+Davide-Ruffini@users.noreply.github.com> Date: Mon, 19 May 2025 21:09:38 +0200 Subject: [PATCH 7/7] Update distributions.py categories of multivariate distributions updated --- gemact/distributions.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gemact/distributions.py b/gemact/distributions.py index 9b3bf9c..a3226b1 100644 --- a/gemact/distributions.py +++ b/gemact/distributions.py @@ -6261,7 +6261,7 @@ def name(): @staticmethod def category(): - return {'frequency'} + return {''} def cov(self): @@ -6396,7 +6396,7 @@ def name(): @staticmethod def category(): - return {'frequency'} + return {''} def cov(self): """ @@ -6557,7 +6557,7 @@ def name(): @staticmethod def category(): - return {'frequency'} + return {''} def pmf(self, x): """