313 changes: 164 additions & 149 deletions tests/test_runs/test_run_functions.py
@@ -1746,156 +1746,7 @@ def test_format_prediction_task_regression(self):
res = format_prediction(regression, *ignored_input)
self.assertListEqual(res, [0] * 5)

@pytest.mark.sklearn()
@unittest.skipIf(
Version(sklearn.__version__) < Version("0.21"),
reason="couldn't perform local tests successfully w/o bloating RAM",
)
@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
def test__run_task_get_arffcontent_2(self, parallel_mock):
"""Tests if a run executed in parallel is collated correctly."""
task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp
x, y = task.get_X_and_y()
num_instances = x.shape[0]
line_length = 6 + len(task.class_labels)
loss = "log" if Version(sklearn.__version__) < Version("1.3") else "log_loss"
clf = sklearn.pipeline.Pipeline(
[
(
"cat_handling",
ColumnTransformer(
transformers=[
(
"cat",
OneHotEncoder(handle_unknown="ignore"),
x.select_dtypes(include=["object", "category"]).columns,
)
],
remainder="passthrough",
),
),
("clf", SGDClassifier(loss=loss, random_state=1)),
]
)
n_jobs = 2
backend = "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
with parallel_backend(backend, n_jobs=n_jobs):
res = openml.runs.functions._run_task_get_arffcontent(
extension=self.extension,
model=clf,
task=task,
add_local_measures=True,
n_jobs=n_jobs,
)
            # Only _prevent_optimize_n_jobs() is mocked here; it is a helper invoked inside
            # _run_model_on_fold(), so patching it leaves the rest of the pipeline untouched
            # while its call_count indicates whether _run_model_on_fold() ran in this process.
            # With a process-based joblib backend, freshly spawned workers do not inherit the
            # mock, so the parent-side call_count stays 0; the subsequent checks on the actual
            # results can then only pass if the folds were really distributed to and executed
            # by the workers.
assert parallel_mock.call_count == 0
assert isinstance(res[0], list)
assert len(res[0]) == num_instances
assert len(res[0][0]) == line_length
assert len(res[2]) == 7
assert len(res[3]) == 7
expected_scores = [
0.9625,
0.953125,
0.965625,
0.9125,
0.98125,
0.975,
0.9247648902821317,
0.9404388714733543,
0.9780564263322884,
0.9623824451410659,
]
scores = [v for k, v in res[2]["predictive_accuracy"][0].items()]
np.testing.assert_array_almost_equal(
scores,
expected_scores,
decimal=2,
err_msg="Observed performance scores deviate from expected ones.",
)

@pytest.mark.sklearn()
@unittest.skipIf(
Version(sklearn.__version__) < Version("0.21"),
reason="couldn't perform local tests successfully w/o bloating RAM",
)
@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
def test_joblib_backends(self, parallel_mock):
"""Tests evaluation of a run using various joblib backends and n_jobs."""
task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp
x, y = task.get_X_and_y()
num_instances = x.shape[0]
line_length = 6 + len(task.class_labels)

backend_choice = (
"loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
)
for n_jobs, backend, call_count in [
(1, backend_choice, 10),
(2, backend_choice, 10),
(-1, backend_choice, 10),
(1, "threading", 20),
(-1, "threading", 30),
(1, "sequential", 40),
]:
clf = sklearn.model_selection.RandomizedSearchCV(
estimator=sklearn.pipeline.Pipeline(
[
(
"cat_handling",
ColumnTransformer(
transformers=[
(
"cat",
OneHotEncoder(handle_unknown="ignore"),
x.select_dtypes(include=["object", "category"]).columns,
)
],
remainder="passthrough",
),
),
("clf", sklearn.ensemble.RandomForestClassifier(n_estimators=5)),
]
),
param_distributions={
"clf__max_depth": [3, None],
"clf__max_features": [1, 2, 3, 4],
"clf__min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
"clf__min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"clf__bootstrap": [True, False],
"clf__criterion": ["gini", "entropy"],
},
random_state=1,
cv=sklearn.model_selection.StratifiedKFold(
n_splits=2,
shuffle=True,
random_state=1,
),
n_iter=5,
n_jobs=n_jobs,
)
with parallel_backend(backend, n_jobs=n_jobs):
res = openml.runs.functions._run_task_get_arffcontent(
extension=self.extension,
model=clf,
task=task,
add_local_measures=True,
n_jobs=n_jobs,
)
assert type(res[0]) == list
assert len(res[0]) == num_instances
assert len(res[0][0]) == line_length
# usercpu_time_millis_* not recorded when n_jobs > 1
# *_time_millis_* not recorded when n_jobs = -1
assert len(res[2]["predictive_accuracy"][0]) == 10
assert len(res[3]["predictive_accuracy"][0]) == 10
assert parallel_mock.call_count == call_count

@unittest.skipIf(
Version(sklearn.__version__) < Version("0.20"),
@@ -1993,3 +1844,167 @@ def test_delete_unknown_run(mock_delete, test_files_directory, test_api_key):
run_url = "https://test.openml.org/api/v1/xml/run/9999999"
assert run_url == mock_delete.call_args.args[0]
assert test_api_key == mock_delete.call_args.kwargs.get("params", {}).get("api_key")
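
The two assertions above rely on unittest.mock's call_args record, which exposes the positional arguments (.args) and keyword arguments (.kwargs) of the most recent call to the mock. A minimal, self-contained sketch of that mechanism; the URL and API key values are illustrative, not taken from the test fixtures:

from unittest import mock

# A stand-in mock; in the test above a similar mock replaces the HTTP delete call.
mocked_delete = mock.MagicMock()
mocked_delete("https://test.openml.org/api/v1/xml/run/9999999", params={"api_key": "abc"})

# call_args.args holds the positional arguments of the last call, call_args.kwargs the
# keyword arguments -- exactly what the assertions above inspect.
assert mocked_delete.call_args.args[0].endswith("/run/9999999")
assert mocked_delete.call_args.kwargs.get("params", {}).get("api_key") == "abc"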


@pytest.mark.sklearn()
@unittest.skipIf(
Version(sklearn.__version__) < Version("0.21"),
reason="couldn't perform local tests successfully w/o bloating RAM",
)
@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
def test__run_task_get_arffcontent_2(parallel_mock):
"""Tests if a run executed in parallel is collated correctly."""
task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp
x, y = task.get_X_and_y()
num_instances = x.shape[0]
line_length = 6 + len(task.class_labels)
loss = "log" if Version(sklearn.__version__) < Version("1.3") else "log_loss"
clf = sklearn.pipeline.Pipeline(
[
(
"cat_handling",
ColumnTransformer(
transformers=[
(
"cat",
OneHotEncoder(handle_unknown="ignore"),
x.select_dtypes(include=["object", "category"]).columns,
)
],
remainder="passthrough",
),
),
("clf", SGDClassifier(loss=loss, random_state=1)),
]
)
n_jobs = 2
backend = "loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
from openml_sklearn import SklearnExtension
extension = SklearnExtension()
with parallel_backend(backend, n_jobs=n_jobs):
res = openml.runs.functions._run_task_get_arffcontent(
extension=extension,
model=clf,
task=task,
add_local_measures=True,
n_jobs=n_jobs,
)
        # Only _prevent_optimize_n_jobs() is mocked here; it is a helper invoked inside
        # _run_model_on_fold(), so patching it leaves the rest of the pipeline untouched
        # while its call_count indicates whether _run_model_on_fold() ran in this process.
        # With a process-based joblib backend, freshly spawned workers do not inherit the
        # mock, so the parent-side call_count stays 0; the subsequent checks on the actual
        # results can then only pass if the folds were really distributed to and executed
        # by the workers.
assert parallel_mock.call_count == 0
assert isinstance(res[0], list)
assert len(res[0]) == num_instances
assert len(res[0][0]) == line_length
assert len(res[2]) == 7
assert len(res[3]) == 7
expected_scores = [
0.9625,
0.953125,
0.965625,
0.9125,
0.98125,
0.975,
0.9247648902821317,
0.9404388714733543,
0.9780564263322884,
0.9623824451410659,
]
scores = [v for k, v in res[2]["predictive_accuracy"][0].items()]
np.testing.assert_array_almost_equal(
scores,
expected_scores,
decimal=2,
err_msg="Observed performance scores deviate from expected ones.",
)
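
The call_count expectation above (and the parametrized expectations in test_joblib_backends below) follow from how unittest.mock patches interact with joblib backends: thread-based and sequential backends run inside the parent process and therefore see the patch, while process-based backends (loky, multiprocessing) spawn fresh workers that re-import the code unpatched. A minimal sketch of that behaviour, independent of OpenML; math.sqrt merely stands in for the patched _prevent_optimize_n_jobs helper:

import math
from unittest import mock

from joblib import Parallel, delayed, parallel_backend


def call_sqrt(x):
    # Looks up math.sqrt at call time, much like _run_model_on_fold looks up
    # _prevent_optimize_n_jobs when it executes inside a worker.
    return math.sqrt(x)


if __name__ == "__main__":
    with mock.patch("math.sqrt", return_value=-1.0) as patched:
        # Thread-based backend: workers share the parent process, so every call hits the mock.
        with parallel_backend("threading", n_jobs=2):
            threaded = Parallel()(delayed(call_sqrt)(i) for i in range(4))
        assert threaded == [-1.0] * 4
        assert patched.call_count == 4

        # Process-based backend: freshly spawned workers re-import math and do not see the
        # in-memory patch, so the parent-side mock is never called again.
        with parallel_backend("loky", n_jobs=2):
            Parallel()(delayed(call_sqrt)(i) for i in range(4))
        assert patched.call_count == 4  # unchanged: the workers ran the real math.sqrt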


@pytest.mark.sklearn()
@unittest.skipIf(
Version(sklearn.__version__) < Version("0.21"),
reason="couldn't perform local tests successfully w/o bloating RAM",
)
@mock.patch("openml_sklearn.SklearnExtension._prevent_optimize_n_jobs")
@pytest.mark.parametrize(
("n_jobs", "backend", "call_count"),
[
# `None` picks the backend based on joblib version (loky or multiprocessing) and
# spawns multiple processes if n_jobs != 1, which means the mock is not applied.
(2, None, 0),
(-1, None, 0),
(1, None, 10), # with n_jobs=1 the mock *is* applied, since there is no new subprocess
(1, "sequential", 10),
(1, "threading", 10),
        (-1, "threading", 10),  # the threading backend preserves mocks even when parallelizing
]
)
def test_joblib_backends(parallel_mock, n_jobs, backend, call_count):
"""Tests evaluation of a run using various joblib backends and n_jobs."""
if backend is None:
backend = (
"loky" if Version(joblib.__version__) > Version("0.11") else "multiprocessing"
)

task = openml.tasks.get_task(7) # Supervised Classification on kr-vs-kp
x, y = task.get_X_and_y()
num_instances = x.shape[0]
line_length = 6 + len(task.class_labels)

clf = sklearn.model_selection.RandomizedSearchCV(
estimator=sklearn.pipeline.Pipeline(
[
(
"cat_handling",
ColumnTransformer(
transformers=[
(
"cat",
OneHotEncoder(handle_unknown="ignore"),
x.select_dtypes(include=["object", "category"]).columns,
)
],
remainder="passthrough",
),
),
("clf", sklearn.ensemble.RandomForestClassifier(n_estimators=5)),
]
),
param_distributions={
"clf__max_depth": [3, None],
"clf__max_features": [1, 2, 3, 4],
"clf__min_samples_split": [2, 3, 4, 5, 6, 7, 8, 9, 10],
"clf__min_samples_leaf": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
"clf__bootstrap": [True, False],
"clf__criterion": ["gini", "entropy"],
},
random_state=1,
cv=sklearn.model_selection.StratifiedKFold(
n_splits=2,
shuffle=True,
random_state=1,
),
n_iter=5,
n_jobs=n_jobs,
)
from openml_sklearn import SklearnExtension
extension = SklearnExtension()
with parallel_backend(backend, n_jobs=n_jobs):
res = openml.runs.functions._run_task_get_arffcontent(
extension=extension,
model=clf,
task=task,
add_local_measures=True,
n_jobs=n_jobs,
)
        assert isinstance(res[0], list)
assert len(res[0]) == num_instances
assert len(res[0][0]) == line_length
# usercpu_time_millis_* not recorded when n_jobs > 1
# *_time_millis_* not recorded when n_jobs = -1
assert len(res[2]["predictive_accuracy"][0]) == 10
assert len(res[3]["predictive_accuracy"][0]) == 10
assert parallel_mock.call_count == call_count
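
For orientation, the assertions in both tests suggest the following layout of the tuple returned by _run_task_get_arffcontent. The helper below is only an illustrative reading of those checks; the column semantics and the distinction between res[2] and res[3] are assumptions, not taken from the source:

def summarize_arffcontent(res, task):
    """Illustrative sketch: unpack the result tuple the way the assertions above do."""
    prediction_lines = res[0]  # one ARFF-style prediction row per test instance
    # Each row presumably holds 6 bookkeeping columns (repeat, fold, sample, index,
    # prediction, correct label) plus one confidence column per class label.
    expected_width = 6 + len(task.class_labels)
    assert all(len(row) == expected_width for row in prediction_lines)
    # res[2] and res[3] both map measure name -> repeat -> {fold: value}; task 7 is
    # evaluated over 10 folds, hence the length-10 checks in the tests above.
    fold_scores = res[2]["predictive_accuracy"][0]  # repeat 0
    return len(prediction_lines), list(fold_scores.values())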