From 8d986c7acb42281a239d6f3cee79cd5e686f582b Mon Sep 17 00:00:00 2001 From: yiliu30 Date: Sat, 24 Jan 2026 19:36:24 +0800 Subject: [PATCH 1/6] ddp poc Signed-off-by: yiliu30 --- auto_round/compressors/base.py | 70 +++++++++++++++++++++++++++++++-- auto_round/compressors/utils.py | 30 ++++++++++++++ 2 files changed, 97 insertions(+), 3 deletions(-) diff --git a/auto_round/compressors/base.py b/auto_round/compressors/base.py index e226fe742..e777b7657 100644 --- a/auto_round/compressors/base.py +++ b/auto_round/compressors/base.py @@ -50,6 +50,7 @@ reset_params, set_layer_config, ) +from auto_round.compressors.utils import DDPIndexSampler from auto_round.data_type import QUANT_FUNC_WITH_DTYPE from auto_round.data_type.utils import reshape_pad_tensor_by_group_size from auto_round.export.export_to_gguf.config import GGUF_INNER_CONFIG, ModelType @@ -142,7 +143,51 @@ "to_quant_block_names", ) +import torch.distributed as dist +def rank_log(msg: str) -> None: + """Log message with rank information in distributed setting.""" + rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0 + logger.info(f"[Rank {rank}] {msg}") + +# Source - https://stackoverflow.com/a +# Posted by Romuald Brunet, modified by community. See post 'Timeline' for change history +# Retrieved 2026-01-24, License - CC BY-SA 3.0 + +import sys +import pdb + +class ForkedPdb(pdb.Pdb): + """A Pdb subclass that may be used + from a forked multiprocessing child + + """ + def interaction(self, *args, **kwargs): + _stdin = sys.stdin + try: + sys.stdin = open('/dev/stdin') + pdb.Pdb.interaction(self, *args, **kwargs) + finally: + sys.stdin = _stdin + + +def check_grad(block, msg): + the_first_param_with_grad = None + for name, param in block.named_parameters(): + if param.grad is not None: + the_first_param_with_grad = (name, param) + break + if the_first_param_with_grad is None: + rank_log(f"{msg} No grad found in block.") + else: + name, param = the_first_param_with_grad + rank_log(f"{msg} Grad found in block. Param name: {name}, Grad norm: {param.grad.norm().item()}. grad: {param.grad}") + + +def is_ddp(): + return dist.is_initialized() and dist.get_world_size() > 1 + +from torch.nn.parallel import DistributedDataParallel as DDP class BaseCompressor(object): """Base compressor for LLM quantization @@ -2874,9 +2919,24 @@ def _quantize_block( whole_indices = torch.arange(global_batch_size) num_elm = self._get_current_num_elm(input_ids, whole_indices) - index_sampler = IndexSampler(nsamples, global_batch_size) + if is_ddp(): + index_sampler = DDPIndexSampler(nsamples, global_batch_size, iters=self.iters) + else: + index_sampler = IndexSampler(nsamples, global_batch_size) batch_size = self.batch_size + rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0 + rank_log(f"DDP rank {rank} is quantizing the block") + # ForkedPdb().set_trace() + rank_log(f"device info: deivce: {device}, loss_device: {loss_device}, block's device {next(block.parameters()).device}") + block = DDP(block, device_ids=[rank], find_unused_parameters=True) + dist.barrier() + logger.warning_once( + "DistributedDataParallel (DDP) is used for block quantization. 
" + f"block: {block}" + ) + for i in range(self.iters): + rank_log(f"starts iteration {i} for block quantization") if self.enable_alg_ext and self.data_type.endswith("dq"): for n, m in block.named_modules(): m.cur_iter = i @@ -2886,6 +2946,7 @@ def _quantize_block( num_elm = self._get_non_zero_cnt(self.attention_mask, global_indices) for tmp_step in range(self.gradient_accumulate_steps): + rank_log(f"iteration {i} tmp_step {tmp_step} start") indices = global_indices[tmp_step * batch_size : (tmp_step + 1) * batch_size] current_output = self._get_current_output(output, indices) current_output = to_device(current_output, loss_device) @@ -2897,9 +2958,9 @@ def _quantize_block( if self.low_gpu_mem_usage and card_0_in_high_risk: # clear memory to avoid OOM due to memory fragmentation clear_memory_if_reached_threshold(threshold=0.5, device_list=self.device_list) - + # rank_log(f"iteration {i} tmp_step {tmp_step} loss: {loss.item()}, loss device {loss.device}, starting backward") self._scale_loss_and_backward(scaler, loss) - + # check_grad(block, msg=f"block quantization iter {i} tmp_step {tmp_step}") if self.low_gpu_mem_usage and card_0_in_high_risk: # clear memory to avoid OOM due to memory fragmentation clear_memory_if_reached_threshold(threshold=0.8, device_list=self.device_list) @@ -2921,6 +2982,9 @@ def _quantize_block( if 0 < self.dynamic_max_gap <= i - last_best_iter: break self._step(scaler, optimizer, lr_schedule) + + rank_log(f"ends iteration {i} for block quantization") + # dist.barrier() last_loss = total_loss best_iter = self.iters diff --git a/auto_round/compressors/utils.py b/auto_round/compressors/utils.py index 17d3c9f34..cd9402f40 100644 --- a/auto_round/compressors/utils.py +++ b/auto_round/compressors/utils.py @@ -992,3 +992,33 @@ def next_batch(self) -> list[int]: batch = self.indices[self.index : self.index + self.batch_size] self.index += self.batch_size return batch + + +def rank_in_ddp() -> int: + """Returns the rank of the current process in a DDP setup. + + Returns: + int: The rank of the current process. Returns 0 if not in DDP. 
+ """ + if not torch.distributed.is_available() or not torch.distributed.is_initialized(): + return 0 + return torch.distributed.get_rank() + +class DDPIndexSampler(IndexSampler): + + def __init__(self, nsamples: int, batch_size: int, iters: int) -> None: + self.iters = iters + super().__init__(nsamples, batch_size) + rank = rank_in_ddp() + # run next_batch() for `rank` times to sync different rank's sampler + for _ in range(rank * iters): + self.next_batch() + + def next_batch(self) -> list[int]: + if self.index + self.batch_size > self.nsamples: + random.shuffle(self.indices) + self.index = 0 + + batch = self.indices[self.index : self.index + self.batch_size] + self.index += self.batch_size + return batch \ No newline at end of file From e0bd29a00253b408045cb2232fa2df855d00a1ee Mon Sep 17 00:00:00 2001 From: yiliu30 Date: Sat, 24 Jan 2026 19:46:44 +0800 Subject: [PATCH 2/6] format Signed-off-by: yiliu30 --- auto_round/compressors/base.py | 55 ++++++++++++++++++++-------------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/auto_round/compressors/base.py b/auto_round/compressors/base.py index e777b7657..8dc7a8e49 100644 --- a/auto_round/compressors/base.py +++ b/auto_round/compressors/base.py @@ -35,6 +35,7 @@ from auto_round.auto_scheme.gen_auto_scheme import AutoScheme from auto_round.compressors.shard_writer import shard_writer from auto_round.compressors.utils import ( + DDPIndexSampler, IndexSampler, block_forward, check_need_act_calibration, @@ -50,7 +51,6 @@ reset_params, set_layer_config, ) -from auto_round.compressors.utils import DDPIndexSampler from auto_round.data_type import QUANT_FUNC_WITH_DTYPE from auto_round.data_type.utils import reshape_pad_tensor_by_group_size from auto_round.export.export_to_gguf.config import GGUF_INNER_CONFIG, ModelType @@ -144,6 +144,8 @@ ) import torch.distributed as dist + + def rank_log(msg: str) -> None: """Log message with rank information in distributed setting.""" rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0 @@ -154,21 +156,22 @@ def rank_log(msg: str) -> None: # Posted by Romuald Brunet, modified by community. See post 'Timeline' for change history # Retrieved 2026-01-24, License - CC BY-SA 3.0 -import sys -import pdb +# import pdb -class ForkedPdb(pdb.Pdb): - """A Pdb subclass that may be used - from a forked multiprocessing child - """ - def interaction(self, *args, **kwargs): - _stdin = sys.stdin - try: - sys.stdin = open('/dev/stdin') - pdb.Pdb.interaction(self, *args, **kwargs) - finally: - sys.stdin = _stdin +# class ForkedPdb(pdb.Pdb): +# """A Pdb subclass that may be used +# from a forked multiprocessing child + +# """ + +# def interaction(self, *args, **kwargs): +# _stdin = sys.stdin +# try: +# sys.stdin = open("/dev/stdin") +# pdb.Pdb.interaction(self, *args, **kwargs) +# finally: +# sys.stdin = _stdin def check_grad(block, msg): @@ -181,13 +184,18 @@ def check_grad(block, msg): rank_log(f"{msg} No grad found in block.") else: name, param = the_first_param_with_grad - rank_log(f"{msg} Grad found in block. Param name: {name}, Grad norm: {param.grad.norm().item()}. grad: {param.grad}") + rank_log( + f"{msg} Grad found in block. Param name: {name}, Grad norm: {param.grad.norm().item()}. 
grad: {param.grad}" + ) def is_ddp(): return dist.is_initialized() and dist.get_world_size() > 1 + from torch.nn.parallel import DistributedDataParallel as DDP + + class BaseCompressor(object): """Base compressor for LLM quantization @@ -2927,14 +2935,13 @@ def _quantize_block( rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0 rank_log(f"DDP rank {rank} is quantizing the block") # ForkedPdb().set_trace() - rank_log(f"device info: deivce: {device}, loss_device: {loss_device}, block's device {next(block.parameters()).device}") + rank_log( + f"device info: device: {device}, loss_device: {loss_device}, block's device {next(block.parameters()).device}" + ) block = DDP(block, device_ids=[rank], find_unused_parameters=True) dist.barrier() - logger.warning_once( - "DistributedDataParallel (DDP) is used for block quantization. " - f"block: {block}" - ) - + logger.warning_once("DistributedDataParallel (DDP) is used for block quantization. " f"block: {block}") + for i in range(self.iters): rank_log(f"starts iteration {i} for block quantization") if self.enable_alg_ext and self.data_type.endswith("dq"): @@ -2958,7 +2965,9 @@ def _quantize_block( if self.low_gpu_mem_usage and card_0_in_high_risk: # clear memory to avoid OOM due to memory fragmentation clear_memory_if_reached_threshold(threshold=0.5, device_list=self.device_list) - # rank_log(f"iteration {i} tmp_step {tmp_step} loss: {loss.item()}, loss device {loss.device}, starting backward") + rank_log( + f"iteration {i} tmp_step {tmp_step} loss: {loss.item()}, loss device {loss.device}, starting backward" + ) self._scale_loss_and_backward(scaler, loss) # check_grad(block, msg=f"block quantization iter {i} tmp_step {tmp_step}") if self.low_gpu_mem_usage and card_0_in_high_risk: @@ -2982,7 +2991,7 @@ def _quantize_block( if 0 < self.dynamic_max_gap <= i - last_best_iter: break self._step(scaler, optimizer, lr_schedule) - + rank_log(f"ends iteration {i} for block quantization") # dist.barrier() From a5461a44c7f5de3a2542c46144e5c385f8ed1072 Mon Sep 17 00:00:00 2001 From: yiliu30 Date: Sat, 24 Jan 2026 19:50:40 +0800 Subject: [PATCH 3/6] add demo Signed-off-by: yiliu30 --- examples/ddp_demo.py | 262 +++++++++++++++++++++++++++++++++++ examples/ddp_quant_model.py | 266 ++++++++++++++++++++++++++++++++++++ 2 files changed, 528 insertions(+) create mode 100644 examples/ddp_demo.py create mode 100644 examples/ddp_quant_model.py diff --git a/examples/ddp_demo.py b/examples/ddp_demo.py new file mode 100644 index 000000000..d918594fb --- /dev/null +++ b/examples/ddp_demo.py @@ -0,0 +1,262 @@ +""" +Simple End-to-End DDP (DistributedDataParallel) Example + +This script demonstrates how to train a simple neural network using PyTorch DDP +for distributed training across multiple GPUs. 
+ +Usage: + # Using torchrun (recommended) - specify custom port to avoid conflicts + torchrun --nproc_per_node=2 --master_port=29501 ddp_demo.py + + # Or using torch.multiprocessing.spawn (for single machine) + python ddp_demo.py + +Troubleshooting: + # If you get "address already in use" error, use a different port: + torchrun --nproc_per_node=2 --master_port=29502 ddp_demo.py + + # Or kill existing processes: + pkill -f ddp_demo.py +""" + +import os + +import torch +import torch.distributed as dist +import torch.multiprocessing as mp +import torch.nn as nn +import torch.optim as optim +from loguru import logger +from torch.nn.parallel import DistributedDataParallel as DDP +from torch.utils.data import DataLoader, Dataset, DistributedSampler + + +# Simple synthetic dataset for demonstration +class SimpleDataset(Dataset): + def __init__(self, size=1000, input_dim=10): + self.size = size + self.input_dim = input_dim + + def __len__(self): + return self.size + + def __getitem__(self, idx): + # Generate random data and labels + data = torch.randn(self.input_dim) + label = torch.randn(5) # 5 output classes + return data, label + + +# Simple neural network model +class SimpleModel(nn.Module): + def __init__(self, input_dim=10, hidden_dim=20, output_dim=5): + super(SimpleModel, self).__init__() + self.fc1 = nn.Linear(input_dim, hidden_dim) + self.relu = nn.ReLU() + self.fc2 = nn.Linear(hidden_dim, output_dim) + + def forward(self, x): + x = self.fc1(x) + x = self.relu(x) + x = self.fc2(x) + return x + + +def setup(rank, world_size): + """Initialize the distributed environment.""" + os.environ["MASTER_ADDR"] = os.environ.get("MASTER_ADDR", "localhost") + os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "12355") + + # Initialize process group + # Use 'nccl' backend for GPU training, 'gloo' for CPU + backend = "nccl" if torch.cuda.is_available() else "gloo" + dist.init_process_group(backend, rank=rank, world_size=world_size) + + +def cleanup(): + """Clean up the distributed environment.""" + dist.destroy_process_group() + + +def train_ddp(rank, world_size, epochs=5, batch_size=32): + """ + Main training function for DDP. 
+ + Args: + rank: Rank of the current process + world_size: Total number of processes + epochs: Number of training epochs + batch_size: Batch size per process + """ + print(f"Running DDP training on rank {rank}/{world_size}") + + # Setup distributed environment + setup(rank, world_size) + + # Set device for this process + device = torch.device(f"cuda:{rank}" if torch.cuda.is_available() else "cpu") + + # Create model and move to device + model = SimpleModel().to(device) + + # Wrap model with DDP + if torch.cuda.is_available(): + ddp_model = DDP(model, device_ids=[rank]) + else: + ddp_model = DDP(model) + + # Create dataset and distributed sampler + dataset = SimpleDataset(size=10) + sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True) + + # Create dataloader + dataloader = DataLoader( + dataset, + batch_size=batch_size, + sampler=sampler, + num_workers=0, + pin_memory=True if torch.cuda.is_available() else False, + ) + + # Loss function and optimizer + criterion = nn.MSELoss() + optimizer = optim.SGD(ddp_model.parameters(), lr=0.01) + + # Training loop + for epoch in range(epochs): + # Set epoch for sampler (important for proper shuffling) + sampler.set_epoch(epoch) + + epoch_loss = 0.0 + num_batches = 0 + + ddp_model.train() + for batch_idx, (data, labels) in enumerate(dataloader): + logger.info(f"Rank {rank}, Epoch {epoch}, Batch {batch_idx} processing") + # Move data to device + data = data.to(device) + labels = labels.to(device) + + # Forward pass + optimizer.zero_grad() + outputs = ddp_model(data) + loss = criterion(outputs, labels) + + # Backward pass + loss.backward() + optimizer.step() + + epoch_loss += loss.item() + num_batches += 1 + + if batch_idx % 10 == 0 and rank == 0: + print(f"Epoch [{epoch+1}/{epochs}], Batch [{batch_idx}/{len(dataloader)}], Loss: {loss.item():.4f}") + + # Calculate average loss + avg_loss = epoch_loss / num_batches + + if rank == 0: + print(f"Epoch [{epoch+1}/{epochs}] completed, Average Loss: {avg_loss:.4f}") + + # Save checkpoint (only on rank 0) + if rank == 0: + checkpoint_path = "/tmp/ddp_model_checkpoint.pth" + torch.save( + { + "epoch": epochs, + "model_state_dict": ddp_model.state_dict(), + "optimizer_state_dict": optimizer.state_dict(), + }, + checkpoint_path, + ) + print(f"Checkpoint saved to {checkpoint_path}") + + # Wait for all processes to finish + dist.barrier() + + # Cleanup + cleanup() + print(f"Rank {rank} finished training") + + +def main_spawn(): + """Main function using mp.spawn for single-machine multi-GPU training.""" + world_size = torch.cuda.device_count() if torch.cuda.is_available() else 2 + + if world_size < 2: + print("Warning: Running with only 1 process. 
DDP benefits are minimal.") + world_size = 2 # Still spawn 2 processes for demonstration + + print(f"Starting DDP training with {world_size} processes") + + mp.spawn(train_ddp, args=(world_size,), nprocs=world_size, join=True) + + print("Training completed!") + + +def main_torchrun(): + """Main function for torchrun-based execution.""" + # When using torchrun, environment variables are already set + rank = int(os.environ.get("RANK", 0)) + local_rank = int(os.environ.get("LOCAL_RANK", 0)) + world_size = int(os.environ.get("WORLD_SIZE", 1)) + + print(f"Torchrun mode: rank={rank}, local_rank={local_rank}, world_size={world_size}") + + # Setup distributed environment + backend = "nccl" if torch.cuda.is_available() else "gloo" + dist.init_process_group(backend) + + # Set device + device = torch.device(f"cuda:{local_rank}" if torch.cuda.is_available() else "cpu") + + # Train + model = SimpleModel().to(device) + if torch.cuda.is_available(): + ddp_model = DDP(model, device_ids=[local_rank]) + else: + ddp_model = DDP(model) + + # Create dataset with distributed sampler + dataset = SimpleDataset(size=1000) + sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank) + dataloader = DataLoader(dataset, batch_size=32, sampler=sampler) + + criterion = nn.MSELoss() + optimizer = optim.SGD(ddp_model.parameters(), lr=0.01) + + # Training loop + for epoch in range(5): + sampler.set_epoch(epoch) + epoch_loss = 0.0 + + for batch_idx, (data, labels) in enumerate(dataloader): + data = data.to(device) + labels = labels.to(device) + + optimizer.zero_grad() + outputs = ddp_model(data) + loss = criterion(outputs, labels) + loss.backward() + optimizer.step() + + epoch_loss += loss.item() + + if batch_idx % 10 == 0 and rank == 0: + print(f"Epoch {epoch+1}, Batch {batch_idx}, Loss: {loss.item():.4f}") + + if rank == 0: + print(f"Epoch {epoch+1} completed, Avg Loss: {epoch_loss/len(dataloader):.4f}") + + dist.destroy_process_group() + print(f"Rank {rank} finished!") + + +if __name__ == "__main__": + # Check if running with torchrun (environment variables will be set) + if "RANK" in os.environ and "WORLD_SIZE" in os.environ: + print("Detected torchrun environment, using torchrun mode") + main_torchrun() + else: + print("Using mp.spawn mode for single-machine training") + main_spawn() diff --git a/examples/ddp_quant_model.py b/examples/ddp_quant_model.py new file mode 100644 index 000000000..3bc21d0f5 --- /dev/null +++ b/examples/ddp_quant_model.py @@ -0,0 +1,266 @@ +""" +python ddp_quant_model.py --ddp --nsamples 128 --iters 100 + +DDP-enabled AutoRound Quantization Script + +This script demonstrates how to use DDP (DistributedDataParallel) for model quantization +across multiple GPUs using AutoRound. 
+ +Usage: + # Single GPU (default behavior) + python ddp_quant_model.py + + # Multiple GPUs using mp.spawn + python ddp_quant_model.py --ddp + + # Using torchrun + torchrun --nproc_per_node=2 --master_port=29501 ddp_quant_model.py +""" + +import argparse +import os + +import torch +import torch.distributed as dist +import torch.multiprocessing as mp +import torch.nn.functional as F +import transformers +from torch import Tensor +from transformers import AutoModel, AutoModelForCausalLM, AutoTokenizer + +model_name = "Kimi-K2-Instruct-BF16" +model_name = "/models/Qwen3-30B-A3B" +model_name = "facebook/opt-125m" +model_name = "/data5/yliu7/HF_HOME/meta-llama/Llama-3.2-1B-Instruct/" +model_name = "Qwen/Qwen2.5-0.5B-Instruct" + +model_name = "/data5/yliu7/HF_HOME/unsloth/gpt-oss-20b-BF16" +model_name = "/data4/yliu/unsloth/gpt-oss-120b-BF16" +model_name = "/storage/yiliu7/unsloth/gpt-oss-20b-BF16/" +model_name = "/storage/yiliu7/unsloth/gpt-oss-120b-BF16" +model_name = "Qwen/Qwen2.5-0.5B-Instruct" +model_name = "/storage/yiliu7/Qwen/Qwen2-VL-7B-Instruct" +model_name = "/models/DeepSeek-V2-Lite-Chat/" +model_name = "/storage/yiliu7/deepseek-ai/DeepSeek-V2-Lite-Chat/" +model_name = "/storage/yiliu7/tflsxyy/DeepSeek-V3-bf16-4layers" +model_name = "/storage/yiliu7/deepseek-ai/DeepSeek-R1" +model_name = "Qwen/Qwen3-Embedding-4B" +# model_name = "/storage/yiliu7/Qwen/Qwen3-VL-30B-A3B-Instruct" +model_name = "/storage/jenkins/huggingface/Llama-4-Scout-17B-16E-Instruct" +# model_name = "/storage/yiliu7/meta-llama/Llama-4-Scout-17B-16E" +# model_name = "/storage/yiliu7/unsloth/gpt-oss-20b-BF16/" +model_name = "/storage/yiliu7/Qwen/Qwen3-VL-30B-A3B-Instruct" +model_name = "/storage/yiliu7/Qwen/Qwen3-8B/" +model_name = "/data5/yiliu4/Qwen/Qwen2-0.5B" +# from transformers import Qwen2VLForConditionalGeneration +# tokenizer = AutoTokenizer.from_pretrained(model_name,trust_remote_code=True) +# model = AutoModelForCausalLM.from_pretrained(model_name,device_map="cpu", torch_dtype="auto",trust_remote_code=True) +# from sentence_transformers import SentenceTransformer + +# Load the model +# # model = SentenceTransformer("Qwen/Qwen3-Embedding-4B") +# tokenizer = AutoTokenizer.from_pretrained('Qwen/Qwen3-Embedding-4B', padding_side='left') +# model = AutoModel.from_pretrained('Qwen/Qwen3-Embedding-4B') + +# block = model.model.layers +device_map = {} + + +from auto_round import AutoRound +from auto_round import schemes as ar_schemes + +scheme = "MXFP8" +scheme = ar_schemes.FP8_STATIC +scheme = ar_schemes.MXFP8 +scheme = ar_schemes.MXFP4 +# scheme = "MXFP4" +scheme = ar_schemes.FP8_STATIC +# scheme = ar_schemes.NVFP4 +scheme = "FP8_STATIC" +# scheme = "MXFP4" +scheme = "W4A16" +# scheme = "W4A16" +# "re:.*lm_head", +# "re:.*self_attn", +# "re:.*attn", +# "re:.*attention.*", +# "re:.*router", + +# from mem_patch import * + + +# from transformers.conversion_mapping import register_checkpoint_conversion_mapping + +# register_checkpoint_conversion_mapping("deepseek_v3", [], overwrite=True) +# model.eval() + + +def setup_ddp(rank, world_size): + """Initialize the distributed environment.""" + os.environ["MASTER_ADDR"] = os.environ.get("MASTER_ADDR", "localhost") + os.environ["MASTER_PORT"] = os.environ.get("MASTER_PORT", "12356") + + # Initialize process group + backend = "nccl" if torch.cuda.is_available() else "gloo" + dist.init_process_group(backend, rank=rank, world_size=world_size) + torch.cuda.set_device(rank) + + +def cleanup_ddp(): + """Clean up the distributed environment.""" + if dist.is_initialized(): + 
dist.destroy_process_group() + + +def quantize_model(rank, world_size, model_name, scheme, iters=4, nsamples=32): + """ + Quantize model on a specific GPU rank. + + Args: + rank: GPU rank for this process + world_size: Total number of GPUs + model_name: Model name or path + scheme: Quantization scheme + iters: Number of iterations + nsamples: Number of samples + """ + print(f"[Rank {rank}/{world_size}] Starting quantization") + + # Setup DDP if using multiple GPUs + if world_size > 1: + setup_ddp(rank, world_size) + + # Set device for this process + device = f"cuda:{rank}" if torch.cuda.is_available() else "cpu" + + # try: + # Initialize AutoRound + autoround = AutoRound( + model_name, + scheme=scheme, + iters=iters, + nsamples=nsamples, + # low_gpu_mem_usage=False, + low_gpu_mem_usage=True, + # device=f"cuda:{rank}", + device_map=rank, + enable_torch_compile=True, + ) + SAVE_DIR = model_name.rstrip("/").split("/")[-1] + f"-{scheme}" + # Only rank 0 saves the model + if rank == 0: + print(f"[Rank {rank}] Quantizing and saving to {SAVE_DIR}") + model, _ = autoround.quantize_and_save(format="auto_round", output_dir=SAVE_DIR) + print(f"[Rank {rank}] Quantized model saved to {SAVE_DIR}") + else: + # Other ranks just run quantization without saving + print(f"[Rank {rank}] Running quantization (not saving)") + model, _ = autoround.quantize_and_save(format="auto_round", output_dir=f"{SAVE_DIR}_rank{rank}") + + # Synchronize all processes + if world_size > 1: + dist.barrier() + + print(f"[Rank {rank}] Quantization completed") + + # except Exception as e: + # print(f"[Rank {rank}] Error during quantization: {e}") + # raise + + # finally: + # # Cleanup DDP + if world_size > 1: + cleanup_ddp() + + +def main_single_gpu(model_name, scheme, iters, nsamples): + """Run quantization on a single GPU (original behavior).""" + print("Running single GPU quantization") + autoround = AutoRound( + model_name, + scheme=scheme, + iters=iters, + nsamples=nsamples, + low_gpu_mem_usage=False, + ) + + SAVE_DIR = model_name.rstrip("/").split("/")[-1] + f"-{scheme}" + model, _ = autoround.quantize_and_save(format="auto_round", output_dir=SAVE_DIR) + print(f"Quantized model saved to {SAVE_DIR}") + return model + + +def main_spawn(model_name, scheme, iters, nsamples): + """Main function using mp.spawn for multi-GPU quantization.""" + world_size = torch.cuda.device_count() if torch.cuda.is_available() else 1 + + if world_size < 2: + print("Warning: Only 1 GPU detected. 
Running single GPU mode.") + return main_single_gpu(model_name, scheme, iters, nsamples) + + print(f"Starting DDP quantization with {world_size} GPUs") + + mp.spawn(quantize_model, args=(world_size, model_name, scheme, iters, nsamples), nprocs=world_size, join=True) + + print("Quantization completed!") + + +def main_torchrun(model_name, scheme, iters, nsamples): + """Main function for torchrun-based execution.""" + rank = int(os.environ.get("RANK", 0)) + local_rank = int(os.environ.get("LOCAL_RANK", 0)) + world_size = int(os.environ.get("WORLD_SIZE", 1)) + + print(f"Torchrun mode: rank={rank}, local_rank={local_rank}, world_size={world_size}") + + quantize_model(local_rank, world_size, model_name, scheme, iters, nsamples) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="AutoRound Quantization with DDP support") + parser.add_argument("--model_name", type=str, default="/data5/yiliu4/Qwen/Qwen2-0.5B", help="Model name or path") + parser.add_argument( + "--scheme", type=str, default="FP8_STATIC", help="Quantization scheme (FP8_STATIC, MXFP8, MXFP4, etc.)" + ) + parser.add_argument("--iters", type=int, default=4, help="Number of iterations") + parser.add_argument("--nsamples", type=int, default=32, help="Number of samples") + parser.add_argument("--ddp", action="store_true", help="Enable DDP multi-GPU mode") + + args = parser.parse_args() + + # For backward compatibility with existing hardcoded values + model_name = args.model_name + + # Parse scheme from string if needed + from auto_round import schemes as ar_schemes + + scheme_map = { + "FP8_STATIC": ar_schemes.FP8_STATIC, + "MXFP8": ar_schemes.MXFP8, + "MXFP4": ar_schemes.MXFP4, + } + # scheme = scheme_map.get(args.scheme, args.scheme) + + # Check if running with torchrun + if "RANK" in os.environ and "WORLD_SIZE" in os.environ: + print("Detected torchrun environment") + main_torchrun(model_name, scheme, args.iters, args.nsamples) + elif args.ddp: + print("Using mp.spawn mode for multi-GPU quantization") + main_spawn(model_name, scheme, args.iters, args.nsamples) + else: + print("Using single GPU mode") + main_single_gpu(model_name, scheme, args.iters, args.nsamples) + +# with torch.no_grad(), torch.device("cuda"): +# model = AutoModelForCausalLM.from_pretrained(SAVE_DIR, device_map="auto", torch_dtype="auto",trust_remote_code=True) +# model.eval() +# input_text = "Explain the theory of relativity in simple terms." 
+# inputs = autoround.tokenizer(input_text, return_tensors="pt").to(model.device) +# outputs = model.generate(**inputs, max_new_tokens=20) +# decoded_output = autoround.tokenizer.decode(outputs[0], skip_special_tokens=True) +# print("Generated Output:") +# print(decoded_output) +# with torch.no_grad(): +# model, _ = autoround.quantize_and_save(format="auto_round", output_dir=SAVE_DIR) +# print(f"Quantized model saved to {SAVE_DIR}") From 36046dcd7bc99a5ac7fd9db3a927aac096dd3e11 Mon Sep 17 00:00:00 2001 From: yiliu30 Date: Sun, 25 Jan 2026 19:36:04 -0800 Subject: [PATCH 4/6] fix rank 0 Signed-off-by: yiliu30 --- auto_round/compressors/base.py | 21 +++++++++++---------- examples/ddp_quant_model.py | 4 +++- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/auto_round/compressors/base.py b/auto_round/compressors/base.py index 8dc7a8e49..a86d8591b 100644 --- a/auto_round/compressors/base.py +++ b/auto_round/compressors/base.py @@ -2928,19 +2928,20 @@ def _quantize_block( num_elm = self._get_current_num_elm(input_ids, whole_indices) if is_ddp(): - index_sampler = DDPIndexSampler(nsamples, global_batch_size, iters=self.iters) + # index_sampler = DDPIndexSampler(nsamples, global_batch_size, iters=self.iters) + index_sampler = IndexSampler(nsamples, global_batch_size) + rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0 + rank_log(f"DDP rank {rank} is quantizing the block") + # ForkedPdb().set_trace() + rank_log( + f"device info: device: {device}, loss_device: {loss_device}, block's device {next(block.parameters()).device}" + ) + block = DDP(block, device_ids=[rank], find_unused_parameters=True) + dist.barrier() + logger.warning_once("DistributedDataParallel (DDP) is used for block quantization. " f"block: {block}") else: index_sampler = IndexSampler(nsamples, global_batch_size) batch_size = self.batch_size - rank = torch.distributed.get_rank() if torch.distributed.is_initialized() else 0 - rank_log(f"DDP rank {rank} is quantizing the block") - # ForkedPdb().set_trace() - rank_log( - f"device info: device: {device}, loss_device: {loss_device}, block's device {next(block.parameters()).device}" - ) - block = DDP(block, device_ids=[rank], find_unused_parameters=True) - dist.barrier() - logger.warning_once("DistributedDataParallel (DDP) is used for block quantization. 
" f"block: {block}") for i in range(self.iters): rank_log(f"starts iteration {i} for block quantization") diff --git a/examples/ddp_quant_model.py b/examples/ddp_quant_model.py index 3bc21d0f5..a276927b7 100644 --- a/examples/ddp_quant_model.py +++ b/examples/ddp_quant_model.py @@ -52,6 +52,8 @@ model_name = "/storage/yiliu7/Qwen/Qwen3-VL-30B-A3B-Instruct" model_name = "/storage/yiliu7/Qwen/Qwen3-8B/" model_name = "/data5/yiliu4/Qwen/Qwen2-0.5B" +model_name = "Qwen/Qwen2-0.5B" +model_name = "Qwen/Qwen3-8B" # from transformers import Qwen2VLForConditionalGeneration # tokenizer = AutoTokenizer.from_pretrained(model_name,trust_remote_code=True) # model = AutoModelForCausalLM.from_pretrained(model_name,device_map="cpu", torch_dtype="auto",trust_remote_code=True) @@ -218,7 +220,7 @@ def main_torchrun(model_name, scheme, iters, nsamples): if __name__ == "__main__": parser = argparse.ArgumentParser(description="AutoRound Quantization with DDP support") - parser.add_argument("--model_name", type=str, default="/data5/yiliu4/Qwen/Qwen2-0.5B", help="Model name or path") + parser.add_argument("--model_name", type=str, default=model_name, help="Model name or path") parser.add_argument( "--scheme", type=str, default="FP8_STATIC", help="Quantization scheme (FP8_STATIC, MXFP8, MXFP4, etc.)" ) From 1c3a852904ec77208c1389d85a0a3b41462a2a81 Mon Sep 17 00:00:00 2001 From: yiliu30 Date: Mon, 26 Jan 2026 05:54:10 -0800 Subject: [PATCH 5/6] fix log Signed-off-by: yiliu30 --- auto_round/compressors/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/auto_round/compressors/base.py b/auto_round/compressors/base.py index a86d8591b..8c69f1c6d 100644 --- a/auto_round/compressors/base.py +++ b/auto_round/compressors/base.py @@ -2944,7 +2944,7 @@ def _quantize_block( batch_size = self.batch_size for i in range(self.iters): - rank_log(f"starts iteration {i} for block quantization") + rank_log(f"starts iteration {i} for block quantization, best loss so far {best_loss}") if self.enable_alg_ext and self.data_type.endswith("dq"): for n, m in block.named_modules(): m.cur_iter = i @@ -2954,7 +2954,7 @@ def _quantize_block( num_elm = self._get_non_zero_cnt(self.attention_mask, global_indices) for tmp_step in range(self.gradient_accumulate_steps): - rank_log(f"iteration {i} tmp_step {tmp_step} start") + # rank_log(f"iteration {i} tmp_step {tmp_step} start") indices = global_indices[tmp_step * batch_size : (tmp_step + 1) * batch_size] current_output = self._get_current_output(output, indices) current_output = to_device(current_output, loss_device) From 4e8cff41fb59937849ce0a2b466e448b83b7f293 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 4 Feb 2026 01:49:05 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- auto_round/compressors/utils.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/auto_round/compressors/utils.py b/auto_round/compressors/utils.py index a0ecd0182..2aa625021 100644 --- a/auto_round/compressors/utils.py +++ b/auto_round/compressors/utils.py @@ -990,6 +990,7 @@ def rank_in_ddp() -> int: return 0 return torch.distributed.get_rank() + class DDPIndexSampler(IndexSampler): def __init__(self, nsamples: int, batch_size: int, iters: int) -> None: @@ -1007,4 +1008,4 @@ def next_batch(self) -> list[int]: batch = self.indices[self.index : self.index + self.batch_size] self.index += self.batch_size - return batch \ No newline at 
end of file + return batch
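
Note on the rank-offset sampling introduced in PATCH 1/6: DDPIndexSampler fast-forwards each rank's sampler by rank * iters calls to next_batch(), so that at a given optimization step every rank draws a different calibration batch from the same shuffled index pool. The stand-alone sketch below reproduces only that offset logic so the behaviour is easy to inspect in isolation; it is a minimal approximation, not auto_round code, and the names _ToySampler and batches_for_rank are made up for the illustration.

import random


class _ToySampler:
    """Minimal stand-in for IndexSampler: a shuffled index pool served in fixed-size batches."""

    def __init__(self, nsamples: int, batch_size: int, seed: int = 0) -> None:
        self.nsamples = nsamples
        self.batch_size = batch_size
        self.indices = list(range(nsamples))
        random.seed(seed)  # pin the RNG so every simulated rank shuffles identically
        random.shuffle(self.indices)
        self.index = 0

    def next_batch(self) -> list[int]:
        # reshuffle and wrap around once the pool is exhausted, as in the PoC sampler
        if self.index + self.batch_size > self.nsamples:
            random.shuffle(self.indices)
            self.index = 0
        batch = self.indices[self.index : self.index + self.batch_size]
        self.index += self.batch_size
        return batch


def batches_for_rank(rank: int, iters: int, nsamples: int = 16, batch_size: int = 4) -> list[list[int]]:
    sampler = _ToySampler(nsamples, batch_size)
    # same fast-forward as DDPIndexSampler.__init__: skip rank * iters batches
    for _ in range(rank * iters):
        sampler.next_batch()
    return [sampler.next_batch() for _ in range(iters)]


if __name__ == "__main__":
    iters = 3
    for rank in range(2):
        print(f"rank {rank}: {batches_for_rank(rank, iters)}")

Running it with two simulated ranks prints a different batch sequence per rank for the same iteration index, which appears to be the intent of the rank * iters fast-forward in the PoC.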
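
The check_grad helper from PATCH 1/6 only reports the first parameter that received a gradient on the local rank; it does not confirm that the DDP wrapper around the block actually synchronized gradients across ranks. Below is a self-contained sketch of such a cross-rank check, using the gloo backend on CPU so it runs without GPUs. The worker function, the master port, and the toy nn.Linear standing in for a quantization block are placeholders for illustration, not part of auto_round.

import os

import torch
import torch.distributed as dist
import torch.multiprocessing as mp
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP


def _worker(rank: int, world_size: int) -> None:
    os.environ.setdefault("MASTER_ADDR", "localhost")
    os.environ.setdefault("MASTER_PORT", "29600")  # arbitrary free port for the sketch
    # gloo keeps the sketch CPU-only and runnable without GPUs
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    torch.manual_seed(0)  # identical weights everywhere (DDP also broadcasts from rank 0)
    block = nn.Linear(8, 8)
    ddp_block = DDP(block)

    torch.manual_seed(1234 + rank)  # a different "calibration batch" per rank
    x = torch.randn(4, 8)
    loss = (ddp_block(x) ** 2).mean()
    loss.backward()  # DDP all-reduces (averages) the gradients during backward

    # After backward, every rank should hold the same averaged gradient norm.
    grad_norm = block.weight.grad.norm().reshape(1)
    gathered = [torch.zeros_like(grad_norm) for _ in range(world_size)]
    dist.all_gather(gathered, grad_norm)
    if rank == 0:
        print("per-rank grad norms after DDP backward:", [g.item() for g in gathered])

    dist.destroy_process_group()


if __name__ == "__main__":
    world_size = 2
    mp.spawn(_worker, args=(world_size,), nprocs=world_size, join=True)

If the gathered norms match across ranks, the averaging performed by the DDP backward hook is working; the same pattern could be dropped into the PoC's debug logging in place of the per-rank-only check_grad report.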