|
| 1 | +# -*- coding: utf8 -*- |
| 2 | + |
| 3 | +""" |
| 4 | + Copyright (C) 2011-2025 Andreas Würl |
| 5 | +
|
| 6 | + Licensed under the Apache License, Version 2.0 (the "License"); |
| 7 | + you may not use this file except in compliance with the License. |
| 8 | + You may obtain a copy of the License at |
| 9 | +
|
| 10 | + http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | +
|
| 12 | + Unless required by applicable law or agreed to in writing, software |
| 13 | + distributed under the License is distributed on an "AS IS" BASIS, |
| 14 | + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 15 | + See the License for the specific language governing permissions and |
| 16 | + limitations under the License. |
| 17 | +""" |
| 18 | + |
| 19 | +import datetime |
| 20 | +import logging |
| 21 | +import os |
| 22 | +from contextlib import nullcontext |
| 23 | + |
| 24 | +import requests |
| 25 | +import statsd |
| 26 | +from optparse import OptionParser |
| 27 | + |
| 28 | +import blitzortung.config |
| 29 | +import blitzortung.db |
| 30 | +import blitzortung.db.query |
| 31 | +import blitzortung.logger |
| 32 | +from blitzortung import util |
| 33 | +from blitzortung.data import Timestamp |
| 34 | +from blitzortung.lock import LockWithTimeout, FailedToAcquireException |
| 35 | + |
# The module logger is named after this script's file name and handed to
# blitzortung.set_parent_logger(); a console handler is added through
# blitzortung.add_log_handler().
logger = logging.getLogger(os.path.basename(__file__))
blitzortung.set_parent_logger(logger)
blitzortung.add_log_handler(blitzortung.logger.create_console_handler())

# statsd client used by update_strikes() to publish the import gauge;
# all metric names are prefixed with 'org.blitzortung.import'.
statsd_client = statsd.StatsClient(
    host='localhost',
    port=8125,
    prefix='org.blitzortung.import',
)
| 41 | + |
| 42 | + |
def fetch_strikes_from_url(url, auth=None):
    """
    Fetch strike data from a given URL and parse it into Strike objects.

    The response body is split into lines; every non-blank line is expected
    to hold one JSON object of the form:
    {"time":1763202124325980200,"lat":-15.296556,"lon":134.589548,"alt":0,"pol":0,...}

    Lines that cannot be decoded or converted into a strike are logged as
    warnings and skipped, so one bad line does not abort the whole import.

    Args:
        url: The URL to fetch strike data from
        auth: Optional tuple of (username, password) for authentication

    Yields:
        Strike objects parsed from the URL response

    Raises:
        requests.RequestException: if the request fails or the server
            answers with an error status (logged, then re-raised)
    """
    import json
    from blitzortung.builder import Strike as StrikeBuilder

    # Only the HTTP round trip can raise a RequestException, so only that
    # part is guarded here.
    try:
        timer = util.Timer()
        response = requests.get(url, auth=auth, timeout=30)
        response.raise_for_status()
    except requests.RequestException as e:
        logger.error("Failed to fetch data from URL %s: %s", url, e)
        raise

    logger.info("Fetching strikes from URL: %s (%.03fs)", url, timer.lap())

    # One builder is reused for all lines; every field this function sets is
    # assigned again for each line before build() is called.
    builder = StrikeBuilder()

    strike_count = 0

    for raw_line in response.text.splitlines():
        line = raw_line.strip()
        if not line:
            continue

        try:
            data = json.loads(line)

            # 'time', 'lon' and 'lat' are mandatory; the remaining fields
            # fall back to 0 when missing.
            strike = (builder
                      .set_timestamp(Timestamp(data['time']))
                      .set_x(data['lon'])
                      .set_y(data['lat'])
                      .set_altitude(data.get('alt', 0))
                      .set_amplitude(data.get('pol', 0))
                      .set_lateral_error(data.get('mds', 0))
                      .build())
        except (json.JSONDecodeError, KeyError) as e:
            logger.warning("Failed to parse strike: %s (%s)", e, line)
            continue
        except Exception as e:
            # Best effort: any other conversion problem skips just this line.
            logger.warning("Failed to create strike object: %s (%s)", e, line)
            continue

        # Yield outside the try block so that an exception raised at the
        # yield point is never mistaken for a parse failure and swallowed.
        strike_count += 1
        yield strike

    logger.info("Fetched %d strikes from URL", strike_count)
