From f5fdf40fe2aeaa6c3ff605372f8ecbf8a54bf621 Mon Sep 17 00:00:00 2001 From: Sarah Bennert Date: Fri, 5 Dec 2025 13:33:24 -0500 Subject: [PATCH] Add ThreadedScaleResources behavior flags * ThreadedScaleResources.Flags added to alter create/delete behavior Flags: CREATE DELETE CLEANUP_ON_ERROR Default behavior enables all three Flags --- ocp_scale_utilities/threaded/scale.py | 90 +++++++++++++++++---------- pyproject.toml | 2 +- tests/threaded/test_scale.py | 74 +++++++++++++++++++--- uv.lock | 2 +- 4 files changed, 125 insertions(+), 43 deletions(-) diff --git a/ocp_scale_utilities/threaded/scale.py b/ocp_scale_utilities/threaded/scale.py index 76b4280..c3e1b20 100644 --- a/ocp_scale_utilities/threaded/scale.py +++ b/ocp_scale_utilities/threaded/scale.py @@ -3,6 +3,7 @@ import logging import time from contextlib import ExitStack, contextmanager +from enum import Flag, auto from typing import Any, Optional, Sequence import pytest @@ -20,6 +21,11 @@ class ThreadedScaleResources(ExitStack): + class Flags(Flag): + CREATE = auto() + DELETE = auto() + CLEANUP_ON_ERROR = auto() + def __init__( self, resources: Sequence[Resource], @@ -27,6 +33,7 @@ def __init__( pytest_cache: Optional[pytest.Cache] = None, cache_key_prefix: Optional[str] = None, wait_for_status: Optional[str] = None, + flags: Optional[Flags] = None, ): """ Args: @@ -34,6 +41,7 @@ def __init__( pytest_cache (pytest.Cache): config.cache from python run to store results in cache_key_prefix (str): prefix to use for cache_keys wait_for_status (str): Wait for provided status upon deploy + flags (Flags): Optional flags to direct behavior, eg: CREATE|DELETE|CLEANUP_ON_ERROR (default) """ super().__init__() self.resources = resources @@ -42,37 +50,50 @@ def __init__( self.cache_key_prefix = cache_key_prefix self.wait_for_status = wait_for_status + if flags is None: + self.flags = self.Flags.CREATE | self.Flags.DELETE | self.Flags.CLEANUP_ON_ERROR + else: + self.flags = flags + self.collect_data_start_time = time.time() @contextmanager def _cleanup_on_error(self, stack_exit): - with ExitStack() as stack: - stack.push(exit=stack_exit) + if self.Flags.CLEANUP_ON_ERROR in self.flags: + with ExitStack() as stack: + stack.push(exit=stack_exit) + yield + self.collect_data(id="cleanup-on-error", start_time=self.collect_data_start_time) + stack.pop_all() + else: yield - self.collect_data(id="cleanup-on-error", start_time=self.collect_data_start_time) - stack.pop_all() def __enter__(self) -> ThreadedScaleResources: - with self._cleanup_on_error(stack_exit=super().__exit__): - start_time = time.time() - if self.request_resources: - threaded_deploy_requested_resources( - resources=self.resources, request_resources=self.request_resources, exit_stack=self - ) - else: - threaded_deploy_resources(resources=self.resources, exit_stack=self) - - if self.wait_for_status: - threaded_wait_for_resources_status(resources=self.resources, status=self.wait_for_status) - - self.collect_data_start_time = stop_time = time.time() - if self.pytest_cache and self.cache_key_prefix: - self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-count", len(self.resources)) - self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-start", start_time) - self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-stop", stop_time) - self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-elapsed", stop_time - start_time) - - self.collect_data(id="post-enter", start_time=start_time) + if self.Flags.CREATE in self.flags: + with self._cleanup_on_error(stack_exit=super().__exit__): + start_time = time.time() + if self.request_resources: + threaded_deploy_requested_resources( + resources=self.resources, request_resources=self.request_resources, exit_stack=self + ) + else: + threaded_deploy_resources(resources=self.resources, exit_stack=self) + + if self.wait_for_status: + threaded_wait_for_resources_status(resources=self.resources, status=self.wait_for_status) + + self.collect_data_start_time = stop_time = time.time() + if self.pytest_cache and self.cache_key_prefix: + self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-count", len(self.resources)) + self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-start", start_time) + self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-stop", stop_time) + self.pytest_cache.set(f"{self.cache_key_prefix}-deploy-elapsed", stop_time - start_time) + + self.collect_data(id="post-enter", start_time=start_time) + + else: + for resource in self.resources: + self.push(resource.__exit__) return self @@ -83,16 +104,17 @@ def __exit__(self: ThreadedScaleResources, *exc_arguments: Any) -> Any: including any sleeps between batches. Wait for resources to be deleted in reverse order of creation. """ - with self._cleanup_on_error(stack_exit=super().__exit__): - self.collect_data(id="pre-exit", start_time=self.collect_data_start_time) - start_time = time.time() - threaded_delete_resources(resources=self.resources) - threaded_wait_deleted_resources(resources=self.resources) - stop_time = time.time() - if self.pytest_cache and self.cache_key_prefix: - self.pytest_cache.set(f"{self.cache_key_prefix}-delete-start", start_time) - self.pytest_cache.set(f"{self.cache_key_prefix}-delete-stop", stop_time) - self.pytest_cache.set(f"{self.cache_key_prefix}-delete-elapsed", stop_time - start_time) + if self.Flags.DELETE in self.flags: + with self._cleanup_on_error(stack_exit=super().__exit__): + self.collect_data(id="pre-exit", start_time=self.collect_data_start_time) + start_time = time.time() + threaded_delete_resources(resources=self.resources) + threaded_wait_deleted_resources(resources=self.resources) + stop_time = time.time() + if self.pytest_cache and self.cache_key_prefix: + self.pytest_cache.set(f"{self.cache_key_prefix}-delete-start", start_time) + self.pytest_cache.set(f"{self.cache_key_prefix}-delete-stop", stop_time) + self.pytest_cache.set(f"{self.cache_key_prefix}-delete-elapsed", stop_time - start_time) def collect_data(self, id: str, start_time: float): # Placeholder to be defined by child classes for any data collection required diff --git a/pyproject.toml b/pyproject.toml index fc2003c..91bb32f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -49,7 +49,7 @@ dev-dependencies = [ "ipdb>=0.13.13", "ipython>=8.12.3" ] [project] name = "openshift-python-scale-utilities" requires-python = ">=3.12.0" -version = "0.1.1.5" +version = "0.1.2.0" description = "OpenShift Python Scale Utilities" readme = "README.md" license = "Apache-2.0" diff --git a/tests/threaded/test_scale.py b/tests/threaded/test_scale.py index 3b96992..5801dda 100644 --- a/tests/threaded/test_scale.py +++ b/tests/threaded/test_scale.py @@ -3,9 +3,9 @@ from ocp_scale_utilities.threaded.scale import ThreadedScaleResources -@pytest.fixture() -def scaled_pods(crc_admin_client, namespace): - pods = [ +@pytest.fixture(scope="class") +def pods_for_test(crc_admin_client, namespace): + yield [ Pod( name=f"test-pod-{index}", namespace=namespace.name, @@ -19,9 +19,69 @@ def scaled_pods(crc_admin_client, namespace): ) for index in range(10) ] - with ThreadedScaleResources(resources=pods, wait_for_status=Pod.Status.RUNNING): - yield pods -def test_threaded_deploy_resources(scaled_pods): - assert all([pod.exists and pod.status == Pod.Status.RUNNING for pod in scaled_pods]) +@pytest.fixture() +def scaled_pods(pods_for_test): + with ThreadedScaleResources(resources=pods_for_test, wait_for_status=Pod.Status.RUNNING): + yield pods_for_test + + +@pytest.fixture(scope="class") +def created_scaled_pods(pods_for_test): + with ThreadedScaleResources( + resources=pods_for_test, wait_for_status=Pod.Status.RUNNING, flags=ThreadedScaleResources.Flags.CREATE + ): + ... + yield pods_for_test + + +@pytest.fixture(scope="class") +def created_scaled_pods_with_cleanup_on_error(pods_for_test): + with ThreadedScaleResources( + resources=pods_for_test, + wait_for_status=Pod.Status.RUNNING, + flags=ThreadedScaleResources.Flags.CREATE | ThreadedScaleResources.Flags.CLEANUP_ON_ERROR, + ): + ... + yield pods_for_test + + +@pytest.fixture(scope="class") +def deleted_scaled_pods(created_scaled_pods): + with ThreadedScaleResources(resources=created_scaled_pods, flags=ThreadedScaleResources.Flags.DELETE): + ... + yield created_scaled_pods + + +@pytest.fixture(scope="class") +def deleted_scaled_pods_with_cleanup_on_error(created_scaled_pods): + with ThreadedScaleResources( + resources=created_scaled_pods, + flags=ThreadedScaleResources.Flags.DELETE | ThreadedScaleResources.Flags.CLEANUP_ON_ERROR, + ): + ... + yield created_scaled_pods + + +class TestThreadedScaleResourceFlags: + def test_threaded_deploy_resources(self, scaled_pods): + assert all([pod.exists and pod.status == Pod.Status.RUNNING for pod in scaled_pods]) + + @pytest.mark.order(after="TestThreadedScaleResourceFlags::test_threaded_deploy_resources") + def test_create_flag(self, created_scaled_pods): + assert all([pod.exists and pod.status == Pod.Status.RUNNING for pod in created_scaled_pods]) + + @pytest.mark.order(after="TestThreadedScaleResourceFlags::test_create_flag") + def test_delete_flag(self, deleted_scaled_pods): + assert all([not pod.exists for pod in deleted_scaled_pods]) + + @pytest.mark.order(after="TestThreadedScaleResourceFlags::test_delete_flag") + def test_create_flag_with_cleanup_on_error(self, created_scaled_pods_with_cleanup_on_error): + assert all([ + pod.exists and pod.status == Pod.Status.RUNNING for pod in created_scaled_pods_with_cleanup_on_error + ]) + + @pytest.mark.order(after="TestThreadedScaleResourceFlags::test_create_flag_with_cleanup_on_error") + def test_delete_flag_with_cleanup_on_error(self, deleted_scaled_pods_with_cleanup_on_error): + assert all([not pod.exists for pod in deleted_scaled_pods_with_cleanup_on_error]) diff --git a/uv.lock b/uv.lock index 500529c..c0f5b8d 100644 --- a/uv.lock +++ b/uv.lock @@ -559,7 +559,7 @@ wheels = [ [[package]] name = "openshift-python-scale-utilities" -version = "0.1.1.5" +version = "0.1.2.0" source = { editable = "." } dependencies = [ { name = "openshift-python-utilities" },