Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,37 @@ npm start
```
---

### `/api/monitor` (GET or POST)

Advanced feed monitoring with health checks and scoring.

**Parameters:**
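- `url` (required): The RSS feed URL to monitor. Pass it as a query parameter or as the `url` field of a JSON body; `https://` is assumed when no scheme is given.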

**Example Request:**
```bash
curl "https://your-domain.com/api/monitor?url=https://www.youtube.com/feeds/videos.xml?channel_id=UC123"
```

**Example Response:**
```json
{
"feedUrl": "https://www.youtube.com/feeds/videos.xml?channel_id=UC123",
"responseTimeMs": 421,
"score": 92,
"health": {
"status": "healthy",
"reason": null
},
"lastUpdated": {
"iso": "2026-06-08T10:30:00+00:00",
"relative": "3 hours ago"
}
}
```

---

### `/api/feed` (GET or POST)

### Scan a YouTube channel
Expand Down
55 changes: 53 additions & 2 deletions api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import urllib.request
import urllib.error
import json
import monitoring_utils

app = Flask(__name__, template_folder='api')
cache = Cache(app, config={'CACHE_TYPE': 'SimpleCache', 'CACHE_DEFAULT_TIMEOUT': 300})
Expand All @@ -21,15 +22,65 @@ def index():
return send_from_directory('.', 'index.html')


@app.route('/api/monitor', methods=['GET', 'POST'])
def api_monitor():
    """API endpoint for monitoring feed health and status.

    The feed URL is read from the ``url`` query parameter and, when that is
    absent, from the ``url`` key of a JSON object body.  A URL without an
    explicit ``http://`` / ``https://`` scheme is prefixed with ``https://``.

    Returns:
        A JSON response with ``feedUrl``, ``responseTimeMs``, ``score``,
        ``health`` (``status`` and ``reason``) and ``lastUpdated`` (``iso``
        and ``relative``, or ``null`` when no timestamp was found).
        HTTP 400 is returned when ``url`` is missing or is not a string.
    """
    url = request.args.get('url')
    if not url:
        data = request.get_json(silent=True)
        if isinstance(data, dict):
            url = data.get('url')

    if isinstance(url, str):
        url = url.strip()
    elif url:
        # A JSON body may carry any type; reject instead of crashing below.
        return jsonify({'error': 'Invalid url parameter'}), 400

    if not url:
        return jsonify({'error': 'Missing url parameter'}), 400

    # Compare against the full scheme prefix: a bare ``startswith('http')``
    # would mistake hosts such as ``httpbin.org`` for absolute URLs.
    if not url.lower().startswith(('http://', 'https://')):
        url = 'https://' + url

    # 1. Fetch feed
    fetch_result = monitoring_utils.fetch_feed(url)
    response_time = fetch_result[1]

    # 2. Parse feed
    parsed_feed = monitoring_utils.parse_xml(fetch_result[0])

    # 3. Get latest timestamp
    latest_iso = monitoring_utils.get_latest_timestamp(parsed_feed)
    relative_time = monitoring_utils.get_relative_time(latest_iso)

    # 4. Calculate health and score
    status, reason, score = monitoring_utils.calculate_health_and_score(
        fetch_result, parsed_feed, latest_iso
    )

    last_updated = None
    if latest_iso:
        last_updated = {"iso": latest_iso, "relative": relative_time}

    return jsonify({
        "feedUrl": url,
        "responseTimeMs": response_time,
        "score": score,
        "health": {
            "status": status,
            "reason": reason,
        },
        "lastUpdated": last_updated,
    })


@app.route('/api/feed', methods=['GET', 'POST'])
def api_feed():
"""API endpoint for getting feed data."""
# Try query parameters first, then fall back to JSON body
url = request.args.get('url')
data = request.get_json(silent=True) or {}
data = request.get_json(silent=True)
if not isinstance(data, dict):
data = {}

if not url:
if not data or 'url' not in data:
if 'url' not in data:
return jsonify({
'error': 'Missing url parameter',
'usage': {
Expand Down
2 changes: 1 addition & 1 deletion api/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ <h1>YouTube RSS Scanner</h1>

html += '<div class="feed-row"><div class="feed-label">Selected RSS Feed:</div>';
html += '<div class="feed-link">' + (data.selected_feed || data.youtube_rss) + '</div>';
html += '<button class="copy-btn" onclick="copyEncodedText(\'' + encodeURIComponent((data.selected_feed || data.youtube_rss)) + '\')">Copy Selected RSS</button></div>';
html += '<button class="copy-btn" data-encoded="' + encodeURIComponent((data.selected_feed || data.youtube_rss)) + '">Copy Selected RSS</button></div>';

if (data.official_feeds) {
html += '<div class="atom-output"><div class="feed-label">Official YouTube Feeds:</div>';
Expand Down
272 changes: 272 additions & 0 deletions monitoring_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
import time
import urllib.request
import urllib.error
import urllib.parse
import socket
import ipaddress
import defusedxml.ElementTree as ET
from datetime import datetime, timezone

def is_safe_url(url):
    """
    Validate a URL to prevent SSRF.

    The URL is accepted only when its scheme is ``http`` or ``https``, it has
    a hostname, and every address the hostname resolves to is a globally
    routable unicast address.

    Returns: True when the URL may be fetched, False otherwise (including
    when parsing or name resolution fails).
    """
    try:
        parsed = urllib.parse.urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            return False

        hostname = parsed.hostname
        if not hostname:
            return False

        # Resolve hostname to IPs; every returned address must be acceptable.
        addr_info = socket.getaddrinfo(hostname, None)
        if not addr_info:
            return False

        for *_, sockaddr in addr_info:
            ip = ipaddress.ip_address(sockaddr[0])
            if (
                ip.is_loopback
                or ip.is_private
                or ip.is_link_local
                or ip.is_multicast
                # ``is_private`` does not cover every non-public range
                # (e.g. 100.64.0.0/10 shared address space), so also
                # require the address to be globally routable.
                or not ip.is_global
            ):
                return False

        return True
    except Exception:
        # Fail closed: anything unparsable or unresolvable is not safe.
        return False

def parse_xml(content_bytes):
    """
    Parse XML content and extract feed info.

    Atom documents (root tag ending in ``feed``) and RSS 2.0 documents (root
    with a ``channel`` child) are recognised.  Text values are returned with
    surrounding whitespace removed so that they can be fed to date parsing
    as-is.

    Returns: { 'title': str, 'updated': str, 'items': [ { 'title': str, 'pubDate': str, 'updated': str } ] } or None
    (None for empty input, unparsable XML, or an unrecognised document).
    """
    if not content_bytes:
        return None

    def _text(node, path, namespaces=None):
        # findtext() yields None for a missing element; keep that distinction.
        value = node.findtext(path, namespaces=namespaces)
        return value.strip() if value is not None else None

    try:
        root = ET.fromstring(content_bytes)

        # Atom feed
        if root.tag.endswith('feed'):
            ns = {'atom': 'http://www.w3.org/2005/Atom'}
            items = [
                {
                    'title': _text(entry, 'atom:title', ns),
                    'updated': _text(entry, 'atom:updated', ns),
                    'pubDate': _text(entry, 'atom:published', ns),
                }
                for entry in root.findall('atom:entry', ns)
            ]
            return {
                'title': _text(root, 'atom:title', ns),
                'updated': _text(root, 'atom:updated', ns),
                'items': items,
            }

        # RSS 2.0
        channel = root.find('channel')
        if channel is not None:
            items = [
                {
                    'title': _text(item, 'title'),
                    'pubDate': _text(item, 'pubDate'),
                    'updated': None,
                }
                for item in channel.findall('item')
            ]
            return {
                'title': _text(channel, 'title'),
                # A blank lastBuildDate falls through to pubDate.
                'updated': _text(channel, 'lastBuildDate') or _text(channel, 'pubDate'),
                'items': items,
            }
    except Exception:
        # Best-effort parser: any parse failure is reported as "no feed".
        return None
    return None

def get_latest_timestamp(parsed_feed):
    """
    Detect the latest update timestamp from parsed feed.

    Candidates are the feed-level ``updated`` value plus each item's
    ``updated`` and ``pubDate`` values.  ISO 8601 strings (including a
    trailing ``Z`` and fractional seconds) and RFC 822/2822 dates are
    understood; values that cannot be parsed are ignored.  Timestamps
    without a UTC offset are assumed to be UTC.

    Returns: the most recent timestamp as an ISO 8601 string (keeping its
    original UTC offset), or None when no candidate could be parsed.
    """
    # Local import: this name is not part of the module-level imports.
    from email.utils import parsedate_to_datetime

    if not parsed_feed:
        return None

    # strptime fallbacks for inputs the two primary parsers reject
    # (e.g. "+0000" offsets on interpreters with a strict fromisoformat).
    fallback_formats = (
        '%Y-%m-%dT%H:%M:%S%z',
        '%a, %d %b %Y %H:%M:%S %z',
        '%a, %d %b %Y %H:%M:%S %Z',
        '%Y-%m-%d %H:%M:%S',
    )

    def try_parse_date(date_str):
        if not date_str or not isinstance(date_str, str):
            return None
        text = date_str.strip()
        if not text:
            return None

        # Only a trailing 'Z' is a UTC designator; rewrite just that one.
        if text.endswith(('Z', 'z')):
            normalized = text[:-1] + '+00:00'
        else:
            normalized = text

        try:
            return datetime.fromisoformat(normalized)
        except ValueError:
            pass

        try:
            return parsedate_to_datetime(text)
        except (TypeError, ValueError, IndexError):
            pass

        for fmt in fallback_formats:
            try:
                return datetime.strptime(normalized, fmt)
            except ValueError:
                continue
        return None

    candidates = [parsed_feed.get('updated')]
    for item in parsed_feed.get('items') or []:
        candidates.extend(item.get(key) for key in ('updated', 'pubDate'))

    dates = [dt for dt in map(try_parse_date, candidates) if dt is not None]
    if not dates:
        return None

    # Naive and aware datetimes cannot be compared; treat naive ones as UTC.
    aware_dates = [
        d if d.tzinfo is not None else d.replace(tzinfo=timezone.utc)
        for d in dates
    ]
    return max(aware_dates).isoformat()

def get_relative_time(iso_timestamp, now=None):
    """
    Convert ISO timestamp to human-readable relative time.

    Args:
        iso_timestamp: ISO 8601 string; a trailing ``Z`` is accepted and a
            value without a UTC offset is assumed to be UTC.
        now: optional reference datetime (defaults to the current UTC time);
            a naive value is assumed to be UTC.

    Returns: a phrase such as "3 hours ago", "in the future" for timestamps
    later than ``now``, or None when the timestamp is empty or unparsable.
    """
    if not iso_timestamp:
        return None

    try:
        text = iso_timestamp.strip()
        if text.endswith(('Z', 'z')):
            text = text[:-1] + '+00:00'
        dt = datetime.fromisoformat(text)
    except (AttributeError, TypeError, ValueError):
        return None

    # Aware/naive subtraction raises TypeError, so align both sides to UTC.
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)

    def phrase(count, unit):
        return f"{count} {unit if count == 1 else unit + 's'} ago"

    seconds = int((now - dt).total_seconds())
    if seconds < 0:
        return "in the future"
    if seconds < 60:
        return phrase(seconds, 'second')

    minutes = seconds // 60
    if minutes < 60:
        return phrase(minutes, 'minute')

    hours = minutes // 60
    if hours < 24:
        return phrase(hours, 'hour')

    days = hours // 24
    if days < 30:
        return phrase(days, 'day')

    months = days // 30
    if months < 12:
        return phrase(months, 'month')

    # 360-364 days yields 12 "months" but 0 whole years; report 1 year.
    return phrase(max(1, days // 365), 'year')

def calculate_health_and_score(fetch_result, parsed_feed, latest_timestamp, now=None):
    """
    Calculate health status, reason, and score.

    Args:
        fetch_result: 4-tuple ``(content, response_time_ms, error, status_code)``;
            only the response time and the error are used here.
        parsed_feed: parsed feed dict (its ``items`` list is inspected) or None.
        latest_timestamp: ISO 8601 string of the newest entry, or None.  A
            value without a UTC offset is assumed to be UTC; an unparsable
            value is treated like a missing one.
        now: optional reference datetime (defaults to the current UTC time).

    Scoring starts at 100.  A fetch error or unparsable feed scores 0 and an
    empty feed scores 40, all reported as "broken".  Otherwise: -10 above
    2000 ms, a further -20 above 5000 ms ("Slow response"), -30 when the
    newest entry is more than 30 days old, -10 when there is no timestamp.

    Returns: (status, reason, score)
    """
    _content, response_time, fetch_error, _status_code = fetch_result

    # 1. Check fetch errors
    if fetch_error:
        return "broken", fetch_error, 0

    # 2. Check parsing success
    if not parsed_feed:
        return "broken", "Invalid XML", 0

    # 3. Check item count
    if not parsed_feed.get('items', []):
        return "broken", "Empty feed", 40

    status, reason, score = "healthy", None, 100

    # 4. Response time penalties (cumulative above 5000 ms)
    if response_time > 2000:
        score -= 10
    if response_time > 5000:
        score -= 20
        status = "warning"
        reason = "Slow response"

    # 5. Recent uploads check (stale feed detection)
    latest = None
    if latest_timestamp:
        try:
            latest = datetime.fromisoformat(latest_timestamp)
        except (TypeError, ValueError):
            latest = None
        else:
            if latest.tzinfo is None:
                latest = latest.replace(tzinfo=timezone.utc)

    if latest is not None:
        if now is None:
            now = datetime.now(timezone.utc)
        elif now.tzinfo is None:
            now = now.replace(tzinfo=timezone.utc)
        if (now - latest).days > 30:
            score -= 30
            # Keep an earlier, more specific warning if one is already set.
            if status == "healthy":
                status = "warning"
                reason = "Feed has no recent uploads"
    else:
        score -= 10
        if status == "healthy":
            status = "warning"
            reason = "No timestamps found"

    # Ensure score is within 0-100
    return status, reason, max(0, min(100, score))

class _SafeRedirectHandler(urllib.request.HTTPRedirectHandler):
    """Redirect handler that re-validates every redirect target."""

    def redirect_request(self, req, fp, code, msg, headers, newurl):
        # Without this check a permitted host could redirect the fetch to a
        # disallowed address, bypassing the validation of the initial URL.
        if not is_safe_url(newurl):
            raise urllib.error.URLError("Disallowed redirect target")
        return super().redirect_request(req, fp, code, msg, headers, newurl)


def fetch_feed(url, timeout=10, max_bytes=10 * 1024 * 1024):
    """
    Fetch URL content and measure response time.

    Args:
        url: absolute http(s) URL; it is checked with ``is_safe_url`` first,
            and so is every redirect target.
        timeout: timeout in seconds passed to the HTTP request.
        max_bytes: largest response body accepted, or None for no limit.

    Returns: (content_bytes, response_time_ms, error_reason, status_code)
    On failure ``content_bytes`` is None and ``error_reason`` is a short
    string; ``status_code`` is None when no HTTP status was received.
    """
    # Monotonic clock: elapsed time must not be skewed by wall-clock changes.
    start_time = time.monotonic()

    def elapsed_ms():
        return int((time.monotonic() - start_time) * 1000)

    if not is_safe_url(url):
        return None, 0, "Disallowed URL", None

    headers = {
        'User-Agent': 'YouTube RSS Monitor/1.0',
    }
    req = urllib.request.Request(url, headers=headers)
    opener = urllib.request.build_opener(_SafeRedirectHandler)

    try:
        with opener.open(req, timeout=timeout) as response:
            if max_bytes is None:
                content = response.read()
            else:
                # Read one byte past the limit to detect oversized bodies
                # without loading an unbounded response into memory.
                content = response.read(max_bytes + 1)
                if len(content) > max_bytes:
                    return None, elapsed_ms(), "Response too large", response.status
            return content, elapsed_ms(), None, response.status
    except urllib.error.HTTPError as e:
        return None, elapsed_ms(), f"HTTP {e.code}", e.code
    except urllib.error.URLError as e:
        reason = str(e.reason)
        if isinstance(e.reason, socket.timeout):
            reason = "request timeout"
        return None, elapsed_ms(), reason, None
    except socket.timeout:
        return None, elapsed_ms(), "request timeout", None
    except Exception as e:
        # Last-resort boundary: report the failure instead of raising.
        return None, elapsed_ms(), str(e), None
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
flask
flask-caching
defusedxml
Loading