# Forked from alricdsouza11/Optipick
# mlops_utils.py
# MLOps utility functions for monitoring and tracking
# These functions add MLOps capabilities without affecting core functionality
import functools
import hashlib
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
import pandas as pd
# Configure logging for MLOps.
# NOTE(review): basicConfig() runs at import time and targets the root logger
# at INFO level; per the logging docs it does nothing when the root logger
# already has handlers — confirm this side effect is wanted in a utility module.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the tracking helpers in this file.
logger = logging.getLogger(__name__)
class MLOpsTracker:
    """MLOps tracking and monitoring utility.

    Experiment runs, model-performance snapshots and data-quality reports are
    each appended to a JSON list stored in a file relative to the current
    working directory (``experiments.json``, ``mlops_metrics.json`` and
    ``data_quality.json``).
    """

    def __init__(self):
        self.metrics_file = Path("mlops_metrics.json")
        self.experiments_file = Path("experiments.json")
        self.data_quality_file = Path("data_quality.json")

    @staticmethod
    def _to_native(value: Any) -> Any:
        """Recursively replace numpy scalars/arrays with plain Python objects.

        The ``json`` module rejects numpy integer types such as ``int64``, which
        pandas/numpy reductions return, so records are normalised before saving.
        """
        if isinstance(value, dict):
            return {
                (key.item() if isinstance(key, np.generic) else key): MLOpsTracker._to_native(item)
                for key, item in value.items()
            }
        if isinstance(value, (list, tuple)):
            return [MLOpsTracker._to_native(item) for item in value]
        if isinstance(value, np.ndarray):
            return value.tolist()
        if isinstance(value, np.generic):
            return value.item()
        return value

    def _append_record(self, path: Path, record: Dict[str, Any], max_entries: Optional[int] = None) -> None:
        """Append ``record`` to the JSON list stored at ``path``.

        Args:
            path: File holding a JSON list of earlier records (created if absent).
            record: Entry to append; numpy values are converted to native types.
            max_entries: When given, only the newest ``max_entries`` records are kept.

        A history file that cannot be parsed (or does not contain a list) is
        replaced by a fresh list, and a warning is logged, so that tracking
        never blocks the calling code.
        """
        records = []
        if path.exists():
            try:
                with open(path, 'r', encoding='utf-8') as f:
                    loaded = json.load(f)
            except (json.JSONDecodeError, UnicodeDecodeError):
                logger.warning(f"Ignoring unreadable history in {path}; starting a new list")
            else:
                if isinstance(loaded, list):
                    records = loaded
                else:
                    logger.warning(f"Ignoring non-list history in {path}; starting a new list")

        records.append(self._to_native(record))
        if max_entries is not None:
            records = records[-max_entries:]

        # Serialise before opening the file for writing: a serialisation error
        # must not leave a truncated, unparseable history behind.
        payload = json.dumps(records, indent=2)
        with open(path, 'w', encoding='utf-8') as f:
            f.write(payload)

    def log_experiment(self, experiment_name: str, parameters: Dict[str, Any], metrics: Dict[str, float]):
        """Log experiment parameters and results to ``self.experiments_file``.

        Args:
            experiment_name: Label stored with the entry.
            parameters: Parameters used for the run.
            metrics: Resulting metric values.
        """
        experiment_data = {
            "timestamp": datetime.now().isoformat(),
            "experiment_name": experiment_name,
            "parameters": parameters,
            "metrics": metrics,
            "version": "v1.0.0"
        }
        self._append_record(self.experiments_file, experiment_data)
        logger.info(f"Logged experiment: {experiment_name}")

    def track_model_performance(self, model_name: str, predictions: List, confidence_scores: List):
        """Track model performance metrics in ``self.metrics_file``.

        Args:
            model_name: Name stored with the snapshot.
            predictions: Predicted labels; entries equal to "Positive",
                "Negative" and "Neutral" are counted.
            confidence_scores: Scores whose mean is recorded (0 when empty).

        Only the 100 most recent snapshots are kept to prevent file bloat.
        """
        performance_metrics = {
            "timestamp": datetime.now().isoformat(),
            "model_name": model_name,
            "prediction_count": len(predictions),
            # len() rather than truthiness so a numpy array is accepted as well.
            "avg_confidence": float(np.mean(confidence_scores)) if len(confidence_scores) > 0 else 0,
            "prediction_distribution": {
                "positive": sum(1 for p in predictions if p == "Positive"),
                "negative": sum(1 for p in predictions if p == "Negative"),
                "neutral": sum(1 for p in predictions if p == "Neutral")
            }
        }
        self._append_record(self.metrics_file, performance_metrics, max_entries=100)

    def validate_data_quality(self, df: pd.DataFrame, data_type: str) -> Dict[str, Any]:
        """Validate and track data quality metrics.

        Args:
            df: Data to inspect.
            data_type: Label for the data; the value "reviews" enables extra
                checks on the ``review`` (and, if present, ``rating``) columns.

        Returns:
            The computed metrics as plain Python values. The same dictionary is
            appended to ``self.data_quality_file`` (last 50 entries are kept).
        """
        missing_per_column = df.isnull().sum()
        missing_total = int(missing_per_column.sum())
        total_cells = len(df) * len(df.columns)
        if total_cells:
            completeness = (1 - missing_total / total_cells) * 100
        else:
            # No cells means nothing can be missing; avoids a division by zero.
            completeness = 100.0

        quality_metrics = {
            "timestamp": datetime.now().isoformat(),
            "data_type": data_type,
            "total_records": len(df),
            "missing_values": self._to_native(missing_per_column.to_dict()),
            # int(): pandas returns numpy integers, which json cannot serialise.
            "duplicate_records": int(df.duplicated().sum()),
            "data_completeness": completeness
        }

        # Additional checks for review data
        if data_type == "reviews" and "review" in df.columns:
            reviews = df["review"]
            quality_metrics.update({
                "empty_reviews": int(reviews.isna().sum()),
                "avg_review_length": float(reviews.str.len().mean()) if not reviews.isna().all() else 0,
                "reviews_with_ratings": int(df["rating"].notna().sum()) if "rating" in df.columns else 0
            })

        self._append_record(self.data_quality_file, quality_metrics, max_entries=50)
        logger.info(f"Data quality validated for {data_type}: {quality_metrics['data_completeness']:.1f}% complete")
        return quality_metrics

    def monitor_pipeline_health(self, pipeline_name: str, start_time: float, success: bool, error_msg: Optional[str] = None):
        """Monitor data pipeline health and performance.

        Args:
            pipeline_name: Name used in the log line and the returned record.
            start_time: ``time.time()`` value captured when the pipeline started.
            success: Whether the run completed without error.
            error_msg: Optional failure description.

        Returns:
            A dictionary with the timestamp, pipeline name, elapsed seconds,
            success flag and error message. Nothing is written to disk.
        """
        execution_time = time.time() - start_time
        pipeline_metrics = {
            "timestamp": datetime.now().isoformat(),
            "pipeline_name": pipeline_name,
            "execution_time_seconds": execution_time,
            "success": success,
            "error_message": error_msg
        }
        logger.info(f"Pipeline {pipeline_name}: {'SUCCESS' if success else 'FAILED'} in {execution_time:.2f}s")
        return pipeline_metrics

    def detect_data_drift(self, current_data: List[str], baseline_data: List[str]) -> Dict[str, float]:
        """Simple data drift detection using vocabulary overlap.

        The overlap is the share of the baseline's lower-cased,
        whitespace-separated tokens that also occur in ``current_data``; the
        drift score is ``1 - overlap``. A warning is logged above 0.3.

        Returns:
            ``drift_score`` and ``vocabulary_overlap``, plus both vocabulary
            sizes unless either input is empty (then no drift is reported).
        """
        if not baseline_data or not current_data:
            return {"drift_score": 0.0, "vocabulary_overlap": 1.0}

        # Extract vocabulary
        current_vocab = set(' '.join(current_data).lower().split())
        baseline_vocab = set(' '.join(baseline_data).lower().split())

        # Calculate overlap; a baseline without any tokens counts as full overlap.
        if not baseline_vocab:
            vocabulary_overlap = 1.0
        else:
            vocabulary_overlap = len(current_vocab & baseline_vocab) / len(baseline_vocab)
        drift_score = 1.0 - vocabulary_overlap

        drift_metrics = {
            "drift_score": drift_score,
            "vocabulary_overlap": vocabulary_overlap,
            "current_vocab_size": len(current_vocab),
            "baseline_vocab_size": len(baseline_vocab)
        }
        if drift_score > 0.3:  # Threshold for significant drift
            logger.warning(f"Data drift detected! Drift score: {drift_score:.3f}")
        return drift_metrics

    def get_system_metrics(self) -> Dict[str, Any]:
        """Get system performance metrics.

        Returns:
            CPU and memory figures from ``psutil`` when it is installed,
            otherwise a dictionary with a timestamp and an installation hint.
        """
        # The import has to live inside the try block, otherwise a missing
        # psutil raises before the fallback can take effect.
        try:
            import psutil
        except ImportError:
            # Fallback if psutil not available
            return {
                "timestamp": datetime.now().isoformat(),
                "note": "Install psutil for detailed system metrics"
            }

        memory = psutil.virtual_memory()
        return {
            "timestamp": datetime.now().isoformat(),
            "cpu_usage_percent": psutil.cpu_percent(),
            "memory_usage_percent": memory.percent,
            "available_memory_gb": memory.available / (1024**3)
        }
