-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathKappaPowerAnalysis.py
More file actions
211 lines (178 loc) · 7.51 KB
/
KappaPowerAnalysis.py
File metadata and controls
211 lines (178 loc) · 7.51 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
# -*- coding: utf-8 -*-
"""
Created on Wed Oct 22 21:57:41 2025
@author: azamb
"""
# ===========================================================
# Power analysis for Cohen's κ with two coders (binary labels)
# - Uses your coders' prevalences (p1, p2)
# - Samples WITHOUT replacement when drawing a sample of "lines"
# - Finds the minimal sample size such that frac( κ ≥ threshold ) < TARGET_PROB
# ===========================================================
from typing import Tuple, Optional
import numpy as np
# ===========================================================
# 🔧 USER PARAMETERS — edit these and just press Run
# ===========================================================
POPULATION_KAPPA = 0.60 # true (population) κ you want to assume
SAMPLE_THRESHOLD = 0.80 # target sample κ threshold to "beat"
POPULATION_SIZE = 1_000_000 # number of synthetic cases in population (total number of lines)
#Set base rates from pilot data (percentage of label "1" each coder tends to assign)
PREV_R1 = 0.5 # Base rate, P(Y1 = 1) for coder 1
PREV_R2 = 0.5 # Base rate, P(Y2 = 1) for coder 2
TARGET_PROB = 0.01 # % of simulations where population kappa is below the threshold
#stop when frac(κ ≥ threshold) < this value
#---------------------------------------------
ITERATIONS = 10_000 # Monte Carlo runs per sample size
START_LINES = 20 # starting sample size
STEP_LINES = 20 # increment per iteration
MAX_LINES = 10_000 # upper bound of sample size
SEED = 420 # RNG seed for reproducibility (set None for non-deterministic)
# ===========================================================
# --------------------------
# Cohen's kappa (k >= 2)
# --------------------------
def cohen_kappa(y1: np.ndarray, y2: np.ndarray) -> float:
if y1.shape != y2.shape:
raise ValueError("y1 and y2 must have the same shape")
labels = np.unique(np.concatenate([np.unique(y1), np.unique(y2)]))
mapping = {lab: i for i, lab in enumerate(labels)}
y1i = np.vectorize(mapping.get)(y1)
y2i = np.vectorize(mapping.get)(y2)
k = len(labels)
cm = np.zeros((k, k), dtype=np.int64)
for a, b in zip(y1i, y2i):
cm[a, b] += 1
n = cm.sum()
if n == 0:
return np.nan
po = np.trace(cm) / n
row_m = cm.sum(axis=1) / n
col_m = cm.sum(axis=0) / n
pe = float(np.dot(row_m, col_m))
denom = 1.0 - pe
if denom == 0.0:
return np.nan
return (po - pe) / denom
# ---------------------------------------------------
# Build binary population with target κ_pop and
# specified coder prevalences p1, p2 (two coders)
# ---------------------------------------------------
def binary_population_with_kappa_marginals(
n: int,
kappa_pop: float,
p1: float,
p2: float,
seed: Optional[int] = None
) -> Tuple[np.ndarray, np.ndarray]:
"""
Create a binary (0/1) population (r1, r2) for two coders with:
- coder 1 prevalence P(Y1=1)=p1
- coder 2 prevalence P(Y2=1)=p2
- target population kappa ≈ kappa_pop (exact if feasible)
Feasibility: with given marginals, observed agreement p_o must lie in:
p_o ∈ [ |1 - (p1 + p2)| , 1 - |p1 - p2| ].
We compute p_e = p1*p2 + (1-p1)*(1-p2), then set p_o = κ*(1-p_e) + p_e,
and check feasibility. If infeasible, raise a ValueError.
"""
if not (0.0 <= p1 <= 1.0 and 0.0 <= p2 <= 1.0):
raise ValueError("p1 and p2 must be in [0, 1].")
pe = p1 * p2 + (1 - p1) * (1 - p2)
po = kappa_pop * (1 - pe) + pe # desired observed agreement
po_min = abs(1 - (p1 + p2))
po_max = 1 - abs(p1 - p2)
if not (po_min - 1e-12 <= po <= po_max + 1e-12):
raise ValueError(
f"Requested kappa={kappa_pop:.3f} is infeasible for marginals "
f"p1={p1:.3f}, p2={p2:.3f}. Feasible observed-agreement range is "
f"[{po_min:.6f}, {po_max:.6f}], but required p_o={po:.6f}."
)
# Solve the 2x2 joint table via q11 = x:
# q10 = p1 - x, q01 = p2 - x, q00 = 1 - p1 - p2 + x,
# and p_o = q11 + q00 = 1 - p1 - p2 + 2x => x = (p_o - 1 + p1 + p2)/2
x = (po - 1 + p1 + p2) / 2.0
q11 = x
q10 = p1 - x
q01 = p2 - x
q00 = 1 - p1 - p2 + x
probs = np.array([q00, q01, q10, q11], dtype=float)
probs = np.clip(probs, 0.0, 1.0)
probs /= probs.sum()
rng = np.random.default_rng(seed)
# Map indices to pairs: 0->(0,0), 1->(0,1), 2->(1,0), 3->(1,1)
choices = rng.choice(4, size=n, p=probs)
r1 = np.where((choices == 2) | (choices == 3), 1, 0)
r2 = np.where((choices == 1) | (choices == 3), 1, 0)
return r1.astype(np.int8), r2.astype(np.int8)
# --------------------------------------
# Estimate frac(κ ≥ threshold) by MC,
# sampling WITHOUT replacement
# --------------------------------------
def estimate_frac_ge_threshold_no_replacement(
r1: np.ndarray,
r2: np.ndarray,
sample_size: int,
iterations: int,
threshold: float,
seed: Optional[int] = None,
) -> float:
rng = np.random.default_rng(seed)
N = len(r1)
if sample_size > N:
raise ValueError("sample_size cannot exceed population size when sampling without replacement.")
count = 0
for _ in range(iterations):
idx = rng.choice(N, size=sample_size, replace=False)
k = cohen_kappa(r1[idx], r2[idx])
if np.isfinite(k) and (k >= threshold):
count += 1
return count / iterations
# ===========================================================
# 🚀 MAIN EXECUTION
# ===========================================================
def main():
print("=== Search for minimal sample size (lines) ===")
print(
f"Population κ={POPULATION_KAPPA:.3f} | Threshold κ={SAMPLE_THRESHOLD:.3f} | "
f"Goal: frac(κ ≥ threshold) < {TARGET_PROB}"
)
print(
f"POP_SIZE={POPULATION_SIZE:,} ITERATIONS={ITERATIONS} "
f"start={START_LINES} step={STEP_LINES} max={MAX_LINES}"
)
print(f"Prevalences: p1={PREV_R1:.3f}, p2={PREV_R2:.3f}\n")
rng = np.random.default_rng(SEED)
# Build population with requested κ and coder prevalences
r1, r2 = binary_population_with_kappa_marginals(
POPULATION_SIZE,
POPULATION_KAPPA,
p1=PREV_R1,
p2=PREV_R2,
seed=rng.integers(0, 2**31 - 1) if SEED is not None else None
)
found = None
for lines in range(START_LINES, MAX_LINES + 1, STEP_LINES):
frac = estimate_frac_ge_threshold_no_replacement(
r1, r2,
sample_size=lines,
iterations=ITERATIONS,
threshold=SAMPLE_THRESHOLD,
seed=rng.integers(0, 2**31 - 1) if SEED is not None else None,
)
print(f"lines={lines:5d} frac(κ ≥ {SAMPLE_THRESHOLD:.2f}) = {frac:.6f}")
if frac < TARGET_PROB:
found = lines
break
print()
if found is not None:
print(f"✅ Minimum lines achieving frac < {TARGET_PROB}: {found}")
else:
print(
f"⚠️ Did not drop below {TARGET_PROB} up to {MAX_LINES} lines. "
f"Try increasing MAX_LINES or ITERATIONS for tighter estimates."
)
# ===========================================================
# Run automatically when executed
# ===========================================================
if __name__ == "__main__":
main()