diff --git a/.github/workflows/lint-test.yaml b/.github/workflows/lint-test.yaml index c81e5f9c..935b24c2 100644 --- a/.github/workflows/lint-test.yaml +++ b/.github/workflows/lint-test.yaml @@ -75,7 +75,7 @@ jobs: test-unit: name: Tests - unit runs-on: ubuntu-latest - timeout-minutes: 5 + timeout-minutes: 8 strategy: matrix: python_version: [3.12, 3.13] @@ -113,7 +113,7 @@ jobs: test-integration: name: Tests - integration runs-on: ubuntu-latest - timeout-minutes: 6 + timeout-minutes: 10 strategy: matrix: python_version: [3.12, 3.13] @@ -157,7 +157,7 @@ jobs: test-integration-tuner: name: Tests - integration:tuner runs-on: ubuntu-latest - timeout-minutes: 5 + timeout-minutes: 8 strategy: matrix: python_version: [3.12, 3.13] diff --git a/docs/api/schemas/schemas.md b/docs/api/schemas/schemas.md index a4d00963..f4ce0adf 100644 --- a/docs/api/schemas/schemas.md +++ b/docs/api/schemas/schemas.md @@ -1 +1 @@ -::: plugboard.schemas +::: plugboard_schemas diff --git a/docs/examples/tutorials/running-in-parallel.md b/docs/examples/tutorials/running-in-parallel.md index 168731a7..e92ef1ba 100644 --- a/docs/examples/tutorials/running-in-parallel.md +++ b/docs/examples/tutorials/running-in-parallel.md @@ -70,3 +70,68 @@ Specifying the process type and channel builder type in the YAML is the only cha 1. Tell Plugboard to use a [`RayProcess`][plugboard.process.RayProcess] instead of the default [`LocalProcess`][plugboard.process.LocalProcess]. 2. Also change the connector builder to [`RayConnector`][plugboard.connector.RayConnector], which will build [`RayChannel`][plugboard.connector.RayChannel] objects when creating the `Process`. + +## Specifying resource requirements + +When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks) and you are running on a Ray cluster. 
+ +!!! tip + Normally Ray will start automatically when you are using Plugboard locally. If you want to start a separate Ray instance, for example so that you can control the configuration options, you can launch it from the [CLI](https://docs.ray.io/en/latest/ray-core/starting-ray.html). For example, this command will start a Ray instance with enough resources to run the example below. + + ```sh + uv run ray start --head --num-cpus=4 --num-gpus=1 --resources='{"custom_hardware": 5}' + ``` + +### Declaring resources at component definition + +The recommended way to specify resource requirements is to declare them as a class attribute when defining your component. This makes the resource requirements explicit and part of the component's definition: + +```python +from plugboard.component import Component, IOController as IO +from plugboard.schemas import Resource + +class CPUIntensiveTask(Component): + """Component that requires more CPU resources.""" + + io = IO(inputs=["x"], outputs=["y"]) + resources = Resource(cpu=2.0) # Declare resources at class level + + async def step(self) -> None: + # Your component logic here + pass +``` + +### Overriding resources at instantiation + +You can also override resource requirements when creating component instances. This is useful when you want to use the same component class with different resource requirements: + +```python +# Override the class-level resource requirements +component = CPUIntensiveTask( + name="my-task", + resources=Resource(cpu=4.0) # Override to use 4 CPUs instead of 2 +) +``` + +### Example + +For example, you can specify [`Resource`][plugboard.schemas.Resource] requirements like this when defining components: + +```python +--8<-- "examples/tutorials/004_using_ray/resources_example.py:resources" +``` + +Or override them in YAML configuration: + +```yaml +--8<-- "examples/tutorials/004_using_ray/resources-example.yaml:10:" +``` + +1. Override DataProducer to require 1 CPU (instead of the default 0.001). +2. 
CPUIntensiveTask already declares cpu: 2.0 at the class level, so this matches the class definition. +3. Add 512MB memory requirement to CPUIntensiveTask (extending the class-level resources). +4. Requires 0.5 CPU, specified in Kubernetes-style format (500 milli CPUs). This matches the class-level declaration. +5. Requires 1 GPU, matching the class-level declaration. +6. Add a custom resource called `custom_hardware`. This needs to be specified in the configuration of your Ray cluster to make it available. + +See the [Ray documentation](https://docs.ray.io/en/latest/ray-core/scheduling/resources.html) for more information about specifying resource requirements. diff --git a/examples/tutorials/004_using_ray/resources-example.yaml b/examples/tutorials/004_using_ray/resources-example.yaml new file mode 100644 index 00000000..a58aafa9 --- /dev/null +++ b/examples/tutorials/004_using_ray/resources-example.yaml @@ -0,0 +1,47 @@ +# Example YAML configuration with resource requirements +# +# This configuration demonstrates how to specify resource requirements +# for components in a Plugboard process. Resources can be specified as: +# - Numerical values: cpu: 2.0 +# - Milli-units: cpu: "250m" (equals 0.25) +# - Memory units: memory: "100Mi" (equals 100 * 1024 * 1024 bytes) +# +# Note: Resources can be declared at the component class level (recommended) +# or overridden in the YAML configuration as shown below. +plugboard: + process: + type: plugboard.process.RayProcess + connector_builder: + type: plugboard.connector.RayConnector + args: + name: resource-example-process + components: + - type: examples.tutorials.004_using_ray.resources_example.DataProducer + args: + name: producer + iters: 10 + resources: + cpu: 1.0 # (1)! + - type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask + args: + name: cpu-task + # CPUIntensiveTask has class-level resources (cpu: 2.0) + # Override to use more memory + resources: + cpu: 2.0 # (2)! + memory: "512Mi" # (3)! 
+ - type: examples.tutorials.004_using_ray.resources_example.GPUTask + args: + name: gpu-task + # GPUTask has class-level resources (cpu: "500m", gpu: 1) + # Can override or extend with custom resources + resources: + cpu: "500m" # (4)! + gpu: 1 # (5)! + resources: + custom_hardware: 2 # (6)! + connectors: + - source: producer.output + target: cpu-task.x + - source: cpu-task.y + target: gpu-task.data diff --git a/examples/tutorials/004_using_ray/resources_example.py b/examples/tutorials/004_using_ray/resources_example.py new file mode 100644 index 00000000..882e2c3d --- /dev/null +++ b/examples/tutorials/004_using_ray/resources_example.py @@ -0,0 +1,95 @@ +"""Example demonstrating resource requirements for components in RayProcess.""" + +import asyncio +import typing as _t + +import ray + +from plugboard.component import Component, IOController as IO +from plugboard.connector import RayConnector +from plugboard.process import RayProcess +from plugboard.schemas import ComponentArgsDict, ConnectorSpec, Resource + + +class CPUIntensiveTask(Component): + """Component that requires more CPU resources. + + Resource requirements are declared as a class attribute. + """ + + io = IO(inputs=["x"], outputs=["y"]) + resources = Resource(cpu=2.0) # Declare resources at class level + + async def step(self) -> None: + """Execute CPU-intensive computation.""" + # Simulate CPU-intensive work + result = sum(i**2 for i in range(int(self.x * 10000))) + self.y = result + + +class GPUTask(Component): + """Component that requires GPU resources. + + Resource requirements are declared as a class attribute. 
+ """ + + io = IO(inputs=["data"], outputs=["result"]) + resources = Resource(cpu="500m", gpu=1) # Declare resources at class level + + async def step(self) -> None: + """Execute GPU computation.""" + # Simulate GPU computation + self.result = self.data * 2 + + +class DataProducer(Component): + """Produces data for processing.""" + + io = IO(outputs=["output"]) + + def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + """Initialize DataProducer with iteration count.""" + super().__init__(**kwargs) + self._iters = iters + + async def init(self) -> None: + """Initialize the data sequence.""" + self._seq = iter(range(self._iters)) + + async def step(self) -> None: + """Produce the next data value.""" + try: + self.output = next(self._seq) + except StopIteration: + await self.io.close() + + +async def main() -> None: + """Run the process with resource-constrained components.""" + # --8<-- [start:resources] + # Resources can be declared at the class level (see CPUIntensiveTask and GPUTask above) + # or overridden when instantiating components + process = RayProcess( + components=[ + DataProducer(name="producer", iters=5), # Uses default resources + CPUIntensiveTask(name="cpu-task"), # Uses class-level resources (2.0 CPU) + GPUTask(name="gpu-task"), # Uses class-level resources (0.5 CPU, 1 GPU) + ], + connectors=[ + RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")), + RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")), + ], + ) + # --8<-- [end:resources] + + async with process: + await process.run() + + print("Process completed successfully!") + + +if __name__ == "__main__": + if not ray.is_initialized(): + # Ray must be initialised with the necessary resources + ray.init(num_cpus=5, num_gpus=1, resources={"custom_hardware": 10}, include_dashboard=True) + asyncio.run(main()) diff --git a/mkdocs.yaml b/mkdocs.yaml index 2abd14cb..8cefcca5 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ 
-33,7 +33,7 @@ plugins: default_handler: python handlers: python: - paths: [src] + paths: [plugboard, plugboard-schemas] options: docstring_style: google show_source: false @@ -113,6 +113,7 @@ watch: - docs - examples - plugboard +- plugboard-schemas - README.md - CONTRIBUTING.md diff --git a/plugboard-schemas/plugboard_schemas/__init__.py b/plugboard-schemas/plugboard_schemas/__init__.py index 0a36efb8..33cd6acc 100644 --- a/plugboard-schemas/plugboard_schemas/__init__.py +++ b/plugboard-schemas/plugboard_schemas/__init__.py @@ -9,7 +9,7 @@ from importlib.metadata import version from ._common import PlugboardBaseModel -from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec +from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec, Resource from .config import ConfigSpec, ProcessConfigSpec from .connector import ( DEFAULT_CONNECTOR_CLS_PATH, @@ -77,6 +77,7 @@ "ProcessArgsDict", "ProcessArgsSpec", "RAY_STATE_BACKEND_CLS_PATH", + "Resource", "StateBackendSpec", "StateBackendArgsDict", "StateBackendArgsSpec", diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index 74a50899..eeeb889e 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -1,12 +1,144 @@ """Provides `ComponentSpec` class.""" +import re import typing as _t -from pydantic import Field +from pydantic import Field, field_validator from ._common import PlugboardBaseModel +RESOURCE_SUFFIXES = { + "n": 1e-9, + "u": 1e-6, + "m": 1e-3, + "k": 1e3, + "M": 1e6, + "G": 1e9, + "T": 1e12, + "P": 1e15, + "E": 1e18, + "Ki": 1024, + "Mi": 1024**2, + "Gi": 1024**3, + "Ti": 1024**4, + "Pi": 1024**5, + "Ei": 1024**6, +} +RAY_RESOURCE_PRECISION = 4 # Decimal places to round to for Ray resource values + + +def _parse_resource_value(value: str | float | int) -> float: + """Parse a resource value from string or number. 
+ + Supports: + - Direct numerical values: 1, 0.5, 2.0 + - Decimal SI prefixes: n, u, m, k, M, G, T, P, E (e.g., "250m" -> 0.25, "5k" -> 5000) + - Binary prefixes: Ki, Mi, Gi, Ti, Pi, Ei (e.g., "10Mi" -> 10485760) + + Args: + value: The resource value to parse. + + Returns: + The parsed float value. + + Raises: + ValueError: If the value format is invalid. + """ + if isinstance(value, (int, float)): + return float(value) + + if not isinstance(value, str): + raise ValueError(f"Resource value must be a string, int, or float: {value}") + + # Handle string values + value = value.strip() + + pattern = r"^(\d+(?:\.\d+)?)([a-zA-Z]+)?$" + match = re.search(pattern, value) + if not match: + raise ValueError(f"Invalid resource value format: {value}") + + multiplier = 1.0 + if (suffix := match.group(2)) is not None: + try: + multiplier = RESOURCE_SUFFIXES[suffix] + except KeyError: + raise ValueError(f"Unknown resource suffix '{suffix}' in value: {value}") + + try: + number = float(number_group := match.group(1)) + except ValueError: + raise ValueError(f"Invalid resource value format: {number_group}") + + return number * multiplier + + +class Resource(PlugboardBaseModel): + """Resource requirements for a component. + + Supports specification of CPU, GPU, memory, and custom resources. + Values can be specified as numbers or strings with units (e.g., "250m" for 0.25, "10Mi" for + 10 * 1024 * 1024). + + Attributes: + cpu: CPU requirement (default: 0.001). + gpu: GPU requirement (default: 0). + memory: Memory requirement in bytes as an integer (default: 0). + resources: Custom resource requirements as a dictionary. 
def to_ray_options(
    self, style: _t.Literal["actor", "placement_group"] = "actor"
) -> dict[str, _t.Any]:
    """Convert resource requirements to Ray resource options.

    CPU, GPU and memory are only included when greater than zero. CPU, GPU and
    custom resource quantities are rounded to `RAY_RESOURCE_PRECISION` decimal places.

    Args:
        style: The output layout. `"actor"` produces keyword arguments for
            `ray.remote` (`num_cpus`, `num_gpus`, `memory` and a nested `resources`
            dictionary). Any other value is treated as `"placement_group"` and
            produces a single flat bundle keyed by resource name (`CPU`, `GPU`,
            `memory` and each custom resource), because placement group bundles
            map resource names directly to quantities.

    Returns:
        Dictionary of Ray options in the requested style.
    """
    is_actor = style == "actor"
    options: dict[str, _t.Any] = {}

    if self.cpu > 0:
        options["num_cpus" if is_actor else "CPU"] = round(self.cpu, RAY_RESOURCE_PRECISION)
    if self.gpu > 0:
        options["num_gpus" if is_actor else "GPU"] = round(self.gpu, RAY_RESOURCE_PRECISION)
    if self.memory > 0:
        options["memory"] = self.memory

    # Add custom resources
    if self.resources:
        custom = {
            key: round(value, RAY_RESOURCE_PRECISION) for key, value in self.resources.items()
        }
        if is_actor:
            options["resources"] = custom
        else:
            # Bundles are flat: a nested dict is not a valid bundle value.
            # `setdefault` keeps the dedicated cpu/gpu/memory fields authoritative
            # if a custom resource happens to share one of their names.
            for key, value in custom.items():
                options.setdefault(key, value)

    return options
""" name: str = Field(pattern=r"^([a-zA-Z_][a-zA-Z0-9_-]*)$") initial_values: dict[str, _t.Any] = {} parameters: dict[str, _t.Any] = {} constraints: dict[str, _t.Any] = {} + resources: Resource | None = None class ComponentSpec(PlugboardBaseModel): diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 081eefe5..bb2ec58e 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -20,7 +20,7 @@ UnrecognisedEventError, ValidationError, ) -from plugboard.schemas import Status +from plugboard.schemas import Resource, Status from plugboard.state import StateBackend from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker @@ -41,10 +41,14 @@ class Component(ABC, ExportMixin): io: The `IOController` for the component, specifying inputs, outputs, and events. exports: Optional; The exportable fields from the component during distributed runs in addition to input and output fields. + resources: Resource requirements for the component. Can be declared as a class attribute + to set default resource requirements, which can be overridden in the constructor. + Defaults to `Resource()` (0.001 CPU, 0 GPU, 0 memory). """ io: IO = IO(input_events=[StopEvent], output_events=[StopEvent]) exports: _t.Optional[list[str]] = None + resources: Resource = Resource() _implements_step: bool = False @@ -56,7 +60,19 @@ def __init__( parameters: _t.Optional[dict[str, _t.Any]] = None, state: _t.Optional[StateBackend] = None, constraints: _t.Optional[dict] = None, + resources: _t.Optional[Resource] = None, ) -> None: + """Initialize a Component instance. + + Args: + name: The name of the component. + initial_values: Optional; Initial values for the component's inputs. + parameters: Optional; Parameters for the component. + state: Optional; State backend for the component. + constraints: Optional; Constraints for the component. + resources: Optional; Resource requirements for the component. 
If not provided, + uses the class-level `resources` attribute as default. + """ self.name = name self._initial_values = initial_values or {} self._constraints = constraints or {} @@ -67,7 +83,7 @@ def __init__( setattr(self, "init", self._handle_init_wrapper()) setattr(self, "step", self._handle_step_wrapper()) - if is_on_ray_worker(): + if is_on_ray_worker(): # pragma: no cover # Required until https://github.com/ray-project/ray/issues/42823 is resolved try: self.__class__._configure_io() @@ -82,6 +98,12 @@ def __init__( namespace=self.name, component=self, ) + self.resources = resources or Resource( + cpu=self.__class__.resources.cpu, + gpu=self.__class__.resources.gpu, + memory=self.__class__.resources.memory, + resources=self.__class__.resources.resources, + ) self._event_producers: dict[str, set[str]] = defaultdict(set) self._status = Status.CREATED self._is_running = False diff --git a/plugboard/process/ray_process.py b/plugboard/process/ray_process.py index aa4592f8..81a3e28f 100644 --- a/plugboard/process/ray_process.py +++ b/plugboard/process/ray_process.py @@ -7,7 +7,7 @@ from plugboard.component.io_controller import IODirection from plugboard.connector import Connector from plugboard.process.process import Process -from plugboard.schemas import Status +from plugboard.schemas import Resource, Status from plugboard.state import RayStateBackend, StateBackend from plugboard.utils import build_actor_wrapper, depends_on_optional, gather_except, gen_rand_str @@ -62,9 +62,18 @@ def _create_component_actor(self, component: Component) -> _t.Any: name = component.id args = component.export()["args"] actor_cls = build_actor_wrapper(component.__class__) - return ray.remote(num_cpus=0, name=name, namespace=self._namespace)( # type: ignore - actor_cls - ).remote(**args) + + # Get resource requirements from component + resources = component.resources + if resources is None: + # Use default resources if not specified + resources = Resource() + + ray_options = 
resources.to_ray_options() + ray_options["name"] = name + ray_options["namespace"] = self._namespace + + return ray.remote(**ray_options)(actor_cls).remote(**args) # type: ignore async def _update_component_attributes(self) -> None: """Updates attributes on local components from remote actors.""" diff --git a/plugboard/schemas/__init__.py b/plugboard/schemas/__init__.py index 53afa695..427ee410 100644 --- a/plugboard/schemas/__init__.py +++ b/plugboard/schemas/__init__.py @@ -38,6 +38,7 @@ ProcessArgsSpec, ProcessConfigSpec, ProcessSpec, + Resource, StateBackendArgsDict, StateBackendArgsSpec, StateBackendSpec, @@ -77,6 +78,7 @@ "ProcessArgsDict", "ProcessArgsSpec", "RAY_STATE_BACKEND_CLS_PATH", + "Resource", "StateBackendSpec", "StateBackendArgsDict", "StateBackendArgsSpec", diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index 5f001e52..7fce1552 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -16,6 +16,7 @@ OptunaSpec, ParameterSpec, ProcessSpec, + Resource, ) from plugboard.utils import DI, run_coro_sync from plugboard.utils.dependencies import depends_on_optional @@ -73,7 +74,7 @@ def __init__( @property def result_grid(self) -> ray.tune.ResultGrid: - """Returns a [`ResultGrid`][ray.tune.ResultGrid] summarising the optimisation results.""" + """Returns a `ResultGrid` summarising the optimisation results.""" if self._result_grid is None: raise ValueError("No result grid available. Run the optimisation job first.") return self._result_grid @@ -212,8 +213,8 @@ def run(self, spec: ProcessSpec) -> ray.tune.Result | list[ray.tune.Result]: spec: The [`ProcessSpec`][plugboard.schemas.ProcessSpec] to optimise. Returns: - Either one or a list of [`Result`][ray.tune.Result] objects containing the best trial - result. Use the `result_grid` property to get full trial results. + Either one or a list of `Result` objects containing the best trial result. Use the + `result_grid` property to get full trial results. 
""" self._logger.info("Running optimisation job on Ray") spec = spec.model_copy() @@ -221,15 +222,14 @@ def run(self, spec: ProcessSpec) -> ray.tune.Result | list[ray.tune.Result]: # re-register the classes in the worker required_classes = {c.type: ComponentRegistry.get(c.type) for c in spec.args.components} + # Calculate resource requirements from components + placement_bundles = self._calculate_placement_bundles(spec) + # See https://github.com/ray-project/ray/issues/24445 and # https://docs.ray.io/en/latest/tune/api/doc/ray.tune.execution.placement_groups.PlacementGroupFactory.html trainable_with_resources = ray.tune.with_resources( self._build_objective(required_classes, spec), - ray.tune.PlacementGroupFactory( - # Reserve 0.5 CPU for the tune process and 0 CPU for each component in the Process - # TODO: Implement better resource allocation based on Process requirements - [{"CPU": 0.5}], - ), + ray.tune.PlacementGroupFactory(placement_bundles), ) tuner_kwargs: dict[str, _t.Any] = { @@ -322,3 +322,55 @@ def _init_search_algorithm( if max_concurrent is not None: algo = ray.tune.search.ConcurrencyLimiter(algo, max_concurrent) return algo + + def _calculate_placement_bundles(self, spec: ProcessSpec) -> list[dict[str, float]]: + """Calculate placement group bundles from component resource requirements. + + Args: + spec: The ProcessSpec containing component specifications. + + Returns: + List of resource bundles for Ray placement group. 
+ """ + bundles = [] + + if spec.type.endswith("RayProcess"): + # Ray process requires a bundle for the tune process and each component + bundles.append({"CPU": 1.0}) # Bundle for the tune process + for component_spec in spec.args.components: + resources = component_spec.args.resources + if resources is None: + # Use default resources + resources = Resource() + + bundles.append(resources.to_ray_options(style="placement_group")) + else: + # Aggregate resources from all components + total_cpu = 1.0 # Ensure at least 1 CPU for the tune process + total_gpu = 0.0 + total_memory = 0.0 + custom_resources: dict[str, float] = {} + + for component_spec in spec.args.components: + resources = component_spec.args.resources + if resources is None: + # Use default resources + resources = Resource() + + total_cpu += resources.cpu + total_gpu += resources.gpu + total_memory += resources.memory + + # Aggregate custom resources + for key, value in resources.resources.items(): + custom_resources[key] = custom_resources.get(key, 0.0) + value + + resources = Resource( + cpu=total_cpu, + gpu=total_gpu, + memory=total_memory, + resources=custom_resources, + ) + bundles.append(resources.to_ray_options(style="placement_group")) + + return bundles diff --git a/tests/conftest.py b/tests/conftest.py index b8290dd2..de6612f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,8 +40,11 @@ def mp_set_start_method() -> None: @pytest.fixture(scope="session") def ray_ctx() -> _t.Iterator[None]: - """Initialises and shuts down Ray.""" - ray.init(num_cpus=4, num_gpus=0, include_dashboard=True) + """Initialises and shuts down Ray. + + Includes a small amount of resources to allow testing of resource-constrained components. 
+ """ + ray.init(num_cpus=5, num_gpus=1, resources={"custom_hardware": 10}, include_dashboard=True) yield ray.shutdown() diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py new file mode 100644 index 00000000..10f98e2d --- /dev/null +++ b/tests/integration/test_component_resources.py @@ -0,0 +1,119 @@ +"""Integration tests for Components with resource requirements.""" +# ruff: noqa: D101,D102,D103 + +import typing as _t + +import pytest +from ray.util.state import list_actors + +from plugboard.component import Component, IOController as IO +from plugboard.connector import RayConnector +from plugboard.process import RayProcess +from plugboard.schemas import ConnectorSpec, Resource + + +class ResourceComponent(Component): + """Test component with resource requirements.""" + + io = IO(inputs=["a"], outputs=["b"]) + + def __init__(self, multiplier: int = 1, *args: _t.Any, **kwargs: _t.Any) -> None: + super().__init__(*args, **kwargs) + self._multiplier = multiplier + + async def step(self) -> None: + if self.a is not None: + self.b = self.a * self._multiplier + await self.io.close() + + +class DeclaredResourceComponent(Component): + """Test component with declared class-level resource requirements.""" + + io = IO(inputs=["a"], outputs=["b"]) + resources = Resource(cpu="1", memory="512Mi") + + async def step(self) -> None: + if self.a is not None: + self.b = self.a * 2 + await self.io.close() + + +@pytest.mark.asyncio +async def test_component_with_resources() -> None: + """Test that a component can be created with resource requirements.""" + resources = Resource(cpu="500m", gpu=1, memory="10Mi") + component = ResourceComponent(name="test", resources=resources) + + assert component.resources == resources + assert component.resources.cpu == 0.5 + assert component.resources.gpu == 1.0 + assert component.resources.memory == 10 * 1024 * 1024 + + +@pytest.mark.asyncio +async def test_component_with_default_resources() -> 
None: + """Test that a component uses default resources when none specified.""" + component = ResourceComponent(name="test") + default_resources = Resource() + assert component.resources == default_resources + + +@pytest.mark.asyncio +async def test_component_with_declared_resources() -> None: + """Test that a component with class-level declared resources has those resources.""" + component = DeclaredResourceComponent(name="test") + expected_resources = Resource(cpu=1.0, memory=512 * 1024 * 1024) + assert component.resources == expected_resources + + +@pytest.mark.asyncio +async def test_component_with_declared_resources_overrides() -> None: + """Test that a component with class-level declared resources can be overridden by constructor.""" # noqa: E501,W505 + resources = Resource(cpu="250m", gpu=0.5, memory="5Gi") + component_1 = DeclaredResourceComponent(name="test", resources=resources) + # Confirm that the constructor resources override the class-level resources + assert component_1.resources == resources + # Confirm that the class-level resources are still the same for a new instances + component_2 = DeclaredResourceComponent(name="test") + expected_resources = Resource(cpu=1.0, memory=512 * 1024 * 1024) + assert component_2.resources == expected_resources + + +@pytest.mark.asyncio +async def test_component_resources_in_ray_process(ray_ctx: None) -> None: + """Test that components with resources work in RayProcess.""" + resources = Resource(cpu=1.0, memory="1Mi") + component = ResourceComponent( + name="test", + resources=resources, + multiplier=2, + initial_values={"a": [5]}, + ) + connectors = [RayConnector(spec=ConnectorSpec(source="test.b", target="test.a"))] + + process = RayProcess(components=[component], connectors=connectors) + + async with process: + actors = list_actors(detail=True) + component_actor = next(a for a in actors if a.name == "test") + # Verify the component actor has the correct resources + assert component_actor.required_resources["CPU"] 
== 1.0 + assert component_actor.required_resources["memory"] == 1.0 * 1024 * 1024 + await process.run() + + assert component.b == 10 + assert component.resources is not None + assert component.resources.cpu == 1.0 + + +@pytest.mark.asyncio +async def test_component_export_includes_resources() -> None: + """Test that component export includes resource requirements.""" + resources = Resource(cpu="250m", gpu=0.5, memory="5Gi") + component = ResourceComponent(name="test", resources=resources) + + exported = component.export() + assert "args" in exported + # The resources should be passed through in the exported args + assert exported["args"]["resources"] == resources.model_dump() diff --git a/tests/unit/test_ray_resources.py b/tests/unit/test_ray_resources.py new file mode 100644 index 00000000..2c391582 --- /dev/null +++ b/tests/unit/test_ray_resources.py @@ -0,0 +1,62 @@ +"""Unit tests for RayProcess resource handling.""" +# ruff: noqa: D101,D102,D103 + +from plugboard.component import Component, IOController as IO +from plugboard.schemas import Resource + + +class SimpleComponent(Component): + """Simple test component.""" + + io = IO(inputs=["a"], outputs=["b"]) + + +def test_component_with_resources_to_ray_options() -> None: + """Test that component resources are correctly converted to Ray options.""" + resources = Resource(cpu="500m", gpu=1, memory="10Mi", resources={"custom_gpu": 2}) + + component = SimpleComponent(name="test", resources=resources) + + # Verify resources are stored + assert component.resources is not None + assert component.resources.cpu == 0.5 + assert component.resources.gpu == 1.0 + assert component.resources.memory == 10 * 1024 * 1024 + assert component.resources.resources == {"custom_gpu": 2.0} + + # Verify Ray options conversion + ray_options = component.resources.to_ray_options() + assert ray_options == { + "num_cpus": 0.5, + "num_gpus": 1.0, + "memory": 10485760.0, + "resources": {"custom_gpu": 2.0}, + } + + +def 
test_component_default_resources_to_ray_options() -> None: + """Test that default resources are correctly converted to Ray options.""" + # A Resource created with no arguments should use defaults when creating Ray options + default_resources = Resource() + + ray_options = default_resources.to_ray_options() + assert ray_options == {"num_cpus": 0.001} + assert "num_gpus" not in ray_options + assert "memory" not in ray_options + + +def test_component_with_zero_resources() -> None: + """Test that zero resources are handled correctly.""" + resources = Resource(cpu=0, gpu=0, memory=0) + + component = SimpleComponent(name="test", resources=resources) + + # Zero values should be stored + assert component.resources is not None + assert component.resources.cpu == 0 + assert component.resources.gpu == 0 + assert component.resources.memory == 0 + + # But excluded from Ray options + ray_options = component.resources.to_ray_options() + assert ray_options == {} diff --git a/tests/unit/test_resource.py b/tests/unit/test_resource.py new file mode 100644 index 00000000..c20e02da --- /dev/null +++ b/tests/unit/test_resource.py @@ -0,0 +1,123 @@ +"""Unit tests for Resource class.""" +# ruff: noqa: D101,D102,D103 + +from pydantic import ValidationError +import pytest + +from plugboard.schemas import Resource + + +def test_resource_default_values() -> None: + """Test Resource default values.""" + resource = Resource() + assert resource.cpu == 0.001 + assert resource.gpu == 0 + assert resource.memory == 0 + assert resource.resources == {} + + +def test_resource_numeric_values() -> None: + """Test Resource with numeric values.""" + resource = Resource(cpu=2.5, gpu=1, memory=1024) + assert resource.cpu == 2.5 + assert resource.gpu == 1.0 + assert resource.memory == 1024.0 + + +def test_resource_string_milli_units() -> None: + """Test Resource with milli-unit string values.""" + resource = Resource(cpu="250m", gpu="500m") + assert resource.cpu == 0.25 + assert resource.gpu == 0.5 + + +def 
test_resource_string_memory_units() -> None: + """Test Resource with memory unit string values.""" + resource = Resource(memory="10Mi") + assert resource.memory == 10 * 1024 * 1024 + + resource = Resource(memory="5Gi") + assert resource.memory == 5 * 1024 * 1024 * 1024 + + resource = Resource(memory="2Ki") + assert resource.memory == 2 * 1024 + + resource = Resource(memory="1Ti") + assert resource.memory == 1 * 1024 * 1024 * 1024 * 1024 + + +def test_resource_string_plain_number() -> None: + """Test Resource with plain number string values.""" + resource = Resource(cpu="2.5", gpu="1") + assert resource.cpu == 2.5 + assert resource.gpu == 1.0 + + +def test_resource_custom_resources() -> None: + """Test Resource with custom resources dictionary.""" + resource = Resource(resources={"custom_gpu": 2, "special_hardware": "500m"}) + assert resource.resources == {"custom_gpu": 2.0, "special_hardware": 0.5} + + +def test_resource_invalid_milli_format() -> None: + """Test Resource with invalid milli-unit format.""" + with pytest.raises(ValidationError): + Resource(cpu="250x") + + +def test_resource_invalid_memory_format() -> None: + """Test Resource with invalid memory format.""" + with pytest.raises(ValidationError): + Resource(memory="10Xi") + + +def test_resource_invalid_string_format() -> None: + """Test Resource with invalid string format.""" + with pytest.raises(ValidationError): + Resource(cpu="invalid") + + +def test_resource_to_ray_options_default() -> None: + """Test conversion to Ray options with default values.""" + resource = Resource() + options = resource.to_ray_options() + assert options == {"num_cpus": 0.001} + + +def test_resource_to_ray_options_all_fields() -> None: + """Test conversion to Ray options with all fields set.""" + resource = Resource(cpu=2, gpu=1, memory=1024, resources={"custom": 5}) + options = resource.to_ray_options() + assert options == { + "num_cpus": 2.0, + "num_gpus": 1.0, + "memory": 1024.0, + "resources": {"custom": 5.0}, + } + + 
+def test_resource_to_ray_options_only_cpu() -> None: + """Test conversion to Ray options with only CPU.""" + resource = Resource(cpu="500m") + options = resource.to_ray_options() + assert options == {"num_cpus": 0.5} + + +def test_resource_to_ray_options_with_strings() -> None: + """Test conversion to Ray options with string inputs.""" + resource = Resource(cpu="250m", memory="10Mi", resources={"custom": "100m"}) + options = resource.to_ray_options() + assert options == { + "num_cpus": 0.25, + "memory": 10 * 1024 * 1024, + "resources": {"custom": 0.1}, + } + + +def test_resource_to_ray_options_zero_values_excluded() -> None: + """Test that zero GPU and memory are excluded from Ray options (non-zero CPU kept).""" + resource = Resource(cpu=1, gpu=0, memory=0) + options = resource.to_ray_options() + assert options == {"num_cpus": 1.0} + assert "num_gpus" not in options + assert "memory" not in options diff --git a/tests/unit/test_tuner_resources.py b/tests/unit/test_tuner_resources.py new file mode 100644 index 00000000..7e6355d4 --- /dev/null +++ b/tests/unit/test_tuner_resources.py @@ -0,0 +1,95 @@ +"""Unit tests for Tuner resource placement calculations.""" +# ruff: noqa: D101,D102,D103 + +from plugboard_schemas import ( + ComponentArgsSpec, + ComponentSpec, + ProcessArgsSpec, + ProcessSpec, + Resource, +) + + +def test_process_spec_with_component_resources() -> None: + """Test ProcessSpec can include component resources.""" + component_spec = ComponentSpec( + type="test.Component", + args=ComponentArgsSpec( + name="test_component", + resources=Resource(cpu="500m", gpu=1, memory="100Mi"), + ), + ) + + process_spec = ProcessSpec( + type="plugboard.process.LocalProcess", + args=ProcessArgsSpec( + name="test_process", + components=[component_spec], + connectors=[], + ), + ) + + # Verify the resource is properly nested in the spec + assert process_spec.args.components[0].args.resources is not None + assert process_spec.args.components[0].args.resources.cpu == 0.5 + assert 
process_spec.args.components[0].args.resources.gpu == 1.0 + assert process_spec.args.components[0].args.resources.memory == 100 * 1024 * 1024 + + +def test_process_spec_with_multiple_component_resources() -> None: + """Test ProcessSpec with multiple components having resources.""" + component1 = ComponentSpec( + type="test.Component1", + args=ComponentArgsSpec( + name="comp1", + resources=Resource(cpu=1.0, gpu=0.5), + ), + ) + + component2 = ComponentSpec( + type="test.Component2", + args=ComponentArgsSpec( + name="comp2", + resources=Resource(cpu=2.0, memory="50Mi"), + ), + ) + + process_spec = ProcessSpec( + type="plugboard.process.LocalProcess", + args=ProcessArgsSpec( + name="test_process", + components=[component1, component2], + connectors=[], + ), + ) + + # Verify both components have resources + assert process_spec.args.components[0].args.resources is not None + assert process_spec.args.components[0].args.resources.cpu == 1.0 + assert process_spec.args.components[0].args.resources.gpu == 0.5 + assert process_spec.args.components[1].args.resources is not None + assert process_spec.args.components[1].args.resources.cpu == 2.0 + assert process_spec.args.components[1].args.resources.memory == 50 * 1024 * 1024 + + +def test_process_spec_with_default_component_resources() -> None: + """Test ProcessSpec with a component that does not specify resources.""" + component_spec = ComponentSpec( + type="test.Component", + args=ComponentArgsSpec( + name="test_component", + # No resources specified, so the spec should leave them unset + ), + ) + + process_spec = ProcessSpec( + type="plugboard.process.LocalProcess", + args=ProcessArgsSpec( + name="test_process", + components=[component_spec], + connectors=[], + ), + ) + + # Resources should be None when not specified + assert process_spec.args.components[0].args.resources is None