mirror of
https://github.com/volcengine/verl.git
synced 2025-10-20 21:53:50 +08:00
As initially mentioned in https://github.com/volcengine/verl/discussions/1941, having structured configuration classes in verl makes argument passing easier for testing and validation. This is an extended thread on the current implementation of configuration schema in verl. Related PRs: - https://github.com/volcengine/verl/pull/2117 - https://github.com/volcengine/verl/pull/2621 # Motivation By moving from loose `omegaconfig.DictConfig`-based parameters to structured dataclasses, we gain: - Type safety & IDE support when accessing fields (e.g. cfg.optim.lr). - Validation hooks via __post_init__ in each class. - Immutable defaults with controlled mutability (e.g., an extra field). - Seamless Hydra/OmegaConf integration and easy per-recipe extension. # Core: BaseConfig hydra natively provides support for converting DictConfig to dataclass, but dataclass does not support accessing attribute via `get()`. We introduce a base class to provide backward compatibility and make the change less abrupt for existing users. All config dataclasses inherit from BaseConfig, which: - Implements collections.abc.Mapping → dict-like iteration/access. - Freezes attributes once set, unless listed in _mutable_fields. - Provides an `extra: dict[str, Any]` for unchecked extensions. ```python @dataclass class BaseConfig(collections.abc.Mapping): """Dict-like, frozen dataclass with opt-in mutability.""" _mutable_fields: set[str] = {"extra"} extra: dict[str, Any] = field(default_factory=dict) def __setattr__(self, name: str, value): if name in self.__dict__ and name not in self._mutable_fields: raise FrozenInstanceError(f"Field '{name}' is frozen") super().__setattr__(name, value) # Mapping methods: get, __getitem__, __iter__, __len__ … ``` # Example Config Classes (verl/trainer/config) Each sub-component of the trainer has its own dataclass, inheriting BaseConfig. ```yaml: critic: checkpoint: _target_: verl.trainer.config.CheckpointConfig save_contents: ["model","optimizer","extra"] load_contents: ["model","optimizer","extra"] async_save: false ``` Definition: ```python @dataclass class CheckpointConfig(BaseConfig): """What to save/load and async behavior.""" save_contents: list[str] = field(default_factory=lambda: ["model","optimizer","extra"]) load_contents: list[str] = field(default_factory=lambda: ["model","optimizer","extra"]) async_save: bool = False def __post_init__(self): # validation checks go here after initialization ckpt_cfg = CheckpointConfig(async_save=True) print(ckpt_cfg.save_contents) print(ckpt_cfg.get("save_contents", default_value)) print(ckpt_cfg["save_contents"]) # converting hydra-generated omegaconf.DictConfig to the dataclass config: from verl.utils.config import omegaconf_to_dataclass ckpt_cfg_from_cli = omegaconf_to_dataclass(config.critic.checkpoint) ``` # Extending existing config classes Because now configs become structured, unexpected keys would raise exceptions. To add new keys, there are two ways: ## Explicit class extensions: ```python from verl.workers.config import FSDPActorConfig @dataclass class SPPOActorConfig(FSDPActorConfig): """Add SPPO-specific temperature/penalty.""" sppo_eta: float = 1.0 ``` When using yaml or from command line, update the target config class: ```yaml hydra: searchpath: - file://verl/trainer/config defaults: - ppo_trainer # base trainer config - _self_ # then apply these overrides actor_rollout_ref: actor: _target_: recipe.sppo.config.SPPOActorConfig # **new target dataclass required for extension ** sppo_eta: 1.0 ``` or directly from command line: ```bash python main_sppo.py \ actor_rollout_ref.actor._target_=recipe.sppo.config.SPPOActorConfig \ actor_rollout_ref.actor.sppo_eta=1.0 ``` ## Leverage the `extra` field Adding more keys to the `extra` field of any dataclass that inherits from `BaseConfig` also works. This way there's no need to define your own dataclass in python: ```yaml hydra: searchpath: - file://verl/trainer/config defaults: - ppo_trainer # base trainer config - _self_ # then apply these overrides actor_rollout_ref: actor: extra: sppo_eta: 1.0 ``` # Declaring mutable fields For historical reasons some fields in the configs are mutated inplace in the codebase such as batch size for data/sequence parallelism. We are in the process of deprecating this kind of behavior. However, if you want to intentionally mutate one field, specify it with the `_mutable_fields` attr: ```python @dataclass class CheckpointConfig(BaseConfig): """What to save/load and async behavior.""" _mutable_fields = BaseConfig._mutable_fields | {"save_contents"} # mark save_contents as mutable. save_contents: list[str] = field(default_factory=lambda: ["model","optimizer","extra"]) load_contents: list[str] = field(default_factory=lambda: ["model","optimizer","extra"]) async_save: bool = False ``` # Other helpful resources verl default trainer configs combines the following config files together, specified in the `_defaults_` field: https://github.com/volcengine/verl/blob/main/verl/trainer/config/ppo_trainer.yaml#L1-L36 - verl/trainer/config/ppo_trainer.yaml # main config for entrypoint - verl/trainer/config/actor/dp_actor.yaml - verl/trainer/config/critic/dp_critic.yaml - verl/trainer/config/reward_model/dp_reward_model.yaml - verl/trainer/config/rollout/rollout.yaml To quickly peek the default full config in a single file, you can check the auto-generated full config in https://github.com/volcengine/verl/blob/main/verl/trainer/config/_generated_ppo_trainer.yaml # Change log and impact on existing code This PR converts the following fields to structured dataclass in the training pipeline. More can be done in future PRs (contributions from the community is welcome) - [x] actor_rollout_ref.actor - [x] critic - [ ] actor_rollout_ref.rollout - [ ] actor_rollout_ref.ref - [ ] reward_model - [ ] data - [ ] trainer Changes needed for existing code that added new fields to config: - see recipe/sppo for an example - `OmegaConf.to_container(self.config.model.get("override_config", OmegaConf.create()))` now has to manually changed to `self.config.model.get("override_config", {})`. Because OmegaConf.to_container expects a DictConfig but config.model.override_config is already a dict. # Other Breaking Changes critic.optim.lr for megatron changed from 1e-6 to 1e-5 --------- Signed-off-by: ShareLer <ShareLe@163.com> Co-authored-by: devin-ai-integration[bot] <158243242+devin-ai-integration[bot]@users.noreply.github.com> Co-authored-by: Joel <wuxibin@bytedance.com> Co-authored-by: Cheetah <1659275352@qq.com> Co-authored-by: 杨睿 <yangruipis@163.com> Co-authored-by: X. HU <huxiaobo@zju.edu.cn> Co-authored-by: Le Xue <48175490+ShareLer@users.noreply.github.com> Co-authored-by: Ziheng Jiang <ziheng@apache.org> Co-authored-by: Blue Space <57280232+ETOgaosion@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
292 lines
10 KiB
Python
292 lines
10 KiB
Python
# Copyright 2024 Bytedance Ltd. and/or its affiliates
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
import json
|
|
import os
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
import pytest
|
|
import ray
|
|
from omegaconf import DictConfig
|
|
from transformers.utils import get_json_schema
|
|
|
|
from tests.experimental.agent_loop.agent_utils import init_agent_loop_manager
|
|
from verl.experimental.agent_loop.agent_loop import get_trajectory_info
|
|
from verl.protocol import DataProto
|
|
from verl.tools.base_tool import BaseTool, OpenAIFunctionToolSchema
|
|
from verl.utils import hf_tokenizer
|
|
|
|
|
|
@pytest.fixture
|
|
def init_config() -> DictConfig:
|
|
from hydra import compose, initialize_config_dir
|
|
|
|
with initialize_config_dir(config_dir=os.path.abspath("verl/trainer/config")):
|
|
config = compose(
|
|
config_name="ppo_trainer",
|
|
overrides=[
|
|
"actor_rollout_ref.actor.use_dynamic_bsz=true",
|
|
# test sleep/wake_up with fsdp offload
|
|
"actor_rollout_ref.actor.fsdp_config.param_offload=True",
|
|
"actor_rollout_ref.actor.fsdp_config.optimizer_offload=True",
|
|
],
|
|
)
|
|
|
|
model_path = "Qwen/Qwen2.5-1.5B-Instruct"
|
|
config.actor_rollout_ref.model.path = model_path
|
|
config.actor_rollout_ref.rollout.name = os.getenv("ROLLOUT_NAME", "vllm")
|
|
config.actor_rollout_ref.rollout.mode = "async"
|
|
config.actor_rollout_ref.rollout.prompt_length = 4096
|
|
config.actor_rollout_ref.rollout.response_length = 4096
|
|
config.actor_rollout_ref.rollout.n = 4
|
|
config.actor_rollout_ref.rollout.agent.num_workers = 2
|
|
|
|
return config
|
|
|
|
|
|
def test_single_turn(init_config):
|
|
ray.init(
|
|
runtime_env={
|
|
"env_vars": {
|
|
"TOKENIZERS_PARALLELISM": "true",
|
|
"NCCL_DEBUG": "WARN",
|
|
"VLLM_LOGGING_LEVEL": "INFO",
|
|
"VLLM_USE_V1": "1",
|
|
}
|
|
}
|
|
)
|
|
|
|
agent_loop_manager = init_agent_loop_manager(init_config)
|
|
|
|
raw_prompts = [
|
|
[
|
|
{
|
|
"role": "user",
|
|
"content": "Let's play a role playing game. Your name is Alice, your favorite color is blue.",
|
|
}
|
|
],
|
|
[{"role": "user", "content": "Let's play a role playing game. Your name is Bob, your favorite color is red."}],
|
|
]
|
|
batch = DataProto(
|
|
non_tensor_batch={
|
|
"raw_prompt": np.array(raw_prompts),
|
|
"agent_name": np.array(["single_turn_agent"] * len(raw_prompts)),
|
|
},
|
|
)
|
|
n = init_config.actor_rollout_ref.rollout.n
|
|
batch = batch.repeat(n)
|
|
result = agent_loop_manager.generate_sequences(prompts=batch)
|
|
assert len(result) == len(raw_prompts) * n
|
|
|
|
# check result
|
|
seq_len = result.batch["prompts"].size(1) + result.batch["responses"].size(1)
|
|
assert result.batch["input_ids"].size(1) == seq_len
|
|
assert result.batch["attention_mask"].size(1) == seq_len
|
|
assert result.batch["position_ids"].size(1) == seq_len
|
|
|
|
# check turns
|
|
num_turns = result.non_tensor_batch["__num_turns__"]
|
|
assert np.all(num_turns == 2)
|
|
|
|
print("Test passed!")
|
|
ray.shutdown()
|
|
|
|
|
|
class WeatherTool(BaseTool):
|
|
def get_current_temperature(self, location: str, unit: str = "celsius"):
|
|
"""Get current temperature at a location.
|
|
|
|
Args:
|
|
location: The location to get the temperature for, in the format "City, State, Country".
|
|
unit: The unit to return the temperature in. Defaults to "celsius". (choices: ["celsius", "fahrenheit"])
|
|
|
|
Returns:
|
|
the temperature, the location, and the unit in a dict
|
|
"""
|
|
print(f"[DEBUG] get_current_temperature: {location}, {unit}")
|
|
return {
|
|
"temperature": 26.1,
|
|
"location": location,
|
|
"unit": unit,
|
|
}
|
|
|
|
def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema:
|
|
schema = get_json_schema(self.get_current_temperature)
|
|
return OpenAIFunctionToolSchema(**schema)
|
|
|
|
async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[str, float, dict]:
|
|
try:
|
|
result = self.get_current_temperature(**parameters)
|
|
return json.dumps(result), 0, {}
|
|
except Exception as e:
|
|
return str(e), 0, {}
|
|
|
|
|
|
class WeatherToolWithData(BaseTool):
|
|
def get_openai_tool_schema(self) -> OpenAIFunctionToolSchema:
|
|
schema = get_json_schema(self.get_temperature_date)
|
|
return OpenAIFunctionToolSchema(**schema)
|
|
|
|
def get_temperature_date(self, location: str, date: str, unit: str = "celsius"):
|
|
"""Get temperature at a location and date.
|
|
|
|
Args:
|
|
location: The location to get the temperature for, in the format "City, State, Country".
|
|
date: The date to get the temperature for, in the format "Year-Month-Day".
|
|
unit: The unit to return the temperature in. Defaults to "celsius". (choices: ["celsius", "fahrenheit"])
|
|
|
|
Returns:
|
|
the temperature, the location, the date and the unit in a dict
|
|
"""
|
|
print(f"[DEBUG] get_temperature_date: {location}, {date}, {unit}")
|
|
return {
|
|
"temperature": 25.9,
|
|
"location": location,
|
|
"date": date,
|
|
"unit": unit,
|
|
}
|
|
|
|
async def execute(self, instance_id: str, parameters: dict[str, Any], **kwargs) -> tuple[str, float, dict]:
|
|
try:
|
|
result = self.get_temperature_date(**parameters)
|
|
return json.dumps(result), 0, {}
|
|
except Exception as e:
|
|
return str(e), 0, {}
|
|
|
|
|
|
def test_tool_agent(init_config):
|
|
ray.init(
|
|
runtime_env={
|
|
"env_vars": {
|
|
"TOKENIZERS_PARALLELISM": "true",
|
|
"NCCL_DEBUG": "WARN",
|
|
"VLLM_LOGGING_LEVEL": "INFO",
|
|
"VLLM_USE_V1": "1",
|
|
}
|
|
}
|
|
)
|
|
|
|
# =========================== 1. Init rollout manager ===========================
|
|
tool_config = {
|
|
"tools": [
|
|
{
|
|
"class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherTool",
|
|
"config": {"type": "native"},
|
|
},
|
|
{
|
|
"class_name": "tests.experimental.agent_loop.test_basic_agent_loop.WeatherToolWithData",
|
|
"config": {"type": "native"},
|
|
},
|
|
]
|
|
}
|
|
tool_config_path = "/tmp/tool_config.json"
|
|
with open(tool_config_path, "w") as f:
|
|
json.dump(tool_config, f)
|
|
|
|
n = 2
|
|
init_config.actor_rollout_ref.rollout.n = n
|
|
init_config.actor_rollout_ref.rollout.multi_turn.tool_config_path = tool_config_path
|
|
init_config.actor_rollout_ref.rollout.multi_turn.max_parallel_calls = 2
|
|
agent_loop_manager = init_agent_loop_manager(init_config)
|
|
|
|
# =========================== 2. Generate sequences ===========================
|
|
raw_prompts = [
|
|
[
|
|
{"role": "user", "content": "How are you?"},
|
|
],
|
|
[
|
|
{"role": "user", "content": "What's the temperature in Los Angeles now?"},
|
|
],
|
|
[
|
|
{"role": "user", "content": "What's the temperature in New York now?"},
|
|
],
|
|
[
|
|
{
|
|
"role": "system",
|
|
"content": "You are Qwen, created by Alibaba Cloud. You are a helpful assistant.\n\n"
|
|
"Current Date: 2024-09-30",
|
|
},
|
|
{"role": "user", "content": "What's the temperature in San Francisco now? How about tomorrow?"},
|
|
],
|
|
]
|
|
batch = DataProto(
|
|
non_tensor_batch={
|
|
"raw_prompt": np.array([np.array(prompt) for prompt in raw_prompts], dtype=object),
|
|
"agent_name": np.array(["tool_agent"] * len(raw_prompts)),
|
|
},
|
|
)
|
|
batch = batch.repeat(n)
|
|
result = agent_loop_manager.generate_sequences(prompts=batch)
|
|
assert len(result) == len(raw_prompts) * n
|
|
|
|
# Check turns
|
|
num_turns = result.non_tensor_batch["__num_turns__"]
|
|
print(f"num_turns: {num_turns}")
|
|
for i in range(len(num_turns)):
|
|
if i // n == 0:
|
|
# [user, assistant]
|
|
assert num_turns[i] == 2
|
|
else:
|
|
# [user, assistant, tool, assistant]
|
|
assert num_turns[i] == 4
|
|
|
|
# Check response_mask
|
|
tokenizer = hf_tokenizer(init_config.actor_rollout_ref.model.path)
|
|
responses = result.batch["responses"]
|
|
response_mask = result.batch["response_mask"]
|
|
attention_mask = result.batch["attention_mask"]
|
|
assert responses.size() == response_mask.size(), f"{responses.size()} != {response_mask.size()}"
|
|
response_length = response_mask.size(1)
|
|
|
|
for i in range(len(responses)):
|
|
# response with tool response
|
|
valid_tokens = responses[i][attention_mask[i][-response_length:].bool()]
|
|
response_with_obs = tokenizer.decode(valid_tokens)
|
|
|
|
# response without tool response
|
|
valid_tokens = responses[i][response_mask[i].bool()]
|
|
response_without_obs = tokenizer.decode(valid_tokens)
|
|
|
|
assert "<tool_response>" not in response_without_obs, (
|
|
f"found <tool_response> in response: {response_without_obs}"
|
|
)
|
|
assert "</tool_response>" not in response_without_obs, (
|
|
f"found </tool_response> in response: {response_without_obs}"
|
|
)
|
|
print("=========================")
|
|
print(response_with_obs)
|
|
print("---")
|
|
print(response_without_obs)
|
|
|
|
print("Test passed!")
|
|
ray.shutdown()
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_get_trajectory_info():
|
|
"""Tests the get_trajectory_info method."""
|
|
# Initialize the class to set up class-level attributes
|
|
step = 10
|
|
index = [1, 1, 3, 3]
|
|
expected_info = [
|
|
{"step": step, "sample_index": 1, "rollout_n": 0, "validate": False},
|
|
{"step": step, "sample_index": 1, "rollout_n": 1, "validate": False},
|
|
{"step": step, "sample_index": 3, "rollout_n": 0, "validate": False},
|
|
{"step": step, "sample_index": 3, "rollout_n": 1, "validate": False},
|
|
]
|
|
|
|
trajectory_info = await get_trajectory_info(step, index, validate=False)
|
|
|
|
assert trajectory_info == expected_info
|