58 changes: 58 additions & 0 deletions database/init.sql
@@ -19,4 +19,62 @@ CREATE TABLE cml_metadata (
PRIMARY KEY (cml_id, sublink_id)
);

CREATE TABLE cml_stats (
cml_id TEXT PRIMARY KEY,
total_records BIGINT,
valid_records BIGINT,
null_records BIGINT,
completeness_percent REAL,
min_rsl REAL,
max_rsl REAL,
mean_rsl REAL,
stddev_rsl REAL,
last_rsl REAL,
last_update TIMESTAMPTZ DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION update_cml_stats(target_cml_id TEXT) RETURNS VOID AS $$
BEGIN
INSERT INTO cml_stats (
cml_id,
total_records,
valid_records,
null_records,
completeness_percent,
min_rsl,
max_rsl,
mean_rsl,
stddev_rsl,
last_rsl,
last_update
)
SELECT
cd.cml_id::text,
COUNT(*) as total_records,
COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) as valid_records,
COUNT(CASE WHEN cd.rsl IS NULL THEN 1 END) as null_records,
ROUND(100.0 * COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) / COUNT(*), 2) as completeness_percent,
MIN(cd.rsl) as min_rsl,
MAX(cd.rsl) as max_rsl,
ROUND(AVG(cd.rsl)::numeric, 2) as mean_rsl,
ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_rsl,
(SELECT rsl FROM cml_data WHERE cml_id = cd.cml_id ORDER BY time DESC LIMIT 1) as last_rsl,
NOW()
FROM cml_data cd
WHERE cd.cml_id = target_cml_id
GROUP BY cd.cml_id
ON CONFLICT (cml_id) DO UPDATE SET
total_records = EXCLUDED.total_records,
valid_records = EXCLUDED.valid_records,
null_records = EXCLUDED.null_records,
completeness_percent = EXCLUDED.completeness_percent,
min_rsl = EXCLUDED.min_rsl,
max_rsl = EXCLUDED.max_rsl,
mean_rsl = EXCLUDED.mean_rsl,
stddev_rsl = EXCLUDED.stddev_rsl,
last_rsl = EXCLUDED.last_rsl,
last_update = EXCLUDED.last_update;
END;
$$ LANGUAGE plpgsql;

SELECT create_hypertable('cml_data', 'time');
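
Reviewer note: to sanity-check the new table and function together, a minimal sketch (placeholder DSN and sample CML ID, not part of this PR):

import psycopg2

# Placeholder DSN; adjust to your environment.
conn = psycopg2.connect("postgresql://localhost/cml")
with conn.cursor() as cur:
    # Refresh the cached stats row for one CML, then read it back.
    cur.execute("SELECT update_cml_stats(%s)", ("10001",))
    cur.execute(
        "SELECT total_records, completeness_percent, last_rsl "
        "FROM cml_stats WHERE cml_id = %s",
        ("10001",),
    )
    print(cur.fetchone())
conn.commit()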
@@ -392,7 +392,7 @@
"query": "SELECT DISTINCT cml_id::text FROM cml_metadata ORDER BY cml_id",
"refresh": 1,
"regex": "",
"skipUrlSync": true,
"skipUrlSync": false,
"sort": 0,
"tagValuesQuery": "",
"tagsQuery": "",
59 changes: 55 additions & 4 deletions parser/db_writer.py
@@ -179,16 +179,41 @@ def _execute_batch_insert(
psycopg2.extras.execute_values(
cur, sql, records, template=None, page_size=1000
)
- self.conn.commit()
+ # Do not commit here; caller will commit once to allow batching
return len(records)
except Exception:
- self.conn.rollback()
+ try:
+     self.conn.rollback()
+ except Exception:
+     logger.exception("Failed rollback after %s error", operation_name)
logger.exception("Failed to %s", operation_name)
raise
finally:
if cur and not cur.closed:
cur.close()
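
With the per-batch commit removed, several batch inserts can share one transaction. A sketch of the intended caller-side pattern (`writer`, `meta_sql`, `data_sql`, and the record lists are illustrative names, not code from this PR):

# Both batches become visible atomically; nothing is persisted until the
# caller's single commit below.
writer._execute_batch_insert(meta_sql, meta_records, "write metadata to database")
writer._execute_batch_insert(data_sql, data_records, "write raw data to database")
writer.conn.commit()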

def _update_stats_for_cmls(self, cml_ids: List[str]) -> None:
"""Update cml_stats for the given CML IDs."""
if not cml_ids:
return

cur = self.conn.cursor()
try:
for cml_id in cml_ids:
cur.execute("SELECT update_cml_stats(%s)", (cml_id,))
# Do not commit here; caller should commit once after batch operations
logger.info("Executed update_cml_stats for %d CMLs", len(cml_ids))
except Exception:
try:
self.conn.rollback()
except Exception:
logger.exception("Rollback failed while handling stats update error")
logger.exception("Failed to update stats for CMLs: %s", cml_ids)
raise
finally:
if cur and not cur.closed:
cur.close()
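
The helper issues one round trip per CML ID. For large batches a single statement over an array would also work; a sketch of that alternative (not what this PR implements), reusing the same SQL function:

def _update_stats_for_cmls_batched(self, cml_ids):
    """Alternative sketch: refresh all CMLs in one statement via unnest."""
    if not cml_ids:
        return
    cur = self.conn.cursor()
    try:
        # psycopg2 adapts a Python list to a Postgres array.
        cur.execute(
            "SELECT update_cml_stats(cml_id) "
            "FROM unnest(%s::text[]) AS t(cml_id)",
            (cml_ids,),
        )
    finally:
        cur.close()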

def write_metadata(self, df) -> int:
"""Write metadata DataFrame to `cml_metadata`.

@@ -229,12 +254,22 @@ def write_metadata(self, df) -> int:
"length = EXCLUDED.length"
)

- return self._with_connection_retry(
+ rows_written = self._with_connection_retry(
lambda: self._execute_batch_insert(
sql, records, "write metadata to database"
)
)

# Commit once after batch operation
try:
if self.conn:
self.conn.commit()
except Exception:
logger.exception("Failed to commit metadata write")
raise

return rows_written

def write_rawdata(self, df) -> int:
"""Write raw time series DataFrame to `cml_data`.

@@ -253,10 +288,26 @@ def write_rawdata(self, df) -> int:
)
records = [tuple(x) for x in df_subset.to_numpy()]

# Get unique CML IDs for stats update
unique_cml_ids = df_subset["cml_id"].unique().tolist()

sql = "INSERT INTO cml_data (time, cml_id, sublink_id, rsl, tsl) VALUES %s"

- return self._with_connection_retry(
+ rows_written = self._with_connection_retry(
lambda: self._execute_batch_insert(
sql, records, "write raw data to database"
)
)

# Update stats for affected CMLs and commit once afterward
if rows_written > 0 and unique_cml_ids:
self._update_stats_for_cmls(unique_cml_ids)

try:
if self.conn:
self.conn.commit()
except Exception:
logger.exception("Failed to commit raw data + stats update")
raise

return rows_written
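
For reference, the caller-side behaviour after this change (insert, stats refresh, one commit) — a sketch with made-up sample data and a placeholder DSN, assuming a connected DBWriter:

import pandas as pd

from parser.db_writer import DBWriter

writer = DBWriter("postgresql://localhost/cml")  # placeholder DSN
df = pd.DataFrame(
    {
        "time": pd.to_datetime(["2026-01-22 10:00:00", "2026-01-22 10:01:00"]),
        "cml_id": ["123", "123"],
        "sublink_id": ["A", "A"],
        "rsl": [-45.0, -44.5],
        "tsl": [1.0, 1.0],
    }
)
rows = writer.write_rawdata(df)  # inserts 2 rows, refreshes cml_stats, commits once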
83 changes: 83 additions & 0 deletions parser/tests/test_db_writer.py
@@ -234,3 +234,86 @@ def test_close_already_closed():

writer.close() # Should not raise
assert writer.conn is None


def test_write_rawdata_execute_values_failure_triggers_rollback(mock_connection):
"""When execute_values raises, write_rawdata should rollback and propagate."""
writer = DBWriter("postgresql://test")
writer.conn = mock_connection

df = pd.DataFrame(
{
"time": pd.to_datetime(["2026-01-22 10:00:00"]),
"cml_id": ["123"],
"sublink_id": ["A"],
"rsl": [-45.0],
"tsl": [1.0],
}
)

with patch(
"parser.db_writer.psycopg2.extras.execute_values",
side_effect=Exception("DB error"),
):
with pytest.raises(Exception, match="DB error"):
writer.write_rawdata(df)

# rollback should have been called by the error handler
mock_connection.rollback.assert_called()


def test_write_metadata_execute_values_failure_triggers_rollback(mock_connection):
"""When execute_values raises during metadata write, rollback is performed."""
writer = DBWriter("postgresql://test")
writer.conn = mock_connection

df = pd.DataFrame(
{
"cml_id": ["123"],
"sublink_id": ["A"],
"site_0_lon": [13.4],
"site_0_lat": [52.5],
"site_1_lon": [13.6],
"site_1_lat": [52.7],
"frequency": [38.0],
"polarization": ["H"],
"length": [1200.0],
}
)

with patch(
"parser.db_writer.psycopg2.extras.execute_values",
side_effect=Exception("meta error"),
):
with pytest.raises(Exception, match="meta error"):
writer.write_metadata(df)

mock_connection.rollback.assert_called()


def test__update_stats_for_cmls_executes_queries_without_commit(mock_connection):
"""Ensure _update_stats_for_cmls executes update function per CML and does not commit."""
writer = DBWriter("postgresql://test")
writer.conn = mock_connection

cml_ids = ["100", "200"]
writer._update_stats_for_cmls(cml_ids)

cur = mock_connection.cursor.return_value
assert cur.execute.call_count == len(cml_ids)
# commit should not be called by the helper
mock_connection.commit.assert_not_called()


def test__update_stats_for_cmls_rollback_on_error(mock_connection):
"""If executing update function raises, ensure rollback is attempted and exception propagated."""
writer = DBWriter("postgresql://test")
writer.conn = mock_connection

cur = mock_connection.cursor.return_value
cur.execute.side_effect = Exception("update failed")

with pytest.raises(Exception, match="update failed"):
writer._update_stats_for_cmls(["100"])

mock_connection.rollback.assert_called()
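
These tests rely on a `mock_connection` fixture defined elsewhere in the file (not shown in this hunk); a minimal sketch of what it presumably provides, inferred from how the tests use it:

import pytest
from unittest.mock import Mock


@pytest.fixture
def mock_connection():
    """Mock psycopg2 connection; cursor() always returns the same mock cursor."""
    conn = Mock()
    cursor = Mock()
    cursor.closed = False  # lets the writer's finally-block call cursor.close()
    conn.cursor.return_value = cursor
    return conn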
53 changes: 31 additions & 22 deletions webserver/main.py
@@ -18,9 +18,15 @@
DATA_STAGED_FOR_PARSING_DIR = "/app/data_staged_for_parsing"
DATA_ARCHIVED_DIR = "/app/data_archived"

- # Create directories if they don't exist
- for directory in [DATA_INCOMING_DIR, DATA_STAGED_FOR_PARSING_DIR, DATA_ARCHIVED_DIR]:
-     Path(directory).mkdir(parents=True, exist_ok=True)
+
+ def ensure_data_directories():
+     """Create data directories if they don't exist."""
+     for directory in [
+         DATA_INCOMING_DIR,
+         DATA_STAGED_FOR_PARSING_DIR,
+         DATA_ARCHIVED_DIR,
+     ]:
+         Path(directory).mkdir(parents=True, exist_ok=True)


def safe_float(value):
@@ -469,27 +475,28 @@ def api_cml_stats():
cur = conn.cursor()
cur.execute(
"""
- WITH latest_60min AS (
-     SELECT cml_id, rsl, time
-     FROM cml_data
-     WHERE time >= (SELECT MAX(time) FROM cml_data) - INTERVAL '60 minutes'
- )
- SELECT
-     cd.cml_id::text,
-     COUNT(*) as total_records,
-     COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) as valid_records,
-     COUNT(CASE WHEN cd.rsl IS NULL THEN 1 END) as null_records,
-     ROUND(100.0 * COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) / COUNT(*), 2) as completeness_percent,
-     MIN(cd.rsl) as min_rsl,
-     MAX(cd.rsl) as max_rsl,
-     ROUND(AVG(cd.rsl)::numeric, 2) as mean_rsl,
-     ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_rsl,
-     (SELECT rsl FROM cml_data WHERE cml_id = cd.cml_id ORDER BY time DESC LIMIT 1) as last_rsl,
-     ROUND(STDDEV(l60.rsl)::numeric, 2) as stddev_last_60min
- FROM cml_data cd
- LEFT JOIN latest_60min l60 ON cd.cml_id = l60.cml_id
- GROUP BY cd.cml_id
- ORDER BY cd.cml_id
+ SELECT
+     cs.cml_id::text,
+     cs.total_records,
+     cs.valid_records,
+     cs.null_records,
+     cs.completeness_percent,
+     cs.min_rsl,
+     cs.max_rsl,
+     cs.mean_rsl,
+     cs.stddev_rsl,
+     cs.last_rsl,
+     ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_last_60min
+ FROM cml_stats cs
+ LEFT JOIN (
+     SELECT cml_id, rsl
+     FROM cml_data
+     WHERE time >= (SELECT MAX(time) FROM cml_data) - INTERVAL '60 minutes'
+ ) cd ON cs.cml_id = cd.cml_id
+ GROUP BY cs.cml_id, cs.total_records, cs.valid_records, cs.null_records,
+          cs.completeness_percent, cs.min_rsl, cs.max_rsl, cs.mean_rsl,
+          cs.stddev_rsl, cs.last_rsl
+ ORDER BY cs.cml_id
"""
)
data = cur.fetchall()
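
The hunk cuts off before the response is assembled; judging from the keys asserted in the new webserver test, the rows are presumably zipped into dicts along these lines (a sketch, not code from this PR):

columns = [
    "cml_id", "total_records", "valid_records", "null_records",
    "completeness_percent", "min_rsl", "max_rsl", "mean_rsl",
    "stddev_rsl", "last_rsl", "stddev_last_60min",
]
return jsonify([dict(zip(columns, row)) for row in data])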
@@ -800,6 +807,8 @@ def server_error(error):
# ==================== START SERVER ====================

if __name__ == "__main__":
+ # Create data directories
+ ensure_data_directories()
# Wait for database to be ready
time.sleep(10)
app.run(host="0.0.0.0", port=5000, debug=True)
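
The fixed `time.sleep(10)` can still race a slow database start; a polling loop would be more robust. A sketch (not part of this PR), assuming `get_db_connection` raises until Postgres accepts connections:

def wait_for_db(max_attempts=30, delay=2.0):
    """Poll for a usable connection instead of sleeping a fixed interval."""
    for _ in range(max_attempts):
        try:
            get_db_connection().close()
            return
        except Exception:
            time.sleep(delay)
    raise RuntimeError(f"Database not ready after {max_attempts} attempts")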
60 changes: 60 additions & 0 deletions webserver/tests/test_api_cml_stats.py
@@ -0,0 +1,60 @@
import sys
from unittest.mock import Mock

import pytest


# Ensure optional heavy imports won't fail at import time
sys.modules.setdefault("folium", Mock())
sys.modules.setdefault("altair", Mock())
sys.modules.setdefault("requests", Mock())


def test_api_cml_stats_returns_cached_stats(monkeypatch):
# Make the webserver package modules importable (tests add webserver/ to sys.path)
import os

sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))

# Import main module from webserver package directory
import main as wm

# Prepare mock DB connection and cursor
mock_conn = Mock()
mock_cursor = Mock()
mock_conn.cursor.return_value = mock_cursor

# Row fields: cml_id, total_records, valid_records, null_records, completeness_percent,
# min_rsl, max_rsl, mean_rsl, stddev_rsl, last_rsl, stddev_last_60min
mock_cursor.fetchall.return_value = [
(
"10001",
10,
9,
1,
90.0,
-60.0,
-40.0,
-50.0,
3.0,
-45.0,
1.5,
)
]

# Ensure cursor.close and conn.close exist
mock_cursor.close = Mock()
mock_conn.close = Mock()

monkeypatch.setattr(wm, "get_db_connection", lambda: mock_conn)

client = wm.app.test_client()
resp = client.get("/api/cml-stats")
assert resp.status_code == 200
data = resp.get_json()
assert isinstance(data, list)
assert len(data) == 1
row = data[0]
assert row["cml_id"] == "10001"
assert row["completeness_percent"] == 90.0
assert row["last_rsl"] == -45.0