-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
376 lines (326 loc) · 12.9 KB
/
app.py
File metadata and controls
376 lines (326 loc) · 12.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
import os
import sqlite3
from flask import Flask, render_template, request, redirect, url_for, session
from werkzeug.utils import secure_filename
from werkzeug.security import generate_password_hash, check_password_hash
from agents import AgentOrchestrator
def get_db_connection(db_name):
    """Open a SQLite connection to *db_name* that waits on locked databases.

    The connection is created with a 10 second ``timeout`` and the
    ``busy_timeout`` pragma is set to 10000 ms before it is returned.
    The connection is returned open; closing it is up to the caller.
    """
    connection = sqlite3.connect(db_name, timeout=10)
    connection.execute("PRAGMA busy_timeout = 10000")
    return connection
# --- Initialize DB and create users table with role ---
def init_db():
    """Create the ``users`` table in ``users.db`` if it does not exist yet.

    Columns: ``id`` (autoincrement primary key), ``username`` (unique),
    ``password`` and ``role`` (defaults to ``'user'``).
    """
    conn = get_db_connection("users.db")
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, so that is done explicitly in ``finally``.
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS users (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT UNIQUE NOT NULL,
                    password TEXT NOT NULL,
                    role TEXT NOT NULL DEFAULT 'user'
                )
                """
            )
    finally:
        conn.close()
def init_reports_db():
    """Create the ``reports`` table in ``users.db`` if it does not exist yet.

    Each row stores the reporting username, the description, an optional
    attachment path, a status (defaults to ``'Open'``), an optional
    confidence and final solution, and created/updated timestamps that
    default to ``CURRENT_TIMESTAMP``.
    """
    conn = get_db_connection("users.db")
    try:
        # ``with conn`` only commits (or rolls back on error); it does not
        # close the connection, so that is done explicitly in ``finally``.
        with conn:
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS reports (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    username TEXT NOT NULL,
                    description TEXT NOT NULL,
                    attachment TEXT,
                    status TEXT NOT NULL DEFAULT 'Open',
                    confidence INTEGER,
                    final_solution TEXT,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
                """
            )
    finally:
        conn.close()
def create_report_record(username, description, attachment):
    """Insert a new report with status ``'Open'`` and return its row id.

    Args:
        username: Name stored in the ``username`` column.
        description: Text stored in the ``description`` column.
        attachment: Value stored in the ``attachment`` column (may be None).

    Returns:
        The ``lastrowid`` reported by the INSERT cursor.
    """
    conn = get_db_connection("users.db")
    try:
        # ``with conn`` commits on success and rolls back on error; the
        # connection itself is closed in ``finally`` so it is not leaked.
        with conn:
            cursor = conn.execute(
                """
                INSERT INTO reports (username, description, attachment, status)
                VALUES (?, ?, ?, 'Open')
                """,
                (username, description, attachment),
            )
        return cursor.lastrowid
    finally:
        conn.close()
def update_report_analysis(report_id, final_solution, confidence):
    """Store the analysis result on a report and mark it ``'Analyzed'``.

    Does nothing when *report_id* is falsy (None, 0, empty).

    Args:
        report_id: Primary key of the report row to update.
        final_solution: Value written to the ``final_solution`` column.
        confidence: Value written to the ``confidence`` column.
    """
    if not report_id:
        return

    conn = get_db_connection("users.db")
    try:
        # ``with conn`` commits on success and rolls back on error; the
        # connection itself is closed in ``finally`` so it is not leaked.
        with conn:
            conn.execute(
                """
                UPDATE reports
                SET final_solution = ?,
                    confidence = ?,
                    status = 'Analyzed',
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = ?
                """,
                (final_solution, confidence, report_id),
            )
    finally:
        conn.close()
def get_reports_for_user(username):
    """Return all reports filed under *username*, highest id first.

    Returns:
        A list of tuples ``(id, description, status, confidence,
        created_at, updated_at)``; empty when the user has no reports.
    """
    conn = get_db_connection("users.db")
    try:
        cursor = conn.execute(
            """
            SELECT id, description, status, confidence, created_at, updated_at
            FROM reports
            WHERE username = ?
            ORDER BY id DESC
            """,
            (username,),
        )
        return cursor.fetchall()
    finally:
        # The sqlite3 connection context manager never closes the
        # connection, so close it explicitly once the rows are fetched.
        conn.close()
# Make sure both tables exist before any request handler touches them.
init_db()
init_reports_db()

