Files
verl/recipe/entropy/reward.py
H 00a10a8ef3 [ci] refactor: reduce ruff line-length from 300 to 120 (#2287)
### What does this PR do?

Previously the ruff line length was too large, making it hard for users to
view code. If we kept that config, manually created short lines would be
reformatted into long lines as well. This PR contains 3 commits:
- df4bbfca62f41d972c48c8a76088ae2ac29691cf set line len to 120 and run
pre-commit auto-format
- 9d03f183edd9fff4e22215cacacf62c06b7b41d3 let devin fix the multi-line
code
- 9fc8d436f5007535fad3dc49983b01d0d457be9c skip lint for
test_sglang_async_rollout_sf_tools.py. manually adjust format for
rope_utils.py
- last two commits:
  1. merge with main
  2. run lint after merge; add test_sglang_async_rollout_sf_tools.py and
     scripts/legacy_model_merger.py to lint.exclude

### Checklist Before Starting

- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
  - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`

### Test

This PR relies on CI for testing.


### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.

- [ ] Read the [Contribute
Guide](https://github.com/volcengine/verl?tab=readme-ov-file#contribution-guide).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl?tab=readme-ov-file#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2025-07-01 09:54:40 +08:00

87 lines
3.4 KiB
Python

# Copyright 2025 Individual Contributor: Thibaut Barroyer
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import multiprocessing
from functools import partial
import ray
from verl import DataProto
from verl.trainer.ppo.reward import compute_reward, get_custom_reward_fn
from .reward_score import _default_compute_score
def load_reward_manager(config, tokenizer, num_examine, **reward_kwargs):
    """
    Build the reward manager selected by the trainer configuration.

    Args:
        config: PPO trainer configuration object; ``config.reward_model`` and
            ``config.data.reward_fn_key`` are read here.
        tokenizer: Tokenizer object handed to the reward manager.
        num_examine: Number of samples to examine, handed to the reward manager.
        **reward_kwargs: Extra keyword arguments forwarded to the reward manager.

    Returns:
        An instance of the reward manager class that ``get_reward_manager_cls``
        returns for the configured name.
    """
    from verl.workers.reward_manager import get_reward_manager_cls

    # Pre-defined reward managers live in `verl/workers/reward_manager/`:
    #   naive: NaiveRewardManager
    #   prime: PrimeRewardManager
    #   batch: BatchRewardManager
    #   dapo:  DAPORewardManager
    # Note(haibin.lin): custom reward managers must be imported and registered
    # via `verl.workers.reward_manager.register`.
    # When the config does not name one, "naive" is used.
    manager_name = config.reward_model.get("reward_manager", "naive")
    manager_cls = get_reward_manager_cls(manager_name)

    # A user-supplied reward function from the config takes precedence.
    score_fn = get_custom_reward_fn(config)
    if score_fn is None:
        # No custom function: start from the default scorer and, when a
        # sandbox URL is configured, bind the sandbox settings onto it.
        score_fn = _default_compute_score
        fusion_cfg = config.reward_model.get("sandbox_fusion")
        fusion_url = fusion_cfg.get("url") if fusion_cfg else None
        if fusion_url:
            mp_manager = multiprocessing.Manager()
            # Semaphore used to control concurrent access to the sandbox.
            sandbox_semaphore = mp_manager.Semaphore(fusion_cfg.get("max_concurrent", 64))
            score_fn = partial(
                _default_compute_score,
                sandbox_fusion_url=fusion_url,
                concurrent_semaphore=sandbox_semaphore,
            )

    # Instantiate the reward manager with the resolved scoring function.
    return manager_cls(
        tokenizer=tokenizer,
        num_examine=num_examine,
        compute_score=score_fn,
        reward_fn_key=config.data.reward_fn_key,
        **reward_kwargs,
    )
@ray.remote(num_cpus=1)
def compute_reward_async(data: DataProto, config, tokenizer):
    """
    Build a reward manager from ``config`` and score one batch of data.

    Intended to run in a separate Ray worker. The reward manager is created
    with ``num_examine=0`` plus whatever ``config.reward_model`` provides under
    "reward_kwargs" (an empty dict when that key is absent).
    """
    extra_kwargs = config.reward_model.get("reward_kwargs", {})
    reward_manager = load_reward_manager(config, tokenizer, num_examine=0, **extra_kwargs)
    return compute_reward(data, reward_manager)