-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathworkers.py
More file actions
536 lines (442 loc) · 20.5 KB
/
workers.py
File metadata and controls
536 lines (442 loc) · 20.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
"""
RSS Feed Worker Module
This module contains worker functions for fetching and processing RSS feeds in parallel.
It provides thread-safe feed fetching, domain-based parallel processing, and intelligent
caching mechanisms with object storage support.
Key Features:
- Parallel RSS feed fetching with domain-based throttling
- Thread-safe operations using distributed locking
- Object storage integration for feed caching
- Specialized handling for LWN.net feeds
- Reddit feed processing with TOR support
- Feed deduplication and similarity filtering
Author: LinuxReport System
License: See LICENSE file
"""
# Standard library imports
from datetime import datetime, timedelta
import concurrent.futures
import itertools
import os
import re
from time import mktime
import threading
from timeit import default_timer as timer
from urllib.parse import urlparse
from collections import defaultdict
import pickle
from abc import ABC, abstractmethod
import socket
import sqlite3
import subprocess
import urllib.error
# Third-party imports
import feedparser
from fake_useragent import UserAgent
from selenium.common.exceptions import WebDriverException
# Local application imports
from feedfilter import merge_entries
from browser_fetch import fetch_site_posts
from shared import (
ALL_URLS, EXPIRE_WEEK, EXPIRE_YEARS, MAX_ITEMS, TZ,
USER_AGENT, RssFeed, g_c, g_cs, g_cm, get_lock, GLOBAL_FETCH_MODE_LOCK_KEY,
ENABLE_OBJECT_STORE_FEEDS, OBJECT_STORE_FEED_TIMEOUT,
ENABLE_OBJECT_STORE_FEED_PUBLISH, g_logger, history, WORKER_PROXYING,
PROXY_SERVER, PROXY_USERNAME, PROXY_PASSWORD,
ENABLE_REDDIT_API_FETCH
)
from Tor import fetch_via_tor
from app_config import DEBUG, USE_TOR
from Reddit import fetch_reddit_feed_as_feedparser
from object_storage_config import StorageOperationError, LibcloudError
from object_storage_sync import smart_fetch, publish_bytes
# =============================================================================
# GLOBAL CONSTANTS AND CONFIGURATION
# =============================================================================
# Shared UserAgent generator; .random yields a fresh browser UA string.
ua = UserAgent()
# Reddit rejects many default user agents, so pick one random UA once and pin
# it in the shared cache for a very long time (EXPIRE_YEARS) so every worker
# presents the same identity across restarts.
if not g_cs.has("REDDIT_USER_AGENT"):
    g_cs.put("REDDIT_USER_AGENT", ua.random, timeout=EXPIRE_YEARS)
USER_AGENT_RANDOM = g_cs.get("REDDIT_USER_AGENT")
# Regular expression for extracting href link targets from HTML content.
# Matches href="..." or href='...' with a lazy capture of the URL.
# (The previous pattern used doubled quote/backslash character classes
# that could never match a normal single-quoted href attribute.)
LINK_REGEX = re.compile(r'href=["\'](.*?)["\']')
# =============================================================================
# FETCHER STRATEGY PATTERN
# =============================================================================
class FetcherStrategy(ABC):
    """Interface for pluggable feed-fetching strategies.

    Concrete subclasses implement fetch() for a particular kind of feed
    (standard RSS/Atom, LWN paywall handling, Reddit, Selenium-rendered).
    """

    @abstractmethod
    def fetch(self, url, rss_info):
        """
        Fetch and process a feed.

        Args:
            url (str): The URL of the feed to fetch.
            rss_info (RssInfo): The RssInfo object for the feed.

        Returns:
            list: A list of processed feed entries.
        """
        ...
class DefaultFetcher(FetcherStrategy):
    """Default strategy: parse a standard RSS/Atom feed with feedparser."""

    def fetch(self, url, rss_info):
        """Fetch *url* via feedparser and return at most MAX_ITEMS entries."""
        request_headers = None
        if WORKER_PROXYING and PROXY_SERVER:
            # Advertise the configured proxy host via X-Forwarded-For.
            request_headers = {'X-Forwarded-For': PROXY_SERVER.split(':')[0]}
            if PROXY_USERNAME and PROXY_PASSWORD:
                import base64
                credentials = f"{PROXY_USERNAME}:{PROXY_PASSWORD}".encode('ascii')
                encoded = base64.b64encode(credentials).decode('ascii')
                request_headers['Proxy-Authorization'] = f'Basic {encoded}'
        if request_headers is not None:
            parsed = feedparser.parse(url, agent=USER_AGENT, request_headers=request_headers)
        else:
            parsed = feedparser.parse(url, agent=USER_AGENT)
        if not parsed:
            return []
        return list(itertools.islice(parsed['entries'], MAX_ITEMS))
