Skip to content

Commit 26cc5ec

Browse files
JoshWoo2003 authored and delock committed
Resolve merge conflicts and update ZenFlow Stage 3 affinity
- Resolved merge conflicts with upstream changes - Unified ZenFlow affinity behavior for Stage 3 with Stage 1 and Stage 2 Signed-off-by: Yusen Wu <xrn4ub@virginia.edu> Co-authored-by: Ma, Guokai <guokai.ma@intel.com>
1 parent d73cf43 commit 26cc5ec

2 files changed

Lines changed: 51 additions & 22 deletions

File tree

deepspeed/runtime/zenflow/engine_stage3.py

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import os
99
import torch
1010
import math
11+
import psutil
1112
from deepspeed import comm as dist
1213
from deepspeed.utils import logger
1314
from deepspeed.ops.adam import ZenFlowSelectiveAdamW_stage3
@@ -78,6 +79,8 @@ def _initialize_zenflow_stage3_prologue(optimizer_z3: "DeepSpeedZeroOptimizer_St
7879
if not optimizer_z3.zenflow:
7980
return
8081

82+
optimizer_z3.pt_reserved_cores_perc = zenflow_config.pt_reserved_cores_perc
83+
8184
for p in module.parameters():
8285
p.data = p.data.t().contiguous() if len(p.shape) != 1 else p.data
8386

@@ -531,28 +534,12 @@ def disable_accelerator():
531534

532535

533536
def zenflow_optimizer_process(pipe, curr_rank, total_rank, param_groups, shared_overlap_grad_map,
534-
shared_stale_param_map):
535-
os.environ["CUDA_VISIBLE_DEVICES"] = ""
537+
shared_stale_param_map, zf_affinity):
536538
disable_accelerator()
537539

538-
TOTAL_CORES = os.cpu_count()
539-
CPUADAM_CORE_START = 0
540-
CPUADAM_CORE_END = TOTAL_CORES
541-
TOTAL_CORES = CPUADAM_CORE_END - CPUADAM_CORE_START
542-
543-
cores_per_rank = TOTAL_CORES // total_rank
544-
extra = TOTAL_CORES % total_rank
545-
start_offset = curr_rank * cores_per_rank + min(curr_rank, extra)
546-
end_offset = start_offset + cores_per_rank + (1 if curr_rank < extra else 0)
547-
assigned_cores = set(range(CPUADAM_CORE_START + start_offset, CPUADAM_CORE_START + end_offset))
548-
549-
try:
550-
os.sched_setaffinity(0, assigned_cores)
551-
print(f"[Optimizer Thread] Rank {curr_rank} bound to CPU cores: {os.sched_getaffinity(0)}", flush=True)
552-
except AttributeError:
553-
print("[Optimizer Thread] sched_setaffinity not supported on this system.")
554-
except Exception as e:
555-
print(f"[Optimizer Thread] Failed to set affinity: {e}")
540+
current_process = psutil.Process()
541+
current_process.cpu_affinity(zf_affinity)
542+
os.environ['OMP_NUM_THREADS'] = str(len(zf_affinity))
556543

557544
from deepspeed.ops.adam import ZenFlowCPUAdam
558545
optimizer = ZenFlowCPUAdam(param_groups, overlap_step=True)
@@ -589,6 +576,14 @@ def zenflow_optimizer_process(pipe, curr_rank, total_rank, param_groups, shared_
589576
break
590577

591578

579+
def all_tensors_equal(tensor_list):
580+
first_tensor = tensor_list[0]
581+
for tensor in tensor_list[1:]:
582+
if not torch.equal(first_tensor, tensor):
583+
return False
584+
return True
585+
586+
592587
def start_optimizer_process(optimizer_z3):
593588
from multiprocessing import Pipe, get_context, Manager
594589

@@ -615,14 +610,45 @@ def start_optimizer_process(optimizer_z3):
615610
curr_rank = dist.get_rank()
616611
total_rank = dist.get_world_size()
617612

613+
current_process = psutil.Process()
614+
current_affinity = current_process.cpu_affinity()
615+
all_affinities = [
616+
torch.zeros(len(current_affinity),
617+
dtype=type(current_affinity[0]),
618+
device=get_accelerator().current_device_name()) for _ in range(total_rank)
619+
]
620+
dist.all_gather(
621+
all_affinities,
622+
torch.tensor(current_affinity, dtype=type(current_affinity[0]),
623+
device=get_accelerator().current_device_name()))
624+
# When the affinity across all ranks is the same, the workers are not bound. Do a soft bind here
625+
if all_tensors_equal(all_affinities):
626+
num_phy_cores = psutil.cpu_count(logical=False)
627+
available_phy_cores = [i for i in current_affinity if i < num_phy_cores]
628+
num_available_phy_cores = len(available_phy_cores)
629+
my_rank = curr_rank
630+
my_size = total_rank
631+
cores_per_rank = num_available_phy_cores // my_size
632+
current_affinity = available_phy_cores[my_rank * cores_per_rank:(my_rank + 1) * cores_per_rank]
633+
pt_num_cores = math.ceil(optimizer_z3.pt_reserved_cores_perc * len(current_affinity))
634+
if pt_num_cores > 0 and pt_num_cores < len(current_affinity):
635+
zf_affinity = current_affinity[pt_num_cores:]
636+
pt_affinity = current_affinity[:pt_num_cores]
637+
else:
638+
zf_affinity = current_affinity
639+
pt_affinity = current_affinity
640+
618641
optimizer_z3.process = ctx.Process(
619642
target=zenflow_optimizer_process,
620643
args=(optimizer_z3.child_conn, curr_rank, total_rank, param_groups_data, optimizer_z3.shared_overlap_grad_map,
621-
optimizer_z3.shared_stale_param_map),
644+
optimizer_z3.shared_stale_param_map, zf_affinity),
622645
)
623646
optimizer_z3.process.daemon = True
624647
optimizer_z3.process.start()
625648

649+
current_process.cpu_affinity(pt_affinity)
650+
os.environ['OMP_NUM_THREADS'] = str(len(pt_affinity))
651+
626652
msg = optimizer_z3.parent_conn.recv()
627653
assert msg["type"] == "ready", "Optimizer process did not initialize correctly."
628654

deepspeed/runtime/zero/stage3.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,10 @@ def step_with_gradscaler(optimizer):
10311031
self.torch_autocast_gradscaler.step(optimizer)
10321032
self.torch_autocast_gradscaler.update()
10331033
else:
1034-
optimizer.step()
1034+
if not self.zenflow:
1035+
optimizer.step()
1036+
else:
1037+
self.zenflow_cpu_optimizer_step()
10351038

10361039
if self.offload_optimizer:
10371040
cur_device = self.subgroup_to_device[sub_group_id]

0 commit comments

Comments
 (0)