# Global tracker instance, created at import time; the monitor_pipeline
# decorator below reports through it.
mlops_tracker = MLOpsTracker()
# Decorator for automatic pipeline monitoring
def monitor_pipeline(pipeline_name: str):
    """Decorator to automatically monitor pipeline execution.

    The wrapped function is timed and the outcome is passed to
    ``mlops_tracker.monitor_pipeline_health``. Exceptions raised by the wrapped
    function are reported with ``str(e)`` as the error message and re-raised.

    Args:
        pipeline_name: Name under which the run is reported.

    Returns:
        A decorator that preserves the wrapped function's metadata.
    """
    def decorator(func):
        # wraps() keeps __name__, __doc__ etc. of the decorated function.
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            # Only the wrapped call belongs in the try block, so a failure
            # record is written solely for errors of the pipeline itself.
            try:
                result = func(*args, **kwargs)
            except Exception as e:
                mlops_tracker.monitor_pipeline_health(pipeline_name, start_time, False, str(e))
                raise
            else:
                mlops_tracker.monitor_pipeline_health(pipeline_name, start_time, True)
                return result
        return wrapper
    return decorator
# A/B Testing Configuration
class ABTestConfig:
    """Simple A/B testing configuration.

    Sessions are split into two groups (0 = control, 1 = treatment) by a
    stable hash of the session id.
    """

    @staticmethod
    def _assign_group(user_session_id: str) -> int:
        """Return the A/B group (0 or 1) for a session id.

        The built-in ``hash()`` is salted per process for strings, so it would
        move a session between groups across restarts or worker processes.
        A SHA-256 digest gives the same group every time.
        """
        digest = hashlib.sha256(str(user_session_id).encode("utf-8")).hexdigest()
        return int(digest, 16) % 2

    @staticmethod
    def get_sentiment_threshold(user_session_id: str) -> float:
        """Get sentiment threshold based on A/B test group.

        Returns:
            0.05 for the control group, 0.1 for the treatment group.
        """
        if ABTestConfig._assign_group(user_session_id) == 0:
            return 0.05  # Control group
        return 0.1  # Treatment group (higher threshold)

    @staticmethod
    def get_max_features_config(user_session_id: str) -> int:
        """Get TF-IDF max features based on A/B test.

        Returns:
            1000 for the control group, 1500 for the treatment group.
        """
        if ABTestConfig._assign_group(user_session_id) == 0:
            return 1000  # Control
        return 1500  # Treatment
