-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsync_sql_join.py
More file actions
96 lines (79 loc) · 2.82 KB
/
sync_sql_join.py
File metadata and controls
96 lines (79 loc) · 2.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
"""Sync strategy: temp table + SQL join.
Loads the mutated CSV into a temporary SQLite table in batches, then
uses SQL JOINs to count inserts, updates, and deletes entirely inside
the database engine. Python never holds more than one batch of rows —
all diff logic runs in SQLite's C code.
Read-only: the records table is never modified.
"""
import argparse
from common import BATCH_SIZE, setup_db, open_db, measure, cleanup
def sync(conn, csv_path, batch_size=None):
    """Count how a CSV file differs from the ``records`` table.

    The CSV (header line followed by ``id,name,email,amount`` rows) is
    streamed into a temporary ``csv_import`` table in batches, and the
    counting is then done with SQL joins against ``records``. The
    ``records`` table itself is only read, never written.

    Args:
        conn: Open SQLite connection that has a ``records`` table with
            ``id``, ``name``, ``email`` and ``amount`` columns.
        csv_path: Path of the CSV file to compare against ``records``.
        batch_size: Maximum number of rows passed to one ``executemany``
            call. Defaults to the module-level ``BATCH_SIZE``; values
            below 1 are treated as 1.

    Returns:
        A ``(updated, deleted, inserted)`` tuple of integer counts:
        rows present on both sides with a differing column, rows only
        in ``records``, and rows only in the CSV.

    Raises:
        ValueError: If a row has more than four fields, or its id or
            amount is not numeric.
        IndexError: If a non-empty row has fewer than four fields.
    """
    # Function-scope import: the module's import block does not pull in csv.
    import csv

    # Resolve the default lazily so an explicit batch_size never needs
    # the module-level constant.
    if batch_size is None:
        batch_size = BATCH_SIZE
    batch_size = max(1, batch_size)

    insert_sql = "INSERT INTO csv_import VALUES (?, ?, ?, ?)"

    conn.execute(
        """
        CREATE TEMP TABLE csv_import (
            id INTEGER PRIMARY KEY,
            name TEXT NOT NULL,
            email TEXT NOT NULL,
            amount REAL NOT NULL
        )
        """
    )
    # From here on the temp table exists, so it must be dropped on every
    # exit path; otherwise the next call on this connection would fail
    # at CREATE.
    try:
        # newline="" is what the csv module requires so that quoted
        # fields containing line breaks are parsed correctly.
        with open(csv_path, newline="") as f:
            reader = csv.reader(f)
            next(reader, None)  # skip header; tolerate a completely empty file
            batch = []
            for row in reader:
                if not row:  # blank line
                    continue
                if len(row) > 4:
                    raise ValueError(
                        "{}: line {}: expected 4 fields, got {}".format(
                            csv_path, reader.line_num, len(row)
                        )
                    )
                batch.append((int(row[0]), row[1], row[2], float(row[3])))
                if len(batch) >= batch_size:
                    conn.executemany(insert_sql, batch)
                    batch.clear()
            if batch:
                conn.executemany(insert_sql, batch)

        # Count deletes and updates in one pass over records.
        # COALESCE: SUM() over zero rows is NULL, which would surface as None.
        # IS NOT: NULL-safe inequality, so a NULL column in records still
        # counts as different from the (NOT NULL) CSV value.
        deleted, updated = conn.execute(
            """
            SELECT
                COALESCE(SUM(CASE WHEN c.id IS NULL THEN 1 ELSE 0 END), 0),
                COALESCE(SUM(CASE WHEN c.id IS NOT NULL
                                   AND (r.name IS NOT c.name
                                        OR r.email IS NOT c.email
                                        OR r.amount IS NOT c.amount)
                              THEN 1 ELSE 0 END), 0)
            FROM records r
            LEFT JOIN csv_import c ON r.id = c.id
            """
        ).fetchone()

        # Count inserts: rows in CSV but not in DB
        inserted = conn.execute(
            """
            SELECT COUNT(*) FROM csv_import c
            LEFT JOIN records r ON c.id = r.id
            WHERE r.id IS NULL
            """
        ).fetchone()[0]
    finally:
        conn.execute("DROP TABLE csv_import")

    return updated, deleted, inserted
if __name__ == "__main__":
    # Command-line entry point: measure one run of sync() against either
    # an existing database (--db) or one built from --csv for this run.
    parser = argparse.ArgumentParser(description="Sync using temp table + SQL join")
    parser.add_argument("--csv", default="data.csv")
    parser.add_argument("--mutated", default="data_mutated.csv")
    parser.add_argument("--db", help="Path to existing database (skip setup/cleanup)")
    parser.add_argument("--json-output", help="Write metrics JSON to this file")
    args = parser.parse_args()

    # Without --db the script builds its own database and is therefore
    # responsible for cleaning it up afterwards.
    db_path = args.db
    if not db_path:
        db_path = setup_db(args.csv)

    try:
        conn = open_db(db_path)
        try:

            def run():
                return sync(conn, args.mutated)

            measure("SQL join", run, json_output=args.json_output)
        finally:
            # Close the connection even when the measured run raises.
            conn.close()
    finally:
        # Only clean up a database this script created itself.
        if not args.db:
            cleanup()