Project End-to-End Documentation:
https://deepwiki.com/ViinayKumaarMamidi/Databricks_Travel_Booking_SCD2_Project
This project demonstrates a comprehensive data engineering pipeline for travel booking data processing, implementing SCD2 (Slowly Changing Dimension Type 2) patterns with Delta Lake and PySpark. The pipeline processes daily booking transactions and customer master data with data quality validation, dimension management, and fact table aggregation.
```
CSV Files  →  Bronze Layer   →  Data Quality   →  Silver Layer  →  Analytics
    ↓             ↓                  ↓                 ↓               ↓
Raw Data     Raw Ingestion     DQ Validation      SCD2 Dims      Fact Tables
(Customer,   (+ Metadata,      (+ PyDeequ,        (+ Surrogate   (+ Aggregations,
 Booking)     Audit Trail)      Logging)           Keys)          Analytics)
```
- Bronze Layer: Raw data ingestion with metadata enrichment
- Silver Layer: SCD2 dimensions and aggregated fact tables
- Gold Layer: Analytics and reporting tables (via SQL queries)
```
Travel_Booking_SCD2_Merge_Project/
├── notebooks/
│   ├── validate_inputs.ipynb            # Input validation and audit logging
│   ├── 10_ingest_bookings_bronze.ipynb  # Booking data ingestion
│   ├── 11_ingest_customers_bronze.ipynb # Customer data ingestion
│   ├── 20_dq_bookings.ipynb             # Booking data quality checks
│   ├── 21_dq_customers.ipynb            # Customer data quality checks
│   ├── 30_customer_dim_scd2.ipynb       # SCD2 customer dimension
│   ├── 31_booking_fact_build.ipynb      # Booking fact table
│   ├── 40_optimize_zorder.ipynb         # Table optimization
│   └── 41_analyze_stats.ipynb           # Statistics analysis
├── queries/
│   ├── travel_booking_init.sql          # Schema initialization
│   ├── customer360.sql                  # Customer analytics
│   ├── daily_revenue.sql                # Revenue analytics
│   ├── data_quality_summary.sql         # DQ reporting
│   └── log_completion_flow.sql          # Pipeline completion
├── booking_data/                        # Sample booking CSV files
├── customer_data/                       # Sample customer CSV files
└── README.md                            # This file
```
**Booking Data**
- Files: `bookings_YYYY-MM-DD.csv`
- Purpose: Daily booking transaction records
- Schema: `booking_id,customer_id,booking_type,amount,discount,quantity,booking_date`

**Customer Data**
- Files: `customers_YYYY-MM-DD.csv`
- Purpose: Customer master data with potential attribute changes
- Schema: `customer_id,customer_name,customer_address,email`
**`validate_inputs.ipynb`**
- Purpose: Validates input parameters and file existence
- Features:
- Parameter extraction with defaults
- File existence validation
- Audit logging setup
- Run tracking
**`10_ingest_bookings_bronze.ipynb`**
- Purpose: Raw booking data ingestion
- Features:
- CSV parsing with schema inference
- Metadata enrichment (ingestion_time, business_date)
- Delta table storage with append mode
**`11_ingest_customers_bronze.ipynb`**
- Purpose: Raw customer data ingestion with SCD2 preparation
- Features:
- SCD2 temporal columns (valid_from, valid_to, is_current)
- Business date partitioning
- Delta table storage
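Both bronze notebooks attach metadata columns at ingestion time. The real notebooks do this with PySpark on Databricks; as a minimal plain-Python stand-in for the enrichment step (function name and sample CSV layout are illustrative, not the project's actual code):

```python
import csv
import io
from datetime import datetime, timezone

def enrich_with_metadata(csv_text: str, business_date: str) -> list[dict]:
    """Parse a daily CSV extract and attach the bronze-layer metadata
    columns (ingestion_time, business_date) to every record."""
    ingestion_time = datetime.now(timezone.utc).isoformat()
    enriched = []
    for record in csv.DictReader(io.StringIO(csv_text)):
        record["ingestion_time"] = ingestion_time
        record["business_date"] = business_date
        enriched.append(record)
    return enriched
```

In the Spark version the same idea is two `withColumn` calls before an append-mode write to the Delta table.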
**`20_dq_bookings.ipynb`**
- Purpose: Comprehensive booking data quality checks
- Checks:
- Data existence (row count > 0)
- Completeness (customer_id, amount)
- Non-negativity (amount, quantity, discount)
- Framework: PyDeequ with error-level validation
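PyDeequ evaluates these rules on Spark DataFrames. As a plain-Python sketch of the same three rule families, producing (check_name, status) pairs shaped like rows in `ops.dq_results` (names here are illustrative):

```python
def run_booking_dq(rows):
    """Evaluate booking DQ rules over a list of dicts with keys
    customer_id, amount, quantity, discount. Illustrative stand-in
    for the PyDeequ checks in 20_dq_bookings."""
    results = []

    # Data existence: row count > 0
    results.append(("size_gt_zero", "Success" if len(rows) > 0 else "Failure"))

    # Completeness: customer_id and amount must be non-null
    for col in ("customer_id", "amount"):
        complete = all(r.get(col) is not None for r in rows)
        results.append((f"complete_{col}", "Success" if complete else "Failure"))

    # Non-negativity: amount, quantity, discount must be >= 0
    for col in ("amount", "quantity", "discount"):
        non_neg = all(r[col] >= 0 for r in rows if r.get(col) is not None)
        results.append((f"non_negative_{col}", "Success" if non_neg else "Failure"))

    return results
```

With error-level validation, any `Failure` row stops the pipeline before silver-layer processing.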
**`21_dq_customers.ipynb`**
- Purpose: Customer data quality validation
- Checks:
- Data existence and completeness
- Email format validation
- Required field validation
**`30_customer_dim_scd2.ipynb`**
- Purpose: Implements SCD2 for customer dimension
- Features:
- Surrogate key generation (IDENTITY column)
- Historical version tracking
- Merge logic for attribute changes
- Temporal column management
**`31_booking_fact_build.ipynb`**
- Purpose: Creates aggregated booking fact table
- Features:
- Daily grain aggregation
- Customer surrogate key integration
- Financial calculations (amount - discount)
- Idempotent merge operations
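The daily-grain aggregation with the net-amount calculation can be sketched in plain Python (the actual notebook does this with a Spark groupBy plus a Delta merge; the function name and key shape below are illustrative):

```python
from collections import defaultdict

def build_booking_fact(bookings):
    """Aggregate raw bookings to the fact table's daily grain.
    Net amount per booking = amount - discount; grouped by
    (booking_type, customer_id, booking_date)."""
    agg = defaultdict(lambda: {"total_amount_sum": 0.0, "total_quantity_sum": 0})
    for b in bookings:
        key = (b["booking_type"], b["customer_id"], b["booking_date"])
        agg[key]["total_amount_sum"] += b["amount"] - b["discount"]
        agg[key]["total_quantity_sum"] += b["quantity"]
    return dict(agg)
```

Merging this result into `booking_fact` keyed on the same grain is what makes reruns for a business date idempotent.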
**`40_optimize_zorder.ipynb`**
- Purpose: Optimizes table performance
- Features:
- Z-ORDER BY clustering
- VACUUM operations
- Performance tuning
**`41_analyze_stats.ipynb`**
- Purpose: Updates table statistics
- Features:
- ANALYZE TABLE commands
- Query optimization support
**SCD2 Implementation**
- Historical Tracking: Maintains all versions of customer attributes
- Surrogate Keys: Auto-generated identity columns for performance
- Temporal Columns: valid_from, valid_to, is_current for time-based queries
- Merge Logic: Closes old versions and creates new ones for changes
**Data Quality Framework**
- PyDeequ Integration: Comprehensive DQ checks with configurable levels
- Audit Logging: All DQ results stored for monitoring and reporting
- Error Handling: Pipeline stops on DQ failures to prevent bad data
**Delta Lake Features**
- ACID Properties: Transactional consistency across operations
- Schema Evolution: Automatic schema updates with mergeSchema
- Time Travel: Historical data access capabilities
- Optimization: Z-ORDER and VACUUM for performance
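Time travel and history inspection are plain SQL against the Delta tables; for example (the version number and timestamp are illustrative):

```sql
-- Read the dimension as it existed at an earlier table version
SELECT * FROM customer_dim VERSION AS OF 3;

-- Or as of a point in time
SELECT * FROM customer_dim TIMESTAMP AS OF '2025-01-15T00:00:00Z';

-- Inspect the commit history that makes time travel possible
DESCRIBE HISTORY customer_dim;
```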
**Configuration**
- Widget-based: Configurable parameters via Databricks widgets
- Default Values: Sensible defaults for all parameters
- Flexibility: Easy customization for different environments
**Prerequisites**
- Databricks workspace with Unity Catalog
- PyDeequ library installed
- Appropriate permissions for table creation
```
# Upload sample CSV files to Volumes
# Structure: /Volumes/{catalog}/{schema}/data/
# ├── booking_data/
# │   └── bookings_YYYY-MM-DD.csv
# └── customer_data/
#     └── customers_YYYY-MM-DD.csv
```

```python
# Set widget parameters (optional - defaults provided)
dbutils.widgets.text("arrival_date", "2025-01-15")
dbutils.widgets.text("catalog", "travel_bookings")
dbutils.widgets.text("schema", "default")
dbutils.widgets.text("base_volume", "/Volumes/travel_bookings/default/data")
```

```
# Run notebooks in sequence:
# 1. validate_inputs.ipynb
# 2. 10_ingest_bookings_bronze.ipynb
# 3. 11_ingest_customers_bronze.ipynb
# 4. 20_dq_bookings.ipynb
# 5. 21_dq_customers.ipynb
# 6. 30_customer_dim_scd2.ipynb
# 7. 31_booking_fact_build.ipynb
# 8. 40_optimize_zorder.ipynb
# 9. 41_analyze_stats.ipynb
```

```sql
CREATE TABLE customer_dim (
  customer_sk BIGINT GENERATED ALWAYS AS IDENTITY,  -- Surrogate key
  customer_id INT,                                  -- Natural key
  customer_name STRING,
  customer_address STRING,
  email STRING,
  valid_from DATE,                                  -- SCD2 start date
  valid_to DATE,                                    -- SCD2 end date
  is_current BOOLEAN                                -- Current version flag
)
```

```sql
CREATE TABLE booking_fact (
  booking_type STRING,
  customer_id INT,              -- Natural key
  customer_sk BIGINT,           -- Surrogate key
  business_date DATE,           -- Daily grain
  total_amount_sum DOUBLE,      -- Aggregated amount
  total_quantity_sum BIGINT     -- Aggregated quantity
)
```

- Compares current dimension records with incoming data
- Identifies changes in: customer_name, customer_address, email
- Ignores changes in: customer_id (natural key)
- Close Old Version: Update valid_to and is_current for changed records
- Create New Version: Insert new record with updated attributes
- Surrogate Key: Auto-generated for new versions
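One common way to express this close-and-insert pattern in Delta SQL is a two-step merge. A sketch under stated assumptions: the staging table name (`customers_staged`) and the open-ended `valid_to` sentinel are illustrative choices, not necessarily the project's exact code:

```sql
-- Step 1: close the current version of any customer whose tracked
-- attributes (name, address, email) changed
MERGE INTO customer_dim AS d
USING customers_staged AS s
  ON d.customer_id = s.customer_id AND d.is_current = true
WHEN MATCHED AND (d.customer_name    <> s.customer_name
               OR d.customer_address <> s.customer_address
               OR d.email            <> s.email) THEN
  UPDATE SET d.valid_to = s.business_date, d.is_current = false;

-- Step 2: insert a new current version for new and changed customers
-- (customer_sk is generated automatically by the IDENTITY column)
INSERT INTO customer_dim (customer_id, customer_name, customer_address,
                          email, valid_from, valid_to, is_current)
SELECT s.customer_id, s.customer_name, s.customer_address, s.email,
       s.business_date, DATE'9999-12-31', true
FROM customers_staged s
LEFT JOIN customer_dim d
  ON d.customer_id = s.customer_id AND d.is_current = true
WHERE d.customer_id IS NULL;
```

Step 2 runs after step 1, so customers whose current version was just closed no longer have an `is_current` row and receive a fresh version alongside brand-new customers.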
```sql
-- Current customer data
SELECT * FROM customer_dim WHERE is_current = true;

-- Historical customer data
SELECT * FROM customer_dim
WHERE customer_id = 123
  AND '2025-01-15' BETWEEN valid_from AND valid_to;

-- Customer changes over time
SELECT customer_id, customer_name, valid_from, valid_to
FROM customer_dim
WHERE customer_id = 123
ORDER BY valid_from;
```

```sql
CREATE TABLE ops.dq_results (
  business_date DATE,
  dataset STRING,
  check_name STRING,
  status STRING,
  constraint STRING,
  message STRING,
  recorded_at TIMESTAMP
)
```

```sql
-- DQ failure summary
SELECT business_date, dataset, COUNT(*) AS failed_checks
FROM ops.dq_results
WHERE status = 'Failure'
GROUP BY business_date, dataset;

-- DQ trend analysis
SELECT business_date,
       SUM(CASE WHEN status = 'Success' THEN 1 ELSE 0 END) AS passed,
       SUM(CASE WHEN status = 'Failure' THEN 1 ELSE 0 END) AS failed
FROM ops.dq_results
GROUP BY business_date
ORDER BY business_date;
```

```sql
-- Optimize fact table for common query patterns
OPTIMIZE booking_fact ZORDER BY (business_date, customer_sk);

-- Optimize dimension table for lookups
OPTIMIZE customer_dim ZORDER BY (customer_id, is_current);
```

```sql
-- Update statistics for query optimization
ANALYZE TABLE booking_fact COMPUTE STATISTICS;
ANALYZE TABLE customer_dim COMPUTE STATISTICS;
```

The pipeline is designed to be executed as Databricks workflows:
**Workflow 1: Pipeline Execution**
- Tasks: Sequential execution of all notebooks
- Dependencies: Each notebook depends on previous completion
- Error Handling: Stop on failure with detailed logging

**Workflow 2: Analytics**
- Tasks: Execution of SQL queries for reporting
- Trigger: Triggered by Workflow 1 completion
- Output: Analytics tables and reports
```json
{
  "name": "travel_booking_pipeline",
  "tasks": [
    {
      "task_key": "validate_inputs",
      "notebook_task": {
        "notebook_path": "/notebooks/validate_inputs"
      }
    },
    {
      "task_key": "ingest_bronze",
      "depends_on": [{"task_key": "validate_inputs"}],
      "notebook_task": {
        "notebook_path": "/notebooks/10_ingest_bookings_bronze"
      }
    }
  ]
}
```

- Comprehensive validation at multiple stages
- Automated DQ monitoring and alerting
- Graceful error handling and logging
- Z-ORDER clustering for common query patterns
- Statistics updates for query optimization
- Efficient merge operations with Delta Lake
- Clear naming conventions and documentation
- Parameterized notebooks for flexibility
- Modular design for easy updates
- Incremental processing by business date
- Delta Lake for large-scale data processing
- Optimized storage and query performance
- Symptom: `FileNotFoundError` in validate_inputs
- Solution: Verify file paths and ensure CSV files exist
- Symptom: Merge operations fail in customer_dim
- Solution: Check for duplicate customer_ids or schema mismatches
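Duplicate natural keys on the source side cause MERGE to match one dimension row to multiple updates. A quick check for them (the bronze table name and date are illustrative):

```sql
-- Find customer_ids that appear more than once in the staged batch
SELECT customer_id, COUNT(*) AS dup_count
FROM customers_bronze
WHERE business_date = '2025-01-15'
GROUP BY customer_id
HAVING COUNT(*) > 1;
```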
- Symptom: PyDeequ checks fail
- Solution: Review source data quality and adjust DQ rules
- Check run_log table for pipeline status
- Review DQ results for data quality issues
- Validate input data format and content
- Check Unity Catalog permissions
- Machine learning integration for booking predictions
- Customer segmentation and behavior analysis
- Revenue forecasting and trend analysis
- Streaming data ingestion with Delta Live Tables
- Real-time SCD2 processing
- Event-driven architecture
- Column-level security and masking
- Data lineage tracking
- Compliance and audit frameworks
This Travel Booking SCD2 Merge Project demonstrates a production-ready data engineering pipeline with comprehensive SCD2 implementation, data quality validation, and performance optimization. The pipeline provides a solid foundation for travel booking data processing with built-in monitoring, audit trails, and scalability features.