!218 新增datatrove最佳实践

Merge pull request !218 from 张烨槟/datatrove_best_practice
This commit is contained in:
张烨槟
2025-05-22 09:05:53 +00:00
committed by i-robot
parent ab3dffb09e
commit 562de220f2

View File

@ -0,0 +1,354 @@
# 在NPU上使用DataTrove进行数据过滤处理
DataTrove是一个数据处理和分析的工具库主要用于高效处理大规模数据集。它提供了一系列模块化组件包括数据读取、去重、过滤和写入等功能能够灵活地组合成数据处理管道满足不同场景下的数据处理需求。本教程介绍如何使用DataTrove第三方套件在npu上进行数据过滤包括格式转换、数据去重、敏感词过滤和中文过滤。
## 环境准备
基础环境配置请参考 [环境准备文档](../install.md)。
```bash
pip install datatrove[processing,multilingual]
```
## 示例数据集
基于`alpaca`数据集,我们构建了一个简单的需要处理的示例数据集,其格式为`CSV`,为了演示数据过滤的效果,其中包含中英文数据、含有敏感词或违禁词的数据、重复出现的数据,内容如下:
| instruction | input | output |
|-------|-------|-------|
| 这是一个包含敏感词1的语句。 | | 这是一个很正常的回答。 |
| 请解释一下关于A物品的来源。 | | 这是一个包含敏感词2的回答。 |
| 请根据输入作一首诗 | 违禁词 | 这是一个很正常的回答。 |
| 输入三支篮球队的名称并生成一个适当的口号。 | 输入:俄克拉荷马城雷霆队,芝加哥公牛队,布鲁克林网队。 | 输出: "雷霆,公牛和网队:各显神通,角逐群雄!" |
| 输入三支篮球队的名称并生成一个适当的口号。 | 输入:俄克拉荷马城雷霆队,芝加哥公牛队,布鲁克林网队。 | 输出: "雷霆,公牛和网队:各显神通,角逐群雄!" |
| 请根据以下文章构建一个需要回答的问题。 | | 我不好意思,您没有提供任何信息或文章来供我根据它构建问题。 请提供相应的文章或信息,以便我能够帮助您构建问题。 |
| 请根据以下文章构建一个需要回答的问题。 | | 我不好意思,您没有提供任何信息或文章来供我根据它构建问题。 请提供相应的文章或信息,以便我能够帮助您构建问题。 |
| 请根据以下文章构建一个需要回答的问题。 | | 我不好意思,您没有提供任何信息或文章来供我根据它构建问题。 请提供相应的文章或信息,以便我能够帮助您构建问题。 |
| 写一个关于上周末你做了什么的简短叙述。 | | 由于我是一个人工智能助手所以我并不能真正的“做”什么事情。我的“工作”时间是24/7无休确保我随时可供帮助和服务。所以在上周末我一直都在“工作”在响应用户的指令帮助他们查询信息执行操作解决问题。 |
| 分类以下数字系列。 | 输入: 2、4、6、8 | 此系列数字为偶数序列。 |
| Create an effective 140 character twitter post | | 🚀Ready for takeoff! Explore the world beyond your imagination - embark on new adventures and make lasting memories. #travel #Wanderlust140 |
| 写出狗和猫的两个不同之处。 | | 1. 物种不同:狗属于犬科,而猫属于猫科。它们在生物分类上就有区别。 2. 行为不同:狗更依赖人类,更忠诚,而猫则更独立。例如狗更喜欢和主人玩耍互动,接受训练,而猫一般更享受独处和独立玩耍。 |
| 帮我分类以下动物:猪、鸡、牛。 | | 猪、牛都属于哺乳动物,是家畜。而鸡属于鸟类,也是家禽。 所以可以归为两类: 1. 哺乳动物:猪、牛 2. 鸟类:鸡。 |
| 用“黎明”、“天空”和“广阔”这三个词组成一个句子。 | | 随着黎明的到来,天空逐渐明亮,一片广阔的视野展现在眼前。 |
| Transform the following sentence using a synonym: The car sped quickly. | | The car accelerated rapidly. |
| 生成两个关于自动化的问题。 | | 1. 如何利用自动化技术提高生产效率并降低成本? 2. 自动化对未来劳动力市场和就业情况会产生什么影响? |
| 创作一个关于夏天的20个词的复合诗。 | | 烈日炎烤花初繁, 草地碧绿树也攀。 鸟儿啁啾啼声甜, 蝴蝶蜜峰跳舞妍。 河流清澈碧波浅, 蓝天白云自由幻。 夏至未至心已陶, 悠闲惬意梦中绵。 |
| 去掉以下段落中的重复词语:天空是蓝色的,而且很美丽。 | | 天空是蓝色的,而且很美丽。 原文没有出现重复的词语。 |
| Invent a new word by combining two existing words. | | "Chillax" - a combination of the words "chill" and "relax". It means to calm down and unwind, enjoying oneself in a laidback manner. |
## 数据过滤处理
### 格式转换
`CSV`格式的数据集在许多领域中被广泛应用使用DataTrove我们可以在对数据集进行过滤处理的同时轻松地将原始`CSV`格式的数据集转换为适配openmind套件的`JSONL`格式。
具体来说DataTrove提供了灵活的`CSVReader``JsonlWriter`组件,通过继承并自定义这些组件的适配器`adapter`,我们可以精确地定义数据从`CSV``JSONL`的转换逻辑,具体代码请参考下文的完整代码示例。
### 敏感词过滤
根据用户需求可以自定义敏感词列表通过DataTrove的`LambdaFilter`结合自定义函数使用`pipeline`进行敏感词过滤,核心代码如下:
```python
SENSITIVE_WORDS = ["敏感词1", "敏感词2", "违禁词"]
def sensitive_words_filter(doc: Document, sensitive_words):
return not any(
word in doc.text
for word in sensitive_words
if isinstance(word, str)
)
```
### 中文过滤
根据用户需求可以自定义中文字符比例阈值教程中采用0.1通过DataTrove的`LambdaFilter`结合自定义函数使用`pipeline`进行中文过滤,核心代码如下:
```python
def is_chinese_char(c: str):
return (
'\u4e00' <= c <= '\u9fff' or
'\u3400' <= c <= '\u4dbf' or
'\U00020000' <= c <= '\U0002a6df'
)
def chinese_ratio_filter(doc: Document, threshold=0.1):
text = doc.text
if not text:
return False
chinese_chars_count = sum(1 for c in text if is_chinese_char(c))
chinese_ratio = chinese_chars_count / len(text)
return chinese_ratio > threshold
```
同理可以参照这个代码逻辑使用DataTrove进行英文过滤。
### 数据去重
使用DataTrove实现数据去重整个流程分为四个阶段每个阶段都通过`LocalPipelineExecutor`来执行特定的任务,代码框架和详细讲解如下:
#### 配置MinHash参数
```python
minhash_config = MinhashConfig(
hash_config=HashConfig(precision=64),
num_buckets=14,
hashes_per_bucket=8,
)
```
根据用户需要,配置哈希精度、分桶数量和每个桶的哈希值数量。
#### 阶段1生成签名
```python
stage1 = LocalPipelineExecutor(
pipeline=[
CSVAlpacaReader(),
MinhashDedupSignature(
output_folder=f"{WORK_DIR}/signatures",
config=minhash_config
)
],
tasks=1,
logging_dir=f"{WORK_DIR}/logs/stage1"
)
```
- **读取文件**:使用`CSVAlpacaReader`读取`CSV`文件内容。
- **生成签名**:通过`MinhashDedupSignature`为每个文档生成唯一的`MinHash`签名。
- **保存签名**:将生成的签名保存至指定文件夹,为后续去重操作做准备。
#### 阶段2处理分桶
```python
stage2 = LocalPipelineExecutor(
pipeline=[
MinhashDedupBuckets(
input_folder=f"{WORK_DIR}/signatures",
output_folder=f"{WORK_DIR}/buckets",
config=minhash_config
)
],
tasks=minhash_config.num_buckets,
logging_dir=f"{WORK_DIR}/logs/stage2"
)
```
- **分桶操作**:利用`MinhashDedupBuckets`把生成的签名分配到不同的桶中。
- **并行处理**:并行任务数与分桶数量`minhash_config.num_buckets`相同,可显著提升处理效率。
#### 阶段3聚类去重
```python
stage3 = LocalPipelineExecutor(
pipeline=[
MinhashDedupCluster(
input_folder=f"{WORK_DIR}/buckets",
output_folder=f"{WORK_DIR}/remove_ids",
config=minhash_config
)
],
tasks=1,
logging_dir=f"{WORK_DIR}/logs/stage3"
)
```
- **桶内聚类**:借助`MinhashDedupCluster`在各桶内执行聚类去重操作。
- **记录移除ID**记录需要被移除的重复文档ID。
#### 阶段4过滤输出
```python
stage4 = LocalPipelineExecutor(
pipeline=[
CSVAlpacaReader(),
MinhashDedupFilter(
input_folder=f"{WORK_DIR}/remove_ids",
exclusion_writer=JsonlWriter(f"{WORK_DIR}/removed")
),
AlpacaWriter()
],
tasks=1,
logging_dir=f"{WORK_DIR}/logs/stage4"
)
```
- **重新读取文件**:再次读取原始`CSV`文件输入文件需要与阶段1中完全一致。
- **过滤重复文档**:运用`MinhashDedupFilter`结合阶段3产生的移除ID列表过滤掉重复文档。
- **输出结果**:使用`AlpacaWriter`将过滤后的文档写入最终输出文件。
## 完整代码示例
```python
from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.dedup import MinhashDedupSignature
from datatrove.pipeline.dedup.minhash import (
MinhashConfig,
MinhashDedupBuckets,
MinhashDedupCluster,
MinhashDedupFilter,
)
from datatrove.pipeline.readers import CSVReader
from datatrove.pipeline.writers.jsonl import JsonlWriter
from datatrove.utils.hashing import HashConfig
from datatrove.data import Document
from datatrove.pipeline.filters import LambdaFilter
WORK_DIR = "./temp"
# 根据格式自定义AlpacaReader
class CSVAlpacaReader(CSVReader):
def __init__(self):
super().__init__(
data_folder=".",
glob_pattern="input.csv",
text_key="text",
adapter=lambda self, row, path, id_in_file:{
"text": "\n".join([
row.get("instruction", ""),
row.get("input", ""),
row.get("output", "")
]),
"metadata": {
"instruction": row.get("instruction", ""),
"input": row.get("input", ""),
"output": row.get("output", "")
},
"id": id_in_file
}
)
# 自定义AlpacaWriter
class AlpacaWriter(JsonlWriter):
def __init__(self):
super().__init__(
output_folder=".",
output_filename="output.jsonl",
adapter=lambda _, doc: {
"instruction": doc.metadata.get("instruction"),
"input": doc.metadata.get("input", ""),
"output": doc.metadata.get("output", "")
},
expand_metadata=False,
compression="infer"
)
# 敏感词筛选
SENSITIVE_WORDS = ["敏感词1", "敏感词2", "违禁词"]
def sensitive_words_filter(doc: Document, sensitive_words):
return not any(
word in doc.text
for word in sensitive_words
if isinstance(word, str)
)
# 中文符号判断
def is_chinese_char(c: str):
return (
'\u4e00' <= c <= '\u9fff' or
'\u3400' <= c <= '\u4dbf' or
'\U00020000' <= c <= '\U0002a6df'
)
# 中文筛选
def chinese_ratio_filter(doc: Document, threshold=0.1):
text = doc.text
if not text:
return False
chinese_chars_count = sum(1 for c in text if is_chinese_char(c))
chinese_ratio = chinese_chars_count / len(text)
return chinese_ratio > threshold
# 去重配置参数
minhash_config = MinhashConfig(
hash_config=HashConfig(precision=64),
num_buckets=14,
hashes_per_bucket=8,
)
# 阶段1生成签名
stage1 = LocalPipelineExecutor(
pipeline=[
CSVAlpacaReader(),
MinhashDedupSignature(
output_folder=f"{WORK_DIR}/signatures",
config=minhash_config
)
],
tasks=1,
logging_dir=f"{WORK_DIR}/logs/stage1"
)
# 阶段2处理分桶
stage2 = LocalPipelineExecutor(
pipeline=[
MinhashDedupBuckets(
input_folder=f"{WORK_DIR}/signatures",
output_folder=f"{WORK_DIR}/buckets",
config=minhash_config
)
],
tasks=minhash_config.num_buckets,
logging_dir=f"{WORK_DIR}/logs/stage2"
)
# 阶段3聚类去重
stage3 = LocalPipelineExecutor(
pipeline=[
MinhashDedupCluster(
input_folder=f"{WORK_DIR}/buckets",
output_folder=f"{WORK_DIR}/remove_ids",
config=minhash_config
)
],
tasks=1,
logging_dir=f"{WORK_DIR}/logs/stage3"
)
# 阶段4过滤输出
stage4 = LocalPipelineExecutor(
pipeline=[
CSVAlpacaReader(),
MinhashDedupFilter(
input_folder=f"{WORK_DIR}/remove_ids",
exclusion_writer=JsonlWriter(f"{WORK_DIR}/removed")
),
LambdaFilter(chinese_ratio_filter),
LambdaFilter(lambda doc: sensitive_words_filter(doc, SENSITIVE_WORDS)),
AlpacaWriter()
],
tasks=1,
logging_dir=f"{WORK_DIR}/logs/stage4"
)
if __name__ == "__main__":
stage1.run()
stage2.run()
stage3.run()
stage4.run()
```
## 过滤效果
进行数据过滤处理后,输出一个`alpaca`格式的`JSONL`文件,去除了输入数据中的重复、敏感与违禁和英文部分,内容如下:
```jsonl
{"instruction": "输入三支篮球队的名称并生成一个适当的口号。", "input": "输入:俄克拉荷马城雷霆队,芝加哥公牛队,布鲁克林网队。", "output": "输出: \"雷霆,公牛和网队:各显神通,角逐群雄!\""}
{"instruction": "请根据以下文章构建一个需要回答的问题。", "input": "", "output": "我不好意思,您没有提供任何信息或文章来供我根据它构建问题。 请提供相应的文章或信息,以便我能够帮助您构建问题。"}
{"instruction": "写一个关于上周末你做了什么的简短叙述。", "input": "", "output": "由于我是一个人工智能助手所以我并不能真正的“做”什么事情。我的“工作”时间是24/7无休确保我随时可供帮助和服务。所以在上周末我一直都在“工作”在响应用户的指令帮助他们查询信息执行操作解决问题。"}
{"instruction": "分类以下数字系列。", "input": "输入: 2、4、6、8", "output": "此系列数字为偶数序列。"}
{"instruction": "写出狗和猫的两个不同之处。", "input": "", "output": "1. 物种不同:狗属于犬科,而猫属于猫科。它们在生物分类上就有区别。\n\n2. 行为不同:狗更依赖人类,更忠诚,而猫则更独立。例如狗更喜欢和主人玩耍互动,接受训练,而猫一般更享受独处和独立玩耍。"}
{"instruction": "帮我分类以下动物:猪、鸡、牛。", "input": "", "output": "猪、牛都属于哺乳动物,是家畜。而鸡属于鸟类,也是家禽。\n\n所以可以归为两类\n1. 哺乳动物:猪、牛\n2. 鸟类:鸡。"}
{"instruction": "用“黎明”、“天空”和“广阔”这三个词组成一个句子。", "input": "", "output": "随着黎明的到来,天空逐渐明亮,一片广阔的视野展现在眼前。"}
{"instruction": "生成两个关于自动化的问题。", "input": "", "output": "1. 如何利用自动化技术提高生产效率并降低成本?\n2. 自动化对未来劳动力市场和就业情况会产生什么影响?"}
{"instruction": "创作一个关于夏天的20个词的复合诗。", "input": "", "output": "烈日炎烤花初繁,\n草地碧绿树也攀。\n鸟儿啁啾啼声甜\n蝴蝶蜜峰跳舞妍。\n河流清澈碧波浅\n蓝天白云自由幻。\n夏至未至心已陶\n悠闲惬意梦中绵。"}
{"instruction": "去掉以下段落中的重复词语:天空是蓝色的,而且很美丽。", "input": "", "output": "天空是蓝色的,而且很美丽。\n\n原文没有出现重复的词语。"}
```
## 微调
通过以上教程生产的`JSONL`文件已适配openmind套件的微调流程可直接投入使用。关于数据集配置细节请查阅[数据集配置](../basic_tutorial/train/datasets.md),微调部分请参考[PyTorch模型sft微调](../basic_tutorial/train/posttrain/finetune/finetune_pt.md),以便您高效完成模型优化。