diff --git a/github_dagger_workflow_project/01_data_transformations.py b/github_dagger_workflow_project/01_data_transformations.py
index 5808523..080bdaa 100644
--- a/github_dagger_workflow_project/01_data_transformations.py
+++ b/github_dagger_workflow_project/01_data_transformations.py
@@ -1,151 +1,23 @@
-import datetime
-import joblib
-import json
-import numpy as np
 import os
-import pandas as pd
-from sklearn.preprocessing import MinMaxScaler
-
-from github_dagger_workflow_project import utils
+from github_dagger_workflow_project import pipeline_utils as pu
 from github_dagger_workflow_project.config import (
     ARTIFACTS_DIR,
     RAW_DATA_PATH,
     DATE_LIMITS_PATH,
-    OUTLIER_SUMMARY_PATH,
-    CAT_MISSING_IMPUTE_PATH,
-    SCALER_PATH,
-    COLUMNS_DRIFT_PATH,
-    TRAINING_DATA_PATH,
-    TRAIN_DATA_GOLD_PATH,
+    MAX_DATE_STR,
+    MIN_DATE_STR,
 )
 
 
-def initialize_dates(max_date_str, min_date_str):
-    """
-    Initialize min and max dates for filtering the data.
-    """
-    if not max_date_str:
-        max_date = pd.to_datetime(datetime.datetime.now().date()).date()
-    else:
-        max_date = pd.to_datetime(max_date_str).date()
-
-    min_date = pd.to_datetime(min_date_str).date()
-    return min_date, max_date
-
-
-def load_data(file_path):
-    return pd.read_csv(file_path)
-
-
-def filter_date_range(data, min_date, max_date):
-    data["date_part"] = pd.to_datetime(data["date_part"]).dt.date
-    return data[(data["date_part"] >= min_date) & (data["date_part"] <= max_date)]
-
-
-def save_date_limits(data, file_path):
-    min_date = data["date_part"].min()
-    max_date = data["date_part"].max()
-    date_limits = {"min_date": str(min_date), "max_date": str(max_date)}
-    with open(file_path, "w") as f:
-        json.dump(date_limits, f)
-
-
-def preprocess_data(data):
-    data = data.drop(
-        [
-            "is_active",
-            "marketing_consent",
-            "first_booking",
-            "existing_customer",
-            "last_seen",
-        ],
-        axis=1,
-    )
-    data = data.drop(
-        ["domain", "country", "visited_learn_more_before_booking", "visited_faq"], axis=1
-    )
-    data["lead_indicator"].replace("", np.nan, inplace=True)
-    data["lead_id"].replace("", np.nan, inplace=True)
-    data["customer_code"].replace("", np.nan, inplace=True)
-    data = data.dropna(axis=0, subset=["lead_indicator"])
-    data = data.dropna(axis=0, subset=["lead_id"])
-    data = data[data.source == "signup"]
-    return data
-
-
-def process_and_save_artifacts(data):
-    vars = [
-        "lead_id",
-        "lead_indicator",
-        "customer_group",
-        "onboarding",
-        "source",
-        "customer_code",
-    ]
-    for col in vars:
-        data[col] = data[col].astype("object")
-
-    cont_vars = data.loc[:, ((data.dtypes == "float64") | (data.dtypes == "int64"))]
-    cat_vars = data.loc[:, (data.dtypes == "object")]
-
-    cont_vars = cont_vars.apply(
-        lambda x: x.clip(lower=(x.mean() - 2 * x.std()), upper=(x.mean() + 2 * x.std()))
-    )
-
-    outlier_summary = cont_vars.apply(utils.describe_numeric_col).T
-    outlier_summary.to_csv(OUTLIER_SUMMARY_PATH)
-
-    cat_missing_impute = cat_vars.mode(numeric_only=False, dropna=True)
-    cat_missing_impute.to_csv(CAT_MISSING_IMPUTE_PATH)
-
-    cont_vars = cont_vars.apply(utils.impute_missing_values)
-    cont_vars.apply(utils.describe_numeric_col).T
-
-    cat_vars.loc[cat_vars["customer_code"].isna(), "customer_code"] = "None"
-    cat_vars = cat_vars.apply(utils.impute_missing_values)
-    cat_vars.apply(
-        lambda x: pd.Series([x.count(), x.isnull().sum()], index=["Count", "Missing"])
-    ).T
-
-    scaler = MinMaxScaler()
-    scaler.fit(cont_vars)
-    joblib.dump(value=scaler, filename=SCALER_PATH)
-
-    cont_vars = pd.DataFrame(scaler.transform(cont_vars), columns=cont_vars.columns)
-    cont_vars = cont_vars.reset_index(drop=True)
-    cat_vars = cat_vars.reset_index(drop=True)
-
-    data = pd.concat([cat_vars, cont_vars], axis=1)
-
-    data_columns = list(data.columns)
-    with open(COLUMNS_DRIFT_PATH, "w+") as f:
-        json.dump(data_columns, f)
-
-    data.to_csv(TRAINING_DATA_PATH, index=False)
-
-    data["bin_source"] = data["source"]
-    values_list = ["li", "organic", "signup", "fb"]
-    data.loc[~data["source"].isin(values_list), "bin_source"] = "Others"
-    mapping = {"li": "socials", "fb": "socials", "organic": "group1", "signup": "group1"}
-    data["bin_source"] = data["source"].map(mapping)
-
-    data.to_csv(TRAIN_DATA_GOLD_PATH, index=False)
-
-
 # Define min and max date
-max_date_str = "2024-01-31"
-min_date_str = "2024-01-01"
-min_date, max_date = initialize_dates(max_date_str, min_date_str)
+min_date, max_date = pu.initialize_dates(MAX_DATE_STR, MIN_DATE_STR)
 
 # Create artifacts folder
 os.makedirs(ARTIFACTS_DIR, exist_ok=True)
 
-# Warnings and pandas settings
-
-
-data = load_data(RAW_DATA_PATH)
-data = filter_date_range(data, min_date, max_date)
-save_date_limits(data, DATE_LIMITS_PATH)
-data = preprocess_data(data)
-process_and_save_artifacts(data)
+data = pu.load_data(RAW_DATA_PATH)
+data = pu.filter_date_range(data, min_date, max_date)
+pu.save_date_limits(data, DATE_LIMITS_PATH)
+data = pu.preprocess_data(data)
+pu.process_and_save_artifacts(data)
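The transformation script now just wires config constants into the relocated helpers. As a quick, illustrative sanity check that the stage still writes its artifacts, something like the sketch below could be run after 01_data_transformations.py from the repository root; it only assumes the ./artifacts paths already defined in config.py.

import os

from github_dagger_workflow_project.config import (
    DATE_LIMITS_PATH,
    OUTLIER_SUMMARY_PATH,
    SCALER_PATH,
    TRAINING_DATA_PATH,
    TRAIN_DATA_GOLD_PATH,
)

# Files the transformation stage is expected to have written.
expected = [
    DATE_LIMITS_PATH,
    OUTLIER_SUMMARY_PATH,
    SCALER_PATH,
    TRAINING_DATA_PATH,
    TRAIN_DATA_GOLD_PATH,
]
missing = [path for path in expected if not os.path.exists(path)]
assert not missing, f"transformation stage did not produce: {missing}"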
- """ - with open(COLUMNS_LIST_PATH, "w+") as columns_file: - columns = {"column_names": list(X_train.columns)} - json.dump(columns, columns_file) - - -def train_xgboost(X_train, X_test, y_train, y_test, experiment_id): - with mlflow.start_run(experiment_id=experiment_id): - model = XGBRFClassifier(random_state=42) - params = { - "learning_rate": uniform(1e-2, 3e-1), - "min_split_loss": uniform(0, 10), - "max_depth": randint(3, 10), - "subsample": uniform(0, 1), - "objective": ["reg:squarederror", "binary:logistic", "reg:logistic"], - "eval_metric": ["aucpr", "error"], - } - model_grid = RandomizedSearchCV( - model, param_distributions=params, n_jobs=-1, verbose=3, n_iter=10, cv=10 - ) - model_grid.fit(X_train, y_train) - - best_model_xgboost = model_grid.best_estimator_ - - y_pred_test = model_grid.predict(X_test) - - # log artifacts - mlflow.log_metric("f1_score", f1_score(y_test, y_pred_test, average="binary")) - mlflow.log_artifacts("artifacts", artifact_path="model") - mlflow.log_param("data_version", "00000") - mlflow.log_param("model_type", "XGBoost") - # Custom python model for predicting probability - mlflow.pyfunc.log_model("model", python_model=utils.ProbaModelWrapper(best_model_xgboost)) - - xgboost_model_path = XGBOOST_MODEL_PATH - joblib.dump(value=best_model_xgboost, filename=xgboost_model_path) - # Save lead xgboost model as artifact - xgboost_model_path = XGBOOST_MODEL_JSON_PATH - best_model_xgboost.save_model(xgboost_model_path) - - # Defining model results dict - xgboost_cr = {xgboost_model_path: classification_report(y_test, y_pred_test, output_dict=True)} - - return xgboost_cr - - -# mlflow logistic regression experiments -def train_linear_regression(X_train, X_test, y_train, y_test, experiment_id): - with mlflow.start_run(experiment_id=experiment_id): - model = LogisticRegression() - - params = { - "solver": ["newton-cg", "lbfgs", "liblinear", "sag", "saga"], - "penalty": ["none", "l1", "l2", "elasticnet"], - "C": [100, 10, 1.0, 0.1, 0.01], - } - model_grid = RandomizedSearchCV( - model, param_distributions=params, verbose=3, n_iter=10, cv=3 - ) - model_grid.fit(X_train, y_train) - - best_lr_model = model_grid.best_estimator_ - - y_pred_test = model_grid.predict(X_test) - - # log artifacts - mlflow.log_metric("f1_score", f1_score(y_test, y_pred_test, average="binary")) - mlflow.log_artifacts("artifacts", artifact_path="model") - mlflow.log_param("data_version", "00000") - mlflow.log_param("model_type", "LogisticRegression") - - # Custom python model for predicting probability - mlflow.pyfunc.log_model("model", python_model=utils.ProbaModelWrapper(best_lr_model)) - - # store model for model interpretability - lr_model_path = LR_MODEL_PATH - joblib.dump(value=best_lr_model, filename=lr_model_path) - - # Testing model and storing the columns and model results - lr_cr = {lr_model_path: classification_report(y_test, y_pred_test, output_dict=True)} - - return lr_cr - - -def save_model_results(model_results): - with open(MODEL_RESULTS_PATH, "w+") as results_file: - json.dump(model_results, results_file) - - -# Constants used: -current_date = datetime.datetime.now().strftime("%Y_%B_%d") -data_gold_path = TRAIN_DATA_GOLD_PATH -data_version = "00000" -experiment_name = current_date - # Create directories os.makedirs("artifacts", exist_ok=True) os.makedirs("mlruns", exist_ok=True) os.makedirs("mlruns/.trash", exist_ok=True) # Set mlflow experiment -mlflow.set_experiment(experiment_name) +mlflow.set_experiment(EXPERIMENT_NAME) -data = pd.read_csv(data_gold_path) +data = 
+data = pd.read_csv(DATA_GOLD_PATH)
 
-X_train, X_test, y_train, y_test = prepare_data(data)
+X_train, X_test, y_train, y_test = pu.prepare_data(data)
 
-save_column_list(X_train)
+pu.save_column_list(X_train)
 
 mlflow.sklearn.autolog(log_input_examples=True, log_models=False)
 
-experiment_id = mlflow.get_experiment_by_name(experiment_name).experiment_id
-
+experiment_id = mlflow.get_experiment_by_name(EXPERIMENT_NAME).experiment_id
 
 model_results = {}
 
-xgboost_cr = train_xgboost(X_train, X_test, y_train, y_test, experiment_id)
-lr_cr = train_linear_regression(X_train, X_test, y_train, y_test, experiment_id)
+xgboost_cr = pu.train_xgboost(X_train, X_test, y_train, y_test, experiment_id)
+lr_cr = pu.train_linear_regression(X_train, X_test, y_train, y_test, experiment_id)
 
 model_results.update(xgboost_cr)
 model_results.update(lr_cr)
 
-save_model_results(model_results)
+pu.save_model_results(model_results)
diff --git a/github_dagger_workflow_project/03_model_selection.py b/github_dagger_workflow_project/03_model_selection.py
index 0f7a759..0b1b499 100644
--- a/github_dagger_workflow_project/03_model_selection.py
+++ b/github_dagger_workflow_project/03_model_selection.py
@@ -1,41 +1,15 @@
-import datetime
-import shutil
-import mlflow
-from github_dagger_workflow_project.config import BEST_EXPERIMENT_PATH, BEST_MODEL_PATH
+from github_dagger_workflow_project import pipeline_utils as pu
+from github_dagger_workflow_project.config import (
+    EXPERIMENT_NAME,
+)
 
 
-def select_best_model(experiment_name):
-    experiment_ids = [mlflow.get_experiment_by_name(experiment_name).experiment_id]
-    experiment_best = mlflow.search_runs(
-        experiment_ids=experiment_ids, order_by=["metrics.f1_score DESC"], max_results=1
-    ).iloc[0]
-    return experiment_best
+experiment_best = pu.select_best_model(EXPERIMENT_NAME)
 
-
-def save_best_model(experiment_best, best_model_type) -> None:
-    # store the best experiment (model) metadata
-    experiment_best.to_pickle(BEST_EXPERIMENT_PATH)
-
-    if best_model_type == "XGBoost":
-        best_model_artifact = "lead_model_xgboost.pkl"
-    elif best_model_type == "LogisticRegression":
-        best_model_artifact = "lead_model_lr.pkl"
-
-    original_file_path = f"./artifacts/{best_model_artifact}"
-    new_file_path = BEST_MODEL_PATH
-
-    shutil.copyfile(original_file_path, new_file_path)
-
-
-# Constants used:
-current_date = datetime.datetime.now().strftime("%Y_%B_%d")
-experiment_name = current_date
-
-experiment_best = select_best_model(experiment_name)
 # Save best model
 # Currently we pick only LR, no matter what the best model is.
 # When ready, uncomment to consider xgboost.
 # best_model_type = experiment_best["params.model_type"] # and comment below code
 best_model_type = "LogisticRegression"
-save_best_model(experiment_best, best_model_type)
+pu.save_best_model(experiment_best, best_model_type)
diff --git a/github_dagger_workflow_project/04_prod_model.py b/github_dagger_workflow_project/04_prod_model.py
index b760d39..d6b01bc 100644
--- a/github_dagger_workflow_project/04_prod_model.py
+++ b/github_dagger_workflow_project/04_prod_model.py
@@ -1,43 +1,22 @@
-import mlflow
 import pandas as pd
 
-from github_dagger_workflow_project import utils
-from github_dagger_workflow_project.config import PROD_BEST_EXPERIMENT_PATH
-from github_dagger_workflow_project.mlflow_client import client
+from github_dagger_workflow_project import pipeline_utils as pu
+from github_dagger_workflow_project.config import (
+    PROD_BEST_EXPERIMENT_PATH,
+    ARTIFACT_PATH,
+    MODEL_NAME,
+)
 
 
-def get_production_model(model_name):
-    return [
-        model
-        for model in client.search_model_versions(f"name='{model_name}'")
-        if dict(model)["current_stage"] == "Production"
-    ]
-
-
-def get_model_score(run_id):
-    data, _ = mlflow.get_run(run_id)
-    return data[1]["metrics.f1_score"]
-
-
-def register_and_wait_model(run_id, artifact_path, model_name):
-    model_uri = f"runs:/{run_id}/{artifact_path}"
-    model_details = mlflow.register_model(model_uri=model_uri, name=model_name)
-    utils.wait_until_ready(model_details.name, model_details.version)
-    return dict(model_details)
-
-
-artifact_path = "model"
-model_name = "lead_model"
 experiment_best = pd.read_pickle(PROD_BEST_EXPERIMENT_PATH)
-
-
 train_model_score = experiment_best["metrics.f1_score"]
 run_id = None
-prod_model = get_production_model(model_name)
+
+prod_model = pu.get_production_model(MODEL_NAME)
 prod_model_exists = len(prod_model) > 0
 
 if prod_model_exists:
     prod_model_run_id = dict(prod_model[0])["run_id"]
-    prod_model_score = get_model_score(prod_model_run_id)
+    prod_model_score = pu.get_model_score(prod_model_run_id)
 
     if train_model_score > prod_model_score:
         run_id = experiment_best["run_id"]
@@ -45,4 +24,4 @@ def register_and_wait_model(run_id, artifact_path, model_name):
     run_id = experiment_best["run_id"]
 
 if run_id is not None:
-    model_details = register_and_wait_model(run_id, artifact_path, model_name)
+    model_details = pu.register_and_wait_model(run_id, ARTIFACT_PATH, MODEL_NAME)
diff --git a/github_dagger_workflow_project/05_model_deployment.py b/github_dagger_workflow_project/05_model_deployment.py
index dc881a1..2f881dd 100644
--- a/github_dagger_workflow_project/05_model_deployment.py
+++ b/github_dagger_workflow_project/05_model_deployment.py
@@ -1,18 +1,20 @@
 from github_dagger_workflow_project import utils
 from github_dagger_workflow_project.mlflow_client import client
+from github_dagger_workflow_project.config import (
+    MODEL_NAME,
+    MODEL_VERSION,
+)
 
 
-model_version = 1
-model_name = "lead_model"
-model_status = True
+model_status = True
 
-model_version_details = dict(client.get_model_version(name=model_name, version=model_version))
+model_version_details = dict(client.get_model_version(name=MODEL_NAME, version=MODEL_VERSION))
 
 if model_version_details["current_stage"] != "Staging":
     client.transition_model_version_stage(
-        name=model_name,
-        version=model_version,
+        name=MODEL_NAME,
+        version=MODEL_VERSION,
         stage="Staging",
         archive_existing_versions=True,
     )
-    model_status = utils.wait_for_deployment(model_name, model_version, client, "Staging")
+    model_status = utils.wait_for_deployment(MODEL_NAME, MODEL_VERSION, client, "Staging")
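With the per-script constants gone, the five numbered stages still run in a fixed order; a minimal orchestration sketch (assuming each file remains runnable as a plain script from the repository root) could look like this:

import subprocess
import sys

# Pipeline stages in execution order, using the paths from this diff.
STAGES = [
    "github_dagger_workflow_project/01_data_transformations.py",
    "github_dagger_workflow_project/02_model_training.py",
    "github_dagger_workflow_project/03_model_selection.py",
    "github_dagger_workflow_project/04_prod_model.py",
    "github_dagger_workflow_project/05_model_deployment.py",
]

for stage in STAGES:
    print(f"running {stage}")
    # check=True aborts the pipeline on the first failing stage.
    subprocess.run([sys.executable, stage], check=True)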
diff --git a/github_dagger_workflow_project/config.py b/github_dagger_workflow_project/config.py
index 737f668..2944e6d 100644
--- a/github_dagger_workflow_project/config.py
+++ b/github_dagger_workflow_project/config.py
@@ -1,3 +1,5 @@
+import datetime
+
 ARTIFACTS_DIR = "artifacts"
 RAW_DATA_PATH = "./artifacts/raw_data.csv"
 DATE_LIMITS_PATH = "./artifacts/date_limits.json"
@@ -15,3 +17,11 @@
 BEST_EXPERIMENT_PATH = "./artifacts/best_experiment.pkl"
 BEST_MODEL_PATH = "./artifacts/best_model.pkl"
 PROD_BEST_EXPERIMENT_PATH = "./artifacts/best_experiment.pkl"
+MAX_DATE_STR = "2024-01-31"
+MIN_DATE_STR = "2024-01-01"
+CURRENT_DATE = datetime.datetime.now().strftime("%Y_%B_%d")
+EXPERIMENT_NAME = CURRENT_DATE
+DATA_GOLD_PATH = TRAIN_DATA_GOLD_PATH
+ARTIFACT_PATH = "model"
+MODEL_NAME = "lead_model"
+MODEL_VERSION = 1
diff --git a/github_dagger_workflow_project/pipeline_utils.py b/github_dagger_workflow_project/pipeline_utils.py
new file mode 100644
index 0000000..7dc8815
--- /dev/null
+++ b/github_dagger_workflow_project/pipeline_utils.py
@@ -0,0 +1,305 @@
+import datetime
+import joblib
+import json
+import numpy as np
+import pandas as pd
+
+import mlflow
+import shutil
+from sklearn.preprocessing import MinMaxScaler
+from scipy.stats import uniform, randint
+from sklearn.linear_model import LogisticRegression
+from sklearn.metrics import classification_report, f1_score
+from sklearn.model_selection import RandomizedSearchCV, train_test_split
+from xgboost import XGBRFClassifier
+
+from github_dagger_workflow_project import utils
+from github_dagger_workflow_project.mlflow_client import client
+from github_dagger_workflow_project.config import (
+    OUTLIER_SUMMARY_PATH,
+    CAT_MISSING_IMPUTE_PATH,
+    SCALER_PATH,
+    COLUMNS_DRIFT_PATH,
+    TRAINING_DATA_PATH,
+    TRAIN_DATA_GOLD_PATH,
+    COLUMNS_LIST_PATH,
+    XGBOOST_MODEL_PATH,
+    XGBOOST_MODEL_JSON_PATH,
+    LR_MODEL_PATH,
+    MODEL_RESULTS_PATH,
+    BEST_EXPERIMENT_PATH,
+    BEST_MODEL_PATH,
+)
+
+def initialize_dates(max_date_str, min_date_str):
+    """
+    Initialize min and max dates for filtering the data.
+ """ + if not max_date_str: + max_date = pd.to_datetime(datetime.datetime.now().date()).date() + else: + max_date = pd.to_datetime(max_date_str).date() + + min_date = pd.to_datetime(min_date_str).date() + return min_date, max_date + + +def load_data(file_path): + return pd.read_csv(file_path) + + +def filter_date_range(data, min_date, max_date): + data["date_part"] = pd.to_datetime(data["date_part"]).dt.date + return data[(data["date_part"] >= min_date) & (data["date_part"] <= max_date)] + + +def save_date_limits(data, file_path): + min_date = data["date_part"].min() + max_date = data["date_part"].max() + date_limits = {"min_date": str(min_date), "max_date": str(max_date)} + with open(file_path, "w") as f: + json.dump(date_limits, f) + + +def preprocess_data(data): + data = data.drop( + [ + "is_active", + "marketing_consent", + "first_booking", + "existing_customer", + "last_seen", + ], + axis=1, + ) + data = data.drop( + ["domain", "country", "visited_learn_more_before_booking", "visited_faq"], axis=1 + ) + data["lead_indicator"].replace("", np.nan, inplace=True) + data["lead_id"].replace("", np.nan, inplace=True) + data["customer_code"].replace("", np.nan, inplace=True) + data = data.dropna(axis=0, subset=["lead_indicator"]) + data = data.dropna(axis=0, subset=["lead_id"]) + data = data[data.source == "signup"] + return data + + +def process_and_save_artifacts(data): + vars = [ + "lead_id", + "lead_indicator", + "customer_group", + "onboarding", + "source", + "customer_code", + ] + for col in vars: + data[col] = data[col].astype("object") + + cont_vars = data.loc[:, ((data.dtypes == "float64") | (data.dtypes == "int64"))] + cat_vars = data.loc[:, (data.dtypes == "object")] + + cont_vars = cont_vars.apply( + lambda x: x.clip(lower=(x.mean() - 2 * x.std()), upper=(x.mean() + 2 * x.std())) + ) + + outlier_summary = cont_vars.apply(utils.describe_numeric_col).T + outlier_summary.to_csv(OUTLIER_SUMMARY_PATH) + + cat_missing_impute = cat_vars.mode(numeric_only=False, dropna=True) + cat_missing_impute.to_csv(CAT_MISSING_IMPUTE_PATH) + + cont_vars = cont_vars.apply(utils.impute_missing_values) + cont_vars.apply(utils.describe_numeric_col).T + + cat_vars.loc[cat_vars["customer_code"].isna(), "customer_code"] = "None" + cat_vars = cat_vars.apply(utils.impute_missing_values) + cat_vars.apply( + lambda x: pd.Series([x.count(), x.isnull().sum()], index=["Count", "Missing"]) + ).T + + scaler = MinMaxScaler() + scaler.fit(cont_vars) + joblib.dump(value=scaler, filename=SCALER_PATH) + + cont_vars = pd.DataFrame(scaler.transform(cont_vars), columns=cont_vars.columns) + cont_vars = cont_vars.reset_index(drop=True) + cat_vars = cat_vars.reset_index(drop=True) + + data = pd.concat([cat_vars, cont_vars], axis=1) + + data_columns = list(data.columns) + with open(COLUMNS_DRIFT_PATH, "w+") as f: + json.dump(data_columns, f) + + data.to_csv(TRAINING_DATA_PATH, index=False) + + data["bin_source"] = data["source"] + values_list = ["li", "organic", "signup", "fb"] + data.loc[~data["source"].isin(values_list), "bin_source"] = "Others" + mapping = {"li": "socials", "fb": "socials", "organic": "group1", "signup": "group1"} + data["bin_source"] = data["source"].map(mapping) + + data.to_csv(TRAIN_DATA_GOLD_PATH, index=False) + + +def prepare_data(data: pd.DataFrame) -> list[pd.DataFrame]: + """ + Drops unnecessary columns, + creates dummy variables for categorical features, + sets float as common type for all features, + then splits the data into train and test sets. 
+ """ + + data = data.drop(["lead_id", "customer_code", "date_part"], axis=1) + + cat_cols = ["customer_group", "onboarding", "bin_source", "source"] + cat_vars = data[cat_cols] + for col in cat_vars: + cat_vars[col] = cat_vars[col].astype("category") + cat_vars = utils.create_dummy_cols(cat_vars, col) + + other_vars = data.drop(cat_cols, axis=1) + data = pd.concat([other_vars, cat_vars], axis=1) + for col in data: + data[col] = data[col].astype("float64") + + y = data["lead_indicator"] + X = data.drop(["lead_indicator"], axis=1) + + return train_test_split(X, y, random_state=42, test_size=0.15, stratify=y) + + +def save_column_list(X_train: pd.DataFrame) -> None: + """ + Saves the list of columns to a json file. + """ + with open(COLUMNS_LIST_PATH, "w+") as columns_file: + columns = {"column_names": list(X_train.columns)} + json.dump(columns, columns_file) + + +def train_xgboost(X_train, X_test, y_train, y_test, experiment_id): + with mlflow.start_run(experiment_id=experiment_id): + model = XGBRFClassifier(random_state=42) + params = { + "learning_rate": uniform(1e-2, 3e-1), + "min_split_loss": uniform(0, 10), + "max_depth": randint(3, 10), + "subsample": uniform(0, 1), + "objective": ["reg:squarederror", "binary:logistic", "reg:logistic"], + "eval_metric": ["aucpr", "error"], + } + model_grid = RandomizedSearchCV( + model, param_distributions=params, n_jobs=-1, verbose=3, n_iter=10, cv=10 + ) + model_grid.fit(X_train, y_train) + + best_model_xgboost = model_grid.best_estimator_ + + y_pred_test = model_grid.predict(X_test) + + # log artifacts + mlflow.log_metric("f1_score", f1_score(y_test, y_pred_test, average="binary")) + mlflow.log_artifacts("artifacts", artifact_path="model") + mlflow.log_param("data_version", "00000") + mlflow.log_param("model_type", "XGBoost") + # Custom python model for predicting probability + mlflow.pyfunc.log_model("model", python_model=utils.ProbaModelWrapper(best_model_xgboost)) + + xgboost_model_path = XGBOOST_MODEL_PATH + joblib.dump(value=best_model_xgboost, filename=xgboost_model_path) + # Save lead xgboost model as artifact + xgboost_model_path = XGBOOST_MODEL_JSON_PATH + best_model_xgboost.save_model(xgboost_model_path) + + # Defining model results dict + xgboost_cr = {xgboost_model_path: classification_report(y_test, y_pred_test, output_dict=True)} + + return xgboost_cr + + +# mlflow logistic regression experiments +def train_linear_regression(X_train, X_test, y_train, y_test, experiment_id): + with mlflow.start_run(experiment_id=experiment_id): + model = LogisticRegression() + + params = { + "solver": ["newton-cg", "lbfgs", "liblinear", "sag", "saga"], + "penalty": ["none", "l1", "l2", "elasticnet"], + "C": [100, 10, 1.0, 0.1, 0.01], + } + model_grid = RandomizedSearchCV( + model, param_distributions=params, verbose=3, n_iter=10, cv=3 + ) + model_grid.fit(X_train, y_train) + + best_lr_model = model_grid.best_estimator_ + + y_pred_test = model_grid.predict(X_test) + + # log artifacts + mlflow.log_metric("f1_score", f1_score(y_test, y_pred_test, average="binary")) + mlflow.log_artifacts("artifacts", artifact_path="model") + mlflow.log_param("data_version", "00000") + mlflow.log_param("model_type", "LogisticRegression") + + # Custom python model for predicting probability + mlflow.pyfunc.log_model("model", python_model=utils.ProbaModelWrapper(best_lr_model)) + + # store model for model interpretability + lr_model_path = LR_MODEL_PATH + joblib.dump(value=best_lr_model, filename=lr_model_path) + + # Testing model and storing the columns and model results + 
+        lr_cr = {lr_model_path: classification_report(y_test, y_pred_test, output_dict=True)}
+
+    return lr_cr
+
+
+def save_model_results(model_results):
+    with open(MODEL_RESULTS_PATH, "w+") as results_file:
+        json.dump(model_results, results_file)
+
+
+def select_best_model(experiment_name):
+    experiment_ids = [mlflow.get_experiment_by_name(experiment_name).experiment_id]
+    experiment_best = mlflow.search_runs(
+        experiment_ids=experiment_ids, order_by=["metrics.f1_score DESC"], max_results=1
+    ).iloc[0]
+    return experiment_best
+
+
+def save_best_model(experiment_best, best_model_type) -> None:
+    # store the best experiment (model) metadata
+    experiment_best.to_pickle(BEST_EXPERIMENT_PATH)
+
+    if best_model_type == "XGBoost":
+        best_model_artifact = "lead_model_xgboost.pkl"
+    elif best_model_type == "LogisticRegression":
+        best_model_artifact = "lead_model_lr.pkl"
+
+    original_file_path = f"./artifacts/{best_model_artifact}"
+    new_file_path = BEST_MODEL_PATH
+
+    shutil.copyfile(original_file_path, new_file_path)
+
+
+def get_production_model(model_name):
+    return [
+        model
+        for model in client.search_model_versions(f"name='{model_name}'")
+        if dict(model)["current_stage"] == "Production"
+    ]
+
+
+def get_model_score(run_id):
+    data, _ = mlflow.get_run(run_id)
+    return data[1]["metrics.f1_score"]
+
+
+def register_and_wait_model(run_id, artifact_path, model_name):
+    model_uri = f"runs:/{run_id}/{artifact_path}"
+    model_details = mlflow.register_model(model_uri=model_uri, name=model_name)
+    utils.wait_until_ready(model_details.name, model_details.version)
+    return dict(model_details)
\ No newline at end of file
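Since the helpers now live in one module, a small smoke test of the date-window logic is a cheap guard against regressions in later refactors. A sketch, assuming the package is importable in the test environment (the toy frame and expected row are illustrative, not taken from repository data):

import pandas as pd

from github_dagger_workflow_project import pipeline_utils as pu

# Tiny stand-in for the raw lead data; only date_part matters here.
frame = pd.DataFrame({"date_part": ["2023-12-31", "2024-01-10", "2024-02-01"]})

min_date, max_date = pu.initialize_dates("2024-01-31", "2024-01-01")
filtered = pu.filter_date_range(frame, min_date, max_date)

# Only the row inside the configured window should survive.
assert list(filtered["date_part"].astype(str)) == ["2024-01-10"]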