-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__convert_to_async.py
More file actions
155 lines (130 loc) · 4.59 KB
/
__convert_to_async.py
File metadata and controls
155 lines (130 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python3
"""
Script to convert synchronous SQLAlchemy Session to AsyncSession in router files
"""
import re
import sys
from pathlib import Path
def convert_file_to_async(file_path: Path) -> tuple[bool, str]:
    """Convert a single file from sync to async session.

    Reads ``file_path`` as UTF-8, rewrites the synchronous SQLAlchemy
    ``Session`` idioms recognised by ``_convert_source`` and writes the result
    back in place, but only when the text actually changed.

    Args:
        file_path: Path of the Python source file to rewrite.

    Returns:
        ``(True, "Successfully converted")`` when the file was rewritten,
        ``(False, "No changes needed")`` when nothing matched, or
        ``(False, "Error: <details>")`` when reading, converting or writing
        raised an exception.
    """
    try:
        original = file_path.read_text(encoding='utf-8')
        converted = _convert_source(original)
        if converted == original:
            return False, "No changes needed"
        file_path.write_text(converted, encoding='utf-8')
        return True, "Successfully converted"
    except Exception as e:
        # Per-file boundary: the failure is reported through the return value
        # so that a caller looping over many files can carry on.
        return False, f"Error: {e}"


# Single-line ``from sqlalchemy.orm import a, b  # comment`` statement.
_ORM_IMPORT_RE = re.compile(
    r'^([ \t]*)from sqlalchemy\.orm import[ \t]+([^\n(#]+?)([ \t]*#[^\n]*)?[ \t]*$',
    re.MULTILINE,
)

# ``from sqlalchemy import ...``: group 1 is a parenthesised name list,
# group 2 a plain single-line one (without trailing comment, comma or
# line-continuation backslash).
_SQLA_IMPORT_RE = re.compile(
    r'^from sqlalchemy import[ \t]+(?:\(([^)]*)\)|([^\n(#]*[^\s(#\\,]))',
    re.MULTILINE,
)

# Calls on a bare ``db`` name that must be awaited on an AsyncSession.  The
# second lookbehind keeps ``self.db.commit(`` / ``mydb.execute(`` untouched;
# prefixing ``await`` there would produce invalid code.
_AWAIT_RE = re.compile(
    r'(?<!await )(?<![\w.])db\.'
    r'(execute|commit|flush|refresh|rollback|delete|get|merge|scalars?)\('
)

# ``<indent>name = query.<method>()`` statements.  The indentation is captured
# so that both generated lines can be emitted at the statement's own depth.
_ALL_RE = re.compile(r'^([ \t]*)(\w+)\s*=\s*(stmt|query)\.all\(\)', re.MULTILINE)
_FIRST_RE = re.compile(r'^([ \t]*)(\w+)\s*=\s*(stmt|query)\.first\(\)', re.MULTILINE)
_COUNT_RE = re.compile(r'^([ \t]*)(\w+)\s*=\s*(stmt|query)\.count\(\)', re.MULTILINE)


def _convert_source(content: str) -> str:
    """Return ``content`` with sync-session idioms rewritten for AsyncSession.

    The rewrite is purely textual.  Only simple ``name = db.query(...)`` and
    ``name = query.all()/.first()/.count()`` assignments are recognised, and
    ``def`` is not turned into ``async def``.
    """
    # 1. Replace imports
    content = _ORM_IMPORT_RE.sub(_swap_session_import, content)
    # ``\b`` keeps longer names such as ``get_db_session`` from being mangled.
    content = re.sub(
        r'from app\.db import get_db\b',
        'from app.db import get_async_db',
        content,
    )

    # 2. Replace function parameter types
    content = re.sub(
        r'db:\s*Session\s*=\s*Depends\(get_db\)',
        'db: AsyncSession = Depends(get_async_db)',
        content,
    )

    # 3. Convert query patterns
    # name = db.query(Model) -> name = select(Model); the variable keeps its
    # name so later references to it stay valid.
    content, query_rewrites = re.subn(
        r'(\w+)\s*=\s*db\.query\(([^)]+)\)',
        r'\1 = select(\2)',
        content,
    )
    # .filter( -> .where(
    content = re.sub(r'(stmt|query)\.filter\(', r'\1.where(', content)

    # Add await before db operations.  This runs before the statements below
    # are generated, so their own ``await db.execute(`` is never re-processed.
    content = _AWAIT_RE.sub(r'await db.\1(', content)

    # items = query.all()
    content = _ALL_RE.sub(
        r'\1result = await db.execute(\3)\n\1\2 = result.scalars().all()',
        content,
    )
    # item = query.first(): scalars().first() returns the first row or None;
    # scalar_one_or_none() would raise when several rows match.
    content = _FIRST_RE.sub(
        r'\1result = await db.execute(\3)\n\1\2 = result.scalars().first()',
        content,
    )
    # total = query.count(): count over the statement wrapped as a subquery.
    content, count_rewrites = _COUNT_RE.subn(
        r'\1count_result = await db.execute('
        r'select(func.count()).select_from(\3.subquery()))\n'
        r'\1\2 = count_result.scalar()',
        content,
    )

    # 4. Import the SQLAlchemy names that the generated code refers to
    if query_rewrites or count_rewrites:
        content = _ensure_sqlalchemy_import(content, 'select')
    if count_rewrites:
        content = _ensure_sqlalchemy_import(content, 'func')
    return content


def _swap_session_import(match: "re.Match[str]") -> str:
    """Move ``Session`` out of an ORM import line, as ``AsyncSession``."""
    indent, import_list, comment = match.group(1), match.group(2), match.group(3)
    async_items = []
    orm_items = []
    for item in (part.strip() for part in import_list.split(',')):
        if not item:
            continue
        if re.match(r'Session\b', item):
            # Also covers ``Session as Alias`` -> ``AsyncSession as Alias``.
            async_items.append('Async' + item)
        else:
            orm_items.append(item)
    if not async_items:
        return match.group(0)
    lines = [f"{indent}from sqlalchemy.ext.asyncio import {', '.join(async_items)}"]
    if orm_items:
        # Names such as ``joinedload`` still have to come from sqlalchemy.orm.
        lines.append(f"{indent}from sqlalchemy.orm import {', '.join(orm_items)}")
    return "\n".join(lines) + (comment or '')


def _imported_names(import_list: str) -> set[str]:
    """Return the local names bound by the body of a ``from ... import``."""
    without_comments = re.sub(r'#[^\n]*', '', import_list)
    names = set()
    for item in without_comments.split(','):
        tokens = item.split()
        if tokens:
            names.add(tokens[-1])  # ``x as y`` binds ``y``
    return names


def _ensure_sqlalchemy_import(content: str, name: str) -> str:
    """Return ``content`` with ``name`` imported from ``sqlalchemy``."""
    first = None
    for match in _SQLA_IMPORT_RE.finditer(content):
        parenthesised = match.group(1)
        import_list = parenthesised if parenthesised is not None else match.group(2)
        if name in _imported_names(import_list):
            return content
        if first is None:
            first = match

    if first is not None:
        # Extend the first existing ``from sqlalchemy import`` statement.
        if first.group(1) is not None:
            position, addition = first.start(1), f"\n    {name},"
        else:
            position, addition = first.end(2), f", {name}"
        return content[:position] + addition + content[position:]

    # No such statement: add a dedicated line next to the other imports,
    # never ahead of a ``from __future__`` import.
    anchor = (
        re.search(r'^(?:from|import) sqlalchemy\b', content, re.MULTILINE)
        or re.search(r'^(?:from|import) (?!__future__)', content, re.MULTILINE)
    )
    position = anchor.start() if anchor else 0
    return content[:position] + f"from sqlalchemy import {name}\n" + content[position:]
def main():
    """Convert every listed router module and print a report.

    Each path is resolved relative to the directory containing this script.
    Missing files are recorded as failures without being opened.

    Returns:
        0 when every listed file was converted, otherwise 1.
    """
    router_files = (
        "app/routers/tenants.py",
        "app/routers/api_keys.py",
        "app/routers/admin_versions.py",
        "app/routers/webhooks.py",
        "app/routers/tenant_metrics.py",
        "app/routers/usage.py",
        "app/routers/metrics.py",
        "app/routers/jobs.py",
        "app/routers/admin_webhooks.py",
        "app/routers/admin_settings.py",
        "app/routers/admin_rules.py",
        "app/routers/admin_dashboard.py",
        "app/routers/admin.py",
    )
    script_dir = Path(__file__).parent

    # Collect one (relative path, converted?, message) record per file.
    outcomes = []
    for rel_path in router_files:
        target = script_dir / rel_path
        if target.exists():
            converted, note = convert_file_to_async(target)
        else:
            converted, note = False, "File not found"
        outcomes.append((rel_path, converted, note))

    banner = "=" * 80

    # Per-file report
    print("\n" + banner)
    print("CONVERSION RESULTS")
    print(banner)
    for rel_path, converted, note in outcomes:
        mark = "✓" if converted else "✗"
        print(f"{mark} {rel_path}: {note}")

    # Totals
    converted_count = len([entry for entry in outcomes if entry[1]])
    attempted = len(outcomes)
    print("\n" + banner)
    print(f"Successfully converted: {converted_count}/{attempted} files")
    print(banner + "\n")

    if converted_count == attempted:
        return 0
    return 1
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    exit_status = main()
    sys.exit(exit_status)