First of all, thank you for making this wonderful toolbox. It alleviates researchers a lot of burden from building dynamic decision models. I am trying to use the racing diffusion model in your toolbox. As a pymc3 beginner, I copied the component code in glambox and added a bias/starting point parameter so that different accumulators can have different starting points. However, when I tried to fit the simulated data, the sampling procedure can not be properly initialized, could you check the modification and help me solve this problem? Thank you very much.
import theano.tensor as tt
import numpy as np
import pymc3 as pm
def tt_normal_cdf(x, mu=0, sd=1):
"""
Normal cumulative distribution function
Theano tensor implementation
"""
return (0.5 + 0.5 * tt.erf((x - mu) / (sd * tt.sqrt(2.))))
def tt_wienerpos_fpt_pdf(t, drift, noise, boundary):
"""
Probability density function of first passage times of
Wiener process with positive drift towards constant boundary.
Theano tensor implementation
Cf https://en.wikipedia.org/wiki/Inverse_Gaussian_distribution#Relationship_with_Brownian_motion
"""
mu = boundary / drift
lam = (boundary**2 / noise**2)
return ((lam[:,None] / (2 * np.pi * t**3))**0.5 * tt.exp(
(-lam[:,None] * (t - mu[:,None])**2) / (2 * mu[:,None]**2 * t)))
def tt_wienerpos_fpt_cdf(t, drift, noise, boundary, numerical_stability=100):
"""
Cumulative distribution function of first passage times of
Wiener process with positive drift towards constant boundary.
Theano tensor implementation
Cf https://en.wikipedia.org/wiki/Inverse_Gaussian_distribution#Relationship_with_Brownian_motion
"""
mu = boundary / drift
lam = (boundary / noise)**2
bounded_ratio = tt.where(lam / mu >= numerical_stability,
numerical_stability, lam / mu)
return (tt_normal_cdf(tt.sqrt(lam[:,None] / t) * (t / mu[:,None] - 1)) +
tt.exp(2 * bounded_ratio[:,None]) * tt_normal_cdf(-(tt.sqrt(lam[:,None] / t) * (t / mu[:,None] + 1))))
def tt_wienerrace_pdf(t, drift, noise, boundary,starting, t0, zerotol=1e-14):
"""
Probability density function of first passage times of
a race between multiple Wiener processes with positive drift
towards a constant boundary.
Theano tensor implementation
"""
# Nondecision time T0
t = t - t0
t = tt.where(t <= 0, 0, t)
# boundary parameter
k = boundary * (1-starting)
# first passage time densities, single Wiener accumulators
f = tt_wienerpos_fpt_pdf(t, drift, noise, k)
# first passage time distributions, single Wiener accumulators
F = tt_wienerpos_fpt_cdf(t, drift, noise, k)
# survival functions
S = 1. - F
# race densities
# Note: drifts should be sorted so that chosen item drift is in first column
l = f[:, 0] * tt.prod(S[:, 1:], axis=1)
return l
def contaminant_lf(upper,lower):
"""
:param upper: upper bound of RT
:param lower: lower bound of RT
"""
return 1/(upper-lower)
def tt_wienerrace_con_pdf(t, drift, noise, boundary,starting, t0,upper,lower,p):
"""
:param p: proportion of contaminant data
:return: likelihood function of racing diffusion model with contaminant data
"""
return (1-p)*tt_wienerrace_pdf(t, drift, noise, boundary,starting, t0) + p*contaminant_lf(upper,lower)
def tt_wienerrace_llf(x, drift, noise, boundary,starting, t0,upper,lower,p):
return tt.sum(tt.log(tt_wienerrace_con_pdf(x, drift, noise, boundary,starting, t0,upper,lower,p)))
def wienerrace_rng(v,z,A,ndt,noise,upper,lower,p,size):
"""
racing diffusion model random generator
v: vector for drift rates
z: vecctor for starting bias
A: boundary
ndt: non-decision time
noise: scaling parameter for wiener process
upper: upper bound of RT
lower: lower bound of RT
p: contaminant proportion
size: simulation number
"""
option_num = len(v)
v = np.array(v)
z = np.array(z)
k = A*(1 - z)
mu = k / v
lam = (k**2 / noise**2)
rt_raw = np.zeros([option_num,size])
choice = np.zeros([size])
for n in range(option_num):
rt_raw[n,:] = np.random.wald(mu[n],lam[n],size)
for i in range(size):
choice[i] = np.where(rt_raw[:,i]==np.min(rt_raw[:,i]))[0][0]
rt_raw = rt_raw.T + ndt
rt = rt_raw[np.arange(size),choice.astype('int32')]
## adding contanminent
p_sample = np.random.uniform(0,1,size)
cri = p_sample<p
rt[cri] = np.random.uniform(lower,upper,len(cri[cri==True]))
choice[cri] = np.random.binomial(size=len(cri[cri==True]),n=option_num-1,p=1/option_num)
return rt,choice
class rdm_wfpm(pm.Continuous):
def __init__(self,drift, noise, boundary,starting, t0,upper,lower,p,**kwargs):
self.drift = tt.as_tensor_variable(pm.floatX(drift))
self.noise = tt.as_tensor_variable(pm.floatX(noise))
self.boundary = tt.as_tensor_variable(pm.floatX(boundary))
self.starting = tt.as_tensor_variable(pm.floatX(starting))
self.t0 = tt.as_tensor_variable(pm.floatX(t0))
self.upper = tt.as_tensor_variable(pm.floatX(upper))
self.lower = tt.as_tensor_variable(pm.floatX(lower))
self.p = tt.as_tensor_variable(pm.floatX(p))
super().__init__(**kwargs)
def logp(self,value):
return tt_wienerrace_llf(value,
drift=self.drift,
noise=self.noise,
boundary=self.boundary,
starting=self.starting,
t0=self.t0,
upper=self.upper,
lower=self.lower,
p=self.p)
#simulate data
rt,choice = wienerrace_rng(v=[1,2],z=[0,0],A=3,ndt=0.4,noise=1,upper=5,lower=0.1,p=0.05,size=200)
data =np.array([rt,choice])
#classify the simulated data
right_data = data[:,data[1,:]==1]
left_data = data[:,data[1,:]==0]
#build pymc3 model
with pm.Model() as rdm:
v1 = pm.Normal('drift_1',mu=0,sigma=1,shape=1,testval = 0)
z1 = pm.Normal('bias_1',0,1,shape=1,testval = 0)
v2 = pm.Deterministic('v2',pm.math.exp(pm.Normal('drift_2',mu=0,sigma=1,shape=1,testval = 0)))
z2 = pm.Normal('bias_2',mu=0,sigma=1,shape=1)
A = pm.Deterministic('boundary_',pm.math.exp(pm.Normal('boundary',mu=0,sigma=1,shape=1,testval = 0)))
z1_ = pm.Deterministic('bias_1_',pm.math.log(z1))
z2_ = pm.Deterministic('bias_2_',pm.math.log(z2))
ndt = pm.Normal('ndt',0,1,shape=1,testval=-5)
ndt_ = pm.Deterministic('ndt_',pm.math.exp(ndt))
p = pm.Normal('p',0,shape=1)
p_tran = pm.Deterministic('p_',pm.math.log(p))
left_drift_vector = pm.Deterministic('left_drift',pm.math.concatenate((v1,v2)))
right_drift_vector = pm.Deterministic('right_drift',pm.math.concatenate((v2,v1)))
left_z = pm.Deterministic('left_z',pm.math.concatenate((z1_,z2_)))
right_z = pm.Deterministic('right_z',pm.math.concatenate((z2_,z1_)))
y_left = rdm_wfpm(name='x_left',drift=left_drift_vector, noise=1, boundary=A,starting=left_z, t0=ndt_,upper=5,lower=0.1,p=p_tran,observed=left_data)
y_right = rdm_wfpm(name='y_left',drift=right_drift_vector, noise=1, boundary=A,starting=right_z, t0=ndt_,upper=5,lower=0.1,p=p_tran,observed=right_data)
#sampling
with rdm:
trace = pm.sample()
Dear glambox developers,
First of all, thank you for making this wonderful toolbox. It alleviates researchers a lot of burden from building dynamic decision models. I am trying to use the racing diffusion model in your toolbox. As a pymc3 beginner, I copied the component code in glambox and added a bias/starting point parameter so that different accumulators can have different starting points. However, when I tried to fit the simulated data, the sampling procedure can not be properly initialized, could you check the modification and help me solve this problem? Thank you very much.
And here is the error: