|
1 | 1 | #!/usr/bin/env python3 |
2 | | -import paho.mqtt.client as mqtt |
3 | | -import pymongo |
4 | 2 | import json |
5 | 3 | from datetime import datetime |
6 | 4 |
|
| 5 | +import paho.mqtt.client as mqtt |
| 6 | +import pymongo |
| 7 | + |
7 | 8 | MQTT_BROKER = "agstream_mosquitto" |
8 | 9 | MQTT_PORT = 1883 |
9 | 10 | MQTT_TOPIC = "deepstream/predictions" |
10 | 11 | MONGO_URI = "mongodb://agstream_mongo:27017/" |
11 | 12 | MONGO_DB = "agstream" |
12 | 13 | MONGO_COLLECTION = "predictions" |
13 | 14 |
|
| 15 | + |
14 | 16 | def on_connect(client, userdata, flags, rc): |
15 | 17 | print(f"✅ Connected to MQTT broker with result code {rc}") |
16 | 18 | client.subscribe(MQTT_TOPIC) |
17 | 19 |
|
| 20 | + |
18 | 21 | def on_message(client, userdata, msg): |
19 | 22 | try: |
20 | 23 | payload = json.loads(msg.payload.decode()) |
21 | 24 | payload["received_at"] = datetime.now().isoformat() |
22 | | - |
| 25 | + |
23 | 26 | # Extract classification data from nvmsgbroker format |
24 | 27 | obj = payload.get("object", {}) |
25 | 28 | if obj.get("id") != "0" and obj.get("id"): |
26 | 29 | class_id = int(obj["id"]) |
27 | 30 | confidence = obj.get("confidence", 0) |
28 | | - |
| 31 | + |
29 | 32 | # Add extracted classification |
30 | | - payload["classification"] = { |
31 | | - "class_id": class_id, |
32 | | - "confidence": confidence |
33 | | - } |
| 33 | + payload["classification"] = {"class_id": class_id, "confidence": confidence} |
34 | 34 | print(f"🌱 FOUND CLASSIFICATION: ID {class_id}, confidence {confidence:.3f}") |
35 | | - |
| 35 | + |
36 | 36 | mongo_client = pymongo.MongoClient(MONGO_URI) |
37 | 37 | db = mongo_client[MONGO_DB] |
38 | 38 | collection = db[MONGO_COLLECTION] |
39 | | - |
| 39 | + |
40 | 40 | result = collection.insert_one(payload) |
41 | | - |
| 41 | + |
42 | 42 | if "classification" in payload: |
43 | | - print(f"✅ Saved classification to MongoDB!") |
| 43 | + print("✅ Saved classification to MongoDB!") |
44 | 44 | else: |
45 | | - print(f"✅ Saved: No classification") |
46 | | - |
| 45 | + print("✅ Saved: No classification") |
| 46 | + |
47 | 47 | mongo_client.close() |
48 | | - |
| 48 | + |
49 | 49 | except Exception as e: |
50 | 50 | print(f"❌ Error: {e}") |
51 | 51 |
|
| 52 | + |
52 | 53 | if __name__ == "__main__": |
53 | 54 | client = mqtt.Client() |
54 | 55 | client.on_connect = on_connect |
55 | 56 | client.on_message = on_message |
56 | | - |
| 57 | + |
57 | 58 | print("🚀 Starting Enhanced MQTT Consumer...") |
58 | 59 | client.connect(MQTT_BROKER, MQTT_PORT, 60) |
59 | 60 | client.loop_forever() |
0 commit comments