
Malav4217/Retail-Intelligence-Platform


Retail Intelligence & Data Platform

An end-to-end, production-grade ELT pipeline built on Microsoft Fabric that modernizes 8+ years of legacy retail POS data into a high-performance Star Schema for business intelligence and predictive analytics.

Engineer: Malav Patel
Platform: Microsoft Fabric (OneLake, Data Factory, Synapse Data Engineering)
Compute: PySpark (Spark 3.4), Spark SQL, Python, Pandas
Storage: Delta Lake (ACID compliant)
BI Layer: Power BI (Direct Lake mode)


Project Overview

A convenience store's 8+ years of Point of Sale history existed as hundreds of semi-structured legacy .xls and .xlsx files with inconsistent schemas. Transaction metadata (ID and Date) was stored only in each transaction's header row, so it had to be propagated to every product line-item row beneath it. This platform ingests, validates, and models that raw data into a clean Star Schema, enabling sub-second Power BI dashboard queries via Direct Lake connectivity.

Key Results:

  • 676,550 raw records ingested into the Bronze layer
  • 351,275 rows promoted to the Silver analytics layer
  • ~150,812 unique transaction IDs identified across the historical dataset
  • Fully automated pipeline from file-drop to Gold layer aggregates

Architecture

1. Medallion Pipeline — End to End

Shows the full data flow from raw POS Excel files through Bronze, Silver, and Gold layers to the Power BI dashboard, including the Data Factory orchestration trigger and archive/cleanup steps.

flowchart TD
    A([POS Excel exports\n.xls / .xlsx — 8+ years\n~676K raw records])
    A -->|File-drop trigger\nFabric Data Factory| B
    subgraph BRONZE ["Bronze Layer — Raw Ingestion"]
        B[01_bronze_layer\nIdempotent load · header sanitize\n_source_path tracking · dropna]
        C[01_bronze_recursive_ingestion\nNested folder traversal\nbronze_details_transactions\nbronze_tran_transactions]
    end
    BRONZE -->|_ingested_at watermark| SILVER
    subgraph SILVER ["Silver Layer — Validation & Hierarchy"]
        D[02_silver_layer\nForward-fill Window · Smart Revenue Rule\nIncremental watermark · Quality gate\nAuto date dimension · 351,275 rows promoted]
    end
    SILVER -->|Full recalculation| GOLD
    subgraph GOLD ["Gold Layer — Analytics & Aggregates"]
        E[03_gold_layer\nMonthly P&L · Product heroes\nDaily heatmap · Market basket · Basket health]
    end
    GOLD --> F([Power BI\nDirect Lake mode\nSub-second queries])
    BRONZE -->|Binary copy| G([Archive\nFiles/Bronze/Archive])
    BRONZE -->|Wildcard purge| H([Landing zone cleared\nFiles/Bronze/Landing])

2. Delta Table Schema Registry

All 11 Delta tables across the three layers, showing which Bronze tables feed which Silver tables, and which Silver tables power which Gold aggregates.

flowchart LR
    subgraph BRONZE ["Bronze"]
        B1[bronze_details_transactions\nRaw line-items · _source_path\noriginal_row_index]
        B2[bronze_tran_transactions\nRaw headers · _ingested_at\ndata_year]
    end
    subgraph SILVER ["Silver"]
        S1[silver_details_transactions\nFACT TABLE · 351,275 rows\ntransaction_id · product_name\ngross_revenue · gross_profit]
        S2[silver_tran_summary\nDIMENSION\ntransaction_id · basket_value\nitems_in_basket · payment_method]
        S3[silver_date_dimension\nAuto-generated\ncalendar_date · year\nmonth_name · day_of_week]
        S4[quarantine_details_transactions\nFailed quality gate\nnull transaction_id or name]
    end
    subgraph GOLD ["Gold"]
        G1[gold_monthly_pnl\nfiscal_year · fiscal_month\nnet_revenue · profit_margin_pct]
        G2[gold_product_performance\nproduct_name · total_profit\nunits_sold · avg_retail_price]
        G3[gold_daily_sales\nday_of_week · txn_volume\ntotal_revenue]
        G4[gold_basket_analysis\nproduct_a · product_b\nassociation_count]
        G5[gold_basket_health\nfiscal_year · avg_items\navg_basket_value]
    end
    B1 -->|Forward-fill Window| S1
    B1 -->|Failed gate| S4
    B2 -->|Standardize & dedup| S2
    S1 --> S3
    S1 --> G1
    S1 --> G2
    S1 --> G3
    S1 --> G4
    S2 --> G5

3. Data Factory Orchestration Flow

The automated execution sequence triggered on every file drop — Bronze to Silver to Gold notebooks, followed by binary archival and landing zone cleanup.

flowchart TD
    T([File-drop trigger\nFiles/Bronze/Landing])
    T --> B
    B[Execute Bronze notebook\nIdempotent ingest · sanitize · dropna]
    B --> S
    S[Execute Silver notebook\nIncremental watermark · window fill\nSmart Revenue Rule · quality gate]
    S --> G
    G[Execute Gold notebook\nFull recalculation · overwrite strategy\nAll 5 Gold tables refreshed]
    G --> AR
    G --> DL
    AR[Copy to Archive\nBinary mode — preserves .xls/.xlsx\nFiles/Bronze/Archive]
    DL[Delete landing files\nWildcard star · recursive purge\nDirectory structure preserved]
    AR --> PBI
    DL --> PBI
    PBI([Power BI Direct Lake\nDashboard auto-synced\nNo manual refresh needed])

4. Star Schema — Semantic Model

The formal Star Schema powering the Power BI Direct Lake semantic model, showing fact and dimension table relationships with cardinality.

erDiagram
    silver_details_transactions {
        string transaction_id FK
        timestamp transaction_timestamp
        string product_name
        int quantity
        decimal unit_price
        decimal gross_revenue
        decimal gross_profit
        int fiscal_year
    }
    silver_tran_summary {
        string transaction_id PK
        int register_id
        decimal total_basket_value
        int items_in_basket
        string payment_method
        int fiscal_year
    }
    silver_date_dimension {
        date calendar_date PK
        int year
        string month_name
        string day_of_week
    }
    gold_product_performance {
        string product_name PK
        int total_units_sold
        decimal total_revenue
        decimal total_profit
        decimal avg_retail_price
    }
    gold_monthly_pnl {
        int fiscal_year PK
        int fiscal_month PK
        int transaction_count
        decimal total_net_revenue
        decimal profit_margin_pct
    }
    silver_tran_summary ||--o{ silver_details_transactions : "1 header to many line-items"
    silver_date_dimension ||--o{ silver_details_transactions : "1 date to many transactions"
    gold_product_performance ||--o{ silver_details_transactions : "1 product to many sales"

Pipeline Notebooks

01 — Bronze Layer: Raw Ingestion

File: notebooks/01_bronze_layer.ipynb
Also: notebooks/01_bronze_recursive_ingestion.ipynb

Traverses the Files/Bronze/Landing directory in OneLake, identifies new .xls and .xlsx files not yet present in the _source_path tracking column, and performs idempotent loading into Delta tables.
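The idempotency check amounts to a set difference between the files currently in the landing zone and the paths already recorded in _source_path. The sketch below uses only the standard library and is not the notebook's code. It assumes the listing and the stored _source_path values use the same path form, and the function name select_new_files is hypothetical.

```python
from pathlib import PurePosixPath

EXCEL_SUFFIXES = {".xls", ".xlsx"}


def select_new_files(landing_files, already_ingested):
    """Return Excel files in landing_files whose path is not already in _source_path."""
    seen = set(already_ingested)
    return sorted(
        path
        for path in landing_files
        if PurePosixPath(path).suffix.lower() in EXCEL_SUFFIXES and path not in seen
    )


# Hypothetical inputs: a directory listing and the DISTINCT _source_path values
landing = [
    "Files/Bronze/Landing/2019/details_01.xls",
    "Files/Bronze/Landing/2019/details_02.xlsx",
    "Files/Bronze/Landing/2019/readme.txt",
]
ingested = ["Files/Bronze/Landing/2019/details_01.xls"]
new_files = select_new_files(landing, ingested)
# -> ['Files/Bronze/Landing/2019/details_02.xlsx']
```

A re-run with the same landing contents returns an empty list once details_02.xlsx has been ingested. That is the property that keeps the append-only Bronze tables free of duplicates.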

Key engineering logic:

  • sanitize_header() — converts all column names to lowercase snake_case, replacing spaces, #, and % with safe characters, ensuring Delta Lake compatibility across inconsistently exported files
  • dropna(how='all') — removes completely empty rows introduced by Excel formatting; empty columns need a separate dropna(axis=1, how='all') pass
  • data_year extracted from folder structure to enable fiscal year partitioning
  • Append-only writes, combined with the _source_path check, prevent re-processing of historical data on pipeline re-runs
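Header sanitization can be sketched in plain Python as follows. The README says only that spaces, # and % are replaced "with safe characters", so the specific substitutions (# to num, % to pct) are assumptions for illustration and do not describe the actual sanitize_header() body. This version lowercases but does not split camelCase headers.

```python
import re

# Hypothetical substitutions; the real notebook may use different tokens
SPECIAL_CHARS = {"#": "num", "%": "pct"}


def sanitize_header(name: str) -> str:
    """Turn an Excel column header into a Delta-safe lowercase snake_case name."""
    cleaned = str(name).strip()
    for char, word in SPECIAL_CHARS.items():
        cleaned = cleaned.replace(char, f" {word} ")
    # Collapse every run of characters other than letters, digits, or "_" into one "_"
    cleaned = re.sub(r"[^0-9a-zA-Z_]+", "_", cleaned)
    return cleaned.strip("_").lower()


headers = [sanitize_header(h) for h in ["Item #", "Margin %", " Unit Price "]]
# -> ['item_num', 'margin_pct', 'unit_price']
```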

02 — Silver Layer: Validation and Hierarchy

File: notebooks/02_silver_layer.ipynb

The most complex layer in the pipeline. Legacy Excel POS exports store the Transaction ID and Date only in the first row of a transaction block — every subsequent product line-item has null values in those fields. This notebook reconstructs the full transaction hierarchy using Spark Window functions.

Forward-fill windowing:

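from pyspark.sql import Window
from pyspark.sql import functions as F

# One window per source file, ordered by the original Excel row position
# (cast to int so that row "10" sorts after row "9")
window_spec = Window.partitionBy("_source_path") \
                    .orderBy(F.col("original_row_index").cast("int")) \
                    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

# Carry the most recent real transaction ID forward to every line item,
# skipping nulls and repeated "ID" column-label rows
df_filled = df_processed.withColumn(
    "final_transaction_id",
    F.last(F.when(
        (F.col("id") != "ID") & (F.col("id").isNotNull()), F.col("id")
    ), ignorenulls=True).over(window_spec)
)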
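The window logic is easier to follow in a single-machine version. The standard-library sketch below reproduces the same semantics and is not the notebook's code: partition by _source_path, order by original_row_index as an integer, and carry forward the last id that is neither null nor the literal label "ID".

```python
from itertools import groupby
from operator import itemgetter


def forward_fill_ids(rows):
    """Propagate each header row's transaction ID to the line items below it."""
    ordered = sorted(rows, key=lambda r: (r["_source_path"], int(r["original_row_index"])))
    filled = []
    for _, file_rows in groupby(ordered, key=itemgetter("_source_path")):
        last_id = None  # reset at every file boundary, like partitionBy
        for row in file_rows:
            raw_id = row.get("id")
            if raw_id is not None and raw_id != "ID":
                last_id = raw_id
            filled.append({**row, "final_transaction_id": last_id})
    return filled


sample = [
    {"_source_path": "a.xls", "original_row_index": "10", "id": None},
    {"_source_path": "a.xls", "original_row_index": "0", "id": "ID"},
    {"_source_path": "a.xls", "original_row_index": "1", "id": "T100"},
    {"_source_path": "a.xls", "original_row_index": "2", "id": None},
    {"_source_path": "b.xls", "original_row_index": "1", "id": None},
]
filled_ids = [r["final_transaction_id"] for r in forward_fill_ids(sample)]
# -> [None, 'T100', 'T100', 'T100', None]
```

Row "10" is listed first in the input but still receives T100, because ordering is numeric. The b.xls row stays null because the fill never crosses a file boundary.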

Smart Revenue Rule — distinguishes between two types of line adjustments that appear in the same column:

  • Adjustments <= $0.05 (e.g. Massachusetts bottle deposits): treated as surcharges, added to gross revenue
  • Adjustments > $0.05 (e.g. promotional discounts): excluded from the gross revenue total
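The rule reduces to a threshold test per line item. In the minimal sketch below, the $0.05 threshold comes from this README. Comparing the absolute value of the adjustment is an assumption, because the sign convention of the adjustment column is not documented. The parameter names are also hypothetical.

```python
from decimal import Decimal

# Threshold from the README; using abs() is an assumption about the sign convention
SURCHARGE_THRESHOLD = Decimal("0.05")


def gross_revenue(line_amount: Decimal, adjustment: Decimal) -> Decimal:
    """Apply the Smart Revenue Rule to a single line item (hypothetical inputs)."""
    if abs(adjustment) <= SURCHARGE_THRESHOLD:
        # Small amounts such as a 5-cent bottle deposit are treated as surcharges
        return line_amount + adjustment
    # Larger amounts, e.g. promotional discounts, are left out of gross revenue
    return line_amount


deposit_line = gross_revenue(Decimal("1.99"), Decimal("0.05"))    # -> Decimal('2.04')
discount_line = gross_revenue(Decimal("3.49"), Decimal("-1.00"))  # -> Decimal('3.49')
```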

Hybrid timestamp parser — handles two date formats present across 8 years of exports:

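from pyspark.sql import functions as F
# Try Spark's default timestamp parsing first; if that yields null, fall back
# to the legacy export format, e.g. "05-Mar-2019 02:15 PM"
F.coalesce(
    F.to_timestamp(F.trim(F.col("raw_date_string"))),
    F.to_timestamp(F.trim(F.col("raw_date_string")), "dd-MMM-yyyy hh:mm a")
)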
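The same try-then-fallback idea looks like this in plain Python. In this sketch, datetime.fromisoformat stands in roughly for Spark's default parsing, and "%d-%b-%Y %I:%M %p" is the strptime equivalent of the Spark pattern "dd-MMM-yyyy hh:mm a". The exact set of formats accepted by the notebook's first branch may differ.

```python
from datetime import datetime
from typing import Optional


def parse_pos_timestamp(raw: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-style timestamp, else the legacy 'dd-MMM-yyyy hh:mm a' form."""
    if raw is None:
        return None
    text = raw.strip()
    try:
        return datetime.fromisoformat(text)  # e.g. "2019-03-05 14:15:00"
    except ValueError:
        pass
    try:
        return datetime.strptime(text, "%d-%b-%Y %I:%M %p")  # e.g. "05-Mar-2019 02:15 PM"
    except ValueError:
        return None  # like coalesce(), an unparseable string ends up null


modern = parse_pos_timestamp(" 2019-03-05 14:15:00 ")
legacy = parse_pos_timestamp("05-Mar-2019 02:15 PM")
```

Both calls return the same instant, 2019-03-05 14:15. A string matching neither format returns None and would surface downstream as a null timestamp.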

Incremental watermark — reads max(_ingested_at) from the existing Silver table and processes only Bronze records ingested after that timestamp, so each run scans newly arrived data instead of the full Bronze history, reducing Spark compute costs.
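The watermark filter can be sketched on plain rows as follows. Here silver_watermark plays the role of max(_ingested_at) read from Silver, and None models an empty Silver table on the first run. The function name is hypothetical.

```python
from datetime import datetime
from typing import Iterable, List, Optional


def incremental_batch(bronze_rows: Iterable[dict], silver_watermark: Optional[datetime]) -> List[dict]:
    """Return only the Bronze rows ingested after the Silver watermark."""
    if silver_watermark is None:
        return list(bronze_rows)  # first run: Silver is empty, process everything
    return [row for row in bronze_rows if row["_ingested_at"] > silver_watermark]


bronze = [
    {"row": 1, "_ingested_at": datetime(2024, 1, 1, 6, 0)},
    {"row": 2, "_ingested_at": datetime(2024, 2, 1, 6, 0)},
    {"row": 3, "_ingested_at": datetime(2024, 3, 1, 6, 0)},
]
first_run = incremental_batch(bronze, None)                       # all 3 rows
next_run = incremental_batch(bronze, datetime(2024, 2, 1, 6, 0))  # only row 3
```

The strict `>` skips rows stamped exactly at the watermark. That is safe only under the assumption that each Bronze batch is written with a single _ingested_at value and promoted to Silver in full.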

Data quality gate:

  • Records passing validation (transaction_id, product_name not null) → silver_details_transactions
  • Records failing validation → quarantine_details_transactions for engineering audit
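The quality gate splits every record into exactly one of two outputs. The plain-Python sketch below shows the idea. In PySpark, the same split would typically be written as df.filter(cond) and df.filter(~cond) with cond = F.col("transaction_id").isNotNull() & F.col("product_name").isNotNull(). Because isNotNull() never returns null, the negated filter catches every remaining row.

```python
from typing import Iterable, List, Tuple

REQUIRED_FIELDS = ("transaction_id", "product_name")


def quality_gate(rows: Iterable[dict]) -> Tuple[List[dict], List[dict]]:
    """Split rows into (passed, quarantined); each row lands in exactly one list."""
    passed, quarantined = [], []
    for row in rows:
        if all(row.get(field) is not None for field in REQUIRED_FIELDS):
            passed.append(row)
        else:
            quarantined.append(row)
    return passed, quarantined


passed, quarantined = quality_gate([
    {"transaction_id": "T1", "product_name": "Coffee"},
    {"transaction_id": None, "product_name": "Soda"},
    {"transaction_id": "T2", "product_name": None},
])
```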

Auto date dimension — generates silver_date_dimension by exploding a date sequence between min and max transaction timestamps, adding year, month_name, and day_of_week columns for Power BI time intelligence.
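In Spark, the day sequence is usually produced with F.explode(F.sequence(start_date, end_date)). The plain-Python sketch below builds the same one-row-per-day table with the columns listed above. The English month and day names assume the default C locale.

```python
from datetime import datetime, timedelta
from typing import List


def build_date_dimension(min_ts: datetime, max_ts: datetime) -> List[dict]:
    """One row per calendar day between the earliest and latest transaction, inclusive."""
    rows = []
    current, last = min_ts.date(), max_ts.date()
    while current <= last:
        rows.append({
            "calendar_date": current,
            "year": current.year,
            "month_name": current.strftime("%B"),   # e.g. "March"
            "day_of_week": current.strftime("%A"),  # e.g. "Friday"
        })
        current += timedelta(days=1)
    return rows


dim = build_date_dimension(datetime(2019, 2, 27, 9, 0), datetime(2019, 3, 2, 18, 0))
# 4 rows: 2019-02-27 through 2019-03-02
```

Generating every day, not only days that had sales, keeps gaps (closures, missing exports) visible in Power BI time intelligence.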


03 — Gold Layer: Analytics and Aggregates

File: notebooks/03_gold_layer.ipynb

Consumes the validated Silver tables to generate pre-aggregated business metrics optimized for Direct Lake Power BI consumption. Every Gold table is fully overwritten on each pipeline run, so historical aggregates are always recomputed from the complete Silver data rather than patched incrementally.

Monthly P&L (gold_monthly_pnl): Groups by fiscal_year and fiscal_month, computing distinct transaction count, total net revenue, total gross profit, and derived profit_margin_pct. Distinct count of IDs prevents over-counting from multi-line transactions.
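The aggregation below is a plain-Python sketch of the monthly P&L, not the notebook's code. It assumes fiscal months equal calendar months and that total_net_revenue is the sum of gross_revenue. It shows why distinct IDs matter: one basket with two lines still counts as one transaction.

```python
from collections import defaultdict
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP


def monthly_pnl(lines):
    """Aggregate Silver line items into one P&L row per (year, month)."""
    buckets = defaultdict(lambda: {"ids": set(), "revenue": Decimal("0"), "profit": Decimal("0")})
    for line in lines:
        ts = line["transaction_timestamp"]
        bucket = buckets[(ts.year, ts.month)]
        bucket["ids"].add(line["transaction_id"])  # distinct IDs, not line count
        bucket["revenue"] += line["gross_revenue"]
        bucket["profit"] += line["gross_profit"]
    report = []
    for (year, month), bucket in sorted(buckets.items()):
        margin = bucket["profit"] / bucket["revenue"] * 100 if bucket["revenue"] else Decimal("0")
        report.append({
            "fiscal_year": year,
            "fiscal_month": month,
            "transaction_count": len(bucket["ids"]),
            "total_net_revenue": bucket["revenue"],
            "total_gross_profit": bucket["profit"],
            "profit_margin_pct": margin.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP),
        })
    return report


def make_line(txn, ts, revenue, profit):
    return {"transaction_id": txn, "transaction_timestamp": ts,
            "gross_revenue": Decimal(revenue), "gross_profit": Decimal(profit)}


pnl = monthly_pnl([
    make_line("T1", datetime(2019, 3, 5, 9, 0), "2.00", "0.50"),
    make_line("T1", datetime(2019, 3, 5, 9, 0), "3.00", "1.00"),  # same basket, second item
    make_line("T2", datetime(2019, 3, 6, 17, 30), "5.00", "1.50"),
    make_line("T3", datetime(2019, 4, 1, 8, 15), "10.00", "2.00"),
])
# March: transaction_count 2 (from 3 lines), profit_margin_pct 30.00
```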

Product Hero analysis (gold_product_performance): Ranks the entire SKU library by absolute profit contribution rather than sales volume alone, enabling data-driven inventory decisions. Includes total_units_sold, total_revenue, total_profit, and avg_retail_price per product.
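The ranking-by-profit idea can be sketched as follows. The README does not define avg_retail_price, so computing it as revenue per unit sold is an assumption. The sample data is invented to show a low-volume product outranking a high-volume one.

```python
from collections import defaultdict
from decimal import Decimal


def product_performance(lines):
    """Per-product totals ranked by total profit, highest first."""
    totals = defaultdict(lambda: {"units": 0, "revenue": Decimal("0"), "profit": Decimal("0")})
    for line in lines:
        t = totals[line["product_name"]]
        t["units"] += line["quantity"]
        t["revenue"] += line["gross_revenue"]
        t["profit"] += line["gross_profit"]
    rows = []
    for name, t in totals.items():
        # Assumed definition: revenue per unit sold
        avg_price = (t["revenue"] / t["units"]).quantize(Decimal("0.01")) if t["units"] else Decimal("0")
        rows.append({
            "product_name": name,
            "total_units_sold": t["units"],
            "total_revenue": t["revenue"],
            "total_profit": t["profit"],
            "avg_retail_price": avg_price,
        })
    return sorted(rows, key=lambda r: r["total_profit"], reverse=True)


ranking = product_performance([
    {"product_name": "Soda", "quantity": 30, "gross_revenue": Decimal("45.00"), "gross_profit": Decimal("6.00")},
    {"product_name": "Coffee", "quantity": 10, "gross_revenue": Decimal("15.00"), "gross_profit": Decimal("9.00")},
    {"product_name": "Coffee", "quantity": 2, "gross_revenue": Decimal("3.00"), "gross_profit": Decimal("1.80")},
])
# Coffee ranks first on profit (10.80) despite selling fewer units than Soda (12 vs 30)
```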

Daily operational heatmap (gold_daily_sales): Extracts day_of_week from transaction_timestamp and aggregates transaction volume and revenue by weekday, providing workforce planning and peak traffic insights.
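A plain-Python sketch of the weekday rollup is shown below. In Spark, the weekday name could come from F.date_format(col, "EEEE"). Counting distinct transaction IDs for txn_volume, rather than line items, is an assumption consistent with the distinct-count approach used elsewhere in this pipeline.

```python
from collections import defaultdict
from datetime import datetime
from decimal import Decimal

WEEKDAYS = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]


def daily_sales(lines):
    """Transaction volume and revenue per weekday, in Monday-to-Sunday order."""
    txn_ids = defaultdict(set)
    revenue = defaultdict(lambda: Decimal("0"))
    for line in lines:
        day = WEEKDAYS[line["transaction_timestamp"].weekday()]  # weekday(): Monday == 0
        txn_ids[day].add(line["transaction_id"])  # count baskets, not line items
        revenue[day] += line["gross_revenue"]
    return [
        {"day_of_week": day, "txn_volume": len(txn_ids[day]), "total_revenue": revenue[day]}
        for day in WEEKDAYS
        if day in txn_ids
    ]


heatmap = daily_sales([
    {"transaction_id": "T1", "transaction_timestamp": datetime(2019, 3, 5, 9, 0), "gross_revenue": Decimal("2.00")},
    {"transaction_id": "T1", "transaction_timestamp": datetime(2019, 3, 5, 9, 0), "gross_revenue": Decimal("3.00")},
    {"transaction_id": "T2", "transaction_timestamp": datetime(2019, 3, 5, 18, 0), "gross_revenue": Decimal("4.50")},
    {"transaction_id": "T3", "transaction_timestamp": datetime(2019, 3, 8, 7, 30), "gross_revenue": Decimal("1.25")},
])
```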

Market Basket Discovery (gold_basket_analysis): Counts product co-occurrence, the pair-counting step behind association rule mining, using a self-join on transaction_id:

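from pyspark.sql import functions as F
# Self-join the de-duplicated (transaction_id, product_name) rows on transaction_id
df_pairs = df_unique_basket.alias("pro_a") \
    .join(df_unique_basket.alias("pro_b"), "transaction_id") \
    .filter(F.col("pro_a.product_name") < F.col("pro_b.product_name"))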

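Lexicographical filtering (a < b) keeps each unordered pair exactly once and also drops self-pairs, where a product is matched with itself. Counting the surviving pairs identifies which SKUs most frequently co-occur in the same basket, supporting promotional bundling and store layout decisions.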
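The self-join plus a < b filter is equivalent to taking 2-element combinations of each basket's sorted, de-duplicated products. The plain-Python sketch below uses that equivalence and produces rows shaped like gold_basket_analysis. It illustrates the technique and is not the notebook's implementation.

```python
from collections import Counter
from itertools import combinations


def basket_pairs(lines):
    """Count how often each unordered product pair appears in the same transaction."""
    baskets = {}
    for line in lines:
        # A set per transaction plays the role of df_unique_basket
        baskets.setdefault(line["transaction_id"], set()).add(line["product_name"])
    counts = Counter()
    for products in baskets.values():
        # sorted() + combinations() yields each pair once with a < b and never (x, x)
        counts.update(combinations(sorted(products), 2))
    return [
        {"product_a": a, "product_b": b, "association_count": n}
        for (a, b), n in counts.most_common()
    ]


pairs = basket_pairs([
    {"transaction_id": "T1", "product_name": "Chips"},
    {"transaction_id": "T1", "product_name": "Soda"},
    {"transaction_id": "T1", "product_name": "Chips"},  # duplicate line in the same basket
    {"transaction_id": "T2", "product_name": "Soda"},
    {"transaction_id": "T2", "product_name": "Chips"},
    {"transaction_id": "T3", "product_name": "Coffee"},
    {"transaction_id": "T3", "product_name": "Soda"},
])
# -> Chips/Soda counted 2 times, Coffee/Soda once
```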

Basket Health (gold_basket_health): Aggregates avg_items_per_basket, avg_basket_value, and record_basket_value by fiscal year to track consumer spending depth over the 8-year dataset.
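A plain-Python sketch of the yearly rollup over silver_tran_summary rows is shown below. The README does not define record_basket_value, so treating it as the largest single basket value in the year is an assumption.

```python
from collections import defaultdict
from decimal import Decimal


def basket_health(summaries):
    """Yearly basket-depth metrics from transaction-level summary rows."""
    by_year = defaultdict(list)
    for s in summaries:
        by_year[s["fiscal_year"]].append(s)
    report = []
    for year in sorted(by_year):
        baskets = by_year[year]
        n = len(baskets)
        values = [b["total_basket_value"] for b in baskets]
        report.append({
            "fiscal_year": year,
            "avg_items_per_basket": Decimal(sum(b["items_in_basket"] for b in baskets)) / n,
            "avg_basket_value": sum(values, Decimal("0")) / n,
            "record_basket_value": max(values),  # assumed: largest single basket
        })
    return report


health = basket_health([
    {"fiscal_year": 2019, "items_in_basket": 2, "total_basket_value": Decimal("10.00")},
    {"fiscal_year": 2019, "items_in_basket": 4, "total_basket_value": Decimal("20.00")},
    {"fiscal_year": 2020, "items_in_basket": 1, "total_basket_value": Decimal("5.50")},
])
```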


Orchestration (Fabric Data Factory)

The pipeline is fully automated via a Fabric Data Factory pipeline triggered on file arrival in Files/Bronze/Landing:

  1. Execute Bronze notebook — idempotent ingestion of new files
  2. Execute Silver notebook — incremental transformation and validation
  3. Execute Gold notebook — full recalculation of all business aggregates
  4. Copy to Archive — binary copy of original files to Files/Bronze/Archive, preserving .xls/.xlsx format exactly
  5. Delete landing files — wildcard * recursive purge of processed files, keeping the directory structure intact for the next trigger
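Steps 4 and 5 can be modelled locally with the standard library. The sketch below copies files byte-for-byte, mirroring the binary copy mode, then deletes only files so the folder tree remains. Preserving subfolders such as 2019/ inside the archive is an assumption. Running the delete only after every copy has succeeded means a failed copy never loses a file.

```python
import shutil
import tempfile
from pathlib import Path


def archive_and_purge(landing: Path, archive: Path) -> list:
    """Copy every file under landing into archive unchanged, then delete the originals."""
    files = [p for p in landing.rglob("*") if p.is_file()]
    archived = []
    for src in files:
        dest = archive / src.relative_to(landing)
        dest.parent.mkdir(parents=True, exist_ok=True)
        shutil.copyfile(src, dest)  # raw byte copy, no format conversion
        archived.append(dest)
    for src in files:
        src.unlink()  # remove files only; folders stay for the next trigger
    return archived


# Usage in a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    landing_dir, archive_dir = root / "Landing", root / "Archive"
    (landing_dir / "2019").mkdir(parents=True)
    (landing_dir / "2019" / "sales.xls").write_bytes(b"\xd0\xcf\x11\xe0")  # legacy .xls signature bytes
    archived_files = archive_and_purge(landing_dir, archive_dir)
    archived_bytes = (archive_dir / "2019" / "sales.xls").read_bytes()
    year_folder_kept = (landing_dir / "2019").is_dir()
    files_left = [p for p in landing_dir.rglob("*") if p.is_file()]
```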

Semantic Model Design

The Gold and Silver layers are modeled as a formal Star Schema for Power BI Direct Lake connectivity:

Table                          Role
silver_details_transactions    Fact table — granular line-item events
silver_tran_summary            Dimension — transaction-level headers
gold_product_performance       Dimension — product attributes and profitability
silver_date_dimension          Date dimension — time intelligence support
gold_monthly_pnl               Aggregate fact — monthly financial reporting

All relationships are configured as One-to-Many with single-direction cross-filtering, which avoids ambiguous filter paths and keeps DAX query behavior predictable and efficient.
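Power BI can set a relationship as one-to-many only when the key is unique on the "one" side. A quick pre-publish check could look like the sketch below. The function is hypothetical, and the inputs would be the key columns of a dimension such as silver_tran_summary and a fact such as silver_details_transactions.

```python
from collections import Counter


def check_one_to_many(dimension_keys, fact_keys):
    """Report key problems for a dimension (one side) to fact (many side) relationship."""
    counts = Counter(dimension_keys)
    return {
        # A key repeated on the "one" side prevents a one-to-many relationship
        "duplicate_dimension_keys": sorted(k for k, n in counts.items() if n > 1),
        # Fact keys with no dimension row show up under a blank member in reports
        "orphan_fact_keys": sorted(set(fact_keys) - set(counts)),
    }


report = check_one_to_many(
    dimension_keys=["T1", "T2", "T2"],   # e.g. silver_tran_summary.transaction_id
    fact_keys=["T1", "T1", "T2", "T3"],  # e.g. silver_details_transactions.transaction_id
)
# -> {'duplicate_dimension_keys': ['T2'], 'orphan_fact_keys': ['T3']}
```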

Direct Lake advantage: Power BI reads the Delta files in OneLake directly. This avoids both the per-query round trips to the source that DirectQuery requires and the scheduled data copy and refresh that Import mode requires, delivering sub-second dashboard response times.


Performance Optimizations

  • Incremental watermarking — Silver layer processes only new Bronze records, minimizing Spark compute-hour costs on every run
  • Precise type casting: DecimalType(10,2) for all financial metrics avoids floating-point rounding errors that would break revenue reconciliation
  • Overwrite strategy in Gold — ensures complete historical accuracy without append-related drift
  • V-Order and compaction — Delta tables optimized within the Fabric environment to reduce small files and accelerate Spark read operations
  • Distinct count for transactions: countDistinct("transaction_id") in aggregations prevents multi-line item over-counting
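The reason for fixed-point money types shows up even in two-term sums. The sketch below uses Python's decimal module as a stand-in for Spark's DecimalType. Quantizing to two places mirrors the scale of DecimalType(10,2). The half-up rounding mode chosen here is an assumption and may differ from what Spark applies on cast.

```python
from decimal import Decimal, ROUND_HALF_UP

float_total = 0.10 + 0.20                          # binary floating point
decimal_total = Decimal("0.10") + Decimal("0.20")  # exact base-10 arithmetic


def to_money(value: Decimal) -> Decimal:
    """Round to two decimal places, the scale of DecimalType(10,2)."""
    return value.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


float_rounded = round(2.675, 2)             # 2.67, because 2.675 is stored slightly below .675
money_rounded = to_money(Decimal("2.675"))  # Decimal('2.68')
```

Here float_total is not equal to 0.3, while decimal_total is exactly 0.30. Across hundreds of thousands of line items, such drift would show up as cents that refuse to reconcile.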

Repository Structure

retail-intelligence-platform/
│
├── README.md
├── naming_conventions.md        ← Coding standards (snake_case, PascalCase, etc.)
├── .gitignore
│
├── config/
│   ├── config.py                ← Lakehouse paths, column mapping, quality constants
│   └── schema_definition.py    ← Explicit PySpark StructType for Bronze ingestion
│
├── docs/
│   └── architecture.md          ← Full pipeline, table schema, and orchestration diagrams
│
└── notebooks/
    ├── 01_bronze_layer.ipynb
    ├── 01_bronze_recursive_ingestion.ipynb
    ├── 02_silver_layer.ipynb
    └── 03_gold_layer.ipynb

Setup

This pipeline runs inside Microsoft Fabric. To deploy:

  1. Create a Fabric Workspace and Lakehouse named lh_retail_intelligence
  2. Upload config/config.py and update WORKSPACE_ID and LAKEHOUSE_ID
  3. Import notebooks into Synapse Data Engineering and attach to the Lakehouse
  4. Create a Data Factory pipeline with the execution sequence described in the Orchestration section
  5. Drop .xls or .xlsx POS export files into Files/Bronze/Landing to trigger the pipeline
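For step 2, one way config.py might turn the two IDs into OneLake paths is sketched below. Only WORKSPACE_ID, LAKEHOUSE_ID, and the folder names come from this README. The GUID-based abfss layout on the onelake.dfs.fabric.microsoft.com endpoint is an assumption about how the paths are composed, and the helper onelake_path is hypothetical.

```python
# Hypothetical config.py layout; replace the placeholders with real GUIDs
WORKSPACE_ID = "<your-workspace-guid>"
LAKEHOUSE_ID = "<your-lakehouse-guid>"

ONELAKE_ROOT = f"abfss://{WORKSPACE_ID}@onelake.dfs.fabric.microsoft.com/{LAKEHOUSE_ID}"


def onelake_path(relative: str) -> str:
    """Join a Lakehouse-relative path (e.g. 'Files/...') onto the OneLake root."""
    return f"{ONELAKE_ROOT}/{relative.lstrip('/')}"


LANDING_PATH = onelake_path("Files/Bronze/Landing")
ARCHIVE_PATH = onelake_path("Files/Bronze/Archive")
TABLES_PATH = onelake_path("Tables")
```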

Roadmap

  • Finalize Power BI Executive Dashboard with DAX measures for Year-over-Year growth analysis
  • Implement demand forecasting using Synapse ML on the Gold layer tables
  • Transition to Eventstream ingestion if the store upgrades to a real-time API-based POS system
  • Expose data lineage through Fabric's automatic lineage view (_ingested_at and _source_path metadata preserved throughout)

About

Production-grade ELT pipeline on Microsoft Fabric modernizing 8+ years of retail POS data into a Star Schema using Medallion Architecture. Built with PySpark, Delta Lake, and Power BI Direct Lake mode.