class LwnFetcher(FetcherStrategy):
    """Strategy for LWN.net, whose paid articles become free after a delay.

    Paywalled entries (titles prefixed "[$]") are parked in the
    "lwn_pending" cache until they are 15+ days old, then released into
    the feed with the "[$]" prefix stripped. "lwn_displayed" tracks links
    already emitted so each article is only ever shown once.
    """

    def fetch(self, url, rss_info):
        """
        Fetch the LWN feed and return entries that are free to display.

        Args:
            url (str): LWN feed URL.
            rss_info (RssInfo): Feed metadata (unused by this strategy).

        Returns:
            list: Entry dicts sorted by publication time (oldest first).
        """
        pending = g_c.get("lwn_pending") or {}
        displayed = g_c.get("lwn_displayed") or set()
        res = feedparser.parse(url, agent=USER_AGENT)
        now = datetime.now(TZ)
        ready = []
        for entry in res.entries:
            link = entry.link
            title = entry.get('title', '')
            pub = datetime.fromtimestamp(mktime(entry.published_parsed), tz=TZ)
            if title.startswith("[$]"):
                # Paywalled: remember it for later unless already seen.
                if link not in pending and link not in displayed:
                    pending[link] = {'title': title, 'published': pub}
                    g_logger.info(f"[LWN] Article locked, saving for future: {title} ({link}) at {pub.isoformat()}")
            else:
                if link not in displayed:
                    ready.append({'link': link, 'title': title, 'html_content': '', 'published': pub, 'published_parsed': entry.published_parsed})
                    displayed.add(link)
                    pending.pop(link, None)
        # Release pending paywalled articles that have aged past the
        # paywall window.  (A stray `import time` in this loop was dead
        # code and has been removed.)
        for link, info in list(pending.items()):
            if now - info['published'] >= timedelta(days=15):
                title = info['title']
                if title.startswith("[$]"):
                    title = title[3:].strip()
                # NOTE(review): released entries intentionally carry no
                # 'published_parsed'; load_url_worker backfills it.
                ready.append({'link': link, 'title': title, 'html_content': '', 'published': now})
                displayed.add(link)
                pending.pop(link)
                g_logger.info(f"[LWN] Article now available for free: {info['title']} ({link}) at {now.isoformat()}")
        ready.sort(key=lambda x: x['published'])
        g_c.put("lwn_pending", pending)
        g_c.put("lwn_displayed", displayed)
        return ready
class RedditFetcher(FetcherStrategy):
    """
    Strategy for fetching Reddit feeds.

    Behavior:
    - If ENABLE_REDDIT_API_FETCH is True:
        Use the Reddit API via fetch_reddit_feed_as_feedparser() from
        Reddit.py, which returns feedparser-like entries already
        normalized for workers.py.
    - Else (legacy behavior):
        * If USE_TOR: fetch via the TOR proxy.
        * Otherwise: direct feedparser fetch with the pinned random UA.
    """

    def fetch(self, url, rss_info):
        """Fetch a Reddit feed, returning at most MAX_ITEMS entries."""
        # Prefer the Reddit API path when enabled.
        if ENABLE_REDDIT_API_FETCH:
            g_logger.info(f"[RedditFetcher] Using Reddit API for URL: {url}")
            api_result = fetch_reddit_feed_as_feedparser(url)
            if not api_result or api_result.get('bozo'):
                # api_result may be None/empty: don't dereference it blindly
                # (the previous code raised AttributeError on None here).
                reason = api_result.get('bozo_exception') if api_result else 'no result'
                g_logger.error(f"[RedditFetcher] Reddit API fetch failed for {url}: {reason}")
                return []
            # fetch_reddit_feed_as_feedparser returns entries compatible with
            # the rest of the pipeline (link/title/summary/content fields).
            entries = api_result.get('entries', []) or []
            return list(itertools.islice(entries, MAX_ITEMS))
        # Legacy behavior: TOR proxy or direct RSS fetch.
        if USE_TOR:
            g_logger.info(f"[RedditFetcher] Using TOR proxy for Reddit URL: {url}")
            res = fetch_via_tor(url, rss_info.site_url)
        else:
            g_logger.info(f"[RedditFetcher] Using legacy feedparser for Reddit URL: {url}")
            res = feedparser.parse(url, agent=USER_AGENT_RANDOM)
        if not res:
            return []
        # fetch_via_tor may return a dict or an object exposing .entries.
        new_entries = res.get('entries', []) if isinstance(res, dict) else getattr(res, 'entries', [])
        return list(itertools.islice(new_entries, MAX_ITEMS))
