forked from FrPsc/BLOCKING
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathblocking.py
More file actions
195 lines (159 loc) · 7.02 KB
/
blocking.py
File metadata and controls
195 lines (159 loc) · 7.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import numpy as np
from scipy.optimize import minimize
from scipy.stats import gaussian_kde, norm
from block_tools import blocking, blocking_weighted, check, fblocking
class BlockAnalysis:
def __init__(self, x, multi=1, weights=None, weighted_blocking=False,
bias=None, T=None, interval_low=None, interval_up=None, nbins=50, dt=1):
self.multi = multi
self.x = check(x, self.multi)
if weights is not None:
self.w = check(weights, self.multi)
else:
self.w = weights
self.nbins = nbins
self.interval = [self.x.min(), self.x.max()]
if (interval_low is not None) and (self.x.min() < interval_low):
self.interval[0] = interval_low
if (interval_up is not None) and (self.x.max() > interval_up):
self.interval[1] = interval_up
if (self.w is None) and (bias is not None):
bias -= np.max(bias)
self.kbT = 0.008314463*T
self.w = np.exp(bias/(self.kbT))
self.w = check(self.w, self.multi)
if self.w is None:
self.stat = blocking(self.x, self.multi)
self.av = self.x.mean()
self.w = np.full(len(self.x), 1)
elif (self.w is not None) and weighted_blocking:
self.w /= self.w.sum()
self.stat = blocking_weighted(self.x, self.w, self.multi)
else:
self.kbT = 0.008314463*T
self.w /= self.w.sum()
self.stat = fblocking(self.x, self.w, self.kbT, self.multi, self.interval, self.nbins)
self.stat[...,0] /= dt
def SEM(self):
"""
Estimate the standard error of the mean from a blocking analysis.
This method searches for a plateau in ``self.stat`` by finding the block
size whose error bar overlaps with the largest number of error bars at
larger block sizes.
The selected block size is stored in ``self.bs`` and the corresponding SEM estimate is stored in ``self.sem``.
Prints a warning if the selected block size is close to the largest
available block size, suggesting that the SEM plateau may not have been
reached.
"""
def find_n_intersect(x,stat):
c=0
for i,p in enumerate(stat):
if (x <= p[1]+p[2]) and (x >= p[1]-p[2]):
c += 1
#c += norm(p[1],p[2]).pdf(x)
return -c
c = np.zeros(len(self.stat))
for i,b in enumerate(self.stat):
lower_bound = b[1]-b[2]
upper_bound = b[1]+b[2]
bnds = [(lower_bound, upper_bound)]
c[i] -= minimize( fun=find_n_intersect, x0=b[1], args=self.stat[self.stat[...,0] > b[0]], bounds=bnds ).fun
self.bs = self.stat[...,0][np.argmax(c)]
self.sem = self.stat[...,1][np.argmax(c)]
if self.bs > self.stat[-1,0]/3:
print('WARNING: fixed point of the error may have not been reached!')
def get_pdf(self, cv=None, Nb=None):
"""
Estimate the weighted probability density function and its block error.
A Gaussian kernel density estimate is computed over ``self.interval`` using
the trajectory weights ``self.w``. The uncertainty is estimated from the
fluctuations of block-wise KDEs around the full-trajectory KDE.
Parameters
----------
cv : array-like, optional
Collective-variable values to use for the full KDE estimate. If not
provided, ``self.x`` is used. The input is passed through ``check`` with
``self.multi`` before use.
Nb : int, optional
Number of blocks to use for the error estimate. If not provided, this is
estimated as ``len(self.x) / self.bs``, where ``self.bs`` is the selected
block size from the blocking analysis.
Returns
-------
x : ndarray
Grid points where the density is evaluated.
u : ndarray
Weighted KDE estimate of the probability density on ``x``.
e : ndarray
Estimated standard error of the density on ``x``.
"""
min_ = self.interval[0]
max_ = self.interval[1]
x = np.linspace( min_, max_, num = 100 )
if cv is not None:
cv = check(cv, self.multi)
u = gaussian_kde( cv, bw_method = "silverman", weights = self.w ).evaluate(x)
else:
u = gaussian_kde( self.x, bw_method = "silverman", weights = self.w ).evaluate(x)
N = int(len(self.x))
if Nb is None:
Nb = int(N / self.bs)
print (f'working with: Nb={Nb} blocks of size {self.bs} for error estimation')
W = self.w.sum()
S = (self.w**2).sum()
blocks_pi = []
for n in range(1, Nb+1):
end = int( self.bs * n )
start = int( end - self.bs )
pdf_i = gaussian_kde( self.x[start:end], bw_method = "silverman", weights = self.w[start:end] ).evaluate(x)
wi = self.w[start:end].sum()
blocks_pi.append( wi*(pdf_i-u)**2 )
blocks_pi = np.array(blocks_pi)
e = np.sqrt( blocks_pi.sum(axis=0) / (Nb*(W-S/W)) )
return x, u, e
def get_fes(self, maxkj=25, cv=None, Nb=None):
"""
Estimate the free-energy surface and its uncertainty from the weighted PDF.
The free energy is computed from the probability density as
F(x) = -kBT * log(P(x))
and shifted so that its minimum is zero. The PDF error from ``get_pdf`` is
propagated to a free-energy error using linear error propagation.
Parameters
----------
maxkj : float, optional
Maximum free-energy value to return. Points with ``F >= maxkj`` are
discarded. Default is 25.
cv : array-like, optional
Collective-variable values to use for the PDF estimate. If not provided,
``self.x`` is used.
Nb : int, optional
Number of blocks to use for the error estimate. If not provided,
``get_pdf`` chooses it from ``self.bs``.
Returns
-------
x : ndarray
Grid points retained after applying the ``maxkj`` cutoff.
F : ndarray
Free-energy profile shifted so that ``F.min() == 0``.
FE : ndarray
Estimated standard error of the free energy at each retained grid point.
"""
if cv is not None:
x, H, E = self.get_pdf(cv, Nb=Nb)
else:
x, H, E = self.get_pdf(Nb=Nb)
F = -self.kbT * np.log(H)
FE = self.kbT * E / H
F -= F.min()
maxkj_ndx = np.where(F<maxkj)
return x[maxkj_ndx], F[maxkj_ndx], FE[maxkj_ndx]
def get_av_err(self, cv=None):
if cv is not None:
x, H, E = self.get_pdf(cv)
else:
x, H, E = self.get_pdf()
H /= H.sum()
E /= H.sum()
av = np.average(x, weights=H)
err = np.sqrt((x**2*E**2).sum())
return av, err