Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 144 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,144 @@
# WattWatch
# WattWatch

A Docker-based IoT data pipeline that:

- Receives sensor data via MQTT
- Performs prediction and stores true and predicted data in PostgreSQL
- Visualizes data in Grafana with outliers indication

demonstration_mqtt.ipynb notebbok deomstrates training of prophet and how to send data via mqqt.

---

## Architecture

This project uses:

- Eclipse Mosquitto (MQTT broker)
- PostgreSQL (relational database)
- Python writer service (MQTT subscriber + DB writer)
- Grafana (analytics and visualization platform)

Data Flow:

MQTT Devices
Mosquitto (Broker)
Writer Service (Python)
PostgreSQL
Grafana Dashboard

---

## Project Structure
```
project/
├── docker-compose.yml
├── mosquitto/
│ └── config/
├── writer/
│ ├── Dockerfile
│ ├── requirements.txt
│ └── writer.py
└── dashboard/ (optional if using Dash instead of Grafana)
```

---

## Getting Started

Start the stack:

docker compose up --build

Services:

- MQTT Broker: port 1883
- PostgreSQL: port 5432
- Grafana: http://localhost:3000

---

## Default Credentials
```
PostgreSQL:
- Database: measurements_db
- User: user
- Password: password
- Host (inside Docker): db
- Port: 5432

Grafana:
- URL: http://localhost:3000
- User: admin
- Password: admin
```

---

## Configuring Grafana

1. Open http://localhost:3000
2. Go to Settings → Data Sources → Add Data Source
3. Choose PostgreSQL
4. Use:
```
Host: db:5432
Database: measurements_db
User: user
Password: password
SSL Mode: disable
```

Click "Save & Test"

Create a new time-series dashboard with the following query:
```
SELECT
time as "time",
measurements_true,
measurements_predicted_upper_bound,
measurements_predicted_lower_bound

FROM sensor_data
ORDER BY time;
```

Then, configure fill below property of dash to visualize confidence interval between lower and upper bounds.

---

## Database Schema
```
CREATE TABLE sensor_data (
time TIMESTAMP,
tenant VARCHAR(255),
device VARCHAR(255),
measurements_true FLOAT,
measurements_predicted FLOAT,
measurements_predicted_upper_bound FLOAT,
measurements_predicted_lower_bound FLOAT
);
```
---

## Environment Variables (Writer Service)
```
DB_HOST=db
DB_NAME=measurements_db
DB_USER=user
DB_PASSWORD=password
MQTT_BROKER=mosquitto
```
---

## Development Notes

