Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -302,19 +302,27 @@ def can_add_input(self) -> bool:

def _start_actor(
self, labels: Dict[str, str], logical_actor_id: LogicalActorId
) -> Tuple[ActorHandle, ObjectRef]:
) -> Tuple[ActorHandle, ObjectRef, ExecutionResources]:
"""Start a new actor and add it to the actor pool as a pending actor.

Args:
labels: The key-value labels to launch the actor with.
logical_actor_id: The logical id of the actor.

Returns:
A tuple of the actor handle and the object ref to the actor's location.
A tuple of the actor handle, the object ref to the actor's location,
and the actual resource usage for this actor.
"""
assert self._actor_cls is not None
if self._ray_remote_args_fn:
self._refresh_actor_cls()
actual_remote_args = self._refresh_actor_cls()
else:
actual_remote_args = self._ray_remote_args
actor_resource_usage = ExecutionResources(
cpu=actual_remote_args.get("num_cpus", 0),
gpu=actual_remote_args.get("num_gpus", 0),
memory=actual_remote_args.get("memory", 0),
)
actor = self._actor_cls.options(
_labels={self._OPERATOR_ID_LABEL_KEY: self.id, **labels}
).remote(
Expand All @@ -340,7 +348,7 @@ def _task_done_callback(res_ref):
res_ref,
lambda: _task_done_callback(res_ref),
)
return actor, res_ref
return actor, res_ref, actor_resource_usage

def _try_schedule_task(self, bundle: RefBundle, strict: bool):
# Notify first input for deferred initialization (e.g., Iceberg schema evolution).
Expand Down Expand Up @@ -426,23 +434,23 @@ def _task_done_callback(actor_to_return):

return num_submitted_tasks

def _refresh_actor_cls(self):
def _refresh_actor_cls(self) -> Dict[str, Any]:
"""When `self._ray_remote_args_fn` is specified, this method should
be called prior to initializing the new worker in order to get new
remote args passed to the worker. It updates `self.cls` with the same
`_MapWorker` class, but with the new remote args from
`self._ray_remote_args_fn`."""
`self._ray_remote_args_fn`.

