-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapp.py
More file actions
222 lines (185 loc) · 7.91 KB
/
app.py
File metadata and controls
222 lines (185 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
from flask import Flask, render_template, request, jsonify
from tensorflow.keras.models import load_model
from tensorflow.keras.preprocessing.sequence import pad_sequences
import pickle
import requests
import re
import os
from dotenv import load_dotenv
import hashlib
# Load environment variables
load_dotenv()

app = Flask(__name__)

# Configuration
# API keys come from the environment; an empty string means "not configured"
# and the corresponding fetcher returns an error dict instead of calling out.
CONFIG = {
    "YOUTUBE_API_KEY": os.getenv("YOUTUBE_API_KEY", ""),
    "COLLECT_API_KEY": os.getenv("COLLECT_API_KEY", ""),
    "CACHE_ENABLED": os.getenv("CACHE_ENABLED", "false").lower() == "true"
}

# Load model and tokenizer only once
# On failure these stay None and predict_sentiment() degrades to "Unknown"
# rather than crashing the app at import time.
try:
    model = load_model("sentiment_model.h5")
    # Re-compile so the loaded model has an optimizer/loss attached
    # (required by some Keras versions before predict/evaluate).
    model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
    print("Model loaded successfully.")
except Exception as e:
    print(f"Error loading model: {e}")
    model = None

try:
    # NOTE(review): pickle.load on a local artifact — safe only if
    # tokenizer.pkl is trusted (it ships with the app, not user-supplied).
    with open("tokenizer.pkl", "rb") as handle:
        tokenizer = pickle.load(handle)
    print("Tokenizer loaded successfully.")
except Exception as e:
    print(f"Error loading tokenizer: {e}")
    tokenizer = None
def extract_video_id(youtube_url):
    """Extract the video ID from a YouTube URL.

    Supports youtu.be short links, watch URLs (with ``v=`` in any query
    position, not only as the first parameter), embed URLs, and Shorts
    URLs.

    Args:
        youtube_url: The URL string to parse.

    Returns:
        The video ID string, or None when no known pattern matches.
    """
    patterns = [
        r"(?:https?:\/\/)?(?:www\.)?youtu\.be\/([^?]+)",
        # Allow other query parameters before v= (e.g. watch?si=...&v=ID);
        # the original pattern only matched v= as the first parameter.
        r"(?:https?:\/\/)?(?:www\.)?youtube\.com\/watch\?(?:[^&]*&)*v=([^&]+)",
        r"(?:https?:\/\/)?(?:www\.)?youtube\.com\/embed\/([^?]+)",
        # Shorts URLs were not handled before.
        r"(?:https?:\/\/)?(?:www\.)?youtube\.com\/shorts\/([^?&]+)",
    ]
    for pattern in patterns:
        match = re.search(pattern, youtube_url)
        if match:
            return match.group(1)
    return None
def predict_sentiment(text):
    """Classify a single text as "positive" or "negative".

    Returns "Unknown" when the model/tokenizer failed to load at startup
    or when prediction itself raises.
    """
    if model is None or tokenizer is None:
        return "Unknown"
    try:
        # Mirror the preprocessing used in Sentiment_Analysis_IMDB.ipynb:
        # tokenize, then pad/truncate to the fixed training length of 200.
        encoded = tokenizer.texts_to_sequences([text])
        padded = pad_sequences(encoded, maxlen=200)
        score = model.predict(padded, verbose=0)[0][0]
        return "positive" if score > 0.5 else "negative"
    except Exception as err:
        print(f"Error in sentiment prediction: {err}")
        return "Unknown"
def get_unique_cache_key(platform, input_value):
    """Return a deterministic hex digest identifying one (platform, input) pair.

    MD5 is used purely as a cache-key hash here, not for security.
    """
    raw = f"{platform}:{input_value}".encode()
    return hashlib.md5(raw).hexdigest()
def analyze_sentiments(texts):
    """Score a list of texts and aggregate the results.

    Duplicates are dropped before scoring. Returns a dict of aggregate
    stats plus a per-text label list; invalid input (non-list or empty)
    yields a dict carrying an "error" key instead.
    """
    if not isinstance(texts, list) or not texts:
        return {
            "error": "No valid texts provided for analysis",
            "total_reviews": 0,
            "positive_percentage": 0,
            "overall_sentiment": "Unknown",
        }

    # set() removes duplicates so each distinct text is scored once.
    deduped = list(set(texts))
    labeled = [
        {"text": item, "sentiment": predict_sentiment(item)}
        for item in deduped
    ]
    positives = sum(1 for entry in labeled if entry["sentiment"] == "positive")
    total = len(deduped)
    pct = round((positives / total) * 100, 2) if total > 0 else 0

    return {
        "total_reviews": total,
        "positive_percentage": pct,
        # Strict majority decides; a 50/50 tie reads as "negative".
        "overall_sentiment": "positive" if pct > 50 else "negative",
        "positive_count": positives,
        "negative_count": total - positives,
        "labeled_texts": labeled,
    }
def get_youtube_comments(video_id, max_comments=100):
    """Fetch up to ``max_comments`` unique top-level comments for a video.

    Returns a list of comment strings on success (or a one-element
    placeholder list when the video has none), or a dict carrying an
    "error" key when the API is unconfigured or the request fails.
    """
    api_key = CONFIG["YOUTUBE_API_KEY"]
    if not api_key:
        return {"error": "YouTube API not configured"}

    endpoint = "https://www.googleapis.com/youtube/v3/commentThreads"
    query = {
        "part": "snippet",
        "videoId": video_id,
        "key": api_key,
        "maxResults": min(max_comments, 100),  # API caps one page at 100
        "textFormat": "plainText",
        "order": "relevance",  # most relevant comments first
    }
    try:
        resp = requests.get(endpoint, params=query, timeout=10)
        if resp.status_code != 200:
            print(f"YouTube API error: {resp.json()}")
            return {"error": "Failed to fetch comments"}
        threads = resp.json().get("items", [])
        # Set comprehension both extracts the text and deduplicates it.
        texts = {
            thread["snippet"]["topLevelComment"]["snippet"]["textDisplay"]
            for thread in threads
        }
        return list(texts) if texts else ["No comments available."]
    except Exception as err:
        print(f"Error fetching YouTube comments: {err}")
        return {"error": "Error fetching comments"}
def get_imdb_reviews(movie_name):
    """Fetch unique IMDb review snippets for ``movie_name`` via CollectAPI.

    Args:
        movie_name: Movie title to search for (any printable string).

    Returns:
        A list of review strings on success (or a one-element placeholder
        list when nothing was found), or a dict carrying an "error" key
        when the API is unconfigured or the request fails.
    """
    if not CONFIG["COLLECT_API_KEY"]:
        return {"error": "CollectAPI not configured"}
    # Bug fix: the movie name was previously interpolated raw into the URL,
    # which breaks for titles containing spaces, '&', '#', '?', etc.
    # Passing it via `params` lets requests percent-encode it correctly.
    url = "https://api.collectapi.com/imdb/imdbSearchByName"
    params = {"query": movie_name}
    headers = {
        "authorization": f"apikey {CONFIG['COLLECT_API_KEY']}",
        "content-type": "application/json"
    }
    try:
        response = requests.get(url, params=params, headers=headers, timeout=10)
        if response.status_code != 200:
            print(f"IMDb API error: {response.json()}")
            return {"error": "Failed to fetch reviews"}
        data = response.json()
        print(f"IMDb API response data: {data}")  # Debug print
        reviews = [
            movie["imdbContent"]
            for movie in data.get("result", [])
            if "imdbContent" in movie
        ]
        print(f"Extracted reviews: {reviews}")  # Debug print
        # Ensure unique reviews
        unique_reviews = list(set(reviews))
        return unique_reviews if unique_reviews else ["No reviews available."]
    except Exception as e:
        print(f"Error fetching IMDb reviews: {e}")
        return {"error": "Error fetching reviews"}
@app.route("/")
def home():
    """Serve the single-page UI."""
    template_name = "index.html"
    return render_template(template_name)
@app.route("/analyze", methods=["POST"])
def analyze():
    """Run sentiment analysis for a YouTube video or an IMDb movie.

    Expects JSON ``{"platform": "youtube"|"imdb", "input": <url or title>}``.
    Returns aggregate sentiment stats plus up to 5 labeled sample texts,
    or a JSON error with status 400 (bad input) / 500 (internal failure).
    """
    try:
        # Bug fix: get_json() returns None on missing/malformed JSON, which
        # previously raised AttributeError on .get() and surfaced as a 500.
        # silent=True + explicit check turns that into a proper 400.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"error": "Missing platform or input"}), 400

        platform = data.get("platform", "").lower()
        input_value = data.get("input", "").strip()
        if not platform or not input_value:
            return jsonify({"error": "Missing platform or input"}), 400

        if platform == "youtube":
            video_id = extract_video_id(input_value)
            if not video_id:
                return jsonify({"error": "Invalid YouTube URL"}), 400
            comments = get_youtube_comments(video_id)
            # Fetchers return a dict only on failure; a bare `"error" in list`
            # check would false-positive on a comment literally equal to "error".
            if isinstance(comments, dict) and "error" in comments:
                return jsonify(comments), 400
            results = analyze_sentiments(comments)
            # list[:5] already handles lists shorter than 5 elements.
            results["sample_comments"] = results.get("labeled_texts", [])[:5]
            results["platform"] = "youtube"
        elif platform == "imdb":
            reviews = get_imdb_reviews(input_value)
            if isinstance(reviews, dict) and "error" in reviews:
                return jsonify(reviews), 400
            results = analyze_sentiments(reviews)
            results["sample_reviews"] = results.get("labeled_texts", [])[:5]
            results["platform"] = "imdb"
        else:
            return jsonify({"error": "Unsupported platform"}), 400

        # Stable identifier so the client can cache/deduplicate analyses.
        results["analysis_id"] = get_unique_cache_key(platform, input_value)
        return jsonify(results)
    except Exception as e:
        print(f"Analysis error: {str(e)}")
        return jsonify({"error": "Analysis failed"}), 500
if __name__ == "__main__":
    # Dev entry point: binds all interfaces on port 5000 with debug off.
    app.run(host="0.0.0.0", port=5000, debug=False)