-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathzs-load.py
More file actions
executable file
·90 lines (74 loc) · 2.27 KB
/
zs-load.py
File metadata and controls
executable file
·90 lines (74 loc) · 2.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
#!/usr/bin/env python
from newsSpiders.itertools import grouper
from pathlib import Path
import argparse
import json
import logging
import newsSpiders.pugsql as pugsql
import os
import re
import sys
import traceback
# Configure logging once at import time; the LOG_LEVEL environment variable
# overrides the default level of INFO.
logging.basicConfig(level=os.environ.get("LOG_LEVEL", "INFO"))

# Logger named after this module; used by the loader functions below.
logger = logging.getLogger(__name__)

# Query container built by the project's pugsql wrapper from "queries".
# NOTE(review): presumably "queries" names a directory of SQL files -- confirm
# against newsSpiders.pugsql.
queries = pugsql.module("queries")
# Accepted table names: "snapshot" or "articlesnapshot", optionally followed
# by digits, case-insensitive.  Written as a raw string so that ``\d`` reaches
# the regex engine instead of being an invalid string escape sequence.
table_pat = re.compile(r"^(article)?snapshot\d*$", re.I)


def parse_args(argv=None):
    """Parse the command-line options of the snapshot loader.

    Args:
        argv: Optional list of argument strings.  When ``None`` (the
            default) argparse falls back to ``sys.argv[1:]``.

    Returns:
        argparse.Namespace: with ``table`` (a name accepted by ``table_pat``)
        and ``input`` (a filename, or ``None`` when not provided).

    Raises:
        SystemExit: raised by argparse when arguments are missing or invalid.
    """

    def table(value):
        """Return *value* unchanged if it is an accepted table name."""
        if not table_pat.match(value):
            # argparse replaces the text of a ValueError with a generic
            # message; ArgumentTypeError is what makes this text reach the
            # user.
            raise argparse.ArgumentTypeError(f"invalid table name {value}")
        return value

    parser = argparse.ArgumentParser(
        description="load article snapshot table data from JSONLines format"
    )
    parser.add_argument(
        "-t",
        "--table",
        type=table,
        help="name of the snapshot table to load",
        required=True,
    )
    parser.add_argument(
        "-i", "--input", help="input filename; from STDIN if not provided"
    )
    return parser.parse_args(argv)
def load_dynamic_queries(queries, table):
    """Register the ``replace_snapshots`` query for the given table.

    The query is a ``REPLACE`` into *table* of ``article_id``,
    ``snapshot_at`` and ``raw_data``, with ``snapshot_at_date`` computed as
    ``FROM_UNIXTIME(:snapshot_at)``.

    Args:
        queries: Object exposing ``add_query(sql)``; the generated SQL text
            is handed to it.
        table: Name of the target table.  It is interpolated into the SQL
            text, so it must be a plain identifier, optionally qualified as
            ``schema.table``.

    Raises:
        ValueError: If *table* is not a plain identifier.
    """
    # The table name cannot be a bound parameter, so refuse anything that
    # could break out of the identifier position before building the SQL.
    if not re.fullmatch(r"\w+(?:\.\w+)?", table):
        raise ValueError(f"invalid table name {table!r}")

    queries.add_query(
        "-- :name replace_snapshots :insert\n"
        f"REPLACE {table} (article_id, snapshot_at, raw_data, snapshot_at_date)\n"
        "VALUES (:article_id, :snapshot_at, :raw_data, FROM_UNIXTIME(:snapshot_at))\n"
    )
def load_snapshots(queries, fh, *, batch_size=1000, log_every=10000):
    """Load JSON Lines snapshot records from *fh* in batches.

    Each non-blank line is parsed as a JSON object, reduced to the
    ``article_id``, ``snapshot_at`` and ``raw_data`` keys, and passed to
    ``queries.replace_snapshots`` together with the other rows of its batch
    (one positional argument per row).

    Args:
        queries: Object exposing ``replace_snapshots(*rows)``.
        fh: Iterable of text lines, e.g. an open file or ``sys.stdin``.
        batch_size: Number of input lines handed to ``grouper`` per batch.
        log_every: Emit a progress message each time the running total
            reaches another multiple of this many rows; must be positive.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    wanted = ("article_id", "snapshot_at", "raw_data")
    total = 0
    next_report = log_every

    for lines in grouper(fh, batch_size):
        snapshots = []
        for line in lines:
            # Skip falsy entries (presumably grouper pads the last group with
            # None -- confirm) and blank lines, which json.loads would reject.
            if not line:
                continue
            text = line.strip()
            if not text:
                continue
            record = json.loads(text)
            snapshots.append(
                {key: value for key, value in record.items() if key in wanted}
            )

        # A batch made only of padding/blank lines has nothing to write.
        if not snapshots:
            continue

        queries.replace_snapshots(*snapshots)
        total += len(snapshots)

        # Report on crossing a multiple of log_every, so short batches do not
        # knock the counter out of alignment and silence progress output.
        if total >= next_report:
            logger.info("loaded snapshot #%s", total)
            next_report = (total // log_every + 1) * log_every

    logger.info("loaded total %s snapshots", total)
def main(table, input=None):
    """Load snapshots into *table* from a file or from standard input.

    Registers the table-specific query, connects using the ``DB_URL``
    environment variable, loads the data and always disconnects afterwards.

    Args:
        table: Name of the snapshot table to load into.
        input: Path of the JSON Lines file to read; ``None`` reads from
            ``sys.stdin``.  (The name shadows the builtin but is kept because
            the command-line namespace is passed in by keyword.)

    Returns:
        int: ``0`` on success, ``1`` if loading failed, so that the value can
        be passed straight to ``sys.exit``.
    """
    load_dynamic_queries(queries, table)
    queries.connect(os.getenv("DB_URL"))
    try:
        if input is None:
            load_snapshots(queries, sys.stdin)
        else:
            # JSON text is UTF-8; do not depend on the platform's locale.
            with Path(input).open("r", encoding="utf-8") as fh:
                load_snapshots(queries, fh)
    except Exception:
        # Log the traceback and report failure through the exit status
        # instead of silently finishing with status 0.
        logger.exception("failed to load snapshots")
        return 1
    finally:
        # Runs on success, on failure, and on KeyboardInterrupt/SystemExit.
        queries.disconnect()
    return 0
if __name__ == "__main__":
    # Script entry point: parse the command line, run the loader and use its
    # return value as the process exit status.  ``sys`` is already imported at
    # the top of the file, so no local re-import is needed.
    sys.exit(main(**vars(parse_args())))