# --- Flask Setup ---
app = Flask(__name__)
app.config["UPLOAD_FOLDER"] = os.path.join(os.path.dirname(__file__), "uploads")
os.makedirs(app.config["UPLOAD_FOLDER"], exist_ok=True)
# The secret key signs the session cookie, and this app keeps the logged-in
# user and role in the session. Read it from the SECRET_KEY environment
# variable so deployments do not share the well-known development value;
# the old literal remains only as a local-development fallback.
app.secret_key = os.environ.get("SECRET_KEY", "dev-secret-change-me")
# Lower-case extensions accepted by allowed_file() for uploads.
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "txt", "log"}
agentic = AgentOrchestrator()
# --- Helpers ---
def allowed_file(filename):
    """Return True when *filename* has an extension in ALLOWED_EXTENSIONS.

    The extension is the text after the last dot, compared in lower case.
    A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition(".")
    if not dot:
        return False
    return extension.lower() in ALLOWED_EXTENSIONS
# --- Routes ---
# Home
@app.route("/")
def home():
    """Serve the landing page rendered from ``home.html``."""
    landing_page = render_template("home.html")
    return landing_page
@app.route("/my_reports")
def my_reports():
    """List the reports filed by the session's user.

    Redirects to the home page when the session has no "user" entry;
    otherwise renders ``my_reports.html`` with that user's reports.
    """
    if "user" in session:
        current_user = session.get("user")
        user_reports = get_reports_for_user(current_user)
        return render_template(
            "my_reports.html",
            reports=user_reports,
            username=current_user,
        )
    return redirect(url_for("home"))
# Register
@app.route("/register", methods=["GET", "POST"])
def register():
    """Show the registration form (GET) or create a regular user (POST).

    On POST the ``username`` and ``password`` form fields are stripped and
    must both be non-empty. The password is stored hashed. On success the
    client is redirected to the login page.

    Returns:
        The rendered ``register.html`` for GET, a redirect on success,
        a 400 response when a field is missing, or a 409 response when
        the username is already taken.
    """
    if request.method != "POST":
        return render_template("register.html")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "").strip()
    if not username or not password:
        return "Enter both username and password.", 400

    # Security: the role must never come from the client. Honouring a
    # submitted ``role`` field would let anyone self-register as "admin".
    # Admin accounts are created through /admin_register instead.
    role = "user"
    hashed_pw = generate_password_hash(password)

    conn = get_db_connection("users.db")
    try:
        # ``with conn`` commits on success and rolls back on error.
        with conn:
            conn.execute(
                "INSERT INTO users (username, password, role) VALUES (?, ?, ?)",
                (username, hashed_pw, role),
            )
    except sqlite3.IntegrityError:
        # Raised by the UNIQUE constraint on ``username``.
        return "Username already exists.", 409
    finally:
        # The context manager above does not close the connection.
        conn.close()
    return redirect(url_for("login"))
# User Login
@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form (GET) or authenticate a user (POST).

    On POST the stripped ``username`` and ``password`` form fields are
    checked against the stored password hash. On success the session is
    reset and populated with ``user`` and ``role``; admins are redirected
    to the admin dashboard, everyone else to the report page.

    Returns:
        The rendered ``login.html`` for GET, a redirect on success, a 400
        response when a field is missing, or a 401 response when the
        credentials do not match.
    """
    if request.method != "POST":
        return render_template("login.html")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "").strip()
    if not username or not password:
        return "Enter both username and password.", 400

    conn = get_db_connection("users.db")
    try:
        row = conn.execute(
            "SELECT password, role FROM users WHERE username=?",
            (username,),
        ).fetchone()
    finally:
        # sqlite3 connections are not closed by their context manager,
        # so close explicitly to avoid leaking one per login attempt.
        conn.close()

    if row is None or not check_password_hash(row[0], password):
        return "Invalid credentials.", 401

    stored_role = row[1]
    # Drop whatever a previous user or guest left in the session (report id,
    # conversation history, ...) so it cannot leak into this login.
    session.clear()
    session["user"] = username
    session["role"] = stored_role
    if stored_role == "admin":
        return redirect(url_for("admin_dashboard"))
    return redirect(url_for("report"))
# Admin Login (separate page optional)
@app.route("/admin_login", methods=["GET", "POST"])
def admin_login():
    """Show the admin login form (GET) or authenticate an admin (POST).

    Works like the regular login but additionally requires the stored
    role to be ``"admin"``. On success the session is reset, populated
    with ``user`` and ``role``, and the client is redirected to the admin
    dashboard.

    Returns:
        The rendered ``admin_login.html`` for GET, a redirect on success,
        a 400 response when a field is missing, or a 401 response when
        the credentials do not match an admin account.
    """
    if request.method != "POST":
        return render_template("admin_login.html")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "").strip()
    if not username or not password:
        return "Enter both username and password.", 400

    conn = get_db_connection("users.db")
    try:
        row = conn.execute(
            "SELECT password, role FROM users WHERE username=?",
            (username,),
        ).fetchone()
    finally:
        # sqlite3 connections are not closed by their context manager,
        # so close explicitly to avoid leaking one per login attempt.
        conn.close()

    is_admin = (
        row is not None
        and check_password_hash(row[0], password)
        and row[1] == "admin"
    )
    if not is_admin:
        return "Invalid admin credentials.", 401

    # Drop state left behind by a previous user or guest before logging in.
    session.clear()
    session["user"] = username
    session["role"] = "admin"
    return redirect(url_for("admin_dashboard"))
