-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathGaussianInference.py
More file actions
135 lines (114 loc) · 4.8 KB
/
GaussianInference.py
File metadata and controls
135 lines (114 loc) · 4.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#! /usr/bin/env python
"""
Author: Oren Freifeld
Email: orenfr@cs.bgu.ac.il
"""
import numpy as np
from scipy.stats import invwishart
class GaussianInference(object):
def __init__(self,Psi,m,kappa,nu):
self.n=len(m)
self.Psi=Psi
self.m=m
self.kappa=kappa
self.nu=nu
self.data_dim = data_dim = len(m)
if Psi.shape != (data_dim,data_dim):
raise ValueError(Psi.shape ,'!=', (data_dim,data_dim))
self.iw_prior = invwishart(df=nu,scale=Psi*nu)
# iw_prior_mean = iw_prior.mean()
# iw_prior_mode = iw_prior.mode()
def calc_posterior_hyper_params(self,N,T1,T2):
nu=self.nu
kappa=self.kappa
m=self.m
Psi=self.Psi
nu_posterior = nu+N
kappa_posterior = kappa + N
if m.shape != (self.n,):
raise ValueError
if Psi.shape != (self.n,self.n):
raise ValueError
data_dim = self.data_dim
if len(T1)!= data_dim:
raise ValueError(len(T1),'!=',data_dim)
m_posterior = 1.0/kappa_posterior* (kappa*m + T1)
Psi_posterior = 1.0/nu_posterior*(nu * Psi + kappa*np.outer(m,m) \
-kappa_posterior*np.outer(m_posterior,m_posterior)+ T2)
if m_posterior.shape != (self.n,):
raise ValueError
if Psi_posterior.shape != (self.n,self.n):
raise ValueError
self.kappa_posterior=kappa_posterior
self.nu_posterior=nu_posterior
self.Psi_posterior=Psi_posterior
self.m_posterior=m_posterior
n=self.n
if Psi_posterior.shape != (n,n):
raise ValueError(Psi_posterior.shape,n)
self.iw_posterior = invwishart(df=nu_posterior,scale=Psi_posterior*nu_posterior)
def sample_from_the_iw_prior(self,nSamples):
return self.iw_prior.rvs(size=nSamples)
def sample_from_the_iw_posterior(self,nSamples):
n=self.n
result = self.iw_posterior.rvs(size=nSamples)
if result.shape != (nSamples,n,n):
if nSamples==1:
raise NotImplementedError
raise ValueError(result.shape,nSamples)
return result
def single_sample_from_the_iw_posterior(self):
return self.iw_posterior.rvs(size=1)
def sample_from_the_niw_prior(self,nSamples):
n=self.n
kappa=self.kappa
m=self.m
samples_from_the_iw_prior = self.sample_from_the_iw_prior(nSamples)
samples_from_the_niw_prior = []
for i in range(nSamples):
Sigma_sample = samples_from_the_iw_prior[i]
if Sigma_sample.shape != (n,n):
raise ValueError(Sigma_sample.shape)
try:
mu_sample = np.random.multivariate_normal(mean=m,cov=
1./kappa*Sigma_sample)
except:
raise ValueError(Sigma_sample)
samples_from_the_niw_prior.append([mu_sample,Sigma_sample])
return samples_from_the_niw_prior
def sample_from_the_niw_posterior(self,nSamples):
n=self.n
kappa_posterior=self.kappa_posterior
m_posterior=self.m_posterior
samples_from_the_iw_posterior = self.sample_from_the_iw_posterior(nSamples)
samples_from_the_niw_posterior = []
for i in range(nSamples):
Sigma_sample = samples_from_the_iw_posterior[i]
if Sigma_sample.shape != (n,n):
raise ValueError(Sigma_sample.shape)
try:
mu_sample = np.random.multivariate_normal(mean=m_posterior,cov=
1./kappa_posterior*Sigma_sample)
except:
raise ValueError(Sigma_sample)
samples_from_the_niw_posterior.append([mu_sample,Sigma_sample])
return samples_from_the_niw_posterior
def single_sample_from_the_niw_posterior(self):
n=self.n
kappa_posterior=self.kappa_posterior
m_posterior=self.m_posterior
Sigma_sample = self.single_sample_from_the_iw_posterior()
mu_sample = np.random.multivariate_normal(mean=m_posterior,cov=
1./kappa_posterior*Sigma_sample)
return [mu_sample,Sigma_sample]
def calc_suff_stats(self,x):
T1 = x.sum(axis=1) # T1 is a vector of length d
T2 = x.dot(x.T) # T2 is dxd
if len(T1)!=self.data_dim:
raise ValueError(len(T1),'!=',self.data_dim)
return T1,T2
@staticmethod
def calc_mle(T1,T2,N):
mu = T1/N
Sigma = T2/N - np.outer(mu,mu)
return mu,Sigma