From 4530341e18d4ab7f6e06353eb5e9cda08c512bbe Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:44:15 +0000 Subject: [PATCH 01/28] Initial plan From d529dee13cc51004d038a62101565f42648e7a86 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:49:46 +0000 Subject: [PATCH 02/28] Add Resource class and integrate with Component, RayProcess, and Tuner Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- .../plugboard_schemas/__init__.py | 3 +- .../plugboard_schemas/component.py | 113 +++++++++++++++- plugboard/component/component.py | 10 ++ plugboard/process/ray_process.py | 17 ++- plugboard/schemas/__init__.py | 2 + plugboard/tune/tune.py | 58 ++++++++- tests/unit/test_resource.py | 123 ++++++++++++++++++ 7 files changed, 316 insertions(+), 10 deletions(-) create mode 100644 tests/unit/test_resource.py diff --git a/plugboard-schemas/plugboard_schemas/__init__.py b/plugboard-schemas/plugboard_schemas/__init__.py index d4fd1918..59a6b35d 100644 --- a/plugboard-schemas/plugboard_schemas/__init__.py +++ b/plugboard-schemas/plugboard_schemas/__init__.py @@ -9,7 +9,7 @@ from importlib.metadata import version from ._common import PlugboardBaseModel -from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec +from .component import ComponentArgsDict, ComponentArgsSpec, ComponentSpec, Resource from .config import ConfigSpec, ProcessConfigSpec from .connector import ( DEFAULT_CONNECTOR_CLS_PATH, @@ -68,6 +68,7 @@ "ProcessSpec", "ProcessArgsDict", "ProcessArgsSpec", + "Resource", "StateBackendSpec", "StateBackendArgsDict", "StateBackendArgsSpec", diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index 74a50899..19d9bf31 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -1,12 +1,120 @@ """Provides `ComponentSpec` class.""" +import re import typing as _t -from pydantic import Field +from pydantic import Field, field_validator from ._common import PlugboardBaseModel +def _parse_resource_value(value: str | float | int) -> float: + """Parse a resource value from string or number. + + Supports: + - Direct numerical values: 1, 0.5, 2.0 + - Milli-units: "250m" -> 0.25 + - Memory units: "10Mi" -> 10485760 (10 * 1024 * 1024) + - Memory units: "10Gi" -> 10737418240 (10 * 1024 * 1024 * 1024) + + Args: + value: The resource value to parse. + + Returns: + The parsed float value. + + Raises: + ValueError: If the value format is invalid. + """ + if isinstance(value, (int, float)): + return float(value) + + # Handle string values + value = value.strip() + + # Handle milli-units (e.g., "250m" -> 0.25) + if value.endswith("m"): + match = re.match(r"^(\d+(?:\.\d+)?)m$", value) + if match: + return float(match.group(1)) / 1000.0 + raise ValueError(f"Invalid milli-unit format: {value}") + + # Handle memory units + # Ki = 1024, Mi = 1024^2, Gi = 1024^3, Ti = 1024^4 + memory_units = { + "Ki": 1024, + "Mi": 1024**2, + "Gi": 1024**3, + "Ti": 1024**4, + } + + for suffix, multiplier in memory_units.items(): + if value.endswith(suffix): + match = re.match(rf"^(\d+(?:\.\d+)?){suffix}$", value) + if match: + return float(match.group(1)) * multiplier + raise ValueError(f"Invalid memory unit format: {value}") + + # Try to parse as a plain number + try: + return float(value) + except ValueError: + raise ValueError(f"Invalid resource value format: {value}") + + +class Resource(PlugboardBaseModel): + """Resource requirements for a component. + + Supports specification of CPU, GPU, memory, and custom resources. + Values can be specified as numbers or strings with units (e.g., "250m" for 0.25, "10Mi" for + 10 * 1024 * 1024). + + Attributes: + cpu: CPU requirement (default: 0.001). + gpu: GPU requirement (default: 0). + memory: Memory requirement in bytes (default: 0). + resources: Custom resource requirements as a dictionary. + """ + + cpu: str | float | int = 0.001 + gpu: str | float | int = 0 + memory: str | float | int = 0 + resources: dict[str, str | float | int] = Field(default_factory=dict) + + @field_validator("cpu", "gpu", "memory", mode="before") + @classmethod + def _parse_resource_field(cls, v: str | float | int) -> float: + """Validate and parse resource fields.""" + return _parse_resource_value(v) + + @field_validator("resources", mode="before") + @classmethod + def _parse_resources_dict(cls, v: dict[str, str | float | int]) -> dict[str, float]: + """Validate and parse custom resources dictionary.""" + return {key: _parse_resource_value(value) for key, value in v.items()} + + def to_ray_options(self) -> dict[str, _t.Any]: + """Convert resource requirements to Ray actor options. + + Returns: + Dictionary of Ray actor options. + """ + options = {} + + if self.cpu > 0: + options["num_cpus"] = self.cpu + if self.gpu > 0: + options["num_gpus"] = self.gpu + if self.memory > 0: + options["memory"] = self.memory + + # Add custom resources + if self.resources: + options["resources"] = self.resources + + return options + + class ComponentArgsDict(_t.TypedDict): """`TypedDict` of the [`Component`][plugboard.component.Component] constructor arguments.""" @@ -14,6 +122,7 @@ class ComponentArgsDict(_t.TypedDict): initial_values: _t.NotRequired[dict[str, _t.Any] | None] parameters: _t.NotRequired[dict[str, _t.Any] | None] constraints: _t.NotRequired[dict[str, _t.Any] | None] + resources: _t.NotRequired["Resource | None"] class ComponentArgsSpec(PlugboardBaseModel, extra="allow"): @@ -24,12 +133,14 @@ class ComponentArgsSpec(PlugboardBaseModel, extra="allow"): initial_values: Initial values for the `Component`. parameters: Parameters for the `Component`. constraints: Constraints for the `Component`. + resources: Resource requirements for the `Component`. """ name: str = Field(pattern=r"^([a-zA-Z_][a-zA-Z0-9_-]*)$") initial_values: dict[str, _t.Any] = {} parameters: dict[str, _t.Any] = {} constraints: dict[str, _t.Any] = {} + resources: "Resource | None" = None class ComponentSpec(PlugboardBaseModel): diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 081eefe5..05320644 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -24,6 +24,9 @@ from plugboard.state import StateBackend from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker +if _t.TYPE_CHECKING: + from plugboard.schemas import Resource + _io_key_in: str = str(IODirection.INPUT) _io_key_out: str = str(IODirection.OUTPUT) @@ -56,6 +59,7 @@ def __init__( parameters: _t.Optional[dict[str, _t.Any]] = None, state: _t.Optional[StateBackend] = None, constraints: _t.Optional[dict] = None, + resources: _t.Optional["Resource"] = None, ) -> None: self.name = name self._initial_values = initial_values or {} @@ -63,6 +67,7 @@ def __init__( self._parameters = parameters or {} self._state: _t.Optional[StateBackend] = state self._state_is_connected = False + self._resources = resources setattr(self, "init", self._handle_init_wrapper()) setattr(self, "step", self._handle_step_wrapper()) @@ -120,6 +125,11 @@ def parameters(self) -> dict[str, _t.Any]: """Gets the parameters of the component.""" return self._parameters + @property + def resources(self) -> _t.Optional["Resource"]: + """Gets the resource requirements of the component.""" + return self._resources + @classmethod def _configure_io(cls) -> None: # Get all parent classes that are Component subclasses diff --git a/plugboard/process/ray_process.py b/plugboard/process/ray_process.py index aa4592f8..da17b131 100644 --- a/plugboard/process/ray_process.py +++ b/plugboard/process/ray_process.py @@ -62,9 +62,20 @@ def _create_component_actor(self, component: Component) -> _t.Any: name = component.id args = component.export()["args"] actor_cls = build_actor_wrapper(component.__class__) - return ray.remote(num_cpus=0, name=name, namespace=self._namespace)( # type: ignore - actor_cls - ).remote(**args) + + # Get resource requirements from component + from plugboard.schemas import Resource + + resources = component.resources + if resources is None: + # Use default resources if not specified + resources = Resource() + + ray_options = resources.to_ray_options() + ray_options["name"] = name + ray_options["namespace"] = self._namespace + + return ray.remote(**ray_options)(actor_cls).remote(**args) # type: ignore async def _update_component_attributes(self) -> None: """Updates attributes on local components from remote actors.""" diff --git a/plugboard/schemas/__init__.py b/plugboard/schemas/__init__.py index 07458862..297972a4 100644 --- a/plugboard/schemas/__init__.py +++ b/plugboard/schemas/__init__.py @@ -36,6 +36,7 @@ ProcessArgsSpec, ProcessConfigSpec, ProcessSpec, + Resource, StateBackendArgsDict, StateBackendArgsSpec, StateBackendSpec, @@ -73,6 +74,7 @@ "ProcessSpec", "ProcessArgsDict", "ProcessArgsSpec", + "Resource", "StateBackendSpec", "StateBackendArgsDict", "StateBackendArgsSpec", diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index 5f001e52..92211fa3 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -221,15 +221,14 @@ def run(self, spec: ProcessSpec) -> ray.tune.Result | list[ray.tune.Result]: # re-register the classes in the worker required_classes = {c.type: ComponentRegistry.get(c.type) for c in spec.args.components} + # Calculate resource requirements from components + placement_bundles = self._calculate_placement_bundles(spec) + # See https://github.com/ray-project/ray/issues/24445 and # https://docs.ray.io/en/latest/tune/api/doc/ray.tune.execution.placement_groups.PlacementGroupFactory.html trainable_with_resources = ray.tune.with_resources( self._build_objective(required_classes, spec), - ray.tune.PlacementGroupFactory( - # Reserve 0.5 CPU for the tune process and 0 CPU for each component in the Process - # TODO: Implement better resource allocation based on Process requirements - [{"CPU": 0.5}], - ), + ray.tune.PlacementGroupFactory(placement_bundles), ) tuner_kwargs: dict[str, _t.Any] = { @@ -322,3 +321,52 @@ def _init_search_algorithm( if max_concurrent is not None: algo = ray.tune.search.ConcurrencyLimiter(algo, max_concurrent) return algo + + def _calculate_placement_bundles(self, spec: ProcessSpec) -> list[dict[str, float]]: + """Calculate placement group bundles from component resource requirements. + + Args: + spec: The ProcessSpec containing component specifications. + + Returns: + List of resource bundles for Ray placement group. + """ + from plugboard.schemas import Resource + + # Start with a bundle for the tune process itself + bundles = [{"CPU": 0.5}] + + # Aggregate resources from all components + total_cpu = 0.0 + total_gpu = 0.0 + total_memory = 0.0 + custom_resources: dict[str, float] = {} + + for component_spec in spec.args.components: + resources = component_spec.args.resources + if resources is None: + # Use default resources + resources = Resource() + + total_cpu += resources.cpu + total_gpu += resources.gpu + total_memory += resources.memory + + # Aggregate custom resources + for key, value in resources.resources.items(): + custom_resources[key] = custom_resources.get(key, 0.0) + value + + # Create a single bundle for all component resources + component_bundle: dict[str, float] = {} + if total_cpu > 0: + component_bundle["CPU"] = total_cpu + if total_gpu > 0: + component_bundle["GPU"] = total_gpu + if total_memory > 0: + component_bundle["memory"] = total_memory + component_bundle.update(custom_resources) + + if component_bundle: + bundles.append(component_bundle) + + return bundles diff --git a/tests/unit/test_resource.py b/tests/unit/test_resource.py new file mode 100644 index 00000000..f027f7cd --- /dev/null +++ b/tests/unit/test_resource.py @@ -0,0 +1,123 @@ +"""Unit tests for Resource class.""" +# ruff: noqa: D101,D102,D103 + +import pytest +from pydantic import ValidationError + +from plugboard.schemas import Resource + + +def test_resource_default_values() -> None: + """Test Resource default values.""" + resource = Resource() + assert resource.cpu == 0.001 + assert resource.gpu == 0 + assert resource.memory == 0 + assert resource.resources == {} + + +def test_resource_numeric_values() -> None: + """Test Resource with numeric values.""" + resource = Resource(cpu=2.5, gpu=1, memory=1024) + assert resource.cpu == 2.5 + assert resource.gpu == 1.0 + assert resource.memory == 1024.0 + + +def test_resource_string_milli_units() -> None: + """Test Resource with milli-unit string values.""" + resource = Resource(cpu="250m", gpu="500m") + assert resource.cpu == 0.25 + assert resource.gpu == 0.5 + + +def test_resource_string_memory_units() -> None: + """Test Resource with memory unit string values.""" + resource = Resource(memory="10Mi") + assert resource.memory == 10 * 1024 * 1024 + + resource = Resource(memory="5Gi") + assert resource.memory == 5 * 1024 * 1024 * 1024 + + resource = Resource(memory="2Ki") + assert resource.memory == 2 * 1024 + + resource = Resource(memory="1Ti") + assert resource.memory == 1 * 1024 * 1024 * 1024 * 1024 + + +def test_resource_string_plain_number() -> None: + """Test Resource with plain number string values.""" + resource = Resource(cpu="2.5", gpu="1") + assert resource.cpu == 2.5 + assert resource.gpu == 1.0 + + +def test_resource_custom_resources() -> None: + """Test Resource with custom resources dictionary.""" + resource = Resource(resources={"custom_gpu": 2, "special_hardware": "500m"}) + assert resource.resources == {"custom_gpu": 2.0, "special_hardware": 0.5} + + +def test_resource_invalid_milli_format() -> None: + """Test Resource with invalid milli-unit format.""" + with pytest.raises(ValidationError): + Resource(cpu="250x") + + +def test_resource_invalid_memory_format() -> None: + """Test Resource with invalid memory format.""" + with pytest.raises(ValidationError): + Resource(memory="10Xi") + + +def test_resource_invalid_string_format() -> None: + """Test Resource with invalid string format.""" + with pytest.raises(ValidationError): + Resource(cpu="invalid") + + +def test_resource_to_ray_options_default() -> None: + """Test conversion to Ray options with default values.""" + resource = Resource() + options = resource.to_ray_options() + assert options == {"num_cpus": 0.001} + + +def test_resource_to_ray_options_all_fields() -> None: + """Test conversion to Ray options with all fields set.""" + resource = Resource(cpu=2, gpu=1, memory=1024, resources={"custom": 5}) + options = resource.to_ray_options() + assert options == { + "num_cpus": 2.0, + "num_gpus": 1.0, + "memory": 1024.0, + "resources": {"custom": 5.0}, + } + + +def test_resource_to_ray_options_only_cpu() -> None: + """Test conversion to Ray options with only CPU.""" + resource = Resource(cpu="500m") + options = resource.to_ray_options() + assert options == {"num_cpus": 0.5} + + +def test_resource_to_ray_options_with_strings() -> None: + """Test conversion to Ray options with string inputs.""" + resource = Resource(cpu="250m", memory="10Mi", resources={"custom": "100m"}) + options = resource.to_ray_options() + assert options == { + "num_cpus": 0.25, + "memory": 10 * 1024 * 1024, + "resources": {"custom": 0.1}, + } + + +def test_resource_to_ray_options_zero_values_excluded() -> None: + """Test that zero values are excluded from Ray options (except CPU).""" + resource = Resource(cpu=1, gpu=0, memory=0) + options = resource.to_ray_options() + assert options == {"num_cpus": 1.0} + assert "num_gpus" not in options + assert "memory" not in options From f59376cbb2c41b2b74e9d558010f8bc359f970cd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:52:23 +0000 Subject: [PATCH 03/28] Add comprehensive tests for Resource implementation Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- plugboard/component/component.py | 1 + tests/integration/test_component_resources.py | 77 +++++++++++++++ tests/unit/test_ray_resources.py | 63 +++++++++++++ tests/unit/test_tuner_resources.py | 93 +++++++++++++++++++ 4 files changed, 234 insertions(+) create mode 100644 tests/integration/test_component_resources.py create mode 100644 tests/unit/test_ray_resources.py create mode 100644 tests/unit/test_tuner_resources.py diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 05320644..06f12b61 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -24,6 +24,7 @@ from plugboard.state import StateBackend from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker + if _t.TYPE_CHECKING: from plugboard.schemas import Resource diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py new file mode 100644 index 00000000..37776e1a --- /dev/null +++ b/tests/integration/test_component_resources.py @@ -0,0 +1,77 @@ +"""Integration tests for Components with resource requirements.""" +# ruff: noqa: D101,D102,D103 + +import typing as _t + +import pytest + +from plugboard.component import Component, IOController as IO +from plugboard.process import LocalProcess +from plugboard.schemas import Resource + + +class ResourceComponent(Component): + """Test component with resource requirements.""" + + io = IO(inputs=["a"], outputs=["b"]) + + def __init__(self, multiplier: int = 1, *args: _t.Any, **kwargs: _t.Any) -> None: + super().__init__(*args, **kwargs) + self._multiplier = multiplier + + async def step(self) -> None: + if self.a is not None: + self.b = self.a * self._multiplier + await self.io.close() + + +@pytest.mark.asyncio +async def test_component_with_resources() -> None: + """Test that a component can be created with resource requirements.""" + resources = Resource(cpu="500m", gpu=1, memory="10Mi") + component = ResourceComponent(name="test", resources=resources) + + assert component.resources == resources + assert component.resources.cpu == 0.5 + assert component.resources.gpu == 1.0 + assert component.resources.memory == 10 * 1024 * 1024 + + +@pytest.mark.asyncio +async def test_component_with_default_resources() -> None: + """Test that a component uses default resources when none specified.""" + component = ResourceComponent(name="test") + + assert component.resources is None + + +@pytest.mark.asyncio +async def test_component_resources_in_local_process() -> None: + """Test that components with resources work in LocalProcess.""" + resources = Resource(cpu=1.0, memory="100Mi") + component = ResourceComponent( + name="test", + resources=resources, + multiplier=2, + initial_values={"a": [5]}, + ) + + process = LocalProcess(components=[component], connectors=[]) + + async with process: + await process.run() + + assert component.b == 10 + assert component.resources.cpu == 1.0 + + +@pytest.mark.asyncio +async def test_component_export_includes_resources() -> None: + """Test that component export includes resource requirements.""" + resources = Resource(cpu="250m", gpu=0.5, memory="5Gi") + component = ResourceComponent(name="test", resources=resources) + + exported = component.export() + assert "args" in exported + # The resources should be passed through in the exported args + assert exported["args"]["resources"] == resources diff --git a/tests/unit/test_ray_resources.py b/tests/unit/test_ray_resources.py new file mode 100644 index 00000000..0e4dd38d --- /dev/null +++ b/tests/unit/test_ray_resources.py @@ -0,0 +1,63 @@ +"""Unit tests for RayProcess resource handling.""" +# ruff: noqa: D101,D102,D103 + +import typing as _t + +from plugboard.component import Component, IOController as IO +from plugboard.schemas import Resource + + +class SimpleComponent(Component): + """Simple test component.""" + + io = IO(inputs=["a"], outputs=["b"]) + + +def test_component_with_resources_to_ray_options() -> None: + """Test that component resources are correctly converted to Ray options.""" + resources = Resource(cpu="500m", gpu=1, memory="10Mi", resources={"custom_gpu": 2}) + + component = SimpleComponent(name="test", resources=resources) + + # Verify resources are stored + assert component.resources is not None + assert component.resources.cpu == 0.5 + assert component.resources.gpu == 1.0 + assert component.resources.memory == 10 * 1024 * 1024 + assert component.resources.resources == {"custom_gpu": 2.0} + + # Verify Ray options conversion + ray_options = component.resources.to_ray_options() + assert ray_options == { + "num_cpus": 0.5, + "num_gpus": 1.0, + "memory": 10485760.0, + "resources": {"custom_gpu": 2.0}, + } + + +def test_component_default_resources_to_ray_options() -> None: + """Test that default resources are correctly converted to Ray options.""" + # Component without resources should use defaults when creating Ray options + default_resources = Resource() + + ray_options = default_resources.to_ray_options() + assert ray_options == {"num_cpus": 0.001} + assert "num_gpus" not in ray_options + assert "memory" not in ray_options + + +def test_component_with_zero_resources() -> None: + """Test that zero resources are handled correctly.""" + resources = Resource(cpu=0, gpu=0, memory=0) + + component = SimpleComponent(name="test", resources=resources) + + # Zero values should be stored + assert component.resources.cpu == 0 + assert component.resources.gpu == 0 + assert component.resources.memory == 0 + + # But excluded from Ray options + ray_options = component.resources.to_ray_options() + assert ray_options == {} diff --git a/tests/unit/test_tuner_resources.py b/tests/unit/test_tuner_resources.py new file mode 100644 index 00000000..f5fad916 --- /dev/null +++ b/tests/unit/test_tuner_resources.py @@ -0,0 +1,93 @@ +"""Unit tests for Tuner resource placement calculations.""" +# ruff: noqa: D101,D102,D103 + +from plugboard_schemas import ( + ComponentArgsSpec, + ComponentSpec, + ProcessArgsSpec, + ProcessSpec, + Resource, +) + + +def test_process_spec_with_component_resources() -> None: + """Test ProcessSpec can include component resources.""" + component_spec = ComponentSpec( + type="test.Component", + args=ComponentArgsSpec( + name="test_component", + resources=Resource(cpu="500m", gpu=1, memory="100Mi"), + ), + ) + + process_spec = ProcessSpec( + type="plugboard.process.LocalProcess", + args=ProcessArgsSpec( + name="test_process", + components=[component_spec], + connectors=[], + ), + ) + + # Verify the resource is properly nested in the spec + assert process_spec.args.components[0].args.resources is not None + assert process_spec.args.components[0].args.resources.cpu == 0.5 + assert process_spec.args.components[0].args.resources.gpu == 1.0 + assert process_spec.args.components[0].args.resources.memory == 100 * 1024 * 1024 + + +def test_process_spec_with_multiple_component_resources() -> None: + """Test ProcessSpec with multiple components having resources.""" + component1 = ComponentSpec( + type="test.Component1", + args=ComponentArgsSpec( + name="comp1", + resources=Resource(cpu=1.0, gpu=0.5), + ), + ) + + component2 = ComponentSpec( + type="test.Component2", + args=ComponentArgsSpec( + name="comp2", + resources=Resource(cpu=2.0, memory="50Mi"), + ), + ) + + process_spec = ProcessSpec( + type="plugboard.process.LocalProcess", + args=ProcessArgsSpec( + name="test_process", + components=[component1, component2], + connectors=[], + ), + ) + + # Verify both components have resources + assert process_spec.args.components[0].args.resources.cpu == 1.0 + assert process_spec.args.components[0].args.resources.gpu == 0.5 + assert process_spec.args.components[1].args.resources.cpu == 2.0 + assert process_spec.args.components[1].args.resources.memory == 50 * 1024 * 1024 + + +def test_process_spec_with_default_component_resources() -> None: + """Test ProcessSpec with component using default resources.""" + component_spec = ComponentSpec( + type="test.Component", + args=ComponentArgsSpec( + name="test_component", + # No resources specified, should use defaults + ), + ) + + process_spec = ProcessSpec( + type="plugboard.process.LocalProcess", + args=ProcessArgsSpec( + name="test_process", + components=[component_spec], + connectors=[], + ), + ) + + # Resources should be None when not specified + assert process_spec.args.components[0].args.resources is None From 0219833b3bf46fedd757fb7e905b58b95d85a5a3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:53:49 +0000 Subject: [PATCH 04/28] Add examples for resource requirements in Ray processes Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- .../004_using_ray/resources-example.yaml | 45 ++++++++++ .../004_using_ray/resources_example.py | 88 +++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 examples/tutorials/004_using_ray/resources-example.yaml create mode 100644 examples/tutorials/004_using_ray/resources_example.py diff --git a/examples/tutorials/004_using_ray/resources-example.yaml b/examples/tutorials/004_using_ray/resources-example.yaml new file mode 100644 index 00000000..752300cf --- /dev/null +++ b/examples/tutorials/004_using_ray/resources-example.yaml @@ -0,0 +1,45 @@ +# Example YAML configuration with resource requirements +# +# This configuration demonstrates how to specify resource requirements +# for components in a Plugboard process. Resources can be specified as: +# - Numerical values: cpu: 2.0 +# - Milli-units: cpu: "250m" (equals 0.25) +# - Memory units: memory: "100Mi" (equals 100 * 1024 * 1024 bytes) + +plugboard: + process: + type: plugboard.process.RayProcess + connector_builder: + type: plugboard.connector.RayConnector + args: + name: resource-example-process + components: + - type: examples.tutorials.004_using_ray.resources_example.DataProducer + args: + name: producer + iters: 10 + resources: + cpu: 1.0 # Requires 1 CPU + + - type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask + args: + name: cpu-task + resources: + cpu: 2.0 # Requires 2 CPUs + memory: "512Mi" # Requires 512MB memory + + - type: examples.tutorials.004_using_ray.resources_example.GPUTask + args: + name: gpu-task + resources: + cpu: "500m" # Requires 0.5 CPU (using milli-unit notation) + gpu: 1 # Requires 1 GPU + resources: + custom_hardware: 2 # Custom resource requirement + + connectors: + - source: producer.output + target: cpu-task.x + + - source: cpu-task.y + target: gpu-task.data diff --git a/examples/tutorials/004_using_ray/resources_example.py b/examples/tutorials/004_using_ray/resources_example.py new file mode 100644 index 00000000..d035825b --- /dev/null +++ b/examples/tutorials/004_using_ray/resources_example.py @@ -0,0 +1,88 @@ +"""Example demonstrating resource requirements for components in RayProcess.""" + +import asyncio +import typing as _t + +import ray + +from plugboard.component import Component, IOController as IO +from plugboard.connector import RayConnector +from plugboard.process import RayProcess +from plugboard.schemas import ComponentArgsDict, ConnectorSpec, Resource + + +class CPUIntensiveTask(Component): + """Component that requires more CPU resources.""" + + io = IO(inputs=["x"], outputs=["y"]) + + async def step(self) -> None: + # Simulate CPU-intensive work + result = sum(i**2 for i in range(int(self.x * 10000))) + self.y = result + + +class GPUTask(Component): + """Component that requires GPU resources.""" + + io = IO(inputs=["data"], outputs=["result"]) + + async def step(self) -> None: + # Simulate GPU computation + self.result = self.data * 2 + + +class DataProducer(Component): + """Produces data for processing.""" + + io = IO(outputs=["output"]) + + def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + super().__init__(**kwargs) + self._iters = iters + + async def init(self) -> None: + self._seq = iter(range(self._iters)) + + async def step(self) -> None: + try: + self.output = next(self._seq) + except StopIteration: + await self.io.close() + + +async def main() -> None: + """Run the process with resource-constrained components.""" + # Define resource requirements for components + cpu_resources = Resource(cpu=2.0) # Requires 2 CPUs + gpu_resources = Resource(cpu="500m", gpu=1) # Requires 0.5 CPU and 1 GPU + memory_resources = Resource(cpu="250m", memory="100Mi") # 0.25 CPU and 100MB memory + + process = RayProcess( + components=[ + DataProducer(name="producer", iters=5, resources=cpu_resources), + CPUIntensiveTask(name="cpu-task", resources=cpu_resources), + GPUTask(name="gpu-task", resources=gpu_resources), + ], + connectors=[ + RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")), + RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")), + ], + ) + + async with process: + await process.run() + + print("Process completed successfully!") + print(f"Final result from GPU task: {process.components['gpu-task'].result}") + + +if __name__ == "__main__": + # Initialize Ray + ray.init() + + # Run the process + asyncio.run(main()) + + # Shutdown Ray + ray.shutdown() From 3ec78541affcb842ee053af88f121636424d1b5e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 28 Jan 2026 19:58:19 +0000 Subject: [PATCH 05/28] Fix type annotations and address code review feedback Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- .../tutorials/004_using_ray/resources_example.py | 1 - plugboard-schemas/plugboard_schemas/component.py | 14 ++++++++------ tests/unit/test_ray_resources.py | 2 -- tests/unit/test_resource.py | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/examples/tutorials/004_using_ray/resources_example.py b/examples/tutorials/004_using_ray/resources_example.py index d035825b..76779585 100644 --- a/examples/tutorials/004_using_ray/resources_example.py +++ b/examples/tutorials/004_using_ray/resources_example.py @@ -56,7 +56,6 @@ async def main() -> None: # Define resource requirements for components cpu_resources = Resource(cpu=2.0) # Requires 2 CPUs gpu_resources = Resource(cpu="500m", gpu=1) # Requires 0.5 CPU and 1 GPU - memory_resources = Resource(cpu="250m", memory="100Mi") # 0.25 CPU and 100MB memory process = RayProcess( components=[ diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index 19d9bf31..f88dede9 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -50,7 +50,9 @@ def _parse_resource_value(value: str | float | int) -> float: for suffix, multiplier in memory_units.items(): if value.endswith(suffix): - match = re.match(rf"^(\d+(?:\.\d+)?){suffix}$", value) + # Use re.escape to safely escape the suffix in the regex pattern + pattern = rf"^(\d+(?:\.\d+)?){re.escape(suffix)}$" + match = re.match(pattern, value) if match: return float(match.group(1)) * multiplier raise ValueError(f"Invalid memory unit format: {value}") @@ -76,10 +78,10 @@ class Resource(PlugboardBaseModel): resources: Custom resource requirements as a dictionary. """ - cpu: str | float | int = 0.001 - gpu: str | float | int = 0 - memory: str | float | int = 0 - resources: dict[str, str | float | int] = Field(default_factory=dict) + cpu: float = 0.001 + gpu: float = 0 + memory: float = 0 + resources: dict[str, float] = Field(default_factory=dict) @field_validator("cpu", "gpu", "memory", mode="before") @classmethod @@ -99,7 +101,7 @@ def to_ray_options(self) -> dict[str, _t.Any]: Returns: Dictionary of Ray actor options. """ - options = {} + options: dict[str, _t.Any] = {} if self.cpu > 0: options["num_cpus"] = self.cpu diff --git a/tests/unit/test_ray_resources.py b/tests/unit/test_ray_resources.py index 0e4dd38d..36b8ff4c 100644 --- a/tests/unit/test_ray_resources.py +++ b/tests/unit/test_ray_resources.py @@ -1,8 +1,6 @@ """Unit tests for RayProcess resource handling.""" # ruff: noqa: D101,D102,D103 -import typing as _t - from plugboard.component import Component, IOController as IO from plugboard.schemas import Resource diff --git a/tests/unit/test_resource.py b/tests/unit/test_resource.py index f027f7cd..c20e02da 100644 --- a/tests/unit/test_resource.py +++ b/tests/unit/test_resource.py @@ -1,8 +1,8 @@ """Unit tests for Resource class.""" # ruff: noqa: D101,D102,D103 -import pytest from pydantic import ValidationError +import pytest from plugboard.schemas import Resource From 9515154aaadf8f20551dac11656c58763335c59e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 31 Jan 2026 09:38:05 +0000 Subject: [PATCH 06/28] Address code review feedback: generalize suffixes, make memory integer, move imports to top Co-authored-by: toby-coleman <13170610+toby-coleman@users.noreply.github.com> --- .../examples/tutorials/running-in-parallel.md | 6 +++ .../plugboard_schemas/component.py | 53 +++++++++++-------- plugboard/process/ray_process.py | 4 +- plugboard/tune/tune.py | 3 +- 4 files changed, 39 insertions(+), 27 deletions(-) diff --git a/docs/examples/tutorials/running-in-parallel.md b/docs/examples/tutorials/running-in-parallel.md index 168731a7..3df11aa4 100644 --- a/docs/examples/tutorials/running-in-parallel.md +++ b/docs/examples/tutorials/running-in-parallel.md @@ -70,3 +70,9 @@ Specifying the process type and channel builder type in the YAML is the only cha 1. Tell Plugboard to use a [`RayProcess`][plugboard.process.RayProcess] instead of the default [`LocalProcess`][plugboard.process.LocalProcess]. 2. Also change the connector builder to [`RayConnector`][plugboard.connector.RayConnector], which will build [`RayChannel`][plugboard.connector.RayChannel] objects when creating the `Process`. + +## Specifying resource requirements + +When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks). + +Resources can be specified in both Python and YAML configurations. See the [resource requirements example](../../examples/tutorials/004_using_ray/resources-example.yaml) for a complete YAML configuration demonstrating various resource specifications including CPU, GPU, memory, and custom resources. diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index f88dede9..7c88d466 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -13,9 +13,8 @@ def _parse_resource_value(value: str | float | int) -> float: Supports: - Direct numerical values: 1, 0.5, 2.0 - - Milli-units: "250m" -> 0.25 - - Memory units: "10Mi" -> 10485760 (10 * 1024 * 1024) - - Memory units: "10Gi" -> 10737418240 (10 * 1024 * 1024 * 1024) + - Decimal SI prefixes: n, u, m, k, M, G, T, P, E (e.g., "250m" -> 0.25, "5k" -> 5000) + - Binary prefixes: Ki, Mi, Gi, Ti, Pi, Ei (e.g., "10Mi" -> 10485760) Args: value: The resource value to parse. @@ -32,30 +31,34 @@ def _parse_resource_value(value: str | float | int) -> float: # Handle string values value = value.strip() - # Handle milli-units (e.g., "250m" -> 0.25) - if value.endswith("m"): - match = re.match(r"^(\d+(?:\.\d+)?)m$", value) - if match: - return float(match.group(1)) / 1000.0 - raise ValueError(f"Invalid milli-unit format: {value}") - - # Handle memory units - # Ki = 1024, Mi = 1024^2, Gi = 1024^3, Ti = 1024^4 - memory_units = { + # Define all supported suffixes (decimal SI and binary) + suffixes = { + "n": 1e-9, + "u": 1e-6, + "m": 1e-3, # Decimal SI + "k": 1e3, + "M": 1e6, + "G": 1e9, + "T": 1e12, + "P": 1e15, + "E": 1e18, "Ki": 1024, "Mi": 1024**2, "Gi": 1024**3, "Ti": 1024**4, + "Pi": 1024**5, + "Ei": 1024**6, } - for suffix, multiplier in memory_units.items(): + # Sort by length (longest first) to match "Ki" before "k", etc. + for suffix in sorted(suffixes.keys(), key=len, reverse=True): if value.endswith(suffix): # Use re.escape to safely escape the suffix in the regex pattern pattern = rf"^(\d+(?:\.\d+)?){re.escape(suffix)}$" match = re.match(pattern, value) if match: - return float(match.group(1)) * multiplier - raise ValueError(f"Invalid memory unit format: {value}") + return float(match.group(1)) * suffixes[suffix] + raise ValueError(f"Invalid format for suffix '{suffix}': {value}") # Try to parse as a plain number try: @@ -74,21 +77,27 @@ class Resource(PlugboardBaseModel): Attributes: cpu: CPU requirement (default: 0.001). gpu: GPU requirement (default: 0). - memory: Memory requirement in bytes (default: 0). + memory: Memory requirement in bytes as an integer (default: 0). resources: Custom resource requirements as a dictionary. """ cpu: float = 0.001 gpu: float = 0 - memory: float = 0 + memory: int = 0 resources: dict[str, float] = Field(default_factory=dict) - @field_validator("cpu", "gpu", "memory", mode="before") + @field_validator("cpu", "gpu", mode="before") @classmethod - def _parse_resource_field(cls, v: str | float | int) -> float: - """Validate and parse resource fields.""" + def _parse_cpu_gpu_field(cls, v: str | float | int) -> float: + """Validate and parse CPU and GPU fields.""" return _parse_resource_value(v) + @field_validator("memory", mode="before") + @classmethod + def _parse_memory_field(cls, v: str | float | int) -> int: + """Validate and parse memory field as integer bytes.""" + return int(_parse_resource_value(v)) + @field_validator("resources", mode="before") @classmethod def _parse_resources_dict(cls, v: dict[str, str | float | int]) -> dict[str, float]: @@ -124,7 +133,7 @@ class ComponentArgsDict(_t.TypedDict): initial_values: _t.NotRequired[dict[str, _t.Any] | None] parameters: _t.NotRequired[dict[str, _t.Any] | None] constraints: _t.NotRequired[dict[str, _t.Any] | None] - resources: _t.NotRequired["Resource | None"] + resources: _t.NotRequired[Resource | None] class ComponentArgsSpec(PlugboardBaseModel, extra="allow"): diff --git a/plugboard/process/ray_process.py b/plugboard/process/ray_process.py index da17b131..81a3e28f 100644 --- a/plugboard/process/ray_process.py +++ b/plugboard/process/ray_process.py @@ -7,7 +7,7 @@ from plugboard.component.io_controller import IODirection from plugboard.connector import Connector from plugboard.process.process import Process -from plugboard.schemas import Status +from plugboard.schemas import Resource, Status from plugboard.state import RayStateBackend, StateBackend from plugboard.utils import build_actor_wrapper, depends_on_optional, gather_except, gen_rand_str @@ -64,8 +64,6 @@ def _create_component_actor(self, component: Component) -> _t.Any: actor_cls = build_actor_wrapper(component.__class__) # Get resource requirements from component - from plugboard.schemas import Resource - resources = component.resources if resources is None: # Use default resources if not specified diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index 92211fa3..7df17108 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -16,6 +16,7 @@ OptunaSpec, ParameterSpec, ProcessSpec, + Resource, ) from plugboard.utils import DI, run_coro_sync from plugboard.utils.dependencies import depends_on_optional @@ -331,8 +332,6 @@ def _calculate_placement_bundles(self, spec: ProcessSpec) -> list[dict[str, floa Returns: List of resource bundles for Ray placement group. """ - from plugboard.schemas import Resource - # Start with a bundle for the tune process itself bundles = [{"CPU": 0.5}] From c400a97218086d579edb8a486d6d0c9873d4c109 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Tue, 3 Feb 2026 21:26:16 +0000 Subject: [PATCH 07/28] Move suffixes to top --- .../plugboard_schemas/component.py | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index 7c88d466..a5ee65ec 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -8,6 +8,25 @@ from ._common import PlugboardBaseModel +RESOURCE_SUFFIXES = { + "n": 1e-9, + "u": 1e-6, + "m": 1e-3, + "k": 1e3, + "M": 1e6, + "G": 1e9, + "T": 1e12, + "P": 1e15, + "E": 1e18, + "Ki": 1024, + "Mi": 1024**2, + "Gi": 1024**3, + "Ti": 1024**4, + "Pi": 1024**5, + "Ei": 1024**6, +} + + def _parse_resource_value(value: str | float | int) -> float: """Parse a resource value from string or number. @@ -31,33 +50,14 @@ def _parse_resource_value(value: str | float | int) -> float: # Handle string values value = value.strip() - # Define all supported suffixes (decimal SI and binary) - suffixes = { - "n": 1e-9, - "u": 1e-6, - "m": 1e-3, # Decimal SI - "k": 1e3, - "M": 1e6, - "G": 1e9, - "T": 1e12, - "P": 1e15, - "E": 1e18, - "Ki": 1024, - "Mi": 1024**2, - "Gi": 1024**3, - "Ti": 1024**4, - "Pi": 1024**5, - "Ei": 1024**6, - } - # Sort by length (longest first) to match "Ki" before "k", etc. - for suffix in sorted(suffixes.keys(), key=len, reverse=True): + for suffix in sorted(RESOURCE_SUFFIXES.keys(), key=len, reverse=True): if value.endswith(suffix): # Use re.escape to safely escape the suffix in the regex pattern pattern = rf"^(\d+(?:\.\d+)?){re.escape(suffix)}$" match = re.match(pattern, value) if match: - return float(match.group(1)) * suffixes[suffix] + return float(match.group(1)) * RESOURCE_SUFFIXES[suffix] raise ValueError(f"Invalid format for suffix '{suffix}': {value}") # Try to parse as a plain number From a38aa880f293f6cafa44980820b4b78dadf02739 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Tue, 3 Feb 2026 21:58:39 +0000 Subject: [PATCH 08/28] Rework logic for tune placement bundles --- plugboard/tune/tune.py | 75 ++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 35 deletions(-) diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index 7df17108..44c74862 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -332,40 +332,45 @@ def _calculate_placement_bundles(self, spec: ProcessSpec) -> list[dict[str, floa Returns: List of resource bundles for Ray placement group. """ - # Start with a bundle for the tune process itself - bundles = [{"CPU": 0.5}] - - # Aggregate resources from all components - total_cpu = 0.0 - total_gpu = 0.0 - total_memory = 0.0 - custom_resources: dict[str, float] = {} - - for component_spec in spec.args.components: - resources = component_spec.args.resources - if resources is None: - # Use default resources - resources = Resource() - - total_cpu += resources.cpu - total_gpu += resources.gpu - total_memory += resources.memory - - # Aggregate custom resources - for key, value in resources.resources.items(): - custom_resources[key] = custom_resources.get(key, 0.0) + value - - # Create a single bundle for all component resources - component_bundle: dict[str, float] = {} - if total_cpu > 0: - component_bundle["CPU"] = total_cpu - if total_gpu > 0: - component_bundle["GPU"] = total_gpu - if total_memory > 0: - component_bundle["memory"] = total_memory - component_bundle.update(custom_resources) - - if component_bundle: - bundles.append(component_bundle) + bundles = [] + + if spec.type.endswith("RayProcess"): + # Ray process requires a bundle for the tune process and each component + bundles.append({"CPU": 1.0}) # Bundle for the tune process + for component_spec in spec.args.components: + resources = component_spec.args.resources + if resources is None: + # Use default resources + resources = Resource() + + bundles.append(resources.to_ray_options()) + else: + # Aggregate resources from all components + total_cpu = 1.0 # Ensure at least 1 CPU for the tune process + total_gpu = 0.0 + total_memory = 0.0 + custom_resources: dict[str, float] = {} + + for component_spec in spec.args.components: + resources = component_spec.args.resources + if resources is None: + # Use default resources + resources = Resource() + + total_cpu += resources.cpu + total_gpu += resources.gpu + total_memory += resources.memory + + # Aggregate custom resources + for key, value in resources.resources.items(): + custom_resources[key] = custom_resources.get(key, 0.0) + value + + resources = Resource( + cpu=total_cpu, + gpu=total_gpu, + memory=total_memory, + resources=custom_resources, + ) + bundles.append(resources.to_ray_options()) return bundles From 31092ab9e72944bd5965ff3ccd4a7d851d2414b6 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 14:18:17 +0000 Subject: [PATCH 09/28] Docs config improved for schemas --- docs/api/schemas/schemas.md | 2 +- mkdocs.yaml | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/api/schemas/schemas.md b/docs/api/schemas/schemas.md index a4d00963..f4ce0adf 100644 --- a/docs/api/schemas/schemas.md +++ b/docs/api/schemas/schemas.md @@ -1 +1 @@ -::: plugboard.schemas +::: plugboard_schemas diff --git a/mkdocs.yaml b/mkdocs.yaml index 2abd14cb..8cefcca5 100644 --- a/mkdocs.yaml +++ b/mkdocs.yaml @@ -33,7 +33,7 @@ plugins: default_handler: python handlers: python: - paths: [src] + paths: [plugboard, plugboard-schemas] options: docstring_style: google show_source: false @@ -113,6 +113,7 @@ watch: - docs - examples - plugboard +- plugboard-schemas - README.md - CONTRIBUTING.md From 6bf7017c78daef7a23ed5f73793c398f9559c61a Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 14:22:47 +0000 Subject: [PATCH 10/28] Tidy up documentation on resources example --- .../examples/tutorials/running-in-parallel.md | 29 +++++++++++++++++-- .../004_using_ray/resources-example.yaml | 15 ++++------ .../004_using_ray/resources_example.py | 2 ++ 3 files changed, 34 insertions(+), 12 deletions(-) diff --git a/docs/examples/tutorials/running-in-parallel.md b/docs/examples/tutorials/running-in-parallel.md index 3df11aa4..4a28cbde 100644 --- a/docs/examples/tutorials/running-in-parallel.md +++ b/docs/examples/tutorials/running-in-parallel.md @@ -73,6 +73,31 @@ Specifying the process type and channel builder type in the YAML is the only cha ## Specifying resource requirements -When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks). +When running components on Ray, you can specify resource requirements for each component to control how Ray allocates computational resources. This is particularly useful when you have components with different resource needs (e.g., CPU-intensive vs GPU-intensive tasks) and you are running on a Ray cluster. -Resources can be specified in both Python and YAML configurations. See the [resource requirements example](../../examples/tutorials/004_using_ray/resources-example.yaml) for a complete YAML configuration demonstrating various resource specifications including CPU, GPU, memory, and custom resources. +!!! tip + Normally Ray will start automatically when you are using Plugboard locally. If you want to start a separate Ray instance, for example so that you can control the configuration options, you can launch it from the [CLI](https://docs.ray.io/en/latest/ray-core/starting-ray.html). For example, this command will start a Ray instance with enough resources to run the example below. + + ```sh + uv run ray start --head --num-cpus=4 --num-gpus=1 --resources='{"custom_hardware": 5}' + ``` + +For example, you can specify [`Resource`][plugboard.schemas.Resource] requirements like this when creating components: + +```python +--8<-- "examples/tutorials/004_using_ray/resources_example.py:resources" +``` + +Or in YAML: + +```yaml +--8<-- "examples/tutorials/004_using_ray/resources-example.yaml:10:" +``` + +1. Requires 1 CPU. +2. Requires 2 CPU resources. +3. Requires 0.5 CPU, this time specified in Kubernetes-style format, i.e. 500 milli CPUs. +4. Requires 1 GPU. +5. Requires a custom resource called `custom_hardware`. This needs to specified in the configuration of your Ray cluster to make it available. + +See the [Ray documentation](https://docs.ray.io/en/latest/ray-core/scheduling/resources.html) for more information about specifying resource requirements. diff --git a/examples/tutorials/004_using_ray/resources-example.yaml b/examples/tutorials/004_using_ray/resources-example.yaml index 752300cf..8efeffe8 100644 --- a/examples/tutorials/004_using_ray/resources-example.yaml +++ b/examples/tutorials/004_using_ray/resources-example.yaml @@ -5,7 +5,6 @@ # - Numerical values: cpu: 2.0 # - Milli-units: cpu: "250m" (equals 0.25) # - Memory units: memory: "100Mi" (equals 100 * 1024 * 1024 bytes) - plugboard: process: type: plugboard.process.RayProcess @@ -19,27 +18,23 @@ plugboard: name: producer iters: 10 resources: - cpu: 1.0 # Requires 1 CPU - + cpu: 1.0 # (1)! - type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask args: name: cpu-task resources: - cpu: 2.0 # Requires 2 CPUs + cpu: 2.0 # (2)! memory: "512Mi" # Requires 512MB memory - - type: examples.tutorials.004_using_ray.resources_example.GPUTask args: name: gpu-task resources: - cpu: "500m" # Requires 0.5 CPU (using milli-unit notation) - gpu: 1 # Requires 1 GPU + cpu: "500m" # (3)! + gpu: 1 # (4)! resources: - custom_hardware: 2 # Custom resource requirement - + custom_hardware: 2 # (5)! connectors: - source: producer.output target: cpu-task.x - - source: cpu-task.y target: gpu-task.data diff --git a/examples/tutorials/004_using_ray/resources_example.py b/examples/tutorials/004_using_ray/resources_example.py index 76779585..d3a5e794 100644 --- a/examples/tutorials/004_using_ray/resources_example.py +++ b/examples/tutorials/004_using_ray/resources_example.py @@ -54,6 +54,7 @@ async def step(self) -> None: async def main() -> None: """Run the process with resource-constrained components.""" # Define resource requirements for components + # --8<-- [start:resources] cpu_resources = Resource(cpu=2.0) # Requires 2 CPUs gpu_resources = Resource(cpu="500m", gpu=1) # Requires 0.5 CPU and 1 GPU @@ -68,6 +69,7 @@ async def main() -> None: RayConnector(spec=ConnectorSpec(source="cpu-task.y", target="gpu-task.data")), ], ) + # --8<-- [end:resources] async with process: await process.run() From bae7c4670dcb73291032fbd9cbd5cb1d1af13fd9 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 14:43:25 +0000 Subject: [PATCH 11/28] Add resources for smoke tests --- tests/conftest.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index b8290dd2..de6612f6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -40,8 +40,11 @@ def mp_set_start_method() -> None: @pytest.fixture(scope="session") def ray_ctx() -> _t.Iterator[None]: - """Initialises and shuts down Ray.""" - ray.init(num_cpus=4, num_gpus=0, include_dashboard=True) + """Initialises and shuts down Ray. + + Includes a small amount of resources to allow testing of resource-constrained components. + """ + ray.init(num_cpus=5, num_gpus=1, resources={"custom_hardware": 10}, include_dashboard=True) yield ray.shutdown() From a9a4ef50f61146f09b612ff093da3a9847aeb928 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 14:43:39 +0000 Subject: [PATCH 12/28] Make sure resources are available in runner --- examples/tutorials/004_using_ray/resources_example.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/examples/tutorials/004_using_ray/resources_example.py b/examples/tutorials/004_using_ray/resources_example.py index d3a5e794..75567d93 100644 --- a/examples/tutorials/004_using_ray/resources_example.py +++ b/examples/tutorials/004_using_ray/resources_example.py @@ -75,15 +75,10 @@ async def main() -> None: await process.run() print("Process completed successfully!") - print(f"Final result from GPU task: {process.components['gpu-task'].result}") if __name__ == "__main__": - # Initialize Ray - ray.init() - - # Run the process + if not ray.is_initialized(): + # Ray must be initialised with the necessary resources + ray.init(num_cpus=5, num_gpus=1, resources={"custom_hardware": 10}, include_dashboard=True) asyncio.run(main()) - - # Shutdown Ray - ray.shutdown() From c14cbaffd8b63c69a56a36ab7e95815c3318cd9a Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 14:46:40 +0000 Subject: [PATCH 13/28] Remove non-existent cross refs in docs --- plugboard/tune/tune.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index 44c74862..a51cc317 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -74,7 +74,7 @@ def __init__( @property def result_grid(self) -> ray.tune.ResultGrid: - """Returns a [`ResultGrid`][ray.tune.ResultGrid] summarising the optimisation results.""" + """Returns a `ResultGrid` summarising the optimisation results.""" if self._result_grid is None: raise ValueError("No result grid available. Run the optimisation job first.") return self._result_grid @@ -213,8 +213,8 @@ def run(self, spec: ProcessSpec) -> ray.tune.Result | list[ray.tune.Result]: spec: The [`ProcessSpec`][plugboard.schemas.ProcessSpec] to optimise. Returns: - Either one or a list of [`Result`][ray.tune.Result] objects containing the best trial - result. Use the `result_grid` property to get full trial results. + Either one or a list of `Result` objects containing the best trial esult. Use the + `result_grid` property to get full trial results. """ self._logger.info("Running optimisation job on Ray") spec = spec.model_copy() From 2588544c247d76440775bbc3a8d721311749c0d6 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 14:52:36 +0000 Subject: [PATCH 14/28] Fixup types --- tests/unit/test_ray_resources.py | 1 + tests/unit/test_tuner_resources.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/tests/unit/test_ray_resources.py b/tests/unit/test_ray_resources.py index 36b8ff4c..2c391582 100644 --- a/tests/unit/test_ray_resources.py +++ b/tests/unit/test_ray_resources.py @@ -52,6 +52,7 @@ def test_component_with_zero_resources() -> None: component = SimpleComponent(name="test", resources=resources) # Zero values should be stored + assert component.resources is not None assert component.resources.cpu == 0 assert component.resources.gpu == 0 assert component.resources.memory == 0 diff --git a/tests/unit/test_tuner_resources.py b/tests/unit/test_tuner_resources.py index f5fad916..7e6355d4 100644 --- a/tests/unit/test_tuner_resources.py +++ b/tests/unit/test_tuner_resources.py @@ -64,8 +64,10 @@ def test_process_spec_with_multiple_component_resources() -> None: ) # Verify both components have resources + assert process_spec.args.components[0].args.resources is not None assert process_spec.args.components[0].args.resources.cpu == 1.0 assert process_spec.args.components[0].args.resources.gpu == 0.5 + assert process_spec.args.components[1].args.resources is not None assert process_spec.args.components[1].args.resources.cpu == 2.0 assert process_spec.args.components[1].args.resources.memory == 50 * 1024 * 1024 From d2e7aa21d4e2b17f610ff6b7c64d6f86e805f2de Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 18:30:39 +0000 Subject: [PATCH 15/28] Test that actor is passed the resource requirements --- tests/integration/test_component_resources.py | 20 +++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py index 37776e1a..ef7a24c5 100644 --- a/tests/integration/test_component_resources.py +++ b/tests/integration/test_component_resources.py @@ -4,10 +4,12 @@ import typing as _t import pytest +from ray.util.state import list_actors from plugboard.component import Component, IOController as IO -from plugboard.process import LocalProcess -from plugboard.schemas import Resource +from plugboard.connector import RayConnector +from plugboard.process import RayProcess +from plugboard.schemas import ConnectorSpec, Resource class ResourceComponent(Component): @@ -46,19 +48,25 @@ async def test_component_with_default_resources() -> None: @pytest.mark.asyncio -async def test_component_resources_in_local_process() -> None: - """Test that components with resources work in LocalProcess.""" - resources = Resource(cpu=1.0, memory="100Mi") +async def test_component_resources_in_ray_process(ray_ctx: None) -> None: + """Test that components with resources work in RayProcess.""" + resources = Resource(cpu=1.0, memory="1Mi") component = ResourceComponent( name="test", resources=resources, multiplier=2, initial_values={"a": [5]}, ) + connectors = [RayConnector(spec=ConnectorSpec(source="test.b", target="test.a"))] - process = LocalProcess(components=[component], connectors=[]) + process = RayProcess(components=[component], connectors=connectors) async with process: + actors = list_actors(detail=True) + component_actor = next(a for a in actors if a.name == "test") + # Verify the component actor has the correct resources + assert component_actor.required_resources["CPU"] == 1.0 + assert component_actor.required_resources["memory"] == 1.0 * 1024 * 1024 await process.run() assert component.b == 10 From 3301419f9eb3878f1b9acc108d66f922ae5e397d Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 18:31:37 +0000 Subject: [PATCH 16/28] Increase timeouts on test CI --- .github/workflows/lint-test.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/lint-test.yaml b/.github/workflows/lint-test.yaml index c81e5f9c..935b24c2 100644 --- a/.github/workflows/lint-test.yaml +++ b/.github/workflows/lint-test.yaml @@ -75,7 +75,7 @@ jobs: test-unit: name: Tests - unit runs-on: ubuntu-latest - timeout-minutes: 5 + timeout-minutes: 8 strategy: matrix: python_version: [3.12, 3.13] @@ -113,7 +113,7 @@ jobs: test-integration: name: Tests - integration runs-on: ubuntu-latest - timeout-minutes: 6 + timeout-minutes: 10 strategy: matrix: python_version: [3.12, 3.13] @@ -157,7 +157,7 @@ jobs: test-integration-tuner: name: Tests - integration:tuner runs-on: ubuntu-latest - timeout-minutes: 5 + timeout-minutes: 8 strategy: matrix: python_version: [3.12, 3.13] From 9b361df53e35eb21473ef3dc9b16bd7f8fbcee21 Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 18:34:40 +0000 Subject: [PATCH 17/28] Typing --- tests/integration/test_component_resources.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py index ef7a24c5..a1e0d963 100644 --- a/tests/integration/test_component_resources.py +++ b/tests/integration/test_component_resources.py @@ -70,6 +70,7 @@ async def test_component_resources_in_ray_process(ray_ctx: None) -> None: await process.run() assert component.b == 10 + assert component.resources is not None assert component.resources.cpu == 1.0 From d7c87c6bb93f6882aad2f2be356e8323858a6c8b Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 18:43:33 +0000 Subject: [PATCH 18/28] Fixup test --- tests/integration/test_component_resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py index a1e0d963..49127565 100644 --- a/tests/integration/test_component_resources.py +++ b/tests/integration/test_component_resources.py @@ -83,4 +83,4 @@ async def test_component_export_includes_resources() -> None: exported = component.export() assert "args" in exported # The resources should be passed through in the exported args - assert exported["args"]["resources"] == resources + assert exported["args"]["resources"] == resources.model_dump() From a3c9555e9c143e427910965415a9ecf22fbc677f Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 19:47:27 +0000 Subject: [PATCH 19/28] Increase tuner test timeout --- .github/workflows/lint-test.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/lint-test.yaml b/.github/workflows/lint-test.yaml index 935b24c2..b9a2c973 100644 --- a/.github/workflows/lint-test.yaml +++ b/.github/workflows/lint-test.yaml @@ -157,7 +157,7 @@ jobs: test-integration-tuner: name: Tests - integration:tuner runs-on: ubuntu-latest - timeout-minutes: 8 + timeout-minutes: 10 strategy: matrix: python_version: [3.12, 3.13] From c9aa1ceaabde530bb8eba597424830dc4154717e Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sat, 7 Feb 2026 22:15:06 +0000 Subject: [PATCH 20/28] Correct format for placement groups in Ray tune --- .github/workflows/lint-test.yaml | 2 +- plugboard-schemas/plugboard_schemas/component.py | 15 +++++++++++---- plugboard/tune/tune.py | 4 ++-- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/.github/workflows/lint-test.yaml b/.github/workflows/lint-test.yaml index b9a2c973..935b24c2 100644 --- a/.github/workflows/lint-test.yaml +++ b/.github/workflows/lint-test.yaml @@ -157,7 +157,7 @@ jobs: test-integration-tuner: name: Tests - integration:tuner runs-on: ubuntu-latest - timeout-minutes: 10 + timeout-minutes: 8 strategy: matrix: python_version: [3.12, 3.13] diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index a5ee65ec..bdf119d4 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -25,6 +25,7 @@ "Pi": 1024**5, "Ei": 1024**6, } +RAY_RESOURCE_PRECISION = 4 # Decimal places to round to for Ray resource values def _parse_resource_value(value: str | float | int) -> float: @@ -104,7 +105,9 @@ def _parse_resources_dict(cls, v: dict[str, str | float | int]) -> dict[str, flo """Validate and parse custom resources dictionary.""" return {key: _parse_resource_value(value) for key, value in v.items()} - def to_ray_options(self) -> dict[str, _t.Any]: + def to_ray_options( + self, style: _t.Literal["actor", "placement_group"] = "actor" + ) -> dict[str, _t.Any]: """Convert resource requirements to Ray actor options. Returns: @@ -112,16 +115,20 @@ def to_ray_options(self) -> dict[str, _t.Any]: """ options: dict[str, _t.Any] = {} + cpu_key = "num_cpus" if style == "actor" else "CPU" + gpu_key = "num_gpus" if style == "actor" else "GPU" if self.cpu > 0: - options["num_cpus"] = self.cpu + options[cpu_key] = round(self.cpu, RAY_RESOURCE_PRECISION) if self.gpu > 0: - options["num_gpus"] = self.gpu + options[gpu_key] = round(self.gpu, RAY_RESOURCE_PRECISION) if self.memory > 0: options["memory"] = self.memory # Add custom resources if self.resources: - options["resources"] = self.resources + options["resources"] = { + key: round(value, RAY_RESOURCE_PRECISION) for key, value in self.resources.items() + } return options diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index a51cc317..af10c44e 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -343,7 +343,7 @@ def _calculate_placement_bundles(self, spec: ProcessSpec) -> list[dict[str, floa # Use default resources resources = Resource() - bundles.append(resources.to_ray_options()) + bundles.append(resources.to_ray_options(style="placement_group")) else: # Aggregate resources from all components total_cpu = 1.0 # Ensure at least 1 CPU for the tune process @@ -371,6 +371,6 @@ def _calculate_placement_bundles(self, spec: ProcessSpec) -> list[dict[str, floa memory=total_memory, resources=custom_resources, ) - bundles.append(resources.to_ray_options()) + bundles.append(resources.to_ray_options(style="placement_group")) return bundles From 155c571cb504a42ae022a737c15847550f598a8b Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sun, 8 Feb 2026 14:07:49 +0000 Subject: [PATCH 21/28] Typo --- plugboard/tune/tune.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugboard/tune/tune.py b/plugboard/tune/tune.py index af10c44e..7fce1552 100644 --- a/plugboard/tune/tune.py +++ b/plugboard/tune/tune.py @@ -213,7 +213,7 @@ def run(self, spec: ProcessSpec) -> ray.tune.Result | list[ray.tune.Result]: spec: The [`ProcessSpec`][plugboard.schemas.ProcessSpec] to optimise. Returns: - Either one or a list of `Result` objects containing the best trial esult. Use the + Either one or a list of `Result` objects containing the best trial result. Use the `result_grid` property to get full trial results. """ self._logger.info("Running optimisation job on Ray") From a562b8881435d4d07a5d42303623e9e6c896640a Mon Sep 17 00:00:00 2001 From: Toby Coleman Date: Sun, 8 Feb 2026 14:09:36 +0000 Subject: [PATCH 22/28] Coverage --- plugboard/component/component.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 06f12b61..b45d8bdf 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -25,7 +25,7 @@ from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker -if _t.TYPE_CHECKING: +if _t.TYPE_CHECKING: # pragma: no cover from plugboard.schemas import Resource @@ -73,7 +73,7 @@ def __init__( setattr(self, "init", self._handle_init_wrapper()) setattr(self, "step", self._handle_step_wrapper()) - if is_on_ray_worker(): + if is_on_ray_worker(): # pragma: no cover # Required until https://github.com/ray-project/ray/issues/42823 is resolved try: self.__class__._configure_io() From 5fb86bc4f800549b75963bdbd4237d99d66556ed Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 11 Feb 2026 20:36:05 +0100 Subject: [PATCH 23/28] refactor: Resource value validation --- .../plugboard_schemas/component.py | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index bdf119d4..cc96be7f 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -48,24 +48,30 @@ def _parse_resource_value(value: str | float | int) -> float: if isinstance(value, (int, float)): return float(value) + if not isinstance(value, str): + raise ValueError(f"Resource value must be a string, int, or float: {value}") + # Handle string values value = value.strip() - # Sort by length (longest first) to match "Ki" before "k", etc. - for suffix in sorted(RESOURCE_SUFFIXES.keys(), key=len, reverse=True): - if value.endswith(suffix): - # Use re.escape to safely escape the suffix in the regex pattern - pattern = rf"^(\d+(?:\.\d+)?){re.escape(suffix)}$" - match = re.match(pattern, value) - if match: - return float(match.group(1)) * RESOURCE_SUFFIXES[suffix] - raise ValueError(f"Invalid format for suffix '{suffix}': {value}") - - # Try to parse as a plain number + pattern = r"^(\d+(?:\.\d+)?)([a-zA-Z]+)?$" + match = re.search(pattern, value) + if not match: + raise ValueError(f"Invalid resource value format: {value}") + + multiplier = 1.0 + if (suffix := match.group(2)) is not None: + try: + multiplier = RESOURCE_SUFFIXES[suffix] + except KeyError: + raise ValueError(f"Unknown resource suffix '{suffix}' in value: {value}") + try: - return float(value) + number = float(number_group := match.group(1)) except ValueError: - raise ValueError(f"Invalid resource value format: {value}") + raise ValueError(f"Invalid resource value format: {number_group}") + + return number * multiplier class Resource(PlugboardBaseModel): From b0fd414b94ff9524e2ba6500870387cf590862ab Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 11 Feb 2026 20:38:02 +0100 Subject: [PATCH 24/28] fix: Inconsistent type annotation syntax --- plugboard-schemas/plugboard_schemas/component.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugboard-schemas/plugboard_schemas/component.py b/plugboard-schemas/plugboard_schemas/component.py index cc96be7f..eeeb889e 100644 --- a/plugboard-schemas/plugboard_schemas/component.py +++ b/plugboard-schemas/plugboard_schemas/component.py @@ -164,7 +164,7 @@ class ComponentArgsSpec(PlugboardBaseModel, extra="allow"): initial_values: dict[str, _t.Any] = {} parameters: dict[str, _t.Any] = {} constraints: dict[str, _t.Any] = {} - resources: "Resource | None" = None + resources: Resource | None = None class ComponentSpec(PlugboardBaseModel): From 3f8031f2f8e0878fc6326d08d9e108a139a58c64 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 11 Feb 2026 20:49:02 +0100 Subject: [PATCH 25/28] Allows declaring resources at Component declaration time --- plugboard/component/component.py | 21 ++++++++----------- tests/integration/test_component_resources.py | 4 ++-- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/plugboard/component/component.py b/plugboard/component/component.py index b45d8bdf..00ce34ee 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -20,15 +20,11 @@ UnrecognisedEventError, ValidationError, ) -from plugboard.schemas import Status +from plugboard.schemas import Resource, Status from plugboard.state import StateBackend from plugboard.utils import DI, ClassRegistry, ExportMixin, is_on_ray_worker -if _t.TYPE_CHECKING: # pragma: no cover - from plugboard.schemas import Resource - - _io_key_in: str = str(IODirection.INPUT) _io_key_out: str = str(IODirection.OUTPUT) @@ -49,6 +45,7 @@ class Component(ABC, ExportMixin): io: IO = IO(input_events=[StopEvent], output_events=[StopEvent]) exports: _t.Optional[list[str]] = None + resources: Resource = Resource() _implements_step: bool = False @@ -60,7 +57,7 @@ def __init__( parameters: _t.Optional[dict[str, _t.Any]] = None, state: _t.Optional[StateBackend] = None, constraints: _t.Optional[dict] = None, - resources: _t.Optional["Resource"] = None, + resources: _t.Optional[Resource] = None, ) -> None: self.name = name self._initial_values = initial_values or {} @@ -68,7 +65,6 @@ def __init__( self._parameters = parameters or {} self._state: _t.Optional[StateBackend] = state self._state_is_connected = False - self._resources = resources setattr(self, "init", self._handle_init_wrapper()) setattr(self, "step", self._handle_step_wrapper()) @@ -88,6 +84,12 @@ def __init__( namespace=self.name, component=self, ) + self.resources = resources or Resource( + cpu=self.__class__.resources.cpu, + gpu=self.__class__.resources.gpu, + memory=self.__class__.resources.memory, + resources=self.__class__.resources.resources, + ) self._event_producers: dict[str, set[str]] = defaultdict(set) self._status = Status.CREATED self._is_running = False @@ -126,11 +128,6 @@ def parameters(self) -> dict[str, _t.Any]: """Gets the parameters of the component.""" return self._parameters - @property - def resources(self) -> _t.Optional["Resource"]: - """Gets the resource requirements of the component.""" - return self._resources - @classmethod def _configure_io(cls) -> None: # Get all parent classes that are Component subclasses diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py index 49127565..8a0342d7 100644 --- a/tests/integration/test_component_resources.py +++ b/tests/integration/test_component_resources.py @@ -43,8 +43,8 @@ async def test_component_with_resources() -> None: async def test_component_with_default_resources() -> None: """Test that a component uses default resources when none specified.""" component = ResourceComponent(name="test") - - assert component.resources is None + default_resources = Resource() + assert component.resources == default_resources @pytest.mark.asyncio From 82a76cff29e0118ce233e9601803f462f1a918ae Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 11 Feb 2026 20:52:51 +0100 Subject: [PATCH 26/28] Enhances tests --- tests/integration/test_component_resources.py | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py index 8a0342d7..1b454754 100644 --- a/tests/integration/test_component_resources.py +++ b/tests/integration/test_component_resources.py @@ -27,6 +27,18 @@ async def step(self) -> None: await self.io.close() +class DeclaredResourceComponent(Component): + """Test component with declared class-level resource requirements.""" + + io = IO(inputs=["a"], outputs=["b"]) + resources = Resource(cpu="1", memory="512Mi") + + async def step(self) -> None: + if self.a is not None: + self.b = self.a * 2 + await self.io.close() + + @pytest.mark.asyncio async def test_component_with_resources() -> None: """Test that a component can be created with resource requirements.""" @@ -47,6 +59,27 @@ async def test_component_with_default_resources() -> None: assert component.resources == default_resources +@pytest.mark.asyncio +async def test_component_with_declared_resources() -> None: + """Test that a component with class-level declared resources has those resources.""" + component = DeclaredResourceComponent(name="test") + expected_resources = Resource(cpu=1.0, memory=512 * 1024 * 1024) + assert component.resources == expected_resources + + +@pytest.mark.asyncio +async def test_component_with_declared_resources_overrides() -> None: + """Test that a component with class-level declared resources can be overridden by constructor.""" + resources = Resource(cpu="250m", gpu=0.5, memory="5Gi") + component_1 = DeclaredResourceComponent(name="test", resources=resources) + # Confirm that the constructor resources override the class-level resources + assert component_1.resources == resources + # Confirm that the class-level resources are still the same for a new instances + component_2 = DeclaredResourceComponent(name="test") + expected_resources = Resource(cpu=1.0, memory=512 * 1024 * 1024) + assert component_2.resources == expected_resources + + @pytest.mark.asyncio async def test_component_resources_in_ray_process(ray_ctx: None) -> None: """Test that components with resources work in RayProcess.""" From 2892fe9ab0200e5757bc62199cc593bcf353fba4 Mon Sep 17 00:00:00 2001 From: Chris Knight Date: Wed, 11 Feb 2026 20:55:53 +0100 Subject: [PATCH 27/28] Ignore lint error --- tests/integration/test_component_resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/test_component_resources.py b/tests/integration/test_component_resources.py index 1b454754..10f98e2d 100644 --- a/tests/integration/test_component_resources.py +++ b/tests/integration/test_component_resources.py @@ -69,7 +69,7 @@ async def test_component_with_declared_resources() -> None: @pytest.mark.asyncio async def test_component_with_declared_resources_overrides() -> None: - """Test that a component with class-level declared resources can be overridden by constructor.""" + """Test that a component with class-level declared resources can be overridden by constructor.""" # noqa: E501,W505 resources = Resource(cpu="250m", gpu=0.5, memory="5Gi") component_1 = DeclaredResourceComponent(name="test", resources=resources) # Confirm that the constructor resources override the class-level resources From ea310c148d2d0e2e35a65d6cc50b3f4998a80f10 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 11 Feb 2026 20:04:50 +0000 Subject: [PATCH 28/28] Update docs and examples to show class-level resource declaration Co-authored-by: chrisk314 <2366658+chrisk314@users.noreply.github.com> --- .../examples/tutorials/running-in-parallel.md | 48 ++++++++++++++++--- .../004_using_ray/resources-example.yaml | 15 ++++-- .../004_using_ray/resources_example.py | 29 +++++++---- plugboard/component/component.py | 14 ++++++ 4 files changed, 86 insertions(+), 20 deletions(-) diff --git a/docs/examples/tutorials/running-in-parallel.md b/docs/examples/tutorials/running-in-parallel.md index 4a28cbde..e92ef1ba 100644 --- a/docs/examples/tutorials/running-in-parallel.md +++ b/docs/examples/tutorials/running-in-parallel.md @@ -82,22 +82,56 @@ When running components on Ray, you can specify resource requirements for each c uv run ray start --head --num-cpus=4 --num-gpus=1 --resources='{"custom_hardware": 5}' ``` -For example, you can specify [`Resource`][plugboard.schemas.Resource] requirements like this when creating components: +### Declaring resources at component definition + +The recommended way to specify resource requirements is to declare them as a class attribute when defining your component. This makes the resource requirements explicit and part of the component's definition: + +```python +from plugboard.component import Component +from plugboard.schemas import Resource + +class CPUIntensiveTask(Component): + """Component that requires more CPU resources.""" + + io = IO(inputs=["x"], outputs=["y"]) + resources = Resource(cpu=2.0) # Declare resources at class level + + async def step(self) -> None: + # Your component logic here + pass +``` + +### Overriding resources at instantiation + +You can also override resource requirements when creating component instances. This is useful when you want to use the same component class with different resource requirements: + +```python +# Override the class-level resource requirements +component = CPUIntensiveTask( + name="my-task", + resources=Resource(cpu=4.0) # Override to use 4 CPUs instead of 2 +) +``` + +### Example + +For example, you can specify [`Resource`][plugboard.schemas.Resource] requirements like this when defining components: ```python --8<-- "examples/tutorials/004_using_ray/resources_example.py:resources" ``` -Or in YAML: +Or override them in YAML configuration: ```yaml --8<-- "examples/tutorials/004_using_ray/resources-example.yaml:10:" ``` -1. Requires 1 CPU. -2. Requires 2 CPU resources. -3. Requires 0.5 CPU, this time specified in Kubernetes-style format, i.e. 500 milli CPUs. -4. Requires 1 GPU. -5. Requires a custom resource called `custom_hardware`. This needs to specified in the configuration of your Ray cluster to make it available. +1. Override DataProducer to require 1 CPU (instead of the default 0.001). +2. CPUIntensiveTask already declares cpu: 2.0 at the class level, so this matches the class definition. +3. Add 512MB memory requirement to CPUIntensiveTask (extending the class-level resources). +4. Requires 0.5 CPU, specified in Kubernetes-style format (500 milli CPUs). This matches the class-level declaration. +5. Requires 1 GPU, matching the class-level declaration. +6. Add a custom resource called `custom_hardware`. This needs to be specified in the configuration of your Ray cluster to make it available. See the [Ray documentation](https://docs.ray.io/en/latest/ray-core/scheduling/resources.html) for more information about specifying resource requirements. diff --git a/examples/tutorials/004_using_ray/resources-example.yaml b/examples/tutorials/004_using_ray/resources-example.yaml index 8efeffe8..a58aafa9 100644 --- a/examples/tutorials/004_using_ray/resources-example.yaml +++ b/examples/tutorials/004_using_ray/resources-example.yaml @@ -5,6 +5,9 @@ # - Numerical values: cpu: 2.0 # - Milli-units: cpu: "250m" (equals 0.25) # - Memory units: memory: "100Mi" (equals 100 * 1024 * 1024 bytes) +# +# Note: Resources can be declared at the component class level (recommended) +# or overridden in the YAML configuration as shown below. plugboard: process: type: plugboard.process.RayProcess @@ -22,17 +25,21 @@ plugboard: - type: examples.tutorials.004_using_ray.resources_example.CPUIntensiveTask args: name: cpu-task + # CPUIntensiveTask has class-level resources (cpu: 2.0) + # Override to use more memory resources: cpu: 2.0 # (2)! - memory: "512Mi" # Requires 512MB memory + memory: "512Mi" # (3)! - type: examples.tutorials.004_using_ray.resources_example.GPUTask args: name: gpu-task + # GPUTask has class-level resources (cpu: "500m", gpu: 1) + # Can override or extend with custom resources resources: - cpu: "500m" # (3)! - gpu: 1 # (4)! + cpu: "500m" # (4)! + gpu: 1 # (5)! resources: - custom_hardware: 2 # (5)! + custom_hardware: 2 # (6)! connectors: - source: producer.output target: cpu-task.x diff --git a/examples/tutorials/004_using_ray/resources_example.py b/examples/tutorials/004_using_ray/resources_example.py index 75567d93..882e2c3d 100644 --- a/examples/tutorials/004_using_ray/resources_example.py +++ b/examples/tutorials/004_using_ray/resources_example.py @@ -12,22 +12,32 @@ class CPUIntensiveTask(Component): - """Component that requires more CPU resources.""" + """Component that requires more CPU resources. + + Resource requirements are declared as a class attribute. + """ io = IO(inputs=["x"], outputs=["y"]) + resources = Resource(cpu=2.0) # Declare resources at class level async def step(self) -> None: + """Execute CPU-intensive computation.""" # Simulate CPU-intensive work result = sum(i**2 for i in range(int(self.x * 10000))) self.y = result class GPUTask(Component): - """Component that requires GPU resources.""" + """Component that requires GPU resources. + + Resource requirements are declared as a class attribute. + """ io = IO(inputs=["data"], outputs=["result"]) + resources = Resource(cpu="500m", gpu=1) # Declare resources at class level async def step(self) -> None: + """Execute GPU computation.""" # Simulate GPU computation self.result = self.data * 2 @@ -38,13 +48,16 @@ class DataProducer(Component): io = IO(outputs=["output"]) def __init__(self, iters: int, **kwargs: _t.Unpack[ComponentArgsDict]) -> None: + """Initialize DataProducer with iteration count.""" super().__init__(**kwargs) self._iters = iters async def init(self) -> None: + """Initialize the data sequence.""" self._seq = iter(range(self._iters)) async def step(self) -> None: + """Produce the next data value.""" try: self.output = next(self._seq) except StopIteration: @@ -53,16 +66,14 @@ async def step(self) -> None: async def main() -> None: """Run the process with resource-constrained components.""" - # Define resource requirements for components # --8<-- [start:resources] - cpu_resources = Resource(cpu=2.0) # Requires 2 CPUs - gpu_resources = Resource(cpu="500m", gpu=1) # Requires 0.5 CPU and 1 GPU - + # Resources can be declared at the class level (see CPUIntensiveTask and GPUTask above) + # or overridden when instantiating components process = RayProcess( components=[ - DataProducer(name="producer", iters=5, resources=cpu_resources), - CPUIntensiveTask(name="cpu-task", resources=cpu_resources), - GPUTask(name="gpu-task", resources=gpu_resources), + DataProducer(name="producer", iters=5), # Uses default resources + CPUIntensiveTask(name="cpu-task"), # Uses class-level resources (2.0 CPU) + GPUTask(name="gpu-task"), # Uses class-level resources (0.5 CPU, 1 GPU) ], connectors=[ RayConnector(spec=ConnectorSpec(source="producer.output", target="cpu-task.x")), diff --git a/plugboard/component/component.py b/plugboard/component/component.py index 00ce34ee..bb2ec58e 100644 --- a/plugboard/component/component.py +++ b/plugboard/component/component.py @@ -41,6 +41,9 @@ class Component(ABC, ExportMixin): io: The `IOController` for the component, specifying inputs, outputs, and events. exports: Optional; The exportable fields from the component during distributed runs in addition to input and output fields. + resources: Resource requirements for the component. Can be declared as a class attribute + to set default resource requirements, which can be overridden in the constructor. + Defaults to `Resource()` (0.001 CPU, 0 GPU, 0 memory). """ io: IO = IO(input_events=[StopEvent], output_events=[StopEvent]) @@ -59,6 +62,17 @@ def __init__( constraints: _t.Optional[dict] = None, resources: _t.Optional[Resource] = None, ) -> None: + """Initialize a Component instance. + + Args: + name: The name of the component. + initial_values: Optional; Initial values for the component's inputs. + parameters: Optional; Parameters for the component. + state: Optional; State backend for the component. + constraints: Optional; Constraints for the component. + resources: Optional; Resource requirements for the component. If not provided, + uses the class-level `resources` attribute as default. + """ self.name = name self._initial_values = initial_values or {} self._constraints = constraints or {}