diff --git a/servo/connectors/kubernetes.py b/servo/connectors/kubernetes.py
index 39a69c5fd..56fa79e34 100644
--- a/servo/connectors/kubernetes.py
+++ b/servo/connectors/kubernetes.py
@@ -3637,6 +3637,7 @@ async def create(
         name = container_config.alias or (
             f"{deployment.name}/{container.name}" if container else deployment.name
         )
+
         return cls(
             name=name,
             deployment_config=config,
@@ -3970,6 +3971,7 @@ async def create(
         main_container = await deployment_or_rollout.get_target_container(
             container_config
         )
+
         name = (
             deployment_or_rollout_config.strategy.alias
             if isinstance(
@@ -4926,8 +4928,10 @@ class ContainerConfiguration(servo.BaseConfiguration):
     name: ContainerTagName
     alias: Optional[ContainerTagName]
     command: Optional[str]  # TODO: create model...
-    cpu: CPU
-    memory: Memory
+    cpu: Optional[CPU]
+    cpu_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2) = [4, 3]
+    memory: Optional[Memory]
+    memory_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2) = [4, 3]
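+    # The autoset multipliers are read as [min_divisor, max_multiplier]: when
+    # cpu/memory is left unset, startup() derives an optimization range from
+    # the container's current request (or its limit as a fallback)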
" + f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}" + ) @servo.multicheck( 'Containers in the "{item.name}" Deployment have resource requirements' @@ -5384,6 +5389,52 @@ async def check_rollout(rol_config: RolloutConfiguration) -> None: class KubernetesConnector(servo.BaseConnector): config: KubernetesConfiguration + @servo.on_event() + async def startup(self) -> None: + + # Autoset CPU and memory range based on resources + for deployment_or_rollout_config in (self.config.deployments or []) + ( + self.config.rollouts or [] + ): + read_args = ( + deployment_or_rollout_config.name, + cast(str, deployment_or_rollout_config.namespace), + ) + deployment_or_rollout = await Deployment.read(*read_args) + container_config = deployment_or_rollout_config.containers[0] + main_container = await deployment_or_rollout.get_target_container( + container_config + ) + + if not container_config.cpu: + cpu_resources = main_container.get_resource_requirements("cpu") + # Set requests = limits if not specified + if (cpu_request := cpu_resources[ResourceRequirement.request]) is None: + cpu_request = cpu_resources[ResourceRequirement.limit] + + cpu_resource = Core.parse(cpu_request).__opsani_repr__() + cpu_autoset = autoset_resource_range( + "cpu", + value=cpu_resource, + multiplier=container_config.cpu_autoset_multiplier, + ) + container_config.cpu = cpu_autoset + + if not container_config.memory: + memory_resources = main_container.get_resource_requirements("memory") + if ( + memory_request := memory_resources[ResourceRequirement.request] + ) is None: + memory_request = memory_resources[ResourceRequirement.limit] + + memory_resource = ShortByteSize.validate(memory_request) + memory_autoset = autoset_resource_range( + "memory", + value=memory_resource, + multiplier=container_config.memory_autoset_multiplier, + ) + container_config.memory = memory_autoset + @servo.on_event() async def attach(self, servo_: servo.Servo) -> None: # Ensure we are ready to talk to Kubernetes API @@ -5762,27 +5813,60 @@ def set_container_resource_defaults_from_config( container: Container, config: ContainerConfiguration ) -> None: for resource in Resource.values(): - # NOTE: cpu/memory stanza in container config - resource_config = getattr(config, resource) - requirements = container.get_resource_requirements(resource) - servo.logger.debug( - f"Loaded resource requirements for '{resource}': {requirements}" + # NOTE: cpu/memory stanza in container config (if set) + if (resource_config := getattr(config, resource)) is not None: + requirements = container.get_resource_requirements(resource) + servo.logger.debug( + f"Loaded resource requirements for '{resource}': {requirements}" + ) + for requirement in ResourceRequirement: + # Use the request/limit from the container.[cpu|memory].[request|limit] as default/override + if resource_value := getattr(resource_config, requirement.name): + if ( + existing_resource_value := requirements.get(requirement) + ) is None: + servo.logger.debug( + f"Setting default value for {resource}.{requirement} to: {resource_value}" + ) + else: + servo.logger.debug( + f"Overriding existing value for {resource}.{requirement} ({existing_resource_value}) to: {resource_value}" + ) + + requirements[requirement] = resource_value + + servo.logger.debug( + f"Setting resource requirements for '{resource}' to: {requirements}" + ) + container.set_resource_requirements(resource, requirements) + + +def autoset_resource_range( + resource_type: Resource, value: 
+    min_multiplier = multiplier[0]
+    max_multiplier = multiplier[1]
+
+    servo.logger.trace(f"Retrieved {resource_type} defined resource: {value}")
+
+    resource_min = value / min_multiplier
+    resource_max = value * max_multiplier
+
+    if resource_type == Resource.cpu:
+        resource_autoset = CPU(
+            min=Core(resource_min), max=Core(resource_max), step="125m"
+        )
+    elif resource_type == Resource.memory:
+        resource_autoset = Memory(
+            min=ShortByteSize.validate(str(resource_min)),
+            max=ShortByteSize.validate(str(resource_max)),
+            step="128 MiB",
+        )
+    else:
+        # Guard against an unbound result for unsupported resource types
+        raise ValueError(f"unknown resource type: {resource_type}")
+
+    servo.logger.info(f"Autosetting {resource_type} range to: {resource_autoset}")
+
+    return resource_autoset
diff --git a/servo/connectors/opsani_dev.py b/servo/connectors/opsani_dev.py
index af4fce1dc..3a223394c 100644
--- a/servo/connectors/opsani_dev.py
+++ b/servo/connectors/opsani_dev.py
@@ -63,8 +63,13 @@ class OpsaniDevConfiguration(servo.BaseConfiguration):
     container: str
     service: str
     port: Optional[Union[pydantic.StrictInt, str]] = None
-    cpu: CPU
-    memory: Memory
+    cpu: Optional[CPU]
+    cpu_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2) = [4, 3]
+    memory: Optional[Memory]
+    memory_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2) = [
+        4,
+        3,
+    ]
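+    # With the default [4, 3], the autoset range spans request / 4 .. request * 3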
     env: Optional[servo.EnvironmentSettingList]
     static_environment_variables: Optional[Dict[str, str]]
     prometheus_base_url: str = PROMETHEUS_SIDECAR_BASE_URL
@@ -116,6 +121,7 @@ def generate_kubernetes_config(
         ] = servo.connectors.kubernetes.DefaultOptimizationStrategyConfiguration()

         if self.create_tuning_pod:
+
             strategy = (
                 servo.connectors.kubernetes.CanaryOptimizationStrategyConfiguration(
                     type=servo.connectors.kubernetes.OptimizationStrategy.canary,
@@ -123,11 +129,20 @@ def generate_kubernetes_config(
                 )
             )

-            replicas = servo.Replicas(min=0, max=1, pinned=True)
+            replicas = servo.Replicas(
+                min=0, max=1, pinned=True  # NOTE: always pinned for now
+            )
         else:
             # NOTE: currently assuming we NEVER want to adjust the main deployment with the opsani_dev connector
             # TODO: Do we ever need to support opsani dev bootstrapping of non-canary adjusted optimization of deployments?
+
+            # Load defaults when create_tuning_pod is False; these values are
+            # never adjusted and exist only to carry the pinned setting below
+            if not self.cpu:
+                self.cpu = CPU(min="250m", max="3000m")
+            if not self.memory:
+                self.memory = Memory(min="256 MiB", max="3.0 GiB")
+
             self.cpu.pinned = True
             self.memory.pinned = True

@@ -142,7 +157,9 @@ def generate_kubernetes_config(
             name=self.container,
             alias="main",
             cpu=self.cpu,
+            cpu_autoset_multiplier=self.cpu_autoset_multiplier,
             memory=self.memory,
+            memory_autoset_multiplier=self.memory_autoset_multiplier,
             static_environment_variables=self.static_environment_variables,
             env=self.env,
         )
@@ -434,16 +451,18 @@ async def check_resource_requirements(self) -> None:
         for resource in servo.connectors.kubernetes.Resource.values():
             current_state = None
             container_requirements = container.get_resource_requirements(resource)
-            get_requirements = getattr(self.config, resource).get
-            for requirement in get_requirements:
-                current_state = container_requirements.get(requirement)
-                if current_state:
-                    break
-
-            assert current_state, (
-                f"{self.controller_type_name} {self.config_controller_name} target container {self.config.container} spec does not define the resource {resource}. "
-                f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}"
-            )
+            # Skip the check entirely when the resource is not under optimization
+            if getattr(self.config, resource) is not None:
+                get_requirements = getattr(self.config, resource).get
+                for requirement in get_requirements:
+                    current_state = container_requirements.get(requirement)
+                    if current_state:
+                        break
+
+                assert current_state, (
+                    f"{self.controller_type_name} {self.config_controller_name} target container {self.config.container} spec does not define the resource {resource}. "
+                    f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}"
+                )

     @servo.checks.require("Target container resources fall within optimization range")
     async def check_target_container_resources_within_limits(self) -> None:
@@ -471,53 +490,63 @@ async def check_target_container_resources_within_limits(self) -> None:
         # Get resource requirements from container
         # TODO: This needs to reuse the logic from CanaryOptimization class (tuning_cpu, tuning_memory, etc properties)
-        cpu_resource_requirements = target_container.get_resource_requirements("cpu")
-        cpu_resource_value = cpu_resource_requirements.get(
-            next(
-                filter(
-                    lambda r: cpu_resource_requirements[r] is not None,
-                    self.config.cpu.get,
-                ),
-                None,
-            )
-        )
-        container_cpu_value = servo.connectors.kubernetes.Core.parse(cpu_resource_value)
-
-        memory_resource_requirements = target_container.get_resource_requirements(
-            "memory"
-        )
-        memory_resource_value = memory_resource_requirements.get(
-            next(
-                filter(
-                    lambda r: memory_resource_requirements[r] is not None,
-                    self.config.memory.get,
-                ),
-                None,
-            )
-        )
-        container_memory_value = servo.connectors.kubernetes.ShortByteSize.validate(
-            memory_resource_value
-        )
-
-        # Get config values
-        config_cpu_min = self.config.cpu.min
-        config_cpu_max = self.config.cpu.max
-        config_memory_min = self.config.memory.min
-        config_memory_max = self.config.memory.max
-
-        # Check values against config.
-        assert (
-            container_cpu_value >= config_cpu_min
-        ), f"target container CPU value {container_cpu_value.human_readable()} must be greater than optimizable minimum {config_cpu_min.human_readable()}"
-        assert (
-            container_cpu_value <= config_cpu_max
-        ), f"target container CPU value {container_cpu_value.human_readable()} must be less than optimizable maximum {config_cpu_max.human_readable()}"
-        assert (
-            container_memory_value >= config_memory_min
-        ), f"target container Memory value {container_memory_value.human_readable()} must be greater than optimizable minimum {config_memory_min.human_readable()}"
-        assert (
-            container_memory_value <= config_memory_max
-        ), f"target container Memory value {container_memory_value.human_readable()} must be less than optimizable maximum {config_memory_max.human_readable()}"
+        if self.config.cpu is not None:
+            cpu_resource_requirements = target_container.get_resource_requirements(
+                "cpu"
+            )
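+            # Use the first requirement (request or limit) that is actually
+            # set on the container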
+            cpu_resource_value = cpu_resource_requirements.get(
+                next(
+                    filter(
+                        lambda r: cpu_resource_requirements[r] is not None,
+                        self.config.cpu.get,
+                    ),
+                    None,
+                )
+            )
+            container_cpu_value = servo.connectors.kubernetes.Core.parse(
+                cpu_resource_value
+            )
+
+            # Get config values
+            config_cpu_min = self.config.cpu.min
+            config_cpu_max = self.config.cpu.max
+
+            # Check values against config.
+            assert (
+                container_cpu_value >= config_cpu_min
+            ), f"target container CPU value {container_cpu_value.human_readable()} must be at least the optimizable minimum {config_cpu_min.human_readable()}"
+            assert (
+                container_cpu_value <= config_cpu_max
+            ), f"target container CPU value {container_cpu_value.human_readable()} must be at most the optimizable maximum {config_cpu_max.human_readable()}"
+
+        if self.config.memory is not None:
+            memory_resource_requirements = target_container.get_resource_requirements(
+                "memory"
+            )
+            memory_resource_value = memory_resource_requirements.get(
+                next(
+                    filter(
+                        lambda r: memory_resource_requirements[r] is not None,
+                        self.config.memory.get,
+                    ),
+                    None,
+                )
+            )
+            container_memory_value = servo.connectors.kubernetes.ShortByteSize.validate(
+                memory_resource_value
+            )
+
+            # Get config values
+            config_memory_min = self.config.memory.min
+            config_memory_max = self.config.memory.max
+
+            # Check values against config.
+            assert (
+                container_memory_value >= config_memory_min
+            ), f"target container Memory value {container_memory_value.human_readable()} must be at least the optimizable minimum {config_memory_min.human_readable()}"
+            assert (
+                container_memory_value <= config_memory_max
+            ), f"target container Memory value {container_memory_value.human_readable()} must be at most the optimizable maximum {config_memory_max.human_readable()}"

 @servo.require(
     '{self.controller_type_name} "{self.config_controller_name}" is ready'
diff --git a/tests/connectors/opsani_dev_test.py b/tests/connectors/opsani_dev_test.py
index 71e754028..4255e8eb3 100644
--- a/tests/connectors/opsani_dev_test.py
+++ b/tests/connectors/opsani_dev_test.py
@@ -50,6 +50,17 @@ def config(kube) -> servo.connectors.opsani_dev.OpsaniDevConfiguration:
     )


+@pytest.fixture
+def no_resources_config(kube) -> servo.connectors.opsani_dev.OpsaniDevConfiguration:
+    return servo.connectors.opsani_dev.OpsaniDevConfiguration(
+        namespace=kube.namespace,
+        deployment="fiber-http",
+        container="fiber-http",
+        service="fiber-http",
+        __optimizer__=servo.configuration.Optimizer(id="test.com/foo", token="12345"),
+    )
+
+
 @pytest.fixture
 def no_tuning_config(kube) -> servo.connectors.opsani_dev.OpsaniDevConfiguration:
     return servo.connectors.opsani_dev.OpsaniDevConfiguration(
@@ -185,10 +196,24 @@ def test_generate_kubernetes_config(self) -> None:
             0
         ].static_environment_variables == {"FOO": "BAR", "BAZ": "1"}

+    def test_generate_no_resources_config(self) -> None:
+        no_resources_config = servo.connectors.opsani_dev.OpsaniDevConfiguration(
+            namespace="test",
+            deployment="fiber-http",
+            container="fiber-http",
+            service="fiber-http",
+            __optimizer__=servo.configuration.Optimizer(
+                id="test.com/foo", token="12345"
+            ),
+        )
+        no_resources_k_config = no_resources_config.generate_kubernetes_config()
+        assert no_resources_k_config.deployments[0].containers[0].cpu is None
+        assert no_resources_k_config.deployments[0].containers[0].memory is None
+
     def test_generate_no_tuning_config(self) -> None:
         no_tuning_config = servo.connectors.opsani_dev.OpsaniDevConfiguration(
             namespace="test",
-            rollout="fiber-http",
+            deployment="fiber-http",
             container="fiber-http",
             service="fiber-http",
             cpu=servo.connectors.kubernetes.CPU(min="125m", max="4000m", step="125m"),
diff --git a/tests/kubernetes_test.py b/tests/kubernetes_test.py
index f82098ba0..f7abdf8c2 100644
--- a/tests/kubernetes_test.py
+++ b/tests/kubernetes_test.py
@@ -732,3 +732,33 @@ def test_copying_cpu_with_invalid_value_does_not_raise() -> None:
     # Use copy + update to hydrate the value
     cpu_copy = cpu.copy(update={"value": "5"})
     assert cpu_copy.value == "5"
+
+
+@pytest.mark.parametrize(
+    "value, resource_type, expected_autoset",
+    [
+        (
+            servo.connectors.kubernetes.Core.parse("1"),
+            "cpu",
+            servo.connectors.kubernetes.CPU(min="250m", max="3000m", step="125m"),
+        ),
+        (
+            servo.connectors.kubernetes.Core.parse("2"),
+            "cpu",
+            servo.connectors.kubernetes.CPU(min="500m", max="6000m", step="125m"),
+        ),
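+        # Fractional requests use the same math: 500m / 4 = 125m, 500m * 3 = 1500m
+        (
+            servo.connectors.kubernetes.Core.parse("500m"),
+            "cpu",
+            servo.connectors.kubernetes.CPU(min="125m", max="1500m", step="125m"),
+        ),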
+        (
+            servo.connectors.kubernetes.ShortByteSize.validate("2Gi"),
+            "memory",
+            servo.connectors.kubernetes.Memory(
+                min="512.0MiB", max="6.0GiB", step="128.0MiB"
+            ),
+        ),
+    ],
+)
+def test_autoset_resource_range(value, resource_type, expected_autoset):
+    # NOTE: the function has no default multiplier, so pass the [4, 3]
+    # default used by the connector configurations
+    autoset_value = servo.connectors.kubernetes.autoset_resource_range(
+        resource_type=resource_type, value=value, multiplier=[4, 3]
+    )
+    assert autoset_value == expected_autoset
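+
+
+def test_autoset_resource_range_unknown_type_raises():
+    # The resource type here ("storage") is an arbitrary placeholder that is
+    # neither cpu nor memory; it should trip the guard in autoset_resource_range
+    with pytest.raises(ValueError):
+        servo.connectors.kubernetes.autoset_resource_range(
+            resource_type="storage", value=1.0, multiplier=[4, 3]
+        )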