| 108 | + |
| 109 | + |
def create_strike_key(strike, precision=6):
    """
    Create a unique key for a strike based on its attributes.

    Since strikes from URLs don't have IDs, we identify them by:
    - timestamp (strike.timestamp.value)
    - location (x, y coordinates, rounded)
    - amplitude

    Args:
        strike: Strike object providing timestamp.value, x, y and amplitude
        precision: Number of decimal places the coordinates are rounded to
            before comparison (default: 6)

    Returns:
        Hashable tuple (timestamp value, rounded x, rounded y, amplitude)
    """
    return (
        strike.timestamp.value,
        # Rounding keeps tiny float differences from producing different keys
        # for the same location.
        round(strike.x, precision),
        round(strike.y, precision),
        strike.amplitude,
    )
| 131 | + |
| 132 | + |
def get_existing_strike_keys(strike_db, time_interval):
    """
    Collect the keys of all strikes stored in the database for a time interval.

    The database is queried for the given interval (ordered by timestamp) and
    every returned strike is reduced to its create_strike_key() tuple, because
    strikes coming from the URL carry no database IDs to compare against.

    Args:
        strike_db: Database connection for strikes
        time_interval: Time interval to query

    Returns:
        Set of strike keys (tuples of timestamp, x, y, amplitude)
    """
    logger.debug("Querying existing strikes for interval %s - %s",
                 time_interval.start, time_interval.end)

    stored_strikes = strike_db.select(time_interval=time_interval, order='timestamp')
    known_keys = set(map(create_strike_key, stored_strikes))

    logger.info("Found %d existing strikes in database", len(known_keys))
    return known_keys
