Files
DeepSpeed/csrc/aio/py_test/ds_aio_basic.py
Ma, Guokai 9548d48f48 Abstract accelerator (step 2) (#2560)
* Abstract accelerator (step 2)

* more flex op_builder path for both installation and runtime

* add SpatialInferenceBuilder into cuda_accelerator.py

* use reflection to make cuda_accelerator adapt to CUDA op builder change automatically

* clean up deepspeed/__init__.py

* add comments in cuda_accelerator for no torch path

* Update deepspeed/env_report.py

Change env_report.py according to suggestion

Co-authored-by: Michael Wyatt <mrwyattii@gmail.com>

* reduce the range of try...except for better code clarity

* Add porting for deepspeed/ops/random_ltd/dropping_utils.py

* move accelerator to top directory and create symlink under deepspeed

Co-authored-by: Olatunji Ruwase <olruwase@microsoft.com>
Co-authored-by: Michael Wyatt <mrwyattii@gmail.com>
Co-authored-by: Jeff Rasley <jerasley@microsoft.com>
2023-01-06 23:40:58 -05:00

151 lines
4.0 KiB
Python
Executable File

"""
Copyright 2020 The Microsoft DeepSpeed Team
Licensed under the MIT license.
Functionality of swapping optimizer tensors to/from (NVMe) storage devices.
"""
import torch
import os
import time
from multiprocessing import Pool, Barrier
from test_ds_aio_utils import report_results, task_log, task_barrier
from deepspeed.accelerator import get_accelerator
from deepspeed.ops.op_builder.builder_names import AsyncIOBuilder
def pre_basic(args, tid, read_op):
    """Allocate the I/O buffer for one task and build its context dict.

    Args:
        args: Benchmark options. ``read_file`` is used for reads;
            ``write_file`` and ``write_size`` are used for writes.
        tid: Task index, used for logging and as the suffix of the file
            written by this task.
        read_op: True to prepare a read, False to prepare a write.

    Returns:
        A dict with keys ``file``, ``num_bytes``, ``buffer`` and
        ``elapsed_sec`` (initialised to 0).
    """
    if read_op:
        io_string = "Read"
        # A read transfers the full size of the existing file.
        num_bytes = os.path.getsize(args.read_file)
        file = args.read_file
    else:
        io_string = "Write"
        num_bytes = args.write_size
        # Every task writes to its own file, distinguished by task id.
        file = f'{args.write_file}.{tid}'

    task_log(tid, f'Allocate tensor of size {num_bytes} bytes')

    # Byte tensor created on the CPU, then handed to the accelerator's
    # pin_memory() hook.
    pin_memory = get_accelerator().pin_memory
    buffer = pin_memory(torch.empty(num_bytes, dtype=torch.uint8, device='cpu'))

    task_log(
        tid,
        f'{io_string} file {file} of size {num_bytes} bytes from buffer on device {buffer.device}'
    )

    return {
        'file': file,
        'num_bytes': num_bytes,
        'buffer': buffer,
        'elapsed_sec': 0,
    }
def pre_basic_read(pool_params):
    """Build the task context for a read operation.

    Args:
        pool_params: ``(args, tid)`` pair.

    Returns:
        The context dict produced by ``pre_basic`` with ``read_op=True``.
    """
    task_args, task_id = pool_params
    return pre_basic(task_args, task_id, True)
def pre_basic_write(pool_params):
    """Build the task context for a write operation.

    Args:
        pool_params: ``(args, tid)`` pair.

    Returns:
        The context dict produced by ``pre_basic`` with ``read_op=False``.
    """
    task_args, task_id = pool_params
    return pre_basic(task_args, task_id, False)
def post_basic(pool_params):
    """Clear the buffer entry of a task context once the task is done.

    Args:
        pool_params: ``(args, tid, ctxt)`` triple; only ``ctxt`` is used.

    Returns:
        The same ``ctxt`` dict, with ``ctxt["buffer"]`` set to None.
    """
    _args, _tid, ctxt = pool_params
    io_buffer = ctxt["buffer"]
    # The value returned by detach() is discarded; only the context entry
    # is cleared below.
    io_buffer.detach()
    ctxt["buffer"] = None
    return ctxt
def main_basic_read(pool_params):
    """Issue one ``aio_read`` for the task and accumulate its duration.

    Args:
        pool_params: ``(args, tid, ctxt)`` triple. ``ctxt`` supplies the
            ``buffer`` and ``file``; ``args`` supplies ``block_size``,
            ``queue_depth``, ``single_submit``, ``overlap_events`` and
            ``validate``, which are forwarded to the op unchanged.

    Returns:
        The same ``ctxt`` dict with ``elapsed_sec`` increased by the time
        spent inside the ``aio_read`` call.
    """
    args, _tid, ctxt = pool_params

    # Resolve the async I/O op before starting the clock so that building or
    # loading the extension is not charged to the I/O measurement.
    aio_module = get_accelerator().create_op_builder(AsyncIOBuilder).load()

    # perf_counter() is monotonic, so the interval cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()
    aio_module.aio_read(ctxt['buffer'],
                        ctxt['file'],
                        args.block_size,
                        args.queue_depth,
                        args.single_submit,
                        args.overlap_events,
                        args.validate)
    ctxt['elapsed_sec'] += time.perf_counter() - start_time
    return ctxt
def main_basic_write(pool_params):
    """Issue one ``aio_write`` for the task and accumulate its duration.

    Args:
        pool_params: ``(args, tid, ctxt)`` triple. ``ctxt`` supplies the
            ``buffer`` and ``file``; ``args`` supplies ``block_size``,
            ``queue_depth``, ``single_submit``, ``overlap_events`` and
            ``validate``, which are forwarded to the op unchanged.

    Returns:
        The same ``ctxt`` dict with ``elapsed_sec`` increased by the time
        spent inside the ``aio_write`` call.
    """
    args, _tid, ctxt = pool_params

    # Resolve the async I/O op before starting the clock so that building or
    # loading the extension is not charged to the I/O measurement.
    aio_module = get_accelerator().create_op_builder(AsyncIOBuilder).load()

    # perf_counter() is monotonic, so the interval cannot be skewed by
    # system clock adjustments.
    start_time = time.perf_counter()
    aio_module.aio_write(ctxt['buffer'],
                         ctxt['file'],
                         args.block_size,
                         args.queue_depth,
                         args.single_submit,
                         args.overlap_events,
                         args.validate)
    ctxt['elapsed_sec'] += time.perf_counter() - start_time
    return ctxt
def get_schedule(args, read_op):
    """Select the pre/post/main stage callables for a read or a write.

    Args:
        args: Unused; kept for interface compatibility.
        read_op: True selects the read stages, False the write stages.

    Returns:
        A dict mapping ``'pre'``, ``'post'`` and ``'main'`` to callables.
        ``'post'`` is ``post_basic`` for both directions.
    """
    if read_op:
        pre_stage, main_stage = pre_basic_read, main_basic_read
    else:
        pre_stage, main_stage = pre_basic_write, main_basic_write

    # Key order matches the original insertion order (pre, post, main).
    return {
        'pre': pre_stage,
        'post': post_basic,
        'main': main_stage,
    }
def _aio_handle_tasklet(pool_params):
    """Run the pre, main (repeated ``args.loops`` times) and post stages.

    A barrier is crossed after the schedule is built, after the pre stage,
    after every main iteration and after the post stage.

    Args:
        pool_params: ``(args, tid, read_op)`` triple.

    Returns:
        Tuple ``(main_task_sec, elapsed_sec, total_bytes)`` where
        ``main_task_sec`` is the accumulated main-stage time including the
        barrier wait, ``elapsed_sec`` is the value accumulated by the main
        stage in the context, and ``total_bytes`` is
        ``num_bytes * args.loops``.
    """
    args, tid, read_op = pool_params

    def sync_workers():
        # aio_barrier is the module-level global set by _init_tasklet.
        task_barrier(aio_barrier, args.threads)

    # Create schedule
    schedule = get_schedule(args, read_op)
    task_log(tid, f'schedule = {schedule}')
    sync_workers()

    run_pre = schedule["pre"]
    run_main = schedule["main"]
    run_post = schedule["post"]

    # Run pre task
    task_log(tid, f'running pre-task')
    ctxt = run_pre((args, tid))
    sync_workers()

    # Run main tasks in a loop
    ctxt["main_task_sec"] = 0
    for i in range(args.loops):
        task_log(tid, f'running main task {i}')
        start_time = time.time()
        ctxt = run_main((args, tid, ctxt))
        # The barrier wait is deliberately inside the timed window.
        sync_workers()
        iteration_sec = time.time() - start_time
        ctxt["main_task_sec"] += iteration_sec

    # Run post task
    task_log(tid, f'running post-task')
    ctxt = run_post((args, tid, ctxt))
    sync_workers()

    return (
        ctxt["main_task_sec"],
        ctxt["elapsed_sec"],
        ctxt["num_bytes"] * args.loops,
    )
def _init_tasklet(b):
    """Store the given barrier in the module-level ``aio_barrier`` global.

    Used as the ``initializer`` of the worker pool so that
    ``_aio_handle_tasklet`` can reach the barrier through the global.

    Args:
        b: Barrier object to publish.
    """
    # Rebind the module global rather than creating a function local.
    global aio_barrier
    aio_barrier = b
def aio_basic_multiprocessing(args, read_op):
    """Run the basic AIO benchmark across ``args.threads`` pool workers.

    Args:
        args: Benchmark options; ``threads`` sets the pool size and the
            number of parties of the barrier.
        read_op: True to benchmark reads, False to benchmark writes.

    Returns:
        None. The per-worker results are passed to ``report_results``.
    """
    num_workers = args.threads
    barrier = Barrier(num_workers)
    # One parameter triple per worker, indexed by task id.
    worker_params = [(args, task_id, read_op) for task_id in range(num_workers)]

    # The barrier is handed to each worker through the pool initializer.
    with Pool(processes=num_workers,
              initializer=_init_tasklet,
              initargs=(barrier, )) as pool:
        pool_results = pool.map(_aio_handle_tasklet, worker_params)

    report_results(args, read_op, pool_results)