-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path: update_high_tide_level_postgresql.py
More file actions
100 lines (86 loc) · 3.18 KB
/
update_high_tide_level_postgresql.py
File metadata and controls
100 lines (86 loc) · 3.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import requests
import os
import json
import sys
import time
import datetime as dt
import logging
import sqlalchemy as db
from sqlalchemy.orm import Session
import pytz
# Connection string for the local PostgreSQL database that stores the tide rows.
DATABASE_URL = "postgresql:///opendata_ve_pg"
# Venice open-data endpoint publishing the live tide-level measurements (JSON).
URL_TIDE = 'https://dati.venezia.it/sites/default/files/dataset/opendata/livello.json'
# Parse format for the feed timestamp once a numeric UTC offset has been appended.
DT_TIDE_FORMAT = "%Y-%m-%d %H:%M:%S %z"
MAX_NUM_ROWS = 1 * 24 * 12 # num_days * 24 [hours in a day] * 12 [tide data in one hour]
MAX_ATTEMPTS = 3  # how many times to poll the endpoint before giving up
INTER_ATTEMPT_TIME = 30 # seconds between retry attempts
# Set the logger
logger = logging.getLogger('high tide')
logger.setLevel(logging.DEBUG)
def get_tide_data():
    """Fetch the latest tide reading from the open-data feed.

    Returns:
        dict: the formatted reading (see ``format_tide_data``), or an
        empty dict when the endpoint is unreachable, replies with a
        non-200 status, returns a non-JSON body, or none of the known
        stations is present in the payload.
    """
    try:
        # Bound the request so a stalled endpoint cannot hang the script
        # forever; connection/timeout failures are treated like a bad reply.
        resp = requests.get(url=URL_TIDE, timeout=30)
    except requests.RequestException as exc:
        logger.warning(f"Url tide request failed: {exc}")
        return {}
    if resp.status_code != 200:
        logger.warning(f"Url tide does not reply correctly: status {resp.status_code}")
        return {}
    try:
        data = resp.json()
    except ValueError:
        # The body was not valid JSON (e.g. an HTML error page).
        logger.warning("Url tide returned a non-JSON body")
        return {}
    selected_data = extract_data_from_array(data)
    if not selected_data:
        return {}
    return format_tide_data(selected_data)
def extract_data_from_array(array_tides):
    """Pick the record for the preferred measuring station out of the raw feed.

    Stations are tried in priority order: "PSalute" first, then "PS_Giud";
    the first matching record wins.

    Returns:
        dict: the first record whose "nome_abbr" matches a preferred
        station, or an empty list when neither station appears.
    """
    preferred_stations = ("PSalute", "PS_Giud")
    for station in preferred_stations:
        match = next(
            (record for record in array_tides if record["nome_abbr"] == station),
            None,
        )
        if match is not None:
            return match
    return []
def format_tide_data(data):
    """Normalise one raw station record into the row shape stored in the db.

    Args:
        data: a single station record from the feed; expected keys include
            "data" (timestamp string), "valore" (level with a trailing unit
            character), station ids/names and the four coordinate fields.

    Returns:
        dict: keys matching the "tide" table columns, with "updated_at"
        converted to naive UTC and "value" parsed to a float.
    """
    # The feed timestamps are wall-clock UTC+1, so append that offset
    # explicitly before parsing with the %z-aware format.
    data_with_tzinfo = data["data"] + " +0100"
    dt_with_tzinfo = dt.datetime.strptime(data_with_tzinfo, DT_TIDE_FORMAT)
    return {
        "id_station": data["ID_stazione"],
        "station": data["stazione"],
        "short_name": data["nome_abbr"],
        "latDMSN": float(data["latDMSN"]),
        "lonDMSE": float(data["lonDMSE"]),
        "latDDN": float(data["latDDN"]),
        "lonDDE": float(data["lonDDE"]),
        # Store the time as naive UTC. dt.timezone.utc (stdlib) replaces the
        # previous pytz.utc — behaviorally identical for astimezone() and
        # drops the third-party dependency from this code path.
        "updated_at": dt_with_tzinfo.astimezone(dt.timezone.utc).replace(tzinfo=None),
        # e.g. "0.56m" -> 0.56; assumes a single trailing unit char — TODO confirm
        "value": float(data["valore"][:-1])
    }
def main():
    """Poll the tide endpoint up to MAX_ATTEMPTS times and persist one new reading.

    Retries (after a pause) when the endpoint fails or the fetched reading
    is a duplicate of the last stored row; stops after the first successful
    insert. Old rows beyond the retention window are pruned on insert.
    """
    for attempt in range(MAX_ATTEMPTS):
        # Sleep only between attempts, never before the first one.
        # (The previous version incremented the counter before testing
        # `> 0`, so it always slept 30 s even on the first attempt.)
        if attempt > 0:
            time.sleep(INTER_ATTEMPT_TIME)
        tide = get_tide_data()
        if not tide:
            continue
        # Record the upload time as naive UTC (timezone-aware now() replaces
        # the deprecated utcnow(); the stored value is identical).
        tide["uploaded_at"] = dt.datetime.now(dt.timezone.utc).replace(tzinfo=None)
        # Set database
        engine = db.create_engine(DATABASE_URL)
        meta_data = db.MetaData(bind=engine)
        db.MetaData.reflect(meta_data)
        # get the table "tide"
        tbl_tide = db.Table('tide', meta_data, autoload=True)
        # insert in the table
        new_tide = tbl_tide.insert(tide)
        # commit the changes to the db
        with Session(engine) as session:
            num_rows = session.query(tbl_tide).count()
            last_row = session.query(tbl_tide).order_by(tbl_tide.c.id.desc()).first()
            # Skip duplicates: the feed may return the same reading twice.
            # Guard against an empty table, where first() returns None and
            # the previous version crashed with AttributeError.
            if last_row is not None and last_row.updated_at == tide["updated_at"]:
                continue
            # NOTE(review): 100 looks like a magic number — presumably it
            # should be MAX_NUM_ROWS; confirm before changing.
            if num_rows >= 100:
                # Prune rows older than the retention window of MAX_NUM_ROWS
                # entries behind the newest id.
                to_be_deleted = tbl_tide.delete().where(tbl_tide.c.id < last_row.id - MAX_NUM_ROWS)
                session.execute(to_be_deleted)
            session.execute(new_tide)
            session.commit()
        break
if __name__ == "__main__":
    main()