- Docker service names are used as hostnames inside the network (db, mosquitto)
- Writer implements retry logic for DB startup
- Grafana refresh interval can be configured per dashboard
- Output buffering disabled via PYTHONUNBUFFERED=1
162 changes: 162 additions & 0 deletions demonstration_mqtt.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 68,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import json\n",
"data = pd.read_csv('pv_week_meteo.csv')"
]
},
{
"cell_type": "code",
"execution_count": 69,
"metadata": {},
"outputs": [],
"source": [
"data_ts_t = data[['timestamp', 'T_ambient_degC']]"
]
},
{
"cell_type": "code",
"execution_count": 70,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/kv/6z6fj5td23z74h5w23gx_d0r0000gn/T/ipykernel_58583/2021538147.py:1: SettingWithCopyWarning: \n",
"A value is trying to be set on a copy of a slice from a DataFrame\n",
"\n",
"See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy\n",
" data_ts_t.rename(columns={'timestamp': 'ds', 'T_ambient_degC': 'y'}, inplace=True)\n"
]
}
],
"source": [
"data_ts_t.rename(columns={'timestamp': 'ds', 'T_ambient_degC': 'y'}, inplace=True)"
]
},
{
"cell_type": "code",
"execution_count": 72,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"10:00:05 - cmdstanpy - INFO - Chain [1] start processing\n",
"10:00:05 - cmdstanpy - INFO - Chain [1] done processing\n"
]
}
],
"source": [
"import prophet\n",
"model = prophet.Prophet()\n",
"datas = []\n",
"import numpy as np\n",
"for i in range(5):\n",
" cur_data = data_ts_t.copy()\n",
" cur_data['ds'] = pd.to_datetime(cur_data['ds']) + datetime.timedelta(weeks=i)\n",
" cur_data['y'] = cur_data['y'] + np.random.normal(0, 0.5, size=cur_data.shape[0])\n",
" \n",
" datas.append(cur_data)\n",
" \n",
"data_ts_t = pd.concat(datas, ignore_index=True)\n",
"\n",
"\n",
"model = model.fit(data_ts_t)"
]
},
{
"cell_type": "code",
"execution_count": 74,
"metadata": {},
"outputs": [],
"source": [
"from prophet.serialize import model_to_json, model_from_json\n",
"\n",
"with open('serialized_model.json', 'w') as fout:\n",
" fout.write(model_to_json(model)) # m is your Prophet mode"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"/var/folders/kv/6z6fj5td23z74h5w23gx_d0r0000gn/T/ipykernel_58583/3311663815.py:4: DeprecationWarning: Callback API version 1 is deprecated, update to latest version\n",
" client = mqtt.Client()\n"
]
}
],
"source": [
"import paho.mqtt.client as mqtt\n",
"import json\n",
"import datetime\n",
"client = mqtt.Client()\n",
"client.connect(\"localhost\", 1883, 60)\n",
"import time\n",
"\n",
"for week in range(5):\n",
" for (time_, temp) in zip(data_ts_t['ds'], data_ts_t['y']):\n",
" time_ = str(time_)\n",
" payload = {\n",
" \"topic\": f\"{'jsi'}/{'grid'}/things/twin/commands/merge\",\n",
" \"timestamp\": str(datetime.datetime.strptime(time_, \"%Y-%m-%d %H:%M:%S\") + datetime.timedelta(weeks=week)),\n",
" \"headers\": {\n",
" \"content-type\": \"application/merge-patch+json\"\n",
" },\n",
" \"path\": \"/features\",\n",
" \"value\": {\n",
" 'meteo':{\n",
" \"TdegC\": temp + np.random.normal(0, 0.5)\n",
" }\n",
" }\n",
" }\n",
" json_payload = json.dumps(payload)\n",
" client.publish(\"sensors/temp/live\", payload= json_payload, qos=0)\n",
" time.sleep(5)\n",
"\n",
"client.disconnect() "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "base",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.7"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
72 changes: 72 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
version: "3.8"
services:


# -------------------------
# MQTT Broker
# -------------------------
mosquitto:
image: eclipse-mosquitto:2
container_name: mqtt_broker
ports:
- "1883:1883"
- "9001:9001"
volumes:
- ./mosquitto/config:/mosquitto/config
- ./mosquitto/data:/mosquitto/data
- ./mosquitto/log:/mosquitto/log
restart: unless-stopped


# -------------------------
# PostgreSQL Database
# -------------------------
db:
image: postgres:15
container_name: mqtt_db
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: password
POSTGRES_DB: measurements_db
ports:
- "5432:5432"
restart: unless-stopped
volumes:
- postgres_data:/var/lib/postgresql/data


grafana:
image: grafana/grafana:latest
container_name: grafana
ports:
- "3000:3000"
depends_on:
- db
environment:
- GF_SECURITY_ADMIN_USER=admin
- GF_SECURITY_ADMIN_PASSWORD=admin
volumes:
- grafana_data:/var/lib/grafana
restart: unless-stopped


# -------------------------
# MQTT → DB Writer Service
# -------------------------
writer:
build: ./writer
container_name: mqtt_writer
depends_on:
- mosquitto
- db
environment:
MQTT_BROKER: mosquitto
MQTT_PORT: 1883
DB_HOST: db
DB_NAME: measurements_db
DB_USER: user
DB_PASSWORD: password

volumes:
postgres_data:
grafana_data:
2 changes: 2 additions & 0 deletions mosquitto/config/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
listener 1883
allow_anonymous true
Loading