| 157 | + |
| 158 | + |
def update_strikes(hours=1):
    """
    Update strike database by fetching data from a URL and inserting new strikes.

    This function:
    1. Calculates a time interval (default: last 1 hour)
    2. Retrieves existing strikes from the database for that interval
    3. Fetches strikes from the blitzortung.org "last_strikes" URL, using the
       username and password from the blitzortung configuration
    4. Inserts only strikes that are not already in the database

    The database connection is closed on every exit path, including a failed
    fetch and a failed insert.

    Args:
        hours: Number of hours to look back (default: 1)

    Returns:
        Number of new strikes inserted into the database; 0 if the strikes
        could not be fetched from the URL

    Raises:
        Exception: any error raised while inserting a strike is re-raised
            after the transaction has been rolled back
    """
    logger.info("Starting strike update (looking back %d hour(s))", hours)

    now = datetime.datetime.now(datetime.timezone.utc)
    start_time = now - datetime.timedelta(hours=hours)
    end_time = now

    # Credentials come from the configuration; the URL itself is fixed.
    config = blitzortung.config.config()
    # The service takes the start time in nanoseconds; the value is computed
    # with microsecond resolution and scaled up.
    start_timestamp_ns = int(start_time.timestamp() * 1e6) * 1000
    url = f"https://data.blitzortung.org/Data/Protected/last_strikes.php?time={start_timestamp_ns}"
    auth = (config.get_username(), config.get_password())

    # Time interval covering the last N hours
    time_interval = blitzortung.db.query.TimeInterval(
        start_time,
        end_time
    )

    logger.info("Time interval: %s to %s", start_time, end_time)

    # Get database connection
    strike_db = blitzortung.db.strike()

    try:
        # Get existing strikes from database (identified by timestamp/location/amplitude)
        existing_strike_keys = get_existing_strike_keys(strike_db, time_interval)

        # Fetch strikes from URL
        try:
            url_strikes = list(fetch_strikes_from_url(url, auth=auth))
        except requests.RequestException as e:
            logger.error("Failed to fetch strikes from URL: %s", e)
            return 0

        # Filter strikes: only those within time interval and not in database
        new_strikes = []
        for strike in url_strikes:
            # Check if strike is within the time interval
            if not (time_interval.start <= strike.timestamp <= time_interval.end):
                logger.debug("Strike at %s outside time interval, skipping", strike.timestamp)
                continue

            # Check if strike already exists in database (by timestamp/location/amplitude)
            strike_key = create_strike_key(strike)
            if strike_key in existing_strike_keys:
                logger.debug("Strike at %s (%.6f, %.6f) already exists, skipping",
                             strike.timestamp, strike.x, strike.y)
                continue

            new_strikes.append(strike)

        logger.info("Found %d new strikes to insert (out of %d from URL)",
                    len(new_strikes), len(url_strikes))

        # Insert new strikes; the first failure rolls back and aborts the import
        insert_count = 0
        for strike in new_strikes:
            try:
                strike_db.insert(strike)
                insert_count += 1
            except Exception as e:
                # Strikes fetched from the URL carry no ID, so the timestamp
                # is what identifies the offending strike in the log.
                logger.error("Failed to insert strike %s: %s", strike.timestamp, e)
                strike_db.rollback()
                raise

        # Final commit
        if insert_count > 0:
            strike_db.commit()
            logger.info("Successfully inserted %d new strikes", insert_count)
        else:
            logger.info("No new strikes to insert")
    finally:
        # Release the connection on success, early return and error alike.
        strike_db.close()

    # Update statistics
    statsd_client.gauge("strikes.imported", insert_count)

    return insert_count
| 254 | + |
| 255 | + |
| 256 | + |
| 257 | + |
| 258 | + |
def main():
    """
    Command-line entry point of the strike import tool.

    Parses the options, selects the log level, optionally takes the import
    lock and runs update_strikes().

    Returns:
        0 when the import completed, 1 when the lock could not be acquired
        or the import failed
    """
    parser = OptionParser(description="Import strike data from URL into database")
    parser.add_option("--hours", dest="hours", type="int", default=1,
                      help="Number of hours to look back (default: 1)")
    parser.add_option("-v", "--verbose", dest="verbose", action="store_true",
                      help="Enable verbose logging")
    parser.add_option("-d", "--debug", dest="debug", action="store_true",
                      help="Enable debug logging")
    parser.add_option("--no-lock", dest="no_lock", action="store_true",
                      help="Skip file locking (use with caution)")

    options, _ = parser.parse_args()

    # --debug wins over --verbose; without either only warnings are shown
    if options.debug:
        log_level = logging.DEBUG
    elif options.verbose:
        log_level = logging.INFO
    else:
        log_level = logging.WARNING
    blitzortung.set_log_level(log_level)

    # Without --no-lock the import runs under the lock file, using the
    # argument 10 for locked()
    if options.no_lock:
        lock_context = nullcontext()
    else:
        lock_context = LockWithTimeout('/tmp/.bo-import2.lock').locked(10)

    try:
        with lock_context:
            count = update_strikes(hours=options.hours)
            logger.info("Import completed: %d new strikes inserted", count)
            return 0
    except FailedToAcquireException:
        logger.warning("Could not acquire lock - another import may be running")
        return 1
    except Exception as e:
        # The traceback is only attached to the log record in debug mode
        logger.error("Import failed: %s", e, exc_info=options.debug)
        return 1
| 298 | + |
| 299 | + |
# Script entry point: the return value of main() becomes the process exit status.
if __name__ == "__main__":
    import sys

    exit_status = main()
    sys.exit(exit_status)
0 commit comments