146 changes: 115 additions & 31 deletions servo/connectors/kubernetes.py
@@ -3637,6 +3637,7 @@ async def create(
name = container_config.alias or (
f"{deployment.name}/{container.name}" if container else deployment.name
)

return cls(
name=name,
deployment_config=config,
@@ -3970,6 +3971,7 @@ async def create(
main_container = await deployment_or_rollout.get_target_container(
container_config
)

name = (
deployment_or_rollout_config.strategy.alias
if isinstance(
@@ -4926,8 +4928,10 @@ class ContainerConfiguration(servo.BaseConfiguration):
name: ContainerTagName
alias: Optional[ContainerTagName]
command: Optional[str] # TODO: create model...
-cpu: CPU
-memory: Memory
+cpu: Optional[CPU]
+cpu_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2)
+memory: Optional[Memory]
+memory_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2)
env: Optional[servo.EnvironmentSettingList]
static_environment_variables: Optional[Dict[str, str]]
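
With cpu and memory now optional on ContainerConfiguration, a container stanza can omit both and let the connector derive the ranges from the autoset multipliers. A minimal sketch of such a configuration, with illustrative values only:

container_config = ContainerConfiguration(
    name="main",
    cpu=None,  # autoset at startup from the live container's request/limit
    cpu_autoset_multiplier=[4.0, 3.0],  # min = value / 4, max = value * 3
    memory=None,
    memory_autoset_multiplier=[4.0, 3.0],
)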

@@ -5312,16 +5316,17 @@ async def _check_container_resource_requirements(
for resource in Resource.values():
current_state = None
container_requirements = container.get_resource_requirements(resource)
-get_requirements = getattr(cont_config, resource).get
-for requirement in get_requirements:
-    current_state = container_requirements.get(requirement)
-    if current_state:
-        break
+if getattr(cont_config, resource) is not None:
+    get_requirements = getattr(cont_config, resource).get
+    for requirement in get_requirements:
+        current_state = container_requirements.get(requirement)
+        if current_state:
+            break

-assert current_state, (
-    f"{type(target_controller).__name__} {target_config.name} target container {cont_config.name} spec does not define the resource {resource}. "
-    f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}"
-)
+    assert current_state, (
+        f"{type(target_controller).__name__} {target_config.name} target container {cont_config.name} spec does not define the resource {resource}. "
+        f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}"
+    )

@servo.multicheck(
'Containers in the "{item.name}" Deployment have resource requirements'
@@ -5384,6 +5389,52 @@ async def check_rollout(rol_config: RolloutConfiguration) -> None:
class KubernetesConnector(servo.BaseConnector):
config: KubernetesConfiguration

@servo.on_event()
async def startup(self) -> None:

# Autoset CPU and memory range based on resources
for deployment_or_rollout_config in (self.config.deployments or []) + (
self.config.rollouts or []
):
read_args = (
deployment_or_rollout_config.name,
cast(str, deployment_or_rollout_config.namespace),
)
deployment_or_rollout = await Deployment.read(*read_args)
container_config = deployment_or_rollout_config.containers[0]
main_container = await deployment_or_rollout.get_target_container(
container_config
)

if not container_config.cpu:
cpu_resources = main_container.get_resource_requirements("cpu")
# Set requests = limits if not specified
if (cpu_request := cpu_resources[ResourceRequirement.request]) is None:
cpu_request = cpu_resources[ResourceRequirement.limit]

cpu_resource = Core.parse(cpu_request).__opsani_repr__()
cpu_autoset = autoset_resource_range(
"cpu",
value=cpu_resource,
multiplier=container_config.cpu_autoset_multiplier,
)
container_config.cpu = cpu_autoset

if not container_config.memory:
memory_resources = main_container.get_resource_requirements("memory")
if (
memory_request := memory_resources[ResourceRequirement.request]
) is None:
memory_request = memory_resources[ResourceRequirement.limit]

memory_resource = ShortByteSize.validate(memory_request)
memory_autoset = autoset_resource_range(
"memory",
value=memory_resource,
multiplier=container_config.memory_autoset_multiplier,
)
container_config.memory = memory_autoset

@servo.on_event()
async def attach(self, servo_: servo.Servo) -> None:
# Ensure we are ready to talk to Kubernetes API
@@ -5762,27 +5813,60 @@ def set_container_resource_defaults_from_config(
container: Container, config: ContainerConfiguration
) -> None:
for resource in Resource.values():
-    # NOTE: cpu/memory stanza in container config
-    resource_config = getattr(config, resource)
-    requirements = container.get_resource_requirements(resource)
-    servo.logger.debug(
-        f"Loaded resource requirements for '{resource}': {requirements}"
-    )
-    for requirement in ResourceRequirement:
-        # Use the request/limit from the container.[cpu|memory].[request|limit] as default/override
-        if resource_value := getattr(resource_config, requirement.name):
-            if (existing_resource_value := requirements.get(requirement)) is None:
-                servo.logger.debug(
-                    f"Setting default value for {resource}.{requirement} to: {resource_value}"
-                )
-            else:
-                servo.logger.debug(
-                    f"Overriding existing value for {resource}.{requirement} ({existing_resource_value}) to: {resource_value}"
-                )

-            requirements[requirement] = resource_value

-    servo.logger.debug(
-        f"Setting resource requirements for '{resource}' to: {requirements}"
-    )
-    container.set_resource_requirements(resource, requirements)
+    # NOTE: cpu/memory stanza in container config (if set)
+    if (resource_config := getattr(config, resource)) is not None:
+        requirements = container.get_resource_requirements(resource)
+        servo.logger.debug(
+            f"Loaded resource requirements for '{resource}': {requirements}"
+        )
+        for requirement in ResourceRequirement:
+            # Use the request/limit from the container.[cpu|memory].[request|limit] as default/override
+            if resource_value := getattr(resource_config, requirement.name):
+                if (
+                    existing_resource_value := requirements.get(requirement)
+                ) is None:
+                    servo.logger.debug(
+                        f"Setting default value for {resource}.{requirement} to: {resource_value}"
+                    )
+                else:
+                    servo.logger.debug(
+                        f"Overriding existing value for {resource}.{requirement} ({existing_resource_value}) to: {resource_value}"
+                    )

+                requirements[requirement] = resource_value

+        servo.logger.debug(
+            f"Setting resource requirements for '{resource}' to: {requirements}"
+        )
+        container.set_resource_requirements(resource, requirements)


+def autoset_resource_range(
+    resource_type: Resource, value: float, multiplier: list[float]
+) -> Union[CPU, Memory]:

+    min_multiplier = multiplier[0]
+    max_multiplier = multiplier[1]

+    servo.logger.trace(f"Retrieved {resource_type} defined resource: {value}")

+    resource_min = value / min_multiplier
+    resource_max = value * max_multiplier

+    if resource_type == Resource.cpu:

+        resource_autoset = CPU(
+            min=Core(resource_min), max=Core(resource_max), step="125m"
+        )
+    elif resource_type == Resource.memory:

+        resource_autoset = Memory(
+            min=ShortByteSize.validate(str(resource_min)),
+            max=ShortByteSize.validate(str(resource_max)),
+            step="128 MiB",
+        )

+    servo.logger.info(f"Autosetting {resource_type} range to: {resource_autoset}")

+    return resource_autoset
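
For orientation, a rough sketch of calling the new helper directly, with illustrative numbers only (the startup hook above passes the parsed request, falling back to the limit when no request is set):

# 0.5 cores with multiplier [4, 3] -> min = 0.5 / 4 = 125m, max = 0.5 * 3 = 1500m (step 125m)
cpu_range = autoset_resource_range("cpu", value=0.5, multiplier=[4.0, 3.0])

# 1 GiB with multiplier [4, 3] -> min = 256 MiB, max = 3 GiB (step 128 MiB)
memory_range = autoset_resource_range(
    "memory", value=ShortByteSize.validate("1GiB"), multiplier=[4.0, 3.0]
)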
137 changes: 83 additions & 54 deletions servo/connectors/opsani_dev.py
@@ -63,8 +63,13 @@ class OpsaniDevConfiguration(servo.BaseConfiguration):
container: str
service: str
port: Optional[Union[pydantic.StrictInt, str]] = None
-cpu: CPU
-memory: Memory
+cpu: Optional[CPU]
+cpu_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2) = [4, 3]
+memory: Optional[Memory]
+memory_autoset_multiplier: pydantic.conlist(float, min_items=2, max_items=2) = [
+    4,
+    3,
+]
env: Optional[servo.EnvironmentSettingList]
static_environment_variables: Optional[Dict[str, str]]
prometheus_base_url: str = PROMETHEUS_SIDECAR_BASE_URL
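
Because the multipliers default to [4, 3], an Opsani Dev config can now leave cpu and memory unset and rely on autosetting. A hedged sketch of the intended usage; the namespace and deployment fields are assumed from the rest of the class (not shown in this hunk), and all values are illustrative:

config = OpsaniDevConfiguration(
    namespace="default",  # assumed field
    deployment="web",     # assumed field
    container="app",
    service="web",
    # cpu and memory omitted: the generated kubernetes config keeps them None
    # together with the [4, 3] multipliers, so the Kubernetes connector autosets
    # the ranges from the target container's requests/limits at startup.
)
kubernetes_config = config.generate_kubernetes_config()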
@@ -116,18 +121,28 @@ def generate_kubernetes_config(
] = servo.connectors.kubernetes.DefaultOptimizationStrategyConfiguration()

if self.create_tuning_pod:

strategy = (
servo.connectors.kubernetes.CanaryOptimizationStrategyConfiguration(
type=servo.connectors.kubernetes.OptimizationStrategy.canary,
alias="tuning",
)
)

replicas = servo.Replicas(min=0, max=1, pinned=True)
replicas = servo.Replicas(
min=0, max=1, pinned=True # NOTE always pinned for now
)

else:
# NOTE: currently assuming we NEVER want to adjust the main deployment with the opsani_dev connector
# TODO: Do we ever need to support opsani dev bootstrapping of non-canary adjusted optimization of deployments?

# Just load defaults when create_tuning_pod is False - these values aren't used and just hold the pinned setting
if not self.cpu:
self.cpu = CPU(min="250m", max="3000m")
if not self.memory:
self.memory = Memory(min="256 MiB", max="3.0 GiB")
Reviewer comment (Contributor) on lines +141 to +144:
why have these famous random values versus just raising?


self.cpu.pinned = True
self.memory.pinned = True

@@ -142,7 +157,9 @@ def generate_kubernetes_config(
name=self.container,
alias="main",
cpu=self.cpu,
cpu_autoset_multiplier=self.cpu_autoset_multiplier,
memory=self.memory,
memory_autoset_multiplier=self.memory_autoset_multiplier,
static_environment_variables=self.static_environment_variables,
env=self.env,
)
@@ -434,16 +451,18 @@ async def check_resource_requirements(self) -> None:
for resource in servo.connectors.kubernetes.Resource.values():
current_state = None
container_requirements = container.get_resource_requirements(resource)
-get_requirements = getattr(self.config, resource).get
-for requirement in get_requirements:
-    current_state = container_requirements.get(requirement)
-    if current_state:
-        break

-assert current_state, (
-    f"{self.controller_type_name} {self.config_controller_name} target container {self.config.container} spec does not define the resource {resource}. "
-    f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}"
-)
+if getattr(self.config, resource) is not None:
+    get_requirements = getattr(self.config, resource).get
+    for requirement in get_requirements:
+        current_state = container_requirements.get(requirement)
+        if current_state:
+            break

+    assert current_state, (
+        f"{self.controller_type_name} {self.config_controller_name} target container {self.config.container} spec does not define the resource {resource}. "
+        f"At least one of the following must be specified: {', '.join(map(lambda req: req.resources_key, get_requirements))}"
+    )

@servo.checks.require("Target container resources fall within optimization range")
async def check_target_container_resources_within_limits(self) -> None:
@@ -471,53 +490,63 @@ async def check_target_container_resources_within_limits(self) -> None:

# Get resource requirements from container
# TODO: This needs to reuse the logic from CanaryOptimization class (tuning_cpu, tuning_memory, etc properties)
-cpu_resource_requirements = target_container.get_resource_requirements("cpu")
-cpu_resource_value = cpu_resource_requirements.get(
-    next(
-        filter(
-            lambda r: cpu_resource_requirements[r] is not None,
-            self.config.cpu.get,
-        ),
-        None,
-    )
-)
-container_cpu_value = servo.connectors.kubernetes.Core.parse(cpu_resource_value)

-memory_resource_requirements = target_container.get_resource_requirements(
-    "memory"
-)
-memory_resource_value = memory_resource_requirements.get(
-    next(
-        filter(
-            lambda r: memory_resource_requirements[r] is not None,
-            self.config.memory.get,
-        ),
-        None,
-    )
-)
-container_memory_value = servo.connectors.kubernetes.ShortByteSize.validate(
-    memory_resource_value
-)

-# Get config values
-config_cpu_min = self.config.cpu.min
-config_cpu_max = self.config.cpu.max
-config_memory_min = self.config.memory.min
-config_memory_max = self.config.memory.max

-# Check values against config.
-assert (
-    container_cpu_value >= config_cpu_min
-), f"target container CPU value {container_cpu_value.human_readable()} must be greater than optimizable minimum {config_cpu_min.human_readable()}"
-assert (
-    container_cpu_value <= config_cpu_max
-), f"target container CPU value {container_cpu_value.human_readable()} must be less than optimizable maximum {config_cpu_max.human_readable()}"
-assert (
-    container_memory_value >= config_memory_min
-), f"target container Memory value {container_memory_value.human_readable()} must be greater than optimizable minimum {config_memory_min.human_readable()}"
-assert (
-    container_memory_value <= config_memory_max
-), f"target container Memory value {container_memory_value.human_readable()} must be less than optimizable maximum {config_memory_max.human_readable()}"
+if self.config.cpu is not None:
+    cpu_resource_requirements = target_container.get_resource_requirements(
+        "cpu"
+    )
+    cpu_resource_value = cpu_resource_requirements.get(
+        next(
+            filter(
+                lambda r: cpu_resource_requirements[r] is not None,
+                self.config.cpu.get,
+            ),
+            None,
+        )
+    )
+    container_cpu_value = servo.connectors.kubernetes.Core.parse(
+        cpu_resource_value
+    )

+    # Get config values
+    config_cpu_min = self.config.cpu.min
+    config_cpu_max = self.config.cpu.max

+    # Check values against config.
+    assert (
+        container_cpu_value >= config_cpu_min
+    ), f"target container CPU value {container_cpu_value.human_readable()} must be greater than optimizable minimum {config_cpu_min.human_readable()}"
+    assert (
+        container_cpu_value <= config_cpu_max
+    ), f"target container CPU value {container_cpu_value.human_readable()} must be less than optimizable maximum {config_cpu_max.human_readable()}"

+if self.config.memory is not None:
+    memory_resource_requirements = target_container.get_resource_requirements(
+        "memory"
+    )
+    memory_resource_value = memory_resource_requirements.get(
+        next(
+            filter(
+                lambda r: memory_resource_requirements[r] is not None,
+                self.config.memory.get,
+            ),
+            None,
+        )
+    )
+    container_memory_value = servo.connectors.kubernetes.ShortByteSize.validate(
+        memory_resource_value
+    )

+    # Get config values
+    config_memory_min = self.config.memory.min
+    config_memory_max = self.config.memory.max

+    # Check values against config.
+    assert (
+        container_memory_value >= config_memory_min
+    ), f"target container Memory value {container_memory_value.human_readable()} must be greater than optimizable minimum {config_memory_min.human_readable()}"
+    assert (
+        container_memory_value <= config_memory_max
+    ), f"target container Memory value {container_memory_value.human_readable()} must be less than optimizable maximum {config_memory_max.human_readable()}"

@servo.require(
'{self.controller_type_name} "{self.config_controller_name}" is ready'