# PyMigrate

A specialized data migration framework designed for CTOs leading the transition from monolithic databases to microservices architectures. PyMigrate enables zero-downtime, incremental migration with bi-directional synchronization, keeping data consistent during the critical transition period while both systems remain operational.

## Features
### Bi-Directional Synchronization

- Real-time data synchronization between monolith and microservices
- Configurable conflict-resolution strategies: last-write-wins, priority-based, or custom (see the sketch below)
- Sub-100 ms sync latency for critical data paths
- Support for 10,000+ transactions per second
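Switching strategies is a one-line change to the sync configuration shown in the Quick Start below. A minimal sketch, assuming the priority-based strategy is exposed as a `PRIORITY_BASED` enum member (only `LAST_WRITE_WINS` appears elsewhere in this README):

```python
from pymigrate.models.config import ConflictResolutionStrategy, SyncConfig

# source_db and target_db are DatabaseConfig objects, built exactly as in
# the Quick Start below. PRIORITY_BASED is an assumed member name for the
# priority-based strategy listed above.
sync_config = SyncConfig(
    source_db=source_db,
    target_db=target_db,
    conflict_resolution=ConflictResolutionStrategy.PRIORITY_BASED,
    sync_interval_ms=100,
)
```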
### Service Boundary Analysis

- Automatic analysis of database schema and access patterns
- Intelligent service-boundary recommendations based on:
  - Table relationships and foreign keys
  - Query access patterns
  - Transaction boundaries
- Recommendations that minimize cross-service dependencies (inspected in the sketch below)
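A minimal sketch of inspecting those recommendations; `service_name` appears in the Quick Start below, while the `tables` attribute is an assumption about the boundary object's shape:

```python
from pymigrate import ServiceBoundaryAnalyzer

# source_db is the monolith's DatabaseConfig (see the Quick Start below)
analyzer = ServiceBoundaryAnalyzer(source_db)
boundaries = await analyzer.analyze()

for boundary in boundaries:
    # `tables` is an assumed attribute listing the tables grouped into
    # this recommended service boundary
    print(f"{boundary.service_name}: {', '.join(boundary.tables)}")
```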
### API Generation

- Automatic REST API generation for migrated services
- OpenAPI 3.0 specifications (persisted in the sketch below)
- Built-in pagination, filtering, and validation
- FastAPI code generation following best practices
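A short sketch of persisting a generated OpenAPI document; `boundary` and `api_spec` come from `APIGenerator.generate_api()` in the Quick Start below, and treating the spec as a JSON-serializable dict is an assumption:

```python
import json

# Write the generated OpenAPI 3.0 specification to disk so it can be
# shared with API consumers; assumes api_spec is a plain dict.
with open(f"{boundary.service_name}_openapi.json", "w") as f:
    json.dump(api_spec, f, indent=2)
```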
### Consistency Validation

- Continuous data-integrity monitoring
- Real-time discrepancy detection
- Automatic reconciliation options (see the sketch below)
- Detailed consistency reports
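A sketch of acting on a validation report; `validator` and `validate_consistency()` are shown later in this README, while the `reconcile()` method and its `strategy` argument are hypothetical names for the automatic reconciliation feature:

```python
# Validate, then reconcile any discrepancies that were found.
# `reconcile` and strategy="source_wins" are assumed, not confirmed API.
report = await validator.validate_consistency(
    tables=["users"],
    deep_check=True,
)
if report.discrepancies:
    await validator.reconcile(report.discrepancies, strategy="source_wins")
```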
### Traffic Routing

- Gradual traffic migration with fine-grained control
- Multiple routing strategies:
  - Percentage-based
  - Canary deployments
  - Blue-green deployments
  - Feature flags (see the sketch below)
- Instant rollback capabilities
- Sticky sessions for stateful services
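As an illustration of the feature-flag strategy, a sketch mirroring the percentage-based example later in this README; `RoutingStrategy.FEATURE_FLAG` and the `feature_flag` parameter are assumed names (only `RoutingStrategy.PERCENTAGE` is shown below):

```python
# Route a request to the new service only when a feature flag is enabled.
# FEATURE_FLAG and feature_flag= are assumptions about the routing API;
# router and service_config are set up as in the traffic-routing example.
route_config = RouteConfig(
    service_name="user_service",
    strategy=RoutingStrategy.FEATURE_FLAG,
    feature_flag="use_new_user_service",
    sticky_sessions=True,
)
await router.add_route(route_config, service_config)
```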
## Installation

```bash
# Clone the repository
git clone https://github.com/yourusername/pymigrate.git
cd pymigrate

# Create a virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install the package
pip install -e .

# Install development dependencies
pip install -e ".[dev]"
```

## Quick Start

```python
import asyncio

from pymigrate import (
    SyncEngine,
    ServiceBoundaryAnalyzer,
    APIGenerator,
    ConsistencyValidator,
    TrafficRouter,
)
from pymigrate.models.config import (
    DatabaseConfig,
    DatabaseType,
    SyncConfig,
    ConflictResolutionStrategy,
)


async def main():
    # Configure the source (monolith) and target (microservice) databases
    source_db = DatabaseConfig(
        type=DatabaseType.POSTGRESQL,
        host="localhost",
        port=5432,
        database="monolith",
        username="user",
        password="password",
    )
    target_db = DatabaseConfig(
        type=DatabaseType.POSTGRESQL,
        host="localhost",
        port=5433,
        database="microservice",
        username="user",
        password="password",
    )

    # Set up bi-directional sync
    sync_config = SyncConfig(
        source_db=source_db,
        target_db=target_db,
        conflict_resolution=ConflictResolutionStrategy.LAST_WRITE_WINS,
        sync_interval_ms=100,
    )
    sync_engine = SyncEngine(sync_config)
    await sync_engine.start()

    # Analyze service boundaries
    analyzer = ServiceBoundaryAnalyzer(source_db)
    boundaries = await analyzer.analyze()

    # Generate an API for each recommended service
    api_generator = APIGenerator(source_db)
    for boundary in boundaries:
        api_spec = await api_generator.generate_api(boundary)
        print(f"Generated API for {boundary.service_name}")


if __name__ == "__main__":
    asyncio.run(main())
```

## Traffic Routing

```python
# Set up the traffic router
router = TrafficRouter()

# Configure the route; RouteConfig and RoutingStrategy are assumed to be
# importable from pymigrate, and service_config (describing the target
# microservice) is defined elsewhere
route_config = RouteConfig(
    service_name="user_service",
    strategy=RoutingStrategy.PERCENTAGE,
    percentage=0.0,
    sticky_sessions=True,
)
await router.add_route(route_config, service_config)

# Gradually increase traffic
for percentage in [10, 25, 50, 75, 100]:
    await router.update_traffic_percentage(
        "user_service",
        percentage,
        gradual=True,
        step_size=5.0,
        interval_seconds=300,  # 5 minutes between steps
    )

    # Monitor health
    if not await router.health_checker.is_healthy("user_service"):
        # Roll back if issues are detected
        await router.rollback_service("user_service")
        break
```

## Consistency Validation

```python
# Set up the validator; source_conn and target_conn are open connections
# to the source and target databases
validator = ConsistencyValidator(source_conn, target_conn)

# Run a one-off validation
report = await validator.validate_consistency(
    tables=["users", "orders", "products"],
    deep_check=True,
)
print(f"Consistency: {report.consistency_percentage}%")
print(f"Discrepancies: {len(report.discrepancies)}")

# Set up continuous validation
await validator.continuous_validation(
    tables=["critical_table"],
    interval_seconds=60,
    alert_threshold=0.99,  # Alert if consistency drops below 99%
)
```

## Custom Conflict Resolution

```python
from datetime import datetime
from uuid import uuid4

from pymigrate.models import DataChange  # import path assumed

# Define a custom resolver
async def custom_user_resolver(source_change, target_change):
    # Business logic: source wins for email, target wins for last_login
    merged_data = source_change.data.copy()
    merged_data["last_login"] = target_change.data.get("last_login")
    return {
        "winner": DataChange(
            id=str(uuid4()),
            table_name=source_change.table_name,
            change_type=source_change.change_type,
            timestamp=datetime.utcnow(),
            data=merged_data,
            source_system="merged",
        ),
        "details": {"strategy": "custom_merge"},
    }

# Register the resolver for the users table
sync_engine.conflict_resolver.register_custom_resolver(
    "users",
    custom_user_resolver,
)
```

## Architecture

```
pymigrate/
├── sync/                    # Bi-directional synchronization
│   ├── engine.py            # Main sync engine
│   ├── change_detector.py
│   └── conflict_resolver.py
├── analyzer/                # Service boundary analysis
│   ├── boundary.py          # Boundary detection
│   ├── dependency.py        # Dependency analysis
│   └── pattern.py           # Access pattern analysis
├── generator/               # API generation
│   ├── api.py               # API generator
│   ├── schema.py            # Schema generation
│   └── endpoint.py          # Endpoint builder
├── validator/               # Consistency validation
│   ├── consistency.py       # Consistency checker
│   ├── checksum.py          # Data integrity
│   └── reconciliation.py
└── router/                  # Traffic routing
    ├── traffic.py           # Traffic router
    ├── health.py            # Health monitoring
    └── rollback.py          # Rollback management
```

## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=pymigrate

# Run a specific test category
pytest tests/unit/
pytest tests/integration/
pytest tests/e2e/

# Generate the test report (required)
pip install pytest-json-report
pytest --json-report --json-report-file=pytest_results.json
```

## Performance

- Sync latency: <100 ms at the 95th percentile
- Throughput: 10,000+ changes per second
- API generation: <30 seconds for a 50-table service
- Consistency check: 1M records in <60 seconds
- Routing overhead: <1 ms per request
## Contributing

- Fork the repository
- Create a feature branch (`git checkout -b feature/amazing-feature`)
- Commit your changes (`git commit -m 'Add amazing feature'`)
- Push to the branch (`git push origin feature/amazing-feature`)
- Open a Pull Request
## License

This project is licensed under the MIT License - see the LICENSE file for details.

## Support

For issues, questions, or contributions, please visit our GitHub repository.