Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 19 additions & 12 deletions optimum/neuron/accelerate/accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,18 @@
from typing import Any, Callable

import torch
import torch_xla
import torch_xla.core.xla_model as xm
import torch_xla.runtime as xr
from accelerate import Accelerator
from accelerate.checkpointing import save_accelerator_state, save_custom_state
from accelerate.utils import AutocastKwargs, DistributedType
from accelerate.utils.operations import gather_object, recursively_apply
from accelerate.utils.operations import recursively_apply
from neuronx_distributed import parallel_layers
from neuronx_distributed.optimizer import NeuronZero1Optimizer
from neuronx_distributed.parallel_layers.parallel_state import (
get_context_model_parallel_size,
get_data_parallel_group,
get_data_parallel_replica_groups,
get_data_parallel_size,
get_tensor_model_parallel_replica_groups,
Expand All @@ -58,15 +60,13 @@
from .optimizer import NeuronAcceleratedOptimizer
from .scheduler import NeuronAcceleratedScheduler
from .state import NeuronAcceleratorState
from .utils import (
patch_accelerate_is_torch_xla_available,
)
from .utils.dataclasses import MixedPrecisionConfig, MixedPrecisionMode
from .utils.misc import (
apply_activation_checkpointing,
create_patched_save_pretrained,
patch_accelerate_is_torch_xla_available,
)
from .utils.operations import _xla_gather
from .utils.operations import gather_object


# Setup logging so that the main process logs at the INFO level and the others are silent.
Expand Down Expand Up @@ -390,7 +390,7 @@ def prepare_model(
move_model_to_device(model, xm.xla_device())
model.tie_weights()

xm.mark_step()
torch_xla.sync()

# Adding the model to the list of prepared models.
self._models.append(model)
Expand Down Expand Up @@ -474,7 +474,7 @@ def _inner(folder):
logger.info(f"Saving current state to {output_dir}")

# Finish running the previous step before checkpointing
xm.mark_step()
torch_xla.sync()

# Save the models
if save_model_func is not None:
Expand Down Expand Up @@ -547,10 +547,18 @@ def save_state(
output_dir=output_dir, safe_serialization=safe_serialization, **save_model_func_kwargs
)

def gather(self, tensor, out_of_graph: bool = False):
return _xla_gather(tensor, out_of_graph=out_of_graph)
def gather(self, tensor: torch.Tensor, sync: bool = False) -> torch.Tensor:
groups = get_data_parallel_group(as_list=True)

# Ensure tensor is at least 1D for all_gather (scalars need to be unsqueezed)
input_tensor = tensor.unsqueeze(0) if tensor.ndim == 0 else tensor
gathered = xm.all_gather(input_tensor, dim=0, groups=groups, pin_layout=False)

if sync:
torch_xla.sync()
return gathered

def gather_for_metrics(self, input_data, use_gather_object: bool = False):
def gather_for_metrics(self, input_data, use_gather_object: bool = False, sync: bool = False):
try:
recursively_apply(lambda x: x, input_data, error_on_other_type=True)
all_tensors = True
Expand All @@ -562,8 +570,7 @@ def gather_for_metrics(self, input_data, use_gather_object: bool = False):
if use_gather_object:
data = gather_object(input_data)
else:
# It is needed to perform out-of-graph gather otherwise re-compilation happens at every evaluation step.
data = self.gather(input_data, out_of_graph=True)
data = self.gather(input_data, sync=sync)

try:
if self.gradient_state.end_of_dataloader:
Expand Down
10 changes: 10 additions & 0 deletions optimum/neuron/accelerate/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,13 @@

from .dataclasses import MixedPrecisionConfig, MixedPrecisionMode
from .misc import patch_accelerate_is_torch_xla_available
from .operations import (
broadcast_object,
broadcast_object_to_data_parallel_group,
broadcast_object_to_pipeline_model_parallel_group,
broadcast_object_to_tensor_model_parallel_group,
gather_object,
gather_object_from_data_parallel_group,
gather_object_from_pipeline_model_parallel_group,
gather_object_from_tensor_model_parallel_group,
)
3 changes: 2 additions & 1 deletion optimum/neuron/accelerate/utils/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import accelerate
import torch
import torch_xla
import torch_xla.core.xla_model as xm
from neuronx_distributed.parallel_layers.parallel_state import (
get_data_parallel_rank,
Expand Down Expand Up @@ -119,7 +120,7 @@ def wrapper(*args, **kwargs):
with patcher:
output = orig_func(*args, **kwargs)
self.load_state_dict(orig_state_dict, assign=True)
xm.mark_step()
torch_xla.sync()
del cpu_state_dict
gc.collect()
return output
Expand Down
Loading