-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathp1.py
More file actions
221 lines (187 loc) · 8.52 KB
/
p1.py
File metadata and controls
221 lines (187 loc) · 8.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
# Modified data_ingestion.py
import pandas as pd
import json
import mysql.connector
from mysql.connector import Error
import os
import requests # New import for making HTTP requests
# --- Configuration ---
# MySQL database connection details. Each value may be overridden through an
# environment variable so real credentials need not be committed to source;
# the defaults below are the original local-development values.
DB_CONFIG = {
    'host': os.environ.get('ECOMMERCE_DB_HOST', 'localhost'),
    'database': os.environ.get('ECOMMERCE_DB_NAME', 'ecommerce_data'),
    'user': os.environ.get('ECOMMERCE_DB_USER', 'root'),
    'password': os.environ.get('ECOMMERCE_DB_PASSWORD', ''),
}
# API endpoint for product data (URL of the simulated Flask API by default).
PRODUCT_API_URL = os.environ.get('ECOMMERCE_PRODUCT_API_URL', "http://127.0.0.1:5000/products")
# Path to the orders CSV file; a relative path, so it is resolved against the
# current working directory.
DATA_DIR = 'data'
ORDERS_CSV_PATH = os.path.join(DATA_DIR, 'orders.csv')
# --- Database Connection ---
def get_db_connection():
    """Open a connection to the MySQL database described by ``DB_CONFIG``.

    Returns:
        The connection object when ``mysql.connector.connect`` succeeds and the
        connection reports ``is_connected()``; ``None`` otherwise. A
        ``mysql.connector.Error`` is printed rather than raised.
    """
    try:
        connection = mysql.connector.connect(**DB_CONFIG)
        if not connection.is_connected():
            # Not connected: report failure the same way as the error path.
            return None
        print(f"Successfully connected to MySQL database: {DB_CONFIG['database']}")
        return connection
    except Error as exc:
        print(f"Error connecting to MySQL database: {exc}")
        return None
# --- Data Loading and Processing (Modified for API) ---
def _clean_products_df(all_products_df):
    """Harmonize, type-convert and de-duplicate a non-empty raw product DataFrame.

    Renames alternative field names to the canonical schema, adds any missing
    expected column as all-None, keeps only the expected columns (in order),
    coerces numeric columns, drops rows lacking ``product_id`` or ``name``,
    normalizes ``product_id`` to str and removes duplicate ids.
    """
    # Map alternative API field names onto the canonical column names; keys
    # absent from the payload are simply ignored by rename().
    all_products_df = all_products_df.rename(columns={
        'item_id': 'product_id',
        'product_title': 'name',
        'department': 'category',
        'manufacturer': 'brand',
        'cost': 'price',
        'avg_review_score': 'rating',
        'num_reviews': 'reviews_count',
    })
    expected_cols = ['product_id', 'name', 'category', 'brand', 'price', 'rating', 'reviews_count']
    for col in expected_cols:
        if col not in all_products_df.columns:
            all_products_df[col] = None  # keep the schema stable for the DB insert
    # .copy() makes this an independent frame, so the column assignments below
    # do not trigger pandas' SettingWithCopyWarning.
    all_products_df = all_products_df[expected_cols].copy()
    all_products_df['price'] = pd.to_numeric(all_products_df['price'], errors='coerce')
    all_products_df['rating'] = pd.to_numeric(all_products_df['rating'], errors='coerce')
    all_products_df['reviews_count'] = (
        pd.to_numeric(all_products_df['reviews_count'], errors='coerce').fillna(0).astype(int)
    )
    # Drop rows where the essential product_id or name is missing.
    all_products_df.dropna(subset=['product_id', 'name'], inplace=True)
    product_ids = all_products_df['product_id']
    # A missing id upcasts an integer id column to float (101 -> 101.0); undo
    # that so ids stringify as '101', matching the str ids load_order_data()
    # produces for orders.
    if pd.api.types.is_float_dtype(product_ids) and (product_ids % 1 == 0).all():
        product_ids = product_ids.astype('int64')
    all_products_df['product_id'] = product_ids.astype(str)
    # Remove duplicates based on product_id (first occurrence wins).
    return all_products_df.drop_duplicates(subset=['product_id'])


