Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions blitzortung/cli/start_webservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
def main():
target_dir = os.path.dirname(os.path.abspath(__file__))

pid_file = "/var/run/bo-webservice.pid"
args = ["twistd"]
if not os.environ.get("BLITZORTUNG_TEST"):
args += ["--pidfile", "/var/run/bo-webservice.pid"]

sys.argv = ["twistd", "--pidfile", pid_file, "-oy", os.path.join(target_dir, "webservice.py")]
sys.argv = args + ["-oy", os.path.join(target_dir, "webservice.py")]
sys.exit(run())


Expand Down
94 changes: 25 additions & 69 deletions blitzortung/cli/webservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,9 @@
import platform
import time

import psycopg2
import psycopg2.extras
import pyproj
import statsd
from twisted.application import internet, service
from twisted.internet import defer
from twisted.internet.error import ReactorAlreadyInstalledError
from twisted.python import log
from twisted.python.log import FileLogObserver, ILogObserver, textFromEventDict, _safeFormat
Expand All @@ -30,11 +27,9 @@
from txjsonrpc_ng.web import jsonrpc
from txjsonrpc_ng.web.data import CacheableResult
from txjsonrpc_ng.web.jsonrpc import with_request
from txpostgres import reconnection
from txpostgres.txpostgres import Connection, ConnectionPool

try:
from twisted.internet import epollreactor as reactor
from twisted.internet import epollreactor as reactor, defer
except ImportError:
from twisted.internet import kqreactor as reactor

Expand All @@ -49,6 +44,7 @@
import blitzortung.geom
import blitzortung.service
from blitzortung.db.query import TimeInterval
from blitzortung.service.db import create_connection_pool
from blitzortung.service.general import create_time_interval
from blitzortung.service.strike_grid import GridParameters

Expand All @@ -68,59 +64,6 @@

FORBIDDEN_IPS = {}


def connection_factory(*args, **kwargs):
"""Create a psycopg2 connection with DictConnection factory."""
kwargs['connection_factory'] = psycopg2.extras.DictConnection
return psycopg2.connect(*args, **kwargs)


class LoggingDetector(reconnection.DeadConnectionDetector):
"""Database connection detector that logs reconnection events."""

def startReconnecting(self, f):
print('[*] database connection is down (error: %r)' % f.value)
return reconnection.DeadConnectionDetector.startReconnecting(self, f)

def reconnect(self):
print('[*] reconnecting...')
return reconnection.DeadConnectionDetector.reconnect(self)

def connectionRecovered(self):
print('[*] connection recovered')
return reconnection.DeadConnectionDetector.connectionRecovered(self)


class DictConnection(Connection):
"""Database connection using DictConnection factory with logging detector."""
connectionFactory = staticmethod(connection_factory)

def __init__(self, reactor=None, cooperator=None, detector=None):
if not detector:
detector = LoggingDetector()
super(DictConnection, self).__init__(reactor, cooperator, detector)


class DictConnectionPool(ConnectionPool):
"""Connection pool using DictConnection instances."""
connectionFactory = DictConnection

def __init__(self, _ignored, *connargs, **connkw):
super(DictConnectionPool, self).__init__(_ignored, *connargs, **connkw)


def create_connection_pool():
"""Create and start the database connection pool."""
config = blitzortung.config.config()
db_connection_string = config.get_db_connection_string()

connection_pool = DictConnectionPool(None, db_connection_string)

d = connection_pool.start()
d.addErrback(log.err)
return connection_pool


