mirror of https://github.com/volcengine/verl.git
synced 2025-10-20 13:43:50 +08:00

Compare commits: dependabot ... 061535208c (13 commits)

061535208c
55f651c94d
22d082f9a4
8ec9bf64a1
231d725f69
d69164e1cb
2181d5b33a
33eb86f54f
67f9a21b8e
d2c51dc186
16c2a21064
3abcc09d44
5d378b5f95
1
.github/workflows/model.yml
vendored

@ -208,6 +208,7 @@ jobs:
- name: Running mcore engine tests on 8 L20 GPUs
  run: |
    ray stop --force
    pytest -s -x tests/models/test_engine.py

cleanup:
@ -238,6 +238,9 @@ verl is inspired by the design of Nemo-Aligner, Deepspeed-chat and OpenRLHF. The

- [Vision-SR1](https://github.com/zli12321/Vision-SR1): Self-Rewarding Vision-Language Model via Reasoning Decomposition
- [SimpleVLA-RL](https://github.com/PRIME-RL/SimpleVLA-RL): SimpleVLA-RL: A Simple yet Effective Vision-Language Action Model for Reinforcement Learning
- [Table-R1](https://github.com/Table-R1/Table-R1): Table-R1: Inference-Time Scaling for Table Reasoning
- [Revisual-R1](https://github.com/CSfufu/Revisual-R1): Revisual-R1: Advancing Multimodal Reasoning From Optimized Cold Start to Staged Reinforcement Learning
- [ARES](https://github.com/shawn0728/ARES): ARES: Multimodal Adaptive Reasoning via Difficulty-Aware Token-Level Entropy Shaping
- [Meta-Bandit-LLM](https://github.com/sanxing-chen/meta-bandit-llm): Meta-Bandit-LLM: Long-horizon multiturn interactive training for meta-bandit agents

and many more awesome works listed in [recipe](recipe/README.md).
@ -36,6 +36,8 @@ For vLLM with FSDP, please refer to [hiyouga/verl](https://hub.docker.com/r/hiyo

For SGLang with FSDP, please refer to the [ocss884/verl-sglang](https://hub.docker.com/r/ocss884/verl-sglang) repository; the latest version is ``ocss884/verl-sglang:ngc-th2.6.0-cu126-sglang0.4.6.post5``, provided by the SGLang RL Group.

For the latest vLLM with Megatron, please refer to the [iseekyan/verl](https://hub.docker.com/r/iseekyan/verl) repository; the latest version is ``iseekyan/verl:nemo.gptoss_vllm0.11.0``.

See the files under ``docker/`` for NGC-based images, or if you want to build your own.

Note that for AWS instances with an EFA network interface (SageMaker AI Pod), you need to install the EFA driver as shown in ``docker/Dockerfile.extenstion.awsefa``.
@ -0,0 +1,15 @@
FROM nvcr.io/nvidia/nemo:25.07.gpt_oss

# Build vLLM v0.11.0 from source on top of the NeMo base image
RUN git clone -b v0.11.0 --depth 1 https://github.com/vllm-project/vllm.git /opt/vllm

RUN pip install setuptools_scm

RUN cd /opt/vllm && pip install --no-deps --no-build-isolation --no-cache-dir -e .

# Runtime dependencies for vLLM and verl
RUN pip install cbor2 setproctitle blake3 openai_harmony pybase64 msgspec partial_json_parser py-cpuinfo diskcache gguf

RUN pip install --upgrade transformers tokenizers

RUN pip install codetiming tensordict mathruler pylatexenc

RUN pip3 install --no-cache-dir mbridge
@ -55,7 +55,7 @@ actor_rollout_ref:

The new implementation:
- ✅ Three aggregation levels: token, sequence, geometric
- ✅ Two bounding modes: truncate, clip
- ✅ Two bounding modes: truncate, mask
- ✅ Dual threshold support (upper/lower)
- ✅ Veto mechanism for catastrophic outliers
- ✅ 30+ comprehensive metrics
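To make the three aggregation levels concrete, here is a minimal sketch of how per-token importance ratios could be aggregated at the token, sequence, and geometric level. This is an illustration only, not the verl implementation; the function name and the `ratios`/`mask` inputs are assumptions.

```python
import torch

def aggregate_is_weights(ratios: torch.Tensor, mask: torch.Tensor, level: str) -> torch.Tensor:
    """Aggregate per-token IS ratios of shape (batch, seq_len) into IS weights.

    ratios: exp(train_logprob - rollout_logprob) per token
    mask:   1.0 for response tokens, 0.0 for padding
    """
    if level == "token":
        # Biased but low-variance: each token keeps its own ratio.
        return ratios
    log_ratios = ratios.clamp(min=1e-10).log() * mask
    if level == "sequence":
        # Unbiased: product of token ratios, shared by every token in the sequence.
        log_w = log_ratios.sum(dim=-1)
    elif level == "geometric":
        # Experimental: length-normalized (geometric-mean) ratio per sequence.
        log_w = log_ratios.sum(dim=-1) / mask.sum(dim=-1).clamp(min=1.0)
    else:
        raise ValueError(f"unknown aggregation level: {level}")
    return log_w.exp().unsqueeze(-1).expand_as(ratios)
```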
@ -150,7 +150,7 @@ Aggregation level for IS weights:

### `algorithm.rollout_is_mode` (str)

Bounding mode:
- `"truncate"`: Cap weights at upper threshold only
- `"clip"`: Zero out weights outside [lower, upper]
- `"mask"`: Zero out weights outside [lower, upper]
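A hedged sketch of the bounding modes described above (the older name ``clip`` and the new name ``mask`` denote the same zero-outside-bounds behavior); the function and argument names are illustrative, not the actual verl API:

```python
import torch

def bound_is_weights(weights: torch.Tensor, upper: float, lower: float, mode: str) -> torch.Tensor:
    """Apply the configured bounding mode to IS weights."""
    if mode == "truncate":
        # Cap at the upper threshold only; low weights pass through unchanged.
        return weights.clamp(max=upper)
    if mode in ("clip", "mask"):
        # Zero out any weight outside [lower, upper].
        in_bounds = (weights >= lower) & (weights <= upper)
        return weights * in_bounds.to(weights.dtype)
    raise ValueError(f"unknown bounding mode: {mode}")
```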
### `algorithm.rollout_is_veto_threshold` (float)

Per-token veto threshold. If any token's ratio falls below this value, the entire sequence is rejected.
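A minimal sketch of the veto mechanism as just described: if any real (non-padding) token's ratio falls below the veto threshold, the whole sequence is zeroed out so it contributes nothing to the update. Function and tensor names are assumptions, not the actual implementation.

```python
import torch

def apply_veto(weights: torch.Tensor, token_ratios: torch.Tensor, mask: torch.Tensor,
               veto_threshold: float = 1e-4) -> torch.Tensor:
    """Reject whole sequences containing a catastrophic per-token outlier."""
    catastrophic = (token_ratios < veto_threshold) & mask.bool()
    keep_seq = ~catastrophic.any(dim=-1, keepdim=True)  # (batch, 1), broadcasts over tokens
    return weights * keep_seq.to(weights.dtype)
```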
@ -199,7 +199,7 @@ All metrics are prefixed with `mismatch/`. For example, `rollout_is_mean` appear
|
||||
- **`rollout_is_min`**: Minimum IS weight observed
|
||||
- Shows the most underweighted token/sequence
|
||||
|
||||
- **`rollout_is_max`**: Maximum IS weight observed (before clipping)
|
||||
- **`rollout_is_max`**: Maximum IS weight observed (before truncation/masking)
|
||||
- Shows the most overweighted token/sequence
|
||||
- Compare with `rollout_is_threshold` to see truncation impact
|
||||
|
||||
@ -235,11 +235,11 @@ All metrics are prefixed with `mismatch/`. For example, `rollout_is_mean` appear
|
||||
#### **Threshold Exceedance Metrics**
|
||||
|
||||
- **`rollout_is_ratio_fraction_high`**: Fraction of weights exceeding upper threshold
|
||||
- Shows how often truncation/clipping occurs on high end
|
||||
- Shows how often truncation/masking occurs on high end
|
||||
- **Ideal value**: < 0.1 (most weights within bounds)
|
||||
|
||||
- **`rollout_is_ratio_fraction_low`**: Fraction of weights below lower threshold
|
||||
- Shows how often clipping occurs on low end (clip mode only)
|
||||
- Shows how often masking occurs on low end (mask mode only)
|
||||
- **Ideal value**: < 0.1
|
||||
|
||||
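For reference, a small sketch (assumed helper, not the actual metrics code) of how these exceedance fractions could be computed from the raw IS weights and the configured thresholds:

```python
import torch

def exceedance_metrics(weights: torch.Tensor, mask: torch.Tensor,
                       upper: float, lower: float) -> dict[str, float]:
    """Fraction of response tokens whose IS weight falls above/below the thresholds."""
    m = mask.bool()
    n = m.sum().clamp(min=1)
    frac_high = ((weights > upper) & m).sum() / n
    frac_low = ((weights < lower) & m).sum() / n
    return {
        "mismatch/rollout_is_ratio_fraction_high": frac_high.item(),
        "mismatch/rollout_is_ratio_fraction_low": frac_low.item(),
    }
```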
#### **Sequence-Level Metrics** (for sequence/geometric modes)
|
||||
@ -261,14 +261,14 @@ All metrics are prefixed with `mismatch/`. For example, `rollout_is_mean` appear
|
||||
|
||||
- **`rollout_is_seq_fraction_low`**: Fraction of sequences below lower threshold
|
||||
|
||||
#### **Clipping Metrics** (clip mode only)
|
||||
#### **Masking Metrics** (mask mode only)
|
||||
|
||||
- **`rollout_is_clipped_fraction`**: Fraction of tokens clipped (set to zero)
|
||||
- **`rollout_is_masked_fraction`**: Fraction of tokens masked (set to zero)
|
||||
- **Ideal value**: < 0.1
|
||||
- **Warning**: > 0.3 means losing too much data
|
||||
|
||||
- **`rollout_is_seq_clipped_fraction`**: Fraction of sequences with at least one clipped token
|
||||
- Shows sequence-level impact of clipping
|
||||
- **`rollout_is_seq_masked_fraction`**: Fraction of sequences with at least one masked token
|
||||
- Shows sequence-level impact of masking
|
||||
|
||||
#### **Distribution Mismatch Metrics** (Training vs Rollout Policy)
|
||||
|
||||
@ -456,14 +456,14 @@ algorithm:
|
||||
rollout_is_mode: truncate
|
||||
```
|
||||
|
||||
### Example 3: Geometric Mean with Clip
|
||||
### Example 3: Geometric Mean with Mask
|
||||
```yaml
|
||||
algorithm:
|
||||
rollout_is_threshold: 1.0002
|
||||
rollout_is: true
|
||||
rollout_is_threshold_lower: 0.9998
|
||||
rollout_is_level: geometric
|
||||
rollout_is_mode: clip
|
||||
rollout_is_mode: mask
|
||||
```
|
||||
|
||||
### Example 4: Asymmetric Thresholds
|
||||
@ -473,7 +473,7 @@ algorithm:
|
||||
rollout_is: true
|
||||
rollout_is_threshold_lower: 0.8
|
||||
rollout_is_level: token
|
||||
rollout_is_mode: clip
|
||||
rollout_is_mode: mask
|
||||
```
|
||||
|
||||
## Troubleshooting
|
||||
|
@ -123,7 +123,7 @@ Actor/Rollout/Reference Policy
|
||||
rollout_is_threshold: null # Upper threshold for IS weights (null to disable)
|
||||
rollout_is_threshold_lower: null # Lower threshold (null = auto 1/upper)
|
||||
rollout_is_level: token # Aggregation: token/sequence/geometric
|
||||
rollout_is_mode: truncate # Bounding: truncate/clip
|
||||
rollout_is_mode: truncate # Bounding: truncate/mask
|
||||
rollout_is_veto_threshold: 1e-4 # Catastrophic outlier threshold
|
||||
use_torch_compile: True # False to disable torch compile
|
||||
kl_loss_coef: 0.001 # for grpo
|
||||
@ -527,7 +527,7 @@ Algorithm
|
||||
- ``rollout_is_threshold``: Upper threshold for IS weights. Set to ``null`` to disable IS completely.
|
||||
- ``rollout_is_threshold_lower``: Lower threshold for IS weights. If ``null``, defaults to reciprocal of upper (1/upper).
|
||||
- ``rollout_is_level``: Aggregation level: ``token`` (biased), ``sequence`` (unbiased), or ``geometric`` (experimental).
|
||||
- ``rollout_is_mode``: Bounding mode: ``truncate`` (cap upper only) or ``clip`` (zero outside bounds).
|
||||
- ``rollout_is_mode``: Bounding mode: ``truncate`` (cap upper only) or ``mask`` (zero outside bounds).
|
||||
- ``rollout_is_veto_threshold``: Per-token veto threshold for catastrophic outliers. Default is 1e-4.
|
||||
Note: Rollout IS requires setting ``actor_rollout_ref.rollout.calculate_log_probs=True``.
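As a rough illustration of how these pieces fit together (a sketch under stated assumptions; only the config semantics above come from the docs, the helpers themselves are hypothetical): the per-token IS ratio is derived from the training-policy and rollout-policy log-probs, which is why ``calculate_log_probs=True`` is required, and the lower threshold defaults to the reciprocal of the upper one.

```python
import torch

def rollout_is_ratios(train_log_probs: torch.Tensor, rollout_log_probs: torch.Tensor) -> torch.Tensor:
    """Per-token importance ratios between the training policy and the rollout policy.

    Requires actor_rollout_ref.rollout.calculate_log_probs=True so that rollout
    log-probs are returned alongside the training log-probs.
    """
    return torch.exp(train_log_probs - rollout_log_probs)

def resolve_lower_threshold(upper: float, lower: float | None) -> float:
    # rollout_is_threshold_lower: null  ->  defaults to 1 / rollout_is_threshold
    return 1.0 / upper if lower is None else lower
```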
|
||||
|
||||
|
@ -79,7 +79,7 @@ For latest vLLM with FSDP, please refer to `hiyouga/verl <https://hub.docker.com
|
||||
|
||||
For latest SGLang with FSDP, please refer to `hebiaobuaa/verl <https://hub.docker.com/r/hebiaobuaa/verl>`_ repository and the latest version is ``hebiaobuaa/verl:app-verl0.5-sglang0.4.9.post6-mcore0.12.2-te2.2`` which is provided by SGLang RL Group.
|
||||
|
||||
For latest vLLM with Megatron, please refer to `iseekyan/verl:app-verl0.5-transformers4.55.4-vllm0.10.0-mcore0.15.0-te2.7`
|
||||
For latest vLLM with Megatron, please refer to `iseekyan/verl <https://hub.docker.com/r/iseekyan/verl>`_ repository and the latest version is ``iseekyan/verl:nemo.gptoss_vllm0.11.0``.
|
||||
|
||||
See files under ``docker/`` for NGC-based image or if you want to build your own.
|
||||
|
||||
|
@ -6,21 +6,20 @@
|
||||
This is the official implementation of the paper [***Geometric-Mean Policy Optimization***](https://arxiv.org/abs/2507.20673).
|
||||
|
||||
<div align=center>
|
||||
<img width="3092" height="864" alt="image" src="https://github.com/user-attachments/assets/af4c7e0f-923a-45ef-9bcf-57109b8ee61e" />
|
||||
<img width="3092" height="864" alt="image" src="https://github.com/user-attachments/assets/20b04c4e-7ee8-4775-9af8-33c0158336e2" />
|
||||
</div>
|
||||
|
||||
|
||||
## 1. Contents
|
||||
- Geometric-Mean Policy Optimization
|
||||
- [1. Contents](#1-contents)
|
||||
- [2. Introduction](#2-introduction)
|
||||
- [3. Code Usage](#4-code-usage)
|
||||
- [4. Contacts](#5-contacts)
|
||||
- [5. Citation](#7-citation)
|
||||
- [3. Code Usage](#3-code-usage)
|
||||
- [4. Contacts](#4-contacts)
|
||||
- [5. Citation](#5-citation)
|
||||
|
||||
## 2. Introduction
|
||||
|
||||
Recent advancements, such as Group Relative Policy Optimization (GRPO), have enhanced the reasoning capabilities of large language models by optimizing the arithmetic mean of token-level rewards. However, GRPO suffers from unstable policy updates when processing tokens with outlier importance-weighted rewards, which manifests as extreme importance sampling ratios during training, i.e., the ratio between the sampling probabilities assigned to a token by the current and old policies. In this work, we propose Geometric-Mean Policy Optimization (GMPO), a stabilized variant of GRPO. Instead of optimizing the arithmetic mean, GMPO maximizes the geometric mean of token-level rewards, which is inherently less sensitive to outliers and maintains a more stable range of importance sampling ratio. In addition, we provide comprehensive theoretical and experimental analysis to justify the design and stability benefits of GMPO. Beyond improved stability, GMPO-7B outperforms GRPO by an average of 4.1% on multiple mathematical benchmarks and 1.4% on multimodal reasoning benchmark, including AIME24, AMC, MATH500, OlympiadBench, Minerva, and Geometry3K.
|
||||
Group Relative Policy Optimization (GRPO) has significantly enhanced the reasoning capability of large language models by optimizing the arithmetic mean of token-level rewards. Unfortunately, GRPO is observed to suffer from unstable policy updates when facing tokens with outlier importance-weighted rewards, which manifest as extreme importance sampling ratios during training. In this study, we propose Geometric-Mean Policy Optimization (GMPO), with the aim to improve the stability of GRPO through suppressing token reward outliers. Instead of optimizing the arithmetic mean, GMPO maximizes the geometric mean of token-level rewards, which is inherently less sensitive to outliers and maintains a more stable range of importance sampling ratio. GMPO is plug-and-play—simply replacing GRPO's arithmetic mean with the geometric mean of token-level rewards, as the latter is inherently less sensitive to outliers. GMPO is theoretically plausible—analysis reveals that both GMPO and GRPO are weighted forms of the policy gradient while the former enjoys more stable weights, which consequently benefits policy optimization and performance. Experiments on multiple mathematical reasoning benchmarks show that GMPO-7B improves the average Pass@1 of GRPO by up to 4.1%, outperforming many state-of-the-art approaches.
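To see why the geometric mean is less sensitive to outlier importance-sampling ratios than the arithmetic mean, here is a tiny numerical sketch (illustrative only, not the GMPO implementation):

```python
import torch

# Per-token importance ratios for one response; the last token is an extreme outlier.
ratios = torch.tensor([0.9, 1.1, 1.0, 25.0])

arithmetic = ratios.mean()             # ~7.0 -- dominated by the single outlier
geometric = ratios.log().mean().exp()  # ~2.2 -- the outlier is strongly damped

print(f"arithmetic mean: {arithmetic.item():.2f}, geometric mean: {geometric.item():.2f}")
```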
|
||||
|
||||
## 3. Code Usage
|
||||
|
||||
@ -30,7 +29,7 @@ clip_ratio_low=0.4
|
||||
clip_ratio_high=0.4
|
||||
loss_mode=geo_mean
|
||||
```
|
||||
|
||||
We observed that using a large clip ratio during Mixture-of-Experts (MoE) model training often leads to optimization instability. When training MoE models, consider lowering the clip ratio to achieve more stable convergence.
|
||||
To get started quickly, run:
|
||||
```
|
||||
bash examples/gmpo_trainer/run_qwen2_5-7b_math.sh
|
||||
@ -51,13 +50,10 @@ If you have any question about our work or this repository, please don't hesitat
|
||||
|
||||
## 5. Citation
|
||||
```
|
||||
@misc{zhao2025geometricmeanpolicyoptimization,
|
||||
title={Geometric-Mean Policy Optimization},
|
||||
author={Yuzhong Zhao and Yue Liu and Junpeng Liu and Jingye Chen and Xun Wu and Yaru Hao and Tengchao Lv and Shaohan Huang and Lei Cui and Qixiang Ye and Fang Wan and Furu Wei},
|
||||
year={2025},
|
||||
eprint={2507.20673},
|
||||
archivePrefix={arXiv},
|
||||
primaryClass={cs.CL},
|
||||
url={https://arxiv.org/abs/2507.20673},
|
||||
@article{zhao2025geometric,
|
||||
title={Geometric-mean policy optimization},
|
||||
author={Zhao, Yuzhong and Liu, Yue and Liu, Junpeng and Chen, Jingye and Wu, Xun and Hao, Yaru and Lv, Tengchao and Huang, Shaohan and Cui, Lei and Ye, Qixiang and others},
|
||||
journal={arXiv preprint arXiv:2507.20673},
|
||||
year={2025}
|
||||
}
|
||||
```
|
||||
|
79
examples/grpo_trainer/run_qwen3_vl-30b-megatron.sh
Normal file
@ -0,0 +1,79 @@
|
||||
set -x
|
||||
ENGINE=${1:-vllm}
|
||||
export CUDA_DEVICE_MAX_CONNECTIONS=1 # For megatron communication/computation overlapping
|
||||
|
||||
# vLLM >= 0.11.0 is required for qwen3-vl support; we recommend the container docker://iseekyan/verl:nemo.gptoss_vllm0.11.0
|
||||
# pip install -U git+https://github.com/ISEEKYAN/mbridge.git # for latest mbridge
|
||||
# pip install -U transformers # for qwen3-vl support
|
||||
# pip install --no-deps --no-cache-dir git+https://github.com/NVIDIA/Megatron-LM.git@core_v0.13.1 # for megatron-lm0.13.1
|
||||
|
||||
|
||||
export VLLM_ALLREDUCE_USE_SYMM_MEM=0 # for vllm0.11.0 with TP
|
||||
|
||||
|
||||
HF_MODEL_PATH=${HF_MODEL_PATH:-"${RAY_DATA_HOME}/models/Qwen3-VL-30B-A3B-Instruct"}
|
||||
|
||||
|
||||
train_path=$HOME/data/geo3k/train.parquet
|
||||
test_path=$HOME/data/geo3k/test.parquet
|
||||
|
||||
python3 -m verl.trainer.main_ppo --config-path=config \
|
||||
--config-name='ppo_megatron_trainer.yaml'\
|
||||
algorithm.adv_estimator=grpo \
|
||||
data.train_files="$train_path" \
|
||||
data.val_files="$test_path" \
|
||||
data.train_batch_size=512 \
|
||||
data.max_prompt_length=1024 \
|
||||
data.max_response_length=2048 \
|
||||
data.filter_overlong_prompts=True \
|
||||
data.truncation='error' \
|
||||
actor_rollout_ref.model.path=$HF_MODEL_PATH \
|
||||
actor_rollout_ref.actor.optim.lr=1e-6 \
|
||||
actor_rollout_ref.actor.ppo_mini_batch_size=128 \
|
||||
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=1 \
|
||||
actor_rollout_ref.actor.megatron.expert_model_parallel_size=8 \
|
||||
actor_rollout_ref.actor.megatron.tensor_model_parallel_size=4 \
|
||||
actor_rollout_ref.actor.use_kl_loss=True \
|
||||
actor_rollout_ref.actor.kl_loss_coef=0.01 \
|
||||
actor_rollout_ref.actor.kl_loss_type=low_var_kl \
|
||||
actor_rollout_ref.actor.entropy_coeff=0 \
|
||||
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=1 \
|
||||
actor_rollout_ref.rollout.tensor_model_parallel_size=4 \
|
||||
actor_rollout_ref.actor.use_dynamic_bsz=True \
|
||||
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=5120 \
|
||||
actor_rollout_ref.ref.log_prob_use_dynamic_bsz=True \
|
||||
actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=20480 \
|
||||
actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=True \
|
||||
actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=20480 \
|
||||
actor_rollout_ref.rollout.name=$ENGINE \
|
||||
+actor_rollout_ref.rollout.engine_kwargs.vllm.disable_mm_preprocessor_cache=True \
|
||||
actor_rollout_ref.rollout.gpu_memory_utilization=0.7 \
|
||||
actor_rollout_ref.rollout.n=5 \
|
||||
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=1 \
|
||||
actor_rollout_ref.actor.megatron.use_mbridge=True \
|
||||
actor_rollout_ref.actor.megatron.param_offload=True \
|
||||
actor_rollout_ref.actor.megatron.optimizer_offload=True \
|
||||
actor_rollout_ref.actor.megatron.grad_offload=True \
|
||||
actor_rollout_ref.ref.megatron.param_offload=True \
|
||||
+actor_rollout_ref.actor.optim.override_optimizer_config.optimizer_offload_fraction=1 \
|
||||
+actor_rollout_ref.actor.optim.override_optimizer_config.overlap_cpu_optimizer_d2h_h2d=True \
|
||||
+actor_rollout_ref.actor.optim.override_optimizer_config.use_precision_aware_optimizer=True \
|
||||
+actor_rollout_ref.actor.optim.override_optimizer_config.optimizer_cpu_offload=True \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_router_dtype=fp32 \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_enable_deepep=True \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_token_dispatcher_type=flex \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.recompute_method=uniform \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.recompute_granularity=full \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.recompute_num_layers=1 \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.gradient_accumulation_fusion=True \
|
||||
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_permute_fusion=True \
|
||||
algorithm.use_kl_in_reward=False \
|
||||
trainer.critic_warmup=0 \
|
||||
trainer.logger='["console","wandb"]' \
|
||||
trainer.project_name='verl_grpo_example_geo3k' \
|
||||
trainer.experiment_name='qwen3_vl_30b_megatron' \
|
||||
trainer.n_gpus_per_node=8 \
|
||||
trainer.nnodes=1 \
|
||||
trainer.save_freq=20 \
|
||||
trainer.test_freq=5 \
|
||||
trainer.total_epochs=15 $@
|
@ -86,7 +86,7 @@ algorithm:
|
||||
rollout_is_mode: truncate
|
||||
```
|
||||
|
||||
### Example 3: Geometric Mean with Clip
|
||||
### Example 3: Geometric Mean with Mask
|
||||
|
||||
```yaml
|
||||
algorithm:
|
||||
@ -94,7 +94,7 @@ algorithm:
|
||||
rollout_is: true
|
||||
rollout_is_threshold_lower: 0.9998
|
||||
rollout_is_level: geometric
|
||||
rollout_is_mode: clip
|
||||
rollout_is_mode: mask
|
||||
rollout_is_veto_threshold: 1e-4
|
||||
```
|
||||
|
||||
@ -118,7 +118,7 @@ algorithm:
|
||||
rollout_is: true
|
||||
rollout_is_threshold_lower: 0.8
|
||||
rollout_is_level: token
|
||||
rollout_is_mode: clip
|
||||
rollout_is_mode: mask
|
||||
```
|
||||
|
||||
## Monitoring Metrics
|
||||
@ -183,9 +183,9 @@ These metrics help diagnose the distribution mismatch between rollout and traini
|
||||
2. Verify rollout_log_probs are correctly passed
|
||||
3. Check for systematic bias in rollout vs training
|
||||
|
||||
### Issue: Too Much Data Discarded (Clip Mode)
|
||||
### Issue: Too Much Data Discarded (Mask Mode)
|
||||
|
||||
**Symptoms**: `rollout_is_clipped_fraction` > 0.5
|
||||
**Symptoms**: `rollout_is_masked_fraction` > 0.5
|
||||
|
||||
**Solutions**:
|
||||
1. Widen thresholds
|
||||
|
@ -21,7 +21,7 @@ rollout_is_threshold_lower=null
|
||||
# Aggregation level: token | sequence | geometric (experimental)
|
||||
rollout_is_level=token
|
||||
|
||||
# Bounding mode: truncate (cap upper) | clip (zero outside bounds)
|
||||
# Bounding mode: truncate (cap upper) | mask (zero outside bounds)
|
||||
rollout_is_mode=truncate
|
||||
|
||||
# Catastrophic outlier veto threshold
|
||||
|
@ -0,0 +1,23 @@
|
||||
hydra:
|
||||
searchpath:
|
||||
- file://verl/trainer/config
|
||||
|
||||
defaults:
|
||||
- ppo_trainer
|
||||
- _self_
|
||||
|
||||
data:
|
||||
max_prompt_length: 1024
|
||||
max_response_length: 1024
|
||||
train_batch_size: 256
|
||||
return_raw_chat: True
|
||||
shuffle: False
|
||||
|
||||
actor_rollout_ref:
|
||||
hybrid_engine: True
|
||||
rollout:
|
||||
name: sglang
|
||||
multi_turn:
|
||||
enable: True
|
||||
max_assistant_turns: 2
|
||||
format: qwen
|
@ -43,6 +43,20 @@ logger = logging.getLogger(__file__)
|
||||
logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN"))
|
||||
|
||||
|
||||
def format_tool_response_manually(tool_message: dict, tool_call_name: str) -> str:
|
||||
"""Manually format tool response without using tokenizer template.
|
||||
|
||||
Args:
|
||||
tool_message: Tool message dictionary with 'content' field
|
||||
tool_call_name: Name of the tool that was called
|
||||
|
||||
Returns:
|
||||
Formatted tool response string
|
||||
"""
|
||||
content = tool_message["content"]
|
||||
return f"<|start|>functions.{tool_call_name} to=assistant<|channel|>commentary<|message|>{content}<|end|>"
|
||||
|
||||
|
||||
class MaxTokenExceededError(Exception):
|
||||
"""Indicate that history chat messages + tool message exceeds LLM max_tokens."""
|
||||
|
||||
@ -202,13 +216,39 @@ class ChatModel(BaseChatModel):
|
||||
|
||||
# encode tool response
|
||||
tool_responses = convert_to_openai_messages(messages[i + 1 :])
|
||||
tool_response_ids = await loop.run_in_executor(
|
||||
None,
|
||||
lambda messages=tool_responses: self.tokenizer.apply_chat_template(
|
||||
messages, add_generation_prompt=True, tokenize=True
|
||||
),
|
||||
)
|
||||
tool_response_ids = tool_response_ids[len(kwargs["system_prompt"]) :]
|
||||
if self.tool_parser == "hermes":
|
||||
tool_response_ids = await loop.run_in_executor(
|
||||
None,
|
||||
lambda messages=tool_responses: self.tokenizer.apply_chat_template(
|
||||
messages, add_generation_prompt=True, tokenize=True
|
||||
),
|
||||
)
|
||||
tool_response_ids = tool_response_ids[len(kwargs["system_prompt"]) :]
|
||||
elif self.tool_parser == "gpt-oss":
|
||||
# Format tool responses manually
|
||||
# since gpt-oss chat template requires tool call messages to parse tool response messages
|
||||
# we need to format the tool response messages manually
|
||||
tool_response_texts = []
|
||||
for tool_msg in tool_responses:
|
||||
if tool_msg["role"] == "tool":
|
||||
# Use tool message's name if available (for multiple tool calls)
|
||||
actual_tool_name = tool_msg.get("name", "unknown")
|
||||
if actual_tool_name == "unknown":
|
||||
logger.error(f"actual_tool_name: {actual_tool_name}")
|
||||
formatted = format_tool_response_manually(tool_msg, actual_tool_name)
|
||||
tool_response_texts.append(formatted)
|
||||
# need to add generation tokens for gpt-oss manually since add_generation_prompt is True
|
||||
tool_response_texts.append("<|start|>assistant")
|
||||
|
||||
# Tokenize the manually formatted tool responses
|
||||
tool_response_text = "".join(tool_response_texts)
|
||||
print(f"tool_response_text: {tool_response_text}")
|
||||
|
||||
tool_response_ids = await loop.run_in_executor(
|
||||
None, lambda: self.tokenizer.encode(tool_response_text, add_special_tokens=False)
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported tool parser: {self.tool_parser}")
|
||||
|
||||
# stop generation if response length exceeds max response length
|
||||
if len(messages[i].response_metadata["response_mask"]) + len(tool_response_ids) >= self.max_tokens:
|
||||
|
143
recipe/langgraph_agent/example/run_gpt_oss_20b_bf16.sh
Normal file
@ -0,0 +1,143 @@
|
||||
#!/usr/bin/env bash
|
||||
#SBATCH --job-name=rl-langgraph-3B
|
||||
#SBATCH --partition=main
|
||||
#SBATCH --nodes=1
|
||||
#SBATCH --ntasks-per-node=1
|
||||
#SBATCH --cpus-per-task=64
|
||||
#SBATCH --gres=gpu:4
|
||||
#SBATCH --mem=0
|
||||
#SBATCH --time=10:00:00
|
||||
#SBATCH --output=%x_%j.out
|
||||
#SBATCH --error=%x_%j.err
|
||||
|
||||
set -xeuo pipefail
|
||||
|
||||
# ================= cluster topology =================
|
||||
export GPUS_PER_NODE=${SLURM_GPUS_ON_NODE:-${GPUS_PER_NODE:-2}} # GPUs on this node
|
||||
NNODES=${SLURM_JOB_NUM_NODES:-${NNODES:-1}}
|
||||
export NNODES
|
||||
export RAY_NUM_NODES=$NNODES
|
||||
|
||||
# Require at least 2 GPUs
|
||||
TOTAL_GPUS=$((GPUS_PER_NODE * NNODES))
|
||||
if [ "$TOTAL_GPUS" -lt 2 ]; then
|
||||
echo "Error: at least 2 GPUs are required, detected $TOTAL_GPUS." >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "Using $NNODES nodes and $GPUS_PER_NODE GPUs per node..."
|
||||
|
||||
# ================= data/model/tool =================
|
||||
HDFS_ROOT=${HDFS_ROOT:-$PWD}
|
||||
DATA_ROOT=${DATA_ROOT:-$PWD}
|
||||
|
||||
# Prefer local model if present, otherwise fall back to HF hub path
|
||||
model_path="lmsys/gpt-oss-20b-bf16"
|
||||
|
||||
# Use the default output directory produced by create_dataset.py
|
||||
train_files=$DATA_ROOT/data/math_expression_tool/train.parquet
|
||||
test_files=$DATA_ROOT/data/math_expression_tool/test.parquet
|
||||
|
||||
# Agent config
|
||||
agent_loop_config_path=recipe/langgraph_agent/example/agent.yaml
|
||||
|
||||
# =================== wandb ===================
|
||||
project_name=math_expression_tool
|
||||
experiment_name=gpt-oss-20b-bf16
|
||||
default_local_dir=$DATA_ROOT/checkpoint/$experiment_name
|
||||
|
||||
# ================= algorithm =================
|
||||
adv_estimator=grpo
|
||||
|
||||
use_kl_in_reward=false
|
||||
kl_coef=0.0
|
||||
use_kl_loss=false
|
||||
kl_loss_coef=0.0
|
||||
|
||||
clip_ratio_low=0.2
|
||||
clip_ratio_high=0.28
|
||||
|
||||
max_turns=8
|
||||
max_prompt_length=1024
|
||||
max_response_length=8192
|
||||
actor_lr=1e-6
|
||||
|
||||
train_batch_size=128
|
||||
ppo_mini_batch_size=16
|
||||
n_resp_per_prompt=8
|
||||
n_resp_per_prompt_val=1
|
||||
|
||||
# =================== logging ===================
|
||||
export RAY_LOGGING_LEVEL=DEBUG
|
||||
export HYDRA_FULL_ERROR=1
|
||||
|
||||
# ================= performance =================
|
||||
export NCCL_IBEXT_DISABLE=1
|
||||
export NCCL_NVLS_ENABLE=1
|
||||
export NCCL_IB_HCA=mlx5
|
||||
export UCX_NET_DEVICES=mlx5_0:1,mlx5_1:1,mlx5_2:1,mlx5_3:1,mlx5_4:1,mlx5_5:1,mlx5_6:1,mlx5_7:1
|
||||
export VLLM_USE_V1=1
|
||||
export VLLM_ATTENTION_BACKEND=FLASH_ATTN
|
||||
|
||||
infer_tp=2 # vLLM tensor parallel size
|
||||
train_sp=4 # Ulysses sequence parallel size for actor
|
||||
offload=true
|
||||
|
||||
actor_max_token_len_per_gpu=$(( (max_prompt_length + max_response_length) * 4 ))
|
||||
log_prob_max_token_len_per_gpu=$(( actor_max_token_len_per_gpu * 2 ))
|
||||
|
||||
train_files="['$train_files']"
|
||||
test_files="['$test_files']"
|
||||
|
||||
python3 -m verl.trainer.main_ppo \
|
||||
algorithm.adv_estimator=$adv_estimator \
|
||||
algorithm.use_kl_in_reward=$use_kl_in_reward \
|
||||
algorithm.kl_ctrl.kl_coef=$kl_coef \
|
||||
data.train_files="$train_files" \
|
||||
data.val_files="$test_files" \
|
||||
data.return_raw_chat=true \
|
||||
data.train_batch_size=$train_batch_size \
|
||||
data.max_prompt_length=$max_prompt_length \
|
||||
data.max_response_length=$max_response_length \
|
||||
data.filter_overlong_prompts=true \
|
||||
data.truncation='error' \
|
||||
actor_rollout_ref.model.path="$model_path" \
|
||||
actor_rollout_ref.model.use_remove_padding=true \
|
||||
actor_rollout_ref.model.enable_gradient_checkpointing=true \
|
||||
actor_rollout_ref.actor.use_kl_loss=$use_kl_loss \
|
||||
actor_rollout_ref.actor.kl_loss_coef=$kl_loss_coef \
|
||||
actor_rollout_ref.actor.clip_ratio_low=$clip_ratio_low \
|
||||
actor_rollout_ref.actor.clip_ratio_high=$clip_ratio_high \
|
||||
actor_rollout_ref.actor.clip_ratio_c=10.0 \
|
||||
actor_rollout_ref.actor.optim.lr=$actor_lr \
|
||||
actor_rollout_ref.actor.use_dynamic_bsz=true \
|
||||
actor_rollout_ref.actor.ppo_mini_batch_size=$ppo_mini_batch_size \
|
||||
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=$actor_max_token_len_per_gpu \
|
||||
actor_rollout_ref.actor.ulysses_sequence_parallel_size=$train_sp \
|
||||
actor_rollout_ref.actor.fsdp_config.param_offload=$offload \
|
||||
actor_rollout_ref.actor.fsdp_config.optimizer_offload=$offload \
|
||||
actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=$log_prob_max_token_len_per_gpu \
|
||||
actor_rollout_ref.rollout.name=sglang \
|
||||
actor_rollout_ref.rollout.mode=async \
|
||||
actor_rollout_ref.rollout.tensor_model_parallel_size=$infer_tp \
|
||||
actor_rollout_ref.rollout.multi_turn.max_user_turns=$max_turns \
|
||||
actor_rollout_ref.rollout.multi_turn.max_assistant_turns=$max_turns \
|
||||
actor_rollout_ref.rollout.multi_turn.format=gpt-oss \
|
||||
actor_rollout_ref.rollout.agent.tool_parser=gpt-oss \
|
||||
actor_rollout_ref.rollout.agent.agent_loop_config_path=$agent_loop_config_path \
|
||||
actor_rollout_ref.rollout.gpu_memory_utilization=0.7 \
|
||||
actor_rollout_ref.rollout.n=$n_resp_per_prompt \
|
||||
actor_rollout_ref.rollout.val_kwargs.top_p=1.0\
|
||||
actor_rollout_ref.rollout.val_kwargs.temperature=1.0 \
|
||||
actor_rollout_ref.rollout.val_kwargs.n=$n_resp_per_prompt_val \
|
||||
trainer.logger='["console","wandb"]' \
|
||||
trainer.project_name=$project_name \
|
||||
trainer.experiment_name=$experiment_name \
|
||||
trainer.n_gpus_per_node="$GPUS_PER_NODE" \
|
||||
trainer.val_before_train=true \
|
||||
trainer.log_val_generations=50 \
|
||||
trainer.nnodes="$NNODES" \
|
||||
trainer.save_freq=-1 \
|
||||
trainer.default_local_dir="$default_local_dir" \
|
||||
trainer.test_freq=5 \
|
||||
trainer.total_epochs=1 "$@"
|
@ -293,6 +293,6 @@ python3 -m recipe.one_step_off_policy.async_main_ppo \
|
||||
| Category | Support Situation |
|
||||
|--------------------|-----------------------------------------------------------------------------------------------------------------|
|
||||
| train engine | FSDP2 <br/> Megatron |
|
||||
| rollout engine | vLLM |
|
||||
| rollout engine | vLLM <br/> SGLang |
|
||||
| AdvantageEstimator | GRPO <br/> GRPO_PASSK <br/> REINFORCE_PLUS_PLUS <br/> RLOO <br/> OPO <br/> REINFORCE_PLUS_PLUS_BASELINE<br/>GPG |
|
||||
| Reward | all |
|
||||
|
140
recipe/one_step_off_policy/dapo_7b_math_fsdp2_sglang_4_12.sh
Normal file
@ -0,0 +1,140 @@
|
||||
#!/usr/bin/env bash
|
||||
set -xeuo pipefail
|
||||
|
||||
project_name='DAPO'
|
||||
exp_name='DAPO-Qwen2.5-7b-MATH-0527a1-fsdp2-sglang-one-step-off-4-12'
|
||||
|
||||
adv_estimator=grpo
|
||||
|
||||
use_kl_in_reward=False
|
||||
kl_coef=0.0
|
||||
use_kl_loss=False
|
||||
kl_loss_coef=0.0
|
||||
|
||||
clip_ratio_low=0.2
|
||||
clip_ratio_high=0.28
|
||||
|
||||
max_prompt_length=$((1024 * 2))
|
||||
max_response_length=$((1024 * 8))
|
||||
enable_overlong_buffer=True
|
||||
overlong_buffer_len=$((1024 * 4))
|
||||
overlong_penalty_factor=1.0
|
||||
|
||||
loss_agg_mode="token-mean"
|
||||
|
||||
train_prompt_bsz=512
|
||||
n_resp_per_prompt=12
|
||||
train_prompt_mini_bsz=32
|
||||
|
||||
# Ray
|
||||
# RAY_ADDRESS=${RAY_ADDRESS:-"http://localhost:8265"}
|
||||
# WORKING_DIR=${WORKING_DIR:-"${PWD}"}
|
||||
# RUNTIME_ENV=${RUNTIME_ENV:-"${WORKING_DIR}/verl/trainer/runtime_env.yaml"}
|
||||
NNODES=${NNODES:-2}
|
||||
NGPUS_PER_NODE=${NGPUS_PER_NODE:-8}
|
||||
|
||||
n_gpus_rollout=2
|
||||
n_gpus_training=$((NGPUS_PER_NODE - n_gpus_rollout))
|
||||
|
||||
# Paths
|
||||
RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}
|
||||
# very important! please modify the max_position_embeddings in config.json to 32768 after downloading from huggingface
|
||||
MODEL_PATH=${MODEL_PATH:-"${RAY_DATA_HOME}/models/Qwen2.5-Math-7B"}
|
||||
CKPTS_DIR=${CKPTS_DIR:-"${RAY_DATA_HOME}/ckpts/${project_name}/${exp_name}"}
|
||||
TRAIN_FILE=${TRAIN_FILE:-"${RAY_DATA_HOME}/data/dapo-math-17k.parquet"}
|
||||
TEST_FILE=${TEST_FILE:-"${RAY_DATA_HOME}/data/aime-2024.parquet"}
|
||||
|
||||
|
||||
# Algorithm
|
||||
temperature=1.0
|
||||
top_p=1.0
|
||||
top_k=-1 # 0 for HF rollout, -1 for vLLM rollout
|
||||
val_top_p=0.7
|
||||
|
||||
# Performance Related Parameter
|
||||
use_dynamic_bsz=True
|
||||
actor_ppo_max_token_len=$(((max_prompt_length + max_response_length) * 2))
|
||||
infer_ppo_max_token_len=$(((max_prompt_length + max_response_length) * 3))
|
||||
ref_offload=True
|
||||
actor_offload=False
|
||||
gen_tp=2
|
||||
sp_size=4
|
||||
fsdp_size=2
|
||||
|
||||
python3 -m recipe.one_step_off_policy.main_ppo \
|
||||
data.train_files="${TRAIN_FILE}" \
|
||||
data.val_files="${TEST_FILE}" \
|
||||
data.prompt_key=prompt \
|
||||
data.truncation='left' \
|
||||
data.max_prompt_length=${max_prompt_length} \
|
||||
data.max_response_length=${max_response_length} \
|
||||
data.train_batch_size=${train_prompt_bsz} \
|
||||
actor_rollout_ref.rollout.n=${n_resp_per_prompt} \
|
||||
algorithm.adv_estimator=${adv_estimator} \
|
||||
algorithm.use_kl_in_reward=${use_kl_in_reward} \
|
||||
algorithm.kl_ctrl.kl_coef=${kl_coef} \
|
||||
actor_rollout_ref.actor.strategy=fsdp2 \
|
||||
critic.strategy=fsdp2 \
|
||||
actor_rollout_ref.actor.use_kl_loss=${use_kl_loss} \
|
||||
actor_rollout_ref.actor.kl_loss_coef=${kl_loss_coef} \
|
||||
actor_rollout_ref.actor.clip_ratio_low=${clip_ratio_low} \
|
||||
actor_rollout_ref.actor.clip_ratio_high=${clip_ratio_high} \
|
||||
actor_rollout_ref.actor.clip_ratio_c=10.0 \
|
||||
actor_rollout_ref.model.use_remove_padding=True \
|
||||
actor_rollout_ref.hybrid_engine=False \
|
||||
+actor_rollout_ref.model.override_config.max_position_embeddings=32768 \
|
||||
actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
|
||||
actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
|
||||
actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
|
||||
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=${actor_ppo_max_token_len} \
|
||||
actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \
|
||||
actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \
|
||||
actor_rollout_ref.model.path="${MODEL_PATH}" \
|
||||
actor_rollout_ref.actor.optim.lr=1e-6 \
|
||||
actor_rollout_ref.actor.optim.lr_warmup_steps=10 \
|
||||
actor_rollout_ref.actor.optim.weight_decay=0.1 \
|
||||
actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
|
||||
actor_rollout_ref.actor.fsdp_config.param_offload=${actor_offload} \
|
||||
actor_rollout_ref.actor.fsdp_config.optimizer_offload=${actor_offload} \
|
||||
actor_rollout_ref.actor.entropy_coeff=0 \
|
||||
actor_rollout_ref.actor.grad_clip=1.0 \
|
||||
actor_rollout_ref.actor.loss_agg_mode=${loss_agg_mode} \
|
||||
actor_rollout_ref.actor.ulysses_sequence_parallel_size=${sp_size} \
|
||||
actor_rollout_ref.rollout.gpu_memory_utilization=0.80 \
|
||||
actor_rollout_ref.rollout.tensor_model_parallel_size=${gen_tp} \
|
||||
actor_rollout_ref.rollout.layered_summon=True \
|
||||
actor_rollout_ref.rollout.load_format=safetensors \
|
||||
actor_rollout_ref.rollout.max_num_batched_tokens=$((max_prompt_length + max_response_length)) \
|
||||
actor_rollout_ref.rollout.temperature=${temperature} \
|
||||
actor_rollout_ref.rollout.top_p=${top_p} \
|
||||
actor_rollout_ref.rollout.top_k=${top_k} \
|
||||
actor_rollout_ref.rollout.val_kwargs.temperature=${temperature} \
|
||||
actor_rollout_ref.rollout.val_kwargs.top_p=${val_top_p} \
|
||||
actor_rollout_ref.rollout.val_kwargs.top_k=${top_k} \
|
||||
actor_rollout_ref.rollout.val_kwargs.do_sample=True \
|
||||
actor_rollout_ref.rollout.val_kwargs.n=1 \
|
||||
actor_rollout_ref.rollout.name=sglang \
|
||||
actor_rollout_ref.ref.fsdp_config.param_offload=${ref_offload} \
|
||||
actor_rollout_ref.ref.ulysses_sequence_parallel_size=${sp_size} \
|
||||
actor_rollout_ref.actor.fsdp_config.fsdp_size=${fsdp_size} \
|
||||
reward_model.reward_manager=dapo \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.enable=${enable_overlong_buffer} \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.len=${overlong_buffer_len} \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.penalty_factor=${overlong_penalty_factor} \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.log=False \
|
||||
+reward_model.reward_kwargs.max_resp_len=${max_response_length} \
|
||||
trainer.logger=['console','tensorboard'] \
|
||||
trainer.project_name="${project_name}" \
|
||||
trainer.experiment_name="${exp_name}" \
|
||||
trainer.val_before_train=True \
|
||||
trainer.test_freq=10 \
|
||||
trainer.save_freq=-1 \
|
||||
trainer.total_epochs=10 \
|
||||
trainer.total_training_steps=100 \
|
||||
trainer.default_local_dir="${CKPTS_DIR}" \
|
||||
trainer.resume_mode=auto \
|
||||
trainer.log_val_generations=10 \
|
||||
trainer.nnodes="${NNODES}" \
|
||||
trainer.n_gpus_per_node="${n_gpus_training}" \
|
||||
rollout.nnodes="${NNODES}" \
|
||||
rollout.n_gpus_per_node="${n_gpus_rollout}"
|
133
recipe/one_step_off_policy/dapo_7b_math_fsdp2_sglang_colocate.sh
Normal file
@ -0,0 +1,133 @@
|
||||
#!/usr/bin/env bash
|
||||
set -xeuo pipefail
|
||||
|
||||
project_name='DAPO'
|
||||
exp_name='DAPO-Qwen2.5-7b-MATH-0527a1-fsdp2-sglang-colocate'
|
||||
|
||||
adv_estimator=grpo
|
||||
|
||||
use_kl_in_reward=False
|
||||
kl_coef=0.0
|
||||
use_kl_loss=False
|
||||
kl_loss_coef=0.0
|
||||
|
||||
clip_ratio_low=0.2
|
||||
clip_ratio_high=0.28
|
||||
|
||||
max_prompt_length=$((1024 * 2))
|
||||
max_response_length=$((1024 * 8))
|
||||
enable_overlong_buffer=True
|
||||
overlong_buffer_len=$((1024 * 4))
|
||||
overlong_penalty_factor=1.0
|
||||
|
||||
loss_agg_mode="token-mean"
|
||||
|
||||
train_prompt_bsz=512
|
||||
n_resp_per_prompt=12
|
||||
train_prompt_mini_bsz=32
|
||||
|
||||
# Ray
|
||||
# RAY_ADDRESS=${RAY_ADDRESS:-"http://localhost:8265"}
|
||||
# WORKING_DIR=${WORKING_DIR:-"${PWD}"}
|
||||
# RUNTIME_ENV=${RUNTIME_ENV:-"${WORKING_DIR}/verl/trainer/runtime_env.yaml"}
|
||||
NNODES=${NNODES:-2}
|
||||
NGPUS_PER_NODE=${NGPUS_PER_NODE:-8}
|
||||
# Paths
|
||||
RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}
|
||||
# very important! please modify the max_position_embeddings in config.json to 32768 after downloading from huggingface
|
||||
MODEL_PATH=${MODEL_PATH:-"${RAY_DATA_HOME}/models/Qwen2.5-Math-7B"}
|
||||
CKPTS_DIR=${CKPTS_DIR:-"${RAY_DATA_HOME}/ckpts/${project_name}/${exp_name}"}
|
||||
TRAIN_FILE=${TRAIN_FILE:-"${RAY_DATA_HOME}/data/dapo-math-17k.parquet"}
|
||||
TEST_FILE=${TEST_FILE:-"${RAY_DATA_HOME}/data/aime-2024.parquet"}
|
||||
# Algorithm
|
||||
temperature=1.0
|
||||
top_p=1.0
|
||||
top_k=-1 # 0 for HF rollout, -1 for vLLM rollout
|
||||
val_top_p=0.7
|
||||
|
||||
# Performance Related Parameter
|
||||
use_dynamic_bsz=True
|
||||
actor_ppo_max_token_len=$(((max_prompt_length + max_response_length) * 2))
|
||||
infer_ppo_max_token_len=$(((max_prompt_length + max_response_length) * 3))
|
||||
offload=True
|
||||
gen_tp=2
|
||||
sp_size=4
|
||||
fsdp_size=2
|
||||
|
||||
# reference run wandb: https://wandb.ai/verl-org/DAPO%20Reproduction%20on%20verl/runs/ow47vvon?nw=nwusertongyuxuan361
|
||||
|
||||
python3 -m verl.trainer.main_ppo \
|
||||
data.train_files="${TRAIN_FILE}" \
|
||||
data.val_files="${TEST_FILE}" \
|
||||
data.prompt_key=prompt \
|
||||
data.truncation='left' \
|
||||
data.max_prompt_length=${max_prompt_length} \
|
||||
data.max_response_length=${max_response_length} \
|
||||
data.train_batch_size=${train_prompt_bsz} \
|
||||
actor_rollout_ref.rollout.n=${n_resp_per_prompt} \
|
||||
algorithm.adv_estimator=${adv_estimator} \
|
||||
algorithm.use_kl_in_reward=${use_kl_in_reward} \
|
||||
algorithm.kl_ctrl.kl_coef=${kl_coef} \
|
||||
actor_rollout_ref.actor.strategy=fsdp2 \
|
||||
critic.strategy=fsdp2 \
|
||||
actor_rollout_ref.actor.use_kl_loss=${use_kl_loss} \
|
||||
actor_rollout_ref.actor.kl_loss_coef=${kl_loss_coef} \
|
||||
actor_rollout_ref.actor.clip_ratio_low=${clip_ratio_low} \
|
||||
actor_rollout_ref.actor.clip_ratio_high=${clip_ratio_high} \
|
||||
actor_rollout_ref.actor.clip_ratio_c=10.0 \
|
||||
actor_rollout_ref.model.use_remove_padding=True \
|
||||
+actor_rollout_ref.model.override_config.max_position_embeddings=32768 \
|
||||
actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
|
||||
actor_rollout_ref.ref.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
|
||||
actor_rollout_ref.rollout.log_prob_use_dynamic_bsz=${use_dynamic_bsz} \
|
||||
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=${actor_ppo_max_token_len} \
|
||||
actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \
|
||||
actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \
|
||||
actor_rollout_ref.model.path="${MODEL_PATH}" \
|
||||
actor_rollout_ref.model.enable_gradient_checkpointing=True \
|
||||
actor_rollout_ref.actor.optim.lr=1e-6 \
|
||||
actor_rollout_ref.actor.optim.lr_warmup_steps=10 \
|
||||
actor_rollout_ref.actor.optim.weight_decay=0.1 \
|
||||
actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
|
||||
actor_rollout_ref.actor.fsdp_config.param_offload=${offload} \
|
||||
actor_rollout_ref.actor.fsdp_config.optimizer_offload=${offload} \
|
||||
actor_rollout_ref.actor.entropy_coeff=0 \
|
||||
actor_rollout_ref.actor.grad_clip=1.0 \
|
||||
actor_rollout_ref.actor.loss_agg_mode=${loss_agg_mode} \
|
||||
actor_rollout_ref.actor.ulysses_sequence_parallel_size=${sp_size} \
|
||||
actor_rollout_ref.rollout.gpu_memory_utilization=0.80 \
|
||||
actor_rollout_ref.rollout.tensor_model_parallel_size=${gen_tp} \
|
||||
actor_rollout_ref.rollout.layered_summon=True \
|
||||
actor_rollout_ref.rollout.load_format=safetensors \
|
||||
actor_rollout_ref.rollout.max_num_batched_tokens=$((max_prompt_length + max_response_length)) \
|
||||
actor_rollout_ref.rollout.temperature=${temperature} \
|
||||
actor_rollout_ref.rollout.top_p=${top_p} \
|
||||
actor_rollout_ref.rollout.top_k=${top_k} \
|
||||
actor_rollout_ref.rollout.val_kwargs.temperature=${temperature} \
|
||||
actor_rollout_ref.rollout.val_kwargs.top_p=${val_top_p} \
|
||||
actor_rollout_ref.rollout.val_kwargs.top_k=${top_k} \
|
||||
actor_rollout_ref.rollout.val_kwargs.do_sample=True \
|
||||
actor_rollout_ref.rollout.val_kwargs.n=1 \
|
||||
actor_rollout_ref.rollout.name=sglang \
|
||||
actor_rollout_ref.ref.fsdp_config.param_offload=${offload} \
|
||||
actor_rollout_ref.ref.ulysses_sequence_parallel_size=${sp_size} \
|
||||
actor_rollout_ref.actor.fsdp_config.fsdp_size=${fsdp_size} \
|
||||
reward_model.reward_manager=dapo \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.enable=${enable_overlong_buffer} \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.len=${overlong_buffer_len} \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.penalty_factor=${overlong_penalty_factor} \
|
||||
+reward_model.reward_kwargs.overlong_buffer_cfg.log=False \
|
||||
+reward_model.reward_kwargs.max_resp_len=${max_response_length} \
|
||||
trainer.logger=['console','tensorboard'] \
|
||||
trainer.project_name="${project_name}" \
|
||||
trainer.experiment_name="${exp_name}" \
|
||||
trainer.n_gpus_per_node="${NGPUS_PER_NODE}" \
|
||||
trainer.nnodes="${NNODES}" \
|
||||
trainer.val_before_train=True \
|
||||
trainer.test_freq=10 \
|
||||
trainer.save_freq=-1 \
|
||||
trainer.total_epochs=10 \
|
||||
trainer.total_training_steps=100 \
|
||||
trainer.default_local_dir="${CKPTS_DIR}" \
|
||||
trainer.resume_mode=auto \
|
||||
trainer.log_val_generations=10
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import os
|
||||
|
||||
@ -83,13 +84,20 @@ class ActorRolloutRefWorker(ARRWorker):
|
||||
assert hasattr(self, "_weights_info") and self._weights_info is not None
|
||||
|
||||
params = self._get_actor_params() if self._is_actor else None
|
||||
rollout_name = self.config.rollout.name
|
||||
if self._is_rollout:
|
||||
inference_model = (
|
||||
self.rollout.inference_engine.llm_engine.model_executor.driver_worker.worker.model_runner.model
|
||||
)
|
||||
from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader
|
||||
if rollout_name == "vllm":
|
||||
inference_model = (
|
||||
self.rollout.inference_engine.llm_engine.model_executor.driver_worker.worker.model_runner.model
|
||||
)
|
||||
from verl.utils.vllm.patch import patch_vllm_moe_model_weight_loader
|
||||
|
||||
patch_vllm_moe_model_weight_loader(inference_model)
|
||||
patch_vllm_moe_model_weight_loader(inference_model)
|
||||
elif rollout_name == "sglang":
|
||||
inference_model = self.rollout._engine
|
||||
else:
|
||||
raise NotImplementedError(f"Unknown rollout name: {rollout_name}")
|
||||
loop = asyncio.get_event_loop()
|
||||
for key, shape, dtype in self._weights_info:
|
||||
tensor = torch.empty(shape, dtype=dtype, device=get_torch_device().current_device())
|
||||
if self._is_actor:
|
||||
@ -102,7 +110,23 @@ class ActorRolloutRefWorker(ARRWorker):
|
||||
|
||||
self._weight_sync_group.broadcast(tensor, src=0, stream=get_torch_device().current_stream())
|
||||
if self._is_rollout:
|
||||
inference_model.load_weights([(key, tensor)])
|
||||
if rollout_name == "vllm":
|
||||
inference_model.load_weights([(key, tensor)])
|
||||
elif rollout_name == "sglang":
|
||||
loop.run_until_complete(self.update_weights(inference_model, [(key, tensor)]))
|
||||
|
||||
async def update_weights(self, inference_engine, params):
|
||||
from sglang.srt.weight_sync.utils import update_weights as sgl_update_weights
|
||||
|
||||
await sgl_update_weights(
|
||||
engine=inference_engine,
|
||||
params_batch=params,
|
||||
device_mesh_key="infer_tp",
|
||||
device_mesh=self.rollout_device_mesh,
|
||||
)
|
||||
|
||||
if self.rollout_device_mesh["infer_tp"].get_local_rank() == 0:
|
||||
await inference_engine.flush_cache()
|
||||
|
||||
@register(dispatch_mode=Dispatch.ONE_TO_ALL)
|
||||
def get_actor_weights_info(self):
|
||||
@ -209,6 +233,7 @@ class RolloutWorker(ActorRolloutRefWorker):
|
||||
rollout_device_mesh = init_device_mesh(
|
||||
device_name, mesh_shape=(dp, infer_tp), mesh_dim_names=["dp", "infer_tp"]
|
||||
)
|
||||
self.rollout_device_mesh = rollout_device_mesh
|
||||
|
||||
is_collect = rollout_device_mesh["infer_tp"].get_local_rank() == 0
|
||||
self._register_dispatch_collect_info(
|
||||
@ -216,7 +241,8 @@ class RolloutWorker(ActorRolloutRefWorker):
|
||||
)
|
||||
|
||||
rollout_name = self.config.rollout.name
|
||||
assert rollout_name == "vllm"
|
||||
if rollout_name not in ("vllm", "sglang"):
|
||||
raise NotImplementedError(f"rollout_name: {rollout_name} is not supported")
|
||||
|
||||
rollout_config: RolloutConfig = omega_conf_to_dataclass(self.config.rollout)
|
||||
model_config: HFModelConfig = omega_conf_to_dataclass(self.config.model, dataclass_type=HFModelConfig)
|
||||
@ -227,14 +253,23 @@ class RolloutWorker(ActorRolloutRefWorker):
|
||||
config=rollout_config, model_config=model_config, device_mesh=rollout_device_mesh
|
||||
)
|
||||
log_gpu_memory_usage(f"After building {rollout_name} rollout", logger=logger)
|
||||
from .vllm_sharding_manager import VLLMShardingManager
|
||||
|
||||
rollout_sharding_manager = VLLMShardingManager(
|
||||
inference_engine=rollout.inference_engine, device_mesh=rollout_device_mesh
|
||||
)
|
||||
if rollout_name == "vllm":
|
||||
from .vllm_sharding_manager import VLLMShardingManager
|
||||
|
||||
log_gpu_memory_usage("After building sharding manager", logger=logger)
|
||||
rollout_sharding_manager = VLLMShardingManager(
|
||||
inference_engine=rollout.inference_engine, device_mesh=rollout_device_mesh
|
||||
)
|
||||
|
||||
log_gpu_memory_usage("After building sharding manager", logger=logger)
|
||||
elif rollout_name == "sglang":
|
||||
from .sglang_sharding_manager import SGLangShardingManager
|
||||
|
||||
rollout_sharding_manager = SGLangShardingManager(device_mesh=rollout_device_mesh)
|
||||
|
||||
log_gpu_memory_usage("After building sharding manager", logger=logger)
|
||||
|
||||
self.model_config = model_config
|
||||
self.rollout = rollout
|
||||
self.rollout_sharding_manager = rollout_sharding_manager
|
||||
|
||||
|
@ -0,0 +1,65 @@
|
||||
set -x
|
||||
|
||||
project_name='GRPO'
|
||||
exp_name='GRPO-Qwen3-0.6b-gsm8k-fsdp2-sglang-one-step-off-2-6'
|
||||
|
||||
# Paths
|
||||
RAY_DATA_HOME=${RAY_DATA_HOME:-"${HOME}/verl"}
|
||||
MODEL_PATH=${MODEL_PATH:-"${RAY_DATA_HOME}/models/Qwen3-0.6B"}
|
||||
CKPTS_DIR=${CKPTS_DIR:-"${RAY_DATA_HOME}/ckpts/${project_name}/${exp_name}"}
|
||||
TRAIN_FILE=${TRAIN_FILE:-"${RAY_DATA_HOME}/data/gsm8k/train.parquet"}
|
||||
TEST_FILE=${TEST_FILE:-"${RAY_DATA_HOME}/data/gsm8k/test.parquet"}
|
||||
|
||||
NNODES=${NNODES:-1}
|
||||
NGPUS_PER_NODE=${NGPUS_PER_NODE:-8}
|
||||
|
||||
n_gpus_rollout=2
|
||||
n_gpus_training=$((NGPUS_PER_NODE - n_gpus_rollout))
|
||||
|
||||
|
||||
python3 -m recipe.one_step_off_policy.main_ppo \
|
||||
algorithm.adv_estimator=grpo \
|
||||
data.train_files="${TRAIN_FILE}" \
|
||||
data.val_files="${TEST_FILE}" \
|
||||
data.train_batch_size=1152 \
|
||||
data.max_prompt_length=512 \
|
||||
data.max_response_length=1024 \
|
||||
data.filter_overlong_prompts=True \
|
||||
data.truncation='error' \
|
||||
actor_rollout_ref.actor.strategy=fsdp2 \
|
||||
critic.strategy=fsdp2 \
|
||||
actor_rollout_ref.model.path="${MODEL_PATH}" \
|
||||
actor_rollout_ref.actor.optim.lr=1e-6 \
|
||||
actor_rollout_ref.hybrid_engine=False \
|
||||
actor_rollout_ref.model.use_remove_padding=True \
|
||||
actor_rollout_ref.actor.ppo_mini_batch_size=192 \
|
||||
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=32 \
|
||||
actor_rollout_ref.actor.use_kl_loss=True \
|
||||
actor_rollout_ref.actor.kl_loss_coef=0.001 \
|
||||
actor_rollout_ref.actor.kl_loss_type=low_var_kl \
|
||||
actor_rollout_ref.actor.entropy_coeff=0 \
|
||||
actor_rollout_ref.model.enable_gradient_checkpointing=True \
|
||||
actor_rollout_ref.actor.fsdp_config.param_offload=False \
|
||||
actor_rollout_ref.actor.fsdp_config.optimizer_offload=False \
|
||||
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=32 \
|
||||
actor_rollout_ref.rollout.tensor_model_parallel_size=2 \
|
||||
actor_rollout_ref.rollout.name=sglang \
|
||||
actor_rollout_ref.rollout.gpu_memory_utilization=0.6 \
|
||||
actor_rollout_ref.rollout.n=5 \
|
||||
actor_rollout_ref.rollout.load_format=safetensors \
|
||||
actor_rollout_ref.rollout.layered_summon=True \
|
||||
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=32 \
|
||||
actor_rollout_ref.ref.fsdp_config.param_offload=True \
|
||||
algorithm.use_kl_in_reward=False \
|
||||
trainer.critic_warmup=0 \
|
||||
trainer.val_before_train=True \
|
||||
trainer.logger=['console','tensorboard'] \
|
||||
trainer.project_name="${project_name}" \
|
||||
trainer.experiment_name="${exp_name}" \
|
||||
trainer.save_freq=-1 \
|
||||
trainer.test_freq=5 \
|
||||
trainer.total_epochs=2 \
|
||||
trainer.nnodes="${NNODES}" \
|
||||
trainer.n_gpus_per_node="${n_gpus_training}" \
|
||||
rollout.nnodes="${NNODES}" \
|
||||
rollout.n_gpus_per_node="${n_gpus_rollout}" $@
|
70
recipe/one_step_off_policy/sglang_sharding_manager.py
Normal file
@ -0,0 +1,70 @@
|
||||
# Copyright 2025 Bytedance Ltd. and/or its affiliates
|
||||
# Copyright 2025 Meituan Ltd. and/or its affiliates
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
import os
|
||||
|
||||
from torch.distributed.device_mesh import DeviceMesh
|
||||
|
||||
from verl import DataProto
|
||||
from verl.protocol import all_gather_data_proto
|
||||
from verl.utils.debug import GPUMemoryLogger
|
||||
from verl.utils.device import get_torch_device
|
||||
from verl.utils.torch_functional import check_device_is_available
|
||||
from verl.workers.sharding_manager.base import BaseShardingManager
|
||||
|
||||
logger = logging.getLogger(__file__)
|
||||
logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN"))
|
||||
|
||||
|
||||
class SGLangShardingManager(BaseShardingManager):
|
||||
@check_device_is_available()
|
||||
def __init__(self, device_mesh: DeviceMesh):
|
||||
self.device_mesh = device_mesh
|
||||
self.tp_size = self.device_mesh["infer_tp"].size()
|
||||
self.tp_rank = self.device_mesh["infer_tp"].get_local_rank()
|
||||
self.timing = {}
|
||||
gen_dp_rank = self.device_mesh["dp"].get_local_rank()
|
||||
get_torch_device().manual_seed(gen_dp_rank + 1000)
|
||||
self.gen_random_states = get_torch_device().get_rng_state()
|
||||
|
||||
@GPUMemoryLogger(role="vllm sharding_manager", logger=logger)
|
||||
def __enter__(self):
|
||||
get_torch_device().set_rng_state(self.gen_random_states)
|
||||
|
||||
@GPUMemoryLogger(role="vllm sharding_manager", logger=logger)
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.gen_random_states = get_torch_device().get_rng_state()
|
||||
get_torch_device().empty_cache()
|
||||
|
||||
@GPUMemoryLogger(role="vllm sharding_manager", logger=logger)
|
||||
def preprocess_data(self, data: DataProto) -> DataProto:
|
||||
"""All gather across tp group to make each rank has identical input."""
|
||||
if self.tp_size == 1:
|
||||
return data
|
||||
|
||||
# TODO: Current impl doesn't consider FSDP with torch micro-dp
|
||||
group = self.device_mesh["infer_tp"].get_group()
|
||||
|
||||
all_gather_data_proto(data=data, process_group=group)
|
||||
return data
|
||||
|
||||
@GPUMemoryLogger(role="vllm sharding_manager", logger=logger)
|
||||
def postprocess_data(self, data: DataProto) -> DataProto:
|
||||
"""Get chunk data of this tp rank since we do all gather in preprocess."""
|
||||
if self.tp_size == 1:
|
||||
return data
|
||||
|
||||
return data.chunk(chunks=self.tp_size)[self.tp_rank]
|
55
recipe/open_math_reasoning/README.md
Normal file
@ -0,0 +1,55 @@
|
||||
# Open math reasoning
|
||||
## Introduction
|
||||
In this recipe, we perform SFT on the [open math reasoning](https://huggingface.co/datasets/nvidia/OpenMathReasoning) dataset using the new SFT trainer with a backend-agnostic model engine. Note that our goal is not to replicate the [AIMO-2 Winning Solution](https://arxiv.org/abs/2504.16891) work, but to demonstrate an end-to-end SFT example.
|
||||
|
||||
Note that you may need to adjust the paths in the following scripts.
|
||||
## Dataset Preprocessing
|
||||
### Download Dataset
|
||||
```bash
|
||||
hf download nvidia/OpenMathReasoning --repo-type dataset --include data/cot* --local-dir /path/to/dataset/nvidia/OpenMathReasoning
|
||||
hf download math-ai/aime24 --repo-type dataset --local-dir /path/to/dataset/math-ai/aime24
|
||||
hf download math-ai/aime25 --repo-type dataset --local-dir /path/to/dataset/math-ai/aime25
|
||||
```
|
||||
|
||||
### Preprocess the dataset
|
||||
```bash
|
||||
python3 recipe/open_math_reasoning/prepare_nvidia-OpenMathReasoning_sft.py --local_dataset_path /path/to/nvidia/OpenMathReasoning --local_save_dir /path/to/open_math_reasoning
|
||||
```
|
||||
|
||||
### Prepare the eval dataset
|
||||
```bash
|
||||
python3 recipe/open_math_reasoning/prepare_eval_dataset.py --local_dataset_path /path/to/dataset --local_save_dir /path/to/eval_dataset
|
||||
```
|
||||
|
||||
## Train the model using SFT
|
||||
### FSDP backend
```bash
export CKPT_HOME=/path/to/ckpt
export BACKEND=fsdp2
export MODEL_ID=Qwen/Qwen3-8B-Base
export TRAIN_FILES=/path/to/open_math_reasoning/cot_dataset.parquet
bash recipe/open_math_reasoning/run_sft_qwen3_8b.sh
```
|
||||
### Megatron backend
|
||||
TODO
|
||||
|
||||
## Eval the model
|
||||
### Merge checkpoint into huggingface format
|
||||
```bash
|
||||
python -m verl.model_merger merge --backend fsdp --local_dir /path/to/ckpt/global_step_19751 --target_dir /path/to/ckpt/global_step_19751/huggingface
|
||||
```
|
||||
|
||||
### Generate the responses
|
||||
```bash
|
||||
export MODEL_PATH=/path/to/ckpt/global_step_19751/huggingface
|
||||
bash recipe/open_math_reasoning/run_generation.sh
|
||||
```
|
||||
|
||||
### Evaluate the responses
|
||||
```bash
|
||||
bash recipe/open_math_reasoning/run_eval.sh
|
||||
```
|
||||
|
||||
You should see results like:
|
||||
```python
|
||||
{'test_score/aime24': 0.584375, 'test_score/aime25': 0.43333333333333335}
|
||||
```
|
recipe/open_math_reasoning/compute_score.py (new file, 22 lines)
@ -0,0 +1,22 @@
|
||||
# Copyright 2025 Bytedance Ltd. and/or its affiliates
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
def compute_score_data_source(data_source, response, ground_truth):
|
||||
from verl.utils.reward_score.math_reward import compute_score
|
||||
|
||||
if data_source in ["aime24", "aime25"]:
|
||||
return compute_score(response, ground_truth)
|
||||
else:
|
||||
raise ValueError(f"Unknown data source: {data_source}")
|
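For a quick sanity check of the scorer above, a hypothetical standalone call is sketched below; the import path assumes the verl repo root is on `PYTHONPATH`, and the response/ground-truth strings are made up for illustration (`main_eval` supplies one decoded response and the dataset's `ground_truth` per sample):

```python
# Hypothetical standalone check of the custom scorer; values are illustrative only.
from recipe.open_math_reasoning.compute_score import compute_score_data_source

score = compute_score_data_source(
    data_source="aime24",
    response="... so the final answer is \\boxed{204}.",
    ground_truth="204",
)
print(score)  # expected to be 1.0 when the boxed answer matches, 0.0 otherwise
```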
recipe/open_math_reasoning/prepare_eval_dataset.py (new file, 96 lines)
@ -0,0 +1,96 @@
|
||||
# Copyright 2025 Bytedance Ltd. and/or its affiliates
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# prepare eval dataset including AIME'24, AIME'25
|
||||
|
||||
# hf download math-ai/aime24 --repo-type dataset --local-dir /opt/tiger/datasets/math-ai/aime24
|
||||
# hf download math-ai/aime25 --repo-type dataset --local-dir /opt/tiger/datasets/math-ai/aime25
|
||||
|
||||
import os
|
||||
|
||||
import datasets
|
||||
|
||||
from verl.utils.reward_score.math_reward import remove_boxed
|
||||
|
||||
instruction_following = "Please reason step by step, and put your final answer within \\boxed{}."
|
||||
|
||||
|
||||
def make_map_fn(data_source):
|
||||
def process_fn(example, idx):
|
||||
question_raw = example.pop("problem")
|
||||
|
||||
question = question_raw + " " + instruction_following
|
||||
|
||||
if "solution" not in example:
|
||||
example["solution"] = example["answer"]
|
||||
|
||||
answer_raw = example.pop("solution")
|
||||
|
||||
example.clear()
|
||||
|
||||
try:
|
||||
solution = remove_boxed(answer_raw)
|
||||
except Exception:
|
||||
solution = answer_raw
|
||||
|
||||
data = {
|
||||
"data_source": data_source,
|
||||
"prompt": [
|
||||
{
|
||||
"role": "user",
|
||||
"content": question,
|
||||
}
|
||||
],
|
||||
"ability": "math",
|
||||
"reward_model": {"style": "rule", "ground_truth": solution},
|
||||
"extra_info": {
|
||||
"index": idx,
|
||||
"answer": answer_raw,
|
||||
"question": question_raw,
|
||||
},
|
||||
}
|
||||
return data
|
||||
|
||||
return process_fn
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
|
||||
parser.add_argument(
|
||||
"--local_save_dir", default="~/data/math-ai", help="The save directory for the preprocessed dataset."
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.local_dataset_path is not None:
|
||||
aime24_dataset_path = os.path.join(args.local_dataset_path, "math-ai/aime24")
|
||||
aime25_dataset_path = os.path.join(args.local_dataset_path, "math-ai/aime25")
|
||||
else:
|
||||
aime24_dataset_path = "math-ai/aime24"
|
||||
aime25_dataset_path = "math-ai/aime25"
|
||||
|
||||
aime24_dataset = datasets.load_dataset(aime24_dataset_path, split="test")
|
||||
aime25_dataset = datasets.load_dataset(aime25_dataset_path, split="test")
|
||||
|
||||
aime24_dataset = aime24_dataset.map(function=make_map_fn("aime24"), with_indices=True)
|
||||
aime25_dataset = aime25_dataset.map(function=make_map_fn("aime25"), with_indices=True)
|
||||
|
||||
local_save_dir = os.path.expanduser(args.local_save_dir)
|
||||
os.makedirs(local_save_dir, exist_ok=True)
|
||||
|
||||
aime24_dataset.to_parquet(os.path.join(local_save_dir, "aime24_test.parquet"))
|
||||
aime25_dataset.to_parquet(os.path.join(local_save_dir, "aime25_test.parquet"))
|
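A quick way to sanity-check the preprocessed eval parquet produced by this script (a sketch assuming the default `--local_save_dir` of `~/data/math-ai`):

```python
# Sketch: inspect one preprocessed AIME'24 record (path assumes the default save dir).
import os
import pandas as pd

df = pd.read_parquet(os.path.expanduser("~/data/math-ai/aime24_test.parquet"))
row = df.iloc[0]
print(row["data_source"])                   # "aime24"
print(row["prompt"][0]["content"][:80])     # question plus the boxed-answer instruction
print(row["reward_model"]["ground_truth"])  # extracted ground-truth answer
```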
@ -0,0 +1,72 @@
|
||||
# Copyright 2025 Bytedance Ltd. and/or its affiliates
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
"""
|
||||
huggingface-cli download nvidia/OpenMathReasoning --repo-type dataset --include data/cot* \
|
||||
--local-dir /path/to/nvidia/OpenMathReasoning
|
||||
huggingface-cli download nvidia/OpenMathReasoning --repo-type dataset --include data/cot* \
|
||||
--local-dir /opt/tiger/nvidia/OpenMathReasoning
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
|
||||
import datasets
|
||||
|
||||
if __name__ == "__main__":
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
|
||||
parser.add_argument(
|
||||
"--local_save_dir",
|
||||
default="~/data/open_math_reasoning",
|
||||
help="The save directory for the preprocessed dataset.",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
local_dataset_path = args.local_dataset_path
|
||||
|
||||
data_source = "nvidia/OpenMathReasoning"
|
||||
|
||||
if local_dataset_path is not None:
|
||||
dataset = datasets.load_dataset(local_dataset_path, split="cot")
|
||||
else:
|
||||
dataset = datasets.load_dataset(data_source, split="cot")
|
||||
|
||||
def make_map_fn(split):
|
||||
def process_fn(example, idx):
|
||||
question = example.pop("problem")
|
||||
solution = example.pop("generated_solution")
|
||||
|
||||
extra_info = {}
|
||||
for key, value in example.items():
|
||||
extra_info[key] = value
|
||||
example.clear()
|
||||
|
||||
data = {
|
||||
"messages": [
|
||||
{"role": "user", "content": question, "loss_mask": 0},
|
||||
{"role": "assistant", "content": solution, "loss_mask": 1},
|
||||
],
|
||||
"extra_info": extra_info,
|
||||
}
|
||||
return data
|
||||
|
||||
return process_fn
|
||||
|
||||
# filter out data where the problem_type is not has_answer_extracted
|
||||
dataset = dataset.filter(lambda example: example["problem_type"] == "has_answer_extracted")
|
||||
dataset = dataset.map(function=make_map_fn("cot"), with_indices=True)
|
||||
local_save_dir = os.path.expanduser(args.local_save_dir)
|
||||
os.makedirs(local_save_dir, exist_ok=True)
|
||||
dataset.to_parquet(os.path.join(local_save_dir, "cot_dataset.parquet"))
|
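Likewise, a small sketch to verify the SFT parquet layout written above (assuming the default `--local_save_dir` of `~/data/open_math_reasoning`; note the file is large and `read_parquet` loads it fully into memory):

```python
# Sketch: verify the messages/loss_mask layout of the SFT parquet written above.
import os
import pandas as pd

df = pd.read_parquet(os.path.expanduser("~/data/open_math_reasoning/cot_dataset.parquet"))
msgs = df.iloc[0]["messages"]
print(len(df))                                # number of has_answer_extracted CoT samples
print(msgs[0]["role"], msgs[0]["loss_mask"])  # user, 0 -> prompt tokens excluded from the loss
print(msgs[1]["role"], msgs[1]["loss_mask"])  # assistant, 1 -> response tokens are supervised
```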
recipe/open_math_reasoning/run_eval.sh (new file, 7 lines)
@ -0,0 +1,7 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
# Evaluation
|
||||
python3 -m verl.trainer.main_eval \
|
||||
data.path=$HOME/data/gen/qwen_8b_gen_test.parquet \
|
||||
custom_reward_function.path=recipe/open_math_reasoning/compute_score.py \
|
||||
custom_reward_function.name=compute_score_data_source
|
recipe/open_math_reasoning/run_generation.sh (new file, 32 lines)
@ -0,0 +1,32 @@
|
||||
#!/usr/bin/env bash
|
||||
|
||||
MODEL_PATH=${MODEL_PATH:-/path/to/ckpt/global_step_19751/huggingface}
|
||||
|
||||
NGPUS_PER_NODE=${NGPUS_PER_NODE:-8}
|
||||
NNODES=${NNODES:-1}
|
||||
OUTPUT_PATH=${OUTPUT_PATH:-$HOME/data/gen/qwen_8b_gen_test.parquet}
|
||||
GEN_TP=${GEN_TP:-1} # Default tensor parallel size: 1
|
||||
|
||||
aime24_test_path=${HOME}/data/math-ai/aime24_test.parquet
|
||||
aime25_test_path=${HOME}/data/math-ai/aime25_test.parquet
|
||||
train_files="['$aime24_test_path', '$aime25_test_path']"
|
||||
|
||||
python3 -m verl.trainer.main_generation_server \
|
||||
trainer.nnodes="${NNODES}" \
|
||||
trainer.n_gpus_per_node="${NGPUS_PER_NODE}" \
|
||||
actor_rollout_ref.model.path="${MODEL_PATH}" \
|
||||
actor_rollout_ref.model.trust_remote_code=True \
|
||||
actor_rollout_ref.rollout.temperature=1.0 \
|
||||
actor_rollout_ref.rollout.top_p=0.7 \
|
||||
actor_rollout_ref.rollout.prompt_length=2048 \
|
||||
actor_rollout_ref.rollout.response_length=20480 \
|
||||
actor_rollout_ref.rollout.tensor_model_parallel_size="${GEN_TP}" \
|
||||
actor_rollout_ref.rollout.gpu_memory_utilization=0.9 \
|
||||
actor_rollout_ref.rollout.name=vllm \
|
||||
actor_rollout_ref.rollout.n=32 \
|
||||
data.train_files="$train_files" \
|
||||
data.prompt_key=prompt \
|
||||
+data.output_path="${OUTPUT_PATH}"
|
||||
|
||||
|
||||
|
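Before moving on to evaluation, the generation output can be spot-checked with a short sketch; it assumes the default `OUTPUT_PATH` above and that the generation entrypoint stores the sampled completions under a `responses` column (one list of `rollout.n` strings per prompt), which is the column name `main_eval` expects by default:

```python
# Sketch: peek at the generated rollouts before evaluation.
# Assumes the default OUTPUT_PATH and a "responses" column holding the sampled completions.
import os
import pandas as pd

df = pd.read_parquet(os.path.expanduser("~/data/gen/qwen_8b_gen_test.parquet"))
print(df.columns.tolist())           # original columns plus the generated "responses"
print(len(df.iloc[0]["responses"]))  # sampled responses per prompt (rollout.n, 32 here)
print(df.iloc[0]["responses"][0][:200])
```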
recipe/open_math_reasoning/run_sft_qwen3_8b.sh (new file, 94 lines)
@ -0,0 +1,94 @@
|
||||
#!/usr/bin/env bash
|
||||
set -xeuo pipefail
|
||||
|
||||
ENTRYPOINT=${ENTRYPOINT:-"-m verl.trainer.sft_trainer"}
|
||||
|
||||
TRAIN_FILES=${TRAIN_FILES:-/path/to/cot_dataset.parquet}
|
||||
|
||||
backend=${BACKEND:-fsdp}
|
||||
|
||||
project_name=verl_sft_test
|
||||
|
||||
RESUME_MODE=auto
|
||||
MODEL_ID=${MODEL_ID:-Qwen/Qwen3-8B-Base}
|
||||
|
||||
SP_SIZE=${SP_SIZE:-8}
|
||||
FSDP_SIZE=${FSDP_SIZE:-16}
|
||||
FSDP_STRATEGY=${FSDP_STRATEGY:-"fsdp2"}
|
||||
|
||||
TP_SIZE=${TP_SIZE:-1}
|
||||
PP_SIZE=${PP_SIZE:-1}
|
||||
VPP_SIZE=${VPP_SIZE:-null}
|
||||
CP_SIZE=${CP_SIZE:-1}
|
||||
|
||||
PAD_MODE=${PAD_MODE:-no_padding}
|
||||
|
||||
USE_REMOVE_PADDING=${USE_REMOVE_PADDING:-True}
|
||||
|
||||
FSDP_ENGINE_CONFIG="\
|
||||
engine=${backend} \
|
||||
optim=${backend} \
|
||||
optim.lr=2e-5 \
|
||||
optim.lr_warmup_steps_ratio=0.01 \
|
||||
optim.weight_decay=0.1 \
|
||||
optim.betas="[0.9,0.95]" \
|
||||
optim.clip_grad=1.0 \
|
||||
optim.min_lr_ratio=0.1 \
|
||||
optim.warmup_style=cosine \
|
||||
engine.ulysses_sequence_parallel_size=${SP_SIZE} \
|
||||
engine.strategy=${FSDP_STRATEGY} \
|
||||
engine.fsdp_size=${FSDP_SIZE}"
|
||||
|
||||
|
||||
MEGATRON_ENGINE_CONFIG="\
|
||||
engine=${backend} \
|
||||
optim=${backend} \
|
||||
optim.lr=1e-5 \
|
||||
optim.lr_warmup_steps_ratio=0.2 \
|
||||
optim.weight_decay=0.1 \
|
||||
optim.betas="[0.9,0.95]" \
|
||||
optim.clip_grad=1.0 \
|
||||
optim.lr_warmup_init=0 \
|
||||
optim.lr_decay_style=cosine \
|
||||
optim.min_lr=1e-6 \
|
||||
engine.tensor_model_parallel_size=${TP_SIZE} \
|
||||
engine.pipeline_model_parallel_size=${PP_SIZE} \
|
||||
engine.virtual_pipeline_model_parallel_size=${VPP_SIZE} \
|
||||
engine.context_parallel_size=${CP_SIZE}"
|
||||
|
||||
if [ "$backend" = "fsdp" ]; then
|
||||
ENGINE_CONFIG="$FSDP_ENGINE_CONFIG"
|
||||
echo "Using fsdp engine"
|
||||
exp_name=nvidia-openmathreasoning-qwen3-8b-${backend}-${FSDP_STRATEGY}-sp${SP_SIZE}-fsdp-1008a1
|
||||
else
|
||||
ENGINE_CONFIG="$MEGATRON_ENGINE_CONFIG"
|
||||
echo "Using megatron engine"
|
||||
exp_name=nvidia-openmathreasoning-${backend}-tp${TP_SIZE}-pp${PP_SIZE}-vpp${VPP_SIZE}-cp${CP_SIZE}-pad-${PAD_MODE}-use_remove_padding-${USE_REMOVE_PADDING}
|
||||
fi
|
||||
|
||||
CKPT_HOME=${CKPT_HOME:-$HOME/open_verl/sft/${project_name}/${exp_name}}
|
||||
mkdir -p "${CKPT_HOME}"
|
||||
|
||||
torchrun --standalone --nnodes=1 --nproc-per-node=${NUM_TRAINERS:-8} \
|
||||
${ENTRYPOINT} \
|
||||
data.train_files="${TRAIN_FILES}" \
|
||||
data.train_batch_size=96 \
|
||||
data.max_length=32768 \
|
||||
data.pad_mode=${PAD_MODE} \
|
||||
data.truncation=error \
|
||||
data.use_dynamic_bsz=True \
|
||||
data.max_token_len_per_gpu=65536 \
|
||||
data.messages_key=messages \
|
||||
model.path=$MODEL_ID \
|
||||
model.use_remove_padding=${USE_REMOVE_PADDING} \
|
||||
${ENGINE_CONFIG} \
|
||||
trainer.test_freq=-1 \
|
||||
trainer.save_freq=4000 \
|
||||
trainer.logger=['console','wandb'] \
|
||||
trainer.project_name="${project_name}" \
|
||||
trainer.experiment_name="${exp_name}" \
|
||||
trainer.total_epochs=1 \
|
||||
trainer.default_local_dir="${CKPT_HOME}" \
|
||||
trainer.resume_mode=${RESUME_MODE} \
|
||||
trainer.max_ckpt_to_keep=5 \
|
||||
checkpoint.save_contents=[model,optimizer,extra]
|
@ -24,7 +24,7 @@ import ray
|
||||
import torch
|
||||
import torch.distributed as dist
|
||||
import torch.multiprocessing as mp
|
||||
from transformers import AutoModelForCausalLM, AutoModelForTokenClassification, Qwen3Config, Qwen3MoeConfig
|
||||
from transformers import AutoConfig, AutoModelForCausalLM, AutoModelForTokenClassification, Qwen3Config, Qwen3MoeConfig
|
||||
|
||||
from verl import DataProto
|
||||
from verl.single_controller.ray import RayClassWithInitArgs, RayResourcePool, RayWorkerGroup
|
||||
@ -289,8 +289,9 @@ def _worker(rank: int, world_size: int, rendezvous_file: str, strategy: str, mod
|
||||
world_size=world_size,
|
||||
)
|
||||
|
||||
ref_model_config = AutoConfig.from_pretrained(model_path)
|
||||
with torch.device("meta"):
|
||||
ref_model = AutoModelForCausalLM.from_pretrained(model_path)
|
||||
ref_model = AutoModelForCausalLM.from_config(ref_model_config)
|
||||
|
||||
from verl.workers.engine import BaseEngine, EngineRegistry
|
||||
|
||||
|
@ -97,14 +97,14 @@ def test_basic_rollout_is():
|
||||
rollout_log_prob=rollout_log_prob,
|
||||
response_mask=eos_mask,
|
||||
rollout_is_level="geometric",
|
||||
rollout_is_mode="clip",
|
||||
rollout_is_mode="mask",
|
||||
rollout_is_threshold=1.5,
|
||||
rollout_is_threshold_lower=0.5,
|
||||
rollout_is_veto_threshold=1e-4,
|
||||
)
|
||||
|
||||
print(f" Mean weight: {metrics_geo['mismatch/rollout_is_mean']:.4f}")
|
||||
print(f" Clipped fraction: {metrics_geo['mismatch/rollout_is_clipped_fraction']:.4f}")
|
||||
print(f" Masked fraction: {metrics_geo['mismatch/rollout_is_masked_fraction']:.4f}")
|
||||
print(" ✓ Geometric mean mode passed")
|
||||
|
||||
# Test veto mechanism
|
||||
|
@ -132,8 +132,8 @@ class TestRolloutISIntegration:
|
||||
assert "mismatch/rollout_is_mean" in metrics
|
||||
|
||||
def test_both_bounding_modes(self, sample_data):
|
||||
"""Test both truncate and clip modes."""
|
||||
modes = ["truncate", "clip"]
|
||||
"""Test both truncate and mask modes."""
|
||||
modes = ["truncate", "mask"]
|
||||
|
||||
for mode in modes:
|
||||
_, metrics = compute_rollout_importance_weights(
|
||||
|
@ -113,7 +113,7 @@ def gptmodel_forward_qwen2_5_vl(
|
||||
output_orig = model(
|
||||
input_ids=input_ids_rmpad,
|
||||
attention_mask=None,
|
||||
position_ids=position_ids,
|
||||
position_ids=None, # model will calculate position_ids
|
||||
packed_seq_params=packed_seq_params,
|
||||
pixel_values=pixel_values,
|
||||
image_grid_thw=image_grid_thw,
|
||||
|
@ -74,6 +74,7 @@ class SupportedModel(Enum):
|
||||
GLM4_MOE = "Glm4MoeForCausalLM"
|
||||
|
||||
QWEN3_TOKEN_CLASSIFICATION = "Qwen3ForTokenClassification"
|
||||
QWEN3_MOE_VL = "Qwen3VLMoeForConditionalGeneration"
|
||||
|
||||
|
||||
# Registry for model configuration converters
|
||||
@ -118,6 +119,7 @@ MODEL_FORWARD_REGISTRY: dict[SupportedModel, Callable] = {
|
||||
SupportedModel.QWEN3: gptmodel_forward,
|
||||
SupportedModel.QWEN3_MOE: gptmodel_forward,
|
||||
SupportedModel.QWEN2_5_VL: gptmodel_forward_qwen2_5_vl,
|
||||
SupportedModel.QWEN3_MOE_VL: gptmodel_forward_qwen2_5_vl,
|
||||
SupportedModel.DEEPSEEK_V3: gptmodel_forward,
|
||||
SupportedModel.GLM4_MOE: gptmodel_forward,
|
||||
SupportedModel.QWEN3_TOKEN_CLASSIFICATION: gptmodel_forward,
|
||||
@ -131,6 +133,7 @@ MODEL_FORWARD_NOPAD_REGISTRY: dict[SupportedModel, Callable] = {
|
||||
SupportedModel.MIXTRAL: gptmodel_forward_no_padding,
|
||||
SupportedModel.DEEPSEEK_V3: gptmodel_forward_no_padding,
|
||||
SupportedModel.QWEN2_5_VL: gptmodel_forward_no_padding,
|
||||
SupportedModel.QWEN3_MOE_VL: gptmodel_forward_no_padding,
|
||||
SupportedModel.LLAMA4: gptmodel_forward_no_padding,
|
||||
SupportedModel.QWEN3: gptmodel_forward_no_padding,
|
||||
SupportedModel.QWEN3_MOE: gptmodel_forward_no_padding,
|
||||
@ -148,6 +151,7 @@ MODEL_FORWARD_FUSED_REGISTRY: dict[SupportedModel, Callable] = {
|
||||
SupportedModel.MIXTRAL: fused_forward_gptmodel,
|
||||
SupportedModel.DEEPSEEK_V3: fused_forward_gptmodel,
|
||||
SupportedModel.QWEN2_5_VL: fused_forward_qwen2_5_vl,
|
||||
SupportedModel.QWEN3_MOE_VL: fused_forward_qwen2_5_vl,
|
||||
SupportedModel.LLAMA4: fused_forward_gptmodel,
|
||||
SupportedModel.QWEN3: fused_forward_gptmodel,
|
||||
SupportedModel.QWEN3_MOE: fused_forward_gptmodel,
|
||||
|
@ -77,7 +77,7 @@ class AlgoConfig(BaseConfig):
|
||||
float value = enabled (compute weights and metrics). This is the main on/off switch.
|
||||
rollout_is_threshold_lower (Optional[float]): Lower threshold for IS weights. If None, defaults to 1/upper.
|
||||
rollout_is_level (str): Aggregation level: "token", "sequence", or "geometric".
|
||||
rollout_is_mode (str): Bounding mode: "truncate" (cap upper only) or "clip" (zero outside bounds).
|
||||
rollout_is_mode (str): Bounding mode: "truncate" (cap upper only) or "mask" (zero outside bounds).
|
||||
rollout_is_veto_threshold (float): Per-token veto threshold for catastrophic outliers.
|
||||
rollout_is (bool): Whether to apply IS weights to policy loss. True = apply weights,
|
||||
False = compute metrics only (useful for monitoring before enabling correction). Default: False.
|
||||
|
@ -84,7 +84,7 @@ algorithm:
|
||||
# Aggregation level: "token" (biased), "sequence" (unbiased), "geometric" (experimental)
|
||||
rollout_is_level: token
|
||||
|
||||
# Bounding mode: "truncate" (cap upper only), "clip" (zero outside bounds)
|
||||
# Bounding mode: "truncate" (cap upper only), "mask" (zero outside bounds)
|
||||
rollout_is_mode: truncate
|
||||
|
||||
# Per-token veto threshold for catastrophic outliers
|
||||
|
@ -124,7 +124,7 @@ algorithm:
|
||||
# Aggregation level: "token" (biased), "sequence" (unbiased), "geometric" (experimental)
|
||||
rollout_is_level: token
|
||||
|
||||
# Bounding mode: "truncate" (cap upper only), "clip" (zero outside bounds)
|
||||
# Bounding mode: "truncate" (cap upper only), "mask" (zero outside bounds)
|
||||
rollout_is_mode: truncate
|
||||
|
||||
# Per-token veto threshold for catastrophic outliers
|
||||
|
@ -18,7 +18,7 @@ data:
|
||||
max_token_len_per_gpu: 8192
|
||||
use_dynamic_bsz: True
|
||||
train_files: ~/data/gsm8k/train.parquet
|
||||
val_files: ~/data/gsm8k/test.parquet
|
||||
val_files: null
|
||||
# Multi-turn settings
|
||||
messages_key: messages # Key for messages list in multi-turn mode
|
||||
tools_key: tools # Key for tools list in multi-turn mode
|
||||
|
@ -31,7 +31,8 @@ from verl.utils.fs import copy_to_local
|
||||
|
||||
|
||||
@ray.remote
|
||||
def process_item(reward_fn, data_source, response_lst, reward_data):
|
||||
def process_item(config, data_source, response_lst, reward_data):
|
||||
reward_fn = get_custom_reward_fn(config)
|
||||
ground_truth = reward_data["ground_truth"]
|
||||
score_lst = [reward_fn(data_source, r, ground_truth) for r in response_lst]
|
||||
return data_source, np.mean(score_lst)
|
||||
@ -53,11 +54,9 @@ def main(config):
|
||||
|
||||
# evaluate test_score based on data source
|
||||
data_source_reward = defaultdict(list)
|
||||
compute_score = get_custom_reward_fn(config)
|
||||
|
||||
# Create remote tasks
|
||||
remote_tasks = [
|
||||
process_item.remote(compute_score, data_sources[i], responses[i], reward_model_data[i]) for i in range(total)
|
||||
process_item.remote(config, data_sources[i], responses[i], reward_model_data[i]) for i in range(total)
|
||||
]
|
||||
|
||||
# Process results as they come in
|
||||
|
@ -17,6 +17,7 @@ Generate responses given a dataset of prompts
|
||||
|
||||
import os
|
||||
|
||||
import aiohttp
|
||||
import hydra
|
||||
import numpy as np
|
||||
import ray
|
||||
@ -30,31 +31,12 @@ from pprint import pprint
|
||||
|
||||
import pandas as pd
|
||||
from omegaconf import OmegaConf
|
||||
from openai import AsyncOpenAI
|
||||
from openai.types.chat import ChatCompletion
|
||||
|
||||
from verl.utils.hdfs_io import makedirs
|
||||
from verl.workers.rollout.replica import get_rollout_replica_class
|
||||
|
||||
|
||||
@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
|
||||
def main(config):
|
||||
run_generation(config)
|
||||
|
||||
|
||||
def run_generation(config) -> None:
|
||||
if not ray.is_initialized():
|
||||
# this is for local ray cluster
|
||||
default_runtime_env = {"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_USE_V1": "1"}}
|
||||
ray_init_kwargs = config.ray_kwargs.get("ray_init", {})
|
||||
runtime_env_kwargs = ray_init_kwargs.get("runtime_env", {})
|
||||
runtime_env = OmegaConf.merge(default_runtime_env, runtime_env_kwargs)
|
||||
ray_init_kwargs = OmegaConf.create({**ray_init_kwargs, "runtime_env": runtime_env})
|
||||
print(f"ray init kwargs: {ray_init_kwargs}")
|
||||
ray.init(**OmegaConf.to_container(ray_init_kwargs))
|
||||
|
||||
ray.get(main_task.remote(config))
|
||||
|
||||
|
||||
async def start_server(config):
|
||||
tp_size = config.actor_rollout_ref.rollout.tensor_model_parallel_size
|
||||
num_replicas = (config.trainer.n_gpus_per_node * config.trainer.nnodes) // tp_size
|
||||
@ -81,23 +63,42 @@ async def start_server(config):
|
||||
return server_handles, server_addresses
|
||||
|
||||
|
||||
async def generate_per_replica(server_address, model_path: str, n_samples: int, sampling_params: dict, chat_lst: list):
|
||||
# here we should sample n_samples for each chat_lst
|
||||
client = AsyncOpenAI(
|
||||
api_key="123-abc",
|
||||
base_url=f"http://{server_address}/v1",
|
||||
)
|
||||
async def submit_request(server_address, **chat_complete_request):
|
||||
try:
|
||||
extra_headers = chat_complete_request.pop("extra_headers", {})
|
||||
timeout = aiohttp.ClientTimeout(total=None)
|
||||
session = aiohttp.ClientSession(timeout=timeout)
|
||||
async with session.post(
|
||||
url=f"http://{server_address}/v1/chat/completions",
|
||||
headers={"Authorization": "Bearer token-abc123", **extra_headers},
|
||||
json=chat_complete_request,
|
||||
) as resp:
|
||||
data = await resp.json()
|
||||
return ChatCompletion(**data)
|
||||
finally:
|
||||
await session.close()
|
||||
|
||||
tasks = [
|
||||
client.chat.completions.create(
|
||||
model=model_path,
|
||||
messages=messages,
|
||||
|
||||
async def generate_per_replica(server_address, model_path: str, n_samples: int, sampling_params: dict, chat_lst: list):
|
||||
# here we should sample n_samples for each chat_lst.
|
||||
# we use aiohttp to avoid hangs in AsyncOpenAI when the number of requests is large.
|
||||
|
||||
# client = AsyncOpenAI(
|
||||
# api_key="123-abc",
|
||||
# base_url=f"http://{server_address}/v1",
|
||||
# )
|
||||
|
||||
chat_complete_request = [
|
||||
{
|
||||
"model": model_path,
|
||||
"messages": messages,
|
||||
**sampling_params,
|
||||
)
|
||||
}
|
||||
for messages in chat_lst
|
||||
for _ in range(n_samples)
|
||||
]
|
||||
|
||||
tasks = [submit_request(server_address, **req) for req in chat_complete_request]
|
||||
results = await asyncio.gather(*tasks)
|
||||
return results
|
||||
|
||||
@ -118,8 +119,10 @@ async def generate(
|
||||
return results
|
||||
|
||||
|
||||
@ray.remote(num_cpus=1)
|
||||
def main_task(config):
|
||||
@hydra.main(config_path="config", config_name="ppo_trainer", version_base=None)
|
||||
def main(config):
|
||||
ray.init(runtime_env={"env_vars": {"TOKENIZERS_PARALLELISM": "true", "NCCL_DEBUG": "WARN", "VLLM_USE_V1": "1"}})
|
||||
|
||||
pprint(OmegaConf.to_container(config, resolve=True)) # resolve=True will eval symbol values
|
||||
OmegaConf.resolve(config)
|
||||
|
||||
@ -136,8 +139,21 @@ def main_task(config):
|
||||
"max_tokens": config.actor_rollout_ref.rollout.response_length,
|
||||
}
|
||||
|
||||
from omegaconf import ListConfig
|
||||
|
||||
train_files = config.data.train_files
|
||||
if not isinstance(train_files, list | ListConfig):
|
||||
train_files = [train_files]
|
||||
|
||||
# read dataset. Note that the dataset should directly contain chat template format (e.g., a list of dictionaries)
|
||||
dataset = pd.read_parquet(config.data.train_files)
|
||||
|
||||
datasets = []
|
||||
for train_file in train_files:
|
||||
dataset = pd.read_parquet(train_file)
|
||||
datasets.append(dataset)
|
||||
|
||||
# concat dataset
|
||||
dataset = pd.concat(datasets, axis=0, ignore_index=True)
|
||||
chat_lst = dataset[config.data.prompt_key].tolist()
|
||||
chat_lst = [chat.tolist() for chat in chat_lst]
|
||||
chat_numpy = np.array(chat_lst)
|
||||
@ -151,7 +167,6 @@ def main_task(config):
|
||||
)
|
||||
|
||||
# reshape results into a numpy array
|
||||
|
||||
import itertools
|
||||
|
||||
results = list(itertools.chain.from_iterable(gen_results))
|
||||
@ -170,6 +185,7 @@ def main_task(config):
|
||||
# write to a new parquet
|
||||
output_dir = os.path.dirname(config.data.output_path)
|
||||
makedirs(output_dir, exist_ok=True)
|
||||
print(f"Saving results to {config.data.output_path}")
|
||||
dataset.to_parquet(config.data.output_path)
|
||||
|
||||
|
||||
|
@ -20,7 +20,7 @@ training policy (e.g., FSDP FP32).
|
||||
|
||||
Key Features:
|
||||
1. Three aggregation levels: token, sequence, geometric
|
||||
2. Two handling modes: truncate (TIS), clip (CIS)
|
||||
2. Two handling modes: truncate (TIS), mask (MIS)
|
||||
3. Per-token veto mechanism for catastrophic outliers
|
||||
4. Memory-efficient computation to prevent CUDA OOM
|
||||
5. Comprehensive metrics tracking
|
||||
@ -77,9 +77,9 @@ def compute_rollout_importance_weights(
|
||||
- "geometric": Geometric mean of ratios (experimental)
|
||||
rollout_is_mode: How to handle weights exceeding threshold:
|
||||
- "truncate": Cap weights at upper_threshold only (TIS)
|
||||
- "clip": Zero out weights outside [lower_threshold, upper_threshold] (CIS)
|
||||
- "mask": Zero out weights outside [lower_threshold, upper_threshold] (MIS)
|
||||
rollout_is_threshold: Upper threshold for IS weights
|
||||
rollout_is_threshold_lower: Lower threshold for IS weights (clip mode only; if None, defaults to 1/upper)
|
||||
rollout_is_threshold_lower: Lower threshold for IS weights (mask mode only; if None, defaults to 1/upper)
|
||||
rollout_is_veto_threshold: Per-token veto threshold. If any token ratio < this, zero entire sequence.
|
||||
If None, veto mechanism is disabled.
|
||||
|
||||
@ -179,32 +179,32 @@ def compute_rollout_importance_weights(
|
||||
SAFETY_BOUND=SAFETY_BOUND,
|
||||
)
|
||||
|
||||
# Step 3: Apply truncation or clipping based on mode
|
||||
# Step 3: Apply truncation or masking based on mode
|
||||
if rollout_is_mode == "truncate":
|
||||
# Truncated IS (TIS): only cap upper bound to prevent overweighting
|
||||
rollout_is_weights = rollout_is_weights.clamp(max=upper_threshold)
|
||||
|
||||
elif rollout_is_mode == "clip":
|
||||
# Clipped IS (CIS): zero out weights outside [lower_threshold, upper_threshold]
|
||||
clip_mask = (rollout_is_weights >= lower_threshold) & (rollout_is_weights <= upper_threshold)
|
||||
clip_mask = clip_mask.float()
|
||||
elif rollout_is_mode == "mask":
|
||||
# Masked IS (MIS): zero out weights outside [lower_threshold, upper_threshold]
|
||||
mask = (rollout_is_weights >= lower_threshold) & (rollout_is_weights <= upper_threshold)
|
||||
mask = mask.float()
|
||||
|
||||
# Track CIS-specific metrics
|
||||
metrics["rollout_is_clipped_fraction"] = verl_F.masked_mean(1 - clip_mask, response_mask)
|
||||
# Track MIS-specific metrics
|
||||
metrics["rollout_is_masked_fraction"] = verl_F.masked_mean(1 - mask, response_mask)
|
||||
|
||||
# Sequence-level clipping fraction
|
||||
# Sequence-level masking fraction
|
||||
if rollout_is_level in ["sequence", "geometric"]:
|
||||
# All tokens in a sequence have the same weight, so reuse clip_mask
|
||||
metrics["rollout_is_seq_clipped_fraction"] = (1 - clip_mask[:, 0]).mean()
|
||||
# All tokens in a sequence have the same weight, so reuse mask
|
||||
metrics["rollout_is_seq_masked_fraction"] = (1 - mask[:, 0]).mean()
|
||||
else:
|
||||
# Check if any token in each sequence is clipped
|
||||
seq_has_clipped = verl_F.masked_sum(1 - clip_mask, response_mask, axis=-1) > 0
|
||||
metrics["rollout_is_seq_clipped_fraction"] = seq_has_clipped.float().mean()
|
||||
# Check if any token in each sequence is masked
|
||||
seq_has_masked = verl_F.masked_sum(1 - mask, response_mask, axis=-1) > 0
|
||||
metrics["rollout_is_seq_masked_fraction"] = seq_has_masked.float().mean()
|
||||
|
||||
rollout_is_weights = rollout_is_weights * clip_mask
|
||||
rollout_is_weights = rollout_is_weights * mask
|
||||
|
||||
else:
|
||||
raise ValueError(f"Invalid rollout_is_mode: {rollout_is_mode}. Must be 'truncate' or 'clip'.")
|
||||
raise ValueError(f"Invalid rollout_is_mode: {rollout_is_mode}. Must be 'truncate' or 'mask'.")
|
||||
|
||||
# Apply veto mask AFTER all thresholding
|
||||
# This zeros out entire sequences that have any catastrophic token
|
||||
|
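As a toy illustration of the two bounding modes above (not the production code path), with `rollout_is_threshold=1.5` and the lower bound defaulting to `1/1.5`:

```python
# Toy sketch of truncate (TIS) vs mask (MIS) on example importance weights.
import torch

w = torch.tensor([0.2, 0.9, 1.2, 3.0])
upper, lower = 1.5, 1.0 / 1.5

truncated = w.clamp(max=upper)                # TIS: cap the upper side only
mask = ((w >= lower) & (w <= upper)).float()  # MIS: zero out weights outside [lower, upper]
masked = w * mask

print(truncated)  # tensor([0.2000, 0.9000, 1.2000, 1.5000])
print(masked)     # tensor([0.0000, 0.9000, 1.2000, 0.0000])
```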
@ -146,7 +146,10 @@ class SFTTrainer:
|
||||
config = self.config
|
||||
tokenizer = self.model_config.tokenizer
|
||||
train_dataset = create_sft_dataset(config.data.train_files, config.data, tokenizer)
|
||||
val_dataset = create_sft_dataset(config.data.val_files, config.data, tokenizer)
|
||||
if config.data.val_files:
|
||||
val_dataset = create_sft_dataset(config.data.val_files, config.data, tokenizer)
|
||||
else:
|
||||
val_dataset = None
|
||||
|
||||
self.train_dataset, self.val_dataset = train_dataset, val_dataset
|
||||
|
||||
@ -181,19 +184,22 @@ class SFTTrainer:
|
||||
pin_memory_device=device_name,
|
||||
)
|
||||
|
||||
self.val_sampler = DistributedSampler(
|
||||
self.val_dataset, shuffle=False, num_replicas=dp_size, rank=dp_rank, drop_last=True
|
||||
)
|
||||
self.val_dataloader = StatefulDataLoader(
|
||||
dataset=self.val_dataset,
|
||||
batch_size=self.train_batch_size_per_dp,
|
||||
sampler=self.val_sampler,
|
||||
collate_fn=self.collate_fn,
|
||||
num_workers=8,
|
||||
pin_memory=True,
|
||||
drop_last=True,
|
||||
pin_memory_device=device_name,
|
||||
)
|
||||
if self.val_dataset:
|
||||
self.val_sampler = DistributedSampler(
|
||||
self.val_dataset, shuffle=False, num_replicas=dp_size, rank=dp_rank, drop_last=True
|
||||
)
|
||||
self.val_dataloader = StatefulDataLoader(
|
||||
dataset=self.val_dataset,
|
||||
batch_size=self.train_batch_size_per_dp,
|
||||
sampler=self.val_sampler,
|
||||
collate_fn=self.collate_fn,
|
||||
num_workers=8,
|
||||
pin_memory=True,
|
||||
drop_last=True,
|
||||
pin_memory_device=device_name,
|
||||
)
|
||||
else:
|
||||
self.val_dataloader = None
|
||||
|
||||
def fit(self):
|
||||
is_logging = self.engine.is_mp_src_rank_with_outputs() and self.engine.get_data_parallel_rank() == 0
|
||||
@ -242,6 +248,7 @@ class SFTTrainer:
|
||||
}
|
||||
|
||||
train_time = 0
|
||||
total_tokens = 0
|
||||
for epoch in range(start_epoch, self.config.trainer.total_epochs):
|
||||
self.train_sampler.set_epoch(epoch=epoch)
|
||||
|
||||
@ -302,6 +309,8 @@ class SFTTrainer:
|
||||
metrics["train/grad_norm"] = metrics.pop("grad_norm")
|
||||
metrics["train/lr"] = lr
|
||||
metrics["train/global_tokens"] = output_tensor.sum().item()
|
||||
total_tokens += metrics["train/global_tokens"]
|
||||
metrics["train/total_tokens(B)"] = total_tokens / 1e9
|
||||
# mfu
|
||||
delta_time = timer.last
|
||||
estimated_flops, promised_flops = self.flops_counter.estimate_flops(batch_seqlens, delta_time)
|
||||
@ -315,7 +324,7 @@ class SFTTrainer:
|
||||
is_save_step = global_step % self.save_freq == 0
|
||||
|
||||
# early exit or validation step
|
||||
if is_last_step or (self.test_freq > 0 and is_valid_step):
|
||||
if self.val_dataloader is not None and (is_last_step or (self.test_freq > 0 and is_valid_step)):
|
||||
# Perform validation
|
||||
val_losses = []
|
||||
for val_data in self.val_dataloader:
|
||||
|
@ -182,7 +182,8 @@ def find_latest_ckpt_path(path, directory_format="global_step_{}"):
|
||||
|
||||
tracker_file = get_checkpoint_tracker_filename(path)
|
||||
if not os.path.exists(tracker_file):
|
||||
print(f"Checkpoint tracker file does not exist: {tracker_file}")
|
||||
if not torch.distributed.is_initialized() or torch.distributed.get_rank() == 0:
|
||||
print(f"Checkpoint tracker file does not exist: {tracker_file}")
|
||||
return None
|
||||
|
||||
with open(tracker_file, "rb") as f:
|
||||
|
@ -1 +1 @@
|
||||
0.5.0.dev
|
||||
0.7.0.dev
|
||||
|