class SeleniumFetcher(FetcherStrategy):
    """Strategy for sites requiring JavaScript rendering, via Selenium."""

    def fetch(self, url, rss_info):
        """Render rss_info.site_url in a browser; return up to MAX_ITEMS posts."""
        posts = fetch_site_posts(rss_info.site_url, USER_AGENT)
        if not posts:
            return []
        return list(itertools.islice(posts['entries'], MAX_ITEMS))
def get_fetcher(url):
    """
    Factory: return the fetching strategy appropriate for *url*.

    Dispatch is by substring match, in priority order: LWN and Reddit get
    specialized handling, "fakefeed" URLs are rendered with Selenium, and
    everything else uses the plain feedparser strategy.
    """
    dispatch = (
        ("lwn.net", LwnFetcher),
        ("reddit", RedditFetcher),
        ("fakefeed", SeleniumFetcher),
    )
    for marker, strategy_cls in dispatch:
        if marker in url:
            return strategy_cls()
    return DefaultFetcher()
# =============================================================================
# CORE WORKER FUNCTIONS
# =============================================================================
def load_url_worker(url):
    """
    Background worker to fetch and process a single RSS feed URL.

    Lifecycle:
    - Acquires a distributed lock so only one process fetches this URL.
    - Tries the object store first (if enabled) for an already-processed feed.
    - Otherwise fetches via the appropriate strategy, normalizes entries,
      merges them with the cached feed, and stores the result.
    - Optionally publishes the processed feed back to the object store.

    Args:
        url (str): The RSS feed URL to process.

    Returns:
        None: Results are stored in the global cache.
    """
    import time  # hoisted: was re-imported inside the per-entry loop

    rss_info = ALL_URLS[url]
    lock_key = f"feed_fetch:{url}"
    # Distributed lock ensures only one process fetches this URL at a time.
    with get_lock(lock_key, owner_prefix=f"feed_worker_{os.getpid()}") as lock:
        if not lock.locked():
            g_logger.warning(f"Could not acquire lock for {url}, another process is fetching.")
            return
        start = timer()

        # Fast path: a peer may already have processed this feed and
        # published the pickled RssFeed to the object store.
        if ENABLE_OBJECT_STORE_FEEDS:
            content, _ = smart_fetch(url, cache_expiry=OBJECT_STORE_FEED_TIMEOUT)
            if content:
                try:
                    rssfeed = pickle.loads(content)
                    if isinstance(rssfeed, RssFeed):
                        g_c.put(url, rssfeed, timeout=EXPIRE_WEEK)
                        g_c.set_last_fetch(url, datetime.now(TZ), timeout=EXPIRE_WEEK)
                        g_logger.info(f"Successfully fetched processed feed from object store: {url}")
                        return
                except (pickle.UnpicklingError, TypeError) as e:
                    g_logger.error(f"Error parsing object store feed for {url}: {e}")

        fetcher = get_fetcher(url)
        new_entries = fetcher.fetch(url, rss_info)
        if not new_entries:
            g_logger.warning(f"No entries found for {url}.")
            # Continue: the rest of the function handles an empty list.

        # Normalize each entry: resolve the underlying link, flatten HTML
        # content, and backfill missing publication timestamps.
        for entry in new_entries:
            entry['underlying_url'] = entry.get('origin_link', entry.get('link', ''))
            if 'content' in entry and entry['content']:
                entry['html_content'] = entry['content'][0].get('value', '')
            else:
                entry['html_content'] = entry.get('summary', '')
            if not entry.get('published_parsed'):
                # Use "now" so sorting/merging stays stable for undated entries.
                entry['published_parsed'] = time.gmtime()
                entry['published'] = time.strftime('%a, %d %b %Y %H:%M:%S GMT', entry['published_parsed'])
            if "reddit" in url:
                # Prefer the article a Reddit post points at over the
                # reddit.com comment page itself.
                if "reddit" not in entry.get('underlying_url', ''):
                    entry['link'] = entry['underlying_url']
                else:
                    links = LINK_REGEX.findall(entry.get('html_content', ''))
                    links = [lnk for lnk in links if 'reddit.com' not in lnk]
                    if links:
                        entry['link'] = links[0]

        # Merge with the previously cached feed and count genuinely new links.
        old_feed = g_c.get(url)
        new_count = len(new_entries)
        if old_feed and old_feed.entries:
            new_count = len(set(e.get('link') for e in new_entries) - set(e.get('link') for e in old_feed.entries))
            entries = merge_entries(new_entries, old_feed.entries)
        else:
            entries = new_entries
        entries = list(itertools.islice(entries, MAX_ITEMS))
        history.update_fetch(url, new_count)

        # Preserve curated top_articles only when the top 5 are unchanged.
        top_articles = []
        if old_feed and old_feed.entries:
            previous_top_5 = set(e['link'] for e in old_feed.entries[:5])
            current_top_5 = set(e['link'] for e in entries[:5])
            if previous_top_5 == current_top_5:
                top_articles = old_feed.top_articles
        rssfeed = RssFeed(entries, top_articles=top_articles)

        if ENABLE_OBJECT_STORE_FEED_PUBLISH:
            try:
                feed_data = pickle.dumps(rssfeed)
                publish_bytes(feed_data, url)
                g_logger.info(f"Successfully published feed to object store: {url}")
            except (pickle.PicklingError, ValueError, StorageOperationError, LibcloudError) as e:
                g_logger.error(f"Error publishing feed to object store for {url}: {e}")

        g_c.put(url, rssfeed, timeout=EXPIRE_WEEK)
        g_c.set_last_fetch(url, datetime.now(TZ), timeout=EXPIRE_WEEK)
        # Invalidate the rendered page cache once there is real content.
        if len(entries) > 2:
            g_cm.delete(rss_info.site_url)
        end = timer()
        g_logger.info(f"Parsing from: {url}, in {end - start:f}. New articles: {new_count}")
