97 changes: 71 additions & 26 deletions lpm_kernel/L2/dpo/dpo_train.py
@@ -2,10 +2,16 @@
import argparse
import torch

-from transformers import AutoTokenizer, AutoModelForCausalLM
+# Unsloth is optional; fall back to plain transformers when it is unavailable.
+try:
+    from unsloth import FastLanguageModel
+    UNSLOTH_AVAILABLE = True
+except ImportError:
+    UNSLOTH_AVAILABLE = False
+from transformers import AutoTokenizer, AutoModelForCausalLM

from datasets import Dataset
from trl import DPOConfig, DPOTrainer
-from peft import LoraConfig, AutoPeftModelForCausalLM, get_peft_model
+from peft import LoraConfig
from datetime import datetime, timedelta
# from clearml import Task

@@ -20,6 +26,19 @@ def get_east_eight_time_formatted():

# task = Task.init(project_name="mind_dpo", task_name="qwen25-instruct-" + get_east_eight_time_formatted())

+def get_supported_dtype():
+    """Prefer bfloat16 when the hardware supports it, otherwise fall back to float16."""
+    if torch.cuda.is_available():
+        if torch.cuda.is_bf16_supported():
+            return torch.bfloat16, "bfloat16"
+        return torch.float16, "float16"
+    # CPU probe: creating a bf16 tensor fails only on very old torch builds.
+    try:
+        _ = torch.zeros(1, dtype=torch.bfloat16)
+        return torch.bfloat16, "bfloat16"
+    except Exception:
+        return torch.float16, "float16"

def training_data_processor(args, SYS = "You are a helpful assistant.\n\n"):
    with open(args.training_data_path, "r", encoding="utf-8") as f:
        data = json.load(f)
Expand All @@ -33,7 +52,10 @@ def training_data_processor(args, SYS = "You are a helpful assistant.\n\n"):
"chosen": [data_point["chosen"] for data_point in data],
"rejected": [data_point["rejected"] for data_point in data]
}
-    tokenizer = AutoTokenizer.from_pretrained(args.base_model_path, padding_side="left")
+    # Unsloth ships no standalone tokenizer class; AutoTokenizer covers both paths.
+    tokenizer = AutoTokenizer.from_pretrained(args.base_model_path, padding_side="left")
    training_data = {
        "prompt": tokenizer.apply_chat_template(training_data["prompt"], tokenize=False),
        "chosen": training_data["chosen"],
@@ -42,31 +64,59 @@ def training_data_processor(args, SYS = "You are a helpful assistant.\n\n"):
    return training_data

def train(args):
-    tokenizer = AutoTokenizer.from_pretrained(args.base_model_path, padding_side="left")
-    model = AutoModelForCausalLM.from_pretrained(
-        args.base_model_path,
-        trust_remote_code=True,
-        ignore_mismatched_sizes=True,
-        torch_dtype=torch.float32,  # CPU doesn't support bfloat16
-    )
+    dtype, dtype_str = get_supported_dtype()
+    if UNSLOTH_AVAILABLE:
+        # FastLanguageModel.from_pretrained returns a (model, tokenizer) pair.
+        model, tokenizer = FastLanguageModel.from_pretrained(
+            model_name=args.base_model_path,
+            max_seq_length=args.max_length,
+            dtype=dtype,
+            load_in_4bit=False,
+        )
+        # Apply LoRA with Unsloth's optimized method if requested
+        if args.lora_r > 0:
+            model = FastLanguageModel.get_peft_model(
+                model,
+                r=args.lora_r,
+                lora_alpha=args.lora_alpha,
+                lora_dropout=args.lora_dropout,
+                target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
+                use_gradient_checkpointing=False,
+                random_state=42,
+            )
+        # Unsloth accelerates DPO by patching trl's DPOTrainer in place.
+        from unsloth import PatchDPOTrainer
+        PatchDPOTrainer()
+    else:
+        tokenizer = AutoTokenizer.from_pretrained(args.base_model_path, padding_side="left")
+        model = AutoModelForCausalLM.from_pretrained(
+            args.base_model_path,
+            trust_remote_code=True,
+            ignore_mismatched_sizes=True,
+            torch_dtype=dtype,
+        )
+    DPOTrainerImpl = DPOTrainer
    time_str = get_east_eight_time_formatted()

    # merged_model = model.merge_and_unload()
    # merged_model.save_pretrained(merged_model)

    data_dict = training_data_processor(args)
    dataset = Dataset.from_dict(data_dict)

-    if args.lora_r == 0:
-        lora_config = None
-    else:
+    # Only use LoRA config for non-Unsloth
+    lora_config = None
+    if not UNSLOTH_AVAILABLE and args.lora_r > 0:
        lora_config = LoraConfig(
            r=args.lora_r,
            lora_alpha=args.lora_alpha,
            lora_dropout=args.lora_dropout,
            bias="none",
-            target_modules="all-linear",
+            target_modules=["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj"],
            task_type="CAUSAL_LM",
            inference_mode=False,
            fan_in_fan_out=False
        )

    training_args = DPOConfig(
@@ -93,7 +143,7 @@ def train(args):
        beta=args.beta,
    )

-    dpo_trainer = DPOTrainer(
+    dpo_trainer = DPOTrainerImpl(
        model,
        tokenizer=tokenizer,
        args=training_args,
@@ -102,7 +152,6 @@ def train(args):
    )

    dpo_trainer.train()
    dpo_trainer.save_model()


@@ -126,13 +175,9 @@ def train(args):
parser.add_argument('--beta', type=float, default=0.1)

# LoRA arguments
-parser.add_argument('--lora_r', type=int, default=64)
-parser.add_argument('--lora_alpha', type=int, default=128)
-parser.add_argument('--lora_dropout', type=float, default=0.1)
-
-# DeepSpeed arguments
-parser.add_argument('--deepspeed', type=str, default=None)
-parser.add_argument('--local_rank', type=int, default=-1)
+parser.add_argument('--lora_r', type=int, default=16)
+parser.add_argument('--lora_alpha', type=int, default=16)
+parser.add_argument('--lora_dropout', type=float, default=0.0)

args = parser.parse_args()

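For reference, a minimal sketch of how the dtype pair returned by get_supported_dtype() can drive the trainer's mixed-precision flags. bf16/fp16 are standard TrainingArguments fields inherited by DPOConfig; the output_dir value is a hypothetical placeholder, and get_supported_dtype() is the helper added above.

import torch
from trl import DPOConfig

dtype, dtype_str = get_supported_dtype()  # helper defined in dpo_train.py above
training_args = DPOConfig(
    output_dir="./dpo_output",       # hypothetical path
    per_device_train_batch_size=1,
    bf16=(dtype is torch.bfloat16),  # enable bf16 only when detected
    fp16=(dtype is torch.float16),   # otherwise fall back to fp16
)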
110 changes: 45 additions & 65 deletions lpm_kernel/L2/train.py
@@ -204,30 +204,27 @@ def main(model_args, data_args, training_args):
    training_args = memory_manager.optimize_training_args(training_args)

    # --- Accelerate optimizer state offloading logic ---
-    # Enable optimizer state offload to CPU if VRAM is low and not using DeepSpeed
+    # Enable optimizer state offload to CPU if VRAM is low
    vram_total = memory_manager.get_memory_info().get("vram_total_gb", 0)
    use_accelerate_offload = False
    if torch.cuda.is_available() and model_args.use_cuda and vram_total > 0 and vram_total < 16:
-        # Only set if not already using DeepSpeed
-        if not hasattr(training_args, "deepspeed") or training_args.deepspeed is None:
-            logger.info("Enabling Hugging Face Accelerate optimizer state offload to CPU for low VRAM GPUs")
-            accelerate_config = {
-                "compute_environment": "LOCAL_MACHINE",
-                "deepspeed_config": None,
-                "distributed_type": "NO",
-                "downcast_bf16": False,
-                "fsdp_config": {},
-                "main_training_function": "main",
-                "mixed_precision": "no",
-                "num_machines": 1,
-                "num_processes": 1,
-                "use_cpu": False,
-                "zero3_init_flag": False,
-                "offload_optimizer_device": "cpu",
-                "offload_param_device": "none"
-            }
-            training_args.accelerate_config = accelerate_config
-            use_accelerate_offload = True
+        logger.info("Enabling Hugging Face Accelerate optimizer state offload to CPU for low VRAM GPUs")
+        accelerate_config = {
+            "compute_environment": "LOCAL_MACHINE",
+            "distributed_type": "NO",
+            "downcast_bf16": False,
+            "fsdp_config": {},
+            "main_training_function": "main",
+            "mixed_precision": "no",
+            "num_machines": 1,
+            "num_processes": 1,
+            "use_cpu": False,
+            "zero3_init_flag": False,
+            "offload_optimizer_device": "cpu",
+            "offload_param_device": "none"
+        }
+        training_args.accelerate_config = accelerate_config
+        use_accelerate_offload = True

    # Model loading with device_map="auto" for automatic offloading
    logger.info(f"Loading model with automatic memory management from {model_args.model_name_or_path}")
@@ -273,13 +270,19 @@ def main(model_args, data_args, training_args):
    model, peft_config, tokenizer = create_and_prepare_model(
        model_args, data_args, training_args, model_kwargs=model_kwargs
    )

-    # If model has meta tensors, handle them properly
-    if hasattr(model, "is_meta") and model.is_meta:
-        logger.info("Model has meta tensors, using to_empty() to properly initialize")
+    # Robustly check for meta tensors: any parameter or buffer still on the meta device
+    def has_meta_tensors(model):
+        return any(
+            p.device.type == "meta"
+            for p in list(model.parameters()) + list(model.buffers())
+        )
+
+    if has_meta_tensors(model):
+        logger.info("Model has parameters on meta device, using to_empty() to properly initialize")
        device = "cuda" if torch.cuda.is_available() and model_args.use_cuda else "cpu"
        model = model.to_empty(device=device)

    # Apply gradient checkpointing for memory efficiency
    if training_args.gradient_checkpointing and hasattr(model, "gradient_checkpointing_enable"):
        logger.info("Enabling gradient checkpointing for memory efficiency")
@@ -306,45 +309,22 @@ def main(model_args, data_args, training_args):
"add_special_tokens": data_args.add_special_tokens,
}

-    # Use DeepSpeed to handle meta tensors if available
-    try:
-        # Only configure DeepSpeed if meta tensors are present and DeepSpeed is available
-        if hasattr(model, "is_meta") and model.is_meta:
-            logger.info("Model has meta tensors, checking DeepSpeed availability")
-            # First verify DeepSpeed is properly installed and importable
-            try:
-                import deepspeed
-                logger.info("DeepSpeed is available, configuring for meta tensor handling")
-
-                # Configure with appropriate settings for meta tensors
-                training_args.deepspeed = {
-                    "zero_stage": 3,
-                    "offload_optimizer": {
-                        "device": "cpu"
-                    },
-                    "offload_param": {
-                        "device": "cpu"
-                    },
-                    "zero3_init_flag": True,
-                    "zero_force_ds_cpu_optimizer": False
-                }
-                logger.info("DeepSpeed configured for meta tensor handling")
-            except ImportError:
-                logger.warning("DeepSpeed is not available, meta tensors will be handled differently")
-                # If DeepSpeed isn't available, use alternative approach to handle meta tensors
-                if torch.cuda.is_available() and model_args.use_cuda:
-                    logger.info("Initializing meta tensors on GPU")
-                    # Use device_map instead of DeepSpeed for meta tensor initialization
-                    from accelerate import init_empty_weights
-                    with init_empty_weights():
-                        model.to_empty(device="cuda")
-                else:
-                    logger.info("Initializing meta tensors on CPU")
-                    model.to_empty(device="cpu")
-    except Exception as e:
-        logger.warning(f"Could not configure meta tensor handling: {e}")
-        logger.warning(traceback.format_exc())

+    if has_meta_tensors(model):
+        logger.info("Model has meta tensors, initializing properly")
+        try:
+            # Materialize meta tensors on the appropriate device;
+            # to_empty() allocates real storage in place.
+            if torch.cuda.is_available() and model_args.use_cuda:
+                logger.info("Initializing meta tensors on GPU")
+                model.to_empty(device="cuda")
+            else:
+                logger.info("Initializing meta tensors on CPU")
+                model.to_empty(device="cpu")
+        except Exception as e:
+            logger.warning(f"Could not initialize meta tensors: {e}")
+            logger.warning(traceback.format_exc())

    trainer = SFTTrainer(
        model=model,
        tokenizer=tokenizer,
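A self-contained sketch of the meta-tensor mechanics this file now relies on, using a toy module instead of the real LLM. init_empty_weights() and Module.to_empty() are the actual accelerate/torch APIs; note that to_empty() only allocates uninitialized storage, so real weights must still be loaded afterwards.

import torch
from accelerate import init_empty_weights

with init_empty_weights():
    toy = torch.nn.Linear(4, 4)  # parameters are created on the meta device, no storage

assert all(p.device.type == "meta" for p in toy.parameters())

toy = toy.to_empty(device="cpu")  # allocate real (uninitialized) storage on CPU
assert all(p.device.type == "cpu" for p in toy.parameters())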
3 changes: 1 addition & 2 deletions lpm_kernel/api/domains/trainprocess/routes.py
@@ -288,7 +288,6 @@ def retrain():
        data_synthesis_mode: Mode for data synthesis (optional)
        use_cuda: Whether to use CUDA for training (optional)
        is_cot: Whether to use Chain of Thought (optional)
-        use_previous_params: Whether to use previous training parameters (optional, default True)

    Returns:
        Response: JSON response
@@ -318,7 +317,7 @@ def retrain():
    is_cot = data.get("is_cot", None)

    # Log the received parameters
-    logger.info(f"Retrain parameters: model_name={model_name}, learning_rate={learning_rate}, number_of_epochs={number_of_epochs}, concurrency_threads={concurrency_threads}, data_synthesis_mode={data_synthesis_mode}, use_cuda={use_cuda}, is_cot={is_cot}, use_previous_params={use_previous_params}")
+    logger.info(f"Retrain parameters: model_name={model_name}, learning_rate={learning_rate}, number_of_epochs={number_of_epochs}, concurrency_threads={concurrency_threads}, data_synthesis_mode={data_synthesis_mode}, use_cuda={use_cuda}, is_cot={is_cot}")

    # Create training service instance
    train_service = TrainProcessService(current_model_name=model_name)
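A sketch of exercising the retrain endpoint after this change, with use_previous_params dropped from the payload. The URL, port, and field values are assumptions for illustration; only the parameter names come from the docstring above.

import requests

resp = requests.post(
    "http://localhost:8002/api/trainprocess/retrain",  # hypothetical host/route
    json={
        "model_name": "qwen25-instruct",  # hypothetical model name
        "learning_rate": 2e-4,
        "number_of_epochs": 3,
        "use_cuda": True,
        "is_cot": False,
    },
    timeout=30,
)
print(resp.status_code, resp.json())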
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -44,11 +44,12 @@ pandas = "2.2.3"
fnllm = {extras = ["azure", "openai"], version = "0.1.2"}
transformers = "4.47.1"
torch = "2.5.1"
peft = "0.14.0"
peft = "0.15.2"
trl = "0.13.0"
gguf = "0.10.0"
datasets = "3.3.2"
jiter = "0.8.2"
+unsloth = "2025.4.3"

# Documentation environment dependencies
# Use 'poetry install --with docs' to install documentation dependencies
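A small sanity-check sketch for confirming at runtime that the installed packages match these pins; importlib.metadata is standard library (Python 3.8+), and the package list simply mirrors the pins above.

from importlib.metadata import PackageNotFoundError, version

for pkg, pinned in {"peft": "0.15.2", "trl": "0.13.0", "unsloth": "2025.4.3"}.items():
    try:
        installed = version(pkg)
    except PackageNotFoundError:
        installed = "not installed"
    print(f"{pkg}: pinned {pinned}, installed {installed}")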
Expand Down