def load_product_data_from_api(timeout=30):
    """
    Fetch product data from the simulated API and return it as a cleaned DataFrame.

    Args:
        timeout: Seconds to wait for the API to connect/respond. Without a
            timeout, ``requests.get`` can block indefinitely on a hung server.

    Returns:
        A DataFrame with columns product_id (str), name, category, brand,
        price, rating and reviews_count. An empty DataFrame is returned when
        the request fails, the body is not valid JSON, or the API returned no
        products. Errors are printed rather than raised.
    """
    try:
        response = requests.get(PRODUCT_API_URL, timeout=timeout)
        response.raise_for_status()  # Raise an HTTPError for 4xx/5xx responses
        raw_products_data = response.json()
    except requests.exceptions.ConnectionError:
        print(f"Error: Could not connect to the API server at {PRODUCT_API_URL}.")
        print("Please ensure 'api_server.py' is running before executing this script.")
        return pd.DataFrame()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers invalid JSON on requests < 2.27; newer versions
        # raise a RequestException subclass for it.
        print(f"Error fetching data from API: {e}")
        return pd.DataFrame()

    print(f"Fetched {len(raw_products_data)} products from API: {PRODUCT_API_URL}")
    all_products_df = pd.DataFrame(raw_products_data)
    if all_products_df.empty:
        print("No product data fetched from API.")
        return all_products_df
    all_products_df = _clean_products_df(all_products_df)
    print(f"Total unique products after API fetch and cleaning: {len(all_products_df)}")
    return all_products_df
def load_order_data(csv_path=None):
    """Load order line items from a CSV file.

    Args:
        csv_path: Path of the CSV to read; defaults to ``ORDERS_CSV_PATH``.

    Returns:
        A DataFrame with ``order_date`` parsed to datetimes and ``product_id``
        as str, or an empty DataFrame (after printing an error) when the file
        does not exist.
    """
    if csv_path is None:
        csv_path = ORDERS_CSV_PATH
    if not os.path.exists(csv_path):
        print(f"Error: {csv_path} not found.")
        return pd.DataFrame()
    # Read product_id as text so numeric-looking ids keep their leading zeros;
    # parsing '0042' as the number 42 and casting back would give '42'.
    orders_df = pd.read_csv(csv_path, dtype={'product_id': str})
    # Convert order_date to datetime objects
    orders_df['order_date'] = pd.to_datetime(orders_df['order_date'])
    # Ensure product_id is str for consistent matching against product data
    orders_df['product_id'] = orders_df['product_id'].astype(str)
    print(f"Loaded {len(orders_df)} order items from {csv_path}")
    return orders_df
# --- Data Loading into MySQL ---
def insert_products_into_db(conn, products_df):
    """Upsert product rows into the 'products' table.

    Uses ``INSERT ... ON DUPLICATE KEY UPDATE``, so a row whose key already
    exists has its other columns overwritten. Missing values (None/NaN) are
    sent as SQL NULL. The batch is committed on success; on a MySQL error it
    is rolled back and the error is printed rather than raised.

    Args:
        conn: Open MySQL connection, e.g. from get_db_connection().
        products_df: DataFrame with product_id, name, category, brand, price,
            rating and reviews_count columns.
    """
    if products_df.empty:
        print("No product data to insert.")
        return
    insert_sql = """
    INSERT INTO products (product_id, name, category, brand, price, rating, reviews_count)
    VALUES (%s, %s, %s, %s, %s, %s, %s)
    ON DUPLICATE KEY UPDATE
    name = VALUES(name),
    category = VALUES(category),
    brand = VALUES(brand),
    price = VALUES(price),
    rating = VALUES(rating),
    reviews_count = VALUES(reviews_count);
    """
    columns = ['product_id', 'name', 'category', 'brand', 'price', 'rating', 'reviews_count']
    # MySQL cannot store NaN (pd.to_numeric(errors='coerce') yields it for
    # unparseable prices/ratings), so send NULL instead. NaN is the only
    # value that compares unequal to itself.
    data_to_insert = [
        [None if isinstance(value, float) and value != value else value for value in row]
        for row in products_df[columns].values.tolist()
    ]
    # Open the cursor only once the rows are built, so a bad frame cannot leak it.
    cursor = conn.cursor()
    try:
        cursor.executemany(insert_sql, data_to_insert)
        conn.commit()
        print(f"Successfully inserted/updated {cursor.rowcount} products into 'products' table.")
    except Error as e:
        print(f"Error inserting products: {e}")
        conn.rollback()  # discard the partial batch, as insert_orders_into_db does
    finally:
        cursor.close()
def _price_key(product_id):
    """Return a canonical str key for a product id: 101, 101.0 and '101' -> '101'."""
    if isinstance(product_id, float) and product_id.is_integer():
        return str(int(product_id))
    return str(product_id)


