-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathembeddings_dedup.py
More file actions
367 lines (291 loc) · 13.9 KB
/
embeddings_dedup.py
File metadata and controls
367 lines (291 loc) · 13.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
"""
embeddings_dedup.py
Article deduplication system using sentence embeddings for intelligent content filtering.
Provides functionality for article fetching, similarity detection, and deduplication
using advanced NLP techniques to identify and filter duplicate or similar content.
"""
# =============================================================================
# STANDARD LIBRARY IMPORTS
# =============================================================================
import os
import re
import math
# =============================================================================
# THIRD-PARTY IMPORTS
# =============================================================================
import numpy as np
from sentence_transformers import SentenceTransformer
from Logging import g_logger
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
# Module-level alias for the shared logger object imported from the Logging module.
# NOTE(review): the previous comment said this is the root logger sharing
# auto_update.py's configuration - that cannot be confirmed from this file.
logger = g_logger
# =============================================================================
# CONSTANTS AND CONFIGURATION
# =============================================================================
# Model identifier passed to SentenceTransformer() when the embedder is lazily created.
EMBEDDER_MODEL_NAME = 'all-MiniLM-L6-v2'
# Default similarity cutoff: a score >= THRESHOLD counts as a duplicate in
# deduplicate_articles_with_exclusions and as a match in get_best_matching_article.
THRESHOLD = 0.75 # Similarity threshold for deduplication (lowered for AI-generated titles)
# =============================================================================
# GLOBAL VARIABLES AND CACHING
# =============================================================================
# Lazily created SentenceTransformer instance; stays None until an embedding
# helper first needs the model.
embedder = None
embedding_cache = {} # Maps stripped text -> its embedding (numpy array); never evicted in this file
_embedding_dim = None # Embedding length, discovered once by encoding a sample string
_zero_embedding = None # Shared all-zero float32 vector returned for empty/whitespace text
# st_util is retained only so existing tests that reference it keep working.
# It aliases sentence_transformers.util when that import succeeds, else None.
try:
    from sentence_transformers import util
except ImportError:
    st_util = None
else:
    st_util = util
# =============================================================================
# EMBEDDING UTILITY FUNCTIONS
# =============================================================================
def clamp_similarity(score):
    """Clamp a cosine similarity score to the closed interval [-1, 1].

    Floating point round-off can push a similarity slightly outside the
    valid range; this limits it back to [-1, 1].

    Args:
        score: Similarity value. Python ints/floats and NumPy integer or
            floating scalars (e.g. ``np.float32``) are accepted.

    Returns:
        float: ``score`` limited to [-1.0, 1.0], or 0.0 when ``score`` is
        not a real number or is NaN.
    """
    # NumPy scalars such as np.float32 are not subclasses of the builtin
    # float, so they must be listed explicitly; otherwise a perfectly valid
    # float32 similarity would be discarded and reported as 0.0.
    if not isinstance(score, (int, float, np.integer, np.floating)):
        return 0.0
    value = float(score)
    if math.isnan(value):
        return 0.0
    return max(-1.0, min(1.0, value))
def _get_zero_embedding():
    """Return the shared all-zero embedding vector, creating it on first use.

    On the first call the model is loaded if it has not been already, the
    embedding length is discovered by encoding a single-space string, and a
    float32 zero vector of that length is built and stored in the module
    globals. Later calls return the same stored array object.

    Returns:
        np.ndarray: 1-D float32 array of zeros with the model's embedding length.
    """
    global embedder, _embedding_dim, _zero_embedding

    if _zero_embedding is None:
        if embedder is None:
            embedder = SentenceTransformer(EMBEDDER_MODEL_NAME)
        if _embedding_dim is None:
            # One throwaway encode is enough to learn the vector length.
            probe = embedder.encode(" ", convert_to_tensor=False, show_progress_bar=False)
            _embedding_dim = probe.shape[-1]
        _zero_embedding = np.zeros(_embedding_dim, dtype=np.float32)
    return _zero_embedding
