-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathmigrate_data.py
More file actions
378 lines (300 loc) · 13.4 KB
/
migrate_data.py
File metadata and controls
378 lines (300 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
#!/usr/bin/env python3
"""
MoltGrid Data Migration: SQLite -> PostgreSQL
Copies all data from the SQLite database to PostgreSQL with proper type
conversions. Handles BLOB-to-BYTEA for vector_memory embeddings, resets
SERIAL sequences for autoincrement tables, and verifies row counts.
Usage:
python migrate_data.py # Migrate all data
python migrate_data.py --verify # Verify row counts match
python migrate_data.py --table agents # Migrate single table
python migrate_data.py --dry-run # Print what would be migrated
python migrate_data.py --force # Truncate target tables before insert
"""
import argparse
import os
import sqlite3
import sys
import time
from datetime import datetime, timezone
# ─── Config ──────────────────────────────────────────────────────────────────
# Path to the SQLite source database; override with the MOLTGRID_DB env var.
DB_PATH = os.getenv("MOLTGRID_DB", "moltgrid.db")
# PostgreSQL target connection string; required for migrate and --verify.
DATABASE_URL = os.getenv("DATABASE_URL", "")
# Tables with SERIAL PRIMARY KEY (need sequence reset after insert)
SERIAL_TABLES = {"uptime_checks", "pubsub_subscriptions"}
# Columns that need BLOB -> bytes conversion for BYTEA
BYTEA_COLUMNS = {
    "vector_memory": {"embedding"},
}
# Batch size for executemany inserts
BATCH_SIZE = 500
# ─── Helpers ──────────────────────────────────────────────────────────────────
def get_sqlite_tables(sqlite_conn):
    """Return the names of every user table in the SQLite database, A-Z.

    Internal ``sqlite_*`` bookkeeping tables are excluded.
    """
    cur = sqlite_conn.execute(
        "SELECT name FROM sqlite_master WHERE type='table' "
        "AND name NOT LIKE 'sqlite_%' ORDER BY name"
    )
    return [name for (name,) in cur.fetchall()]
def get_sqlite_columns(sqlite_conn, table_name):
    """Return column names for a SQLite table, in declaration order.

    The table name is quoted as a double-quoted identifier (with any
    embedded quotes doubled) so names that are reserved words or contain
    special characters do not break the PRAGMA statement. Identifiers
    cannot be bound as parameters, so quoting is the safe alternative to
    raw interpolation.
    """
    quoted = '"%s"' % table_name.replace('"', '""')
    cursor = sqlite_conn.cursor()
    cursor.execute("PRAGMA table_info(%s)" % quoted)
    # row layout: (cid, name, type, notnull, dflt_value, pk)
    return [row[1] for row in cursor.fetchall()]
def get_pg_columns(pg_conn, table_name):
    """Return column names of a public-schema PostgreSQL table, in ordinal order."""
    sql = (
        "SELECT column_name FROM information_schema.columns "
        "WHERE table_schema = 'public' AND table_name = %s "
        "ORDER BY ordinal_position"
    )
    rows = pg_conn.execute(sql, (table_name,)).fetchall()
    return [name for (name,) in rows]
def convert_row(table_name, columns, row):
    """Apply type conversions to a row before inserting into PostgreSQL.

    Values in BYTEA-destined columns (see BYTEA_COLUMNS) are coerced to
    ``bytes`` so psycopg binds them as BYTEA — SQLite can hand BLOBs back
    as ``memoryview`` objects. All other values pass through unchanged.

    Returns the row as a tuple, or the original row object untouched when
    the table has no BYTEA columns.
    """
    bytea_cols = BYTEA_COLUMNS.get(table_name, set())
    if not bytea_cols:
        return row
    converted = list(row)
    for i, col in enumerate(columns):
        val = converted[i]
        # The original had two identical branches (memoryview and
        # not-bytes) that both called bytes(val); one check suffices.
        if col in bytea_cols and val is not None and not isinstance(val, bytes):
            converted[i] = bytes(val)
    return tuple(converted)
def pg_row_count(pg_conn, table_name):
    """Return the total number of rows in a PostgreSQL table."""
    query = "SELECT COUNT(*) FROM %s" % table_name
    (count,) = pg_conn.execute(query).fetchone()
    return count
def sqlite_row_count(sqlite_conn, table_name):
    """Return the total number of rows in a SQLite table.

    The table name is quoted as a double-quoted identifier (embedded
    quotes doubled) so reserved words and special characters are handled
    safely — identifiers cannot be bound as query parameters.
    """
    quoted = '"%s"' % table_name.replace('"', '""')
    cursor = sqlite_conn.cursor()
    cursor.execute("SELECT COUNT(*) FROM %s" % quoted)
    return cursor.fetchone()[0]
# ─── Migration ────────────────────────────────────────────────────────────────
def migrate_table(sqlite_conn, pg_conn, table_name, force=False):
    """Migrate a single table from SQLite to PostgreSQL.
    Returns the number of rows migrated.

    The table is skipped when the PostgreSQL side already has rows,
    unless *force* is true, in which case existing rows are deleted
    first. Only columns present in BOTH databases are copied, and rows
    are inserted in batches of BATCH_SIZE. Nothing is committed here —
    the caller owns the pg_conn transaction.
    """
    # Check if target table already has rows
    existing_count = pg_row_count(pg_conn, table_name)
    if existing_count > 0 and not force:
        print("Skipped %s: already has %d rows (use --force to overwrite)" % (table_name, existing_count))
        return 0
    if existing_count > 0 and force:
        # NOTE(review): this issues DELETE, not TRUNCATE (slower on big
        # tables, but transactional and trigger-safe), even though the
        # printed message says "Truncated".
        pg_conn.execute("DELETE FROM %s" % table_name)
        print("Truncated %s: removed %d existing rows" % (table_name, existing_count))
    # Get column info
    sqlite_cols = get_sqlite_columns(sqlite_conn, table_name)
    pg_cols = get_pg_columns(pg_conn, table_name)
    # Use intersection of columns (in SQLite column order) to handle
    # cases where PG has extra columns with defaults
    common_cols = [c for c in sqlite_cols if c in pg_cols]
    if not common_cols:
        print("WARNING: No common columns for %s, skipping" % table_name)
        return 0
    # Read all rows from SQLite (loaded fully into memory; fine for the
    # table sizes this script targets)
    col_list = ", ".join(common_cols)
    cursor = sqlite_conn.cursor()
    cursor.execute("SELECT %s FROM %s" % (col_list, table_name))
    rows = cursor.fetchall()
    if not rows:
        print("Migrated %s: 0 rows (empty)" % table_name)
        return 0
    # Build INSERT statement with %s placeholders for psycopg
    placeholders = ", ".join(["%s"] * len(common_cols))
    insert_sql = "INSERT INTO %s (%s) VALUES (%s)" % (table_name, col_list, placeholders)
    # Convert rows (BLOB -> bytes for BYTEA columns) and batch insert
    converted_rows = [convert_row(table_name, common_cols, row) for row in rows]
    # Insert in batches using cursor (psycopg3 requires cursor for executemany)
    cur = pg_conn.cursor()
    for i in range(0, len(converted_rows), BATCH_SIZE):
        batch = converted_rows[i : i + BATCH_SIZE]
        cur.executemany(insert_sql, batch)
    count = len(converted_rows)
    print("Migrated %s: %d rows" % (table_name, count))
    return count
def reset_serial_sequences(pg_conn):
    """Reset SERIAL sequences for tables with autoincrement PKs."""
    for table_name in SERIAL_TABLES:
        # setval(..., MAX(id)+1, false) makes the next nextval() return
        # exactly MAX(id)+1, keeping new inserts clear of migrated ids.
        stmt = (
            "SELECT setval(pg_get_serial_sequence('%s', 'id'), "
            "COALESCE((SELECT MAX(id) FROM %s), 0) + 1, false)"
            % (table_name, table_name)
        )
        try:
            pg_conn.execute(stmt)
        except Exception as e:
            # Best-effort: a missing sequence should not abort migration.
            print("WARNING: Could not reset sequence for %s: %s" % (table_name, e))
        else:
            print("Reset sequence for %s" % table_name)
# ─── Dry Run ──────────────────────────────────────────────────────────────────
def dry_run(table_filter=None):
    """Print what would be migrated without writing anything.

    Only the SQLite source is opened — PostgreSQL is never contacted.
    Prints per-table row counts plus notes for tables needing special
    handling (sequence reset, BLOB->BYTEA). Exits with status 1 when the
    database file or the requested --table is missing.
    """
    if not os.path.exists(DB_PATH):
        print("ERROR: SQLite database not found at %s" % DB_PATH)
        sys.exit(1)
    sqlite_conn = sqlite3.connect(DB_PATH)
    tables = get_sqlite_tables(sqlite_conn)
    if table_filter:
        tables = [t for t in tables if t == table_filter]
        if not tables:
            print("ERROR: Table '%s' not found in SQLite database" % table_filter)
            sqlite_conn.close()
            sys.exit(1)
    print("DRY RUN: Data migration plan")
    print("Source: %s" % DB_PATH)
    print("Target: PostgreSQL (DATABASE_URL)")
    print("-" * 50)
    print("%-30s %10s %s" % ("Table", "Rows", "Notes"))
    print("-" * 50)
    total_rows = 0
    for table in tables:
        count = sqlite_row_count(sqlite_conn, table)
        total_rows += count
        notes = []
        if table in SERIAL_TABLES:
            notes.append("SERIAL seq reset")
        if table in BYTEA_COLUMNS:
            notes.append("BLOB->BYTEA")
        note_str = ", ".join(notes) if notes else ""
        print("%-30s %10d %s" % (table, count, note_str))
    print("-" * 50)
    print("Total: %d tables, %d rows" % (len(tables), total_rows))
    sqlite_conn.close()
# ─── Verification ─────────────────────────────────────────────────────────────
def verify():
    """Compare row counts between SQLite and PostgreSQL for every table.

    Iterates every SQLite user table: a differing count is reported as
    MISMATCH, and a table that cannot be counted on the PostgreSQL side
    is reported as TABLE MISSING. Exits 1 if any table disagrees, 0 when
    all counts match. Requires DATABASE_URL.
    """
    if not os.path.exists(DB_PATH):
        print("ERROR: SQLite database not found at %s" % DB_PATH)
        sys.exit(1)
    if not DATABASE_URL:
        print("ERROR: DATABASE_URL environment variable not set")
        sys.exit(1)
    # Imported lazily so the SQLite-only code paths work without psycopg
    import psycopg
    sqlite_conn = sqlite3.connect(DB_PATH)
    pg_conn = psycopg.connect(DATABASE_URL)
    try:
        tables = get_sqlite_tables(sqlite_conn)
        print("Data Verification: SQLite vs PostgreSQL")
        print("-" * 60)
        print("%-30s %10s %10s %s" % ("Table", "SQLite", "PG", "Status"))
        print("-" * 60)
        mismatches = 0
        total_sqlite = 0
        total_pg = 0
        for table in tables:
            s_count = sqlite_row_count(sqlite_conn, table)
            total_sqlite += s_count
            try:
                p_count = pg_row_count(pg_conn, table)
                total_pg += p_count
            except Exception:
                # Table absent (or unreadable) on the PG side; the "-"
                # sentinel routes this row to the TABLE MISSING branch,
                # and the mismatch is counted here, not below.
                p_count = "-"
                mismatches += 1
            if isinstance(p_count, int) and s_count == p_count:
                status = "OK"
            elif isinstance(p_count, int):
                status = "MISMATCH"
                mismatches += 1
            else:
                status = "TABLE MISSING"
            print("%-30s %10d %10s %s" % (table, s_count, str(p_count), status))
        print("-" * 60)
        print("SQLite total: %d rows | PostgreSQL total: %d rows" % (total_sqlite, total_pg))
        if mismatches > 0:
            print("FAILED: %d table(s) with mismatches" % mismatches)
            sys.exit(1)
        else:
            print("PASSED: All row counts match")
    finally:
        sqlite_conn.close()
        pg_conn.close()
# ─── Full Migration ──────────────────────────────────────────────────────────
def migrate(table_filter=None, force=False):
    """Migrate all (or one) table from SQLite to PostgreSQL.

    The whole run happens inside a single PostgreSQL transaction
    (autocommit=False): either every table commits together or any
    failure rolls everything back. SERIAL sequences are reset before the
    commit. Exits 1 on any failure.
    """
    if not os.path.exists(DB_PATH):
        print("ERROR: SQLite database not found at %s" % DB_PATH)
        sys.exit(1)
    if not DATABASE_URL:
        print("ERROR: DATABASE_URL environment variable not set")
        sys.exit(1)
    # Imported lazily so the SQLite-only code paths work without psycopg
    import psycopg
    start_time = time.time()
    start_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    print("Data migration started at %s" % start_ts)
    print("Source: %s" % DB_PATH)
    print("Target: PostgreSQL")
    print()
    sqlite_conn = sqlite3.connect(DB_PATH)
    sqlite_conn.row_factory = None  # We want tuples, not Row objects
    pg_conn = psycopg.connect(DATABASE_URL, autocommit=False)
    try:
        tables = get_sqlite_tables(sqlite_conn)
        if table_filter:
            tables = [t for t in tables if t == table_filter]
            if not tables:
                # sys.exit raises SystemExit (a BaseException), which the
                # "except Exception" below does NOT catch; finally still
                # closes both connections.
                print("ERROR: Table '%s' not found in SQLite database" % table_filter)
                sys.exit(1)
        total_rows = 0
        for table in tables:
            rows = migrate_table(sqlite_conn, pg_conn, table, force=force)
            total_rows += rows
        # Reset SERIAL sequences
        if not table_filter or table_filter in SERIAL_TABLES:
            reset_serial_sequences(pg_conn)
        # Commit the entire migration
        pg_conn.commit()
        elapsed = time.time() - start_time
        end_ts = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        print()
        print("Data migration complete at %s" % end_ts)
        print("Total: %d tables, %d rows migrated in %.1f seconds" % (len(tables), total_rows, elapsed))
    except Exception as e:
        pg_conn.rollback()
        print()
        print("ERROR: Migration failed, all changes rolled back: %s" % e)
        sys.exit(1)
    finally:
        sqlite_conn.close()
        pg_conn.close()
# ─── Main ─────────────────────────────────────────────────────────────────────
def main():
    """Entry point: parse CLI flags and run the requested mode.

    Modes in precedence order: --verify, --dry-run, otherwise a real
    migration. --table and --force refine the migration/dry-run.
    """
    parser = argparse.ArgumentParser(
        description="MoltGrid Data Migration: SQLite -> PostgreSQL"
    )
    parser.add_argument(
        "--verify",
        action="store_true",
        help="Verify row counts match between SQLite and PostgreSQL",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print what would be migrated without writing",
    )
    parser.add_argument(
        "--table",
        type=str,
        default=None,
        help="Migrate a single table by name",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Truncate target tables before inserting (overwrite existing data)",
    )
    args = parser.parse_args()
    # Dispatch with guard clauses rather than an if/elif chain.
    if args.verify:
        verify()
        return
    if args.dry_run:
        dry_run(table_filter=args.table)
        return
    migrate(table_filter=args.table, force=args.force)
if __name__ == "__main__":
main()