From 31d921fc9e5dc1e3ac04879cf9cf2c3aa1c0b714 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 14:15:44 +0100 Subject: [PATCH 1/8] Fix: enable URL synchronization for CML ID selection in dashboard --- grafana/provisioning/dashboards/definitions/cml-realtime.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/grafana/provisioning/dashboards/definitions/cml-realtime.json b/grafana/provisioning/dashboards/definitions/cml-realtime.json index fb3a8f4..6059cf5 100644 --- a/grafana/provisioning/dashboards/definitions/cml-realtime.json +++ b/grafana/provisioning/dashboards/definitions/cml-realtime.json @@ -392,7 +392,7 @@ "query": "SELECT DISTINCT cml_id::text FROM cml_metadata ORDER BY cml_id", "refresh": 1, "regex": "", - "skipUrlSync": true, + "skipUrlSync": false, "sort": 0, "tagValuesQuery": "", "tagsQuery": "", From 77bbfc707151d7c0561e9dc2a39ba00fd941c397 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 14:42:54 +0100 Subject: [PATCH 2/8] Fix and optimize coloring of CML lines on map - Optimize CML map coloring performance by precalculating statistics - Add cml_stats table and update_cml_stats() PostgreSQL function - Update db_writer.py to maintain stats cache on data insertion - Modify /api/cml-stats endpoint to use cached stats with dynamic 60min stddev --- database/init.sql | 58 +++++++++++++++++++++++++++++++++++++++++++++ parser/db_writer.py | 30 ++++++++++++++++++++++- webserver/main.py | 39 +++++++++++++++--------------- 3 files changed, 107 insertions(+), 20 deletions(-) diff --git a/database/init.sql b/database/init.sql index 1019a74..ca67ad3 100644 --- a/database/init.sql +++ b/database/init.sql @@ -19,4 +19,62 @@ CREATE TABLE cml_metadata ( PRIMARY KEY (cml_id, sublink_id) ); +CREATE TABLE cml_stats ( + cml_id TEXT PRIMARY KEY, + total_records BIGINT, + valid_records BIGINT, + null_records BIGINT, + completeness_percent REAL, + min_rsl REAL, + max_rsl REAL, + mean_rsl REAL, + stddev_rsl REAL, + last_rsl REAL, + last_update TIMESTAMPTZ DEFAULT NOW() +); + +CREATE OR REPLACE FUNCTION update_cml_stats(target_cml_id TEXT) RETURNS VOID AS $$ +BEGIN + INSERT INTO cml_stats ( + cml_id, + total_records, + valid_records, + null_records, + completeness_percent, + min_rsl, + max_rsl, + mean_rsl, + stddev_rsl, + last_rsl, + last_update + ) + SELECT + cd.cml_id::text, + COUNT(*) as total_records, + COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) as valid_records, + COUNT(CASE WHEN cd.rsl IS NULL THEN 1 END) as null_records, + ROUND(100.0 * COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) / COUNT(*), 2) as completeness_percent, + MIN(cd.rsl) as min_rsl, + MAX(cd.rsl) as max_rsl, + ROUND(AVG(cd.rsl)::numeric, 2) as mean_rsl, + ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_rsl, + (SELECT rsl FROM cml_data WHERE cml_id = cd.cml_id ORDER BY time DESC LIMIT 1) as last_rsl, + NOW() + FROM cml_data cd + WHERE cd.cml_id = target_cml_id + GROUP BY cd.cml_id + ON CONFLICT (cml_id) DO UPDATE SET + total_records = EXCLUDED.total_records, + valid_records = EXCLUDED.valid_records, + null_records = EXCLUDED.null_records, + completeness_percent = EXCLUDED.completeness_percent, + min_rsl = EXCLUDED.min_rsl, + max_rsl = EXCLUDED.max_rsl, + mean_rsl = EXCLUDED.mean_rsl, + stddev_rsl = EXCLUDED.stddev_rsl, + last_rsl = EXCLUDED.last_rsl, + last_update = EXCLUDED.last_update; +END; +$$ LANGUAGE plpgsql; + SELECT create_hypertable('cml_data', 'time'); \ No newline at end of file diff --git a/parser/db_writer.py b/parser/db_writer.py index 1c00735..5ddb9e7 100644 --- a/parser/db_writer.py +++ b/parser/db_writer.py @@ -189,6 +189,25 @@ def _execute_batch_insert( if cur and not cur.closed: cur.close() + def _update_stats_for_cmls(self, cml_ids: List[str]) -> None: + """Update cml_stats for the given CML IDs.""" + if not cml_ids: + return + + cur = self.conn.cursor() + try: + for cml_id in cml_ids: + cur.execute("SELECT update_cml_stats(%s)", (cml_id,)) + self.conn.commit() + logger.info("Updated stats for %d CMLs", len(cml_ids)) + except Exception: + self.conn.rollback() + logger.exception("Failed to update stats for CMLs: %s", cml_ids) + raise + finally: + if cur and not cur.closed: + cur.close() + def write_metadata(self, df) -> int: """Write metadata DataFrame to `cml_metadata`. @@ -253,10 +272,19 @@ def write_rawdata(self, df) -> int: ) records = [tuple(x) for x in df_subset.to_numpy()] + # Get unique CML IDs for stats update + unique_cml_ids = df_subset["cml_id"].unique().tolist() + sql = "INSERT INTO cml_data (time, cml_id, sublink_id, rsl, tsl) VALUES %s" - return self._with_connection_retry( + rows_written = self._with_connection_retry( lambda: self._execute_batch_insert( sql, records, "write raw data to database" ) ) + + # Update stats for affected CMLs + if rows_written > 0 and unique_cml_ids: + self._update_stats_for_cmls(unique_cml_ids) + + return rows_written diff --git a/webserver/main.py b/webserver/main.py index 661a2ae..34e147f 100644 --- a/webserver/main.py +++ b/webserver/main.py @@ -469,27 +469,28 @@ def api_cml_stats(): cur = conn.cursor() cur.execute( """ - WITH latest_60min AS ( - SELECT cml_id, rsl, time + SELECT + cs.cml_id::text, + cs.total_records, + cs.valid_records, + cs.null_records, + cs.completeness_percent, + cs.min_rsl, + cs.max_rsl, + cs.mean_rsl, + cs.stddev_rsl, + cs.last_rsl, + ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_last_60min + FROM cml_stats cs + LEFT JOIN ( + SELECT cml_id, rsl FROM cml_data WHERE time >= (SELECT MAX(time) FROM cml_data) - INTERVAL '60 minutes' - ) - SELECT - cd.cml_id::text, - COUNT(*) as total_records, - COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) as valid_records, - COUNT(CASE WHEN cd.rsl IS NULL THEN 1 END) as null_records, - ROUND(100.0 * COUNT(CASE WHEN cd.rsl IS NOT NULL THEN 1 END) / COUNT(*), 2) as completeness_percent, - MIN(cd.rsl) as min_rsl, - MAX(cd.rsl) as max_rsl, - ROUND(AVG(cd.rsl)::numeric, 2) as mean_rsl, - ROUND(STDDEV(cd.rsl)::numeric, 2) as stddev_rsl, - (SELECT rsl FROM cml_data WHERE cml_id = cd.cml_id ORDER BY time DESC LIMIT 1) as last_rsl, - ROUND(STDDEV(l60.rsl)::numeric, 2) as stddev_last_60min - FROM cml_data cd - LEFT JOIN latest_60min l60 ON cd.cml_id = l60.cml_id - GROUP BY cd.cml_id - ORDER BY cd.cml_id + ) cd ON cs.cml_id = cd.cml_id + GROUP BY cs.cml_id, cs.total_records, cs.valid_records, cs.null_records, + cs.completeness_percent, cs.min_rsl, cs.max_rsl, cs.mean_rsl, + cs.stddev_rsl, cs.last_rsl + ORDER BY cs.cml_id """ ) data = cur.fetchall() From 1d061852511db5bc3721bc8132e39cf4f17f4e3b Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 17:16:58 +0100 Subject: [PATCH 3/8] fix: Refactor DBWriter to defer commits until after batch operations and improve rollback error handling --- parser/db_writer.py | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/parser/db_writer.py b/parser/db_writer.py index 5ddb9e7..c410ba3 100644 --- a/parser/db_writer.py +++ b/parser/db_writer.py @@ -179,10 +179,13 @@ def _execute_batch_insert( psycopg2.extras.execute_values( cur, sql, records, template=None, page_size=1000 ) - self.conn.commit() + # Do not commit here; caller will commit once to allow batching return len(records) except Exception: - self.conn.rollback() + try: + self.conn.rollback() + except Exception: + logger.exception("Failed rollback after %s error", operation_name) logger.exception("Failed to %s", operation_name) raise finally: @@ -198,10 +201,13 @@ def _update_stats_for_cmls(self, cml_ids: List[str]) -> None: try: for cml_id in cml_ids: cur.execute("SELECT update_cml_stats(%s)", (cml_id,)) - self.conn.commit() - logger.info("Updated stats for %d CMLs", len(cml_ids)) + # Do not commit here; caller should commit once after batch operations + logger.info("Executed update_cml_stats for %d CMLs", len(cml_ids)) except Exception: - self.conn.rollback() + try: + self.conn.rollback() + except Exception: + logger.exception("Rollback failed while handling stats update error") logger.exception("Failed to update stats for CMLs: %s", cml_ids) raise finally: @@ -248,12 +254,22 @@ def write_metadata(self, df) -> int: "length = EXCLUDED.length" ) - return self._with_connection_retry( + rows_written = self._with_connection_retry( lambda: self._execute_batch_insert( sql, records, "write metadata to database" ) ) + # Commit once after batch operation + try: + if self.conn: + self.conn.commit() + except Exception: + logger.exception("Failed to commit metadata write") + raise + + return rows_written + def write_rawdata(self, df) -> int: """Write raw time series DataFrame to `cml_data`. @@ -283,8 +299,15 @@ def write_rawdata(self, df) -> int: ) ) - # Update stats for affected CMLs + # Update stats for affected CMLs and commit once afterward if rows_written > 0 and unique_cml_ids: self._update_stats_for_cmls(unique_cml_ids) + try: + if self.conn: + self.conn.commit() + except Exception: + logger.exception("Failed to commit raw data + stats update") + raise + return rows_written From a884c405c9edb68491a8966cad04c14d1c477b51 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 17:26:33 +0100 Subject: [PATCH 4/8] test: Add rollback tests for write_rawdata and write_metadata methods in DBWriter --- parser/tests/test_db_writer.py | 83 ++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/parser/tests/test_db_writer.py b/parser/tests/test_db_writer.py index a0e3788..73ffdf3 100644 --- a/parser/tests/test_db_writer.py +++ b/parser/tests/test_db_writer.py @@ -234,3 +234,86 @@ def test_close_already_closed(): writer.close() # Should not raise assert writer.conn is None + + +def test_write_rawdata_execute_values_failure_triggers_rollback(mock_connection): + """When execute_values raises, write_rawdata should rollback and propagate.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + df = pd.DataFrame( + { + "time": pd.to_datetime(["2026-01-22 10:00:00"]), + "cml_id": ["123"], + "sublink_id": ["A"], + "rsl": [-45.0], + "tsl": [1.0], + } + ) + + with patch( + "parser.db_writer.psycopg2.extras.execute_values", + side_effect=Exception("DB error"), + ): + with pytest.raises(Exception, match="DB error"): + writer.write_rawdata(df) + + # rollback should have been called by the error handler + mock_connection.rollback.assert_called() + + +def test_write_metadata_execute_values_failure_triggers_rollback(mock_connection): + """When execute_values raises during metadata write, rollback is performed.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + df = pd.DataFrame( + { + "cml_id": ["123"], + "sublink_id": ["A"], + "site_0_lon": [13.4], + "site_0_lat": [52.5], + "site_1_lon": [13.6], + "site_1_lat": [52.7], + "frequency": [38.0], + "polarization": ["H"], + "length": [1200.0], + } + ) + + with patch( + "parser.db_writer.psycopg2.extras.execute_values", + side_effect=Exception("meta error"), + ): + with pytest.raises(Exception, match="meta error"): + writer.write_metadata(df) + + mock_connection.rollback.assert_called() + + +def test__update_stats_for_cmls_executes_queries_without_commit(mock_connection): + """Ensure _update_stats_for_cmls executes update function per CML and does not commit.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + cml_ids = ["100", "200"] + writer._update_stats_for_cmls(cml_ids) + + cur = mock_connection.cursor.return_value + assert cur.execute.call_count == len(cml_ids) + # commit should not be called by the helper + mock_connection.commit.assert_not_called() + + +def test__update_stats_for_cmls_rollback_on_error(mock_connection): + """If executing update function raises, ensure rollback is attempted and exception propagated.""" + writer = DBWriter("postgresql://test") + writer.conn = mock_connection + + cur = mock_connection.cursor.return_value + cur.execute.side_effect = Exception("update failed") + + with pytest.raises(Exception, match="update failed"): + writer._update_stats_for_cmls(["100"]) + + mock_connection.rollback.assert_called() From a29573b123db69d5379a6d7e4b16d81a1a34ebe7 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 17:31:01 +0100 Subject: [PATCH 5/8] test: Add unit test for API CML stats endpoint with mock database connection --- webserver/tests/test_api_cml_stats.py | 60 +++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 webserver/tests/test_api_cml_stats.py diff --git a/webserver/tests/test_api_cml_stats.py b/webserver/tests/test_api_cml_stats.py new file mode 100644 index 0000000..5838380 --- /dev/null +++ b/webserver/tests/test_api_cml_stats.py @@ -0,0 +1,60 @@ +import sys +from unittest.mock import Mock + +import pytest + + +# Ensure optional heavy imports won't fail at import time +sys.modules.setdefault("folium", Mock()) +sys.modules.setdefault("altair", Mock()) +sys.modules.setdefault("requests", Mock()) + + +def test_api_cml_stats_returns_cached_stats(monkeypatch): + # Make the webserver package modules importable (tests add webserver/ to sys.path) + import os + + sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + + # Import main module from webserver package directory + import main as wm + + # Prepare mock DB connection and cursor + mock_conn = Mock() + mock_cursor = Mock() + mock_conn.cursor.return_value = mock_cursor + + # Row fields: cml_id, total_records, valid_records, null_records, completeness_percent, + # min_rsl, max_rsl, mean_rsl, stddev_rsl, last_rsl, stddev_last_60min + mock_cursor.fetchall.return_value = [ + ( + "10001", + 10, + 9, + 1, + 90.0, + -60.0, + -40.0, + -50.0, + 3.0, + -45.0, + 1.5, + ) + ] + + # Ensure cursor.close and conn.close exist + mock_cursor.close = Mock() + mock_conn.close = Mock() + + monkeypatch.setattr(wm, "get_db_connection", lambda: mock_conn) + + client = wm.app.test_client() + resp = client.get("/api/cml-stats") + assert resp.status_code == 200 + data = resp.get_json() + assert isinstance(data, list) + assert len(data) == 1 + row = data[0] + assert row["cml_id"] == "10001" + assert row["completeness_percent"] == 90.0 + assert row["last_rsl"] == -45.0 From 61dc9e993ab35dd8afbfb111ae9c6757fd7cb92b Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 17:40:17 +0100 Subject: [PATCH 6/8] fix: Load main module directly by path to avoid sys.path modification issues --- webserver/tests/test_api_cml_stats.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/webserver/tests/test_api_cml_stats.py b/webserver/tests/test_api_cml_stats.py index 5838380..885771e 100644 --- a/webserver/tests/test_api_cml_stats.py +++ b/webserver/tests/test_api_cml_stats.py @@ -11,13 +11,17 @@ def test_api_cml_stats_returns_cached_stats(monkeypatch): - # Make the webserver package modules importable (tests add webserver/ to sys.path) + # Load the webserver `main.py` module directly by path to avoid modifying + # sys.path (which can cause permission issues in some CI/container setups). import os + import importlib.util - sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) - - # Import main module from webserver package directory - import main as wm + main_path = os.path.abspath( + os.path.join(os.path.dirname(__file__), "..", "main.py") + ) + spec = importlib.util.spec_from_file_location("wm_main", main_path) + wm = importlib.util.module_from_spec(spec) + spec.loader.exec_module(wm) # Prepare mock DB connection and cursor mock_conn = Mock() From ade7488d8cba3aa7edfe23325a7d59753152cdcd Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 17:57:47 +0100 Subject: [PATCH 7/8] fix: Refactor directory creation into a function and ensure directories are created on server start --- webserver/main.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/webserver/main.py b/webserver/main.py index 34e147f..f549aa8 100644 --- a/webserver/main.py +++ b/webserver/main.py @@ -18,9 +18,15 @@ DATA_STAGED_FOR_PARSING_DIR = "/app/data_staged_for_parsing" DATA_ARCHIVED_DIR = "/app/data_archived" -# Create directories if they don't exist -for directory in [DATA_INCOMING_DIR, DATA_STAGED_FOR_PARSING_DIR, DATA_ARCHIVED_DIR]: - Path(directory).mkdir(parents=True, exist_ok=True) + +def ensure_data_directories(): + """Create data directories if they don't exist.""" + for directory in [ + DATA_INCOMING_DIR, + DATA_STAGED_FOR_PARSING_DIR, + DATA_ARCHIVED_DIR, + ]: + Path(directory).mkdir(parents=True, exist_ok=True) def safe_float(value): @@ -801,6 +807,8 @@ def server_error(error): # ==================== START SERVER ==================== if __name__ == "__main__": + # Create data directories + ensure_data_directories() # Wait for database to be ready time.sleep(10) app.run(host="0.0.0.0", port=5000, debug=True) From a663359ab304ffd48702b5b19d34dc286a8cc981 Mon Sep 17 00:00:00 2001 From: Christian Chwala Date: Thu, 12 Feb 2026 17:58:19 +0100 Subject: [PATCH 8/8] Revert "fix: Load main module directly by path to avoid sys.path modification issues" This reverts commit 61dc9e993ab35dd8afbfb111ae9c6757fd7cb92b. --- webserver/tests/test_api_cml_stats.py | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/webserver/tests/test_api_cml_stats.py b/webserver/tests/test_api_cml_stats.py index 885771e..5838380 100644 --- a/webserver/tests/test_api_cml_stats.py +++ b/webserver/tests/test_api_cml_stats.py @@ -11,17 +11,13 @@ def test_api_cml_stats_returns_cached_stats(monkeypatch): - # Load the webserver `main.py` module directly by path to avoid modifying - # sys.path (which can cause permission issues in some CI/container setups). + # Make the webserver package modules importable (tests add webserver/ to sys.path) import os - import importlib.util - main_path = os.path.abspath( - os.path.join(os.path.dirname(__file__), "..", "main.py") - ) - spec = importlib.util.spec_from_file_location("wm_main", main_path) - wm = importlib.util.module_from_spec(spec) - spec.loader.exec_module(wm) + sys.path.insert(0, os.path.dirname(os.path.dirname(__file__))) + + # Import main module from webserver package directory + import main as wm # Prepare mock DB connection and cursor mock_conn = Mock()