-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdb_migration_tool.py
More file actions
390 lines (311 loc) · 14.9 KB
/
db_migration_tool.py
File metadata and controls
390 lines (311 loc) · 14.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
# db_migration_tool.py
"""Command-line tool for database migrations."""
import argparse
import sys
from db.schema_manager import SchemaManager
def initialize_version_table(db_path, target_version):
"""Initialize the version table at a specific version."""
import sqlite3
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Check if the version table exists
cursor.execute("""
SELECT name FROM sqlite_master
WHERE type='table' AND name='db_version'
""")
if not cursor.fetchone():
# Create version table if it doesn't exist
cursor.execute("""
CREATE TABLE db_version (
version INTEGER PRIMARY KEY,
applied_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
print("Created db_version table")
# Check if the version already exists
cursor.execute("SELECT version FROM db_version WHERE version = ?", (target_version,))
if cursor.fetchone():
print(f"Version {target_version} already exists in the version table")
else:
# Insert the target version
cursor.execute("INSERT INTO db_version (version) VALUES (?)", (target_version,))
print(f"Added version {target_version} to the version table")
# Show all versions
cursor.execute("SELECT version FROM db_version ORDER BY version")
versions = [row[0] for row in cursor.fetchall()]
print(f"Current versions in db_version table: {versions}")
conn.commit()
conn.close()
print(f"Version table initialized at version {target_version}")
def main():
parser = argparse.ArgumentParser(
description="Database Migration Tool for RectangularFile",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Check current database status
python db_migration_tool.py status
# Apply pending migrations
python db_migration_tool.py migrate
# Preview migrations without applying them
python db_migration_tool.py migrate --dry-run
# Initialize a new database
python db_migration_tool.py init
# Preview database initialization
python db_migration_tool.py init --dry-run
# View the current database schema
python db_migration_tool.py schema
# Initialize the version table at a specific version
python db_migration_tool.py init-version --version 3
"""
)
# Commands
subparsers = parser.add_subparsers(dest='command', help='Command to run')
# Status command
status_parser = subparsers.add_parser('status', help='Check database status')
status_parser.add_argument('--db-path', default='/mnt/rectangularfile/pdf_index.db',
help='Path to the database file')
# Migrate command
migrate_parser = subparsers.add_parser('migrate', help='Apply database migrations')
migrate_parser.add_argument('--db-path', default='/mnt/rectangularfile/pdf_index.db',
help='Path to the database file')
migrate_parser.add_argument('--dry-run', action='store_true',
help='Show what would be done without making changes')
# Initialize command
init_parser = subparsers.add_parser('init', help='Initialize a new database')
init_parser.add_argument('--db-path', default='/mnt/rectangularfile/pdf_index.db',
help='Path to the database file')
init_parser.add_argument('--dry-run', action='store_true',
help='Show what would be done without making changes')
# Schema command
schema_parser = subparsers.add_parser('schema', help='Print the current database schema')
schema_parser.add_argument('--db-path', default='/mnt/rectangularfile/pdf_index.db',
help='Path to the database file')
schema_parser.add_argument('--table', help='Print schema for a specific table only')
schema_parser.add_argument('--output', help='Save schema to a file instead of printing to console')
# Initialize version command
init_version_parser = subparsers.add_parser('init-version',
help='Initialize the version table at a specific version')
init_version_parser.add_argument('--db-path', default='/mnt/rectangularfile/pdf_index.db',
help='Path to the database file')
init_version_parser.add_argument('--version', type=int, required=True,
help='Version number to initialize at')
args = parser.parse_args()
# If no command is provided, show help and exit
if not args.command:
parser.print_help()
return 0
# Create schema manager
schema_manager = SchemaManager(args.db_path)
if args.command == 'status':
# Show database status
import sqlite3
import os
if not os.path.exists(args.db_path):
print(f"Database file not found: {args.db_path}")
return 1
try:
conn = sqlite3.connect(args.db_path)
current_version = schema_manager.get_current_db_version(conn)
# Get available migrations
migrations = schema_manager.get_migrations()
latest_version = max(m["version"] for m in migrations) if migrations else 0
# Check what migrations are pending
pending = [m for m in migrations if m["version"] > current_version]
print(f"Database: {args.db_path}")
print(f"Current version: {current_version}")
print(f"Latest available version: {latest_version}")
print(f"Pending migrations: {len(pending)}")
if pending:
print("\nPending migrations:")
for m in pending:
print(f" Version {m['version']}: {m['description']}")
conn.close()
except Exception as e:
print(f"Error checking database status: {e}")
return 1
elif args.command == 'migrate':
# Apply migrations
import sqlite3
import os
if not os.path.exists(args.db_path):
print(f"Database file not found: {args.db_path}")
return 1
try:
conn = sqlite3.connect(args.db_path)
current_version = schema_manager.get_current_db_version(conn)
# Get available migrations
migrations = schema_manager.get_migrations()
# Check what migrations are pending
pending = [m for m in migrations if m["version"] > current_version]
if not pending:
print("Database is already at the latest version. No migrations to apply.")
conn.close()
return 0
print(f"Found {len(pending)} pending migrations:")
for m in pending:
print(f" Version {m['version']}: {m['description']}")
if args.dry_run:
print(f" SQL: {m['sql']}")
if args.dry_run:
print("\nDRY RUN: No changes have been made to the database.")
conn.close()
return 0
print("\nApplying migrations...")
success = schema_manager.check_and_apply_migrations()
if success:
print("Migrations applied successfully!")
else:
print("Failed to apply migrations.")
return 1
conn.close()
except Exception as e:
print(f"Error during migration: {e}")
return 1
elif args.command == 'init':
# Initialize database
import os
if os.path.exists(args.db_path):
print(f"Database already exists: {args.db_path}")
if args.dry_run:
print("\nDRY RUN: This command would prompt to delete and recreate the database.")
return 0
response = input("Do you want to delete and recreate it? (y/N): ")
if response.lower() != 'y':
print("Aborting.")
return 0
try:
os.remove(args.db_path)
print(f"Deleted existing database: {args.db_path}")
except Exception as e:
print(f"Error deleting database: {e}")
return 1
else:
print(f"Database file does not exist: {args.db_path}")
if args.dry_run:
print("\nDRY RUN: This command would create a new database with the following tables:")
for table_name in schema_manager.get_base_schema().keys():
print(f" - {table_name}")
print("\nWith the following indexes:")
for index_name in schema_manager.get_indexes().keys():
print(f" - {index_name}")
print("\nAnd set the database version to the latest migration:")
migrations = schema_manager.get_migrations()
latest_version = max(m["version"] for m in migrations) if migrations else 0
print(f" - Version {latest_version}")
return 0
if args.dry_run:
print("\nDRY RUN: No changes have been made to the database.")
return 0
success = schema_manager.initialize_database()
if success:
print(f"Database initialized successfully: {args.db_path}")
else:
print("Failed to initialize database.")
return 1
elif args.command == 'schema':
# Print the database schema
import sqlite3
import os
import textwrap
if not os.path.exists(args.db_path):
print(f"Database file not found: {args.db_path}")
return 1
try:
conn = sqlite3.connect(args.db_path)
cursor = conn.cursor()
# Get list of tables
if args.table:
tables = [args.table]
# Check if table exists
cursor.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (args.table,))
if not cursor.fetchone():
print(f"Table '{args.table}' not found in database.")
return 1
else:
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name")
tables = [row[0] for row in cursor.fetchall()]
if not tables:
print("No tables found in database.")
return 0
# Prepare output
output_lines = []
output_lines.append(f"Database: {args.db_path}")
output_lines.append(f"Tables: {len(tables)}")
output_lines.append("")
# Get schema for each table
for table in tables:
output_lines.append(f"Table: {table}")
output_lines.append("-" * (len(table) + 7))
# Get CREATE TABLE statement
cursor.execute(f"SELECT sql FROM sqlite_master WHERE type='table' AND name=?", (table,))
create_stmt = cursor.fetchone()[0]
# Format the SQL statement for better readability
formatted_sql = textwrap.indent(create_stmt, ' ')
output_lines.append(formatted_sql)
# Get table columns and types
cursor.execute(f"PRAGMA table_info({table})")
columns = cursor.fetchall()
output_lines.append("\nColumns:")
for col in columns:
col_id, name, type_name, not_null, default_val, pk = col
constraints = []
if pk:
constraints.append("PRIMARY KEY")
if not_null:
constraints.append("NOT NULL")
if default_val is not None:
constraints.append(f"DEFAULT {default_val}")
constraint_str = f" ({', '.join(constraints)})" if constraints else ""
output_lines.append(f" {name}: {type_name}{constraint_str}")
# Get indexes for this table
cursor.execute(f"SELECT name, sql FROM sqlite_master WHERE type='index' AND tbl_name=? AND name NOT LIKE 'sqlite_%'", (table,))
indexes = cursor.fetchall()
if indexes:
output_lines.append("\nIndexes:")
for idx_name, idx_sql in indexes:
output_lines.append(f" {idx_name}: {idx_sql}")
# Get foreign keys
cursor.execute(f"PRAGMA foreign_key_list({table})")
foreign_keys = cursor.fetchall()
if foreign_keys:
output_lines.append("\nForeign Keys:")
for fk in foreign_keys:
fk_id, seq, ref_table, from_col, to_col, on_update, on_delete, match = fk
output_lines.append(f" {from_col} -> {ref_table}({to_col})")
if on_update != "NO ACTION":
output_lines.append(f" ON UPDATE: {on_update}")
if on_delete != "NO ACTION":
output_lines.append(f" ON DELETE: {on_delete}")
output_lines.append("\n")
# Output the schema information
schema_text = "\n".join(output_lines)
if args.output:
with open(args.output, 'w') as f:
f.write(schema_text)
print(f"Schema saved to {args.output}")
else:
print(schema_text)
conn.close()
except Exception as e:
print(f"Error getting database schema: {e}")
import traceback
traceback.print_exc()
return 1
elif args.command == 'init-version':
# Initialize version table
import os
if not os.path.exists(args.db_path):
print(f"Database file not found: {args.db_path}")
return 1
try:
initialize_version_table(args.db_path, args.version)
return 0
except Exception as e:
print(f"Error initializing version table: {e}")
import traceback
traceback.print_exc()
return 1
return 0
if __name__ == "__main__":
sys.exit(main())