# Model versioning utilities
class ModelVersionManager:
    """Manage model versions and configurations.

    The configuration is stored as JSON in ``model_config.json`` relative to
    the current working directory.
    """

    def __init__(self):
        self.config_file = Path("model_config.json")
        self.current_version = "v1.0.0"

    def save_model_config(self, config: Dict[str, Any]):
        """Save current model configuration.

        The configuration is wrapped together with the current version and a
        timestamp and replaces any previously saved file.

        Args:
            config: JSON-serialisable configuration values.

        Raises:
            TypeError: If ``config`` contains values json cannot serialise; the
                previously saved file is left untouched in that case.
        """
        versioned_config = {
            "version": self.current_version,
            "timestamp": datetime.now().isoformat(),
            "config": config
        }
        # Serialise first: opening the file with 'w' truncates it, so a failure
        # during json.dump would otherwise destroy the existing configuration.
        payload = json.dumps(versioned_config, indent=2)
        with open(self.config_file, 'w', encoding='utf-8') as f:
            f.write(payload)

    def load_model_config(self) -> Dict[str, Any]:
        """Load model configuration.

        Returns:
            The saved dictionary (``version``, ``timestamp``, ``config``), or a
            built-in default (``version`` and ``config`` only) when no file
            has been saved yet.
        """
        # Try the open directly instead of exists() + open(), which can race
        # with the file being removed in between.
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            pass

        # Default configuration
        return {
            "version": self.current_version,
            "config": {
                "sentiment_threshold": 0.05,
                "tfidf_min_df": 2,
                "ngram_range": [1, 2],
                "max_features": 1000
            }
        }
