diff --git a/database/init.sql b/database/init.sql
index 1019a74..ca67ad3 100644
--- a/database/init.sql
+++ b/database/init.sql
@@ -19,4 +19,62 @@ CREATE TABLE cml_metadata (
     PRIMARY KEY (cml_id, sublink_id)
 );
 
+CREATE TABLE cml_stats (
+    cml_id TEXT PRIMARY KEY,
+    total_records BIGINT,
+    valid_records BIGINT,
+    null_records BIGINT,
+    completeness_percent REAL,
+    min_rsl REAL,
+    max_rsl REAL,
+    mean_rsl REAL,
+    stddev_rsl REAL,
+    last_rsl REAL,
+    last_update TIMESTAMPTZ DEFAULT NOW()
+);
+
+CREATE OR REPLACE FUNCTION update_cml_stats(target_cml_id TEXT) RETURNS VOID AS $$
+BEGIN
+    INSERT INTO cml_stats (
+        cml_id,
+        total_records,
+        valid_records,
+        null_records,
+        completeness_percent,
+        min_rsl,
+        max_rsl,
+        mean_rsl,
+        stddev_rsl,
+        last_rsl,
+        last_update
+    )
+    SELECT
+        cd.cml_id::text,
+        COUNT(*) as total_records,
+        COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) as valid_records,
+        COUNT(CASE WHEN cd.rsl IS NULL THEN 1 END) as null_records,
+        ROUND(100.0 * COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) / COUNT(*), 2) as completeness_percent,
+        MIN(cd.rsl) as min_rsl,
+        MAX(cd.rsl) as max_rsl,
+        ROUND(AVG(cd.rsl)::numeric, 2) as mean_rsl,
+        ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_rsl,
+        (SELECT rsl FROM cml_data WHERE cml_id = cd.cml_id ORDER BY time DESC LIMIT 1) as last_rsl,
+        NOW()
+    FROM cml_data cd
+    WHERE cd.cml_id = target_cml_id
+    GROUP BY cd.cml_id
+    ON CONFLICT (cml_id) DO UPDATE SET
+        total_records = EXCLUDED.total_records,
+        valid_records = EXCLUDED.valid_records,
+        null_records = EXCLUDED.null_records,
+        completeness_percent = EXCLUDED.completeness_percent,
+        min_rsl = EXCLUDED.min_rsl,
+        max_rsl = EXCLUDED.max_rsl,
+        mean_rsl = EXCLUDED.mean_rsl,
+        stddev_rsl = EXCLUDED.stddev_rsl,
+        last_rsl = EXCLUDED.last_rsl,
+        last_update = EXCLUDED.last_update;
+END;
+$$ LANGUAGE plpgsql;
+
 SELECT create_hypertable('cml_data', 'time');
\ No newline at end of file
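The new `update_cml_stats` function upserts one summary row per link into `cml_stats` via `INSERT ... ON CONFLICT`; if `cml_data` holds no rows for the target ID, the `SELECT` yields nothing and the call is a no-op. A minimal sketch of exercising it from Python, assuming a reachable database; the DSN and the sample `cml_id` are placeholders, not values from this repo:

```python
# Minimal sketch of exercising update_cml_stats from Python. Assumes a
# reachable database; the DSN and the sample cml_id are placeholders.
import psycopg2

conn = psycopg2.connect("postgresql://user:pass@localhost:5432/cml")
cur = conn.cursor()

# Upserts (or refreshes) the single summary row for this link.
cur.execute("SELECT update_cml_stats(%s)", ("10001",))
conn.commit()

cur.execute(
    "SELECT total_records, completeness_percent, last_rsl"
    " FROM cml_stats WHERE cml_id = %s",
    ("10001",),
)
print(cur.fetchone())  # None until cml_data has rows for this cml_id

cur.close()
conn.close()
```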
logger.exception("Rollback failed while handling stats update error") + logger.exception("Failed to update stats for CMLs: %s", cml_ids) + raise + finally: + if cur and not cur.closed: + cur.close() + def write_metadata(self, df) -> int: """Write metadata DataFrame to `cml_metadata`. @@ -229,12 +254,22 @@ def write_metadata(self, df) -> int: "length = EXCLUDED.length" ) - return self._with_connection_retry( + rows_written = self._with_connection_retry( lambda: self._execute_batch_insert( sql, records, "write metadata to database" ) ) + # Commit once after batch operation + try: + if self.conn: + self.conn.commit() + except Exception: + logger.exception("Failed to commit metadata write") + raise + + return rows_written + def write_rawdata(self, df) -> int: """Write raw time series DataFrame to `cml_data`. @@ -253,10 +288,26 @@ def write_rawdata(self, df) -> int: ) records = [tuple(x) for x in df_subset.to_numpy()] + # Get unique CML IDs for stats update + unique_cml_ids = df_subset["cml_id"].unique().tolist() + sql = "INSERT INTO cml_data (time, cml_id, sublink_id, rsl, tsl) VALUES %s" - return self._with_connection_retry( + rows_written = self._with_connection_retry( lambda: self._execute_batch_insert( sql, records, "write raw data to database" ) ) + + # Update stats for affected CMLs and commit once afterward + if rows_written > 0 and unique_cml_ids: + self._update_stats_for_cmls(unique_cml_ids) + + try: + if self.conn: + self.conn.commit() + except Exception: + logger.exception("Failed to commit raw data + stats update") + raise + + return rows_written diff --git a/parser/tests/test_db_writer.py b/parser/tests/test_db_writer.py index a0e3788..73ffdf3 100644 --- a/parser/tests/test_db_writer.py +++ b/parser/tests/test_db_writer.py @@ -234,3 +234,86 @@ def test_close_already_closed(): writer.close() # Should not raise assert writer.conn is None + + +def test_write_rawdata_execute_values_failure_triggers_rollback(mock_connection): + """When execute_values raises, write_rawdata should rollback and propagate.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + df = pd.DataFrame( + { + "time": pd.to_datetime(["2026-01-22 10:00:00"]), + "cml_id": ["123"], + "sublink_id": ["A"], + "rsl": [-45.0], + "tsl": [1.0], + } + ) + + with patch( + "parser.db_writer.psycopg2.extras.execute_values", + side_effect=Exception("DB error"), + ): + with pytest.raises(Exception, match="DB error"): + writer.write_rawdata(df) + + # rollback should have been called by the error handler + mock_connection.rollback.assert_called() + + +def test_write_metadata_execute_values_failure_triggers_rollback(mock_connection): + """When execute_values raises during metadata write, rollback is performed.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + df = pd.DataFrame( + { + "cml_id": ["123"], + "sublink_id": ["A"], + "site_0_lon": [13.4], + "site_0_lat": [52.5], + "site_1_lon": [13.6], + "site_1_lat": [52.7], + "frequency": [38.0], + "polarization": ["H"], + "length": [1200.0], + } + ) + + with patch( + "parser.db_writer.psycopg2.extras.execute_values", + side_effect=Exception("meta error"), + ): + with pytest.raises(Exception, match="meta error"): + writer.write_metadata(df) + + mock_connection.rollback.assert_called() + + +def test__update_stats_for_cmls_executes_queries_without_commit(mock_connection): + """Ensure _update_stats_for_cmls executes update function per CML and does not commit.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + 
+ cml_ids = ["100", "200"] + writer._update_stats_for_cmls(cml_ids) + + cur = mock_connection.cursor.return_value + assert cur.execute.call_count == len(cml_ids) + # commit should not be called by the helper + mock_connection.commit.assert_not_called() + + +def test__update_stats_for_cmls_rollback_on_error(mock_connection): + """If executing update function raises, ensure rollback is attempted and exception propagated.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + cur = mock_connection.cursor.return_value + cur.execute.side_effect = Exception("update failed") + + with pytest.raises(Exception, match="update failed"): + writer._update_stats_for_cmls(["100"]) + + mock_connection.rollback.assert_called() diff --git a/webserver/main.py b/webserver/main.py index 661a2ae..f549aa8 100644 --- a/webserver/main.py +++ b/webserver/main.py @@ -18,9 +18,15 @@ DATA_STAGED_FOR_PARSING_DIR = "/app/data_staged_for_parsing" DATA_ARCHIVED_DIR = "/app/data_archived" -# Create directories if they don't exist -for directory in [DATA_INCOMING_DIR, DATA_STAGED_FOR_PARSING_DIR, DATA_ARCHIVED_DIR]: - Path(directory).mkdir(parents=True, exist_ok=True) + +def ensure_data_directories(): + """Create data directories if they don't exist.""" + for directory in [ + DATA_INCOMING_DIR, + DATA_STAGED_FOR_PARSING_DIR, + DATA_ARCHIVED_DIR, + ]: + Path(directory).mkdir(parents=True, exist_ok=True) def safe_float(value): @@ -469,27 +475,28 @@ def api_cml_stats(): cur = conn.cursor() cur.execute( """ - WITH latest_60min AS ( - SELECT cml_id, rsl, time + SELECT + cs.cml_id::text, + cs.total_records, + cs.valid_records, + cs.null_records, + cs.completeness_percent, + cs.min_rsl, + cs.max_rsl, + cs.mean_rsl, + cs.stddev_rsl, + cs.last_rsl, + ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_last_60min + FROM cml_stats cs + LEFT JOIN ( + SELECT cml_id, rsl FROM cml_data WHERE time >= (SELECT MAX(time) FROM cml_data) - INTERVAL '60 minutes' - ) - SELECT - cd.cml_id::text, - COUNT(*) as total_records, - COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) as valid_records, - COUNT(CASE WHEN cd.rsl IS NULL THEN 1 END) as null_records, - ROUND(100.0 * COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) / COUNT(*), 2) as completeness_percent, - MIN(cd.rsl) as min_rsl, - MAX(cd.rsl) as max_rsl, - ROUND(AVG(cd.rsl)::numeric, 2) as mean_rsl, - ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_rsl, - (SELECT rsl FROM cml_data WHERE cml_id = cd.cml_id ORDER BY time DESC LIMIT 1) as last_rsl, - ROUND(STDDEV(l60.rsl)::numeric, 2) as stddev_last_60min - FROM cml_data cd - LEFT JOIN latest_60min l60 ON cd.cml_id = l60.cml_id - GROUP BY cd.cml_id - ORDER BY cd.cml_id + ) cd ON cs.cml_id = cd.cml_id + GROUP BY cs.cml_id, cs.total_records, cs.valid_records, cs.null_records, + cs.completeness_percent, cs.min_rsl, cs.max_rsl, cs.mean_rsl, + cs.stddev_rsl, cs.last_rsl + ORDER BY cs.cml_id """ ) data = cur.fetchall() @@ -800,6 +807,8 @@ def server_error(error): # ==================== START SERVER ==================== if __name__ == "__main__": + # Create data directories + ensure_data_directories() # Wait for database to be ready time.sleep(10) app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/webserver/tests/test_api_cml_stats.py b/webserver/tests/test_api_cml_stats.py new file mode 100644 index 0000000..5838380 --- /dev/null +++ b/webserver/tests/test_api_cml_stats.py @@ -0,0 +1,60 @@ +import sys +from unittest.mock import Mock + +import pytest + + +# Ensure optional heavy imports won't fail at import time 
+sys.modules.setdefault("folium", Mock()) +sys.modules.setdefault("altair", Mock()) +sys.modules.setdefault("requests", Mock()) + + +def test_api_cml_stats_returns_cached_stats(monkeypatch): + # Make the webserver package modules importable (tests add webserver/ to sys.path) + import os + + sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + + # Import main module from webserver package directory + import main as wm + + # Prepare mock DB connection and cursor + mock_conn = Mock() + mock_cursor = Mock() + mock_conn.cursor.return_value = mock_cursor + + # Row fields: cml_id, total_records, valid_records, null_records, completeness_percent, + # min_rsl, max_rsl, mean_rsl, stddev_rsl, last_rsl, stddev_last_60min + mock_cursor.fetchall.return_value = [ + ( + "10001", + 10, + 9, + 1, + 90.0, + -60.0, + -40.0, + -50.0, + 3.0, + -45.0, + 1.5, + ) + ] + + # Ensure cursor.close and conn.close exist + mock_cursor.close = Mock() + mock_conn.close = Mock() + + monkeypatch.setattr(wm, "get_db_connection", lambda: mock_conn) + + client = wm.app.test_client() + resp = client.get("/api/cml-stats") + assert resp.status_code == 200 + data = resp.get_json() + assert isinstance(data, list) + assert len(data) == 1 + row = data[0] + assert row["cml_id"] == "10001" + assert row["completeness_percent"] == 90.0 + assert row["last_rsl"] == -45.0