92 lines
3.0 KiB
Python
Executable File
# Copyright (c) Microsoft Corporation.
# SPDX-License-Identifier: Apache-2.0

# DeepSpeed Team
"""
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""
import os
from .ds_aio_job import Job, run_job
import torch
from deepspeed.accelerator import get_accelerator
BYTES_PER_GB = 1024**3
BYTES_PER_MB = 1024**2
BYTES_PER_KB = 1024

# Worker ids whose non-forced task_log() messages are printed.
LOG_TIDS = [0]
def task_log(tid, msg, force=False):
    # Print a per-worker message; non-forced messages are emitted only for ids in LOG_TIDS.
    if force or tid in LOG_TIDS:
        print(f'tid {tid}: {msg}')


def task_barrier(barrier, num_parties):
    # Synchronize all participating workers and verify the barrier is still intact afterwards.
    assert barrier.parties == num_parties
    barrier.wait()
    assert not barrier.broken
def report_results(args, read_op, pool_results):
    #print(f'pool_results = {pool_results}')
    io_string = 'Read' if read_op else 'Write'
    if None in pool_results:
        print(f'Failure in one of {args.threads} {io_string} processes')
        return

    total_bytes = sum([num_bytes for _, _, num_bytes in pool_results])

    task_latency_sec = max([sec for _, sec, _ in pool_results])
    task_speed_GB = 0 if task_latency_sec == 0 else total_bytes / task_latency_sec / BYTES_PER_GB
    print(f'Task {io_string} Latency = {task_latency_sec} sec')
    print(f'Task {io_string} Speed = {task_speed_GB} GB/sec')

    e2e_latency_sec = max([sec for sec, _, _ in pool_results])
    e2e_speed_GB = 0 if e2e_latency_sec == 0 else total_bytes / e2e_latency_sec / BYTES_PER_GB
    print(f'E2E {io_string} Latency = {e2e_latency_sec} sec')
    print(f'E2E {io_string} Speed = {e2e_speed_GB} GB/sec')
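# Example (illustrative values): each pool_results entry is unpacked above as
# (e2e_sec, task_sec, num_bytes). With pool_results = [(2.0, 1.5, BYTES_PER_GB), (2.1, 1.6, BYTES_PER_GB)],
# report_results() prints a Task speed of 2/1.6 = 1.25 GB/sec and an E2E speed of 2/2.1, about 0.95 GB/sec.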
def get_block_size_and_count(io_bytes):
    if io_bytes > BYTES_PER_MB and io_bytes % BYTES_PER_MB == 0:
        block_size = BYTES_PER_MB
        block_size_string = '1M'
    else:
        assert io_bytes % BYTES_PER_KB == 0
        block_size = BYTES_PER_KB
        block_size_string = '1K'
    block_count = io_bytes / block_size

    return block_size_string, int(block_count)
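# Example (illustrative): get_block_size_and_count(4 * BYTES_PER_MB) returns ('1M', 4),
# while get_block_size_and_count(512 * BYTES_PER_KB) returns ('1K', 512).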
def refine_integer_value(value):
    unit_dict = {'K': 1024, 'M': 1024**2, 'G': 1024**3}

    if value[-1] in list(unit_dict.keys()):
        int_value = int(value[:-1]) * unit_dict[value[-1]]
        return int_value
    return int(value)
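# Example (illustrative): refine_integer_value('4K') == 4096, refine_integer_value('1M') == 1048576,
# and refine_integer_value('100') == 100.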
def create_filename(folder, read_op, size, tid):
    io_string = "read" if read_op else "write"
    return os.path.join(folder, f'_aio_{io_string}_{size}.pt.{tid}')
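# Example (illustrative): create_filename('/tmp', True, 1048576, 0) returns '/tmp/_aio_read_1048576.pt.0'.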
def create_file(filename, num_bytes):
    block_size, block_count = get_block_size_and_count(num_bytes)
    dd_job = Job(cmd_line=[f'dd if=/dev/urandom of={filename} bs={block_size} count={block_count}'])
    print(f'[Start] Create {filename} of {num_bytes} bytes by running {dd_job.cmd()} ....')
    run_job(dd_job)
    print(f'[Done] Create {filename} of {num_bytes} bytes by running {dd_job.cmd()} ....')
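# Example (illustrative): create_file('/tmp/_aio_read_2097152.pt.0', 2 * BYTES_PER_MB) builds and runs
# 'dd if=/dev/urandom of=/tmp/_aio_read_2097152.pt.0 bs=1M count=2'.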
def create_page_locked_tensor(num_elem, use_accelerator, aio_handle=None):
    if use_accelerator:
        # Allocate a random uint8 CPU tensor and pin it via the accelerator runtime.
        return get_accelerator().pin_memory(torch.randint(high=128, size=(num_elem, ), dtype=torch.uint8,
                                                          device='cpu'))
    else:
        # Allocate page-locked CPU memory directly through the AIO handle.
        return aio_handle.new_cpu_locked_tensor(num_elem, torch.empty(0, dtype=torch.uint8))
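# Illustrative usage sketch (uses only helpers defined in this module; the path and size are examples):
#   num_bytes = refine_integer_value('1M')
#   fname = create_filename('/tmp', read_op=True, size=num_bytes, tid=0)
#   create_file(fname, num_bytes)
#   buffer = create_page_locked_tensor(num_bytes, use_accelerator=True)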