-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapp.py
More file actions
81 lines (62 loc) · 2.52 KB
/
app.py
File metadata and controls
81 lines (62 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from flask import Flask, request, jsonify
import firebase_admin
from firebase_admin import credentials, messaging
import os, json
app = Flask(__name__)
# --- Initialize Firebase Admin SDK securely ---
# Load Firebase credentials from environment variable
firebase_json = os.getenv("FIREBASE_CRED")
if firebase_json:
try:
cred = credentials.Certificate(json.loads(firebase_json))
firebase_admin.initialize_app(cred)
print("✅ Firebase initialized successfully.")
except Exception as e:
print(f"⚠️ Error initializing Firebase: {e}")
else:
print("⚠️ FIREBASE_CRED environment variable not found! Firebase not initialized.")
# --- Store device tokens temporarily (use DB in real app) ---
REGISTERED_TOKENS = []
@app.route('/register', methods=['POST'])
def register_device():
"""Register a new device token."""
data = request.json
token = data.get("token")
if not token:
return jsonify({"error": "No token provided"}), 400
if token not in REGISTERED_TOKENS:
REGISTERED_TOKENS.append(token)
print(f"✅ New device registered: {token[:10]}...")
return jsonify({"message": "Device registered successfully!"}), 200
@app.route('/trigger', methods=['POST'])
def trigger_alert():
"""Triggered by ESP8266 when gas/flame is detected."""
data = request.json
if not data:
return jsonify({"error": "No JSON data received"}), 400
sensor_type = data.get("sensor") # e.g. "flame" or "gas"
status = data.get("status") # e.g. "detected" or "clear"
print(f"🔥 Trigger received: {sensor_type} = {status}")
if status == "detected":
title = f"{sensor_type.capitalize()} Alert!"
body = f"{sensor_type.capitalize()} detected! Please check immediately."
else:
title = f"{sensor_type.capitalize()} Status Clear"
body = f"No {sensor_type} detected now."
# Send notification to all registered tokens
for token in REGISTERED_TOKENS:
message = messaging.Message(
notification=messaging.Notification(
title=title,
body=body,
),
token=token,
)
try:
response = messaging.send(message)
print(f"📩 Notification sent to {token[:10]}...: {response}")
except Exception as e:
print(f"⚠️ Error sending to {token[:10]}...: {e}")
return jsonify({"message": "Trigger processed and notifications sent!"}), 200
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)