-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_process_blob.py
More file actions
179 lines (145 loc) · 7.7 KB
/
Copy pathtest_process_blob.py
File metadata and controls
179 lines (145 loc) · 7.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
import sys
import json
import os
import pandas as pd
from unittest.mock import patch, MagicMock
from uncap_app.process_blob import process_blob
def load_local_settings():
    """Load the ``Values`` section of ``local.settings.json`` into ``os.environ``.

    The settings file is looked up in the same directory as this script. If it
    does not exist, a warning is printed and the environment is left untouched.

    ``os.environ`` only accepts strings, so non-string JSON values (numbers,
    booleans) are converted with ``str()``; ``null`` values are skipped rather
    than being exported as the literal text ``"None"``.
    """
    settings_path = os.path.join(os.path.dirname(__file__), 'local.settings.json')
    if not os.path.exists(settings_path):
        print(f"Warning: '{settings_path}' not found. Assuming environment is already configured.")
        return

    # 'utf-8-sig' reads plain UTF-8 and also tolerates a leading BOM, which
    # json.load would otherwise reject.
    with open(settings_path, 'r', encoding='utf-8-sig') as f:
        settings = json.load(f)

    values = settings.get('Values') if isinstance(settings, dict) else None
    if not isinstance(values, dict):
        # Nothing usable to export (section missing, null, or wrong type).
        return

    for key, value in values.items():
        if value is None:
            continue
        os.environ[key] = value if isinstance(value, str) else str(value)
def inspect_object(obj):
    """Return a plain dict of a mapped object's column values, omitting ``id``.

    Keys are the ``key`` of each entry in ``obj.__table__.columns``; values are
    read from the attribute of the same name on ``obj``.
    """
    snapshot = {}
    for column in obj.__table__.columns:
        column_key = column.key
        if column_key == 'id':
            # The primary key is deliberately left out of the dump.
            continue
        snapshot[column_key] = getattr(obj, column_key)
    return snapshot
def _dump_objects_to_csv(objects, output_dir="test_data"):
    """Group mapped objects by class name and write one CSV per class to *output_dir*."""
    records_by_table = {}
    for obj in objects:
        records_by_table.setdefault(obj.__class__.__name__, []).append(inspect_object(obj))

    os.makedirs(output_dir, exist_ok=True)
    print(f"Dumping data to CSV files in '{output_dir}' directory...")
    for table_name, records in records_by_table.items():
        file_path = os.path.join(output_dir, f"{table_name}.csv")
        pd.DataFrame(records).to_csv(file_path, index=False)
        print(f" -> Saved {len(records)} records to {file_path}")


def run_test(json_data, site):
    """
    Simulates the execution of process_blob for a given JSON payload, dumping the
    data that would be committed to CSV files for verification.

    The blob, the blob-service client, the retry-state helpers and
    ``run_iceberg_batch`` are all replaced by mocks. ``session.commit`` is
    replaced by a function that flushes and writes every object passed to
    ``session.add`` into ``test_data/<ClassName>.csv``; the session is always
    rolled back and closed at the end.

    Args:
        json_data: JSON-serialisable payload returned by the mock blob's ``read()``.
        site: Site identifier; used for the mock blob name and passed to process_blob.
    """
    # These imports are kept here so they run after the caller has had the
    # chance to configure the environment.
    from uncap_app.core import SessionLocal
    from uncap_app.process_blob import process_blob

    mock_blob = MagicMock()
    mock_blob.name = f"{site}_test_data.json"
    mock_blob.container = "test-container"
    mock_blob.read.return_value = json.dumps(json_data).encode('utf-8')

    # In-memory stand-in for the persisted retry counters, keyed by "site/keyword".
    mock_retry_storage = {}

    def mock_get_retry_state(client, container, site, keyword):
        return mock_retry_storage.get(f"{site}/{keyword}", 0)

    def mock_update_retry_state(client, container, site, keyword, count):
        state_key = f"{site}/{keyword}"
        print(f"[TEST] Would update retry state for '{state_key}' to {count}.")
        mock_retry_storage[state_key] = count

    def mock_delete_retry_state(client, container, site, keyword):
        state_key = f"{site}/{keyword}"
        print(f"[TEST] Would delete retry state for '{state_key}'.")
        mock_retry_storage.pop(state_key, None)

    # The main test execution block
    with patch('uncap_app.process_blob._get_blob_service_client'), \
         patch('uncap_app.process_blob._get_retry_state', side_effect=mock_get_retry_state), \
         patch('uncap_app.process_blob._update_retry_state', side_effect=mock_update_retry_state), \
         patch('uncap_app.process_blob._delete_retry_state', side_effect=mock_delete_retry_state), \
         patch('uncap_app.process_blob.run_iceberg_batch') as mock_run_batch:

        session = SessionLocal()
        try:
            # Every object handed to session.add is remembered here, so it can
            # still be dumped even if an intermediate flush has already moved
            # it out of the session's pending set.
            all_added_objects = []
            original_session_add = session.add

            def mock_session_add(obj, *args, **kwargs):
                # Extra arguments are forwarded untouched: SQLAlchemy itself may
                # call Session.add with additional keyword arguments (e.g. from
                # Session.add_all), which a single-argument wrapper would reject.
                all_added_objects.append(obj)
                return original_session_add(obj, *args, **kwargs)

            session.add = mock_session_add

            # This function replaces session.commit
            def inspect_and_dump_session():
                print("\n--- DATABASE TRANSACTION SIMULATION ---")
                if not all_added_objects:
                    print("No new or modified data was found in the session to commit.")
                    return
                # Flush (without committing) so that generated keys and
                # relationship columns are populated before the dump.
                session.flush()
                _dump_objects_to_csv(all_added_objects)

            session.commit = inspect_and_dump_session

            # Ensure process_blob uses our modified session
            with patch('uncap_app.process_blob.SessionLocal', return_value=session):
                process_blob(mock_blob, "test-container", site, min_skus_per_keyword=1)

            print("\n--- ICEBERG API CALL SIMULATION ---")
            if mock_run_batch.called:
                call_args = mock_run_batch.call_args[0]
                print(f"run_iceberg_batch would have been called for site '{call_args[0]}' with keywords:")
                print(json.dumps(call_args[1], indent=2))
            else:
                print("run_iceberg_batch was not called.")
        finally:
            session.rollback()  # Discard everything that was flushed; nothing is committed
            session.close()
if __name__ == "__main__":
    # Command-line entry point:
    #   python <this script> <path_to_json_file> [site]
    # ``site`` is optional and defaults to "wayfair".
    load_local_settings()
    sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))

    if len(sys.argv) not in (2, 3):
        print(f"Usage: python {sys.argv[0]} <path_to_json_file> [site]", file=sys.stderr)
        sys.exit(1)

    file_path = sys.argv[1]
    site = sys.argv[2] if len(sys.argv) == 3 else "wayfair"

    # Only the file load is guarded: an error raised later inside run_test must
    # surface as itself instead of being reported as a bad input file.
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}", file=sys.stderr)
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Could not decode JSON from {file_path}", file=sys.stderr)
        sys.exit(1)

    run_test(data, site)