def _db_value(value):
    """Convert a pandas cell into a value MySQL drivers accept.

    pandas Timestamps become ``datetime.datetime`` (the pure-Python
    mysql-connector converter dispatches on the class name and can reject
    'Timestamp'); NaN/NaT become None (SQL NULL); other values pass through.
    """
    if isinstance(value, pd.Timestamp):
        return value.to_pydatetime()
    if value is pd.NaT or (isinstance(value, float) and value != value):
        return None
    return value


def insert_orders_into_db(conn, orders_df, products_df):
    """
    Insert orders and order line items into the 'orders' and 'order_items' tables.

    One 'orders' row is written per distinct (order_id, customer_id,
    order_date) using INSERT IGNORE, so orders already present are skipped.
    Every row of orders_df becomes one 'order_items' row whose
    unit_price_at_order is looked up from products_df by product_id; unknown
    products and unparseable (NaN) prices fall back to 0.0. Both inserts are
    committed together; on a MySQL error the transaction is rolled back and
    the error is printed rather than raised.

    NOTE(review): order_items uses a plain INSERT, so re-running on the same
    CSV may duplicate line items — confirm against the table's keys.

    Args:
        conn: Open MySQL connection, e.g. from get_db_connection().
        orders_df: Order lines with order_id, customer_id, order_date,
            product_id and quantity columns.
        products_df: Products with product_id and price columns.
    """
    if orders_df.empty:
        print("No order data to insert.")
        return
    # Key prices by a canonical string: load_order_data() makes order
    # product_ids str while API ids may be ints, so raw keys never matched and
    # every line item silently received the 0.0 fallback price.
    if products_df.empty:
        product_prices = {}
    else:
        product_prices = {
            _price_key(pid): price
            for pid, price in zip(products_df['product_id'], products_df['price'])
        }
    unique_orders = orders_df[['order_id', 'customer_id', 'order_date']].drop_duplicates()
    orders_to_insert = [
        tuple(_db_value(value) for value in row)
        for row in unique_orders.itertuples(index=False, name=None)
    ]
    order_items_to_insert = []
    item_columns = orders_df[['order_id', 'product_id', 'quantity']]
    for order_id, product_id, quantity in item_columns.itertuples(index=False, name=None):
        # Default to 0.0 if product_id is not found in the API data.
        unit_price = product_prices.get(_price_key(product_id), 0.0)
        if isinstance(unit_price, float) and unit_price != unit_price:
            unit_price = 0.0  # price was coerced to NaN upstream; MySQL cannot store NaN
        order_items_to_insert.append((
            _db_value(order_id),
            _db_value(product_id),
            _db_value(quantity),
            unit_price,
        ))
    order_insert_sql = """
    INSERT IGNORE INTO orders (order_id, customer_id, order_date)
    VALUES (%s, %s, %s);
    """
    order_item_insert_sql = """
    INSERT INTO order_items (order_id, product_id, quantity, unit_price_at_order)
    VALUES (%s, %s, %s, %s);
    """
    # Open the cursor only once all rows are built, so a bad frame cannot leak it.
    cursor = conn.cursor()
    try:
        cursor.executemany(order_insert_sql, orders_to_insert)
        print(f"Successfully inserted/ignored {cursor.rowcount} unique orders into 'orders' table.")
        cursor.executemany(order_item_insert_sql, order_items_to_insert)
        conn.commit()  # single commit: orders and their items land together
        print(f"Successfully inserted {cursor.rowcount} order items into 'order_items' table.")
    except Error as e:
        print(f"Error inserting orders/order items: {e}")
        conn.rollback()
    finally:
        cursor.close()
# --- Main Execution ---
if __name__ == "__main__":
    # Steps 1-2: product catalogue from the API, order lines from the CSV.
    products_df = load_product_data_from_api()
    orders_df = load_order_data()

    if products_df.empty:
        # Ingestion only proceeds when product data was fetched successfully.
        print("Product data not available from API. Data ingestion aborted.")
    else:
        # Step 3: open the database connection.
        conn = get_db_connection()
        if not conn:
            print("Database connection failed. Data ingestion aborted.")
        else:
            try:
                # Step 4: products first, then step 5: orders and their items.
                insert_products_into_db(conn, products_df)
                insert_orders_into_db(conn, orders_df, products_df)
            finally:
                if conn.is_connected():
                    conn.close()
                    print("MySQL connection closed.")