A high-performance Data Ingestion Project built with the Python dlt library. It is designed to move data from PostgreSQL to Databricks using CDC (Change Data Capture) for efficient synchronization.
Orchestrated natively by Databricks Lakeflow Jobs, this project serves as a robust blueprint for enterprise data replication.
This project focuses strictly on the Extract & Load (EL) phases of modern data engineering. Its primary goal is to establish a reliable Bronze Layer (Raw Data) in the Lakehouse.
- Role: Ingestion Engine (Source to Bronze).
- Architecture Philosophy: Decouples Ingestion from Transformation. By isolating the ingestion logic, we ensure that upstream failures do not break downstream business logic (Silver/Gold transformations), which can be handled separately by tools like dbt or Spark SQL.
- Core: Python 3.11+, `dlt` (Data Load Tool), PySpark.
- Full Load Engine: `dlt.sources.sql_database` (Standard SQL Snapshot).
- CDC Engine: `pg_replication` (Wal2Json/pgoutput).
  - Customization Note: The `pg_replication` source was modified to force Append-Only behavior (see the sketch after this list). While standard replication tools often aim to mirror the database state (Merge), our Lakehouse architecture requires an immutable log of all events.
- Infrastructure: Databricks (Unity Catalog, Volumes, Delta Lake), PostgreSQL.
- DevOps & CI/CD: GitHub Actions, Databricks Asset Bundles (DABs), OAuth Service Principals.
- Quality Engineering: `uv` (Package Mgmt), `ruff` (Linting), `mypy` (Static Analysis), `pytest` (Testing).
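The append-only customization essentially pins dlt's write disposition. A minimal sketch, assuming credentials are resolved from `.dlt/secrets.toml`; the `wal_events` resource below is only an illustrative stand-in for the vendored `pg_replication` source:

```python
import dlt

@dlt.resource(name="wal_events", write_disposition="append")
def wal_events():
    """Illustrative stand-in for the customized pg_replication source:
    every change event becomes a new row; nothing is merged or deleted."""
    yield {"lsn": "0/16B3748", "op": "U", "table": "invoice", "id": 42}

pipeline = dlt.pipeline(
    pipeline_name="postgres_cdc_demo",
    destination="databricks",
    dataset_name="bronze",
)

# Pinning the disposition at run time as well guards against resources that
# would otherwise default to merge semantics.
pipeline.run(wal_events(), write_disposition="append")
```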
- Near Real-Time Replication: Streams `INSERT`, `UPDATE`, and `DELETE` operations continuously using PostgreSQL logical replication (pgoutput) with low latency.
- Dual-Mode Operation (a sketch of the entry-point dispatch follows this list):
  - Full Load Mode: High-performance initial load of historical data.
  - CDC Mode: Incremental updates with exactly-once processing semantics.
- Databricks Native:
  - Leverages Unity Catalog Volumes for efficient staging.
  - Writes directly to Delta Tables with schema evolution.
- Enterprise CI/CD:
  - Service Principal Authentication: Secure OAuth M2M authentication for deployments.
  - Environment Isolation: Strict separation of concerns with dedicated catalogs (`dev_`, `qa_`, `prod_`).
  - Automated Quality Gates: Integrated Linting (Ruff), Type Checking (Mypy), and Unit Testing (Pytest).
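Conceptually, both modes share a single entry point that dispatches on a `--mode` flag. A hypothetical sketch of `pipeline_main.py`; the `run(...)` functions and their signatures are assumptions, not the module's actual API:

```python
import argparse

def main() -> None:
    parser = argparse.ArgumentParser(description="PostgreSQL -> Databricks ingestion")
    parser.add_argument("--mode", choices=["full_load", "cdc"], required=True)
    parser.add_argument("--catalog", required=True)    # e.g. dev_chinook_lakehouse
    parser.add_argument("--dataset", default="bronze")  # destination schema
    args = parser.parse_args()

    if args.mode == "full_load":
        from src.postgres_cdc import full_load          # snapshot + REPLACE
        full_load.run(catalog=args.catalog, dataset=args.dataset)  # hypothetical signature
    else:
        from src.postgres_cdc import cdc_load           # WAL stream + APPEND
        cdc_load.run(catalog=args.catalog, dataset=args.dataset)   # hypothetical signature

if __name__ == "__main__":
    main()
```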
The current Databricks Lakeflow Job definition is configured with Manual Triggers by default.
- Why? Designed for On-Demand Demonstration. This allows reviewers to trigger execution immediately and verify results without waiting for scheduled windows or consuming idle compute resources in the demo environment.
- Production Recommendation:
- Hourly (1 hour): Ideal balance for batching accumulated CDC data (see the scheduling sketch after this list).
- Continuous: For low-latency requirements, switching the job to "Continuous" mode enables stream-like processing.
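For reference, a schedule can also be attached outside the bundle YAML. A hedged sketch using the Databricks Python SDK; the job ID is a placeholder and the call shape assumes the SDK's Jobs API:

```python
from databricks.sdk import WorkspaceClient
from databricks.sdk.service import jobs

w = WorkspaceClient()  # resolves credentials from your DEFAULT profile / env vars

# Switch the demo job from manual triggering to an hourly schedule.
w.jobs.update(
    job_id=123456789,  # placeholder: look up the deployed job's ID first
    new_settings=jobs.JobSettings(
        schedule=jobs.CronSchedule(
            quartz_cron_expression="0 0 * * * ?",  # top of every hour
            timezone_id="UTC",
        )
    ),
)
```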
The pipeline operates in two mutually exclusive modes to ensure reliability and clean separation of concerns.
- Destination Schema (e.g., `bronze`): Holds the final, queryable Delta Tables.
- Operational Modes (a condensed dlt sketch follows this list):
  - Full Load (`REPLACE`): Takes a snapshot of the source and replaces the target table. Used for initialization or full resets.
  - CDC (`APPEND`): Continuously reads the WAL (Write-Ahead Log) and appends every event to the target table.
    - Update Handling: An update generates a new row with the new values (versioning).
    - Delete Handling: A delete generates a new row marking the event, preserving the original record (Soft Delete).
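A condensed sketch of how the two modes map onto dlt write dispositions. Table names are illustrative, credentials are assumed to be configured in `.dlt/secrets.toml`, and the real logic lives in `full_load.py` and `cdc_load.py`:

```python
import dlt
from dlt.sources.sql_database import sql_database

pipeline = dlt.pipeline(
    pipeline_name="postgres_to_databricks",
    destination="databricks",
    dataset_name="bronze",
)

# Full Load: snapshot selected source tables and swap the destination contents.
snapshot = sql_database(table_names=["invoice", "invoice_line"])  # illustrative tables
pipeline.run(snapshot, write_disposition="replace")

# CDC: append every WAL event so updates and deletes arrive as new rows.
# `wal_events` stands in for the customized pg_replication resource.
# pipeline.run(wal_events, write_disposition="append")
```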
```mermaid
flowchart LR
%% Global settings for curves and line colors
%%{init: {'theme': 'base', 'themeVariables': { 'lineColor': '#585858'}}}%%
%% --- Area 1: Source System ---
subgraph Source_System [Source System]
PG[("PostgreSQL")]:::db_source
end
%% --- Area 2: Databricks Lakeflow ---
subgraph Lakeflow_Jobs [Databricks Lakeflow Jobs]
subgraph Engine [Engine: dlt Library]
direction TB
FL[Mode: Full Load]:::white_box
CDC[Mode: CDC Stream]:::white_box
end
subgraph Unity_Catalog [Unity Catalog Storage]
subgraph Staging [Staging Layer]
direction TB
Vol_FL[Full Load Volume]:::white_box
Vol_CDC[CDC Stream Volume]:::white_box
end
Delta[("Delta Table")]:::db_delta
end
end
%% --- Connections and Flow ---
%% Upper Path: Full Load
PG -->|Snapshot Read| FL
FL -->|"1. Write Parquet"| Vol_FL
Vol_FL -.-o|"2. REPLACE (Swap)"| Delta
%% Lower Path: CDC
PG -->|WAL Stream| CDC
CDC -->|"1. Write Parquet"| Vol_CDC
Vol_CDC ==>|"2. APPEND (History)"| Delta
%% --- Styling (Classes) ---
classDef default fill:#fff,stroke:#333,stroke-width:1px;
%% Subgraph Styles (Containers)
style Source_System fill:#fce4ec,stroke:#f48fb1,stroke-width:2px,color:#880e4f
style Lakeflow_Jobs fill:#fff3e0,stroke:#ff9800,stroke-width:2px,stroke-dasharray: 5 5,color:#e65100
style Engine fill:#ffe0b2,stroke:#ffb74d,stroke-width:1px,color:#ef6c00
style Unity_Catalog fill:#e1f5fe,stroke:#4fc3f7,stroke-width:2px,color:#01579b
style Staging fill:#e3f2fd,stroke:none,color:#0277bd
%% Node Styles (Boxes and Databases)
classDef db_source fill:#212121,stroke:#000,color:#fff;
classDef db_delta fill:#0277bd,stroke:#fff,color:#fff;
classDef white_box fill:#ffffff,stroke:#90caf9,stroke-width:1px,color:#424242;
%% Fine-tuning Links
linkStyle default stroke:#757575,stroke-width:1px;
```
Note regarding Terminology: This documentation uses the term Full Load to describe the initial bulk load of data. Internally, this utilizes `dlt`'s `write_disposition="replace"` strategy. While `dlt` internally handles some state using "snapshots" (especially for logical replication), we strictly use "Full Load" to describe the user-facing operation of replacing the destination dataset with the source state.
- Python 3.11+
- uv (Fast Python package manager) - Install Guide
- Databricks Workspace (Unity Catalog enabled)
- PostgreSQL Database with `wal_level=logical` (a quick check follows this list)
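A small sketch to confirm the last prerequisite, assuming `psycopg2` as the client (any PostgreSQL driver works; the DSN is a placeholder):

```python
import psycopg2  # assumed client library

# Logical replication (and therefore CDC) requires wal_level = 'logical'.
conn = psycopg2.connect("postgresql://your_user:your_password@your_host:5432/your_db")
with conn, conn.cursor() as cur:
    cur.execute("SHOW wal_level;")
    (wal_level,) = cur.fetchone()
    print(f"wal_level = {wal_level}")
    assert wal_level == "logical", "Set wal_level=logical and restart PostgreSQL"
```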
You can run the entire pipeline from your local machine. This is ideal for development, testing, and POCs.
```bash
uv sync
```
Create a `.dlt/secrets.toml` file in the project root with your credentials:
```toml
[sources.pg_replication.credentials]
database = "your_db"
password = "your_password"
host = "your_host"
port = 5432
username = "your_user"
[destination.databricks.credentials]
server_hostname = "dbc-xxxx.cloud.databricks.com"
http_path = "/sql/1.0/warehouses/xxxx"
access_token = "dapi..." # Or use CLI profile if configured
```
Tip: If you have the Databricks CLI configured, `dlt` can automatically use your `DEFAULT` profile credentials without putting them in `secrets.toml`.
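To confirm locally that dlt can resolve these values, a small hedged check; the key path mirrors the section above and raises if the configuration is not visible to dlt:

```python
import dlt

# dlt resolves this from .dlt/secrets.toml or from environment variables such as
# SOURCES__PG_REPLICATION__CREDENTIALS__HOST.
host = dlt.secrets["sources.pg_replication.credentials.host"]
print(f"Postgres host resolved: {host}")
```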
Perform the initial full load of your data.
```bash
uv run python -m src.postgres_cdc.pipeline_main --mode full_load --catalog dev_chinook_lakehouse --dataset bronze
```
Generate some fake sales data in your Postgres database to test CDC.
Requirement: This script wraps an external generator. You must clone day-1_sales_data_generator locally for it to work.
```bash
# Generate 5 inserts, 2 updates, 1 delete
uv run scripts/simulate_transactions.py 5 2 1
```
Capture the changes and append them to Databricks.
```bash
uv run python -m src.postgres_cdc.pipeline_main --mode cdc --catalog dev_chinook_lakehouse --dataset bronze
```
Validate that inserts, updates, and deletes were correctly applied to the Delta tables.
```bash
uv run scripts/verify_data.py
```
This project implements a robust Continuous Integration and Continuous Deployment pipeline using GitHub Actions.
Every Push and Pull Request triggers a strict validation pipeline:
- Linting: `uv run ruff check .` (Enforces PEP-8 and clean code style).
- Type Checking: `uv run mypy src/` (Static analysis for type safety).
- Unit Testing: `uv run pytest` (Ensures logic correctness; a minimal example follows this list).
  - Coverage: Enforces minimum code coverage metrics (configured in `pyproject.toml`).
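As a flavour of what these gates check, a minimal, hypothetical unit test; the real suite under `tests/` will differ, and the toy function below is not part of the package:

```python
# tests/test_mode_mapping.py -- illustrative only
import pytest

def write_disposition_for(mode: str) -> str:
    """Toy re-implementation of the mode -> write disposition mapping."""
    mapping = {"full_load": "replace", "cdc": "append"}
    if mode not in mapping:
        raise ValueError(f"unknown mode: {mode}")
    return mapping[mode]

@pytest.mark.parametrize("mode,expected", [("full_load", "replace"), ("cdc", "append")])
def test_known_modes(mode: str, expected: str) -> None:
    assert write_disposition_for(mode) == expected

def test_unknown_mode_is_rejected() -> None:
    with pytest.raises(ValueError):
        write_disposition_for("merge")
```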
Deployments are managed via Databricks Asset Bundles (DABs).
Architecture Note: This project implements a Logical Isolation Strategy. Instead of using separate Databricks Workspaces for Dev/QA/Prod (which is common in large enterprises for physical isolation), we simulate these environments within a Single Workspace using distinct Unity Catalog Catalogs. This provides strong data separation while keeping infrastructure lean.
| Environment | Catalog | Trigger | Authentication |
|---|---|---|---|
| Development | `dev_chinook_lakehouse` | Local CLI | User Credentials |
| QA | `qa_chinook_lakehouse` | Push to `main` | Service Principal |
| Production | `prod_chinook_lakehouse` | GitHub Release | Service Principal |
For production and QA, the pipeline is deployed automatically via CI/CD. For manual deployments or testing:
Because this project simulates isolated environments using Unity Catalog, you must provision the catalogs manually before the first deployment:
```bash
databricks catalogs create dev_chinook_lakehouse
databricks catalogs create qa_chinook_lakehouse
databricks catalogs create prod_chinook_lakehouse
```
The job uses Databricks Secrets to securely access the database.
```bash
databricks secrets create-scope dlt_scope
databricks secrets put-secret dlt_scope pg_connection_string --string-value "postgresql://user:pass@host:port/db"
```
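One possible way for the job code to consume this secret at runtime; a sketch assuming the `databricks-sdk` runtime `dbutils` is available on the cluster and that dlt's double-underscore environment-variable convention is used for credentials:

```python
import os
from databricks.sdk.runtime import dbutils  # available on Databricks compute

# Fetch the connection string stored above and expose it where dlt looks for
# the pg_replication credentials (uppercased, sections joined with "__").
conn_str = dbutils.secrets.get(scope="dlt_scope", key="pg_connection_string")
os.environ["SOURCES__PG_REPLICATION__CREDENTIALS"] = conn_str
```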
Builds the Python wheel and uploads the job definition to your development environment.
```bash
databricks bundle deploy -t dev --profile DEFAULT
```
Trigger the pipeline modes using parameters:
Full Load Job:
```bash
databricks bundle run postgres_cdc_job_definition --task-key full_load_task --profile DEFAULT
```
CDC Stream Job:
```bash
databricks bundle run postgres_cdc_job_definition --task-key cdc_load_task --profile DEFAULT
```
```text
.
├── pyproject.toml            # Project definition, dependencies, and tool configs (Ruff, Mypy, Coverage)
├── databricks.yml            # Databricks Asset Bundle (DABs) definition for multi-env deployment
├── .github/workflows/        # CI/CD Pipeline definitions
├── src/
│   └── postgres_cdc/         # Main package
│       ├── __init__.py
│       ├── pipeline_main.py  # Main orchestrator (entry point)
│       ├── full_load.py      # Full Load pipeline logic
│       ├── cdc_load.py       # CDC incremental pipeline logic
│       ├── utils/            # Utility modules (logger)
│       └── pg_replication/   # Custom CDC source module
├── tests/                    # Unit tests
├── scripts/                  # Helper tools (outside package)
├── resources/                # Databricks Job Definitions (YAML)
└── .dlt/                     # Local config and secrets
```
MIT
Serverless Job Execution Failure:
When running this pipeline as a Databricks Lakeflow Job on a Databricks Free Edition account using Serverless Compute, you may encounter a Connection refused error targeting the Unity Catalog Volumes storage endpoint (e.g., us-east-2.storage.cloud.databricks.com).
Hypothesis: This is likely due to strict network egress restrictions on the Free Tier Serverless compute environment, which blocks access to the external storage used by Unity Catalog Volumes.
Workaround: Execute the pipeline locally as demonstrated in the "Quick Start" section. Local execution successfully connects to Databricks via the public API.