[model, megatron] feat: Add glm air support and make new model directly use mbridge (#3359)

### What does this PR do?

[model, megatron] feat: Add glm air support and make new model directly
use mbridge.

### Checklist Before Starting

- [ ] Search for similar PRs. Paste at least one query link here: ...
- [ ] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
  - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`

### Test

> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.

### API and Usage Example

> Demonstrate how the API changes if any, and provide usage example(s)
if possible.

```python
# Add code snippet or script demonstrating how to use this
```

### Design & Code Changes

> Demonstrate the high-level design if this PR is complex, and list the
specific changes.

### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.

- [ ] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)

---------

Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
This commit is contained in:
Blue Space
2025-09-08 09:48:50 +08:00
committed by GitHub
parent d26a913f43
commit 6159dee4e9
5 changed files with 261 additions and 45 deletions

View File

@ -0,0 +1,197 @@
#!/usr/bin/env bash
set -xeuo pipefail
NNODES=${NNODES:-8}
NGPUS_PER_NODES=${NGPUS_PER_NODES:-8}
project_name='DAPO'
exp_name='DAPO-GLM-AIR-MATH-megatron'
adv_estimator=grpo
use_kl_in_reward=False
kl_coef=0.0
use_kl_loss=False
kl_loss_coef=0.0
clip_ratio_low=0.2
clip_ratio_high=0.28
max_prompt_length=$((1024 * 2))
max_response_length=$((1024 * 8))
enable_overlong_buffer=True
overlong_buffer_len=$((1024 * 4))
overlong_penalty_factor=1.0
loss_agg_mode="token-mean"
train_prompt_bsz=512
n_resp_per_prompt=16
train_prompt_mini_bsz=128
train_ppo_micro_batch_size_per_gpu=2
infer_ppo_micro_batch_size_per_gpu=2
# Paths
MODEL_PATH=/models/zai-org/GLM-4.5-Air-Base
# GLM Base model can use chat_template.jinja from instruct models
cp /models/zai-org/GLM-4.5-Air/chat_template.jinja ${MODEL_PATH}/chat_template.jinja
TRAIN_FILE=/data/dapo/dapo-math-17k.parquet
aime24_test_path=/data/dapo/aime-2024.parquet
# math500_test_path=/data/rlhf/math500/test.parquet
# TEST_FILE="['$math500_test_path', '$aime24_test_path']"
TEST_FILE="['$aime24_test_path']"
# Algorithm
temperature=1.0
top_p=1.0
top_k=-1 # 0 for HF rollout, -1 for vLLM rollout
val_top_p=0.7
# Performance Related Parameter
use_dynamic_bsz=True
actor_ppo_max_token_len=$(((max_prompt_length + max_response_length)))
infer_ppo_max_token_len=$(((max_prompt_length + max_response_length)))
offload=True
COMMON_PP=${COMMON_PP:-2}
COMMON_VPP=${COMMON_VPP:-null}
COMMON_CP=${COMMON_CP:-4}
COMMON_TP=${COMMON_TP:-2}
COMMON_EP=${COMMON_EP:-8}
COMMON_ETP=${COMMON_ETP:-1}
TRAIN_TP=${TRAIN_TP:-$COMMON_TP}
INFER_TP=${INFER_TP:-8}
ACTOR_PP=${ACTOR_PP:-$COMMON_PP}
ACTOR_VPP=${ACTOR_VPP:-$COMMON_VPP}
ACTOR_CP=${ACTOR_CP:-$COMMON_CP}
ACTOR_TP=${ACTOR_TP:-$TRAIN_TP}
ACTOR_EP=${ACTOR_EP:-$COMMON_EP}
ACTOR_ETP=${ACTOR_ETP:-$COMMON_ETP}
ROLLOUT_TP=${ROLLOUT_TP:-$INFER_TP}
REF_PP=${REF_PP:-$COMMON_PP}
REF_VPP=${REF_VPP:-$COMMON_VPP}
REF_CP=${REF_CP:-$COMMON_CP}
REF_TP=${REF_TP:-$TRAIN_TP}
REF_EP=${REF_EP:-$COMMON_EP}
REF_ETP=${REF_ETP:-$COMMON_ETP}
CRITIC_PP=${CRITIC_PP:-$COMMON_PP}
CRITIC_VPP=${CRITIC_VPP:-$COMMON_VPP}
CRITIC_CP=${CRITIC_CP:-$COMMON_CP}
CRITIC_TP=${CRITIC_TP:-$TRAIN_TP}
CRITIC_EP=${CRITIC_EP:-$COMMON_EP}
CRITIC_ETP=${CRITIC_ETP:-$COMMON_ETP}
RM_PP=${RM_PP:-$COMMON_PP}
RM_VPP=${RM_VPP:-$COMMON_VPP}
RM_CP=${RM_CP:-$COMMON_CP}
RM_TP=${RM_TP:-$TRAIN_TP}
RM_EP=${RM_EP:-$COMMON_EP}
RM_ETP=${RM_ETP:-$COMMON_ETP}
USE_MBRIDGE=True
USE_DIST_CKPT=False
# Install the latest mbridge
# pip install --no-cache-dir git+https://github.com/ISEEKYAN/mbridge.git
python3 -m verl.trainer.main_ppo --config-path=./config --config-name='ppo_megatron_trainer'\
data.train_files="${TRAIN_FILE}" \
data.val_files="${TEST_FILE}" \
data.prompt_key=prompt \
data.truncation='left' \
data.max_prompt_length=${max_prompt_length} \
data.max_response_length=${max_response_length} \
data.train_batch_size=${train_prompt_bsz} \
actor_rollout_ref.rollout.n=${n_resp_per_prompt} \
algorithm.adv_estimator=${adv_estimator} \
algorithm.use_kl_in_reward=${use_kl_in_reward} \
algorithm.kl_ctrl.kl_coef=${kl_coef} \
actor_rollout_ref.model.path="${MODEL_PATH}" \
actor_rollout_ref.actor.use_kl_loss=${use_kl_loss} \
actor_rollout_ref.actor.kl_loss_coef=${kl_loss_coef} \
actor_rollout_ref.actor.clip_ratio_low=${clip_ratio_low} \
actor_rollout_ref.actor.clip_ratio_high=${clip_ratio_high} \
actor_rollout_ref.actor.clip_ratio_c=10.0 \
+actor_rollout_ref.model.override_config.model_config.max_position_embeddings=$((max_prompt_length + max_response_length)) \
actor_rollout_ref.model.use_fused_kernels=True \
actor_rollout_ref.actor.use_dynamic_bsz=${use_dynamic_bsz} \
actor_rollout_ref.actor.ppo_mini_batch_size=${train_prompt_mini_bsz} \
actor_rollout_ref.actor.ppo_micro_batch_size_per_gpu=${train_ppo_micro_batch_size_per_gpu} \
actor_rollout_ref.actor.ppo_max_token_len_per_gpu=${actor_ppo_max_token_len} \
actor_rollout_ref.actor.optim.lr=1e-6 \
actor_rollout_ref.actor.optim.lr_warmup_steps=10 \
actor_rollout_ref.actor.optim.lr_decay_style='constant' \
actor_rollout_ref.actor.optim.weight_decay=0.1 \
actor_rollout_ref.actor.megatron.use_mbridge=$USE_MBRIDGE \
actor_rollout_ref.actor.megatron.use_dist_checkpointing=$USE_DIST_CKPT \
actor_rollout_ref.actor.megatron.param_offload=${offload} \
actor_rollout_ref.actor.megatron.grad_offload=${offload} \
actor_rollout_ref.actor.megatron.optimizer_offload=${offload} \
actor_rollout_ref.actor.megatron.tensor_model_parallel_size=${ACTOR_TP} \
actor_rollout_ref.actor.megatron.pipeline_model_parallel_size=${ACTOR_PP} \
actor_rollout_ref.actor.megatron.virtual_pipeline_model_parallel_size=${ACTOR_VPP} \
actor_rollout_ref.actor.megatron.context_parallel_size=${ACTOR_CP} \
actor_rollout_ref.actor.megatron.expert_model_parallel_size=${ACTOR_EP} \
actor_rollout_ref.actor.megatron.expert_tensor_parallel_size=${ACTOR_ETP} \
actor_rollout_ref.actor.megatron.override_transformer_config.recompute_granularity="selective" \
actor_rollout_ref.actor.megatron.override_transformer_config.recompute_modules=["core_attn","moe_act","layernorm","mlp","moe"] \
+actor_rollout_ref.actor.megatron.override_transformer_config.apply_rope_fusion=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.masked_softmax_fusion=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.bias_activation_fusion=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.bias_dropout_fusion=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.gradient_accumulation_fusion=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.deallocate_pipeline_outputs=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.persist_layer_norm=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_grouped_gemm=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_permute_fusion=True \
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_shared_expert_overlap=False \
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_token_dispatcher_type="flex" \
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_router_dtype=fp32 \
+actor_rollout_ref.actor.megatron.override_transformer_config.moe_enable_deepep=False \
actor_rollout_ref.actor.entropy_coeff=0 \
actor_rollout_ref.actor.loss_agg_mode=${loss_agg_mode} \
actor_rollout_ref.rollout.name='vllm' \
actor_rollout_ref.rollout.log_prob_micro_batch_size_per_gpu=${infer_ppo_micro_batch_size_per_gpu} \
actor_rollout_ref.rollout.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \
actor_rollout_ref.rollout.gpu_memory_utilization=0.5 \
actor_rollout_ref.rollout.tensor_model_parallel_size=${INFER_TP} \
actor_rollout_ref.rollout.enable_chunked_prefill=True \
actor_rollout_ref.rollout.max_num_batched_tokens=$((max_prompt_length + max_response_length)) \
actor_rollout_ref.rollout.temperature=${temperature} \
actor_rollout_ref.rollout.top_p=${top_p} \
actor_rollout_ref.rollout.top_k=${top_k} \
actor_rollout_ref.rollout.val_kwargs.temperature=${temperature} \
actor_rollout_ref.rollout.val_kwargs.top_p=${val_top_p} \
actor_rollout_ref.rollout.val_kwargs.top_k=${top_k} \
actor_rollout_ref.rollout.val_kwargs.do_sample=True \
actor_rollout_ref.rollout.val_kwargs.n=1 \
actor_rollout_ref.ref.log_prob_micro_batch_size_per_gpu=${infer_ppo_micro_batch_size_per_gpu} \
actor_rollout_ref.ref.log_prob_max_token_len_per_gpu=${infer_ppo_max_token_len} \
actor_rollout_ref.ref.megatron.use_dist_checkpointing=True \
actor_rollout_ref.ref.megatron.param_offload=${offload} \
actor_rollout_ref.ref.megatron.tensor_model_parallel_size=${REF_TP} \
actor_rollout_ref.ref.megatron.pipeline_model_parallel_size=${REF_PP} \
actor_rollout_ref.ref.megatron.virtual_pipeline_model_parallel_size=${REF_VPP} \
actor_rollout_ref.ref.megatron.context_parallel_size=${REF_CP} \
actor_rollout_ref.ref.megatron.expert_model_parallel_size=${REF_EP} \
actor_rollout_ref.ref.megatron.expert_tensor_parallel_size=${REF_ETP} \
reward_model.reward_manager=dapo \
+reward_model.reward_kwargs.overlong_buffer_cfg.enable=${enable_overlong_buffer} \
+reward_model.reward_kwargs.overlong_buffer_cfg.len=${overlong_buffer_len} \
+reward_model.reward_kwargs.overlong_buffer_cfg.penalty_factor=${overlong_penalty_factor} \
+reward_model.reward_kwargs.overlong_buffer_cfg.log=False \
+reward_model.reward_kwargs.max_resp_len=${max_response_length} \
trainer.logger=['console','wandb'] \
trainer.project_name="${project_name}" \
trainer.experiment_name="${exp_name}" \
trainer.n_gpus_per_node="${NGPUS_PER_NODES}" \
trainer.nnodes="${NNODES}" \
trainer.val_before_train=False \
trainer.test_freq=10 \
trainer.save_freq=100 \
trainer.total_epochs=10 \
trainer.resume_mode=auto \
trainer.log_val_generations=10

View File

@ -69,7 +69,8 @@ class SupportedModel(Enum):
QWEN2_5_VL = "Qwen2_5_VLForConditionalGeneration" # not supported
LLAMA4 = "Llama4ForConditionalGeneration" # not tested
QWEN3 = "Qwen3ForCausalLM" # tested
QWEN3_MOE = "Qwen3MoeForCausalLM" # not tested
QWEN3_MOE = "Qwen3MoeForCausalLM" # tested
GLM4_MOE = "Glm4MoeForCausalLM"
# Registry for model configuration converters
@ -113,6 +114,7 @@ MODEL_FORWARD_REGISTRY: dict[SupportedModel, Callable] = {
SupportedModel.QWEN3_MOE: gptmodel_forward,
SupportedModel.QWEN2_5_VL: gptmodel_forward_qwen2_5_vl,
SupportedModel.DEEPSEEK_V3: gptmodel_forward,
SupportedModel.GLM4_MOE: gptmodel_forward,
}
# Registry for model forward functions
@ -128,6 +130,7 @@ MODEL_FORWARD_FUSED_REGISTRY: dict[SupportedModel, Callable] = {
SupportedModel.QWEN3_MOE: fused_forward_gptmodel,
SupportedModel.QWEN2_5_VL: fused_forward_qwen2_5_vl,
SupportedModel.DEEPSEEK_V3: fused_forward_gptmodel,
SupportedModel.GLM4_MOE: fused_forward_gptmodel,
}
# Registry for model weight converters

View File

@ -146,7 +146,9 @@ class MegatronCheckpointManager(BaseCheckpointManager):
self.use_dist_checkpointing = use_dist_checkpointing or not self.bridge or self.is_value_model
self.use_hf_checkpoint = not self.use_dist_checkpointing
self.weight_saver = get_weight_saver(self.arch)
self.weight_saver = None
if self.bridge is None:
self.weight_saver = get_weight_saver(self.arch)
def get_rng_state(self, use_dist_ckpt: bool = True, data_parallel_random_init: bool = False):
"""collect rng state across data parallel ranks"""
@ -476,54 +478,61 @@ class MegatronCheckpointManager(BaseCheckpointManager):
if self.should_save_hf_model and not self.use_hf_checkpoint:
# wait for everyone to dump to local
state_dict = self.weight_saver(
self.model,
self.hf_config,
dtype=self.param_dtype,
is_value_model=self.is_value_model,
tie_word_embeddings=self.share_embeddings_and_output_weights,
)
torch.distributed.barrier()
if self.rank == 0:
if self.bridge is not None:
hf_model_ckpt_path = get_hf_model_checkpoint_path(local_path)
import warnings
from accelerate import init_empty_weights
with init_empty_weights(), warnings.catch_warnings():
warnings.simplefilter("ignore")
if "mistral7b-rm" in self.config.model.path:
from transformers import MistralForSequenceClassification
model = MistralForSequenceClassification.from_pretrained(
self.config.model.path
) # use score head instead of lm_head
state_dict["score.weight"] = state_dict["score.weight"]
else:
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(self.config.model.path, torch_dtype="auto")
model.save_pretrained(hf_model_ckpt_path, state_dict=state_dict)
log_with_rank(
f"Saved Huggingface config and tokenizer to {hf_model_ckpt_path}",
rank=self.rank,
logger=logger,
log_only_rank_0=True,
self.bridge.save_weights(self.model, hf_model_ckpt_path)
else:
state_dict = self.weight_saver(
self.model,
self.hf_config,
dtype=self.param_dtype,
is_value_model=self.is_value_model,
tie_word_embeddings=self.share_embeddings_and_output_weights,
)
if hdfs_path is not None:
log_with_rank(
f"Uploading checkpoint to {hdfs_path}", rank=self.rank, logger=logger, log_only_rank_0=True
)
from verl.utils import hdfs_io
torch.distributed.barrier()
if self.rank == 0:
hf_model_ckpt_path = get_hf_model_checkpoint_path(local_path)
import warnings
hdfs_io.makedirs(hdfs_path, exist_ok=True)
hdfs_io.copy(src=hf_model_ckpt_path, dst=hdfs_path, dirs_exist_ok=True)
from accelerate import init_empty_weights
with init_empty_weights(), warnings.catch_warnings():
warnings.simplefilter("ignore")
if "mistral7b-rm" in self.config.model.path:
from transformers import MistralForSequenceClassification
model = MistralForSequenceClassification.from_pretrained(
self.config.model.path
) # use score head instead of lm_head
state_dict["score.weight"] = state_dict["score.weight"]
else:
from transformers import AutoModelForCausalLM
model = AutoModelForCausalLM.from_pretrained(self.config.model.path, torch_dtype="auto")
model.save_pretrained(hf_model_ckpt_path, state_dict=state_dict)
log_with_rank(
f"HDFS checkpoint uploaded to {hdfs_path}", rank=self.rank, logger=logger, log_only_rank_0=True
f"Saved Huggingface config and tokenizer to {hf_model_ckpt_path}",
rank=self.rank,
logger=logger,
log_only_rank_0=True,
)
if hdfs_path is not None:
log_with_rank(
f"Uploading checkpoint to {hdfs_path}", rank=self.rank, logger=logger, log_only_rank_0=True
)
from verl.utils import hdfs_io
hdfs_io.makedirs(hdfs_path, exist_ok=True)
hdfs_io.copy(src=hf_model_ckpt_path, dst=hdfs_path, dirs_exist_ok=True)
log_with_rank(
f"HDFS checkpoint uploaded to {hdfs_path}",
rank=self.rank,
logger=logger,
log_only_rank_0=True,
)
def finalize_save_fn():
# Rank 0 uploads checkpoint to HDFS if hdfs_path is provided
log_with_rank(

View File

@ -275,6 +275,11 @@ class RLHFDataset(Dataset):
row_dict["multi_modal_inputs"].pop("second_per_grid_ts", None)
else:
if self.apply_chat_template_kwargs.get("chat_template") is None:
assert hasattr(self.tokenizer, "chat_template"), (
"chat_template should be provided in apply_chat_template_kwargs or tokenizer config, "
"models like GLM can copy chat_template.jinja from instruct models"
)
raw_prompt = self.tokenizer.apply_chat_template(
messages, add_generation_prompt=True, tokenize=False, **self.apply_chat_template_kwargs
)

View File

@ -149,7 +149,6 @@ class MegatronWorker(Worker):
self.architectures = getattr(hf_config, "architectures", None)
if self.rank == 0:
print(f"Model config after override: {hf_config}")
tf_config = hf_to_mcore_config(hf_config, dtype, **override_transformer_config)
if use_mbridge:
from verl.models.mcore.mbridge import AutoBridge
@ -159,6 +158,7 @@ class MegatronWorker(Worker):
tf_config = bridge.config
self.bridge = bridge
else:
tf_config = hf_to_mcore_config(hf_config, dtype, **override_transformer_config)
self.bridge = None
print(f"TF config: {tf_config}")
@ -433,7 +433,9 @@ class ActorRolloutRefWorker(MegatronWorker, DistProfilerExtension):
"qkv_layer_name": "self_attention.linear_qkv.",
"gate_proj_layer_name": "linear_fc1.",
}
self.weight_converter = get_mcore_weight_converter(self.actor_model_config, self.dtype)
self.weight_converter = None
if not self.bridge:
self.weight_converter = get_mcore_weight_converter(self.actor_model_config, self.dtype)
# 5. switch to trainer mode
# NOTE: It's critical that hybrid engine in trainer mode initially to load checkpoint.