Returns:
The merged remote args used to create the actor class.
"""
assert self._ray_remote_args_fn, "_ray_remote_args_fn must be provided"
remote_args = self._ray_remote_args.copy()
new_remote_args = self._ray_remote_args_fn()

# Override args from user-defined remote args function.
new_and_overriden_remote_args = {}
for k, v in new_remote_args.items():
remote_args[k] = v
new_and_overriden_remote_args[k] = v
remote_args.update(new_remote_args)
self._actor_cls = ray.remote(**remote_args)(self._map_worker_cls)
return new_and_overriden_remote_args
return remote_args

def has_next(self) -> bool:
# In case there are still enqueued bundles remaining, try to
Expand Down Expand Up @@ -535,24 +543,11 @@ def min_max_resource_requirements(

def current_logical_usage(self) -> ExecutionResources:
# Both pending and running actors count towards our current resource usage.
num_active_workers = self._actor_pool.current_size()
return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0) * num_active_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_active_workers,
memory=self._ray_remote_args.get("memory", 0) * num_active_workers,
)
return self._actor_pool.current_logical_usage()

def pending_logical_usage(self) -> ExecutionResources:
# Both pending and restarting actors count towards pending processor usage
num_pending_workers = (
self._actor_pool.num_pending_actors()
+ self._actor_pool.num_restarting_actors()
)
return ExecutionResources(
cpu=self._ray_remote_args.get("num_cpus", 0) * num_pending_workers,
gpu=self._ray_remote_args.get("num_gpus", 0) * num_pending_workers,
memory=self._ray_remote_args.get("memory", 0) * num_pending_workers,
)
# Both pending and restarting actors count towards pending processor usage.
return self._actor_pool.pending_logical_usage()

def incremental_resource_usage(self) -> ExecutionResources:
# Submitting tasks to existing actors doesn't require additional
Expand Down Expand Up @@ -751,7 +746,7 @@ class _ActorPool(AutoscalingActorPool):

def __init__(
self,
create_actor_fn: Callable[[Dict[str, str]], Tuple[ActorHandle, ObjectRef[Any]]],
create_actor_fn: "Callable[[Dict[str, str]], Tuple[ActorHandle, ObjectRef[Any], ExecutionResources]]",
config: AutoscalingActorConfig,
map_worker_cls_name: str = "MapWorker",
debounce_period_s: int = _ACTOR_POOL_SCALE_DOWN_DEBOUNCE_PERIOD_S,
Expand All @@ -760,8 +755,8 @@ def __init__(

Args:
create_actor_fn: Callable that takes key-value labels as input and
creates an actor with those labels. Returns the actor handle and
a reference to the actor's node ID.
creates an actor with those labels. Returns the actor handle, a
reference to the actor's node ID, and the actor's resource usage.
config: Configuration for the autoscaling actor pool, including
min/max/initial pool sizes, concurrency, and resource usage.
map_worker_cls_name: Name of the map worker class for logging
Expand All @@ -783,6 +778,12 @@ def __init__(
self._pending_actors: Dict[ObjectRef, ActorHandle] = {}
# Map from actor handle to its logical ID.
self._actor_to_logical_id: Dict[ActorHandle, LogicalActorId] = {}
# Per-actor resource usage, needed because ray_remote_args_fn can
# produce different resources for each actor.
self._actor_resource_usage: Dict[ActorHandle, ExecutionResources] = {}
# Cached aggregate resource counters.
self._total_usage = ExecutionResources.zero()
self._pending_or_restarting_usage = ExecutionResources.zero()
# Cached values for actor / task counts
self._num_restarting_actors: int = 0
self._num_active_actors: int = 0
Expand Down Expand Up @@ -848,8 +849,8 @@ def scale(self, req: ActorPoolScalingRequest) -> Optional[int]:
)

for _ in range(target_num_actors):
actor, ready_ref = self._create_actor()
self._add_pending_actor(actor, ready_ref)
actor, ready_ref, resource_usage = self._create_actor()
self._add_pending_actor(actor, ready_ref, resource_usage)

# Capture last scale up timestamp
self._last_upscaled_at = time.time()
Expand Down Expand Up @@ -935,13 +936,22 @@ def pending_to_running(self, ready_ref: ray.ObjectRef) -> Optional[ActorHandle]:
# Actor init failed - clean up the actor from _actor_to_logical_id
# This must happen for all exceptions, not just RayError, to prevent
# memory leaks where dead actor handles remain in _actor_to_logical_id.
usage = self._actor_resource_usage.pop(actor)
self._total_usage = self._total_usage.subtract(usage)
self._pending_or_restarting_usage = (
self._pending_or_restarting_usage.subtract(usage)
)
self._actor_to_logical_id.pop(actor, None)
raise
self._running_actors[actor] = _ActorState(
num_tasks_in_flight=0,
actor_location=actor_location,
is_restarting=False,
)
# Actor is no longer pending — subtract from pending usage.
self._pending_or_restarting_usage = self._pending_or_restarting_usage.subtract(
self._actor_resource_usage[actor]
)
# NOTE: We assume any actor that goes from pending to running is ALIVE
self._alive_actors_to_in_flight_tasks_heap[actor] = _ActorRank(0)
self._alive_node_to_actor_heap[actor_location][actor] = _ActorRank(0)
Expand Down Expand Up @@ -1051,12 +1061,16 @@ def _can_apply_request(self, req: ActorPoolScalingRequest) -> bool:

return True

def _create_actor(self) -> Tuple[ActorHandle, ObjectRef]:
def _create_actor(
self,
) -> Tuple[ActorHandle, ObjectRef, ExecutionResources]:
logical_actor_id = str(uuid.uuid4())
labels = {self.get_logical_id_label_key(): logical_actor_id}
actor, ready_ref = self._create_actor_fn(labels, logical_actor_id)
actor, ready_ref, resource_usage = self._create_actor_fn(
labels, logical_actor_id
)
self._actor_to_logical_id[actor] = logical_actor_id
return actor, ready_ref
return actor, ready_ref, resource_usage

def _update_running_actor_state(self, actor: ActorHandle):
"""Update running actor state. This is called for every actor
Expand All @@ -1080,10 +1094,20 @@ def _update_running_actor_state(self, actor: ActorHandle):
assert actor_state == _ACTOR_STATE_RESTARTING, actor_state
if not running_actor_state.is_restarting:
self._num_restarting_actors += 1
self._pending_or_restarting_usage = (
self._pending_or_restarting_usage.add(
self._actor_resource_usage[actor]
)
)
running_actor_state.is_restarting = True
else:
if running_actor_state.is_restarting:
self._num_restarting_actors -= 1
self._pending_or_restarting_usage = (
self._pending_or_restarting_usage.subtract(
self._actor_resource_usage[actor]
)
)
running_actor_state.is_restarting = False