# =============================================================================
# LOCKING AND THREADING UTILITIES
# =============================================================================
def _acquire_fetch_lock():
    """
    Acquire the global fetch lock, waiting up to 60 seconds.

    Returns:
        SqliteLock or None: The acquired lock object, or None if the
        lock could not be obtained within the timeout.
    """
    fetch_lock = get_lock(GLOBAL_FETCH_MODE_LOCK_KEY, owner_prefix=f"fetch_mode_{os.getpid()}")
    if not fetch_lock.acquire(timeout_seconds=60, wait=True):
        g_logger.warning("Failed to acquire global fetch lock after waiting.")
        return None
    g_logger.info("Acquired global fetch lock.")
    return fetch_lock
def _check_fetch_lock_available():
    """
    Probe whether the global fetch lock is currently free.

    Acquires it non-blocking and immediately releases it on success, so
    the lock state is left unchanged.

    Returns:
        bool: True if the lock was free, False otherwise.
    """
    probe = get_lock(GLOBAL_FETCH_MODE_LOCK_KEY, owner_prefix=f"fetch_check_{os.getpid()}")
    if not probe.acquire(wait=False):
        return False
    probe.release()
    return True
# =============================================================================
# DOMAIN PROCESSING UTILITIES
# =============================================================================
def get_domain(url):
"""
Extract base domain from URL for grouping purposes.
Args:
url (str): The URL to extract domain from
Returns:
str: The base domain (e.g., 'bandcamp.com' from 'https://news.bandcamp.com/feed')
"""
try:
netloc = urlparse(url).netloc
# Split the netloc into parts
parts = netloc.split('.')
# Return the last two parts as the base domain (e.g., bandcamp.com)
return '.'.join(parts[-2:]) if len(parts) > 1 else netloc
except (AttributeError, ValueError) as e:
g_logger.warning(f"Could not parse domain from URL '{url}': {e}")
return url
def process_domain_urls(urls):
    """
    Fetch every URL in *urls* sequentially.

    The caller groups URLs by domain, so serial processing here prevents
    hammering a single server with concurrent requests.

    Args:
        urls (list): URLs from the same domain to process.
    """
    for feed_url in urls:
        try:
            load_url_worker(feed_url)
        except (WebDriverException, socket.timeout, ConnectionResetError, urllib.error.URLError, sqlite3.Error, IOError) as exc:
            import traceback
            g_logger.error(f'CRITICAL ERROR processing {feed_url}: {exc}')
            g_logger.error(f'Exception type: {type(exc).__name__}')
            g_logger.error(f'Exception args: {exc.args}')
            g_logger.error(f'Full traceback: {traceback.format_exc()}')