@app.route("/admin_register", methods=["GET", "POST"])
def admin_register():
    """Show the admin registration form (GET) or create an admin (POST).

    On POST the stripped ``username`` and ``password`` form fields must
    both be non-empty. The password is stored hashed and the account is
    inserted with role ``"admin"``. On success the client is redirected
    to the admin login page.

    NOTE(review): nothing in this view checks the caller's session, so
    any visitor can create an admin account — confirm whether this should
    be restricted to existing admins or to a bootstrap-only flow.

    Returns:
        The rendered ``admin_register.html`` for GET, a redirect on
        success, a 400 response when a field is missing, or a 409
        response when the username is already taken.
    """
    if request.method != "POST":
        return render_template("admin_register.html")

    username = request.form.get("username", "").strip()
    password = request.form.get("password", "").strip()
    if not username or not password:
        return "Enter both username and password.", 400

    hashed_pw = generate_password_hash(password)
    conn = get_db_connection("users.db")
    try:
        # ``with conn`` commits on success and rolls back on error.
        with conn:
            conn.execute(
                "INSERT INTO users (username, password, role) VALUES (?, ?, ?)",
                (username, hashed_pw, "admin"),
            )
    except sqlite3.IntegrityError:
        # Raised by the UNIQUE constraint on ``username``.
        return "Admin username already exists.", 409
    finally:
        # The context manager above does not close the connection.
        conn.close()
    return redirect(url_for("admin_login"))
# Admin Dashboard
@app.route("/admin")
def admin_dashboard():
    """Render the admin dashboard for sessions whose role is ``"admin"``.

    Returns:
        The rendered ``admin.html``, or a 403 response for any other
        session (including sessions with no role at all).
    """
    if session.get("role") != "admin":
        # Send a real 403 so clients and logs see the denial as an error
        # rather than a successful 200 page.
        return "Access denied!", 403
    return render_template("admin.html")
# Guest Access
@app.route("/guest")
def guest():
    """Start a guest session and send the visitor to the report page.

    The session's ``user`` becomes "Guest" and its ``role`` "guest".
    """
    session.update({"user": "Guest", "role": "guest"})
    report_url = url_for("report")
    return redirect(report_url)
# Bug Report Page
@app.route("/report", methods=["GET", "POST"])
def report():
    """Show the bug-report form (GET) or file a new report (POST).

    Requires a "user" entry in the session; otherwise redirects home.
    On POST an optional attachment with an allowed extension is saved to
    the upload folder, a report row is created, the chat-related session
    state is reset, and the client is redirected to the chat page.

    Returns:
        The rendered ``report.html`` for GET, a redirect on success (or
        when not logged in), or a 400 response when neither a description
        nor an accepted attachment was submitted.
    """
    if "user" not in session:
        return redirect(url_for("home"))

    if request.method != "POST":
        return render_template("report.html")

    description = request.form.get("description", "").strip()
    file = request.files.get("attachment")
    file_path = None
    if file and allowed_file(file.filename):
        # Prefix a random token: without it two uploads that share a name
        # (e.g. "error.log") silently overwrite each other on disk while
        # both report rows keep pointing at the same path.
        filename = f"{os.urandom(8).hex()}_{secure_filename(file.filename)}"
        file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
        file.save(file_path)

    if not description and not file_path:
        return "Please provide a description or attach a file.", 400

    session["description"] = description
    session["attachment"] = file_path
    report_id = create_report_record(session.get("user", "Guest"), description, file_path)
    session["report_id"] = report_id

    # Start the chat for this report from a clean slate.
    session["known_info"] = {}
    session["conversation_history"] = []
    session["current_question_key"] = None
    session["process_cache"] = {
        "turn_summaries": [],
        "asked_keys": [],
        "resolved_keys": [],
        "stage": "diagnosing",
        "latest_issue_summary": "",
    }
    return redirect(url_for("chat"))
