[tool] feat: support load local datasets when preparing datasets (#3621)

### What does this PR do?

This is a follow-up PR to https://github.com/volcengine/verl/pull/3362

### Checklist Before Starting

- [x] Search for similar PRs. Paste at least one query link here: ...
- [x] Format the PR title as `[{modules}] {type}: {description}` (This
will be checked by the CI)
- `{modules}` include `fsdp`, `megatron`, `sglang`, `vllm`, `rollout`,
`trainer`, `ci`, `training_utils`, `recipe`, `hardware`, `deployment`,
`ray`, `worker`, `single_controller`, `misc`, `perf`, `model`, `algo`,
`env`, `tool`, `ckpt`, `doc`, `data`
- If this PR involves multiple modules, separate them with `,` like
`[megatron, fsdp, doc]`
  - `{type}` is in `feat`, `fix`, `refactor`, `chore`, `test`
- If this PR breaks any API (CLI arguments, config, function signature,
etc.), add `[BREAKING]` to the beginning of the title.
  - Example: `[BREAKING][fsdp, megatron] feat: dynamic batching`

### Test

> For changes that can not be tested by CI (e.g., algorithm
implementation, new model support), validate by experiment(s) and show
results like training curve plots, evaluation results, etc.

### API and Usage Example

> Demonstrate how the API changes if any, and provide usage example(s)
if possible.

```python
python examples/data_preprocess/hellaswag.py --local_dataset_path ~/verl/data/hellaswag/ --local_save_dir ~/verl/data/hellaswag_sft
```

### Design & Code Changes

> Demonstrate the high-level design if this PR is complex, and list the
specific changes.

### Checklist Before Submitting

> [!IMPORTANT]
> Please check all the following items before requesting a review,
otherwise the reviewer might deprioritize this PR for review.

- [x] Read the [Contribute
Guide](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md).
- [x] Apply [pre-commit
checks](https://github.com/volcengine/verl/blob/main/CONTRIBUTING.md#code-linting-and-formatting):
`pre-commit install && pre-commit run --all-files --show-diff-on-failure
--color=always`
- [ ] Add / Update [the
documentation](https://github.com/volcengine/verl/tree/main/docs).
- [ ] Add unit or end-to-end test(s) to [the CI
workflow](https://github.com/volcengine/verl/tree/main/.github/workflows)
to cover all the code. If not feasible, explain why: ...
- [ ] Once your PR is ready for CI, send a message in [the `ci-request`
channel](https://verl-project.slack.com/archives/C091TCESWB1) in [the
`verl` Slack
workspace](https://join.slack.com/t/verl-project/shared_invite/zt-3855yhg8g-CTkqXu~hKojPCmo7k_yXTQ).
(If not accessible, please try [the Feishu group
(飞书群)](https://applink.larkoffice.com/client/chat/chatter/add_by_link?link_token=772jd4f1-cd91-441e-a820-498c6614126a).)
This commit is contained in:
Huazhong
2025-09-26 11:42:53 +08:00
committed by GitHub
parent fbfdc81f9a
commit 231e18948d
5 changed files with 115 additions and 34 deletions

View File

@ -26,13 +26,22 @@ from verl.utils.hdfs_io import copy, makedirs
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--local_dir", default="~/data/retool_aime2024")
parser.add_argument("--local_dir", default=None, help="The save directory for the preprocessed dataset.")
parser.add_argument("--hdfs_dir", default=None)
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
parser.add_argument(
"--local_save_dir", default="~/data/retool_aime2024", help="The save directory for the preprocessed dataset."
)
args = parser.parse_args()
local_dataset_path = args.local_dataset_path
data_path = "BytedTsinghua-SIA/AIME-2024"
dataset = datasets.load_dataset(data_path, "default")
if local_dataset_path is not None:
dataset = datasets.load_dataset(local_dataset_path, "default")
else:
dataset = datasets.load_dataset(data_path, "default")
train_dataset = dataset["train"]
@ -56,11 +65,15 @@ if __name__ == "__main__":
train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
local_dir = args.local_dir
hdfs_dir = args.hdfs_dir
local_save_dir = args.local_dir
if local_save_dir is not None:
print("Warning: Argument 'local_dir' is deprecated. Please use 'local_save_dir' instead.")
else:
local_save_dir = args.local_save_dir
train_dataset.to_parquet(os.path.join(local_dir, "train.parquet"))
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
if hdfs_dir is not None:
makedirs(hdfs_dir)
copy(src=local_dir, dst=hdfs_dir)
copy(src=local_save_dir, dst=hdfs_dir)

View File

@ -26,13 +26,22 @@ from verl.utils.hdfs_io import copy, makedirs
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--local_dir", default="~/data/retool_dapo")
parser.add_argument("--local_dir", default=None, help="The save directory for the preprocessed dataset.")
parser.add_argument("--hdfs_dir", default=None)
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
parser.add_argument(
"--local_save_dir", default="~/data/retool_dapo", help="The save directory for the preprocessed dataset."
)
args = parser.parse_args()
local_dataset_path = args.local_dataset_path
data_path = "BytedTsinghua-SIA/DAPO-Math-17k"
dataset = datasets.load_dataset(data_path, "default")
if local_dataset_path is not None:
dataset = datasets.load_dataset(local_dataset_path, "default")
else:
dataset = datasets.load_dataset(data_path, "default")
train_dataset = dataset["train"]
@ -56,11 +65,15 @@ if __name__ == "__main__":
train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True)
local_dir = args.local_dir
hdfs_dir = args.hdfs_dir
local_save_dir = args.local_dir
if local_save_dir is not None:
print("Warning: Argument 'local_dir' is deprecated. Please use 'local_save_dir' instead.")
else:
local_save_dir = args.local_save_dir
train_dataset.to_parquet(os.path.join(local_dir, "train.parquet"))
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
if hdfs_dir is not None:
makedirs(hdfs_dir)
copy(src=local_dir, dst=hdfs_dir)
copy(src=local_save_dir, dst=hdfs_dir)

View File

@ -27,8 +27,11 @@ from tqdm.auto import tqdm
from verl.utils.fs import copy, makedirs
def generate_sft_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlh/sft"):
dataset = load_dataset("Dahoas/full-hh-rlhf")
def generate_sft_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlh/sft", local_dataset_path=None):
if local_dataset_path is not None:
dataset = load_dataset(local_dataset_path)
else:
dataset = load_dataset("Dahoas/full-hh-rlhf")
output = {"prompt": [], "response": []}
for data in tqdm(dataset["train"]):
# add chosen
@ -55,9 +58,13 @@ def generate_sft_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlh/sft
copy(local_path, hdfs_dir)
def generate_rm_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlh/rm"):
train_dataset = load_dataset("Dahoas/full-hh-rlhf", split="train[:75%]")
test_dataset = load_dataset("Dahoas/full-hh-rlhf", split="train[-25%:]")
def generate_rm_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlh/rm", local_dataset_path=None):
if local_dataset_path is not None:
train_dataset = load_dataset(local_dataset_path, split="train[:75%]")
test_dataset = load_dataset(local_dataset_path, split="train[-25%:]")
else:
train_dataset = load_dataset("Dahoas/full-hh-rlhf", split="train[:75%]")
test_dataset = load_dataset("Dahoas/full-hh-rlhf", split="train[-25%:]")
local_dir = os.path.expanduser(local_dir)
os.makedirs(local_dir, exist_ok=True)
@ -83,8 +90,11 @@ def generate_rm_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlh/rm")
copy(local_path, hdfs_dir)
def generate_rl_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlhf/rl"):
dataset = load_dataset("Dahoas/full-hh-rlhf")
def generate_rl_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlhf/rl", local_dataset_path=None):
if local_dataset_path is not None:
dataset = load_dataset(local_dataset_path)
else:
dataset = load_dataset("Dahoas/full-hh-rlhf")
train_dataset = dataset["train"]
data_source = "Dahoas/full-hh-rlhf"
@ -124,16 +134,28 @@ def generate_rl_dataset(target_hdfs_path_dir, local_dir="~/data/full_hh_rlhf/rl"
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--split", type=str, choices=["sft", "rm", "rl"], required=True)
parser.add_argument("--local_dir", type=str, default="~/data/full_hh_rlhf")
parser.add_argument("--local_dir", default=None, help="The save directory for the preprocessed dataset.")
parser.add_argument("--hdfs_dir", type=str, required=False, default=None)
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
parser.add_argument(
"--local_save_dir",
type=str,
default="~/data/full_hh_rlhf",
help="The save directory for the preprocessed dataset.",
)
args = parser.parse_args()
local_save_dir = args.local_dir
if local_save_dir is not None:
print("Warning: Argument 'local_dir' is deprecated. Please use 'local_save_dir' instead.")
else:
local_save_dir = args.local_save_dir
if args.split == "sft":
generate_sft_dataset(args.hdfs_dir, os.path.join(args.local_dir, args.split))
generate_sft_dataset(args.hdfs_dir, os.path.join(local_save_dir, args.split), args.local_dataset_path)
elif args.split == "rm":
generate_rm_dataset(args.hdfs_dir, os.path.join(args.local_dir, args.split))
generate_rm_dataset(args.hdfs_dir, os.path.join(local_save_dir, args.split), args.local_dataset_path)
elif args.split == "rl":
generate_rl_dataset(args.hdfs_dir, os.path.join(args.local_dir, args.split))
generate_rl_dataset(args.hdfs_dir, os.path.join(local_save_dir, args.split), args.local_dataset_path)
else:
raise NotImplementedError

View File

@ -28,13 +28,28 @@ from verl.utils.hdfs_io import copy, makedirs
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--local_dir", default="~/data/geo3k_multiturn_w_tool")
parser.add_argument("--local_dir", default=None, help="The save directory for the preprocessed dataset.")
parser.add_argument("--hdfs_dir", default=None)
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
parser.add_argument(
"--local_save_dir",
default="~/data/geo3k_multiturn_w_tool",
help="The save directory for the preprocessed dataset.",
)
args = parser.parse_args()
local_dataset_path = args.local_dataset_path
data_source = "hiyouga/geometry3k"
dataset = datasets.load_dataset(data_source)
if local_dataset_path is not None:
dataset = datasets.load_dataset(local_dataset_path)
else:
dataset = datasets.load_dataset(data_source)
train_dataset = dataset["train"]
test_dataset = dataset["test"]
instruction_following = (
r"You FIRST think about the reasoning process as an internal monologue and then provide the final answer. "
r"The reasoning process MUST BE enclosed within <think> </think> tags. "
@ -90,10 +105,16 @@ if __name__ == "__main__":
train_dataset = train_dataset.map(function=make_map_fn("train"), with_indices=True, num_proc=8)
test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True, num_proc=8)
local_dir = args.local_dir
hdfs_dir = args.hdfs_dir
train_dataset.to_parquet(os.path.join(local_dir, "train.parquet"))
test_dataset.to_parquet(os.path.join(local_dir, "test.parquet"))
local_save_dir = args.local_dir
if local_save_dir is not None:
print("Warning: Argument 'local_dir' is deprecated. Please use 'local_save_dir' instead.")
else:
local_save_dir = args.local_save_dir
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
test_dataset.to_parquet(os.path.join(local_save_dir, "test.parquet"))
if hdfs_dir is not None:
makedirs(hdfs_dir)
copy(src=local_dir, dst=hdfs_dir)
copy(src=local_save_dir, dst=hdfs_dir)

View File

@ -36,14 +36,22 @@ def preprocess(text):
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--local_dir", default="/opt/tiger/hellaswag")
parser.add_argument("--local_dir", default=None, help="The save directory for the preprocessed dataset.")
parser.add_argument("--hdfs_dir", default=None)
parser.add_argument("--local_dataset_path", default=None, help="The local path to the raw dataset, if it exists.")
parser.add_argument(
"--local_save_dir", default="~/data/hellaswag", help="The save directory for the preprocessed dataset."
)
args = parser.parse_args()
local_dataset_path = args.local_dataset_path
data_source = "Rowan/hellaswag"
dataset = datasets.load_dataset(data_source, trust_remote_code=True)
if local_dataset_path is not None:
dataset = datasets.load_dataset(local_dataset_path)
else:
dataset = datasets.load_dataset(data_source, trust_remote_code=True)
train_dataset = dataset["train"]
val_dataset = dataset["validation"]
@ -83,14 +91,18 @@ if __name__ == "__main__":
val_dataset = val_dataset.map(function=make_map_fn("validation"), with_indices=True)
test_dataset = test_dataset.map(function=make_map_fn("test"), with_indices=True)
local_dir = args.local_dir
hdfs_dir = args.hdfs_dir
local_save_dir = args.local_dir
if local_save_dir is not None:
print("Warning: Argument 'local_dir' is deprecated. Please use 'local_save_dir' instead.")
else:
local_save_dir = args.local_save_dir
train_dataset.to_parquet(os.path.join(local_dir, "train.parquet"))
val_dataset.to_parquet(os.path.join(local_dir, "validation.parquet"))
test_dataset.to_parquet(os.path.join(local_dir, "test.parquet"))
train_dataset.to_parquet(os.path.join(local_save_dir, "train.parquet"))
val_dataset.to_parquet(os.path.join(local_save_dir, "validation.parquet"))
test_dataset.to_parquet(os.path.join(local_save_dir, "test.parquet"))
if hdfs_dir is not None:
makedirs(hdfs_dir)
copy(src=local_dir, dst=hdfs_dir)
copy(src=local_save_dir, dst=hdfs_dir)