-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsqlite_subscriber.py
More file actions
120 lines (102 loc) · 3.17 KB
/
sqlite_subscriber.py
File metadata and controls
120 lines (102 loc) · 3.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
"""
An example subscriber that stores all produced information in an SQLite database.
"""
import os
import json
import logging
import argparse
import sqlite3
from hyper.subscriber import ZMQSubscriber
# Runtime settings; each one can be overridden through the environment
# variable of the same name.
DB_URI = os.environ.get("DB_URI", "citybikes.db")
ZMQ_ADDR = os.environ.get("ZMQ_ADDR", "tcp://127.0.0.1:5555")
ZMQ_TOPIC = os.environ.get("ZMQ_TOPIC", "")

log = logging.getLogger("subscriber")

# Two tables keyed by a text primary key: ``networks`` by tag and
# ``stations`` by hash. IF NOT EXISTS makes the bootstrap safe to repeat
# against an already initialised database file.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS networks (
tag TEXT PRIMARY KEY,
name TEXT,
latitude REAL,
longitude REAL,
meta BLOB
) WITHOUT ROWID;
CREATE TABLE IF NOT EXISTS stations (
hash TEXT PRIMARY KEY,
name TEXT,
latitude REAL,
longitude REAL,
stat BLOB,
network_tag TEXT
) WITHOUT ROWID;
"""

# The connection is opened at import time and shared by the whole module.
conn = sqlite3.connect(DB_URI)
cur = conn.cursor()
cur.executescript(_SCHEMA)
conn.commit()
class Sqlitesubscriber(ZMQSubscriber):
    """Subscriber that upserts every received network into SQLite.

    Each message is decoded as JSON and written through the module-level
    ``conn`` connection: one row in ``networks`` (keyed by ``tag``) and one
    row per station in ``stations`` (keyed by ``hash``).
    """

    # Insert a network, or refresh every column if the tag already exists.
    _UPSERT_NETWORK = """
        INSERT INTO networks (tag, name, latitude, longitude, meta)
        VALUES (?, ?, ?, ?, json(?))
        ON CONFLICT(tag) DO UPDATE SET
            name=excluded.name,
            latitude=excluded.latitude,
            longitude=excluded.longitude,
            meta=json(excluded.meta)
    """

    # Insert a station, or refresh every column if the hash already exists.
    _UPSERT_STATION = """
        INSERT INTO stations (hash, name, latitude, longitude, stat, network_tag)
        VALUES (?, ?, ?, ?, json(?), ?)
        ON CONFLICT(hash) DO UPDATE SET
            name=excluded.name,
            latitude=excluded.latitude,
            longitude=excluded.longitude,
            stat=json(excluded.stat),
            network_tag=excluded.network_tag
    """

    def handle_message(self, topic, message):
        """Persist one published network together with its stations.

        The network row and the station rows are written in two separate
        transactions, so the network is committed before its stations are
        processed. If either write fails, its transaction is rolled back
        and the exception propagates, so no partial batch stays pending on
        the shared connection.

        Args:
            topic: Topic the message was received on; not used here.
            message: JSON document holding ``tag``, ``meta`` (with ``name``,
                ``latitude`` and ``longitude``) and ``stations``. Every
                station must provide ``id``, ``name``, ``latitude``,
                ``longitude``, ``bikes``, ``free``, ``timestamp`` and
                ``extra``.

        Raises:
            ValueError: ``message`` is not valid JSON.
            KeyError: One of the keys listed above is missing.
            sqlite3.Error: The database rejected a statement.
        """
        network = json.loads(message)
        meta = network["meta"]
        log.info("Processing %s", meta)

        tag = network["tag"]
        # ``with conn`` commits on success and rolls back on any exception.
        with conn:
            conn.execute(
                self._UPSERT_NETWORK,
                (
                    tag,
                    meta["name"],
                    meta["latitude"],
                    meta["longitude"],
                    json.dumps(meta),
                ),
            )

        stations = network["stations"]
        log.info("[%s] Got %d stations", tag, len(stations))

        # Built lazily: executemany consumes one row at a time. The station
        # id is stored in the ``hash`` column; the volatile fields are packed
        # into a single JSON document for the ``stat`` column.
        rows = (
            (
                station["id"],
                station["name"],
                station["latitude"],
                station["longitude"],
                json.dumps(
                    {
                        "bikes": station["bikes"],
                        "free": station["free"],
                        "timestamp": station["timestamp"],
                        "extra": station["extra"],
                    }
                ),
                tag,
            )
            for station in stations
        )
        # A malformed station raises while the batch is being consumed; the
        # rollback discards the stations already inserted for this message
        # so the next message's commit cannot persist them.
        with conn:
            conn.executemany(self._UPSERT_STATION, rows)

        log.info("[%s] Finished processing %d stations", tag, len(stations))
if __name__ == "__main__":
    # Command-line flags take precedence over the ZMQ_ADDR / ZMQ_TOPIC
    # defaults defined at module level.
    parser = argparse.ArgumentParser()
    parser.add_argument("-a", "--addr", default=ZMQ_ADDR)
    parser.add_argument("-t", "--topic", default=ZMQ_TOPIC)
    # parse_known_args returns (namespace, leftovers); unrecognised
    # arguments are ignored instead of aborting the script.
    args, _ = parser.parse_known_args()
    # The class defined in this module is spelled ``Sqlitesubscriber``; the
    # former ``SqliteSubscriber`` spelling was undefined and raised NameError.
    subscriber = Sqlitesubscriber(args.addr, args.topic)
    subscriber.reader()