diff --git a/examples/split_placement/split_monkey_patch.py b/examples/split_placement/split_monkey_patch.py
index a1a0e4319..8cc73083d 100644
--- a/examples/split_placement/split_monkey_patch.py
+++ b/examples/split_placement/split_monkey_patch.py
@@ -31,6 +31,7 @@ from verl.trainer.ppo.ray_trainer import (
     compute_timing_metrics,
     marked_timer,
 )
+from verl.trainer.ppo.reward import compute_reward
 from verl.utils.metric import reduce_metrics


@@ -95,14 +96,22 @@ def fit(self):
                     gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
                     batch = batch.union(gen_baseline_output)

-                    reward_baseline_tensor = self.reward_fn(batch)
+                    # compute reward model score on batch
+                    rm_scores = None
+                    if self.use_rm and "rm_scores" not in batch.batch.keys():
+                        rm_scores = self.rm_wg.compute_rm_score(batch)
+                        batch = batch.union(rm_scores)
+                    reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
                     reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

-                    batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+                    keys_to_pop = set(gen_baseline_output.batch.keys())
+                    if rm_scores is not None:
+                        keys_to_pop.update(rm_scores.batch.keys())
+                    batch.pop(batch_keys=list(keys_to_pop))

                     batch.batch["reward_baselines"] = reward_baseline_tensor

-                    del gen_baseline_batch, gen_baseline_output
+                    del rm_scores, gen_baseline_batch, gen_baseline_output

                 batch.non_tensor_batch["uid"] = np.array(
                     [str(uuid.uuid4()) for _ in range(len(batch.batch))], dtype=object
@@ -142,13 +151,13 @@ def fit(self):
                 # compute scores. Support both model and function-based.
                 # We first compute the scores using reward model. Then, we call reward_fn to combine
                 # the results from reward model and rule-based results.
-                if self.use_rm:
+                if self.use_rm and "rm_scores" not in batch.batch.keys():
                     # we first compute reward model score
                     reward_tensor = self.rm_wg.compute_rm_score(batch)
                     batch = batch.union(reward_tensor)

                 # we combine with rule-based rm
-                reward_tensor = self.reward_fn(batch)
+                reward_tensor, _ = compute_reward(batch, self.reward_fn)
                 batch.batch["token_level_scores"] = reward_tensor

                 # compute rewards. apply_kl_penalty if available
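The recurring change in this patch is that each trainer delegates to `compute_reward` from `verl.trainer.ppo.reward` instead of calling `self.reward_fn` directly. Based on the try/except blocks the DAPO and entropy hunks below delete, the helper is presumably close to the following sketch; the real implementation lives in `verl/trainer/ppo/reward.py` and may differ in details:

```python
# Hedged sketch of verl.trainer.ppo.reward.compute_reward, inferred from the
# try/except pattern this patch removes from the recipe trainers.
# Not the verbatim upstream implementation.
def compute_reward(data, reward_fn):
    try:
        reward_result = reward_fn(data, return_dict=True)
        reward_tensor = reward_result["reward_tensor"]
        reward_extra_infos_dict = reward_result.get("reward_extra_info", {})
    except Exception as e:
        print(f"Error in reward_fn: {e}")
        reward_tensor = reward_fn(data)
        reward_extra_infos_dict = {}
    return reward_tensor, reward_extra_infos_dict
```

Callers that only need the token-level scores, such as the baseline pass above, discard the second element with `reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)`.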
diff --git a/recipe/dapo/dapo_ray_trainer.py b/recipe/dapo/dapo_ray_trainer.py
index fe88d3acf..d56ea14be 100644
--- a/recipe/dapo/dapo_ray_trainer.py
+++ b/recipe/dapo/dapo_ray_trainer.py
@@ -41,6 +41,7 @@ from verl.trainer.ppo.ray_trainer import (
     compute_advantage,
     compute_response_mask,
 )
+from verl.trainer.ppo.reward import compute_reward
 from verl.utils.profiler import marked_timer
 from verl.utils.rollout_skip import RolloutSkip

@@ -152,14 +153,22 @@ class RayDAPOTrainer(RayPPOTrainer):
                        gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
                        new_batch = new_batch.union(gen_baseline_output)

-                       reward_baseline_tensor = self.reward_fn(new_batch)
+                       # compute reward model score on new_batch
+                       rm_scores = None
+                       if self.use_rm and "rm_scores" not in new_batch.batch.keys():
+                           rm_scores = self.rm_wg.compute_rm_score(new_batch)
+                           new_batch = new_batch.union(rm_scores)
+                       reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
                        reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

-                       new_batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+                       keys_to_pop = set(gen_baseline_output.batch.keys())
+                       if rm_scores is not None:
+                           keys_to_pop.update(rm_scores.batch.keys())
+                       new_batch.pop(batch_keys=list(keys_to_pop))

                        new_batch.batch["reward_baselines"] = reward_baseline_tensor

-                       del gen_baseline_batch, gen_baseline_output
+                       del rm_scores, gen_baseline_batch, gen_baseline_output

                    new_batch.non_tensor_batch["uid"] = np.array(
                        [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
@@ -172,21 +181,13 @@ class RayDAPOTrainer(RayPPOTrainer):
                        # compute scores. Support both model and function-based.
                        # We first compute the scores using reward model. Then, we call reward_fn to combine
                        # the results from reward model and rule-based results.
-                       if self.use_rm:
+                       if self.use_rm and "rm_scores" not in new_batch.batch.keys():
                            # we first compute reward model score
                            reward_tensor = self.rm_wg.compute_rm_score(new_batch)
                            new_batch = new_batch.union(reward_tensor)

                        # we combine with rule-based rm
-                       reward_extra_infos_dict: dict[str, list]
-                       try:
-                           reward_result = self.reward_fn(new_batch, return_dict=True)
-                           reward_tensor = reward_result["reward_tensor"]
-                           reward_extra_infos_dict = reward_result.get("reward_extra_info", {})
-                       except Exception as e:
-                           print(f"Error in reward_fn: {e}")
-                           reward_tensor = self.reward_fn(new_batch)
-                           reward_extra_infos_dict = {}
+                       reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)

                        new_batch.batch["token_level_scores"] = reward_tensor
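In the baseline (REMAX) block above, any `rm_scores` computed for the greedy baseline responses are added to `keys_to_pop`. This matters because of the new `"rm_scores" not in new_batch.batch.keys()` guard later in the step: if the baseline scores stayed in the batch, the guard would skip re-scoring the actual sampled rollouts. A toy sketch of the cleanup, using plain dicts in place of `DataProto` (names are illustrative only):

```python
# Plain-dict stand-in for DataProto, illustrating why baseline rm_scores are popped.
gen_baseline_output = {"responses": "greedy baseline responses"}
rm_scores = {"rm_scores": "scores for the greedy baseline responses"}
batch = {"prompts": "prompts", **gen_baseline_output, **rm_scores}

keys_to_pop = set(gen_baseline_output.keys())  # drop the baseline generations
keys_to_pop.update(rm_scores.keys())           # and the reward-model keys they produced
for key in keys_to_pop:
    batch.pop(key, None)

# the later `"rm_scores" not in batch` guard now triggers a fresh rm_wg call
assert "rm_scores" not in batch
```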
diff --git a/recipe/entropy/entropy_ray_trainer.py b/recipe/entropy/entropy_ray_trainer.py
index bd4f62e2b..e70e902eb 100644
--- a/recipe/entropy/entropy_ray_trainer.py
+++ b/recipe/entropy/entropy_ray_trainer.py
@@ -39,6 +39,7 @@ from verl.trainer.ppo.ray_trainer import (
     compute_advantage,
     compute_response_mask,
 )
+from verl.trainer.ppo.reward import compute_reward
 from verl.utils.profiler import simple_timer


@@ -129,14 +130,22 @@ class RayEntropyTrainer(RayPPOTrainer):
                        gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
                        new_batch = new_batch.union(gen_baseline_output)

-                       reward_baseline_tensor = self.reward_fn(new_batch)
+                       # compute reward model score on new_batch
+                       rm_scores = None
+                       if self.use_rm and "rm_scores" not in new_batch.batch.keys():
+                           rm_scores = self.rm_wg.compute_rm_score(new_batch)
+                           new_batch = new_batch.union(rm_scores)
+                       reward_baseline_tensor, _ = compute_reward(new_batch, self.reward_fn)
                        reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

-                       new_batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+                       keys_to_pop = set(gen_baseline_output.batch.keys())
+                       if rm_scores is not None:
+                           keys_to_pop.update(rm_scores.batch.keys())
+                       new_batch.pop(batch_keys=list(keys_to_pop))

                        new_batch.batch["reward_baselines"] = reward_baseline_tensor

-                       del gen_baseline_batch, gen_baseline_output
+                       del rm_scores, gen_baseline_batch, gen_baseline_output

                    new_batch.non_tensor_batch["uid"] = np.array(
                        [str(uuid.uuid4()) for _ in range(len(new_batch.batch))], dtype=object
@@ -149,21 +158,13 @@ class RayEntropyTrainer(RayPPOTrainer):
                        # compute scores. Support both model and function-based.
                        # We first compute the scores using reward model. Then, we call reward_fn to combine
                        # the results from reward model and rule-based results.
-                       if self.use_rm:
+                       if self.use_rm and "rm_scores" not in new_batch.batch.keys():
                            # we first compute reward model score
                            reward_tensor = self.rm_wg.compute_rm_score(new_batch)
                            new_batch = new_batch.union(reward_tensor)

                        # we combine with rule-based rm
-                       reward_extra_infos_dict: dict[str, list]
-                       try:
-                           reward_result = self.reward_fn(new_batch, return_dict=True)
-                           reward_tensor = reward_result["reward_tensor"]
-                           reward_extra_infos_dict = reward_result["reward_extra_info"]
-                       except Exception as e:
-                           print(f"Error in reward_fn: {e}")
-                           reward_tensor = self.reward_fn(new_batch)
-                           reward_extra_infos_dict = {}
+                       reward_tensor, reward_extra_infos_dict = compute_reward(new_batch, self.reward_fn)

                        new_batch.batch["token_level_scores"] = reward_tensor
diff --git a/recipe/prime/prime_ray_trainer.py b/recipe/prime/prime_ray_trainer.py
index 2cf9baa6b..ebfdcb703 100644
--- a/recipe/prime/prime_ray_trainer.py
+++ b/recipe/prime/prime_ray_trainer.py
@@ -329,6 +329,45 @@ class RayPRIMETrainer(RayPPOTrainer):
         if isinstance(self.train_dataloader.dataset, RLHFDataset):
             self.train_dataloader.dataset.resume_dataset_state()

+    def compute_reward(self, batch: DataProto, n_samples: int):
+        update_style = self.config.reward_model.model.get("update", "none")
+        reward_output_metrics = {}
+        if update_style == "none":  # only run forward
+            reward_output = self.rm_wg.compute_rm_score(batch)
+        elif update_style == "after":  # update and directly return the reward
+            reward_output = self.rm_wg.update_rm(batch)
+        elif update_style == "before":  # update reward model, and then run forward
+            reward_output = self.rm_wg.update_rm(batch)
+            if "metrics" in reward_output.meta_info.keys():
+                reward_output_metrics = reduce_metrics(reward_output.meta_info["metrics"])
+
+            reward_output = self.rm_wg.compute_rm_score(batch)
+        elif update_style == "reverse":  # run forward to calculate statistics, then update reward model
+            reward_output = self.rm_wg.compute_rm_score(batch)
+
+            # broadcast q and acc tensor to each result
+            bc_td = DataProto.from_dict(
+                tensors={
+                    "Q_bc": reward_output.batch["q"]
+                    .sum(dim=-1)
+                    .view(-1, n_samples)
+                    .unsqueeze(1)
+                    .expand(-1, n_samples, -1)
+                    .reshape(-1, n_samples),
+                    "acc_bc": batch.batch["acc"]
+                    .view(-1, n_samples)
+                    .unsqueeze(1)
+                    .expand(-1, n_samples, -1)
+                    .reshape(-1, n_samples),
+                }
+            )
+            batch = batch.union(bc_td)
+            reward_output = self.rm_wg.update_rm(batch)
+        else:
+            raise NotImplementedError
+
+        return reward_output, reward_output_metrics
+
     def fit(self):
         """
         The training loop of PPO.
@@ -391,10 +430,19 @@ class RayPRIMETrainer(RayPPOTrainer):
                        gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
                        batch = batch.union(gen_baseline_output)

-                       reward_baseline_tensor = self.reward_fn(batch)
+                       rm_scores, _ = self.compute_reward(batch, 1)
+                       reward_baseline_tensor = rm_scores.batch.get(
+                           "rm_scores", rm_scores.batch.get("acc_bc", None)
+                       )
+                       if reward_baseline_tensor is None:
+                           raise ValueError(
+                               "Neither 'rm_scores' nor 'acc_bc' found in reward model output for baseline."
+                           )
                        reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

-                       batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+                       keys_to_pop = set(gen_baseline_output.batch.keys())
+                       keys_to_pop.update(rm_scores.batch.keys())
+                       batch.pop(batch_keys=list(keys_to_pop))

                        batch.batch["reward_baselines"] = reward_baseline_tensor

@@ -450,46 +498,11 @@ class RayPRIMETrainer(RayPPOTrainer):
                    with simple_timer("adv", timing_raw):
                        if self.use_rm:
-                           update_style = self.config.reward_model.model.get("update", "none")
-                           if update_style == "none":  # only run forward
-                               reward_output = self.rm_wg.compute_rm_score(batch)
-                           elif update_style == "after":  # update and directly return the reward
-                               reward_output = self.rm_wg.update_rm(batch)
-                           elif update_style == "before":  # update reward model, and then run forward
-                               reward_output = self.rm_wg.update_rm(batch)
-                               if "metrics" in reward_output.meta_info.keys():
-                                   reward_output_metrics = reduce_metrics(reward_output.meta_info["metrics"])
-                                   metrics.update(reward_output_metrics)
-
-                               reward_output = self.rm_wg.compute_rm_score(batch)
-                           elif (
-                               update_style == "reverse"
-                           ):  # run forward to calculate statistics, then update reward model
-                               reward_output = self.rm_wg.compute_rm_score(batch)
-                               # broadcast q and acc tensor to each result
-                               bc_td = DataProto.from_dict(
-                                   tensors={
-                                       "Q_bc": reward_output.batch["q"]
-                                       .sum(dim=-1)
-                                       .view(-1, n_samples)
-                                       .unsqueeze(1)
-                                       .expand(-1, n_samples, -1)
-                                       .reshape(-1, n_samples),
-                                       "acc_bc": batch.batch["acc"]
-                                       .view(-1, n_samples)
-                                       .unsqueeze(1)
-                                       .expand(-1, n_samples, -1)
-                                       .reshape(-1, n_samples),
-                                   }
-                               )
-                               batch = batch.union(bc_td)
-                               reward_output = self.rm_wg.update_rm(batch)
-                           else:
-                               raise NotImplementedError
+                           reward_output, reward_output_metrics = self.compute_reward(batch, n_samples)
                            batch = batch.union(reward_output)
                            if "metrics" in reward_output.meta_info.keys():
-                               reward_output_metrics = reduce_metrics(reward_output.meta_info["metrics"])
-                               metrics.update(reward_output_metrics)
+                               reward_output_metrics.update(reduce_metrics(reward_output.meta_info["metrics"]))
+                           metrics.update(reward_output_metrics)

                        # compute advantages, executed on the driver process
                        batch = compute_advantage(
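The `Q_bc` / `acc_bc` construction in the extracted `compute_reward` method broadcasts per-sample statistics to every response generated from the same prompt. A small runnable check of the reshape chain with synthetic tensors and `n_samples = 2` (illustrative values only):

```python
import torch

n_samples = 2
# q summed over the response dimension, for 4 responses = 2 prompts x 2 samples
q_sum = torch.tensor([1.0, 2.0, 3.0, 4.0])

q_bc = (
    q_sum.view(-1, n_samples)   # (2, 2): group samples by prompt
    .unsqueeze(1)               # (2, 1, 2)
    .expand(-1, n_samples, -1)  # (2, 2, 2): repeat the group row for each sample
    .reshape(-1, n_samples)     # (4, 2): one row of group statistics per response
)
# every response sees the q-sums of all samples drawn from its own prompt
assert torch.equal(q_bc, torch.tensor([[1.0, 2.0], [1.0, 2.0], [3.0, 4.0], [3.0, 4.0]]))
```

The `acc_bc` entry applies the same chain to `batch.batch["acc"]`, which is already a per-response scalar, so it skips the initial `.sum(dim=-1)`.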
diff --git a/recipe/sppo/sppo_ray_trainer.py b/recipe/sppo/sppo_ray_trainer.py
index 32b504e70..75d0a4749 100644
--- a/recipe/sppo/sppo_ray_trainer.py
+++ b/recipe/sppo/sppo_ray_trainer.py
@@ -205,14 +205,22 @@ class RaySPPOTrainer(RayPPOTrainer):
                        gen_baseline_output = self.actor_rollout_wg.generate_sequences(gen_baseline_batch)
                        batch = batch.union(gen_baseline_output)

-                       reward_baseline_tensor = self.reward_fn(batch)
+                       # compute reward model score on batch
+                       rm_scores = None
+                       if self.use_rm and "rm_scores" not in batch.batch.keys():
+                           rm_scores = self.rm_wg.compute_rm_score(batch)
+                           batch = batch.union(rm_scores)
+                       reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
                        reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

-                       batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+                       keys_to_pop = set(gen_baseline_output.batch.keys())
+                       if rm_scores is not None:
+                           keys_to_pop.update(rm_scores.batch.keys())
+                       batch.pop(batch_keys=list(keys_to_pop))

                        batch.batch["reward_baselines"] = reward_baseline_tensor

-                       del gen_baseline_batch, gen_baseline_output
+                       del rm_scores, gen_baseline_batch, gen_baseline_output

                    batch.non_tensor_batch["uid"] = np.array(
                        [str(uuid.uuid4()) for _ in range(len(batch.batch))], dtype=object
@@ -235,7 +243,7 @@ class RaySPPOTrainer(RayPPOTrainer):
                    with simple_timer("reward", timing_raw):
                        # compute reward model score
-                       if self.use_rm:
+                       if self.use_rm and "rm_scores" not in batch.batch.keys():
                            reward_tensor = self.rm_wg.compute_rm_score(batch)
                            batch = batch.union(reward_tensor)
diff --git a/verl/trainer/ppo/ray_trainer.py b/verl/trainer/ppo/ray_trainer.py
index 1c32bdda0..cff021b25 100644
--- a/verl/trainer/ppo/ray_trainer.py
+++ b/verl/trainer/ppo/ray_trainer.py
@@ -1065,14 +1065,22 @@ class RayPPOTrainer:
                        else:
                            gen_baseline_output = self.async_rollout_manager.generate_sequences(gen_baseline_batch)
                        batch = batch.union(gen_baseline_output)
-                       reward_baseline_tensor = self.reward_fn(batch)
+                       # compute reward model score on batch
+                       rm_scores = None
+                       if self.use_rm and "rm_scores" not in batch.batch.keys():
+                           rm_scores = self.rm_wg.compute_rm_score(batch)
+                           batch = batch.union(rm_scores)
+                       reward_baseline_tensor, _ = compute_reward(batch, self.reward_fn)
                        reward_baseline_tensor = reward_baseline_tensor.sum(dim=-1)

-                       batch.pop(batch_keys=list(gen_baseline_output.batch.keys()))
+                       keys_to_pop = set(gen_baseline_output.batch.keys())
+                       if rm_scores is not None:
+                           keys_to_pop.update(rm_scores.batch.keys())
+                       batch.pop(batch_keys=list(keys_to_pop))

                        batch.batch["reward_baselines"] = reward_baseline_tensor

-                       del gen_baseline_batch, gen_baseline_output
+                       del rm_scores, gen_baseline_batch, gen_baseline_output
                # repeat to align with repeated responses in rollout
                batch = batch.repeat(repeat_times=self.config.actor_rollout_ref.rollout.n, interleave=True)
                batch = batch.union(gen_batch_output)
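For reference, `reward_baselines` feeds the REMAX advantage estimator: the greedy-baseline return computed above is subtracted from each sampled response's return. The exact formula lives in verl's `compute_advantage` / core algorithm code, not in this diff; a rough sketch of the idea only:

```python
import torch

def remax_advantage_sketch(token_level_rewards, response_mask, reward_baselines):
    # Rough illustration of how batch.batch["reward_baselines"] is consumed;
    # not verl's actual implementation.
    # sequence-level return of each sampled response
    returns = (token_level_rewards * response_mask).sum(dim=-1)
    # subtract the greedy-decoding baseline computed in the gen_max block
    advantages = returns - reward_baselines
    # broadcast the scalar advantage back over the response tokens
    return advantages.unsqueeze(-1) * response_mask
```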