diff --git a/tests/test_runs/test_run_functions.py b/tests/test_runs/test_run_functions.py
index 725421d4f..0517c8c9c 100644
--- a/tests/test_runs/test_run_functions.py
+++ b/tests/test_runs/test_run_functions.py
@@ -1746,156 +1746,7 @@ def test_format_prediction_task_regression(self):
         res = format_prediction(regression, *ignored_input)
         self.assertListEqual(res, [0] * 5)
 
-    @pytest.mark.sklearn()
-    @unittest.skipIf(
-        Version(sklearn.__version__) < Version("0.21"),
-        reason="couldn't perform local tests successfully w/o bloating RAM",
-    )
-    @mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
-    def test__run_task_get_arffcontent_2(self, parallel_mock):
-        """Tests if a run executed in parallel is collated correctly."""
-        task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
-        x, y = task.get_X_and_y()
-        num_instances = x.shape[0]
-        line_length = 6 + len(task.class_labels)
-        loss = "log" if Version(sklearn.__version__) < Version("1.3") else "log_loss"
-        clf = sklearn.pipeline.Pipeline(
-            [
-                (
-                    "cat_handling",
-                    ColumnTransformer(
-                        transformers=[
-                            (
-                                "cat",
-                                OneHotEncoder(handle_unknown="ignore"),
-                                x.select_dtypes(include=["object", "category"]).columns,
-                            )
-                        ],
-                        remainder="passthrough",
-                    ),
-                ),
-                ("clf", SGDClassifier(loss=loss, random_state=1)),
-            ]
-        )
-        n_jobs = 2
-        backend = "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
-        with parallel_backend(backend, n_jobs=n_jobs):
-            res = openml.runs.functions._run_task_get_arffcontent(
-                extension=self.extension,
-                model=clf,
-                task=task,
-                add_local_measures=True,
-                n_jobs=n_jobs,
-            )
-        # This unit test will fail if joblib is unable to distribute successfully since the
-        # function _run_model_on_fold is being mocked out. However, for a new spawned worker, it
-        # is not and the mock call_count should remain 0 while the subsequent check of actual
-        # results should also hold, only on successful distribution of tasks to workers.
-        # The _prevent_optimize_n_jobs() is a function executed within the _run_model_on_fold()
-        # block and mocking this function doesn't affect rest of the pipeline, but is adequately
-        # indicative if _run_model_on_fold() is being called or not.
-        assert parallel_mock.call_count == 0
-        assert isinstance(res[0], list)
-        assert len(res[0]) == num_instances
-        assert len(res[0][0]) == line_length
-        assert len(res[2]) == 7
-        assert len(res[3]) == 7
-        expected_scores = [
-            0.9625,
-            0.953125,
-            0.965625,
-            0.9125,
-            0.98125,
-            0.975,
-            0.9247648902821317,
-            0.9404388714733543,
-            0.9780564263322884,
-            0.9623824451410659,
-        ]
-        scores = [v for k, v in res[2]["predictive_accuracy"][0].items()]
-        np.testing.assert_array_almost_equal(
-            scores,
-            expected_scores,
-            decimal=2,
-            err_msg="Observed performance scores deviate from expected ones.",
-        )
-
-    @pytest.mark.sklearn()
-    @unittest.skipIf(
-        Version(sklearn.__version__) < Version("0.21"),
-        reason="couldn't perform local tests successfully w/o bloating RAM",
-    )
-    @mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
-    def test_joblib_backends(self, parallel_mock):
-        """Tests evaluation of a run using various joblib backends and n_jobs."""
-        task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
-        x, y = task.get_X_and_y()
-        num_instances = x.shape[0]
-        line_length = 6 + len(task.class_labels)
-
-        backend_choice = (
-            "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
-        )
-        for n_jobs, backend, call_count in [
-            (1, backend_choice, 10),
-            (2, backend_choice, 10),
-            (-1, backend_choice, 10),
-            (1, "threading", 20),
-            (-1, "threading", 30),
-            (1, "sequential", 40),
-        ]:
-            clf = sklearn.model_selection.RandomizedSearchCV(
-                estimator=sklearn.pipeline.Pipeline(
-                    [
-                        (
-                            "cat_handling",
-                            ColumnTransformer(
-                                transformers=[
-                                    (
-                                        "cat",
-                                        OneHotEncoder(handle_unknown="ignore"),
-                                        x.select_dtypes(include=["object", "category"]).columns,
-                                    )
-                                ],
-                                remainder="passthrough",
-                            ),
-                        ),
-                        ("clf", sklearn.ensemble.RandomForestClassifier(n_estimators=5)),
-                    ]
-                ),
-                param_distributions={
-                    "clf__max_depth": [3, None],
-                    "clf__max_features": [1, 2, 3, 4],
-                    "clf__min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
-                    "clf__min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
-                    "clf__bootstrap": [True, False],
-                    "clf__criterion": ["gini", "entropy"],
-                },
-                random_state=1,
-                cv=sklearn.model_selection.StratifiedKFold(
-                    n_splits=2,
-                    shuffle=True,
-                    random_state=1,
-                ),
-                n_iter=5,
-                n_jobs=n_jobs,
-            )
-            with parallel_backend(backend, n_jobs=n_jobs):
-                res = openml.runs.functions._run_task_get_arffcontent(
-                    extension=self.extension,
-                    model=clf,
-                    task=task,
-                    add_local_measures=True,
-                    n_jobs=n_jobs,
-                )
-            assert type(res[0]) == list
-            assert len(res[0]) == num_instances
-            assert len(res[0][0]) == line_length
-            # usercpu_time_millis_* not recorded when n_jobs > 1
-            # *_time_millis_* not recorded when n_jobs = -1
-            assert len(res[2]["predictive_accuracy"][0]) == 10
-            assert len(res[3]["predictive_accuracy"][0]) == 10
-            assert parallel_mock.call_count == call_count
 
     @unittest.skipIf(
         Version(sklearn.__version__) < Version("0.20"),
@@ -1993,3 +1844,167 @@ def test_delete_unknown_run(mock_delete, test_files_directory, test_api_key):
     run_url = "https://test.openml.org/api/v1/xml/run/9999999"
     assert run_url == mock_delete.call_args.args[0]
     assert test_api_key == mock_delete.call_args.kwargs.get("params", {}).get("api_key")
+
+
+@pytest.mark.sklearn()
+@unittest.skipIf(
+    Version(sklearn.__version__) < Version("0.21"),
+    reason="couldn't perform local tests successfully w/o bloating RAM",
+)
+@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
+def test__run_task_get_arffcontent_2(parallel_mock):
+    """Tests if a run executed in parallel is collated correctly."""
+    task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
+    x, y = task.get_X_and_y()
+    num_instances = x.shape[0]
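+    # each prediction row holds 6 fixed columns (repeat, fold, sample, row index,
+    # prediction, ground truth) plus one confidence column per class label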
+    line_length = 6 + len(task.class_labels)
+    loss = "log" if Version(sklearn.__version__) < Version("1.3") else "log_loss"
+    clf = sklearn.pipeline.Pipeline(
+        [
+            (
+                "cat_handling",
+                ColumnTransformer(
+                    transformers=[
+                        (
+                            "cat",
+                            OneHotEncoder(handle_unknown="ignore"),
+                            x.select_dtypes(include=["object", "category"]).columns,
+                        )
+                    ],
+                    remainder="passthrough",
+                ),
+            ),
+            ("clf", SGDClassifier(loss=loss, random_state=1)),
+        ]
+    )
+    n_jobs = 2
+    backend = "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
+    from openml_sklearn import SklearnExtension
+    extension = SklearnExtension()
+    with parallel_backend(backend, n_jobs=n_jobs):
+        res = openml.runs.functions._run_task_get_arffcontent(
+            extension=extension,
+            model=clf,
+            task=task,
+            add_local_measures=True,
+            n_jobs=n_jobs,
+        )
+    # This unit test will fail if joblib is unable to distribute the work successfully,
+    # since the function _run_model_on_fold is effectively probed via the mock: the mock
+    # is applied in the parent process, but not in a newly spawned worker. Its call_count
+    # should therefore remain 0, while the subsequent checks on the actual results should
+    # still hold. Both are only satisfied on successful distribution of tasks to workers.
+    # _prevent_optimize_n_jobs() is executed within the _run_model_on_fold() block;
+    # mocking it doesn't affect the rest of the pipeline, but it adequately indicates
+    # whether _run_model_on_fold() is being called in this process or not.
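+    # (Process-based backends such as loky spawn fresh interpreters that re-import
+    # openml_sklearn unpatched, so a mock.patch applied in the parent process never
+    # reaches the workers; thread-based and sequential execution share the patched module.)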
+    assert parallel_mock.call_count == 0
+    assert isinstance(res[0], list)
+    assert len(res[0]) == num_instances
+    assert len(res[0][0]) == line_length
+    assert len(res[2]) == 7
+    assert len(res[3]) == 7
+    expected_scores = [
+        0.9625,
+        0.953125,
+        0.965625,
+        0.9125,
+        0.98125,
+        0.975,
+        0.9247648902821317,
+        0.9404388714733543,
+        0.9780564263322884,
+        0.9623824451410659,
+    ]
+    scores = list(res[2]["predictive_accuracy"][0].values())
+    np.testing.assert_array_almost_equal(
+        scores,
+        expected_scores,
+        decimal=2,
+        err_msg="Observed performance scores deviate from expected ones.",
+    )
+
+
+@pytest.mark.sklearn()
+@unittest.skipIf(
+    Version(sklearn.__version__) < Version("0.21"),
+    reason="couldn't perform local tests successfully w/o bloating RAM",
+)
+@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
+@pytest.mark.parametrize(
+    ("n_jobs", "backend", "call_count"),
+    [
+        # `None` picks the backend based on the joblib version (loky or multiprocessing)
+        # and spawns multiple processes if n_jobs != 1, which means the mock is not applied.
+        (2, None, 0),
+        (-1, None, 0),
+        (1, None, 10),  # with n_jobs=1 the mock *is* applied, since no new subprocess is spawned
+        (1, "sequential", 10),
+        (1, "threading", 10),
+        (-1, "threading", 10),  # the threading backend preserves mocks even when parallelizing
+    ],
+)
+def test_joblib_backends(parallel_mock, n_jobs, backend, call_count):
+    """Tests evaluation of a run using various joblib backends and n_jobs."""
+    if backend is None:
+        backend = (
+            "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
+        )
+
+    task = openml.tasks.get_task(7)  # Supervised Classification on kr-vs-kp
+    x, y = task.get_X_and_y()
+    num_instances = x.shape[0]
+    line_length = 6 + len(task.class_labels)
+
+    clf = sklearn.model_selection.RandomizedSearchCV(
+        estimator=sklearn.pipeline.Pipeline(
+            [
+                (
+                    "cat_handling",
+                    ColumnTransformer(
+                        transformers=[
+                            (
+                                "cat",
+                                OneHotEncoder(handle_unknown="ignore"),
+                                x.select_dtypes(include=["object", "category"]).columns,
+                            )
+                        ],
+                        remainder="passthrough",
+                    ),
+                ),
+                ("clf", sklearn.ensemble.RandomForestClassifier(n_estimators=5)),
+            ]
+        ),
+        param_distributions={
+            "clf__max_depth": [3, None],
+            "clf__max_features": [1, 2, 3, 4],
+            "clf__min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
+            "clf__min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
+            "clf__bootstrap": [True, False],
+            "clf__criterion": ["gini", "entropy"],
+        },
+        random_state=1,
+        cv=sklearn.model_selection.StratifiedKFold(
+            n_splits=2,
+            shuffle=True,
+            random_state=1,
+        ),
+        n_iter=5,
+        n_jobs=n_jobs,
+    )
+    from openml_sklearn import SklearnExtension
+    extension = SklearnExtension()
+    with parallel_backend(backend, n_jobs=n_jobs):
+        res = openml.runs.functions._run_task_get_arffcontent(
+            extension=extension,
+            model=clf,
+            task=task,
+            add_local_measures=True,
+            n_jobs=n_jobs,
+        )
+    assert isinstance(res[0], list)
+    assert len(res[0]) == num_instances
+    assert len(res[0][0]) == line_length
+    # usercpu_time_millis_* not recorded when n_jobs > 1
+    # *_time_millis_* not recorded when n_jobs = -1
+    assert len(res[2]["predictive_accuracy"][0]) == 10
+    assert len(res[3]["predictive_accuracy"][0]) == 10
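+    # A call_count of 10 corresponds to one _run_model_on_fold call per fold of task 7's
+    # 10-fold CV; it is observed only when the mock is visible to the executing context
+    # (sequential, threading, or an in-process n_jobs=1 run), and stays at 0 otherwise.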
+    assert parallel_mock.call_count == call_count