def get_embeddings(texts):
    """
    Get embeddings for text(s), reusing cached results where possible.

    Accepts either a single string or a list/tuple of strings. Each text is
    stripped of surrounding whitespace before lookup; the stripped text is the
    cache key. Empty or whitespace-only texts are mapped to the shared zero
    vector and are never sent to the model. All texts that are not cached are
    encoded in a single batch, and a text that appears several times in one
    call is encoded only once.

    Args:
        texts: Single text string or list/tuple of text strings to encode.
            Any other value is treated as a one-element list.

    Returns:
        np.ndarray: Single embedding if the input is a string.
        list: List of embedding numpy arrays otherwise (same order as input).

    Raises:
        AttributeError: If an item has no ``strip`` method (e.g. ``None``).
    """
    global embedder
    if embedder is None:
        embedder = SentenceTransformer(EMBEDDER_MODEL_NAME)

    # A lone string, or any non-list/tuple value, is handled as a one-item
    # batch. Wrapping invalid values (such as None) lets the .strip() call
    # below raise AttributeError for them.
    is_single = isinstance(texts, str)
    if is_single or not isinstance(texts, (list, tuple)):
        texts = [texts]

    # Results are filled by position so the output order matches the input.
    embeddings = [None] * len(texts)
    # Uncached stripped text -> every input position that needs its embedding.
    # Grouping by text means duplicates within one call are encoded only once.
    pending = {}

    zero_vec = _get_zero_embedding()
    for i, text in enumerate(texts):
        normalized_text = text.strip()
        if not normalized_text:
            embeddings[i] = zero_vec
        elif normalized_text in embedding_cache:
            embeddings[i] = embedding_cache[normalized_text]
        else:
            pending.setdefault(normalized_text, []).append(i)

    if pending:
        unique_texts = list(pending)
        batch_embeddings = embedder.encode(unique_texts, convert_to_tensor=False, show_progress_bar=False)
        for text, emb in zip(unique_texts, batch_embeddings):
            embedding_cache[text] = emb
            for idx in pending[text]:
                embeddings[idx] = emb

    return embeddings[0] if is_single else embeddings
# =============================================================================
# DEDUPLICATION FUNCTIONS
# =============================================================================
def _compute_cosine_similarities(query_emb, candidate_embs):
"""
Compute cosine similarities between a query embedding and candidate embeddings.
Args:
query_emb: Query embedding vector (1D numpy array)
candidate_embs: Candidate embedding vectors (2D numpy array, shape: [n_candidates, dim])
Expected to be L2-normalized (as produced by SentenceTransformer).
Returns:
Array of similarity scores (1D numpy array, shape: [n_candidates])
"""
if candidate_embs.shape[0] == 0:
return np.array([])
# For L2-normalized embeddings, cosine similarity is just the dot product.
# Empty/whitespace texts use a cached zero vector, which naturally yields 0.0
# similarity via the dot product with any normalized vector.
similarities = np.dot(candidate_embs, query_emb)
return np.clip(similarities, -1.0, 1.0)
def _embedding_to_numpy(emb):
    """Convert one embedding (numpy array, tensor-like, or sequence) to a numpy array."""
    if isinstance(emb, np.ndarray):
        return emb
    if hasattr(emb, "numpy"):
        # torch.Tensor.numpy() raises for tensors that require grad or live
        # on a non-CPU device, so detach and move to CPU first when the
        # object supports those operations.
        if hasattr(emb, "detach"):
            emb = emb.detach()
        if hasattr(emb, "cpu"):
            emb = emb.cpu()
        return emb.numpy()
    return np.asarray(emb)


def deduplicate_articles_with_exclusions(articles, excluded_embeddings, threshold=THRESHOLD):
    """
    Filter out articles whose titles are too similar to already-known content.

    Title embeddings for all articles are computed in one batch. Articles are
    then visited in order: an article is dropped when its highest similarity
    against the current exclusion set is >= ``threshold``; otherwise it is
    kept and its own embedding is added to the exclusion set, so later
    near-duplicates of a kept article are filtered as well.

    Args:
        articles (list | tuple): Article dictionaries with a 'title' key.
        excluded_embeddings (list | tuple): Embeddings to compare against.
            Items may be numpy arrays, tensor-like objects exposing
            ``.numpy()`` (e.g. torch tensors, including ones that require
            grad or are on another device), or plain sequences.
        threshold (float): Similarity at or above which an article counts as
            a duplicate (default: THRESHOLD).

    Returns:
        list: The articles that were kept, in their original order.

    Raises:
        TypeError: If ``articles`` or ``excluded_embeddings`` is not a list or tuple.
    """
    if not isinstance(articles, (list, tuple)):
        raise TypeError(f"articles must be a list or tuple, got {type(articles).__name__}")
    if not isinstance(excluded_embeddings, (list, tuple)):
        raise TypeError(f"excluded_embeddings must be a list or tuple, got {type(excluded_embeddings).__name__}")

    if not articles:
        return []

    # Embed every title in a single batch call.
    article_titles = [article["title"] for article in articles]
    article_embeddings = get_embeddings(article_titles)

    # Build the initial exclusion matrix once, before the loop.
    excluded_list = [_embedding_to_numpy(emb) for emb in excluded_embeddings]
    if excluded_list:
        excluded_arrays = np.stack(excluded_list)
    else:
        # Empty 2D array with the right width so later concatenation works.
        embedding_dim = _get_zero_embedding().shape[0]
        excluded_arrays = np.empty((0, embedding_dim), dtype=np.float32)

    unique_articles = []
    # Articles are processed one at a time because each kept article extends
    # the exclusion set used for the articles that follow it.
    for article, current_emb in zip(articles, article_embeddings):
        if excluded_arrays.shape[0] > 0:
            similarities = _compute_cosine_similarities(current_emb, excluded_arrays)
            is_similar = np.max(similarities) >= threshold
        else:
            is_similar = False

        if not is_similar:
            unique_articles.append(article)
            # Add the kept article as a new row of the exclusion matrix.
            excluded_arrays = np.concatenate([excluded_arrays, current_emb[None, :]], axis=0)

    logger.info(f"Deduplication: Filtered {len(articles) - len(unique_articles)} duplicate articles")
    return unique_articles
