[model] feat: polish model engine (#3321)

Chi Zhang authored 2025-09-03 20:44:39 +08:00, committed by GitHub
parent 1f533d65e2
commit d7a0469977
28 changed files with 654 additions and 611 deletions

View File

@ -195,4 +195,4 @@ jobs:
- name: Running mcore engine tests on 8 L20 GPUs
run: |
- pytest -s -x tests/models/test_megatron_engine.py
+ pytest -s -x tests/models/test_engine.py

View File

@ -129,12 +129,12 @@ actor_rollout_ref:
param_offload: False
tensor_model_parallel_size: 1
expert_model_parallel_size: 1
- expert_tensor_parallel_size: None
+ expert_tensor_parallel_size: null
pipeline_model_parallel_size: 1
virtual_pipeline_model_parallel_size: null # change VPP interface for parallelism tests
context_parallel_size: 1
sequence_parallel: True
- use_distributed_optimizer: False
+ use_distributed_optimizer: True
use_dist_checkpointing: False
dist_checkpointing_path: null
seed: ${actor_rollout_ref.actor.megatron.seed}
@ -271,7 +271,7 @@ critic:
lr_warmup_steps: null # Prioritized. None, 0 or Negative values mean delegating to lr_warmup_steps_ratio.
lr_warmup_steps_ratio: 0. # the total steps will be injected during runtime
lr_decay_steps: null
- lr_decay_style: linear # select from constant/linear/cosine/inverse_square_root
+ lr_decay_style: constant # select from constant/linear/cosine/inverse_square_root
min_lr: 0.0 # minimum learning rate, default to 0.0
weight_decay: 0.01
weight_decay_incr_style: constant # select from constant/linear/cosine

View File

@ -601,7 +601,7 @@ critic:
lr_warmup_steps_ratio: 0.
# Minimum LR ratio for cosine schedule
- min_lr_ratio: null
+ min_lr_ratio: 0.0
# LR warmup style: "constant" or "cosine"
warmup_style: constant

View File

@ -5,6 +5,53 @@
actor_rollout_ref:
actor:
optim:
_target_: verl.workers.config.McoreOptimizerConfig
lr: 1.0e-06
lr_warmup_steps_ratio: 0.0
total_training_steps: -1
weight_decay: 0.01
lr_warmup_steps: -1
betas:
- 0.9
- 0.999
clip_grad: 1.0
optimizer: adam
lr_warmup_init: 0.0
lr_decay_steps: null
lr_decay_style: constant
min_lr: 0.0
weight_decay_incr_style: constant
lr_wsd_decay_style: exponential
lr_wsd_decay_steps: null
use_checkpoint_opt_param_scheduler: false
override_optimizer_config: {}
megatron:
_target_: verl.workers.config.McoreEngineConfig
param_offload: false
grad_offload: false
optimizer_offload: false
tensor_model_parallel_size: 1
expert_model_parallel_size: 1
expert_tensor_parallel_size: null
pipeline_model_parallel_size: 1
virtual_pipeline_model_parallel_size: null
context_parallel_size: 1
sequence_parallel: true
use_distributed_optimizer: true
use_dist_checkpointing: false
dist_checkpointing_path: null
seed: 42
override_ddp_config: {}
override_transformer_config:
recompute_granularity: null
recompute_modules:
- core_attn
recompute_method: null
recompute_num_layers: null
override_mcore_model_config: {}
use_mbridge: false
forward_only: false
_target_: verl.workers.config.McoreActorConfig
strategy: megatron
ppo_mini_batch_size: 256
@ -41,24 +88,6 @@ actor_rollout_ref:
- extra
load_contents: ${.save_contents}
async_save: false
- optim:
- lr: 1.0e-06
- lr_warmup_steps_ratio: 0.0
- total_training_steps: -1
- weight_decay: 0.01
- lr_warmup_steps: -1
- _target_: verl.workers.config.McoreOptimizerConfig
- optimizer: adam
- clip_grad: 1.0
- lr_warmup_init: 0.0
- lr_decay_steps: null
- lr_decay_style: constant
- min_lr: 0.0
- weight_decay_incr_style: constant
- lr_wsd_decay_style: exponential
- lr_wsd_decay_steps: null
- use_checkpoint_opt_param_scheduler: false
- override_optimizer_config: {}
use_fused_kernels: ${oc.select:actor_rollout_ref.model.use_fused_kernels,false}
profiler:
_target_: verl.utils.profiler.ProfilerConfig
@ -87,30 +116,6 @@ actor_rollout_ref:
stack_depth: ${oc.select:global_profiler.global_tool_config.torch_memory.stack_depth,32}
data_loader_seed: null
load_weight: true
- megatron:
- _target_: verl.workers.config.McoreEngineConfig
- param_offload: false
- grad_offload: false
- optimizer_offload: false
- tensor_model_parallel_size: 1
- expert_model_parallel_size: 1
- expert_tensor_parallel_size: null
- pipeline_model_parallel_size: 1
- virtual_pipeline_model_parallel_size: null
- context_parallel_size: 1
- sequence_parallel: true
- use_distributed_optimizer: true
- use_dist_checkpointing: false
- dist_checkpointing_path: null
- seed: 42
- override_ddp_config: {}
- override_transformer_config:
- recompute_granularity: null
- recompute_modules:
- - core_attn
- recompute_method: null
- recompute_num_layers: null
- use_mbridge: false
ref:
strategy: megatron
use_torch_compile: ${oc.select:actor_rollout_ref.actor.use_torch_compile,true}
@ -146,19 +151,24 @@ actor_rollout_ref:
megatron:
_target_: verl.workers.config.MegatronEngineConfig
param_offload: false
grad_offload: false
optimizer_offload: false
tensor_model_parallel_size: 1
expert_model_parallel_size: 1
- expert_tensor_parallel_size: None
+ expert_tensor_parallel_size: null
pipeline_model_parallel_size: 1
virtual_pipeline_model_parallel_size: null
context_parallel_size: 1
sequence_parallel: true
- use_distributed_optimizer: false
+ use_distributed_optimizer: true
use_dist_checkpointing: false
dist_checkpointing_path: null
seed: ${oc.select:actor_rollout_ref.actor.megatron.seed,42}
override_ddp_config: {}
override_transformer_config: ${oc.select:actor_rollout_ref.actor.megatron.override_transformer_config,{}}
override_mcore_model_config: {}
use_mbridge: ${oc.select:actor_rollout_ref.actor.megatron.use_mbridge,False}
forward_only: false
load_weight: true
rollout:
_target_: verl.workers.config.RolloutConfig
@ -287,28 +297,57 @@ data:
name: null
apply_chat_template_kwargs: {}
critic:
_target_: verl.workers.config.McoreCriticConfig
rollout_n: ${oc.select:actor_rollout_ref.rollout.n,1}
strategy: megatron
enable: null
optim:
_target_: verl.workers.config.McoreOptimizerConfig
lr: 1.0e-05
lr_warmup_steps_ratio: 0.0
total_training_steps: -1
weight_decay: 0.01
lr_warmup_steps: -1
- _target_: verl.workers.config.McoreOptimizerConfig
optimizer: adam
betas:
- 0.9
- 0.999
clip_grad: 1.0
- optimizer: adam
lr_warmup_init: 0.0
lr_decay_steps: null
- lr_decay_style: linear
+ lr_decay_style: constant
min_lr: 0.0
weight_decay_incr_style: constant
lr_wsd_decay_style: exponential
lr_wsd_decay_steps: null
use_checkpoint_opt_param_scheduler: false
override_optimizer_config: {}
megatron:
_target_: verl.workers.config.McoreEngineConfig
param_offload: false
grad_offload: false
optimizer_offload: false
tensor_model_parallel_size: 1
expert_model_parallel_size: 1
expert_tensor_parallel_size: null
pipeline_model_parallel_size: 1
virtual_pipeline_model_parallel_size: null
context_parallel_size: 1
sequence_parallel: true
use_distributed_optimizer: true
use_dist_checkpointing: false
dist_checkpointing_path: null
seed: 42
override_ddp_config: {}
override_transformer_config:
recompute_granularity: null
recompute_modules:
- core_attn
recompute_method: null
recompute_num_layers: null
override_mcore_model_config: {}
use_mbridge: false
forward_only: false
- _target_: verl.workers.config.McoreCriticConfig
- rollout_n: ${oc.select:actor_rollout_ref.rollout.n,1}
- strategy: megatron
- enable: null
model:
path: ~/models/deepseek-llm-7b-chat
tokenizer_path: ${oc.select:actor_rollout_ref.model.path,"~/models/deepseek-llm-7b-chat"}
@ -363,25 +402,6 @@ critic:
trace_alloc_max_entries: ${oc.select:global_profiler.global_tool_config.torch_memory.trace_alloc_max_entries,100000}
stack_depth: ${oc.select:global_profiler.global_tool_config.torch_memory.stack_depth,32}
nccl_timeout: 600
- megatron:
- _target_: verl.workers.config.McoreEngineConfig
- param_offload: false
- grad_offload: false
- optimizer_offload: false
- tensor_model_parallel_size: 1
- expert_model_parallel_size: 1
- expert_tensor_parallel_size: null
- pipeline_model_parallel_size: 1
- virtual_pipeline_model_parallel_size: null
- context_parallel_size: 1
- sequence_parallel: true
- use_distributed_optimizer: true
- use_dist_checkpointing: false
- dist_checkpointing_path: null
- seed: ${oc.select:actor_rollout_ref.actor.megatron.seed,42}
- override_ddp_config: ${oc.select:actor_rollout_ref.actor.megatron.override_ddp_config,{}}
- override_transformer_config: ${oc.select:actor_rollout_ref.actor.megatron.override_transformer_config,{}}
- use_mbridge: ${oc.select:actor_rollout_ref.actor.megatron.use_mbridge,False}
load_weight: true
data_loader_seed: ${oc.select:actor_rollout_ref.actor.data_loader_seed,null}
reward_model:

View File

@ -5,6 +5,38 @@
actor_rollout_ref:
actor:
optim:
_target_: verl.workers.config.FSDPOptimizerConfig
lr: 1.0e-06
lr_warmup_steps_ratio: 0.0
total_training_steps: -1
weight_decay: 0.01
lr_warmup_steps: -1
betas:
- 0.9
- 0.999
clip_grad: 1.0
min_lr_ratio: 0.0
num_cycles: 0.5
warmup_style: constant
fsdp_config:
_target_: verl.workers.config.FSDPEngineConfig
wrap_policy:
min_num_params: 0
param_offload: false
optimizer_offload: false
offload_policy: false
reshard_after_forward: true
fsdp_size: -1
forward_prefetch: false
model_dtype: fp32
use_orig_params: false
ulysses_sequence_parallel_size: 1
entropy_from_logits_with_chunking: false
use_torch_compile: true
entropy_checkpointing: false
forward_only: false
strategy: fsdp
_target_: verl.workers.config.FSDPActorConfig
strategy: fsdp
ppo_mini_batch_size: 256
@ -41,16 +73,6 @@ actor_rollout_ref:
- extra
load_contents: ${.save_contents}
async_save: false
- optim:
- lr: 1.0e-06
- lr_warmup_steps_ratio: 0.0
- total_training_steps: -1
- weight_decay: 0.01
- lr_warmup_steps: -1
- _target_: verl.workers.config.FSDPOptimizerConfig
- min_lr_ratio: 0.0
- num_cycles: 0.5
- warmup_style: constant
use_fused_kernels: ${oc.select:actor_rollout_ref.model.use_fused_kernels,false}
profiler:
_target_: verl.utils.profiler.ProfilerConfig
@ -81,16 +103,6 @@ actor_rollout_ref:
ulysses_sequence_parallel_size: 1
entropy_from_logits_with_chunking: false
entropy_checkpointing: false
- fsdp_config:
- _target_: verl.workers.config.FSDPEngineConfig
- wrap_policy:
- min_num_params: 0
- param_offload: false
- optimizer_offload: false
- offload_policy: false
- reshard_after_forward: true
- fsdp_size: -1
- forward_prefetch: false
use_remove_padding: ${oc.select:actor_rollout_ref.model.use_remove_padding,false}
ref:
strategy: ${actor_rollout_ref.actor.strategy}
@ -124,14 +136,25 @@ actor_rollout_ref:
_target_: verl.utils.profiler.config.TorchMemoryToolConfig
trace_alloc_max_entries: ${oc.select:global_profiler.global_tool_config.torch_memory.trace_alloc_max_entries,100000}
stack_depth: ${oc.select:global_profiler.global_tool_config.torch_memory.stack_depth,32}
- model: null
fsdp_config:
_target_: verl.workers.config.FSDPEngineConfig
wrap_policy:
min_num_params: 0
param_offload: false
optimizer_offload: false
offload_policy: false
reshard_after_forward: true
fsdp_size: -1
forward_prefetch: false
model_dtype: fp32
use_orig_params: false
ulysses_sequence_parallel_size: 1
entropy_from_logits_with_chunking: false
use_torch_compile: true
entropy_checkpointing: false
forward_only: false
strategy: fsdp
model: null
ulysses_sequence_parallel_size: ${oc.select:actor_rollout_ref.actor.ulysses_sequence_parallel_size,1}
entropy_from_logits_with_chunking: false
entropy_checkpointing: false
@ -213,12 +236,14 @@ actor_rollout_ref:
enable_chunked_prefill: true
load_format: dummy_dtensor
layered_summon: false
- hybrid_engine: true
- nccl_timeout: 600
model:
_target_: verl.workers.config.HFModelConfig
path: ~/models/deepseek-llm-7b-chat
- custom_chat_template: null
hf_config_path: null
tokenizer_path: null
use_shm: false
trust_remote_code: false
custom_chat_template: null
external_lib: null
override_config: {}
enable_gradient_checkpointing: true
@ -232,7 +257,8 @@ actor_rollout_ref:
use_fused_kernels: false
fused_kernel_options:
impl_backend: torch
- trust_remote_code: false
+ hybrid_engine: true
+ nccl_timeout: 600
data:
tokenizer: null
use_shm: false
@ -268,20 +294,39 @@ data:
name: null
apply_chat_template_kwargs: {}
critic:
_target_: verl.workers.config.FSDPCriticConfig
rollout_n: ${oc.select:actor_rollout_ref.rollout.n,1}
strategy: fsdp
enable: null
optim:
_target_: verl.workers.config.FSDPOptimizerConfig
lr: 1.0e-05
lr_warmup_steps_ratio: 0.0
total_training_steps: -1
weight_decay: 0.01
lr_warmup_steps: -1
- _target_: verl.workers.config.FSDPOptimizerConfig
- min_lr_ratio: null
betas:
- 0.9
- 0.999
clip_grad: 1.0
min_lr_ratio: 0.0
num_cycles: 0.5
warmup_style: constant
model:
fsdp_config:
_target_: verl.workers.config.FSDPEngineConfig
wrap_policy:
min_num_params: 0
param_offload: false
optimizer_offload: false
offload_policy: false
reshard_after_forward: true
fsdp_size: -1
forward_prefetch: false
model_dtype: fp32
use_orig_params: false
ulysses_sequence_parallel_size: 1
entropy_from_logits_with_chunking: false
use_torch_compile: true
entropy_checkpointing: false
forward_only: false
strategy: fsdp
path: ~/models/deepseek-llm-7b-chat
tokenizer_path: ${oc.select:actor_rollout_ref.model.path,"~/models/deepseek-llm-7b-chat"}
override_config: {}
@ -292,19 +337,13 @@ critic:
enable_gradient_checkpointing: true
enable_activation_offload: false
use_remove_padding: false
- fsdp_config:
- _target_: verl.workers.config.FSDPEngineConfig
- param_offload: false
- optimizer_offload: false
- offload_policy: false
- reshard_after_forward: true
- wrap_policy:
- min_num_params: 0
- fsdp_size: -1
- forward_prefetch: false
lora_rank: 0
lora_alpha: 16
target_modules: all-linear
- _target_: verl.workers.config.FSDPCriticConfig
- rollout_n: ${oc.select:actor_rollout_ref.rollout.n,1}
- strategy: fsdp
- enable: null
ppo_mini_batch_size: ${oc.select:actor_rollout_ref.actor.ppo_mini_batch_size,256}
ppo_micro_batch_size: null
ppo_micro_batch_size_per_gpu: ${oc.select:.ppo_micro_batch_size,null}

View File

@ -7,6 +7,12 @@
# defaults specify the default config from each component
defaults:
# fsdp optimizer config
- ../optim@optim: fsdp
# fsdp engine config
- ../engine@fsdp_config: fsdp
# dp actor config, inheriting from trainer/config/actor/actor.yaml
- actor
@ -32,51 +38,5 @@ entropy_from_logits_with_chunking: False
# recompute entropy
entropy_checkpointing: False
# optimizer configs
optim:
# Target class for this configuration
_target_: verl.workers.config.FSDPOptimizerConfig
# Minimum LR ratio for cosine schedule
min_lr_ratio: 0.0
# Number of cosine cycles in LR schedule
num_cycles: 0.5
# LR warmup style: "constant" or "cosine"
warmup_style: constant
# configs for FSDP
fsdp_config:
# Target class for this configuration
_target_: verl.workers.config.FSDPEngineConfig
# policy for wrapping the model
wrap_policy:
# Minimum number of parameters to trigger wrapping a layer with FSDP
min_num_params: 0
# Whether to offload model parameters to CPU (trades speed for memory)
param_offload: false
# Whether to offload optimizer state to CPU
optimizer_offload: false
# Only for FSDP2: offload param/grad/optimizer during train
offload_policy: false
# Only for FSDP2: Reshard after forward pass to reduce memory footprint
reshard_after_forward: true
# Number of GPUs in each FSDP shard group; -1 means auto
fsdp_size: -1
# Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather
# before the current forward computation.
forward_prefetch: False
# Whether to remove padding tokens in inputs during training
use_remove_padding: ${oc.select:actor_rollout_ref.model.use_remove_padding,false}

View File

@ -1,6 +1,13 @@
# megatron actor config, inheriting from trainer/config/actor/actor.yaml
defaults:
# megatron optimizer config
- ../optim@optim: megatron
# megatron engine config
- ../engine@megatron: megatron
- actor
# load the reference default config, then apply the fields in the current yaml
- _self_
@ -11,96 +18,3 @@ strategy: megatron
data_loader_seed: null
load_weight: True
optim:
_target_: verl.workers.config.McoreOptimizerConfig
optimizer: adam
clip_grad: 1.0
# initial learning rate for warmup, default to 0.0
lr_warmup_init: 0.0
lr_decay_steps: null
# select from constant/linear/cosine/inverse_square_root
lr_decay_style: constant
# minimum learning rate, default to 0.0
min_lr: 0.0
# select from constant/linear/cosine
weight_decay_incr_style: constant
# select from constant/exponential/cosine
lr_wsd_decay_style: exponential
lr_wsd_decay_steps: null
# use checkpoint optimizer parameter scheduler
use_checkpoint_opt_param_scheduler: False
override_optimizer_config: {}
megatron:
# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.workers.config.McoreEngineConfig
# Whether to offload model parameters to CPU
param_offload: False
# Whether to offload gradients to CPU
grad_offload: False
# Whether to offload optimizer state to CPU
optimizer_offload: False
tensor_model_parallel_size: 1
expert_model_parallel_size: 1
expert_tensor_parallel_size: null
pipeline_model_parallel_size: 1
virtual_pipeline_model_parallel_size: null
context_parallel_size: 1
sequence_parallel: True
use_distributed_optimizer: True
use_dist_checkpointing: False
dist_checkpointing_path: null
# oc.select: default val for ref.megatron.seed
seed: 42
# Allow to override Distributed Data Parallel (DDP) config
override_ddp_config: {}
# additional transformer config like: num_layers_in_first(/last)_pipeline_stage
# oc.select: default val for ref.megatron.override_transformer_config
override_transformer_config:
# Recompute configuration, same as in megatron.training.arguments
# by default, use recompute methods with minimal performance interference
# Recompute granularity, choices: ["full", "selective"]
recompute_granularity: null
# Recompute modules, multiple choices: ["core_attn", "moe_act", "layernorm", "mla_up_proj", "mlp", "moe"]
# Please use correct module in matched model
recompute_modules: ["core_attn"]
# 'uniform', 'block'
# 'uniform' divides the total number of transformer layers and checkpoints the input activation of each chunk
# 'block' checkpoints the specified number of layers per pipeline stage at the specified granularity
recompute_method: null
# 'full' will checkpoint the entire transformer layer and 'selective' only checkpoints memory intensive part of attention
recompute_num_layers: null
# oc.select: default val for ref.megatron.use_mbridge
use_mbridge: False

View File

@ -7,6 +7,12 @@
# defaults specify the default config from each component
defaults:
# fsdp optimizer config
- ../optim@optim: fsdp
# fsdp engine config
- ../engine@model.fsdp_config: fsdp
# dp actor config, inheriting from trainer/config/critic/critic.yaml
- critic
@ -19,18 +25,6 @@ _target_: verl.workers.config.FSDPCriticConfig
# distribution strategy. Options: fsdp (deprecating), fsdp2
strategy: fsdp
# optimizer configs
optim:
# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.workers.config.FSDPOptimizerConfig
# Minimum LR ratio for cosine schedule
min_lr_ratio: null
# LR warmup style: "constant" or "cosine"
warmup_style: constant
# model config for the critic
model:
@ -49,37 +43,6 @@ model:
# Use remove padding optimization (saves compute)
use_remove_padding: False
# FSDP-specific config
fsdp_config:
# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.workers.config.FSDPEngineConfig
# Whether to offload model parameters to CPU
param_offload: False
# Whether to offload optimizer state to CPU
optimizer_offload: False
# Only for FSDP2: offload param/grad/optimizer during train
offload_policy: False
# Only for FSDP2: Reshard after forward pass to reduce memory footprint
reshard_after_forward: True
# Policy for wrapping layers with FSDP
wrap_policy:
# Minimum number of parameters to trigger wrapping
min_num_params: 0
# Number of GPUs in each FSDP shard group; -1 means auto
fsdp_size: -1
# Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather
# before the current forward computation.
forward_prefetch: False
# Set to positive value to enable LoRA (e.g., 32)
lora_rank: 0

View File

@ -1,6 +1,12 @@
# defaults specify the default config from each component
defaults:
# megatron optimizer config
- ../optim@optim: megatron
# megatron engine config
- ../engine@megatron: megatron
# dp actor config, inheriting from trainer/config/critic/critic.yaml
- critic
@ -15,44 +21,6 @@ strategy: megatron
# seconds, default is 10 minutes for torch, you can set it to a larger value if you have long-running operations like 32B or 72B model using megatron
nccl_timeout: 600
# optimizer configs
optim:
# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.workers.config.McoreOptimizerConfig
# select optimizer, default is Adam
optimizer: adam
# Clip gradients norm
clip_grad: 1.0
# initial learning rate for warmup, default to 0.0
lr_warmup_init: 0.0
lr_decay_steps: null
# select from constant/linear/cosine/inverse_square_root
lr_decay_style: linear
# minimum learning rate, default to 0.0
min_lr: 0.0
# select from constant/linear/cosine
weight_decay_incr_style: constant
# select from constant/exponential/cosine
lr_wsd_decay_style: exponential
# number of steps for the decay phase of the WSD (warmup-stable-decay) schedule
lr_wsd_decay_steps: null
# use checkpoint optimizer parameter scheduler
use_checkpoint_opt_param_scheduler: False
# override optimizer config, e.g. enable cpu adam
override_optimizer_config: {}
# model config for the critic
model:
@ -68,63 +36,6 @@ model:
freeze_moe_router: False
# megatron-specific parallelism settings
megatron:
# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.workers.config.McoreEngineConfig
# Whether to offload model parameters to CPU
param_offload: False
# Whether to offload gradients to CPU
grad_offload: False
# Whether to offload optimizer state to CPU
optimizer_offload: False
# size of tensor model parallel group
tensor_model_parallel_size: 1
# size of expert model parallel group
expert_model_parallel_size: 1
# size of expert tensor parallel group
expert_tensor_parallel_size: null
# size of pipeline model parallel group
pipeline_model_parallel_size: 1
# size of virtual pipeline model parallel group
virtual_pipeline_model_parallel_size: null
# size of context parallel group
context_parallel_size: 1
# Whether to use sequence parallelism
sequence_parallel: True
# Whether to use distributed optimizer
use_distributed_optimizer: True
# Whether to use distributed checkpointing
use_dist_checkpointing: False
# Path for distributed checkpointing
dist_checkpointing_path: null
# Random seed for Megatron
seed: ${oc.select:actor_rollout_ref.actor.megatron.seed,42}
# Allow to override Distributed Data Parallel (DDP) config
override_ddp_config: ${oc.select:actor_rollout_ref.actor.megatron.override_ddp_config,{}}
# Transformer config overrides for Megatron
override_transformer_config: ${oc.select:actor_rollout_ref.actor.megatron.override_transformer_config,{}}
# Whether to use mBridge communications
use_mbridge: ${oc.select:actor_rollout_ref.actor.megatron.use_mbridge,False}
# Whether to load initial weights
load_weight: True

View File

@ -0,0 +1,53 @@
# Target class for this configuration
_target_: verl.workers.config.FSDPEngineConfig
# policy for wrapping the model
wrap_policy:
# Minimum number of parameters to trigger wrapping a layer with FSDP
min_num_params: 0
# Whether to offload model parameters to CPU (trades speed for memory)
# Note that this differs from the offload_policy in FSDP
param_offload: false
# Whether to offload optimizer state to CPU
# Note that this differs from the offload_policy in FSDP
optimizer_offload: false
# Only for FSDP2: offload param/grad/optimizer during train
offload_policy: false
# Only for FSDP2: Reshard after forward pass to reduce memory footprint
reshard_after_forward: true
# Number of GPUs in each FSDP shard group; -1 means auto
fsdp_size: -1
# Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather
# before the current forward computation.
forward_prefetch: False
# model dtype of fsdp
model_dtype: fp32
# Whether to use original parameters in fsdp. Only available in fsdp1
use_orig_params: false
# ulysses sequence parallel size
ulysses_sequence_parallel_size: 1
# Whether to use entropy_from_logits_with_chunking in fsdp.
entropy_from_logits_with_chunking: false
# Whether to use torch compile in fsdp.
use_torch_compile: true
# Whether to use entropy checkpointing in fsdp.
entropy_checkpointing: false
# Whether to build the fsdp engine for forward-only (inference) use.
forward_only: false
# fsdp or fsdp2
strategy: fsdp
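
As a rough sketch of how such an engine YAML is consumed: the snippet below builds the dataclass through verl.utils.omega_conf_to_dataclass, the helper named in the config comments elsewhere in this commit; the inline dict and its values are illustrative, mirroring a subset of the fields above.

from omegaconf import OmegaConf
from verl.utils import omega_conf_to_dataclass  # helper referenced in the config comments

# mirror a few fields of the fsdp engine config above (values illustrative)
conf = OmegaConf.create(
    {
        "_target_": "verl.workers.config.FSDPEngineConfig",
        "strategy": "fsdp2",
        "fsdp_size": -1,
        "param_offload": False,
    }
)
engine_cfg = omega_conf_to_dataclass(conf)  # resolves _target_ -> FSDPEngineConfig(...)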

View File

@ -0,0 +1,75 @@
# Required when using verl.utils.omega_conf_to_dataclass to instantiate dataclass configs
_target_: verl.workers.config.McoreEngineConfig
# Whether to offload model parameters to CPU
param_offload: False
# Whether to offload gradients to CPU
grad_offload: False
# Whether to offload optimizer state to CPU
optimizer_offload: False
# tensor model parallel size
tensor_model_parallel_size: 1
# expert model parallel size
expert_model_parallel_size: 1
# expert tensor parallel size
expert_tensor_parallel_size: null
# pipeline model parallel size
pipeline_model_parallel_size: 1
# virtual pipeline model parallel size
virtual_pipeline_model_parallel_size: null
# context parallel size
context_parallel_size: 1
# sequence parallel
sequence_parallel: True
# Whether to use distributed optimizer
use_distributed_optimizer: True
# Whether to use distributed checkpointing
use_dist_checkpointing: False
# distributed checkpointing path
dist_checkpointing_path: null
# oc.select: default val for ref.megatron.seed
seed: 42
# Allow to override Distributed Data Parallel (DDP) config
override_ddp_config: {}
# additional transformer config like: num_layers_in_first(/last)_pipeline_stage
# oc.select: default val for ref.megatron.override_transformer_config
override_transformer_config:
# Recompute configuration, same as in megatron.training.arguments
# by default, use recompute methods with minimal performance interference
# Recompute granularity, choices: ["full", "selective"]
recompute_granularity: null
# Recompute modules, multiple choices: ["core_attn", "moe_act", "layernorm", "mla_up_proj", "mlp", "moe"]
# Please use correct module in matched model
recompute_modules: ["core_attn"]
# 'uniform', 'block'
# 'uniform' divides the total number of transformer layers and checkpoints the input activation of each chunk
# 'block' checkpoints the specified number of layers per pipeline stage at the specified granularity
recompute_method: null
# 'full' will checkpoint the entire transformer layer and 'selective' only checkpoints memory intensive part of attention
recompute_num_layers: null
override_mcore_model_config: {}
# oc.select: default val for ref.megatron.use_mbridge
use_mbridge: False
# whether to build the engine for forward-only (inference) use
forward_only: False

View File

@ -0,0 +1,64 @@
# Format checks enforced on CI:
# 1. Comments must appear above each field.
# 2. There must be a blank line between each field.
# 3. Inline comments (after a field on the same line) are not allowed.
# 4. Indentation level is respected for nested fields.
_target_: verl.workers.config.HFModelConfig
# path to the huggingface model
path: ~/models/deepseek-llm-7b-chat
# path to the huggingface config, in case it is not the same as the model path
hf_config_path: null
# path to the huggingface tokenizer, in case it is not the same as the model path
tokenizer_path: null
# whether to use shared memory for model loading
use_shm: False
# whether to trust remote code.
trust_remote_code: False
# custom chat template for the model
custom_chat_template: null
# whether to use external libs for the model
external_lib: null
# override hf config
override_config: {}
# whether to enable gradient checkpointing. Only valid when we use hf model definition
enable_gradient_checkpointing: True
# whether to enable activation offload. Only valid when we use hf model definition
enable_activation_offload: False
# whether to use remove padding. Only valid when we use hf model definition
use_remove_padding: False
# Set to positive value to enable LoRA (e.g., 32)
lora_rank: 0
# LoRA scaling factor
lora_alpha: 16
# Target modules for LoRA adaptation
target_modules: all-linear
# Exclude modules from LoRA adaptation
exclude_modules: null
# whether to use liger. Only valid when we use hf model definition
use_liger: False
# whether to use fused kernels.
use_fused_kernels: False
# fused kernel options.
fused_kernel_options:
# the implementation backend for fused kernels.
impl_backend: torch

View File

@ -0,0 +1,33 @@
# Target class for this configuration
_target_: verl.workers.config.FSDPOptimizerConfig
# Learning rate
lr: 1e-3
# LR warmup steps ratio
lr_warmup_steps_ratio: 0.0
# Total training steps
total_training_steps: -1
# Weight decay
weight_decay: 0.01
# LR warmup steps
lr_warmup_steps: -1
# Betas for Adam optimizer
betas: [0.9, 0.999]
# Clip gradient
clip_grad: 1.0
# Minimum LR ratio for cosine schedule
min_lr_ratio: 0.0
# Number of cosine cycles in LR schedule
num_cycles: 0.5
# LR warmup style: "constant" or "cosine"
warmup_style: constant

View File

@ -0,0 +1,49 @@
_target_: verl.workers.config.McoreOptimizerConfig
# Learning rate
lr: 1e-3
# LR warmup steps ratio
lr_warmup_steps_ratio: 0.0
# Total training steps
total_training_steps: -1
# Weight decay
weight_decay: 0.01
# LR warmup steps
lr_warmup_steps: -1
# Betas for Adam optimizer
betas: [0.9, 0.999]
# Clip gradient
clip_grad: 1.0
# optimizer type
optimizer: adam
# initial learning rate for warmup, default to 0.0
lr_warmup_init: 0.0
lr_decay_steps: null
# select from constant/linear/cosine/inverse_square_root
lr_decay_style: constant
# minimum learning rate, default to 0.0
min_lr: 0.0
# select from constant/linear/cosine
weight_decay_incr_style: constant
# select from constant/exponential/cosine
lr_wsd_decay_style: exponential
lr_wsd_decay_steps: null
# use checkpoint optimizer parameter scheduler
use_checkpoint_opt_param_scheduler: False
override_optimizer_config: {}

View File

@ -21,6 +21,9 @@ defaults:
# Rollout model config.
- rollout@actor_rollout_ref.rollout: rollout
# Model config.
- model@actor_rollout_ref.model: hf_model
# Critic model config.
- critic@critic: dp_critic
@ -40,62 +43,6 @@ actor_rollout_ref:
# Timeout for operations executed against the process group
nccl_timeout: 600
# common configs for the model
model:
# Huggingface model path. This can be either local path or HDFS path.
path: ~/models/deepseek-llm-7b-chat
# Custom chat template for the model.
custom_chat_template: null
# Whether to use shared memory (SHM) for accelerating the loading of model weights
use_shm: false
# Additional Python packages to register huggingface models/tokenizers.
external_lib: null
# Used to override model's original configurations, mainly dropout
override_config: {}
# Enable gradient checkpointing for actor
enable_gradient_checkpointing: true
# Enable activation offloading for actor
enable_activation_offload: false
# Whether to remove padding tokens in inputs during training
use_remove_padding: false
# Set to positive value to enable LoRA (e.g., 32)
lora_rank: 0
# LoRA scaling factor
lora_alpha: 16
# Target modules to apply LoRA. Options: "all-linear" (not recommended for VLMs) or
# [q_proj,k_proj,v_proj,o_proj,gate_proj,up_proj,down_proj]
target_modules: all-linear
# Exclude modules from applying Lora. Similar usage to target_modules and Peft.
# Example: '.*visual.*' for excluding the ViT in Qwen2.5-VL, as currently vllm does not support ViT Lora.
exclude_modules: null
# Whether to use Liger for linear layer fusion
use_liger: false
# Whether to use custom fused kernels (e.g., FlashAttention, fused MLP)
use_fused_kernels: false
# Options for fused kernels. If use_fused_kernels is true, this will be used.
fused_kernel_options:
# Implementation backend for fused kernels. Options: "triton" or "torch".
impl_backend: torch
# Whether to enable loading a remote code model
trust_remote_code: false
# Rollout model config.
rollout:

View File

@ -3,6 +3,9 @@ defaults:
# dp ref config, inheriting from trainer/config/ref/ref.yaml
- ref
# fsdp engine config
- ../engine@fsdp_config: fsdp
# load the reference default config, then apply the fields in the current yaml
- _self_
@ -12,29 +15,6 @@ defaults:
# and teacher ref
model: null
# config for FSDP strategy
fsdp_config:
# Target class for this configuration
_target_: verl.workers.config.FSDPEngineConfig
# the wrap policy for FSDP model
wrap_policy:
# minimum number of params in a wrapped module
min_num_params: 0
# whether to offload parameters in FSDP
param_offload: False
# whether to perform reshard after model forward to save memory.
# only for fsdp2, [True, False, int between 1 and fsdp_size]
reshard_after_forward: True
# Only for FSDP1: FSDP1 configuration, prefetch the next forward-pass all-gather
# before the current forward computation.
forward_prefetch: False
# sequence parallel size
# same as actor_rollout_ref.actor.ulysses_sequence_parallel_size if it exists, otherwise 1
ulysses_sequence_parallel_size: ${oc.select:actor_rollout_ref.actor.ulysses_sequence_parallel_size,1}

View File

@ -1,6 +1,10 @@
# megatron ref config, inheriting from trainer/config/ref/ref.yaml
defaults:
- ref
# megatron engine config
- ../engine@megatron: megatron
# load the reference default config, then apply the fields in the current yaml
- _self_
@ -8,17 +12,6 @@ strategy: megatron
megatron:
_target_: verl.workers.config.MegatronEngineConfig
param_offload: False
tensor_model_parallel_size: 1
expert_model_parallel_size: 1
expert_tensor_parallel_size: None
pipeline_model_parallel_size: 1
virtual_pipeline_model_parallel_size: null # change VPP interface for parallelism tests
context_parallel_size: 1
sequence_parallel: True
use_distributed_optimizer: False
use_dist_checkpointing: False
dist_checkpointing_path: null
seed: ${oc.select:actor_rollout_ref.actor.megatron.seed,42}
override_transformer_config: ${oc.select:actor_rollout_ref.actor.megatron.override_transformer_config,{}}
use_mbridge: ${oc.select:actor_rollout_ref.actor.megatron.use_mbridge,False}

View File

@ -371,5 +371,6 @@ def restore_dynamic_batch(data: torch.Tensor, batch_idx_list: list[list[int]]) -
torch.Tensor: The restored data.
"""
indices = list(chain.from_iterable(batch_idx_list))
assert len(indices) == data.size(0), f"{len(indices)} vs. {data.size()}"
revert_indices = torch.tensor(get_reverse_idx(indices), dtype=torch.long)
return data[revert_indices]
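
A toy walk-through of the reordering this new assert guards, with hypothetical micro-batch indices:

import torch

# dynamic batching split samples [0..4] into micro-batches [[0, 3], [1, 4], [2]]
batch_idx_list = [[0, 3], [1, 4], [2]]
indices = [i for chunk in batch_idx_list for i in chunk]  # flattened: [0, 3, 1, 4, 2]

# what get_reverse_idx computes: the position of each original sample in the flat order
reverse_idx = [0] * len(indices)
for pos, idx in enumerate(indices):
    reverse_idx[idx] = pos

data = torch.tensor([100, 103, 101, 104, 102])  # rows stored in flattened order
print(data[torch.tensor(reverse_idx)])          # tensor([100, 101, 102, 103, 104])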

View File

@ -70,9 +70,11 @@ class McoreEngineConfig(BaseConfig):
override_mcore_model_config: dict[str, Any] = field(default_factory=dict)
use_mbridge: bool = False
forward_only: bool = False
strategy: str = "megatron"
def __post_init__(self) -> None:
"""config validation logics go here"""
assert self.strategy == "megatron"
if self.tensor_model_parallel_size == 1:
warnings.warn("set sequence parallel to false as TP size is 1", stacklevel=2)
self.sequence_parallel = False
@ -113,3 +115,6 @@ class FSDPEngineConfig(BaseConfig):
entropy_checkpointing: bool = False
forward_only: bool = False
strategy: str = "fsdp"
def __post_init__(self):
assert self.strategy in ["fsdp", "fsdp2"], f"strategy {self.strategy} not supported"
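
A minimal sketch of these validations firing, assuming the dataclasses accept plain keyword construction:

from verl.workers.config import FSDPEngineConfig, McoreEngineConfig

mcore = McoreEngineConfig(tensor_model_parallel_size=1, sequence_parallel=True)
print(mcore.sequence_parallel)  # False: __post_init__ warns and disables SP when TP == 1

fsdp = FSDPEngineConfig(strategy="fsdp2")  # passes; any other value fails the new assert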

View File

@ -21,6 +21,7 @@ from transformers import AutoConfig
from verl.base_config import BaseConfig
from verl.utils import hf_processor, hf_tokenizer
from verl.utils.fs import copy_to_local
from verl.utils.import_utils import import_external_libs
from verl.utils.model import get_generation_config, update_model_config
__all__ = ["HFModelConfig"]
@ -84,6 +85,8 @@ class HFModelConfig(BaseConfig):
architectures: Optional[list[str]] = None
def __post_init__(self):
import_external_libs(self.external_lib)
if self.hf_config_path is None:
self.hf_config_path = self.path
if self.tokenizer_path is None:

View File

@ -34,7 +34,7 @@ class OptimizerConfig(BaseConfig):
lr_warmup_steps (Optional[int]): Number of warmup steps; None delegates to lr_warmup_steps_ratio.
"""
_mutable_fields = {"clip_grad"}
_mutable_fields = {"clip_grad", "total_training_steps", "lr_warmup_steps"}
lr: float = 1e-3
lr_warmup_steps_ratio: float = 0.0
@ -92,7 +92,6 @@ class McoreOptimizerConfig(OptimizerConfig):
"""
optimizer: str = "adam"
- clip_grad: float = 1.0
lr_warmup_init: float = 0.0
lr_decay_steps: Optional[int] = None
lr_decay_style: str = "linear"

View File

@ -15,7 +15,9 @@
The abstract base class defining the interface for model training engines.
"""
- from typing import Any, Callable
+ from typing import Any, Callable, Optional
import torch
from verl import DataProto
@ -108,7 +110,7 @@ class BaseEngine:
outputs["grad_norm"] = grad_norm
return outputs
- def infer_batch(self, data: DataProto) -> Any:
+ def infer_batch(self, data: DataProto, loss_function: Optional[Callable] = None) -> Any:
"""
Perform inference on a batch of data.
@ -118,7 +120,9 @@ class BaseEngine:
Returns:
Any: The output of the inference, which can be used for predictions or other purposes.
"""
- return self.forward_backward_batch(data, None, forward_only=True)
+ with torch.no_grad():
+ outputs = self.forward_backward_batch(data, loss_function, forward_only=True)
+ return outputs
def get_data_parallel_size(self):
raise NotImplementedError
@ -126,6 +130,9 @@ class BaseEngine:
def get_data_parallel_rank(self):
raise NotImplementedError
def get_data_parallel_group(self):
raise NotImplementedError
def to(self, device: str, model: bool = True, optimizer: bool = True):
"""
Move model parameters, optimizer states, or both to the specified device.
@ -179,14 +186,15 @@ class EngineRegistry:
_engines = {}
@classmethod
- def register(cls, key: list[str] | str):
+ def register(cls, model_type: str, backend: list[str] | str):
"""
A class method decorator that registers an engine class with a given key.
This allows for dynamic instantiation of engine classes by their registered key.
Args:
- key (str): The identifier to associate with the engine class.
+ model_type (str): The type of the model
+ backend (list[str] | str): The backend to use for the model type
Returns:
A decorator function that takes an engine class and registers it.
@ -194,23 +202,27 @@ class EngineRegistry:
def decorator(engine_class):
assert issubclass(engine_class, BaseEngine)
- if isinstance(key, list):
- for k in key:
- cls._engines[k] = engine_class
+ if model_type not in cls._engines:
+ cls._engines[model_type] = {}
+ if isinstance(backend, list):
+ for k in backend:
+ cls._engines[model_type][k] = engine_class
else:
- assert isinstance(key, str)
- cls._engines[key] = engine_class
+ assert isinstance(backend, str)
+ cls._engines[model_type][backend] = engine_class
return engine_class
return decorator
@classmethod
- def get_engine_cls(cls, key):
- assert key in cls._engines, f"Unknown engine: {key}"
- return cls._engines[key]
+ def get_engine_cls(cls, model_type: str, backend: str):
+ assert model_type in cls._engines, f"Unknown model_type: {model_type}"
+ assert backend in cls._engines[model_type], f"Unknown backend: {backend}"
+ return cls._engines[model_type][backend]
@classmethod
- def new(cls, key, *args, **kwargs):
+ def new(cls, model_type, backend, *args, **kwargs):
"""
Function to create a new training engine instance based on the provided config.
Args:
@ -222,7 +234,5 @@ class EngineRegistry:
Raises:
NotImplementedError: If the engine key in the config does not match any known engines.
"""
- if key in cls._engines:
- return cls._engines[key](*args, **kwargs)
- else:
- raise NotImplementedError(f"Unknown engine: {key}")
+ engine_cls = cls.get_engine_cls(model_type, backend)
+ return engine_cls(*args, **kwargs)

View File

@ -16,7 +16,6 @@ The concrete Engine implementation using PyTorch FullyShardedDataParallel (FSDP)
"""
import gc
import itertools
import logging
import os
import warnings
@ -57,9 +56,7 @@ from verl.utils.fsdp_utils import (
offload_fsdp_model_to_cpu,
offload_fsdp_optimizer,
)
- from verl.utils.import_utils import import_external_libs
- from verl.utils.py_functional import append_to_dict, convert_to_regular_types
- from verl.utils.seqlen_balancing import get_reverse_idx
+ from verl.utils.py_functional import convert_to_regular_types
from verl.utils.torch_functional import logprobs_from_logits
from verl.utils.ulysses import gather_outputs_and_unpad, ulysses_pad, ulysses_pad_and_slice_inputs
from verl.workers.sharding_manager.fsdp_ulysses import FSDPUlyssesShardingManager
@ -74,6 +71,7 @@ from verl.utils.seqlen_balancing import prepare_dynamic_batch
from verl.workers.config import FSDPEngineConfig, FSDPOptimizerConfig, HFModelConfig
from ..base import BaseEngine, EngineRegistry
from ..utils import postprocess_batch_func
from .utils import create_device_mesh, get_sharding_strategy
logger = logging.getLogger(__file__)
@ -82,7 +80,6 @@ logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN"))
device_name = get_device_name()
- @EngineRegistry.register(["fsdp", "fsdp2"])
class FSDPEngine(BaseEngine):
"""
Concrete Engine implementation using PyTorch FullyShardedDataParallel (FSDP).
@ -149,8 +146,6 @@ class FSDPEngine(BaseEngine):
Sets up checkpoint manager and FLOPs counter.
"""
- # This is used to import external_lib into the huggingface systems
- import_external_libs(self.model_config.external_lib)
self._build_model_optimizer()
if self._is_offload_param:
@ -442,6 +437,12 @@ class FSDPEngine(BaseEngine):
def get_data_parallel_size(self):
return torch.distributed.get_world_size() // self.ulysses_sequence_parallel_size
def get_data_parallel_group(self):
if self.ulysses_device_mesh is not None:
return self.ulysses_device_mesh.get_group(mesh_dim="dp")
else:
return torch.distributed.group.WORLD
def prepare_micro_batches(self, data: DataProto):
"""
Prepare micro batches from data.
@ -471,59 +472,22 @@ class FSDPEngine(BaseEngine):
def forward_backward_batch(self, data: DataProto, loss_function: Callable, forward_only=False) -> list[DataProto]:
micro_batches, indices = self.prepare_micro_batches(data=data)
- output = []
+ output_lst = []
ctx = torch.no_grad() if forward_only else nullcontext()
for micro_batch in micro_batches:
with ctx:
# note that loss must be scaled in postprocess_micro_batch_func
- loss, metrics = self.forward_step(micro_batch, loss_function=loss_function, forward_only=forward_only)
+ loss, meta_info = self.forward_step(micro_batch, loss_function=loss_function, forward_only=forward_only)
if not forward_only:
# metrics contain the output, loss is dummy
loss.backward()
- output.append(metrics)
+ output_lst.append(meta_info)
# postprocess and return
- return self.postprocess_batch_func(output, indices, forward_only, data)
- def postprocess_batch_func(self, losses_reduced, indices, forward_only, data: DataProto):
- use_dynamic_bsz = data.meta_info.get("use_dynamic_bsz", True)
- if forward_only:
- # losses_reduced is a list of dict containing outputs for each micro-batch
- # reorder entropy and outputs. Return None for other pp ranks
- # only on last rank. It should be on every tp rank
- output = {}
- for o in losses_reduced:
- for key, val in o.items():
- if key not in output:
- output[key] = []
- output[key].append(val)
- indices = list(itertools.chain.from_iterable(indices))
- revert_indices = torch.tensor(get_reverse_idx(indices), dtype=torch.long)
- for key, val in output.items():
- val = torch.cat(val, dim=0)
- if use_dynamic_bsz:
- assert len(indices) == val.size(0), f"{len(indices)} vs. {val.size()}"
- val = val[revert_indices]
- output[key] = val
- return output
- else:
- metrics = {}
- # combine metrics of each micro-batch
- metric_micro_batch = losses_reduced
- for metric in metric_micro_batch:
- # Note that o[0] is metrics, o[1] is entropy, o[2] is response_mask
- append_to_dict(metrics, metric) # append the metric from this micro-batch to global metrics.
- return metrics
+ return postprocess_batch_func(output_lst=output_lst, indices=indices, data=data)
def forward_step(self, micro_batch: DataProto, loss_function, forward_only):
raise NotImplementedError("forward_step must be implemented in subclass")
@ -683,6 +647,7 @@ class EngineTrainModeCtx:
self.engine.mode = None
@EngineRegistry.register(model_type="language_model", backend=["fsdp", "fsdp2"])
class FSDPEngineWithLMHead(FSDPEngine):
def forward_step(self, micro_batch: DataProto, loss_function, forward_only):
use_remove_padding = micro_batch.meta_info.get("use_remove_padding", True)
@ -693,7 +658,7 @@ class FSDPEngineWithLMHead(FSDPEngine):
device_name = get_device_name()
# actually, we should avoid assigning like this...
micro_batch = micro_batch.to(get_device_id())
- micro_batch_tensor = micro_batch.batch.to(device_name)
+ micro_batch_tensor = micro_batch.batch
response_length = micro_batch_tensor["responses"].size(-1)
multi_modal_inputs = {}
@ -881,8 +846,21 @@ class FSDPEngineWithLMHead(FSDPEngine):
if calculate_entropy:
output["entropy"] = entropy
- if forward_only:
- return None, output
+ model_output = output
+ if loss_function is not None:
+ loss, metrics = loss_function(
+ model_output=output, data=micro_batch_tensor, dp_group=self.get_data_parallel_group()
+ )
+ else:
- policy_loss, metrics = loss_function(model_output=output, data=micro_batch_tensor)
- return policy_loss, metrics
+ assert forward_only, "forward_only must be True when loss_function is None"
+ loss = torch.tensor(1.0, device=device_name)
+ metrics = {}
+ output = {
+ "model_output": model_output,
+ "loss": loss,
+ "metrics": metrics,
+ }
+ return loss, output

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import os
from functools import partial
@ -38,18 +37,17 @@ from verl.utils.megatron_utils import (
offload_megatron_optimizer,
)
from verl.utils.model import load_mcore_dist_weights, load_megatron_gptmodel_weights
- from verl.utils.py_functional import append_to_dict
- from verl.utils.seqlen_balancing import get_reverse_idx, rearrange_micro_batches
+ from verl.utils.seqlen_balancing import rearrange_micro_batches
from verl.workers.config import HFModelConfig, McoreEngineConfig, McoreOptimizerConfig
from ..base import BaseEngine, EngineRegistry
from ..utils import postprocess_batch_func
from .utils import set_random_seed
logger = logging.getLogger(__file__)
logger.setLevel(os.getenv("VERL_LOGGING_LEVEL", "WARN"))
@EngineRegistry.register("megatron")
class MegatronEngine(BaseEngine):
def __init__(
self,
@ -320,6 +318,9 @@ class MegatronEngine(BaseEngine):
def get_data_parallel_size(self):
return mpu.get_data_parallel_world_size()
def get_data_parallel_group(self):
return mpu.get_data_parallel_group()
def save_checkpoint(self, local_path, hdfs_path=None, global_step=0, max_ckpt_to_keep=None):
"""
Save model, optimizer, and scheduler states to a checkpoint.
@ -447,48 +448,8 @@ class MegatronEngine(BaseEngine):
forward_only=forward_only,
)
# loss_reduces contains the stats returned from loss_func
- return self.postprocess_batch_func(
- losses_reduced=losses_reduced, indices=indices, forward_only=forward_only, data=data
- )
- def postprocess_batch_func(self, losses_reduced, indices, forward_only, data: DataProto):
- use_dynamic_bsz = data.meta_info.get("use_dynamic_bsz", True)
if mpu.is_pipeline_last_stage(ignore_virtual=True):
- if forward_only:
- # losses_reduced is a list of dict containing outputs for each micro-batch
- # reorder entropy and outputs. Return None for other pp ranks
- # only on last rank. It should be on every tp rank
- output = {}
- for o in losses_reduced:
- for key, val in o.items():
- if key not in output:
- output[key] = []
- output[key].append(val)
- indices = list(itertools.chain.from_iterable(indices))
- revert_indices = torch.tensor(get_reverse_idx(indices), dtype=torch.long)
- for key, val in output.items():
- val = torch.cat(val, dim=0)
- if use_dynamic_bsz:
- assert len(indices) == val.size(0), f"{len(indices)} vs. {val.size()}"
- val = val[revert_indices]
- output[key] = val
- return output
- else:
- metrics = {}
- # combine metrics of each micro-batch
- metric_micro_batch = losses_reduced
- for metric in metric_micro_batch:
- # Note that o[0] is metrics, o[1] is entropy, o[2] is response_mask
- append_to_dict(metrics, metric) # append the metric from this micro-batch to global metrics.
- return metrics
+ return postprocess_batch_func(output_lst=losses_reduced, indices=indices, data=data)
else:
return {}
@ -547,6 +508,7 @@ class EngineTrainModeCtx:
self.engine.mode = None
@EngineRegistry.register(model_type="language_model", backend="megatron")
class MegatronEngineWithLMHead(MegatronEngine):
def forward_step(self, batch_iter: Iterator[TensorDict], model, meta_info: dict, postprocess_micro_batch_func):
use_fused_kernels = meta_info.get("use_fused_kernels", False)
@ -650,16 +612,21 @@ class MegatronEngineWithLMHead(MegatronEngine):
entropy = output["entropy"][:, -response_length - 1 : -1].contiguous()
model_output["entropy"] = entropy
- if forward_only:
- # for inference
- return torch.tensor(1.0, device=device), model_output
+ if loss_function is not None:
+ loss, metrics = loss_function(model_output=model_output, data=data, dp_group=self.get_data_parallel_group())
+ else:
+ assert forward_only, "forward_only must be True when loss_function is None"
+ loss = torch.tensor(1.0, device=device)
+ metrics = {}
- # for training
- # note that this loss function can be swapped with other loss functions such as SFT
- policy_loss, metrics = loss_function(model_output=model_output, data=data)
+ output = {
+ "model_output": model_output,
+ "loss": loss,
+ "metrics": metrics,
+ }
- # return loss and stats
- return policy_loss, metrics
+ return loss, output
class MegatronEngineWithValueHead(MegatronEngine):

View File

@ -0,0 +1,77 @@
# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import torch
from verl import DataProto
from verl.utils.py_functional import append_to_dict
from verl.utils.seqlen_balancing import restore_dynamic_batch
def postprocess_batch_func(output_lst, indices, data: DataProto):
"""postprocess the output of a forward_backward_batch.
output_lst is a list of dict containing outputs for each micro-batch
reorder entropy and outputs. Return None for other pp ranks
only on last rank. It should be on every tp rank
each losses_reduced contains 1. model_output, 2. loss, 3. metrics.
"""
use_dynamic_bsz = data.meta_info.get("use_dynamic_bsz", True)
# gather per-key model outputs across micro-batches, then restore the
# original sample order when dynamic batch sizing reordered the data
model_output = {}
losses = []
aggregated_metrics = {}
# model output
for o in output_lst:
if "model_output" in o:
for key, val in o["model_output"].items():
if key not in model_output:
model_output[key] = []
model_output[key].append(val)
# concat results from micro batches
for key, val in model_output.items():
model_output[key] = torch.cat(model_output[key], dim=0)
# reverse with dynamic bsz
if use_dynamic_bsz:
model_output[key] = restore_dynamic_batch(model_output[key], indices)
# loss
for o in output_lst:
if "loss" in o:
losses.append(o["loss"])
# metrics
for o in output_lst:
if "metrics" in o:
metrics = o["metrics"]
append_to_dict(aggregated_metrics, metrics)
output = {
"model_output": model_output,
"loss": losses,
"metrics": aggregated_metrics,
}
return output
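
For reference, the dict returned above has roughly this shape for two micro-batches (names and values illustrative):

# {
#     "model_output": {"log_probs": Tensor[bsz, resp_len], "entropy": Tensor[bsz, resp_len]},
#     "loss": [loss_mb0, loss_mb1],      # one entry per micro-batch
#     "metrics": {"loss": [v0, v1]},     # lists accumulated by append_to_dict
# }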

View File

@ -124,6 +124,7 @@ class ActorWorker(Worker, DistProfilerExtension):
with self.engine.eval_mode():
output = self.engine.infer_batch(data)
output = output.get("model_output", {})
if "log_probs" in output and "entropy" in output:
# in megatron, only last pp contains valid data and returned to the single controller
@ -188,7 +189,8 @@ class ActorWorker(Worker, DistProfilerExtension):
dataloader = self._make_minibatch_iterator(data)
with Timer(name="update_policy", logger=None) as timer:
for batch_idx, mini_batch in enumerate(dataloader):
- mini_batch_metrics = self.engine.train_batch(mini_batch, self.loss_fn)
+ output = self.engine.train_batch(mini_batch, self.loss_fn)
+ mini_batch_metrics = output.get("metrics", {})
append_to_dict(metrics, mini_batch_metrics, prefix="actor/")
delta_time = timer.last

View File

@ -19,14 +19,14 @@ from verl.trainer.ppo.core_algos import agg_loss, get_policy_loss_fn, kl_penalty
from verl.workers.config import ActorConfig
- def sft_loss(config: ActorConfig, model_output, data):
+ def sft_loss(config: ActorConfig, model_output, data, dp_group=None):
log_prob = model_output["log_probs"] # [bsz, response_length]
response_mask = data["response_mask"].to(bool)
loss = -torch.mean(log_prob * response_mask)
return loss, {"loss": loss.detach().item()}
- def ppo_loss(config: ActorConfig, model_output, data):
+ def ppo_loss(config: ActorConfig, model_output, data, dp_group=None):
log_prob = model_output["log_probs"]
entropy = model_output.get("entropy", None)