From 0afca3082d20ec4551ba5e73b4d53de6aef020bc Mon Sep 17 00:00:00 2001 From: ganler Date: Thu, 7 Aug 2025 10:19:55 +0000 Subject: [PATCH 1/6] feat: integrate RL code and add docu --- README.md | 146 +++++++- datagen/ctxdistill/ctxdistill.py | 8 +- rl/controlled/main_grpo_qwen14b_dapo_speed.sh | 78 +++++ rl/controlled/main_grpo_qwen14b_direct_rl.sh | 77 +++++ .../main_grpo_qwen14b_hard_reward.sh | 78 +++++ rl/data/correctness.py | 257 ++++++++++++++ rl/data/malevent.py | 124 +++++++ rl/data/merge.py | 94 ++++++ rl/data/secqa.py | 114 +++++++ rl/data/security.py | 114 +++++++ rl/grouped_reward.py | 132 ++++++++ rl/main_grpo_qwen14b.sh | 76 +++++ rl/main_grpo_qwen32b.sh | 76 +++++ rl/model_merger.py | 189 +++++++++++ rl/reward.py | 316 ++++++++++++++++++ rl/reward_utility/code_analyzer.py | 34 ++ rl/reward_utility/sandbox_fusion.py | 66 ++++ 17 files changed, 1969 insertions(+), 10 deletions(-) create mode 100644 rl/controlled/main_grpo_qwen14b_dapo_speed.sh create mode 100644 rl/controlled/main_grpo_qwen14b_direct_rl.sh create mode 100644 rl/controlled/main_grpo_qwen14b_hard_reward.sh create mode 100644 rl/data/correctness.py create mode 100644 rl/data/malevent.py create mode 100644 rl/data/merge.py create mode 100644 rl/data/secqa.py create mode 100644 rl/data/security.py create mode 100644 rl/grouped_reward.py create mode 100644 rl/main_grpo_qwen14b.sh create mode 100644 rl/main_grpo_qwen32b.sh create mode 100644 rl/model_merger.py create mode 100644 rl/reward.py create mode 100644 rl/reward_utility/code_analyzer.py create mode 100644 rl/reward_utility/sandbox_fusion.py diff --git a/README.md b/README.md index c6ae5f5..295cdd9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # ๐Ÿ”ฎ PurpCode: Reasoning for Safer Code Generation -This repo includes the training and evaluation infrastructure for PurpCode. For other resources, please check out: +This repo includes the training and evaluation infrastructure for PurpCode. 
For other related resources, please check out: * [๐Ÿ“ Paper](https://arxiv.org/abs/2507.19060) with technical and evaluation details * [๐Ÿค— HuggingFace](https://huggingface.co/purpcode) including model checkpoints and training/evaluation datasets @@ -8,21 +8,150 @@ This repo includes the training and evaluation infrastructure for PurpCode. For ## Overview -PurpCode is an alignment method and a fully open-source recipe (data, model, and code) for eliciting **cybersafe reasoning** capabilities of coding models, including secure code generation and defending against malicious cyber events. +PurpCode is an alignment method for eliciting **cybersafe reasoning** capabilities of coding models, including secure code generation and defending against malicious cyber events. PurpCode includes two alignment stages: 1. **[Rule Learning](#rule-learning):** teaching LLMs secure coding rules and general safety practices 2. **[Reinforcement Learning](#reinforcement-learning):** letting LLMs co-exercise their safety and utility via verifiable tasks -We also curate comprehensive safety data via internal red teaming and use various evaluators covering cybersafety, utility, and overrefusal. 
+โœจ Some highlights of our work: +- The โ˜๏ธ*first* cybersafe reasoning recipe in open source +- Great cybersafety and utility preservation, winning the ๐Ÿฅ‡*1st place* in [Amazon Nova AI Challenge](https://www.amazon.science/nova-ai-challenge/pushing-the-boundaries-of-secure-ai-winners-of-the-amazon-nova-ai-challenge) +- Fully ๐Ÿ‘open-sourced, from models, data, to training/evaluation code and data synthesizers +- ๐ŸŽ๏ธ Fast RL with *Single-Step Dynamic Sampling* -- 12% faster, 15% less sample wasting, and better results than [DAPO](https://arxiv.org/abs/2503.14476) +- ๐Ÿ“š Supporting 13 evals, 90 CWEs, and 4 training objectives & rewards, covering cybersafety, utility, and overrefusal +- ๐Ÿ™…โ€โ™‚๏ธ XSCode -- our home-made evaluator and the *first* benchmark for checking overrefusal in secure code generation +- ... and more details in the [paper](https://arxiv.org/abs/2507.19060)! + +## Initial Setup + +```bash +# --- TMUX SESSION "main" --- +tmux at -t main || tmux new -s main +# Security analyzers +export SHELL_RC=${HOME}/.bashrc # or ~/.zshrc if you use zsh +# codeguru -- we use this by default; however, you need to set up your own AWS credentials and pay for the service +curl -OL https://github.com/aws/aws-codeguru-cli/releases/download/0.2.4/aws-codeguru-cli.zip +unzip aws-codeguru-cli.zip -d ${HOME} +export PATH=$PATH:${HOME}/aws-codeguru-cli/bin/ +sed -i '1i export PATH=$PATH:${HOME}/aws-codeguru-cli/bin/' ${SHELL_RC} + +# codeql -- if you don't want to use codeguru, you can use codeql instead which only eats CPUs but the analyzer completeness and soundness can be different +# -- you also need to set environment variable to PURPCODE_CODE_ANALYZER=codeql +wget https://github.com/github/codeql-action/releases/download/codeql-bundle-v2.21.0/codeql-bundle-linux64.tar.gz +tar -xf codeql-bundle-linux64.tar.gz -C ${HOME} +export PATH=$PATH:${HOME}/codeql/ +sed -i '1i export PATH=$PATH:${HOME}/codeql/' ${SHELL_RC} + +tmux detach +# 
-------------------------- + +# --- TMUX SESSION "sandbox" --- +tmux new -s sandbox +docker run -it -p 8080:8080 volcengine/sandbox-fusion:server-20241204 +tmux detach +# ------------------------------ +``` ## Rule Learning -TBD +We will go through the example based on `Qwen/Qwen2.5-14B-Instruct-1M`: + +### Rejection Sampling + +```bash +# --- TMUX SESSION "sgl" --- +conda create -n sgl python=3.12 -y +conda activate sgl +pip install --upgrade pip +pip install "sglang[all]>=0.4.9.post2" "sglang-router" "huggingface-hub" + +huggingface-cli download Qwen/Qwen2.5-14B-Instruct-1M +python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-14B-Instruct-1M --dp-size 8 --port 30000 --host 0.0.0.0 | tmux detach +# -------------------------- + +# --- TMUX SESSION "main" --- +tmux at -t main || tmux new -s main +# Inference client for self/context distillation +# NOTE: context distillation (https://arxiv.org/abs/2209.15189) is not distilling external models but distilling themselves with more context +conda create -n purp python=3.12 -y +conda activate purp +pip install -r requirements.txt +# Sampling +python datagen/ctxdistill/distill_main.py --model openai/Qwen/Qwen2.5-14B-Instruct-1M --sample-per-prompt 8 --concurrency 400 +# --------------------------- + +# --- TMUX SESSION "sgl" --- +tmux at -t sgl +# *Manually* kill the sglang server +# Ctrl + C +# Serve the LLM judge model +huggingface-cli download Qwen/Qwen2.5-32B-Instruct +python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-32B-Instruct --dp-size 8 --port 30000 --host 0.0.0.0 +tmux detach +# -------------------------- + +# --- TMUX SESSION "main" --- +# Verification +tmux at -t main || tmux new -s main +export LLM_JUDGE_OPENAI_URL='http://0.0.0.0:30000/v1' +python datagen/ctxdistill/post.py --generation-path Qwen2.5-14B-Instruct-1M.distill.train.jsonl +# ---------------------------- +``` + +### Running SFT + +```bash +# --- TMUX SESSION "main" --- +tmux at -t main || tmux new -s main +conda create -n axo 
python=3.12 -y +conda activate axo +git clone git@github.com:axolotl-ai-cloud/axolotl.git +cd axolotl +pip3 install torch --index-url https://download.pytorch.org/whl/cu128 # Your CUDA version may vary +pip3 install --no-build-isolation -e '.[flash-attn,deepspeed]' + +cd purpcode # come back to the root directory +# double check sft/ctxdistill_qwen14b.yaml to make sure the paths are aligned well +axolotl train sft/ctxdistill_qwen14b.yaml --deepspeed deepspeed_configs/zero3.json +# --> outputs/purpcode-14b-ctxdistill +``` ## Reinforcement Learning -TBD +```bash +# --- TMUX SESSION "main" --- +tmux at -t main || tmux new -s main +conda create -n rl python=3.12 -y +conda activate rl + +git clone git@github.com:ganler/verl.git +cd verl +git checkout opt + +pip install -e . --upgrade +pip install vllm==0.8.2 +pip install flash-attn --no-build-isolation --upgrade + +cd purpcode # come back to the root directory +python rl/data/merge.py --datasets purpcorn/code-r1-46k-leetcode2k-kodcode purpcorn/rl-codesec-78k purpcorn/rl-secqa-11k purpcorn/rl-safety-8k-single-turn \ + --skip Qwen2.5-14B-Instruct-1M.ez_task_ids.txt + +# --------------------------- + +# --- TMUX SESSION "sgl" (remote machine) --- +# Do it in another machine (assuming ip=a.b.c.d) as your local GPUs are allocated to RL training +tmux at -t sgl || tmux new -s sgl +python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-32B-Instruct --dp-size 8 --port 30000 --host 0.0.0.0 | tmux detach +# ------------------------------------------- + +# --- TMUX SESSION "main" (RL machine) --- +tmux at -t main || tmux new -s main +export LLM_JUDGE_OPENAI_URL='http://[a.b.c.d]:30000/v1' # replace [a.b.c.d] with a true IP address +conda activate rl +bash rl/main_grpo_qwen14b.sh +# ------------------------------------------- +``` ## Evaluation @@ -42,6 +171,13 @@ python eval/main.py --task "purpcode/PHTest" --model purpcode/purpco Notes: * `--oracle` for evaluating customized generation (default guessing from dataset). 
+## Acknowledgements + +- [Amazon Nova AI Challenge](https://www.amazon.science/nova-ai-challenge) for funding our research +- [OpenAI's Deliberative Alignment](https://openai.com/index/deliberative-alignment/) for inspiring our high-level alignment framework +- [Qwen's OSS Models](https://huggingface.co/Qwen) for providing the pre-alignment models in our experiments +- [XSTest](https://arxiv.org/abs/2308.01263) for inspiring our XSCode dataset + ## References ```bibtex diff --git a/datagen/ctxdistill/ctxdistill.py b/datagen/ctxdistill/ctxdistill.py index 5338b5b..c429a51 100644 --- a/datagen/ctxdistill/ctxdistill.py +++ b/datagen/ctxdistill/ctxdistill.py @@ -146,7 +146,7 @@ def construction_analyzer_info(analyzer_results: dict): + "\n```" ) block.append( - recommendataion.split("[Learn more]")[0] + recommendation.split("[Learn more]")[0] .split("**More info**")[0] .strip() ) @@ -255,7 +255,7 @@ def run_distillation( ): if not eval: # using training data temperature = 0.8 - output_path = f"{model.split('/')[-1]}.distill.june.train.jsonl" + output_path = f"{model.split('/')[-1]}.distill.train.jsonl" print(f"Expected output path: {output_path}") rows = [] @@ -288,9 +288,7 @@ def run_distillation( else: # eval mode assert sample_per_prompt == 1, "Sample per prompt is not supported in eval mode" temperature = 0.2 if "Qwen3" in model or "DeepSeek-R1" in model else 0.0 - output_path = ( - f"{model.split('/')[-1]}.distill.june.eval.{eval.split('/')[-1]}.jsonl" - ) + output_path = f"{model.split('/')[-1]}.distill.eval.{eval.split('/')[-1]}.jsonl" dataset = load_dataset(eval, split="test") # expand turns if there's a mixture of user and assistant turns print(f"Dataset before expansion: {dataset}") diff --git a/rl/controlled/main_grpo_qwen14b_dapo_speed.sh b/rl/controlled/main_grpo_qwen14b_dapo_speed.sh new file mode 100644 index 0000000..8a6d8e9 --- /dev/null +++ b/rl/controlled/main_grpo_qwen14b_dapo_speed.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# SPDX-FileCopyrightText: (c) 
UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +# The config is optimized for 8xH200 +# Assuming using vLLM >= 0.8 such that V1 is enabled by default +# Depends on: https://github.com/ganler/verl/tree/opt +set -eux + +# IMPORTANT: checkout the specialized verl repository to the `opt-dapo-ds` branch instead of `opt` + +export PYTHONPATH=$(pwd) + +python -c "import rl.data" + +if [ -z "${CUDA_VISIBLE_DEVICES+x}" ]; then + GPUS_PER_NODE=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +else + GPUS_PER_NODE=$(echo "$CUDA_VISIBLE_DEVICES" | awk -F',' '{print NF}') +fi + +# Tips for reducing VRAM usage +# 1. Reduce MICRO_BATCH_PER_GPU (and increase GRAD_ACCUM_STEPS accordingly) +# 2. Reduce the factor (8) in PPO_MAX_TOKEN_LEN_PER_GPU to 4 + +# MAIN CONFIG +DATASET=code-r1-46k-leetcode2k-kodcode-rl-codesec-78k-rl-secqa-11k-rl-safety-8k-single-turn +MODEL_PATH="outputs/purpcode-14b-ctxdistill" +MICRO_BATCH_PER_GPU=48 +ROLLOUT_N_SAMPLE=8 +MAX_PROMPT_LEN=2048 +MAX_RESPONSE_LEN=3072 +MAX_EPOCHS=1 + +# AUTO VALUES +ROLLOUT_N_QUERY=$((MICRO_BATCH_PER_GPU * GPUS_PER_NODE)) +PPO_MAX_TOKEN_LEN_PER_GPU=$(( 8 * $(( $MAX_PROMPT_LEN + $MAX_RESPONSE_LEN )) )) + +python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files=local_data/$DATASET/train.parquet \ + data.val_files=local_data/$DATASET/test.parquet \ + data.filter_overlong_prompts=True \ + data.train_batch_size=$ROLLOUT_N_QUERY \ + +data.max_roll_factor=4 \ + data.max_prompt_length=$MAX_PROMPT_LEN \ + data.max_response_length=$MAX_RESPONSE_LEN \ + actor_rollout_ref.actor.optim.lr=5e-7 \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.model.path=$MODEL_PATH \ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.ppo_mini_batch_size=$ROLLOUT_N_QUERY \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$PPO_MAX_TOKEN_LEN_PER_GPU \ + actor_rollout_ref.actor.use_dynamic_bsz=True \ + 
actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=$ROLLOUT_N_SAMPLE \ + actor_rollout_ref.rollout.enforce_eager=False \ + actor_rollout_ref.rollout.free_cache_engine=False \ + algorithm.kl_ctrl.kl_coef=0.001 \ + +algorithm.filter_groups.enable=True \ + trainer.critic_warmup=0 \ + trainer.logger=['wandb'] \ + trainer.project_name='purpcode' \ + trainer.experiment_name=${DATASET}-dapo-speed \ + trainer.nnodes=1 \ + trainer.default_local_dir=./models/purpcode-rl-${DATASET}-14b-dapo-speed \ + trainer.n_gpus_per_node=$GPUS_PER_NODE \ + trainer.save_freq=32 \ + trainer.test_freq=16 \ + trainer.total_epochs=$MAX_EPOCHS \ + trainer.resume_mode=auto \ + +custom_reward_function.path=./rl/grouped_reward.py \ + reward_model.reward_manager=group $@ 2>&1 | tee grpo.log diff --git a/rl/controlled/main_grpo_qwen14b_direct_rl.sh b/rl/controlled/main_grpo_qwen14b_direct_rl.sh new file mode 100644 index 0000000..0aa7fb6 --- /dev/null +++ b/rl/controlled/main_grpo_qwen14b_direct_rl.sh @@ -0,0 +1,77 @@ +#!/bin/bash + +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +# The config is optimized for 8xH200 +# Assuming using vLLM >= 0.8 such that V1 is enabled by default +# Depends on: https://github.com/ganler/verl/tree/opt +set -eux + +export PYTHONPATH=$(pwd) + +python -c "import rl.data" + +if [ -z "${CUDA_VISIBLE_DEVICES+x}" ]; then + GPUS_PER_NODE=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +else + GPUS_PER_NODE=$(echo "$CUDA_VISIBLE_DEVICES" | awk -F',' '{print NF}') +fi + +# Tips for reducing VRAM usage +# 1. Reduce MICRO_BATCH_PER_GPU (and increase GRAD_ACCUM_STEPS accordingly) +# 2. 
Reduce the factor (6) in PPO_MAX_TOKEN_LEN_PER_GPU to 3 + +# MAIN CONFIG +DATASET=code-r1-46k-leetcode2k-kodcode-rl-codesec-78k-rl-secqa-11k-rl-safety-8k-single-turn +MODEL_PATH="models/Qwen2.5-14B-Instruct-1M" +MICRO_BATCH_PER_GPU=48 +ROLLOUT_N_SAMPLE=8 +MAX_PROMPT_LEN=2048 +MAX_RESPONSE_LEN=3072 +MAX_EPOCHS=1 + +# AUTO VALUES +ROLLOUT_N_QUERY=$((MICRO_BATCH_PER_GPU * GPUS_PER_NODE)) +PPO_MAX_TOKEN_LEN_PER_GPU=$(( 8 * $(( $MAX_PROMPT_LEN + $MAX_RESPONSE_LEN )) )) + +python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files=local_data/$DATASET/train.parquet \ + data.val_files=local_data/$DATASET/test.parquet \ + data.filter_overlong_prompts=True \ + data.train_batch_size=$ROLLOUT_N_QUERY \ + +data.max_roll_factor=4 \ + data.max_prompt_length=$MAX_PROMPT_LEN \ + data.max_response_length=$MAX_RESPONSE_LEN \ + actor_rollout_ref.actor.optim.lr=5e-7 \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.model.path=$MODEL_PATH \ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.ppo_mini_batch_size=$ROLLOUT_N_QUERY \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$PPO_MAX_TOKEN_LEN_PER_GPU \ + actor_rollout_ref.actor.use_dynamic_bsz=True \ + actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=$ROLLOUT_N_SAMPLE \ + actor_rollout_ref.rollout.enforce_eager=False \ + actor_rollout_ref.rollout.free_cache_engine=False \ + algorithm.kl_ctrl.kl_coef=0.001 \ + +algorithm.filter_groups.enable=True \ + trainer.critic_warmup=0 \ + trainer.logger=['wandb'] \ + trainer.project_name='purpcode' \ + trainer.experiment_name=${DATASET}-direct-rl \ + trainer.nnodes=1 \ + trainer.default_local_dir=./models/purpcode-rl-${DATASET}-14b-direct-rl-rebuttal \ + 
trainer.n_gpus_per_node=$GPUS_PER_NODE \ + trainer.save_freq=32 \ + trainer.test_freq=16 \ + trainer.total_epochs=$MAX_EPOCHS \ + trainer.resume_mode=auto \ + +custom_reward_function.path=./rl/grouped_reward.py \ + reward_model.reward_manager=group $@ 2>&1 | tee grpo.log diff --git a/rl/controlled/main_grpo_qwen14b_hard_reward.sh b/rl/controlled/main_grpo_qwen14b_hard_reward.sh new file mode 100644 index 0000000..0af917e --- /dev/null +++ b/rl/controlled/main_grpo_qwen14b_hard_reward.sh @@ -0,0 +1,78 @@ +#!/bin/bash +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +# The config is optimized for 8xH200 +# Assuming using vLLM >= 0.8 such that V1 is enabled by default +# Depends on: https://github.com/ganler/verl/tree/opt +set -eux + +export PYTHONPATH=$(pwd) + +python -c "import rl.data" + +if [ -z "${CUDA_VISIBLE_DEVICES+x}" ]; then + GPUS_PER_NODE=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +else + GPUS_PER_NODE=$(echo "$CUDA_VISIBLE_DEVICES" | awk -F',' '{print NF}') +fi + +export SOFT_CODESEC_REWARD=0.0 # hard 0-1 reward + +# Tips for reducing VRAM usage +# 1. Reduce MICRO_BATCH_PER_GPU (and increase GRAD_ACCUM_STEPS accordingly) +# 2. 
Reduce the factor (6) in PPO_MAX_TOKEN_LEN_PER_GPU to 3 + +# MAIN CONFIG +DATASET=code-r1-46k-leetcode2k-kodcode-rl-codesec-78k-rl-secqa-11k-rl-safety-8k-single-turn +MODEL_PATH="outputs/purpcode-14b-ctxdistill" +MICRO_BATCH_PER_GPU=48 +ROLLOUT_N_SAMPLE=8 +MAX_PROMPT_LEN=2048 +MAX_RESPONSE_LEN=3072 +MAX_EPOCHS=1 + +# AUTO VALUES +ROLLOUT_N_QUERY=$((MICRO_BATCH_PER_GPU * GPUS_PER_NODE)) +PPO_MAX_TOKEN_LEN_PER_GPU=$(( 8 * $(( $MAX_PROMPT_LEN + $MAX_RESPONSE_LEN )) )) + +python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files=local_data/$DATASET/train.parquet \ + data.val_files=local_data/$DATASET/test.parquet \ + data.filter_overlong_prompts=True \ + data.train_batch_size=$ROLLOUT_N_QUERY \ + +data.max_roll_factor=4 \ + data.max_prompt_length=$MAX_PROMPT_LEN \ + data.max_response_length=$MAX_RESPONSE_LEN \ + actor_rollout_ref.actor.optim.lr=5e-7 \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.model.path=$MODEL_PATH \ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.ppo_mini_batch_size=$ROLLOUT_N_QUERY \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$PPO_MAX_TOKEN_LEN_PER_GPU \ + actor_rollout_ref.actor.use_dynamic_bsz=True \ + actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=$ROLLOUT_N_SAMPLE \ + actor_rollout_ref.rollout.enforce_eager=False \ + actor_rollout_ref.rollout.free_cache_engine=False \ + algorithm.kl_ctrl.kl_coef=0.001 \ + +algorithm.filter_groups.enable=True \ + trainer.critic_warmup=0 \ + trainer.logger=['wandb'] \ + trainer.project_name='purpcode' \ + trainer.experiment_name=${DATASET}-hard-reward \ + trainer.nnodes=1 \ + trainer.default_local_dir=./models/purpcode-rl-${DATASET}-14b-hard-reward \ + trainer.n_gpus_per_node=$GPUS_PER_NODE 
\ + trainer.save_freq=32 \ + trainer.test_freq=16 \ + trainer.total_epochs=$MAX_EPOCHS \ + trainer.resume_mode=auto \ + +custom_reward_function.path=./rl/grouped_reward.py \ + reward_model.reward_manager=group $@ 2>&1 | tee grpo.log diff --git a/rl/data/correctness.py b/rl/data/correctness.py new file mode 100644 index 0000000..c003ed8 --- /dev/null +++ b/rl/data/correctness.py @@ -0,0 +1,257 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import json +import os +import sys + +import rich +from datasets import DatasetDict, concatenate_datasets, load_dataset +from rich.rule import Rule + +from rl.reward_utility.sandbox_fusion import code_exec_sandbox_fusion + +N_SFT_SIZE = 1024 * 5 +KOD_SIZE = 1024 * 50 +N_TESTSET_PER_DATASET = 512 # per dataset +_EMPTY_RETURN_ = { + "data_source": None, + "prompt": None, + "ability": None, + "reward_model": None, + "extra_info": None, +} + + +def minimize_stdio(inputs, outputs, max_n_tests=8): + stdin_list = [] + stdout_list = [] + for stdin, stdout in zip(inputs, outputs): + if isinstance(stdin, list): + stdin = "\n".join(stdin) + if isinstance(stdout, list): + stdout = "\n".join(stdout) + if sys.getsizeof(stdin) > 4 * 1024: + continue + stdout.replace("\r\n", "\n") + stdin_list.append(stdin) + stdout_list.append(stdout) + + zipped = sorted(zip(stdin_list, stdout_list), key=lambda x: sys.getsizeof(x[0])) + + if not zipped: + print("No tests found!") + return [], [] + + sorted_stdin, sorted_stdout = zip(*zipped) + return list(sorted_stdin[:max_n_tests]), list(sorted_stdout[:max_n_tests]) + + +def kodcode(): + dataset_name = "KodCode/KodCode-V1-SFT-R1" + rich.print(Rule(f"Loading {dataset_name}...")) + dataset = load_dataset(dataset_name, split="train") + + block_libs = [ + "fake-useragent", + "keras", + "socket", + "torch", + "scipy", + "sklearn", + "cv2", + "scipy", + "imageio", + "sphinx-pyproject", + "xgboost", + "tweepy", + "flask", + "matplotlib", + "pillow", + "seaborn", + 
"smtpd", + ] + + def make_map_fn(split): + + def process_fn(example, idx): + reference_solution = example["solution"] + test_code = "from solution import *\n" + example["test_code"].strip() + # skip it if reference solution requires libs from block_libs + if any(lib in reference_solution for lib in block_libs): + return _EMPTY_RETURN_ + if any(lib in test_code for lib in block_libs): + return _EMPTY_RETURN_ + prompt = f"Please solve the programming task below in Python. Code should wrapped in a markdown code block.\n\n{example['question'].strip()}" + if example["test_entry_point"] and example["test_entry_point"].strip(): + prompt += f"\n\nNote that the output function should be {example['test_entry_point'].strip()}." + + succ, err = code_exec_sandbox_fusion( + code=reference_solution, pytest=test_code + ) + if not succ: + rich.print( + f"[bold red]Test code failed for {example['conversation_id']}" + ) + print(reference_solution) + print(err) + return _EMPTY_RETURN_ + + return { + "data_source": "purpcode:code", + "prompt": [ + {"role": "user", "content": prompt}, + ], + "ability": "coding", + "reward_model": { + "style": "rule", + "ground_truth": json.dumps({"pytest": test_code}), + }, + "extra_info": { + "split": split, + "index": idx, + "reference": reference_solution, + "prompt": prompt, + "oracles": ["correctness"], + "dataset": dataset_name, + }, + } + + return process_fn + + # filter by difficulty: 0.3 - 0.9 + dataset = dataset.filter(lambda x: 0.3 <= x["gpt_pass_percentage"] < 0.9) + dataset = dataset.shuffle(seed=666).select(range(KOD_SIZE)) + dataset = dataset.map( + function=make_map_fn("train"), + with_indices=True, + num_proc=96, + remove_columns=dataset.column_names, + ).filter(lambda x: x != _EMPTY_RETURN_) + splits = dataset.train_test_split(test_size=N_TESTSET_PER_DATASET, seed=666) + train_dataset = splits["train"].shuffle(seed=666) + test_dataset = splits["test"] + return train_dataset, test_dataset + + +def leetcode2k(): + 
rich.print(Rule("Loading LeetCodeDataset...")) + test_dataset = load_dataset( + "json", + data_files="code-r1/LeetCodeDataset/data/LeetCodeDataset-v2-test-problems.jsonl", + )["train"] + print("Test set:", test_dataset) + + train_dataset = concatenate_datasets( + [ + load_dataset( + "json", + data_files="code-r1/LeetCodeDataset/data/LeetCodeDataset-v2-rl-problems.jsonl", + )["train"], + load_dataset( + "json", + data_files="code-r1/LeetCodeDataset/data/LeetCodeDataset-v2-sft-problems.jsonl", + )["train"], + ] + ).filter( + lambda example: example["meta"]["question_id"] + not in set([d["question_id"] for d in test_dataset["meta"]]) + ) + print("Before deduplication - Training set:", train_dataset) + + first_time_idx = [] + seen_question_ids = set() + for i, example in enumerate(train_dataset): + if example["meta"]["question_id"] not in seen_question_ids: + first_time_idx.append(i) + seen_question_ids.add(example["meta"]["question_id"]) + train_dataset = train_dataset.select(first_time_idx) + + print("After deduplication - Training set:", train_dataset) + + # add a row to each data item that represents a unique id + def make_map_fn(split): + + def process_fn(example, idx): + prompt = f"Please solve the programming task below using a self-contained code snippet in a markdown code block.\n\n{example['meta']['query'].strip()}" + return { + "data_source": "purpcode:code", + "prompt": [ + { + "role": "user", + "content": prompt, + }, + ], + "ability": "coding", + "reward_model": { + "style": "rule", + "ground_truth": json.dumps( + { + "functional": f"{example['test']}\n\ncheck({example['entry_point'].strip()})" + } + ), + }, + "extra_info": { + "split": split, + "index": idx, + "reference": example["completion"], + "prompt": prompt, + "oracles": ["correctness"], + "dataset": "LeetCodeDataset", + }, + } + + return process_fn + + train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True) + test_dataset = 
test_dataset.map(function=make_map_fn("test"), with_indices=True) + return train_dataset, test_dataset + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--root_dir", default="./local_data/") + parser.add_argument("--push_to_hub", action="store_true") + args = parser.parse_args() + + root_dir = args.root_dir + + train_datasets = [] + test_datasets = [] + + dataset_makes = [leetcode2k, kodcode] + names = "-".join([make.__name__ for make in dataset_makes]) + + for train, test in [make() for make in dataset_makes]: + train_datasets.append(train) + test_datasets.append(test) + + train_dataset = concatenate_datasets(train_datasets).shuffle(seed=666) + splits = train_dataset.train_test_split(test_size=N_SFT_SIZE, seed=666) + train_dataset = splits["train"] + sft_dataset = splits["test"] + test_dataset = concatenate_datasets(test_datasets) + + rich.print(Rule("Saving the final dataset")) + print("Train set:", train_dataset) + print("Test set:", test_dataset) + + dataset_name = f"code-r1-{round(len(train_dataset) / 1000)}k-{names}" + local_dir = os.path.join(root_dir, dataset_name) + rich.print(f"[bold green]Saving to {local_dir}...") + train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) + + # combined dataset + if args.push_to_hub: + print(f"Pushing to hub: purpcode/{dataset_name}") + DatasetDict( + { + "train": train_dataset, + "test": test_dataset, + "sft": sft_dataset, + } + ).push_to_hub(repo_id=f"purpcode/{dataset_name}", private=True) diff --git a/rl/data/malevent.py b/rl/data/malevent.py new file mode 100644 index 0000000..9288b94 --- /dev/null +++ b/rl/data/malevent.py @@ -0,0 +1,124 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import os + +import rich +from datasets import DatasetDict, concatenate_datasets, load_dataset +from rich.rule import Rule + +from 
rl.data.correctness import _EMPTY_RETURN_ + +MAX_MAL_EVENT_SIZE = 1024 * 8 # 8k prompts + + +def safety(dataset_name, single_turn=False): + rich.print(Rule(f"Loading {dataset_name}...")) + dataset = load_dataset(dataset_name) + + def make_map_fn(split): + + def process_fn(example, idx): + messages = example["messages"] + assert ( + messages[0]["role"] == "user" + ), f"First message must be from user, but got {messages[0]['role']}" + assert all( + msg["role"] == "user" for msg in messages + ), f"User only in 'role'" + # NOTE: Multi-turn conversations are skipped until Yuxiang's exploration on veRL support + if single_turn and len(example["messages"]) > 1: + return _EMPTY_RETURN_ + + example_prompt = messages[0]["content"] + reference = None + + label = "general-safety" + return { + "data_source": f"purpcode:{label}", + "prompt": messages, + "ability": "safety", + "reward_model": { + "style": "rule", + "ground_truth": "", + }, + "extra_info": { + "split": split, + "index": idx, + "reference": reference, + "prompt": example_prompt, + "oracles": [label], + "dataset": dataset_name, + }, + } + + return process_fn + + train_dataset = ( + dataset["train"] + .map( + function=make_map_fn("train"), + with_indices=True, + num_proc=64, + remove_columns=dataset["train"].column_names, + ) + .filter(lambda x: x != _EMPTY_RETURN_) + .shuffle(seed=886) + ) + print(f"{train_dataset = }") + test_dataset = ( + dataset["test"] + .map( + function=make_map_fn("test"), + with_indices=True, + num_proc=64, + remove_columns=dataset["test"].column_names, + ) + .filter(lambda x: x != _EMPTY_RETURN_) + .shuffle(seed=886) + ) + return train_dataset, test_dataset + + +def main(root_dir="./local_data/", push_to_hub=False): + train_datasets = [] + test_datasets = [] + + for ds in [ + "purpcode/mal-event-jailbreak-single-oss-16k", + "purpcode/mal-event-seed-attack-oss-24k", + ]: + train, test = safety(ds, single_turn=True) + train_datasets.append(train) + test_datasets.append(test) + + train_dataset 
= ( + concatenate_datasets(train_datasets) + .shuffle(seed=666) + .select(range(MAX_MAL_EVENT_SIZE)) + ) + print("Combined training set:", train_dataset) + test_dataset = concatenate_datasets(test_datasets).select(range(256)) + + rich.print(Rule("Saving the final dataset")) + print("Train set:", train_dataset) + print("Test set:", test_dataset) + + dataset_name = f"rl-safety-{round(len(train_dataset) / 1000)}k" + "-single-turn" + local_dir = os.path.join(root_dir, dataset_name) + rich.print(f"[bold green]Saving to {local_dir}...") + train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) + + if push_to_hub: + print(f"Pushing to hub: purpcode/{dataset_name}") + DatasetDict({"train": train_dataset, "test": test_dataset}).push_to_hub( + f"purpcode/{dataset_name}", private=True + ) + + +if __name__ == "__main__": + from fire import Fire + + Fire(main) diff --git a/rl/data/merge.py b/rl/data/merge.py new file mode 100644 index 0000000..880fb66 --- /dev/null +++ b/rl/data/merge.py @@ -0,0 +1,94 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import os +from functools import partial + +from datasets import concatenate_datasets, load_dataset + +from utils import SYSTEM_PROMPT + + +def align_system_prompt(example, system_prompt): + if "system" == example["prompt"][0]["role"]: + example["prompt"][0]["content"] = system_prompt + else: + example["prompt"].insert(0, {"role": "system", "content": system_prompt}) + return example + + +def merge_datasets(datasets, root, skip: str = None): + skip_ids = [] + if skip is not None: # skip some task_ids + with open(skip, "r") as f: + skip_ids = [ + line.strip().rsplit(":", maxsplit=1)[0] + for line in f.readlines() + if line.strip() + ] + + output_ds_name = "-".join([name.split("/")[-1] for name in datasets]) + + filtered_datasets = [] + + # extend example["extra_info"]["task_id"] if nonexistent + for ds_name 
in datasets: + print("=" * 28) + print("Processing", ds_name) + if os.path.isdir(ds_name): + dataset = load_dataset( + "parquet", + data_files={ + "train": os.path.join(ds_name, "train.parquet"), + "test": os.path.join(ds_name, "test.parquet"), + }, + ) + else: + dataset = load_dataset(ds_name) + if "task_id" not in dataset["train"]["extra_info"][0]: + dataset["train"] = dataset["train"].map( + lambda x: {"extra_info": {**x["extra_info"], "task_id": None}}, + num_proc=64, + ) + dataset["test"] = dataset["test"].map( + lambda x: {"extra_info": {**x["extra_info"], "task_id": None}}, + num_proc=64, + ) + print(f"Before skipping {dataset['train'] = }") + dataset["train"] = dataset["train"].filter( + lambda x: x["extra_info"]["task_id"] not in skip_ids + ) + print(f"After skipping {dataset['train'] = }") + + filtered_datasets.append(dataset) + + train_datasets = [dataset["train"] for dataset in filtered_datasets] + test_datasets = [ + dataset["test"].select(range(min(800, len(dataset["test"])))) + for dataset in filtered_datasets + ] + align_system_prompt_fn = partial(align_system_prompt, system_prompt=SYSTEM_PROMPT) + train_dataset = concatenate_datasets(train_datasets).shuffle(seed=666) + train_dataset = train_dataset.map(align_system_prompt_fn) + test_dataset = concatenate_datasets(test_datasets).map(align_system_prompt_fn) + + print(f"{train_dataset = }") + print(f"{test_dataset = }") + + local_dir = os.path.join(root, output_ds_name) + print(f"Saving to {local_dir}...") + train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--datasets", nargs="+") + parser.add_argument("--root-dir", default="./local_data/") + parser.add_argument("--skip", default=None, help="Path to skip file") + args = parser.parse_args() + + merge_datasets(datasets=args.datasets, root=args.root_dir, 
skip=args.skip) diff --git a/rl/data/secqa.py b/rl/data/secqa.py new file mode 100644 index 0000000..de172df --- /dev/null +++ b/rl/data/secqa.py @@ -0,0 +1,114 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import os + +import rich +from datasets import DatasetDict, concatenate_datasets, load_dataset +from rich.rule import Rule + +from rl.data.correctness import _EMPTY_RETURN_ + + +def secqa(dataset_name): + rich.print(Rule(f"Loading {dataset_name}...")) + dataset = load_dataset(dataset_name) + + def make_map_fn(split): + + def process_fn(example, idx): + # NOTE: Multi-turn conversations are skipped until Yuxiang's exploration on veRL support + if len(example["messages"]) != 1: + return _EMPTY_RETURN_ + + messages = example["messages"] + assert ( + messages[0]["role"] == "user" + ), f"First message must be from user, but got {messages[0]['role']}" + + example_prompt = messages[0]["content"] + reference = None + + label = "noreject" + return { + "data_source": f"purpcode:{label}", + "prompt": messages, + "ability": "secqa", + "reward_model": { + "style": "rule", + "ground_truth": "", + }, + "extra_info": { + "split": split, + "index": idx, + "reference": reference, + "prompt": example_prompt, + "oracles": [label], + "dataset": dataset_name, + "task_id": f"{dataset_name}:{example['task_id']}", + }, + } + + return process_fn + + train_dataset = ( + dataset["train"] + .map( + function=make_map_fn("train"), + with_indices=True, + num_proc=64, + remove_columns=dataset["train"].column_names, + ) + .filter(lambda x: x != _EMPTY_RETURN_) + .shuffle(seed=666) + ) + print(f"{train_dataset = }") + test_dataset = ( + dataset["test"] + .map( + function=make_map_fn("test"), + with_indices=True, + num_proc=64, + remove_columns=dataset["test"].column_names, + ) + .filter(lambda x: x != _EMPTY_RETURN_) + .shuffle(seed=666) + ) + return train_dataset, test_dataset + + +def main(root_dir: str = "./local_data", push_to_hub: bool = 
False): + train_datasets = [] + test_datasets = [] + + for ds in ["purpcode/secqa_utility_train"]: + train, test = secqa(ds) + train_datasets.append(train) + test_datasets.append(test) + + train_dataset = concatenate_datasets(train_datasets).shuffle(seed=666) + print("Combined training set:", train_dataset) + test_dataset = concatenate_datasets(test_datasets) + + rich.print(Rule("Saving the final dataset")) + print("Train set:", train_dataset) + print("Test set:", test_dataset) + + dataset_name = f"rl-secqa-{round(len(train_dataset) / 1000)}k" + local_dir = os.path.join(root_dir, dataset_name) + rich.print(f"[bold green]Saving to {local_dir}...") + train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) + + if push_to_hub: + print(f"Pushing to hub: purpcode/{dataset_name}") + DatasetDict({"train": train_dataset, "test": test_dataset}).push_to_hub( + f"purpcode/{dataset_name}", private=True + ) + + +if __name__ == "__main__": + from fire import Fire + + Fire(main) diff --git a/rl/data/security.py b/rl/data/security.py new file mode 100644 index 0000000..192b99f --- /dev/null +++ b/rl/data/security.py @@ -0,0 +1,114 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import os + +import rich +from datasets import DatasetDict, concatenate_datasets, load_dataset +from rich.rule import Rule + +from rl.data.correctness import _EMPTY_RETURN_ + + +def vulcode(dataset_name): + rich.print(Rule(f"Loading {dataset_name}...")) + dataset = load_dataset(dataset_name) + + def make_map_fn(split): + + def process_fn(example, idx): + # NOTE: Multi-turn conversations are skipped until Yuxiang's exploration on veRL support + messages = example["messages"] + + if len(messages) != 1: + return _EMPTY_RETURN_ + + assert ( + messages[0]["role"] == "user" + ), f"First message must be from user, but got {messages[0]['role']}" + + example_prompt = messages[0]["content"] + 
reference = None + + label = "codesec" + return { + "data_source": f"purpcode:{label}", + "prompt": messages, + "ability": "vulcode", + "reward_model": { + "style": "rule", + "ground_truth": "", + }, + "extra_info": { + "split": split, + "index": idx, + "reference": reference, + "prompt": example_prompt, + "oracles": [label], + "dataset": dataset_name, + "task_id": f"{dataset_name}:{example['task_id']}", + }, + } + + return process_fn + + train_dataset = ( + dataset["train"] + .map( + function=make_map_fn("train"), + with_indices=True, + num_proc=64, + remove_columns=dataset["train"].column_names, + ) + .filter(lambda x: x != _EMPTY_RETURN_) + .shuffle(seed=666) + ) + print(f"{train_dataset = }") + return train_dataset + + +if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser() + parser.add_argument("--root_dir", default="./local_data/") + parser.add_argument("--push_to_hub", action="store_true") + args = parser.parse_args() + + train_datasets = [] + + dataset_makes = [vulcode] + + for ds in [ + "purpcode/vul2prompt-general-oss", + "purpcode/vul2prompt-benign2vul-oss", + "purpcode/vul2prompt-vul2vul-oss", + "purpcode/vul2prompt-jailbreaking-oss-11k", + ]: + train = vulcode(ds) + train_datasets.append(train) + + train_dataset = concatenate_datasets(train_datasets).shuffle(seed=666) + splits = train_dataset.train_test_split(test_size=1024, shuffle=True, seed=666) + train_dataset = splits["train"] + test_dataset = splits["test"] + + rich.print(Rule("Saving the final dataset")) + print("Train set:", train_dataset) + print("Test set:", test_dataset) + + dataset_name = f"rl-codesec-{round(len(train_dataset) / 1000)}k" + local_dir = os.path.join(args.root_dir, dataset_name) + rich.print(f"[bold green]Saving to {local_dir}...") + train_dataset.to_parquet(os.path.join(local_dir, "train.parquet")) + test_dataset.to_parquet(os.path.join(local_dir, "test.parquet")) + + if args.push_to_hub: + print(f"Pushing to hub: purpcode/{dataset_name}") + 
DatasetDict( + { + "train": train_dataset, + "test": test_dataset, + } + ).push_to_hub(f"purpcode/{dataset_name}", private=True) diff --git a/rl/grouped_reward.py b/rl/grouped_reward.py new file mode 100644 index 0000000..38e34b0 --- /dev/null +++ b/rl/grouped_reward.py @@ -0,0 +1,132 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import json +import os +import random +import re +import time +from collections import defaultdict +from concurrent.futures import ProcessPoolExecutor, as_completed +from multiprocessing import cpu_count +from typing import List + +from rl.reward import ( + DEFAULT_FMT_REWARD, + _wrap_log, + check_fmt_with_idx, + extract_code_from_string, + sync_group_compute_answer_score_with_idx, + try_extract_answer, +) +from rl.reward_utility.code_analyzer import analyze_code_security_v2 + +PRINT_RATIO = 0.05 + +SOFT_CODESEC_REWARD = float(os.environ.get("SOFT_CODESEC_REWARD", "0.8")) + + +def compute_score( + responses: List[str], ground_truth, data_sources: List[str], extra_info +) -> List[float]: + t_start = time.time() + elapsed = lambda: f"[โฐ GROUP TIME :: {time.time() - t_start:.1f}s] :: " + + counter = 0 + + def _print_log(score, reward_log, max_chars=None): + nonlocal counter + counter += 1 + if random.random() > PRINT_RATIO: + return + reward_log = _wrap_log(score, reward_log, max_chars=max_chars) + reward_log += f"\n{elapsed()} :: Finished sample {counter} / {len(responses)}" + print(re.sub(r"\n+", "\n", reward_log) + "\n\n") + + return_scores = [None] * len(responses) + answers = [None] * len(responses) + + oracle2idx = defaultdict(list) + + # Step 1: Grouped format checking + with ProcessPoolExecutor(max_workers=min(64, cpu_count())) as executor: + fs = [ + executor.submit(check_fmt_with_idx, index=i, response=response) + for i, response in enumerate(responses) + ] + + for f in as_completed(fs): + i, (fmt_ok, fmt_logs) = f.result() + return_scores[i] = DEFAULT_FMT_REWARD if fmt_ok else 
0.0 + if not fmt_ok: + _print_log(score=return_scores[i], reward_log=fmt_logs, max_chars=2048) + continue + + # TODO(@ganler): consider multiple orales in the future + answers[i] = try_extract_answer(responses[i]) + oracles = extra_info[i]["oracles"] + assert len(oracles) == 1, "Assume one oracle" + oracle2idx[oracles[0]].append(i) + + print(f"{elapsed()} Finished fmt-check for {len(responses)} samples") + print("Post-fmt-check tasks:", {k: len(v) for k, v in oracle2idx.items()}) + + # Step 2: Dispatching + # - Correctness & LLM Judge + + def emit_tasks(executor, indices, max_concurrency=None): + return executor.submit( + sync_group_compute_answer_score_with_idx, + indices=indices, + answers=[answers[i] for i in indices], + ground_truth=[ground_truth[i] for i in indices], + extra_info=[extra_info[i] for i in indices], + max_concurrency=max_concurrency, + ) + + task_groups = [ + {"oracles": ["correctness"], "max_concurrency": round(1.5 * cpu_count())}, + {"oracles": ["general-safety", "reject", "noreject"], "max_concurrency": 1024}, + ] + + with ProcessPoolExecutor(max_workers=len(task_groups)) as executor: + subgroup_futures = [] + for group in task_groups: + if indices := sum([oracle2idx[oracle] for oracle in group["oracles"]], []): + subgroup_futures.append( + emit_tasks( + executor, indices, max_concurrency=group["max_concurrency"] + ) + ) + + if safety_indices := oracle2idx["codesec"]: # - Code security analysis + sec_tstart = time.time() + print(f"{elapsed()} STARTING code analysis...") + issue_list = analyze_code_security_v2( + [extract_code_from_string(answers[i]) for i in safety_indices] + ) + has_python = {i: "```python" in answers[i] for i in safety_indices} + for idx, issues in zip(safety_indices, issue_list): + fense = lambda s: "-" * 16 + s + "-" * 16 + logs = [fense("Prompt"), extra_info[idx]["prompt"]] + logs.extend([fense("Code to Analyze"), answers[idx]]) + if not issues: + return_scores[idx] = 1.0 if has_python[idx] else SOFT_CODESEC_REWARD + 
logs.append("๐Ÿ’ช" * 16 + "No vulnerability detected" + "๐Ÿ’ช" * 16) + else: + logs.append("๐Ÿž" * 16 + "Vulnerabilities detected" + "๐Ÿž" * 16) + logs.append(json.dumps(issues, indent=4)) + _print_log(return_scores[idx], "\n".join(logs), max_chars=2048) + print(f"{elapsed()} Code analysis done in {time.time() - sec_tstart:.1f}s") + + for f in as_completed(subgroup_futures): + for group_result in f.result(): + idx, (ok, log) = group_result + if ok: + return_scores[idx] = 1.0 + _print_log(score=return_scores[idx], reward_log=log, max_chars=2048) + + # Step 3: Return + assert all(score is not None for score in return_scores) + return return_scores diff --git a/rl/main_grpo_qwen14b.sh b/rl/main_grpo_qwen14b.sh new file mode 100644 index 0000000..05d00f6 --- /dev/null +++ b/rl/main_grpo_qwen14b.sh @@ -0,0 +1,76 @@ +#!/bin/bash +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +# The config is optimized for 8xH200 +# Assuming using vLLM >= 0.8 such that is V1 is enbaled by default +# Depends on: https://github.com/ganler/verl/tree/opt +set -eux + +export PYTHONPATH=$(pwd) + +python -c "import rl.data" + +if [ -z "${CUDA_VISIBLE_DEVICES+x}" ]; then + GPUS_PER_NODE=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +else + GPUS_PER_NODE=$(echo "$CUDA_VISIBLE_DEVICES" | awk -F',' '{print NF}') +fi + +# Tips for reducing VRAM usage +# 1. Reduce MICRO_BATCH_PER_GPU (and increase GRAD_ACCUM_STEPS accordingly) +# 2. 
Reduce the factor (6) in PPO_MAX_TOKEN_LEN_PER_GPU to 3 + +# MAIN CONFIG +DATASET=code-r1-46k-leetcode2k-kodcode-rl-codesec-78k-rl-secqa-11k-rl-safety-8k-single-turn +MODEL_PATH="outputs/purpcode-14b-ctxdistill" +MICRO_BATCH_PER_GPU=48 +ROLLOUT_N_SAMPLE=8 +MAX_PROMPT_LEN=2048 +MAX_RESPONSE_LEN=3072 +MAX_EPOCHS=1 + +# AUTO VALUES +ROLLOUT_N_QUERY=$((MICRO_BATCH_PER_GPU * GPUS_PER_NODE)) +PPO_MAX_TOKEN_LEN_PER_GPU=$(( 8 * $(( $MAX_PROMPT_LEN + $MAX_RESPONSE_LEN )) )) + +python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files=local_data/$DATASET/train.parquet \ + data.val_files=local_data/$DATASET/test.parquet \ + data.filter_overlong_prompts=True \ + data.train_batch_size=$ROLLOUT_N_QUERY \ + +data.max_roll_factor=4 \ + data.max_prompt_length=$MAX_PROMPT_LEN \ + data.max_response_length=$MAX_RESPONSE_LEN \ + actor_rollout_ref.actor.optim.lr=5e-7 \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.model.path=$MODEL_PATH \ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.ppo_mini_batch_size=$ROLLOUT_N_QUERY \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$PPO_MAX_TOKEN_LEN_PER_GPU \ + actor_rollout_ref.actor.use_dynamic_bsz=True \ + actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=$ROLLOUT_N_SAMPLE \ + actor_rollout_ref.rollout.enforce_eager=False \ + actor_rollout_ref.rollout.free_cache_engine=False \ + algorithm.kl_ctrl.kl_coef=0.001 \ + +algorithm.filter_groups.enable=True \ + trainer.critic_warmup=0 \ + trainer.logger=['wandb'] \ + trainer.project_name='purpcode' \ + trainer.experiment_name=${DATASET}-14b \ + trainer.nnodes=1 \ + trainer.default_local_dir=./models/purpcode-14b-rl-${DATASET} \ + trainer.n_gpus_per_node=$GPUS_PER_NODE \ + 
trainer.save_freq=32 \ + trainer.test_freq=16 \ + trainer.total_epochs=$MAX_EPOCHS \ + trainer.resume_mode=auto \ + +custom_reward_function.path=./rl/grouped_reward.py \ + reward_model.reward_manager=group $@ 2>&1 | tee grpo.log diff --git a/rl/main_grpo_qwen32b.sh b/rl/main_grpo_qwen32b.sh new file mode 100644 index 0000000..e2363ac --- /dev/null +++ b/rl/main_grpo_qwen32b.sh @@ -0,0 +1,76 @@ +#!/bin/bash +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +# The config is optimized for 8xH200 +# Assuming using vLLM >= 0.8 such that is V1 is enbaled by default +# Depends on: https://github.com/ganler/verl/tree/opt +set -eux + +export PYTHONPATH=$(pwd) + +python -c "import rl.data" + +if [ -z "${CUDA_VISIBLE_DEVICES+x}" ]; then + GPUS_PER_NODE=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) +else + GPUS_PER_NODE=$(echo "$CUDA_VISIBLE_DEVICES" | awk -F',' '{print NF}') +fi + +# Tips for reducing VRAM usage +# 1. Reduce MICRO_BATCH_PER_GPU (and increase GRAD_ACCUM_STEPS accordingly) +# 2. 
Reduce the factor (6) in PPO_MAX_TOKEN_LEN_PER_GPU to 3 + +# MAIN CONFIG +DATASET=code-r1-46k-leetcode2k-kodcode-rl-codesec-78k-rl-secqa-11k-rl-safety-8k-single-turn +MODEL_PATH="outputs/purpcode-32b-ctxdistill-0622" +MICRO_BATCH_PER_GPU=48 +ROLLOUT_N_SAMPLE=8 +MAX_PROMPT_LEN=2048 +MAX_RESPONSE_LEN=3072 +MAX_EPOCHS=1 + +# AUTO VALUES +ROLLOUT_N_QUERY=$((MICRO_BATCH_PER_GPU * GPUS_PER_NODE)) +PPO_MAX_TOKEN_LEN_PER_GPU=$(( 5 * $(( $MAX_PROMPT_LEN + $MAX_RESPONSE_LEN )) )) + +python3 -m verl.trainer.main_ppo \ + algorithm.adv_estimator=grpo \ + data.train_files=local_data/$DATASET/train.parquet \ + data.val_files=local_data/$DATASET/test.parquet \ + data.filter_overlong_prompts=True \ + data.train_batch_size=$ROLLOUT_N_QUERY \ + +data.max_roll_factor=4 \ + data.max_prompt_length=$MAX_PROMPT_LEN \ + data.max_response_length=$MAX_RESPONSE_LEN \ + actor_rollout_ref.actor.optim.lr=5e-7 \ + actor_rollout_ref.model.use_remove_padding=True \ + actor_rollout_ref.model.path=$MODEL_PATH \ + actor_rollout_ref.model.enable_gradient_checkpointing=True \ + actor_rollout_ref.actor.ppo_mini_batch_size=$ROLLOUT_N_QUERY \ + actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$PPO_MAX_TOKEN_LEN_PER_GPU \ + actor_rollout_ref.actor.use_dynamic_bsz=True \ + actor_rollout_ref.actor.use_kl_loss=True \ + actor_rollout_ref.actor.kl_loss_coef=0.001 \ + actor_rollout_ref.actor.kl_loss_type=low_var_kl \ + actor_rollout_ref.rollout.name=vllm \ + actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \ + actor_rollout_ref.rollout.n=$ROLLOUT_N_SAMPLE \ + actor_rollout_ref.rollout.enforce_eager=False \ + actor_rollout_ref.rollout.free_cache_engine=False \ + algorithm.kl_ctrl.kl_coef=0.001 \ + +algorithm.filter_groups.enable=True \ + trainer.critic_warmup=0 \ + trainer.logger=['wandb'] \ + trainer.project_name='purpcode' \ + trainer.experiment_name=${DATASET}-32b \ + trainer.nnodes=1 \ + trainer.default_local_dir=./models/purpcode-32b-rl-${DATASET} \ + trainer.n_gpus_per_node=$GPUS_PER_NODE \ + 
trainer.save_freq=32 \ + trainer.test_freq=16 \ + trainer.total_epochs=$MAX_EPOCHS \ + trainer.resume_mode=auto \ + +custom_reward_function.path=./rl/grouped_reward.py \ + reward_model.reward_manager=group $@ 2>&1 | tee grpo.log diff --git a/rl/model_merger.py b/rl/model_merger.py new file mode 100644 index 0000000..c08885e --- /dev/null +++ b/rl/model_merger.py @@ -0,0 +1,189 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +# Copyright 2024 Bytedance Ltd. and/or its affiliates +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import argparse +import json +import os +import re +from concurrent.futures import ThreadPoolExecutor +from typing import Dict, List, Tuple + +import torch +from torch.distributed._tensor import DTensor, Placement, Shard +from transformers import ( + AutoConfig, + AutoModelForCausalLM, + AutoModelForTokenClassification, + AutoModelForVision2Seq, +) + + +def merge_by_placement(tensors: List[torch.Tensor], placement: Placement): + if placement.is_replicate(): + return tensors[0] + elif placement.is_partial(): + raise NotImplementedError("Partial placement is not supported yet") + elif placement.is_shard(): + return torch.cat(tensors, dim=placement.dim).contiguous() + else: + raise ValueError(f"Unsupported placement: {placement}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument( + "--local_dir", required=True, type=str, help="The path for your saved model" + ) + parser.add_argument( + "--hf_upload_path", + default=False, + type=str, + help="The path of the huggingface repo to upload", + ) + args = parser.parse_args() + + assert not args.local_dir.endswith( + "huggingface" + ), "The local_dir should not end with huggingface" + local_dir = args.local_dir + + # copy rank zero to find the shape of (dp, fsdp) + rank = 0 + world_size = 0 + for filename in os.listdir(local_dir): + match = re.match(r"model_world_size_(\d+)_rank_0\.pt", filename) + if match: + world_size = match.group(1) + break + assert world_size, "No model file with the proper format" + + state_dict = torch.load( + os.path.join(local_dir, f"model_world_size_{world_size}_rank_{rank}.pt"), + map_location="cpu", + ) + pivot_key = sorted(list(state_dict.keys()))[0] + weight = state_dict[pivot_key] + assert isinstance(weight, torch.distributed._tensor.DTensor) + # get sharding info + device_mesh = weight.device_mesh + mesh = device_mesh.mesh + mesh_dim_names = device_mesh.mesh_dim_names + + print(f"Got device mesh {mesh}, mesh_dim_names {mesh_dim_names}") + + assert 
mesh_dim_names in ( + ("fsdp",), + ), f"Unsupported mesh_dim_names {mesh_dim_names}" + + if "tp" in mesh_dim_names: + # fsdp * tp + total_shards = mesh.shape[-1] * mesh.shape[-2] + mesh_shape = (mesh.shape[-2], mesh.shape[-1]) + else: + # fsdp + total_shards = mesh.shape[-1] + mesh_shape = (mesh.shape[-1],) + + print(f"Processing model shards with {total_shards} {mesh_shape} in total") + + model_state_dict_lst = [] + model_state_dict_lst.append(state_dict) + model_state_dict_lst.extend([""] * (total_shards - 1)) + + def process_one_shard(rank): + model_path = os.path.join( + local_dir, f"model_world_size_{world_size}_rank_{rank}.pt" + ) + state_dict = torch.load(model_path, map_location="cpu", weights_only=False) + model_state_dict_lst[rank] = state_dict + return state_dict + + with ThreadPoolExecutor(max_workers=min(32, os.cpu_count())) as executor: + for rank in range(1, total_shards): + executor.submit(process_one_shard, rank) + state_dict = {} + param_placements: Dict[str, List[Placement]] = {} + keys = set(model_state_dict_lst[0].keys()) + for key in keys: + state_dict[key] = [] + for model_state_dict in model_state_dict_lst: + try: + tensor = model_state_dict.pop(key) + except: + print("-" * 30) + print(model_state_dict) + if isinstance(tensor, DTensor): + state_dict[key].append(tensor._local_tensor.bfloat16()) + placements = tuple(tensor.placements) + # replicated placement at dp dimension can be discarded + if mesh_dim_names[0] == "dp": + placements = placements[1:] + if key not in param_placements: + param_placements[key] = placements + else: + assert param_placements[key] == placements + else: + state_dict[key] = tensor.bfloat16() + + del model_state_dict_lst + + for key in sorted(state_dict): + if not isinstance(state_dict[key], list): + print(f"No need to merge key {key}") + continue + # merge shards + placements: Tuple[Shard] = param_placements[key] + if len(mesh_shape) == 1: + # 1-D list, FSDP without TP + assert len(placements) == 1 + shards = 
state_dict[key] + state_dict[key] = merge_by_placement(shards, placements[0]) + else: + # 2-D list, FSDP + TP + raise NotImplementedError("FSDP + TP is not supported yet") + + print("Writing to local disk") + hf_path = os.path.join(local_dir, "huggingface") + config = AutoConfig.from_pretrained(hf_path) + + if "ForTokenClassification" in config.architectures[0]: + auto_model = AutoModelForTokenClassification + elif "ForCausalLM" in config.architectures[0]: + auto_model = AutoModelForCausalLM + elif "ForConditionalGeneration" in config.architectures[0]: + auto_model = AutoModelForVision2Seq + else: + raise NotImplementedError(f'Unknown architecture {config["architectures"]}') + + with torch.device("meta"): + model = auto_model.from_config(config, torch_dtype=torch.bfloat16) + model.to_empty(device="cpu") + + print(f"Saving model to {hf_path}") + model.save_pretrained(hf_path, state_dict=state_dict) + del state_dict + del model + if args.hf_upload_path: + # Push to hugging face + from huggingface_hub import HfApi + + api = HfApi() + api.create_repo(repo_id=args.hf_upload_path, private=False, exist_ok=True) + api.upload_folder( + folder_path=hf_path, repo_id=args.hf_upload_path, repo_type="model" + ) diff --git a/rl/reward.py b/rl/reward.py new file mode 100644 index 0000000..1e224b1 --- /dev/null +++ b/rl/reward.py @@ -0,0 +1,316 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import asyncio +import json +import time +from collections import defaultdict +from concurrent.futures import ThreadPoolExecutor, as_completed +from traceback import format_exc +from typing import Tuple + +import numpy as np +from dotenv import load_dotenv +from openai import AsyncOpenAI, OpenAI + +load_dotenv() + +from eval.oracles.check_secqa import * +from eval.oracles.malicious_assistance_detection import * +from rl.reward_utility.sandbox_fusion import _ERROR_MSG_PREFIX +from rl.reward_utility.sandbox_fusion import code_exec_sandbox_fusion 
as code_exec +from utils.verify_text import check_fmt, extract_code_from_string, try_extract_answer + +_MAX_CHAR_DISPLAY = 4096 +LLM_JUDGE_OPENAI_URL = os.getenv("LLM_JUDGE_OPENAI_URL") + +print(f"{LLM_JUDGE_OPENAI_URL = }") + + +def get_llm_judge_name(): + try: + client = OpenAI(api_key="none", base_url=LLM_JUDGE_OPENAI_URL) + models = client.models.list() + return models.data[0].id + except Exception: + print("Failed to get LLM judge model name, using `default`.") + print(format_exc()) + return "default" + + +LLM_JUDGE_MODEL_ID = get_llm_judge_name() + + +def check_fmt_with_idx(index, response): + return index, check_fmt(response) + + +def remote_check_stdio(code, stdin, stdout): + succ, output = code_exec(code=code, stdin=stdin) + return succ, output, stdin, stdout + + +def _check_correctness(solution_str, ground_truth, extra_info) -> Tuple[bool, str]: + reward_log = [] + + solution_code = extract_code_from_string(solution_str) + + reward_log.append("-" * 16 + "Extracted Code to Execute" + "-" * 16) + reward_log.append(solution_code) + + t_start = time.time() + + ground_truth = json.loads(ground_truth) + if "pytest" in ground_truth or "functional" in ground_truth: + if "functional" in ground_truth: + succ, output = code_exec(solution_code + "\n" + ground_truth["functional"]) + else: # pytest + succ, output = code_exec(solution_code, pytest=ground_truth["pytest"]) + if not succ: + reward_log.append("-" * 16 + "Failed Prompt" + "-" * 16) + reward_log.append(extra_info["prompt"]) + reward_log.append( + "!" * 16 + + f"โš ๏ธ Test Execution Failed in {time.time() - t_start:.1f}s" + + "!" 
* 16 + ) + reward_log.append(output[:_MAX_CHAR_DISPLAY]) + return False, "\n".join(reward_log) + elif "inputs" in ground_truth and "outputs" in ground_truth: + stdin_list: str = ground_truth["inputs"] + stdout_list: str = ground_truth["outputs"] + + # Add parallelism + with ThreadPoolExecutor(max_workers=min(16, len(stdin_list))) as executor: + futures = [ + executor.submit(remote_check_stdio, solution_code, stdin, stdout) + for stdin, stdout in zip(stdin_list, stdout_list) + ] + for future in as_completed(futures): + succ, output, stdin, stdout = future.result() + if not succ or output.strip() != stdout.strip(): + output = output[:_MAX_CHAR_DISPLAY] # truncate output to print + reward_log.append("-" * 16 + "Failed Prompt" + "-" * 16) + reward_log.append(extra_info["prompt"].replace("\n\n", "\n")) + reward_log.append( + "!" * 16 + + f"โš ๏ธ Test Execution Failed in {time.time() - t_start:.1f}s" + + "!" * 16 + ) + reward_log.append(f"๐Ÿ”ŽInput: {repr(stdin)}") + reward_log.append(f"โœ…Expected: {repr(stdout.strip())}") + reward_log.append( + f"โŒActual: {output if output.startswith(_ERROR_MSG_PREFIX) else repr(output.strip())}" + ) + return False, "\n".join(reward_log) + else: + raise ValueError( + f"Current supports for ground-truth are ['functional', 'inputs/outputs'] -- No idea what's: {ground_truth = }" + ) + + reward_log.append("+" * 16 + "Test Execution Passed! 
(Output)" + "+" * 16) + reward_log.append(output) + return True, "\n".join(reward_log) + + +async def _check_malicious(actor_response, prompt): + reward_log = [] + prompt_check_malicious = malicious_assistance_detection_prompt( + { + "messages": [ + {"role": "user", "content": prompt}, + {"role": "assistant", "content": actor_response}, + ] + } + ) + + client = AsyncOpenAI(api_key="none", base_url=LLM_JUDGE_OPENAI_URL) + completion = await client.chat.completions.create( + model=LLM_JUDGE_MODEL_ID, + messages=prompt_check_malicious["messages"], + max_completion_tokens=256, + stop=["PV_MAL_CODE: TRUE", "PV_MAL_EXPLN: TRUE"], + ) + judge_response = completion.choices[0].message.content + + reward_log.append("-" * 16 + "Prompt" + "-" * 16) + reward_log.append(prompt) + reward_log.append("-" * 16 + "Response" + "-" * 16) + reward_log.append(actor_response) + + if ": TRUE" in judge_response: + reward_log.append("-" * 16 + "๐Ÿ’ฅ Malicious Assistance Detected!" + "-" * 16) + reward_log.append("-" * 16 + "LLM Judge Output Below" + "-" * 16) + reward_log.append(judge_response) + return False, "\n".join(reward_log) + + reward_log.append("+" * 16 + "๐Ÿ”’ No Malicious Assistance Detected!" 
+ "+" * 16) + reward_log.append(judge_response) + return True, "\n".join(reward_log) + + +async def _check_secqa(actor_response, prompt, expect_bad=True): + reward_log = [] + prompt_check_malicious = check_secqa_prompt( + { + "messages": [ + {"role": "user", "content": prompt}, + {"role": "assistant", "content": actor_response}, + ] + } + ) + + client = AsyncOpenAI(api_key="none", base_url=LLM_JUDGE_OPENAI_URL) + completion = await client.chat.completions.create( + model=LLM_JUDGE_MODEL_ID, + messages=prompt_check_malicious["messages"], + max_completion_tokens=256, + ) + judge_response = completion.choices[0].message.content + + reward_log.append("-" * 16 + "Prompt" + "-" * 16) + reward_log.append(prompt) + reward_log.append("-" * 16 + "Response" + "-" * 16) + reward_log.append(actor_response) + is_bad = is_bad_answer(judge_response) + hatch = "๐Ÿšจ" if is_bad != expect_bad else "๐Ÿ”’" + if is_bad: + reward_log.append("-" * 16 + f"{hatch} Bad SecQA Answer Detected" + "-" * 16) + else: + reward_log.append("-" * 16 + f"{hatch} Good SecQA Answer" + "-" * 16) + reward_log.append(judge_response) + return is_bad == expect_bad, "\n".join(reward_log) + + +DEFAULT_FMT_REWARD = 0.0 # set it to zero now as models can now better follow the fmt + + +def _wrap_log(score: float, reward_log: str, max_chars: int = None): + if max_chars is not None and len(reward_log) > max_chars: + half = max_chars // 2 + omitted_count = len(reward_log) - (2 * half) + reward_log = ( + reward_log[:half] + + f"\n... omitted {omitted_count} chars ... 
\n" + + reward_log[-half:] + ) + + fense = ("โœ…" if score >= 1.0 else "โŒ") * 16 + return ( + fense + + " Reward Calculation " + + fense + + "\n" + + reward_log + + "\n" + + fense + + f" Final Reward = {score} " + + fense + ) + + +async def compute_answer_score(answer, ground_truth, extra_info): + reward_log_list = [] + for o in extra_info["oracles"]: + tstart = time.time() + if "correctness" == o: + ok, reward_log = await asyncio.to_thread( + _check_correctness, answer, ground_truth, extra_info=extra_info + ) + elif "general-safety" == o: + ok, reward_log = await _check_malicious(answer, extra_info["prompt"]) + elif o in ["reject", "noreject"]: + ok, reward_log = await _check_secqa( + answer, extra_info["prompt"], expect_bad="reject" == o + ) + elif o == "security": + raise ValueError( + "Please use grouped reward manager for security-related oracle." + ) + else: + raise ValueError(f"Unknown oracle: {o}") + + reward_log_list.append(reward_log) + reward_log_list.append( + f"โฐ Running oracle {o} took {time.time() - tstart:.1f}s" + ) + if not ok: + return False, "\n".join(reward_log_list) + + return True, "\n".join(reward_log_list) + + +def compute_score(data_source, response, ground_truth, extra_info): + answer_reward = 1.0 + t_start = time.time() + if isinstance(extra_info, np.ndarray): + extra_info = extra_info.item() + + def _return_score(score, reward_log, marker="-", max_chars=None): + reward_log = _wrap_log( + score, "\n".join(reward_log), marker=marker, max_chars=max_chars + ) + reward_log += f"\n{data_source = } :: computed in {time.time() - t_start}s" + return score + + reward_logs = [] + + # Format + fmt_ok, fmt_logs = check_fmt(response) + reward_logs.append(fmt_logs) + if not fmt_ok: + return _return_score(0.0, reward_logs, marker="โŒ", max_chars=2048) + + answer = try_extract_answer(response) + ok, reward_log = asyncio.run(compute_answer_score(answer, ground_truth, extra_info)) + reward_logs.append(reward_log) + if not ok: + return 
_return_score(DEFAULT_FMT_REWARD, reward_logs, marker="โŒ") + + return _return_score(answer_reward, reward_logs, marker="โœ…") + + +def sync_group_compute_answer_score_with_idx( + indices, answers, ground_truth, extra_info, max_concurrency=None +): + tstart = time.time() + + async def main(): + async def run(index, ans, gt, ei): + return index, await compute_answer_score(ans, gt, ei) + + sem = asyncio.Semaphore(max_concurrency) if max_concurrency else None + + async def run_with_concurrency(index, ans, gt, ei): + async with sem: + return index, await compute_answer_score(ans, gt, ei) + + run_fn = run_with_concurrency if sem else run + + return await asyncio.gather( + *[ + asyncio.create_task(run_fn(i, a, g, e)) + for i, a, g, e in zip(indices, answers, ground_truth, extra_info) + ] + ) + + ret = asyncio.run(main()) # sync here + stat = defaultdict(dict) + for (_, (ok, _)), ei in zip(ret, extra_info): + for o in ei["oracles"]: + stat[o]["ok"] = stat[o].get("ok", 0) + int(ok) + stat[o]["total"] = stat[o].get("total", 0) + 1 + + for o, v in stat.items(): + v["succ_rate"] = f'{v["ok"] / v["total"]:.1%}' + v["throughput"] = v["total"] / (time.time() - tstart) + v["throughput"] = f'{v["throughput"]:.2f} response/s' + + # append more info to each log + for i in range(len(ret)): + idx, (ok, reward_log) = ret[i] + reward_log += f"\nGroup {stat = }" + ret[i] = (idx, (ok, reward_log)) + + return ret diff --git a/rl/reward_utility/code_analyzer.py b/rl/reward_utility/code_analyzer.py new file mode 100644 index 0000000..0f8b408 --- /dev/null +++ b/rl/reward_utility/code_analyzer.py @@ -0,0 +1,34 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +import os +import tempfile +import uuid +from typing import List + +from eval.oracles.secure_code_oracles import run_analyzers_in_batch + +PURPCODE_CODE_ANALYZER = os.getenv("PURPCODE_CODE_ANALYZER", "codeguru") # or "codeql" + + +def analyze_code_security_v2( + code_snippets: List[str], 
min_severity_level: str = "MEDIUM" +) -> List[List[dict]]: + task_ids = [f"task_{uuid.uuid4()}" for _ in code_snippets] + samples = [ + {"task_id": tid, "turn": 1, "code_blocks": [code]} + for tid, code in zip(task_ids, code_snippets) + if code.strip() + ] + print(f"{len(samples)} / {len(code_snippets)} valid code snippets to run analyzers") + with tempfile.TemporaryDirectory() as temp_dir: + merged_analyzer_results, _ = run_analyzers_in_batch( + sample_with_extrcted_code_blocks=samples, + per_analyzer_results_folder=temp_dir, + min_severity_level=min_severity_level, + analyzers=[PURPCODE_CODE_ANALYZER], + num_batches=max(1, min(20, len(samples) // 32)), + batch_granularity="line", + ) + return [merged_analyzer_results.get(tid, []) for tid in task_ids] diff --git a/rl/reward_utility/sandbox_fusion.py b/rl/reward_utility/sandbox_fusion.py new file mode 100644 index 0000000..d977226 --- /dev/null +++ b/rl/reward_utility/sandbox_fusion.py @@ -0,0 +1,66 @@ +# SPDX-FileCopyrightText: (c) UIUC PurpCode Team +# +# SPDX-License-Identifier: Apache-2.0 + +from typing import Tuple + +from sandbox_fusion import RunCodeRequest, RunCodeResponse, RunStatus, run_code + +_ERROR_MSG_PREFIX = "Failed to execute program: " +_DEFAULT_TIMEOUT_SECONDS = 15 + + +def skipline(line: str) -> bool: + if not line.strip(): + return True + for skipper in [ + "UserWarning:", + "`nameko test`", + "This warning can be suppressed", + "warning.warn", + ]: + if skipper in line: + return True + + return False + + +def render_error(response: RunCodeResponse): + log = ( + _ERROR_MSG_PREFIX + + "\n===== STDOUT =====\n" + + response.run_result.stdout + + "\n===== STDERR =====\n" + + response.run_result.stderr + ) + return "\n".join(l for l in log.split("\n") if not skipline(l)) + + +def code_exec_sandbox_fusion( + code, stdin: str = None, timeout=_DEFAULT_TIMEOUT_SECONDS, pytest: str = None +) -> Tuple[bool, str]: + timeout = _DEFAULT_TIMEOUT_SECONDS + + if pytest: + # remove line starting with "from 
solution import..." + pytest_without_import = "\n".join( + line + for line in pytest.split("\n") + if not line.startswith("from solution import") + ) + response = run_code( + RunCodeRequest( + code=code + "\n" + pytest_without_import, + timeout=timeout, + language="pytest", + ) + ) + else: + response = run_code( + RunCodeRequest(code=code, stdin=stdin, timeout=timeout, language="python") + ) + + if response.status != RunStatus.Success: + return False, render_error(response) + + return True, response.run_result.stdout From 53ad6c98794dd59a3cb49073e494ddaaecbe2389 Mon Sep 17 00:00:00 2001 From: ganler Date: Thu, 7 Aug 2025 10:21:07 +0000 Subject: [PATCH 2/6] hotadd --- README.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index 295cdd9..0830390 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,9 @@ PurpCode includes two alignment stages: - The โ˜๏ธ*first* cybersafe reasoning recipe in open source - Great cybersafety and utility preservation, winning the ๐Ÿฅ‡*1st place* in [Amazon Nova AI Challenge](https://www.amazon.science/nova-ai-challenge/pushing-the-boundaries-of-secure-ai-winners-of-the-amazon-nova-ai-challenge) - Fully ๐Ÿ‘open-sourced, from models, data, to training/evaluation code and data synthesizers -- ๐ŸŽ๏ธ Fast RL with *Single-Step Dynamic Sampling* -- 12% faster, 15% less sample wasting, and better results than [DAPO](https://arxiv.org/abs/2503.14476) -- ๐Ÿ“š Supporting 13 evals, 90 CWEs, and 4 training objectives & rewards, covering cybersafety, utility, and overrefusal -- ๐Ÿ™…โ€โ™‚๏ธ XSCode -- our home-made evaluator and the *first* benchmark for checking overrefusal in secure code generation +- ๐ŸŽ๏ธFast RL with *Single-Step Dynamic Sampling* -- 12% faster, 15% less sample wasting, and better results than [DAPO](https://arxiv.org/abs/2503.14476) +- ๐Ÿ“šSupporting 13 evals, 90 CWEs, and 4 training objectives & rewards, covering cybersafety, utility, and overrefusal +- ๐Ÿ™…โ€โ™‚๏ธXSCode -- 
our home-made evaluator and the *first* benchmark for checking overrefusal in secure code generation - ... and more details in the [paper](https://arxiv.org/abs/2507.19060)! ## Initial Setup From 8dd7df8e10caa813928c94aa925100b761cb5392 Mon Sep 17 00:00:00 2001 From: ganler Date: Thu, 7 Aug 2025 10:26:09 +0000 Subject: [PATCH 3/6] hotfix --- rl/reward.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/rl/reward.py b/rl/reward.py index 1e224b1..6409e25 100644 --- a/rl/reward.py +++ b/rl/reward.py @@ -4,6 +4,7 @@ import asyncio import json +import os import time from collections import defaultdict from concurrent.futures import ThreadPoolExecutor, as_completed @@ -248,9 +249,7 @@ def compute_score(data_source, response, ground_truth, extra_info): extra_info = extra_info.item() def _return_score(score, reward_log, marker="-", max_chars=None): - reward_log = _wrap_log( - score, "\n".join(reward_log), marker=marker, max_chars=max_chars - ) + reward_log = _wrap_log(score, "\n".join(reward_log), max_chars=max_chars) reward_log += f"\n{data_source = } :: computed in {time.time() - t_start}s" return score From 832b9ccb8820d640b7909bb16db57a1db5d3dd9b Mon Sep 17 00:00:00 2001 From: ganler Date: Thu, 7 Aug 2025 10:33:49 +0000 Subject: [PATCH 4/6] fix gemini comments --- README.md | 6 +++--- rl/data/correctness.py | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 0830390..e9be608 100644 --- a/README.md +++ b/README.md @@ -67,7 +67,7 @@ pip install --upgrade pip pip install "sglang[all]>=0.4.9.post2" "sglang-router" "huggingface-hub" huggingface-cli download Qwen/Qwen2.5-14B-Instruct-1M -python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-14B-Instruct-1M --dp-size 8 --port 30000 --host 0.0.0.0 | tmux detach +python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-14B-Instruct-1M --dp-size 8 --port 30000 --host 0.0.0.0 & tmux detach # -------------------------- # --- TMUX SESSION "main" --- @@ 
-106,7 +106,7 @@ python datagen/ctxdistill/post.py --generation-path Qwen2.5-14B-Instruct-1M.dist tmux at -t main || tmux new -s main conda create -n axo python=3.12 -y conda activate axo -git clone git@github.com:axolotl-ai-cloud/axolotl.git +git clone https://github.com/axolotl-ai-cloud/axolotl.git cd axolotl pip3 install torch --index-url https://download.pytorch.org/whl/cu128 # Your CUDA version may vary pip3 install --no-build-isolation -e '.[flash-attn,deepspeed]' @@ -142,7 +142,7 @@ python rl/data/merge.py --datasets purpcorn/code-r1-46k-leetcode2k-kodcode purpc # --- TMUX SESSION "sgl" (remote machine) --- # Do it in another machine (assuming ip=a.b.c.d) as your local GPUs are allocated to RL training tmux at -t sgl || tmux new -s sgl -python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-32B-Instruct --dp-size 8 --port 30000 --host 0.0.0.0 | tmux detach +python3 -m sglang_router.launch_server --model Qwen/Qwen2.5-32B-Instruct --dp-size 8 --port 30000 --host 0.0.0.0 & tmux detach # ------------------------------------------- # --- TMUX SESSION "main" (RL machine) --- diff --git a/rl/data/correctness.py b/rl/data/correctness.py index c003ed8..de6b7fb 100644 --- a/rl/data/correctness.py +++ b/rl/data/correctness.py @@ -61,7 +61,6 @@ def kodcode(): "scipy", "sklearn", "cv2", - "scipy", "imageio", "sphinx-pyproject", "xgboost", From 95d9b50e94f34e9a8dbb911bb501b7cf92b8a335 Mon Sep 17 00:00:00 2001 From: Jiawei Liu Date: Thu, 7 Aug 2025 05:37:01 -0500 Subject: [PATCH 5/6] Update README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index e9be608..fecbbab 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,9 @@ export SHELL_RC=${HOME}/.bashrc # or ~/.zshrc if you use zsh curl -OL https://github.com/aws/aws-codeguru-cli/releases/download/0.2.4/aws-codeguru-cli.zip unzip aws-codeguru-cli.zip -d 
${HOME} export PATH=$PATH:${HOME}/aws-codeguru-cli/bin/ -sed -i '1i export PATH=$PATH:${HOME}/aws-codeguru-cli/bin/' ${SHELL_RC} +if ! grep -q 'export PATH=$PATH:${HOME}/aws-codeguru-cli/bin/' "${SHELL_RC}"; then + sed -i '1i export PATH=$PATH:${HOME}/aws-codeguru-cli/bin/' "${SHELL_RC}" +fi # codeql -- if you don't want to use codeguru, you can use codeql instead which only eats CPUs but the analyzer completeness and soundness can be different # -- you also need to set environment variable to PURPCODE_CODE_ANALYZER=codeql From 98a079daba6cf946a4ed398a43f78d389f853810 Mon Sep 17 00:00:00 2001 From: Jiawei Liu Date: Thu, 7 Aug 2025 05:38:05 -0500 Subject: [PATCH 6/6] Update README.md Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fecbbab..e00e362 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,9 @@ fi wget https://github.com/github/codeql-action/releases/download/codeql-bundle-v2.21.0/codeql-bundle-linux64.tar.gz tar -xf codeql-bundle-linux64.tar.gz -C ${HOME} export PATH=$PATH:${HOME}/codeql/ -sed -i '1i export PATH=$PATH:${HOME}/codeql/' ${SHELL_RC} +if ! grep -q 'export PATH=$PATH:${HOME}/codeql/' "${SHELL_RC}"; then + sed -i '1i export PATH=$PATH:${HOME}/codeql/' "${SHELL_RC}" +fi tmux detach # --------------------------