# Feature store simulation
class FeatureStore:
    """Simple feature store for caching processed features.

    Entries are kept in memory in ``self.cache`` (product id -> feature dict)
    and mirrored to ``feature_cache.json`` relative to the current working
    directory after every store.
    """

    # Maximum number of products kept in the cache.
    MAX_ENTRIES = 100

    def __init__(self):
        self.cache_file = Path("feature_cache.json")
        self.cache = {}
        self.load_cache()

    def load_cache(self):
        """Load cached features from ``self.cache_file``.

        A missing file leaves the cache unchanged. An unreadable file, or one
        that does not hold a JSON object, is ignored with a warning: the
        constructor calls this method, so a damaged cache file must not make
        creating the store fail.
        """
        try:
            with open(self.cache_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return
        except (json.JSONDecodeError, UnicodeDecodeError):
            logging.getLogger(__name__).warning(
                "Ignoring unreadable feature cache %s", self.cache_file
            )
            return

        if isinstance(loaded, dict):
            self.cache = loaded
        else:
            logging.getLogger(__name__).warning(
                "Ignoring feature cache %s: expected a JSON object", self.cache_file
            )

    def save_cache(self):
        """Save feature cache to ``self.cache_file``."""
        # Serialise before truncating the file so a serialisation error cannot
        # leave a half-written cache behind.
        payload = json.dumps(self.cache)
        with open(self.cache_file, 'w', encoding='utf-8') as f:
            f.write(payload)

    def get_features(self, product_id: str) -> Optional[Dict[str, Any]]:
        """Get cached features for a product, or ``None`` if it is not cached."""
        return self.cache.get(product_id)

    def store_features(self, product_id: str, features: Dict[str, Any]):
        """Store features for a product and persist the cache.

        Args:
            product_id: Cache key.
            features: Feature values; a copy with an added ``cached_at``
                timestamp is stored, the caller's dictionary is not modified.
        """
        entry = dict(features)
        entry["cached_at"] = datetime.now().isoformat()
        self.cache[product_id] = entry

        # Limit cache size: drop the entries with the oldest timestamps. A loop
        # is used because a cache loaded from disk may exceed the limit by
        # more than one entry.
        while len(self.cache) > self.MAX_ENTRIES:
            oldest_key = min(self.cache, key=lambda k: self.cache[k].get("cached_at", ""))
            del self.cache[oldest_key]
        self.save_cache()
# Global instances, created at import time. Constructing FeatureStore calls
# load_cache(), which reads feature_cache.json from the current working
# directory if that file exists.
model_version_manager = ModelVersionManager()
feature_store = FeatureStore()