-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathapp.py
More file actions
107 lines (94 loc) · 5.89 KB
/
app.py
File metadata and controls
107 lines (94 loc) · 5.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# ─────────────────────────────────────────────────────────────
# app.py — Streamlit Dashboard for Northwind Data Pipeline
# ─────────────────────────────────────────────────────────────
# This app provides a simple interactive interface to visualize
# outputs from the Py Data Engineering Workshop pipeline.
#
# Responsibilities:
# 1. Display aggregated sales metrics (customers, products, countries)
# 2. Show recent Data Quality (DQ) validation runs
# 3. Provide pipeline run guidance if outputs are missing
#
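# The dashboard reads its inputs from the relative `data/` folder
# (resolved against the current working directory, so launch it from the
# directory that contains `data/`), enabling a fully local and
# self-contained analytics workflow.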
# ─────────────────────────────────────────────────────────────
import streamlit as st
import pandas as pd
from pathlib import Path
# ─────────────────────────────────────────────────────────────
# Directory Configuration (aligned with ETL paths.py)
# ─────────────────────────────────────────────────────────────
# All paths are relative, i.e. resolved against the process's working
# directory at the time they are opened.
MODEL = Path("data", "02-model")  # model-stage outputs read by the sales tabs
CLEAN = Path("data", "01-clean")  # clean-stage tables
DQDIR = CLEAN.joinpath("_dq")     # data-quality run log and issue log
# ─────────────────────────────────────────────────────────────
# Streamlit App Configuration
# ─────────────────────────────────────────────────────────────
# set_page_config must stay the first Streamlit call in the script.
st.set_page_config(layout="wide", page_title="Py Data Engineering — Northwind")
st.title("🏪 Northwind — Py Dashboard")

# One tab per analytics section; list order is the on-screen tab order and
# must match the unpacking order on the left-hand side.
tab_sales, tab_country, tab_dq = st.tabs(
    ["📊 Sales (Customers)", "🌍 Sales by Country", "🧪 Data Quality"]
)
# ─────────────────────────────────────────────────────────────
# TAB 1: Sales by Customer and Product
# ─────────────────────────────────────────────────────────────
with tab_sales:
    p = MODEL / "sales_by_customer.parquet"
    # Validate that the customer model file exists before visualization
    if not p.exists():
        st.warning("Run the pipeline first: `uv run etl` (or `etl-model`).")
    else:
        # Load pre-aggregated sales by customer
        df = pd.read_parquet(p)
        st.subheader("Top Customers")
        # Customer name becomes the x-axis; each remaining column is plotted
        # as a bar series.
        st.bar_chart(df.set_index("CompanyName"))

    # Optional: sales by product (only if the model step produced it).
    # Checked independently of the customer file so an existing product
    # model is still shown when the customer model is missing.
    p_prod = MODEL / "sales_by_product.parquet"
    if p_prod.exists():
        prod = pd.read_parquet(p_prod)
        st.subheader("Top Products")
        st.bar_chart(prod.set_index("ProductName"))
# ─────────────────────────────────────────────────────────────
# TAB 2: Sales by Country
# ─────────────────────────────────────────────────────────────
with tab_country:
    country_path = MODEL / "sales_by_country.parquet"
    if not country_path.exists():
        # Model output not produced yet: point the user at the pipeline.
        st.info("Generate via pipeline → model stage")
    else:
        # Country becomes the x-axis; every other column is charted.
        country = pd.read_parquet(country_path)
        st.subheader("Sales by Country")
        st.bar_chart(country.set_index("Country"))
# ─────────────────────────────────────────────────────────────
# TAB 3: Data Quality Monitoring
# ─────────────────────────────────────────────────────────────
with tab_dq:
    runs_pq = DQDIR / "dq_runs.parquet"
    issues_pq = DQDIR / "dq_issues.parquet"
    if runs_pq.exists():
        # Summary log: recent DQ runs (status + issue count)
        runs = pd.read_parquet(runs_pq)
        st.subheader("DQ Runs (latest 10)")
        # Order by run_ts before taking the tail so "latest 10" means newest
        # by timestamp, not the last 10 rows in file order. The stable sort
        # keeps file order among equal timestamps; null timestamps go first
        # so they never displace real runs from the tail.
        recent = runs.sort_values("run_ts", kind="stable", na_position="first").tail(10)
        st.dataframe(recent, use_container_width=True)
        # Detailed log: issue-level reporting for the latest run. Issues are
        # linked to runs through the shared `run_ts` column.
        if issues_pq.exists():
            issues = pd.read_parquet(issues_pq)
            if issues.empty:
                st.success("All checks passed 🎉")
            else:
                # max() skips nulls, whereas sort_values(...).iloc[-1] would
                # pick NaT when any run_ts is null (nulls sort last), falsely
                # reporting success, and would raise IndexError on an empty log.
                last_ts = runs["run_ts"].max()
                if pd.isna(last_ts):
                    st.info("No timestamped DQ runs found; cannot match issues to a run.")
                else:
                    latest = issues[issues["run_ts"] == last_ts]
                    if latest.empty:
                        st.success("All checks passed 🎉")
                    else:
                        st.warning(f"{len(latest)} issue(s) in latest run")
                        st.dataframe(latest, use_container_width=True)
    else:
        # Fallback message if DQ checks haven't been executed
        st.info("No DQ runs yet.")