-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdivergence.py
More file actions
103 lines (84 loc) · 3.13 KB
/
divergence.py
File metadata and controls
103 lines (84 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
# divergence.py
import numpy as np
from typing import Optional
# Thresholds — tunable
# A score below _CONSENSUS_THRESHOLD is labelled "Consensus", a score at or
# above _CONTESTED_THRESHOLD is labelled "Contested", anything between is "Mixed".
_CONSENSUS_THRESHOLD = 0.35
_CONTESTED_THRESHOLD = 0.60
_MIN_ARTICLES = 3  # need at least 3 articles to measure divergence meaningfully
# Contribution of each term to the divergence score.
_MEAN_DISSIMILARITY_WEIGHT = 0.6
_VARIANCE_WEIGHT = 0.4
# How many of the most divergent article titles to report.
_MAX_OUTLIERS = 2


def compute_divergence(
    articles: list[dict],
    embedding_key: str = "_embedding",
) -> dict:
    """
    Measure how much a group of articles diverges in coverage angle.

    Only articles carrying a non-None value under ``embedding_key`` take part
    in the computation. The similarity matrix is a plain dot product, so the
    embeddings are assumed to be L2-normalized already (the original notes say
    classify_articles() adds them in that form — confirm against the caller).

    Args:
        articles: Article dicts; each may carry an embedding vector under
            ``embedding_key`` and a display title under ``"title"``.
        embedding_key: Name of the dict key that holds the embedding.

    Returns:
        {
            "score": float,            # 0.0 (full consensus) → 1.0 (full divergence)
            "label": str,              # "Consensus" | "Mixed" | "Contested"
                                       # ("Insufficient data" on the fallback path)
            "mean_similarity": float,  # mean of all pairwise similarities
            "variance": float,         # variance of all pairwise similarities
            "n_articles": int,         # number of articles that had an embedding;
                                       # on the fallback path: total articles passed in
            "outlier_titles": list[str],  # titles most divergent from the group
        }

        When fewer than ``_MIN_ARTICLES`` articles have an embedding, the
        neutral fallback from ``_neutral_result`` is returned instead.
    """
    # Keep the article dicts (not just their vectors) so that row i of the
    # similarity matrix always maps back to the right article, even when some
    # input articles have no embedding and were filtered out.
    embedded = [a for a in articles if a.get(embedding_key) is not None]
    if len(embedded) < _MIN_ARTICLES:
        return _neutral_result(len(articles))

    emb_matrix = np.array([a[embedding_key] for a in embedded])  # shape: (N, D)
    n = len(embedded)

    # Pairwise cosine similarity — for L2-normalized vectors this is just the
    # dot product matrix.
    sim_matrix = emb_matrix @ emb_matrix.T  # shape: (N, N)

    # Upper triangle only: each unordered pair once, no self-similarity.
    pairwise_sims = sim_matrix[np.triu_indices(n, k=1)]
    mean_sim = float(np.mean(pairwise_sims))
    variance = float(np.var(pairwise_sims))

    # Divergence score: low mean similarity + high variance = contested.
    # The mean-dissimilarity term carries the larger weight; the variance term
    # adds to the score when similarities are unevenly spread (e.g. two camps).
    score = (
        (1.0 - mean_sim) * _MEAN_DISSIMILARITY_WEIGHT
        + variance * _VARIANCE_WEIGHT
    )
    score = float(np.clip(score, 0.0, 1.0))

    if score < _CONSENSUS_THRESHOLD:
        label = "Consensus"
    elif score < _CONTESTED_THRESHOLD:
        label = "Mixed"
    else:
        label = "Contested"

    # Outliers: articles with the lowest mean similarity to all *other*
    # articles, so the diagonal (self-similarity) is removed from each row.
    others_mean = (sim_matrix.sum(axis=1) - np.diag(sim_matrix)) / max(n - 1, 1)
    # Stable sort keeps input order among ties, making the result deterministic.
    outlier_rows = np.argsort(others_mean, kind="stable")[:_MAX_OUTLIERS]
    outlier_titles = [embedded[i].get("title", "") for i in outlier_rows.tolist()]

    return {
        "score": round(score, 4),
        "label": label,
        "mean_similarity": round(mean_sim, 4),
        "variance": round(variance, 4),
        "n_articles": n,
        "outlier_titles": outlier_titles,
    }


def _neutral_result(n: int) -> dict:
    """Return the fallback result used when divergence cannot be measured.

    ``n`` is reported back unchanged as ``n_articles``; every other field is a
    fixed neutral value and the label is "Insufficient data".
    """
    return {
        "score": 0.0,
        "label": "Insufficient data",
        "mean_similarity": 1.0,
        "variance": 0.0,
        "n_articles": n,
        "outlier_titles": [],
    }
def annotate_topic_buckets(
    topic_buckets: dict[str, list[dict]],
) -> dict[str, dict]:
    """
    Compute a divergence result for each topic bucket.

    Every list of articles in ``topic_buckets`` is passed to
    compute_divergence() with its default embedding key.
    Returns a new dict mapping each topic to its divergence result.
    """
    results_by_topic: dict[str, dict] = {}
    for topic_name, bucket in topic_buckets.items():
        results_by_topic[topic_name] = compute_divergence(bucket)
    return results_by_topic