From 2d4cea0611c9df3b5597142ab9749ef536cc8873 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 12:07:24 +0100 Subject: [PATCH 01/12] initial stuff --- blitzortung/cli/imprt2.py | 158 ++++++++++++++++++++++++++++++++++++++ tests/cli/__init__.py | 0 tests/cli/test_imprt2.py | 38 +++++++++ 3 files changed, 196 insertions(+) create mode 100755 blitzortung/cli/imprt2.py create mode 100644 tests/cli/__init__.py create mode 100644 tests/cli/test_imprt2.py diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py new file mode 100755 index 0000000..5922091 --- /dev/null +++ b/blitzortung/cli/imprt2.py @@ -0,0 +1,158 @@ +# -*- coding: utf8 -*- + +""" + Copyright (C) 2011-2025 Andreas Würl + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +""" + +import datetime +import logging +import os +import time +from contextlib import nullcontext + +import requests +import statsd +import stopit +from optparse import OptionParser + +import blitzortung.dataimport +import blitzortung.db +import blitzortung.logger +from blitzortung import util +from blitzortung.data import Timestamp +from blitzortung.lock import LockWithTimeout, FailedToAcquireException + +logger = logging.getLogger(os.path.basename(__file__)) +blitzortung.set_parent_logger(logger) +blitzortung.add_log_handler(blitzortung.logger.create_console_handler()) + +statsd_client = statsd.StatsClient('localhost', 8125, prefix='org.blitzortung.import') + + +def timestamp_is_newer_than(timestamp, latest_time): + if not latest_time: + return True + return timestamp and timestamp > latest_time and timestamp - latest_time != datetime.timedelta() + + +def import_strikes_for(region, start_time, is_update=False): + logger.debug("work on region %d", region) + strike_db = blitzortung.db.strike() + latest_time_timer = util.Timer() + latest_time = strike_db.get_latest_time(region) + logger.debug("latest time for region %d: %s (%.03fs) ", region, latest_time, latest_time_timer.lap()) + if not latest_time: + latest_time = start_time + + if is_update: + start_time = update_start_time() + if not latest_time or start_time > latest_time: + latest_time = start_time + + reference_time = time.time() + strike_source = blitzortung.dataimport.strikes() + strikes = strike_source.get_strikes_since(latest_time, region=region) + query_time = time.time() + + strike_group_size = 10000 + strike_count = 0 + global_start_time = start_time = time.time() + for strike in strikes: + strike_db.insert(strike, region) + + strike_count += 1 + if strike_count % strike_group_size == 0: + strike_db.commit() + logger.info("commit #{} ({:.1f}/s) @{} for region {}".format( + strike_count, strike_group_size / (time.time() - start_time), strike.timestamp, region)) + start_time = time.time() + + if strike_count > 0: + strike_db.commit() + + insert_time = time.time() + stat_name = "strikes.%d" % region + statsd_client.incr(stat_name) + statsd_client.gauge(stat_name + ".count", strike_count) + statsd_client.timing(stat_name + ".get", max(1, int((query_time - reference_time) * 1000))) + statsd_client.timing(stat_name + ".insert", max(1, int((insert_time - query_time) * 1000))) + + logger.info("imported {} strikes ({:.1f}/s) for region {}".format( + strike_count, strike_count / (time.time() - global_start_time), region)) + + +def update_start_time() -> datetime.datetime: + return datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30) + + +def import_strikes(regions, start_time, no_timeout=False, is_update=False): + + error_count = 0 + for region in regions: + for retry in range(5): + try: + with nullcontext() if no_timeout else stopit.SignalTimeout(300): + import_strikes_for(region, start_time, is_update=is_update) + break + except (requests.exceptions.ConnectionError, stopit.TimeoutException): + logger.warning('import failed: retry {} region {}'.format(retry, region)) + error_count += 1 + time.sleep(2) + continue + statsd_client.gauge("strikes.error_count", error_count) + +def update_strikes(): + config = blitzortung.config.config() + + now = datetime.datetime.now(datetime.timezone.utc) + strike_db = blitzortung.db.strike() + + start_time = now - datetime.timedelta(hours=1) + time_interval = blitzortung.db.query.TimeInterval(start_time, None) + order = 'timestamp' + + strikes = strike_db.select(time_interval=time_interval, order=order) + + data = requests.get("https://data.blitzortung.org/Data/Protected/last_strikes.php", auth=(config.get_username(), config.get_password())) + + + + + +def main(): + parser = OptionParser() + parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose output") + parser.add_option("-d", "--debug", dest="debug", action="store_true", help="debug output") + + (options, args) = parser.parse_args() + + lock = LockWithTimeout('/tmp/.bo-import2.lock') + + try: + with lock.locked(10): + if options.debug: + blitzortung.set_log_level(logging.DEBUG) + elif options.verbose: + blitzortung.set_log_level(logging.INFO) + + update_strikes() + + + except FailedToAcquireException: + logger.warning("could not acquire lock") + + +if __name__ == "__main__": + main() diff --git a/tests/cli/__init__.py b/tests/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/cli/test_imprt2.py b/tests/cli/test_imprt2.py new file mode 100644 index 0000000..71d3246 --- /dev/null +++ b/tests/cli/test_imprt2.py @@ -0,0 +1,38 @@ +import pytest +from mock import patch + + +example_data = """{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0,"pol":0,"mds":12581,"mcg":162,"status":0,"region":2} +{"time":1763202124297904000,"lat":44.283328,"lon":8.910987,"alt":0,"pol":0,"mds":6830,"mcg":84,"status":2,"region":9} +{"time":1763202124297904000,"lat":44.283328,"lon":8.910987,"alt":0,"pol":0,"mds":6830,"mcg":84,"status":1,"region":8} +{"time":1763202124297897200,"lat":44.288127,"lon":8.927448,"alt":0,"pol":0,"mds":4934,"mcg":136,"status":2,"region":9} +{"time":1763202124297897200,"lat":44.288127,"lon":8.927448,"alt":0,"pol":0,"mds":4934,"mcg":136,"status":1,"region":8} +{"time":1763202124297892000,"lat":44.2774,"lon":8.929396,"alt":0,"pol":0,"mds":8913,"mcg":138,"status":0,"region":1} +{"time":1763202124101646800,"lat":-41.585918,"lon":152.926124,"alt":0,"pol":0,"mds":13391,"mcg":176,"status":0,"region":2} +{"time":1763202123983937500,"lat":44.284989,"lon":8.915263,"alt":0,"pol":0,"mds":10832,"mcg":178,"status":2,"region":9} +{"time":1763202123983937500,"lat":44.284989,"lon":8.915263,"alt":0,"pol":0,"mds":10832,"mcg":178,"status":1,"region":8} +{"time":1763202123983890000,"lat":44.27408,"lon":8.892988,"alt":0,"pol":0,"mds":7469,"mcg":197,"status":2,"region":9} +{"time":1763202123983890000,"lat":44.27408,"lon":8.892988,"alt":0,"pol":0,"mds":7469,"mcg":197,"status":1,"region":8} +{"time":1763202123983889200,"lat":44.279756,"lon":8.924568,"alt":0,"pol":0,"mds":7770,"mcg":82,"status":2,"region":9} +{"time":1763202123983889200,"lat":44.279756,"lon":8.924568,"alt":0,"pol":0,"mds":7770,"mcg":82,"status":1,"region":8} +{"time":1763202123983885800,"lat":44.276457,"lon":8.920456,"alt":0,"pol":0,"mds":5713,"mcg":159,"status":2,"region":9} +{"time":1763202123983885800,"lat":44.276457,"lon":8.920456,"alt":0,"pol":0,"mds":5713,"mcg":159,"status":1,"region":8} +{"time":1763202123702520300,"lat":-38.981925,"lon":151.55461,"alt":0,"pol":0,"mds":14906,"mcg":97,"status":0,"region":2} +{"time":1763202122363942000,"lat":-24.57577,"lon":148.610239,"alt":0,"pol":0,"mds":11794,"mcg":259,"status":0,"region":2} +{"time":1763202122363767300,"lat":-24.252154,"lon":148.741951,"alt":0,"pol":0,"mds":8582,"mcg":124,"status":0,"region":2} +{"time":1763202121942625800,"lat":24.982319,"lon":-59.714592,"alt":0,"pol":0,"mds":11143,"mcg":179,"status":0,"region":5} +{"time":1763202121942529000,"lat":24.770698,"lon":-59.515226,"alt":0,"pol":0,"mds":11771,"mcg":203,"status":1,"region":0} +{"time":1763202121942523000,"lat":24.785201,"lon":-59.502499,"alt":0,"pol":0,"mds":8231,"mcg":211,"status":0,"region":5} +{"time":1763202121735116800,"lat":-41.281446,"lon":152.188824,"alt":0,"pol":0,"mds":14916,"mcg":201,"status":0,"region":2} +{"time":1763202120567548200,"lat":-25.461695,"lon":149.862769,"alt":0,"pol":0,"mds":8506,"mcg":252,"status":0,"region":2} +{"time":1763202120439335400,"lat":-25.508039,"lon":149.845275,"alt":0,"pol":0,"mds":9619,"mcg":261,"status":0,"region":2} +{"time":1763202117207764500,"lat":36.865984,"lon":-9.198683,"alt":0,"pol":0,"mds":9862,"mcg":165,"status":0,"region":1} +{"time":1763202117194445000,"lat":36.918935,"lon":-9.123588,"alt":0,"pol":0,"mds":10229,"mcg":169,"status":0,"region":1} +{"time":1763202117194435300,"lat":36.959022,"lon":-9.187297,"alt":0,"pol":0,"mds":12547,"mcg":169,"status":2,"region":9} +{"time":1763202117194435300,"lat":36.959022,"lon":-9.187297,"alt":0,"pol":0,"mds":12547,"mcg":169,"status":1,"region":8} +{"time":1763202117194433500,"lat":36.845773,"lon":-9.083014,"alt":0,"pol":0,"mds":14208,"mcg":228,"status":0,"region":1}""" + +@pytest.fixture +def data_request(): + with patch('blitzortung.cli.imprt2.requests.get') as mock_get: + yield mock_get From bfd42bc18fa8b586dc3364f8a328bcdda6146f3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 22:03:31 +0100 Subject: [PATCH 02/12] first implementation --- blitzortung/cli/imprt2.py | 252 +++++++++++++++++++++++-- pyproject.toml | 1 + tests/cli/test_imprt2.py | 373 +++++++++++++++++++++++++++++++++++++- 3 files changed, 602 insertions(+), 24 deletions(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 5922091..5983ff2 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -113,46 +113,258 @@ def import_strikes(regions, start_time, no_timeout=False, is_update=False): continue statsd_client.gauge("strikes.error_count", error_count) -def update_strikes(): +def fetch_strikes_from_url(url, strike_builder, auth=None): + """ + Fetch strike data from a given URL and parse it into Strike objects. + + Args: + url: The URL to fetch strike data from + strike_builder: Builder instance to parse strike data + auth: Optional tuple of (username, password) for authentication + + Yields: + Strike objects parsed from the URL response + """ + logger.info("Fetching strikes from URL: %s", url) + + try: + response = requests.get(url, auth=auth, timeout=30) + response.raise_for_status() + + strike_count = 0 + + for line in response.text.splitlines(): + line = line.strip() + if not line: + continue + + try: + strike = strike_builder.from_line(line).build() + if strike.timestamp.is_valid: + strike_count += 1 + yield strike + except Exception as e: + logger.warning("Failed to parse strike: %s (%s)", e, line) + continue + + logger.info("Fetched %d strikes from URL", strike_count) + + except requests.RequestException as e: + logger.error("Failed to fetch data from URL %s: %s", url, e) + raise + + +def create_strike_key(strike): + """ + Create a unique key for a strike based on its attributes. + + Since strikes from URLs don't have IDs, we identify them by: + - timestamp (nanosecond precision) + - location (x, y coordinates) + - amplitude + + Args: + strike: Strike object + + Returns: + Tuple representing the strike's unique characteristics + """ + # Get timestamp value (handle both Timestamp and datetime objects) + if hasattr(strike.timestamp, 'value'): + timestamp_value = strike.timestamp.value + else: + # For datetime objects, convert to nanoseconds since epoch + timestamp_value = int(strike.timestamp.timestamp() * 1_000_000_000) + + return ( + timestamp_value, + round(strike.x, 6), # Round to 6 decimal places for location + round(strike.y, 6), + strike.amplitude + ) + + +def get_existing_strike_keys(strike_db, time_interval, region=None): + """ + Retrieve keys of strikes already present in the database for a given time interval. + + Strikes are identified by their timestamp, location, and amplitude since + strikes from URLs don't have database IDs. + + Args: + strike_db: Database connection for strikes + time_interval: Time interval to query + region: Optional region filter + + Returns: + Set of strike keys (tuples of timestamp, x, y, amplitude) + """ + logger.debug("Querying existing strikes for interval %s - %s (region: %s)", + time_interval.start, time_interval.end, region) + + kwargs = {'time_interval': time_interval, 'order': 'timestamp'} + if region is not None: + kwargs['region'] = region + + existing_strikes = strike_db.select(**kwargs) + strike_keys = {create_strike_key(strike) for strike in existing_strikes} + + logger.info("Found %d existing strikes in database", len(strike_keys)) + return strike_keys + + +def update_strikes(url=None, region=None, hours=1): + """ + Update strike database by fetching data from a URL and inserting new strikes. + + This function: + 1. Calculates a time interval (default: last 1 hour) + 2. Retrieves existing strikes from the database for that interval + 3. Fetches strikes from the provided URL + 4. Inserts only strikes that are not already in the database + + Args: + url: URL to fetch strike data from (if None, uses default config URL) + region: Optional region to filter/tag strikes + hours: Number of hours to look back (default: 1) + + Returns: + Number of new strikes inserted into the database + """ + logger.info("Starting strike update for region %s (looking back %d hour(s))", region, hours) + + # Get configuration if URL not provided config = blitzortung.config.config() + if url is None: + url = "https://data.blitzortung.org/Data/Protected/last_strikes.php" + auth = (config.get_username(), config.get_password()) + else: + auth = None + # Calculate time interval (last N hours) now = datetime.datetime.now(datetime.timezone.utc) + start_time = now - datetime.timedelta(hours=hours) + end_time = now + time_interval = blitzortung.db.query.TimeInterval( + start_time, + end_time + ) + + logger.info("Time interval: %s to %s", start_time, end_time) + + # Get database connection strike_db = blitzortung.db.strike() - start_time = now - datetime.timedelta(hours=1) - time_interval = blitzortung.db.query.TimeInterval(start_time, None) - order = 'timestamp' + # Get existing strikes from database (identified by timestamp/location/amplitude) + existing_strike_keys = get_existing_strike_keys(strike_db, time_interval, region) + + # Fetch strikes from URL + strike_builder = blitzortung.builder.Strike() + try: + url_strikes = list(fetch_strikes_from_url(url, strike_builder, auth=auth)) + except requests.RequestException as e: + logger.error("Failed to fetch strikes from URL: %s", e) + return 0 + + # Filter strikes: only those within time interval and not in database + new_strikes = [] + for strike in url_strikes: + # Check if strike is within the time interval + if not (time_interval.start <= strike.timestamp <= time_interval.end): + logger.debug("Strike at %s outside time interval, skipping", strike.timestamp) + continue + + # Check if strike already exists in database (by timestamp/location/amplitude) + strike_key = create_strike_key(strike) + if strike_key in existing_strike_keys: + logger.debug("Strike at %s (%.6f, %.6f) already exists, skipping", + strike.timestamp, strike.x, strike.y) + continue + + new_strikes.append(strike) + + logger.info("Found %d new strikes to insert (out of %d from URL)", + len(new_strikes), len(url_strikes)) + + # Insert new strikes + insert_count = 0 + for strike in new_strikes: + try: + strike_db.insert(strike, region) + insert_count += 1 + + if insert_count % 1000 == 0: + strike_db.commit() + logger.info("Committed %d strikes so far", insert_count) + + except Exception as e: + logger.error("Failed to insert strike %s: %s", strike.id, e) + strike_db.rollback() + raise + + # Final commit + if insert_count > 0: + strike_db.commit() + logger.info("Successfully inserted %d new strikes", insert_count) + else: + logger.info("No new strikes to insert") + + strike_db.close() - strikes = strike_db.select(time_interval=time_interval, order=order) + # Update statistics + statsd_client.gauge("strikes.imported", insert_count) - data = requests.get("https://data.blitzortung.org/Data/Protected/last_strikes.php", auth=(config.get_username(), config.get_password())) + return insert_count def main(): - parser = OptionParser() - parser.add_option("-v", "--verbose", dest="verbose", action="store_true", help="verbose output") - parser.add_option("-d", "--debug", dest="debug", action="store_true", help="debug output") + """ + Command-line interface for the strike import tool. + """ + parser = OptionParser(description="Import strike data from URL into database") + parser.add_option("-u", "--url", dest="url", type="string", default=None, + help="URL to fetch strike data from (optional, uses default if not provided)") + parser.add_option("-r", "--region", dest="region", type="int", default=None, + help="Region number (optional)") + parser.add_option("--hours", dest="hours", type="int", default=1, + help="Number of hours to look back (default: 1)") + parser.add_option("-v", "--verbose", dest="verbose", action="store_true", + help="Enable verbose logging") + parser.add_option("-d", "--debug", dest="debug", action="store_true", + help="Enable debug logging") + parser.add_option("--no-lock", dest="no_lock", action="store_true", + help="Skip file locking (use with caution)") (options, args) = parser.parse_args() - lock = LockWithTimeout('/tmp/.bo-import2.lock') + # Set logging level + if options.debug: + blitzortung.set_log_level(logging.DEBUG) + elif options.verbose: + blitzortung.set_log_level(logging.INFO) + else: + blitzortung.set_log_level(logging.WARNING) - try: - with lock.locked(10): - if options.debug: - blitzortung.set_log_level(logging.DEBUG) - elif options.verbose: - blitzortung.set_log_level(logging.INFO) - - update_strikes() + # Use lock unless disabled + lock_context = nullcontext() if options.no_lock else LockWithTimeout('/tmp/.bo-import2.lock').locked(10) + try: + with lock_context: + count = update_strikes(url=options.url, region=options.region, hours=options.hours) + logger.info("Import completed: %d new strikes inserted", count) + return 0 except FailedToAcquireException: - logger.warning("could not acquire lock") + logger.warning("Could not acquire lock - another import may be running") + return 1 + except Exception as e: + logger.error("Import failed: %s", e, exc_info=options.debug) + return 1 if __name__ == "__main__": - main() + import sys + sys.exit(main()) diff --git a/pyproject.toml b/pyproject.toml index 3b66fba..73abe94 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,6 +29,7 @@ dependencies = [ [project.scripts] bo-db = "blitzortung.cli.db:main" bo-import = "blitzortung.cli.imprt:main" +bo-import2 = "blitzortung.cli.imprt2:main" bo-import-websocket = "blitzortung.cli.imprt_websocket:main" bo-webservice = "blitzortung.cli.start_webservice:main" bo-webservice-insertlog = "blitzortung.cli.webservice_insertlog:main" diff --git a/tests/cli/test_imprt2.py b/tests/cli/test_imprt2.py index 71d3246..af213ed 100644 --- a/tests/cli/test_imprt2.py +++ b/tests/cli/test_imprt2.py @@ -1,7 +1,15 @@ +import datetime +import json import pytest -from mock import patch +from assertpy import assert_that +from mock import Mock, patch, MagicMock +import blitzortung.cli.imprt2 as imprt2 +from blitzortung.data import Strike, Timestamp +from blitzortung.db.query import TimeInterval + +# Example strike data in JSON format (line-by-line) example_data = """{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0,"pol":0,"mds":12581,"mcg":162,"status":0,"region":2} {"time":1763202124297904000,"lat":44.283328,"lon":8.910987,"alt":0,"pol":0,"mds":6830,"mcg":84,"status":2,"region":9} {"time":1763202124297904000,"lat":44.283328,"lon":8.910987,"alt":0,"pol":0,"mds":6830,"mcg":84,"status":1,"region":8} @@ -32,7 +40,364 @@ {"time":1763202117194435300,"lat":36.959022,"lon":-9.187297,"alt":0,"pol":0,"mds":12547,"mcg":169,"status":1,"region":8} {"time":1763202117194433500,"lat":36.845773,"lon":-9.083014,"alt":0,"pol":0,"mds":14208,"mcg":228,"status":0,"region":1}""" + +@pytest.fixture +def mock_strike_builder(): + """Create a mock strike builder that parses JSON data.""" + def from_json_line(line): + """Parse JSON line and create a mock builder that returns a strike.""" + try: + data = json.loads(line) + + # Create mock strike with valid timestamp + strike = Mock(spec=Strike) + strike.id = data.get('time') # Use timestamp as ID for testing + + # Create timestamp with is_valid property + timestamp = Mock(spec=Timestamp) + timestamp.is_valid = True # Mock allows setting this + timestamp.__le__ = Mock(return_value=True) + timestamp.__ge__ = Mock(return_value=True) + timestamp.__lt__ = Mock(return_value=False) + timestamp.__gt__ = Mock(return_value=False) + + strike.timestamp = timestamp + strike.x = data['lon'] + strike.y = data['lat'] + strike.altitude = data['alt'] + strike.amplitude = data.get('pol', 0) + strike.lateral_error = data.get('mds', 0) + strike.station_count = 0 + strike.stations = [] + + # Create a mock builder that returns this strike + mock_builder = Mock() + mock_builder.build = Mock(return_value=strike) + return mock_builder + + except (json.JSONDecodeError, KeyError) as e: + raise Exception(f"Failed to parse: {e}") + + builder = Mock() + builder.from_line = Mock(side_effect=from_json_line) + return builder + + @pytest.fixture -def data_request(): - with patch('blitzortung.cli.imprt2.requests.get') as mock_get: - yield mock_get +def mock_response(): + """Create a mock HTTP response with example data.""" + response = Mock() + response.status_code = 200 + response.text = example_data + response.raise_for_status = Mock() + return response + + +@pytest.fixture +def mock_strike_db(): + """Create a mock strike database.""" + db = Mock() + db.select = Mock(return_value=[]) + db.insert = Mock() + db.commit = Mock() + db.rollback = Mock() + db.close = Mock() + return db + + +class TestFetchStrikesFromUrl: + """Tests for fetching strikes from URL.""" + + def test_fetch_strikes_successfully(self, mock_response, mock_strike_builder): + """Test successful fetch and parse of strike data.""" + with patch('blitzortung.cli.imprt2.requests.get', return_value=mock_response): + strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + + # Should parse all valid lines (28 strikes in example data) + assert_that(strikes).is_not_empty() + assert_that(len(strikes)).is_greater_than(0) + + def test_fetch_handles_empty_lines(self, mock_strike_builder): + """Test that empty lines are skipped.""" + response = Mock() + response.status_code = 200 + response.text = '\n\n{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0}\n\n' + response.raise_for_status = Mock() + + with patch('blitzortung.cli.imprt2.requests.get', return_value=response): + strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + + assert_that(strikes).is_length(1) + + def test_fetch_handles_invalid_strike_data(self, mock_strike_builder): + """Test that invalid strikes are logged and skipped.""" + response = Mock() + response.status_code = 200 + response.text = 'invalid json line\n{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0}' + response.raise_for_status = Mock() + + # Make builder raise exception for invalid data + def from_line_with_error(line): + if line == 'invalid json line': + raise Exception("Invalid JSON") + return mock_strike_builder.from_line.return_value + + mock_strike_builder.from_line.side_effect = from_line_with_error + + with patch('blitzortung.cli.imprt2.requests.get', return_value=response): + strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + + # Should skip invalid line but parse valid one + assert_that(strikes).is_length(1) + + def test_fetch_raises_on_http_error(self, mock_strike_builder): + """Test that HTTP errors are propagated.""" + import requests + + with patch('blitzortung.cli.imprt2.requests.get', side_effect=requests.RequestException("Connection error")): + with pytest.raises(requests.RequestException): + list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + + def test_fetch_with_authentication(self, mock_response, mock_strike_builder): + """Test fetch with authentication credentials.""" + with patch('blitzortung.cli.imprt2.requests.get', return_value=mock_response) as mock_get: + list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder, auth=('user', 'pass'))) + + mock_get.assert_called_once() + call_kwargs = mock_get.call_args[1] + assert_that(call_kwargs['auth']).is_equal_to(('user', 'pass')) + + +class TestCreateStrikeKey: + """Tests for creating unique strike keys.""" + + def test_create_strike_key_with_timestamp_value(self): + """Test strike key creation with Timestamp object.""" + strike = Mock(spec=Strike) + strike.timestamp = Mock() + strike.timestamp.value = 1234567890123456789 + strike.x = 12.345678 + strike.y = 45.678901 + strike.amplitude = 100 + + key = imprt2.create_strike_key(strike) + + assert_that(key).is_equal_to((1234567890123456789, 12.345678, 45.678901, 100)) + + def test_create_strike_key_rounds_location(self): + """Test that location is rounded to 6 decimal places.""" + strike = Mock(spec=Strike) + strike.timestamp = Mock() + strike.timestamp.value = 1000000000000000000 + strike.x = 12.34567890123 # More than 6 decimals + strike.y = 45.67890123456 + strike.amplitude = 50 + + key = imprt2.create_strike_key(strike) + + # Should be rounded to 6 decimals + assert_that(key[1]).is_equal_to(12.345679) + assert_that(key[2]).is_equal_to(45.678901) + + +class TestGetExistingStrikeKeys: + """Tests for querying existing strikes from database.""" + + def test_get_existing_strikes_empty_result(self, mock_strike_db): + """Test with no existing strikes.""" + mock_strike_db.select.return_value = [] + + start = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + end = datetime.datetime(2025, 1, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) + time_interval = TimeInterval(start, end) + + result = imprt2.get_existing_strike_keys(mock_strike_db, time_interval, region=1) + + assert_that(result).is_empty() + mock_strike_db.select.assert_called_once() + + def test_get_existing_strikes_with_results(self, mock_strike_db): + """Test with existing strikes.""" + # Create mock strikes with unique characteristics + strike1 = Mock(spec=Strike) + strike1.timestamp = Mock() + strike1.timestamp.value = 1000000000000000001 + strike1.x = 10.5 + strike1.y = 20.5 + strike1.amplitude = 100 + + strike2 = Mock(spec=Strike) + strike2.timestamp = Mock() + strike2.timestamp.value = 1000000000000000002 + strike2.x = 11.5 + strike2.y = 21.5 + strike2.amplitude = 200 + + mock_strike_db.select.return_value = [strike1, strike2] + + start = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + end = datetime.datetime(2025, 1, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) + time_interval = TimeInterval(start, end) + + result = imprt2.get_existing_strike_keys(mock_strike_db, time_interval, region=1) + + assert_that(result).is_length(2) + assert_that(result).contains( + (1000000000000000001, 10.5, 20.5, 100), + (1000000000000000002, 11.5, 21.5, 200) + ) + + def test_get_existing_strikes_passes_region_filter(self, mock_strike_db): + """Test that region parameter is passed to database query.""" + mock_strike_db.select.return_value = [] + + start = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) + end = datetime.datetime(2025, 1, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) + time_interval = TimeInterval(start, end) + + imprt2.get_existing_strike_keys(mock_strike_db, time_interval, region=2) + + # Check that region was passed in kwargs + call_kwargs = mock_strike_db.select.call_args[1] + assert_that(call_kwargs['region']).is_equal_to(2) + + +class TestUpdateStrikes: + """Integration tests for update_strikes function.""" + + @patch('blitzortung.cli.imprt2.blitzortung.db.strike') + @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') + @patch('blitzortung.cli.imprt2.blitzortung.builder.Strike') + @patch('blitzortung.cli.imprt2.blitzortung.config.config') + def test_update_strikes_inserts_new_strikes(self, mock_config, mock_builder_class, mock_fetch, mock_db_func): + """Test that new strikes are inserted.""" + # Setup mocks + mock_strike_db = Mock() + mock_strike_db.select.return_value = [] # No existing strikes + mock_strike_db.insert = Mock() + mock_strike_db.commit = Mock() + mock_strike_db.close = Mock() + mock_db_func.return_value = mock_strike_db + + now = datetime.datetime.now(datetime.timezone.utc) + + # Create mock strikes from URL (no IDs, identified by timestamp/location/amplitude) + strike1 = Mock(spec=Strike) + strike1.timestamp = Mock() + strike1.timestamp.value = int(now.timestamp() * 1_000_000_000) + strike1.timestamp.__le__ = Mock(return_value=True) + strike1.timestamp.__ge__ = Mock(return_value=True) + strike1.x = 10.5 + strike1.y = 20.5 + strike1.amplitude = 100 + + strike2 = Mock(spec=Strike) + strike2.timestamp = Mock() + strike2.timestamp.value = int(now.timestamp() * 1_000_000_000) + 1000 + strike2.timestamp.__le__ = Mock(return_value=True) + strike2.timestamp.__ge__ = Mock(return_value=True) + strike2.x = 11.5 + strike2.y = 21.5 + strike2.amplitude = 200 + + mock_fetch.return_value = [strike1, strike2] + + # Run update + result = imprt2.update_strikes(url='http://example.com/strikes', region=1, hours=1) + + # Verify + assert_that(result).is_equal_to(2) + assert_that(mock_strike_db.insert.call_count).is_equal_to(2) + mock_strike_db.commit.assert_called() + mock_strike_db.close.assert_called_once() + + @patch('blitzortung.cli.imprt2.blitzortung.db.strike') + @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') + @patch('blitzortung.cli.imprt2.blitzortung.builder.Strike') + @patch('blitzortung.cli.imprt2.blitzortung.config.config') + def test_update_strikes_skips_duplicates(self, mock_config, mock_builder_class, mock_fetch, mock_db_func): + """Test that existing strikes are not re-inserted.""" + # Setup mocks + mock_strike_db = Mock() + + now = datetime.datetime.now(datetime.timezone.utc) + timestamp_value = int(now.timestamp() * 1_000_000_000) + + # Existing strike in database (identified by timestamp/location/amplitude) + existing_strike = Mock(spec=Strike) + existing_strike.timestamp = Mock() + existing_strike.timestamp.value = timestamp_value + existing_strike.x = 10.5 + existing_strike.y = 20.5 + existing_strike.amplitude = 100 + + mock_strike_db.select.return_value = [existing_strike] + mock_strike_db.insert = Mock() + mock_strike_db.commit = Mock() + mock_strike_db.close = Mock() + mock_db_func.return_value = mock_strike_db + + # Strike from URL (same timestamp/location/amplitude as existing) + strike_from_url = Mock(spec=Strike) + strike_from_url.timestamp = Mock() + strike_from_url.timestamp.value = timestamp_value + strike_from_url.timestamp.__le__ = Mock(return_value=True) + strike_from_url.timestamp.__ge__ = Mock(return_value=True) + strike_from_url.x = 10.5 + strike_from_url.y = 20.5 + strike_from_url.amplitude = 100 + + mock_fetch.return_value = [strike_from_url] + + # Run update + result = imprt2.update_strikes(url='http://example.com/strikes', region=1, hours=1) + + # Verify - no inserts should happen + assert_that(result).is_equal_to(0) + mock_strike_db.insert.assert_not_called() + mock_strike_db.close.assert_called_once() + + @patch('blitzortung.cli.imprt2.blitzortung.db.strike') + @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') + @patch('blitzortung.cli.imprt2.blitzortung.builder.Strike') + @patch('blitzortung.cli.imprt2.blitzortung.config.config') + def test_update_strikes_filters_by_time_interval(self, mock_config, mock_builder_class, mock_fetch, mock_db_func): + """Test that strikes outside time interval are filtered.""" + # Setup mocks + mock_strike_db = Mock() + mock_strike_db.select.return_value = [] + mock_strike_db.insert = Mock() + mock_strike_db.commit = Mock() + mock_strike_db.close = Mock() + mock_db_func.return_value = mock_strike_db + + now = datetime.datetime.now(datetime.timezone.utc) + + # One strike within interval, one outside + strike_in_interval = Mock(spec=Strike) + strike_in_interval.timestamp = Mock() + strike_in_interval.timestamp.value = int((now - datetime.timedelta(minutes=30)).timestamp() * 1_000_000_000) + strike_in_interval.timestamp.__le__ = Mock(return_value=True) + strike_in_interval.timestamp.__ge__ = Mock(return_value=True) + strike_in_interval.x = 10.5 + strike_in_interval.y = 20.5 + strike_in_interval.amplitude = 100 + + strike_outside_interval = Mock(spec=Strike) + strike_outside_interval.timestamp = Mock() + strike_outside_interval.timestamp.value = int((now - datetime.timedelta(hours=2)).timestamp() * 1_000_000_000) + strike_outside_interval.timestamp.__le__ = Mock(return_value=False) # Outside interval + strike_outside_interval.timestamp.__ge__ = Mock(return_value=True) + strike_outside_interval.x = 11.5 + strike_outside_interval.y = 21.5 + strike_outside_interval.amplitude = 200 + + mock_fetch.return_value = [strike_in_interval, strike_outside_interval] + + # Run update with 1 hour lookback + result = imprt2.update_strikes(url='http://example.com/strikes', region=1, hours=1) + + # Verify - only the strike within interval should be inserted + assert_that(result).is_equal_to(1) + assert_that(mock_strike_db.insert.call_count).is_equal_to(1) + mock_strike_db.insert.assert_called_with(strike_in_interval, 1) From c9a855865c997d1dc565421d8c083d3c6abe20f4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 22:40:58 +0100 Subject: [PATCH 03/12] fix parsing data --- blitzortung/cli/imprt2.py | 40 +++++++++++++---- tests/cli/test_imprt2.py | 90 +++++++++++---------------------------- 2 files changed, 55 insertions(+), 75 deletions(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 5983ff2..0ac973f 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -113,24 +113,32 @@ def import_strikes(regions, start_time, no_timeout=False, is_update=False): continue statsd_client.gauge("strikes.error_count", error_count) -def fetch_strikes_from_url(url, strike_builder, auth=None): +def fetch_strikes_from_url(url, auth=None): """ Fetch strike data from a given URL and parse it into Strike objects. + The URL returns JSON-formatted strike data, one strike per line. + Each line has the format: + {"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0,"pol":0,...} + Args: url: The URL to fetch strike data from - strike_builder: Builder instance to parse strike data auth: Optional tuple of (username, password) for authentication Yields: Strike objects parsed from the URL response """ + import json + from blitzortung.builder import Strike as StrikeBuilder + logger.info("Fetching strikes from URL: %s", url) try: response = requests.get(url, auth=auth, timeout=30) response.raise_for_status() + builder = StrikeBuilder() + strike_count = 0 for line in response.text.splitlines(): @@ -139,13 +147,28 @@ def fetch_strikes_from_url(url, strike_builder, auth=None): continue try: - strike = strike_builder.from_line(line).build() - if strike.timestamp.is_valid: - strike_count += 1 - yield strike - except Exception as e: + # Parse JSON data + data = json.loads(line) + + # Create strike from JSON data + # Build strike object (create new builder for each strike) + strike = (builder + .set_timestamp(Timestamp(data['time'])) + .set_x(data['lon']) + .set_y(data['lat']) + .set_altitude(data.get('alt', 0)) + .set_amplitude(data.get('pol', 0)) + .build()) + + strike_count += 1 + yield strike + + except (json.JSONDecodeError, KeyError) as e: logger.warning("Failed to parse strike: %s (%s)", e, line) continue + except Exception as e: + logger.warning("Failed to create strike object: %s (%s)", e, line) + continue logger.info("Fetched %d strikes from URL", strike_count) @@ -259,9 +282,8 @@ def update_strikes(url=None, region=None, hours=1): existing_strike_keys = get_existing_strike_keys(strike_db, time_interval, region) # Fetch strikes from URL - strike_builder = blitzortung.builder.Strike() try: - url_strikes = list(fetch_strikes_from_url(url, strike_builder, auth=auth)) + url_strikes = list(fetch_strikes_from_url(url, auth=auth)) except requests.RequestException as e: logger.error("Failed to fetch strikes from URL: %s", e) return 0 diff --git a/tests/cli/test_imprt2.py b/tests/cli/test_imprt2.py index af213ed..2728611 100644 --- a/tests/cli/test_imprt2.py +++ b/tests/cli/test_imprt2.py @@ -41,46 +41,6 @@ {"time":1763202117194433500,"lat":36.845773,"lon":-9.083014,"alt":0,"pol":0,"mds":14208,"mcg":228,"status":0,"region":1}""" -@pytest.fixture -def mock_strike_builder(): - """Create a mock strike builder that parses JSON data.""" - def from_json_line(line): - """Parse JSON line and create a mock builder that returns a strike.""" - try: - data = json.loads(line) - - # Create mock strike with valid timestamp - strike = Mock(spec=Strike) - strike.id = data.get('time') # Use timestamp as ID for testing - - # Create timestamp with is_valid property - timestamp = Mock(spec=Timestamp) - timestamp.is_valid = True # Mock allows setting this - timestamp.__le__ = Mock(return_value=True) - timestamp.__ge__ = Mock(return_value=True) - timestamp.__lt__ = Mock(return_value=False) - timestamp.__gt__ = Mock(return_value=False) - - strike.timestamp = timestamp - strike.x = data['lon'] - strike.y = data['lat'] - strike.altitude = data['alt'] - strike.amplitude = data.get('pol', 0) - strike.lateral_error = data.get('mds', 0) - strike.station_count = 0 - strike.stations = [] - - # Create a mock builder that returns this strike - mock_builder = Mock() - mock_builder.build = Mock(return_value=strike) - return mock_builder - - except (json.JSONDecodeError, KeyError) as e: - raise Exception(f"Failed to parse: {e}") - - builder = Mock() - builder.from_line = Mock(side_effect=from_json_line) - return builder @pytest.fixture @@ -108,60 +68,61 @@ def mock_strike_db(): class TestFetchStrikesFromUrl: """Tests for fetching strikes from URL.""" - def test_fetch_strikes_successfully(self, mock_response, mock_strike_builder): + def test_fetch_strikes_successfully(self, mock_response): """Test successful fetch and parse of strike data.""" with patch('blitzortung.cli.imprt2.requests.get', return_value=mock_response): - strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes')) # Should parse all valid lines (28 strikes in example data) assert_that(strikes).is_not_empty() assert_that(len(strikes)).is_greater_than(0) - def test_fetch_handles_empty_lines(self, mock_strike_builder): + # Verify first strike has expected attributes + first_strike = strikes[0] + assert_that(first_strike.x).is_equal_to(134.589548) + assert_that(first_strike.y).is_equal_to(-15.296556) + assert_that(first_strike.amplitude).is_equal_to(0) + + def test_fetch_handles_empty_lines(self): """Test that empty lines are skipped.""" response = Mock() response.status_code = 200 - response.text = '\n\n{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0}\n\n' + response.text = '\n\n{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0,"pol":100}\n\n' response.raise_for_status = Mock() with patch('blitzortung.cli.imprt2.requests.get', return_value=response): - strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes')) assert_that(strikes).is_length(1) + assert_that(strikes[0].x).is_equal_to(134.589548) + assert_that(strikes[0].y).is_equal_to(-15.296556) - def test_fetch_handles_invalid_strike_data(self, mock_strike_builder): + def test_fetch_handles_invalid_strike_data(self): """Test that invalid strikes are logged and skipped.""" response = Mock() response.status_code = 200 - response.text = 'invalid json line\n{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0}' + response.text = 'invalid json line\n{"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0,"pol":50}' response.raise_for_status = Mock() - # Make builder raise exception for invalid data - def from_line_with_error(line): - if line == 'invalid json line': - raise Exception("Invalid JSON") - return mock_strike_builder.from_line.return_value - - mock_strike_builder.from_line.side_effect = from_line_with_error - with patch('blitzortung.cli.imprt2.requests.get', return_value=response): - strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + strikes = list(imprt2.fetch_strikes_from_url('http://example.com/strikes')) # Should skip invalid line but parse valid one assert_that(strikes).is_length(1) + assert_that(strikes[0].amplitude).is_equal_to(50) - def test_fetch_raises_on_http_error(self, mock_strike_builder): + def test_fetch_raises_on_http_error(self): """Test that HTTP errors are propagated.""" import requests with patch('blitzortung.cli.imprt2.requests.get', side_effect=requests.RequestException("Connection error")): with pytest.raises(requests.RequestException): - list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder)) + list(imprt2.fetch_strikes_from_url('http://example.com/strikes')) - def test_fetch_with_authentication(self, mock_response, mock_strike_builder): + def test_fetch_with_authentication(self, mock_response): """Test fetch with authentication credentials.""" with patch('blitzortung.cli.imprt2.requests.get', return_value=mock_response) as mock_get: - list(imprt2.fetch_strikes_from_url('http://example.com/strikes', mock_strike_builder, auth=('user', 'pass'))) + list(imprt2.fetch_strikes_from_url('http://example.com/strikes', auth=('user', 'pass'))) mock_get.assert_called_once() call_kwargs = mock_get.call_args[1] @@ -267,9 +228,8 @@ class TestUpdateStrikes: @patch('blitzortung.cli.imprt2.blitzortung.db.strike') @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') - @patch('blitzortung.cli.imprt2.blitzortung.builder.Strike') @patch('blitzortung.cli.imprt2.blitzortung.config.config') - def test_update_strikes_inserts_new_strikes(self, mock_config, mock_builder_class, mock_fetch, mock_db_func): + def test_update_strikes_inserts_new_strikes(self, mock_config, mock_fetch, mock_db_func): """Test that new strikes are inserted.""" # Setup mocks mock_strike_db = Mock() @@ -313,9 +273,8 @@ def test_update_strikes_inserts_new_strikes(self, mock_config, mock_builder_clas @patch('blitzortung.cli.imprt2.blitzortung.db.strike') @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') - @patch('blitzortung.cli.imprt2.blitzortung.builder.Strike') @patch('blitzortung.cli.imprt2.blitzortung.config.config') - def test_update_strikes_skips_duplicates(self, mock_config, mock_builder_class, mock_fetch, mock_db_func): + def test_update_strikes_skips_duplicates(self, mock_config, mock_fetch, mock_db_func): """Test that existing strikes are not re-inserted.""" # Setup mocks mock_strike_db = Mock() @@ -359,9 +318,8 @@ def test_update_strikes_skips_duplicates(self, mock_config, mock_builder_class, @patch('blitzortung.cli.imprt2.blitzortung.db.strike') @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') - @patch('blitzortung.cli.imprt2.blitzortung.builder.Strike') @patch('blitzortung.cli.imprt2.blitzortung.config.config') - def test_update_strikes_filters_by_time_interval(self, mock_config, mock_builder_class, mock_fetch, mock_db_func): + def test_update_strikes_filters_by_time_interval(self, mock_config, mock_fetch, mock_db_func): """Test that strikes outside time interval are filtered.""" # Setup mocks mock_strike_db = Mock() From b9709817033de1db619150aa64158b53faf0b947 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 22:49:24 +0100 Subject: [PATCH 04/12] get last hour --- blitzortung/cli/imprt2.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 0ac973f..bc2fe45 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -256,17 +256,19 @@ def update_strikes(url=None, region=None, hours=1): """ logger.info("Starting strike update for region %s (looking back %d hour(s))", region, hours) + now = datetime.datetime.now(datetime.timezone.utc) + start_time = now - datetime.timedelta(hours=hours) + # Get configuration if URL not provided config = blitzortung.config.config() if url is None: - url = "https://data.blitzortung.org/Data/Protected/last_strikes.php" + start_timestamp_ns = int(start_time.timestamp() * 1e6) * 1000 + url = f"https://data.blitzortung.org/Data/Protected/last_strikes.php?time={start_timestamp_ns}" auth = (config.get_username(), config.get_password()) else: auth = None # Calculate time interval (last N hours) - now = datetime.datetime.now(datetime.timezone.utc) - start_time = now - datetime.timedelta(hours=hours) end_time = now time_interval = blitzortung.db.query.TimeInterval( start_time, From 28141e205d5d3451f53810373ddc59a0aee5d3ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 22:51:46 +0100 Subject: [PATCH 05/12] fix --- blitzortung/builder/strike.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/blitzortung/builder/strike.py b/blitzortung/builder/strike.py index 60b965d..9afe98b 100644 --- a/blitzortung/builder/strike.py +++ b/blitzortung/builder/strike.py @@ -56,9 +56,9 @@ def set_amplitude(self, amplitude): return self def set_lateral_error(self, lateral_error): - self.lateral_error = force_range(0, lateral_error, 32767) + self.lateral_error = force_range(0, lateral_error, 32767) if lateral_error is not None else None return self - +g def set_station_count(self, station_count): self.station_count = station_count return self From 75935fae17202cb7fa6854ddb1999e5cdc703063 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 22:52:46 +0100 Subject: [PATCH 06/12] fix --- blitzortung/builder/strike.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/blitzortung/builder/strike.py b/blitzortung/builder/strike.py index 9afe98b..9af5503 100644 --- a/blitzortung/builder/strike.py +++ b/blitzortung/builder/strike.py @@ -58,7 +58,7 @@ def set_amplitude(self, amplitude): def set_lateral_error(self, lateral_error): self.lateral_error = force_range(0, lateral_error, 32767) if lateral_error is not None else None return self -g + def set_station_count(self, station_count): self.station_count = station_count return self From 2ed8cae12b687a339be703245d420ae074e93290 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 22:56:18 +0100 Subject: [PATCH 07/12] include mds --- blitzortung/cli/imprt2.py | 1 + 1 file changed, 1 insertion(+) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index bc2fe45..9c77153 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -158,6 +158,7 @@ def fetch_strikes_from_url(url, auth=None): .set_y(data['lat']) .set_altitude(data.get('alt', 0)) .set_amplitude(data.get('pol', 0)) + .set_lateral_error(data.get('mds', 0)) .build()) strike_count += 1 From b68d35353bfa9b19f078f0775889d7309efe08a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 23:26:45 +0100 Subject: [PATCH 08/12] remove unused code --- blitzortung/cli/imprt2.py | 109 ++++---------------------------------- tests/cli/test_imprt2.py | 25 +++------ 2 files changed, 17 insertions(+), 117 deletions(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 9c77153..77e6570 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -19,18 +19,16 @@ import datetime import logging import os -import time from contextlib import nullcontext import requests import statsd -import stopit from optparse import OptionParser -import blitzortung.dataimport +import blitzortung.config import blitzortung.db +import blitzortung.db.query import blitzortung.logger -from blitzortung import util from blitzortung.data import Timestamp from blitzortung.lock import LockWithTimeout, FailedToAcquireException @@ -41,78 +39,6 @@ statsd_client = statsd.StatsClient('localhost', 8125, prefix='org.blitzortung.import') -def timestamp_is_newer_than(timestamp, latest_time): - if not latest_time: - return True - return timestamp and timestamp > latest_time and timestamp - latest_time != datetime.timedelta() - - -def import_strikes_for(region, start_time, is_update=False): - logger.debug("work on region %d", region) - strike_db = blitzortung.db.strike() - latest_time_timer = util.Timer() - latest_time = strike_db.get_latest_time(region) - logger.debug("latest time for region %d: %s (%.03fs) ", region, latest_time, latest_time_timer.lap()) - if not latest_time: - latest_time = start_time - - if is_update: - start_time = update_start_time() - if not latest_time or start_time > latest_time: - latest_time = start_time - - reference_time = time.time() - strike_source = blitzortung.dataimport.strikes() - strikes = strike_source.get_strikes_since(latest_time, region=region) - query_time = time.time() - - strike_group_size = 10000 - strike_count = 0 - global_start_time = start_time = time.time() - for strike in strikes: - strike_db.insert(strike, region) - - strike_count += 1 - if strike_count % strike_group_size == 0: - strike_db.commit() - logger.info("commit #{} ({:.1f}/s) @{} for region {}".format( - strike_count, strike_group_size / (time.time() - start_time), strike.timestamp, region)) - start_time = time.time() - - if strike_count > 0: - strike_db.commit() - - insert_time = time.time() - stat_name = "strikes.%d" % region - statsd_client.incr(stat_name) - statsd_client.gauge(stat_name + ".count", strike_count) - statsd_client.timing(stat_name + ".get", max(1, int((query_time - reference_time) * 1000))) - statsd_client.timing(stat_name + ".insert", max(1, int((insert_time - query_time) * 1000))) - - logger.info("imported {} strikes ({:.1f}/s) for region {}".format( - strike_count, strike_count / (time.time() - global_start_time), region)) - - -def update_start_time() -> datetime.datetime: - return datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(minutes=30) - - -def import_strikes(regions, start_time, no_timeout=False, is_update=False): - - error_count = 0 - for region in regions: - for retry in range(5): - try: - with nullcontext() if no_timeout else stopit.SignalTimeout(300): - import_strikes_for(region, start_time, is_update=is_update) - break - except (requests.exceptions.ConnectionError, stopit.TimeoutException): - logger.warning('import failed: retry {} region {}'.format(retry, region)) - error_count += 1 - time.sleep(2) - continue - statsd_client.gauge("strikes.error_count", error_count) - def fetch_strikes_from_url(url, auth=None): """ Fetch strike data from a given URL and parse it into Strike objects. @@ -193,22 +119,15 @@ def create_strike_key(strike): Returns: Tuple representing the strike's unique characteristics """ - # Get timestamp value (handle both Timestamp and datetime objects) - if hasattr(strike.timestamp, 'value'): - timestamp_value = strike.timestamp.value - else: - # For datetime objects, convert to nanoseconds since epoch - timestamp_value = int(strike.timestamp.timestamp() * 1_000_000_000) - return ( - timestamp_value, + strike.timestamp.value, round(strike.x, 6), # Round to 6 decimal places for location round(strike.y, 6), strike.amplitude ) -def get_existing_strike_keys(strike_db, time_interval, region=None): +def get_existing_strike_keys(strike_db, time_interval): """ Retrieve keys of strikes already present in the database for a given time interval. @@ -218,17 +137,14 @@ def get_existing_strike_keys(strike_db, time_interval, region=None): Args: strike_db: Database connection for strikes time_interval: Time interval to query - region: Optional region filter Returns: Set of strike keys (tuples of timestamp, x, y, amplitude) """ - logger.debug("Querying existing strikes for interval %s - %s (region: %s)", - time_interval.start, time_interval.end, region) + logger.debug("Querying existing strikes for interval %s - %s", + time_interval.start, time_interval.end) kwargs = {'time_interval': time_interval, 'order': 'timestamp'} - if region is not None: - kwargs['region'] = region existing_strikes = strike_db.select(**kwargs) strike_keys = {create_strike_key(strike) for strike in existing_strikes} @@ -237,7 +153,7 @@ def get_existing_strike_keys(strike_db, time_interval, region=None): return strike_keys -def update_strikes(url=None, region=None, hours=1): +def update_strikes(url=None, hours=1): """ Update strike database by fetching data from a URL and inserting new strikes. @@ -249,13 +165,12 @@ def update_strikes(url=None, region=None, hours=1): Args: url: URL to fetch strike data from (if None, uses default config URL) - region: Optional region to filter/tag strikes hours: Number of hours to look back (default: 1) Returns: Number of new strikes inserted into the database """ - logger.info("Starting strike update for region %s (looking back %d hour(s))", region, hours) + logger.info("Starting strike update (looking back %d hour(s))", hours) now = datetime.datetime.now(datetime.timezone.utc) start_time = now - datetime.timedelta(hours=hours) @@ -282,7 +197,7 @@ def update_strikes(url=None, region=None, hours=1): strike_db = blitzortung.db.strike() # Get existing strikes from database (identified by timestamp/location/amplitude) - existing_strike_keys = get_existing_strike_keys(strike_db, time_interval, region) + existing_strike_keys = get_existing_strike_keys(strike_db, time_interval) # Fetch strikes from URL try: @@ -315,7 +230,7 @@ def update_strikes(url=None, region=None, hours=1): insert_count = 0 for strike in new_strikes: try: - strike_db.insert(strike, region) + strike_db.insert(strike) insert_count += 1 if insert_count % 1000 == 0: @@ -352,8 +267,6 @@ def main(): parser = OptionParser(description="Import strike data from URL into database") parser.add_option("-u", "--url", dest="url", type="string", default=None, help="URL to fetch strike data from (optional, uses default if not provided)") - parser.add_option("-r", "--region", dest="region", type="int", default=None, - help="Region number (optional)") parser.add_option("--hours", dest="hours", type="int", default=1, help="Number of hours to look back (default: 1)") parser.add_option("-v", "--verbose", dest="verbose", action="store_true", @@ -378,7 +291,7 @@ def main(): try: with lock_context: - count = update_strikes(url=options.url, region=options.region, hours=options.hours) + count = update_strikes(url=options.url, hours=options.hours) logger.info("Import completed: %d new strikes inserted", count) return 0 diff --git a/tests/cli/test_imprt2.py b/tests/cli/test_imprt2.py index 2728611..8001e37 100644 --- a/tests/cli/test_imprt2.py +++ b/tests/cli/test_imprt2.py @@ -172,7 +172,7 @@ def test_get_existing_strikes_empty_result(self, mock_strike_db): end = datetime.datetime(2025, 1, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) time_interval = TimeInterval(start, end) - result = imprt2.get_existing_strike_keys(mock_strike_db, time_interval, region=1) + result = imprt2.get_existing_strike_keys(mock_strike_db, time_interval) assert_that(result).is_empty() mock_strike_db.select.assert_called_once() @@ -200,7 +200,7 @@ def test_get_existing_strikes_with_results(self, mock_strike_db): end = datetime.datetime(2025, 1, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) time_interval = TimeInterval(start, end) - result = imprt2.get_existing_strike_keys(mock_strike_db, time_interval, region=1) + result = imprt2.get_existing_strike_keys(mock_strike_db, time_interval) assert_that(result).is_length(2) assert_that(result).contains( @@ -208,19 +208,6 @@ def test_get_existing_strikes_with_results(self, mock_strike_db): (1000000000000000002, 11.5, 21.5, 200) ) - def test_get_existing_strikes_passes_region_filter(self, mock_strike_db): - """Test that region parameter is passed to database query.""" - mock_strike_db.select.return_value = [] - - start = datetime.datetime(2025, 1, 1, 12, 0, 0, tzinfo=datetime.timezone.utc) - end = datetime.datetime(2025, 1, 1, 13, 0, 0, tzinfo=datetime.timezone.utc) - time_interval = TimeInterval(start, end) - - imprt2.get_existing_strike_keys(mock_strike_db, time_interval, region=2) - - # Check that region was passed in kwargs - call_kwargs = mock_strike_db.select.call_args[1] - assert_that(call_kwargs['region']).is_equal_to(2) class TestUpdateStrikes: @@ -263,7 +250,7 @@ def test_update_strikes_inserts_new_strikes(self, mock_config, mock_fetch, mock_ mock_fetch.return_value = [strike1, strike2] # Run update - result = imprt2.update_strikes(url='http://example.com/strikes', region=1, hours=1) + result = imprt2.update_strikes(url='http://example.com/strikes', hours=1) # Verify assert_that(result).is_equal_to(2) @@ -309,7 +296,7 @@ def test_update_strikes_skips_duplicates(self, mock_config, mock_fetch, mock_db_ mock_fetch.return_value = [strike_from_url] # Run update - result = imprt2.update_strikes(url='http://example.com/strikes', region=1, hours=1) + result = imprt2.update_strikes(url='http://example.com/strikes', hours=1) # Verify - no inserts should happen assert_that(result).is_equal_to(0) @@ -353,9 +340,9 @@ def test_update_strikes_filters_by_time_interval(self, mock_config, mock_fetch, mock_fetch.return_value = [strike_in_interval, strike_outside_interval] # Run update with 1 hour lookback - result = imprt2.update_strikes(url='http://example.com/strikes', region=1, hours=1) + result = imprt2.update_strikes(url='http://example.com/strikes', hours=1) # Verify - only the strike within interval should be inserted assert_that(result).is_equal_to(1) assert_that(mock_strike_db.insert.call_count).is_equal_to(1) - mock_strike_db.insert.assert_called_with(strike_in_interval, 1) + mock_strike_db.insert.assert_called_with(strike_in_interval) From db00f36aff240274236a5aeb490c45dac68a7c22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 23:30:43 +0100 Subject: [PATCH 09/12] log timing --- blitzortung/cli/imprt2.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 77e6570..46b2ff4 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -29,6 +29,7 @@ import blitzortung.db import blitzortung.db.query import blitzortung.logger +from blitzortung import util from blitzortung.data import Timestamp from blitzortung.lock import LockWithTimeout, FailedToAcquireException @@ -57,12 +58,14 @@ def fetch_strikes_from_url(url, auth=None): import json from blitzortung.builder import Strike as StrikeBuilder - logger.info("Fetching strikes from URL: %s", url) try: + timer = util.Timer() response = requests.get(url, auth=auth, timeout=30) response.raise_for_status() + logger.info("Fetching strikes from URL: %s (%.03fs)", url, timer.lap()) + builder = StrikeBuilder() strike_count = 0 From f4ebf07aa474651fa21ecf5f3c4941da1389c5e1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 23:42:17 +0100 Subject: [PATCH 10/12] remove url parameter --- blitzortung/cli/imprt2.py | 15 +++++---------- tests/cli/test_imprt2.py | 6 +++--- 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 46b2ff4..2a293b8 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -156,7 +156,7 @@ def get_existing_strike_keys(strike_db, time_interval): return strike_keys -def update_strikes(url=None, hours=1): +def update_strikes(hours=1): """ Update strike database by fetching data from a URL and inserting new strikes. @@ -180,12 +180,9 @@ def update_strikes(url=None, hours=1): # Get configuration if URL not provided config = blitzortung.config.config() - if url is None: - start_timestamp_ns = int(start_time.timestamp() * 1e6) * 1000 - url = f"https://data.blitzortung.org/Data/Protected/last_strikes.php?time={start_timestamp_ns}" - auth = (config.get_username(), config.get_password()) - else: - auth = None + start_timestamp_ns = int(start_time.timestamp() * 1e6) * 1000 + url = f"https://data.blitzortung.org/Data/Protected/last_strikes.php?time={start_timestamp_ns}" + auth = (config.get_username(), config.get_password()) # Calculate time interval (last N hours) end_time = now @@ -268,8 +265,6 @@ def main(): Command-line interface for the strike import tool. """ parser = OptionParser(description="Import strike data from URL into database") - parser.add_option("-u", "--url", dest="url", type="string", default=None, - help="URL to fetch strike data from (optional, uses default if not provided)") parser.add_option("--hours", dest="hours", type="int", default=1, help="Number of hours to look back (default: 1)") parser.add_option("-v", "--verbose", dest="verbose", action="store_true", @@ -294,7 +289,7 @@ def main(): try: with lock_context: - count = update_strikes(url=options.url, hours=options.hours) + count = update_strikes(hours=options.hours) logger.info("Import completed: %d new strikes inserted", count) return 0 diff --git a/tests/cli/test_imprt2.py b/tests/cli/test_imprt2.py index 8001e37..2808f3f 100644 --- a/tests/cli/test_imprt2.py +++ b/tests/cli/test_imprt2.py @@ -250,7 +250,7 @@ def test_update_strikes_inserts_new_strikes(self, mock_config, mock_fetch, mock_ mock_fetch.return_value = [strike1, strike2] # Run update - result = imprt2.update_strikes(url='http://example.com/strikes', hours=1) + result = imprt2.update_strikes(hours=1) # Verify assert_that(result).is_equal_to(2) @@ -296,7 +296,7 @@ def test_update_strikes_skips_duplicates(self, mock_config, mock_fetch, mock_db_ mock_fetch.return_value = [strike_from_url] # Run update - result = imprt2.update_strikes(url='http://example.com/strikes', hours=1) + result = imprt2.update_strikes(hours=1) # Verify - no inserts should happen assert_that(result).is_equal_to(0) @@ -340,7 +340,7 @@ def test_update_strikes_filters_by_time_interval(self, mock_config, mock_fetch, mock_fetch.return_value = [strike_in_interval, strike_outside_interval] # Run update with 1 hour lookback - result = imprt2.update_strikes(url='http://example.com/strikes', hours=1) + result = imprt2.update_strikes(hours=1) # Verify - only the strike within interval should be inserted assert_that(result).is_equal_to(1) From 348e26f44f5041c27dd316cbc950443ae65511c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 23:44:28 +0100 Subject: [PATCH 11/12] do not commit partial results --- blitzortung/cli/imprt2.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/blitzortung/cli/imprt2.py b/blitzortung/cli/imprt2.py index 2a293b8..da8c43b 100755 --- a/blitzortung/cli/imprt2.py +++ b/blitzortung/cli/imprt2.py @@ -233,10 +233,6 @@ def update_strikes(hours=1): strike_db.insert(strike) insert_count += 1 - if insert_count % 1000 == 0: - strike_db.commit() - logger.info("Committed %d strikes so far", insert_count) - except Exception as e: logger.error("Failed to insert strike %s: %s", strike.id, e) strike_db.rollback() From 810d925223c1c78997516fc7ab0c6f7fa5d1b49d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20W=C3=BCrl?= Date: Sat, 15 Nov 2025 23:50:39 +0100 Subject: [PATCH 12/12] test database error --- tests/cli/test_imprt2.py | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) diff --git a/tests/cli/test_imprt2.py b/tests/cli/test_imprt2.py index 2808f3f..45b42de 100644 --- a/tests/cli/test_imprt2.py +++ b/tests/cli/test_imprt2.py @@ -346,3 +346,38 @@ def test_update_strikes_filters_by_time_interval(self, mock_config, mock_fetch, assert_that(result).is_equal_to(1) assert_that(mock_strike_db.insert.call_count).is_equal_to(1) mock_strike_db.insert.assert_called_with(strike_in_interval) + + @patch('blitzortung.cli.imprt2.blitzortung.db.strike') + @patch('blitzortung.cli.imprt2.fetch_strikes_from_url') + @patch('blitzortung.cli.imprt2.blitzortung.config.config') + def test_failure_at_insert(self, mock_config, mock_fetch, mock_db_func): + # Setup mocks + mock_strike_db = Mock() + mock_strike_db.select.return_value = [] + mock_strike_db.insert = Mock() + mock_strike_db.commit = Mock() + mock_strike_db.close = Mock() + mock_db_func.return_value = mock_strike_db + + now = datetime.datetime.now(datetime.timezone.utc) + + # One strike within interval, one outside + strike = Mock(spec=Strike) + strike.timestamp = Mock() + strike.timestamp.value = int((now - datetime.timedelta(minutes=30)).timestamp() * 1_000_000_000) + strike.timestamp.__le__ = Mock(return_value=True) + strike.timestamp.__ge__ = Mock(return_value=True) + strike.x = 10.5 + strike.y = 20.5 + strike.amplitude = 100 + + mock_fetch.return_value = [strike] + + mock_strike_db.insert.side_effect = Exception("Database error") + + with pytest.raises(Exception) as exc_info: + imprt2.update_strikes(hours=1) + + assert exc_info.value.args[0] == "Database error" + + mock_strike_db.rollback.assert_called()