-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathperformance_measures.py
More file actions
154 lines (120 loc) · 4.87 KB
/
performance_measures.py
File metadata and controls
154 lines (120 loc) · 4.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
#%%
# Main source = Deep hit model, 'https://github.com/chl8856/DeepHit'
#%%
'''
This provide time-dependent Concordance index and Brier Score:
- Use weighted_c_index and weighted_brier_score, which are the unbiased estimates.
See equations and descriptions eq. (11) and (12) of the following paper:
- C. Lee, W. R. Zame, A. Alaa, M. van der Schaar, "Temporal Quilting for Survival Analysis", AISTATS 2019
'''
import numpy as np
from lifelines import KaplanMeierFitter
def log_partial_lik(prediction, targets):
risk = prediction
E = targets
hazard_ratio = np.exp(risk)
log_risk = np.log(np.cumsum(hazard_ratio, axis=0))
uncensored_likelihood = risk - log_risk
censored_likelihood = uncensored_likelihood.transpose(0, 1) * E
num_observed_events = np.sum(E)
partial_likelihood = np.sum(censored_likelihood) / num_observed_events # log partial likelihood
return partial_likelihood
### C(t)-INDEX CALCULATION
def c_index(Prediction, Time_survival, Death, Time):
'''
This is a cause-specific c(t)-index
- Prediction : risk at Time (higher --> more risky)
- Time_survival : survival/censoring time
- Death :
> 1: death
> 0: censored (including death from other cause)
- Time : time of evaluation (time-horizon when evaluating C-index)
'''
N = len(Prediction)
A = np.zeros((N,N))
Q = np.zeros((N,N))
N_t = np.zeros((N,N))
Num = 0
Den = 0
for i in range(N):
A[i, np.where(Time_survival[i] < Time_survival)] = 1
Q[i, np.where(Prediction[i] > Prediction)] = 1
if (Time_survival[i]<=Time and Death[i]==1):
N_t[i,:] = 1
Num = np.sum(((A)*N_t)*Q)
Den = np.sum((A)*N_t)
if Num == 0 and Den == 0:
result = -1 # not able to compute c-index!
else:
result = float(Num/Den)
return result
### BRIER-SCORE
def brier_score(Prediction, Time_survival, Death, Time):
N = len(Prediction)
y_true = ((Time_survival <= Time) * Death).astype(float)
return np.mean((Prediction - y_true)**2)
# result2[k, t] = brier_score_loss(risk[:, k], ((te_time[:,0] <= eval_horizon) * (te_label[:,0] == k+1)).astype(int))
##### WEIGHTED C-INDEX & BRIER-SCORE
def CensoringProb(Y, T):
T = T.reshape([-1]) # (N,) - np array
Y = Y.reshape([-1]) # (N,) - np array
kmf = KaplanMeierFitter()
kmf.fit(T, event_observed=(Y==0).astype(int)) # censoring prob = survival probability of event "censoring"
G = np.asarray(kmf.survival_function_.reset_index()).transpose()
G[1, G[1, :] == 0] = G[1, G[1, :] != 0][-1] #fill 0 with ZoH (to prevent nan values)
return G
### C(t)-INDEX CALCULATION: this account for the weighted average for unbaised estimation
def weighted_c_index(T_train, Y_train, Prediction, T_test, Y_test, Time):
'''
This is a cause-specific c(t)-index
- Prediction : risk at Time (higher --> more risky)
- Time_survival : survival/censoring time
- Death :
> 1: death
> 0: censored (including death from other cause)
- Time : time of evaluation (time-horizon when evaluating C-index)
'''
G = CensoringProb(Y_train, T_train)
N = len(Prediction)
A = np.zeros((N,N))
Q = np.zeros((N,N))
N_t = np.zeros((N,N))
Num = 0
Den = 0
for i in range(N):
tmp_idx = np.where(G[0,:] >= T_test[i])[0]
if len(tmp_idx) == 0:
W = (1./G[1, -1])**2
else:
W = (1./G[1, tmp_idx[0]])**2
A[i, np.where(T_test[i] < T_test)] = 1. * W
Q[i, np.where(Prediction[i] > Prediction)] = 1. # give weights
if (T_test[i]<=Time and Y_test[i]==1):
N_t[i,:] = 1.
Num = np.sum(((A)*N_t)*Q)
Den = np.sum((A)*N_t)
if Num == 0 and Den == 0:
result = -1 # not able to compute c-index!
else:
result = float(Num/Den)
return result
# this account for the weighted average for unbaised estimation
def weighted_brier_score(T_train, Y_train, Prediction, T_test, Y_test, Time):
G = CensoringProb(Y_train, T_train)
N = len(Prediction)
W = np.zeros(len(Y_test))
Y_tilde = (T_test > Time).astype(float)
for i in range(N):
tmp_idx1 = np.where(G[0,:] >= T_test[i])[0]
tmp_idx2 = np.where(G[0,:] >= Time)[0]
if len(tmp_idx1) == 0:
G1 = G[1, -1]
else:
G1 = G[1, tmp_idx1[0]]
if len(tmp_idx2) == 0:
G2 = G[1, -1]
else:
G2 = G[1, tmp_idx2[0]]
W[i] = (1. - Y_tilde[i])*float(Y_test[i])/G1 + Y_tilde[i]/G2
y_true = ((T_test <= Time) * Y_test).astype(float)
return np.mean(W*(Y_tilde - (1.-Prediction))**2)