# etl.py — ETL script: extracts a CSV source and loads it into a Data Vault
# style SQLite warehouse (hubs, satellites, links).
# (Web-scrape chrome and line-number gutter removed during cleanup.)
import pandas as pd
import sqlite3
import hashlib
from datetime import datetime
# Constants
DB_FILE = 'warehouse.db'  # SQLite file backing the Data Vault warehouse
SOURCE_FILE = 'data/source_data.csv'  # input CSV to extract
RECORD_SOURCE = 'csv_source'  # Data Vault record-source tag stamped on every row
# Single load timestamp shared by every row of this run.
# NOTE(review): naive local time — datetime.now(timezone.utc) would be safer,
# but changing it alters the stored string format; confirm downstream consumers first.
LOAD_DATE = datetime.now().isoformat()
def hash_key(key: str) -> str:
    """Return the MD5 hex digest of *key*, used as a Data Vault surrogate key.

    MD5 here is a deterministic key generator, not a security primitive —
    the conventional choice for Data Vault hash keys.
    """
    digest = hashlib.md5()
    digest.update(key.encode('utf-8'))
    return digest.hexdigest()
# Step 1: Extract — read the raw source rows from the CSV file.
df = pd.read_csv(SOURCE_FILE)
print("Extracted data:")
print(df)

# Step 2: Transform — shape the raw rows into Data Vault staging frames.

# Customer hub/satellite staging: one row per distinct customer,
# keyed by the MD5 hash of the business key.
customers = df[['customer_id', 'customer_name']].drop_duplicates()
customers = customers.assign(
    customer_hash=customers['customer_id'].map(hash_key),
    load_date=LOAD_DATE,
    record_source=RECORD_SOURCE,
)

# Product hub/satellite staging: one row per distinct product.
products = df[['product_id', 'product_name']].drop_duplicates()
products = products.assign(
    product_hash=products['product_id'].map(hash_key),
    load_date=LOAD_DATE,
    record_source=RECORD_SOURCE,
)

# Order link/satellite staging: the link hash combines customer, product
# and order date into one composite business key.
orders = df.assign(
    customer_hash=df['customer_id'].map(hash_key),
    product_hash=df['product_id'].map(hash_key),
    order_hash=[
        hash_key(f"{cid}_{pid}_{odate}")
        for cid, pid, odate in zip(df['customer_id'], df['product_id'], df['order_date'])
    ],
    load_date=LOAD_DATE,
    record_source=RECORD_SOURCE,
)
# Step 3: Load — connect to the warehouse and insert the staged frames.
# Hubs and links use INSERT OR IGNORE (insert-if-absent keeps keys unique);
# satellites use plain INSERT so every load appends a new history row.
conn = sqlite3.connect(DB_FILE)
try:  # ensure the connection is closed even if an insert fails
    cursor = conn.cursor()

    # Create tables (DDL kept in a separate, presumably idempotent, SQL script).
    with open('sql_scripts/create_tables.sql', 'r') as f:
        cursor.executescript(f.read())

    # Load Hub Customers (insert if not exists).
    # executemany over column tuples replaces the per-row iterrows loop:
    # same rows, same SQL, one C-level batch per table.
    cursor.executemany("""
    INSERT OR IGNORE INTO hub_customer (customer_hash, customer_id, load_date, record_source)
    VALUES (?, ?, ?, ?)
    """, customers[['customer_hash', 'customer_id', 'load_date', 'record_source']]
         .itertuples(index=False, name=None))

    # Load Sat Customers (always insert for history).
    cursor.executemany("""
    INSERT INTO sat_customer (customer_hash, customer_name, load_date, record_source)
    VALUES (?, ?, ?, ?)
    """, customers[['customer_hash', 'customer_name', 'load_date', 'record_source']]
         .itertuples(index=False, name=None))

    # Load Hub Products
    cursor.executemany("""
    INSERT OR IGNORE INTO hub_product (product_hash, product_id, load_date, record_source)
    VALUES (?, ?, ?, ?)
    """, products[['product_hash', 'product_id', 'load_date', 'record_source']]
         .itertuples(index=False, name=None))

    # Load Sat Products
    cursor.executemany("""
    INSERT INTO sat_product (product_hash, product_name, load_date, record_source)
    VALUES (?, ?, ?, ?)
    """, products[['product_hash', 'product_name', 'load_date', 'record_source']]
         .itertuples(index=False, name=None))

    # Load Link Orders
    cursor.executemany("""
    INSERT OR IGNORE INTO link_order (order_hash, customer_hash, product_hash, load_date, record_source)
    VALUES (?, ?, ?, ?, ?)
    """, orders[['order_hash', 'customer_hash', 'product_hash', 'load_date', 'record_source']]
         .itertuples(index=False, name=None))

    # Load Sat Orders
    cursor.executemany("""
    INSERT INTO sat_order (order_hash, order_date, quantity, load_date, record_source)
    VALUES (?, ?, ?, ?, ?)
    """, orders[['order_hash', 'order_date', 'quantity', 'load_date', 'record_source']]
         .itertuples(index=False, name=None))

    conn.commit()
finally:
    conn.close()
print("ETL completed. Data loaded into warehouse.db")