Fixed save_checkpoint race when consolidating NVMe offloaded tensors (#7613)

Past Discussion: #7549

Signed-off-by: H1manshu21 <himanshuwindows8.1@gmail.com>
Co-authored-by: Olatunji Ruwase <tunji.ruwase@snowflake.com>
Himanshu Sekhar Nayak authored 2025-10-01 23:38:20 +05:30, committed by GitHub
parent 07e76bd45f
commit e37c37acdd

@@ -70,6 +70,7 @@ from deepspeed.compression.constants import \
     WEIGHT_QUANTIZE_KERNEL
 from deepspeed.checkpoint.constants import OPTIMIZER_STATE_DICT, FROZEN_PARAM_FRAGMENTS
 from deepspeed.checkpoint.utils import clone_tensors_for_torch_save
+from deepspeed.checkpoint.ds_to_universal import dp_index_to_str
 from deepspeed.runtime.sparse_tensor import SparseTensor
 from deepspeed.runtime import lr_schedules
@@ -3130,7 +3131,7 @@ class DeepSpeedEngine(Module):
                                                          custom_load_fn=custom_load_fn)
         load_zero_checkpoint = load_path is not None and self.zero_optimization()
-        if load_zero_checkpoint:
+        if load_zero_checkpoint and not self.zero_nvme_offload_optimizer():
             if (load_optimizer_states and not load_module_only) or self.load_universal_checkpoint():
                 success = self._load_zero_checkpoint(load_dir, tag, load_optimizer_states=load_optimizer_states)
             else:
@@ -3140,8 +3141,10 @@ class DeepSpeedEngine(Module):
         if self.zero_nvme_offload_optimizer():
             from shutil import copytree, disk_usage
+            rank = self.local_rank if self.use_node_local_storage() else self.global_rank
+            rank_dir = "rank" + dp_index_to_str(rank)
             offload_dir = self.optimizer.optimizer_swapper.swap_folder
-            offload_ckpt_dir = os.path.join(load_dir, tag, "offloaded_tensors")
+            offload_ckpt_dir = os.path.join(load_dir, tag, "offloaded_tensors", rank_dir)
             _, _, free = disk_usage(offload_dir)
             logger.info(
                 f"Copying NVMe offload checkpoint from {offload_ckpt_dir} to {offload_dir}, {free / 1e9:,.2f} GB free on target filesystem..."
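For orientation, here is a minimal sketch of what the load-side hunk above amounts to. The function name restore_nvme_offload_state and the swap_folder parameter are hypothetical, and a plain str() stands in for dp_index_to_str; the real logic lives inside DeepSpeedEngine and uses logger.info rather than print.

# Illustrative sketch only: each rank restores its NVMe swap folder from the
# per-rank sub-directory written at save time, rather than from a shared path.
import os
from shutil import copytree, disk_usage

def restore_nvme_offload_state(swap_folder, load_dir, tag, rank, dp_index_to_str=str):
    # dp_index_to_str=str is a stand-in for the helper imported from
    # deepspeed.checkpoint.ds_to_universal in the diff above.
    rank_dir = "rank" + dp_index_to_str(rank)
    offload_ckpt_dir = os.path.join(load_dir, tag, "offloaded_tensors", rank_dir)
    _, _, free = disk_usage(swap_folder)
    print(f"Copying NVMe offload checkpoint from {offload_ckpt_dir} to {swap_folder}, "
          f"{free / 1e9:,.2f} GB free on target filesystem...")
    copytree(offload_ckpt_dir, swap_folder, dirs_exist_ok=True)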
@@ -3482,8 +3485,9 @@ class DeepSpeedEngine(Module):
         if self.zero_nvme_offload_optimizer():
             from shutil import copytree, disk_usage
+            rank_dir = "rank" + dp_index_to_str(rank)
             offload_dir = self.optimizer.optimizer_swapper.swap_folder
-            offload_ckpt_dir = os.path.join(save_dir, tag, "offloaded_tensors")
+            offload_ckpt_dir = os.path.join(save_dir, tag, "offloaded_tensors", rank_dir)
             _, _, free = disk_usage(save_dir)
             logger.info(
                 f"Copying NVMe offload files from {offload_dir} to {offload_ckpt_dir}, {free / 1e9:,.2f} GB free on target filesystem..."
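And a matching sketch of the save-side hunk, again with hypothetical names (save_nvme_offload_state, swap_folder): since every rank now copies into its own rank-specific sub-directory under offloaded_tensors, concurrent ranks no longer collide on one shared destination, which appears to be the race the commit title refers to.

# Illustrative sketch only: a per-rank destination removes the collision between
# ranks that previously all copied into the same "offloaded_tensors" folder.
import os
from shutil import copytree, disk_usage

def save_nvme_offload_state(swap_folder, save_dir, tag, rank, dp_index_to_str=str):
    # dp_index_to_str=str is again a stand-in for the DeepSpeed helper.
    rank_dir = "rank" + dp_index_to_str(rank)
    offload_ckpt_dir = os.path.join(save_dir, tag, "offloaded_tensors", rank_dir)
    _, _, free = disk_usage(save_dir)
    print(f"Copying NVMe offload files from {swap_folder} to {offload_ckpt_dir}, "
          f"{free / 1e9:,.2f} GB free on target filesystem...")
    copytree(swap_folder, offload_ckpt_dir, dirs_exist_ok=True)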