verl/tests/single_controller/test_driverfunc_to_worker.py
H 00a10a8ef3 [ci] refactor: reduce ruff line-length from 300 to 120 (#2287)
### What does this PR do?

Previously the ruff line length was too large, making the code hard to
read. If we kept that config, manually wrapped short lines would be
reformatted into long lines as well. This PR contains 3 commits:
- df4bbfca62f41d972c48c8a76088ae2ac29691cf set line len to 120 and run
pre-commit auto-format
- 9d03f183edd9fff4e22215cacacf62c06b7b41d3 let devin fix the multi-line
code
- 9fc8d436f5007535fad3dc49983b01d0d457be9c skip lint for
test_sglang_async_rollout_sf_tools.py. manually adjust format for
rope_utils.py
- last two commits:
  1. merge with main
  2. run lint after merge. add test_sglang_async_rollout_sf_tools.py and
  scripts/legacy_model_merger.py to lint.exclude

### Checklist Before Starting

- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
  - `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
  `trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
  `ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
  `env`, `tool`, `ckpt`, `doc`, `data`
  - If this PR involves multiple modules, separate them with `,` like
  `[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
  - If this PR breaks any API (CLI arguments, config, function signature,
  etc.), add `[BREAKING]` to the beginning of the title.
  - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`
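
The title convention above can be checked mechanically. The following is a rough sketch only, assuming a regular expression is an acceptable way to express the rule; the actual CI check is not shown here, and `check_pr_title`, `TITLE_RE`, `ALLOWED_MODULES`, and `ALLOWED_TYPES` are hypothetical names introduced for illustration.

```python
import re

# Module and type names copied from the checklist above.
ALLOWED_MODULES = {
    "fsdp", "megatron", "sglang", "vllm", "rollout", "trainer", "ci",
    "training_utils", "recipe", "hardware", "deployment", "ray", "worker",
    "single_controller", "misc", "perf", "model", "algo", "env", "tool",
    "ckpt", "doc", "data",
}
ALLOWED_TYPES = {"feat", "fix", "refactor", "chore", "test"}

# Optional "[BREAKING]" prefix, then "[modules] type: description".
TITLE_RE = re.compile(
    r"^(?P<breaking>\[BREAKING\])?"
    r"\[(?P<modules>[^\]]+)\] "
    r"(?P<type>\w+): "
    r"(?P<description>.+)$"
)


def check_pr_title(title: str) -> bool:
    """Return True if the title follows the documented convention (hypothetical helper)."""
    match = TITLE_RE.match(title)
    if match is None:
        return False
    # Multiple modules are separated by commas, e.g. "[megatron, fsdp, doc]".
    modules = [name.strip() for name in match.group("modules").split(",")]
    return all(name in ALLOWED_MODULES for name in modules) and match.group("type") in ALLOWED_TYPES
```

For instance, `check_pr_title("[BREAKING][fsdp, megatron] feat: dynamic batching")` accepts the example given in the checklist, while a title without a bracketed module list is rejected.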

### Test

This PR relies on CI for testing.


### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.

- [ ] Read the [Contribute
Guide](https://github.com/volcengine/verl?tab=readme-ov-file#contribution-guide).
- [ ] Apply [pre-commit
checks](https://github.com/volcengine/verl?tab=readme-ov-file#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).

---------

Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
2025-07-01 09:54:40 +08:00

85 lines
2.4 KiB
Python

# Copyright 2024 Bytedance Ltd. and/or its affiliates
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import ray
import torch
from tensordict import TensorDict

from verl import DataProto
from verl.single_controller.base.worker import Worker
from verl.single_controller.ray import RayWorkerGroup
from verl.single_controller.ray.base import RayClassWithInitArgs, RayResourcePool

os.environ["RAY_DEDUP_LOGS"] = "0"
os.environ["NCCL_DEBUG"] = "WARN"


@ray.remote
class ModelActor(Worker):
    def __init__(self):
        pass


class HackSelf:
    def __init__(self):
        pass


def get_aux_metrics(self, test_proto):
    sequence_ids = test_proto.batch["sequence_ids"]
    decode_count = []
    for i in range(sequence_ids.size(0)):
        decode_count.append(len(sequence_ids[i].tolist()))
    ret_proto = DataProto(
        batch=TensorDict(
            {"sequence_ids": sequence_ids, "decode_count": torch.tensor(decode_count)}, batch_size=sequence_ids.size(0)
        )
    )
    return ret_proto


def test():
    # construct model
    ray.init()

    # create 2 workers, each holding a GPU
    resource_pool = RayResourcePool([2], use_gpu=True, name_prefix="a")

    class_with_args = RayClassWithInitArgs(cls=ModelActor)
    shard_wg = RayWorkerGroup(resource_pool, class_with_args)

    test_bs = 8
    test_proto = DataProto(
        TensorDict(
            {
                "sequence_ids": torch.ones([test_bs, 2048], dtype=torch.int64),
            },
            batch_size=test_bs,
        ),
        meta_info={"query_length": 1536},
    )

    # shard the batch among the different ranks
    ret_proto1 = shard_wg.execute_with_func_generator(get_aux_metrics, test_proto)

    # compare with executing the same function on the driver
    hs = HackSelf()
    ret_proto2 = get_aux_metrics(hs, test_proto)

    torch.testing.assert_close(ret_proto1.batch["decode_count"], ret_proto2.batch["decode_count"])

    ray.shutdown()
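
The test above checks that running a driver-defined function on data sharded across workers gives the same result as running it once on the driver. Below is a minimal, framework-free sketch of that equivalence. It assumes the batch is split into equal contiguous shards and that per-rank results are concatenated in rank order; how `execute_with_func_generator` actually dispatches and collects is not shown in this file. `split_evenly`, `count_tokens`, and `run_sharded` are hypothetical helpers, and plain lists stand in for tensors.

```python
from typing import Callable, List

Batch = List[List[int]]


def split_evenly(batch: Batch, world_size: int) -> List[Batch]:
    """Split a batch into world_size contiguous shards of equal size (assumed dispatch)."""
    if len(batch) % world_size != 0:
        raise ValueError("batch size must be divisible by world_size")
    shard_size = len(batch) // world_size
    return [batch[rank * shard_size:(rank + 1) * shard_size] for rank in range(world_size)]


def count_tokens(batch: Batch) -> List[int]:
    """Per-row length, mirroring the decode_count computation in get_aux_metrics."""
    return [len(row) for row in batch]


def run_sharded(func: Callable[[Batch], List[int]], batch: Batch, world_size: int) -> List[int]:
    """Apply func to each shard and concatenate the results in rank order (assumed collect)."""
    results: List[int] = []
    for shard in split_evenly(batch, world_size):
        results.extend(func(shard))
    return results


# Same shapes as the test: 8 sequences of 2048 tokens, split across 2 workers.
batch = [[1] * 2048 for _ in range(8)]
sharded = run_sharded(count_tokens, batch, world_size=2)
on_driver = count_tokens(batch)
assert sharded == on_driver
```

Because `count_tokens` treats each row independently, the sharded and driver-side results agree regardless of how many equal shards are used, which is the property the test relies on.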