mirror of
https://github.com/volcengine/verl.git
synced 2025-10-20 21:53:50 +08:00
### What does this PR do? Previously the ruff line-len was too large, making it hard for users to view code. If we keep the config, manually created short lines will be formatted to long lines as well. This PR contains 3 commits: - df4bbfca62f41d972c48c8a76088ae2ac29691cf set line len to 120 and run pre-commit auto-format - 9d03f183edd9fff4e22215cacacf62c06b7b41d3 let devin fix the multi-line code - 9fc8d436f5007535fad3dc49983b01d0d457be9c skip lint for test_sglang_async_rollout_sf_tools.py. manually adjust format for rope_utils.py - last two commits: 1. merge with main 2. run lint after merge. add test_sglang_async_rollout_sf_tools.py and scripts/legacy_model_merger.py to lint.exclude ### Checklist Before Starting - [x] Search for similar PRs. Paste at least one query link here: ... - [x] Format the PR title as `[{modules}] {type}: {description}` (This will be checked by the CI) - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`, `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`, `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`, `env`, `tool`, `ckpt`, `doc`, `data` - If this PR involves multiple modules, separate them with `,` like `[megatron, fsdp, doc]` - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test` - If this PR breaks any API (CLI arguments, config, function signature, etc.), add `[BREAKING]` to the beginning of the title. - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching` ### Test This PR relies on CI for testing. ### Checklist Before Submitting > [!IMPORTANT] > Please check all the following items before requesting a review, otherwise the reviewer might deprioritize this PR for review. - [ ] Read the [Contribute Guide](https://github.com/volcengine/verl?tab=readme-ov-file#contribution-guide). 
- [ ] Apply [pre-commit checks](https://github.com/volcengine/verl?tab=readme-ov-file#code-linting-and-formatting): `pre-commit install && pre-commit run --all-files --show-diff-on-failure --color=always` - [ ] Add / Update [the documentation](https://github.com/volcengine/verl/tree/main/docs). - [ ] Add unit or end-to-end test(s) to [the CI workflow](https://github.com/volcengine/verl/tree/main/.github/workflows) to cover all the code. If not feasible, explain why: ... - [ ] Once your PR is ready for CI, send a message in [the `ci-request` channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the `verl` Slack workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ). --------- Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
85 lines
2.4 KiB
Python
85 lines
2.4 KiB
Python
# Copyright 2024 Bytedance Ltd. and/or its affiliates
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
|
|
import ray
|
|
import torch
|
|
from tensordict import TensorDict
|
|
|
|
from verl import DataProto
|
|
from verl.single_controller.base.worker import Worker
|
|
from verl.single_controller.ray import RayWorkerGroup
|
|
from verl.single_controller.ray.base import RayClassWithInitArgs, RayResourcePool
|
|
|
|
# Show every worker's log lines — Ray de-duplicates repeated logs by default,
# which hides per-rank output in multi-worker tests.
os.environ["RAY_DEDUP_LOGS"] = "0"
# Limit NCCL output to warnings to keep the test logs readable.
os.environ["NCCL_DEBUG"] = "WARN"
|
|
|
|
|
|
@ray.remote
class ModelActor(Worker):
    """Minimal Ray actor used only to occupy a GPU slot in the worker group.

    Holds no model state; it exists so RayWorkerGroup has concrete workers to
    dispatch ``get_aux_metrics`` onto.
    """

    def __init__(self):
        # Intentionally empty: no state is needed for this test actor.
        # NOTE(review): Worker.__init__ is deliberately not called here —
        # presumably the base class setup is unnecessary for dispatch-only
        # actors; confirm against Worker's contract.
        pass
|
|
|
|
|
|
class HackSelf:
    """Bare stand-in for a worker instance.

    ``get_aux_metrics`` takes an explicit ``self`` so it can be dispatched via
    ``execute_with_func_generator``; this dummy object lets the same function
    be called directly on the driver for comparison.
    """

    def __init__(self):
        # No attributes: get_aux_metrics never reads state from self.
        pass
|
|
|
|
|
|
def get_aux_metrics(self, test_proto):
    """Compute per-row auxiliary metrics for a batch of sequences.

    Written with an explicit ``self`` first argument so it can be dispatched
    onto workers via ``RayWorkerGroup.execute_with_func_generator``; it reads
    no state from ``self`` and can equally be called on the driver with a
    dummy object.

    Args:
        self: Worker instance (unused; present for dispatch compatibility).
        test_proto: ``DataProto`` whose batch holds a ``sequence_ids`` tensor;
            assumed 2-D of shape (batch, seq_len) — TODO confirm with callers.

    Returns:
        ``DataProto`` carrying the original ``sequence_ids`` plus a 1-D
        ``decode_count`` tensor with the token count of each row.
    """
    sequence_ids = test_proto.batch["sequence_ids"]
    # Comprehension instead of a manual append loop (same values, clearer):
    # len(row.tolist()) is the number of ids in each row.
    decode_count = [len(sequence_ids[i].tolist()) for i in range(sequence_ids.size(0))]
    return DataProto(
        batch=TensorDict(
            {"sequence_ids": sequence_ids, "decode_count": torch.tensor(decode_count)},
            batch_size=sequence_ids.size(0),
        )
    )
|
|
|
|
|
|
def test():
    """End-to-end check: dispatching get_aux_metrics across a Ray worker
    group must yield the same decode counts as running it on the driver."""
    ray.init()

    # Two workers in one pool, each holding a GPU.
    pool = RayResourcePool([2], use_gpu=True, name_prefix="a")
    worker_cls = RayClassWithInitArgs(cls=ModelActor)
    worker_group = RayWorkerGroup(pool, worker_cls)

    batch_size = 8
    proto = DataProto(
        TensorDict(
            {
                "sequence_ids": torch.ones([batch_size, 2048], dtype=torch.int64),
            },
            batch_size=batch_size,
        ),
        meta_info={"query_length": 1536},
    )

    # Path 1: shard the batch across ranks and run on the workers.
    sharded_result = worker_group.execute_with_func_generator(get_aux_metrics, proto)

    # Path 2: run the identical function locally, using a dummy self.
    local_result = get_aux_metrics(HackSelf(), proto)

    # Both paths must agree on the per-row decode counts.
    torch.testing.assert_close(sharded_result.batch["decode_count"], local_result.batch["decode_count"])

    ray.shutdown()
|