# Chat Page
@app.route("/chat", methods=["GET", "POST"])
def chat():
    """Handle one turn of the report chat and render ``chat.html``.

    Conversation state (description, attachment path, known_info,
    conversation_history, current_question_key, process_cache, report_id)
    is loaded from the Flask session at the top and written back whenever
    it changes. What the ``agentic`` calls do internally is not visible in
    this file; only their arguments and return shapes are relied on here.

    NOTE(review): unlike ``report`` and ``my_reports``, this view does not
    check that "user" is in the session — confirm whether that is intended.
    """
    # Load the per-session conversation state, with empty defaults.
    description = session.get("description")
    # NOTE(review): ``attachment`` is tracked in the session but is not
    # passed to any ``agentic`` call in this function.
    attachment = session.get("attachment")
    known_info = session.get("known_info", {})
    conversation_history = session.get("conversation_history", [])
    current_question_key = session.get("current_question_key")
    process_cache = session.get("process_cache", {})
    report_id = session.get("report_id")
    action = "send"
    # Set to True once the "continue" branch has already produced the
    # assistant reply, so the generic reply step below is skipped.
    handled_reply = False
    if request.method == "POST":
        action = request.form.get("action", "send").strip().lower()
        user_message = request.form.get("message", "").strip()
        file = request.files.get("attachment")
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            file_path = os.path.join(app.config["UPLOAD_FOLDER"], filename)
            file.save(file_path)
            attachment = file_path
            session["attachment"] = attachment
            # An upload without text still counts as a user turn.
            user_message = f"Attached file: {filename}" if not user_message else user_message
        if user_message:
            # Record the user's turn and fold it into known_info under the
            # key of the question that was pending.
            conversation_history.append({"role": "user", "content": user_message})
            known_info, process_cache = agentic.update_known_info(
                known_info,
                current_question_key,
                user_message,
                process_cache,
            )
            session["known_info"] = known_info
            session["process_cache"] = process_cache
            # The pending question is cleared in the session; the local
            # ``current_question_key`` keeps its earlier value.
            session["current_question_key"] = None
        if action == "continue":
            if not user_message:
                # No text was sent with "continue": substitute a default
                # message and process it like a normal user turn.
                user_message = "Still not fixed after previous solution."
                conversation_history.append({"role": "user", "content": user_message})
                known_info, process_cache = agentic.update_known_info(
                    known_info,
                    current_question_key,
                    user_message,
                    process_cache,
                )
                session["known_info"] = known_info
                session["process_cache"] = process_cache
            ai_reply, next_key, should_generate_solution, process_cache = agentic.continue_reply(
                description,
                known_info,
                conversation_history,
                process_cache,
            )
            if not ai_reply:
                # Fallback text when the agent returns nothing.
                ai_reply = "Share what happened after the last fix and I will refine the solution."
            conversation_history.append({"role": "assistant", "content": ai_reply})
            session["conversation_history"] = conversation_history
            session["current_question_key"] = next_key
            session["process_cache"] = process_cache
            handled_reply = True
    # Defaults for the generic path below, so these names exist even when
    # no reply is generated (e.g. a GET with existing history).
    # NOTE(review): this also overwrites the values continue_reply returned
    # above, so a "continue" turn never reaches generate_solution in the
    # same request — confirm that is intended.
    ai_reply = None
    should_generate_solution = False
    next_key = None
    # Ask the agent for the next reply on a normal POST, or on any request
    # where the conversation is still empty (the opening assistant turn).
    if (request.method == "POST" and not handled_reply) or not conversation_history:
        ai_reply, next_key, should_generate_solution, process_cache = agentic.next_reply(
            description,
            known_info,
            conversation_history,
            process_cache,
        )
        if not ai_reply:
            # Fallback text when the agent returns nothing.
            ai_reply = "Can you provide more details or any error messages?"
        conversation_history.append({"role": "assistant", "content": ai_reply})
        session["conversation_history"] = conversation_history
        session["current_question_key"] = next_key
        session["process_cache"] = process_cache
    final_solution = None
    final_solution_parsed = None
    if should_generate_solution:
        # Produce the final solution, persist it with its confidence on the
        # report row, and parse it for the template.
        final_solution = agentic.generate_solution(
            description,
            known_info,
            conversation_history,
            process_cache,
        )
        confidence = agentic.extract_confidence(final_solution)
        update_report_analysis(report_id, final_solution, confidence)
        final_solution_parsed = agentic.parse_solution(final_solution)
    return render_template(
        "chat.html",
        history=conversation_history,
        final_solution=final_solution,
        final_solution_parsed=final_solution_parsed
    )
if __name__ == "__main__":
    # Debug mode enables the interactive Werkzeug debugger, which must not
    # be reachable outside local development. It stays on by default for
    # this dev entry point (same as before) but can now be switched off
    # with FLASK_DEBUG=0 / false / no instead of editing the source.
    debug_enabled = os.environ.get("FLASK_DEBUG", "1").lower() not in {"0", "false", "no"}
    app.run(debug=debug_enabled)