def get_best_matching_article(target_title, articles):
    """
    Pick the article whose title is most similar to ``target_title``.

    The target title and all article titles are embedded, each article is
    scored against the target, and the highest-scoring article is returned
    if its score reaches THRESHOLD. When the best score is below THRESHOLD a
    warning is logged together with every candidate title and its score.

    Args:
        target_title (str): Title to find matches for.
        articles (list): List of article dictionaries with 'title' keys.

    Returns:
        dict: Best matching article if its similarity >= THRESHOLD (or >= 1.0),
        None otherwise, including when ``articles`` is empty.
    """
    if not articles:
        return None

    # Embed the query first, then all candidate titles in one batch.
    query_vec = get_embeddings(target_title)
    candidate_vecs = get_embeddings([entry["title"] for entry in articles])
    candidate_matrix = np.stack(candidate_vecs)

    scores = _compute_cosine_similarities(query_vec, candidate_matrix)
    top = np.argmax(scores)
    top_score = scores[top]

    # A perfect score is returned immediately.
    if top_score >= 1.0:
        return articles[top]

    if top_score >= THRESHOLD:
        return articles[top]

    # Nothing qualified: log the full score table to help debugging.
    if top_score < THRESHOLD:
        logger.warning(f"No match found above threshold {THRESHOLD} for title: '{target_title}'")
        logger.warning(f"Best score was {top_score:.3f}")
        logger.info("All available headlines and their similarity scores:")
        for score, entry in zip(scores, articles):
            logger.info(f" {score:.3f} - '{entry['title']}'")

    return None
# =============================================================================
# HTML PARSING AND EXTRACTION FUNCTIONS
# =============================================================================
def extract_articles_from_html(html_file):
    """
    Pull article links and titles out of a previously generated HTML file.

    The file is read as UTF-8 and scanned with a regular expression for an
    ``<a ... href="...">`` tag that is followed (allowing whitespace) by a
    ``<font>`` tag and then a ``<b>`` tag. The href value becomes the URL and
    the text inside ``<b>`` becomes the title. The title is returned exactly
    as it appears in the file (HTML entities are not decoded).

    Args:
        html_file (str): Path to the HTML file to parse.

    Returns:
        list: Article dictionaries with 'url' and 'title' keys, in document
        order; an empty list when the file does not exist.
    """
    if not os.path.exists(html_file):
        logger.info(f"No existing HTML file found at {html_file}")
        return []

    with open(html_file, "r", encoding="utf-8") as handle:
        page_source = handle.read()

    link_pattern = r'<a\s+[^>]*href="([^"]+)"[^>]*>\s*<font[^>]*>\s*<b[^>]*>([^<]+)</b>'
    return [
        {"url": found.group(1), "title": found.group(2)}
        for found in re.finditer(link_pattern, page_source)
    ]
# =============================================================================
# RSS FEED PROCESSING FUNCTIONS
# =============================================================================
def fetch_recent_articles(all_urls, cache):
    """
    Collect the leading entries of each cached RSS feed.

    For every feed URL in ``all_urls`` the feed object is looked up in
    ``cache``; URLs with no cached feed are skipped. Entries are taken in
    feed order, and collection for a feed stops once
    MAX_ARTICLES_PER_FEED_FOR_LLM entries (imported from auto_update at call
    time) have been taken.

    Args:
        all_urls (dict): Dictionary keyed by feed URL; only the keys are used.
        cache (dict): Cache object whose ``get(feed_url)`` returns a feed
            with an ``entries`` attribute, or None.

    Returns:
        list: Article dictionaries with 'title' and 'url' keys.
    """
    from auto_update import MAX_ARTICLES_PER_FEED_FOR_LLM

    collected = []
    for feed_url in all_urls:
        feed = cache.get(feed_url)
        if feed is None:
            continue

        # NOTE(review): the cap is an equality check after each append, so a
        # limit of 0 or less would never trigger - confirm the constant is >= 1.
        for position, entry in enumerate(feed.entries, start=1):
            collected.append({"title": entry["title"], "url": entry["link"]})
            if position == MAX_ARTICLES_PER_FEED_FOR_LLM:
                break
    return collected