def process_urls_in_parallel(urls, description="processing"):
    """
    Process URLs in parallel while ensuring no domain gets multiple
    simultaneous requests.

    URLs are grouped by base domain; each domain's URLs are handled
    sequentially by one worker task, and distinct domains run in parallel
    (up to 10 threads; 1 in DEBUG mode for deterministic behavior).

    Args:
        urls (list): List of URLs to process.
        description (str): Operation description used in log messages.
    """
    # Group URLs by domain for per-server throttling.
    domain_to_urls = defaultdict(list)
    for url in urls:
        domain_to_urls[get_domain(url)].append(url)
    g_logger.info(f"{description.capitalize()} {len(urls)} URLs with domain-based parallel processing...")
    with concurrent.futures.ThreadPoolExecutor(max_workers=10 if not DEBUG else 1) as executor:
        # One task per domain; the key itself is not needed, so iterate
        # values() directly (the old loop left `domain` unused).
        futures = [executor.submit(process_domain_urls, domain_urls)
                   for domain_urls in domain_to_urls.values()]
        for future in concurrent.futures.as_completed(futures):
            try:
                future.result()  # Surface exceptions raised inside workers
            except (WebDriverException, socket.timeout, ConnectionResetError, urllib.error.URLError, sqlite3.Error, IOError) as exc:
                import traceback
                g_logger.error(f'CRITICAL ERROR in domain processing during {description}: {exc}')
                g_logger.error(f'Exception type: {type(exc).__name__}')
                g_logger.error(f'Exception args: {exc.args}')
                g_logger.error(f'Full traceback: {traceback.format_exc()}')
                g_logger.error('This error occurred during parallel processing of URLs')
# =============================================================================
# MAIN FETCH OPERATIONS
# =============================================================================
def fetch_urls_parallel(urls):
    """
    Fetch multiple URLs in parallel under the global fetch lock.

    Main entry point for parallel fetching; aborts if the global lock
    cannot be obtained, and always releases it afterwards.

    Args:
        urls (list): List of URLs to fetch in parallel.
    """
    global_lock = _acquire_fetch_lock()
    if not global_lock:
        g_logger.warning("Aborting parallel fetch due to inability to acquire global lock.")
        return
    try:
        process_urls_in_parallel(urls, "fetching")
    finally:
        global_lock.release()
        g_logger.info("Released global fetch lock.")
def refresh_thread():
    """
    Refresh any expired RSS feeds in the background.

    Scans all configured feeds (skipping 'Custom.png' placeholder sites),
    collects those whose cache entry has expired, and fetches them in
    parallel with domain-based throttling — all under the global fetch
    lock, which is always released on exit.
    """
    global_lock = _acquire_fetch_lock()
    if not global_lock:
        g_logger.warning("Aborting refresh thread due to inability to acquire global lock.")
        return
    try:
        # Batch-load last-fetch timestamps, then pick the stale feeds.
        last_fetch_cache = g_c.get_all_last_fetches(list(ALL_URLS.keys()))
        stale_urls = []
        for url, rss_info in ALL_URLS.items():
            if rss_info.logo_url == "Custom.png":
                continue  # custom placeholder sites are never auto-refreshed
            if g_c.has_feed_expired(url, last_fetch_cache.get(url)):
                stale_urls.append(url)
        if not stale_urls:
            g_logger.info("No feeds need refreshing in this cycle.")
            return
        process_urls_in_parallel(stale_urls, "refreshing")
    finally:
        global_lock.release()
        g_logger.info("Released global fetch lock after refresh.")
def fetch_urls_thread():
    """
    Kick off a daemon thread that refreshes RSS feeds in the background.

    Safety check: if another fetch/refresh operation already holds the
    global lock, no new thread is started, preventing overlapping runs.
    """
    if not _check_fetch_lock_available():
        g_logger.info("Fetch/refresh operation already in progress. Skipping background refresh trigger.")
        return
    g_logger.info("No fetch operation running. Starting background refresh thread...")
    worker = threading.Thread(target=refresh_thread, daemon=True)
    worker.start()