self._update_rank(actor=actor, state=running_actor_state, died=died)
Expand All @@ -1110,7 +1134,12 @@ def _update_rank(self, actor: ActorHandle, state: _ActorState, died: bool):
if node_heap is not None and actor in node_heap:
del node_heap[actor]

def _add_pending_actor(self, actor: ActorHandle, ready_ref: ray.ObjectRef):
def _add_pending_actor(
Comment thread
cursor[bot] marked this conversation as resolved.
self,
actor: ActorHandle,
ready_ref: ObjectRef,
resource_usage: ExecutionResources,
):
"""Adds a pending actor to the pool.

This actor won't be pickable until it is marked as running via a
Expand All @@ -1119,8 +1148,14 @@ def _add_pending_actor(self, actor: ActorHandle, ready_ref: ray.ObjectRef):
Args:
actor: The not-yet-ready actor to add as pending to the pool.
ready_ref: The ready future for the actor.
resource_usage: The actual resource usage for this actor.
"""
self._pending_actors[ready_ref] = actor
self._actor_resource_usage[actor] = resource_usage
self._total_usage = self._total_usage.add(resource_usage)
self._pending_or_restarting_usage = self._pending_or_restarting_usage.add(
resource_usage
)

def _get_logical_ids(self) -> List[LogicalActorId]:
"""Get the logical IDs for pending and running actors in the actor pool.
Expand Down Expand Up @@ -1149,6 +1184,11 @@ def _try_remove_pending_actor(self) -> bool:
# At least one pending actor, so kill first one.
ready_ref = next(iter(self._pending_actors.keys()))
actor = self._pending_actors.pop(ready_ref)
usage = self._actor_resource_usage.pop(actor)
self._total_usage = self._total_usage.subtract(usage)
self._pending_or_restarting_usage = (
self._pending_or_restarting_usage.subtract(usage)
)
del self._actor_to_logical_id[actor]
return True
# No pending actors, so indicate to the caller that no actors were killed.
Expand All @@ -1168,9 +1208,16 @@ def _release_pending_actors(self, force: bool):
# Release pending actors from the set of pending ones
pending = dict(self._pending_actors)
self._pending_actors.clear()
for actor in pending.values():
usage = self._actor_resource_usage.pop(actor)
self._total_usage = self._total_usage.subtract(usage)
self._pending_or_restarting_usage = (
self._pending_or_restarting_usage.subtract(usage)
)
self._actor_to_logical_id.pop(actor, None)

if force:
for _, actor in pending.items():
for actor in pending.values():
# NOTE: Actors can't be brought back after being ``ray.kill``-ed,
# hence we're only doing that if this is a forced release
ray.kill(actor)
Expand Down Expand Up @@ -1222,6 +1269,13 @@ def _release_running_actor(self, actor: ActorHandle):
del self._running_actors[actor]
del self._actor_to_logical_id[actor]

usage = self._actor_resource_usage.pop(actor)
self._total_usage = self._total_usage.subtract(usage)
if actor_state.is_restarting:
self._pending_or_restarting_usage = (
self._pending_or_restarting_usage.subtract(usage)
)

def _find_actor_with_locality(self, bundle: RefBundle) -> Optional[ActorHandle]:
"""Find the least-busy alive actor on the preferred node with the most data.

