diff --git a/github_dagger_workflow_project/02_model_training.py b/github_dagger_workflow_project/02_model_training.py
index b3d70b3..325cd0d 100644
--- a/github_dagger_workflow_project/02_model_training.py
+++ b/github_dagger_workflow_project/02_model_training.py
@@ -30,7 +30,7 @@
 model_results = {}
 
 xgboost_cr = pu.train_xgboost(X_train, X_test, y_train, y_test, experiment_id)
-lr_cr = pu.train_linear_regression(X_train, X_test, y_train, y_test, experiment_id)
+lr_cr = pu.train_logistic_regression(X_train, X_test, y_train, y_test, experiment_id)
 
 model_results.update(xgboost_cr)
 model_results.update(lr_cr)
diff --git a/github_dagger_workflow_project/pipeline_utils.py b/github_dagger_workflow_project/pipeline_utils.py
index 7dc8815..fbc2f09 100644
--- a/github_dagger_workflow_project/pipeline_utils.py
+++ b/github_dagger_workflow_project/pipeline_utils.py
@@ -28,10 +28,11 @@
     XGBOOST_MODEL_JSON_PATH,
     LR_MODEL_PATH,
     MODEL_RESULTS_PATH,
-    BEST_EXPERIMENT_PATH,
+    BEST_EXPERIMENT_PATH,
     BEST_MODEL_PATH,
 )
 
+
 def initialize_dates(max_date_str, min_date_str):
     """
     Initialize min and max dates for filtering the data.
@@ -62,30 +63,41 @@ def save_date_limits(data, file_path):
         json.dump(date_limits, f)
 
 
-def preprocess_data(data):
-    data = data.drop(
-        [
-            "is_active",
-            "marketing_consent",
-            "first_booking",
-            "existing_customer",
-            "last_seen",
-        ],
-        axis=1,
-    )
-    data = data.drop(
-        ["domain", "country", "visited_learn_more_before_booking", "visited_faq"], axis=1
-    )
-    data["lead_indicator"].replace("", np.nan, inplace=True)
-    data["lead_id"].replace("", np.nan, inplace=True)
-    data["customer_code"].replace("", np.nan, inplace=True)
-    data = data.dropna(axis=0, subset=["lead_indicator"])
-    data = data.dropna(axis=0, subset=["lead_id"])
+def preprocess_data(data: pd.DataFrame) -> pd.DataFrame:
+    """
+    Preprocess the data:
+    - drop unnecessary columns,
+    - replace empty strings with NaN in key columns,
+    - drop rows with missing values in critical columns,
+    - keep only rows whose 'source' is 'signup'.
+    """
+    columns_to_drop = [
+        "is_active",
+        "marketing_consent",
+        "first_booking",
+        "existing_customer",
+        "last_seen",
+        "domain",
+        "country",
+        "visited_learn_more_before_booking",
+        "visited_faq",
+    ]
+    data = data.drop(columns=columns_to_drop)
+
+    columns_to_clean = ["lead_indicator", "lead_id", "customer_code"]
+    data[columns_to_clean] = data[columns_to_clean].replace("", np.nan)
+
+    data = data.dropna(axis=0, subset=["lead_indicator", "lead_id"])
+
+    data = data[data.source == "signup"]
+    return data
 
 
-def process_and_save_artifacts(data):
+def process_and_save_artifacts(data: pd.DataFrame) -> None:
+    """
+    Clips outliers, imputes missing values, applies min-max scaling, and saves the artifacts to disk.
+    """
     vars = [
         "lead_id",
         "lead_indicator",
@@ -96,20 +108,17 @@ def process_and_save_artifacts(data):
     ]
     for col in vars:
         data[col] = data[col].astype("object")
-
     cont_vars = data.loc[:, ((data.dtypes == "float64") | (data.dtypes == "int64"))]
     cat_vars = data.loc[:, (data.dtypes == "object")]
 
     cont_vars = cont_vars.apply(
         lambda x: x.clip(lower=(x.mean() - 2 * x.std()), upper=(x.mean() + 2 * x.std()))
     )
-
     outlier_summary = cont_vars.apply(utils.describe_numeric_col).T
     outlier_summary.to_csv(OUTLIER_SUMMARY_PATH)
 
     cat_missing_impute = cat_vars.mode(numeric_only=False, dropna=True)
     cat_missing_impute.to_csv(CAT_MISSING_IMPUTE_PATH)
-
     cont_vars = cont_vars.apply(utils.impute_missing_values)
     cont_vars.apply(utils.describe_numeric_col).T
 
@@ -119,6 +128,7 @@ def process_and_save_artifacts(data):
         lambda x: pd.Series([x.count(), x.isnull().sum()], index=["Count", "Missing"])
     ).T
 
+    # scaling data
    scaler = MinMaxScaler()
     scaler.fit(cont_vars)
     joblib.dump(value=scaler, filename=SCALER_PATH)
@@ -132,7 +142,6 @@ def process_and_save_artifacts(data):
     data_columns = list(data.columns)
     with open(COLUMNS_DRIFT_PATH, "w+") as f:
         json.dump(data_columns, f)
-
     data.to_csv(TRAINING_DATA_PATH, index=False)
 
     data["bin_source"] = data["source"]
@@ -140,7 +149,6 @@ def process_and_save_artifacts(data):
     data.loc[~data["source"].isin(values_list), "bin_source"] = "Others"
     mapping = {"li": "socials", "fb": "socials", "organic": "group1", "signup": "group1"}
     data["bin_source"] = data["source"].map(mapping)
-
     data.to_csv(TRAIN_DATA_GOLD_PATH, index=False)
 
 
@@ -221,7 +229,7 @@
 
 
 # mlflow logistic regression experiments
-def train_linear_regression(X_train, X_test, y_train, y_test, experiment_id):
+def train_logistic_regression(X_train, X_test, y_train, y_test, experiment_id):
     with mlflow.start_run(experiment_id=experiment_id):
         model = LogisticRegression()
 
@@ -303,4 +311,4 @@ def register_and_wait_model(run_id, artifact_path, model_name):
     model_uri = f"runs:/{run_id}/{artifact_path}"
     model_details = mlflow.register_model(model_uri=model_uri, name=model_name)
     utils.wait_until_ready(model_details.name, model_details.version)
-    return dict(model_details)
\ No newline at end of file
+    return dict(model_details)