grid = {
1: blitzortung.geom.GridFactory(-25, 57, 27, 72, UTM_EU),
2: blitzortung.geom.GridFactory(110, 180, -50, 0, UTM_OCEANIA),
Expand Down Expand Up @@ -507,7 +450,7 @@ def fix_bad_accept_header(self, request, user_agent):
def get_histogram(self, time_interval: TimeInterval, region=None, envelope=None):
return self.histogram_cache.get(self.histogram_query.create,
time_interval=time_interval,
connection=self.connection_pool,
connection_pool=self.connection_pool,
region=region,
envelope=envelope)

Expand Down Expand Up @@ -581,18 +524,31 @@ def emit(self, event_dict):

application = service.Application("Blitzortung.org JSON-RPC Server")

log_directory = "/var/log/blitzortung"
if os.environ.get('BLITZORTUNG_TEST'):
import tempfile
log_directory = tempfile.mkdtemp()
print("LOG_DIR", log_directory)
else:
log_directory = "/var/log/blitzortung"
if os.path.exists(log_directory):
logfile = DailyLogFile("webservice.log", log_directory)
application.setComponent(ILogObserver, LogObserver(logfile).emit)
else:
log_directory = None

connection_pool = create_connection_pool()
root = Blitzortung(connection_pool, log_directory)

config = blitzortung.config.config()
site = server.Site(root)
site.displayTracebacks = False
jsonrpc_server = internet.TCPServer(config.get_webservice_port(), site, interface='127.0.0.1')
jsonrpc_server.setServiceParent(application)
def start_server(connection_pool):
print("Connection pool is ready")
root = Blitzortung(connection_pool, log_directory)
config = blitzortung.config.config()
site = server.Site(root)
site.displayTracebacks = False
jsonrpc_server = internet.TCPServer(config.get_webservice_port(), site, interface='127.0.0.1')
jsonrpc_server.setServiceParent(application)
return jsonrpc_server

def on_error(failure):
log.err(failure, "Failed to create connection pool")
raise failure.value

deferred_connection_pool = create_connection_pool()
deferred_connection_pool.addCallback(start_server).addErrback(on_error)
1 change: 0 additions & 1 deletion blitzortung/dataimport/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,3 @@ def strikes():
from .. import INJECTOR

return INJECTOR.get(StrikesBlitzortungDataProvider)

2 changes: 1 addition & 1 deletion blitzortung/db/query_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def global_grid_query(table_name, grid, count_threshold=0, **kwargs):
.set_default_conditions(**kwargs)

@staticmethod
def histogram_query(table_name: str, time_interval: TimeInterval, binsize:int, region:Optional[int]=None, envelope=None):
def histogram_query(table_name: str, time_interval: TimeInterval, binsize:int, region:Optional[int]=None, envelope=None) -> SelectQuery:

query = SelectQuery() \
.set_table_name(table_name) \
Expand Down
11 changes: 7 additions & 4 deletions blitzortung/service/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,30 @@
"""

from . import histogram, strike, strike_grid
from .histogram import HistogramQuery
from .strike import StrikeQuery
from .strike_grid import GlobalStrikeGridQuery, StrikeGridQuery


def strike_query():
def strike_query() -> StrikeQuery:
from .. import INJECTOR

return INJECTOR.get(strike.StrikeQuery)


def strike_grid_query():
def strike_grid_query() -> StrikeGridQuery:
from .. import INJECTOR

return INJECTOR.get(strike_grid.StrikeGridQuery)


def global_strike_grid_query():
def global_strike_grid_query() -> GlobalStrikeGridQuery:
from .. import INJECTOR

return INJECTOR.get(strike_grid.GlobalStrikeGridQuery)


def histogram_query():
def histogram_query() -> HistogramQuery:
from .. import INJECTOR

return INJECTOR.get(histogram.HistogramQuery)
83 changes: 83 additions & 0 deletions blitzortung/service/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""

Copyright 2025 Andreas Würl

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

import psycopg2
import psycopg2.extras
from pytest_twisted import inlineCallbacks
from twisted.internet.defer import Deferred
from twisted.python import log
from txpostgres import reconnection
from txpostgres.txpostgres import Connection, ConnectionPool

import blitzortung.config
from blitzortung.db.query import SelectQuery


def connection_factory(*args, **kwargs):
"""Create a psycopg2 connection with DictConnection factory."""
kwargs['connection_factory'] = psycopg2.extras.DictConnection
return psycopg2.connect(*args, **kwargs)


class LoggingDetector(reconnection.DeadConnectionDetector):
"""Database connection detector that logs reconnection events."""

def startReconnecting(self, f):
print('[*] database connection is down (error: %r)' % f.value)
return reconnection.DeadConnectionDetector.startReconnecting(self, f)

def reconnect(self):
print('[*] reconnecting...')
return reconnection.DeadConnectionDetector.reconnect(self)

def connectionRecovered(self):
print('[*] connection recovered')
return reconnection.DeadConnectionDetector.connectionRecovered(self)


class DictConnection(Connection):
"""Database connection using DictConnection factory with logging detector."""
connectionFactory = staticmethod(connection_factory)

def __init__(self, reactor=None, cooperator=None, detector=None):
if not detector:
detector = LoggingDetector()
super(DictConnection, self).__init__(reactor, cooperator, detector)


class DictConnectionPool(ConnectionPool):
"""Connection pool using DictConnection instances."""
connectionFactory = DictConnection

def __init__(self, _ignored, *connargs, **connkw):
super(DictConnectionPool, self).__init__(_ignored, *connargs, **connkw)

def create_connection_pool() -> Deferred:
"""Create and start the database connection pool."""
config = blitzortung.config.config()
db_connection_string = config.get_db_connection_string()

connection_pool = DictConnectionPool(None, db_connection_string)

d = connection_pool.start()
d.addErrback(log.err)

return d

def execute(connection, query: SelectQuery):
return connection.runQuery(str(query), query.get_parameters())
16 changes: 10 additions & 6 deletions blitzortung/service/histogram.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from injector import inject

from .db import execute
from .. import db
from ..db.query import TimeInterval

Expand All @@ -31,13 +32,16 @@ class HistogramQuery:
def __init__(self, strike_query_builder: db.query_builder.Strike):
self.strike_query_builder = strike_query_builder

def create(self, time_interval: TimeInterval, connection, region=None, envelope=None):
def create(self, time_interval: TimeInterval, connection_pool, region=None, envelope=None):
reference_time = time.time()
query = self.strike_query_builder.histogram_query(db.table.Strike.table_name, time_interval, 5, region, envelope)
histogram_query = connection.runQuery(str(query), query.get_parameters())
histogram_query.addCallback(self.build_result, minutes=time_interval.minutes(), bin_size=5,
reference_time=reference_time)
return histogram_query

query = self.strike_query_builder.histogram_query(db.table.Strike.table_name, time_interval, 5, region,
envelope)

result = execute(connection_pool, query)
result.addCallback(self.build_result, minutes=time_interval.minutes(), bin_size=5,
reference_time=reference_time)
return result

@staticmethod
def build_result(query_result, minutes, bin_size, reference_time):
Expand Down
Loading