Expand Down Expand Up @@ -1251,3 +1305,9 @@ def _find_actor_with_locality(self, bundle: RefBundle) -> Optional[ActorHandle]:
return actor

return None

def current_logical_usage(self) -> ExecutionResources:
    """Return the aggregate logical resource usage of all actors in the pool.

    Covers pending and running actors alike: the cached ``_total_usage``
    counter is incremented when an actor is added as pending and decremented
    when an actor is removed from the pool, so no per-call aggregation over
    actors is needed here.
    """
    return self._total_usage

def pending_logical_usage(self) -> ExecutionResources:
    """Return the logical resource usage of pending or restarting actors.

    Backed by the cached ``_pending_or_restarting_usage`` counter, which is
    updated as actors transition between the pending/restarting and running
    states (e.g. when a pending actor becomes running, its usage is
    subtracted from this counter but stays in the total).
    """
    return self._pending_or_restarting_usage
8 changes: 4 additions & 4 deletions python/ray/data/tests/test_actor_pool_map_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ def _create_actor_fn(
self,
labels: Dict[str, Any],
logical_actor_id: str = "Actor1",
) -> Tuple[ActorHandle, ObjectRef[Any]]:
) -> Tuple[ActorHandle, ObjectRef[Any], ExecutionResources]:
actor = PoolWorker.options(_labels=labels).remote(self._actor_node_id)
ready_ref = actor.get_location.remote()
self._last_created_actor_and_ready_ref = actor, ready_ref
return actor, ready_ref
return actor, ready_ref, ExecutionResources(cpu=1)

def _create_actor_pool(
self,
Expand Down Expand Up @@ -787,9 +787,9 @@ def test_actor_pool_scale_logs_include_map_worker_cls_name(
def create_actor_fn(
labels: Dict[str, Any],
logical_actor_id: str = "Actor1",
) -> Tuple[ActorHandle, ObjectRef[Any]]:
) -> Tuple[ActorHandle, ObjectRef[Any], ExecutionResources]:
actor = PoolWorker.options(_labels=labels).remote("node1")
return actor, actor.get_location.remote()
return actor, actor.get_location.remote(), ExecutionResources(cpu=1)

config = AutoscalingActorConfig(
min_size=1,
Expand Down
28 changes: 28 additions & 0 deletions python/ray/data/tests/test_executor_resource_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,34 @@ def test_actor_pool_scheduling(ray_start_10_cpus_shared, restore_data_context):
assert op.metrics.obj_store_mem_pending_task_outputs == 0


def test_actor_pool_resource_reporting_with_dynamic_remote_args(
    ray_start_10_cpus_shared,
):
    """Verify that ``current_logical_usage`` reflects dynamic per-actor
    resources produced by ``ray_remote_args_fn``, not only the statically
    defined ``ray_remote_args``.
    """
    input_op = InputDataBuffer(
        DataContext.get_current(), make_ref_bundles([[SMALL_STR] for _ in range(100)])
    )
    # ray_remote_args sets 1 CPU; ray_remote_args_fn additionally requests
    # 500 bytes of memory per actor. Both must show up in the reported usage.
    op = MapOperator.create(
        _mul2_map_data_prcessor,
        min_rows_per_bundle=None,
        input_op=input_op,
        data_context=DataContext.get_current(),
        name="TestMapper",
        compute_strategy=ActorPoolStrategy(min_size=2, max_size=2),  # Create two actors
        ray_remote_args={"num_cpus": 1},
        ray_remote_args_fn=lambda: {"memory": 500},
    )

    # Block until both actors are fully started.
    op.start(ExecutionOptions())
    run_op_tasks_sync(op, only_existing=True)

    # Should reflect dynamic resources: 2 actors * (1 CPU, 500 memory).
    assert op.current_logical_usage() == ExecutionResources(cpu=2, gpu=0, memory=1000)

def test_actor_pool_scheduling_with_bundling(
ray_start_10_cpus_shared, restore_data_context
):
Expand Down
Loading